SpringBook Integration AMQP (RabbitMQ)

SpringBook Integration AMQP (RabbitMQ)

  1. Adding pom dependencies

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. application.properties configuration

    spring.rabbitmq.host=***.***.***.***
    spring.rabbitmq.port=5762
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=***
  3. All messages submitted by message producers in RabbitMQ are redistributed by Exchange, which distributes messages to different Queue s according to different strategies. RabbitMQ provides four different strategies: Direct, Fanout, Topic and Header. The first three strategies have higher utilization rate.

    • Direct

      The routing strategy of DirectExchange is to bind the message queue to a DirectExchange, but when a message arrives at DirectExchange, it is forwarded to the same Queue as the routing key of the message.

      • DirectExchange is configured as follows:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitDirectConfig {
            public final static String DIRECTNAME = "ysw-direct";
            @Bean
            Queue queue1(){
                return new Queue("queue-direct1");
            }
        
            @Bean
        
            Queue queue2(){
                return new Queue("queue-direct2");
            }
        
            @Bean
            Queue queue3(){
                return new Queue("queue-direct3");
            }
        
            @Bean
            DirectExchange directExchange(){
                return new DirectExchange(DIRECTNAME,true,false);
            }
        
            @Bean
            Binding binding1(){
                return BindingBuilder.bind(queue1()).to(directExchange()).with("direct1");
            }
        
            @Bean
            Binding binding2(){
                return BindingBuilder.bind(queue2()).to(directExchange()).with("direct2");
            }
        }

        The configuration of DirectExchange and Binding beans can be omitted, that is, if you use DirectExchange, you can configure an instance of Queue.

      • Configuration of consumers

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class DirectReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-direct1")
            public void directHandler1(String msg){
                logger.info("\033[30;4m"+"queue-direct1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-direct2")
            public void directHandler2(String msg){
                logger.info("\033[30;4m"+"queue-direct2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-direct3")
            public void directHandler3(String msg){
                logger.info("\033[30;4m"+"queue-direct3:"+msg+"\033[0m");
            }
        }

        The @RabbitListener annotation specifies that a method is a consumer method whose parameters are the messages received.

      • message sending

        By injecting the RabbitTemplate object to send messages, here I make it send by itself through a timed task, which needs to be turned on. Detailed operations can be seen in the section.

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(fixedDelay = 5000,initialDelay = 3000)
            public void direct(){
                String message = "direct-task";
                logger.info("\033[30;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend("ysw-direct","direct1",message);
                rabbitTemplate.convertAndSend("ysw-direct","direct2",message);
                rabbitTemplate.convertAndSend("queue-direct3",message);
            }
        }
    • Fanout

      FanoutExchange's data exchange strategy is to forward all messages arriving at FanoutExchange to all Queue s bound to him, in which routingkey will not work.

      • FanoutExchange is configured as follows:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitFanoutConfig {
            public final static String FANOUTNAME = "ysw-fanout";
        
            @Bean
            Queue queue4(){
                return new Queue("queue-fanout1");
            }
        
            @Bean
            Queue queue5(){
                return new Queue("queue-fanout2");
            }
        
            @Bean
            Queue queue6(){
                return new Queue("queue-fanout3");
            }
        
            @Bean
            FanoutExchange fanoutExchange(){
                return new FanoutExchange(FANOUTNAME,true,false);
            }
        
            @Bean
            Binding binding4(){
                return BindingBuilder.bind(queue4()).to(fanoutExchange());
            }
        
            @Bean
            Binding binding5(){
                return BindingBuilder.bind(queue5()).to(fanoutExchange());
            }
        
            @Bean
            Binding binding6(){
                return BindingBuilder.bind(queue6()).to(fanoutExchange());
            }
        }
      • Configuration of consumers

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class FanoutReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-fanout1")
            public void fanoutHandler1(String msg){
                logger.info("\033[31;4m"+"queue-fanout1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-fanout2")
            public void fanoutHandler2(String msg){
                logger.info("\033[31;4m"+"queue-fanout2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-fanout3")
            public void fanoutHandler3(String msg){
                logger.info("\033[31;4m"+"queue-fanout3:"+msg+"\033[0m");
            }
        }
      • message sending

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            Scheduled(fixedDelay = 5000,initialDelay = 4000)
            public void fanout(){
                String message = "fanout-task";
                logger.info("\033[31;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend("ysw-fanout",null,message);
            }
        }
    • Topic

      TopicExchange is a complex and flexible routing strategy. In TopicExchange, Queue is bound to TopicExchange by routing key. When a message is sent to TopicExchange, TopicExchange routes the message to one or more Queues according to the routing key of the message.

      • TopicExchange is configured as follows:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitTopicConfig {
            public final static String TOPIC_NAME = "ysw-topic";
            @Bean
            Queue queue7(){
                return new Queue("queue-topic1");
            }
        
            @Bean
            Queue queue8(){
                return new Queue("queue-topic2");
            }
        
            @Bean
            Queue queue9(){
                return new Queue("queue-topic3");
            }
        
            @Bean
            TopicExchange topicExchange(){
                return new TopicExchange(TOPIC_NAME,true,false);
            }
        
            @Bean
            Binding binding7(){
                 /*
                 * Matching rules
                 * The binding key must also be in this form. Messages sent with specific routing keys are sent to queues with all the binding keys matching them. But there are two special cases of binding keys:
                 * The binding key must also be in this form. Messages sent with specific routing keys are sent to queues with all the binding keys matching them. But there are two special cases of binding keys:
                 * ①*(Asterisk) stands for only one word
                 * ②#(Well number) stands for any word.
                 **/
                return BindingBuilder.bind(queue7()).to(topicExchange()).with("#.topic1");
            }
        
            @Bean
            Binding binding8(){
                return BindingBuilder.bind(queue8()).to(topicExchange()).with("topic2.#");
            }
        
            @Bean
            Binding binding9(){
                return BindingBuilder.bind(queue9()).to(topicExchange()).with("#.topic3.*");
            }
        }
      • Configuration of consumers

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class TopicReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-topic1")
            public void topicHandler1(String msg){
                logger.info("\033[32;4m"+"queue-topic1:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-topic2")
            public void topicHandler2(String msg){
                logger.info("\033[32;4m"+"queue-topic2:"+msg+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-topic3")
            public void topicHandler3(String msg){
                logger.info("\033[32;4m"+"queue-topic3:"+msg+"\033[0m");
            }
        }
      • message sending

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(cron = "0-30/6 * * * * ?")
            public void topic(){
                String message = "topic-task";
                int i = 0;
                logger.info("\033[32;4m"+message+"\033[0m");
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.news",message + 1);//topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.salary",message + 2);//topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.news",message + 3);//topic2
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.item",message + 4);//topic2
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.sth.topic1",message + 5);//topic2&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.sth.topic2",message + 6);//topic2&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3",message + 7);//topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3.news",message + 8);//topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic1.topic3",message + 9); //topic1&topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.topic3",message + 10);//topic2&topic3
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic3.topic1",message + 11);//topic3&topic1
                rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME,
                "topic2.topic3.topic1",message + 12);//topic1&topic2&topic3
            }
        }
    • Header

      HeaderExchange is a less commonly used routing strategy. HeaderExchange routes messages to different Queue s based on the Header of the message, which is also independent of routing key.

      • HeaderExchange is configured as follows:

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:33
         **/
        @Configuration
        public class RabbitHeaderConfig {
            public final static String HEADER_NAME = "ysw-header";
            @Bean
            Queue queue10(){
                return new Queue("queue-header1");
            }
        
            @Bean
        
            Queue queue11(){
                return new Queue("queue-header2");
            }
        
            @Bean
            Queue queue12(){
                return new Queue("queue-header3");
            }
        
            @Bean
            HeadersExchange headersExchange(){
                return new HeadersExchange(HEADER_NAME,true,false);
            }
        
            @Bean
            Binding binding10(){
                Map<String,Object> map = new HashMap<>();
                map.put("age", "18");
                map.put("name", "ysw");
                return BindingBuilder.bind(queue10()).to(headersExchange()).whereAny(map).match();
            }
        
            @Bean
            Binding binding11(){
                Map<String,Object> map = new HashMap<>();
                map.put("name", "ysw");
                return BindingBuilder.bind(queue11()).to(headersExchange()).where("age").exists();
            }
        
            @Bean
            Binding binding12(){
                Map<String,Object> map = new HashMap<>();
                map.put("age", "18");
                map.put("name", "ysw");
                return BindingBuilder.bind(queue12()).to(headersExchange()).whereAll(map).match();
            }
        }

        Binding configuration annotation: whereAny indicates that whenever a header matches the key/value in the map, the message is routed to a Queue named "queue-header 1"; where All means that all the headers of the message match before the message is routed to a Queue named "queue-header 2"; The header of the message contains age, and the message is routed to a Queue named "queue-header 2" regardless of the age value.

      • Configuration of consumers

        /**
         * @author wsyjlly
         * @create 2019.07.17 - 21:42
         **/
        @Component
        public class HeaderReceiver {
            Logger logger= LoggerFactory.getLogger(getClass());
        
            @RabbitListener(queues = "queue-header1")
            public void headerHandler1(byte[] msg){
                logger.info("\033[33;4m"+"queue-header1:"+new String(msg,0,msg.length)+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-header2")
            public void headerHandler2(byte[] msg){
                logger.info("\033[33;4m"+"queue-header2:"+new String(msg,0,msg.length)+"\033[0m");
            }
        
            @RabbitListener(queues = "queue-header3")
            public void headerHandler3(byte[] msg){
                logger.info("\033[33;4m"+"queue-header3:"+new String(msg,0,msg.length)+"\033[0m");
            }
        }
      • message sending

        /**
         * @author wsyjlly
         * @create 2019.07.18 - 1:13
         **/
        @Component
        public class RabbitmqSchedule {
            @Autowired
            RabbitTemplate rabbitTemplate;
            Logger logger = LoggerFactory.getLogger(getClass());
        
            @Scheduled(cron = "0-30/3 * * * * ?")
            public void header(){
                String message = "header-task";
                logger.info("\033[33;4m"+message+"\033[0m");
        
                Message message1 = MessageBuilder.withBody("name=name".getBytes())
                .setHeader("name", "aaa").build();
                Message message2 = MessageBuilder.withBody("name=ysw".getBytes())
                .setHeader("name", "ysw").build();
                Message message3 = MessageBuilder.withBody("age=19".getBytes())
                .setHeader("age", "19").build();
                Message message4 = MessageBuilder.withBody("age=18".getBytes())
                .setHeader("age", "18").build();
                Message message5 = MessageBuilder.withBody("name=ysw&age=18".getBytes())
                .setHeader("name", "ysw").setHeader("age","18").build();
                Message message6 = MessageBuilder.withBody("name=ysw&age=19".getBytes())
                .setHeader("name", "ysw").setHeader("age","19").build();
                Message message7 = MessageBuilder.withBody("name=aaa&age=18".getBytes())
                .setHeader("name", "aaa").setHeader("age","18").build();
        
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message1);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message2);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message3);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message4);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message5);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message6);
                rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME,
                null,message7);
            }
        }

Keywords: Java RabbitMQ Spring Asterisk less

Added by Knutty on Tue, 13 Aug 2019 08:32:19 +0300