Spring Boot RabbitMQ - priority queue

Docker With RabbitMQ

Official Docker image repository: the rabbitmq image on Docker Hub

Running RabbitMQ locally

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Open the management UI at http://localhost:15672 (the image's default login is guest / guest)

Spring Boot With RabbitMQ

Add the spring-boot-starter-amqp dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Basic parameter configuration

# host & port
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672

Queue / Exchange / Routing configuration

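import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE = "priority-exchange";

    public static final String QUEUE = "priority-queue";

    // '#' matches zero or more dot-separated words after "priority.queue"
    private static final String ROUTING_KEY = "priority.queue.#";

    /**
     * Define the priority queue
     */
    @Bean
    Queue queue() {
        Map<String, Object> args = new HashMap<>();
        // Highest priority this queue supports (RabbitMQ accepts 1 to 255)
        args.put("x-max-priority", 100);
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue(QUEUE, false, false, false, args);
    }

    /**
     * Define the exchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    /**
     * Bind the queue to the exchange with the routing key pattern
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

}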
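To see which routing keys the binding pattern priority.queue.# accepts, here is a minimal plain-Java sketch of topic matching. It assumes the usual topic exchange rules (words separated by dots, * matches exactly one word, # matches zero or more words). The class TopicPatternSketch is hypothetical and written for illustration only; it is not the broker's implementation.

```java
final class TopicPatternSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under these rules, TopicPatternSketch.matches("priority.queue.#", "priority.queue.test") is true, and so is the bare key "priority.queue", because # can match zero words.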

For the definition of a priority queue, see the official documentation: https://www.rabbitmq.com/priority.html
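The ordering rules from that document can be modelled in a few lines of plain Java. The sketch below assumes three of them: a message without a priority is treated as priority 0, a priority above x-max-priority is treated as the maximum, and messages of equal priority keep their arrival order. PriorityQueueModel is a hypothetical in-memory model for illustration, not a RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityQueueModel {

    private static final class Entry {
        final String body;
        final int priority;
        final long seq;

        Entry(String body, int priority, long seq) {
            this.body = body;
            this.priority = priority;
            this.seq = seq;
        }
    }

    private final int maxPriority;
    private long nextSeq = 0;

    // Higher priority first; arrival order breaks ties
    private final PriorityQueue<Entry> entries = new PriorityQueue<>((a, b) ->
            a.priority != b.priority
                    ? Integer.compare(b.priority, a.priority)
                    : Long.compare(a.seq, b.seq));

    PriorityQueueModel(int maxPriority) {
        this.maxPriority = maxPriority;
    }

    /** Publishes a message; a null priority counts as 0, larger values are capped. */
    void publish(String body, Integer priority) {
        // Clamping negatives to 0 is a choice of this model only
        int effective = priority == null ? 0 : Math.min(Math.max(priority, 0), maxPriority);
        entries.add(new Entry(body, effective, nextSeq++));
    }

    /** Removes all messages in the order a single consumer would receive them. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!entries.isEmpty()) {
            out.add(entries.poll().body);
        }
        return out;
    }
}
```

With a maximum of 100, a message published with priority 200 competes as if it had priority 100, so it is delivered ahead of a later message that asked for exactly 100.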

After the Spring Boot application starts, the Queue and Exchange are created automatically and bound to each other. In the management UI, the priority Queue is marked with a Pri tag in its Features column.

RabbitMQ Publisher

Spring Boot related configuration

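# Whether to enable the confirm callback, fired once the broker has accepted or rejected a message sent to the Exchange
# (deprecated since Spring Boot 2.2 in favor of spring.rabbitmq.publisher-confirm-type)
spring.rabbitmq.publisher-confirms=false
# Whether to enable the return callback, fired when a message cannot be routed from the Exchange to any Queue
spring.rabbitmq.publisher-returns=false
# Retry settings for failed sends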
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=3000ms
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=1
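To make the retry settings concrete, the following sketch computes the wait before each retry. It assumes the common exponential backoff arithmetic: the first wait is initial-interval, each later wait is multiplied by multiplier, every wait is capped at max-interval, and max-attempts counts the first attempt too. RetryDelaySketch is a hypothetical helper that models the arithmetic only; it does not call Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryDelaySketch {

    /** Returns the wait in milliseconds before each retry (maxAttempts - 1 entries). */
    static List<Long> delaysMillis(long initialInterval, double multiplier,
                                   long maxInterval, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each wait at maxInterval, then grow the interval for the next retry
            delays.add((long) Math.min(current, maxInterval));
            current = current * multiplier;
        }
        return delays;
    }
}
```

With the values above (3000 ms, multiplier 1, 10000 ms cap, 3 attempts) the model gives two waits of 3000 ms each, so under these assumptions a send is abandoned roughly 6 seconds after the first failure.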

Sending messages

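import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class FileMessageSender {

    private static final String EXCHANGE = "priority-exchange";

    private static final String ROUTING_KEY_PREFIX = "priority.queue.";

    private final RabbitTemplate rabbitTemplate;

    /**
     * Send a message with a priority set
     *
     * @param content  message body
     * @param priority message priority; values above the queue's x-max-priority are treated as the maximum
     */
    public void sendPriorityMessage(String content, Integer priority) {
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX + "test", content,
                message -> {
                    // Set the priority property on the outgoing message
                    message.getMessageProperties().setPriority(priority);
                    return message;
                });
    }

}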

RabbitMQ Consumer

Spring Boot related configuration

# Acknowledgement mode for received messages: NONE, AUTO or MANUAL
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
# Minimum number of concurrent consumers
spring.rabbitmq.listener.simple.concurrency=10
# Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Maximum number of unacknowledged messages each consumer may hold at once
spring.rabbitmq.listener.simple.prefetch=1

If the consumer takes a long time to process each message, set spring.rabbitmq.listener.simple.prefetch to a small value. Messages already delivered to a consumer's prefetch buffer are no longer reordered by the broker, so a large prefetch lets low-priority messages sit ahead of a higher-priority message that arrives later.
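The effect of prefetch on ordering can be simulated without a broker. The sketch below assumes a single consumer, a backlog of low-priority messages that is already queued when the consumer connects, and one high-priority message published right after the first deliveries. PrefetchSketch and its message names are hypothetical; it is a simplified model, not a measurement of RabbitMQ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

final class PrefetchSketch {

    private static final class Msg {
        final String name;
        final int priority;
        final long seq;

        Msg(String name, int priority, long seq) {
            this.name = name;
            this.priority = priority;
            this.seq = seq;
        }
    }

    /** Returns the order in which one consumer processes the messages (prefetch must be at least 1). */
    static List<String> processingOrder(int prefetch, int lowCount) {
        // Broker side: higher priority first, arrival order breaks ties
        PriorityQueue<Msg> broker = new PriorityQueue<>((a, b) ->
                a.priority != b.priority
                        ? Integer.compare(b.priority, a.priority)
                        : Long.compare(a.seq, b.seq));
        long seq = 0;
        for (int i = 1; i <= lowCount; i++) {
            broker.add(new Msg("low-" + i, 1, seq++));
        }

        // Consumer side: messages already delivered are processed first in, first out
        Deque<Msg> buffer = new ArrayDeque<>();
        fill(buffer, broker, prefetch);

        // The high-priority message is published only after the first deliveries
        broker.add(new Msg("high", 9, seq++));

        List<String> order = new ArrayList<>();
        while (!buffer.isEmpty()) {
            order.add(buffer.pollFirst().name);
            // Acknowledging frees a slot, so the broker delivers its current best message
            fill(buffer, broker, prefetch);
        }
        return order;
    }

    private static void fill(Deque<Msg> buffer, PriorityQueue<Msg> broker, int prefetch) {
        while (buffer.size() < prefetch && !broker.isEmpty()) {
            buffer.addLast(broker.poll());
        }
    }
}
```

In this model, with prefetch 1 the high-priority message is processed second, right after the message that was already in flight. With prefetch 3 it has to wait behind all three buffered low-priority messages and is processed fourth.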

Listening for messages

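import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageListener {

    /**
     * Process a message received from the priority queue
     */
    @RabbitListener(queues = "priority-queue")
    public void listen(String message) {
        log.info(message);
    }

}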

Supplement

1. Custom callbacks for publisher confirms and returns

  • Enable both callbacks in the configuration:
# Fire the confirm callback once the broker has accepted or rejected a message sent to the Exchange
spring.rabbitmq.publisher-confirms=true
# Fire the return callback when a message cannot be routed to any Queue
spring.rabbitmq.publisher-returns=true
  • Custom RabbitTemplate.ConfirmCallback implementation class
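import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@Slf4j
public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null unless one was passed when the message was sent
        log.info("Message unique ID: {}", correlationData);
        log.info("Acknowledgement state: {}", ack);
        log.info("Cause: {}", cause);
    }

}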
  • Custom RabbitTemplate.ReturnCallback implementation class
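import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Note: Spring AMQP 2.3 deprecates ReturnCallback in favor of ReturnsCallback
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Message: {}", message);
        log.info("Reply code: {}", replyCode);
        log.info("Reply text: {}", replyText);
        log.info("Exchange: {}", exchange);
        log.info("Routing key: {}", routingKey);
    }

}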
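  • Register the callbacks on RabbitTemplate
import lombok.AllArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

@Component
@AllArgsConstructor
public class RabbitTemplateInitializingBean implements InitializingBean {

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void afterPropertiesSet() {
        // Register both callbacks once the bean's dependencies are injected
        rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack());
        rabbitTemplate.setReturnCallback(new RabbitReturnCallback());
    }

}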
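The confirm callback above only logs the result. One possible way to act on it is to remember each message under a correlation ID when it is sent and to look that ID up again when the confirm arrives. The sketch below is plain Java with no Spring types; PendingConfirmTracker and its method names are hypothetical, and it assumes the caller supplies the ID that the callback would read from correlationData.getId().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PendingConfirmTracker {

    // Messages sent but not yet confirmed, keyed by correlation ID
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Payloads the broker rejected (nack), kept for a later resend or alert
    private final List<String> rejected = Collections.synchronizedList(new ArrayList<>());

    /** Call just before sending a message with the given correlation ID. */
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Call from the confirm callback with the ID and the ack flag. */
    void onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        if (payload != null && !ack) {
            rejected.add(payload);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    /** Returns a snapshot copy of the rejected payloads. */
    List<String> rejectedPayloads() {
        synchronized (rejected) {
            return new ArrayList<>(rejected);
        }
    }
}
```

Confirms for unknown IDs are ignored, so a duplicate or late confirm does not produce a second entry in the rejected list.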

2. RabbitMQ Exchange type

Keywords: Java RabbitMQ Spring Docker

Added by kovudalion on Sat, 02 Nov 2019 06:20:12 +0200