RocketMQ learning notes

1. Introduction to RocketMQ

1.1 official API

1.1.1 concept and characteristics

  • Concept: introduces the basic conceptual model of RocketMQ.
  • Features: describes the functions and features that RocketMQ implements.

1.1.2 architecture design

  • Architecture: introduces RocketMQ's deployment architecture and technical architecture.
  • Design: introduces the design principles of RocketMQ's key mechanisms, mainly message storage, the communication mechanism, message filtering, load balancing, and transaction messages.

1.1.3 example

  • Example: describes common usage of RocketMQ, including basic, ordered-message, delayed-message, batch-message, filtered-message, and transaction-message samples.

1.1.4 best practices

  • Best Practice: introduces best practices for RocketMQ, covering producers, consumers, Brokers and NameServer, client configuration, and recommended JVM and Linux parameter settings.
  • Message trace: describes how to use RocketMQ message tracing.
  • Authority management (Auth Management): describes how to quickly deploy and use a RocketMQ cluster with permission control.
  • DLedger quick start: introduces how to set up DLedger quickly.
  • Cluster deployment: introduces how to deploy a DLedger cluster.

1.1.5 operation and maintenance management

  • Cluster deployment (Operation): introduces the deployment methods for the various forms of RocketMQ cluster, such as single Master, multi Master, and multi Master with multi Slave, and the use of the operations tool mqadmin.

1.1.6 API Reference

2.1 usage scenario, installation, advantages and disadvantages

Background: RocketMQ is the officially designated messaging product for Alibaba's "Double 11" shopping festival and carries all messaging services of Alibaba Group. After more than ten years of rigorous high-availability and high-reliability testing, RocketMQ is a core product in Alibaba's transaction path.

2.1.1 usage scenario

  1. Decoupling

    Scenario: system A sends data to three systems, B, C and D, through interface calls. What if system E also needs this data? What if system C no longer needs it? The owner of system A is close to a breakdown.

    In this scenario, system A is tightly coupled to a tangle of other systems. System A produces a key piece of data, and many systems need system A to send it to them. System A has to keep B, C, D and E in mind at all times. What if one of those four systems goes down? Should the data be resent, or stored for later? It is enough to turn your hair white!

    With MQ, system A produces a piece of data and sends it to MQ. Whichever system needs the data consumes it from MQ on its own. A new system that needs the data consumes it directly from MQ; a system that no longer needs it simply cancels its consumption of those MQ messages. System A no longer has to consider who receives the data, maintain that code, or care whether the other systems' calls succeed, fail or time out.

  2. Asynchronous processing

    Scenario: when system A receives a request, it has to write to its own local database and also to systems B, C and D. The local write takes 3 ms, and the writes to B, C and D take 300 ms, 450 ms and 200 ms respectively. The total latency of the request is 3 + 300 + 450 + 200 = 953 ms, close to 1 s, and the user feels that the system is too slow. Waiting 1 s for a request made from the browser is almost unacceptable.

    With MQ, system A sends three messages to the MQ queue one after another. If that takes 5 ms, the total time from receiving the request to returning the response is 3 + 5 = 8 ms. To the user, it feels like clicking a button and getting the result 8 ms later.

  3. Peak clipping

    Scenario: from 0:00 to 12:00 every day, system A is quiet, with about 50 concurrent requests per second. From 12:00 to 13:00 every day, however, the number of concurrent requests per second suddenly jumps to 5k or more. The system works directly against MySQL, so this flood of requests reaches MySQL, and about 5k SQL statements are executed on it every second. MySQL can typically handle about 2k requests per second. If 5k requests arrive per second, MySQL may be overwhelmed and go down, the system crashes, and users can no longer use it. Once the peak is over, the afternoon is a quiet period again. Perhaps 10,000 users are on the website at the same time, but the number of requests per second may be only 50, which puts almost no pressure on the system.

    With MQ, 5k requests are written to MQ every second. System A can process at most 2k requests per second, because that is all MySQL can handle, so system A pulls requests from MQ at its own pace: 2k requests per second, never more than it can process. This way system A does not go down even at peak time. During the peak, 5k requests enter MQ and 2k leave it every second, so hundreds of thousands or even millions of requests may pile up in MQ during the noon peak (one hour). The backlog is acceptable because the peak is short: once it ends, system A keeps consuming at 2k requests per second and gradually clears it.
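
The arithmetic in the asynchronous and peak-clipping scenarios can be reproduced with a small sketch. The class and method names (`MqScenarioMath`, `synchronousMillis`, `backlogAfter`, `drainSeconds`) are made up for illustration, and the model assumes perfectly steady rates: 5k requests per second arriving for the whole hour and system A draining exactly 2k per second. Shorter or uneven bursts would give a smaller backlog.

```java
public class MqScenarioMath {

    /** Synchronous call chain: the local write plus every downstream write, one after another. */
    public static int synchronousMillis(int localWrite, int... downstreamWrites) {
        int total = localWrite;
        for (int w : downstreamWrites) {
            total += w;
        }
        return total;
    }

    /** With MQ: the local write plus the time needed to hand the messages to MQ. */
    public static int withMqMillis(int localWrite, int enqueueMillis) {
        return localWrite + enqueueMillis;
    }

    /** Backlog left in MQ after the given number of seconds of steady traffic. */
    public static long backlogAfter(long inPerSec, long outPerSec, long seconds) {
        long backlog = 0;
        for (long s = 0; s < seconds; s++) {
            backlog += inPerSec;                      // requests written to MQ this second
            backlog -= Math.min(backlog, outPerSec);  // requests pulled by the consumer this second
        }
        return backlog;
    }

    /** Seconds needed to clear a backlog once traffic drops below capacity (rounded up). */
    public static long drainSeconds(long backlog, long inPerSec, long outPerSec) {
        long spare = outPerSec - inPerSec;
        if (spare <= 0) {
            throw new IllegalArgumentException("consumer must be faster than the incoming traffic");
        }
        return (backlog + spare - 1) / spare;
    }

    public static void main(String[] args) {
        System.out.println(synchronousMillis(3, 300, 450, 200)); // 953
        System.out.println(withMqMillis(3, 5));                  // 8
        long backlog = backlogAfter(5_000, 2_000, 3_600);
        System.out.println(backlog);                             // 10800000
        System.out.println(drainSeconds(backlog, 50, 2_000));    // 5539
    }
}
```

Under these assumptions the backlog after one hour is 10.8 million requests, and at the afternoon rate of 50 requests per second it takes roughly an hour and a half to clear.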

2.1.2 installation steps

  • Download RocketMQ

    Latest version of RocketMQ: 4.9.0

    # Unzip
    unzip rocketmq-all-4.9.0-bin-release.zip
    
  • Environment requirements

    • 64-bit Linux operating system
    • JDK 1.8 (64-bit)
    • Maven 3.2.x is required when installing from source

2.1.3 advantages and disadvantages of use

  • Advantages
  • Disadvantages

2.2 starting and closing RocketMQ

  1. Starting and shutting down NameServer
# 1. Start NameServer
nohup sh bin/mqnamesrv &
# 2. View the log
tail -f ~/logs/rocketmqlogs/namesrv.log
# 3. Shut down NameServer
sh bin/mqshutdown namesrv
  2. Starting and shutting down the Broker
# 1. Start the Broker. autoCreateTopicEnable=true lets topics be created automatically as well as manually
nohup sh bin/mqbroker -c ./conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true &
# 2. View the log
tail -f ~/logs/rocketmqlogs/broker.log
# 3. Shut down the Broker
sh bin/mqshutdown broker
  3. Modify the default memory size [when the machine does not have enough memory]
# 1. In bin/runserver.sh, change the initial memory size
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 2. In bin/runbroker.sh, change the initial memory size
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

2.3 testing RocketMQ

2.3.1 sending messages

# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Send messages using the bundled demo
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.3.2 receiving messages

# 1. Set the environment variable
export NAMESRV_ADDR=localhost:9876
# 2. Receive messages
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.4 components

  • Broker: the component that does the actual work [stores and forwards messages]
  • NameServer: keeps the routing and service information of the Brokers, similar to a registry
  • Consumer: consumes messages
  • Producer: produces messages

How the components work together, in plain terms

  1. The producer looks up the Broker address for a topic from the NameServer, then sends its messages directly to that Broker.
  2. The consumer looks up the Broker address from the NameServer in the same way, then fetches messages from that Broker.
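
The division of labour between the components can be sketched with an in-memory model. This is not RocketMQ client code: `RouteLookupDemo` and its nested `NameServer` and `Broker` classes are hypothetical stand-ins, and the Broker address is an assumed value. The point it illustrates is that the NameServer only answers "which Broker serves this topic", while messages travel between the client and the Broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of route lookup; it does not use the RocketMQ client. */
public class RouteLookupDemo {

    /** Plays the NameServer: it only knows which Broker serves which topic. */
    public static class NameServer {
        private final Map<String, String> topicToBrokerAddr = new HashMap<>();

        public void register(String topic, String brokerAddr) {
            topicToBrokerAddr.put(topic, brokerAddr);
        }

        public String lookup(String topic) {
            String addr = topicToBrokerAddr.get(topic);
            if (addr == null) {
                throw new IllegalStateException("No route for topic " + topic);
            }
            return addr;
        }
    }

    /** Plays the Broker: it stores messages and hands them out. */
    public static class Broker {
        private final List<String> stored = new ArrayList<>();

        public void store(String body) {
            stored.add(body);
        }

        public List<String> fetchAll() {
            return Collections.unmodifiableList(stored);
        }
    }

    public static void main(String[] args) {
        NameServer nameServer = new NameServer();
        Map<String, Broker> brokersByAddr = new HashMap<>();
        brokersByAddr.put("192.168.1.1:10911", new Broker()); // assumed address
        nameServer.register("TopicTest", "192.168.1.1:10911");

        // Producer: ask the NameServer where the topic lives, then send to that Broker.
        String addr = nameServer.lookup("TopicTest");
        brokersByAddr.get(addr).store("Hello RocketMQ");

        // Consumer: same lookup, then fetch from the Broker.
        System.out.println(brokersByAddr.get(nameServer.lookup("TopicTest")).fetchAll());
    }
}
```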

2.4.1 turn off the firewall

# 1. Turn off the firewall
systemctl stop firewalld.service

2.4.2 Architecture Principle


  • Four cluster modes

    • Single Master mode: a single Master node. The drawback is that the whole service becomes unavailable if that node goes down;
    • Multi Master mode: several Master nodes share the work of storing messages. The drawback is that, with no Slave nodes, messages on a Master that goes down cannot be consumed until it recovers and may be lost;
    • Multi Master and multi Slave mode - asynchronous replication: each Master replicates to its Slave asynchronously. This is very efficient, but the Slave may lag behind the Master briefly (by milliseconds);
    • Multi Master and multi Slave mode - synchronous double write: each message is written to both Master and Slave synchronously. This is less efficient, but there is no replication lag.
  • Cluster configuration

    # Cluster name. It distinguishes clusters, so separate clusters can be built for different services
    brokerClusterName=mayikt
    # Broker name. A Master and its Slaves use the same Broker name, which is how a Slave is tied to its Master
    brokerName=broker-a
    # One Master can have several Slaves. 0 means Master; a value greater than 0 is the ID of a Slave
    brokerId=0
    # Works together with the fileReservedTime parameter to decide when expired messages are deleted. The default 04 means 4 a.m.
    deleteWhen=04
    # NameServer cluster addresses
    namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
    # Enable automatic topic creation
    autoCreateTopicEnable=true
    # Number of queues created per topic by default
    defaultTopicQueueNums=4
    # Whether the Broker may create subscription groups automatically. Enabling it in test environments and disabling it in production is recommended. The default is [true]
    autoCreateSubscriptionGroup=true
    # Port the Broker listens on. If several Brokers run on one machine, give each a different port to avoid conflicts
    listenPort=10911
    # IP address of this Broker
    brokerIP1=192.168.1.12.5
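
Because the Broker configuration is a plain key=value file, a few of its settings can be read back with `java.util.Properties`. The sketch below is only an illustration: `BrokerConfCheck` and its methods are hypothetical helpers, not part of RocketMQ, and the sample text is a shortened copy of the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper for reading a broker.conf-style snippet; not part of RocketMQ. */
public class BrokerConfCheck {

    /** Parses key=value lines; lines starting with '#' are treated as comments. */
    public static Properties parse(String conf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** brokerId 0 marks the Master; any value greater than 0 marks a Slave. */
    public static String role(Properties props) {
        long id = Long.parseLong(props.getProperty("brokerId").trim());
        return id == 0 ? "Master" : "Slave";
    }

    /** namesrvAddr holds one or more host:port pairs separated by semicolons. */
    public static List<String> nameServers(Properties props) {
        return Arrays.asList(props.getProperty("namesrvAddr").trim().split(";"));
    }

    public static void main(String[] args) {
        String conf = "# shortened sample of the configuration above\n"
                + "brokerName=broker-a\n"
                + "brokerId=0\n"
                + "namesrvAddr=192.168.1.1:9876;192.168.1.2:9876\n";
        Properties props = parse(conf);
        System.out.println(props.getProperty("brokerName") + " is a " + role(props));
        System.out.println("NameServers: " + nameServers(props));
    }
}
```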
    

2.5 distributed transactions

2.5.1 background

In a microservice setup with multiple data sources:

Service A has its own data source and service B has its own. Service A needs to push data to service B asynchronously. Suppose an exception occurs right after service A has pushed the data: service A's local changes are rolled back, but service B has already received the data. To put it in everyday terms, a friend withdraws 50,000 yuan at the bank, but the balance on the bank card does not change. This is a transaction consistency problem.

2.5.2 solutions

Half message: a message that has already been delivered to the Broker but carries a mark saying it has not been confirmed yet, so consumers cannot see it.

  1. The producer (the message sender) delivers the transaction message to the Broker, which stores it as a half message that cannot be consumed yet;
  2. The producer executes its local transaction and sends the result (commit / rollback) to the Broker;
  3. The Broker receives the commit or rollback. On rollback it discards the message; on commit the message becomes available to consumers;
  4. If the Broker does not receive the local transaction result in time, it actively queries the producer for it.
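
The four steps can be traced with a small in-memory model. This is a sketch of the flow only, not the RocketMQ API: `HalfMessageDemo`, its `State` enum and its methods are hypothetical names. In the real client the same roles are played by `TransactionMQProducer` and a `TransactionListener` with its `executeLocalTransaction` and `checkLocalTransaction` callbacks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory model of the half-message flow; not the RocketMQ API. */
public class HalfMessageDemo {

    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Half messages by transaction id; consumers cannot see these.
    private final Map<String, String> halfMessages = new LinkedHashMap<>();
    // Messages that consumers are allowed to read.
    private final List<String> consumable = new ArrayList<>();

    /** Step 1: the Broker stores the message as a half message. */
    public void sendHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    /** Steps 2 and 3: the producer reports the local transaction result. */
    public void endTransaction(String txId, State state) {
        if (state == State.UNKNOWN) {
            return; // result not known yet: keep the half message for a later check
        }
        String body = halfMessages.remove(txId);
        if (body != null && state == State.COMMIT) {
            consumable.add(body);
        }
    }

    /** Step 4: the Broker asks the producer about every half message still pending. */
    public void checkPending(Function<String, State> localTransactionChecker) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(txId, localTransactionChecker.apply(txId));
        }
    }

    public List<String> consumable() {
        return consumable;
    }

    public int pending() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageDemo broker = new HalfMessageDemo();
        broker.sendHalf("tx-1", "deduct 50,000 yuan");
        broker.sendHalf("tx-2", "deduct 100 yuan");
        broker.sendHalf("tx-3", "deduct 200 yuan");

        broker.endTransaction("tx-1", State.COMMIT);   // local transaction succeeded
        broker.endTransaction("tx-2", State.ROLLBACK); // local transaction failed
        // tx-3 never reports back, so the Broker checks it and learns it was committed.
        broker.checkPending(txId -> State.COMMIT);

        System.out.println("consumable: " + broker.consumable());
        System.out.println("pending: " + broker.pending());
    }
}
```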

2. RocketMQ usage

2.1 environment construction

2.1.1 stand alone version

  1. Download and install RocketMQ [see 2.1.2]

  2. Modify RocketMQ's default memory size [see 2.2]

  3. Start the NameServer service [see 2.2]

  4. Start the Broker service [see 2.2]

  5. Verify that the service is started: jps


2.1.2 cluster version

All four cluster modes are set up in the same way.

Only one common example is shown here: multi Master and multi Slave with asynchronous replication.

  1. Prepare two machines [192.168.1.1, 192.168.1.2], install the stand-alone version on each, and deploy the full environment

    Each machine runs one Broker in the Master role and one in the Slave role, so that the two machines back each other up

    1. On machine A (192.168.1.1), start the Master node broker-a and the Slave node broker-b-s;
    2. On machine B (192.168.1.2), start the Master node broker-b and the Slave node broker-a-s.
  2. Modify the configuration files

    The conf directory provides sample configuration files for several cluster modes

    1. 2m-noslave: dual Master mode;
    2. 2m-2s-sync: dual Master, dual Slave, synchronous double-write mode;
    3. 2m-2s-async: dual Master, dual Slave, asynchronous replication mode;
    • Edit the configuration file broker-a.properties of the Master Broker on the 192.168.1.1 machine

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      # Cluster name
      brokerClusterName=rocketmq-cluster
      # Broker name. A Master and its Slave share the same name: use broker-a in the broker-a files and broker-b in the broker-b files
      brokerName=broker-a
      # 0 means Master, > 0 means Slave
      brokerId=0
      # Hour of day at which expired files are deleted. The default 04 means 4 a.m.
      deleteWhen=04
      # File retention time, in hours
      fileReservedTime=120
      # Broker role: ASYNC_MASTER = asynchronous-replication Master, SYNC_MASTER = synchronous double-write Master, SLAVE = Slave node
      brokerRole=ASYNC_MASTER
      # Disk flush mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
      flushDiskType=SYNC_FLUSH
      # Port on which the Broker serves clients
      listenPort=10911
      # NameServer addresses; separate multiple addresses with semicolons (i.e. namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      # Number of queues per topic, 4 by default. Choose it with the number of consumer instances in mind; too small a value hinders consumer load balancing
      defaultTopicQueueNums=8
      # Whether the Broker may create topics automatically. Disabling this in production is recommended
      autoCreateTopicEnable=true
      # Whether the Broker may create subscription groups automatically. Disabling this in production is recommended
      autoCreateSubscriptionGroup=true
      # IP address of this Broker
      brokerIP1=192.168.1.1
      # Root storage path
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
      # CommitLog storage path
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
      # Consume queue storage path
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
      # Message index storage path
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
      # Checkpoint file storage path
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
      # Abort file storage path
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
      # Size of each CommitLog file; the default is 1 GB
      mapedFileSizeCommitLog=1073741824
      # Each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
      mapedFileSizeConsumeQueue=300000
      
    • Edit the configuration file broker-b-s.properties of the Slave Broker on the 192.168.1.1 machine

      # Cluster name
      brokerClusterName=rocketmq-cluster
      # Broker name. A Master and its Slave share the same name: use broker-a in the broker-a files and broker-b in the broker-b files
      brokerName=broker-b
      # 0 means Master, > 0 means Slave
      brokerId=1
      # Hour of day at which expired files are deleted. The default 04 means 4 a.m.
      deleteWhen=04
      # File retention time, in hours
      fileReservedTime=120
      # Broker role: ASYNC_MASTER = asynchronous-replication Master, SYNC_MASTER = synchronous double-write Master, SLAVE = Slave node
      brokerRole=SLAVE
      # Disk flush mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
      flushDiskType=SYNC_FLUSH
      # Port on which the Broker serves clients
      listenPort=11011
      # NameServer addresses; separate multiple addresses with semicolons (i.e. namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      # Number of queues per topic, 4 by default. Choose it with the number of consumer instances in mind; too small a value hinders consumer load balancing
      defaultTopicQueueNums=8
      # Whether the Broker may create topics automatically. Disabling this in production is recommended
      autoCreateTopicEnable=true
      # Whether the Broker may create subscription groups automatically. Disabling this in production is recommended
      autoCreateSubscriptionGroup=true
      # IP address of this Broker
      brokerIP1=192.168.1.1
      # Root storage path
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
      # CommitLog storage path
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
      # Consume queue storage path
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
      # Message index storage path
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
      # Checkpoint file storage path
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
      # Abort file storage path
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
      # Size of each CommitLog file; the default is 1 GB
      mapedFileSizeCommitLog=1073741824
      # Each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
      mapedFileSizeConsumeQueue=300000
      
    • Edit the configuration file broker-b.properties of the Master Broker on the 192.168.1.2 machine

      # Cluster name
      brokerClusterName=rocketmq-cluster
      # Broker name. A Master and its Slave share the same name: use broker-a in the broker-a files and broker-b in the broker-b files
      brokerName=broker-b
      # 0 means Master, > 0 means Slave
      brokerId=0
      # Hour of day at which expired files are deleted. The default 04 means 4 a.m.
      deleteWhen=04
      # File retention time, in hours
      fileReservedTime=120
      # Broker role: ASYNC_MASTER = asynchronous-replication Master, SYNC_MASTER = synchronous double-write Master, SLAVE = Slave node
      brokerRole=ASYNC_MASTER
      # Disk flush mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
      flushDiskType=SYNC_FLUSH
      # Port on which the Broker serves clients
      listenPort=10911
      # NameServer addresses; separate multiple addresses with semicolons (i.e. namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      # Number of queues per topic, 4 by default. Choose it with the number of consumer instances in mind; too small a value hinders consumer load balancing
      defaultTopicQueueNums=8
      # Whether the Broker may create topics automatically. Disabling this in production is recommended
      autoCreateTopicEnable=true
      # Whether the Broker may create subscription groups automatically. Disabling this in production is recommended
      autoCreateSubscriptionGroup=true
      # IP address of this Broker
      brokerIP1=192.168.1.2
      # Root storage path
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-b
      # CommitLog storage path
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-b/commitlog
      # Consume queue storage path
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-b/consumequeue
      # Message index storage path
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-b/index
      # Checkpoint file storage path
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-b/checkpoint
      # Abort file storage path
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-b/abort
      # Size of each CommitLog file; the default is 1 GB
      mapedFileSizeCommitLog=1073741824
      # Each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
      mapedFileSizeConsumeQueue=300000
      
    • Edit the configuration file broker-a-s.properties of the Slave Broker on the 192.168.1.2 machine

      # Cluster name
      brokerClusterName=rocketmq-cluster
      # Broker name. A Master and its Slave share the same name: use broker-a in the broker-a files and broker-b in the broker-b files
      brokerName=broker-a
      # 0 means Master, > 0 means Slave
      brokerId=1
      # Hour of day at which expired files are deleted. The default 04 means 4 a.m.
      deleteWhen=04
      # File retention time, in hours
      fileReservedTime=120
      # Broker role: ASYNC_MASTER = asynchronous-replication Master, SYNC_MASTER = synchronous double-write Master, SLAVE = Slave node
      brokerRole=SLAVE
      # Disk flush mode: ASYNC_FLUSH = asynchronous flush, SYNC_FLUSH = synchronous flush
      flushDiskType=SYNC_FLUSH
      # Port on which the Broker serves clients
      listenPort=11011
      # NameServer addresses; separate multiple addresses with semicolons (i.e. namesrvAddr=ip1:port1;ip2:port2;ip3:port3)
      namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
      # Number of queues per topic, 4 by default. Choose it with the number of consumer instances in mind; too small a value hinders consumer load balancing
      defaultTopicQueueNums=8
      # Whether the Broker may create topics automatically. Disabling this in production is recommended
      autoCreateTopicEnable=true
      # Whether the Broker may create subscription groups automatically. Disabling this in production is recommended
      autoCreateSubscriptionGroup=true
      # IP address of this Broker
      brokerIP1=192.168.1.2
      # Root storage path
      storePathRootDir=/data/rocketmq-all-4.7.0-bin-release/data/store-a
      # CommitLog storage path
      storePathCommitLog=/data/rocketmq-all-4.7.0-bin-release/data/store-a/commitlog
      # Consume queue storage path
      storePathConsumerQueue=/data/rocketmq-all-4.7.0-bin-release/data/store-a/consumequeue
      # Message index storage path
      storePathIndex=/data/rocketmq-all-4.7.0-bin-release/data/store-a/index
      # Checkpoint file storage path
      storeCheckpoint=/data/rocketmq-all-4.7.0-bin-release/data/store-a/checkpoint
      # Abort file storage path
      abortFile=/data/rocketmq-all-4.7.0-bin-release/data/store-a/abort
      # Size of each CommitLog file; the default is 1 GB
      mapedFileSizeCommitLog=1073741824
      # Each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
      mapedFileSizeConsumeQueue=300000
      
  3. Start the Broker service

    • Start the Master Broker on 192.168.1.1

      nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

    • Start the Master Broker on 192.168.1.2

      nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &

    • Start the Slave Broker on 192.168.1.2

      nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

    • Start the Slave Broker on 192.168.1.1

      nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &

  4. Installing the rocketmq console

    1. Download and package rocketmq-console (download address: rocketmq-console)

    2. Modify the configuration file: rocketmq.config.namesrvAddr=192.168.1.1:9876;192.168.1.2:9876

    3. Start App.java; if no exception is reported, open http://localhost:8080

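
The layout described in this section (two Broker groups, each with its Master on one machine and its Slave on the other) can be sanity-checked before starting anything. The sketch below is a hypothetical helper, not a RocketMQ tool: `ClusterLayoutCheck`, `Node` and `validate` are illustrative names, and the fields mirror the `brokerName`, `brokerId` and `brokerIP1` values from the four configuration files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check of a Master/Slave layout; not part of RocketMQ. */
public class ClusterLayoutCheck {

    /** One Broker instance, described by brokerName, brokerId and brokerIP1. */
    public static final class Node {
        final String brokerName;
        final int brokerId;
        final String ip;

        public Node(String brokerName, int brokerId, String ip) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
            this.ip = ip;
        }
    }

    /** Returns a list of problems; an empty list means the layout looks consistent. */
    public static List<String> validate(List<Node> nodes) {
        // Group the instances by brokerName: one group = one Master plus its Slaves.
        Map<String, List<Node>> groups = new LinkedHashMap<>();
        for (Node n : nodes) {
            groups.computeIfAbsent(n.brokerName, k -> new ArrayList<>()).add(n);
        }
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, List<Node>> group : groups.entrySet()) {
            Node master = null;
            int masters = 0;
            for (Node n : group.getValue()) {
                if (n.brokerId == 0) {
                    master = n;
                    masters++;
                }
            }
            if (masters != 1) {
                problems.add(group.getKey() + ": expected exactly one Master, found " + masters);
                continue;
            }
            for (Node n : group.getValue()) {
                if (n.brokerId != 0 && n.ip.equals(master.ip)) {
                    problems.add(group.getKey() + ": Slave " + n.brokerId + " runs on the same machine as its Master");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<Node> layout = Arrays.asList(
                new Node("broker-a", 0, "192.168.1.1"),  // broker-a.properties
                new Node("broker-b", 1, "192.168.1.1"),  // broker-b-s.properties
                new Node("broker-b", 0, "192.168.1.2"),  // broker-b.properties
                new Node("broker-a", 1, "192.168.1.2")); // broker-a-s.properties
        System.out.println(validate(layout)); // []
    }
}
```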

2.2 code integration

2.2.1 simple example

 <!--Add dependency-->
<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>
  • Sending messages synchronously, asynchronously, and one-way

    package com;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.junit.Test;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className RocketMQSimplenessTest
     * @description RocketMQ simple example API
     **/
    public class RocketMQSimplenessTest {
    
        /**
         * Send messages synchronously
         * Typical scenarios: important notification messages, SMS notifications, SMS marketing systems
         */
        @Test
        public void syncSendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start the instance
            producer.start();
            for (int i = 0; i < 50; i++){
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Send the message; it is delivered to one of the Brokers
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            // Once the producer instance is no longer in use, close it
            producer.shutdown();
        }
    
        /**
         * Send message asynchronously
         * Usage scenario: generally used for business scenarios sensitive to response time
         */
        @Test
        public void asyncSendMsg() throws MQClientException, InterruptedException {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start the instance
            producer.start();
            // 4. In asynchronous mode, the maximum number of internal retries before the send is reported as failed
            producer.setRetryTimesWhenSendAsyncFailed(0);
            int messageCount = 50;
            // 5. Initialize the latch with the number of messages
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Send the message. SendCallback is invoked when the send completes, whether it succeeded or failed
                    producer.send(msg, new SendCallback() {
                        // success
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            // Decrements the count of the latch and releases all waiting threads if the count reaches zero
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }
                        // fail
                        @Override
                        public void onException(Throwable e) {
                            // Decrements the count of the latch and releases all waiting threads if the count reaches zero
                            countDownLatch.countDown();
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Wait up to 5 seconds for all send callbacks to finish
            countDownLatch.await(5, TimeUnit.SECONDS);
            // Once the producer instance is no longer in use, close it
            producer.shutdown();
        }
    
        /**
         * Send message one-way (no send result is returned)
         * Usage scenario: cases that need only moderate reliability, such as log collection.
         */
        @Test
        public void oneWaySendMsg() throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start the instance
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Create a message instance that specifies the topic, tag, and message body
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Send the message one-way to one of the brokers; no result is returned.
                producer.sendOneway(msg);
            }
            // 4. Wait for sending to complete
            Thread.sleep(5000);
            producer.shutdown();
        }
    }
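
A note on the asynchronous example above: `countDownLatch.await(5, TimeUnit.SECONDS)` returns a boolean, and the example ignores it, so a timeout looks the same as normal completion. The following is a minimal sketch of checking that result, using only the JDK. The send callbacks are simulated with a thread pool (an assumption, no broker is involved), and the names `AsyncLatchSketch` and `awaitCallbacks` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncLatchSketch {

    /**
     * Simulates messageCount asynchronous send callbacks and waits for them.
     * Returns true only if every callback ran before the timeout elapsed.
     */
    static boolean awaitCallbacks(int messageCount, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(messageCount);
        ExecutorService callbackPool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < messageCount; i++) {
                // Stand-in for SendCallback.onSuccess / onException: both must count down.
                callbackPool.execute(latch::countDown);
            }
            // await(...) returns false when the timeout elapses before the count reaches zero.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the wait as incomplete.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean allDone = awaitCallbacks(50, 5000);
        System.out.println(allDone ? "all callbacks completed" : "timed out waiting for callbacks");
    }
}
```

In the real producer, the same check could decide whether to log a warning before calling `producer.shutdown()`.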
    
  • Consume messages

    package com;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className Consumer
     * @description Consume messages
     * @version 1.0
     **/
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // 1. Instantiate with the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer");
            // 2. Specify the server address
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Subscribe to the topic; "*" matches every tag
            consumer.subscribe("TopicTest", "*");
            // 4. Register the callback executed when messages arrive from the broker
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 5. Start the consumer instance
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

2.2.2 order example

  • Send messages with global or partitioned ordering

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className OrderProducer
     * @description Send messages with global or partitioned ordering
     **/
    public class OrderProducer {
    
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("defaultProducer");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start the instance
            producer.start();
            // 4. Define the tags assigned to messages in rotation
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 50; i++) {
                int orderId = i % 10;
                // Create a message instance that specifies the topic, tag, key, and message body
                Message msg = new Message("TopicOrderTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                /**
                 *  @param msg Message to send
                 *  @param selector Message queue selector, which picks the target queue for the message
                 *  @param arg Argument passed to the message queue selector (here, the order ID)
                 */
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
            // 5. Once the producer instance is no longer in use, close it
            producer.shutdown();
        }
    }
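
The selector above routes each message with `orderId % mqs.size()`, so all messages that share an order ID land in the same queue, and ordering is only preserved within one queue. The sketch below illustrates that routing with plain integers and no broker. The queue count of 4 is an assumption for illustration, and `QueueSelectSketch` is a hypothetical name. It uses `Math.floorMod`, which gives the same result as `%` for non-negative IDs and also keeps negative IDs in range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class QueueSelectSketch {

    /** Queue index = orderId modulo queue count; floorMod keeps negative IDs in range too. */
    static int selectQueue(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    /** Groups message numbers 0..messageCount-1 by the queue they would be routed to. */
    static Map<Integer, List<Integer>> route(int messageCount, int orderIdCount, int queueCount) {
        Map<Integer, List<Integer>> byQueue = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            int orderId = i % orderIdCount; // as in the producer: orderId = i % 10
            int queue = selectQueue(orderId, queueCount);
            byQueue.computeIfAbsent(queue, q -> new ArrayList<>()).add(i);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        route(50, 10, 4).forEach((queue, messages) ->
                System.out.println("queue " + queue + " <- messages " + messages));
    }
}
```

With a single queue every message shares one queue (global ordering); with several queues the ordering holds per order ID (partitioned ordering).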
    
  • Subscribe to messages

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className OrderedConsumer
     * @description Subscribe to messages
     **/
    public class OrderedConsumer {
    
        public static void main(String[] args) throws Exception {
    
            // 1. Instantiate with the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("defaultProducer");
            // 2. Specify the server address
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3. When the consumer group starts for the first time, consume from the earliest offset of each queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 4. Subscribe to the topic, filtering by tag
            consumer.subscribe("TopicOrderTest", "TagA || TagC || TagD");
            // 5. Register the listener that consumes each queue's messages in order
            consumer.registerMessageListener(new MessageListenerOrderly() {
                AtomicLong consumeTimes = new AtomicLong(0);
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msg, ConsumeOrderlyContext context) {
                    context.setAutoCommit(false);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                    this.consumeTimes.incrementAndGet();
                    if ((this.consumeTimes.get() % 2) == 0) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    if ((this.consumeTimes.get() % 3) == 0) {
                        return ConsumeOrderlyStatus.ROLLBACK;
                    }
                    if ((this.consumeTimes.get() % 4) == 0) {
                        return ConsumeOrderlyStatus.COMMIT;
                    }
                    if ((this.consumeTimes.get() % 5) == 0) {
                        context.setSuspendCurrentQueueTimeMillis(3000);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }
    

2.2.3 broadcast example

Broadcast: a message is delivered to every consumer that subscribes to the topic.
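
To make the difference from the default clustering mode concrete, here is a small simulation in plain Java, with no broker involved. In clustering mode each message is handled by one consumer of the group; in broadcasting mode every consumer receives every message. The round-robin assignment is a simplifying assumption (RocketMQ balances by queue, not by message), and `DeliveryModelSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliveryModelSketch {

    /** Clustering: each message goes to exactly one consumer of the group (round-robin here). */
    static List<List<Integer>> clustering(int consumerCount, int messageCount) {
        List<List<Integer>> inboxes = emptyInboxes(consumerCount);
        for (int msg = 0; msg < messageCount; msg++) {
            inboxes.get(msg % consumerCount).add(msg);
        }
        return inboxes;
    }

    /** Broadcasting: every consumer receives every message. */
    static List<List<Integer>> broadcasting(int consumerCount, int messageCount) {
        List<List<Integer>> inboxes = emptyInboxes(consumerCount);
        for (int msg = 0; msg < messageCount; msg++) {
            for (List<Integer> inbox : inboxes) {
                inbox.add(msg);
            }
        }
        return inboxes;
    }

    private static List<List<Integer>> emptyInboxes(int consumerCount) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        System.out.println("clustering:   " + clustering(2, 4));
        System.out.println("broadcasting: " + broadcasting(2, 4));
    }
}
```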

  • Producer example

    package com.producer;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className BroadcastProducer
     * @description Broadcast producer
     * @version 1.0
     **/
    public class BroadcastProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            for (int i = 0; i < 50; i++) {
                Message msg = new Message("TopicGroupTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            producer.shutdown();
        }
    }
    
  • Consumer example

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className BroadcastConsumer
     * @description Broadcast consumer
     * @version 1.0
     **/
    public class BroadcastConsumer {
        public static void main(String[] args) throws Exception {
            // 1. Instantiate with the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroupName");
            // 2. Specify the server address
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3. When the consumer starts for the first time, consume from the earliest offset of each queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 4. Set to broadcast mode
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // 5. Subscribe to the topic, filtering by tag
            consumer.subscribe("TopicGroupTest", "TagA || TagC || TagD");
            // 6. Register the listener that consumes messages concurrently
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg,ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Broadcast Consumer Started.%n");
        }
    }
    

2.2.4 schedule example

Scheduled messages: scheduled messages differ from normal messages in that they are not delivered until after a specified time.
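
In the open-source RocketMQ 4.x client, the delay is not an arbitrary duration: the producer picks a delay level with `Message.setDelayTimeLevel(int)`, and the broker maps the level to a duration through its `messageDelayLevel` setting. The sketch below parses the default value of that setting with plain Java to show which level means which delay. A broker may be configured differently, so treat the table as an assumption about a default installation; `DelayLevelSketch` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

final class DelayLevelSketch {

    // Default broker value of messageDelayLevel in RocketMQ 4.x; a broker may override it.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay in milliseconds for a 1-based delay level. */
    static long delayMillis(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("delay level must be in 1.." + levels.length);
        }
        String token = levels[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            default: throw new IllegalStateException("unknown unit in " + token);
        }
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 18; level++) {
            System.out.println("level " + level + " -> " + delayMillis(level) + " ms");
        }
    }
}
```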

  • Start a consumer that waits for the scheduled messages

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @date 2021/7/5 0005
     * @className ScheduledMessageConsumer
     * @description Start a consumer that waits for the scheduled messages
     * @version 1.0
     **/
    public class ScheduledMessageConsumer {
        
         public static void main(String[] args) throws Exception {
             // 1. Instantiate with the consumer group name
             DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
             // 2. Specify the server address
             consumer.setNamesrvAddr("192.168.1.20:9876");
             // 3. Subscribe to the topic; "*" matches every tag
             consumer.subscribe("TestExampleTopic", "*");
             // 4. Register message listener
             consumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                     for (MessageExt message : messages) {
                         // Print approximate delay period
                         System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                 + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                     }
                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
             });
             // Start consumer
             consumer.start();
         }
     }
    
  • Send scheduled message

    package com.producer;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    /**
     * @className ScheduledMessageProducer
     * @description Send scheduled messages
     **/
    public class ScheduledMessageProducer {
    
        public static void main(String[] args) throws Exception {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start the instance
            producer.start();
            int totalMessagesToSend = 100;
            for (int i = 0; i < totalMessagesToSend; i++) {
                // Use the topic that ScheduledMessageConsumer subscribes to
                Message message = new Message("TestExampleTopic", ("Hello scheduled message " + i).getBytes());
                // Delay level 3 is 10 seconds under the broker's default messageDelayLevel setting
                message.setDelayTimeLevel(3);
                producer.send(message);
            }
            // 4. Once the producer instance is no longer in use, close it
            producer.shutdown();
        }
    }
    

2.2.5 batch processing example

Why batch processing?

Sending messages in batches improves the performance of delivering small messages.

Use restrictions:

Messages in the same batch must have the same topic and the same waitStoreMsgOK setting, and scheduled (delayed) messages are not supported.

In addition, the total size of a batch of messages should not exceed 1MiB.

  • Send a batch of messages [total size not exceeding 1MiB]

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className BatchingProducer
     * @description Batch send message
     **/
    public class BatchingProducer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            String topic = "BatchTestTopic";
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
            try {
                producer.send(messages);
            } catch (Exception e) {
                // Keep the original exception as the cause so the failure can be diagnosed
                throw new RuntimeException("Send message failed", e);
            } finally {
                producer.shutdown();
            }
        }
    }
    
  • Send a batch of messages [total size exceeds 1MiB]: split the list into smaller batches before sending

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className BroadcastsProducer
     * @description Batch send message [message size exceeds 1MiB]
     **/
    public class BroadcastsProducer implements Iterator<List<Message>> {
    
        /**
         * Maximum size of one batch, in bytes
         */
        private static final int SIZE_LIMIT = 1000 * 1000;
    
        /**
         * Message collection
         */
        private final List<Message> messages;
    
        /**
         * Current index
         */
        private int currIndex;
    
        public BroadcastsProducer(List<Message> messages) {
            this.messages = messages;
        }
    
        /**
         * Checks whether any messages have not yet been returned in a batch
         * @return true if at least one message remains
         */
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }
    
        /**
         * Returns the next batch of messages whose estimated total size fits within SIZE_LIMIT
         * @return the next sub-list of messages
         */
        @Override
        public List<Message> next() {
            // Start scanning from the current index
            int nextIndex = currIndex;
            for (int totalSize = 0; nextIndex < messages.size(); nextIndex++) {
                // Get current message
                Message message = messages.get(nextIndex);
                // Topic length + message body length
                int tmpSize = message.getTopic().length() + message.getBody().length;
                // Get the properties of the current message
                Map<String, String> properties = message.getProperties();
                // Add the length of each property key + value
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                // Add 20 bytes for per-message log overhead
                tmpSize = tmpSize + 20;
                if (tmpSize > SIZE_LIMIT) {
                    // A single message exceeds SIZE_LIMIT.
                    // Return it in a batch of its own; otherwise the splitting would never advance
                    if (nextIndex - currIndex == 0) {
                        // The current batch is empty: include this message and stop; otherwise just stop
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }
            // Return the next batch as a sub-list
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
        
        public static void main(String[] args) throws MQClientException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerBatchingName");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            producer.start();
            String topic = "BatchTestTopic";
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
            // Divide the big list into small lists
            BroadcastsProducer splitter = new BroadcastsProducer(messages);
            try {
                while (splitter.hasNext()) {
                    List<Message> listItem = splitter.next();
                    producer.send(listItem);
                }
            } catch (Exception e) {
                throw new RuntimeException("Send message failed", e);
            } finally {
                // Shut down only after every batch has been sent
                producer.shutdown();
            }
        }
    }
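
The splitting rule used above can be tried out without a broker. The sketch below applies the same idea to a plain list of message sizes: consecutive messages are grouped until the next one would push the batch over the limit, and a message that is larger than the limit on its own gets a batch to itself. The limit of 1000 bytes and the sizes are illustrative assumptions, and `BatchSplitSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchSplitSketch {

    /**
     * Splits message sizes (in bytes) into consecutive batches whose total
     * stays within limit. A single message larger than limit is placed in a
     * batch of its own so that the caller can decide how to handle it.
     */
    static List<List<Integer>> split(List<Integer> sizes, int limit) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int total = 0;
        for (int size : sizes) {
            // Close the current batch when adding this message would exceed the limit.
            if (!current.isEmpty() && total + size > limit) {
                batches.add(current);
                current = new ArrayList<>();
                total = 0;
            }
            current.add(size);
            total += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(400, 500, 300, 1200, 100);
        System.out.println(split(sizes, 1000));
    }
}
```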
    

2.2.6 filter example

In most cases, tags are a simple and useful way to select the messages you want. For example:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
    consumer.setNamesrvAddr("192.168.1.20:9876");
    consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer receives messages tagged TAGA, TAGB, or TAGC. The limitation is that a message can carry only one tag, which may not be enough for complex scenarios. In such cases, you can use SQL expressions to filter messages.
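
The matching rule of a tag expression can be sketched in plain Java: the expression is a list of alternatives separated by `||`, and `*` accepts every tag. This only models the matching semantics; how the broker and client actually evaluate the expression internally is not shown, and `TagFilterSketch` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

final class TagFilterSketch {

    /** Parses an expression such as "TAGA || TAGB" into the set of accepted tags. */
    static Set<String> parse(String expression) {
        Set<String> tags = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    /** A message matches when the expression is "*" or lists the message's single tag. */
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        return messageTag != null && parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TAGA || TAGB || TAGC", "TAGB"));
        System.out.println(matches("TAGA || TAGB || TAGC", "TAGD"));
    }
}
```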

Principle

SQL filtering evaluates an expression against the properties that you set on a message when sending it. Within the syntax defined by RocketMQ, you can express fairly rich selection logic.
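
As an illustration, take the expression `a > 5 AND b = 'abc'`. A message with properties a = 10, b = 'abc' is selected, while a message with a = 1, b = 'abc' is not. The sketch below hard-codes that one predicate in plain Java instead of parsing SQL, so it only shows the outcome, not RocketMQ's evaluator. The property names `a` and `b` and the class name `SqlFilterSketch` are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class SqlFilterSketch {

    /** Hard-coded equivalent of the expression: a > 5 AND b = 'abc'. */
    static boolean matches(Map<String, String> properties) {
        String a = properties.get("a");
        String b = properties.get("b");
        if (a == null || b == null) {
            return false; // a missing property cannot satisfy the comparison
        }
        try {
            return Double.parseDouble(a) > 5 && b.equals("abc");
        } catch (NumberFormatException e) {
            return false; // non-numeric value for a: treated as no match in this sketch
        }
    }

    /** Builds a property map like the one filled by msg.putUserProperty(...). */
    static Map<String, String> props(String a, String b) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", a);
        properties.put("b", b);
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(matches(props("10", "abc"))); // a = 10 satisfies a > 5
        System.out.println(matches(props("1", "abc")));  // a = 1 does not
    }
}
```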

Syntax

RocketMQ defines only a basic syntax to support this feature. You can also extend it easily.

  1. Numeric comparison, such as >, >=, <, <=, BETWEEN, =;
  2. Character comparison, such as =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

Constant types are:

  1. Numbers, such as 123, 3.1415;
  2. Characters, such as 'abc', which must be enclosed in single quotation marks;
  3. NULL, a special constant;
  4. Boolean, TRUE or FALSE;

Use restrictions

Only push consumers can select messages through SQL92, by passing a MessageSelector to subscribe. [consumer.subscribe("TopicFiltrationTest", MessageSelector.bySql("a between 0 and 3"))]

  • Producer example

    package com.producer;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className FiltrationProducer
     * @description filter
     **/
    public class FiltrationProducer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
            // 1. Instantiate with producer group name
            DefaultMQProducer producer = new DefaultMQProducer("ProducerFiltrationName");
            // 2. Specify the server address
            producer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Start
            producer.start();
            for (int i = 0; i < 50; i++) {
                Message msg = new Message("TopicFiltrationTest",
                        "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                // Set some properties.
                msg.putUserProperty("a", String.valueOf(i));
                SendResult sendResult = producer.send(msg);
            }
            producer.shutdown();
        }
    }
    
  • Consumer example

    package com.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageSelector;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author Laisheng
     * @version 1.0
     * @date 2021-07-05
     * @className FiltrationConsumer
     * @description filter
     **/
    public class FiltrationConsumer {
    
        public static void main(String[] args) throws MQClientException {
            // 1. Instantiate with the consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerFiltrationName");
            // 2. Specify the server address
            consumer.setNamesrvAddr("192.168.1.20:9876");
            // 3. Subscribe only to messages that have property a with a >= 0 and a <= 3
            consumer.subscribe("TopicFiltrationTest", MessageSelector.bySql("a between 0 and 3"));
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msg, ConsumeConcurrentlyContext context) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start(); 
        }
    
    }
    

2.2.7 logappender example

RocketMQ logappender provides a log4j appender, a log4j2 appender, and a logback appender for business use.

  • When using a log4j properties file, configure it as follows:
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
  • When using a log4j XML configuration file, configure it as follows and add an asynchronous appender:
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
    <param name="Tag" value="yourTag" />
    <param name="Topic" value="yourLogTopic" />
    <param name="ProducerGroup" value="yourLogGroup" />
    <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
    <param name="BufferSize" value="1024" />
    <param name="Blocking" value="false" />
    <appender-ref ref="mqAppender1"/>
</appender>
  • When using log4j2, configure it as follows:
<!-- If you do not want logging to block, configure an asyncAppender that references this appender -->
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
     topic="yourLogTopic" tag="yourTag">
    <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
  • When using logback, an asyncAppender is also needed:
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
    <tag>yourTag</tag>
    <topic>yourLogTopic</topic>
    <producerGroup>yourLogGroup</producerGroup>
    <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
    <layout>
        <pattern>%date %p %t - %m%n</pattern>
    </layout>
</appender>

<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
    <queueSize>1024</queueSize>
    <discardingThreshold>80</discardingThreshold>
    <maxFlushTime>2000</maxFlushTime>
    <neverBlock>true</neverBlock>
    <appender-ref ref="mqAppender1"/>
</appender>

2.2.8 openmessaging example

OpenMessaging includes the establishment of industry guidelines and messaging and streaming specifications to provide a common framework for finance, e-commerce, IoT, and big data. Its design principles are to be cloud-oriented, simple, flexible, and language-independent in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following example demonstrates how to access RocketMQ based on OpenMessaging.

  • OMS producer: the following example shows how to send messages to a RocketMQ broker in synchronous, asynchronous, or one-way mode.
public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
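  • OMS pull consumer: use the OMS PullConsumer to poll messages from the specified queue.
// Imports assume the OpenMessaging 0.1.0-alpha API and RocketMQ's OMS binding.
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;

public class OMSPullConsumer {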
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

2.2.9 transaction example [transaction]

What are transactional messages?

A transactional message can be thought of as a two-phase commit implementation for messages, used to ensure eventual consistency in a distributed system. Transactional messages guarantee that executing the local transaction and sending the message happen atomically.

Use restrictions

  1. Transactional messages do not support scheduled (delayed) delivery or batch sending.
  2. To keep half messages from accumulating because a single message is checked too many times, the number of checks for a single message is limited to 15 by default. Users can change this limit with the "transactionCheckMax" parameter in the broker configuration. Once a message has been checked "transactionCheckMax" times, the broker discards it by default and prints an error log. Users can change this behavior by overriding the AbstractTransactionCheckListener class.
  3. A transactional message is checked after a certain period, which is determined by the "transactionTimeout" parameter in the broker configuration. Users can also change this limit for an individual message by setting the user property "CHECK_IMMUNITY_TIME_IN_SECONDS" when sending it; this property takes precedence over the "transactionTimeout" parameter.
  4. A transactional message may be checked or consumed more than once.
  5. Writing a committed message to the user's target topic may fail. At present this depends on the log record. High availability is ensured by RocketMQ's own high-availability mechanism. To ensure that transactional messages are not lost and that transaction integrity is preserved, the synchronous double-write mechanism is recommended.
  6. The producer ID of transactional messages cannot be shared with the producer ID of other message types. Unlike other message types, transactional messages allow check-back queries: the MQ server queries the client by its producer ID.
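
The check limit in restriction 2 can be pictured as a bounded loop. The following is a minimal sketch that uses only the Java standard library; it is a model of the rule as described above, not RocketMQ broker code, and the class, enum, and method names are hypothetical.

```java
import java.util.function.IntFunction;

/** Stdlib-only model of the check-back limit; not RocketMQ broker code. */
final class CheckBackLimitSketch {

    /** Hypothetical names mirroring the three transaction states. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** What finally happens to the half message in this model. */
    enum Outcome { DELIVERED, DELETED, DISCARDED_AFTER_MAX_CHECKS }

    /**
     * Asks the producer for the transaction state up to maxChecks times.
     * The checker receives the 1-based check number and returns the reported state.
     */
    static Outcome resolve(IntFunction<State> checker, int maxChecks) {
        for (int check = 1; check <= maxChecks; check++) {
            State state = checker.apply(check);
            if (state == State.COMMIT) {
                return Outcome.DELIVERED;
            }
            if (state == State.ROLLBACK) {
                return Outcome.DELETED;
            }
            // UNKNOWN: leave the half message in place and check again later.
        }
        return Outcome.DISCARDED_AFTER_MAX_CHECKS;
    }

    public static void main(String[] args) {
        int transactionCheckMax = 15; // default value quoted in the text
        // A producer that only knows the answer on the third check.
        System.out.println(resolve(n -> n < 3 ? State.UNKNOWN : State.COMMIT, transactionCheckMax));
        // A producer that never gives a final answer: the message is given up on.
        System.out.println(resolve(n -> State.UNKNOWN, transactionCheckMax));
    }
}
```

In this model a producer that keeps answering "unknown" loses the message once the limit is reached, which is why a check-back handler should return a final state as soon as the local outcome is known.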

application

1, Transaction status

Transaction messages have three states:

  • TransactionStatus.CommitTransaction: commit a transaction, indicating that the consumer is allowed to consume this message.
  • TransactionStatus.RollbackTransaction: rolls back the transaction, indicating that the message will be deleted and consumption is not allowed.
  • TransactionStatus.Unknown: intermediate state, indicating that the broker must check back with the producer to determine the status.

2, Send transaction message

  1. Create transaction producer
    Use the TransactionMQProducer class to create a producer client and specify a unique producer group. You can set a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result, and the reply status is described in the previous section.
package com.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Laisheng
 * @date 2021/7/5 0005
 * @className TransactionListenerImpl
 * @description Transaction listener
 * @version 1.0
 **/
public class TransactionListenerImpl implements TransactionListener {

    // Counter used only to assign a demo status to each message in turn.
    private final AtomicInteger transactionIndex = new AtomicInteger(0);

    // Demo status (0, 1 or 2) recorded per transaction ID.
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Called to execute the local transaction once the half (prepare) message has been sent successfully.
     * @param msg the half message
     * @param arg the argument passed to sendMessageInTransaction
     * @return the local transaction state; always UNKNOW here, so the broker has to check back
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Called when the broker has received no final state for a half message and sends a check request
     * to obtain the local transaction state.
     * @param msg the half message being checked
     * @return the local transaction state recorded for this message
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        // No recorded status for this transaction ID: this demo commits the message.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

package com.producer;


import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

/**
 * @author Laisheng
 * @version 1.0
 * @date 2021/7/5 0005
 * @className TransactionProducer
 * @description Transactional message producer
 **/
public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Listener that executes the local transaction and answers broker check requests
        TransactionListener transactionListener = new TransactionListenerImpl();
        // Create the producer with a unique producer group name
        TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
        // Specify the NameServer address
        producer.setNamesrvAddr("192.168.1.20:9876");
        // Thread pool used to process check requests from the broker
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the process alive so that the broker's check requests can still be answered
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
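
The listener in this example assigns states from a counter purely for demonstration. In an application, one possible pattern is to record the real outcome of the local transaction under its transaction ID so that a later check request can be answered from that record. The following is a minimal sketch of that idea using only the Java standard library. The State enum stands in for LocalTransactionState, all names are hypothetical, and an ID with no record is reported as unknown rather than committed, which is a deliberate difference from the demo listener.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stdlib-only sketch; State stands in for RocketMQ's LocalTransactionState. */
final class LocalTransactionOutcomeStore {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Outcome of each local transaction, keyed by transaction ID (in memory only).
    private final Map<String, State> outcomes = new ConcurrentHashMap<>();

    /** Runs the local work and records whether it succeeded or failed. */
    State execute(String transactionId, Runnable localWork) {
        State state;
        try {
            localWork.run();
            state = State.COMMIT;
        } catch (RuntimeException e) {
            state = State.ROLLBACK;
        }
        outcomes.put(transactionId, state);
        return state;
    }

    /** Answers a check request; an ID with no recorded outcome stays UNKNOWN. */
    State check(String transactionId) {
        return outcomes.getOrDefault(transactionId, State.UNKNOWN);
    }

    public static void main(String[] args) {
        LocalTransactionOutcomeStore store = new LocalTransactionOutcomeStore();
        store.execute("tx-1", () -> { /* local work succeeds */ });
        store.execute("tx-2", () -> { throw new IllegalStateException("local work failed"); });
        System.out.println(store.check("tx-1")); // COMMIT
        System.out.println(store.check("tx-2")); // ROLLBACK
        System.out.println(store.check("tx-3")); // UNKNOWN
    }
}
```

An in-memory map is lost when the producer restarts, so a real implementation would more likely look up the outcome in the same database that the local transaction wrote to.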

2.2.10 common problems

usage

  1. Where does a newly created Consumer ID start consuming messages?
  • If the topic's messages were sent within the last three days, the consumer starts from the first message saved on the server.
  • If the topic's messages were sent more than three days ago, the consumer starts from the latest message on the server, in other words from the tail of the message queue.
  • If such a consumer restarts, it resumes from its last consumption position.
  2. How to re-consume messages after a consumption failure?
  • Cluster consumption mode: the consumer's business logic returns Action.ReconsumeLater or NULL, or throws an exception. A message that fails to be consumed is retried up to 16 times, after which it is discarded.
  • Broadcast consumption mode: a message is still guaranteed to be consumed at least once, but no resend option is provided.
  3. How to query failed messages after a consumption failure?
  • Query by topic and time to find the messages within a period of time.
  • Query by Topic and Message Id to find an exact message.
  • Query by Topic and Message Key to find the class of messages that share the same Message Key.
  4. Is a message delivered exactly once?
  • RocketMQ ensures that every message is delivered at least once. In most cases, messages are not repeated.
  5. How to add a new broker?
  • Start a new broker and register it with the same list of name servers.
  • By default, only internal system topics and consumer groups are created automatically. To have your business topics and consumer groups on the new node, copy them from an existing broker. Admin tools and command-line commands are provided for this.
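
Because delivery is at least once, a consumer may occasionally see the same message twice, so handlers are usually written to be idempotent. The following is a minimal sketch using only the Java standard library. It assumes each message carries a unique business key, keeps the processed keys in memory, and uses hypothetical class and method names; it is not part of the RocketMQ client.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Stdlib-only sketch of duplicate suppression for at-least-once delivery. */
final class IdempotentConsumerSketch {

    // Keys of messages that have already been handled (in memory only).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Returns true if the handler ran, false if the message was a duplicate and was skipped. */
    boolean onMessage(String businessKey, String body) {
        if (!processedKeys.add(businessKey)) {
            return false; // already handled once
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivered copy can be processed again.
            processedKeys.remove(businessKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer =
                new IdempotentConsumerSketch(body -> System.out.println("handled " + body));
        System.out.println(consumer.onMessage("KEY0", "Hello RocketMQ 0")); // true
        System.out.println(consumer.onMessage("KEY0", "Hello RocketMQ 0")); // false
    }
}
```

A production system would normally keep the processed keys in durable storage, for example behind a unique constraint in a database, so that duplicates are still detected after a restart.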

Configuration related

  1. How long are messages kept on the server?
  • Stored messages are kept for up to 3 days; messages that have not been consumed for more than 3 days are deleted.
  2. What is the size limit of the message body?
  • Generally 256KB.
  3. How to set the number of consumer threads?
  • When starting the Consumer, set its consume thread counts, as shown in the following example:
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
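
The body size limit mentioned above applies to bytes, not characters, so text containing non-ASCII characters reaches it sooner than its length suggests. The following is a minimal sketch of a check performed before sending, using only the Java standard library. The 256 KB constant is taken from the answer above and may differ in a given deployment; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Stdlib-only sketch of a body size check performed before sending. */
final class MessageSizeCheck {

    // Limit quoted in the text; treat it as an assumption about the deployment.
    static final int MAX_BODY_BYTES = 256 * 1024;

    /** Encodes the text as UTF-8, the form in which its size counts against the limit. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true if the encoded body is within the limit. */
    static boolean fits(byte[] body) {
        return body.length <= MAX_BODY_BYTES;
    }

    public static void main(String[] args) {
        byte[] ascii = encode("Hello RocketMQ");
        byte[] cjk = encode("\u6d88\u606f"); // two characters, three bytes each in UTF-8
        System.out.println(ascii.length + " bytes, fits: " + fits(ascii));
        System.out.println(cjk.length + " bytes, fits: " + fits(cjk));
        System.out.println("oversized fits: " + fits(new byte[MAX_BODY_BYTES + 1]));
    }
}
```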

error

  1. A producer or consumer fails to start, and the error message says that the producer group or consumer group is duplicated?
  • Cause: using the same Producer/Consumer Group to start multiple Producer/Consumer instances in the same JVM may cause the client to fail to start.
  • Solution: ensure that only one Producer/Consumer instance is started per Producer/Consumer Group in a JVM.
  2. The consumer fails to load the json file at startup in broadcast mode?
  • Cause: the fastjson version is too old for the broadcast consumer to load the local offsets.json file, so the consumer fails to start. A corrupted fastjson file can cause the same problem.
  • Solution: upgrade fastjson to the version that the rocketmq client depends on, so that the local offsets.json can be loaded. By default, the offsets.json file is located in /home/{user}/.rocketmq_offsets. Alternatively, check the integrity of fastjson.
  3. What is the impact of a Broker going down?
  • Master broker down: messages can no longer be sent to this broker set, but if another broker set is available and the topic exists on it, messages can still be sent. Messages can still be consumed from the slaves.
  • Some slave brokers down: as long as another slave is still working, sending messages is not affected. Consuming messages is not affected either, unless the consumer group is set to prefer consuming from that slave. By default, consumer groups consume from the master.
  • All slave brokers down: sending messages to the master is not affected, unless the master is a SYNC_MASTER, in which case the producer gets a SLAVE_NOT_AVAILABLE result indicating that the message was not sent to any slave. Consuming messages is not affected either, unless the consumer group is set to prefer consuming from a slave. By default, consumer groups consume from the master.
  4. The producer reports the error "No Topic Route Info". How to diagnose it?
  • Background: this happens when you try to send a message to a topic whose routing information is not available to the producer.
    • Ensure that the producer can connect to a name server and fetch routing meta information from it.
    • Ensure that the name server does contain routing meta information for the topic. You can query it from the name server through topicRoute, using the admin tools or the Web console.
    • Ensure that your brokers send heartbeats to the same list of name servers that your producer connects to.
    • Ensure that the permission of the topic is 6 (rw-), or at least 2 (-w-).
  • If the topic cannot be found, create it on a broker with the admin tool command updateTopic or through the Web console.
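
The permission values 6 (rw-) and 2 (-w-) above are bit flags. The following is a minimal sketch that decodes them, using only the Java standard library. The read bit (4) and write bit (2) follow from the two values quoted above; the constant names, the class name, and the rendering of the lowest bit as "x" are assumptions made for illustration.

```java
/** Stdlib-only sketch that decodes a topic permission value into flags. */
final class TopicPermSketch {

    // 6 = rw- and 2 = -w- imply these two bit values.
    static final int READ = 4;
    static final int WRITE = 2;
    // Lowest bit: not needed for sending; shown as 'x' here (assumption).
    static final int THIRD_FLAG = 1;

    static boolean canRead(int perm) {
        return (perm & READ) != 0;
    }

    static boolean canWrite(int perm) {
        return (perm & WRITE) != 0;
    }

    /** Renders the value in the three-character form used in the text. */
    static String format(int perm) {
        return (canRead(perm) ? "r" : "-")
                + (canWrite(perm) ? "w" : "-")
                + ((perm & THIRD_FLAG) != 0 ? "x" : "-");
    }

    public static void main(String[] args) {
        for (int perm : new int[] {6, 4, 2}) {
            System.out.println(perm + " = " + format(perm) + ", producer can send: " + canWrite(perm));
        }
    }
}
```

A topic whose permission is 4 (r--) can be consumed from but not written to, which is one way a producer can fail to send to a topic that exists.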

Keywords: Java

Added by mattastic on Sat, 22 Jan 2022 01:51:11 +0200