Simple implementation of the Request-Reply pattern over messaging in Dart
The request-reply pattern over messaging uses a message broker as the communication medium. It has several advantages over traditional RESTful communication between client and server. To mention a few:
- Communication over messaging increases system availability. Many message brokers offer message delivery guarantees.
- A message broker can buffer the most recent messages, which are then available to upcoming subscribers. This makes it possible for an application to retrieve the last system state on initial load.
- Using a unique identifier, called a correlation ID, you can also handle the case where some requests are processed faster than others. A response and the request it belongs to should carry the same correlation ID.
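To make the correlation ID idea concrete, here is a minimal sketch in Dart. The class name PendingRequests and its methods are hypothetical and exist only for illustration; no broker is involved, and the replies are fed in by hand.

```dart
import 'dart:async';

/// Hypothetical helper that pairs replies with requests by correlation ID.
class PendingRequests {
  final Map<String, Completer<String>> _pending = {};

  /// Registers a request under [correlationId] and returns a future that
  /// completes when the reply with the same ID arrives.
  Future<String> expectReply(String correlationId) {
    final completer = Completer<String>();
    _pending[correlationId] = completer;
    return completer.future;
  }

  /// Routes an incoming reply to the request that carries the same ID.
  /// Replies with an unknown ID are ignored.
  void onReply(String correlationId, String payload) {
    _pending.remove(correlationId)?.complete(payload);
  }
}

Future<void> main() async {
  final pending = PendingRequests();
  final slowReply = pending.expectReply('req-1');
  final fastReply = pending.expectReply('req-2');

  // The replies arrive in the opposite order to the requests.
  pending.onReply('req-2', 'fast result');
  pending.onReply('req-1', 'slow result');

  print(await slowReply); // slow result
  print(await fastReply); // fast result
}
```

Even though the second request is answered first, each caller receives the reply that belongs to its own request.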
The following picture is taken from the Enterprise Integration Patterns.
This is a fairly ordinary publish–subscribe messaging system. The publishers and subscribers are unaware of each other. Messages in a publish–subscribe system are routed by a centralised broker.
Currently, I’m developing a mobile app in Flutter and Dart. Because all communication with the backend is message-based, we picked the mqtt_client package, an MQTT client for Dart. We then implemented a service class, MessagingService, which can listen to incoming data on specified topics and send data to topics by means of Dart’s mqtt_client. These are the two typical tasks in a publish–subscribe messaging system. Below is a code snippet from MessagingService with the appropriate methods for each task (the full implementation is not necessary to understand this blog post).
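As a rough illustration of the shape of such a service, here is a hypothetical in-memory stand-in. It is not the project's code and does not use mqtt_client: a broadcast StreamController plays the role of the broker so the example runs on its own. The method names request and updatesFor follow the ones mentioned later in this post; their signatures, the TopicMessage class and the topic names are assumptions.

```dart
import 'dart:async';

/// A topic/payload pair as a broker might deliver it (hypothetical type).
class TopicMessage {
  final String topic;
  final String payload;
  const TopicMessage(this.topic, this.payload);
}

/// In-memory stand-in for the messaging service. In the real app the
/// transport would be mqtt_client; here a broadcast stream mimics the broker.
class MessagingService {
  final StreamController<TopicMessage> _broker =
      StreamController<TopicMessage>.broadcast();

  /// Listens to incoming data on [topic].
  Stream<String> updatesFor(String topic) => _broker.stream
      .where((message) => message.topic == topic)
      .map((message) => message.payload);

  /// Sends [payload] to [topic].
  void request(String topic, String payload) =>
      _broker.add(TopicMessage(topic, payload));

  Future<void> dispose() => _broker.close();
}

Future<void> main() async {
  final messaging = MessagingService();
  final subscription = messaging
      .updatesFor('positions/reply')
      .listen((payload) => print('received: $payload'));

  messaging.request('positions/other', 'not for this listener');
  messaging.request('positions/reply', '[1, 2, 3]');

  // Give the event loop a turn so the messages are delivered.
  await Future<void>.delayed(Duration.zero);
  await subscription.cancel();
  await messaging.dispose();
}
```

Only the message published to the subscribed topic reaches the listener; the other one is filtered out.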
One challenge was how to publish and receive asynchronous messages in a synchronous manner. What we wanted was this call sequence: subscribe to a defined topic for incoming data → send a message to another topic → wait until a response message has arrived on the subscribed topic → evaluate the response and continue with the program flow. To solve this, we are going to implement a RequestReplyService which has just one method, execute, with two functions as parameters. The first parameter points to a request function and the second one points to a response function.
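A minimal sketch of how such a service could look is shown here. The exact signature of execute is an assumption (the request function returns nothing, the response function returns a Stream), and the timeout parameter is an addition for illustration that the text does not mention. The sketch follows the call sequence described above: subscribe first, then send, then wait for the first reply.

```dart
import 'dart:async';

/// Sketch of a request-reply helper; signature and timeout are assumptions.
class RequestReplyService {
  Future<T> execute<T>(
    void Function() request,
    Stream<T> Function() response, {
    Duration timeout = const Duration(seconds: 5),
  }) {
    final completer = Completer<T>();

    // 1. Subscribe first, so a fast reply cannot be missed.
    final subscription = response().listen(
      (data) {
        if (!completer.isCompleted) completer.complete(data);
      },
      onError: (Object error, StackTrace stackTrace) {
        if (!completer.isCompleted) {
          completer.completeError(error, stackTrace);
        }
      },
    );

    // 2. Send the request; a failure here is reported through the future.
    try {
      request();
    } catch (error, stackTrace) {
      if (!completer.isCompleted) completer.completeError(error, stackTrace);
    }

    // 3. Wait for the first reply (or the timeout), then stop listening.
    return completer.future
        .timeout(timeout)
        .whenComplete(subscription.cancel);
  }
}

Future<void> main() async {
  final replies = StreamController<String>.broadcast();
  final service = RequestReplyService();

  final reply = await service.execute<String>(
    // Pretend the backend answers shortly after the request is sent.
    () => Timer(const Duration(milliseconds: 10), () => replies.add('pong')),
    () => replies.stream,
  );

  print(reply); // pong
  await replies.close();
}
```

Cancelling the subscription in whenComplete keeps the service from leaking listeners, whether the call ends with a reply, an error or a timeout.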
The service uses a so-called Completer under the hood. Completers are objects that encapsulate the idea of finishing a long-running task at some later point. If you have a Completer, you can get a Future from it. When the task is finished or aborted, you can resolve or reject this Future by calling complete(data) or completeError(error) respectively. An example of the usage from my project looks as follows:
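The sketch here is a hypothetical reconstruction of what such a call might look like, not the project's actual code. The method names _requestTrackPositions and _trackPositionRetrieved$ and the MessagingService methods request and updatesFor come from this post; the TrackPositionLoader class, the topic names, the payloads and the compact in-memory versions of both services are assumptions made so the example runs on its own.

```dart
import 'dart:async';

/// Compact request-reply helper built on a Completer (assumed signature).
class RequestReplyService {
  Future<T> execute<T>(
      void Function() request, Stream<T> Function() response) {
    final completer = Completer<T>();
    final subscription = response().listen(
      (data) {
        if (!completer.isCompleted) completer.complete(data);
      },
      onError: (Object error, StackTrace stackTrace) {
        if (!completer.isCompleted) {
          completer.completeError(error, stackTrace);
        }
      },
    );
    request();
    return completer.future.whenComplete(subscription.cancel);
  }
}

/// In-memory stand-in for the MQTT-backed service; each message is a
/// (topic, payload) pair stored as a MapEntry.
class MessagingService {
  final _broker = StreamController<MapEntry<String, String>>.broadcast();

  Stream<String> updatesFor(String topic) =>
      _broker.stream.where((m) => m.key == topic).map((m) => m.value);

  void request(String topic, String payload) =>
      _broker.add(MapEntry(topic, payload));
}

/// Hypothetical caller that loads track positions via request-reply.
class TrackPositionLoader {
  TrackPositionLoader(this._messaging, this._requestReplyService);

  final MessagingService _messaging;
  final RequestReplyService _requestReplyService;

  Future<String> load() async {
    // The call reads like synchronous code although messages are async.
    final positions = await _requestReplyService.execute(
        _requestTrackPositions, _trackPositionRetrieved$);
    return positions;
  }

  void _requestTrackPositions() =>
      _messaging.request('track/positions/get', '{}');

  Stream<String> _trackPositionRetrieved$() =>
      _messaging.updatesFor('track/positions');
}

Future<void> main() async {
  final messaging = MessagingService();

  // Fake backend: answers every request with a fixed list of positions.
  messaging.updatesFor('track/positions/get').listen(
      (_) => messaging.request('track/positions', '[{"lat": 1, "lon": 2}]'));

  final loader = TrackPositionLoader(messaging, RequestReplyService());
  print(await loader.load());
}
```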
Note the await keyword in front of requestReplyService.execute(...). The two private methods _requestTrackPositions and _trackPositionRetrieved$ invoke the request and updatesFor methods of MessagingService, respectively.
That’s all.