Spring Boot 2: Connecting to a RabbitMQ Mirrored Cluster

Spring Boot 2: Connecting to a RabbitMQ Mirrored Cluster

Spring Boot 2: Connecting to a RabbitMQ Mirrored Cluster

This article describes how to connect a Spring Boot 2 application to a RabbitMQ mirrored cluster. Connecting directly to a single RabbitMQ server works in essentially the same way; only the configuration file needs to be modified.

Brief description of RabbitMQ cluster

  1. TCP load-balancing proxy address and listening port
    192.168.132.143:5672

  2. MQ node 1
    192.168.132.137:5672

  3. MQ node 2
    192.168.132.139:5672

Code snippets

Maven dependency: pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rabbitmq-test demo: a Spring Boot web application that publishes to and consumes from RabbitMQ. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits from the Spring Boot 2.1.3 parent POM; the dependencies below declare no version of their own and rely on it. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this demo project. -->
    <groupId>com.ictpaas</groupId>
    <artifactId>rabbitmq-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-test</name>
    <description>Demo project for Spring Boot</description>

    <!-- Build for Java 8. -->
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter: provides the REST endpoint used to trigger message publishing. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Test support, available on the test classpath only. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

		<!-- RabbitMQ access via the Spring AMQP starter (RabbitTemplate, @RabbitListener). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin; no extra configuration is declared here. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Configuration file: application.properties

# ---------------------------------------------------------------------------
# RabbitMQ connection
# ---------------------------------------------------------------------------
# Broker address as host:port.
# For a mirrored cluster, use the address and listening port of the TCP load-balancing proxy.
# Without a cluster, use the address and port of the RabbitMQ server itself.
spring.rabbitmq.addresses=192.168.132.143:5672
# RabbitMQ user name and password.
# NOTE(review): a default RabbitMQ installation is understood to restrict the "guest" account
# to localhost connections; a dedicated user may be required for this remote address - confirm.
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host on the RabbitMQ server; virtual hosts can be viewed and created in the management UI.
spring.rabbitmq.virtual-host=/test
# Connection timeout (a plain number is presumably interpreted as milliseconds - confirm against the Spring Boot docs).
spring.rabbitmq.connection-timeout=15000

# ---------------------------------------------------------------------------
# Producer
# ---------------------------------------------------------------------------
# Enable publisher confirms so that RabbitTemplate.ConfirmCallback is invoked.
# NOTE(review): newer Spring Boot releases (2.2+) are understood to replace this key with
# spring.rabbitmq.publisher-confirm-type - confirm when upgrading.
spring.rabbitmq.publisher-confirms=true
# Enable publisher returns so that RabbitTemplate.ReturnCallback is invoked;
# template.mandatory=true is set alongside it.
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

# ---------------------------------------------------------------------------
# Consumer
# ---------------------------------------------------------------------------
# Minimum number of concurrent consumers per listener.
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of concurrent consumers per listener.
spring.rabbitmq.listener.simple.max-concurrency=5
# Acknowledge mode: manual - the listener code must acknowledge each message itself.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Prefetch count: limits how many unacknowledged messages are delivered to a consumer at once,
# so a burst of messages cannot overwhelm the application. Tune it to the available consumer threads.
spring.rabbitmq.listener.simple.prefetch=1

Producer: RabbitmqProducerService.java

The producer does not run on its own; application code must call it explicitly to publish a message.

package com.ictpaas.rabbitmqtest.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;

/**
 * Publishes text messages to RabbitMQ through the shared {@link RabbitTemplate}.
 *
 * <p>The confirm and return callbacks are registered on the template once, when the
 * template is injected, rather than on every send. Each published message carries a
 * unique correlation id so that a confirm can be matched to the message it belongs to.
 */
@Service
public class RabbitmqProducerService {

    /** Exchange every message is published to. */
    private static final String EXCHANGE = "test-exchange";

    /** Routing key used for every message. */
    private static final String ROUTING_KEY = "routingkey-test";

    private RabbitTemplate rabbitTemplate;

    /** Prints the broker's confirm result for a published message; prints the cause on a negative ack. */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean isAck, String cause) {
            System.out.println(correlationData);
            System.out.println("ack: " + isAck);
            if (!isAck) {
                System.err.println(cause);
            }
        }
    };

    /** Prints the details of a message that the broker returned to the publisher. */
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("code: " + replyCode + ", text: " + replyText);
            System.out.println("exchange: " + exchange + ", routingKey: " + routingKey);
            System.out.println(message);
        }
    };

    /**
     * Injects the template and registers this service's callbacks on it exactly once.
     *
     * <p>The callbacks are field initializers, so they already exist when the container
     * calls this setter.
     *
     * @param rabbitTemplate the template used for publishing
     */
    @Resource
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
    }

    /**
     * Publishes {@code msg} to {@code test-exchange} with routing key {@code routingkey-test}.
     *
     * @param msg the message payload
     */
    public void sendMsg(String msg) {
        // A random UUID stays unique even when several messages are sent within the same
        // millisecond, which a timestamp-based id does not.
        CorrelationData cd = new CorrelationData("id" + UUID.randomUUID());
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, msg, cd);
    }

}

Consumer: RabbitmqConsumerService.java

The consumer is a listener, so it does not need to be called explicitly. Once the application is running, it automatically listens on the RabbitMQ queue it is bound to.

package com.ictpaas.rabbitmqtest.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;

/**
 * Consumes messages from {@code test-queue} and acknowledges each one manually.
 *
 * <p>The listener declares a durable queue {@code test-queue}, a durable topic exchange
 * {@code test-exchange}, and binds them with routing key {@code routingkey-test}.
 */
@Service
public class RabbitmqConsumerService {

    /**
     * Prints the received payload and acknowledges the delivery on the given channel.
     *
     * <p>{@code @RabbitHandler} is intentionally not used here: it is only needed when
     * {@code @RabbitListener} is placed on the class, not on the method itself.
     *
     * @param msg     the message payload
     * @param channel the channel the message was delivered on; used for the manual ack
     * @param headers the message headers; must contain {@link AmqpHeaders#DELIVERY_TAG}
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "test-queue", durable = "true"),
                    exchange = @Exchange(value = "test-exchange", durable = "true", type = "topic"),
                    key = "routingkey-test"
            )
    )
    public void testConsumer(@Payload String msg, Channel channel, @Headers Map<String, Object> headers) {
        System.out.println(msg);
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        if (tag == null) {
            // Without a delivery tag there is nothing to acknowledge; report it instead of
            // failing with a NullPointerException while unboxing.
            System.err.println("Missing " + AmqpHeaders.DELIVERY_TAG + " header; message cannot be acknowledged");
            return;
        }
        try {
            // multiple = false: acknowledge only this delivery, not every earlier unacknowledged one.
            channel.basicAck(tag, false);
        } catch (IOException e) {
            System.err.println("Failed to acknowledge delivery tag " + tag);
            e.printStackTrace();
        }
    }

}

Startup class: RabbitmqTestApplication.java

package com.ictpaas.rabbitmqtest;

import com.ictpaas.rabbitmqtest.service.RabbitmqProducerService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * Application entry point that also serves as a small REST controller for
 * triggering message publication by hand.
 */
@RestController
@SpringBootApplication
public class RabbitmqTestApplication {

    @Resource
    private RabbitmqProducerService rabbitmqProducerService;

    /**
     * Publishes the path segment {@code msg} through the producer service.
     *
     * @param msg the text taken from the request path
     * @return the fixed string {@code "SUCCESS"} once the producer call has returned
     */
    @GetMapping("/rabbitmq/{msg}")
    public String test(@PathVariable("msg") String msg) {
        // Hand the message to the producer, which publishes it to RabbitMQ.
        rabbitmqProducerService.sendMsg(msg);
        return "SUCCESS";
    }

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqTestApplication.class, args);
    }

}

Keywords: RabbitMQ Spring Java Maven

Added by learnphp1 on Thu, 05 Dec 2019 23:25:45 +0200