Message acknowledgement
Without acknowledgements, RabbitMQ removes a message from memory as soon as it has been sent to a consumer. In that case, if you stop a worker, the message it was processing is lost, and so is every message that had already been dispatched to that worker but not yet processed.
RabbitMQ provides message acknowledgements to prevent this loss. The consumer sends an ack (acknowledgement) to tell RabbitMQ that a message has been received and processed, and only then does RabbitMQ release and delete the message. If the consumer dies (its channel or connection closes) without sending an ack, RabbitMQ concludes that the message was not fully processed, requeues it, and redelivers it to another consumer.
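The redelivery behaviour described above can be illustrated with a toy in-memory model. This is only a sketch in plain PHP, not RabbitMQ or php-amqplib code: the class `ToyQueue` and its methods (`publish`, `deliver`, `ack`, `consumerDied`) are hypothetical names invented for the illustration.

```php
<?php
// Toy model of ack-based redelivery. Hypothetical class, not a real broker.
final class ToyQueue
{
    /** @var string[] messages waiting to be delivered */
    private array $ready = [];
    /** @var array<int, array{consumer: string, body: string}> delivered, not yet acked */
    private array $unacked = [];
    private int $nextTag = 1;

    public function publish(string $body): void
    {
        $this->ready[] = $body;
    }

    /** Deliver the next ready message; returns [tag, body], or null if none is ready. */
    public function deliver(string $consumer): ?array
    {
        if ($this->ready === []) {
            return null;
        }
        $body = array_shift($this->ready);
        $tag = $this->nextTag++;
        $this->unacked[$tag] = ['consumer' => $consumer, 'body' => $body];
        return [$tag, $body];
    }

    /** Only an ack actually deletes the message. */
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    /** A consumer died: everything it had not acked goes back to the front of the queue. */
    public function consumerDied(string $consumer): void
    {
        $returned = [];
        foreach ($this->unacked as $tag => $entry) {
            if ($entry['consumer'] === $consumer) {
                $returned[] = $entry['body'];
                unset($this->unacked[$tag]);
            }
        }
        $this->ready = array_merge($returned, $this->ready);
    }
}

$queue = new ToyQueue();
$queue->publish('task-1');
$queue->publish('task-2');

[$tag1] = $queue->deliver('worker-A');    // worker-A receives task-1
$queue->ack($tag1);                       // task-1 is processed and deleted
$queue->deliver('worker-A');              // worker-A receives task-2 but never acks it
$queue->consumerDied('worker-A');         // task-2 returns to the queue

[, $body] = $queue->deliver('worker-B');  // another consumer picks it up
echo $body, "\n";                         // prints "task-2"
```

The acked message is gone for good, while the unacked one survives the death of its consumer.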
1 consumer confirmation
The consumer confirmation mode is divided into automatic confirmation and manual confirmation.
1.1 automatic confirmation
Setting the fourth argument of basic_consume(), $no_ack, to true enables automatic confirmation. The message is treated as acknowledged as soon as it has been delivered to the consumer, so the consumer does not need to send an ack itself.
/**
 * Send ack automatically
 *
 * Requires: use PhpAmqpLib\Connection\AMQPStreamConnection;
 */
public function index()
{
    // Connect and open a channel
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connection->channel();

    // Make sure the queue exists before consuming from it
    $channel->queue_declare('hello', false, false, false, false);

    // Consumer message callback (must be defined before basic_consume uses it)
    $callback = function ($msg) {
        print_r(">receive the message:" . $msg->body . "\n");
    };

    // Set no_ack=true (fourth argument) when consuming messages
    $channel->basic_consume('hello', '', false, true, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
1.2 manual confirmation
Setting the fourth argument of basic_consume(), $no_ack, to false enables manual confirmation. If the consumer does not send an ack for a message it received, RabbitMQ keeps the message as unacknowledged and redelivers it once the consumer's channel or connection closes.
- basic.ack is used for positive confirmation.
- basic.nack is used for negative acknowledgement and can reject or requeue multiple messages at a time. (To requeue, set $requeue = true.)
- basic.reject is used for negative acknowledgement and can reject or requeue a single message. (To requeue, set $requeue = true.)
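To make the difference between the three methods concrete, here is a toy model of one channel's unacknowledged deliveries. It is a sketch in plain PHP, not php-amqplib code: `ToyChannel` and its members are hypothetical. In php-amqplib the corresponding calls are `basic_ack($delivery_tag)`, `basic_nack($delivery_tag, $multiple, $requeue)` and `basic_reject($delivery_tag, $requeue)`. The model assumes the AMQP rule that `multiple = true` covers every unacknowledged delivery tag up to and including the given one.

```php
<?php
// Toy model of ack / nack / reject on a single channel. Hypothetical class.
final class ToyChannel
{
    /** @var array<int, string> delivery tag => body, still unacknowledged */
    public array $unacked = [];
    /** @var string[] bodies that were put back on the queue */
    public array $requeued = [];

    /** Positive confirmation: the message is done and deleted. */
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    /** Negative acknowledgement of exactly one message. */
    public function reject(int $tag, bool $requeue): void
    {
        $this->settle([$tag], $requeue);
    }

    /** Negative acknowledgement of one message, or of all tags <= $tag when $multiple is true. */
    public function nack(int $tag, bool $multiple, bool $requeue): void
    {
        $tags = $multiple
            ? array_filter(array_keys($this->unacked), fn (int $t): bool => $t <= $tag)
            : [$tag];
        $this->settle($tags, $requeue);
    }

    /** Requeue or drop each given tag that is still unacknowledged. */
    private function settle(array $tags, bool $requeue): void
    {
        foreach ($tags as $t) {
            if (!isset($this->unacked[$t])) {
                continue;
            }
            if ($requeue) {
                $this->requeued[] = $this->unacked[$t];
            }
            unset($this->unacked[$t]);
        }
    }
}

$ch = new ToyChannel();
$ch->unacked = [1 => 'a', 2 => 'b', 3 => 'c', 4 => 'd'];

$ch->ack(1);               // 'a' processed successfully
$ch->nack(3, true, true);  // 'b' and 'c' are requeued in one call
$ch->reject(4, false);     // 'd' is rejected and dropped in this model

echo implode(',', $ch->requeued), "\n";  // prints "b,c"
```

A single nack with multiple set settles a whole batch, whereas reject always addresses one delivery tag.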
/**
 * Send ack manually
 *
 * Requires: use PhpAmqpLib\Connection\AMQPStreamConnection;
 */
public function index()
{
    // Connect and open a channel
    $connect = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connect->channel();

    // Declare the same queue that is consumed below
    $channel->queue_declare('hello', false, false, false, false);

    // Consume messages and acknowledge each one after handling it
    $callback = function ($msg) {
        print_r(">receive the message:" . $msg->body . "\n");
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    // Set no_ack=false (fourth argument) when consuming messages
    $channel->basic_consume('hello', '', false, false, false, false, $callback);

    print_r("consuming...\n");
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connect->close();
}
Official documentation for QoS (Quality of Service): www.rabbitmq.com/consumer-prefetch...
With manual confirmation enabled, QoS lets you set an integer N (the prefetch count): the maximum number of unacknowledged messages a consumer may hold at one time. While N messages are still unprocessed, the consumer receives no new messages from the queue until one of them has been acked.
The purpose of QoS is to prevent a consumer from pulling every message off the queue at once, which could crash the service or cause errors. On the one hand, this mechanism acts as a rate limit (the remaining messages stay in RabbitMQ's memory for the time being). On the other hand, it helps keep message confirmation reliable (for example, it limits how many messages are exposed to cases where a message is confirmed but its handling then fails).
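The prefetch limit can be pictured as a window of at most N unacknowledged messages. The following is a toy sketch in plain PHP, not php-amqplib code: `PrefetchWindow` and its methods are hypothetical. In php-amqplib the limit itself is set with `$channel->basic_qos(null, N, null)` before calling `basic_consume()`.

```php
<?php
// Toy model of a prefetch window of size N. Hypothetical class, not a real broker.
final class PrefetchWindow
{
    private int $limit;
    /** @var string[] messages still waiting in the queue */
    private array $ready;
    /** @var array<int, string> delivery tag => body, delivered but not yet acked */
    private array $unacked = [];
    private int $nextTag = 1;

    public function __construct(int $prefetchCount, array $messages)
    {
        $this->limit = $prefetchCount;
        $this->ready = array_values($messages);
    }

    /** Deliver as many messages as the window allows; returns tag => body delivered now. */
    public function dispatch(): array
    {
        $delivered = [];
        while ($this->ready !== [] && count($this->unacked) < $this->limit) {
            $tag = $this->nextTag++;
            $this->unacked[$tag] = array_shift($this->ready);
            $delivered[$tag] = $this->unacked[$tag];
        }
        return $delivered;
    }

    /** An ack frees one slot in the window. */
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }
}

$window = new PrefetchWindow(2, ['m1', 'm2', 'm3', 'm4']);

$first = $window->dispatch();    // window is empty, so m1 and m2 are delivered
$blocked = $window->dispatch();  // window is full, nothing is delivered
$window->ack(1);                 // m1 is acked, one slot is free again
$next = $window->dispatch();     // m3 is delivered

echo implode(',', $first), ' | ', count($blocked), ' | ', implode(',', $next), "\n";
// prints "m1,m2 | 0 | m3"
```

With N = 2 the consumer never holds more than two unacknowledged messages, so the rest stay on the broker.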
2 publisher confirmation
confirm_select() enables RabbitMQ's publisher confirmation feature on the channel, which requires RabbitMQ to tell the publisher explicitly whether each message was delivered to the broker successfully.
When a message is delivered normally, the RabbitMQ client asynchronously invokes the callback registered with set_ack_handler(), indicating that the message was delivered successfully.
When message delivery fails, the callback registered with set_nack_handler() is invoked instead.
/**
 * Requires:
 *   use PhpAmqpLib\Connection\AMQPStreamConnection;
 *   use PhpAmqpLib\Exchange\AMQPExchangeType;
 *   use PhpAmqpLib\Message\AMQPMessage;
 */
public function index()
{
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin2021');
    $channel = $connection->channel();
    $channel->exchange_declare('normal_exchange', AMQPExchangeType::DIRECT, false, true, false);

    // Enable confirmation mode
    $channel->confirm_select();

    // nack callback
    $channel->set_nack_handler(function (AMQPMessage $msg) {
        print_r("{$msg->getBody()},nack\n");
    });

    // ack callback
    $channel->set_ack_handler(function (AMQPMessage $msg) {
        print_r("{$msg->getBody()},ack\n");
    });

    // Send messages
    for ($i = 1; $i <= 10; $i++) {
        $message = new AMQPMessage("This is message number {$i}");
        $channel->basic_publish($message, 'normal_exchange', 'route_key');
    }

    // Wait for the acks and nacks from the server
    $channel->wait_for_pending_acks();

    $channel->close();
    $connection->close();
}
This article was first published on LearnKu.com.