I. Introduction
Topic Exchange
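A topic exchange matches the routing key of each message against a pattern, so every queue must be bound to the exchange with a pattern (the binding key). A routing key is a list of words separated by dots. The symbol "#" matches zero or more words, and the symbol "*" matches exactly one word. So "audit.#" matches "audit.irs.corporate", whereas "audit.*" matches "audit.irs" but not "audit.irs.corporate".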
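To make the two wildcards concrete, here is a minimal sketch of the matching rule in plain Java. It is not RabbitMQ's implementation; the class name TopicMatcher and its methods are hypothetical names chosen for illustration, and edge cases such as empty words between dots are not handled.

```java
public class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern under topic-exchange rules. */
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compares pattern words from index i against routing-key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // "*" accepts exactly one word of any content; otherwise the words must be equal
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate")); // true
        System.out.println(matches("audit.*", "audit.irs.corporate")); // false
        System.out.println(matches("audit.*", "audit.irs"));           // true
        System.out.println(matches("audit.#", "audit"));               // true
    }
}
```

The last call shows the "zero words" case: "#" can match nothing at all, which "*" never does.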
Business scenario:
1. The log server records logs from three services: the user service, the product service and the order service.
2. The log server has three log processing services: the INFO log processing service, the ERROR log processing service and the full log processing service.
3. Use a topic exchange to route the logs. The binding patterns are *.log.info, *.log.error and *.log.*.
II. Configuration file
Again, create two projects, one as the producer and one as the consumer.
Producer configuration:
server.port=8883
spring.application.name=hello-world
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.thymeleaf.cache=false
# Exchange name
mq.config.exchange=log.topic
Consumer configuration:
server.port=8884
spring.application.name=lesson1
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Exchange name
mq.config.exchange=log.topic
# INFO queue name
mq.config.queue.info=log.info
# ERROR queue name
mq.config.queue.error=log.error
# Full log queue name
mq.config.queue.logs=log.all
III. Create producers
1. Order service
package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the order service sending log messages
 */
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends the messages
     * @param msg message body
     */
    public void send(String msg) {
        // Send a message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue", msg);

        // Send a message to the exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange, "order.log.debug", "order.log.debug-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "order.log.info", "order.log.info-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "order.log.warn", "order.log.warn-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "order.log.error", "order.log.error-" + msg);
    }
}
2. Product service
package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the product service sending log messages
 */
@Component
public class ProductSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends the messages
     * @param msg message body
     */
    public void send(String msg) {
        // Send a message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue", msg);

        // Send a message to the exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange, "product.log.debug", "product.log.debug-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "product.log.info", "product.log.info-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "product.log.warn", "product.log.warn-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "product.log.error", "product.log.error-" + msg);
    }
}
3. User service
package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the user service sending log messages
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends the messages
     * @param msg message body
     */
    public void send(String msg) {
        // Send a message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue", msg);

        // Send a message to the exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange, "user.log.debug", "user.log.debug-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "user.log.info", "user.log.info-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "user.log.warn", "user.log.warn-" + msg);
        this.amqpTemplate.convertAndSend(exChange, "user.log.error", "user.log.error-" + msg);
    }
}
IV. Create consumers
1. ERROR log processing service
package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver
 *
 * @RabbitListener bindings: binds the queue
 * @QueueBinding value: the queue to bind
 *               exchange: the exchange to bind to
 * @Queue value: queue name
 *        autoDelete: whether this is a temporary queue that can be deleted
 * @Exchange value: exchange name
 *           type: exchange type
 */
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "*.log.error"
    )
)
public class TopicErrorReceiver {

    /**
     * Receives messages by listening on the queue
     * @param msg message body
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("error-receiver: " + msg);
    }
}
2. INFO log processing service
package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver
 *
 * @RabbitListener bindings: binds the queue
 * @QueueBinding value: the queue to bind
 *               exchange: the exchange to bind to
 * @Queue value: queue name
 *        autoDelete: whether this is a temporary queue that can be deleted
 * @Exchange value: exchange name
 *           type: exchange type
 */
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "*.log.info"
    )
)
public class TopicInfoReceiver {

    /**
     * Receives messages by listening on the queue
     * @param msg message body
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("info-receiver: " + msg);
    }
}
3. Full log processing service
package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver
 *
 * @RabbitListener bindings: binds the queue
 * @QueueBinding value: the queue to bind
 *               exchange: the exchange to bind to
 *               key: routing key pattern
 * @Queue value: queue name
 *        autoDelete: whether this is a temporary queue that can be deleted
 * @Exchange value: exchange name
 *           type: exchange type
 */
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.logs}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "*.log.*"
    )
)
public class TopicLogReceiver {

    /**
     * Receives messages by listening on the queue
     * @param msg message body
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("all-receiver: " + msg);
    }
}
V. Test
package com.example.amqp;

import com.example.ampq.Sender;
import com.example.amqptopicprovider.OrderSender;
import com.example.amqptopicprovider.ProductSender;
import com.example.amqptopicprovider.UserSender;
import com.example.helloworld.HelloworldApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloworldApplication.class)
public class QueueTest {

    @Autowired
    private Sender sender;

    @Autowired
    private UserSender userSender;

    @Autowired
    private ProductSender productSender;

    @Autowired
    private OrderSender orderSender;

    /**
     * Test message queue
     */
    // @Test
    // public void test1() throws InterruptedException {
    //     while (true) {
    //         Thread.sleep(1000);
    //         sender.send("hello");
    //     }
    // }

    @Test
    public void test2() {
        userSender.send("usersend");
        productSender.send("productsend");
        orderSender.send("ordersend");
    }
}
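The test sends 12 messages in total (three services, four log levels each). As a rough way to reason about what each receiver should print, the sketch below counts how many of those routing keys match each binding pattern. It runs without RabbitMQ; the class ExpectedDeliveries and its simplified matcher are hypothetical helpers that only handle "*", which is all the patterns in this article use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExpectedDeliveries {

    // Simplified matcher: supports only "*" (exactly one word); "#" is not handled here.
    static boolean matchesStarOnly(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.split("\\.");
        if (p.length != k.length) {
            return false;
        }
        for (int i = 0; i < p.length; i++) {
            if (!p[i].equals("*") && !p[i].equals(k[i])) {
                return false;
            }
        }
        return true;
    }

    /** Counts, per binding pattern, how many of the 12 routing keys sent by test2() match. */
    static Map<String, Integer> countDeliveries() {
        String[] services = {"user", "product", "order"};
        String[] levels = {"debug", "info", "warn", "error"};
        String[] bindings = {"*.log.info", "*.log.error", "*.log.*"};

        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String binding : bindings) {
            int matched = 0;
            for (String service : services) {
                for (String level : levels) {
                    if (matchesStarOnly(binding, service + ".log." + level)) {
                        matched++;
                    }
                }
            }
            counts.put(binding, matched);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countDeliveries()); // {*.log.info=3, *.log.error=3, *.log.*=12}
    }
}
```

So the INFO and ERROR receivers should each print three lines, while the full log receiver should print all twelve, including the debug and warn messages that no other queue receives.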
Note: the routing keys in the log processing classes are hard-coded here so that they can be read at a glance. In a real project it is better to define the routing keys in the configuration file and read them with "${}".
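As a sketch of that recommendation, the ERROR receiver could read its binding pattern from the consumer configuration. The property name mq.config.routing-key.error is an assumption made up for this example and is not part of the configuration shown earlier; it would have to be added to the consumer's application.properties. The fragment needs the same Spring AMQP dependencies as the consumer project.

```java
package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// Assumes this hypothetical line has been added to the consumer's application.properties:
//   mq.config.routing-key.error=*.log.error
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        // The pattern is resolved from the configuration instead of being hard-coded
        key = "${mq.config.routing-key.error}"
    )
)
public class TopicErrorReceiver {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("error-receiver: " + msg);
    }
}
```

With this layout the pattern can be changed per environment without recompiling the receiver.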