RocketMQ learning notes
1. Introduction to RocketMQ
1.1 official API
1.1.1 concept and characteristics
- Concept: introduces the basic conceptual model of RocketMQ.
- Features: describes the functions and features implemented by RocketMQ.
1.1.2 architecture design
- Architecture: introduces the RocketMQ deployment architecture and technical architecture.
- Design: introduces the design principles of RocketMQ's key mechanisms, including message storage, the communication mechanism, message filtering, load balancing, and transactional messages.
1.1.3 example
- Example : describes the common usage of RocketMQ, including basic sample, sequence message sample, delay message sample, batch message sample, filter message sample, transaction message sample, etc.
1.1.4 best practices
- Best Practice: introduces RocketMQ best practices, including best practices for producers, consumers, brokers, and the NameServer, client configuration, and recommended JVM and Linux parameter settings.
- Message Trace: describes how to use RocketMQ message tracing.
- Authority Management (ACL): describes how to quickly deploy and use a RocketMQ cluster with permission control enabled.
- DLedger Quick Start: introduces how to quickly set up DLedger.
- Cluster Deployment: introduces the DLedger cluster deployment mode.
1.1.5 operation and maintenance management
- Cluster Deployment (Operation): introduces how to deploy the various forms of RocketMQ clusters (single-Master mode, multi-Master mode, multi-Master multi-Slave mode) and how to use the mqadmin operation and maintenance tool.
1.1.6 API Reference
2.1 usage scenario, installation, advantages and disadvantages
Background: RocketMQ is Alibaba's officially designated messaging product for "Double 11" and supports all of Alibaba Group's messaging services. After more than ten years of rigorous testing for high availability and high reliability, RocketMQ remains the core product of Alibaba's trading link.
2.1.1 usage scenario
- Decoupling
Scenario: system A produces a key piece of data and pushes it to systems B, C, and D through interface calls. What if system E also needs this data? What if system C no longer needs it? System A ends up tightly coupled to every downstream system: it has to track who needs the data, handle the case where B, C, D, or E is down, and decide whether to resend or persist failed deliveries.
With MQ, system A simply publishes the data to MQ, and whichever system needs it consumes it from MQ. A new system can start consuming directly; a system that no longer needs the data just cancels its subscription. System A no longer has to care about who receives the data, maintain that delivery code, or worry about whether downstream calls succeed, fail, or time out.
- Asynchrony
Scenario: when system A receives a request, it writes to its own local database and also to systems B, C, and D. The local write takes 3 ms, while the writes to B, C, and D take 300 ms, 450 ms, and 200 ms respectively. The total latency is 3 + 300 + 450 + 200 = 953 ms, nearly 1 s, which feels far too slow to a user who initiates the request from a browser.
With MQ, system A writes locally and then publishes three messages to MQ, which takes about 5 ms, so the total time from receiving the request to returning a response is 3 + 5 = 8 ms. From the user's point of view, clicking the button returns almost immediately.
- Peak clipping
Scenario: from 0:00 to 12:00 every day, system A is calm with about 50 requests per second. From 12:00 to 13:00, concurrency suddenly spikes to 5k+ requests per second. The system writes directly to MySQL, so roughly 5k SQL statements hit MySQL every second. MySQL can generally handle about 2k requests per second; at 5k it may simply fall over, crashing the system so users can no longer use it. Once the peak passes, the afternoon drops back to perhaps 50 requests per second, which puts almost no pressure on the system.
With MQ, the 5k requests per second are written to MQ, and system A pulls at most 2k requests per second from it, which is the most MySQL can handle, so it never exceeds its capacity and survives the peak. Since 5k messages per second go in and only 2k come out, hundreds of thousands or even millions of requests may pile up in MQ during the one-hour noon peak and are drained gradually afterwards.
2.1.2 installation steps
- Download RocketMQ
The latest version at the time of writing is 4.9.0.
```shell
# Unzip the binary release
unzip rocketmq-all-4.9.0-bin-release.zip
```
- Environment requirements
  - 64-bit Linux operating system
  - JDK 1.8 (64-bit)
  - Maven 3.2.x (only required when building from source)
2.1.3 advantages and disadvantages of use
- Advantages
- Disadvantages
2.2 starting and closing RocketMQ
- Starting and shutting down NameServer
```shell
# 1. Start the NameServer
nohup sh bin/mqnamesrv &
# 2. View the log
tail -f ~/logs/rocketmqlogs/namesrv.log
# 3. Shut down the NameServer
sh bin/mqshutdown namesrv
```
- Starting and shutting down the Broker
```shell
# 1. Start the Broker
#    autoCreateTopicEnable=true: topics can be created automatically as well as manually
nohup sh bin/mqbroker -c ./conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true &
# 2. View the log
tail -f ~/logs/rocketmqlogs/broker.log
# 3. Shut down the Broker
sh bin/mqshutdown broker
```
- Modify the default JVM memory size [when memory is insufficient]
```shell
# 1. Adjust the initial memory settings in bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 2. Adjust the initial memory settings in bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
```
2.3 testing RocketMQ
2.3.1 sending messages
```shell
# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Send messages using the bundled example producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
```
2.3.2 receiving messages
```shell
# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Receive messages using the bundled example consumer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
```
2.4 components
- Broker: the component that does the real work (message storage and forwarding)
- NameServer: stores Broker routing information, similar to a registry
- Consumer: consumes messages
- Producer: produces messages (the source of messages)
Plain-language view of how the components work together (see the minimal sketch after this list):
- The producer looks up the Broker address from the NameServer and delivers its messages to that Broker.
- The consumer likewise obtains the Broker address from the NameServer and pulls messages from the Broker.
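A minimal sketch of this flow with the RocketMQ Java client; the NameServer address, group names, and topic are placeholders, and full examples appear in section 2.2:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class QuickFlowSketch {
    public static void main(String[] args) throws Exception {
        // Consumer: discovers Brokers through the NameServer, then receives messages pushed from the Broker
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sketchConsumerGroup");
        consumer.setNamesrvAddr("192.168.1.20:9876"); // the NameServer, not the Broker, is configured here
        consumer.subscribe("SketchTopic", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
            msgs.forEach(m -> System.out.println("received: " + new String(m.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();

        // Producer: looks up the topic's Broker through the same NameServer and delivers the message to it
        DefaultMQProducer producer = new DefaultMQProducer("sketchProducerGroup");
        producer.setNamesrvAddr("192.168.1.20:9876");
        producer.start();
        producer.send(new Message("SketchTopic", "TagA", "hello".getBytes()));

        Thread.sleep(3000); // give the push consumer a moment to receive the message
        producer.shutdown();
        consumer.shutdown();
    }
}
```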
2.4.1 turn off the firewall
```shell
# 1. Turn off the firewall
systemctl stop firewalld.service
```
2.4.2 Architecture Principle
(Image unavailable: RocketMQ deployment architecture diagram.)
- Four cluster modes
  - Single-Master mode: a single Master node; the drawback is that the whole service may become unavailable if it goes down.
  - Multi-Master mode: multiple Master nodes share and store the messages; drawback: with no Slave nodes, message data may be lost if a Master goes down.
  - Multi-Master multi-Slave mode with asynchronous replication: Masters and Slaves replicate asynchronously, which is very efficient, but the Slave's data may briefly lag (at millisecond level).
  - Multi-Master multi-Slave mode with synchronous double write: Masters and Slaves are written synchronously, which is less efficient but has no data lag.
- Cluster configuration
```properties
# Cluster name; distinguishes different clusters, and multiple clusters can be built for different services
brokerClusterName=mayikt
# Broker name; a Slave indicates which Master it belongs to by using the same broker name as that Master
brokerName=broker-a
# A Master Broker can have multiple Slaves; 0 means Master, values greater than 0 are the IDs of the different Slaves
brokerId=0
# Works together with fileReservedTime; specifies when expired messages are deleted. The default 04 means 4 a.m.
deleteWhen=04
# NameServer cluster addresses
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
# Enable automatic topic creation
autoCreateTopicEnable=true
# Default number of queues created per topic
defaultTopicQueueNums=4
# Whether the Broker may create subscription groups automatically; recommended on offline and off in production (default: true)
autoCreateSubscriptionGroup=true
# Port the Broker listens on; when several Brokers run on one machine, use different ports to avoid conflicts
listenPort=10911
# Broker IP
brokerIP1=192.168.1.1
```
2.5 distributed transactions
2.5.1 background
In the context of microservices with multiple data sources:
Service A and service B each have their own data source, and service A needs to push data to service B asynchronously. If service A's local transaction fails and rolls back after the message has already been sent, service B still receives the data. It is like withdrawing 50,000 yuan at a bank while the balance on the card is never deducted: the two sides are no longer consistent, which is a transaction-consistency problem.
2.5.2 solutions
Half message: a message that has already been delivered to the Broker but is marked as not yet consumable until its final state is confirmed. The flow is as follows (a minimal sketch follows this list):
- The producer (the delivering side) sends the transactional message to the Broker as a half message, which cannot be consumed yet.
- The producer then executes its local transaction and sends the result (commit / rollback) to the Broker.
- On rollback the Broker discards the message; on commit the message becomes visible and can be consumed by consumers.
- If the Broker does not receive the local transaction result in time, it actively checks back with the producer to query the local transaction status.
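A minimal sketch of this flow with the Java client's transactional producer; the group name, topic, and NameServer address are placeholders, and the full example is in section 2.2.9:

```java
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class HalfMessageSketch {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("txProducerGroup");
        producer.setNamesrvAddr("192.168.1.20:9876");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Step 2: run the local transaction after the half message has been stored.
                // Returning UNKNOW instead would force the Broker to check back later (step 4).
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Step 4: the Broker asks for the real outcome of the local transaction.
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        // Step 1: send the half message; this then triggers executeLocalTransaction above.
        producer.sendMessageInTransaction(new Message("TxTopic", "TagA", "tx demo".getBytes()), null);
        producer.shutdown();
    }
}
```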
2. RocketMQ usage
2.1 environment construction
2.1.1 standalone version
- Download and install RocketMQ [see 2.1.2]
- Modify the RocketMQ default memory size [see 2.2]
- Start the NameServer service [see 2.2]
- Start the Broker service [see 2.2]
- Verify that the services are running: jps
(Image unavailable: screenshot of jps output showing the NameServer and Broker processes.)
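If both services are running, jps lists the NameServer and Broker startup classes; an illustrative output (process IDs will differ on your machine):

```shell
$ jps
2657 NamesrvStartup
2755 BrokerStartup
3001 Jps
```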
2.1.2 cluster version
All four cluster modes are operated in the same way, so only the most common one is shown here: multi-Master multi-Slave with asynchronous replication.
- Prepare two machines [192.168.1.1, 192.168.1.2], install the standalone version on each, and set up the full environment.
  Each machine runs one Master-role Broker and one Slave-role Broker, so the two machines act as each other's backup:
  - machine A starts the master node broker-a and the slave node broker-b-s;
  - machine B starts the master node broker-b and the slave node broker-a-s.
- Modify the configuration files
  The conf directory provides sample configuration files for several cluster modes:
  - 2m-noslave: dual-Master mode (no Slave);
  - 2m-2s-sync: dual-Master dual-Slave, synchronous double-write mode;
  - 2m-2s-async: dual-Master dual-Slave, asynchronous replication mode.
- Edit the Master Broker configuration file broker-a.properties on the 192.168.1.1 machine
```properties
# (Apache 2.0 license header omitted)
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; note that each configuration file uses its own name, e.g. broker-a in the a.properties files and broker-b in the b.properties files
brokerName=broker-a
# 0 means Master, a value greater than 0 means Slave
brokerId=0
# Time of day at which expired files are deleted; the default 04 means 4 a.m.
deleteWhen=04
# File retention time in hours (default 48)
fileReservedTime=120
# Broker role: ASYNC_MASTER = asynchronous-replication master, SYNC_MASTER = synchronous double-write master, SLAVE = slave node
brokerRole=ASYNC_MASTER
# Flush-to-disk mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
flushDiskType=SYNC_FLUSH
# Port on which the Broker serves external requests
listenPort=10911
# NameServer address; for a NameServer cluster, separate the servers with semicolons (namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
# Number of queues per topic (default 4); it should reflect the number of consumer instances, and too small a value hurts consumer load balancing
defaultTopicQueueNums=8
# Whether the Broker may create topics automatically; recommended to disable in production
autoCreateTopicEnable=true
# Whether the Broker may create subscription groups automatically; recommended to disable in production
autoCreateSubscriptionGroup=true
# Broker IP
brokerIP1=192.168.1.1
# Storage root path
storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
# commitLog storage path
storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
# Consume queue storage path
storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
# Message index storage path
storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
# checkpoint file path
storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
# abort file path
abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
# commitLog file size, 1 GB by default
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust according to business needs
mapedFileSizeConsumeQueue=300000
```
- Edit the Slave Broker configuration file broker-b-s.properties on the 192.168.1.1 machine
```properties
# (Apache 2.0 license header omitted)
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; the slave uses the same name as its master (broker-b)
brokerName=broker-b
# 0 means Master, a value greater than 0 means Slave
brokerId=1
# Time of day at which expired files are deleted; the default 04 means 4 a.m.
deleteWhen=04
# File retention time in hours (default 48)
fileReservedTime=120
# Broker role: ASYNC_MASTER = asynchronous-replication master, SYNC_MASTER = synchronous double-write master, SLAVE = slave node
brokerRole=SLAVE
# Flush-to-disk mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
flushDiskType=SYNC_FLUSH
# Port on which the Broker serves external requests
listenPort=11011
# NameServer address; for a NameServer cluster, separate the servers with semicolons (namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
# Number of queues per topic (default 4); it should reflect the number of consumer instances, and too small a value hurts consumer load balancing
defaultTopicQueueNums=8
# Whether the Broker may create topics automatically; recommended to disable in production
autoCreateTopicEnable=true
# Whether the Broker may create subscription groups automatically; recommended to disable in production
autoCreateSubscriptionGroup=true
# Broker IP
brokerIP1=192.168.1.1
# Storage root path
storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
# commitLog storage path
storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
# Consume queue storage path
storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
# Message index storage path
storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
# checkpoint file path
storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
# abort file path
abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
# commitLog file size, 1 GB by default
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust according to business needs
mapedFileSizeConsumeQueue=300000
```
- Edit the Master Broker configuration file broker-b.properties on the 192.168.1.2 machine
```properties
# (Apache 2.0 license header omitted)
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; note that each configuration file uses its own name, e.g. broker-a in the a.properties files and broker-b in the b.properties files
brokerName=broker-b
# 0 means Master, a value greater than 0 means Slave
brokerId=0
# Time of day at which expired files are deleted; the default 04 means 4 a.m.
deleteWhen=04
# File retention time in hours (default 48)
fileReservedTime=120
# Broker role: ASYNC_MASTER = asynchronous-replication master, SYNC_MASTER = synchronous double-write master, SLAVE = slave node
brokerRole=ASYNC_MASTER
# Flush-to-disk mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
flushDiskType=SYNC_FLUSH
# Port on which the Broker serves external requests
listenPort=10911
# NameServer address; for a NameServer cluster, separate the servers with semicolons (namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
# Number of queues per topic (default 4); it should reflect the number of consumer instances, and too small a value hurts consumer load balancing
defaultTopicQueueNums=8
# Whether the Broker may create topics automatically; recommended to disable in production
autoCreateTopicEnable=true
# Whether the Broker may create subscription groups automatically; recommended to disable in production
autoCreateSubscriptionGroup=true
# Broker IP
brokerIP1=192.168.1.2
# Storage root path
storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
# commitLog storage path
storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
# Consume queue storage path
storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
# Message index storage path
storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
# checkpoint file path
storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
# abort file path
abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
# commitLog file size, 1 GB by default
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust according to business needs
mapedFileSizeConsumeQueue=300000
```
- Edit the Slave Broker configuration file broker-a-s.properties on the 192.168.1.2 machine
```properties
# (Apache 2.0 license header omitted)
# Cluster name
brokerClusterName=rocketmq-cluster
# Broker name; the slave uses the same name as its master (broker-a)
brokerName=broker-a
# 0 means Master, a value greater than 0 means Slave
brokerId=1
# Time of day at which expired files are deleted; the default 04 means 4 a.m.
deleteWhen=04
# File retention time in hours (default 48)
fileReservedTime=120
# Broker role: ASYNC_MASTER = asynchronous-replication master, SYNC_MASTER = synchronous double-write master, SLAVE = slave node
brokerRole=SLAVE
# Flush-to-disk mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
flushDiskType=SYNC_FLUSH
# Port on which the Broker serves external requests
listenPort=11011
# NameServer address; for a NameServer cluster, separate the servers with semicolons (namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
# Number of queues per topic (default 4); it should reflect the number of consumer instances, and too small a value hurts consumer load balancing
defaultTopicQueueNums=8
# Whether the Broker may create topics automatically; recommended to disable in production
autoCreateTopicEnable=true
# Whether the Broker may create subscription groups automatically; recommended to disable in production
autoCreateSubscriptionGroup=true
# Broker IP
brokerIP1=192.168.1.2
# Storage root path
storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
# commitLog storage path
storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
# Consume queue storage path
storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
# Message index storage path
storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
# checkpoint file path
storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
# abort file path
abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
# commitLog file size, 1 GB by default
mapedFileSizeCommitLog=1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust according to business needs
mapedFileSizeConsumeQueue=300000
```
- Start the Broker services
  - Start the master Broker on 192.168.1.1
    ```shell
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
    ```
  - Start the master Broker on 192.168.1.2
    ```shell
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
    ```
  - Start the standby (Slave) Broker on 192.168.1.2
    ```shell
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
    ```
  - Start the standby (Slave) Broker on 192.168.1.1
    ```shell
    nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
    ```
Installing the RocketMQ console
- Download and build the RocketMQ console (download address: rocketmq-console).
- Modify the configuration file so that rocketmq.config.namesrvAddr points at the NameServer cluster, e.g. 192.168.1.1:9876;192.168.1.2:9876 (see the sample configuration after this list).
- Start App.java; if no exception occurs, access http://localhost:8080.
(Image unavailable: screenshot of the RocketMQ console web UI.)
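A minimal sketch of the relevant entries in the console's application.properties; the port and addresses are assumptions matching the setup above:

```properties
# Port the console listens on
server.port=8080
# NameServer cluster addresses; separate multiple entries with semicolons
rocketmq.config.namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
```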
2.2 code integration
2.2.1 simple example
```xml
<!-- Add the RocketMQ client dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
```
- Sending messages synchronously, asynchronously, and one-way
package com; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Test; import java.io.UnsupportedEncodingException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className RocketMQSimplenessTest * @description rockerMq Simple example API **/ public class RocketMQSimplenessTest { /** * Send messages synchronously * Reference scenario: notification message, SMS notification, SMS marketing system */ @Test public void syncSendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 1. Instantiate with producer group name DefaultMQProducer producer = new DefaultMQProducer("defaultProducer"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); // 3. Start the instance producer.start(); for (int i = 0; i < 50; i++){ Message msg = new Message("TopicTest" /* Topic theme */, "TagA" /* Tag edition*/, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body Message subject*/ ); // Call send message to deliver the message to one of the agents SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } // Once the producer instance is no longer in use, close it producer.shutdown(); } /** * Send message asynchronously * Usage scenario: generally used for business scenarios sensitive to response time */ @Test public void asyncSendMsg() throws MQClientException, InterruptedException { // 1. Instantiate with producer group name DefaultMQProducer producer = new DefaultMQProducer("defaultProducer"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); // 3. Start the instance producer.start(); // 4. In asynchronous mode, the maximum number of retries performed internally before declaring the sending failure producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 50; // 5. 
Count initialization final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // Send message SendCallback: callback executed when sending is completed, successful or unsuccessful producer.send(msg, new SendCallback() { // success @Override public void onSuccess(SendResult sendResult) { // Decrements the count of the latch and releases all waiting threads if the count reaches zero countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } // fail @Override public void onException(Throwable e) { // Decrements the count of the latch and releases all waiting threads if the count reaches zero countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } // Wait for the child thread to finish running countDownLatch.await(5, TimeUnit.SECONDS); // Once the producer instance is no longer in use, close it producer.shutdown(); } /** * Single send message * Usage scenario: log collection. */ @Test public void oneWaySendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException { // 1. Instantiate with producer group name DefaultMQProducer producer = new DefaultMQProducer("defaultProducer"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); // 3. Start the instance producer.start(); for (int i = 0; i < 100; i++) { // Create a message instance that specifies the subject, tag, and message body Message msg = new Message("TopicTest" /* Topic theme*/, "TagA" /* Tag edition*/, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body Message body*/ ); // Call send message to deliver the message to one of the agents. producer.sendOneway(msg); } // 4. Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } }
- Consuming messages
package com; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author Laisheng * @date 2021/7/5 0005 * @className Consumer * @description Consumption news * @version 1.0 **/ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 1. Instantiate with the specified user group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer"); // 2. Specify the server address consumer.setNamesrvAddr("192.168.1.20:9876"); // 3. Subscribe to a topic to use consumer.subscribe("TopicTest", "*"); // 4. Register the callback to execute when the message obtained from the agent arrives consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5. Start the user instance consumer.start(); System.out.printf("Consumer Started.%n"); } }
2.2.2 order example
- Sending messages with global or partition ordering
package com.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className OrderProducer * @description Message push using global and partition sorting **/ public class OrderProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 1. Instantiate with producer group name DefaultMQProducer producer = new DefaultMQProducer("defaultProducer"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); // 2. Start the instance producer.start(); // 3. Define zoning String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 50; i++) { int orderId = i % 10; // Create a message instance that specifies the subject, tag, and message body Message msg = new Message("TopicOrderTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); /** * @param msg Message to send * @param selector Message queue selector, through which we get the target message queue to deliver messages * @param arg Parameters used with message queue selectors */ SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } // 4. Once the producer instance is no longer in use, close it producer.shutdown(); } }
- Subscribing to the ordered messages
package com.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className OrderedConsumer * @description Subscribe to messages **/ public class OrderedConsumer { public static void main(String[] args) throws Exception { // 1. Instantiate with producer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer"); // 2. Specify the server address consumer.setNamesrvAddr("192.168.1.20:9876"); // 3. The consumer starts the consumer point program and obtains data from the message queue header for the first time consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 4. Subscribe to topics to consume subscriptions consumer.subscribe("TopicOrderTest", "TagA || TagC || TagD"); // 5. Monitoring consumption consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msg, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n"); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
2.2.3 broadcast example
Broadcast: Send a message to all subscribers [consumers] subscribing to the topic
- Producer example
package com.producer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; /** * @author Laisheng * @date 2021/7/5 0005 * @className BroadcastProducer * @description radio broadcast * @version 1.0 **/ public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); producer.start(); for (int i = 0; i < 50; i++) { Message msg = new Message("TopicGroupTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
- Consumer example
package com.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; /** * @author Laisheng * @date 2021/7/5 0005 * @className BroadcastConsumer * @description radio broadcast * @version 1.0 **/ public class BroadcastConsumer { public static void main(String[] args) throws Exception { // 1. Instantiate with producer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroupName"); // 2. Specify the server address consumer.setNamesrvAddr("192.168.1.20:9876"); // 3. The consumer starts the consumer point program and obtains data from the message queue header for the first time consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 4. Set to broadcast mode consumer.setMessageModel(MessageModel.BROADCASTING); // 5. Subscribe to topics to consume subscriptions consumer.subscribe("TopicGroupTest", "TagA || TagC || TagD"); // 6. Monitoring consumption consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg,ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
2.2.4 scheduled message example
Scheduled messages differ from normal messages in that they are not delivered to consumers until after a specified delay.
- Start the consumer that waits for the incoming subscribed messages
package com.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author Laisheng * @date 2021/7/5 0005 * @className ScheduledMessageConsumer * @description Start the consumer waiting for incoming subscription messages * @version 1.0 **/ public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 1. Instantiate with producer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // 2. Specify the server address consumer.setNamesrvAddr("192.168.1.20:9876"); // 3. Subscribe to a topic to use consumer.subscribe("TestExampleTopic", "*"); // 4. Register message listener consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // Start consumer consumer.start(); } }
- Send scheduled messages
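A minimal sketch of the sending side, assuming the standard delay-level API (message.setDelayTimeLevel, which with the default broker settings maps levels 1-18 to delays of 1s, 5s, 10s, 30s, 1m, 2m, ... up to 2h); the NameServer address and group name are placeholders:

```java
package com.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * Scheduled (delayed) message producer sketch; the topic matches the consumer above.
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("192.168.1.20:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TestExampleTopic", ("Hello scheduled message " + i).getBytes());
            // Delay level 3 corresponds to roughly a 10-second delay with the default broker settings
            message.setDelayTimeLevel(3);
            producer.send(message);
        }
        producer.shutdown();
    }
}
```

The consumer above prints the gap between the store timestamp and the receive time, so with level 3 it should report a delay of about 10 seconds per message.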
2.2.5 batch processing example
Why batch processing?
- Sending messages in bulk improves the performance of delivering small messages.
Usage restrictions:
Messages in the same batch must have the same topic and the same waitStoreMsgOK flag, and batch messages do not support delayed (scheduled) delivery.
In addition, the total size of a batch should not exceed 1 MiB.
- Sending messages in batch [total size not exceeding 1 MiB]
package com.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className BatchingProducer * @description Batch send message **/ public class BatchingProducer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); producer.start(); String topic = "BatchTestTopic"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { throw new RuntimeException("Send message failed"); }finally { producer.shutdown(); } } }
- Sending messages in batch [total size exceeding 1 MiB, split into smaller batches]
package com.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className BroadcastsProducer * @description Batch send message [message size exceeds 1MiB] **/ public class BroadcastsProducer implements Iterator<List<Message>> { /** * Page size */ private final int SIZE_LIMIT = 1000 * 1000; /** * Message collection */ private final List<Message> messages; /** * Current index */ private int currIndex; public BroadcastsProducer(List<Message> messages) { this.messages = messages; } /** *Detects whether there are elements in the collection * @return */ @Override public boolean hasNext() { return currIndex < messages.size(); } /** * Returns the next element of the iterator * @return */ @Override public List<Message> next() { // Current index int nextIndex = currIndex; for (int totalSize = 0; nextIndex < messages.size(); nextIndex++) { // Get current message Message message = messages.get(nextIndex); // Subject length + message body length int tmpSize = message.getTopic().length() + message.getBody().length; // Gets the configuration information of the current message Map<String, String> properties = message.getProperties(); // Get the total length of key+value for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } // For log overhead tmpSize = tmpSize + 20; if (tmpSize > SIZE_LIMIT) { // Single message exceeds SIZE_LIMIT // Let it go here, or it will stop the division process if (nextIndex - currIndex == 0) { // If there is no element in the next sub list, add this and interrupt, otherwise interrupt nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } // Split message List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); producer.start(); String topic = "BatchTestTopic"; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); // Divide the big list into small lists BroadcastsProducer splitter = new BroadcastsProducer(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { throw new RuntimeException("Send message failed"); }finally { producer.shutdown(); } } } }
2.2.6 filter example
In most cases, tags are a simple and useful design for selecting the messages you want. For example:
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.setNamesrvAddr("192.168.1.20:9876");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
```
The consumer will receive messages that carry TAGA, TAGB, or TAGC. The limitation is that a message can carry only one tag, which may not be enough for complex scenarios; in that case you can use SQL expressions to filter messages.
Principle
The SQL filter can evaluate expressions over the properties you attach when sending a message, so under the grammar defined by RocketMQ you can implement some fairly rich filtering logic. Here is an example:
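For instance (an illustrative fragment in the style of the tag snippet above, not the official example): the producer attaches custom properties with putUserProperty, and a push consumer subscribes with an SQL expression so that only messages whose properties satisfy it are delivered.

```java
// Producer side: attach custom properties to the message
msg.putUserProperty("a", "10");
msg.putUserProperty("b", "abc");

// Consumer side: select by an SQL92 expression instead of a tag
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b = 'abc'"));
```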
Syntax
RocketMQ defines only some basic syntax to support this feature, and you can easily extend it.
- Numeric comparison, such as >, >=, <, <=, BETWEEN, =;
- Character comparison, such as =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical AND, OR, NOT.
Constant types are:
- Numbers, such as 123, 3.1415;
- Strings, such as 'abc' (must be enclosed in single quotation marks);
- NULL, a special constant;
- Boolean: TRUE or FALSE.
Usage restrictions
Only push consumers can select messages with SQL92 expressions, for example: consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3")).
- Producer example
package com.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className FiltrationProducer * @description filter **/ public class FiltrationProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException { // 1. Instantiate with producer group name DefaultMQProducer producer = new DefaultMQProducer("ProducerFiltrationName"); // 2. Specify the server address producer.setNamesrvAddr("192.168.1.20:9876"); // 3. Start producer.start(); for (int i = 0; i < 50; i++) { Message msg = new Message("TopicFiltrationTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // Set some properties. msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); } producer.shutdown(); } }
- Consumer example
package com.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author Laisheng * @version 1.0 * @date 2021-07-05 * @className FiltrationConsumer * @description filter **/ public class FiltrationConsumer { public static void main(String[] args) throws MQClientException { // 1. Instantiate with producer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerFiltrationName"); // 2. Specify the server address consumer.setNamesrvAddr("192.168.1.20:9876"); // 3. Only subscription messages have attributes a, also a > = 0 and a < = 3 consumer.subscribe("TopicFiltrationTest", MessageSelector.bySql("a between 0 and 3")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
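Note: in common RocketMQ 4.x setups, SQL92 filtering also has to be enabled on the Broker side, otherwise the SQL subscription is rejected; treat this as an assumption to verify against your broker version. It is typically done by adding the following to broker.conf and restarting the Broker:

```properties
# Allow consumers to filter by message properties using SQL92 expressions
enablePropertyFilter=true
```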
2.2.7 log appender example
RocketMQ's logappender module provides a log4j appender, a log4j2 appender, and a logback appender for business use.
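To use these appenders, the rocketmq-logappender artifact has to be on the classpath; a sketch of the Maven dependency (the version shown is an assumption chosen to match the client version used above):

```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-logappender</artifactId>
    <version>4.3.0</version>
</dependency>
```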
- Configuration with a log4j properties file:
```properties
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
```
- When using a log4j XML configuration file, configure the appender as follows and add an asynchronous appender:
```xml
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
    <param name="Tag" value="yourTag" />
    <param name="Topic" value="yourLogTopic" />
    <param name="ProducerGroup" value="yourLogGroup" />
    <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
    </layout>
</appender>
<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="1024" />
    <param name="Blocking" value="false" />
    <appender-ref ref="mqAppender1"/>
</appender>
```
- When using log4j2, configure it as follows (for non-blocking logging, simply reference it from an async appender):
```xml
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
          topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
```
- When using logback, an asyncAppender is also needed:
```xml
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
    <tag>yourTag</tag>
    <topic>yourLogTopic</topic>
    <producerGroup>yourLogGroup</producerGroup>
    <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
    <layout>
        <pattern>%date %p %t - %m%n</pattern>
    </layout>
</appender>
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>1024</queueSize>
    <discardingThreshold>80</discardingThreshold>
    <maxFlushTime>2000</maxFlushTime>
    <neverBlock>true</neverBlock>
    <appender-ref ref="mqAppender1"/>
</appender>
```
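Once one of these appenders is attached to a logger, ordinary logging calls publish log events to the configured topic; a minimal sketch using SLF4J (the class name and message are placeholders):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderService {
    // Events from this logger are routed to the RocketMQ appender configured above
    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    public void createOrder(String orderId) {
        log.info("order created, id={}", orderId);
    }
}
```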
2.2.8 openmessaging example
OpenMessaging aims to establish industry guidelines and messaging/streaming specifications that provide a common framework for finance, e-commerce, IoT, and big data. Its design principles are cloud-oriented, simple, flexible, and language-independent in distributed heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.
RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following example demonstrates how to access RocketMQ based on OpenMessaging.
- OMS producer: the following example shows how to send messages to a RocketMQ broker synchronously, asynchronously, or one-way.
public class OMSProducer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); producer.startup(); System.out.printf("Producer startup OK%n"); { Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId()); } { final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new PromiseListener<SendResult>() { @Override public void operationCompleted(Promise<SendResult> promise) { System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); } @Override public void operationFailed(Promise<SendResult> promise) { System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); } }); } { producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } producer.shutdown(); messagingAccessPoint.shutdown(); } }
- OMS pull consumer: use an OMS PullConsumer to poll messages from a specified queue.
public class OMSPullConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); consumer.startup(); System.out.printf("Consumer startup OK%n"); Message message = consumer.poll(); if (message != null) { String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); } consumer.shutdown(); messagingAccessPoint.shutdown(); } }
2.2.9 transaction message example
What are transactional messages?
Transactional messages can be regarded as a two-phase commit message implementation used to achieve eventual consistency in distributed systems. They ensure that the execution of a local transaction and the sending of a message are performed atomically.
Use restrictions
- Transactional messages do not have scheduling and batch support.
- To avoid a single message being checked back too many times and causing the half-message queue to pile up, the number of checks for a single message is limited to 15 by default; users can change this with the "transactionCheckMax" parameter in the broker configuration. If a message has been checked "transactionCheckMax" times, the broker discards it by default and prints an error log. Users can change this behavior by overriding the AbstractTransactionalMessageCheckListener class.
- A transactional message is checked back after a certain period, determined by the "transactionTimeout" parameter in the broker configuration. Users can also change this limit for an individual message by setting the user property "CHECK_IMMUNITY_TIME_IN_SECONDS" when sending it; this property takes precedence over the "transactionTimeout" parameter (see the sketch after this list).
- A transaction message may be checked or consumed more than once.
- A committed message may fail to be dispatched to the user's target topic. Currently this relies on the log, and high availability is guaranteed by RocketMQ's own high-availability mechanism. If you need to ensure that transactional messages are not lost and that transaction integrity is preserved, it is recommended to use the synchronous double-write mechanism.
- The producer ID of a transactional message cannot be shared with the producer IDs of other message types. Unlike other types of messages, transactional messages allow backward queries: the MQ server queries clients by their producer IDs.
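As a concrete illustration of the per-message check-back delay mentioned in the restriction above, here is a minimal sketch of setting the user property on a message before it is sent as a transactional (half) message. It assumes the constant MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS from the open-source Java client; the topic, tag, and 30-second value are placeholders.

import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;

public class CheckImmunityDemo {
    public static Message buildHalfMessage() {
        Message msg = new Message("DemoTransactionTopic", "TagA", "Hello".getBytes());
        // Delay the first transaction check-back of this particular message by 30 seconds;
        // this per-message setting is assumed to take precedence over the broker's transactionTimeout.
        msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "30");
        return msg;
    }
}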
application
1. Transaction status
Transaction messages have three states:
- TransactionStatus.CommitTransaction: commit a transaction, indicating that the consumer is allowed to consume this message.
- TransactionStatus.RollbackTransaction: rolls back the transaction, indicating that the message will be deleted and consumption is not allowed.
- TransactionStatus.Unknown: intermediate status, indicating that MQ check back is required to determine the status.
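The three states above use the naming of the commercial/ONS-style API; in the Apache RocketMQ open-source Java client they correspond to the LocalTransactionState enum returned by the TransactionListener shown in the next section. The following is a small sketch, under that assumption, of mapping a local-transaction outcome to those states.

import org.apache.rocketmq.client.producer.LocalTransactionState;

public class TransactionStateMapping {
    // Hypothetical helper: translate a local-transaction outcome into LocalTransactionState.
    public static LocalTransactionState map(Boolean localTxSucceeded) {
        if (localTxSucceeded == null) {
            // Unknown: the broker will check back later to determine the final state.
            return LocalTransactionState.UNKNOW;
        }
        return localTxSucceeded
            ? LocalTransactionState.COMMIT_MESSAGE    // CommitTransaction: consumers may consume the message
            : LocalTransactionState.ROLLBACK_MESSAGE; // RollbackTransaction: the half message is discarded
    }
}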
2. Send transaction messages
- Create transaction producer
Use the TransactionMQProducer class to create a producer client and specify a unique producer group. You can set a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result, and the reply status is described in the previous section.
package com.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Laisheng
 * @date 2021/7/5 0005
 * @className TransactionListenerImpl
 * @description Transaction listener
 * @version 1.0
 **/
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Called to execute the local transaction when the prepare (half) message has been sent successfully.
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * When there is no response to the prepare (half) message, the broker sends a check request;
     * this method is called to query the local transaction state.
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

package com.producer;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

/**
 * @author Laisheng
 * @version 1.0
 * @date 2021/7/5 0005
 * @className TransactionProducer
 * @description Transaction producer
 **/
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Define the transaction listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        // Other constructors also accept a namespace, producer group, and RPC hook
        TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
        // Specify the name server address
        producer.setNamesrvAddr("192.168.1.20:9876");
        // Define the thread pool used to process check-back requests
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
2.2.10 common problems
usage
- Where does the newly created Consumer ID start consuming messages?
- If messages were sent to the topic within the last three days, the consumer starts consuming from the first message saved on the server.
- If the topic's messages were all sent more than three days ago, the consumer starts consuming from the latest message on the server, in other words, from the tail of the message queue.
- If such a consumer is restarted, it resumes consuming from its last consumed position (a consumer sketch follows this FAQ list).
- How to re-consume messages after a consumption failure?
- In cluster consuming mode, if the business consumption logic returns Action.ReconsumeLater, returns NULL, or throws an exception, a failed message is retried up to 16 times, after which it is discarded (a consumer sketch follows this FAQ list).
- In broadcast consuming mode, a message is still guaranteed to be consumed at least once, but no re-send option is provided.
- How to query failed messages when consumption fails?
- Using Topic query by time, you can query messages within a period of time.
- Using Topic and Message Id, you can accurately query a single message.
- Using Topic and Message Key, you can accurately query a class of messages with the same Message Key (a message-query sketch follows this FAQ list).
- Is the message delivered only once?
- RocketMQ ensures that all messages are delivered at least once. In most cases, messages do not repeat.
- How to add a new broker?
- Start a new broker and register it with the same list of name servers.
- By default, only internal system topics and consumer groups are created automatically. If you want your business topics and consumer groups on the new node, replicate them from an existing broker. Admin tools and command-line utilities are provided for this.
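To illustrate the points above about where a new consumer group starts and how failed messages are retried in cluster mode, here is a minimal sketch using the open-source push consumer. The group, topic, and name server address are placeholders, and setMaxReconsumeTimes is optional (the retry ceiling defaults to 16).

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class UsageFaqConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("192.168.1.20:9876");
        consumer.subscribe("TopicTest1234", "*");

        // Only takes effect for a brand-new consumer group with no committed offset;
        // a restarted group always resumes from its last consumed position.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Optional: lower the cluster-mode retry ceiling from the default of 16.
        consumer.setMaxReconsumeTimes(3);

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                // ... business logic here ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                // Returning RECONSUME_LATER (or throwing) asks the broker to redeliver the message later.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.out.printf("Consumer started.%n");
    }
}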
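For the message-query item above, the sketch below assumes the MQAdmin query methods (queryMessage by Topic + Key + time range, viewMessage by Topic + Message Id) exposed by a started DefaultMQProducer in the 4.x Java client. The topic, key, and message id are placeholders.

import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;

public class MessageQueryDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_query_group");
        producer.setNamesrvAddr("192.168.1.20:9876");
        producer.start();

        // Query by Topic + Message Key within a time range (up to 32 hits).
        long end = System.currentTimeMillis();
        long begin = end - 24L * 60 * 60 * 1000;
        QueryResult result = producer.queryMessage("TopicTest1234", "KEY1", 32, begin, end);
        for (MessageExt found : result.getMessageList()) {
            System.out.printf("found msgId=%s storeTime=%d%n", found.getMsgId(), found.getStoreTimestamp());
        }

        // Query precisely by Topic + Message Id (throws if the id is unknown).
        MessageExt one = producer.viewMessage("TopicTest1234", "replace-with-a-real-message-id");
        System.out.printf("viewMessage bodyLength=%d%n", one.getBody().length);

        producer.shutdown();
    }
}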
Configuration related
- How long does the message stay on the server?
- Stored messages are kept for at most 3 days; messages that have not been consumed for more than 3 days are deleted.
- What is the size limit of the message body?
- Generally 256KB.
- How to set the number of consumer threads?
- When starting the Consumer, set the consumeThreadMin and consumeThreadMax properties, as shown in the following example:
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
error
- Why does starting a producer or consumer fail with an error saying that the producer group or consumer group is duplicated?
- Cause: starting multiple Producer/Consumer instances with the same Producer/Consumer Group in one JVM may cause the client to fail to start.
- Solution: make sure that only one Producer/Consumer instance is started per Producer/Consumer Group within a JVM (a troubleshooting sketch follows this FAQ list).
- Why does the consumer fail to start in broadcast mode when loading its JSON file?
- Cause: the fastjson version is too low for the broadcast consumer to load its local offsets.json, causing the consumer to fail to start. A corrupted fastjson file can cause the same problem.
- Solution: upgrade fastjson to the version the RocketMQ client depends on so that the local offsets.json can be loaded. By default, offsets.json is located in /home/{user}/.rocketmq_offsets. Alternatively, check the integrity of the fastjson file.
- What is the impact of Broker downtime?
- Master broker down: messages can no longer be sent to this broker set, but as long as another broker set is available and the topic exists on it, messages can still be sent. Messages can still be consumed from the slaves.
- One slave broker down: as long as another working slave exists, message sending is not affected. Message consumption is also unaffected unless the consumer group is set to consume from this slave preferentially. By default, consumer groups consume from the master.
- All slave brokers down: sending messages to the master is not affected. However, if the master is a SYNC_MASTER, the producer will get a SLAVE_NOT_AVAILABLE flag indicating that the message was not replicated to any slave. Message consumption is also unaffected unless the consumer group is set to consume from slaves preferentially. By default, consumer groups consume from the master.
- Producer reports an error "No Topic Route Info". How to diagnose it?
- Background: this happens when you try to send a message to a topic whose routing information is not available to the producer.
- Ensure that producers can connect to the name server and get routing meta information from it.
- Ensure that the name server does contain routing meta information for the topic. You can query the routing meta information from the name server through topicRoute using the management tool or the Web console.
- Make sure your broker sends heartbeats to the same name server list that your producer connects to.
- Ensure that the permission of the topic is 6 (rw-), or at least 2 (-w-).
- If the topic cannot be found, create it on the broker with the admin tool command updateTopic or via the Web console.
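As a complement to the "duplicated producer/consumer group" and "No Topic Route Info" items above, here is a minimal troubleshooting sketch: it gives each producer instance in the same JVM a distinct instanceName and catches the client exception that is raised when no route can be found for a topic. All names and addresses are placeholders.

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class TroubleshootingDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("192.168.1.20:9876");
        // If the same producer group really must be started more than once in one JVM,
        // give each instance a distinct instanceName so the client instances do not clash.
        producer.setInstanceName("instance-" + System.currentTimeMillis());
        producer.start();

        try {
            Message msg = new Message("MaybeMissingTopic", "TagA", "hello".getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.printf("send OK: %s%n", result);
        } catch (MQClientException e) {
            // Typically reported as "No route info of this topic" when the name server holds
            // no routing data for the topic; work through the checklist above.
            System.out.printf("send failed: %s%n", e.getMessage());
        } finally {
            producer.shutdown();
        }
    }
}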