RocketMQ Learning and Practice

RocketMQ Learning and Practice (I)

Installation and configuration

Download

Binary download address: http://rocketmq.apache.org/release_notes/release-notes-4.8.0/
Docker pull command: docker pull foxiswho/rocketmq:server-4.3.2

Install

  • Configure environment variables

  • Go to the installation directory and enter the bin directory. Start the name server by double-clicking mqnamesrv.cmd. Start the broker with the command .\mqbroker -n 127.0.0.1:9876. To start it with a configuration file and with automatic topic creation enabled, run bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c conf/broker.conf &

  • Visual plug-in download

    • Download address: https://github.com/apache/rocketmq-externals.git

    • Edit application.properties under rocketmq-externals\rocketmq-console\src\main\resources: change the port and namesrvAddr. The configuration is as follows

      server.contextPath=
      # Modify port number
      server.port=8088
      #spring.application.index=true
      spring.application.name=rocketmq-console
      spring.http.encoding.charset=UTF-8
      spring.http.encoding.enabled=true
      spring.http.encoding.force=true
      logging.config=classpath:logback.xml
      #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
      # Set rocketmq.config.namesrvAddr
      rocketmq.config.namesrvAddr=127.0.0.1:9876
      #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
      rocketmq.config.isVIPChannel=
      #rocketmq-console's data path:dashboard/monitor
      rocketmq.config.dataPath=/tmp/rocketmq-console/data
      #set it false if you don't want use dashboard.default true
      rocketmq.config.enableDashBoardCollect=true
      
    • Build and start the console: open CMD in the \rocketmq-externals\rocketmq-console folder and run mvn clean package -Dmaven.test.skip=true. After the build succeeds, go to the target folder and run java -jar rocketmq-console-ng-1.0.0.jar to start the console. Then open 127.0.0.1:8088 in a browser; the console is displayed once startup has succeeded.

    • Start the name server with start mqnamesrv.cmd, then start the broker with start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

Architecture diagram

springboot integration

Reference example: https://blog.csdn.net/qq_38366063/article/details/93387680
Official repository: https://github.com/apache/rocketmq-spring

Dependencies

  • Maven

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>

  • Gradle

    implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.0'
    

Configuration

rocketmq:
 name-server: 127.0.0.1:9876
 producer:
   group: qinajia
   # Timeout 5 minutes
   send-message-timeout: 300000
   # Number of retries: 3
   retry-times-when-send-failed: 3
 consumer:
   group: qianjia

Producer implementation

rocketMQTemplate.send("test-topic-1", new GenericMessage<>(demo));

Consumer implementation

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class TestConsumer implements RocketMQListener<Demo> {

    @Override
    public void onMessage(Demo demo) {
        System.out.println("------- TestConsumer received: " + JSON.toJSONString(demo));
    }
}

Three ways to send messages

  • Synchronous send (sync)

    In synchronous mode, the send call returns only after the message has been completely sent and the result is available, so the caller pays the time cost of waiting for the send result.

    This mode has an internal retry mechanism: the client retries a certain number of times before declaring that the send has failed. The default is 2 retries (DefaultMQProducer#getRetryTimesWhenSendFailed). As a consequence, the same message may reach the broker more than once, so the application developer must handle idempotency on the consumer side. An example follows

    public void sync() {
        rocketMQTemplate.syncSend("topic-name", "send sync message !");
    }
    
  • Asynchronous send (async)

    In asynchronous mode, the send call returns immediately. Once the send has completed, the SendCallback is invoked to tell the sender whether it succeeded or failed. Asynchronous mode is typically used in response-time-sensitive scenarios that cannot afford to wait for the result of a synchronous send.

    Like synchronous sending, asynchronous mode also retries internally. The default is 2 retries (DefaultMQProducer#getRetryTimesWhenSendAsyncFailed). The same message may therefore reach the broker more than once, so the application developer must handle idempotency on the consumer side.

    public void async() {
         rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 log.info("send successful");
             }
    
             @Override
             public void onException(Throwable throwable) {
                 log.error("send failed: {}", throwable.getMessage(), throwable);
             }
         });
     }
    
  • One-way send

    In one-way mode, the sender returns immediately after handing off the message and does not wait for an ack from the broker confirming that the message was received. This gives high throughput but carries a risk of message loss, so it suits unimportant messages such as log collection.

    public void oneWay() {
         rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
    }
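Because the sync and async retries described above can deliver the same message more than once, the consumer has to be idempotent. The following is a minimal sketch of one way to do that, assuming each message carries a unique business key (for example an order ID). The class `IdempotentHandler` and its methods are hypothetical helpers, not part of RocketMQ, and the in-memory set is an assumption that would normally be replaced by a database unique index or a shared cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not a RocketMQ class): skips messages whose key was already handled. */
final class IdempotentHandler {
    // Keys of messages that were already processed. In-memory only, which is an
    // assumption of this sketch; a real system would persist them.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger handledCount = new AtomicInteger();

    /** Returns true if the message was processed, false if it was a duplicate delivery. */
    boolean handle(String messageKey) {
        // Set.add returns false when the key is already present.
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        handledCount.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int handledCount() {
        return handledCount.get();
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println("first delivery processed: " + handler.handle("order-1"));
        System.out.println("redelivery processed:     " + handler.handle("order-1"));
        System.out.println("business logic ran " + handler.handledCount() + " time(s)");
    }
}
```

In a listener, `handle` would be called from `onMessage` with the business key taken from the message, so a retried send is acknowledged without running the business logic twice.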
    

Two consumption modes

  • Load-balanced (clustering) consumption

    @Service
    @RocketMQMessageListener(
            topic = "topicName",
            consumerGroup = "my-group1",
            selectorExpression = "tag01||tag02",
            // Load balancing mode
            messageModel = MessageModel.CLUSTERING
    )
    @Slf4j
    public class Consumer2 implements RocketMQListener<Demo> {
    
        @Override
        public void onMessage(Demo demo) {
            log.debug("Message content {}", demo.toString());
        }
    }
    
  • Broadcast mode

    @Service
    @RocketMQMessageListener(
            topic = "topicName",
            consumerGroup = "mall",
            selectorExpression = "tag01||tag02",
            // Broadcast mode
            messageModel = MessageModel.BROADCASTING
    )
    @Slf4j
    public class Consumer implements RocketMQListener<Demo> {
    
        @Override
        public void onMessage(Demo demo) {
            System.out.println("Message content" + demo.toString());
        }
    }
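The difference between the two modes above can be illustrated with a small simulation. This is a simplified sketch, not RocketMQ code: the class `MessageModelSketch` is hypothetical, and it assigns individual messages round-robin, whereas the real client balances whole queues across the consumers of a group. What it shows is the delivery guarantee: in CLUSTERING mode each message is handled by one consumer of the group, while in BROADCASTING mode every consumer receives every message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the two message models; not part of RocketMQ. */
final class MessageModelSketch {

    /** Clustering: each message goes to exactly one consumer (round-robin as a simplification). */
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            inboxes.get(consumer).add(messages.get(i));
        }
        return inboxes;
    }

    /** Broadcasting: every consumer receives every message. */
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (String consumer : consumers) {
            inboxes.get(consumer).addAll(messages);
        }
        return inboxes;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-A", "consumer-B");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("CLUSTERING:   " + clustering(consumers, messages));
        System.out.println("BROADCASTING: " + broadcasting(consumers, messages));
    }
}
```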
    

Sequential consumption

Why is sequential consumption needed? The answer follows from RocketMQ's design. Brokers register with the NameServer. Within a broker, messages are classified by topic, and tags serve as a secondary classification when a finer distinction is needed. Each topic contains multiple queues, so messages sent to the same topic do not necessarily land in the same queue, and consumers reading from several queues naturally see them out of order. To keep the consumption order consistent with the producer's sending order, all messages of the same business group must be placed in the same queue. A queue is a first-in, first-out structure, so order is preserved within it; the producer only has to route every message of one business group to the same queue when sending. The code is as follows.

    /**
     * Uses syncSendOrderly(String destination, Object payload, String hashKey), provided by
     * the Spring Boot RocketMQ starter, to send ordered messages. hashKey is used to select
     * the queue: messages with the same hashKey go to the same queue. Passing a unique
     * business ID such as orderId is usually enough.
     * (producer)
     */
    @Test
    public void orderMessage() {
        SendResult result = rocketMQTemplate.syncSendOrderly("topicName:tag01", "Lau Andy", "1111");
        System.out.println("1:" +  result.toString());
        SendResult result2 = rocketMQTemplate.syncSendOrderly("topicName:tag02", "Xue You Zhang", "1111");
        System.out.println("2:" +result2.toString());
        SendResult result3 = rocketMQTemplate.syncSendOrderly("topicName:tag03", "Li Si-", "1111");
        System.out.println("3:" + result3.toString());
        SendResult result4 = rocketMQTemplate.syncSendOrderly("topicName:tag04", "Wang Wu", "1111");
        System.out.println("4:" + result4.toString());
    }
    /**
     * Consumer: set consumeMode to ORDERLY
     */
    @Service
    @RocketMQMessageListener(
            topic = "topicName",
            consumerGroup = "mall",
            // Clustering (load-balancing) mode
            messageModel = MessageModel.CLUSTERING,
            // Select orderly consumption mode
            consumeMode = ConsumeMode.ORDERLY
    )
    @Slf4j
    public class Consumer implements RocketMQListener<String> {

        @Override
        public void onMessage(String demo) {
            System.out.println("Message content" + demo);
        }
    }
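To see why a shared hashKey keeps related messages in order, it helps to look at how a key can be mapped to a queue. The sketch below is an illustration only: `HashQueueSelector` is a hypothetical class, and hashing the key modulo the queue count is an assumption about the general technique rather than a copy of the client's selector. The point is that the mapping is deterministic, so every message sent with the same key lands in the same queue.

```java
/** Hypothetical illustration of hash-based queue selection; not the RocketMQ implementation. */
final class HashQueueSelector {

    /** Maps a hash key to a queue index in the range [0, queueCount). */
    static int selectQueue(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        // All four messages in the example above use the same hashKey "1111",
        // so they all map to the same queue index.
        for (int i = 1; i <= 4; i++) {
            System.out.println("message " + i + " -> queue " + selectQueue("1111", queueCount));
        }
        // A different business ID may map to a different queue.
        System.out.println("other key -> queue " + selectQueue("2222", queueCount));
    }
}
```

Order is only guaranteed among messages that share a queue; messages with different keys can still be consumed in any relative order.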

Delay queue

   @Test
   public void delayMessage() {
       Demo demo = new Demo();
       demo.setName("leyang");
       demo.setSkill("coder");
       Message<Demo> message = new GenericMessage<>(demo);
       // syncSend(String destination, Message<?> message, long timeout, int delayLevel)
       // Default delay levels: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
       // Level 1 is 1s; the maximum delay is 2h (level 18)
       // timeout is the send timeout and has nothing to do with the delay time
       SendResult result = rocketMQTemplate.syncSend("topicName:tag01", message, 2000, 1);
   }
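The delayLevel argument is an index into a list of fixed delays, not a duration. The sketch below shows the mapping, assuming the broker keeps its default `messageDelayLevel` setting of 18 levels; `DelayLevels` is a hypothetical helper class, and a broker configured with a custom level list would give different results.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that maps a delay level to its duration; not part of RocketMQ. */
final class DelayLevels {
    // Assumed to match the broker's default messageDelayLevel setting.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a space-separated level list such as "1s 5s 1m" into durations. */
    static List<Duration> parse(String levels) {
        List<Duration> result = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': result.add(Duration.ofSeconds(amount)); break;
                case 'm': result.add(Duration.ofMinutes(amount)); break;
                case 'h': result.add(Duration.ofHours(amount)); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return result;
    }

    /** Returns the delay for a 1-based level, using the assumed default list. */
    static Duration delayFor(int level) {
        List<Duration> levels = parse(DEFAULT_LEVELS);
        if (level < 1 || level > levels.size()) {
            throw new IllegalArgumentException("level must be between 1 and " + levels.size());
        }
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 18; level++) {
            System.out.println("level " + level + " -> " + delayFor(level));
        }
    }
}
```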

Batch message sending

Note: the total size of a batch cannot exceed 4 MB. A larger batch must be split into several smaller batches before sending.

    @Test
    public void batchMessage() {
        List<Message<Demo>> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Demo demo = new Demo();
            demo.setName("leyang" + (i + 1));
            demo.setSkill("coder");
            Message<Demo> message = new GenericMessage<>(demo);
            messages.add(message);
        }
        SendResult result = rocketMQTemplate.syncSend("topicName:tag01", messages);
    }
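When the data may exceed the batch limit, the list has to be split before sending. The following is a minimal sketch of such a splitter. `BatchSplitter` is a hypothetical class, and it counts only the body bytes; the real size of a message also includes the topic and properties, so a production splitter should leave some headroom below the limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits message bodies into batches under a size limit. */
final class BatchSplitter {
    // 4 MB limit taken from the note above.
    static final int MAX_BATCH_BYTES = 4 * 1024 * 1024;

    /** Groups bodies in order so that each batch holds at most maxBatchBytes of body data. */
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            if (body.length > maxBatchBytes) {
                throw new IllegalArgumentException("A single message exceeds the batch limit");
            }
            // Close the current batch if adding this body would exceed the limit.
            if (!current.isEmpty() && currentSize + body.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            bodies.add(("leyang" + (i + 1)).getBytes(StandardCharsets.UTF_8));
        }
        // A tiny limit is used here only to make the splitting visible.
        List<List<byte[]>> batches = split(bodies, 20);
        System.out.println("number of batches: " + batches.size());
    }
}
```

Each resulting sub-list would then be sent with its own batch send call.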

Message filtering

  • Filter by tag

        // producer
        @Test
        public void orderMessage() {
            // The tag is appended to the topic after a colon ("topic:tag")
            SendResult result = rocketMQTemplate.syncSend("topicName:tag01", "Lau Andy");
            System.out.println("1:" +  result.toString());
            SendResult result2 = rocketMQTemplate.syncSend("topicName:tag02", "Xue You Zhang");
            System.out.println("2:" +result2.toString());
            SendResult result3 = rocketMQTemplate.syncSend("topicName:tag03", "Li Si-");
            System.out.println("3:" + result3.toString());
            SendResult result4 = rocketMQTemplate.syncSend("topicName:tag04", "Wang Wu");
            System.out.println("4:" + result4.toString());
        }
    	// consumer
    	@Service
        @RocketMQMessageListener(
                topic = "topicName",
                consumerGroup = "mall",
                // Clustering (load-balancing) mode
                messageModel = MessageModel.CLUSTERING,
                consumeMode = ConsumeMode.ORDERLY,
                // Consume only messages tagged tag01 or tag05
                selectorExpression = "tag01 || tag05"
        )
        @Slf4j
        public class Consumer implements RocketMQListener<String> {
    
            @Override
            public void onMessage(String demo) {
                System.out.println("Message content" + demo);
            }
        }
    
  • Filter based on custom properties

    Notes:

    You need to edit the broker.conf configuration file and add the following setting

    # Turn on property filtering
    enablePropertyFilter=true
    

    Then start the broker with that configuration file, using the command start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

    Without this setting, property filtering does not take effect.

        // producer
    	@Test
        public void filterMessage() {
            for (int i = 0; i < 10; i++) {
                // You need to use the rocketmq native message object to set custom properties
                org.apache.rocketmq.common.message.Message rocketMsg = new org.apache.rocketmq.common.message.Message();
                Demo demo = new Demo();
                demo.setName("leyang" + (i + 1));
                demo.setSkill("coder");
                rocketMsg.setBody(JSON.toJSONBytes(demo));
                rocketMsg.putUserProperty("i", String.valueOf(i + 1));
                rocketMsg.setTopic("topicName");
                rocketMsg.setTags("tag01");
                try {
                    // Custom properties only take effect when the native
                    // org.apache.rocketmq.common.message.Message is sent through rocketMQTemplate.getProducer()
                    SendResult send = rocketMQTemplate.getProducer().send(rocketMsg);
                } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    	// consumer
    	@Service
        @RocketMQMessageListener(
                topic = "topicName",
                consumerGroup = "mall",
                // Clustering (load-balancing) mode
                messageModel = MessageModel.CLUSTERING
        )
        @Slf4j
        public class Consumer implements RocketMQListener<Demo>, RocketMQPushConsumerLifecycleListener {
    
            @Override
            public void onMessage(Demo demo) {
                System.out.println("Message content" + demo);
            }
    
            @Override
            public void prepareStart(DefaultMQPushConsumer consumer) {
                // set consumer consume message from now
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
                consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
                try {
                    consumer.subscribe("topicName", MessageSelector.bySql("i > 5"));
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
            }
        }
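The effect of the two filters above can be sketched without a broker. The code below is an illustration under stated assumptions: `FilterSketch` is a hypothetical class, the tag matcher only handles the `tagA || tagB` and `*` forms used in this article, and the property check hard-codes the comparison from the `i > 5` expression instead of parsing SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of tag and property filtering; not the broker's implementation. */
final class FilterSketch {

    /** Returns true if the tag is selected by an expression such as "tag01 || tag05" or "*". */
    static boolean matchesTag(String selectorExpression, String tag) {
        String expression = selectorExpression.trim();
        if ("*".equals(expression)) {
            return true; // "*" subscribes to all tags
        }
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(candidate -> candidate.equals(tag));
    }

    /** Returns the values of property i in [from, to] that satisfy "i > threshold". */
    static List<Integer> matchingProperty(int from, int to, int threshold) {
        List<Integer> matched = new ArrayList<>();
        for (int i = from; i <= to; i++) {
            if (i > threshold) {
                matched.add(i);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // The tag producer above sends tag01..tag04; the consumer selects "tag01 || tag05".
        for (String tag : Arrays.asList("tag01", "tag02", "tag03", "tag04")) {
            System.out.println(tag + " consumed: " + matchesTag("tag01 || tag05", tag));
        }
        // The property producer above sets i = 1..10; the consumer filters with "i > 5".
        System.out.println("i values consumed: " + matchingProperty(1, 10, 5));
    }
}
```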
    

Common exceptions

[Problem 1]

Exception message

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.93 CQ:  0.93 INDEX:  0.93], messages are put to the slave, message store has been shut down, etc. BROKER: 172.31.254.27:10911

Solution

Edit the conf/2m-2s-async/broker-a.properties file and add diskMaxUsedSpaceRatio=98, so that the error is not reported until disk usage reaches 98%.

Reference blogs

Architecture analysis: https://www.cnblogs.com/qdhxhz/p/11094624.html
Usage guidelines: https://www.jianshu.com/p/9c0d4bde8153
RocketMQ fundamentals explained: https://blog.csdn.net/weixin_39615596/article/details/111611635
RocketMQ compared with Kafka: https://blog.csdn.net/damacheng/article/details/42846549
Docker installation tutorial: https://zhuanlan.zhihu.com/p/342022297

Keywords: Java Docker kafka Middleware

Added by snaack on Thu, 30 Dec 2021 11:27:01 +0200