Spring Boot Integration with AMQP (RabbitMQ)
-
Adding pom dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
application.properties configuration
# RabbitMQ broker connection settings (one property per line).
spring.rabbitmq.host=***.***.***.***
# RabbitMQ's default AMQP port is 5672; change this only if the broker listens on another port.
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=***
-
Every message published by a producer goes to an Exchange, which routes it to one or more Queues according to its routing strategy. RabbitMQ provides four exchange types: Direct, Fanout, Topic and Headers. The first three are the most commonly used.
-
Direct
With a DirectExchange, each message queue is bound to the exchange with a binding key. When a message arrives at the DirectExchange, it is forwarded to the Queue(s) whose binding key is exactly equal to the routing key of the message.
-
DirectExchange is configured as follows:
/**
 * Declares the queues, the direct exchange and the bindings used by the direct-routing example.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:33
 **/
@Configuration
public class RabbitDirectConfig {

    /** Name of the exchange declared by {@link #directExchange()}. */
    public static final String DIRECTNAME = "ysw-direct";

    /** Queue bound to the exchange by {@link #binding1()}. */
    @Bean
    Queue queue1() {
        final String queueName = "queue-direct1";
        return new Queue(queueName);
    }

    /** Queue bound to the exchange by {@link #binding2()}. */
    @Bean
    Queue queue2() {
        final String queueName = "queue-direct2";
        return new Queue(queueName);
    }

    /** Queue for which this class declares no binding. */
    @Bean
    Queue queue3() {
        final String queueName = "queue-direct3";
        return new Queue(queueName);
    }

    /** The direct exchange itself, created as durable and not auto-deleted. */
    @Bean
    DirectExchange directExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(DIRECTNAME, durable, autoDelete);
    }

    /** Routes messages carrying the routing key "direct1" to queue1. */
    @Bean
    Binding binding1() {
        Queue destination = queue1();
        DirectExchange source = directExchange();
        return BindingBuilder.bind(destination).to(source).with("direct1");
    }

    /** Routes messages carrying the routing key "direct2" to queue2. */
    @Bean
    Binding binding2() {
        Queue destination = queue2();
        DirectExchange source = directExchange();
        return BindingBuilder.bind(destination).to(source).with("direct2");
    }
}
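The DirectExchange and Binding beans can be omitted: RabbitMQ automatically binds every queue to its default (nameless) direct exchange, using the queue name as the binding key. So if that default exchange is sufficient, you only need to declare the Queue bean, as is done for queue-direct3 above.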
-
Configuration of consumers
/**
 * Consumer for the three direct-example queues; every handler logs the text it receives.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:42
 **/
@Component
public class DirectReceiver {

    Logger logger = LoggerFactory.getLogger(getClass());

    /** Handles messages taken from "queue-direct1". */
    @RabbitListener(queues = "queue-direct1")
    public void directHandler1(String msg) {
        report("queue-direct1:", msg);
    }

    /** Handles messages taken from "queue-direct2". */
    @RabbitListener(queues = "queue-direct2")
    public void directHandler2(String msg) {
        report("queue-direct2:", msg);
    }

    /** Handles messages taken from "queue-direct3". */
    @RabbitListener(queues = "queue-direct3")
    public void directHandler3(String msg) {
        report("queue-direct3:", msg);
    }

    /** Logs the prefix followed by the payload, wrapped in the ANSI escape sequences used by this example. */
    private void report(String prefix, String payload) {
        String line = "\033[30;4m" + prefix + payload + "\033[0m";
        logger.info(line);
    }
}
The @RabbitListener annotation specifies that a method is a consumer method whose parameters are the messages received.
-
message sending
Messages are sent through an injected RabbitTemplate object. Here a scheduled task sends them automatically; scheduling must be enabled for the task to run (for example with @EnableScheduling on a configuration class). Detailed steps can be found in the section on scheduled tasks.
/**
 * Scheduled producer for the direct-routing example.
 *
 * @author wsyjlly
 * @create 2019.07.18 - 1:13
 **/
@Component
public class RabbitmqSchedule {

    @Autowired
    RabbitTemplate rabbitTemplate;

    Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Logs the payload, then publishes it three times: to the "ysw-direct" exchange with the
     * routing keys "direct1" and "direct2", and once through the two-argument overload using
     * "queue-direct3".
     */
    @Scheduled(initialDelay = 3000, fixedDelay = 5000)
    public void direct() {
        final String message = "direct-task";
        logger.info("\033[30;4m" + message + "\033[0m");

        final String[] routingKeys = {"direct1", "direct2"};
        for (String routingKey : routingKeys) {
            rabbitTemplate.convertAndSend("ysw-direct", routingKey, message);
        }
        // No exchange is named here, only "queue-direct3" and the payload.
        rabbitTemplate.convertAndSend("queue-direct3", message);
    }
}
-
-
Fanout
A FanoutExchange forwards every message it receives to all Queues bound to it; the routing key is ignored.
-
FanoutExchange is configured as follows:
/**
 * Declares the queues, the fanout exchange and the bindings used by the fanout example.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:33
 **/
@Configuration
public class RabbitFanoutConfig {

    /** Name of the exchange declared by {@link #fanoutExchange()}. */
    public static final String FANOUTNAME = "ysw-fanout";

    /** Queue bound to the exchange by {@link #binding4()}. */
    @Bean
    Queue queue4() {
        final String queueName = "queue-fanout1";
        return new Queue(queueName);
    }

    /** Queue bound to the exchange by {@link #binding5()}. */
    @Bean
    Queue queue5() {
        final String queueName = "queue-fanout2";
        return new Queue(queueName);
    }

    /** Queue bound to the exchange by {@link #binding6()}. */
    @Bean
    Queue queue6() {
        final String queueName = "queue-fanout3";
        return new Queue(queueName);
    }

    /** The fanout exchange itself, created as durable and not auto-deleted. */
    @Bean
    FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange(FANOUTNAME, durable, autoDelete);
    }

    /** Binds queue4 to the fanout exchange; no routing key is given. */
    @Bean
    Binding binding4() {
        Queue destination = queue4();
        FanoutExchange source = fanoutExchange();
        return BindingBuilder.bind(destination).to(source);
    }

    /** Binds queue5 to the fanout exchange; no routing key is given. */
    @Bean
    Binding binding5() {
        Queue destination = queue5();
        FanoutExchange source = fanoutExchange();
        return BindingBuilder.bind(destination).to(source);
    }

    /** Binds queue6 to the fanout exchange; no routing key is given. */
    @Bean
    Binding binding6() {
        Queue destination = queue6();
        FanoutExchange source = fanoutExchange();
        return BindingBuilder.bind(destination).to(source);
    }
}
-
Configuration of consumers
/**
 * Consumer for the three fanout-example queues; every handler logs the text it receives.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:42
 **/
@Component
public class FanoutReceiver {

    Logger logger = LoggerFactory.getLogger(getClass());

    /** Handles messages taken from "queue-fanout1". */
    @RabbitListener(queues = "queue-fanout1")
    public void fanoutHandler1(String msg) {
        report("queue-fanout1:", msg);
    }

    /** Handles messages taken from "queue-fanout2". */
    @RabbitListener(queues = "queue-fanout2")
    public void fanoutHandler2(String msg) {
        report("queue-fanout2:", msg);
    }

    /** Handles messages taken from "queue-fanout3". */
    @RabbitListener(queues = "queue-fanout3")
    public void fanoutHandler3(String msg) {
        report("queue-fanout3:", msg);
    }

    /** Logs the prefix followed by the payload, wrapped in the ANSI escape sequences used by this example. */
    private void report(String prefix, String payload) {
        String line = "\033[31;4m" + prefix + payload + "\033[0m";
        logger.info(line);
    }
}
-
message sending
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); Scheduled(fixedDelay = 5000,initialDelay = 4000) public void fanout(){ String message = "fanout-task"; logger.info("\033[31;4m"+message+"\033[0m"); rabbitTemplate.convertAndSend("ysw-fanout",null,message); } }
-
-
Topic
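TopicExchange is a more complex and flexible routing strategy. A Queue is bound to a TopicExchange with a binding pattern: dot-separated words that may contain the wildcards * (exactly one word) and # (zero or more words). When a message is sent to the TopicExchange, it is routed to every Queue whose binding pattern matches the routing key of the message, which may be one Queue, several, or none.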
-
TopicExchange is configured as follows:
/**
 * Declares the queues, the topic exchange and the pattern bindings used by the topic example.
 *
 * <p>Matching rules: a binding pattern has the same dot-separated form as a routing key, and a
 * message is delivered to every queue whose pattern matches its routing key. Two characters are
 * special inside a pattern:
 * <ul>
 *   <li>{@code *} (asterisk) stands for exactly one word;</li>
 *   <li>{@code #} (hash) stands for zero or more words.</li>
 * </ul>
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:33
 **/
@Configuration
public class RabbitTopicConfig {

    /** Name of the exchange declared by {@link #topicExchange()}. */
    public static final String TOPIC_NAME = "ysw-topic";

    /** Queue bound to the exchange by {@link #binding7()}. */
    @Bean
    Queue queue7() {
        final String queueName = "queue-topic1";
        return new Queue(queueName);
    }

    /** Queue bound to the exchange by {@link #binding8()}. */
    @Bean
    Queue queue8() {
        final String queueName = "queue-topic2";
        return new Queue(queueName);
    }

    /** Queue bound to the exchange by {@link #binding9()}. */
    @Bean
    Queue queue9() {
        final String queueName = "queue-topic3";
        return new Queue(queueName);
    }

    /** The topic exchange itself, created as durable and not auto-deleted. */
    @Bean
    TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(TOPIC_NAME, durable, autoDelete);
    }

    /** Binds queue7 with a pattern matching routing keys whose last word is "topic1". */
    @Bean
    Binding binding7() {
        Queue destination = queue7();
        TopicExchange source = topicExchange();
        return BindingBuilder.bind(destination).to(source).with("#.topic1");
    }

    /** Binds queue8 with a pattern matching routing keys whose first word is "topic2". */
    @Bean
    Binding binding8() {
        Queue destination = queue8();
        TopicExchange source = topicExchange();
        return BindingBuilder.bind(destination).to(source).with("topic2.#");
    }

    /** Binds queue9 with a pattern matching routing keys in which "topic3" is followed by exactly one more word. */
    @Bean
    Binding binding9() {
        Queue destination = queue9();
        TopicExchange source = topicExchange();
        return BindingBuilder.bind(destination).to(source).with("#.topic3.*");
    }
}
-
Configuration of consumers
/**
 * Consumer for the three topic-example queues; every handler logs the text it receives.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:42
 **/
@Component
public class TopicReceiver {

    Logger logger = LoggerFactory.getLogger(getClass());

    /** Handles messages taken from "queue-topic1". */
    @RabbitListener(queues = "queue-topic1")
    public void topicHandler1(String msg) {
        report("queue-topic1:", msg);
    }

    /** Handles messages taken from "queue-topic2". */
    @RabbitListener(queues = "queue-topic2")
    public void topicHandler2(String msg) {
        report("queue-topic2:", msg);
    }

    /** Handles messages taken from "queue-topic3". */
    @RabbitListener(queues = "queue-topic3")
    public void topicHandler3(String msg) {
        report("queue-topic3:", msg);
    }

    /** Logs the prefix followed by the payload, wrapped in the ANSI escape sequences used by this example. */
    private void report(String prefix, String payload) {
        String line = "\033[32;4m" + prefix + payload + "\033[0m";
        logger.info(line);
    }
}
-
message sending
/** * @author wsyjlly * @create 2019.07.18 - 1:13 **/ @Component public class RabbitmqSchedule { @Autowired RabbitTemplate rabbitTemplate; Logger logger = LoggerFactory.getLogger(getClass()); @Scheduled(cron = "0-30/6 * * * * ?") public void topic(){ String message = "topic-task"; int i = 0; logger.info("\033[32;4m"+message+"\033[0m"); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.news",message + 1);//topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.salary",message + 2);//topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.news",message + 3);//topic2 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.item",message + 4);//topic2 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.sth.topic1",message + 5);//topic2&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.sth.topic2",message + 6);//topic2&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3",message + 7);//topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3.news",message + 8);//topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic1.topic3",message + 9); //topic1&topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.topic3",message + 10);//topic2&topic3 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic3.topic1",message + 11);//topic3&topic1 rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_NAME, "topic2.topic3.topic1",message + 12);//topic1&topic2&topic3 } }
-
-
Header
HeadersExchange is a less commonly used routing strategy. It routes messages to different Queues based on the headers of the message, and is independent of the routing key.
-
HeaderExchange is configured as follows:
/**
 * Declares the queues, the headers exchange and the header-based bindings used by the headers example.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:33
 **/
@Configuration
public class RabbitHeaderConfig {

    /** Name of the exchange declared by {@link #headersExchange()}. */
    public final static String HEADER_NAME = "ysw-header";

    /** Queue bound to the exchange by {@link #binding10()}. */
    @Bean
    Queue queue10() {
        return new Queue("queue-header1");
    }

    /** Queue bound to the exchange by {@link #binding11()}. */
    @Bean
    Queue queue11() {
        return new Queue("queue-header2");
    }

    /** Queue bound to the exchange by {@link #binding12()}. */
    @Bean
    Queue queue12() {
        return new Queue("queue-header3");
    }

    /** The headers exchange itself, created as durable and not auto-deleted. */
    @Bean
    HeadersExchange headersExchange() {
        return new HeadersExchange(HEADER_NAME, true, false);
    }

    /** Binds queue10 so that a message matching ANY of the age/name pairs is routed to it. */
    @Bean
    Binding binding10() {
        return BindingBuilder.bind(queue10()).to(headersExchange()).whereAny(ageAndName()).match();
    }

    /** Binds queue11 so that a message carrying an "age" header, whatever its value, is routed to it. */
    @Bean
    Binding binding11() {
        // The original built a header map here that was never used; only the
        // existence of the "age" header takes part in this binding.
        return BindingBuilder.bind(queue11()).to(headersExchange()).where("age").exists();
    }

    /** Binds queue12 so that only a message matching ALL of the age/name pairs is routed to it. */
    @Bean
    Binding binding12() {
        return BindingBuilder.bind(queue12()).to(headersExchange()).whereAll(ageAndName()).match();
    }

    /** Builds a fresh header map {age=18, name=ysw} shared by the whereAny and whereAll bindings. */
    private static Map<String, Object> ageAndName() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("age", "18");
        headers.put("name", "ysw");
        return headers;
    }
}
Notes on the binding configuration: whereAny means that a message is routed to the Queue named "queue-header1" as soon as any one of its headers matches a key/value pair in the map; where("age").exists() means that a message whose headers contain age is routed to the Queue named "queue-header2", regardless of the age value; whereAll means that a message is routed to the Queue named "queue-header3" only when all key/value pairs in the map are matched by its headers.
-
Configuration of consumers
/**
 * Consumer for the three headers-example queues; every handler receives the raw message body
 * as bytes and logs it as text.
 *
 * @author wsyjlly
 * @create 2019.07.17 - 21:42
 **/
@Component
public class HeaderReceiver {

    Logger logger = LoggerFactory.getLogger(getClass());

    /** Handles messages taken from "queue-header1". */
    @RabbitListener(queues = "queue-header1")
    public void headerHandler1(byte[] msg) {
        logger.info("\033[33;4m" + "queue-header1:" + decode(msg) + "\033[0m");
    }

    /** Handles messages taken from "queue-header2". */
    @RabbitListener(queues = "queue-header2")
    public void headerHandler2(byte[] msg) {
        logger.info("\033[33;4m" + "queue-header2:" + decode(msg) + "\033[0m");
    }

    /** Handles messages taken from "queue-header3". */
    @RabbitListener(queues = "queue-header3")
    public void headerHandler3(byte[] msg) {
        logger.info("\033[33;4m" + "queue-header3:" + decode(msg) + "\033[0m");
    }

    /**
     * Decodes a message body as UTF-8 text.
     *
     * <p>An explicit charset is used so the result does not depend on the JVM's platform default.
     */
    private static String decode(byte[] body) {
        return new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
-
message sending
/**
 * Scheduled producer for the headers example.
 *
 * @author wsyjlly
 * @create 2019.07.18 - 1:13
 **/
@Component
public class RabbitmqSchedule {

    @Autowired
    RabbitTemplate rabbitTemplate;

    Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Logs the task name, builds seven messages with different combinations of the "name" and
     * "age" headers, and publishes each of them to the headers exchange.
     *
     * <p>The routing key is passed as {@code null}; the bindings of this example are defined on
     * message headers. The cron expression fires every third second from second 0 to second 30
     * of every minute.
     */
    @Scheduled(cron = "0-30/3 * * * * ?")
    public void header() {
        String message = "header-task";
        logger.info("\033[33;4m" + message + "\033[0m");

        Message message1 = MessageBuilder.withBody(utf8("name=name"))
                .setHeader("name", "aaa").build();
        Message message2 = MessageBuilder.withBody(utf8("name=ysw"))
                .setHeader("name", "ysw").build();
        Message message3 = MessageBuilder.withBody(utf8("age=19"))
                .setHeader("age", "19").build();
        Message message4 = MessageBuilder.withBody(utf8("age=18"))
                .setHeader("age", "18").build();
        Message message5 = MessageBuilder.withBody(utf8("name=ysw&age=18"))
                .setHeader("name", "ysw").setHeader("age", "18").build();
        Message message6 = MessageBuilder.withBody(utf8("name=ysw&age=19"))
                .setHeader("name", "ysw").setHeader("age", "19").build();
        Message message7 = MessageBuilder.withBody(utf8("name=aaa&age=18"))
                .setHeader("name", "aaa").setHeader("age", "18").build();

        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message1);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message2);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message3);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message4);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message5);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message6);
        rabbitTemplate.convertAndSend(RabbitHeaderConfig.HEADER_NAME, null, message7);
    }

    /**
     * Encodes a message body as UTF-8 bytes.
     *
     * <p>An explicit charset is used so the bytes do not depend on the JVM's platform default.
     */
    private static byte[] utf8(String text) {
        return text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
-
-