Publish/Subscribe
In the previous tutorial, we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this section we will do something completely different: we will deliver a message to multiple consumers. This pattern is known as publish/subscribe.
To illustrate the pattern, we will build a simple logging system. It will consist of two programs: the first will emit log messages, and the second will receive and print them. In our logging system, every running copy of the receiver program will get the messages. That way we can run one receiver and direct the logs to disk, and at the same time run another receiver and view the logs on the screen. In essence, published log messages will be broadcast to all receivers.
Exchanges
In the previous tutorials, we implemented sending messages to and receiving messages from queues. It's time to introduce the complete messaging model in Rabbit.
Let's quickly review what we introduced in the previous tutorial:
- The producer is the application that sends the message.
- A queue is a buffer that stores messages.
- Consumers are applications that receive messages.
The core idea of the RabbitMQ messaging model is that the producer never sends messages directly to a queue. In fact, the producer often does not even know whether a message will be delivered to any queue at all. Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.
There are several exchange types available: direct, topic, headers, and fanout. We will focus on the last one, fanout. Let's create an exchange of this type and name it logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
The fanout exchange is very simple. As you might guess from the name, it just broadcasts every message it receives to all the queues it knows about, which is exactly what we need for our logger.
Now we can publish to our named exchange instead:
channel.basic_publish(exchange='logs', routing_key='', body=message)
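To make the fanout behaviour concrete, here is a toy in-memory model of it. This is only an illustration of the semantics described above, not how RabbitMQ is implemented; the class ToyFanoutExchange and the queue names are hypothetical.

```python
from collections import deque


class ToyFanoutExchange:
    """In-memory stand-in for a fanout exchange (illustration only)."""

    def __init__(self):
        self.queues = {}  # queue name -> deque of message bodies

    def bind(self, queue_name):
        # Binding registers the queue so it receives future messages.
        self.queues.setdefault(queue_name, deque())

    def publish(self, body, routing_key=''):
        # A fanout exchange ignores routing_key and copies the message
        # to every bound queue; with no bindings it is simply dropped.
        for queue in self.queues.values():
            queue.append(body)
        return len(self.queues)  # number of queues that got a copy


exchange = ToyFanoutExchange()
dropped = exchange.publish('nobody is listening yet')
exchange.bind('disk_logger')
exchange.bind('screen_logger')
delivered = exchange.publish('info: Hello World!', routing_key='ignored')
```

In this sketch the first message reaches no queue because nothing is bound yet, while the second is copied to both bound queues regardless of the routing key.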
Temporary queue
You may remember that we previously used queues with specific names (remember hello and task_queue?). Being able to name a queue was crucial for us: we needed to point the workers to the same queue. Giving a queue a name is important whenever you want to share it between producers and consumers.
But that is not the case for our logger. We want to hear about all log messages, not just a subset of them. We are also interested only in currently flowing messages, not in old ones. To achieve this we need two things.
- First, whenever we connect to Rabbit, we need a fresh, empty queue. To do this, we could create a queue with a random name or, even better, let the server choose a random queue name for us. We can do this by passing an empty queue parameter to queue_declare:
result = channel.queue_declare(queue='')
At this point, result.method.queue contains a random queue name. For example, it might look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
- Second, once the consumer connection is closed, the queue should be deleted. There is an exclusive flag for that:
result = channel.queue_declare(queue='', exclusive=True)
Bindings
We have created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. This relationship between an exchange and a queue is called a binding.
channel.queue_bind(exchange='logs', queue=result.method.queue)
From now on, the logs exchange will append messages to our queue.
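The three steps covered so far (declare the exchange, declare a temporary queue, bind it) can be gathered into one helper. The sketch below is an assumption-laden illustration: subscribe_to_fanout and RecordingChannel are hypothetical names, and RecordingChannel is a stand-in that only records calls so the example can run without a broker. With a real pika channel you would pass that channel instead.

```python
from types import SimpleNamespace


def subscribe_to_fanout(channel, exchange_name):
    """Declare a fanout exchange, create a temporary queue, and bind it.

    `channel` is expected to behave like a pika channel; only the calls
    shown in this tutorial are used.
    """
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
    # queue='' asks the server to pick a name; exclusive=True removes
    # the queue once the consumer connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange_name, queue=queue_name)
    return queue_name


class RecordingChannel:
    """Stand-in for a pika channel that only records the calls made."""

    def __init__(self):
        self.calls = []

    def exchange_declare(self, exchange, exchange_type):
        self.calls.append(('exchange_declare', exchange, exchange_type))

    def queue_declare(self, queue, exclusive=False):
        self.calls.append(('queue_declare', queue, exclusive))
        # Mimic the server choosing a name when queue=''.
        name = queue or 'amq.gen-example'
        return SimpleNamespace(method=SimpleNamespace(queue=name))

    def queue_bind(self, exchange, queue):
        self.calls.append(('queue_bind', exchange, queue))


fake = RecordingChannel()
queue_name = subscribe_to_fanout(fake, 'logs')
```

The recorded calls show the order that matters: the exchange must exist before the queue is bound to it.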
Put it all together
The producer program, which emits log messages, does not look much different from the previous tutorial. The most important change is that we now publish messages to our logs exchange instead of the default (nameless) exchange. We still need to supply a routing_key when sending, but its value is ignored by fanout exchanges.
The code for emit_log.py is:
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
As you can see, after establishing the connection we declare the exchange. This step is necessary because publishing to a non-existent exchange is forbidden. If no queue is bound to the exchange yet, the messages will be lost, but that is okay for us: if no consumer is listening, we can safely discard the message.
The code for receive_logs.py is:
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
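One detail of the receiver is worth noting: on Python 3 the body handed to the callback is a bytes object, so printing it with %r shows a b'...' prefix. If plain text is preferred in the log, a callback along the following lines could decode it first. This is a minimal sketch assuming the messages are UTF-8 text; format_log is a hypothetical helper, not part of pika.

```python
def format_log(body):
    """Turn a message body into printable text.

    Assumes bodies are UTF-8; undecodable bytes are replaced rather
    than raising, so one bad message cannot stop the consumer.
    """
    if isinstance(body, bytes):
        return body.decode('utf-8', errors='replace')
    return str(body)


def callback(ch, method, properties, body):
    # Same signature as the callback above; only the formatting differs.
    print(" [x] %s" % format_log(body))
```

This callback can be passed to basic_consume in place of the original one.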
That completes the code. If you want to save the logs to a file, open a console and type:
python receive_logs.py > logs_from_rabbit.log
If you want to see the logs on the screen, open a new terminal and run:
python receive_logs.py
And of course, to emit a log message, type:
python emit_log.py
Using rabbitmqctl list_bindings you can verify that the code really creates the bindings and queues we want. With two receive_logs.py programs running, you should see something like:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange    amq.gen-JzTY20BRgKO-HjmUJj0wLg    queue    []
# => logs    exchange    amq.gen-vso0PVvyiRIL2WoV3i48Yg    queue    []
# => ...done.