Introduction
These are my learning notes on the book RabbitMQ in Depth.
AMQ
The AMQ (Advanced Message Queuing) model defines three abstract components:
- Exchange - Message broker component that routes messages to queues
- Queue - Data structure for storing messages
- Binding - Rule that tells the exchange which queue messages should be stored in
In addition to binding queues to exchanges, RabbitMQ's implementation of AMQP also allows exchanges to be bound to other exchanges.
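The relationship between the three components can be sketched with a toy in-memory model. This is only an illustration of the concepts, not how RabbitMQ is implemented: the class names and the exact-match routing rule (similar to a direct exchange) are assumptions made for the example.

```python
from collections import deque


class Queue:
    """Stores messages in FIFO order."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()


class Exchange:
    """Routes messages to queues according to its bindings (exact key match only)."""

    def __init__(self, name):
        self.name = name
        self.bindings = []  # list of (routing_key, queue) pairs

    def bind(self, queue, routing_key):
        """Add a binding: messages with this routing key go to this queue."""
        self.bindings.append((routing_key, queue))

    def publish(self, routing_key, body):
        """Store body in every queue bound with a matching key; return the match count."""
        matched = [queue for key, queue in self.bindings if key == routing_key]
        for queue in matched:
            queue.messages.append(body)
        return len(matched)


exchange = Exchange('example')
queue = Queue('example-queue')
exchange.bind(queue, 'example-routing-key')
print(exchange.publish('example-routing-key', 'hello'))  # one queue matched
print(exchange.publish('other-key', 'unroutable'))       # no queue matched
```

The second publish matches no binding, which is the situation the mandatory flag and alternate exchanges discussed later are meant to handle.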
How RabbitMQ communicates
AMQP as RPC Transport
Open Session
Tuning in to the correct channel
RPC Frame Structure of AMQP
AMQP classes define the scope of functionality, and each class contains methods for performing different tasks.
During connection establishment, the RabbitMQ server sends a Connection.Start command, marshaled into a frame, to the client.
Components of frame in AMQP
A frame encodes the information to be transmitted.
An AMQP frame consists of five components:
- Frame type
- Channel Number
- Frame size (encoded in bytes)
- Frame's payload
- Frame-end marker
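The five components above can be sketched with Python's struct module. This is a minimal illustration rather than rabbitpy's implementation. It assumes the AMQP 0-9-1 wire layout (a 1-byte type, a 2-byte channel number, a 4-byte payload size, the payload, and the frame-end byte 0xCE); build_frame and parse_frame are hypothetical helper names.

```python
import struct

FRAME_END = 0xCE                # frame-end marker byte in AMQP 0-9-1
HEADER = struct.Struct('>BHI')  # type (1 byte), channel (2 bytes), size (4 bytes)


def build_frame(frame_type, channel, payload):
    """Wrap a payload in the frame header and the frame-end marker."""
    return HEADER.pack(frame_type, channel, len(payload)) + payload + bytes([FRAME_END])


def parse_frame(data):
    """Split one complete frame into (frame_type, channel, payload)."""
    frame_type, channel, size = HEADER.unpack_from(data)
    payload = data[HEADER.size:HEADER.size + size]
    if data[HEADER.size + size] != FRAME_END:
        raise ValueError('missing frame-end marker')
    return frame_type, channel, payload


frame = build_frame(3, 1, b'hello')  # 3 is the body frame type
print(parse_frame(frame))
```

Because the size field precedes the payload, a reader can tell how many bytes to consume before it expects the frame-end marker.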
Type of Frame
The AMQP specification defines five types of frames: the protocol header frame, method frame, content header frame, body frame, and heartbeat frame.
- The protocol header frame is sent only once, when connecting to RabbitMQ
- A method frame carries an RPC request or response being sent to or received from RabbitMQ
- A content header frame contains the size and properties of a message
- A body frame contains the content of a message
- The heartbeat frame is sent in both directions to confirm that each side of the connection is still available
Encode message to frame
When a message is published to RabbitMQ, method, header, and body frames are used. The first frame sent is the method frame, which carries the command and the parameters required to execute it, such as the exchange and routing key. Next come the content frames: the content header and the body. The content header contains the message properties and the body size. AMQP has a maximum frame size, and if your message body exceeds that size, it is split across multiple body frames.
These frames are always sent in the same order: the method frame, the content header frame, and one or more body frames.
For transmission efficiency, the contents of the method and content header frames are encoded as binary data.
The contents of a body frame are not interpreted by the protocol and can be anything from plain text to binary image data.
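The splitting of a large body into several body frames can be sketched as follows. This is an illustration under assumptions: split_body is a hypothetical helper, the overhead of 8 bytes is the 7-byte frame header plus the frame-end byte from the AMQP 0-9-1 layout, and the tiny frame_max of 12 is chosen only to make the output easy to read (real values are negotiated when the connection is opened).

```python
FRAME_OVERHEAD = 8  # 7-byte frame header plus the 1-byte frame-end marker


def split_body(body, frame_max):
    """Return the chunks of body that would each travel in one body frame."""
    chunk_size = frame_max - FRAME_OVERHEAD
    if chunk_size <= 0:
        raise ValueError('frame_max is too small to carry any payload')
    return [body[i:i + chunk_size] for i in range(0, len(body), chunk_size)]


chunks = split_body(b'x' * 10, frame_max=12)
print([len(chunk) for chunk in chunks])  # sizes of the individual body frames
```

The receiver knows when the message is complete because the content header frame announced the total body size up front.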
Anatomy of a method frame
A method frame carries the class and method of your RPC request along with the arguments needed to execute it. For example, a method frame carrying a Basic.Publish command contains binary data describing the command and its request parameters.
The first two fields are numeric representations of the Basic class and the Publish method, followed by the string values for the exchange name and routing key. The mandatory flag tells RabbitMQ that the message must be routed; if it cannot be, RabbitMQ sends back a Basic.Return frame to indicate that the message could not be routed.
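The field layout just described can be sketched by encoding the payload of a Basic.Publish method frame by hand. This is a simplified illustration, not code from rabbitpy: the helper names are hypothetical, and the layout (class id 60, method id 40, a reserved 2-byte field, two short strings, and a flags byte) follows my reading of the AMQP 0-9-1 specification.

```python
import struct

BASIC_CLASS_ID = 60     # class id of Basic in AMQP 0-9-1
PUBLISH_METHOD_ID = 40  # method id of Basic.Publish


def short_string(value):
    """Encode an AMQP short string: one length byte followed by UTF-8 bytes."""
    encoded = value.encode('utf-8')
    if len(encoded) > 255:
        raise ValueError('short strings are limited to 255 bytes')
    return bytes([len(encoded)]) + encoded


def basic_publish_payload(exchange, routing_key, mandatory=False):
    """Build the payload of a Basic.Publish method frame."""
    flags = 0x01 if mandatory else 0x00  # bit 0 is mandatory; bit 1 (immediate) stays off
    return (struct.pack('>HHH', BASIC_CLASS_ID, PUBLISH_METHOD_ID, 0)  # third short is reserved
            + short_string(exchange)
            + short_string(routing_key)
            + bytes([flags]))


payload = basic_publish_payload('chapter2-example', 'example-routing-key', mandatory=True)
print(payload[:4].hex())  # class id and method id as hex
```

A client library would then wrap this payload in a method frame before writing it to the socket.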
Header frame
The header frame tells RabbitMQ how big your message is. It also carries the properties that describe your message. These properties are values in the Basic.Properties table, which may be empty.
Message body frame
The message body frame is a data structure that contains the actual message data. Depending on your needs, the data can be in any format.
Using the protocol
Before you can send a message to a queue, you need to configure it.
At a minimum, you need to create an exchange and queue and then bind them.
Declare an exchange
An exchange is created with the Exchange.Declare command, which takes a name, a type, and other metadata.
Once RabbitMQ receives this command and creates the exchange successfully, it returns an Exchange.DeclareOk method frame; otherwise, if creation fails, it returns a Channel.Close command.
Declare a Queue
Once the exchange has been created, you create a queue with the Queue.Declare command.
Bind queue to exchange
Queues are bound to exchanges with the Queue.Bind command, which binds one queue at a time.
Publish a message
When you publish a message to RabbitMQ, you need to send at least three frames: a Basic.Publish method frame, a content header frame, and a body frame.
When RabbitMQ has received all frames for a message, it first looks at the method frame to decide the next step. It matches the exchange name and routing key contained in the Basic.Publish method frame against its exchanges and bindings.
When a message matches any bound queue, RabbitMQ enqueues it in FIFO order. Rather than placing the actual message in the queue, it places a reference to the message. When RabbitMQ is ready to deliver the message, it uses the reference to compose the marshaled message and sends it over the network. This is a substantial optimization for messages routed to multiple queues: keeping only one instance of a message that has multiple destinations greatly reduces memory usage.
How one queue handles a message does not affect the other queues. Once all references to a message have been delivered or removed, RabbitMQ removes the message instance from memory.
Consuming messages
To consume messages, a consumer application registers (subscribes) to the queue with the Basic.Consume command. The consumer then starts receiving messages via Basic.Deliver method frames, each followed by the content header frame and body frames of the message.
If the consumer wants to stop receiving messages, it sends the Basic.Cancel command. When consuming messages, you need to specify some settings. One of them is the no_ack parameter. When it is true, RabbitMQ keeps sending messages until the consumer sends Basic.Cancel or disconnects; when it is false, the consumer must acknowledge each received message with a Basic.Ack RPC request.
Write a message producer in Python
import rabbitpy
# rabbitmq NodePort 10.43.114.17 <none> 5672:30595/TCP,15672:31847/TCP,15692:32074/TCP 15h
url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
connection = rabbitpy.Connection(url)
channel = connection.channel()
# Declare an exchange on the channel, specifying its name
exchange = rabbitpy.Exchange(channel, 'chapter2-example')
# declare() sends the Exchange.Declare command
exchange.declare()
# Create a queue, specifying its name
queue = rabbitpy.Queue(channel, 'example')
# declare() sends the Queue.Declare command and returns
# (number of messages in the queue, number of consumers of the queue)
queue.declare()
(0, 0)
# Bind the queue to the exchange
# bind() sends the Queue.Bind command with the exchange and routing key
queue.bind(exchange, 'example-routing-key')
True
# Now you can publish the messages
for message_number in range(10):
    message = rabbitpy.Message(channel,  # Channel to use
                               f'Test message {message_number}',  # Message body
                               {'content_type': 'text/plain'},  # Message properties (dictionary)
                               opinionated=True)
    # publish() creates the Basic.Publish method frame, the content header frame,
    # and a body frame, then sends them to RabbitMQ
    message.publish(exchange, 'example-routing-key')
You can now see 10 messages in the administration console.
# Now consume the messages
channel = connection.channel()
queue = rabbitpy.Queue(channel, 'example')
while len(queue) > 0:
    message = queue.get()
    print('Message')
    print(f' ID: {message.properties["message_id"]}')
    print(f' Time: {message.properties["timestamp"].isoformat()}')
    print(f' Body: {message.body}')
    message.ack()  # Acknowledge the message
Message
 ID: 6c873463-4b2d-42bb-a824-6771a8551af9
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 0')
Message
 ID: 2e9c7ef1-d2fa-4725-adc6-447572444939
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 1')
Message
 ID: 4152aa9e-9a82-487d-bf4c-f062061632a2
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 2')
Message
 ID: 524572b7-1982-4e63-994b-2163d6464519
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 3')
Message
 ID: 31e282ab-4150-46a4-b56e-e645e8abcb5f
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 4')
Message
 ID: 1a97d253-cb19-4cbb-b778-f6a562478059
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 5')
Message
 ID: a78b3fd5-abd0-4f53-87f4-edcb6fa92652
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 6')
Message
 ID: ef2b9114-8ce5-449c-8283-2ee0f4d893c4
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 7')
Message
 ID: 7c4e951e-c50e-4806-a710-43435fa2dc5d
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 8')
Message
 ID: 348109cc-4776-43cc-9ee2-0b756fbefe25
 Time: 2021-06-19T04:01:23
 Body: bytearray(b'Test message 9')
Message Properties
This section covers AMQP's Basic.Properties, a data structure that is carried with each message.
Using message properties
The message properties carried in the header frame are a predefined set of values specified by the Basic.Properties data structure.
RabbitMQ uses some of these properties to implement specific message behavior. One example is the delivery-mode property, whose value tells RabbitMQ whether a message may be kept in memory or must first be stored to disk when it is queued.
AMQP message properties provide a good starting point for defining and carrying message metadata. We'll explore each of the basic properties:
- Use the content-type property to let consumers know how to interpret the message body
- Use content-encoding to indicate that the message body may be compressed or encoded in a particular way
- Fill in message-id and correlation-id to uniquely identify messages and message responses and to track them through your workflow
- Use the timestamp property to reduce message size and to establish a canonical definition of when a message was created
- Use the expiration property to specify when a message expires
- Use delivery-mode to tell RabbitMQ whether to write messages to a disk-backed or an in-memory queue
- Use app-id and user-id to help track down problematic message publishers
- Use the type property to define contracts between publishers and consumers
- Use the reply-to property to route reply messages
- Use the headers table property for free-form property definitions and RabbitMQ routing
The message contract mentioned above refers to the definition of a message's format and content.
Creating an explicit message contract with content-type
The content-type property of the Basic.Properties data structure specifies the data format of each message body.
Typically it holds a MIME type like those used by HTTP; for example, JSON is specified as application/json.
Reduce message size with GZIP compression and Content-Encoding
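AMQP messages are not compressed by default. By setting content-encoding to gzip, your publisher can compress a message before publishing it, and the consumer can use the property to know that it must decompress the body on receipt. RabbitMQ itself does not compress or decompress the body.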
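The publisher and consumer sides of this convention can be sketched with the standard library alone. This is a minimal illustration: encode_body and decode_body are hypothetical helpers, and the property keys follow the content_type and content_encoding spelling used in the rabbitpy examples in these notes. Publishing the result through rabbitpy is not shown.

```python
import gzip
import json


def encode_body(payload):
    """Serialize payload to gzip-compressed JSON and describe it with properties."""
    body = gzip.compress(json.dumps(payload).encode('utf-8'))
    properties = {'content_type': 'application/json', 'content_encoding': 'gzip'}
    return body, properties


def decode_body(body, properties):
    """Reverse encode_body on the consumer side, guided only by the properties."""
    if properties.get('content_encoding') == 'gzip':
        body = gzip.decompress(body)
    if properties.get('content_type') == 'application/json':
        return json.loads(body.decode('utf-8'))
    return body


body, properties = encode_body({'metric': 'server.cpu.utilization', 'value': 25.5})
print(decode_body(body, properties))
```

Because the consumer decides what to do by inspecting the properties, the same consumer can handle compressed and uncompressed messages side by side.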
Referencing messages through Message-id and Correlation-id
Message-id
message-id is the unique identifier of a message.
Some messages, such as login events, do not need a unique message ID; others, such as order messages, do.
Correlation-id
correlation-id holds the message-id of a related message, for example to mark a message as the response to an earlier one.
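How the two identifiers fit together can be sketched as below. This is an illustration only: the helper names are hypothetical, UUIDs are one common choice of identifier rather than a requirement, and the property keys follow the underscore spelling used in the rabbitpy examples in these notes.

```python
import uuid


def request_properties():
    """Properties for a request message with a fresh message-id."""
    return {'message_id': str(uuid.uuid4())}


def response_properties(request):
    """Properties for a response: correlation-id carries the request's message-id."""
    return {'message_id': str(uuid.uuid4()),
            'correlation_id': request['message_id']}


request = request_properties()
response = response_properties(request)
print(response['correlation_id'] == request['message_id'])
```

The application that sent the request can then match incoming responses to outstanding requests by looking up the correlation-id.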
Generation time: Timestamp property
Like message-id and correlation-id, timestamp is specified as being "for application use".
Automatic expiration message
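The expiration property tells RabbitMQ when unconsumed messages can be discarded. Strangely, it is a string type. The book describes the value as a Unix epoch timestamp (in seconds) stored as a string; note that current RabbitMQ documentation instead defines it as a per-message TTL in milliseconds, also encoded as a string.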
Balance speed and safety with DELIVERY-MODE
The delivery-mode property is a single byte; a value of 2 tells RabbitMQ to store the message to disk before delivering it to a consumer.
Persisting a message means that even if the RabbitMQ server restarts, the message remains in the queue as long as it has not been consumed.
delivery-mode has only two values: 1 means the message is not persisted (it is kept only in memory); 2 means the message is persisted to disk.
If your message is non-persistent, RabbitMQ can use a memory-based queue.
Since memory is much faster than disk, specifying delivery-mode 1 delivers your messages with as little latency as possible.
When 2 is specified, messages are stored in a disk-based queue.
Validate message source with APP-ID and USER-ID
app-id and user-id provide additional information that messages may need.
app-id
app-id can hold information about the publishing application, such as its API version. Consumers can also discard messages from unknown sources based on the app-id property.
user-id
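user-id identifies the user who published the message. If it is set, RabbitMQ validates it against the RabbitMQ user of the publishing connection and refuses the message when the two do not match, so it is not well suited to carrying your own application's logged-in user.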
Use the TYPE property of the message to get specific information
The type property is usually used to specify the message type name.
REPLY-TO for dynamic workflows
reply-to may be used to specify a private queue for the response message.
Save custom properties using the HEADERS property
The headers property is a key-value table that holds arbitrary user-defined keys and values. Keys are strings, and values can be any valid AMQP value type.
PRIORITY attribute
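The priority property is defined as an integer from 0 to 9 that specifies the priority of a message in the queue. The book states that a smaller number means a higher priority; note that RabbitMQ's own priority queue documentation treats larger numbers as higher priority.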
Attributes you cannot use: CLUSTER-ID/RESERVED
The cluster-id property has been deprecated and renamed reserved, and it must be left empty. You should not use it.
Summary
Property | Type | For use by | Suggested or specified use |
---|---|---|---|
app-id | short-string | Application | Useful for defining the application publishing the messages. |
content-encoding | short-string | Application | Specify whether your message body is encoded in some special way, such as zlib, deflate, or Base64. |
content-type | short-string | Application | Specify the type of the message body using mime-types. |
correlation-id | short-string | Application | If the message is in reference to some other message or uniquely identifiable item, the correlation-id is a good way to indicate what the message is referencing. |
delivery-mode | octet | RabbitMQ | A value of 1 tells RabbitMQ it can keep the message in memory; 2 indicates it should also write it to disk. |
expiration | short-string | RabbitMQ | An epoch or Unix timestamp value as a text string that indicates when the message should expire. |
headers | table | Both | A free-form key/value table that you can use to add additional metadata about your message; RabbitMQ can route based upon this if desired. |
message-id | short-string | Application | A unique identifier such as a UUID that your application can use to identify the message. |
priority | octet | RabbitMQ | A property for priority ordering in queues. |
timestamp | timestamp | Application | An epoch or Unix timestamp value that can be used to indicate when the message was created. |
type | short-string | Application | A text string your application can use to describe the message type or payload. |
user-id | short-string | Both | A free-form string that, if used, RabbitMQ will validate against the connected user and drop messages if they don't match. |
Performance trade-offs
Balance transmission speed with reliable transmission
Using the transmission guarantee mechanism can guarantee the reliability of the transmission, but at the same time, it will reduce the transmission speed. These questions can help you find a balance between high performance and reliability.
- How important is it that messages are guaranteed to be enqueued when they are published?
- If a message cannot be routed, should it be returned to the publisher?
- If a message cannot be routed, should it be sent somewhere else where it can be reconciled later?
- Is it okay to lose messages when the RabbitMQ server crashes?
- Should RabbitMQ confirm to the publisher that it has performed all requested routing and persistence tasks when it processes a new message?
- Should the publisher be able to send messages in batches and then receive confirmation from RabbitMQ that all requested routing and persistence tasks have been applied to every message in the batch?
- If batch publishing requires routing and persistence confirmation, do you need a true atomic commit to the messages' destination queues?
- Are there acceptable trade-offs in reliable delivery that publishers can use to achieve higher performance and message throughput?
- Do other aspects of message publishing affect message throughput and performance?
Setting mandatory so RabbitMQ does not accept unroutable messages
If you need the server monitoring data to always be routed by RabbitMQ before collectd moves on, all collectd needs to do is tell RabbitMQ that the message being published is mandatory (mandatory=True). The mandatory flag is an argument passed along with the Basic.Publish RPC command; it tells RabbitMQ that if a message cannot be routed, it should send the message back to the publisher via a Basic.Return RPC.
import datetime
import rabbitpy

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        body = 'server.cpu.utilization 25.5 1350884514'
        message = rabbitpy.Message(channel, body,
                                   {'content_type': 'text/plain',
                                    'timestamp': datetime.datetime.now(),
                                    'message_type': 'graphite metric'})
        message.publish('chapter2-example', 'server-metrics', mandatory=True)
When you execute this code, you receive an exception similar to the one below. Basic.Return is asynchronous and may arrive at any time after the message has been published.
Message was returned by RabbitMQ: (312) for exchange NO_ROUTE
Let's add exception capture code to it.
import datetime
import rabbitpy

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
connection = rabbitpy.Connection(url)
try:
    with connection.channel() as channel:
        properties = {'content_type': 'text/plain',
                      'timestamp': datetime.datetime.now(),
                      'message_type': 'graphite metric'}
        body = 'server.cpu.utilization 25.5 1350884514'
        message = rabbitpy.Message(channel, body, properties)
        message.publish('chapter2-example', 'server-metrics', mandatory=True)
except rabbitpy.exceptions.MessageReturnedException as error:
    # Raised when RabbitMQ sends the message back with Basic.Return
    print('Publish failure: %s' % error)
Output:
Publish failure: Message was returned by RabbitMQ: (312) for exchange NO_ROUTE
Publisher confirms as a lightweight alternative to transactions
Publisher confirms are a RabbitMQ enhancement to the AMQP protocol. Before publishing any messages, the publisher must issue a Confirm.Select RPC request to RabbitMQ and wait for the Confirm.SelectOk response to know that delivery confirmations are enabled. From then on, for each message the publisher sends to RabbitMQ, the server responds with either an acknowledgment (Basic.Ack) or a negative acknowledgment (Basic.Nack), each containing an integer value that specifies the offset of the message it is confirming. This confirmation number refers to the message by the order in which it was received after the Confirm.Select RPC request.
A publisher receives a Basic.Ack when a message it published has been consumed, or when a message that needs to be persisted has been enqueued and persisted. If a message cannot be routed, the broker sends a Basic.Nack RPC to indicate the failure. It is then up to the publisher to decide what to do next.
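The bookkeeping a publisher needs for this numbering scheme can be sketched without a broker. This is a hypothetical helper, not part of rabbitpy: ConfirmTracker and its method names are invented for the illustration, and it assumes that tags start at 1 after Confirm.Select and that a confirmation may carry a multiple flag covering every tag up to and including the one given.

```python
class ConfirmTracker:
    """Tracks published messages until the broker confirms them."""

    def __init__(self):
        self.next_tag = 1  # tags count messages published after Confirm.Select
        self.pending = {}  # delivery tag -> message body

    def published(self, body):
        """Record a published message and return the tag the broker will use for it."""
        tag = self.next_tag
        self.pending[tag] = body
        self.next_tag += 1
        return tag

    def confirmed(self, delivery_tag, multiple=False):
        """Handle a Basic.Ack or Basic.Nack; return the bodies it settles."""
        if multiple:
            tags = [tag for tag in self.pending if tag <= delivery_tag]
        else:
            tags = [delivery_tag] if delivery_tag in self.pending else []
        return [self.pending.pop(tag) for tag in tags]


tracker = ConfirmTracker()
for body in ('first', 'second', 'third'):
    tracker.published(body)
print(tracker.confirmed(2, multiple=True))  # settles tags 1 and 2
print(tracker.pending)                      # tag 3 is still outstanding
```

On a Basic.Nack the publisher would typically republish or log the returned bodies; on a Basic.Ack it can simply forget them.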
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        exchange = rabbitpy.Exchange(channel, 'chapter4-example')
        exchange.declare()
        channel.enable_publisher_confirms()  # Enable publisher confirms
        body = 'This is an important message'
        message = rabbitpy.Message(channel, body,
                                   {'content_type': 'text/plain',
                                    'message_type': 'very important'})
        if message.publish('chapter4-example', 'important.message'):
            print('The message was confirmed')
Use alternate exchange for messages that cannot be routed
To use an alternate exchange, you first need to set up the exchange that will serve as the alternate. Then, when declaring the primary exchange, pass the alternate-exchange argument to Exchange.Declare to name the alternate exchange. The code below walks through this process.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        # Create the alternate exchange first; fanout so that every
        # unroutable message reaches the queue bound to it
        my_ae = rabbitpy.Exchange(channel, 'my_ae', exchange_type='fanout')
        my_ae.declare()
        args = {'alternate-exchange': my_ae.name}  # Name the alternate exchange
        exchange = rabbitpy.Exchange(channel, 'graphite',
                                     exchange_type='topic',
                                     arguments=args)  # Passed via arguments
        exchange.declare()
        queue = rabbitpy.Queue(channel, 'unroutable-messages')  # Queue for unroutable messages
        queue.declare()
        if queue.bind(my_ae, '#'):  # A fanout exchange ignores the routing key
            print('Queue bound to alternate-exchange')
Batch with transactions
AMQP transactions, or TX, provide a mechanism for messages to be published in batches to RabbitMQ, then commit to a queue or rollback. The following example shows how easy it is to write code for a transaction.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        tx = rabbitpy.Tx(channel)
        tx.select()  # Start the transaction
        message = rabbitpy.Message(channel, 'This is an important message',
                                   {'content_type': 'text/plain',
                                    'delivery_mode': 2,
                                    'message_type': 'important'})
        message.publish('chapter4-example', 'important.message')
        try:
            if tx.commit():  # Commit the transaction
                print('Transaction committed')
        except rabbitpy.exceptions.NoActiveTransactionError:
            print('Tried to commit without active transaction')
To start a transaction, the publisher sends a TX.Select RPC request to RabbitMQ, which responds with TX.SelectOk. Once the transaction is open, the publisher can send one or more messages to RabbitMQ.
Use highly available queues to avoid node failures
Highly available (HA) queues are a RabbitMQ enhancement, not part of the AMQP specification, that allows queues to have redundant copies across the nodes of a RabbitMQ cluster.
Next, we create a new queue that spans every node in the RabbitMQ cluster.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    try:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel, 'my-ha-queue',
                                   arguments={'x-ha-policy': 'all'})  # Pass x-ha-policy
            if queue.declare():
                print('Queue declared')
    except rabbitpy.exceptions.RemoteClosedChannelException as error:
        print('Queue declare failed: %s' % error)
When a message is published to an HA queue, it is sent to every server in the cluster that is responsible for that queue. Once the message is consumed from any node in the cluster, the copies of the message on the other nodes are immediately removed from their queues.
Transactions for HA Queue
If you are using transactions or delivery confirmations, RabbitMQ does not send a successful response until the message has been confirmed by all active nodes of the HA queue. This introduces some delay.
Set delivery-mode to store messages to disk
delivery-mode is a Basic.Properties property specified by the AMQP protocol. It defaults to 1, meaning messages do not need to be stored to disk; in that case, unconsumed messages are lost if RabbitMQ restarts.
If it is set to 2, RabbitMQ ensures that messages are saved to disk, so they are not lost on restart.
Setting delivery-mode to 2 is not sufficient on its own: for messages to truly survive a restart, the queue must also be declared as durable (durable=True).
RabbitMQ stores messages on disk and tracks them by reference until they are not in any queue. Once all references to the message are removed from the queue, RabbitMQ removes the message from disk.
However, it is important to note that this can greatly slow down the processing speed.
When RabbitMQ pushes back
In version 3.2, RabbitMQ extended the AMQP protocol with a notification mechanism that tells clients when a connection has been blocked because a threshold for the connection was reached. Connection.Blocked and Connection.Unblocked are asynchronous methods that can notify the client at any time.
Check connection status with rabbitpy
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    print('Connection is Blocked? %s' % connection.blocked)
Connection is Blocked? False
Don't Get, Consume
Consumer applications are typically dedicated to receiving and processing messages, but not always. For example, if you use RabbitMQ to implement an RPC pattern, the application that issues RPC requests also consumes the RPC responses.
BASIC.GET VS BASIC.CONSUME
RabbitMQ implements two different AMQP RPC commands to retrieve messages from the queue: Basic.Get and Basic.Consume. Basic.Get is not the ideal way to retrieve messages. Basic.Get is a polling model, while Basic.Consume is a push model.
Basic.Get
When an application retrieves messages with Basic.Get requests, it must send a new request each time it wants to receive a message, even if there are multiple messages in the queue.
If a message is available when you send a Basic.Get RPC request, RabbitMQ responds with Basic.GetOk and the message.
If there is no message in the queue, RabbitMQ responds with Basic.GetEmpty.
When using Basic.Get, your application needs to evaluate the RPC response from RabbitMQ to determine whether a message was received. For most long-running programs that receive messages from RabbitMQ, this is not an efficient way to receive and process messages.
First run the following code to generate the message:
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
for iteration in range(10):
    rabbitpy.publish(url, '', 'test-messages', 'go')
rabbitpy.publish(url, '', 'test-messages', 'stop')
Here is an example of consuming messages with Basic.Get:
import rabbitpy

url = 'amqp://guest:guest@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        queue.declare()
        while True:  # Poll the queue in a loop
            message = queue.get()
            if message:
                message.pprint()
                message.ack()
                if message.body == b'stop':  # The body is bytes, not str
                    break
Basic.Consume
When you consume messages with the Basic.Consume RPC command, you register your application with RabbitMQ and tell it to send messages to your consumer asynchronously.
Once a client sends Basic.Consume, RabbitMQ sends messages to it as they become available until the client sends the Basic.Cancel command.
When your application receives a message, it does not need to check for an empty response.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
for message in rabbitpy.consume(url, 'test-messages'):
    message.pprint()
    message.ack()
When your application issues Basic.Consume, a unique string is created that identifies the application on the open channel with RabbitMQ. This string is called a consumer tag and is sent to your application with each message.
The consumer tag can be used to stop receiving messages from RabbitMQ by sending a Basic.Cancel command. In general, the client library keeps track of the consumer tag and sends it for you.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            message.ack()
            if message.body == b'stop':
                break
Consumer Performance Tuning
As with publishing, consuming messages requires balancing throughput against reliable delivery. Several options can be used to speed up message delivery; however, as delivery speed increases, the reliability of delivery decreases.
Use no-ack mode to improve throughput
When consuming messages, your application registers itself with RabbitMQ by sending a Basic.Consume request, which carries a no-ack flag. When this flag is true, it tells RabbitMQ that your consumer will not acknowledge the messages it receives, so RabbitMQ should send them as quickly as possible.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'test-messages')
        for message in queue.consume_messages(no_ack=True):
            message.pprint()
Although this is the fastest way to have RabbitMQ deliver messages to your consumer, it is also the least reliable. To understand why, consider each step a message must go through before it is received by a consumer application.
When RabbitMQ sends a message over an open connection, it communicates with the client over a TCP socket. If the connection is open and writable, RabbitMQ assumes that everything is working and that the message has been delivered. If there is a network error, RabbitMQ receives a socket error and knows there is a problem. If no error is received, RabbitMQ assumes that the message has been delivered. Only when the client sends a Basic.Ack RPC does RabbitMQ know that the client has successfully received the message and most likely has already processed it.
If acknowledgments are turned off, RabbitMQ sends the next message without waiting for confirmation. As long as there are messages available, RabbitMQ keeps sending them to the consumer until the socket buffers are full.
Precisely because RabbitMQ does not wait for acknowledgments, this method achieves the highest throughput. It is ideal for disposable messages, but it carries significant risk. Consider what happens when a consumer application crashes while there are still 1 KB of messages in the operating system's socket receive buffer.
RabbitMQ believes these messages have been delivered, and it has no way of knowing how many of them the application had actually consumed when it crashed and the socket closed.
If this way of consuming messages does not suit your application architecture, but you want higher throughput than delivering and acknowledging one message at a time can provide, consider controlling the prefetch settings described next.
Controlling consumer prefetching through quality of service settings
The AMQP specification calls for channels to have a quality of service (QoS) setting with which a consumer can ask for a specific number of messages to be delivered before it acknowledges them. The QoS setting allows RabbitMQ to send messages more efficiently by specifying how many messages are pre-allocated to the consumer.
Unlike a consumer with acknowledgments disabled, if your consumer application crashes before acknowledging its messages, all prefetched messages are returned to the queue when the socket closes.
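The effect of a prefetch limit, including the return of unacknowledged messages after a crash, can be sketched with a toy broker-side model. This is a simulation for intuition only, not RabbitMQ's implementation; the class and method names are invented for the example.

```python
from collections import deque


class PrefetchChannel:
    """Toy broker-side view of one consumer with a prefetch limit."""

    def __init__(self, messages, prefetch_count):
        self.queue = deque(messages)
        self.prefetch_count = prefetch_count
        self.unacked = []  # delivered but not yet acknowledged

    def deliver(self):
        """Push messages until the unacknowledged window is full."""
        delivered = []
        while self.queue and len(self.unacked) < self.prefetch_count:
            message = self.queue.popleft()
            self.unacked.append(message)
            delivered.append(message)
        return delivered

    def ack_all(self):
        """Acknowledge every outstanding message in one step."""
        self.unacked.clear()

    def consumer_crashed(self):
        """Return unacknowledged messages to the front of the queue, in order."""
        self.queue.extendleft(reversed(self.unacked))
        self.unacked.clear()


channel = PrefetchChannel(range(5), prefetch_count=2)
print(channel.deliver())    # only two messages are sent before an ack is required
channel.consumer_crashed()
print(list(channel.queue))  # the two prefetched messages are back in the queue
```

With acknowledgments disabled there would be no unacked list at all, which is why messages already pushed to a crashed consumer cannot be recovered in that mode.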
At the protocol level, the QoS is specified by sending a Basic.QoS RPC request on a channel. You can specify whether the QoS setting applies only to this channel or to all channels opened on the connection. A Basic.QoS RPC request can be sent at any time, but it is usually sent before the consumer issues its Basic.Consume request.
In the book's benchmark with a single consumer, a prefetch count of 2,500 was the best setting for peak message velocity.
One benefit of using a QoS setting is that you do not have to acknowledge every message you receive individually. The Basic.Ack RPC response has an attribute called multiple that, when true, lets RabbitMQ know that your application wants to acknowledge all previously unacknowledged messages.
import rabbitpy

url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
with rabbitpy.Connection(url) as connection:
    with connection.channel() as channel:
        channel.prefetch_count(10)
        unacknowledged = 0
        for message in rabbitpy.Queue(channel, 'test-messages'):
            message.pprint()
            unacknowledged += 1
            if unacknowledged == 10:
                message.ack(all_previous=True)  # Acknowledge the whole batch
                unacknowledged = 0  # Reset the counter for the next batch
Use Transactions
Transactions allow your application to commit and roll back batches of operations. However, transactions have a negative impact on message throughput, with one exception: if you are not using QoS settings, you may notice a slight performance improvement when using transactions to batch your message acknowledgments.
Note that transactions do not work for consumers that have acknowledgments disabled.
Reject message
Acknowledging messages is a good way to make sure RabbitMQ knows that a message has been received and processed by the consumer, but what if there is a problem, either with the message itself or while processing it? For these cases, RabbitMQ provides two mechanisms for handing the message back to the broker: Basic.Reject and Basic.Nack.
Basic.Reject
Basic.Reject is an AMQP-specified RPC response that informs the broker that a message could not be processed. When a consumer rejects a message, it can tell RabbitMQ either to discard the message or to requeue it.
The code below shows how to requeue a message. When a message is requeued, the redelivered flag tells the next consumer that the message has been delivered before.
    import rabbitpy

    url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
    for message in rabbitpy.consume(url, 'test-messages'):
        message.pprint()
        print('Redelivered: %s' % message.redelivered)
        message.reject(True)
Like Basic.Ack, Basic.Reject releases RabbitMQ from holding on to a message that was delivered without no-ack set.
Basic.Reject can reject only one message at a time.
Basic.Nack
RabbitMQ implements a response that is not part of the AMQP specification, called Basic.Nack. It can reject multiple messages at once.
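To make "multiple messages at once" concrete, here is a small model of how one negative acknowledgment could settle several deliveries. It assumes the usual delivery-tag semantics, where the multiple flag covers every unacknowledged tag up to and including the given one. The nack function and its bookkeeping are illustrative only and do not talk to a broker.

```python
def nack(unacked, delivery_tag, multiple=False, requeue=True):
    """Model one Basic.Nack.

    `unacked` maps delivery tag -> message body and is updated in place.
    Returns a (requeued, dropped) pair of message-body lists.
    """
    if multiple:
        # Every outstanding tag up to and including delivery_tag is settled
        tags = [tag for tag in sorted(unacked) if tag <= delivery_tag]
    else:
        tags = [delivery_tag] if delivery_tag in unacked else []
    bodies = [unacked.pop(tag) for tag in tags]
    return (bodies, []) if requeue else ([], bodies)


outstanding = {1: 'a', 2: 'b', 3: 'c', 4: 'd'}
requeued, dropped = nack(outstanding, 3, multiple=True, requeue=False)
```

In this run, messages 1 through 3 are dropped with a single call and only delivery tag 4 remains outstanding. Rejecting the same three messages with Basic.Reject would take three separate responses.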
Dead Letter Queue
RabbitMQ's dead-letter exchange (DLX) feature is also an extension to AMQP. It is an optional behavior tied to rejecting a delivered message. A DLX is useful when you want to find out why certain messages cause problems for consumers.
A DLX is an ordinary exchange. The only thing that makes an exchange a DLX is that it is designated to receive rejected messages when a queue is created. Once a message is rejected without being requeued, RabbitMQ routes it to the exchange named in the queue's x-dead-letter-exchange argument.
Specifying a dead-letter exchange when declaring a queue is fairly simple. Either pass the exchange name as the dead_letter_exchange argument when creating the rabbitpy Queue object, or pass it as the x-dead-letter-exchange argument when issuing the Queue.Declare request.
    import rabbitpy

    url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            rabbitpy.Exchange(channel, 'rejected-messages').declare()
            queue = rabbitpy.Queue(channel, 'dlx-example',
                                   dead_letter_exchange='rejected-messages')
            queue.declare()
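The routing decision for a rejected message can be summarized as a small function. This is a sketch of the rule described above, not broker code: reject_outcome is a hypothetical name, and the queue's declare arguments are assumed to be available as a plain dictionary.

```python
def reject_outcome(queue_arguments, requeue):
    """Return (action, exchange) for a rejected message.

    queue_arguments: the arguments the queue was declared with (assumed dict).
    requeue: the requeue flag sent with the rejection.
    """
    if requeue:
        return ('requeue', None)       # back into the original queue
    dlx = queue_arguments.get('x-dead-letter-exchange')
    if dlx is not None:
        return ('dead-letter', dlx)    # routed to the configured DLX
    return ('discard', None)           # no DLX configured: message is dropped


dlx_queue = {'x-dead-letter-exchange': 'rejected-messages'}
plain_queue = {}
```

For example, reject_outcome(dlx_queue, requeue=False) yields ('dead-letter', 'rejected-messages'), while the same rejection on plain_queue yields ('discard', None). A rejection with requeue=True never reaches the DLX.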
Queue Control
When defining a queue, there are many settings that determine the behavior of the queue. For example, a queue can do the following:
- Delete itself automatically
- Allow only one consumer to consume from it
- Expire messages automatically
- Keep a limited number of messages
- Push old messages out of the queue
Note that queue settings are immutable: once you create a queue, you cannot change its settings later unless you delete the queue and re-create it.
Temporary Queue
Like Tom Cruise's briefcase in Mission: Impossible, RabbitMQ provides queues that delete themselves once they have been used and are no longer needed. Such a queue can be created and populated with messages; once a consumer connects, retrieves the messages, and disconnects, the queue is deleted automatically.
Setting the auto-delete flag to True in the Queue.Declare RPC request makes it easy to create these self-deleting queues.
    import rabbitpy

    url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel, 'ad-example', auto_delete=True)
            queue.declare()
Note that any number of consumers can consume from an auto-delete queue; the queue only deletes itself once no consumers remain connected to it.
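The lifecycle of such a queue can be modelled in a few lines. The AutoDeleteQueue class below is a hypothetical in-memory sketch, not a client API. It assumes that deletion is triggered when the last consumer goes away, so a queue that has never had a consumer stays in place.

```python
class AutoDeleteQueue:
    """Toy model of an auto-delete queue's lifecycle (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.consumers = set()
        self.had_consumer = False  # deletion only applies after first use
        self.deleted = False

    def subscribe(self, consumer_tag):
        self.consumers.add(consumer_tag)
        self.had_consumer = True

    def unsubscribe(self, consumer_tag):
        self.consumers.discard(consumer_tag)
        # The queue goes away when its last consumer is gone
        if self.had_consumer and not self.consumers:
            self.deleted = True


queue = AutoDeleteQueue('ad-example')
queue.subscribe('consumer-1')
queue.subscribe('consumer-2')
queue.unsubscribe('consumer-1')   # one consumer remains, queue stays
still_there = not queue.deleted
queue.unsubscribe('consumer-2')   # last consumer gone, queue is deleted
```

The consumer tags 'consumer-1' and 'consumer-2' are made-up names for the example.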
Permanent Queue
If you want a queue to survive a RabbitMQ restart, set the durable flag to True. Queue durability is easily confused with message persistence. Persistent messages are published with delivery-mode set to 2, whereas the durable flag tells RabbitMQ that you want the queue to exist until a Queue.Delete request is issued.
    import rabbitpy

    url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            queue = rabbitpy.Queue(channel, 'durable-queue', durable=True)
            if queue.declare():
                print('Queue declared')
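Because the durable flag and delivery-mode are easy to mix up, the following sketch spells out how the two are generally understood to combine for a classic queue after a broker restart. The survives_restart function is a hypothetical helper that only encodes that rule of thumb.

```python
def survives_restart(queue_durable, delivery_mode):
    """Return (queue_survives, message_survives) after a broker restart.

    Assumes a classic queue: a message is only kept if it is persistent
    (delivery_mode == 2) AND it sits in a durable queue.
    """
    queue_survives = bool(queue_durable)
    message_survives = queue_survives and delivery_mode == 2
    return queue_survives, message_survives


# All four combinations of durable queue and persistent message
combinations = {
    (durable, mode): survives_restart(durable, mode)
    for durable in (True, False)
    for mode in (1, 2)
}
```

Only the combination of a durable queue and delivery-mode 2 keeps the message. A persistent message in a non-durable queue is lost together with its queue, and a transient message in a durable queue leaves the queue in place but empty.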
For messages that are not important, you may want them to be discarded automatically after a certain time. Setting a TTL (time to live) on a message specifies its maximum lifetime, so each message can expire at a different time.
In contrast to the expiration property of a message, the x-message-ttl queue argument sets the expiration time for all messages in the queue.
    import rabbitpy

    url = 'amqp://rabbit:rabbit@192.168.89.38:30595/%2F'
    with rabbitpy.Connection(url) as connection:
        with connection.channel() as channel:
            # Messages in this queue expire after 10 seconds (10000 ms)
            queue = rabbitpy.Queue(channel, 'expiring-msg-queue',
                                   arguments={'x-message-ttl': 10000})
            if queue.declare():
                print('Queue declared')
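When a message carries its own expiration and the queue also has x-message-ttl, RabbitMQ applies the lower of the two values. The sketch below captures that rule; effective_ttl_ms is a hypothetical helper, and it assumes the expiration property arrives as a string of milliseconds, which is how AMQP carries it.

```python
def effective_ttl_ms(queue_arguments, expiration=None):
    """Return the TTL in milliseconds that applies to one message.

    queue_arguments: the queue's declare arguments (may hold x-message-ttl).
    expiration: the message's expiration property, e.g. '2500', or None.
    Returns None when neither limit is set (the message never expires).
    """
    candidates = []
    if 'x-message-ttl' in queue_arguments:
        candidates.append(int(queue_arguments['x-message-ttl']))
    if expiration is not None:
        candidates.append(int(expiration))
    return min(candidates) if candidates else None


queue_args = {'x-message-ttl': 10000}
short_lived = effective_ttl_ms(queue_args, expiration='2500')   # 2500
queue_default = effective_ttl_ms(queue_args)                    # 10000
```

A message with a longer expiration than the queue TTL, for example '60000' here, would still expire after 10 seconds.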
As of RabbitMQ 3.1.0, a queue can be declared with a maximum number of messages through the x-max-length argument. Once the queue reaches that limit, RabbitMQ drops messages from the front of the queue (the oldest ones) as new messages arrive.
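The effect of a length limit can be simulated locally. The sketch below assumes the default overflow behavior, in which the oldest messages are dropped from the head of the queue to make room for new ones. The publish_all function is hypothetical and does not contact a broker.

```python
from collections import deque


def publish_all(messages, max_length):
    """Simulate publishing into a queue limited by x-max-length.

    Returns (remaining, dropped): the messages still queued and the ones
    pushed out of the head of the queue, both in arrival order.
    """
    queue = deque()
    dropped = []
    for message in messages:
        queue.append(message)
        while len(queue) > max_length:
            dropped.append(queue.popleft())  # oldest message goes first
    return list(queue), dropped


remaining, dropped = publish_all([1, 2, 3, 4, 5], max_length=3)
```

Here remaining is [3, 4, 5] and dropped is [1, 2]. On a real queue the limit would be passed at declaration time, in the same way as the TTL argument, for example arguments={'x-max-length': 3}.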