Integrating RabbitMQ with ASP.NET Core - Quickstart with an Example

ASP.NET Core Posted 25 days ago

Why do we need a Message Queue?

Imagine we have a producer server that relays a real-time Hero record information that needs to be pushed on to the database. Now this information is so random and so high-volume that if we are to write the data to the database as it is received, the chances of causing a memory leak or a performance hit are high.

So instead of consuming the data as it is received on the API end, we'd introduce a queue in the middle.

The producer server puts all the Hero records into this queue, which signals the subscribed Receiver Service inside our Hero API that a new record has been received. This serializes the entire process and helps the receiver reduce its load. The records are now process in a FCFS (First Come First Served) fashion.

The Message Queue here takes the role of a Postbox, where the Producer drops the records as it creates and the Queue also behaves like a Postman who delivers it to the intended (or subscribed) Receiver in a serialized fashion, one record at a time. It creates a temporary buffer where the data produced by one component resides for a while, before it gets consumed by the other component.

A bit about RabbitMQ

RabbitMQ is a message-broker that offers multiple messaging protocols and features that help components asynchronously pass data (in the form of messages) among one another.

In this article, let's look at how do we setup a RabbitMQ and utilize its message queuing capabilities for queueing data created by a remote producer server and consume it and store it to a database via an AspNetCore API.

We're going to use RabbitMQ to play the Postman and the Postbox here - our choice of a Message Queue.

Installation:

RabbitMQ can be installed as a standalone server on various environments (Linux/Windows/Cloud) or can be used a docker image (my personal favorite). I'm going by the Docker route which can be installed as below:

> docker run -it --rm --name mymq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ server runs on the port 5672 by default and we're now running a docker image of the RabbitMQ server which also comes with a management UI. This management UI visualizes all the additional settings, the available queues and options on these queues for the developer. If you're not interested in the management stuff, you can simply install just the RabbitMQ server as below:

> docker run -rm --name mymq -p 5672:5672 rabbitmq

Integration - Creating the Producer Server:

As mentioned before, we're now creating a Producer server that creates and puts new Heroes into the MQ, which is consumed by a Receiver service in our AspNetCore API. To get started, one needs to install the RabbitMQ.Client nuget package in the code to use the libraries to programmatically connect to and push content into the queue or to subscribe to and receive content from the queue.

Let's say we have a Producer server that is a .NET Core (.NET 5) console application. In it we install the RabbitMQ.Client library

> dotnet add package RabbitMQ.Client

Then in the Program.cs, we add the necessary logic to:

  1. Create Connection
  2. Create Channel
  3. Connect to (or Create) Queue
  4. Put data into the Queue

The code looks like below:

namespace RabbitMqSend
{
    class Program
    {
        public static void Main()
        {
            // specify the MQ server we're connecting to
            // in our case its localhost since we're running
            // in a local docker container
            var factory = new ConnectionFactory() { HostName = "localhost" };
            
            // 1. create connection
            using (var connection = factory.CreateConnection())

            // 2. create channel
            using (var channel = connection.CreateModel())
            {
                // 3. connect to the queue
                channel.QueueDeclare(queue: "heroes",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                int index = 1;
                while (index <= 99999)
                {
                    // we need to write data in the form of bytes
                    string message = $"{index}|SuperHero{10000+index}|Fly,Eat,Sleep,Manga|1|{DateTime.UtcNow.ToLongDateString()}|0|0";
                    var body = Encoding.UTF8.GetBytes(message);

                    // push content into the queue
                    channel.BasicPublish(exchange: "",
                                         routingKey: "heroes",
                                         basicProperties: null,
                                         body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                    index++;
                    Thread.Sleep(10000);
                }
            }
        }
    }
}

The code just creates a new Hero record (with all its attributes seperated by a |) and pushes it into the queue at an interval of 10000ms. The setup just tries to mimic a general produces who produces some content frequently. In the real-world this would be a server that pushes data infinitely.

If we look at the code, we can observe the producer does the four steps mentioned before.

// 1. create connection
var connection = factory.CreateConnection()

// 2. create channel (that creates a channel with session and a model)
var channel = connection.CreateModel()

// 3. connect to the queue
channel.QueueDeclare(queue: "heroes", durable: false, exclusive: false, autoDelete: false, arguments: null);

// 4. put data into the queue
channel.BasicPublish(exchange: "", routingKey: "heroes", basicProperties: null, body: body);

Since we're using these variables in a "using" block, the connections and channels are closed once the job is over. But a receiver needs to persist on the channel for receiving the data.

Creating a Consumer Service within the AspNetCore API Space:

We'd now create a receiver who'd continuously listen on this queue and get data as its placed. This receiver then inserts this received message into the database which is exposed for clients by the Web API.

But how do we run this receiver that is expected to run all the time? The answer is - wrap this functionality inside a Background Service.

The receiver service is a class that derives the BackgroundService class and overrides the ExecuteAsync() method. The implementation is as below:

namespace RabbitHeroesApi
{
    public class HeroReceiverService : BackgroundService
    {
        private IServiceProvider _sp;
        private ConnectionFactory _factory;
        private IConnection _connection;
        private IModel _channel;

        // initialize the connection, channel and queue 
        // inside the constructor to persist them 
        // for until the service (or the application) runs
        public HeroReceiverService(IServiceProvider sp)
        {
            _sp = sp;
            
            _factory = new ConnectionFactory() { HostName = "localhost" };
            
            _connection = _factory.CreateConnection();
            
            _channel = _connection.CreateModel();
            
            _channel.QueueDeclare(
                queue: "heroes", 
                durable: false, 
                exclusive: false, 
                autoDelete: false, 
                arguments: null);
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // when the service is stopping
            // dispose these references
            // to prevent leaks
            if (stoppingToken.IsCancellationRequested)
            {
                _channel.Dispose();
                _connection.Dispose();
                return Task.CompletedTask;
            }

            // create a consumer that listens on the channel (queue)
            var consumer = new EventingBasicConsumer(_channel);

            // handle the Received event on the consumer
            // this is triggered whenever a new message
            // is added to the queue by the producer
            consumer.Received += (model, ea) =>
            {
                // read the message bytes
                var body = ea.Body.ToArray();
                
                // convert back to the original string
                // {index}|SuperHero{10000+index}|Fly,Eat,Sleep,Manga|1|{DateTime.UtcNow.ToLongDateString()}|0|0
                // is received here
                var message = Encoding.UTF8.GetString(body);
                
                Console.WriteLine(" [x] Received {0}", message);
                

                Task.Run(() =>
                {
                    // split the incoming message
                    // into chunks which are inserted
                    // into respective columns of the Heroes table
                    var chunks = message.Split("|");

                    var hero = new Hero();
                    if(chunks.Length == 7)
                    {
                        hero.Name = chunks[1];
                        hero.Powers = chunks[2];
                        hero.HasCape = chunks[3] == "1";
                        hero.IsAlive = chunks[5] == "1";
                        hero.Category = Enum.Parse<Category>(chunks[6]);
                    }

                    // BackgroundService is a Singleton service
                    // IHeroesRepository is declared a Scoped service
                    // by definition a Scoped service can't be consumed inside a Singleton
                    // to solve this, we create a custom scope inside the Singleton and 
                    // perform the insertion.
                    using (var scope = _sp.CreateScope())
                    {
                        var db = scope.ServiceProvider.GetRequiredService<IHeroesRepository>();
                        db.Create(hero);
                    }
                });
            };

            _channel.BasicConsume(queue: "heroes", autoAck: true, consumer: consumer);

            return Task.CompletedTask;
        }
    }
}

To keep the connection persistent, we pull the declarations out of using blocks and place them inside the constructor. The same four steps are followed similar to the Producer flow, except that in this case we create a "Consumer" that subscribes to the queue and on any message received, an event handler is registered where read the message out of the queue and process it. In our case we insert the received message into our database as a new Hero. Finally, we register this service as a HostedService in our Startup class. Since we're reusing the RedisHeroes API application we built to demonstrate usage of Redis Cache, the HostedService comes below all other existing registrations.

services.AddScoped<IDapperr, Dapperr>();
services.AddScoped<IHeroesRepository, DapperHeroesRepository>();
services.AddSingleton<ICacheService, CacheService>();
services.AddHostedService<HeroReceiverService>();

Rabbit in Action:

On running both the producer and the API simultaneously, we can see that the data created and written on to the queue is immediately consumed by the API via the Received event handler.

data/Admin/2021/4/rabbitmq-output-in-action.PNG

Final Thoughts:

The idea of Message Queues is important for microservice architecture deployed in the cloud, where each node processes different responsibilities and in the case where one request might require other services be notified of the changes, MQs play an important role of providing a temporary buffer for communication. By leveraging messaging protocols such as AMQP (Avdanced Message Queueing Protocol) and a pub-sub model, RabbitMQ makes things easy for to create such a buffer to be shared and accessed by multiple service nodes and help notify each other of any required data.

Complete Example: https://github.com/referbruv/aspnetcore-rabbitmq-heroes-example

Author-Image

Ram

I'm a full-stack developer and a software enthusiast who likes to play around with cloud and tech stack out of curiosity.

Hope this article was helpful. You can now show us your support. 😊

Join the Newsletter

Subscribe to get our latest content by email.
    We won't send you spam. Unsubscribe at any time.
    We use cookies to provide you with a great user experience, analyze traffic and serve targeted promotions.   Learn More   Accept