RabbitMQ--SpringBoot -- Integration / use / usage / instance / example / actual combat

Original website: RabbitMQ--SpringBoot -- Integration / use / usage / instance / example / actual combat_ CSDN blog

Other web sites

Spring AMQP 2.1.2.RELEASE Chinese document - 1. Preface | Docs4dev

RabbitMQ: @ RabbitListener and @ RabbitHandler and message serialization - short book

Several postures of springboot + rabbit MQ
RabbitMQ and SpringBoot integration practice - brief book

annotation

Other web sites

RabbitMQ: @ RabbitListener and @ RabbitHandler and message serialization - short book

@RabbitListener

Used in methods

When the supervisor hears a message in the queue, it will receive and process it. If it does not exist, an error will be reported.

@Component
public class Receiver {
    //You can also listen to multiple queues: @ RabbitListener(queues = {"hello", "hi"})
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println ("Receiver : "  + hello);
    }
}

@The bindings property of the RabbitListener declares Binding (an error will be reported if the Queue, Exchange and RouteKey required for the Binding do not exist in RabbitMQ)

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
        value    = @Queue(value = "consumer_queue",durable = "true"),
        key = "key.#"
))
public void processMessage1(Message message) {
    System.out.println(message);
}

Used on classes

  1. To be used with @ RabbitHandler annotation
  2. @The RabbitListener label on the class indicates that when a message is received, it will be handed over to the method of @ RabbitHandler for processing. The specific method to use for processing is determined according to the parameter type after MessageConverter conversion.
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
}

@Payload and @ Headers

Other web sites

Spring AMQP 2.1.2.RELEASE Chinese document - 3. Reference | Docs4dev
RabbitMQ (III) RabbitMQ advanced integrated application - blind scavenger - blog Park

brief introduction

@Headers must be received through Map.
//@Header("amqp_receivedRoutingKey") String rk directly obtains a key in the header  

Get the body and headers information in the message

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body: "+body);
    System.out.println("Headers: "+headers);
}

Get a single Header property

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body: "+body);
    System.out.println("token: "+token);
}

binding

Other web sites

SpringBoot integration rabbitmq

brief introduction

1. There are two ways to create switch / queue / binding instances

Switch (the following two methods are equivalent):

ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true).build();
new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false)

Queue (the following two methods are equivalent):

QueueBuilder.durable("Hi").build();
new Queue(QUEUE_HI, true)

Binding (the following two methods are equivalent):

Note: the first parameter is not a string.

BindingBuilder.bind(helloQueue).to(welcomExchange).with("hello.#")
new Binding("Queue@hello", Binding.DestinationType.QUEUE,
                "Exchange@topic.welcome", "hello.#", null)

The degree of recommendation decreases in turn.

Method 1: configure by configuration class (concise)

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
    public static final String QUEUE_HELLO              = "Queue@hello";
    public static final String QUEUE_HI                 = "Queue@hi";
    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";
    public static final String ROUTINGKEY_HELLOS        = "hello.#";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Bean
    Object initBindingTest() {
        amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));

        amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
        amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));

        amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));

        return new Object();
    }
}

amqpAdmin.declareBinding

A Binding object is required as a parameter

  • Exchange: exchange name
  • Type: exchanger type. The BuiltinExchangeType enumeration class has the following 4 types of switches: DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")
  • durable: sets whether to persist. true: persistent, false: non persistent. Persistence can save the switch to disk, and relevant messages will not be lost when the server is restarted.
  • autoDelete: sets whether to delete automatically. true: automatically delete; false: not automatically delete. The premise of automatic deletion is that at least one queue or switch is bound to this switch, and then all queues or switches bound to this switch are unbound from this switch.
  • internal: set whether it is built-in. true: built-in switch, false: non built-in switch. Built in switch, the client cannot directly send messages to this switch, and can only route to the switch through the switch.
  • arguments: other structured parameters. Such as backup switch: alternate exchange, timeout. Example configuration timeout method:
Map<String, Object> params = new HashMap();
params.put("x-message-ttl", 2000);
amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, params));

Method 2: configure by configuration class (cumbersome)

Applicable when there are few queues and switches

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    public final static String QUEUE_DIRECT     = "Queue@direct";
    public final static String QUEUE_TOPIC_ONE  = "Queue@topic_one";
    public final static String TOPIC_QUEUE_TWO  = "Queue@topic_two";
    public final static String QUEUE_FANOUT_ONE = "Queue@fanout_one";
    public final static String QUEUE_FANOUT_TWO = "Queue@fanout_two";

    public final static String EXCHANGE_TOPIC   = "Exchange@topic";
    public final static String EXCHANGE_FANOUT  = "Exchange@fanout";

    public final static String ROUTINGKEY_TOPIC_ONE = "hello.key";
    public final static String ROUTINGKEY_TOPIC_TWO = "*.key";

	//  direct mode queue
    @Bean
    public Queue directQueue() {
        return new Queue(QUEUE_DIRECT, true);
    }
	
	//  topic subscriber mode queue
    @Bean
    public Queue topicQueueOne() {
        return new Queue(QUEUE_TOPIC_ONE, true);
    }
    @Bean
    public Queue topicQueueTwo() {
        return new Queue(TOPIC_QUEUE_TWO, true);
    }
	
	//  fanout broadcaster mode queue
    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(QUEUE_FANOUT_ONE, true);
    }
    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(QUEUE_FANOUT_TWO, true);
    }
	
	//  topic exchanger
    @Bean
    public TopicExchange topExchange() {
        return new TopicExchange(EXCHANGE_TOPIC);
    }
	
	//  fanout exchanger
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_FANOUT);
    }

	//   Subscriber mode binding
    @Bean
    public Binding topicExchangeBingingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(ROUTINGKEY_TOPIC_ONE);
    }

    @Bean
    public Binding topicExchangeBingingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(ROUTINGKEY_TOPIC_TWO);
    }
	
	//   Broadcast mode binding
    @Bean
    public Binding fanoutExchangeBingingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutExchangeBingingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}

Method 3: configure by code

@Component
public class Receiver {
    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println ("Receiver : "  + hello);
    }

	@RabbitListener(bindings = @QueueBinding(
			exchange = @Exchange(value = "Exchange@topic.Hello",durable = "true",type = "topic"),
			value    = @Queue(value = "Queue@Hello",durable = "true"),
			key      = "key.#"
	))
	public void processMessage1(Message message) {
		System.out.println(message);
	}
}

Method 4: configure through MQ web page

Add switch: http://localhost:15672/#/exchanges    // For example: Exchange@topic.Hello
Add queue: http://localhost:15672/#/queues             // For example: Queue@Hello
Add routing key to switch: http://localhost:15672/#/exchanges =>Click switch name = > binding = > Add queue and route

to configure

RabbitTemplate configuration

example

effect

rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);

Set the confirm callback function.

rabbitTemplate.setReturnCallback(rabbitReturnCallback);

Set the return callback function.

rabbitTemplate.setMandatory(true);

When mandatory is set to true, if exchange cannot find a suitable queue to store the message according to its own type and message routingKey, the broker will call the basic.return method to return the message to the producer. When the mandatory is set to false, the broker will directly discard the message when the above situation occurs.

rabbitTemplate.setUsePublisherConnection(true); 

Use a separate sending connection to avoid the same blocking of consumers due to the blocking of producers for various reasons

example

Operation RabbitMQ

Start RabbitMQ

Command line run:

rabbitmq-server.bat

//For the first time, run rabbitmq-plugins.bat enable rabbitmq_management

Configure an administrator user

User name: admin     Password: 123456

The permissions are configured as:/

See: RabbitMQ series - Overview_ feiying0canglang's blog - CSDN blog

code

rely on

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /

Queue configuration

package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitRouterConfig {
    public static final String QUEUE_HELLO              = "Queue@hello";
    public static final String QUEUE_HI                 = "Queue@hi";
    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";
    public static final String ROUTINGKEY_HELLOS        = "hello.#";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Bean
    Object initBindingTest() {
        amqpAdmin.declareExchange(new TopicExchange(EXCHANGE_TOPIC_WELCOME, true, false));

        amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
        amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));

        amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));

        return new Object();
    }
}

Controller

package com.example.controller;

import com.example.config.RabbitConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

@RestController
public class HelloController {
    @Autowired
    private Sender sender;

    @PostMapping("/hi")
    public void hi() {
        sender.send(RabbitConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
    }

    @PostMapping("/hello1")
    public void hello1() {
        sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
    }

    @PostMapping("/hello2")
    public void hello2() {
        sender.send("Exchange@topic.welcome","hello.b", "hello2 message:" + LocalDateTime.now());
    }
}

sender

package com.example.mq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(routingKey, message);
    }

    public void send(String exchange, String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

receiver

package com.example.mq;

import com.example.config.RabbitRouterConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
    public void hello(String hello) {
        System.out.println ("Receiver(hello) : "  + hello);
    }

    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
    public void hi(String payload) {
        System.out.println ("Receiver(hi) : "  + payload);
        throw new RuntimeException("Manual abnormality");
    }

//    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
//    public void helloAll(@Payload String payload, Message message, Channel channel) {
//        System.out.println("Receiver(hello): ");
//        System.out.println("payload:" + payload);
//        System.out.println("message:" + message);
//        System.out.println("channel:" + channel);
//    }
//    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
//    public void hiAll(@Payload String payload, Message message, Channel channel) {
//        System.out.println("Receiver(hi): ");
//        System.out.println("payload:" + payload);
//        System.out.println("message:" + message);
//        System.out.println("channel:" + channel);
//    }
}

Basic test

Run sender

server:
  port: 9100
#  port: 9101
spring:
  application:
    name: demo-rabbitmq-sender
#    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /

Run receiver

server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /

MQ page

Test data only (payload)

postman access: Node Exporter

receiver print results

Receiver(hi) : hi message:2020-10-21T18:52:24.766

postman access: Node Exporter

receiver print result: no print

That is, if the queue has been bound to the switch, the switch and routing key must be specified.

postman access: Node Exporter

receiver print results

Receiver(hello) : hello2 message:2020-10-21T18:52:41.434

Test details

Test to get payload/message/channel

Use the second code of the receiver.

postman access: Node Exporter

receiver print results

Receiver(hi): 
payload:hi1 message:2020-10-22T15:41:11.796
message:(Body:'hi1 message:2020-10-22T15:41:11.796' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=Queue@hi, deliveryTag=1, consumerTag=amq.ctag-K0Yka5vHxrq6JzNXlg3ncQ, consumerQueue=Queue@hi])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,1), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]

postman access: Node Exporter

receiver print result: no print

That is, if the queue has been bound to the switch, the switch and routing key must be specified.

postman access: Node Exporter

receiver print results

Receiver(hello): 
payload:hello2 message:2020-10-22T15:42:13.126
message:(Body:'hello2 message:2020-10-22T15:42:13.126' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=Exchange@topic.welcome, receivedRoutingKey=hello.b, deliveryTag=1, consumerTag=amq.ctag-yZryIR801zNV5f0f9mJd9g, consumerQueue=Queue@hello])
channel:Cached Rabbit Channel: AMQChannel(amqp://admin@127.0.0.1:5672/,2), conn: Proxy@4f4c88f9 Shared Rabbit Connection: SimpleConnection@7c52fc81 [delegate=amqp://admin@127.0.0.1:5672/, localPort= 62067]

Keywords: Java RabbitMQ MQ

Added by Petrushka on Sun, 26 Sep 2021 14:05:50 +0300