Streaming logs using RabbitMQ

Published by Coenraad Pretorius on

More data stacks are shifting towards microservices and communication between them are essential as it sometimes need to communicate to more than one service. Message brokers play a critical part in this architecture by allowing services to communicate by routing messages and storing them. In this post we will be creating a real-time log viewer that consolidates errors across several apps and a python app to process error logs.

Adapted from image by Freepik.


In a previous post we created a streaming log viewer using FastAPI that read the last few lines a log file. While this is a good application, issues may arise with having multiples of these running in terms of file read/write access. We may also miss warning or error messages as no filtering was applied.

To overcome these issues, we are going to use RabbitMQ as a message broker as it is lightweight, easy to deploy and supports multiple messaging protocols. The working concept is illustrated in the below image. You have a producer of a message which goes into an exchange. From the exchange, you define one or more message queues that a consumer will connect to. The type of exchanges will vary according to the application and protocol used.

Image source:

In this case, we are going to have three log producers, which are our python apps. They will send log data to the topic exchange. For our consumers, the first one will be a simple HTML page that connects via websockets, directly to the exchange. The second consumer will be a python app to process error logs from the message queues.

Setting up RabbitMQ

The first step is to install Erlang, which is a prerequisite, and then RabbitMQ. Once installed, we open the RabbitMQ command line and enable the management web interface and the MQTT over websockets plug-ins:

# web management plugin
rabbitmq-plugins enable rabbitmq_management

# mqtt over websockets
rabbitmq-plugins enable rabbitmq_web_mqtt

The management interface will be available at http://localhost:15672/ with default username and password guest. All MQTT messages go via the amq.topic exchange by default but can be configured in the config file.

RabbitMQ exchanges tab in the management interface.

We will use the default exchange and next create the three queues we will need. For app1 and app2 we only create queues for ERROR messages while for app3 we will queue all log messages.

Created queues that we can bind to the exchange.

Now that we have the queues, we need to bind them to the exchange and add the Routing key. The routing key would be the topic from the producers and the python package we will be using will create topics for us using the logger’s name and then the logging level, e.g. app1/ERROR.

Important to note that topics normally contain / which would need to be converted to . in the Routing key. Thus, a topic defined as app3/ERROR would have a routing key of app3.ERROR. Also worth noting, these are case-sensitive.

Binding queues to the exchange and defining the routing keys.

Creating the log producers

For the producers, we will setup three python apps that log messages. We will be using a python package called python_logging_rabbitmq that can be found here. We setup a logging handler specifically for RabbitMQ and specify the exchange explicitly as amq.topic as the default defined in the python package is logs.

View this gist on GitHub

The log handler will produce JSON messages that are sent to the exchange. When defining the handler, we can also add additional fields to the message, in this case {"source": f"{APP_NAME}-producer", "env": "development"}. A typical JSON message will look like this:

  "name": "app1",
  "msg": "*** this is a log message from 'app1'.",
  "args": null,
  "levelname": "ERROR",
  "levelno": 40,
  "pathname": ".\\",
  "filename": "",
  "module": "producer1",
  "exc_info": null,
  "exc_text": null,
  "stack_info": null,
  "lineno": 38,
  "funcName": "<module>",
  "created": 1668934072.0913537,
  "msecs": 91.3536548614502,
  "relativeCreated": 310199.5060443878,
  "thread": 17616,
  "threadName": "MainThread",
  "processName": "MainProcess",
  "process": 32972,
  "source": "app1-producer",
  "env": "development",
  "host": "instinct"

Once we run all three producers, we can see the messages coming through and messages being added to the queues. In producer three, we can see more messages are being logged as we are logging DEBUG, INFO, WARNING and ERROR messages. Clicking on the queues gives more details and statistics.

Messages being added the newly created queues.

Creating the log consumers

For the consumer, we create a simple HTML page and style it with Tailwind CSS. We use JavaScript to initiate the websockets connection and process the log messages.

View this gist on GitHub

In RabbitMQ, we see a new queue from our web browser, prefixed with mqtt-subscription.

New queue created for direct messages for MQTT over websockets.
Real-time log viewer directly from the topic exchange.

With this connection, we are consuming messages directly from the exchange amq.topic. We are not consuming messages from the queues; however, this can be enabled in the newer versions if needed. We can see on the messages still increasing in number in the queues. Once our simulated producers stop running, the state will turn to idle, but the messages will remain in the queues.

Reading messages from the queues is a destructive operation, meaning once it is read, it is removed from the queue. This is something we may not want for our ERROR messages. We want to log the errors in a central location for all our apps. Here we create our second consumer that reads messages from the queues using the pika library. The code comes from one of the RabbitMQ tutorials.

View this gist on GitHub

Below we see the output of the second consumer, processing messages from the queue and just printing them for illustration. In practise, these will typically be logged to a file or database for reporting. We can go one step further and use these logs to trigger restart of services, creating tickets or launching backfill processes.

Processing app1-error queue messages with

As we processed all the previous messages in the app1-error queue, the queue length is zero as we process new messages as they come in with the second consumer.

Queues tab showing we processed all app1-error from the past and continuing the process new messages as they arrive.


We leveled up our logging game by consolidating our ERROR messages across three apps into central message queues, using RabbitMQ. Then we created a web consumer to monitor our application in real-time in an HTML page. Our second consumer processed the messages in the queues from the past to the present.

While this case shows the application in logging, the application of message queues reaches far beyond this. For example, in IoT data streaming where internet connections are not stable it provides local redundancy and ensures data is not lost. It can also be used for event streaming, like changing of process states which may require human intervention.

Full code available on GitHub.


Leave a Reply

Avatar placeholder

Your email address will not be published. Required fields are marked *