RocketMQ transaction messages

Half (Prepare) Message

A half message is a special message type that consumers cannot consume for the time being. When a transaction message has been delivered to the Broker successfully but the Broker has not yet received the secondary confirmation from the Producer, the message is in a "temporarily unavailable for consumption" state. A transaction message in this state is called a half message.
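
The visibility rule can be pictured with a small in-memory model. This is only a sketch for illustration: the class HalfMessageStore and its methods are made-up names, and it does not reflect how the Broker actually stores messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of half-message visibility; not RocketMQ's real storage. */
class HalfMessageStore {
    // Half messages waiting for the Producer's secondary confirmation, keyed by transaction id.
    private final Map<String, String> pending = new HashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> consumable = new ArrayList<>();

    /** Stores a half message; it is not visible to consumers yet. */
    void putHalf(String txId, String body) {
        pending.put(txId, body);
    }

    /** Commit: the half message becomes an ordinary, consumable message. */
    void commit(String txId) {
        String body = pending.remove(txId);
        if (body != null) {
            consumable.add(body);
        }
    }

    /** Rollback: the half message is discarded and never delivered. */
    void rollback(String txId) {
        pending.remove(txId);
    }

    /** Returns a copy of the messages currently visible to consumers. */
    List<String> consumable() {
        return new ArrayList<>(consumable);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        HalfMessageStore store = new HalfMessageStore();
        store.putHalf("tx-1", "order created");
        System.out.println(store.consumable()); // prints [] while the message is still half
        store.commit("tx-1");
        System.out.println(store.consumable()); // prints [order created] after the commit
    }
}
```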

Message Status Check

Because of network jitter, a Producer restart, or similar problems, the secondary confirmation sent by the Producer may never reach the Broker. If the Broker finds that a transaction message has stayed in the half message state for a long time, it actively sends a check-back request to the Producer to query the final state (Commit or Rollback) of the transaction. Message Status Check therefore mainly solves the timeout problem in distributed transactions.
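
A rough sketch of the Broker-side scan follows, assuming a fixed timeout and a callback that stands in for the request to the Producer. The class name, the TxState enum, and the timeout value are all assumptions made for this illustration, not the Broker's real implementation or configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the Broker-side status check; names and timeout are illustrative. */
class StatusCheckSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Half messages and the time (ms) at which they were stored, keyed by transaction id.
    private final Map<String, Long> halfSince = new LinkedHashMap<>();
    private final List<String> delivered = new ArrayList<>();
    private final long timeoutMillis;

    StatusCheckSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void putHalf(String txId, long nowMillis) {
        halfSince.put(txId, nowMillis);
    }

    /**
     * Asks the producer about every half message that has waited at least the timeout.
     * UNKNOWN leaves the message pending so that a later scan asks again.
     */
    void checkTimedOut(long nowMillis, Function<String, TxState> producerCheck) {
        for (String txId : new ArrayList<>(halfSince.keySet())) {
            if (nowMillis - halfSince.get(txId) < timeoutMillis) {
                continue; // still within the grace period, do not ask yet
            }
            TxState state = producerCheck.apply(txId);
            if (state == TxState.COMMIT) {
                halfSince.remove(txId);
                delivered.add(txId);
            } else if (state == TxState.ROLLBACK) {
                halfSince.remove(txId);
            }
        }
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    int pendingCount() {
        return halfSince.size();
    }

    public static void main(String[] args) {
        StatusCheckSketch broker = new StatusCheckSketch(1000);
        broker.putHalf("tx-1", 0);
        broker.checkTimedOut(500, txId -> TxState.COMMIT);  // too early, the producer is not asked
        System.out.println(broker.delivered());             // prints []
        broker.checkTimedOut(1500, txId -> TxState.COMMIT); // timed out, the producer confirms
        System.out.println(broker.delivered());             // prints [tx-1]
    }
}
```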

Execution process

  • Step 1: the Producer sends a Half Message to the Broker;
  • Step 2: the Broker returns an ACK, which means the Half Message was sent successfully;
  • Step 3: the Producer executes the local transaction;
  • Step 4: after the local transaction completes, the Producer sends a secondary confirmation to the Broker that sets the Half Message to Commit or Rollback according to the transaction result. On Commit, the Broker delivers the message to the Consumer, which runs its consumption logic. On Rollback, the Broker marks the message as failed, clears it after a period of time, and never delivers it to the Consumer. Under normal circumstances the distributed transaction is complete at this point; what remains is the timeout case, in which the Broker still has not received the secondary confirmation from the Producer after a period of time;
  • Step 5: for a timed-out Half Message, the Broker actively sends a check-back request to the Producer;
  • Step 6: the Producer handles the check-back request and returns the result of the corresponding local transaction;
  • Step 7: the Broker performs Commit or Rollback according to the check-back result, in the same way as in step 4.
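
Steps 1 to 4 can be traced from the Producer's side with a small dependency-free sketch. Everything here is hypothetical (the class TransactionFlowSketch, its log, and the TxState enum); it only records the order of the steps and does not talk to a real Broker. Treating a null result as UNKNOWN is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Toy walk-through of steps 1 to 4 from the producer's side; all names are illustrative. */
class TransactionFlowSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    // Records each step so that the order of operations can be inspected.
    final List<String> log = new ArrayList<>();

    TxState sendInTransaction(String body, Supplier<TxState> localTransaction) {
        log.add("1 send half: " + body);   // Step 1: the half message goes to the Broker
        log.add("2 broker ack");           // Step 2: the Broker acknowledges the half message
        TxState state = localTransaction.get(); // Step 3: run the local transaction
        if (state == null) {
            // No clear result: report UNKNOWN so that the check back decides later.
            state = TxState.UNKNOWN;
        }
        log.add("3 local transaction: " + state);
        log.add("4 confirm: " + state);    // Step 4: secondary confirmation to the Broker
        return state;
    }

    public static void main(String[] args) {
        TransactionFlowSketch producer = new TransactionFlowSketch();
        producer.sendInTransaction("order created", () -> TxState.COMMIT);
        producer.log.forEach(System.out::println);
    }
}
```

If the confirmation in step 4 is UNKNOWN or is lost on the way, the flow continues with steps 5 to 7.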

An example

Producer

package transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception{
        //1. Create the transactional message producer
        TransactionMQProducer producer = new TransactionMQProducer("demo_producer_transaction_group");
        //2. Set the NameServer address the producer connects to
        producer.setNamesrvAddr("8.131.84.120:9876");
        //Set the transaction listener, which executes the local transaction and answers the Broker's check back
        producer.setTransactionListener(new TransactionListener() {
            //Execute local transactions
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                if (StringUtils.equals("TAGA",message.getTags())){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if (StringUtils.equals("TAGB",message.getTags())){
                    return  LocalTransactionState.ROLLBACK_MESSAGE;
                }else if (StringUtils.equals("TAGC",message.getTags())){
                    //UNKNOW leaves the half message pending; the Broker later calls checkLocalTransaction to resolve it
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }
            //Called when the Broker checks back the state of a pending transaction
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println(messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        //3. Start the producer
        producer.start();

        String[] tags = {"TAGA","TAGB","TAGC"};
        for (int i = 0; i < 3; i++) {
            //4. Create the message with its topic, tag, key and body
            Message message = new Message(
                    "Topic_transaction_demo", //topic
                    tags[i],                  //tag, used for message filtering
                    "keys_",                  //message key (business identifier)
                    ("hello world").getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //5. Producer sends transaction message
            TransactionSendResult result = producer.sendMessageInTransaction(message, null);
            System.out.println(result);
        }
        //6. The shutdown is left commented out: the producer must stay alive to answer the Broker's check back for TAGC
        //producer.shutdown();
    }
}
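
The checkLocalTransaction above always returns COMMIT_MESSAGE, which is fine for a demo. In a real service the check back usually needs to answer from a record of what the local transaction actually did. Below is a minimal dependency-free sketch of that idea. The class TransactionOutcomeStore and its State enum are stand-ins invented here so that the code compiles without the RocketMQ client, and the in-memory map is an assumption; a real system would more likely persist the outcome together with the business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Dependency-free sketch: record local outcomes so the check back can answer truthfully. */
class TransactionOutcomeStore {
    // Stand-in for RocketMQ's LocalTransactionState so the sketch needs no client jar.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final Map<String, State> outcomes = new ConcurrentHashMap<>();

    /** Would be called from executeLocalTransaction: run the work and remember the result. */
    State execute(String transactionId, Runnable localWork) {
        State state;
        try {
            localWork.run();
            state = State.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            // In this sketch a failure means the local work did not take effect.
            state = State.ROLLBACK_MESSAGE;
        }
        outcomes.put(transactionId, state);
        return state;
    }

    /** Would be called from checkLocalTransaction: answer from the record, not a constant. */
    State check(String transactionId) {
        return outcomes.getOrDefault(transactionId, State.UNKNOW);
    }

    public static void main(String[] args) {
        TransactionOutcomeStore store = new TransactionOutcomeStore();
        store.execute("tx-1", () -> { /* local work succeeds */ });
        System.out.println(store.check("tx-1")); // prints COMMIT_MESSAGE
        System.out.println(store.check("tx-2")); // prints UNKNOW because nothing was recorded
    }
}
```

In the listener, the two methods would be keyed by the message's transaction id, so that the check back for a message finds the outcome recorded when its local transaction ran.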

Consumer

package transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1. Create DefaultMQPushConsumer
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group");
        //2. Set namesrv address
        mqPushConsumer.setNamesrvAddr("8.131.84.120:9876");
        //3. Subscribe to the topic
        /**
         * Analogy: the producer is like an author and the topic is like a magazine; consumers must subscribe to the magazine before they receive the articles the author writes for it
         * A consumer can subscribe to more than one topic by calling subscribe once per topic
         * topic: the topic to consume messages from
         * Filter *: indicates no tag filtering
         */
        mqPushConsumer.subscribe("Topic_transaction_demo","*");
        //4. Register a listener that is invoked when messages from the subscribed topic arrive
        //MessageListenerConcurrently is for ordinary (concurrent) consumption, and MessageListenerOrderly is for ordered consumption
        mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        //Get topic
                        String topic = msg.getTopic();
                        //Get tag
                        String tags = msg.getTags();
                        //Get message body
                        String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("topic: "+topic+", tag: "+tags+", message: "+result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //Pause this queue for a moment, then retry the message
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //Start Consumer
        mqPushConsumer.start();
        System.out.println("consumer start...");
    }
}

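Explanation of the example:

The consumer finally consumes two messages, TAGA and TAGC. The TAGA message is committed directly, and the TAGB message is rolled back, so it is never delivered. For TAGC the local transaction state is UNKNOW, so the Broker later calls the check-back method checkLocalTransaction, which returns COMMIT_MESSAGE, and the message is then delivered.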

Keywords: RocketMQ

Added by nyfael on Fri, 11 Feb 2022 12:52:50 +0200