Rabbitmq annotation and message serialization

Rabbitmq annotation and message serialization

MessageConvert

  • Application serialization involving network transmission is inevitable. The sender converts the message into a byte array for transmission according to some rules, and the receiver parses the byte [] array according to the agreed rules
  • The serialization of RabbitMQ refers to the body attribute of Message, that is, the content we really need to transmit. RabbitMQ abstracts a MessageConvert interface to process Message serialization. In fact, there are SimpleMessageConverter (default), Jackson2JsonMessageConverter, etc
  • When the convertAndSend method is called, MessageConvert is used for message serialization
  • The SimpleMessageConverter does not process the message body to be sent when it is byte []. If it is a String, it will be converted into a byte array. If it is a Java object, it will use jdk serialization to convert the message into a byte array. The converted result is large, including class name, class corresponding methods and other information. Therefore, the performance is poor
  • When RabbitMQ is used as middleware, the amount of data is relatively large. At this time, it is necessary to consider using serialization forms such as Jackson2JsonMessageConverter to improve performance

@RabbitListener usage

**Annotation method using annotation @ RabbitListener**

@The bindings property of the RabbitListener declares Binding (if the Queue, Exchange and RouteKey required by the Binding do not exist in RabbitMQ, it will be created automatically, and an exception will be thrown if it exists)

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.email.#","email.*"}))
    public void receiveEmail(String msg){
        System.out.println(" [mail serve] received : " + msg + "!");
    }

@The RabbitListener can be marked on the class and needs to be used together with the @ RabbitHandler annotation

@The RabbitListener tag on the class indicates that when a message is received, it will be handed over to the method of @ RabbitHandler for processing. The specific method to be used for processing depends on the parameter type converted by MessageConverter

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
}

be careful:

  • The message processing method parameters are converted by the MessageConverter. If a custom MessageConverter is used, it needs to be set in the RabbitListenerContainerFactory instance (spring implements simplerabitlistenercontainerfactory by default)

  • Content of the Message_ The type attribute indicates the data format in which the Message body data is stored. In addition to using the Message object to receive the Message (including Message attributes and other information), you can also directly use the corresponding type to receive the Message body content. However, if the method parameter type is incorrect, an exception will be thrown

    • Application / octet stream: binary byte array storage, using byte []
    • application/x-java-serialized-object: stored in Java Object serialization format, using Object and corresponding type (the type should have the same name as the package during deserialization, or else an exception will be thrown if the class cannot be found)
    • text/plain: text data type storage, using String
    • application/json: JSON format, using Object and corresponding types

@Headers and @ Payload

  • @Header injects a single property of the message header
  • @Payload injects the message body into a JavaBean
  • @Headers inject all headers into a Map
    /**
     * You can declare switches, bindings, and queues directly through annotations. But if the declared is inconsistent with what already exists in rabbitMq
     * It will report errors for testing. I don't use persistence here. It will be deleted automatically after there are no consumers
     * {@link RabbitListener}It can be repeated. There can also be multiple key s that declare queue binding
     *
     * @param headers
     * @param msg
     */
@RabbitListener(
        bindings = @QueueBinding(
            exchange = @Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC,
                durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = RabbitMQConstant.true_CONSTANT),
            value = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT,
                autoDelete = RabbitMQConstant.true_CONSTANT),
            key = DKEY
        ),
        //Manually specify the consumer's listening container. By default, Spring automatically generates a SimpleMessageListenerContainer
        containerFactory = "container",
        //Specify the number of threads of the consumer. A thread will open a Channel, and messages on a queue will be consumed only once (regardless of message re entering the queue). The following indicates that at least 5 threads will be opened, up to 10 threads. The number of threads needs to be determined according to your tasks. If it is calculation intensive, the number of threads should be less
        concurrency = "5-10"
    )
    public void process(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
        log.info("basic consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
    }
 
 
/**
     * {@link Queue#ignoreDeclarationExceptions}Declaring the queue ignores errors. If the queue is not declared, the consumer is still available
     *
     * @param headers
     * @param msg
     */
    @RabbitListener(queuesToDeclare = @Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = RabbitMQConstant.true_CONSTANT))
    public void process2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg) {
        log.info("basic2 consumer receive message:{headers = [" + headers + "], msg = [" + msg + "]}");
    }

be careful

If it's com rabbitmq. client. Channel,org. springframework. amqp. core. Message and org springframework. messaging. Message types can be injected directly without annotation. If they are not these types, the parameters without annotations will be treated as the message body. Cannot have more than one message body. The following method, ExampleEvent, is the default message body

public void process2(@Headers Map<String, Object> headers,ExampleEvent msg);

Keywords: Java RabbitMQ

Added by jmabbate on Sat, 18 Dec 2021 16:05:02 +0200