Performing long-running tasks in a non-blocking manner with Java’s CompletableFuture and Reactor’s Mono and Flux
In Java, long-running tasks can be run in a non-blocking manner by means of CompletableFuture, the standard class for asynchronous programming. Normally, we run a task asynchronously with the method supplyAsync(Supplier<U> task), which returns a new CompletableFuture. The task runs in a thread pool, and the returned CompletableFuture is completed when the task is done. For example:
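A minimal sketch of such a call. The class name SupplyAsyncExample and the method slowComputation are made up for illustration, and the long-running work is only simulated with a short sleep.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

class SupplyAsyncExample {

    // Hypothetical long-running task, simulated with a short sleep.
    static String slowComputation() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "computed on " + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Returns immediately; the supplier runs on a pool thread
        // (typically one of ForkJoinPool.commonPool()).
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(SupplyAsyncExample::slowComputation);

        System.out.println("Task submitted, the main thread is free to do other work");

        // Blocks only at this point, until the result is available.
        System.out.println(future.get());
    }
}
```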
The method get() waits if necessary for the future to complete, and then returns the computation result. Useful methods on CompletableFuture are:
- thenAccept(Consumer<? super T> action)
It runs some code after receiving a value from the asynchronously running task, without returning any value.
- thenApply(Function<? super T, ? extends U> fn)
It runs some code after receiving a value from the asynchronously running task and returns another value.
- thenRun(Runnable action)
It runs some code after normal completion, without receiving or returning any value.
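The three methods can be chained into a small pipeline. The following is just a sketch with invented names (ChainingExample, runPipeline); it records what each stage does in a list so the order is visible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainingExample {

    // Runs a small pipeline and records what each stage did.
    static List<String> runPipeline() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> pipeline = CompletableFuture
                .supplyAsync(() -> 21)                      // produces a value
                .thenApply(n -> n * 2)                      // Function: turns 21 into 42
                .thenAccept(n -> log.add("accepted " + n))  // Consumer: uses the value, returns nothing
                .thenRun(() -> log.add("done"));            // Runnable: sees no value at all

        pipeline.join(); // wait for the whole chain to finish
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline()); // prints [accepted 42, done]
    }
}
```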
There are also three other variants of these methods: thenAcceptAsync, thenApplyAsync, and thenRunAsync. They hand the code over to an executor (the default asynchronous pool, or one passed as a second argument) instead of running it on whichever thread completes the CompletableFuture. Example:
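A sketch of an Async variant with an explicitly supplied executor. The class name, the method name and the thread name "callback-thread" are assumptions chosen only to make the executing thread recognizable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantExample {

    // Returns the name of the thread that executed the thenApplyAsync stage.
    static String threadOfAsyncStage() {
        // Hypothetical dedicated executor with a recognizable thread name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "callback-thread"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> "payload")
                    // The function is submitted to the given executor.
                    .thenApplyAsync(value -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown(); // let the executor thread terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStage()); // prints callback-thread
    }
}
```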
A CompletableFuture can also be explicitly completed using complete(T value) or completeAsync(Supplier<? extends T> supplier), where T is the type of the value returned by get(). We can write a method that creates a CompletableFuture and returns it immediately. Other threads or asynchronous methods are then responsible for executing the computation and completing the CompletableFuture with an appropriate result.
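This idea could be sketched as follows. The names ManualCompletionExample and fetchLater are hypothetical, and the "work" is again only a short sleep in a separate thread.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionExample {

    // Creates a future and returns it immediately; a separate thread completes it later.
    static CompletableFuture<String> fetchLater() {
        CompletableFuture<String> future = new CompletableFuture<>();

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);                // stand-in for real work
                future.complete("result");        // hands the value to everyone waiting on the future
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);  // propagates the failure instead of a value
            }
        });
        worker.start();

        return future; // most likely not completed yet at this point
    }

    public static void main(String[] args) {
        System.out.println(fetchLater().join()); // prints result
    }
}
```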
Let’s write a LongRunningTaskService that expects a task in the form of Java’s Supplier and returns a CompletableFuture.
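One way such a service might look is sketched below. Only the class name comes from the text; the method name asFuture, the pool size and the use of daemon threads are assumptions for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class LongRunningTaskService {

    // Assumption: a small fixed pool of daemon threads, so the pool never keeps the JVM alive.
    private final ExecutorService executor = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "long-running-task");
        thread.setDaemon(true);
        return thread;
    });

    // Hypothetical method name. Returns at once; a pool thread completes the future later.
    public <T> CompletableFuture<T> asFuture(Supplier<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                future.complete(task.get());
            } catch (Throwable t) {
                future.completeExceptionally(t); // report failures through the future
            }
        });
        return future;
    }

    public static void main(String[] args) {
        LongRunningTaskService service = new LongRunningTaskService();
        CompletableFuture<Integer> answer = service.asFuture(() -> 6 * 7);
        System.out.println(answer.join()); // prints 42
    }
}
```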
We will see how to use it further below, no worries. For now, let’s discuss Project Reactor and Spring WebFlux. Project Reactor is a reactive library for building non-blocking applications. It is based on the Reactive Streams specification, a concept adopted by various implementations such as RxJava, Java’s Flow classes, and Akka Streams.
At first glance, Reactive Streams and Java 8 Streams look pretty similar. But there is an important difference: Reactive Streams are push-based and Java 8 Streams are pull-based. In Java 8 Streams you normally iterate over collections, pull values, apply operators like filter, map, flatMap, etc., and terminate the processing by collecting or grouping result values. Reactive Streams have similar operators, but they offer Publishers that notify Subscribers of newly available values as they come. In Project Reactor, there are two heavily used publishers:
- Mono<T>
It emits 0 or 1 element of type T.
- Flux<T>
It emits 0…N elements of type T. The number of published elements can be potentially infinite.
Here is a simple example of how to create and combine two Flux streams with the zip operator.
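A possible version of such an example, assuming reactor-core is on the classpath. The sample data and the names ZipExample and zipNamesWithAges are invented, and block() is used only to keep the demo short.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ZipExample {

    static List<String> zipNamesWithAges() {
        Flux<String> names = Flux.just("Ann", "Bob", "Cid");
        Flux<Integer> ages = Flux.just(31, 42, 53);

        // zip pairs the n-th element of one source with the n-th element of the other.
        return Flux.zip(names, ages, (name, age) -> name + " is " + age)
                .collectList()
                .block(); // blocking here is for the demo only
    }

    public static void main(String[] args) {
        // Prints "Ann is 31", "Bob is 42" and "Cid is 53", one per line.
        zipNamesWithAges().forEach(System.out::println);
    }
}
```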
Spring WebFlux is a non-blocking framework built on Project Reactor that makes it possible to build reactive applications on top of the HTTP layer. It does not require the Servlet API. By default, Spring WebFlux uses Reactor Netty as its embedded server, which supports HTTP/2. The major difference from Spring MVC is that the REST controller’s methods return the reactive types Mono and Flux. An example from my current project:
A client that initiates a request doesn’t get blocked with Mono and Flux. The corresponding request thread goes back into the thread pool. Once the result of the long-running operation is available, it is pushed to the client.
But back to our service class LongRunningTaskService. Let’s extend it with two additional methods returning Mono and Flux respectively.
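The sketch below shows one way the service might look with all three methods, assuming reactor-core is on the classpath. The method names asFuture, asMono and asFlux are hypothetical, as is the choice to let the Flux task supply a List. For brevity, the CompletableFuture here is created with supplyAsync on the default pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LongRunningTaskService {

    public <T> CompletableFuture<T> asFuture(Supplier<T> task) {
        return CompletableFuture.supplyAsync(task);
    }

    // Emits the single result of the task, or an error if the task fails.
    public <T> Mono<T> asMono(Supplier<T> task) {
        return Mono.create(sink -> asFuture(task).whenComplete((value, error) -> {
            if (error != null) {
                sink.error(error);
            } else {
                sink.success(value);
            }
        }));
    }

    // Emits every element of the supplied list, then completes.
    public <T> Flux<T> asFlux(Supplier<List<T>> task) {
        return Flux.create(sink -> asFuture(task).whenComplete((values, error) -> {
            if (error != null) {
                sink.error(error);
            } else {
                values.forEach(sink::next);
                sink.complete();
            }
        }));
    }

    public static void main(String[] args) {
        LongRunningTaskService service = new LongRunningTaskService();
        System.out.println(service.asFuture(() -> "from future").join());
        System.out.println(service.asMono(() -> "from mono").block());
        System.out.println(service.asFlux(() -> Arrays.asList(1, 2, 3)).collectList().block());
    }
}
```

In a real application you would subscribe to the Mono and Flux rather than call block(); blocking is used here only so that the demo prints its results before exiting.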
What is the sink in the consumer function (sink) -> {...}? It is an emitter of type FluxSink, which is used for emitting values to the subscribers of the Flux stream created by Flux.create(). Full documentation of the create() method can be found here. The next code snippet demonstrates the usage of all three methods.
Output:
Project Reactor solves many programming problems in an elegant way. For instance, you can implement the “retry pattern with exponential back-off” (a design pattern for resilience). In the next blog post, I will suggest a reactive solution for loading remote data in parallel from a paginated REST API (lazy data pagination). Stay tuned!