RabbitMQ of the message queue from knocking on the door to introducing (Lecture 5) - "Routing"

RabbitMQ of the message queue from knocking on the door to introducing (Lecture 5) - "Routing"

This article is translated from RabbitMQ Official Course (If the link fails, please visit: http://www.rabbitmq.com/tutorials/tutorial-four-python.html)

Prerequisite

Like other Python tutorials, we use the Pika RabbitMQ client, version number. 0.11.0.

Focus of this tutorial

In the previous tutorial, we built a simple log system. We can broadcast log messages to multiple recipients.

In this tutorial, we will add a new feature to it - we will only push a subset of messages. For example, we can only write important error messages to log files (to save disk space) and still print all log messages on the console terminal.

binding

In the previous example, we have created the binding. You can recall the code:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

Binding is the relationship between switches and queues. This can be simply understood as: queues are interested in messages in switches.

Binding can use additional routing_key parameters. To avoid confusion with the basic_publish parameter, we call it binding key. The following example demonstrates how to create a binding using a key:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

The meaning of a binding key depends on the type of exchange. The fanout exchange we used earlier simply ignores this value.

Direct exchange

In the previous tutorial, our log system broadcasts all messages back to all consumers. We want to extend its functionality to allow filtering based on the severity of the message. For example, we might want scripts that are writing log messages to disk to write only serious errors, instead of warning and prompt messages, so that disk space is not wasted.

The fanout switching we used before was very inflexible -- it only broadcasts unconsciously.

We need to use direct exchange instead. The routing algorithm behind direct switching is simple -- messages enter a queue that matches the routing key of the message perfectly.

To illustrate this, please refer to the following settings:

In this setup, we can see that direct exchange x has two queues. The first queue binds the keyword orange, and the second queue has two bindings, one binds the keyword black and the other binds the keyword green.

In such a setting, a message pushed to the switch with the routing key orange is routed to queue Q1. Messages with routing keys black and green are routed to queue Q2. All other messages will be discarded.

Multiple binding

It is perfectly legal to bind multiple queues with the same binding key. In our example, we can add a binding between x and Q1 using the binding key black. In this case, direct switching broadcasts messages to all matching queues, just like fanout switching. Messages with the routing key black are passed to Q1 and Q2.

Sending logs

We will use this mode for our logging system. Instead of using fanout exchange, we send messages to dircet exchange. We regard the seriousness of the message as routing key. In this way, the receiving script can receive the message corresponding to the seriousness it wants to receive. Let's first focus on the sending logs.

As we always need to do, let's first create an exchange:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

Now we are ready to send a message:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

To simplify, we assume that severity is one of info, warning and error.

Subscribe

Receiving messages is like in previous tutorials, with one exception -- we will create new bindings for each severity we are interested in.

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

Merge code

The code for the emit_log_direct.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

The code for the receive_logs_direct.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

If you want to save only'warning'and'error' (rather than'info') log messages to a file, just open a console terminal and enter:

python receive_logs_direct.py warning error > logs_from_rabbit.log

If you want to see all the log messages on the screen, open a new console terminal and type:

python receive_logs_direct.py info warning error
# => [*] Waiting for logs. To exit press CTRL+C

Then, for example, to send an error log message, you only need to enter:

python emit_log_direct.py error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

Get into Part 5 of this tutorial (If the link fails, please visit: http://www.rabbitmq.com/tutorials/tutorial-five-python.html ) Learn how to listen to news according to patterns.

Keywords: Python RabbitMQ

Added by lmhart on Wed, 15 May 2019 16:37:38 +0300