Catalogue of series articles
Chapter 4: building RabbitMQ cluster
Article catalogue
- Catalogue of series articles
- Preface
- 1, Integration steps
- 2, Implementation steps
- 3, Demonstration steps
- Summary
Preface
Tip: this article walks through a hands-on example of integrating RabbitMQ with Spring Boot; each messaging mode is demonstrated by calling an HTTP endpoint.
Tip: the main content follows. The examples below can be used for reference.
1, Integration steps
1, Producer:
Create a Spring Boot project
Add the pom dependency

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```

Write the RabbitMQ configuration properties
Define configuration classes that declare the exchanges, queues, and binding relationships
Inject RabbitTemplate and call its send methods to publish messages
2, Consumer:
Create a Spring Boot project
Add the pom dependency

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```

Write the RabbitMQ configuration properties
Define a listener class and use the @RabbitListener annotation to listen on the queues.
2, Implementation steps
1. Project architecture
2. Create project
The code is as follows (example):
1. pom dependency

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sky</groupId>
    <artifactId>springboot-rabbitmq-module</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-module</name>
    <description>springboot-rabbitmq-module</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Spring Boot 2.x requires Java 8 or later, so 1.8 is used here instead of 1.7 -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>springboot_rabbitmq</finalName>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
```
2. application.properties configuration

```properties
server.port=8080
#spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=110.42.239.246
spring.rabbitmq.virtual-host=springboot
#spring.rabbitmq.addresses=110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672
```
Note: the RabbitMQ connection details above point to a server that the author provides free of charge for everyone to use and learn with.
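The configuration above uses `spring.rabbitmq.addresses`, which takes a comma-separated list of `host:port` entries; when it is set, Spring Boot ignores `spring.rabbitmq.host` and `spring.rabbitmq.port`. The sketch below is a simplified model of how such a list can be read, not Spring Boot's actual parser. The class `AddressListSketch` and its method are hypothetical names introduced for illustration, and the sketch assumes plain (non-SSL) connections and IPv4 or host-name addresses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models reading a comma-separated address list.
final class AddressListSketch {
    // Standard AMQP port for non-SSL connections.
    static final int DEFAULT_AMQP_PORT = 5672;

    // Returns "host:port" entries, filling in the default port when none is given.
    static List<String> parse(String addresses) {
        List<String> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            if (trimmed.lastIndexOf(':') < 0) {
                result.add(trimmed + ":" + DEFAULT_AMQP_PORT);
            } else {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("110.42.239.246")); // [110.42.239.246:5672]
        System.out.println(parse("110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672").size()); // 3
    }
}
```

The three-entry form corresponds to the commented-out cluster line in the properties file, where the client can fall back to another node if one is unreachable.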
3. config configuration
HelloWorldConfig
```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * HelloWorld: the first (simplest) RabbitMQ working mode.
 * In this direct connection mode only the queue needs to be declared, and all messages are delivered through it.
 * No exchange needs to be declared; messages go through the broker's default exchange.
 */
@Configuration
public class HelloWorldConfig {

    @Bean
    public Queue setQueue() {
        return new Queue("helloWorldqueue");
    }
}
```
FanoutConfig
```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout mode needs an exchange declared and the queues bound to it;
 * the exchange is responsible for forwarding messages to the queues.
 * For this broadcast mode the exchange type is fanout.
 */
@Configuration
public class FanoutConfig {

    // Declare the queues
    @Bean
    public Queue fanoutQ1() {
        return new Queue("fanout.q1");
    }

    @Bean
    public Queue fanoutQ2() {
        return new Queue("fanout.q2");
    }

    // Declare the exchange
    @Bean
    public FanoutExchange setFanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // Declare the bindings between the exchange and the queues
    @Bean
    public Binding bindQ1() {
        return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
    }

    @Bean
    public Binding bindQ2() {
        return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
    }
}
```
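To make the fanout behaviour concrete, here is a minimal in-memory model. It is not RabbitMQ or Spring AMQP code: the class `FanoutSketch` and its methods are hypothetical names used only for illustration. It shows that a fanout exchange ignores the routing key and copies each message to every bound queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not part of Spring AMQP.
final class FanoutSketch {
    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Models BindingBuilder.bind(queue).to(fanoutExchange): no routing key is involved.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored, as with a fanout exchange.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Returns a copy of the messages delivered to the given queue.
    List<String> messagesIn(String queueName) {
        return new ArrayList<>(queues.getOrDefault(queueName, new ArrayList<>()));
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.q1");
        exchange.bind("fanout.q2");
        exchange.publish("", "hello");
        System.out.println(exchange.messagesIn("fanout.q1")); // [hello]
        System.out.println(exchange.messagesIn("fanout.q2")); // [hello]
    }
}
```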
WorkConfig
```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkConfig {

    // Declare the queue
    @Bean
    public Queue workQ1() {
        return new Queue("work_sb_mq_q");
    }
}
```
DirectConfig
```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * Routing mode; exchange type: direct
 */
@Configuration
public class DirectConfig {

    // Declare the queues
    @Bean
    public Queue directQ1() {
        return new Queue("direct_sb_mq_q1");
    }

    @Bean
    public Queue directQ2() {
        return new Queue("direct_sb_mq_q2");
    }

    // Declare the exchange
    @Bean
    public DirectExchange setDirectExchange() {
        return new DirectExchange("directExchange");
    }

    // Each binding must also declare a routing key
    @Bean
    public Binding bindDirectBind1() {
        return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("directBind.one");
    }

    @Bean
    public Binding bindDirectBind2() {
        return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("directBind.two");
    }
}
```
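The routing rule of a direct exchange can be sketched as a lookup by exact key. The model below is an illustration only, with the hypothetical class name `DirectSketch`; it is not how the broker is implemented. A message reaches a queue only when its routing key equals the key the queue was bound with, and a key with no binding reaches no queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not part of Spring AMQP.
final class DirectSketch {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Models BindingBuilder.bind(queue).to(directExchange).with(bindingKey).
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("direct_sb_mq_q1", "directBind.one");
        exchange.bind("direct_sb_mq_q2", "directBind.two");
        System.out.println(exchange.route("directBind.one"));   // [direct_sb_mq_q1]
        System.out.println(exchange.route("directBind.three")); // []
    }
}
```

The HelloWorld and work queue modes rely on the same rule: the broker's default exchange is a direct exchange to which every queue is implicitly bound under its own name, which is why sending with the queue name as the routing key is enough there.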
TopicConfig
```java
package com.sky.springbootrabbitmqmodule.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * Topics mode; exchange type: topic
 */
@Configuration
public class TopicConfig {

    // Declare the queues
    @Bean
    public Queue topicQ1() {
        return new Queue("topic_sb_mq_q1");
    }

    @Bean
    public Queue topicQ2() {
        return new Queue("topic_sb_mq_q2");
    }

    // Declare the exchange
    @Bean
    public TopicExchange setTopicExchange() {
        return new TopicExchange("topicExchange");
    }

    // Each binding must also declare a routing key pattern
    @Bean
    public Binding bindTopicHebei1() {
        return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("directBind.*");
    }

    @Bean
    public Binding bindTopicHebei2() {
        return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.two");
    }
}
```
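The two binding patterns above use the topic wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The following matcher is a small sketch of that rule, written for illustration under the hypothetical name `TopicMatcher`; the broker's own implementation is different. It can be used to check which of the two queues a given routing key would reach.

```java
// Hypothetical matcher that models topic exchange wildcards; not part of Spring AMQP.
final class TopicMatcher {

    // Returns true when the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("directBind.*", "directBind.one")); // true
        System.out.println(matches("#.two", "directBind.one"));        // false
        System.out.println(matches("directBind.*", "directBind.two")); // true
        System.out.println(matches("#.two", "directBind.two"));        // true
    }
}
```

With the bindings from `TopicConfig`, a message sent with routing key `directBind.one` would reach only `topic_sb_mq_q1`, while `directBind.two` matches both patterns and so would reach both queues.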
4. Consumer component
```java
package com.sky.springbootrabbitmqmodule.component;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConcumerReceiver {

    // In direct connection mode, if several consumers listen on the queue, each message
    // is delivered to only one of them, similar to work queue mode.
    // Properties such as prefetch can be set through a listener container factory,
    // which is equivalent to channel.basicQos in the native client.
    @RabbitListener(queues = "helloWorldqueue")
    public void helloWorldReceive(String message) {
        System.out.println("helloWorld pattern received message : " + message);
    }

    // Work queue mode: two listeners share the same queue
    @RabbitListener(queues = "work_sb_mq_q")
    public void wordQueueReceiveq1(String message) {
        System.out.println("Work queue mode 1 received message : " + message);
    }

    @RabbitListener(queues = "work_sb_mq_q")
    public void wordQueueReceiveq2(String message) {
        System.out.println("Work queue mode 2 received message : " + message);
    }

    // Message listening in pub/sub mode
    @RabbitListener(queues = "fanout.q1")
    public void fanoutReceiveq1(String message) {
        System.out.println("Publish subscribe mode 1 received message : " + message);
    }

    @RabbitListener(queues = "fanout.q2")
    public void fanoutReceiveq2(String message) {
        System.out.println("Publish subscribe mode 2 received message : " + message);
    }

    // Routing mode
    @RabbitListener(queues = "direct_sb_mq_q1")
    public void routingReceiveq1(String message) {
        System.out.println("Routing mode routingReceiveqOne received message : " + message);
    }

    @RabbitListener(queues = "direct_sb_mq_q2")
    public void routingReceiveq2(String message) {
        System.out.println("Routing mode routingReceiveqTwo received message : " + message);
    }

    // Topic mode
    // Note: a topic exchange delivers a message to every queue that has at least one
    // matching binding. A single queue receives the message only once, even if several
    // of its bindings match (for example, routing key hunan.IT matching both hunan.* and *.IT).
    @RabbitListener(queues = "topic_sb_mq_q1")
    public void topicReceiveq1(String message) {
        System.out.println("Topic pattern topic_sb_mq_q1 received message : " + message);
    }

    @RabbitListener(queues = "topic_sb_mq_q2")
    public void topicReceiveq2(String message) {
        System.out.println("Topic pattern topic_sb_mq_q2 received message : " + message);
    }
}
```
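In work queue mode two listeners consume from the same queue, and by default RabbitMQ hands messages to the consumers of a queue in round-robin order. The sketch below models only that default rotation; the class `RoundRobinSketch` is a hypothetical name, and the model deliberately ignores prefetch, acknowledgements, and listener concurrency, all of which can change the distribution seen in practice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch; it ignores prefetch and acknowledgements.
final class RoundRobinSketch {

    // Returns, for each message in order, the index of the consumer that would receive it.
    static List<Integer> dispatch(int messageCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<Integer> assignment = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            assignment.add(i % consumerCount);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Ten messages shared by the two work queue listeners.
        System.out.println(dispatch(10, 2)); // [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
    }
}
```

Under this simplified model each of the two listeners would handle five of the ten messages.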
5. Producer controller
```java
package com.sky.springbootrabbitmqmodule.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Builds a text/plain message with a UTF-8 encoded body
    private Message textMessage(String message) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        return new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties);
    }

    // helloWorld direct connection mode
    @GetMapping(value = "/helloWorldSend")
    public Object helloWorldSend(String message) {
        // Send the message straight to the queue (the queue name is the routing key)
        rabbitTemplate.send("helloWorldqueue", textMessage(message));
        return "message sent : " + message;
    }

    // Work queue mode
    @GetMapping(value = "/workqueueSend")
    public Object workqueueSend(String message) {
        // Send the same message ten times so that both consumers receive some
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.send("work_sb_mq_q", textMessage(message));
        }
        return "message sent : " + message;
    }

    // pub/sub publish subscribe mode; exchange type: fanout
    @GetMapping(value = "/fanoutSend")
    public Object fanoutSend(String message) {
        // Fanout mode only sends the message to the exchange,
        // which distributes it to all queues bound to it
        rabbitTemplate.send("fanoutExchange", "", textMessage(message));
        return "message sent : " + message;
    }

    // Routing mode; exchange type: direct
    @GetMapping(value = "/directSend")
    public Object routingSend(String routingKey, String message) {
        if (null == routingKey) {
            routingKey = "directBind.one";
        }
        // The direct exchange delivers to the queues bound with exactly this routing key
        rabbitTemplate.send("directExchange", routingKey, textMessage(message));
        return "message sent : routingKey >" + routingKey + ";message > " + message;
    }

    // Topic mode; exchange type: topic
    @GetMapping(value = "/topicSend")
    public Object topicSend(String routingKey, String message) {
        if (null == routingKey) {
            routingKey = "directBind.one";
        }
        // The topic exchange delivers to the queues whose binding pattern matches the routing key
        rabbitTemplate.send("topicExchange", routingKey, textMessage(message));
        return "message sent : routingKey >" + routingKey + ";message > " + message;
    }
}
```
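As an alternative to building a `Message` by hand, `RabbitTemplate` also offers `convertAndSend`, which runs the payload through the template's message converter; with the default converter a `String` is sent as a text message. The controller below is only a sketch: the class name `ConvertAndSendController` and the path `/topicConvertAndSend` are assumptions made for this example and are not part of the original project.

```java
package com.sky.springbootrabbitmqmodule.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller, not part of the original project.
@RestController
public class ConvertAndSendController {

    private final RabbitTemplate rabbitTemplate;

    // Constructor injection; Spring Boot auto-configures the RabbitTemplate bean.
    public ConvertAndSendController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Assumed endpoint path, chosen only for this sketch.
    @GetMapping("/topicConvertAndSend")
    public String topicSend(
            @RequestParam(defaultValue = "directBind.one") String routingKey,
            @RequestParam String message) {
        // The template's message converter turns the String into the message body.
        rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
        return "message sent : routingKey > " + routingKey + "; message > " + message;
    }
}
```

Using `@RequestParam` with a default value replaces the explicit null check on the routing key, and a missing `message` parameter is rejected by Spring MVC with a 400 response instead of causing a `NullPointerException`.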
That is all the code for this project; the demonstration follows.
3, Demonstration steps
1. Start the project
2. Demonstration of calling interface
1. Direct connection mode
1. Interface call
2. Console printing
2. Work queue mode
1. Interface call
2. Console printing
3. Publish subscribe mode (exchange type: fanout)
1. Interface call
2. Console printing
4. Routing mode (exchange type: direct)
1. Interface call
2. Console printing
5. Wildcard mode (exchange type: topic)
1. Interface call
2. Console printing
The complete project is also available to clone at: https://gitee.com/java_wxid/liao
Summary
Tip: that is everything for today. This article showed how to integrate RabbitMQ into Spring Boot quickly and provided demo cases for five messaging modes for your reference. I hope you find it helpful.