Clever cache for Reactor’s Mono objects
Data caching is a widespread technique in the programming. It allows to quickly retrieve data without making long-running operations. But there is a problem with caching of data retrieved as result of some long-running operation. If a cache value is missed, it will be requested. If it is requested by a long-running HTTP request or SQL command, the next request for the cache value can leads to multiple HTTP requests / SQL commands again and again. I was looking for a cache implementation which solves this issue in projects using Project Reactor. Project Reactor is built on top of the Reactive Streams Specification — a standard for building reactive applications. You probably know Mono
and Flux
objects from Spring WebFlux. Project Reactor is the reactive library of choice for Spring WebFlux.
In this article, I will suggest a reactive cache implementation inspired by CacheMono
from Reactor’s addons project. We will assume, that the result of a long-running HTTP request or SQL command is represented as aMono
object. A Mono
object is “materialized” and cached in form of Reactor’s Signal
object which represents a Mono
. Signals are “dematerialized” to Mono’s if a cache value is requested by the lookup
method. Multiple lookups with the same key will retieve the same Mono
object, so that a long-running operation is only triggered once!
Let’s create a class CacheMono
with three factory methods.
Not yet cached values will be retrieved either by valueSupplier
or valuePublisher
. The first one uses the “pull” principle and the second one uses the “push” principle to retrieve not yet cached values. That means, either valueSupplier
or valuePublisher
along with keyExtractor
and valueExtractor
should be set.
Keep in mind: if you create more than one CacheMono
from the same value publisher, you should pass in a Flux
stream which caches the history and emits cached items from the beginning to future subscribers. This is necessary because this CacheMono
implementation subscribes to the passed in Flux stream in order to fill cache automatically once the source Flux stream publishes values (reactive “push” way vs. “pull” provided by another factory method). The simplest way to create a such Flux
stream from existing one would be invoking of cache()
method on any Flux
stream.
As you could see, we cache instances of CacheMonoValue
. This is just a wrapper around Mono
or Signal
. We can implement this class as an inner class.
We will see in few words, that a Mono
value from a long-running operation is cached immediately. The same Mono
instance is retrieved for all subsequent lookups with the same key. Once the result of Mono
is available, the real value is cached as Signal
under the same key. Well, step by step. Look at the lookup
method first. It uses a well-known pattern: if value is missed in the cache, the logic within the switchIfEmpty
operator gets executed.
In the onCacheMissResume
, a missed value will be retieved by the mentioned above valueSupplier
or valuePublisher
. As I said, the value is cached immediately as a Mono
object and is returned for all subsequent lookups. As soon as the value from the long-running operation is available, the logic within monoValue.doOnEach(...)
is executed. The value is encapsulated in Signal
and can be returned by invokingsignal.get()
.
Let’s implement some convenient methods as well. Especially methods which return already existing (cached) values from the cache.
The usage of CacheMono
class is simple. Just two code snippets from my current project. The first one creates a CacheMono
instance by calling CacheMono.fromSupplier
.
The second one creates a CacheMono
instance by calling CacheMono.fromPublisher
.
That’s all. Have fun!