Building Microservices On .NET Core – Part 7 Transactional Outbox with RabbitMQ

This is the seventh article in our series about building microservices on .NET Core. In the first article we introduced the series and prepared the plan: the business case and the solution architecture. In the second article we described how you can structure the internal architecture of a single microservice using the CQRS pattern and the MediatR library. In the third article we described the importance of service discovery in a microservice-based architecture and presented a practical implementation with Eureka. In the fourth part we presented how you can build API Gateways for your microservices with Ocelot. The fifth part was dedicated to implementing data access with the Marten library. In the last article we showed you how you can combine SignalR and RabbitMQ to build real-time server-client communication. The source code for the complete solution can be found on our GitHub.

Now our microservices communicate by publishing information about important business events, such as a product being activated or discontinued, a policy being sold or terminated, or a payment being made or missed. It all looks great, but we have missed one important thing: data consistency and transaction boundaries. When we decided to go the microservices way, where each microservice is an independent application with its own datastore, we left the safe land of the ACID database transactions we had in our monolithic solutions.

No more ACID?

Let’s take a look at the following piece of code:

public async Task<CreatePolicyResult> Handle(CreatePolicyCommand request, CancellationToken cancellationToken)
{
	using (var uow = uowProvider.Create())
	{
		var offer = await uow.Offers.WithNumber(request.OfferNumber);
		var customer =  ….
		var policy = offer.Buy(customer);

		uow.Policies.Add(policy);
		await uow.CommitChanges();  

		await eventPublisher.PublishMessage(PolicyCreated(policy));

		return new CreatePolicyResult
		{
			PolicyNumber = policy.Number
		};    
	}
}

As you can see, we commit changes to the unit of work before calling the event publisher’s PublishMessage method. Why is that?
Imagine we change the order of operations: we add new domain objects to the unit of work, we publish a message to the message broker (in our case RabbitMQ), and finally we commit the changes, which executes the actual INSERT and UPDATE statements against our database.
If anything goes wrong (a database connection issue, a constraint violation, a deadlock or anything else), our database transaction is rolled back and no changes get persisted. That’s cool, but what about the messages we sent through our message broker? They were already sent, leaving the whole system in an inconsistent state. In this case our PolicyService was unable to create and store a policy with the given number, but all other microservices were informed that such a policy was created. Not good.
So we are back to the original order of things. Is it any better? Now, if anything goes wrong while saving changes to the database, an exception is thrown, the changes are rolled back and the publisher is not executed. That’s good. But what happens if the broker is unavailable or we have a network problem, and publishing the message throws an exception?
Now we have our new policy in the PolicyService database, but the rest of the world does not know about it, so the system as a whole will never become consistent.
What can we do about it? I know, RabbitMQ is written in Erlang and runs on OTP, so surely this cannot happen. But even Erlang programs have bugs and are not 100% immune to configuration and network errors or hardware failures.
We can start by wrapping the code responsible for sending in some kind of retry mechanism, using a library like Polly, but this doesn’t solve the issue completely; it just decreases the probability of failure.
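To make the retry idea concrete, here is a minimal hand-written sketch of what such a wrapper does. It is not Polly’s API: the Retry class, its parameters and the linear backoff are assumptions made purely for illustration, and Polly offers the same idea as configurable policies.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, written by hand for illustration (not part of Polly).
public static class Retry
{
    // Runs the action up to maxAttempts times, waiting a little longer after each failure.
    // Once the attempts are exhausted, the last exception propagates to the caller.
    public static async Task ExecuteAsync(Func<Task> action, int maxAttempts, TimeSpan baseDelay)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Linear backoff: baseDelay, 2 * baseDelay, 3 * baseDelay, ...
                await Task.Delay(TimeSpan.FromTicks(baseDelay.Ticks * attempt));
            }
        }
    }
}
```

With a helper like this, the publishing line in the handler could become `await Retry.ExecuteAsync(() => eventPublisher.PublishMessage(PolicyCreated(policy)), 3, TimeSpan.FromMilliseconds(200));`. If the broker stays down longer than the retries last, the exception still surfaces after the policy has been committed, so the gap is only narrowed, not closed.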
There is a better solution: the outbox pattern.

Outbox pattern

The outbox pattern is a simple yet powerful tool. Let’s see how it works and what the consequences of applying it are.
With an outbox, instead of sending a message over the wire to the message broker, we save the message to our microservice’s database as part of the current business transaction. This way we achieve internal consistency inside our service: no messages will be sent if the transaction is rolled back. Once our message is stored in the outbox, we need a process that will forward it to the message broker. That’s the second part of the outbox pattern: a process that runs asynchronously and tries to send messages over the wire to the message broker. This process reads messages from the outbox in the order they were created and tries to send each one through the message broker. If a message is sent successfully, the process deletes it from the database (or marks it as processed).
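The write side of the pattern can be illustrated with a small in-memory sketch. InMemoryDatabase and InMemoryTransaction are hypothetical stand-ins for a real database and its transaction, not part of our solution; they only show that the business change and the outbox message are applied together or not at all.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a service database with business rows and outbox rows.
public sealed class InMemoryDatabase
{
    public List<string> Policies { get; } = new List<string>();
    public List<string> OutboxMessages { get; } = new List<string>();
}

// Buffers changes and applies them together on Commit.
// Abandoning the instance without calling Commit plays the role of a rollback.
public sealed class InMemoryTransaction
{
    private readonly InMemoryDatabase db;
    private readonly List<string> newPolicies = new List<string>();
    private readonly List<string> newMessages = new List<string>();

    public InMemoryTransaction(InMemoryDatabase db)
    {
        this.db = db;
    }

    public void AddPolicy(string policyNumber) => newPolicies.Add(policyNumber);

    // "Publishing" only stages the message next to the business change.
    public void PublishMessage(string payload) => newMessages.Add(payload);

    public void Commit()
    {
        db.Policies.AddRange(newPolicies);
        db.OutboxMessages.AddRange(newMessages);
        newPolicies.Clear();
        newMessages.Clear();
    }
}
```

A transaction that is never committed leaves both lists in the database untouched, so no message about a policy that was never stored can reach the forwarding process.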
What are the consequences of this approach? As already mentioned, we do not send messages if the transaction is rolled back, so we do not send “ghost” messages about things that did not actually happen. More importantly, temporary message broker unavailability or a network issue does not bring our business transaction down.
This way we achieve an at-least-once delivery guarantee: we are sure that our message will be sent at least once, but in very rare situations it can be sent two or more times. This can happen only if a message is successfully sent to the message broker but not deleted from our microservice’s database. I think this is much less probable than the other failures described previously.
For our system to stay consistent, it is the receiver’s responsibility to process messages idempotently. This means that our receivers must be able to detect that a given message was already processed, or must be able to process it in a way that does not violate data consistency. In our example, when PaymentService receives a message about a new policy being sold, it checks whether an account for the given policy already exists before creating a new one.
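The receiver-side check can be sketched as follows. This is a simplified in-memory illustration, not the actual PaymentService code: the PolicyCreated shape, the PolicyCreatedHandler class and the dictionary acting as the account store are all assumptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shape; the real event in the solution may carry more data.
public sealed class PolicyCreated
{
    public PolicyCreated(string policyNumber)
    {
        PolicyNumber = policyNumber;
    }

    public string PolicyNumber { get; }
}

public sealed class PolicyCreatedHandler
{
    // In-memory stand-in for the account store, keyed by policy number.
    private readonly Dictionary<string, Guid> accountsByPolicy = new Dictionary<string, Guid>();

    public int AccountCount => accountsByPolicy.Count;

    // Returns true when a new account was created, false when the message was a duplicate.
    public bool Handle(PolicyCreated evt)
    {
        if (accountsByPolicy.ContainsKey(evt.PolicyNumber))
        {
            return false; // already processed: a redelivery changes nothing
        }

        accountsByPolicy[evt.PolicyNumber] = Guid.NewGuid();
        return true;
    }
}
```

Against a real database, the existence check and the insert would have to run in one transaction or be backed by a unique constraint on the policy number, otherwise two concurrent deliveries could both pass the check.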

RabbitMQ and NHibernate Implementation

The first step is to store the message instead of immediately sending it through the message broker. To do this, we must create a class that will hold the message and its metadata. We will store instances of this class as part of our business transaction.

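public class Message
{
	public virtual long? Id { get; protected set; }
	public virtual string Type { get; protected set; }
	public virtual string Payload { get; protected set; }

	// NHibernate needs a parameterless constructor to materialize the entity.
	protected Message()
	{
	}

	public Message(object message)
	{
		// "Full.Type.Name, AssemblyName" is enough for Type.GetType to resolve the type later.
		Type = message.GetType().FullName + ", " + message.GetType().Assembly.GetName().Name;
		Payload = JsonConvert.SerializeObject(message);
	}

	// Rebuilds the original event object from the stored type name and JSON payload.
	public virtual object RecreateMessage() => JsonConvert.DeserializeObject(Payload, System.Type.GetType(Type));
}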

Our Message class stores the data to be sent as JSON serialized to a string. It also stores the type of the message so that it can be recreated. Remember: we are using class names as routing keys in RabbitMQ, so we must ensure we don’t lose this information.

We also need to define the NHibernate mapping for this class in Message.hbm.xml:

<class  name="Message" table="outbox_messages">
	<id name="Id" column="id">
	    <generator class="identity" />
	</id>
	<property name="Type" column="type" length="500"/>
	<property name="Payload" column="json_payload" length="8000"/>
</class>

Now we need to get rid of RabbitEventPublisher and provide a new implementation of IEventPublisher that will store Messages in the database.

public class OutboxEventPublisher : IEventPublisher
{
    private readonly ISession session;

    public OutboxEventPublisher(ISession session)
    {
        this.session = session;
    }

    // Stores the event in the outbox table; it is persisted when the surrounding transaction commits.
    public async Task PublishMessage<T>(T msg)
    {
        await session.SaveAsync(new Message(msg));
    }
}

To be able to inject the current NHibernate session, we needed to modify our UnitOfWork implementation and our NHibernate installer. The most important lines are:

services.AddSingleton(cfg.BuildSessionFactory());

services.AddScoped(s => s.GetService<ISessionFactory>().OpenSession());

services.AddScoped<IUnitOfWork, UnitOfWork>();

Here we register the session factory as a singleton, together with a factory method that creates a scoped NHibernate session instance. This way we share our session between the UnitOfWork, which manages the persistence of business entities, and our OutboxEventPublisher.
With this setup our messages are stored as part of the same business transaction and won’t be saved in the database if that transaction is rolled back.

Now we are ready to implement the second part – sending messages out of the outbox.

We could use the Hangfire or Quartz.NET schedulers to implement our async process, but I think that would be a bit too much for our small solution.
Instead we will take advantage of one of the new .NET Core features: IHostedService.
Hosted services are a way to implement background jobs in ASP.NET Core. You can read more about them in the official docs.

public class OutboxSendingService : IHostedService
{
    private readonly Outbox outbox;
    private Timer timer;
    // Lets one run through at a time. Unlike a Monitor, a SemaphoreSlim may be released
    // on a different thread than the one that acquired it, which can happen after an await.
    private static readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);

    public OutboxSendingService(Outbox outbox)
    {
        this.outbox = outbox;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        timer = new Timer
        (
            PushMessages,
            null,
            TimeSpan.Zero,
            TimeSpan.FromSeconds(1)
        );
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }

    private async void PushMessages(object state)
    {
        // Skip this tick if the previous run is still in progress.
        if (!gate.Wait(0))
        {
            return;
        }

        try
        {
            await outbox.PushPendingMessages();
        }
        catch (Exception)
        {
            // An exception escaping an async void method would take the process down,
            // so we swallow it here (log it in production) and try again on the next tick.
        }
        finally
        {
            gate.Release();
        }
    }
}

Here we have our hosted service, which uses a timer to trigger outbox processing every second (for production usage you can move this setting into the application configuration). Because PushMessages is called every second, we need to ensure that we do not run it twice at the same time, to avoid sending the same message twice. We use a SemaphoreSlim to let only one run execute at a time; unlike a Monitor, it can safely be released after an await, when the method may have resumed on a different thread. The code responsible for the actual sending lives in the Outbox class presented below.

public class Outbox
{
    private readonly IBusClient busClient;
    private readonly ISessionFactory sessionFactory;
    private readonly OutboxLogger logger;

    public Outbox(IBusClient busClient, ISessionFactory sessionFactory, ILogger<Outbox> logger)
    {
        this.busClient = busClient;
        this.sessionFactory = sessionFactory;
        this.logger = new OutboxLogger(logger);
    }


    public async Task PushPendingMessages()
    {
        var messagesToPush = FetchPendingMessages();
        logger.LogPending(messagesToPush);

        foreach (var msg in messagesToPush)
        {
            if (!await TryPush(msg))
                break;
        }
    }

    private IList<Message> FetchPendingMessages()
    {
        List<Message> messagesToPush;
        using (var session = sessionFactory.OpenStatelessSession())
        {
            messagesToPush = session.Query<Message>()
                .OrderBy(m => m.Id)
                .Take(50)
                .ToList();
        }

        return messagesToPush;
    }

    private async Task<bool> TryPush(Message msg)
    {
        using (var session = sessionFactory.OpenStatelessSession())
        {
            var tx = session.BeginTransaction();
            try
            {
                await PublishMessage(msg);
                
                session
                    .CreateQuery("delete Message where id=:id")
                    .SetParameter("id", msg.Id)
                    .ExecuteUpdate();
                
                tx.Commit();
                logger.LogSuccessPush();
                return true;
            }
            catch (Exception e)
            {
                logger.LogFailedPush(e);
                tx?.Rollback();
                return false;
            }
        }
    }

    private async Task PublishMessage(Message msg)
    {
        var deserializedMsg = msg.RecreateMessage();
        var messageKey = deserializedMsg.GetType().Name.ToLower();
        await busClient.BasicPublishAsync(deserializedMsg,
            cfg =>
            {
                cfg.OnExchange("lab-dotnet-micro").WithRoutingKey(messageKey);
            });
    }

}

When the Outbox’s PushPendingMessages method is called, it reads the 50 oldest messages and tries to send each of them. For each message we open a new database transaction, try to send the message using RawRabbit and, if this succeeds, delete the message from the database. If anything goes wrong, we stop processing and wait for the timer to call our code again so that we can retry.
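The forwarding loop described above can be reduced to a small in-memory sketch that leaves out NHibernate and RawRabbit. InMemoryOutboxRelay and the publish delegate are hypothetical names used only for illustration; the point is that stopping at the first failure keeps messages in their original order for the next run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the outbox table and the forwarding loop.
public sealed class InMemoryOutboxRelay
{
    // Sorted by id, so enumeration yields the oldest messages first.
    private readonly SortedDictionary<long, string> pending = new SortedDictionary<long, string>();
    // Stand-in for the broker client: returns false when publishing fails.
    private readonly Func<string, bool> publish;
    private long nextId = 1;

    public InMemoryOutboxRelay(Func<string, bool> publish)
    {
        this.publish = publish;
    }

    public int PendingCount => pending.Count;

    public void Enqueue(string payload) => pending.Add(nextId++, payload);

    // Sends up to batchSize of the oldest messages and returns how many were sent.
    // Stops at the first failure so that later messages never overtake earlier ones.
    public int PushPending(int batchSize = 50)
    {
        var sent = 0;
        foreach (var entry in pending.Take(batchSize).ToList())
        {
            if (!publish(entry.Value))
            {
                break;
            }

            // Delete only after a successful publish. A crash between these two steps
            // is what leads to a duplicate delivery on the next run.
            pending.Remove(entry.Key);
            sent++;
        }

        return sent;
    }
}
```

While the broker is down, PushPending sends nothing and every message stays queued; once it is back, the next call delivers them in the order they were enqueued.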

Now, to wrap it up, we need to modify RawRabbitInstaller to register all the new components that we just developed.

services.AddScoped<IEventPublisher,OutboxEventPublisher>();
services.AddSingleton<Outbox.Outbox>();
services.AddHostedService<OutboxSendingService>();

Summary

Microservices are not a free lunch. Once we step out of our comfort zone guarded by ACID-compliant relational databases, there are many issues we have to tackle.

In this post we presented the Outbox pattern and one of its possible implementations. The Outbox pattern helps us maintain consistency when using asynchronous, event-based communication between services and an RDBMS to manage the internal state of a given microservice. There are of course multiple ways this pattern can be implemented. If you are interested, you can check out the Entity Framework based implementation built around the DDD aggregate concept in Kamil Grzybek’s post.
Also note that there are production-ready and battle-tested frameworks like NServiceBus that provide this functionality out of the box.