RocketMQ simple basic usage (III)
In the last article, I demonstrated how to get started with RocketMQ. Next, I'll learn more about how to use RocketMQ through some simple examples.
1, General message
RocketMQ provides three ways to send ordinary messages: reliable synchronous sending, reliable asynchronous sending and one-way sending.
1. Reliable synchronous transmission
Synchronous sending refers to the communication mode in which the message sender sends data and sends the next data packet after receiving the response from the receiver. This method has a wide range of application scenarios, such as important notification email, registration SMS notification, marketing SMS system, etc.
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * Reliable synchronous transmission */ public class SyncProducer { public static void main(String[] args) { // 1. Create a producer object and specify a producer group DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3. Start the producer producer.start(); for(int i=0; i<10; i++){ int num = i + 1; // 4. Create a message. The parameters are: subject, label and message body String sendContent = "Test message, hello,I'm the news"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5. Send message SendResult send = producer.send(message); System.out.printf("The first"+num+"Messages:%s%n", send); } // 6. Close resources producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
2. Reliable asynchronous transmission
Asynchronous transmission refers to the communication mode in which the sender sends data, does not wait for the receiver to send back the response, and then sends the next data packet. The sender receives the server response through the callback interface and processes the response results.
Asynchronous sending is generally used in business scenarios where the link takes a long time and is sensitive to RT response time, such as notifying the user to start the transcoding service after uploading the video, notifying the user to push the transcoding results after transcoding is completed, etc.
package com.linhuaming.rocketmq.demo.producer; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * Reliable asynchronous transmission */ public class ASyncProducer { public static void main(String[] args) { // 1. Create a producer object and specify a producer group DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3. Start the producer producer.start(); for(int i=0; i<5; i++){ int num = i + 1; // 4. Create a message. The parameters are: subject, label and message body String sendContent = "Test message, hello,I'm the news"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5. Send synchronization messages to a Broker producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("Message sent successfully"); System.out.println(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { System.out.println("Message sending failed"+e.getMessage()); System.out.println("Processing failure message"); } }); } // Let the thread not terminate, otherwise an error will be reported Thread.sleep(30000000); // If you no longer send messages, close the Producer instance. producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3. Unidirectional transmission
One way sending means that the sender is only responsible for sending messages without waiting for a response from the server and no callback function is triggered, that is, only sending requests without waiting for a response. It is applicable to some scenarios that take a very short time but do not require high reliability, such as log collection.
package com.linhuaming.rocketmq.demo.producer; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * One way message */ public class OneWayProducer { public static void main(String[] args) { // 1. Create a producer object and specify a producer group DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3. Start the producer producer.start(); for(int i=0; i<5; i++){ int num = i + 1; // 4. Create a message. The parameters are: subject, label and message body String sendContent = "Test message, hello,I'm the news"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5. Send a one-way message to a Broker producer.sendOneway(message); } // 6. If the message is not sent again, close the Producer instance. producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
4. Comparison of three sending methods
Sending method | Time sent | Send feedback results | Is data lost |
---|---|---|---|
Synchronous transmission | fast | have | Not lost |
Asynchronous transmission | fast | have | Not lost |
One way message | Faster | nothing | May be lost |
2, Sequential message
Although the data structure of RocketMQ is queue, it seems that it naturally supports sequential messages. When there is only one queue, it naturally supports sequential messages. However, there are multiple queues in the Broker. When sending multiple messages, the Broker will place multiple messages in different queues according to polling, and consumers will consume messages in a multi-threaded manner, Therefore, there is no guarantee that the way of consuming messages is the same as that of sending messages. The solution is to send all messages to a queue.
For example, the process of an order is: create, pay, push and complete. With the same order number
Sequential message is a message type that message queue provides to publish and consume in strict order.
producer
OrderStep class
package com.linhuaming.rocketmq.demo.domain; import java.util.ArrayList; import java.util.List; /** * Order builder */ public class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } public static List<OrderStep> buildOrders() { // 1039L: create payment push completed // 1065L: create payment // 7235L: create payment List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("establish"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("establish"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("establish"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("payment"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("complete"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("Push"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("complete"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("complete"); orderList.add(orderDemo); return orderList; } }
OrderProducer class
package com.linhuaming.rocketmq.demo.producer; import com.linhuaming.rocketmq.demo.domain.OrderStep; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; /** * Sequential message */ public class OrderProducer { public static void main(String[] args) throws Exception { // 1. Create a message producer and specify the producer group name DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2. Specify Nameserver address producer.setNamesrvAddr("127.0.0.1:9876"); // 3. Start producer producer.start(); // Build message collection List<OrderStep> orderSteps = OrderStep.buildOrders(); // Traverse send message for (int i = 0; i < orderSteps.size(); i++) { // 4. Create a message String body = orderSteps.get(i) + ""; Message message = new Message("topic1", "tag1", "i" + i, body.getBytes()); // 5. Send message SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) { long orderId = (long) arg; long index = orderId % mqs.size(); return mqs.get((int) index); } },orderSteps.get(i).getOrderId()); System.out.println("Send results:" + sendResult); } // 6. Close resources producer.shutdown(); } }
consumer
OrderConsumer class
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws Exception { // 1. Create a Consumer and make a Consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2. Specify Nameserver address consumer.setNamesrvAddr("127.0.0.1:9876"); // 3. Subscribe to Topic and Tag consumer.subscribe("topic1", "*"); // 4. Register the message listener. The message listener is consumed in order consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : msgs) { System.out.println("Thread Name:[" + Thread.currentThread().getName() + "]:" + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); //5. Start consumers consumer.start(); System.out.println("Consumer starting..."); } }
3, Transaction message
RocketMQ provides transaction messages, through which the final consistency of distributed transactions can be achieved. The process of transaction message interaction is as follows:
Basic concepts of transaction messages:
- Semi transaction message: a message that cannot be delivered temporarily. The sender has successfully sent the message to the RocketMQ server, but the server has not received the producer's secondary confirmation of the message. At this time, the message is marked as "temporarily undeliverable". The message in this state is a semi transaction message.
- Message check back: the secondary confirmation of a transaction message is lost due to network flash off, producer application restart and other reasons. When the RocketMQ server finds that a message has been in "semi transaction message" for a long time through scanning, it needs to actively ask the message producer about the final state (Commit or Rollback) of the message. This query process is message check back.
Sending steps of transaction message:
- The sender sends the semi transaction message to the RocketMQ server.
- After persisting the message, the RocketMQ server returns a confirmation message to the sender that the message has been sent successfully. At this time, the message is a semi transaction message.
- The sender starts executing local transaction logic.
- The sender submits a secondary confirmation (Commit or Rollback) to the server according to the local transaction execution result. When the server receives the Commit status, it marks the transaction message as deliverable, and the subscriber will eventually receive the message. If the server receives the Rollback status, the semi transaction message will be deleted, and the subscriber will not receive the message.
Transaction message back check steps:
- In the special case of network disconnection or application restart, the secondary confirmation submitted in step 4 above finally does not reach the server. After a fixed time, the server will initiate a message back check on the message.
- After receiving the message query, the sender needs to check the final result of the local transaction execution of the corresponding message.
- The sender submits the secondary confirmation again according to the final status of the local transaction, and the server still operates the semi transaction message according to step 4.
4, Delay message
For example, in e-commerce, you can send a delay message after submitting an order, check the status of the order after 1h, and cancel the order and release the inventory if it is still unpaid. We can use delayed messages to complete this function.
Restrictions on the use of delay messages. At present, RocketMq does not support any time delay. It is necessary to set several fixed delay levels from 1s to 2h, corresponding to levels 1 to 18 respectively.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Message consumption failure will enter the delayed message queue. The message sending time is related to the set delay level and retry times.
producer
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class ScheduledMessageProducer { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; public static void main(String[] args) throws Exception { // 1. Instantiate a producer to generate a delay message DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); // 3. Start the producer producer.start(); Message message = new Message("topic1", ("delay message").getBytes()); // Set the delay Level 3, and the message will be sent after 10s (now only a few fixed times are supported, see delaytimelevel for details) message.setDelayTimeLevel(3); // send message producer.send(message); // Close producer producer.shutdown(); } }
consumer
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 1. Create a Consumer and make a Consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2. Specify the mq server address consumer.setNamesrvAddr("127.0.0.1:9876"); // 3. Subscribe to Topics consumer.subscribe("topic1", "*"); // 4. Register message listener consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. Start consumers consumer.start(); } }
5, Message filtering
When consuming messages, we can specify which messages to consume. At this time, we need to use message filtering, which is divided into two types:
- Filter by tag
- Filtering through SQL statements
1. Filter by tag
producer
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * Label filter - producer */ import java.util.concurrent.TimeUnit; public class FilterProducer { public static void main(String[] args) throws Exception { // 1. Create a message producer and specify the producer group name DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); // 3. Start producer producer.start(); for (int i = 0; i < 3; i++) { //4. Create a message object and specify the Topic, Tag and message body Message msg = new Message("topic1", "tag2", ("Message filtering" + i).getBytes()); // 5. Send message SendResult result = producer.send(msg); System.out.printf(result.toString()); //Thread sleep for 1 second TimeUnit.SECONDS.sleep(1); } //6. Close the producer producer.shutdown(); } }
consumer
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * Tag filtering - Consumer */ public class FilterConsumer { public static void main(String[] args) throws Exception { // 1. Create a Consumer and specify the Consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2. Specify the mq server address consumer.setNamesrvAddr("127.0.0.1:9876"); // 3. Subscribe to Topics consumer.subscribe("topic1", "tag1 || tag2"); // 4. Register message listener consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. Start consumers consumer.start(); } }
2. Filtering through SQL statements
RocketMQ only defines some basic syntax to support this feature. You can also easily expand it.
- Numerical comparison, such as: >, > =, <, < =, BETWEEN=
- Character comparison, such as: =, < >, IN
- IS NULL or IS NOT NULL
- Logical symbols AND, OR, NOT
Constant support types are:
- Value, e.g. 123, 3.1415
- Characters, such as' abc ', must be enclosed in single quotes
- NULL, special constant
- Boolean, TRUE or FALSE
producer
When sending a message, you can set the properties of the message through putUserProperty.
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; /** * SQL Filter - producer */ public class SQLFilterProducer { public static void main(String[] args) throws Exception { // 1. Create a message producer and specify the producer group name DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2. Specify the mq server address producer.setNamesrvAddr("127.0.0.1:9876"); // 3. Start producer producer.start(); for (int i = 0; i < 3; i++) { // 4. Create a message object and specify the Topic, Tag and message body Message msg = new Message("topic1", "tag1", ("Message filtering" + i).getBytes()); // Set some properties for the message object msg.putUserProperty("a", String.valueOf(i)); // 5. Send message SendResult result = producer.send(msg); System.out.println(result.toString()); } //6. Close the producer producer.shutdown(); } }
consumer
Use messageselector Bysql to filter messages using sql.
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * SQL Filtering - Consumer */ public class SQLFilterConsumer { public static void main(String[] args) throws Exception { // 1. Create a Consumer and specify the Consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2. Specify the mq server address consumer.setNamesrvAddr("127.0.0.1:9876"); // 3. Subscribe to Topics consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 1")); // sql is used here to filter messages // 4. Register message listener consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. Start consumers consumer.start(); } }
"127.0.0.1:9876");
// 3. Subscribe to Topics consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 1")); // sql is used here to filter messages // 4. Register message listener consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5. Start consumers consumer.start(); }
}