Spring Reactor common mistakes

I recently gotten fond of Spring Reactor, reactive java library used by Spring 5.0. There is a lot of resources out there which describe it, so I’m not going to repeat the internet here. But in short it makes any asynchronous development a LOT easier and I dare to say its the first library which makes it almost trivial. There is a catch, learning curve is steep. And when I mean steep I mean it took me at least half a year (senior java dev) to wrap my head around it. Me and my colleagues made some stupid mistakes so here are 4 so far, will add more in the future.

Subscribe is not blocking current thread (usually).

Flux.range(1, 10).subscribe(integer -> logger.info(integer.toString()))

I have seen this many times and most people did this mistake at least once, most often in tests. It is not good to rely on the fact that subscribe is a blocking call. It may be, but only if somewhere in operator chain we do not switch threads with subcribeOn or publishOn. In that case the original thread is released and will continue to execute code after subscribe. As a general rule of thumb, use block (toIterable, toStream) if you want to block execution of current thread.

Be careful with distinct

Distinct is a useful operator, which enables you to quickly filter out duplicates from stream.

Flux.range(1, 10)
.mergeWith(Flux.range(1,10))
.distinct()
.toIterable().forEach(this::log);

But in the end its backed by a set which accumulates already emitted values. So be careful what you put there as it may lead to OutOfMemory exceptions real fast for long running streams processing large objects. Its always better to use distinct with limited value, e.g. distinct(Data:getId);

toStream – is not able to cancel the subscription

Consider the code below:

try {
    Flux.interval(Duration.ofMillis(100))
            .map(aLong -> new Random().nextInt(1000))
            .doOnNext(this::log)
            .toStream()
            .forEach(value -> {
                if (value > 900) {
                    throw new UnsupportedOperationException("NOOO");
                }
            });
} catch (Exception e) {
    log("Exception caught, but the original flux continues on");
    Thread.sleep(1000);
}

We have subscribed to a flux with toStream, but then we have thrown an exception when processing values from the stream. One would thought that the subscribed flux would be cancelled along with the stream, but that is sadly not the case and numbers keep coming. I have discussed this with authors and they say there is no way to keep the subscription object in java stream class (they are of course right). So be careful with this and move exception throwing code into the flux, not to the stream.

Be careful with recursion

Using recursion is sometimes the easiest way to go, but note that Reactor does not support tail recursion and stack overflow tends to occur a little bit faster and in unexpected places. For example following code with maxDepth=10000 throws StactOverflow exception when flux processes onComplete signal. So use it with limited depth or use a loop.

public void testRecursion(int maxDepth) throws InterruptedException {
    Function<Integer, Integer> producer = iteration -> {
        if (iteration > maxDepth) {
            return null;
        }
        return Thread.currentThread().getStackTrace().length;
    };
    recursiveFlux(1, producer)
            .toIterable()
            .forEach(this::logReceived);
}

private Flux<String> recursiveFlux(int iteration, Function<Integer, Integer> producer) {
    return Flux.defer(() -> {
        Integer result = producer.apply(iteration);

        if (result == null) {
            return Flux.empty();
        } else {
            return Flux.concat(
                    Flux.just(String.format("%d - stack %d", iteration, result)),
                    recursiveFlux(iteration + 1, producer));
        }
    });
}