Micronaut with RabbitMQ integration
Until version 1.1.0.M1, Micronaut had no integration with RabbitMQ. We could only use the libraries that provided integration with Apache Kafka, which we wrote about in one of our previous articles.
Apache Kafka is a great tool: highly scalable, with many features for event stream processing. But not all applications require such superpowers. I think that if a project needs a “simple” message bus, RabbitMQ is the simpler solution and, above all, easier to deploy and manage in production.
Together with Micronaut 1.1.0.M1, a complementary library was published that supports RabbitMQ in a very simple, declarative way, analogous to the one provided for Apache Kafka in previous Micronaut versions.
In this article we want to show how easy it is to integrate with RabbitMQ using Micronaut and the micronaut-rabbitmq library.
Add RabbitMQ support to our project
We want to use RabbitMQ as the message broker in our simplified insurance sales system, built in a microservice architecture. A comprehensive guide describing the architecture, the applied design patterns and the technologies can be found on our blog in the article Building Microservices with Micronaut.
You can see the code for this post on the rabbitmq branch on GitHub.
Below is the updated architecture diagram with RabbitMQ instead of Kafka:
Policy Service publishes two types of messages (PolicyRegisteredEvent and PolicyTerminatedEvent). The other microservices (Documents Service, Payment Service, Policy Search Service) subscribe to them.
The first thing we need to do is add the dependency to pom.xml:
<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-rabbitmq</artifactId>
    <version>LATEST</version>
    <scope>compile</scope>
</dependency>
Create exchanges, queues and bindings
First, I would like to explain some basic RabbitMQ concepts: exchange, queue, binding and routing key. An exchange is the entry point into RabbitMQ. Each exchange has one of four types: fanout, direct, topic or headers (more info). An exchange is not tied to a specific message or message type: multiple message types can be published to one exchange, and multiple applications can publish to the same exchange. An exchange dispatches messages to the queues that have matching bindings defined. A queue is an endpoint that applications can subscribe to. A binding is the connection between an exchange and a queue; it consists of three things: the exchange, the queue and a routing key. The routing key is an attribute of the message that tells RabbitMQ where the message should be sent.
Exchanges, bindings and queues must already exist before you can publish or consume messages. As the docs say:
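To make the relationship between exchange, binding and routing key more tangible, here is a minimal in-memory sketch of how a topic exchange picks target queues. It is only a model written for this article, not the RabbitMQ client API: the class name TopicExchangeSketch and its methods are our own invention, and it allows just one binding per queue to stay short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of a topic exchange; an illustration only, not the RabbitMQ client API.
class TopicExchangeSketch {

    // queue name -> binding key (one binding per queue keeps the sketch short)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Returns the queues that would receive a message with the given routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        String[] key = routingKey.split("\\.");
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (matches(binding.getValue().split("\\."), 0, key, 0)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    // Topic rules: words are separated by dots, '*' matches exactly one word,
    // '#' matches zero or more words.
    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            for (int i = k; i <= key.length; i++) {
                if (matches(pattern, p + 1, key, i)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }
}
```

With the binding keys used in this article, which contain no wildcards, a message with routing key policy-registered reaches only the queue bound with exactly that key, so the topic exchange behaves like a direct one. A hypothetical extra queue bound with # would receive every message.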
The purpose of this library is to consume and publish messages with RabbitMQ. Any setup of queues, exchanges, or the binding between them is not done automatically. If your requirements dictate that your application should be creating those entities, then a BeanCreatedEventListener can be registered to intercept the ChannelPool to perform operations with the Java API directly.
You can do this inside the application, as described above, or outside of it. RabbitMQ lets you do it through the REST API of its management plugin. We chose the second option and wrote a script, rabbit-create-bindings-queues.sh, that creates the exchange micronaut-microservices-poc, the queues policy-registered and policy-terminated, and the bindings by routing key.
#!/usr/bin/env bash
# Create exchange micronaut-microservices-poc
curl -i -u guest:guest -H "content-type:application/json" \
-XPUT -d'{"type":"topic","durable":true}' \
http://localhost:15672/api/exchanges/%2f/micronaut-microservices-poc
# Create queue policy-registered
curl -i -u guest:guest -H "content-type:application/json" \
-XPUT -d'{"durable":true}' \
http://localhost:15672/api/queues/%2f/policy-registered
# Create binding to policy-registered
curl -i -u guest:guest -H "content-type:application/json" \
-XPOST -d'{"routing_key":"policy-registered"}' \
http://localhost:15672/api/bindings/%2f/e/micronaut-microservices-poc/q/policy-registered
# Create queue policy-terminated
curl -i -u guest:guest -H "content-type:application/json" \
-XPUT -d'{"durable":true}' \
http://localhost:15672/api/queues/%2f/policy-terminated
# Create binding to policy-terminated
curl -i -u guest:guest -H "content-type:application/json" \
-XPOST -d'{"routing_key":"policy-terminated"}' \
http://localhost:15672/api/bindings/%2f/e/micronaut-microservices-poc/q/policy-terminated
The same can be done through the RabbitMQ Management UI, but with a script we can do it automatically, for example in our CI/CD processes.
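If you would rather run this setup from Java, for example from an integration test, the same calls can be sketched with the JDK's built-in HTTP client. This is only an illustration under a few assumptions: Java 11 or newer, the default guest:guest credentials and port 15672 as in the script, and a helper class RabbitManagementRequests that we made up for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that builds the same management API calls as the shell script.
class RabbitManagementRequests {

    static final String BASE = "http://localhost:15672/api";
    static final String AUTH = "Basic " + Base64.getEncoder()
            .encodeToString("guest:guest".getBytes(StandardCharsets.UTF_8));

    // URLEncoder targets form encoding; good enough here because the names
    // contain no spaces, and it turns the default vhost "/" into "%2F".
    static String enc(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8);
    }

    static HttpRequest declareExchange(String vhost, String exchange) {
        return request("/exchanges/" + enc(vhost) + "/" + enc(exchange))
                .PUT(HttpRequest.BodyPublishers.ofString("{\"type\":\"topic\",\"durable\":true}"))
                .build();
    }

    static HttpRequest declareQueue(String vhost, String queue) {
        return request("/queues/" + enc(vhost) + "/" + enc(queue))
                .PUT(HttpRequest.BodyPublishers.ofString("{\"durable\":true}"))
                .build();
    }

    // No JSON escaping of the routing key; fine for the plain names used in this article.
    static HttpRequest bindQueue(String vhost, String exchange, String queue, String routingKey) {
        return request("/bindings/" + enc(vhost) + "/e/" + enc(exchange) + "/q/" + enc(queue))
                .POST(HttpRequest.BodyPublishers.ofString("{\"routing_key\":\"" + routingKey + "\"}"))
                .build();
    }

    private static HttpRequest.Builder request(String path) {
        return HttpRequest.newBuilder(URI.create(BASE + path))
                .header("Authorization", AUTH)
                .header("Content-Type", "application/json");
    }

    // Sends a request and returns the HTTP status code.
    static int send(HttpClient client, HttpRequest request) throws IOException, InterruptedException {
        return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode();
    }
}
```

Calling send(HttpClient.newHttpClient(), declareExchange("/", "micronaut-microservices-poc")), followed by the queue and binding requests, should reproduce what the script does against a local broker.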
Publish message
We need to create a RabbitMQ client that produces messages. To do this, create an interface annotated with @RabbitClient and add a method annotated with @Binding. The @Binding annotation specifies the routing key of the message.
import io.micronaut.configuration.rabbitmq.annotation.Binding;
import io.micronaut.configuration.rabbitmq.annotation.RabbitClient;
import pl.altkom.asc.lab.micronaut.poc.policy.service.api.v1.events.PolicyRegisteredEvent;
import pl.altkom.asc.lab.micronaut.poc.policy.service.api.v1.events.PolicyTerminatedEvent;

@RabbitClient("micronaut-microservices-poc")
public interface EventPublisher {

    @Binding("policy-registered")
    void policyRegisteredEvent(PolicyRegisteredEvent event);

    @Binding("policy-terminated")
    void policyTerminatedEvent(PolicyTerminatedEvent event);
}
This client sends messages to the micronaut-microservices-poc exchange, whose name is given as the value of @RabbitClient. The queues are connected to the exchange via bindings.
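Micronaut generates the implementation of such an interface at compile time, so application code simply receives it as a bean and calls its methods. The sketch below shows that calling pattern without any framework on the classpath. Everything in it is a simplified stand-in: the event class carries only a policy number (an assumption; the real event in the repository is richer), the interface has no annotations, and PolicyRegistrationService and RecordingPublisher are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the article's event class (assumption: only a policy number).
final class PolicyRegisteredEvent {
    final String policyNumber;

    PolicyRegisteredEvent(String policyNumber) {
        this.policyNumber = policyNumber;
    }
}

// Same shape as the article's EventPublisher, minus the Micronaut annotations.
interface EventPublisher {
    void policyRegisteredEvent(PolicyRegisteredEvent event);
}

// Hypothetical application service that gets the publisher through its constructor,
// the way a Micronaut bean would receive the generated client.
class PolicyRegistrationService {
    private final EventPublisher publisher;

    PolicyRegistrationService(EventPublisher publisher) {
        this.publisher = publisher;
    }

    String register(String policyNumber) {
        // Persisting the policy is omitted; the point is the publish call.
        publisher.policyRegisteredEvent(new PolicyRegisteredEvent(policyNumber));
        return policyNumber;
    }
}

// In-memory fake that records published events, handy in unit tests.
class RecordingPublisher implements EventPublisher {
    final List<PolicyRegisteredEvent> sent = new ArrayList<>();

    @Override
    public void policyRegisteredEvent(PolicyRegisteredEvent event) {
        sent.add(event);
    }
}
```

Because the service depends only on the interface, it can be unit-tested with the in-memory fake, while in the running application the RabbitMQ-backed client takes its place.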
Subscribe to messages
To subscribe to messages, we need to create a class annotated with @RabbitListener and add a method annotated with @Queue. The value of @Queue is the name of the queue we want to consume from.
Example from Policy Search Service:
import io.micronaut.configuration.rabbitmq.annotation.Queue;
import io.micronaut.configuration.rabbitmq.annotation.RabbitListener;
import pl.altkom.asc.lab.micronaut.poc.policy.service.api.v1.events.PolicyTerminatedEvent;

@RabbitListener
public class PolicyTerminatedListener extends AbstractPolicyListener {

    @Queue("policy-terminated")
    void onPolicyTerminated(PolicyTerminatedEvent event) {
        saveMappedPolicy(event.getPolicy());
    }
}
Our simple example shows only the basic features that Micronaut offers. With Micronaut RabbitMQ support you can also:
- Add message headers,
- Add RabbitMQ properties on producers and consumers (this lets us configure content type, encoding, expiration, etc.),
- Change default serializers,
- Create queues/exchanges,
- Create a consumer thread pool configuration.
Summary
In this article we showed how to publish a message to a message bus and how to consume it using Micronaut and RabbitMQ. This is something I had been missing in Micronaut so far. In the newest Micronaut version we can do this in a simple way, without hundreds of lines of code.
If you want to know more about the possibilities of integration, check out the documentation.