Implementation of distributed transaction based on RocketMQ

If our system does not require strong consistency, the most commonly used approach is eventual consistency. Today, we will implement distributed transactions with an eventually consistent, message-based scheme built on RocketMQ.

1, Transaction message

Here, we need to understand two concepts.
  • Half Message
A half message is a message that the Consumer cannot consume yet. The Producer has sent it to the Broker, but the Broker marks it as undeliverable. Internally, messages in this state are stored under a topic called RMQ_SYS_TRANS_HALF_TOPIC. Once the Producer confirms the message a second time with a Commit, the Consumer can consume it; if the Producer confirms with a Rollback, the message is deleted and will never be consumed.
  • Transaction status check-back
The Producer may fail to confirm a half message because of network or application problems. In that case, the Broker periodically scans these half messages and actively asks the Producer for the status of each one. When to scan and how many times to scan can both be configured, as described in detail later. In short, RocketMQ transaction messages rely on a two-phase commit plus a transaction status check-back to decide whether a message is finally committed or rolled back.
In this article, the code uses an order service and a points service as examples. Combined with the above, the overall process is as follows: the order service sends a half message to the Broker, executes its local transaction, and then commits or rolls back the message; the Broker checks back on any message left unconfirmed; and the points service consumes the message once it is committed.
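The life cycle of a half message described above can be sketched as a small state machine. This is only a toy model for illustration, not RocketMQ code: the class name HalfMessageModel and the State names are assumptions made for this sketch, while the three answers mirror the LocalTransactionState values used later in the article.

```java
/** Toy model of a transaction message's lifecycle on the Broker; not RocketMQ code. */
class HalfMessageModel {

    /** Hypothetical state names chosen for this sketch. */
    enum State { HALF, COMMITTED, ROLLED_BACK }

    /** The three answers a Producer can give, mirroring LocalTransactionState. */
    enum Answer { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Applies the Producer's second confirmation (or a check-back answer) to a message. */
    static State confirm(State current, Answer answer) {
        if (current != State.HALF) {
            return current; // already final: later answers change nothing
        }
        switch (answer) {
            case COMMIT_MESSAGE:
                return State.COMMITTED;
            case ROLLBACK_MESSAGE:
                return State.ROLLED_BACK;
            default:
                return State.HALF; // UNKNOW: stay half and wait for the next check-back
        }
    }

    /** Only committed messages become visible to the Consumer. */
    static boolean visibleToConsumer(State state) {
        return state == State.COMMITTED;
    }

    public static void main(String[] args) {
        State s = confirm(State.HALF, Answer.UNKNOW);   // no decision yet
        System.out.println(s + " visible=" + visibleToConsumer(s));
        s = confirm(s, Answer.COMMIT_MESSAGE);          // check-back finds the commit
        System.out.println(s + " visible=" + visibleToConsumer(s));
    }
}
```

The point of the model is that a message has exactly one way to become visible: it must leave the half state through a commit, either at the second confirmation or during a later check-back.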
 

2, Order service

In the order service, we receive the request from the front end, create an order, and save the relevant data to the local database.

1. Transaction log table

In the order service, in addition to the order table, a transaction log table is also required. It is defined as follows:
CREATE TABLE `transaction_log` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Transaction ID',
  `business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Business identification',
  `foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'Corresponds to the primary key in the business table',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
This table exists solely to answer transaction status check-backs. When the business data is committed, a row is also inserted into this table, and both writes happen in the same local transaction. The check-back then queries the table by transaction ID. If a record is returned, the local transaction has been committed; if no record is returned, the local transaction is either still in an unknown state or has been rolled back.
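To make the idea concrete, here is a minimal in-memory sketch of why the log row can serve as proof of the commit. The two maps stand in for the order table and the transaction_log table, and the failMidway flag is a hypothetical switch used only to simulate a failure between the two writes; a real service would rely on a database transaction instead of manual backups.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the order table and the transaction_log table. */
class LocalTransactionSketch {
    private final Map<Long, String> orders = new HashMap<>();
    private final Map<String, Long> transactionLog = new HashMap<>();

    /** Writes the order and its log row together, or neither if anything fails. */
    void createOrder(long orderId, String orderData, String transactionId, boolean failMidway) {
        Map<Long, String> ordersBackup = new HashMap<>(orders);
        Map<String, Long> logBackup = new HashMap<>(transactionLog);
        try {
            orders.put(orderId, orderData);
            if (failMidway) {
                throw new IllegalStateException("simulated failure between the two writes");
            }
            transactionLog.put(transactionId, orderId);
        } catch (RuntimeException e) {
            // Restore both maps, as a database rollback would.
            orders.clear();
            orders.putAll(ordersBackup);
            transactionLog.clear();
            transactionLog.putAll(logBackup);
            throw e;
        }
    }

    /** Check-back query: a log row proves that the local transaction committed. */
    boolean isCommitted(String transactionId) {
        return transactionLog.containsKey(transactionId);
    }

    int orderCount() {
        return orders.size();
    }
}
```

Because the order and the log row succeed or fail together, finding the log row is enough to conclude that the order was saved as well.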

2. TransactionMQProducer

To send a message through RocketMQ, you first need to create a producer. Note that to send transaction messages, the instance created here must be a TransactionMQProducer.
@Component
public class TransactionProducer {
    
    private String producerGroup = "order_trans_group";
    private TransactionMQProducer producer;

    //Listener used to perform local transactions and transaction status lookbacks
    @Autowired
    OrderTransactionListener orderTransactionListener;
    //Thread pool for executing tasks
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
            
    @PostConstruct
    public void init(){
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendMsgTimeout(Integer.MAX_VALUE);
        producer.setExecutorService(executor);
        producer.setTransactionListener(orderTransactionListener);
        this.start();
    }
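    private void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            //Fail fast: a producer that did not start cannot send transaction messages
            throw new IllegalStateException("Failed to start TransactionMQProducer", e);
        }
    }
    //Send a transaction message
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        //Encode with an explicit charset so that the bytes do not depend on the platform default
        Message message = new Message(topic,data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return this.producer.sendMessageInTransaction(message, null);
    }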
}
The code above mainly creates the producer for transaction messages. The part to focus on is OrderTransactionListener, which is responsible for executing the local transaction and answering transaction status check-backs.
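The thread pool passed to setExecutorService is the one the client uses to run check-back requests coming from the Broker. As a small optional refinement, the sketch below builds a pool with the same sizes as above but with named threads, which makes check-back activity easier to spot in logs. The class name CheckExecutorFactory and the thread name prefix are assumptions made for this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Builds the check-back thread pool with recognizable thread names. */
class CheckExecutorFactory {

    static ThreadPoolExecutor create() {
        ThreadFactory named = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                // Hypothetical prefix; pick whatever is easy to search for in your logs.
                return new Thread(r, "order-trans-check-" + counter.incrementAndGet());
            }
        };
        // Same sizing as the pool in TransactionProducer: 5 core threads, 10 max,
        // 60 seconds keep-alive and a bounded queue of 50 pending tasks.
        return new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50), named);
    }
}
```

The returned pool can be passed to producer.setExecutorService in place of the one created inline, and it should be shut down when the application stops.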

3. OrderTransactionListener

@Component
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    OrderService orderService;

    @Autowired
    TransactionLogService transactionLogService;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("Start local transaction....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            logger.info("Local transaction committed.{}",message.getTransactionId());
        }catch (Exception e){
            //Pass the exception as the last argument so that the stack trace is logged
            logger.error("Failed to execute local transaction.",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        logger.info("Start back checking local transaction status.{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        logger.info("End local transaction status query:{}",state);
        return state;
    }
}
After the transaction message is sent through producer.sendMessageInTransaction, if the send succeeds, the executeLocalTransaction method here is called to execute the local transaction. In this example, it inserts the order data and the transaction log. The return value of this method, LocalTransactionState, represents the state of the local transaction and is an enumeration class.
public enum LocalTransactionState {
    //Commit the transaction message so that consumers can see it
    COMMIT_MESSAGE,
    //Roll back the transaction message; consumers will never see it
    ROLLBACK_MESSAGE,
    //The transaction state is unknown; the Broker must check back to decide whether to commit or roll back
    UNKNOW;
}
Next, the checkLocalTransaction method answers the transaction status check-back. Here, we query the transaction_log table by transaction ID. If a record is found, the transaction message is committed; if not, the unknown status is returned. Note one more point: when the unknown status is returned, the RocketMQ Broker keeps checking back, by default once per minute, until the maximum number of check-backs is reached. If the transaction status is still unresolved after that, the Broker gives up on the message (by default it is discarded), so the consumer never sees it, just as with a rollback. Both the check-back interval and the maximum number of check-backs are configurable. On the Broker side, you can configure them as follows:
brokerConfig.setTransactionCheckInterval(10000); //Check back every 10 seconds (the value is in milliseconds)
brokerConfig.setTransactionCheckMax(3);  //Check back at most 3 times
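The effect of these two settings can be illustrated with a small simulation of the check-back loop. This is a sketch under simplifying assumptions, not Broker code: the waiting time between rounds is omitted, the producer's answer is modeled as a Supplier, and the names CheckBackSimulation, Outcome and GIVEN_UP are made up for the example.

```java
import java.util.function.Supplier;

/** Simulates how a bounded number of check-backs resolves a half message. */
class CheckBackSimulation {

    enum Answer { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    enum Outcome { COMMITTED, ROLLED_BACK, GIVEN_UP }

    /**
     * Asks the producer up to maxChecks times and stops at the first definite answer.
     * A real Broker waits for the configured interval between rounds; that wait is omitted here.
     */
    static Outcome resolve(Supplier<Answer> producerCheck, int maxChecks) {
        for (int attempt = 1; attempt <= maxChecks; attempt++) {
            Answer answer = producerCheck.get();
            if (answer == Answer.COMMIT_MESSAGE) {
                return Outcome.COMMITTED;
            }
            if (answer == Answer.ROLLBACK_MESSAGE) {
                return Outcome.ROLLED_BACK;
            }
            // UNKNOW: try again in the next round
        }
        return Outcome.GIVEN_UP; // never resolved: the consumer never sees the message
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The producer answers UNKNOW twice, then finds the transaction log row.
        Outcome outcome = resolve(
                () -> ++calls[0] < 3 ? Answer.UNKNOW : Answer.COMMIT_MESSAGE, 3);
        System.out.println(outcome + " after " + calls[0] + " checks");
    }
}
```

With a maximum of 3 checks, a producer that needs a third round still gets its message committed, while the same producer with a maximum of 2 would lose the message. This is why the maximum should be chosen with the expected duration of the local transaction in mind.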

4. Business implementation class

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    TransactionLogMapper transactionLogMapper;
    @Autowired
    TransactionProducer producer;

    Snowflake snowflake = new Snowflake(1,1);
    Logger logger = LoggerFactory.getLogger(this.getClass());

    //Called when executing a local transaction to write the order data and transaction log to the local database
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){

        //1.Create order
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);

        //2.Write transaction log
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);

        logger.info("Order creation completed.{}",orderDTO);
    }

    //Called by the front end; only sends the transaction message to RocketMQ
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}
The order service class has two methods. One sends the transaction message to RocketMQ, and the other persists the actual business data.

5. Call

@RestController
public class OrderController {

    @Autowired
    OrderService orderService;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostMapping("/create_order")
    public void createOrder(@RequestBody OrderDTO order) throws MQClientException {
        logger.info("Order data received:{}",order.getCommodityCode());
        orderService.createOrder(order);
    }
} 

6. Summary

At this point, the business logic of the order service is complete. Considering the abnormal conditions, the key points are as follows:
  • The first call to createOrder only sends a transaction message. If sending fails, the exception is returned to the caller. Nothing has been written at this point, so data consistency is not at risk.
  • If the transaction message is sent successfully but an exception occurs while executing the local transaction, neither the order data nor the transaction log is saved, because both are written in the same local transaction.
  • If the local transaction completes but its state is not returned in time, or the unknown state is returned, the Broker periodically checks back the transaction status and uses the transaction log table to determine whether the order has been completed and written to the database.
Based on these points, we can say that consistency between the order service and the transaction message is guaranteed. The next step is to consume the order data correctly and complete the corresponding business operations.

3, Points service

The points service consumes the order data and then adds points to the corresponding user according to the order content.

1. Points record table

CREATE TABLE `t_points` (
  `id` bigint(16) NOT NULL COMMENT 'Primary key',
  `user_id` bigint(16) NOT NULL COMMENT 'user id',
  `order_no` bigint(16) NOT NULL COMMENT 'Order No',
  `points` int(4) NOT NULL COMMENT 'Points',
  `remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT 'remarks',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Here, we focus on the order_no field, which gives us one way to implement idempotent consumption.

2. Consumer startup

@Component
public class Consumer {

    String consumerGroup = "consumer-group";
    DefaultMQPushConsumer consumer;

    @Autowired
    OrderListener orderListener;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order","*");
        consumer.registerMessageListener(orderListener);
        consumer.start();
    }
}
Starting a consumer is simple: we only need to specify the topic to consume and the listener.

3. Consumer listener

@Component
public class OrderListener implements MessageListenerConcurrently {

    @Autowired
    PointsService pointsService;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        logger.info("The consumer thread listens to the message.");
        try{
            for (MessageExt message:list) {
                logger.info("Start processing order data and prepare to increase points....");
                OrderDTO order  = JSONObject.parseObject(message.getBody(), OrderDTO.class);
                pointsService.increasePoints(order);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            //Pass the exception as the last argument so that the stack trace is logged
            logger.error("An exception occurred while processing consumer data.",e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
After receiving a message, the listener calls the business service class to handle it. If processing completes, it returns CONSUME_SUCCESS to acknowledge the message; if processing fails, it returns RECONSUME_LATER so that the message is retried.

4. Increase points

Here, the main task is to store the points data. Note, however, that we must check before inserting in order to achieve idempotent consumption.
@Service
public class PointsServiceImpl implements PointsService {

    @Autowired
    PointsMapper pointsMapper;

    Snowflake snowflake = new Snowflake(1,1);
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void increasePoints(OrderDTO order) {
        
        //Query before inserting to make consumption idempotent
        if (pointsMapper.getByOrderNo(order.getOrderNo())>0){
            logger.info("Points added, order processed.{}",order.getOrderNo());
        }else{
            Points points = new Points();
            points.setId(snowflake.nextId());
            points.setUserId(order.getUserId());
            points.setOrderNo(order.getOrderNo());
            Double amount = order.getAmount();
            points.setPoints(amount.intValue()*10);
            points.setRemarks("Order amount ["+order.getAmount()+"] yuan, points earned: "+points.getPoints());
            pointsMapper.insert(points);
            logger.info("Points added for order {}.",points.getOrderNo());
        }
    }
} 

5. Idempotent consumption

There are many ways to achieve idempotent consumption, and which one to use depends on our own situation. For example, here we store the order number and the points record in the same table, so before adding points we can query whether the order has already been processed. Alternatively, we can create an additional table that records which orders have been processed.
In addition, you can put this information in a Redis cache and query the cache before inserting. Whichever way you choose, the general idea is to check whether the message has already been processed before executing the business logic. This raises the question of which key identifies a message. In this example, we can use either the order number or the transaction ID as the key. For ordinary messages, we can also create a unique message ID as the key.
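One caveat with querying first and inserting afterwards is that two deliveries of the same message processed at the same time could both pass the query. A common way to close this gap is to make the check and the write a single atomic step, for example with a unique key on the order number. The sketch below illustrates the idea in memory with ConcurrentHashMap.putIfAbsent; the class IdempotentPoints is hypothetical and only stands in for a table with such a unique key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory illustration of an atomic "insert only if not yet processed" step. */
class IdempotentPoints {

    // Key: order number; value: points granted.
    // Stands in for a points table with a unique key on the order number.
    private final Map<String, Integer> pointsByOrderNo = new ConcurrentHashMap<>();

    /** Returns true if points were added now, false if the order was already processed. */
    boolean increasePoints(String orderNo, double amount) {
        int points = (int) amount * 10; // same rule as in the article: 10 points per whole yuan
        // putIfAbsent checks and writes atomically, so a duplicate delivery cannot slip through.
        return pointsByOrderNo.putIfAbsent(orderNo, points) == null;
    }

    int totalPoints() {
        return pointsByOrderNo.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

In a database-backed version, the same effect would come from the unique key rejecting the second insert, which the service can then treat as "already processed".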

6. Abnormal consumption

We know that when the consumer fails to process a message, it returns RECONSUME_LATER so that the message is retried. By default, a message can be retried up to 16 times. What if the message still cannot be processed correctly for some special reason?
We consider two ways to solve this problem.
First, limit the number of retries in the code. Once the specified number is reached, send an email or SMS to notify the business team to intervene manually.
@Component
public class OrderListener implements MessageListenerConcurrently {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        logger.info("The consumer thread listens to the message.");
        for (MessageExt message:list) {
            if (!processor(message)){
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

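    /**
     * Message processing: once the message has been retried 3 times and still fails,
     * send an email to request manual intervention
     * @param message the message to process
     * @return true if the message should be acknowledged, false if it should be retried
     */
    private boolean processor(MessageExt message){
        String body = new String(message.getBody());
        try {
            logger.info("Message processing....{}",body);
            //Simulate a processing failure
            int k = 1/0;
            return true;
        }catch (Exception e){
            //getReconsumeTimes() is 0 on the first delivery, so this branch is reached on the 4th attempt
            if(message.getReconsumeTimes()>=3){
                logger.error("The maximum number of message retries has been reached. The business personnel will be notified to troubleshoot the problem.{}",message.getMsgId());
                //sendMail is a notification helper that is not shown in this article
                sendMail(message);
                return true;
            }
            return false;
        }
    }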
}
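The decision made inside processor can be isolated into a small pure function, which makes the retry threshold easy to test. This is a sketch only; the names RetryPolicySketch and Decision are made up for the example, and reconsumeTimes plays the role of message.getReconsumeTimes(), which is 0 on the first delivery.

```java
/** Isolates the "retry or give up" decision from the listener above. */
class RetryPolicySketch {

    enum Decision { SUCCESS, RETRY_LATER, GIVE_UP_AND_ALERT }

    /**
     * @param processed      whether the business logic succeeded
     * @param reconsumeTimes how many times the message has already been retried (0 on first delivery)
     * @param maxRetries     how many retries are allowed before alerting a human
     */
    static Decision decide(boolean processed, int reconsumeTimes, int maxRetries) {
        if (processed) {
            return Decision.SUCCESS;
        }
        return reconsumeTimes >= maxRetries
                ? Decision.GIVE_UP_AND_ALERT
                : Decision.RETRY_LATER;
    }

    public static void main(String[] args) {
        // A message that keeps failing: three retries are allowed, then a human is alerted.
        for (int times = 0; times <= 3; times++) {
            System.out.println("reconsumeTimes=" + times + " -> " + decide(false, times, 3));
        }
    }
}
```

In the listener, SUCCESS and GIVE_UP_AND_ALERT would both map to CONSUME_SUCCESS (the latter after sending the notification), and RETRY_LATER would map to RECONSUME_LATER.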
Second, wait until the message reaches the maximum number of retries and enters the dead letter queue. The maximum number of retries is 16 by default, and we can also set it on the consumer side.
consumer.setMaxReconsumeTimes(3);//Sets the maximum number of message retries
The topic name of the dead letter queue is %DLQ% followed by the consumer group name. For example, for the order data we set the consumer group name:
String consumerGroup = "order-consumer-group";
The topic name of the dead letter queue for this consumer is then %DLQ%order-consumer-group.
To consume from this topic, we also need to open its TOPIC configuration in the RocketMQ console and change the perm attribute to 6, which makes the topic both readable and writable.
Finally, you can listen to this topic in code to trigger manual intervention, or inspect the messages directly in the console. With idempotent consumption and dead letter handling in place, you can basically ensure that every message is eventually processed.
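As a small illustration of the naming rule and the perm value, the helper below derives the dead letter topic name for a consumer group and decodes a perm value. The class DeadLetterTopics is hypothetical; the bit values follow RocketMQ's permission convention, in which 4 means readable and 2 means writable, so 6 means both.

```java
/** Helpers for reasoning about dead letter topics; not part of the RocketMQ API. */
class DeadLetterTopics {

    static final String DLQ_PREFIX = "%DLQ%";
    static final int PERM_READ = 4;  // readable bit
    static final int PERM_WRITE = 2; // writable bit

    /** Dead letter topic name = prefix + consumer group name. */
    static String dlqTopicFor(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    /** A consumer can only pull from a topic whose perm has the read bit set. */
    static boolean readable(int perm) {
        return (perm & PERM_READ) != 0;
    }

    static boolean writable(int perm) {
        return (perm & PERM_WRITE) != 0;
    }

    public static void main(String[] args) {
        System.out.println(dlqTopicFor("order-consumer-group"));
        System.out.println("perm 2 readable: " + readable(2));
        System.out.println("perm 6 readable: " + readable(6));
    }
}
```

A consumer that handles dead letters would subscribe to the topic returned by dlqTopicFor, using a consumer group of its own.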

Added by Hafkas on Tue, 23 Nov 2021 08:55:58 +0200