RabbitMQ Message Queue, from Knocking on the Door to Getting Started (Lecture 5) - "Routing"
This article is translated from the official RabbitMQ tutorial (if the link fails, please visit: http://www.rabbitmq.com/tutorials/tutorial-four-python.html)
Prerequisite
As in the other Python tutorials, we use the Pika RabbitMQ client, version 0.11.0.
Focus of this tutorial
In the previous tutorial, we built a simple logging system that could broadcast log messages to multiple receivers.
In this tutorial, we will add a new feature to it: the ability to subscribe to only a subset of the messages. For example, we will be able to write only critical error messages to the log file (to save disk space) while still printing all log messages on the console.
Bindings
In the previous example, we already created a binding. You may recall the code:
channel.queue_bind(exchange=exchange_name, queue=queue_name)
A binding is a relationship between an exchange and a queue. It can be read simply as: the queue is interested in messages from this exchange.
A binding can take an extra routing_key parameter. To avoid confusion with the basic_publish parameter of the same name, we call it a binding key. The following example shows how to create a binding with a key:
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')
The meaning of a binding key depends on the type of exchange. The fanout exchange we used earlier simply ignores this value.
Direct exchange
In the previous tutorial, our logging system broadcast all messages to all consumers. We want to extend it to allow filtering messages by severity. For example, we may want the script that writes log messages to disk to receive only critical errors, and not waste disk space on warning or info messages.
The fanout exchange we used before does not give us much flexibility: it is only capable of mindless broadcasting.
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message.
To illustrate this, consider the following setup:
In this setup, the direct exchange x has two queues bound to it. The first queue is bound with the binding key orange, and the second queue has two bindings, one with the binding key black and the other with the binding key green.
In such a setup, a message published to the exchange with the routing key orange is routed to queue Q1. Messages with the routing key black or green go to queue Q2. All other messages are discarded.
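The routing rule described above can be sketched without a broker. The snippet below is a toy in-memory model, not Pika code: the class name DirectExchange and its bind and publish methods are made up for illustration, and a real broker does considerably more.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory stand-in for a direct exchange (not part of Pika)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self._bindings = defaultdict(list)
        # queue name -> message bodies delivered so far
        self.queues = defaultdict(list)

    def bind(self, queue, binding_key):
        # Binding the same queue twice with the same key has no extra effect.
        if queue not in self._bindings[binding_key]:
            self._bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        """Deliver body to every queue whose binding key equals routing_key.

        Returns the names of the queues reached; an empty list means the
        message was discarded.
        """
        targets = list(self._bindings.get(routing_key, []))
        for queue in targets:
            self.queues[queue].append(body)
        return targets


x = DirectExchange()
x.bind("Q1", "orange")
x.bind("Q2", "black")
x.bind("Q2", "green")

print(x.publish("orange", "m1"))  # ['Q1']
print(x.publish("black", "m2"))   # ['Q2']
print(x.publish("green", "m3"))   # ['Q2']
print(x.publish("purple", "m4"))  # [] (no matching binding, so discarded)
```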
Multiple bindings
It is perfectly legal to bind multiple queues with the same binding key. In our example, we could add a binding between x and Q1 with the binding key black. In that case, the direct exchange behaves like a fanout exchange for that key and broadcasts the message to all matching queues. A message with the routing key black is delivered to both Q1 and Q2.
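To make the comparison with fanout concrete, here is a small sketch of both routing rules applied to the same bindings. The route function and the list of (queue, binding key) pairs are hypothetical, written only to illustrate the behaviour; they are not part of Pika or RabbitMQ.

```python
def route(exchange_type, bindings, routing_key):
    """Return the sorted names of the queues that would receive a message.

    bindings is a list of (queue, binding_key) pairs. This is a toy model
    for illustration, not how the broker is implemented.
    """
    if exchange_type == "fanout":
        # fanout ignores binding keys: every bound queue gets a copy
        matched = {queue for queue, _ in bindings}
    elif exchange_type == "direct":
        # direct needs an exact match between binding key and routing key
        matched = {queue for queue, key in bindings if key == routing_key}
    else:
        raise ValueError("unsupported exchange type: %r" % exchange_type)
    return sorted(matched)


# The earlier setup, plus the extra binding of Q1 with the key black.
bindings = [("Q1", "orange"), ("Q1", "black"), ("Q2", "black"), ("Q2", "green")]

print(route("direct", bindings, "black"))   # ['Q1', 'Q2']
print(route("direct", bindings, "orange"))  # ['Q1']
print(route("fanout", bindings, "orange"))  # ['Q1', 'Q2']
```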
Sending logs
We will use this model for our logging system. Instead of a fanout exchange, we send messages to a direct exchange and use the severity of the message as the routing key. That way, the receiving script can select the severities it wants to receive. Let's focus on emitting logs first.
As always, we first need to create an exchange:
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
Now we are ready to send a message:
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
To simplify, we assume that severity is one of info, warning and error.
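Because a direct exchange discards messages whose routing key matches no binding, a mistyped severity would vanish silently. One way to guard against that, sketched below, is to validate the severity before publishing. The helper normalize_severity and the constant ALLOWED_SEVERITIES are hypothetical names introduced here; they are not part of Pika or of the tutorial scripts.

```python
ALLOWED_SEVERITIES = ("info", "warning", "error")


def normalize_severity(value):
    """Return value as a lower-case severity, or raise ValueError.

    Hypothetical helper for illustration; it is not part of Pika or of
    the tutorial scripts.
    """
    severity = value.strip().lower()
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(
            "unknown severity %r, expected one of %s"
            % (value, ", ".join(ALLOWED_SEVERITIES))
        )
    return severity


print(normalize_severity("Error"))  # error

try:
    normalize_severity("eror")
except ValueError as exc:
    # A typo is reported here instead of being dropped by the exchange.
    print(exc)
```

The returned value could then be passed as routing_key to basic_publish.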
Subscribe
Receiving messages works just as in the previous tutorial, with one exception: we create a new binding for each severity we are interested in.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
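The binding loop can be tried without a running broker by passing in a stand-in channel. In the sketch below, RecordingChannel and bind_severities are hypothetical names used only for illustration. The stand-in records queue_bind calls instead of talking to RabbitMQ, and the helper skips duplicate severities as a convenience that the tutorial code does not have.

```python
class RecordingChannel:
    """Hypothetical stand-in for a Pika channel; it only records bindings."""

    def __init__(self):
        self.bindings = []

    def queue_bind(self, exchange, queue, routing_key=None):
        self.bindings.append((exchange, queue, routing_key))


def bind_severities(channel, queue_name, severities, exchange="direct_logs"):
    """Create one binding per distinct severity, in the order given."""
    seen = set()
    for severity in severities:
        if severity in seen:
            continue  # skip repeats such as "warning warning"
        seen.add(severity)
        channel.queue_bind(exchange=exchange,
                           queue=queue_name,
                           routing_key=severity)


fake = RecordingChannel()
bind_severities(fake, "my_queue", ["warning", "error", "warning"])
for binding in fake.bindings:
    print(binding)
```

With a real Pika channel, the same helper would be called with the queue name returned by queue_declare.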
Putting it all together
The code for the emit_log_direct.py script:
#!/usr/bin/env python
import pika
import sys
# Connect to the broker running on localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare the direct exchange that the log messages are published to.
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
# The first argument is the severity; everything after it is the message.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# The severity doubles as the routing key.
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
The code for the receive_logs_direct.py script:
#!/usr/bin/env python
import pika
import sys
# Connect to the broker running on localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')
# Let the broker pick a queue name; the queue is exclusive to this connection.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# The severities to subscribe to come from the command line.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# Create one binding per requested severity.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    # method.routing_key holds the severity the message was published with.
    print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console terminal and enter:
python receive_logs_direct.py warning error > logs_from_rabbit.log
If you want to see all the log messages on the screen, open a new console terminal and type:
python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C
Then, for example, to send an error log message, you only need to enter:
python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
Move on to Part 5 of this tutorial (if the link fails, please visit: http://www.rabbitmq.com/tutorials/tutorial-five-python.html) to learn how to listen for messages based on a pattern.