SpringBoot+RabbitMQ learning notes Topic of three switches using RabbitMQ

I. Introduction

Topic Exchange 
Match the routing key to a pattern. At this time, the queue needs to be bound to a mode. The symbol "ා" matches one or more words, and the symbol "*" does not match many words. So "audit. *" can match "audit.irs.corporate", but "audit. *" only matches "audit.irs".

Business scenario:

1. The log server records three services: user service, commodity service and order service.

2. The log server has three Log Services: INFO log processing service, ERROR log processing service and full log processing service.

3. Use the Topic switch to process the logs. The matching rules are *. log.info, *. log.error and *. log. *.

II. Configuration file

Or create two projects, one as a producer and one as a consumer.

Producer configuration:

server.port=8883

spring.application.name=hello-world

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.thymeleaf.cache=false

//Set switch name
mq.config.exchange=log.topic

Consumer configuration:

server.port=8884

spring.application.name=lesson1

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#Set switch name
mq.config.exchange=log.topic
#info queue name
mq.config.queue.info=log.info
#error queue name
mq.config.queue.error=log.error
#log queue name
mq.config.queue.logs=log.all

III. create producers

1. Order service

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Send message by simulation order service
 */
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange exchanger
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * How to send a message
     * @param msg
     */
    public void send(String msg){
        //Send message to message queue
        //Parameter 1: queue name
        //Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //Send message to message queue
        //Parameter 1: switch name
        //Parameter 2: route key
        //Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange,"order.log.debug","order.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.info","order.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.warn","order.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.error","order.log.error-"+msg);

    }
}

2. Goods and services

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Send message of simulated goods and services
 */
@Component
public class ProductSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange exchanger
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * How to send a message
     * @param msg
     */
    public void send(String msg){
        //Send message to message queue
        //Parameter 1: queue name
        //Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //Send message to message queue
        //Parameter 1: switch name
        //Parameter 2: route key
        //Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange,"product.log.debug","product.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.info","product.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.warn","product.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.error","product.log.error-"+msg);

    }
}

3. User service

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Send message as user service
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange exchanger
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * How to send a message
     * @param msg
     */
    public void send(String msg){
        //Send message to message queue
        //Parameter 1: queue name
        //Parameter 2: message
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //Send message to message queue
        //Parameter 1: switch name
        //Parameter 2: route key
        //Parameter 3: message
        this.amqpTemplate.convertAndSend(exChange,"user.log.debug","user.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.info","user.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.warn","user.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.error","user.log.error-"+msg);

    }
}

IV. create consumers

1.ERROR log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Message Receiver 
 * @RabbitListener bindings:Bind queue
 * @QueueBinding  value: Name of binding queue
 *                  exchange: Configure switch
 * @Queue : value: Configure queue name
 *          autoDelete:Is it a temporary queue that can be deleted
 * @Exchange value:Name the switch
 *           type:Specify specific exchanger type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.error"
        )
)
public class TopicErrorReceiver {

    /**
     * Message receiving method adopts message queue monitoring mechanism
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("error-receiver: "+msg);
    }
}

2.INFO log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Message Receiver 
 * @RabbitListener bindings:Bind queue
 * @QueueBinding  value: Name of binding queue
 *                  exchange: Configure switch
 * @Queue : value: Configure queue name
 *          autoDelete:Is it a temporary queue that can be deleted
 * @Exchange value:Name the switch
 *           type:Specify specific exchanger type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.info"
        )
)
public class TopicInfoReceiver {

    /**
     * Message receiving method adopts message queue monitoring mechanism
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("info-receiver: "+msg);
    }
}

3. Full log processing service

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:Message Receiver 
 *
 * @RabbitListener bindings:Bind queue
 * @QueueBinding  value: Name of binding queue
 *                  exchange: Configure switch
 *                  key:Routing key
 * @Queue : value: Configure queue name
 *          autoDelete:Is it a temporary queue that can be deleted
 * @Exchange value:Name the switch
 *           type:Specify specific exchanger type
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.*"
        )
)
public class TopicLogReceiver {

    /**
     * Message receiving method adopts message queue monitoring mechanism
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("all-receiver: "+msg);
    }
}

V. test of old rules

package com.example.amqp;

import com.example.ampq.Sender;
import com.example.amqptopicprovider.OrderSender;
import com.example.amqptopicprovider.ProductSender;
import com.example.amqptopicprovider.UserSender;
import com.example.helloworld.HelloworldApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = HelloworldApplication.class)
public class QueueTest {
    @Autowired
    private Sender sender;

    @Autowired
    private UserSender userSender;

    @Autowired
    private ProductSender productSender;

    @Autowired
    private OrderSender orderSender;

    /**
     * Test message queue
     */
//    @Test
//    public void test1() throws InterruptedException {
//        while (true){
//            Thread.sleep(1000);
//            sender.send("hello");
//        }
//
//    }

    @Test
    public void test2(){
        userSender.send("usersend");
        productSender.send("productsend");
        orderSender.send("ordersend");
    }
}

Note: the routing key in the log service processing class is directly configured in the way of hard coding, mainly for the convenience of viewing at a glance. However, it is recommended to configure the routing key in the configuration file and use "${}" for reading.

Keywords: Java Spring RabbitMQ Junit Thymeleaf

Added by jholzy on Tue, 28 Apr 2020 19:52:58 +0300