Spring Boot 2: accessing a RabbitMQ mirrored cluster
Spring Boot 2: accessing a RabbitMQ mirrored cluster
This article documents how to access a RabbitMQ mirrored cluster from a Spring Boot 2 application. Accessing a single RabbitMQ server directly works in essentially the same way; only the configuration file needs to be modified.
Brief description of RabbitMQ cluster
- TCP proxy (load balancer) address and listening port: 192.168.132.143:5672
- MQ node 1: 192.168.132.137:5672
- MQ node 2: 192.168.132.139:5672
code snippet
Maven dependency: pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rabbitmq-test demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent POM; the dependencies below declare no versions of their own. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Coordinates of this project. -->
    <groupId>com.ictpaas</groupId>
    <artifactId>rabbitmq-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-test</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <!-- Java language level used for the build. -->
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: the startup class exposes a REST endpoint that triggers the producer. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test support; only on the test classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- RabbitMQ: the AMQP starter used by the producer and consumer services. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin for building and running the application. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Configuration file: application.properties
# ---------------------------------------------------------------------------
# RabbitMQ connection
# ---------------------------------------------------------------------------
# RabbitMQ server address, as host:port.
# In mirrored-cluster mode, fill in the address and listening port of the TCP proxy.
# In non-cluster mode, fill in the address and port of the MQ server itself.
spring.rabbitmq.addresses=192.168.132.143:5672
# User name and password used to log in to RabbitMQ.
# NOTE(review): RabbitMQ restricts the built-in "guest" account to localhost
# connections by default - confirm this account can log in through the proxy address.
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host on the RabbitMQ server; virtual hosts can be viewed and created
# in the RabbitMQ management UI.
spring.rabbitmq.virtual-host=/test
# Connection timeout (presumably milliseconds - TODO confirm).
spring.rabbitmq.connection-timeout=15000

# ---------------------------------------------------------------------------
# Producer
# ---------------------------------------------------------------------------
# Enable publisher confirms so that the producer's ConfirmCallback is invoked.
# NOTE(review): newer Spring Boot releases replace this key with
# spring.rabbitmq.publisher-confirm-type - check when upgrading from 2.1.x.
spring.rabbitmq.publisher-confirms=true
# Enable publisher returns so that the producer's ReturnCallback is invoked.
spring.rabbitmq.publisher-returns=true
# Publish with the mandatory flag; used together with publisher returns above.
spring.rabbitmq.template.mandatory=true

# ---------------------------------------------------------------------------
# Consumer
# ---------------------------------------------------------------------------
# Number of concurrent consumers.
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of concurrent consumers.
spring.rabbitmq.listener.simple.max-concurrency=5
# Acknowledge mode: manual (the listener acknowledges each message itself).
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Flow control: limits how many messages are handed to a consumer at once, so the
# service is not brought down by processing a large number of messages at the same
# time. Choose the value with the number of consumer threads in mind.
spring.rabbitmq.listener.simple.prefetch=1
Producer: RabbitmqProducerService.java
The producer does not run on its own; application code must call it explicitly to publish a message.
package com.ictpaas.rabbitmqtest.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;

/**
 * Publishes text messages to the {@code test-exchange} exchange and prints the
 * publisher-confirm and publisher-return notifications reported by the template.
 */
@Service
public class RabbitmqProducerService {

    /** Exchange every message is published to. */
    private static final String EXCHANGE = "test-exchange";

    /** Routing key every message is published with. */
    private static final String ROUTING_KEY = "routingkey-test";

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Prints the outcome (ack / nack and cause) of each publisher confirm. */
    final RabbitTemplate.ConfirmCallback confirmCallback = this::onConfirm;

    /** Prints the details of each message handed back through the return callback. */
    final RabbitTemplate.ReturnCallback returnCallback = this::onReturn;

    /**
     * Registers both callbacks on the injected template once, after dependency injection.
     *
     * <p>The callbacks never change, so registering them on every send was redundant
     * work on a shared bean. {@code RabbitTemplate} accepts only a single confirm
     * callback, so registering here also surfaces a conflicting registration at
     * startup rather than on the first publish.
     */
    @PostConstruct
    private void registerCallbacks() {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
    }

    /**
     * Publishes {@code msg} to {@code test-exchange} with routing key {@code routingkey-test}.
     *
     * @param msg payload handed to {@link RabbitTemplate#convertAndSend}
     */
    public void sendMsg(String msg) {
        // A random UUID keeps correlation ids unique even when several messages are
        // sent within the same millisecond (a timestamp-based id would collide).
        CorrelationData correlation = new CorrelationData("id" + UUID.randomUUID());
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, msg, correlation);
    }

    /** Confirm handler: prints the correlation data, the ack flag and, on nack, the cause. */
    private void onConfirm(CorrelationData correlationData, boolean isAck, String cause) {
        System.out.println(correlationData);
        System.out.println("ack: " + isAck);
        if (!isAck) {
            System.err.println(cause);
        }
    }

    /** Return handler: prints reply code/text, exchange, routing key and the returned message. */
    private void onReturn(Message message, int replyCode, String replyText,
                          String exchange, String routingKey) {
        System.out.println("code: " + replyCode + ", text: " + replyText);
        System.out.println("exchange: " + exchange + ", routingKey: " + routingKey);
        System.out.println(message);
    }
}
Consumer: RabbitmqConsumerService.java
The consumer is a listener, so it does not need to be called explicitly. Once the application is running, it automatically listens on the bound RabbitMQ queue.
package com.ictpaas.rabbitmqtest.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;

/**
 * Listens on {@code test-queue} and manually acknowledges every message it receives.
 *
 * <p>The listener annotation declares a durable queue {@code test-queue} bound to the
 * durable topic exchange {@code test-exchange} with routing key {@code routingkey-test}.
 */
@Service
public class RabbitmqConsumerService {

    /**
     * Prints the message payload and acknowledges the delivery on the given channel.
     *
     * <p>The method acknowledges explicitly, so it is meant to run with the listener
     * acknowledge mode set to manual.
     *
     * @param msg     message payload
     * @param channel channel the message was delivered on; used for the acknowledgement
     * @param headers message headers; must contain {@link AmqpHeaders#DELIVERY_TAG} as a {@code Long}
     * @throws IllegalStateException if the delivery tag header is missing or is not a {@code Long}
     */
    // No @RabbitHandler here: that annotation is only meaningful on methods of a class
    // annotated with a class-level @RabbitListener, not next to a method-level one.
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "test-queue", durable = "true"),
                    exchange = @Exchange(value = "test-exchange", durable = "true", type = "topic"),
                    key = "routingkey-test"
            )
    )
    public void testConsumer(@Payload String msg, Channel channel, @Headers Map<String, Object> headers) {
        System.out.println(msg);

        // Validate the tag instead of casting/unboxing blindly, so a missing header
        // fails with a descriptive error rather than a bare NullPointerException.
        Object rawTag = headers.get(AmqpHeaders.DELIVERY_TAG);
        if (!(rawTag instanceof Long)) {
            throw new IllegalStateException(
                    "Missing or invalid " + AmqpHeaders.DELIVERY_TAG + " header: " + rawTag);
        }
        long deliveryTag = (Long) rawTag;

        try {
            // multiple = false: acknowledge only this delivery, not every
            // outstanding delivery up to and including this tag.
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            // Best effort: report the failure with context and keep the listener running.
            System.err.println("Failed to acknowledge delivery tag " + deliveryTag);
            e.printStackTrace();
        }
    }
}
Startup class: RabbitmqTestApplication.java
package com.ictpaas.rabbitmqtest; import com.ictpaas.rabbitmqtest.service.RabbitmqProducerService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @SpringBootApplication @RestController public class RabbitmqTestApplication { @Resource private RabbitmqProducerService rabbitmqProducerService; public static void main(String[] args) { SpringApplication.run(RabbitmqTestApplication.class, args); } @GetMapping("/rabbitmq/{msg}") public String test(@PathVariable(name = "msg") String msg) { // Call producer, publish message rabbitmqProducerService.sendMsg(msg); return "SUCCESS"; } }