What is the difference between flatMap and concatMap? It is really important to understand it, so I’ll try to explain it as simply as possible.
flatMap vs concatMap
Just look at the following Reactor Flux example. It is purely theoretical.
class Foo {
    int accumulator;

    Flux<Bar> method(List<Request> requests) {
        return Flux.fromIterable(requests)
            .concatMap(request -> externalApi.execute(request)) // (1)
            .concatMap(response -> repository.store(response)) // (2)
            .flatMap(response -> { // (3)
                accumulator = accumulator + 1;
                return Mono.just(response);
            });
    }
}
In the above example we have a list of requests. We would like to execute the batch of requests one by one (1), and then store each response in a database (2).
What value do you think we will find in the accumulator variable after 5 requests?
We cannot tell. The result is not deterministic: it could be 5, but it could also be 3 or 4. The reason is that flatMap subscribes to each inner publisher as soon as possible. concatMap, on the other hand, waits for the previous publisher to complete before it subscribes to the next one.
The following version gives a predictable accumulator value.
class Foo {
    int accumulator;

    Flux<Bar> method(List<Request> requests) {
        return Flux.fromIterable(requests)
            .concatMap(request -> externalApi.execute(request)) // (1)
            .concatMap(response -> repository.store(response)) // (2)
            .concatMap(response -> { // (3)
                accumulator = accumulator + 1;
                return Mono.just(response);
            });
    }
}
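To make the subscription behavior of the two operators visible, here is a minimal, self-contained sketch. It is only an illustration: OrderingDemo and slowCall are hypothetical names, and slowCall stands in for any asynchronous call by delaying larger inputs for longer. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingDemo {

    // Hypothetical stand-in for an asynchronous call: input n completes after n * 100 ms.
    static Mono<Integer> slowCall(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // flatMap subscribes to all three inner publishers right away,
    // so results are emitted in the order in which they complete.
    static List<Integer> withFlatMap() {
        return Flux.just(3, 2, 1)
            .flatMap(OrderingDemo::slowCall)
            .collectList()
            .block();
    }

    // concatMap subscribes to the next inner publisher only after the
    // previous one has completed, so the source order is preserved.
    static List<Integer> withConcatMap() {
        return Flux.just(3, 2, 1)
            .concatMap(OrderingDemo::slowCall)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + withFlatMap());   // [1, 2, 3]
        System.out.println("concatMap: " + withConcatMap()); // [3, 2, 1]
    }
}
```

With flatMap the fastest call wins, while with concatMap the calls run strictly one after another, which is also why the concatMap variant takes roughly the sum of all delays.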
As a general rule, use flatMap by default, and concatMap when you don’t want the jobs to run in parallel.
A more real-life example
One of the common cases is when you have a repository with SERIALIZABLE isolation, like this:
interface BarRepository extends CrudRepository<Bar, Long> {
    @Modifying
    @Transactional(isolation = Isolation.SERIALIZABLE)
    Mono<Void> save(Bar bar);
}
class Foo {
    Flux<Void> method(List<Request> requests) {
        return Flux.fromIterable(requests)
            .concatMap(request -> externalApi.execute(request))
            // could throw `ERROR: Can't serialize access due to concurrent update` error
            .flatMap(barResponse -> repository.save(barResponse));
    }
}
In such a case we don’t want to execute the save method in parallel. Otherwise, we could get an exception like ERROR: Can't serialize access due to concurrent update.
What’s the solution? concatMap:
class Foo {
    Flux<Void> method(List<Request> requests) {
        return Flux.fromIterable(requests)
            .concatMap(request -> externalApi.execute(request))
            // now is fine: each save starts only after the previous one has completed
            .concatMap(barResponse -> repository.save(barResponse));
    }
}
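One way to check that concatMap really keeps the save calls from overlapping is to count how many of them are in flight at the same time. The sketch below is an illustration under assumptions, not the repository from the example above: ConcurrencyDemo, inFlight and maxInFlight are names invented here, and save is a hypothetical stand-in that simply sleeps for 100 ms on Reactor's bounded elastic scheduler. It assumes reactor-core is on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ConcurrencyDemo {

    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for repository.save(...): records how many
    // calls overlap, then simulates 100 ms of blocking work.
    Mono<Integer> save(int id) {
        return Mono.fromCallable(() -> {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            Thread.sleep(100);
            inFlight.decrementAndGet();
            return id;
        }).subscribeOn(Schedulers.boundedElastic());
    }

    // flatMap subscribes to every save eagerly, so several run at once.
    static int maxWithFlatMap() {
        ConcurrencyDemo demo = new ConcurrencyDemo();
        Flux.range(1, 5).flatMap(demo::save).blockLast();
        return demo.maxInFlight.get();
    }

    // concatMap starts the next save only after the previous one completed.
    static int maxWithConcatMap() {
        ConcurrencyDemo demo = new ConcurrencyDemo();
        Flux.range(1, 5).concatMap(demo::save).blockLast();
        return demo.maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap max in flight:   " + maxWithFlatMap());   // greater than 1
        System.out.println("concatMap max in flight: " + maxWithConcatMap()); // 1
    }
}
```

The price of concatMap is throughput: five sequential saves take about five times as long as a single one, so it is worth using only where the calls must not overlap.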