Building Microservices On .NET Core – Part 6 Real time server client communication with SignalR and RabbitMQ

This is the sixth article in our series about building microservices on .NET Core. In the first article we introduced the series and prepared the plan: business case and solution architecture. In the second article we described how you can structure internal architecture of one microservice using CQRS pattern and MediatR library. In the third article we described the importance of service discovery in microservice based architecture and presented practical implementation with Eureka. In the fourth part we presented how you can build API Gateways for your microservices with Ocelot. Fifth part was dedicated to implementation of data access with Marten library.

In this article we are going to show you how you can combine SignalR and RabbitMQ to build real time server-client communication. We will extend our insurance sales portal with chat service. This chat will let insurance agents communicate with each other. We also will use this chat service to send users information about certain business events like new product availability, successful sale or insurance product or tariff changes.

Source code for complete solution can be found on our GitHub.

What we are going to build?

To our existing microservice based insurance sales system we are going to add a chat-service.

This chat service will have two functions:

  • it will allow insurance agents to talk to each other using our app,
  • it will allow our system to send notification when important business event happens, examples of such events are new product or tariff introduction, successful sale, commission calculation or payment receival.

Rabbit MQ

We start with regular .NET Web API application, but this time we are going to use version 2.2, which will have significant consequences. As usual we add MediatR. Before we can start chat implementation we need to secure our service and setup CORS.

For securing access to our new service we are going to use JWT token based approach.
We need to set it up in the Startup class by adding code to Configure and ConfigureServices methods. Below is a code snippet showing key configuration parts:

var appSettings = appSettingsSection.Get();
var key = Encoding.ASCII.GetBytes(appSettings.Secret);

services
	.AddAuthentication(x =>
	{
	    x.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
	    x.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;    
	})
	.AddJwtBearer(x =>
	{
	    x.TokenValidationParameters = new TokenValidationParameters
	    {
		ValidateIssuerSigningKey = true,
		IssuerSigningKey = new SymmetricSecurityKey(key),
		ValidateIssuer = false,
		ValidateAudience = false,
		ValidateActor = false
	    };
	    x.Events = new JwtBearerEvents
	    {
		OnMessageReceived = context =>
		{
		    var accessToken = context.Request.Query["access_token"];

		    var path = context.HttpContext.Request.Path;
		    if (!string.IsNullOrEmpty(accessToken) &&
		        (path.StartsWithSegments("/agentsChat")))
		    {
		        context.Token = accessToken;
		    }
		    return Task.CompletedTask;    
		}
	    };
	});

This is typical JWT setup with secret stored in config file. The only unusual thing is OnMessageReceived which is needed for SignalR to work properly (you can find more info here).

Next thing is CORS setup. With .NET Core 2.2 we have a breaking change here. We are no longer allowed to have public API (the one with AllowAnyOrigin) combined with AllowCredentials.
That’s why we need to specify all our allowed clients in configuration. With clients listed in configuration:

"AppSettings": {
    "AllowedChatOrigins" : ["http://localhost:8080"]
}

We can now configure CORS:

services.AddCors(opt => opt.AddPolicy("CorsPolicy",
	builder =>
	{
		Builder
			.AllowAnyHeader()
			.AllowAnyMethod()
			.AllowCredentials()
			.WithOrigins(appSettingsSection.Get().AllowedChatOrigins);
	}
));

Building chat service with SignalR

What is SignalR?

SignalR is a library that allows .NET developers to add real-time communication to web applications. It has the ability to receive messages from connected clients and to send notification from server to clients.
Apart from server side implementation you can find client implementation for most popular platforms: JavaScript, Java, .NET and .NET Core.

Adding to project

Now we can add SignalR to our project. First step is adding it to Startup class ConfigureServices method:

services.AddSignalR();

Next we need to add a hub, which is a central point of communication between the server and clients. This is pretty simple. You just have to add a class that extends Hub.

[Authorize(AuthenticationSchemes = JwtBearerDefaults.AuthenticationScheme)]
public class AgentChatHub : Hub

We also add Authorize attribute and specify which authentication schema SignalR should use.

We also need to provide a service that will give SignalR “user name” based on currently user principal. For this purpose we need to implement IUserIdProvider and register this implementation.

public class NameUserIdProvider : IUserIdProvider
{
	public string GetUserId(HubConnectionContext connection)
	{
		return connection.User?.Identity?.Name;
	}
}

//Startup.cs ConfigureServices
services.AddSingleton<IUserIdProvider, NameUserIdProvider>();

Last step is adding URL mapping for our hub. We need to add it to Startup class Configure method:

app.UseSignalR(routes =>
{
	routes.MapHub("/agentsChat");
});

Now our hub can be accessed from the external world by clients that provide valid JWT token and are listed in AllowedChatOrigins configuration setting.

Hub

SignalR hub allows clients to send messages to server and the other way round. Hub maintains list of connected clients and allows server side code to “execute” methods on clients.
We add a method SendMessage to our AgentChatHub now. This method will be called by client SPA application when one of the agents wants to send a message to all currently connected users.

public async Task SendMessage(string message)
{
	var avatar = Context.User?.Claims?.FirstOrDefault(c=>c.Type=="avatar")?.Value;
            
	await Clients.All.SendAsync("ReceiveMessage", Context.User?.Identity?.Name, avatar, message);
}

This method extract some information from authenticated user data and calls all clients “ReceiveMessage”, passing user name, avatar and message.

Hub also reacts to events like user connected or disconnected. Below you can find simple methods that broadcast information about such events to all users.

public override async Task OnConnectedAsync()
{
	await Clients.Others.SendAsync("ReceiveNotification", $"{Context.User?.Identity?.Name} join chat");
}

public override async Task OnDisconnectedAsync(Exception exception)
{
	await Clients.Others.SendAsync("ReceiveNotification", $"{Context.User?.Identity?.Name} left chat");
}

Implementing client with VueJS

To use SignalR from VueJS we need to install appropriate package using npm.

"dependencies": {
	"@aspnet/signalr": "^1.1.2",
	…
}

Now we can add implement sending messages and handling the arrival of new messages from server. Most of the interesting code is located in Chat.vue component.

We need to connect to hub. This happens in created handler:

this.hubConnection
	.start()
	.then(()=>console.info("connected to hub"))
	.catch(err => console.error(err));

We also need to register listeners for server sent events

this.hubConnection.on("ReceiveMessage",(usr, avatar,msg) => {
	this.appendMsgToChat(usr,avatar,msg);
});

this.hubConnection.on("ReceiveNotification", msg => {
	this.appendAlertToChat(msg);
});

The only missing part is a method to send a message to our hub:

send() {       
	this.hubConnection.invoke("SendMessage", this.message);
	this.message = '';
}

Here is a screenshot from a running application.

application

Pub-sub implementation with RabbitMQ

RabbitMQ is lightweight, easy to use and operate, open source message broker. ASC LAB Team used it in many projects for at least 6 years. It can be used to integrate system components asynchronously with publisher-subscriber pattern or can be also used as an RPC mechanism.
For using RabbitMQ with .NET Core we recommend RawRabbit library.
Before we start implementing our pub-sub solution we must install RabbitMQ. Install instructions for most popular operating systems can be found on the official documentation page.

One you have RabbitMQ up and running we can start implementing our solution.

Publishing events

We are going to add functionality of publishing domain events to PolicyService. When someone buys or terminates policy we are going to send a message to RabbitMQ so other microservices interested in such events can react. For example PaymentService may create an account for policy that was just sold and close the account, when policy is terminated.
In this post we will see how ChatService can subscribe to event of new policy being sold and how it uses SignalR to notify currently logged users about such case.

To implement events publishing with RawRabbit we need to add the following NuGet packages.

<PackageReference Include="RawRabbit.DependencyInjection.ServiceCollection" Version="2.0.0-rc5" />
<PackageReference Include="RawRabbit.Operations.Tools" Version="2.0.0-rc5" />

Now we can configure the library. We encapsulated configuration in RawRabbitInstaller class.

services.AddRawRabbit(new RawRabbitOptions
{
	ClientConfiguration = new RawRabbit.Configuration.RawRabbitConfiguration
	{
	    Username = "guest",
	    Password = "guest",
	    VirtualHost = "/",
	    Port = 5672,
	    Hostnames = new List {"localhost"},
	    RequestTimeout = TimeSpan.FromSeconds(10),
	    PublishConfirmTimeout = TimeSpan.FromSeconds(1),
	    RecoveryInterval = TimeSpan.FromSeconds(1),
	    PersistentDeliveryMode = true,
	    AutoCloseConnection = true,
	    AutomaticRecovery = true,
	    TopologyRecovery = true,
	    Exchange = new RawRabbit.Configuration.GeneralExchangeConfiguration
	    {
		Durable = true,
		AutoDelete = false,
		Type = RawRabbit.Configuration.Exchange.ExchangeType.Topic
	    },
	    Queue = new RawRabbit.Configuration.GeneralQueueConfiguration
	    {
		Durable = true,
		AutoDelete = false,
		Exclusive = false
	    }
	}
});

Here we configure broker address, user credentials for connection, exchanges and queues default settings. For production solution we should move most of these params to system configuration.

With this in place, we can register implementation of publisher, that will be used in our codebase.

services.AddSingleton<IEventPublisher,RabbitEventPublisher>();

We must of course add service in Startup class in ConfigureServices method:

services.AddRabbit();

And here is a publisher code:

public class RabbitEventPublisher : IEventPublisher
{
	private readonly IBusClient busClient;

	public RabbitEventPublisher(IBusClient busClient)
	{
		this.busClient = busClient;
	}

	public Task PublishMessage(T msg)
	{
		return busClient.BasicPublishAsync(msg, cfg => {
			cfg.OnExchange("lab-dotnet-micro").WithRoutingKey(typeof(T).Name.ToLower());
		});
	}
}

It accepts busClient which is part of RawRabbit library and uses it to send message to exchange named lab-dotnet-micro, with routing key equal to name of message class and content being an instance of message type.
For example, we have PolicyCreated class that represents event of new policy being sold. If we do publisher.PublishMessage(new PolicyCreated { … }) a message with routing key PolicyCreated and content being JSON serialized PolicyCreated class will be send to lab-dotnet-micro exchange.

We can now modify CreatePolicyHandler to send event when new policy is sold.
We need to add dependency on IEventPublisher and use it to send event.

public async Task Handle(CreatePolicyCommand request, CancellationToken cancellationToken)
{
	using (var uow = uowProvider.Create())
	{
		var offer = await uow.Offers.WithNumber(request.OfferNumber);
		var customer =  ….
		var policy = offer.Buy(customer);

		uow.Policies.Add(policy);
		await uow.CommitChanges();

		await eventPublisher.PublishMessage(PolicyCreated(policy));

		return new CreatePolicyResult
		{
		PolicyNumber = policy.Number
		};    
	}
}

private PolicyCreated PolicyCreated(Policy policy)
{
	return new PolicyCreated
	{
		PolicyNumber = policy.Number,
	    	…
	};
}

Listening to events and sending SignalR notifications to users

Now, when publishing part is ready, we can implement listener for PolicyCreated event in our ChatService.

For a start we need to add NuGet dependency on the following packages in ChatService.csproj.

<PackageReference Include="RawRabbit.DependencyInjection.ServiceCollection" Version="2.0.0-rc5" />
<PackageReference Include="RawRabbit.Operations.Subscribe" Version="2.0.0-rc5" />

Now we can configure connection to RabbitMQ and configure listeners. Connection configuration is the same as for publisher part. You can find it in RawRabbitInstaller for ChatService.

Before we get into the details of how we are going to implement subscription to messages, we need to talk a little bit about architectural principles in our solution. So far we used commands and queries handlers as something that encapsulates domain logic and exposes it to external world. These commands and queries are exposed using Web API controllers.
Our architecture was based on MediatR in this area. For the sake of consistency we are also going to use MediatR for handling messages. MediatR has definitions of INotification and  INotificationHandler interfaces. First one is used to mark event or message classes and the second one is used to implement handler for given notification type (event/message type).

As you may already have seen PolicyCreated class is marked with INotification interface.
Below you can see a handler for PolicyCreated notification.

public class PolicyCreatedHandler : INotificationHandler
{
	private readonly IHubContext chatHubContext;

	public PolicyCreatedHandler(IHubContext chatHubContext)
	{
		this.chatHubContext = chatHubContext;
	}

	public async Task Handle(PolicyCreated notification, CancellationToken cancellationToken)
	{
		await chatHubContext.Clients.All.SendAsync("ReceiveNotification",$"{notification.AgentLogin} just sold policy for {notification.ProductCode}!!!");
	}
}

Here you can see that we use injected instance of our AgentChatHub to send a message to all connected users that agent with given login has sold a policy of given type. This way we combined SignalR with message coming from other microservice. But we still did not see how to glue together MediatR notification handler with RabbitMQ messages.

Let’s analyze RabbitEventListener and RabbitListenersInstaller classes. Let’s examine what  RabbitEventListener ListenTo method does

public void ListenTo(List eventsToSubscribe)
{
	foreach (var evtType in eventsToSubscribe)
	{
		This.GetType()
			.GetMethod("Subscribe", 
				System.Reflection.BindingFlags.NonPublic|
				System.Reflection.BindingFlags.Instance)
			.MakeGenericMethod(evtType)
			.Invoke(this, new object[] { });
	}
}

This method takes a list of types that represent messages we’d like to listen to. We iterate over this list and for each type we call Subscribe<T> method. This is where actual work is done.

private void Subscribe() where T : INotification
{
	this.busClient.SubscribeAsync(
	async (msg) =>
	{
		using (var scope = serviceProvider.CreateScope())
		{
			var internalBus = scope.ServiceProvider.GetRequiredService();
			await internalBus.Publish(msg);
		}
	},
	cfg => cfg.UseSubscribeConfiguration(
		c => c
		.OnDeclaredExchange(e => e
			.WithName("lab-dotnet-micro")
			.WithType(RawRabbit.Configuration.Exchange.ExchangeType.Topic)
			.WithArgument("key", typeof(T).Name.ToLower()))
			.FromDeclaredQueue(q => q.WithName("lab-chat-service-" + typeof(T).Name)))
	);
}

Here we connect to exchange lab-dotnet-micro and declare a queue that will collect messages with routing key equal to event type name. Queue name is lab-chat-service plus event type name.
When the message with such key arrives at the exchange it is placed in our queue. When our ChatService is up and running it will receive this message and code in bold will be executed.
This code resoves IMediator and uses it to route to appropriate INotificationHandler registered.

Remember that we use MediatR integration with ASP.NET Core dependency injection so we do not need to make any additional steps (apart from calling services.AddMediatR() in our Startup class) to have our PolicyCreatedHandler registered.

So we only need to register types of messages we want to handle so RabbitEventListener will initiate a queue and start subscription.
We do it by adding the following code to Startup class Configure method:

app.UseRabbitListeners(new List<Type> {typeof(PolicyCreated)});

This way if you want to handle several types of messages, for example PolicyTerminated, ProductActivated all we had to do would be:

  • Create INotificationHandler for given type
  • Add it to a list of types passed to UserRabbitListeners

Summary

As you can see with a little help of SignalR and RawRabbit we can implement fairly sophisticated functionality. Combining MadiatR with previously mentioned components keeps our business logic code away from infrastructural concerns and helps keep us clean architecture of the whole solution.

We also added a great business value to our solution. Now our insurance agent can talk to each other and also they get real-time notifications about various business events.

You can check out complete solution source code at  https://github.com/asc-lab/dotnetcore-microservices-poc.