Docker With RabbitMQ
Official Docker image repository address
Running RabbitMQ locally
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Access the management panel
- Address: http://127.0.0.1:15672/
- Default username: guest
- Default password: guest
Spring Boot With RabbitMQ
Integrating RabbitMQ with Spring Boot
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Basic parameter configuration
# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
Queue / Exchange / Routing configuration
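import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * Define the priority queue
     */
    @Bean
    Queue queue() {
        Map<String, Object> args = new HashMap<>();
        // Highest priority value this queue supports
        args.put("x-max-priority", 100);
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * Define the exchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    /**
     * Bind the queue to the exchange with the routing key pattern
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}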
For the definition of a priority queue, refer to the official documentation: https://www.rabbitmq.com/priority.html
After the Spring Boot application starts, the Queue and the Exchange are created automatically and bound to each other. The priority Queue is marked in the management panel, as shown in the figure.
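To make the ordering rules concrete, here is a minimal in-memory sketch of how a priority queue orders messages. It is a toy model written for illustration, not RabbitMQ's implementation, and the class name PriorityQueueModel is hypothetical. It assumes the behavior described in the official priority documentation: a message without a priority is treated as priority 0, a priority above the queue maximum is treated as the maximum, and messages of equal priority keep their arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy in-memory model of priority ordering; not RabbitMQ's implementation. */
final class PriorityQueueModel {

    /** A queued message with its effective priority and arrival order. */
    private static final class Entry {
        final String body;
        final int priority;
        final long sequence;

        Entry(String body, int priority, long sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private final int maxPriority;
    private long nextSequence = 0;

    // Higher priority first; equal priorities keep arrival (FIFO) order.
    private final PriorityQueue<Entry> entries = new PriorityQueue<>((a, b) -> {
        int byPriority = Integer.compare(b.priority, a.priority);
        return byPriority != 0 ? byPriority : Long.compare(a.sequence, b.sequence);
    });

    PriorityQueueModel(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    /** Publishes a message; null counts as 0, values above the maximum are capped. */
    void publish(String body, Integer priority) {
        int effective = priority == null ? 0 : Math.min(priority, maxPriority);
        entries.add(new Entry(body, effective, nextSequence++));
    }

    /** Removes and returns all message bodies in delivery order. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (!entries.isEmpty()) {
            delivered.add(entries.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityQueueModel queue = new PriorityQueueModel(100);
        queue.publish("low", 1);
        queue.publish("none", null);
        queue.publish("high", 50);
        queue.publish("too-high", 500); // capped to 100
        queue.publish("low-2", 1);
        System.out.println(queue.drain()); // [too-high, high, low, low-2, none]
    }
}
```

The sequence number is only there to keep equal-priority messages in publish order, since java.util.PriorityQueue alone does not guarantee that.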
RabbitMQ Publisher
Spring Boot related configuration
# Whether to enable the confirm callback, triggered once the broker has accepted (or rejected) the message sent to the Exchange
# Note: on Spring Boot 2.2 and later this property is replaced by spring.rabbitmq.publisher-confirm-type
spring.rabbitmq.publisher-confirms=false
# Whether to enable the return callback, triggered when a message cannot be routed from the Exchange to any Queue
spring.rabbitmq.publisher-returns=false
# Retry configuration for failed message sends
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1
Sending messages
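The retry settings above can be read as an exponential backoff schedule. The sketch below computes the waits between attempts under the usual backoff rule (start at the initial interval, multiply after each attempt, cap at the maximum interval). It is an illustration of the arithmetic only, assuming that rule; it is not Spring Retry's code, and the names RetryBackoffSketch and waitsBetweenAttempts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative backoff arithmetic; not Spring Retry's implementation. */
final class RetryBackoffSketch {

    /**
     * Returns the waits (in milliseconds) between attempts.
     * With maxAttempts attempts there are at most maxAttempts - 1 waits.
     */
    static List<Long> waitsBetweenAttempts(long initialIntervalMs, double multiplier,
                                           long maxIntervalMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double interval = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Never wait longer than the configured maximum interval.
            waits.add((long) Math.min(interval, maxIntervalMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above: multiplier 1 means a fixed 3 s wait.
        System.out.println(waitsBetweenAttempts(3000, 1.0, 10000, 3)); // [3000, 3000]
        // A hypothetical multiplier of 2 shows the cap taking effect.
        System.out.println(waitsBetweenAttempts(3000, 2.0, 10000, 5)); // [3000, 6000, 10000, 10000]
    }
}
```

With the values in this article the multiplier is 1, so the maximum interval never comes into play; it only matters once the multiplier is greater than 1.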
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * Send a message with a priority set
     *
     * @param content  message content
     * @param priority message priority
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    // Set the priority on the outgoing message properties
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }
}
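The sender publishes with the routing key priority.queue.test, which reaches the queue because it matches the binding pattern priority.queue.# on the topic exchange. The sketch below shows the matching rule in plain Java: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. It is a simplified illustration, not the broker's matching code, and the class name TopicMatcher is hypothetical.

```java
/** Simplified topic-pattern matcher for illustration; not the broker's code. */
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: the key must be exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        boolean wordMatches = p[pi].equals("*") || p[pi].equals(k[ki]);
        return wordMatches && matches(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("priority.queue.#", "priority.queue.test")); // true
        System.out.println(matches("priority.queue.#", "priority.queue"));      // true
        System.out.println(matches("priority.queue.#", "priority.other.test")); // false
        System.out.println(matches("priority.*", "priority.queue.test"));       // false
    }
}
```

Because # also matches zero words, any routing key that starts with the prefix used by the sender is routed to the priority queue, whatever suffix is appended.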
RabbitMQ Consumer
Spring Boot related configuration
# Message acknowledgement mode; options: NONE, AUTO, MANUAL
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# Minimum number of concurrent consumers (listener threads)
spring.rabbitmq.listener.simple.concurrency=10
# Maximum number of concurrent consumers (listener threads)
spring.rabbitmq.listener.simple.max-concurrency=10
# Maximum number of unacknowledged messages that can be outstanding at each consumer
spring.rabbitmq.listener.simple.prefetch=1
If the consumer takes a long time to process each message, set spring.rabbitmq.listener.simple.prefetch to a small value. Messages that a consumer has already prefetched are no longer reordered by the broker, so a smaller prefetch lets a newly published high-priority message reach a consumer thread sooner.
Listening for messages
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    /**
     * Process a message from the priority queue
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }
}
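The effect of prefetch on priority can be seen in a small simulation. The sketch below models one consumer and a backlog of low-priority messages, then publishes one urgent message after the consumer has already filled its prefetch buffer. It is a simplified model under those assumptions (a single consumer, an acknowledgement after each message), not broker code; the names PrefetchSketch and processingOrder are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Toy simulation of prefetch versus priority; not broker code. */
final class PrefetchSketch {

    private static final class Msg {
        final String body;
        final int priority;
        final int sequence;

        Msg(String body, int priority, int sequence) {
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    /** Returns the order in which a single consumer processes the messages. */
    static List<String> processingOrder(int prefetch, int lowCount) {
        // Broker side: higher priority first, then arrival order.
        PriorityQueue<Msg> broker = new PriorityQueue<>(
                Comparator.comparingInt((Msg m) -> -m.priority)
                        .thenComparingInt(m -> m.sequence));
        Deque<Msg> consumerBuffer = new ArrayDeque<>();
        int sequence = 0;

        // 1. A low-priority backlog is already waiting in the queue.
        for (int i = 1; i <= lowCount; i++) {
            broker.add(new Msg("low-" + i, 1, sequence++));
        }
        // 2. The broker hands up to `prefetch` messages to the consumer.
        fill(broker, consumerBuffer, prefetch);
        // 3. Only now does an urgent message arrive.
        broker.add(new Msg("HIGH", 9, sequence++));

        // 4. The consumer works through its buffer; each ack frees one slot.
        List<String> processed = new ArrayList<>();
        while (!consumerBuffer.isEmpty()) {
            processed.add(consumerBuffer.poll().body);
            fill(broker, consumerBuffer, prefetch);
        }
        return processed;
    }

    private static void fill(PriorityQueue<Msg> broker, Deque<Msg> buffer, int prefetch) {
        while (buffer.size() < prefetch && !broker.isEmpty()) {
            buffer.add(broker.poll());
        }
    }

    public static void main(String[] args) {
        System.out.println(processingOrder(1, 5)); // [low-1, HIGH, low-2, low-3, low-4, low-5]
        System.out.println(processingOrder(4, 5)); // [low-1, low-2, low-3, low-4, HIGH, low-5]
    }
}
```

In this model the urgent message waits behind exactly as many low-priority messages as the prefetch value, which is why a prefetch of 1 suits slow consumers on a priority queue.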
Supplement
1. User-defined callbacks for message sending confirmation
- The configuration is as follows:
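# Trigger the confirm callback once the broker has accepted (or rejected) a message sent to the Exchange
# Note: on Spring Boot 2.2 and later this property is replaced by spring.rabbitmq.publisher-confirm-type
spring.rabbitmq.publisher-confirms=true
# Trigger the return callback when a message cannot be routed from the Exchange to any Queue
spring.rabbitmq.publisher-returns=true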
- Custom RabbitTemplate.ConfirmCallback implementation class
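import lombok.extern.slf4j.Slf4j;
// Package of CorrelationData in Spring AMQP 2.1 and later
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /**
     * Called when the broker acknowledges (ack) or rejects (nack) a published message
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("Message unique ID: {}", correlationData);
        log.info("Acknowledgement state: {}", ack);
        log.info("Cause: {}", cause);
    }
}
- Custom RabbitTemplate.ReturnCallback implementation class
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Note: ReturnCallback is deprecated since Spring AMQP 2.3 in favor of ReturnsCallback
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    /**
     * Called when a message could not be routed to any queue
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("Message body: {}", message);
        log.info("Reply code: {}", replyCode);
        log.info("Reply text: {}", replyText);
        log.info("Exchange: {}", exchange);
        log.info("Routing key: {}", routingKey);
    }
}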
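- Configure RabbitTemplate
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Register the custom callbacks once the bean's properties are set
     */
    @Override
    public void afterPropertiesSet() {
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }
}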
2. RabbitMQ Exchange types