📚 Demo of several message sending methods in RocketMQ

1, Basic sample

Background knowledge

The steps for sending a message are, in brief:

  1. Create a message producer and specify the producer group name.
  2. Specify the NameServer address. The producer contacts the NameServer, which keeps track of the Brokers, before it talks to any Broker.
  3. Start the producer.
  4. Create a message object and specify its Topic, Tag and message body.
  5. Send the message.
  6. Shut down the producer.

Create the project and import dependencies

First, create a project in IDEA from the Maven quickstart archetype. After it is created, open pom.xml and add the rocketmq-client dependency.

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

Synchronous message sending

Synchronous sending is reliable and widely used: the producer blocks until the Broker returns the send result. Typical scenarios include SMS notifications and other important notifications.

The code for sending messages synchronously is as follows:

private static final String SYNC_MSG_TOPIC = "SYNC_MSG_TOPIC";
private static final String SYNC_MSG_TAG = "SYNC_MSG_TAG";
private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876";
private static final String PRODUCER_GROUP = "GROUP_NAME";

public static void main(String[] args)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    //Create Producer, set Namesrv address, start Producer
    DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
    producer.setNamesrvAddr(NAMESRV_ADDR);
    producer.start();

    for (int i = 0; i < 10; i++) {
        //Set topic, tag and content
        Message msg = new Message(SYNC_MSG_TOPIC, SYNC_MSG_TAG,
            (i + "Data content contained in message No").getBytes(StandardCharsets.UTF_8));
        //Send synchronously: send() blocks until the Broker returns a SendResult
        System.out.printf("Message No. %d, send status: %s%n", i, producer.send(msg));
        //The thread sleeps for 1s before sending the next one
        TimeUnit.SECONDS.sleep(1);
    }
  
    //Close the producer, otherwise the program will not exit
    producer.shutdown();
}

Asynchronous message sending

Asynchronous sending is usually used in business scenarios that are sensitive to response time, that is, where the Producer cannot afford to block while waiting for the Broker's response.

In code, the distinguishing feature of asynchronous sending is a callback: the producer passes a SendCallback with each send, and when the Broker's response (or an error) arrives, the callback runs the logic that handles it.

The asynchronous sending code is as follows:

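//Public so that the consumer shown later can reference AsyncProducer.ASYNC_MSG_TOPIC and AsyncProducer.ASYNC_MSG_TAG
public static final String ASYNC_MSG_TOPIC = "ASYNC_MSG_TOPIC";
public static final String ASYNC_MSG_TAG = "ASYNC_MSG_TAG";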
private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876";
private static final String PRODUCER_GROUP = "GROUP_NAME";

@SneakyThrows
public static void main(String[] args) {
    DefaultMQProducer producer = initProducer();

    for (int i = 0; i < 10; i++) {
        Message msg = new Message(ASYNC_MSG_TOPIC, ASYNC_MSG_TAG,
            (i + "Data content contained in message No").getBytes(StandardCharsets.UTF_8));
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("Send results:" + result);
            }
            @Override
            public void onException(Throwable e) {
                System.out.println("Send exception:" + e.getMessage());
            }
        });
        
        TimeUnit.SECONDS.sleep(1);
    }
    producer.shutdown();
}

private static DefaultMQProducer initProducer() throws MQClientException {
  DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
  producer.setNamesrvAddr(NAMESRV_ADDR);
  producer.start();
  return producer;
}

Running the program shows that asynchronous sends can also fail. To reproduce this, shorten the sleep in the loop above to

 TimeUnit.MILLISECONDS.sleep(25);
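
One plausible cause of such failures is that producer.shutdown() runs while some sends are still waiting for the Broker's response. A common remedy is to wait for every callback before shutting down, for example with a CountDownLatch. The sketch below shows only that waiting pattern in plain Java; the thread pool named fakeProducer is a hypothetical stand-in for the producer's asynchronous send and is not RocketMQ API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitCallbacksSketch {

    /** Simulates {@code count} asynchronous sends; returns how many callbacks ran before shutdown, or -1 on timeout. */
    public static int sendAllAndWait(int count) {
        // Hypothetical stand-in for the producer's asynchronous send machinery.
        ExecutorService fakeProducer = Executors.newFixedThreadPool(4);
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            fakeProducer.submit(() -> {
                try {
                    // Pretend the Broker needs some time to answer.
                    TimeUnit.MILLISECONDS.sleep(20);
                    completed.incrementAndGet();        // plays the role of onSuccess
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // plays the role of onException
                } finally {
                    pending.countDown();                // one callback finished, either way
                }
            });
        }

        try {
            // Block until every callback has fired, or give up after 5 seconds.
            boolean allDone = pending.await(5, TimeUnit.SECONDS);
            return allDone ? completed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            // Only now is it safe to shut down.
            fakeProducer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Callbacks completed before shutdown: " + sendAllAndWait(10));
    }
}
```

In the real producer the same idea would mean calling countDown() in both onSuccess and onException and awaiting the latch before producer.shutdown().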

One-way message sending

In one-way sending the producer does not wait for, or care about, the Broker's acknowledgement. It suits cases where occasional loss is acceptable, such as sending logs.

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
    DefaultMQProducer producer = initProducer();

    for (int i = 0; i < 10; i++) {
        //Set topic, tag and content
        Message msg = new Message(ONE_WAY_MSG_TOPIC, ONE_WAY_MSG_TAG,
            (i + "Data content contained in message No").getBytes(StandardCharsets.UTF_8));
        //Send, no return value
        producer.sendOneway(msg);
        //The thread sleeps for 100 ms before sending the next one
        TimeUnit.MILLISECONDS.sleep(100);
    }
    //Close the producer, otherwise the program will not exit
    producer.shutdown();
}

Consuming messages

The steps for consuming messages are, in brief:

  1. Create a Consumer and specify the consumer group name.
  2. Specify the NameServer address, through which the consumer finds the Broker that holds the messages.
  3. Subscribe to a Topic and Tag.
  4. Register the callback (message listener) that holds the processing logic to run when messages arrive.
  5. Start the Consumer.

The following consumer consumes the asynchronous messages that were produced earlier and stored on the Broker:

private static final String CONSUMER_GROUP = "CONSUMER_GROUP";
private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876";

@SneakyThrows
public static void main(String[] args) {
    //Subscribe to the topic and tag used by the asynchronous producer
    DefaultMQPushConsumer consumer = initConsumer(AsyncProducer.ASYNC_MSG_TOPIC, AsyncProducer.ASYNC_MSG_TAG);
    //Register the listener that receives and prints the message content
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        msgs.forEach(msg -> System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8)));
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    //Start consumer
    consumer.start();
}

private static DefaultMQPushConsumer initConsumer(String topic, String tag) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
    consumer.setNamesrvAddr(NAMESRV_ADDR);
    consumer.subscribe(topic, tag);
    return consumer;
}

You can run any of the previous producer programs at the same time (make sure the topic and tag match the consumer's subscription) to watch production and consumption in real time. As soon as the producer has sent a message, the consumer receives it and prints it.

Analysis and key points:

  1. CONSUMER_GROUP = "CONSUMER_GROUP" groups multiple consumers together. Combined with the message models described below, a group improves concurrent processing capability.
  2. NAMESRV_ADDR = "localhost:19876;localhost:29876" configures two NameServers, which eliminates the NameServer as a single point of failure.
  3. A Topic indicates the message type and is created in advance. Messages under a topic can be further classified by tag. The consumer above subscribes to a single tag with consumer.subscribe(topic, tag); to receive several tags, pass an expression such as consumer.subscribe(topic, "tag1 || tag2 || tag3").
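
To make the meaning of a tag expression concrete, here is a minimal plain-Java sketch of how an expression such as "tag1 || tag2 || tag3" selects messages. It only illustrates the semantics; TagFilterSketch and matches are hypothetical names, and this is not how the client or Broker implements filtering internally.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagFilterSketch {

    /** Returns true if a message carrying {@code messageTag} is selected by {@code subscription}. */
    public static boolean matches(String subscription, String messageTag) {
        // "*" (or an empty expression) is treated as "subscribe to every tag".
        if (subscription == null || subscription.trim().isEmpty() || "*".equals(subscription.trim())) {
            return true;
        }
        // Split "tag1 || tag2 || tag3" into the individual tag names.
        Set<String> wanted = Arrays.stream(subscription.split("\\|\\|"))
            .map(String::trim)
            .filter(tag -> !tag.isEmpty())
            .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("tag1 || tag2 || tag3", "tag2")); // true
        System.out.println(matches("tag1 || tag2 || tag3", "tag4")); // false
        System.out.println(matches("*", "tag4"));                    // true
    }
}
```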

Broadcast mode and load balancing mode of consumers

Several consumers can be started at once so that published messages are processed more quickly. With several consumers, we need to decide which consumer consumes which messages.

Broadcast mode

Every consumer consumes every message. Suppose messages a, b and c are published. Broadcast mode works like a radio broadcast: everyone hears the same content, so each consumer has to process a, b and c.

Combined with CONSUMER_GROUP = "CONSUMER_GROUP", this mode makes every consumer in the same consumer group consume all of the messages.

Load balancing mode

The consumers process messages a, b and c between them. Each consumer may handle a different number of messages, but the total number consumed is three. For example, consumer A consumes a and b, and consumer B consumes c.

Combined with CONSUMER_GROUP = "CONSUMER_GROUP", this mode lets the consumers in the same consumer group share a batch of messages and divide the work, which achieves load balancing.
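
The difference between the two modes can be sketched in plain Java without a Broker. MessageModelSketch and its methods are hypothetical names, and the round-robin split is an assumption made for illustration: the real client distributes a topic's queues among the consumers of a group, so the exact split may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MessageModelSketch {

    /** Broadcast: every consumer gets its own copy of every message. */
    public static Map<String, List<String>> broadcast(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>(messages));
        }
        return received;
    }

    /** Load balancing: each message goes to exactly one consumer (round-robin, as an assumption). */
    public static Map<String, List<String>> loadBalance(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("A", "B");
        List<String> messages = Arrays.asList("a", "b", "c");
        System.out.println("Broadcast:      " + broadcast(consumers, messages));
        System.out.println("Load balancing: " + loadBalance(consumers, messages));
    }
}
```

In the broadcast result both consumers hold a, b and c; in the load balancing result the three messages are divided between A and B, so the total consumed is still three.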

Test code

Start two instances of the previous Consumer program.

In the consumer setup, call consumer.setMessageModel(model) and pass MessageModel.CLUSTERING to select load balancing mode.

@SneakyThrows
public static void main(String[] args) {
    //Subscribe to every tag of the one-way producer's topic, in load balancing mode
    DefaultMQPushConsumer consumer = initConsumer(ONE_WAY_MSG_TOPIC, "*",
        MessageModel.CLUSTERING);
    //...
}

private static DefaultMQPushConsumer initConsumer(String topic, String tag, MessageModel model)
throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
    //...
    consumer.setMessageModel(model);
    //...
}

The default consumption mode is load balancing. Change it to broadcast mode with MessageModel.BROADCASTING and you can see that every consumer consumes the same set of messages.

2, Sequential message

Sequential messages require that consumers consume messages in the same order in which producers produced them.

Since this is a message queue, one would expect the underlying data structure to be a single queue, which is FIFO by nature. RocketMQ does not work that way.

A RocketMQ Broker holds multiple queues for each Topic. When a producer sends messages to a Topic, they are spread over that Topic's queues on the Broker, and consumers listen on those queues with multiple threads. The problem is that a consumer's many "ears" cannot hear messages in order unless some mechanism enforces the order.

There are generally two ways to guarantee message order:

  1. Global ordering. Messages across all queues must be consumed in the order in which they were produced. For example, to guarantee that "open the refrigerator → put the elephant in" is consumed in order, the "put the elephant in" message must not leave the Broker's queues before the "open the refrigerator" message does. The granularity of this control is very coarse, so it easily slows down the consumption of messages belonging to other services.
  2. Local ordering. Only the messages of one business flow need to be consumed in order. For example, if Party A and Party B each start an "open the refrigerator → put the elephant in" flow, the steps of Party A's flow must be consumed in order, and likewise for Party B's, but the two flows need not be ordered relative to each other.

To guarantee local ordering, Party A and Party B each get their own message queue, and each party's messages are stored in and delivered from its queue in order. On the consumer side, each queue is consumed by a single dedicated thread (one consumer may serve several queues, but a queue is never consumed by more than one thread at a time).

The following is an order-processing example. The flow is:

  1. Create order
  2. Payment
  3. Push
  4. Complete

Messages with the same order number are sent, in order, to the same message queue. During consumption, the messages of one order therefore come from a single queue and are received in the order of steps 1, 2, 3 and 4.
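
The routing rule can be tried out in isolation. The following plain-Java sketch maps an order number to a queue index by taking it modulo the number of queues; QueueSelectionSketch and the queue count of 4 are assumptions made for illustration, and the order numbers are the ones used in this example.

```java
import java.util.Arrays;
import java.util.List;

public class QueueSelectionSketch {

    /** Maps an order ID to a queue index; equal IDs always yield the same index. */
    public static int selectQueueIndex(long orderId, int queueCount) {
        // floorMod keeps the result in [0, queueCount) even for negative IDs.
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues under the topic
        List<Long> orderIds = Arrays.asList(1001L, 1001L, 1099L, 2002L, 2002L);
        for (long orderId : orderIds) {
            System.out.println("order " + orderId + " -> queue " + selectQueueIndex(orderId, queueCount));
        }
    }
}
```

With four queues, order 1001 always lands in queue 1, order 1099 in queue 3 and order 2002 in queue 2, so every step of one order passes through the same queue.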

Order entity class

See Appendix

producer

public static final String ORDER_TOPIC = "ORDER_TOPIC";
public static final String ORDER_TAG = "ORDER_TAG";

@SneakyThrows
public static void main(String[] args) {
    DefaultMQProducer producer = MqUtils.initProducer();
    //Order set to be sent
    List<Order> orderList = Order.orders();
    //send message
    for (int i = 0, orderListSize = orderList.size(); i < orderListSize; i++) {
        Order o = orderList.get(i);
        Message msg = new Message(ORDER_TOPIC, ORDER_TAG, "index:" + i, o.toString()
            .getBytes(StandardCharsets.UTF_8));

        /*
         * MessageQueueSelector decides which queue the producer puts the message in,
         * according to the rule implemented in select().
         * o.getOrderId() is passed in as the business ID (the order ID).
         */
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            /**
             * @param mqs the queues of the topic
             * @param msg the message object
             * @param arg the business argument, here the value of o.getOrderId()
             * @return the queue the message is sent to
             */
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long orderId = (Long) arg;
                //Custom queue selection rule: order ID modulo the number of queues
                int mod = (int) (orderId % mqs.size());

                return mqs.get(mod);
            }
        }, o.getOrderId());

        System.out.println("Send results" + sendResult);
    }

    //close
    producer.shutdown();
}

Consumer and operational results

@SneakyThrows
public static void main(String[] args) {
    DefaultMQPushConsumer consumer = MqUtils.initConsumer(SequenceProducer.ORDER_TOPIC, "*", null);
    //Register callback processing logic
    consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
        msgs.forEach(msg -> System.out.println("Thread name " + Thread.currentThread()
            .getName() + " consumed message: " + new String(msg.getBody(), StandardCharsets.UTF_8)));
        return ConsumeOrderlyStatus.SUCCESS;
    });

    consumer.start();

    System.out.println("Consumer started!");
}

Consumer started!
Thread name ConsumeMessageThread_2 consumed message: Order(orderId=1001, description=Create order)
Thread name ConsumeMessageThread_1 consumed message: Order(orderId=2002, description=Create order)
Thread name ConsumeMessageThread_3 consumed message: Order(orderId=1099, description=Create order)
Thread name ConsumeMessageThread_3 consumed message: Order(orderId=1099, description=payment)
Thread name ConsumeMessageThread_1 consumed message: Order(orderId=2002, description=payment)
Thread name ConsumeMessageThread_2 consumed message: Order(orderId=1001, description=payment)
Thread name ConsumeMessageThread_1 consumed message: Order(orderId=2002, description=Push)
Thread name ConsumeMessageThread_2 consumed message: Order(orderId=1001, description=Push)
Thread name ConsumeMessageThread_2 consumed message: Order(orderId=1001, description=complete)

As the output shows, the messages of each order are consumed strictly in the predetermined order. This holds both for the complete order (No. 1001) and for the other two orders, whose flows are incomplete.

3, Delay message

A delay message is not delivered to the consumer immediately; it becomes consumable only after the delay set on the Message has elapsed.

producer

//Delay level 2, which corresponds to 5 seconds in the default Broker delay-level table
private static final Integer DELAY_TIME_LEVEL_5_SEC = 2;
public static final String DELAY_TOPIC = "DELAY_TOPIC";

@SneakyThrows
public static void main(String[] args) {
    DefaultMQProducer producer = initProducer();

    for (int i = 0; i < 10; i++) {
        Message msg = new Message(DELAY_TOPIC, "*",
            ("Message content: Hello World from" + i).getBytes(StandardCharsets.UTF_8));
        msg.setDelayTimeLevel(DELAY_TIME_LEVEL_5_SEC);
        SendResult send = producer.send(msg);
        System.out.println("Send results:" + send);
        TimeUnit.SECONDS.sleep(1);
    }

    producer.shutdown();
}
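
Note that setDelayTimeLevel takes a level, not a duration. The sketch below translates a level into the delay it stands for, assuming the Broker uses the default RocketMQ 4.x level table (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h); a Broker configured with a different messageDelayLevel would give different results. DelayLevelSketch and delayOf are hypothetical helper names, not part of the client library.

```java
import java.time.Duration;

public class DelayLevelSketch {

    // Default delay levels of a RocketMQ 4.x Broker; an assumption, since the Broker can be configured otherwise.
    private static final String[] DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Converts a 1-based delay level into the delay it stands for under the default table. */
    public static Duration delayOf(int level) {
        if (level < 1 || level > DEFAULT_LEVELS.length) {
            throw new IllegalArgumentException("level must be between 1 and " + DEFAULT_LEVELS.length);
        }
        String spec = DEFAULT_LEVELS[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        // The last character is the unit: s, m or h.
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            default:  return Duration.ofHours(amount);
        }
    }

    public static void main(String[] args) {
        System.out.println("level 2  -> " + delayOf(2).getSeconds() + " s");
        System.out.println("level 18 -> " + delayOf(18).getSeconds() + " s");
    }
}
```

Under that table, level 2 (the value of DELAY_TIME_LEVEL_5_SEC) stands for 5 seconds and the highest level, 18, for 2 hours.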

Consumer and operational results

@SneakyThrows
public static void main(String[] args) {
    DefaultMQPushConsumer consumer = initConsumer(DelayProducer.DELAY_TOPIC, "*", null);

    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        prettyPrintMessage(msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

    consumer.start();
    System.out.println("Consumer launch complete!");
}

Thread name ConsumeMessageThread_2 consumed message: message content: Hello World from2 delay time: 88219
Thread name ConsumeMessageThread_4 consumed message: message content: Hello World from0 delay time: 90260
Thread name ConsumeMessageThread_6 consumed message: message content: Hello World from8 delay time: 82175
Thread name ConsumeMessageThread_5 consumed message: message content: Hello World from9 delay time: 81164
Thread name ConsumeMessageThread_3 consumed message: message content: Hello World from1 delay time: 89226
Thread name ConsumeMessageThread_1 consumed message: message content: Hello World from3 delay time: 87213
Thread name ConsumeMessageThread_7 consumed message: message content: Hello World from4 delay time: 87194
Thread name ConsumeMessageThread_8 consumed message: message content: Hello World from5 delay time: 86189
Thread name ConsumeMessageThread_9 consumed message: message content: Hello World from7 delay time: 84172
Thread name ConsumeMessageThread_10 consumed message: message content: Hello World from6 delay time: 85183

Now remove the producer's delay setting, that is, comment out msg.setDelayTimeLevel(DELAY_TIME_LEVEL_5_SEC), and start the producer again. Compare the output with the previous run.

Thread name ConsumeMessageThread_11 consumed message: message content: Hello World from0 delay time: 3
Thread name ConsumeMessageThread_12 consumed message: message content: Hello World from1
Thread name ConsumeMessageThread_13 consumed message: message content: Hello World from2 delay time: 3
Thread name ConsumeMessageThread_14 consumed message: message content: Hello World from3 delay time: 2
Thread name ConsumeMessageThread_15 consumed message: message content: Hello World from4 delay time: 2
Thread name ConsumeMessageThread_16 consumed message: message content: Hello World from5 delay time: 3
Thread name ConsumeMessageThread_17 consumed message: message content: Hello World from6 delay time: 2
Thread name ConsumeMessageThread_18 consumed message: message content: Hello World from7 delay time: 3
Thread name ConsumeMessageThread_19 consumed message: message content: Hello World from8 delay time: 3
Thread name ConsumeMessageThread_20 consumed message: message content: Hello World from9 delay time: 3

4, Batch message

In the previous examples, messages are sent one at a time in a for loop that calls the producer's send method and collects each result. The producer also has an overloaded send method that accepts a collection of messages and sends them as one batch.

The rest of the code is much the same as above; the only difference is that a collection of Message objects is passed to the producer:

Message msg1 = new Message(BATCH_TOPIC, "*", ("Message content: Hello World from 1").getBytes(UTF_8));
Message msg2 = new Message(BATCH_TOPIC, "*", ("Message content: Hello World from 2").getBytes(UTF_8));
Message msg3 = new Message(BATCH_TOPIC, "*", ("Message content: Hello World from 3").getBytes(UTF_8));
List<Message> messages = new ArrayList<>(Arrays.asList(msg1, msg2, msg3));

//Send and output
SendResult sendResult = producer.send(messages);

Note that the total size of one batch must not exceed 4 MB. When the messages together are larger than that, split the list into several smaller batches. The following uses a tool class to split the message list:

ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  List<Message> batch = splitter.next();
  SendResult sendResult = producer.send(batch);
  prettyPrintSendResult(sendResult);
}

The code of the tool class is placed in the appendix.
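
To see the splitting behaviour without a Broker, here is a simplified plain-Java sketch that works on message sizes only. BatchSplitSketch is a hypothetical name and the limit of 7 is deliberately tiny so that the effect is visible; it mirrors the idea of the tool class but is not the same code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSplitSketch {

    /** Groups consecutive sizes into batches whose sum stays within {@code limit}. */
    public static List<List<Integer>> split(List<Integer> sizes, int limit) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            // Close the current batch when adding this message would exceed the limit.
            if (!current.isEmpty() && total + size > limit) {
                batches.add(current);
                current = new ArrayList<>();
                total = 0;
            }
            // A single oversized message still gets a batch of its own.
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[3, 3], [3], [10], [2]]
        System.out.println(split(Arrays.asList(3, 3, 3, 10, 2), 7));
    }
}
```

The oversized message (10) is let through alone rather than dropped, which is also what the tool class does so that splitting never stalls.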

Appendix

Order entity class

@Data
@Builder
public class Order {
    /**
     * order number
     */
    private Long orderId;

    /**
     * Order step description
     */
    private String description;

    public static List<Order> orders() {
        List<Order> orders = new ArrayList<>();

        orders.add(Order.builder()
            .orderId(1001L)
            .description("Create order")
            .build());
        orders.add(Order.builder()
            .orderId(1001L)
            .description("payment")
            .build());
        orders.add(Order.builder()
            .orderId(1001L)
            .description("Push")
            .build());
        orders.add(Order.builder()
            .orderId(1001L)
            .description("complete")
            .build());

        orders.add(Order.builder()
            .orderId(1099L)
            .description("Create order")
            .build());
        orders.add(Order.builder()
            .orderId(1099L)
            .description("payment")
            .build());


        orders.add(Order.builder()
            .orderId(2002L)
            .description("Create order")
            .build());
        orders.add(Order.builder()
            .orderId(2002L)
            .description("payment")
            .build());
        orders.add(Order.builder()
            .orderId(2002L)
            .description("Push")
            .build());

        return orders;
    }

}

Tool class MqUtils

public class MqUtils {
    /**
     * nameServer Address of the cluster
     */
    public static final String NAMESRV_ADDR = "localhost:19876;localhost:29876";

    /**
     * Producer group name
     */
    public static final String PRODUCER_GROUP = "GROUP_NAME";

    /**
     *Consumer group name
     */
    public static final String CONSUMER_GROUP = "CONSUMER_GROUP";

    /**
     * Initialize producer
     * PRODUCER_GROUP = "GROUP_NAME";
     */
    @NotNull
    public static DefaultMQProducer initProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        return producer;
    }

    /**
     * Initialize consumer
     * CONSUMER_GROUP = "CONSUMER_GROUP";
     */
    @NotNull
    public static DefaultMQPushConsumer initConsumer(String topic, String tag, MessageModel model)
    throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        if (Objects.nonNull(model)){
            consumer.setMessageModel(model);
        }
        consumer.subscribe(topic, tag);
        return consumer;
    }
     
    /**
     * Format and print information about the consumed messages
     */
    public static void prettyPrintMessage(List<MessageExt> msgs) {
        msgs.forEach(msg -> System.out.println("Thread name " + Thread.currentThread()
            .getName() + " consumed message: " + new String(msg.getBody())));
    }

}

Batch send message splitter ListSplitter

public class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1024 * 1024 * 4;
    private static final int LOG_BYTE_SIZE = 20;

    private final List<Message> messages;

    private int currIndex;

    public ListSplitter(List<Message> messages) {this.messages = messages;}

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;

        for (; nextIndex < messages.size(); nextIndex++) {
            Message msg = messages.get(nextIndex);
            //Topic and body
            int tmpSize = msg.getTopic()
                .length() + msg.getBody().length;
            //Additional attributes
            for (Map.Entry<String, String> entry : msg.getProperties()
                .entrySet()) {
                tmpSize += entry.getKey()
                    .length() + entry.getValue()
                    .length();
            }
            //Per-message log overhead
            tmpSize += LOG_BYTE_SIZE;

            //A single message that exceeds the limit: if the batch is still empty,
            //let it through on its own so that splitting does not stall
            if (tmpSize > SIZE_LIMIT) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            //Adding this message would push the batch over 4 MB, so end the batch here
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

Keywords: Middleware message queue RocketMQ

Added by compound_eye on Thu, 03 Feb 2022 20:17:58 +0200