RocketMQ usage demos

Note: Many of the demos in this article are based on the official RocketMQ documentation, but unlike the official documentation, several simpler demos are provided here.

Dependencies

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Note: client configuration

Both producers and consumers are MQ clients and inherit from the ClientConfig class, the common configuration class for clients. The client configuration is therefore mentioned first. For the available properties, see Client configuration.

1. Send synchronous messages

This reliable synchronous sending method is widely used, for example for important notifications and SMS notifications.

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify Topic, Tag and message body
            Message msg = new Message("TopicTest" /* Topic */,
            "TagA" /* Tag */,
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send messages to a Broker
            SendResult sendResult = producer.send(msg);
            // Whether the message was successfully delivered is returned through sendResult
            System.out.printf("%s%n", sendResult);
        }
        // If you no longer send messages, close the Producer instance.
        producer.shutdown();
    }
}

2. Send asynchronous messages

Asynchronous messages are usually used in business scenarios that are sensitive to response time, that is, the sender cannot tolerate waiting for a Broker's response for a long time.

Note
  • Asynchronous sending simply adds a callback function to send.
  • Because send is an asynchronous request, calling producer.shutdown() before all sends have finished closes the producer instance and causes the remaining sends to fail. A CountDownLatch2 is therefore used here: each callback counts it down once, and await returns as soon as all callbacks have fired or the timeout expires. A plain TimeUnit.SECONDS.sleep(10) is less reliable, because the producer instance may be closed before all messages have been pushed.
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start Producer instance
        producer.start();
        // Do not retry internally when an asynchronous send fails
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        // Instantiate the countdown latch based on the number of messages
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message and specify Topic, Tag, key and message body
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback receives the result of the send asynchronously
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Count down once per finished send
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    // Failed sends count down as well, so await does not wait for them
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait until all callbacks have fired, for at most 5s
        countDownLatch.await(5, TimeUnit.SECONDS);
        // If you no longer send messages, close the Producer instance.
        producer.shutdown();
    }
}
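
The shutdown race described in the note can be tried out without a broker. The following is a minimal sketch using only the JDK: an ExecutorService stands in for the asynchronous send path and java.util.concurrent.CountDownLatch stands in for CountDownLatch2. The names LatchSketch and sendAllAndWait are hypothetical and are not part of RocketMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchSketch {

    // Simulates messageCount asynchronous sends and returns how many callbacks
    // had completed when the wait ended, or -1 if the wait timed out or was interrupted.
    static int sendAllAndWait(int messageCount, long timeoutSeconds) {
        ExecutorService fakeSender = Executors.newFixedThreadPool(4); // stand-in for the async send path
        CountDownLatch latch = new CountDownLatch(messageCount);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < messageCount; i++) {
            fakeSender.submit(() -> {
                try {
                    completed.incrementAndGet(); // what onSuccess / onException would do
                } finally {
                    latch.countDown();           // always count down, on success or failure
                }
            });
        }

        try {
            // Returns as soon as every callback has fired, or after the timeout
            boolean allDone = latch.await(timeoutSeconds, TimeUnit.SECONDS);
            return allDone ? completed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            fakeSender.shutdown();               // corresponds to producer.shutdown()
        }
    }

    public static void main(String[] args) {
        System.out.println("callbacks completed: " + sendAllAndWait(100, 5));
    }
}
```

Compared with a fixed sleep, the latch releases the main thread as soon as the last callback has run, and the timeout only acts as an upper bound.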

3. Send one-way messages

This method is mainly used in scenarios that do not particularly care about sending results, such as log sending.

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        // Instantiate message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify Topic, Tag and message body
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send a one-way message without returning any results
            producer.sendOneway(msg);

        }
        // If you no longer send messages, close the Producer instance.
        producer.shutdown();
    }
}

1.3 Consuming messages

Note
  • subscribe is used here. Each call takes one topic; multiple tags can be passed in the second argument, separated by ||, for example consumer.subscribe("TopicTest", "tag1 || tag2 || tag3");
  • Messages are consumed by multiple threads from a thread pool. You can set the number of consumer threads with setConsumeThreadMax(count) and setConsumeThreadMin(count), and the maximum number of messages handed to the listener at a time with setConsumeMessageBatchMaxSize(count).
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Set the address of the NameServer
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to one or more topics and tags to filter messages to be consumed
        consumer.subscribe("TopicTest", "*");
        // Register the callback implementation class to handle the messages pulled back from the broker
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Mark that the message has been successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start consumer instance
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
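
The meaning of a tag expression such as "tag1 || tag2 || tag3" can be illustrated in plain Java. This is only a sketch of the matching semantics, assuming that "*" or an empty expression accepts every tag. TagExpressionSketch and matches are hypothetical names, and this is not how RocketMQ implements tag filtering internally.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class TagExpressionSketch {

    // Returns true when messageTag is accepted by a subscription expression
    // such as "tag1 || tag2 || tag3". "*" or an empty expression accepts every tag.
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null) {
            return true;
        }
        String expr = subExpression.trim();
        if (expr.isEmpty() || "*".equals(expr)) {
            return true;
        }
        // Split on the literal "||" separator and ignore surrounding spaces
        Set<String> tags = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toSet());
        return tags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("tag1 || tag2 ||tag3", "tag3"));
        System.out.println(matches("tag1 || tag2 ||tag3", "tag4"));
        System.out.println(matches("*", "anything"));
    }
}
```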

2 Ordered message example

Message ordering means that messages are consumed in the order in which they were sent (FIFO). RocketMQ can strictly guarantee message order, either per partition or globally.

How ordered consumption works: by default, messages are sent to different queues (partition queues) in round-robin fashion, and consumers pull messages from multiple queues, so the order of sending and consumption cannot be guaranteed. However, if the messages that must stay in order are all sent to the same queue, one after another, and are consumed from that queue one after another, order is preserved. When only one queue is used for sending and consuming, the order is global; when multiple queues take part, the order is per partition, that is, messages are ordered within each queue.

The following example uses orders to demonstrate partition ordering. An order goes through these steps in sequence: create, pay, push and complete. Messages with the same order number are sent to the same queue one after another, so when consuming, the same OrderId always comes from the same queue.

2.1 Producing ordered messages

Goal

Publish the messages that must be consumed in order to the same queue. Three orders are defined here; the messages of each order should go to a single queue chosen by order id and be consumed in the order in which they were added.
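
The routing rule "same order number, same queue" comes down to a modulo over the number of queues. The sketch below uses plain Java and the three order ids from the producer example in section 2.1. QueueSelectionSketch and selectQueue are hypothetical names, and the queue counts 3 and 4 are assumptions chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class QueueSelectionSketch {

    // Maps an order id to a queue index; the same id always yields the same index.
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        long[] orderIds = {15103111039L, 15103111065L, 15103117235L};
        for (int queueCount : new int[] {3, 4}) {
            Map<Long, Integer> assignment = new LinkedHashMap<>();
            for (long id : orderIds) {
                assignment.put(id, selectQueue(id, queueCount));
            }
            System.out.println(queueCount + " queues -> " + assignment);
        }
    }
}
```

With three queues the three orders land in three different queues. With four queues two of the orders share a queue, which is still fine for partition ordering, because all messages of one order stay in a single queue.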

public class OrderMsgProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("order_topic_producer");

        producer.setNamesrvAddr("1xxxxxxxx:9876");

        producer.start();


        // Order list
        List<OrderStep> orderList = new OrderMsgProducer().buildOrders();

        for (OrderStep orderStep : orderList) {
            Message msg = new Message("order_topic",orderStep.toString().getBytes(StandardCharsets.UTF_8));

            //All messages of one order go to the queue selected by its order id
            producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //Select the send queue according to the order id
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderStep.getOrderId());//Order id
        }
        producer.shutdown();
    }

    /**
     * Order steps
     */
    @Data
    private static class OrderStep {
        private long orderId;
        private String type;
        private String desc;
    }

    /**
     * Generate simulated order data
     * 3 orders with different processes
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setType("A");
        orderDemo.setDesc("establish");
        orderList.add(orderDemo);


        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setType("A");
        orderDemo.setDesc("payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setType("A");
        orderDemo.setDesc("Push");
        orderList.add(orderDemo);


        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setType("A");
        orderDemo.setDesc("complete");
        orderList.add(orderDemo);


        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("establish");
        orderDemo.setType("B");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("payment");
        orderDemo.setType("B");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setType("B");
        orderDemo.setDesc("complete");
        orderList.add(orderDemo);


        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("establish");
        orderDemo.setType("C");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setType("C");
        orderDemo.setDesc("payment");
        orderList.add(orderDemo);


        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setType("C");
        orderDemo.setDesc("complete");
        orderList.add(orderDemo);

        return orderList;
    }
}

Viewing the queues after sending shows that the 10 messages entered three queues in groups of 4, 3 and 3. To check whether this matches what we intended, consume the messages and have a look.

2.2 Consuming messages in order

public class OrderMsgConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderTopicCustomer");
        consumer.setNamesrvAddr("1xxxxxxx6");
        consumer.subscribe("order_topic","*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                //ConsumeOrderlyStatus works like local transaction management; automatic commit is enabled here
                context.setAutoCommit(true);

                //consumption
                for (MessageExt msg : msgs) {
                    // You can see that each queue has a unique consumer thread to consume, and the orders are ordered for each queue (partition)
                    System.out.println(
                            "consumeThread: " + Thread.currentThread().getName() +
                            ", queueId: " + msg.getQueueId() +
                            ", content: " + new String(msg.getBody())
                    );
                }

                try {
                    //Simulating business logic processing
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

The printed results show that messages of the same order enter the same queue, each queue is consumed by a single thread, and the messages of each order are consumed in the order in which they were sent.

3 Delayed message example

3.1 Start the consumer and wait for subscribed messages

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
      // Instantiate consumer
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
      // Subscribe to Topics
      consumer.subscribe("TestTopic", "*");
      // Register message listener
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  // Print approximate delay time period
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // Start consumer
      consumer.start();
  }
}

3.2 Sending delayed messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // Instantiate a producer to generate a delayed message
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // Start producer
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // Set delay level 3: the message is delivered 10s after it is sent (only a few fixed delays are supported, see messageDelayLevel below)
          message.setDelayTimeLevel(3);
          // send message
          producer.send(message);
      }
       // Close producer
      producer.shutdown();
  }
}

3.3 verification

You will see that the message is consumed 10 seconds later than the storage time.

3.4 usage scenario of delay message

For example, in e-commerce, you can send a delay message after submitting an order, check the status of the order after 1h, and cancel the order and release the inventory if it is still unpaid.

3.5 restrictions on the use of delay messages

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

At present, RocketMQ does not support arbitrary delay times. Only a set of fixed delay levels can be used, from 1s to 2h, corresponding to levels 1 to 18. When message consumption fails, the message also enters the delay message queue; its delivery time then depends on the configured delay levels and the number of retries. See SendMessageProcessor.java for details.

4 Batch message example
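
To see how a delay level maps to an actual delay, the level string quoted above can be parsed in a few lines of plain Java. This is a minimal sketch, not the broker's own parsing code. DelayLevelSketch, parse and delayMillis are hypothetical names; only the level string itself is taken from MessageStoreConfig as quoted above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelayLevelSketch {

    // Default value quoted from MessageStoreConfig
    static final String MESSAGE_DELAY_LEVEL =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses the level string into a map from level (starting at 1) to milliseconds.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown time unit in: " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    // Returns the delay in milliseconds for a level, or throws if the level does not exist.
    static long delayMillis(int level) {
        Long millis = parse(MESSAGE_DELAY_LEVEL).get(level);
        if (millis == null) {
            throw new IllegalArgumentException("No such delay level: " + level);
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println("level 3  = " + delayMillis(3) + " ms");
        System.out.println("level 18 = " + delayMillis(18) + " ms");
    }
}
```

Level 3, as used in ScheduledMessageProducer, corresponds to the third entry of the string, 10s.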

Sending messages in bulk can significantly improve the performance of delivering small messages. The limitation is that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.

4.1 sending batch messages

If each batch you send is no larger than 4MB, batching is straightforward. An example is as follows:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //Processing error
}

4.2 message list segmentation

Things only get more complex when you send a large number of messages and cannot be sure whether they exceed the size limit (4MB). In that case, it is best to split the message list:

public class ListSplitter implements Iterator<List<Message>> { 
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) { 
        this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size(); 
    }
    @Override public List<Message> next() { 
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex); 
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break; 
            } else {
                totalSize += tmpSize; 
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex); 
        currIndex = nextIndex;
        return subList;
    }
    private int getStartIndex() {
        // Skip any single message that exceeds the limit on its own,
        // without running past the end of the list
        while (currIndex < messages.size()
                && calcMessageSize(messages.get(currIndex)) > SIZE_LIMIT) {
            currIndex += 1;
        }
        return currIndex;
    }
    private int calcMessageSize(Message message) {
        // getBody() returns a byte[], so use the length field
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // Add 20 bytes of log overhead
        return tmpSize;
    }
}
//Split a large message list into several smaller batches
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //Processing error
  }
}

5 Message filtering example

In most cases, a tag is a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer will receive messages tagged TAGA, TAGB or TAGC. The limitation is that a message can have only one tag, which may not be enough in complex scenarios. In that case, you can filter messages with SQL expressions. The SQL feature evaluates the properties that were set when the message was sent. Within the syntax defined by RocketMQ, you can implement some simple logic. Here is an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
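
The two cases in the diagram can be mimicked in plain Java to make the outcome concrete. This is a hand-written stand-in for the expression a > 5 AND b = 'abc', not RocketMQ's SQL evaluator. PropertyFilterSketch, FILTER and message are hypothetical names, and treating a missing or non-numeric property as a non-match is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class PropertyFilterSketch {

    // Hand-written equivalent of the expression: a > 5 AND b = 'abc'
    static final Predicate<Map<String, String>> FILTER = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // assumption: a missing property never matches
        }
        try {
            return Double.parseDouble(a) > 5 && "abc".equals(b);
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric value for a never matches
        }
    };

    // Builds the user properties of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(FILTER.test(message("10", "abc", "true")) ? "Gotten" : "Missed");
        System.out.println(FILTER.test(message("1", "abc", "true")) ? "Gotten" : "Missed");
    }
}
```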

5.1 Basic syntax

RocketMQ only defines some basic syntax to support this feature. You can also easily extend it.

  • Numerical comparison, such as: >, >=, <, <=, BETWEEN, =;
  • Character comparison, such as: =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical operators AND, OR, NOT;

Supported constant types are:

  • Numeric, such as 123, 3.1415;
  • Character, such as 'abc', must be enclosed in single quotation marks;
  • NULL, special constant
  • Boolean, TRUE or FALSE

Only consumers using push mode can filter with SQL92 expressions. The interface is as follows:

public void subscribe(final String topic, final MessageSelector messageSelector)

5.2 use examples

1. Producer sample

When sending a message, you can set the properties of the message through putUserProperty

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

2. Consumer sample

Use MessageSelector.bySql to filter messages with SQL

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only receive messages that have property a with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

6 Transaction message example

Transaction messages have three statuses: commit status, rollback status and intermediate status:

  • TransactionStatus.CommitTransaction: commit a transaction that allows the consumer to consume this message.
  • TransactionStatus.RollbackTransaction: rollback transaction, which means that the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate status, which represents the need to check the message queue to determine the status.

Two related terms:

  • Half (Prepare) Message: a message that temporarily cannot be delivered. The sender has successfully sent the message to the MQ server, but the server has not yet received the producer's second confirmation for it. The message is then marked as "temporarily undeliverable"; a message in this state is a half message.
  • Message Status Check: the second confirmation of a transaction message may be lost because of a network interruption, a restart of the producer application or similar reasons. When the MQ server finds by scanning that a message has remained a half message for a long time, it actively asks the message producer for the final status (Commit or Rollback) of the message. This process is the message status check.

Execution flow of transaction message

  1. The sender sends a message to the MQ Server.
  2. After the MQ Server has persisted the message, it confirms to the sender that the message was sent successfully. At this point the message is a half message.
  3. The sender starts executing the local transaction logic.
  4. The sender submits a second confirmation (Commit or Rollback) to the MQ Server according to the result of the local transaction. If the MQ Server receives Commit, it marks the half message as deliverable and the subscriber will eventually receive it; if it receives Rollback, it deletes the half message and the subscriber will not receive it.
  5. In special cases such as a network disconnection or an application restart, the second confirmation submitted in step 4 does not reach the MQ Server. After a fixed time, the MQ Server initiates a status check for the message.
  6. After receiving the status check, the sender checks the final result of the local transaction for the corresponding message.
  7. The sender submits the second confirmation again according to the final status of the local transaction, and the MQ Server handles the half message as in step 4.
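
The seven steps above can be condensed into a small state machine. The sketch below is plain Java and models only the decision logic, with no broker involved. HalfMessageFlowSketch, Sender, process and scripted are hypothetical names, and the maxChecks cut-off mirrors the check limit described under the usage restrictions of transaction messages.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class HalfMessageFlowSketch {

    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // Hypothetical sender-side hooks: run the local transaction, answer a status check.
    interface Sender {
        LocalState executeLocalTransaction();
        LocalState checkLocalTransaction();
    }

    // Server-side view of one half message, following steps 1 to 7.
    static Outcome process(Sender sender, int maxChecks) {
        // Steps 1-2: the half message is persisted and acknowledged (not modelled here).
        // Steps 3-4: the sender runs its local transaction and reports the result.
        LocalState state = sender.executeLocalTransaction();
        // Steps 5-7: while no final state is known, the server checks back.
        for (int checks = 0; state == LocalState.UNKNOWN && checks < maxChecks; checks++) {
            state = sender.checkLocalTransaction();
        }
        switch (state) {
            case COMMIT:   return Outcome.DELIVERED; // half message becomes deliverable
            case ROLLBACK: return Outcome.DELETED;   // half message is removed
            default:       return Outcome.DISCARDED; // still unknown after maxChecks checks
        }
    }

    // Helper for trying the flow: a sender whose answers are scripted in advance.
    static Sender scripted(final LocalState first, LocalState... checkAnswers) {
        final Deque<LocalState> answers = new ArrayDeque<>(Arrays.asList(checkAnswers));
        return new Sender() {
            @Override public LocalState executeLocalTransaction() {
                return first;
            }
            @Override public LocalState checkLocalTransaction() {
                // Once the script is exhausted, keep answering UNKNOWN
                return answers.isEmpty() ? LocalState.UNKNOWN : answers.poll();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(process(scripted(LocalState.COMMIT), 15));
        System.out.println(process(scripted(LocalState.UNKNOWN, LocalState.UNKNOWN, LocalState.ROLLBACK), 15));
        System.out.println(process(scripted(LocalState.UNKNOWN), 15));
    }
}
```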

6.1 Example of sending transaction messages

Transaction messages do not involve consumers, because success or failure is entirely controlled by the producer.

1. Create a transactional producer

Use the TransactionMQProducer class to create a producer and specify a unique producer group. You can set a custom thread pool to process the check requests. After executing the local transaction, reply to the message queue according to the execution result. The returned transaction states are described in the previous section.

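import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;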
public class TransactionProducer {
   public static void main(String[] args) throws MQClientException, InterruptedException {
       TransactionListener transactionListener = new TransactionListenerImpl();
       TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
       ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
           @Override
           public Thread newThread(Runnable r) {
               Thread thread = new Thread(r);
               thread.setName("client-transaction-msg-check-thread");
               return thread;
           }
       });
       producer.setExecutorService(executorService);
       producer.setTransactionListener(transactionListener);
       producer.start();
       String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
       for (int i = 0; i < 10; i++) {
           try {
               Message msg =
                   new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               SendResult sendResult = producer.sendMessageInTransaction(msg, null);
               System.out.printf("%s%n", sendResult);
               Thread.sleep(10);
           } catch (MQClientException | UnsupportedEncodingException e) {
               e.printStackTrace();
           }
       }
       for (int i = 0; i < 100000; i++) {
           Thread.sleep(1000);
       }
       producer.shutdown();
   }
}

2. Implement the transaction listener interface, which defines when a transaction succeeds or fails

After the half message has been sent successfully, the executeLocalTransaction method is called to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method checks the local transaction status in response to a check request from the message queue. It also returns one of the three transaction states mentioned in the previous section.

public class TransactionListenerImpl implements TransactionListener {
  private AtomicInteger transactionIndex = new AtomicInteger(0);
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
      int value = transactionIndex.getAndIncrement();
      int status = value % 3;
      localTrans.put(msg.getTransactionId(), status);
      return LocalTransactionState.UNKNOW;
  }
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
      Integer status = localTrans.get(msg.getTransactionId());
      if (null != status) {
          switch (status) {
              case 0:
                  return LocalTransactionState.UNKNOW;
              case 1:
                  return LocalTransactionState.COMMIT_MESSAGE;
              case 2:
                  return LocalTransactionState.ROLLBACK_MESSAGE;
          }
      }
      return LocalTransactionState.COMMIT_MESSAGE;
  }
}

6.2 Restrictions on the use of transaction messages

  1. Transaction messages do not support delayed messages or batch messages.
  2. To prevent half messages from accumulating in the queue because a single message is checked too many times, the number of checks for a single message is limited to 15 by default. Users can change this limit through the transactionCheckMax parameter in the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker by default discards the message and prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.
  3. A transaction message is checked after a period of time determined by the transactionTimeout parameter in the Broker configuration file. When sending a transaction message, the user can also set the user property CHECK_IMMUNITY_TIME_IN_SECONDS to change this limit. This property takes precedence over the transactionTimeout parameter.
  4. Transaction messages may be checked or consumed more than once.
  5. Committing the message to the user's target topic may fail. At present, this case is handled through log records. High availability is ensured by RocketMQ's own high availability mechanism. If you want to make sure that transaction messages are not lost and transaction integrity is guaranteed, the synchronous dual write mechanism is recommended.
  6. The producer ID of a transaction message cannot be shared with the producer ID of other types of messages. Unlike other types of messages, transaction messages allow reverse queries: the MQ server can query clients through their producer ID.

7 Logappender example

RocketMQ's log appender supports the log4j, log4j2 and logback logging frameworks for business applications. Configuration examples follow.

7.1 log4j example

Use the log4j property configuration as shown in the following example

log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

With the log4j XML configuration, wrap the appender in an AsyncAppender as in the example below so that logs are written asynchronously

<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
  <param name="Tag" value="yourTag" />
  <param name="Topic" value="yourLogTopic" />
  <param name="ProducerGroup" value="yourLogGroup" />
  <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
  <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
  </layout>
</appender>
<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
  <param name="BufferSize" value="1024" />
  <param name="Blocking" value="false" />
  <appender-ref ref="mqAppender1"/>
</appender>

7.2 log4j2 example

When using log4j2, the configuration is as follows. If you want logging to be non-blocking, reference this appender from an asynchronous appender.

<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
   topic="yourLogTopic" tag="yourTag">
  <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
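
For the non-blocking case, the wiring might look like the sketch below, which uses log4j2's standard Async appender. The appender name asyncRocketmqAppender, the simplified pattern and the root level info are assumptions for illustration; the RocketMQ element repeats the attributes from the configuration above.

```xml
<Appenders>
  <RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
     topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] %c %m%n"/>
  </RocketMQ>
  <!-- Async wrapper: log events are queued and passed to rocketmqAppender on a background thread. -->
  <!-- blocking="false" means the caller does not wait when the queue is full. -->
  <Async name="asyncRocketmqAppender" blocking="false">
    <AppenderRef ref="rocketmqAppender"/>
  </Async>
</Appenders>
<Loggers>
  <Root level="info">
    <!-- Loggers reference the async wrapper, not the RocketMQ appender directly -->
    <AppenderRef ref="asyncRocketmqAppender"/>
  </Root>
</Loggers>
```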

7.3 logback example

<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
  <tag>yourTag</tag>
  <topic>yourLogTopic</topic>
  <producerGroup>yourLogGroup</producerGroup>
  <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
  <layout>
      <pattern>%date %p %t - %m%n</pattern>
  </layout>
</appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
  <queueSize>1024</queueSize>
  <discardingThreshold>80</discardingThreshold>
  <maxFlushTime>2000</maxFlushTime>
  <neverBlock>true</neverBlock>
  <appender-ref ref="mqAppender1"/>
</appender>

8 OpenMessaging sample

OpenMessaging aims to establish messaging and stream-processing specifications that provide a common framework and industry guidelines for finance, e-commerce, the Internet of Things and big data. Its design principles for distributed heterogeneous environments are to be cloud-oriented, simple, flexible and language-independent. Conforming to these specifications helps enterprises develop heterogeneous messaging applications across platforms and operating systems. RocketMQ provides a partial implementation of OpenMessaging API 0.3.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.

8.1 OMSProducer example

The following example demonstrates how to send a message to the RocketMQ Broker in synchronous, asynchronous, or one-way mode.

import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;

public class SimpleProducer {
    public static void main(String[] args) {
       final MessagingAccessPoint messagingAccessPoint =
           OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
       final Producer producer = messagingAccessPoint.createProducer();
       messagingAccessPoint.startup();
       System.out.printf("MessagingAccessPoint startup OK%n");
       producer.startup();
       System.out.printf("Producer startup OK%n");
       {
           Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
           SendResult sendResult = producer.send(message);
           System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
       }
       final CountDownLatch countDownLatch = new CountDownLatch(1);
       {
           final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
           result.addListener(new FutureListener<SendResult>() {
               @Override
               public void operationComplete(Future<SendResult> future) {
                   if (future.getThrowable() != null) {
                       System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
                   } else {
                       System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
                   }
                   countDownLatch.countDown();
               }
           });
       }
       {
           producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
           System.out.printf("Send oneway message OK%n");
       }
       try {
           countDownLatch.await();
           Thread.sleep(500); // Give the one-way message some time to be sent
       } catch (InterruptedException ignore) {
       }
       producer.shutdown();
   }
}

8.2 OMSPullConsumer

Use OMS PullConsumer to pull messages from the specified queue

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;

public class SimplePullConsumer {
    public static void main(String[] args) {
       final MessagingAccessPoint messagingAccessPoint =
           OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
       messagingAccessPoint.startup();
       final Producer producer = messagingAccessPoint.createProducer();
       final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
           OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
       System.out.printf("MessagingAccessPoint startup OK%n");
       final String queueName = "TopicTest";
       producer.startup();
       Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
       SendResult sendResult = producer.send(msg);
       System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
       producer.shutdown();
       consumer.attachQueue(queueName);
       consumer.startup();
       System.out.printf("Consumer startup OK%n");
       // Keep polling until the message sent above has been received
       boolean stop = false;
       while (!stop) {
           Message message = consumer.receive();
           if (message != null) {
               String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
               System.out.printf("Received one message: %s%n", msgId);
               consumer.ack(msgId);
               stop = msgId.equalsIgnoreCase(sendResult.messageId());
           } else {
               System.out.printf("Return without any message%n");
           }
       }
       consumer.shutdown();
       messagingAccessPoint.shutdown();
   }
}

8.3 OMSPushConsumer

The following example shows how to attach an OMS PushConsumer to the specified queue and consume its messages through a MessageListener.

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;

public class SimplePushConsumer {
    public static void main(String[] args) {
       final MessagingAccessPoint messagingAccessPoint = OMS
           .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
       final PushConsumer consumer = messagingAccessPoint.
           createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
       messagingAccessPoint.startup();
       System.out.printf("MessagingAccessPoint startup OK%n");
       Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
           @Override
           public void run() {
               consumer.shutdown();
               messagingAccessPoint.shutdown();
           }
       }));
       consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
           @Override
           public void onReceived(Message message, Context context) {
               System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
               context.ack();
           }
       });
       consumer.startup();
       System.out.printf("Consumer startup OK%n");
   }
}

Added by Push Eject on Wed, 05 Jan 2022 01:58:27 +0200