Clever cache for Reactor’s Mono objects

Photo by Olivia Bauso on Unsplash

Data caching is a widespread technique in the programming. It allows to quickly retrieve data without making long-running operations. But there is a problem with caching of data retrieved as result of some long-running operation. If a cache value is missed, it will be requested. If it is requested by a long-running HTTP request or SQL command, the next request for the cache value can leads to multiple HTTP requests / SQL commands again and again. I was looking for a cache implementation which solves this issue in projects using Project Reactor. Project Reactor is built on top of the Reactive Streams Specification — a standard for building reactive applications. You probably know Mono and Flux objects from Spring WebFlux. Project Reactor is the reactive library of choice for Spring WebFlux.

In this article, I will suggest a reactive cache implementation inspired by CacheMono from Reactor’s addons project. We will assume, that the result of a long-running HTTP request or SQL command is represented as aMono object. A Mono object is “materialized” and cached in form of Reactor’s Signal object which represents a Mono. Signals are “dematerialized” to Mono’s if a cache value is requested by the lookup method. Multiple lookups with the same key will retieve the same Mono object, so that a long-running operation is only triggered once!

Let’s create a class CacheMono with three factory methods.

Not yet cached values will be retrieved either by valueSupplier or valuePublisher. The first one uses the “pull” principle and the second one uses the “push” principle to retrieve not yet cached values. That means, either valueSupplier or valuePublisher along with keyExtractor and valueExtractor should be set.

Keep in mind: if you create more than one CacheMono from the same value publisher, you should pass in a Flux stream which caches the history and emits cached items from the beginning to future subscribers. This is necessary because this CacheMono implementation subscribes to the passed in Flux stream in order to fill cache automatically once the source Flux stream publishes values (reactive “push” way vs. “pull” provided by another factory method). The simplest way to create a such Flux stream from existing one would be invoking of cache() method on any Flux stream.

As you could see, we cache instances of CacheMonoValue. This is just a wrapper around Mono or Signal. We can implement this class as an inner class.

We will see in few words, that a Mono value from a long-running operation is cached immediately. The same Mono instance is retrieved for all subsequent lookups with the same key. Once the result of Mono is available, the real value is cached as Signal under the same key. Well, step by step. Look at the lookup method first. It uses a well-known pattern: if value is missed in the cache, the logic within the switchIfEmpty operator gets executed.

In the onCacheMissResume, a missed value will be retieved by the mentioned above valueSupplier or valuePublisher. As I said, the value is cached immediately as a Mono object and is returned for all subsequent lookups. As soon as the value from the long-running operation is available, the logic within monoValue.doOnEach(...) is executed. The value is encapsulated in Signal and can be returned by invokingsignal.get().

Let’s implement some convenient methods as well. Especially methods which return already existing (cached) values from the cache.

The usage of CacheMono class is simple. Just two code snippets from my current project. The first one creates a CacheMono instance by calling CacheMono.fromSupplier.

The second one creates a CacheMono instance by calling CacheMono.fromPublisher.

That’s all. Have fun!

Thoughts on software development. Author of “PrimeFaces Cookbook” and “Angular UI Development with PrimeNG”. My old blog: http://ovaraksin.blogspot.de