rocketmq Source Resolution End Transaction Processor II

Said ahead

End transaction manager

Source code analysis
Return to this method and send the final message successfully deleting the prepared transaction submission message, org. apache. rocketmq. broker. transaction. queue. Transactional Message Service Impl # deletePrepareMessage

 @Override
    public boolean deletePrepareMessage(MessageExt msgExt) {
//        =>
        if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
            log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
            return true;
        } else {
            log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
            return false;
        }
    }

Enter this method, org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#putOpMessage

    public boolean putOpMessage(MessageExt messageExt, String opType) {
        MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
            this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
        if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
//            Add Delete Marker= when Transaction Message Submits or Rolls Back
            return addRemoveTagInTransactionOp(messageExt, messageQueue);
        }
        return true;
    }

Enter this method by adding deletion tags for transaction message submission or rollback, org. apache. rocketmq. broker. transaction. queue. Transactional MessageBridge # addRemoveTagInTransactionOp

 private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
        Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
            String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
        writeOp(message, messageQueue);
        return true;
    }

Enter this method, org. apache. rocketmq. broker. transaction. queue. Transactional MessageBridge # writeOp

  private void writeOp(Message message, MessageQueue mq) {
        MessageQueue opQueue;
        if (opQueueMap.containsKey(mq)) {
            opQueue = opQueueMap.get(mq);
        } else {
            opQueue = getOpQueueByHalf(mq);
            MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
            if (oldQueue != null) {
                opQueue = oldQueue;
            }
        }
        if (opQueue == null) {
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
        }
//        Store message
        putMessage(makeOpMessageInner(message, opQueue));
    }

Enter this method, as described earlier in org. apache. rocketmq. broker. transaction. queue. Transactional MessageBridge putMessage.

Back to this method, transaction message rollback, org. apache. rocketmq. broker. transaction. queue. Transactional Message Service Impl# rollbackMessage

  @Override
    public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
        return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
    }

Enter this method, org. apache. rocketmq. broker. transaction. queue. Transactional Message Service Impl # getHalfMessageByOffset

 private OperationResult getHalfMessageByOffset(long commitLogOffset) {
        OperationResult response = new OperationResult();
//        Query the message according to offset =
        MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
        if (messageExt != null) {
            response.setPrepareMessage(messageExt);
            response.setResponseCode(ResponseCode.SUCCESS);
        } else {
            response.setResponseCode(ResponseCode.SYSTEM_ERROR);
            response.setResponseRemark("Find prepared transaction message failed");
        }
        return response;
    }

Enter this method, according to offset query message, as described earlier in org. apache. rocketmq. store. DefaultMessageStore #look MessageByOffset (long).

Back to this method, the transaction message rollback successfully deletes the transaction message ready to commit, as described earlier in org. apache. rocketmq. broker. transaction. queue. Transactional Message Service Impl # deletePrepareMessage.

Back to this method, org. apache. rocketmq. broker. processor. EndTransaction Processor processRequest ends.

Said at the end
This analysis only represents personal views, for reference only.

Nail Technology Group

Keywords: Apache

Added by regoch on Tue, 01 Oct 2019 13:35:45 +0300