1 Problem introduction
When using RabbitMQ, a producer that sends a message has no way of knowing whether the message actually reached the server, because by default the producer receives no response. So what should we do if we want to know what happened to a message? This is the problem that RabbitMQ's message confirmation mechanisms solve.
2 Two mechanisms of message confirmation
2.1 Transaction mechanism
Note that after sending a message, the transaction mechanism blocks the sender until RabbitMQ responds; only then can the next message be sent. The transaction mechanism therefore performs poorly and lowers RabbitMQ's throughput.
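To make the blocking pattern concrete, here is a minimal sketch of "publish, then commit before sending the next message". The `TxChannel` interface, the `InMemoryTxChannel` fake and the `TxPublisher` helper are hypothetical and written only for this example. Only the method names `txSelect`, `txCommit` and `txRollback` mirror the RabbitMQ Java client's `Channel`, and `basicPublish` is simplified to take a `String` body. With a real broker, `txCommit` waits for a network round trip, which is where the throughput is lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transactional part of a channel.
interface TxChannel {
    void txSelect();
    void basicPublish(String exchange, String routingKey, String body);
    void txCommit();
    void txRollback();
}

// In-memory fake: published messages are staged until txCommit() is called.
final class InMemoryTxChannel implements TxChannel {
    private final List<String> staged = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private boolean transactional;

    @Override
    public void txSelect() {
        transactional = true;
    }

    @Override
    public void basicPublish(String exchange, String routingKey, String body) {
        if (!transactional) {
            throw new IllegalStateException("call txSelect() first");
        }
        staged.add(body);
    }

    @Override
    public void txCommit() {
        committed.addAll(staged);
        staged.clear();
    }

    @Override
    public void txRollback() {
        staged.clear();
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }
}

final class TxPublisher {
    private final TxChannel channel;

    TxPublisher(TxChannel channel) {
        this.channel = channel;
        channel.txSelect(); // put the channel into transaction mode once
    }

    // Publishes one message per transaction; returns true if it was committed.
    boolean publish(String body) {
        try {
            channel.basicPublish("DirectExchange", "DirectRouting", body);
            channel.txCommit(); // the sender waits here before the next publish
            return true;
        } catch (RuntimeException e) {
            channel.txRollback();
            return false;
        }
    }
}
```

Because every call to `publish` ends with a commit, the cost of one round trip is paid per message in this sketch.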
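2.2 Confirm mechanism
Compared with the transaction mechanism, the confirm mechanism is more lightweight: the producer does not have to wait after every message, because the broker confirms messages asynchronously.
Note that the transaction mechanism and the confirm mechanism are mutually exclusive: a channel cannot use both at the same time. This post focuses on the confirm mechanism.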
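The bookkeeping behind asynchronous confirms can be sketched as follows: the producer records each published message under a sequence number and keeps sending, and a later acknowledgement removes the message from the set of outstanding ones. `ConfirmTracker` is a hypothetical helper written for this example and is not part of RabbitMQ or Spring. In the RabbitMQ Java client the sequence number would come from the channel and the acknowledgements from a confirm listener; here both are simulated.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper that tracks messages published but not yet confirmed.
final class ConfirmTracker {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    // Records a message as published; returns the sequence number assigned to it.
    synchronized long published(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    // Called when the broker acknowledges seqNo.
    // If multiple is true, every message up to and including seqNo is confirmed.
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    // Number of messages still waiting for a confirmation.
    int outstandingCount() {
        return outstanding.size();
    }
}
```

Anything still in the map after a reasonable wait is a candidate for re-sending, which is a design choice left to the application.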
3 Code
Next, we use the confirm mechanism to confirm messages on the producer side. On the consumer side, note that we use manual ack to confirm that a message was received correctly (automatic ack is the default). If we choose manual ack but never send the ack to RabbitMQ, the consequences can be serious.
Let's look at the code first.
3.1 application.properties
server.port: 8080
spring.application.name: provider
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
spring.rabbitmq.virtual-host: /
# Enable publisher confirms (Spring Boot 2.2 and later use spring.rabbitmq.publisher-confirm-type: correlated instead)
spring.rabbitmq.publisher-confirms: true
# Enable publisher returns
spring.rabbitmq.publisher-returns: true
# Manual acknowledgement
spring.rabbitmq.listener.simple.acknowledge-mode: manual
# Minimum number of concurrent consumers
spring.rabbitmq.listener.simple.concurrency: 1
# Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency: 1
# Whether retry is enabled
spring.rabbitmq.listener.simple.retry.enabled: true
3.2 Configuration
package com.example.provider.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Direct exchange configuration
 * @author 30309
 *
 */
@Configuration
public class DirectRabbitConfig {

    // Queue named DirectQueue
    @Bean
    public Queue DirectQueue() {
        return new Queue("DirectQueue", true); // true makes the queue durable
    }

    // Direct exchange named DirectExchange
    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange("DirectExchange");
    }

    // Bind the queue to the exchange with the routing key DirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // Return unroutable messages to the producer instead of dropping them;
        // requires publisher-returns: true in the configuration file
        rabbitTemplate.setMandatory(true);

        // Return callback; requires publisher-returns: true in the configuration file.
        // ReturnCallback is invoked when a message reached the RabbitMQ exchange
        // but no queue bound to the exchange matched it.
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Message sending failed:No corresponding queue bound to switch");
        });

        // Confirm callback; requires publisher-confirms: true in the configuration file.
        // ConfirmCallback receives the ack (or nack) from the broker after a message
        // has been sent to the RabbitMQ exchange.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message sent successfully:Message sent to RabbitMQ exchanger");
            } else {
                System.out.println("Message sending failed:Message not sent to RabbitMQ exchanger");
            }
        });

        return rabbitTemplate;
    }
}
3.3 Producer
package com.example.provider.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Producer
 * @author 30309
 *
 */
@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        // Send a message with routing key DirectRouting to the exchange DirectExchange
        rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", "Hello World");
        return "ok";
    }
}
3.4 Consumer
package com.example.consumer.receiver;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * Consumer 1
 * @author 30309
 *
 */
@Component
public class DirectReceiver1 {

    @RabbitListener(queues = "DirectQueue") // Listen on the queue named DirectQueue
    @RabbitHandler
    public void process(String str, Channel channel, Message message) {
        System.out.println("DirectReceiver1 Consumer receives message: " + str);
        try {
            // Acknowledge this single message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

            // Negatively acknowledge the message and requeue it so that it is delivered again
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

            // Reject the message; it is discarded and not returned to the queue
            //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Note that basicAck takes two parameters. The first is the unique ID of the delivery, known as the delivery tag. When the second parameter (multiple) is true, all messages whose delivery tag is less than or equal to the given value are acknowledged at once, that is, manual acknowledgements are batched to reduce network traffic.
Delivery tag: after a consumer registers with RabbitMQ, a Channel is established. Every message that RabbitMQ pushes to the consumer carries a delivery tag, which uniquely identifies that delivery on the Channel and is a monotonically increasing positive integer. Note that delivery tags are scoped to a channel, so a message must be acknowledged on the same channel on which it was received.
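As a rough illustration of how the ID carried by each delivery (the delivery tag) and the second parameter of basicAck work together, the sketch below acknowledges once per batch instead of once per message. `AckChannel` and `BatchAcker` are hypothetical names invented for this example. `AckChannel` only mirrors the shape of `basicAck(deliveryTag, multiple)` and omits the `IOException` that the real client method declares.

```java
// Hypothetical stand-in exposing only the acknowledgement call.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
}

// Acknowledges every batchSize-th delivery with multiple = true,
// which also confirms all earlier unacknowledged deliveries on the channel.
final class BatchAcker {
    private final AckChannel channel;
    private final int batchSize;
    private int pending;

    BatchAcker(AckChannel channel, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.channel = channel;
        this.batchSize = batchSize;
    }

    // Call after a message has been processed successfully.
    void processed(long deliveryTag) {
        pending++;
        if (pending >= batchSize) {
            channel.basicAck(deliveryTag, true);
            pending = 0;
        }
    }
}
```

The trade-off in this sketch is that up to batchSize - 1 processed messages can remain unacknowledged at any moment, so they may be delivered again if the consumer fails before the next batch acknowledgement.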
3.5 Test results
Producer: [figure: producer console output]
Consumer: [figure: consumer console output]
This shows that our test succeeded.
4 Summary
- A message is removed from the queue once a consumer has received it correctly. If no consumer subscribes to a queue, its messages stay in the queue.
- To make sure messages are received correctly, every message needs to be acknowledged. Acknowledgement is either manual or automatic; automatic ack is the default.
- With automatic ack, a message is confirmed as soon as it is sent to the consumer, so it can be lost if the consumer then throws an exception. With manual ack, the consumer itself calls ack, nack or reject, and can therefore take action when business processing fails.
- If a message is not acked and the consumer's channel or connection closes, the message is requeued and delivered to another consumer. If a service forgets to ack, RabbitMQ treats its processing capacity as limited and, once the number of unacknowledged messages reaches the prefetch limit, stops sending it new messages.
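The last point can be illustrated with a toy model of a prefetch limit. `PrefetchWindow` is a hypothetical class written for this example and says nothing about how RabbitMQ is implemented internally. It only shows the rule that no further message is delivered while the consumer already holds the maximum number of unacknowledged messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeSet;

// Toy model of one queue feeding one consumer with a prefetch limit.
final class PrefetchWindow {
    private final int prefetch;
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    PrefetchWindow(int prefetch) {
        this.prefetch = prefetch;
    }

    // A producer adds a message to the queue.
    void enqueue(String body) {
        ready.add(body);
    }

    // Delivers the next message and returns its delivery tag, or returns -1
    // if the queue is empty or the consumer already holds `prefetch` unacked messages.
    long deliver() {
        if (ready.isEmpty() || unacked.size() >= prefetch) {
            return -1;
        }
        ready.poll();
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // The consumer acknowledges a delivery, which frees one slot in the window.
    void ack(long tag) {
        unacked.remove(tag);
    }
}
```

In this model a consumer that never calls `ack` receives exactly `prefetch` messages and then nothing more, which matches the "forgotten ack" symptom described above.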