What is the difference between flatMap and concatMap? It’s really important to understand this difference. I’ll try to explain it as simply as possible.

flatMap vs concatMap

Take a look at the following Reactor Flux example. It’s purely theoretical.

class Foo {
    int accumulator;

    Flux<Bar> method(List<Request> requests) {
        return Flux.fromIterable(requests)
                   .concatMap(request -> externalApi.execute(request)) // (1) 
                   .concatMap(response -> repository.store(response)) // (2)
                   .flatMap(response -> { // (3)
                        accumulator = accumulator + 1;
                        return Mono.just(response);
                   }); 
    }
}

In the above example we have a list of requests. We would like to execute the batch of requests one by one (1), and then store each response in a database (2). Finally, we count the stored responses in the accumulator field (3). What value do you think we will find in accumulator for 5 requests? …

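You cannot rely on it. The result may not be deterministic: it could be 5, but it could also be 3 or 4. The reason is that flatMap subscribes to each inner publisher as soon as possible, without waiting for the previous one to complete, so inner publishers can run concurrently and their updates to shared state such as accumulator can race. The effect shows up once the inner publisher does real asynchronous work; a trivial Mono.just like the one above completes immediately and is unlikely to show it. concatMap, on the other hand, waits for the previous inner publisher to complete and only then subscribes to the next one.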

The following version gives you a reliable accumulator value.

class Foo {
    int accumulator;

    Flux<Bar> method(List<Request> requests) {
        return Flux.fromIterable(requests)
                   .concatMap(request -> externalApi.execute(request)) // (1) 
                   .concatMap(response -> repository.store(response)) // (2)
                   .concatMap(response -> { // (3)
                        accumulator = accumulator + 1;
                        return Mono.just(response);
                   }); 
    }
}
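
If you’d like to see the timing difference in isolation, here is a minimal sketch. It is plain JDK rather than Reactor, so treat it as an analogy and not as how flatMap or concatMap are implemented. The class name, the job method, and the 50 ms delay are all made up for illustration. It counts how many jobs overlap when every job is started right away (the flatMap-like case) and when the next job starts only after the previous one has finished (the concatMap-like case).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy (not Reactor): models only *when* each inner job is started.
final class SubscriptionTimingSketch {

    private final AtomicInteger running = new AtomicInteger(); // jobs in flight right now
    private final AtomicInteger peak = new AtomicInteger();    // most overlapping jobs seen

    // Hypothetical stand-in for an asynchronous inner publisher.
    private void job() {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
            Thread.sleep(50); // simulated I/O latency (arbitrary value)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
    }

    // flatMap-like: start every job right away, then wait for all of them.
    static int peakWhenEager(int jobs) {
        SubscriptionTimingSketch sketch = new SubscriptionTimingSketch();
        ExecutorService pool = Executors.newFixedThreadPool(jobs);
        try {
            List<CompletableFuture<Void>> started = new ArrayList<>();
            for (int i = 0; i < jobs; i++) {
                started.add(CompletableFuture.runAsync(sketch::job, pool));
            }
            started.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return sketch.peak.get();
    }

    // concatMap-like: start the next job only after the previous one has completed.
    static int peakWhenSequential(int jobs) {
        SubscriptionTimingSketch sketch = new SubscriptionTimingSketch();
        ExecutorService pool = Executors.newFixedThreadPool(jobs);
        try {
            for (int i = 0; i < jobs; i++) {
                CompletableFuture.runAsync(sketch::job, pool).join();
            }
        } finally {
            pool.shutdown();
        }
        return sketch.peak.get();
    }

    public static void main(String[] args) {
        System.out.println("eager peak:      " + peakWhenEager(5));
        System.out.println("sequential peak: " + peakWhenSequential(5));
    }
}
```

The sequential peak is always 1, because each job is joined before the next one starts. The eager peak is normally greater than 1 and often equals the number of jobs, which is exactly the overlap that makes shared state risky.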

As a general rule, use flatMap by default, and use concatMap when you don’t want the inner jobs to run concurrently.

A more real-life example

One common case is a repository whose save method runs with SERIALIZABLE isolation, like this:

// A reactive repository is assumed here, since save returns a Mono.
interface BarRepository extends ReactiveCrudRepository<Bar, Long> {
    @Modifying
    @Transactional(isolation = Isolation.SERIALIZABLE)
    Mono<Void> save(Bar bar);
}

class Foo {
    Flux<Void> method(List<Request> requests) {
        return Flux.fromIterable(requests)
                   .concatMap(request -> externalApi.execute(request))
                   .flatMap(barResponse -> repository.save(barResponse)); // could throw `ERROR: Can't serialize access due to concurrent update`
    }
}

In such a case we don’t want to execute the save method concurrently. Otherwise, we could get an exception like ERROR: Can't serialize access due to concurrent update. What’s the solution? concatMap.

class Foo {
    Flux<Void> method(List<Request> requests) {
        return Flux.fromIterable(requests)
                   .concatMap(request -> externalApi.execute(request))
                   .concatMap(barResponse -> repository.save(barResponse)); // now it is fine: saves run one at a time
    }
}