I. Background
The MQ component is an indispensable part of system architecture. At the design level it reduces coupling between systems, and in high-concurrency scenarios it also smooths traffic by shaving peaks and filling valleys. From monolithic applications to clustered deployments to today's microservice architectures, MQ has been widely adopted for its performance and reliability.
As data volume grows and system load increases, a class of problems starts to appear: the database has been updated but the message was never sent, or the message was sent first but the database update later failed. Developers then end up repairing data by hand. Such production problems are rare, but very frustrating. The underlying issue is consistency between database transactions and MQ messages: an ordinary MQ send cannot be bound into a database transaction. Take the two problem scenarios just mentioned:
- Send the MQ message after the database transaction commits;
- Send the MQ message first, then commit the database transaction.
In scenario 1, the server may crash right after the database transaction commits, before the MQ message is sent. In scenario 2, the MQ message has already been sent but the database transaction fails to commit, and there is no way to recall the message. The data is not updated, yet the downstream service has received the message, so the two sides end up inconsistent.
II. Why transaction messages
Take a shopping scenario in a microservice architecture, following RocketMQ's official example. User A places an order, pays 100 yuan, and earns 100 points in return. The account service and the member service are two independent microservices, each with its own database. Given the problems described above, the following can happen:
- If the money is deducted first and the message is sent afterwards, the machine may go down right after the deduction. The message is never sent and the points are never added.
- If the message is sent first and the money is deducted afterwards, the points may be added while the deduction fails, giving away 100 points for nothing.
- The money is deducted normally and the message is sent successfully, but a member service instance fails while consuming the message, so the points are not added.
This is the consistency problem between a database transaction and an MQ message. RocketMQ transaction messages solve the atomicity of executing the local transaction and sending the message. The boundary must be clear here: transaction messages guarantee that the producer side sends the message correctly, with no duplicate or missing messages. Whether the consumer then processes the message correctly is a separate matter. An example is the third case above, where the money is deducted and the message is sent but downstream consumption fails, so the points are wrong. That case is covered by MQ's consumption-failure retry mechanism, which is outside the scope of this article.
Common MQ components each have their own solution for this scenario. ActiveMQ, for example, uses the AMQP protocol (a two-phase commit approach) to ensure that messages are sent correctly. Here we focus on RocketMQ.
III. RocketMQ transaction message design ideas
Following the CAP theorem, RocketMQ transaction messages achieve eventual consistency asynchronously. The design borrows from two-phase commit, and the flow chart is as follows:
1. When the application module needs to send a transaction message, it first sends a prepare message to MQ.
2. After the prepare message is sent successfully, the application module executes the database transaction (the local transaction).
3. Based on the result of the database transaction, it returns Commit or Rollback to MQ.
4. On Commit, MQ delivers the message to the Consumer side. On Rollback, MQ deletes the prepare message directly.
5. If step 3 gets no response or times out, a scheduled task checks the transaction status. By default there are at most 15 checks, after which the message is discarded. The result is handled as in step 4.
6. Successful consumption is guaranteed by MQ itself.
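The steps above can be made concrete with a minimal single-process sketch of the decision logic only. Everything in it (HalfMessageProtocolSketch, LocalState, Outcome, the run method) is a hypothetical name, not RocketMQ's API. Storage, networking and the real scheduling interval are left out, and checkMax plays the role of the 15-check limit.

```java
import java.util.function.Supplier;

/**
 * Toy, single-process model of the transaction-message protocol (hypothetical names,
 * not RocketMQ's API). It only tracks the decision logic, not storage or networking.
 */
final class HalfMessageProtocolSketch {

    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { DELIVERED, DELETED, DISCARDED }

    /**
     * @param executeLocal the local (database) transaction, run after the half message is stored
     * @param checkBack    the status check the broker triggers while the state is UNKNOWN
     * @param checkMax     number of check-backs allowed before the message is discarded
     */
    static Outcome run(Supplier<LocalState> executeLocal, Supplier<LocalState> checkBack, int checkMax) {
        // Steps 1-3: the half message is stored (invisible to consumers), then the
        // local transaction runs and its result is reported.
        LocalState state = executeLocal.get();
        int checks = 0;
        // Step 5: no definite answer, so the broker keeps asking, up to checkMax times.
        while (state == LocalState.UNKNOWN) {
            if (checks >= checkMax) {
                return Outcome.DISCARDED;
            }
            checks++;
            state = checkBack.get();
        }
        // Step 4: Commit makes the message consumable; Rollback deletes the half message.
        return state == LocalState.COMMIT ? Outcome.DELIVERED : Outcome.DELETED;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> LocalState.COMMIT, () -> LocalState.UNKNOWN, 15));   // DELIVERED
        System.out.println(run(() -> LocalState.UNKNOWN, () -> LocalState.ROLLBACK, 15)); // DELETED
        System.out.println(run(() -> LocalState.UNKNOWN, () -> LocalState.UNKNOWN, 15));  // DISCARDED
    }
}
```

The loop is bounded by checkMax on purpose. A transaction whose outcome can never be determined would otherwise be checked forever.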
IV. RocketMQ transaction message implementation process
Taking RocketMQ 4.5.2 as an example, transaction messages use a special topic, RMQ_SYS_TRANS_HALF_TOPIC, where every prepare message is stored first. When a Commit request arrives, the message is written to the real Topic for the Consumer to consume. At the same time, a record is written to RMQ_SYS_TRANS_OP_HALF_TOPIC (a record is also written there on Rollback). The simple flow chart is as follows:
For the flow above, I divide module responsibilities as follows:
- RocketMQ Client is the dependency jar imported into our project, RocketMQ Broker is the deployed server, and NameServer is not shown.
- Application modules appear in pairs. The upstream module is the transaction message producer and the downstream module is the consumer. Transaction messages are transparent to the consumer, which handles them like ordinary messages.
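The relationship between the two system topics can be pictured with a small model. Every prepare message sits in the half topic, and an entry in the op topic marks that half message as finished. A message needs a status check only if it has no op entry and has existed longer than some grace period. This is a simplified sketch under those assumptions. The names (HalfOpBookkeepingSketch, pendingChecks, immunityMillis) are hypothetical, and the broker's real bookkeeping (offsets, batching, and the exact timing rules in the check code analyzed in section V) is more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Simplified model of how RMQ_SYS_TRANS_HALF_TOPIC and RMQ_SYS_TRANS_OP_HALF_TOPIC relate:
 * the op topic records which half-message offsets are already finished.
 * Class, field and parameter names are hypothetical.
 */
final class HalfOpBookkeepingSketch {

    static final class HalfMessage {
        final long offset;        // position in the half topic queue
        final long bornTimestamp; // when the producer created the message

        HalfMessage(long offset, long bornTimestamp) {
            this.offset = offset;
            this.bornTimestamp = bornTimestamp;
        }
    }

    /** Offsets of half messages that have no op record and are old enough to be checked. */
    static List<Long> pendingChecks(List<HalfMessage> halfQueue, Set<Long> opOffsets,
                                    long now, long immunityMillis) {
        List<Long> pending = new ArrayList<>();
        for (HalfMessage m : halfQueue) {
            if (opOffsets.contains(m.offset)) {
                continue; // committed or rolled back already: nothing to do
            }
            if (now - m.bornTimestamp > immunityMillis) {
                pending.add(m.offset); // still open after the grace period: ask the producer
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<HalfMessage> half = List.of(
                new HalfMessage(0, 1_000), new HalfMessage(1, 1_000), new HalfMessage(2, 9_500));
        // Offset 0 has an op record; offset 2 is too young to be checked yet.
        System.out.println(pendingChecks(half, Set.of(0L), 10_000, 6_000)); // [1]
    }
}
```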
Suppose the application module's transaction result cannot be returned promptly because of an interruption or a network problem. RocketMQ then treats the state as unknown. For this case, RocketMQ transaction messages provide a remedy: the broker periodically checks back on the database transaction status of the message.
The simple flow chart is as follows:
V. Source code analysis
The walkthrough follows the flow chart below and analyzes each module's responsibilities and steps in turn.
- Environment preparation
Before reading the source code, you need to obtain the RocketMQ source and set it up for debugging in your IDE. Please refer to the method for this part.
- Core source code on the application module (transaction message producer) side
Create a listener class that implements the TransactionListener interface. It simulates both the result of the local (database) transaction and the result of the transaction-status check callback.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @program: rocket
 * @description: Debug transaction message sample code
 * @author: Huang
 * @create: 2019-10-16
 **/
public class SelfTransactionListener implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private AtomicInteger checkTimes = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Execute the local transaction
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msgKey = message.getKeys();
        System.out.println("start execute local transaction " + msgKey);
        LocalTransactionState state;
        if (msgKey.contains("1")) {
            // The first message is committed directly
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else if (msgKey.contains("2")) {
            // The second message simulates an exception and explicitly replies with a rollback
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // The remaining messages give no definite answer, so the broker will call checkLocalTransaction
            state = LocalTransactionState.UNKNOW;
            // Assign the remaining 3 messages the check states 1, 2 and 3
            localTrans.put(msgKey, transactionIndex.incrementAndGet());
        }
        System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state + ",current time: " + System.currentTimeMillis());
        return state;
    }

    /**
     * Check back on the local transaction result
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String msgKey = messageExt.getKeys();
        System.out.println("start check local transaction " + msgKey);
        Integer state = localTrans.get(msgKey);
        // Keys that were never recorded (null) go to the default branch instead of
        // throwing a NullPointerException when the switch unboxes the Integer
        switch (state == null ? 0 : state) {
            case 1:
                System.out.println("check result unknown, check times " + checkTimes.incrementAndGet());
                return LocalTransactionState.UNKNOW;
            case 2:
                System.out.println("check result commit message, check times " + checkTimes.incrementAndGet());
                return LocalTransactionState.COMMIT_MESSAGE;
            case 3:
                System.out.println("check result rollback message, check times " + checkTimes.incrementAndGet());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}
The transaction message producer example below sends 5 messages, which covers essentially every scenario. The sleep is long enough that the instance is still running when the broker checks back on the transactions. The code is as follows:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: rocket
 * @description: RocketMQ transaction message
 * @author: Huang
 * @create: 2019-10-16
 **/
public class TransactionProducer {
    public static void main(String[] args) {
        try {
            TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
            producer.setNamesrvAddr("10.0.133.29:9876");
            producer.setTransactionListener(new SelfTransactionListener());
            producer.start();
            // Send msg-1 ... msg-5; the listener decides each message's fate by its key
            for (int i = 1; i < 6; i++) {
                Message message = new Message("TransactionTopic", "transactionTest", "msg-" + i, ("Hello" + ":" + i).getBytes());
                try {
                    SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" + i);
                    System.out.printf("Topic:%s send success, msgId is:%s%n", message.getTopic(), result.getMsgId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Keep the process alive so the broker can call back checkLocalTransaction
            Thread.sleep(Integer.MAX_VALUE);
            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
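In the demo listener above, the check-back answer comes from an in-memory map, which is lost if the producer restarts. A common alternative is to write a transaction-log row in the same database transaction as the business update, and have the check-back simply look that row up. The sketch below shows this under stated assumptions. The map is a hypothetical stand-in for that table, and State only mirrors the names of RocketMQ's LocalTransactionState so that the sketch runs without the RocketMQ jar.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sketch of a check-back that reads durable state instead of an in-memory counter.
 * txLog stands in for a hypothetical database table that is written in the SAME local
 * transaction as the business update, keyed by the message key (or transaction id).
 * State mirrors the names of RocketMQ's LocalTransactionState (including UNKNOW).
 */
final class TxLogCheckSketch {

    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Simulated table: key -> marker row. In a real system this would be a DB table.
    private final Map<String, Boolean> txLog = new ConcurrentHashMap<>();

    /** Local transaction: business update and tx-log row succeed or fail together (simulated). */
    State executeLocalTransaction(String key, boolean businessSucceeds) {
        if (!businessSucceeds) {
            return State.ROLLBACK_MESSAGE; // nothing was written, so nothing should be published
        }
        txLog.put(key, Boolean.TRUE); // would share the business update's DB transaction
        return State.COMMIT_MESSAGE;
    }

    /** Check-back: the tx-log row is the single source of truth. */
    State checkLocalTransaction(String key) {
        // No row yet: the transaction may still be running or may have rolled back.
        // UNKNOW lets the broker ask again later, up to its check limit.
        return txLog.containsKey(key) ? State.COMMIT_MESSAGE : State.UNKNOW;
    }

    public static void main(String[] args) {
        TxLogCheckSketch listener = new TxLogCheckSketch();
        listener.executeLocalTransaction("msg-4", true);
        System.out.println(listener.checkLocalTransaction("msg-4")); // COMMIT_MESSAGE
        System.out.println(listener.checkLocalTransaction("msg-9")); // UNKNOW
    }
}
```

Answering UNKNOW for a missing row is the conservative choice, because the broker keeps asking until its check limit. Returning ROLLBACK_MESSAGE instead is also possible, but only when you can be sure the local transaction is no longer running.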
- RocketMQ Client code. The main logic falls into three sections. Section 1 marks the message as a prepare message and sends it to the RocketMQ server.
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
Section 2: after the message is sent successfully, call the application module's database transaction method to obtain the transaction result (the code is abridged to save space).
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
Section 3: send the transaction result to RocketMQ to end the transaction, and return the result to the application module.
try {
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
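The three client-side sections boil down to one decision: which state is passed to endTransaction. The self-contained sketch below mirrors that decision without the RocketMQ jar. SendStatus and LocalState reuse RocketMQ's constant names, while ClientTxFlowSketch and decide are hypothetical. The sketch assumes the state starts out as UNKNOW, because the abridged code does not show its initialization. Under that assumption, an exception from the local transaction leaves the state at UNKNOW, and a flush or slave failure on the half message turns into a rollback.

```java
import java.util.function.Function;

/**
 * Self-contained mirror of the decision logic in the three client-side sections above.
 * SendStatus and LocalState reuse RocketMQ's constant names; everything else is hypothetical.
 */
final class ClientTxFlowSketch {

    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Returns the state that section 3 would report to the broker via endTransaction. */
    static LocalState decide(SendStatus halfSendStatus, Function<String, LocalState> localTx, String key) {
        LocalState state = LocalState.UNKNOW; // assumed default when no definite answer exists
        switch (halfSendStatus) {
            case SEND_OK:
                try {
                    // Section 2: the local transaction runs only after the half message is stored.
                    LocalState result = localTx.apply(key);
                    if (result != null) {
                        state = result;
                    }
                } catch (RuntimeException e) {
                    // The state stays UNKNOW; the broker's check-back resolves it later.
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message may not be safely stored, so roll back without running the local tx.
                state = LocalState.ROLLBACK_MESSAGE;
                break;
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(decide(SendStatus.SEND_OK, k -> LocalState.COMMIT_MESSAGE, "msg-1"));        // COMMIT_MESSAGE
        System.out.println(decide(SendStatus.FLUSH_DISK_TIMEOUT, k -> LocalState.COMMIT_MESSAGE, "m")); // ROLLBACK_MESSAGE
        System.out.println(decide(SendStatus.SEND_OK, k -> { throw new IllegalStateException("db down"); }, "m")); // UNKNOW
    }
}
```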
- Transaction commit/rollback on the RocketMQ Broker side (the endTransaction part)
Code entry: org.apache.rocketmq.broker.processor.EndTransactionProcessor
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Change the message's Topic from RMQ_SYS_TRANS_HALF_TOPIC back to the real Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Store the message in the real Topic for Consumer consumption
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // Write the message to RMQ_SYS_TRANS_OP_HALF_TOPIC and mark it as deleted;
                // the scheduled check task takes these records into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
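Stripped of the RocketMQ plumbing, the commit and rollback branches above differ only in whether the message is copied to its real topic. Both end by recording the half message as finished. The sketch below models just that. The classes are hypothetical, and an in-memory map stands in for the stores. The topic and property names follow the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of the broker's endTransaction branch. Commit copies the half message to the
 * topic named in its REAL_TOPIC property; both commit and rollback append an op record.
 */
final class EndTransactionSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    static final class Msg {
        final String topic;
        final Map<String, String> props = new HashMap<>();

        Msg(String topic) {
            this.topic = topic;
        }
    }

    final Map<String, List<Msg>> topics = new HashMap<>();
    final List<Long> opRecords = new ArrayList<>(); // stands in for RMQ_SYS_TRANS_OP_HALF_TOPIC

    void store(Msg m) {
        topics.computeIfAbsent(m.topic, t -> new ArrayList<>()).add(m);
    }

    void endTransaction(Msg half, long halfOffset, boolean commit) {
        if (commit) {
            // Like endMessageTransaction + sendFinalMessage: restore the real topic and store there.
            Msg finalMsg = new Msg(half.props.get("REAL_TOPIC"));
            finalMsg.props.putAll(half.props);
            store(finalMsg);
        }
        // Like deletePrepareMessage: in this sketch the half message is not erased, only marked as done.
        opRecords.add(halfOffset);
    }

    public static void main(String[] args) {
        EndTransactionSketch broker = new EndTransactionSketch();
        Msg half = new Msg(HALF_TOPIC);
        half.props.put("REAL_TOPIC", "TransactionTopic");
        broker.store(half);
        broker.endTransaction(half, 0L, true);
        System.out.println(broker.topics.get("TransactionTopic").size()); // 1
        System.out.println(broker.opRecords);                             // [0]
    }
}
```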
- Scheduled task on the RocketMQ Broker side that checks back on the database transaction
Method entry: org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // By default, the message is discarded after more than 15 failed transaction-status checks
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
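The checkMax limit can be thought of as a per-message counter. The following is a simplified, hypothetical stand-in for the broker's bookkeeping. It assumes the counter lives in the TRANSACTION_CHECK_TIMES message property, which does appear in the log later in this article. Each pass either bumps the counter and allows one more check, or reports that the limit has been reached.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-message check counter behind transactionCheckMax.
 * The property key matches the TRANSACTION_CHECK_TIMES entry seen in the article's log.
 */
final class CheckLimitSketch {
    static final String CHECK_TIMES = "TRANSACTION_CHECK_TIMES";

    /** Returns true if the half message should be discarded; otherwise bumps its counter. */
    static boolean needDiscard(Map<String, String> props, int checkMax) {
        int checked = Integer.parseInt(props.getOrDefault(CHECK_TIMES, "0"));
        if (checked >= checkMax) {
            return true;
        }
        props.put(CHECK_TIMES, String.valueOf(checked + 1));
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        int checks = 0;
        while (!needDiscard(props, 15)) {
            checks++; // each pass would trigger one check-back to the producer
        }
        System.out.println(checks); // 15
    }
}
```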
Entry point of the transaction check callback:
// This code is in the check method of the TransactionalMessageServiceImpl class
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // Trigger the transaction-status check
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
    continue;
}

// This method is in the AbstractTransactionalMessageCheckListener class
public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

// This method is in the AbstractTransactionalMessageCheckListener class
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        // Send the request to the RocketMQ Client over Netty; the client runs its checkTransactionState method
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
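The isNeedCheck condition packs three cases into one expression. The version below pulls it out into a pure function so that each case can be tried in isolation. The parameter names follow the quoted code and the wrapper class is hypothetical. The comments on cases 2 and 3 are my reading of the arithmetic, not documented behavior.

```java
/**
 * The isNeedCheck condition restated as a pure function.
 * Parameter names follow the quoted broker code; the class is hypothetical.
 */
final class NeedCheckSketch {

    static boolean isNeedCheck(boolean opMsgFound, long lastOpBornTimestamp, long startTime,
                               long valueOfCurrentMinusBorn, long checkImmunityTime,
                               long transactionTimeout) {
        // Case 1: no op records were pulled and the half message is older than the immunity time.
        boolean staleWithoutOp = !opMsgFound && valueOfCurrentMinusBorn > checkImmunityTime;
        // Case 2: op records exist, but the newest one was born more than transactionTimeout
        // after this check round started.
        boolean opBeyondTimeout = opMsgFound && lastOpBornTimestamp - startTime > transactionTimeout;
        // Case 3: the half message's born time is later than "now" (e.g. clock skew), so check anyway.
        boolean bornInFuture = valueOfCurrentMinusBorn <= -1;
        return staleWithoutOp || opBeyondTimeout || bornInFuture;
    }

    public static void main(String[] args) {
        System.out.println(isNeedCheck(false, 0, 0, 7_000, 6_000, 6_000)); // true  (case 1)
        System.out.println(isNeedCheck(false, 0, 0, 2_000, 6_000, 6_000)); // false (too young)
        System.out.println(isNeedCheck(false, 0, 0, -5, 6_000, 6_000));    // true  (case 3)
    }
}
```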
After the RocketMQ Client receives the request from the broker, it calls the check-back method for the database transaction and submits the transaction result to the RocketMQ Broker again.
Method entry: the checkTransactionState method of the org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl class
try {
    if (transactionCheckListener != null) {
        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}

this.processTransactionState(
    localTransactionState,
    group,
    exception);
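On the wire, the producer's answer travels as a commitOrRollback value. The sketch below shows that mapping, assuming it follows processTransactionState in the 4.x client. The constants copy MessageSysFlag's transaction values, and 8 (commit) matches the sysFlag=8 visible in the log later in this article. The enum and method are hypothetical stand-ins so that the sketch runs on its own.

```java
/**
 * Sketch of how a local transaction state is encoded as the commitOrRollback value.
 * Constant values copy MessageSysFlag's transaction flags; the rest is hypothetical.
 */
final class CommitOrRollbackSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static int commitOrRollback(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;   // broker moves the message to the real topic
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE; // broker only marks the half message as done
            default:
                return TRANSACTION_NOT_TYPE;      // UNKNOW: half message stays, to be checked again
        }
    }

    public static void main(String[] args) {
        // Prints one line per state: COMMIT_MESSAGE -> 8, ROLLBACK_MESSAGE -> 12, UNKNOW -> 0
        for (LocalState s : LocalState.values()) {
            System.out.println(s + " -> " + commitOrRollback(s));
        }
    }
}
```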
VI. An additional question
The official documentation says that transaction messages do not support delayed messages or batch messages. I tried a delayed transaction message anyway by setting a DelayTimeLevel on it. As a result, the message could never be removed from RMQ_SYS_TRANS_HALF_TOPIC, and the application module's log showed it being checked back over and over. In the Console, the message query list for RMQ_SYS_TRANS_HALF_TOPIC grew very large, to almost 2000 records. Why?
Let's go back to the code. The process is as follows:
1. With a DelayTimeLevel set, the commit happens once the database transaction has completed, and the message is written to the target Topic. At that point the DelayTimeLevel interferes: the target Topic becomes SCHEDULE_TOPIC_XXXX, and the real Topic recorded in the message becomes RMQ_SYS_TRANS_HALF_TOPIC. The real Topic has been lost at this stage.
// Processing after the RocketMQ Broker receives the transaction commit: org.apache.rocketmq.broker.processor.EndTransactionProcessor class
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Look up the half (prepare) message
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Change the message's Topic from RMQ_SYS_TRANS_HALF_TOPIC back to the real Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Store the message in the real Topic; this eventually calls CommitLog.putMessage.
            // At this point the Topic has become SCHEDULE_TOPIC_XXXX.
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // Write the message to RMQ_SYS_TRANS_OP_HALF_TOPIC and mark it as deleted;
                // the scheduled check task takes these records into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}

// This code is in the putMessage method of the org.apache.rocketmq.store.CommitLog class
// Because DelayTimeLevel is set, the target topic becomes SCHEDULE_TOPIC_XXXX
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
An example of a printed log is as follows:
2019-10-17 14:41:05 INFO EndTransactionThread_4 - Transaction op message write successfully. messageId=0A00851D00002A9F0000000000000E09, queueId=0 msgExt:MessageExt [queueId=0, storeSize=335, queueOffset=5, sysFlag=8, bornTimestamp=1571293959305, bornHost=/10.0.133.29:54634, storeTimestamp=1571294460555, storeHost=/10.0.133.29:10911, msgId=0A00851D00002A9F0000000000000E09, commitLogOffset=3593, bodyCRC=1849408413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SCHEDULE_TOPIC_XXXX', flag=0, properties={REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC, TRANSACTION_CHECK_TIMES=3, KEYS=msg-test-3, TRAN_MSG=true, UNIQ_KEY=0A00851D422C18B4AAC25584B0880000, WAIT=false, DELAY=1, PGROUP=transactionMQProducer, TAGS=transactionTest, REAL_QID=0}, body=[72, 101, 108, 108, 111, 84, 105, 109, 101, 58, 51], transactionId='null'}]
2. The delayed message is triggered by a scheduled task. The delay I set is 1 second. The scheduled task puts the message back into RMQ_SYS_TRANS_HALF_TOPIC. Note that at this point the message exists only in RMQ_SYS_TRANS_HALF_TOPIC, and the RMQ_SYS_TRANS_OP_HALF_TOPIC queue has no record of it. The relevant code:
// This code is in the executeOnTimeup method of the org.apache.rocketmq.store.schedule.ScheduleMessageService class
try {
    // The message returns to the RMQ_SYS_TRANS_HALF_TOPIC queue
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
        .putMessage(msgInner);

    if (putMessageResult != null
        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        continue;
    } else {
        log.error(
            "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
            msgExt.getTopic(), msgExt.getMsgId());
        ScheduleMessageService.this.timer.schedule(
            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
        return;
    }
} catch (Exception e) {
    log.error(
        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
            + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
}
3. The transaction message check task runs. It finds the message in RMQ_SYS_TRANS_HALF_TOPIC with no corresponding record in RMQ_SYS_TRANS_OP_HALF_TOPIC. To keep messages written in order, it writes the half message back to RMQ_SYS_TRANS_HALF_TOPIC (putBackHalfMsgQueue) and triggers another check-back. The code is the same as the check callback entry shown earlier:
// This code is in the check method of the TransactionalMessageServiceImpl class
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
    || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
    continue;
}
This forms an infinite loop. The message is only discarded after 15 attempts (the default maximum), which is rather expensive. A PR that optimizes this has been submitted to the RocketMQ community. Once a new version is released, transaction messages will ignore DelayTimeLevel and the problem will no longer occur.
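Steps 1 and 2 can be replayed in a few lines. The sketch rests on two assumptions taken from the description above. First, the delay branch backs up whatever the current topic is into REAL_TOPIC. Second, the delay timer later moves the message to REAL_TOPIC. The class and method names are hypothetical, and the sketch deliberately leaves out check counting and the op topic. It only shows why the business topic disappears, which matches REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC in the log above.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy replay of the delayed-transaction-message problem. Topic and property names come from
 * the article; classes and methods are hypothetical simplifications of the broker.
 */
final class DelayedTxMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    static final class Msg {
        String topic;
        int delayTimeLevel;
        final Map<String, String> props = new HashMap<>();
    }

    /** Like the delay branch of CommitLog.putMessage: back up the current topic, then redirect. */
    static void putMessage(Msg msg) {
        if (msg.delayTimeLevel > 0) {
            msg.props.put("REAL_TOPIC", msg.topic); // overwrites any earlier backup
            msg.topic = SCHEDULE_TOPIC;
        }
    }

    /** Like the schedule service once the delay elapses: move the message to REAL_TOPIC. */
    static void timeUp(Msg msg) {
        msg.topic = msg.props.get("REAL_TOPIC");
    }

    public static void main(String[] args) {
        Msg m = new Msg();
        m.delayTimeLevel = 1;
        m.topic = HALF_TOPIC;                          // stored as a half message ...
        m.props.put("REAL_TOPIC", "TransactionTopic"); // ... remembering the business topic
        putMessage(m);
        timeUp(m);
        // The business topic is gone: the message is back on the half topic.
        System.out.println(m.topic + " / REAL_TOPIC=" + m.props.get("REAL_TOPIC"));
        // RMQ_SYS_TRANS_HALF_TOPIC / REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC
    }
}
```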
Until the new version is released, our solutions are:
- Make it an explicit development rule that DelayTimeLevel must not be set on transaction messages.
This feels risky. A newcomer who is not familiar with this feature might set it by accident (as I did at first).
- Wrap the RocketMQ Client, for example in a rocketmq spring boot starter, so that the method for sending transaction messages offers no way to set a delay, as in the following example:
/**
 * Transaction message sending
 * Delayed sending and batch sending are not supported
 */
public void sendMessageInTransaction(String topic, String tag, Object message, String requestId) throws Exception {
    TransactionMQProducer producer = annotationScan.getProducer(topic + "_" + tag);
    producer.sendMessageInTransaction(MessageBuilder.of(topic, tag, message, requestId).build(), message);
}
This should be more reliable, since the DelayTimeLevel parameter is eliminated at the source.
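If a wrapper still accepts arbitrary messages, a complementary safeguard is to reject a delay level before sending. The helper below is hypothetical and works on a plain property map so that it runs on its own. "DELAY" is the property key visible in the log above (DELAY=1). With RocketMQ's own Message class, the equivalent check would be msg.getDelayTimeLevel() > 0, using the same getter that appears in the CommitLog code earlier.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical guard for a home-grown wrapper: refuse to send a transaction message
 * that carries a delay level ("DELAY" property > 0).
 */
final class TxMessageGuard {

    static void requireNoDelay(Map<String, String> properties) {
        String delay = properties.get("DELAY");
        if (delay != null && Integer.parseInt(delay) > 0) {
            throw new IllegalArgumentException(
                    "DelayTimeLevel is not supported for transaction messages, got DELAY=" + delay);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        requireNoDelay(props); // passes: no delay set
        props.put("DELAY", "1");
        try {
            requireNoDelay(props);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at send time turns a silent retry loop on the broker into an immediate, visible error for the developer.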
Conclusion
This article briefly introduced the problem transaction messages solve, the boundary of their responsibilities, and their basic design and flow. I borrowed the RocketMQ author's diagrams and walked through selected source code, which also reflects my own digging process. If anything in the article is inaccurate or incomplete, please leave a comment. Thank you.
For more on Java high concurrency and distributed architecture, along with technical articles and hands-on experience, please follow the public account: Java architecture community