Rocket Message Queue installation and application (summary)

Message: information carrier            Queue: data structure, first in first out  

Several application mechanisms of message queuing:

1. Asynchronous communication

The communication mode of message queue is similar to that of distributed service call, but asynchronous communication achieves asynchronous effect through message queue. The producer does not need to wait for the message result, but can directly put the message in the message queue, and then the consumer can take out the message at an appropriate time to realize the communication service;

2. Application decoupling

Thanks to the asynchronous mechanism of message queue, the coupling relationship between producers and consumers can be untied. Producers and consumers don't care who will send or take out messages;

3. Redundant storage

MQ itself has the function of storing data. When the producer produces more information than the consumer can take out, it can persist the redundant information;

4. Peak cutting and valley filling

When the flow input is unstable and there is a flow peak, consumers cannot take out and digest the flow in time. Therefore, through the redundant storage mechanism, the flow peak flow is converted into a stable flow output, so as to protect consumers from avalanches or spillovers;

Rocket Message Queue:

(Alibaba Message Queuing Middleware)

Common MQ: ActiveMQ, RabbitMQ, Kafka, pulsar, RocketMQ

  RocketMQ conceptual model

  • Producer: the role responsible for publishing messages and supporting distributed cluster deployment;
  • Consumer: it is responsible for consuming messages, supports distributed cluster deployment, provides real-time message subscription mechanism, and supports Push, Pull, cluster and broadcast to obtain messages;
  • NameService: a Topic routing registry that supports dynamic registration and discovery of brokers, mainly including Broker management, providing heartbeat detection mechanism and routing information management;
  • Broker server: broker is mainly responsible for message storage, delivery and query, as well as service high availability assurance;

Simple case:

1. Create Maven QuickStart project

2. Add dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.1</version>
</dependency>

3. Producer side

public class Sender {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {

        //(1) Create producer
        /**
         * Producer grouping
         */
        DefaultMQProducer producer = new DefaultMQProducer("TestSender");
        producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
        //(2) Start producer
        producer.start();

        //(3) Build and send messages
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("Please enter a message to send");
            String smsContent = scanner.next();
            if (smsContent.equals("exit")) {
                //(4) Close producer
                producer.shutdown();
            }
            Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", smsContent.getBytes("UTF-8"));

            //Send synchronization to RocketMQ
            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult:" + sendResult);
        }
    }
}

4. Consumer side

public class Receiver {

    public static void main(String[] args) throws MQClientException {
        //(1) Create consumer instance
        //Consumers are grouped, and consumers with the same name form a cluster
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");

        consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);


        //(2) Subscribe to a topic and receive a specific message
        consumer.subscribe(AppConstants.SMS_TOPIC,"*");

        //(3) Register a listener with MQ
        /*
            msgs Message list
            context Message context
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msgExt:msgs){

                try {
                    System.out.println("Message content:"+new String(msgExt.getBody(),"utf-8"));

                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }

            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // (4) Start consumer instance
        consumer.start();
        System.out.printf("Consumer Started.%n");

    }
}

Three modes of sending messages:

Synchronization confirmation sending result:

Synchronous sending means that after sending a message, the message sender will send the next message after receiving the synchronization response from the server;

Application scenario: this method has a wide range of application scenarios, such as important notification email, registration SMS notification, marketing SMS system, etc.

  Producer Code:

public class Sender {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {

        //(1) Create producer
        /**
         * Producer grouping
         */
        DefaultMQProducer producer = new DefaultMQProducer("TestSender");
        producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
        //(2) Start producer
        producer.start();

        //Send circularly to view the callback result sequence
        for (int i = 0; i < 10; i++) {
            //(3) Build and send messages
            String smsContent = "hey";
            Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", (smsContent + i).getBytes("UTF-8"));

            //Send synchronization to RocketMQ
            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult:" + sendResult);
        }

        //(4) Close producer
        producer.shutdown();

    }

}

Asynchronous confirmation sending result:

Asynchronous sending refers to the communication mode in which the sender sends a message without waiting for the server to return a response, and then sends the next message. For asynchronous sending of message queue RocketMQ version, you need to implement asynchronous sending callback interface (SendCallback). After sending a message, the message sender can send a second message without waiting for a response from the server. The sender receives the server response through the callback interface and processes the response result.

Application scenario: asynchronous sending is generally used in business scenarios where the link takes a long time and is sensitive to response time. For example, you notify to start the transcoding service after uploading the video, and notify to push the transcoding results after transcoding is completed.

  Producer Code:

public class AsyncSender {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {

        //(1) Create producer
        /**
         * Producer grouping
         */
        DefaultMQProducer producer = new DefaultMQProducer("AsyncTestSender");
        producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
        //(2) Start producer
        producer.start();

        producer.setRetryTimesWhenSendAsyncFailed(0); //Number of retries after asynchronous send failure


        //(3) Build and send messages
        String smsContent = "hey";
        for (int i = 0; i < 10; i++) {

            Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", (smsContent + i).getBytes("UTF-8"));
            //Send to RocketMQ and confirm asynchronously through callback
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                    System.out.println("send status " + System.currentTimeMillis());
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });

        }
        
        System.out.println("send finished " + System.currentTimeMillis());
        //(4) Close producer
        producer.shutdown();

    }
}

End oneway sending:

The sender is only responsible for sending the message, does not wait for the response returned by the server, and there is no callback function trigger, that is, only sends the request and does not wait for the response. The time-consuming process of sending messages in this way is very short, generally at the microsecond level.

Application scenario: it is applicable to some scenarios that take a very short time but do not require high reliability, such as log collection.

  Core code:

 //Send to RocketMQ without waiting for results
                producer.sendOneway(msg);

Keywords: Java Spring Distribution Middleware intellij-idea

Added by sparrrow on Tue, 07 Dec 2021 15:54:03 +0200