rocketMq learning and Practice (I)
Installation configuration
download
Binary download address: http://rocketmq.apache.org/release_notes/release-notes-4.8.0/
Docker pull address: docker pull foxiswho / rocketmq: server-4.3 two
install
-
Configure environment variables
-
Find the installation package, start nameserver, enter the bin directory, and double-click mqnamesrv CMD started. Start the broker command \mqbroker -n 127.0.0.1:9876, specify configuration to enable automatic creation of topic and start bin / mqbroker - n 127.0 0.1:9876 autoCreateTopicEnable=true -c conf/broker. conf &
-
Visual plug-in download
-
Download address: https://github.com/apache/rocketmq-externals.git
-
Modify application. Under rocketmq externals \ rocketmq console \ SRC \ main \ resources Properties, modify port and namesrvAddr, and the configuration is as follows
server.contextPath= # Modify port number server.port=8088 #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 # Add rocketmq config. namesrvAddr rocketmq.config.namesrvAddr=127.0.0.1:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true
-
Start the compilation, enter the \ rocketmq externals \ rocketmq console folder with CMD, and execute MVN clean package - dmaven test. Skip = true, compiled and generated. After the compilation is successful, CMD enters the target folder and executes Java - jar rocketmq-console-ng-1.0 0.jar, start rocketmq-console-ng-1.0 0.jar. Enter '127.0' in the browser 0.1:8088 ', you can view it after success.
-
Start nameserv start mqnamesrv CMD start mqbroker cmd -n 127.0. 0.1:9876 autoCreateTopicEnable=true
-
Architecture diagram
springboot integration
Reference example https://blog.csdn.net/qq_38366063/article/details/93387680
Official website: https://github.com/apache/rocketmq-spring
rely on
- maven
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
-
gradle
compile 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.0'
to configure
rocketmq: name-server: 127.0.0.1:9876 producer: group: qinajia # Timeout 5 minutes send-message-timeout: 300000 # Number of retries: 3 retry-times-when-send-failed: 3 consumer: group: qianjia
Producer implementation
rocketMQTemplate.send("test-topic-1", new GenericMessage(demo));
Producer implementation
@Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class TestConsumer implements RocketMQListener<Demo> { @Override public void onMessage(Demo demo) { System.out.print("------- OrderPaidEventConsumer received:"+ JSON.toJSONString(demo)); } }
3 message sending methods
-
Send sync synchronously
The sending message adopts the synchronization mode, which returns the result only after the message is completely sent. This method has the time cost of waiting for the sending result synchronously.
This method has an internal retry mechanism, that is, the internal implementation will retry a certain number of times before actively declaring that the message sending fails, The default is twice (DefaultMQProducer getRetryTimesWhenSendFailed). The same message may be sent to the broker multiple times in the sent result. Here, the application developer needs to handle the idempotency problem on the consumer side. An example is as follows
public void sync() { rocketMQTemplate.syncSend("topic-name", "send sync message !"); }
-
Send sync asynchronously
The sending message adopts the asynchronous sending mode. It returns immediately after sending. When the message is completely sent, the callback function sendCallback will be called to inform the sender whether the sending is successful or failed. Asynchronous mode is usually used in response time sensitive business scenarios, that is, it can not bear the time-consuming cost of waiting for return when sending messages synchronously.
Like synchronous sending, the asynchronous mode also implements the retry mechanism internally. The default number of times is 2 (defaultmqproducer #getretrytimeswhensensensayncfailed}). The same message may be sent to the broker many times in the sent result. The application developer needs to deal with the idempotency problem on the consumer side.
public void async() { rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("send successful"); } @Override public void onException(Throwable throwable) { log.info("send fail; {}", throwable.getMessage()); } }); }
-
Send one-way directly
When sending a message in one-way mode, the sender will return immediately after sending the message, and will not wait for an ack from the broker to tell whether the message has been sent completely. This method has a large throughput, but there is a risk of message loss, so it is suitable for sending unimportant messages, such as log collection.
public void oneWay() { rocketMQTemplate.sendOneWay("topic-name", "send one-way message"); }
Two consumption patterns
-
Load balancing consumption
@Service @RocketMQMessageListener( topic = "topicName", consumerGroup = "my-group1", selectorExpression = "tag01||tag02", // Load balancing mode messageModel = MessageModel.CLUSTERING ) @Slf4j public class Consumer2 implements RocketMQListener<Demo> { @Override public void onMessage(Demo demo) { log.debug("Message content {}", demo.toString()); } }
-
Broadcast mode
@Service @RocketMQMessageListener( topic = "topicName", consumerGroup = "mall", selectorExpression = "tag01||tag02", // Broadcast mode messageModel = MessageModel.BROADCASTING ) @Slf4j public class Consumer implements RocketMQListener<Demo> { @Override public void onMessage(Demo demo) { System.out.println("Message content" + demo.toString()); } }
Sequential consumption
Why there is sequential consumption? First, we need to understand the design model of rocketmq. In rocketmq, nameserver is used to register borker. In borker, topic is used to distinguish message types. If you want to distinguish more finely, tag is used as a secondary message type. There are multiple queues in a topic. In this way, even if the messages are placed in the same topic, there may not be the same Queue when sending messages. All consumers are naturally out of order when consuming. To ensure that the consumption order is consistent with the sending order of the generator, the only way is to put the messages of the same business group in the same Queue. The Queue has the characteristics of first in first out data structure, All messages that can only specify the same group of services when generating messages are in the same Queue. The specific code implementation is as follows.
/** * The spring boot rocketmq encapsulated syncSendOrderly(String destination, Object payload, String hashKey) used here * Method to implement sequential messages. hashkey is a parameter used to select the same queue. Generally, it is enough to pass unique IDs such as orderId. * ((producer) **/ @Test public void orderMessage() { SendResult result = rocketMQTemplate.syncSendOrderly("topicName:tag01", "Lau Andy", "1111"); System.out.println("1:" + result.toString()); SendResult result2 = rocketMQTemplate.syncSendOrderly("topicName:tag02", "Xue You Zhang", "1111"); System.out.println("2:" +result2.toString()); SendResult result3 = rocketMQTemplate.syncSendOrderly("topicName:tag03", "Li Si-", "1111"); System.out.println("3:" + result3.toString()); SendResult result4 = rocketMQTemplate.syncSendOrderly("topicName:tag04", "Wang Wu", "1111"); System.out.println("4:" + result4.toString()); } /** * Consumer selects order for consumeMode **/ @Service @RocketMQMessageListener( topic = "topicName", consumerGroup = "mall", // Broadcast mode messageModel = MessageModel.CLUSTERING, // Select sequential mode consumeMode = ConsumeMode.ORDERLY ) @Slf4j public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String demo) { System.out.println("Message content" + demo); } }
Delay queue
@Test private void delayMessage() { Demo demo = new Demo(); demo.setName("leyang"); demo.setSkill("coder"); Message<Demo> message = new GenericMessage(demo); // syncSend(String destination, Message<?> message, long timeout, int delayLevel) // Delaylevel 1s 5S 10s 30s 1m 2m 4m 6m 7m 8m 9m 10m 20m 30m 1H 2h level 1 and 1s, maximum delay 2h // Timeout timeout has nothing to do with the delay time SendResult result = rocketMQTemplate.syncSend("topicName:tag01", message, 2000, 1); }
Batch message sending
Note: the amount of data sent in batch cannot exceed 4 m. after that, the message needs to be divided.
@Test public void batchMessage() { List<Message<Demo>> messages = new ArrayList<>(); for (int i = 0; i < 10; i++) { Demo demo = new Demo(); demo.setName("leyang" + (i + 1)); demo.setSkill("coder"); Message<Demo> message = new GenericMessage(demo); messages.add(message); } SendResult result = rocketMQTemplate.syncSend("topicName:tag01", messages); }
Message filtering
-
Filter by tag
// producer @Test public void orderMessage() { // through SendResult result = rocketMQTemplate.syncSend("topicName:tag01", "Lau Andy"); System.out.println("1:" + result.toString()); SendResult result2 = rocketMQTemplate.syncSend("topicName:tag02", "Xue You Zhang"); System.out.println("2:" +result2.toString()); SendResult result3 = rocketMQTemplate.syncSend("topicName:tag03", "Li Si-"); System.out.println("3:" + result3.toString()); SendResult result4 = rocketMQTemplate.syncSend("topicName:tag04", "Wang Wu"); System.out.println("4:" + result4.toString()); } // consumer @Service @RocketMQMessageListener( topic = "topicName", consumerGroup = "mall", // Broadcast mode messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, // Select tag01 and tag05 for consumption selectorExpression = "tag01 || tag05" ) @Slf4j public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String demo) { System.out.println("Message content" + demo); } }
-
Filter based on custom criteria
matters needing attention:
You need to modify the broker Conf configuration file, add the following configuration
# Turn on attribute filtering enablePropertyFilter=true
Then, when starting, you need to set the configured file mode to start, and start the command start mqbroker cmd -n 127.0. 0.1:9876 -c ../ conf/broker. conf
No configuration will not take effect.
// Generator @Test public void filterMessage() { for (int i = 0; i < 10; i++) { // You need to use the rocketmq native message object to set custom properties org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message(); Demo demo = new Demo(); demo.setName("leyang" + (i + 1)); demo.setSkill("coder"); rocketMsg.setBody(JSON.toJSONBytes(demo)); rocketMsg.putUserProperty("i", String.valueOf(i + 1)); rocketMsg.setTopic("topicName"); rocketMsg.setTags("tag01"); try { // rocketMQTemplate.getProducer() send org apache. rocketmq. common. Message takes effect SendResult send = rocketMQTemplate.getProducer().send(rocketMsg); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } // consumer @Service @RocketMQMessageListener( topic = "topicName", consumerGroup = "mall", // Broadcast mode messageModel = MessageModel.CLUSTERING ) @Slf4j public class Consumer implements RocketMQListener<Demo>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(Demo demo) { System.out.println("Message content" + demo); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); try { consumer.subscribe("topicName", MessageSelector.bySql("i > 5")); } catch (MQClientException e) { e.printStackTrace(); } } }
Abnormal problem
[question 1]
Abnormal information
Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.93 CQ: 0.93 INDEX: 0.93], messages are put to the slave, message store has been shut down, etc. BROKER: 172.31.254.27:10911
resolvent
Edit the / conf/2m-2s-async/broker-a.properties file and add diskMaxUsedSpaceRatio=98. An error will not be reported until the disk occupies 98%
Reference blog
Architecture analysis https://www.cnblogs.com/qdhxhz/p/11094624.html
Use specification reference: https://www.jianshu.com/p/9c0d4bde8153
Detailed explanation of rocketmq Foundation: https://blog.csdn.net/weixin_39615596/article/details/111611635
rocketmq comparison kafka: https://blog.csdn.net/damacheng/article/details/42846549
docker installation tutorial: https://zhuanlan.zhihu.com/p/342022297