RabbitMQ, Chapter 3: Integrating RabbitMQ with Spring Boot (direct connection mode, work queue mode, publish/subscribe mode, routing mode, wildcard mode)

Articles in this series

RabbitMQ, Chapter 1: The six working modes and the message confirmation mechanism (theory combined with code)

RabbitMQ, Chapter 2: Integrating RabbitMQ with Spring (simple mode, broadcast mode, routing mode, wildcard mode, reliable message delivery, message loss prevention, TTL, dead letter queue, delay queue, message backlog, message idempotence)

RabbitMQ, Chapter 3: Integrating RabbitMQ with Spring Boot (direct connection mode, work queue mode, publish/subscribe mode, routing mode, wildcard mode)

RabbitMQ, Chapter 4: Building a RabbitMQ cluster

Preface

Tip: this article walks through a hands-on Spring Boot and RabbitMQ integration example, in which each mode is demonstrated by calling an HTTP endpoint.

1, Integration steps

1, Producer:

Create a Spring Boot project

Add the pom dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Write the RabbitMQ configuration

Define configuration classes that declare the exchanges, queues, and bindings

Inject RabbitTemplate and call its send methods to publish messages

2, Consumer:

Create a Spring Boot project

Add the pom dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Write the RabbitMQ configuration

Define a listener class and use the @RabbitListener annotation to listen on the queues.

2, Implementation steps

1. Project architecture

2. Create project

The code is as follows (example):

1. pom dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sky</groupId>
    <artifactId>springboot-rabbitmq-module</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-module</name>
    <description>springboot-rabbitmq-module</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>springboot_rabbitmq</finalName>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

2. application.properties configuration

server.port=8080
#spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=110.42.239.246
spring.rabbitmq.virtual-host=springboot

#spring.rabbitmq.addresses=110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672

Note: the RabbitMQ connection settings above point to a server that is provided free of charge for everyone to use for learning.

3. Configuration classes

HelloWorldConfig

package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * HelloWorld: the first RabbitMQ working mode (direct connection mode).
 * This mode only needs a queue declaration. Messages are published through the
 * default exchange, which routes each one to the queue named by the routing key,
 * so no exchange has to be declared.
 */
@Configuration
public class HelloWorldConfig {

    @Bean
    public Queue setQueue() {
        return new Queue("helloWorldqueue");
    }
}

FanoutConfig

package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout (broadcast) mode: declare an exchange and bind the queues to it; the exchange forwards each message to the bound queues.
 * The exchange type for this mode is fanout.
 */
@Configuration
public class FanoutConfig {

    //Declare the queues
    @Bean
    public Queue fanoutQ1() {
        return new Queue("fanout.q1");
    }
    @Bean
    public Queue fanoutQ2() {
        return new Queue("fanout.q2");
    }


    //Declare exchange
    @Bean
    public FanoutExchange setFanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }


    //Declare the bindings between the exchange and the queues
    @Bean
    public Binding bindQ1() {
        return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
    }
    @Bean
    public Binding bindQ2() {
        return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
    }

}

WorkConfig

package com.sky.springbootrabbitmqmodule.config;



import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkConfig {


    //Declare the queue
    @Bean
    public Queue workQ1() {
        return new Queue("work_sb_mq_q");
    }

}

DirectConfig

package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/*
   Routing mode: the exchange type is direct
*/
@Configuration
public class DirectConfig {

    //Declare the queues
    @Bean
    public Queue directQ1() {
        return new Queue("direct_sb_mq_q1");
    }
    @Bean
    public Queue directQ2() {
        return new Queue("direct_sb_mq_q2");
    }


    //Declare exchange
    @Bean
    public DirectExchange setDirectExchange() {
        return new DirectExchange("directExchange");
    }

    //Declare the bindings; each binding needs a routing key
    @Bean
    public Binding bindDirectBind1() {
        return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("directBind.one");
    }
    @Bean
    public Binding bindDirectBind2() {
        return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("directBind.two");
    }

}
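
FanoutConfig and DirectConfig differ mainly in how the exchange treats the routing key. The following plain-Java sketch models that difference without contacting a broker. It is only an illustration: the class name ExchangeRoutingSketch and its methods are hypothetical, and the queue names and binding keys are copied from the two configuration classes above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; this is not how RabbitMQ is implemented.
final class ExchangeRoutingSketch {

    // A fanout exchange ignores the routing key and copies the message to every bound queue.
    static List<String> fanoutRoute(List<String> boundQueues, String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    static List<String> directRoute(Map<String, String> bindingKeyByQueue, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindingKeyByQueue.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    // The two bindings declared in DirectConfig.
    static Map<String, String> directConfigBindings() {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("direct_sb_mq_q1", "directBind.one");
        bindings.put("direct_sb_mq_q2", "directBind.two");
        return bindings;
    }

    public static void main(String[] args) {
        List<String> fanoutQueues = Arrays.asList("fanout.q1", "fanout.q2");
        System.out.println(fanoutRoute(fanoutQueues, "anything"));                  // [fanout.q1, fanout.q2]
        System.out.println(directRoute(directConfigBindings(), "directBind.one"));   // [direct_sb_mq_q1]
        System.out.println(directRoute(directConfigBindings(), "directBind.three")); // []
    }
}
```

In this model, a routing key that matches no binding produces an empty target list, which corresponds to a message that the direct exchange cannot route to any queue.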

TopicConfig

package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/*
 * Topics (wildcard) mode: the exchange type is topic
 */
@Configuration
public class TopicConfig {

    //Declare the queues
    @Bean
    public Queue topicQ1() {
        return new Queue("topic_sb_mq_q1");
    }
    @Bean
    public Queue topicQ2() {
        return new Queue("topic_sb_mq_q2");
    }


    //Declare exchange
    @Bean
    public TopicExchange setTopicExchange() {
        return new TopicExchange("topicExchange");
    }

    //Declare the bindings; each binding needs a routing key pattern
    @Bean
    public Binding bindTopicHebei1() {
        return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("directBind.*");
    }
    @Bean
    public Binding bindTopicHebei2() {
        return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.two");
    }

}
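
The two patterns bound in TopicConfig use the topic wildcards: * stands for exactly one dot-separated word and # stands for zero or more words. The sketch below is a plain-Java model of that matching rule, offered as an illustration under those assumptions rather than as RabbitMQ's actual implementation. The class name TopicMatchSketch and its methods are hypothetical; the queue names and patterns are taken from TopicConfig.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; this is not RabbitMQ's implementation.
final class TopicMatchSketch {

    // True when the dot-separated routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' stands for zero or more words, so try every possible remainder.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern still expects a word but the key has none left
        }
        // '*' stands for exactly one word; any other pattern word must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    // Returns every queue whose binding pattern matches the routing key.
    static List<String> route(Map<String, String> patternByQueue, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : patternByQueue.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    // The two bindings declared in TopicConfig.
    static Map<String, String> topicConfigBindings() {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("topic_sb_mq_q1", "directBind.*");
        bindings.put("topic_sb_mq_q2", "#.two");
        return bindings;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = topicConfigBindings();
        System.out.println(route(bindings, "directBind.one")); // [topic_sb_mq_q1]
        System.out.println(route(bindings, "directBind.two")); // [topic_sb_mq_q1, topic_sb_mq_q2]
        System.out.println(route(bindings, "order.two"));      // [topic_sb_mq_q2]
    }
}
```

The second call shows that one routing key can satisfy both patterns, in which case both queues are targets.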

4. Consumer component

package com.sky.springbootrabbitmqmodule.component;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConcumerReceiver {

    //In direct connection mode, when several consumers listen on the same queue, each message is delivered to only one of them, as in work queue mode
    //Listener properties such as the prefetch count can be set through a RabbitListenerContainerFactory bean, the counterpart of channel.basicQos in the native client
    @RabbitListener(queues="helloWorldqueue")
    public void helloWorldReceive(String message) {
         System.out.println("helloWorld pattern received message : " +message);
    }

    //Work queue mode
    @RabbitListener(queues="work_sb_mq_q")
    public void wordQueueReceiveq1(String message) {
        System.out.println("Work queue mode 1 received message : " +message);
    }

    @RabbitListener(queues="work_sb_mq_q")
    public void wordQueueReceiveq2(String message) {
        System.out.println("Work queue mode 2 received message : " +message);
    }


    //Message listening in pub/sub mode
    @RabbitListener(queues="fanout.q1")
    public void fanoutReceiveq1(String message) {
        System.out.println("Publish subscribe mode 1 received message : " +message);
    }
    @RabbitListener(queues="fanout.q2")
    public void fanoutReceiveq2(String message) {
        System.out.println("Publish subscribe mode 2 received message : " +message);
    }


    //Routing mode
    @RabbitListener(queues="direct_sb_mq_q1")
    public void routingReceiveq1(String message) {
        System.out.println("Routing Routing mode routingReceiveqOne received message : " +message);
    }

    @RabbitListener(queues="direct_sb_mq_q2")
    public void routingReceiveq2(String message) {
        System.out.println("Routing Routing mode routingReceiveqTwo received message : " +message);
    }


    //topic mode
    //Note: a topic exchange delivers a message to every queue that has a matching binding. For example, routing key directBind.two matches both directBind.* (topic_sb_mq_q1) and #.two (topic_sb_mq_q2), so both listeners below receive it
    @RabbitListener(queues="topic_sb_mq_q1")
    public void topicReceiveq1(String message) {
        System.out.println("Topic pattern topic_sb_mq_q1 received message : " +message);
    }

    @RabbitListener(queues="topic_sb_mq_q2")
    public void topicReceiveq2(String message) {
        System.out.println("Topic pattern topic_sb_mq_q2 received  message : " +message);
    }
    
}
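
The comment at the top of the consumer class mentions setting listener properties through a container factory, as the counterpart of channel.basicQos. The listing above does not show such a factory, so the following is only a sketch of what one might look like. The class name PrefetchConfig and the bean name prefetchOneContainerFactory are hypothetical, and the prefetch value of 1 is an example choice, not something taken from the project.

```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; not part of the original project.
@Configuration
public class PrefetchConfig {

    // A listener container factory that lets each consumer hold at most one unacknowledged message.
    @Bean
    public SimpleRabbitListenerContainerFactory prefetchOneContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Example value: comparable to channel.basicQos(1) in the native client.
        factory.setPrefetchCount(1);
        return factory;
    }
}
```

A listener would opt in by naming the bean, for example @RabbitListener(queues="work_sb_mq_q", containerFactory="prefetchOneContainerFactory"). Listeners that do not name a factory keep using the one Spring Boot auto-configures, whose prefetch can instead be set with the spring.rabbitmq.listener.simple.prefetch property.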

5. Producer controller

package com.sky.springbootrabbitmqmodule.controller;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //helloWorld direct connection mode
    @GetMapping(value="/helloWorldSend")
    public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
        //Set the message properties
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //Send the message through the default exchange, using the queue name as the routing key
        rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
        return "message sent : "+message;
    }


    //Work queue mode
    @GetMapping(value="/workqueueSend")
    public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //Send the same message ten times so that the work is shared between the consumers
        for (int i = 0; i <10 ; i++) {
            rabbitTemplate.send("work_sb_mq_q",  new Message(message.getBytes("UTF-8"),messageProperties));
        }
        return "message sent : "+message;
    }


    //pub/sub (publish/subscribe) mode, exchange type: fanout
    @GetMapping(value="/fanoutSend")
    public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //In fanout mode the message is only sent to the exchange, which distributes it to every queue bound to it
        rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"),messageProperties));
        return "message sent : "+message;
    }


    //Routing mode, exchange type: direct
    @GetMapping(value="/directSend")
    public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
        if(null == routingKey) {
            routingKey="directBind.one";
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //In direct mode the exchange delivers the message only to queues whose binding key equals the routing key
        rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
        return "message sent : routingKey >"+routingKey+";message > "+message;
    }


    //Topic mode, exchange type: topic
    @GetMapping(value="/topicSend")
    public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
        if(null == routingKey) {
            routingKey="directBind.one";
        }
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //In topic mode the exchange delivers the message to every queue whose binding pattern matches the routing key
        rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
        return "message sent : routingKey >"+routingKey+";message > "+message;
    }

}
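
Every controller method above declares UnsupportedEncodingException because it calls getBytes("UTF-8") with a charset name. As a small optional sketch, the payload bytes can also be built with the StandardCharsets.UTF_8 constant from the JDK, which throws no checked exception. The class name PayloadEncodingSketch and its helper methods are hypothetical and are not part of the project.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; shows only the encoding step, not the RabbitTemplate call.
final class PayloadEncodingSketch {

    // Encodes the text as UTF-8 bytes, suitable for use as a message body.
    static byte[] toPayload(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a UTF-8 message body back into text.
    static String fromPayload(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = toPayload("hello rabbitmq");
        System.out.println(body.length);       // 14
        System.out.println(fromPayload(body)); // hello rabbitmq
    }
}
```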

That is all the code for this project. The demonstration follows.

3, Demonstration steps

1. Start the project

2. Demonstration of calling interface

1. Direct connection mode

1. Interface call

2. Console printing

2. Work queue mode

1. Interface call

2. Console printing
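
The console output for this step is not reproduced in this text. As a rough model of what to expect, the sketch below splits the ten messages sent by /workqueueSend between two consumers in turn. It assumes the broker hands messages to the two listeners on work_sb_mq_q in round-robin order; the real split can differ with prefetch settings and consumer timing. The class name RoundRobinSketch and the message labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; it does not connect to RabbitMQ.
final class RoundRobinSketch {

    // Assigns message i to consumer (i mod consumerCount), mimicking round-robin dispatch.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<String>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("msg-" + i);
        }
        List<List<String>> split = dispatch(messages, 2);
        System.out.println("Work queue mode 1 would receive: " + split.get(0));
        System.out.println("Work queue mode 2 would receive: " + split.get(1));
    }
}
```

Under this assumption each listener handles five of the ten messages, and no message is handled twice.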

3. Publish/subscribe mode (exchange type: fanout)

1. Interface call

2. Console printing

4. Routing mode (exchange type: direct)

1. Interface call

2. Console printing

5. Wildcard mode (exchange type: topic)

1. Interface call

2. Console printing

The project is also available to clone at: https://gitee.com/java_wxid/liao

Summary

Tip: that is everything for today. This article showed how Spring Boot can quickly integrate RabbitMQ and provided demo cases for five modes for your reference. I hope you find it helpful.

Added by melqui on Mon, 07 Mar 2022 06:02:37 +0200