Spring Boot + RabbitMQ learning notes: the Topic exchange, one of the three RabbitMQ exchange types

I. Introduction

Topic Exchange
A topic exchange matches the routing key against a pattern, so each queue must be bound to the exchange with a binding pattern. Routing keys and patterns are lists of words separated by dots. The symbol "#" matches zero or more words, and the symbol "*" matches exactly one word. So "audit.#" matches "audit.irs.corporate", but "audit.*" only matches "audit.irs".
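The matching rule can be sketched in plain Java. This is only an illustration of the "#" and "*" semantics described above, not the broker's actual implementation; the class name TopicMatcher and its methods are made up for this example.

```java
public class TopicMatcher {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // words left in the pattern but not in the key
        }
        // "*" matches exactly one word; any other word must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("audit.#", "audit.irs.corporate")); // true
        System.out.println(matches("audit.*", "audit.irs.corporate")); // false
        System.out.println(matches("audit.*", "audit.irs"));           // true
    }
}
```

Note that "#" can also match zero words, so in this sketch "audit.#" matches the single-word key "audit" as well.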

Business scenario:

1. The log server collects logs from three services: the user service, the product service and the order service.

2. The log server runs three log processing services: an INFO log service, an ERROR log service and a full log service.

3. A Topic exchange routes the logs. The binding patterns are *.log.info, *.log.error and *.log.*.
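To see how the three binding patterns split the traffic, here is a small stand-alone sketch. It assumes routing keys of the form <service>.log.<level> (for example "user.log.info"), which the original does not state explicitly, and the queue labels "info", "error" and "all" are illustrative rather than the real queue names. Because these patterns only use "*", each one is translated into a simple regular expression.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class LogRoutingDemo {

    // Binding pattern -> queue label (labels are illustrative only)
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("*.log.info", "info");
        BINDINGS.put("*.log.error", "error");
        BINDINGS.put("*.log.*", "all");
    }

    /** Converts a pattern that only uses "*" into a regex: "*" becomes one dot-free word. */
    static Pattern toRegex(String bindingPattern) {
        StringBuilder sb = new StringBuilder();
        for (String word : bindingPattern.split("\\.")) {
            if (sb.length() > 0) {
                sb.append("\\.");
            }
            sb.append(word.equals("*") ? "[^.]+" : Pattern.quote(word));
        }
        return Pattern.compile(sb.toString());
    }

    /** Lists the queues that would receive a message with the given routing key. */
    static List<String> queuesFor(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (toRegex(binding.getKey()).matcher(routingKey).matches()) {
                queues.add(binding.getValue());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(queuesFor("user.log.info"));     // [info, all]
        System.out.println(queuesFor("order.log.error"));   // [error, all]
        System.out.println(queuesFor("product.log.debug")); // [all]
    }
}
```

Every log message reaches the full log queue, while only INFO and ERROR messages are additionally copied to their dedicated queues.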

II. Configuration file

As before, create two projects: one as the producer and one as the consumer.

Producer configuration:




# Set exchange name (key mq.config.exchange, matching the consumer)

Consumer configuration:




# Set exchange name: mq.config.exchange
# info queue name: mq.config.queue.info
# error queue name: mq.config.queue.error
# log queue name: mq.config.queue.logs

III. Create producers

1. Order service

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

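/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the order service sending log messages
 */
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name, read from the configuration file
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends a message
     * @param msg the message body
     */
    public void send(String msg){
        // Send the message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        // Send the message to an exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        // Assumed routing keys: they follow the "<service>.log.<level>" shape
        // that the consumers' binding patterns expect.
        this.amqpTemplate.convertAndSend(this.exChange, "order.log.info", "order.log.info: " + msg);
        this.amqpTemplate.convertAndSend(this.exChange, "order.log.error", "order.log.error: " + msg);
    }
}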


2. Product service

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

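/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the product service sending log messages
 */
@Component
public class ProductSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name, read from the configuration file
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends a message
     * @param msg the message body
     */
    public void send(String msg){
        // Send the message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        // Send the message to an exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        // Assumed routing keys: they follow the "<service>.log.<level>" shape
        // that the consumers' binding patterns expect.
        this.amqpTemplate.convertAndSend(this.exChange, "product.log.info", "product.log.info: " + msg);
        this.amqpTemplate.convertAndSend(this.exChange, "product.log.error", "product.log.error: " + msg);
    }
}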


3. User service

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

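/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Simulates the user service sending log messages
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Exchange name, read from the configuration file
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * Sends a message
     * @param msg the message body
     */
    public void send(String msg){
        // Send the message directly to a queue
        // Parameter 1: queue name
        // Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        // Send the message to an exchange
        // Parameter 1: exchange name
        // Parameter 2: routing key
        // Parameter 3: message
        // Assumed routing keys: they follow the "<service>.log.<level>" shape
        // that the consumers' binding patterns expect.
        this.amqpTemplate.convertAndSend(this.exChange, "user.log.info", "user.log.info: " + msg);
        this.amqpTemplate.convertAndSend(this.exChange, "user.log.error", "user.log.error: " + msg);
    }
}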


IV. Create consumers

1. ERROR log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver for ERROR logs
 * @RabbitListener bindings: binds the queue
 * @QueueBinding  value: the queue to bind
 *                exchange: the exchange to bind to
 *                key: the routing key pattern
 * @Queue  value: the queue name
 *         autoDelete: whether the queue is temporary and may be deleted
 * @Exchange value: the exchange name
 *           type: the exchange type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.error"
        )
)
public class TopicErrorReceiver {

    /**
     * Receives messages through the queue listener mechanism
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("error-receiver: "+msg);
    }
}

2. INFO log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver for INFO logs
 * @RabbitListener bindings: binds the queue
 * @QueueBinding  value: the queue to bind
 *                exchange: the exchange to bind to
 *                key: the routing key pattern
 * @Queue  value: the queue name
 *         autoDelete: whether the queue is temporary and may be deleted
 * @Exchange value: the exchange name
 *           type: the exchange type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.info"
        )
)
public class TopicInfoReceiver {

    /**
     * Receives messages through the queue listener mechanism
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("info-receiver: "+msg);
    }
}

3. Full log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Message receiver for all logs
 * @RabbitListener bindings: binds the queue
 * @QueueBinding  value: the queue to bind
 *                exchange: the exchange to bind to
 *                key: the routing key pattern
 * @Queue  value: the queue name
 *         autoDelete: whether the queue is temporary and may be deleted
 * @Exchange value: the exchange name
 *           type: the exchange type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.*"
        )
)
public class TopicLogReceiver {

    /**
     * Receives messages through the queue listener mechanism
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("all-receiver: "+msg);
    }
}

V. Test, as usual

package com.example.amqp;

import com.example.ampq.Sender;
import com.example.amqptopicprovider.OrderSender;
import com.example.amqptopicprovider.ProductSender;
import com.example.amqptopicprovider.UserSender;
import com.example.helloworld.HelloworldApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

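/**
 * Author: aijiaxiang
 * Date: 2020/4/26
 * Description: Tests the message senders
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloworldApplication.class)
public class QueueTest {
    @Autowired
    private Sender sender;

    @Autowired
    private UserSender userSender;

    @Autowired
    private ProductSender productSender;

    @Autowired
    private OrderSender orderSender;

    /**
     * Test message queue
     */
//    @Test
//    public void test1() throws InterruptedException {
//        while (true){
//            Thread.sleep(1000);
//            sender.send("hello");
//        }
//    }

    /**
     * Test the topic exchange
     */
    @Test
    public void test2(){
        // Assumed body: send one message through each service's sender
        userSender.send("UserSender...");
        productSender.send("ProductSender...");
        orderSender.send("OrderSender...");
    }
}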

Note: the routing keys in the log processing classes are hard-coded so that they can be read at a glance. In practice, it is better to put the routing keys in the configuration file and read them with "${}".
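As a rough sketch of that recommendation, the ERROR receiver could read its routing key from a property. The property name mq.config.routing-key.error and the class name ConfiguredErrorReceiver are hypothetical and do not appear in the original project; the placeholder falls back to the hard-coded pattern when the property is not set. This class is meant as a variant that replaces TopicErrorReceiver, not one to run alongside it.

```java
package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                // Hypothetical property; defaults to "*.log.error" if it is not configured
                key = "${mq.config.routing-key.error:*.log.error}"
        )
)
public class ConfiguredErrorReceiver {

    // Called for each String message delivered to the bound queue
    @RabbitHandler
    public void process(String msg) {
        System.out.println("error-receiver: " + msg);
    }
}
```

With this variant, changing the binding pattern only requires editing the configuration file, not recompiling the consumer.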

Keywords: Java Spring RabbitMQ Junit Thymeleaf

Added by jholzy on Tue, 28 Apr 2020 19:52:58 +0300