RocketMQ in Practice: Solving the Distributed Transaction Problem

How to ensure consistency:

How RocketMQ solves distributed transactions (the reliable-message eventual consistency scheme)

1. System A sends a prepared (half) message to MQ. If sending the prepared message fails, the operation is cancelled and nothing further is executed.

2. If the prepared message is sent successfully, system A executes its local transaction. If the local transaction succeeds, it tells MQ to confirm (commit) the message; if it fails, it tells MQ to roll the message back.

3. Once the message is confirmed, system B receives it and executes its own local transaction.

4. If the confirmation or rollback from step 2 never reaches MQ, for example because of a network failure, the broker's polling mechanism takes over. MQ periodically scans all prepared messages that have been neither confirmed nor rolled back and calls back your interface (checkLocalTransaction), using the message's unique id, to query the status of the local transaction and decide whether the message should be committed or rolled back. In this callback you can usually query the database to see whether the local transaction was executed; if it was rolled back, roll the message back as well. This covers the case where the local transaction succeeded but the confirmation message failed to send.
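
The four steps above can be sketched as a small state machine. This is a minimal in-memory illustration, not RocketMQ code: the class HalfMessageBrokerSketch, its enums and its method names are all hypothetical, and a plain map stands in for the broker's message store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** In-memory stand-in for the broker side of the prepared-message flow (hypothetical, not RocketMQ code). */
class HalfMessageBrokerSketch {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }
    enum Verdict { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> messages = new HashMap<>();

    /** Step 1: store the prepared message; consumers cannot see it yet. */
    void prepare(String txId) {
        messages.put(txId, State.PREPARED);
    }

    /** Step 2: apply the producer's verdict. UNKNOWN (or a lost verdict) leaves the message prepared. */
    void endTransaction(String txId, Verdict verdict) {
        if (messages.get(txId) != State.PREPARED) {
            return; // already resolved, or never prepared
        }
        if (verdict == Verdict.COMMIT) {
            messages.put(txId, State.COMMITTED);
        } else if (verdict == Verdict.ROLLBACK) {
            messages.put(txId, State.ROLLED_BACK);
        }
    }

    /** Step 4: ask the producer about every message that is still prepared. */
    void checkBack(Function<String, Verdict> localTransactionChecker) {
        for (String txId : new ArrayList<>(messages.keySet())) {
            if (messages.get(txId) == State.PREPARED) {
                endTransaction(txId, localTransactionChecker.apply(txId));
            }
        }
    }

    /** Step 3: only committed messages are delivered to system B. */
    boolean visibleToConsumer(String txId) {
        return messages.get(txId) == State.COMMITTED;
    }

    State stateOf(String txId) {
        return messages.get(txId);
    }
}
```

In this sketch, a message whose verdict was lost stays in the PREPARED state and is invisible to the consumer until checkBack obtains a COMMIT from the producer's checker function.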

 

Order-Inventory-Distributed Transaction

The following example shows how to implement a distributed transaction with RocketMQ in code.

Scenario: In the order placing scenario, the order service generates an order. When the order is successfully paid, the order status is modified and the inventory service is notified to deduct the inventory.

Database design:

CREATE TABLE `yzy_order` (
  `id` int(11) NOT NULL,
  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Order id',
  `buy_num` int(11) DEFAULT NULL COMMENT 'Purchase Quantity',
  `good_id` int(11) DEFAULT NULL COMMENT 'commodity ID',
  `user_id` int(11) DEFAULT NULL COMMENT 'user ID',
  `pay_status` int(11) DEFAULT NULL COMMENT 'Payment status, 0: no payment, 1: paid',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci




CREATE TABLE `yzy_repo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Name of commodity',
  `num` int(11) NOT NULL DEFAULT '0' COMMENT 'Inventory Quantity',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='Testing, Inventory Tables'




Hands-on implementation

Main methods of order service

package com.transaction.order;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

@Service
public class OrderService {
    @Autowired
    OrderDao orderDao;

    public static final int PAY_DONE = 1;

    /**
     * Checks whether the order exists and its status is "paid".
     */
    public boolean checkOrderPaySuccess(String orderId) {
        // Loads every order for simplicity; a query by order_id would scale better
        List<YzyOrder> allOrders = orderDao.findAll();
        return allOrders.stream()
                // Objects.equals also copes with a null pay_status (the column defaults to NULL)
                .anyMatch(order -> order.getOrderId().equals(orderId)
                        && Objects.equals(order.getPayStatus(), PAY_DONE));
    }

    /**
     * Marks the order as paid.
     */
    public void updatePayStatusByOrderId(String orderId) {
        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);
    }

    /**
     * Creates an order; the pay status is not set here, so the order starts out unpaid.
     */
    public void save(String orderId, int num, int goodId, int userId) {

        YzyOrder yzyOrder = new YzyOrder();
        yzyOrder.setOrderId(orderId);
        yzyOrder.setBuyNum(num);
        yzyOrder.setGoodId(goodId);
        yzyOrder.setUserId(userId);

        orderDao.save(yzyOrder);
    }
}

 

Operation flow

1. Create an order with status "unpaid" in the order table

curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'

    /**
     * Order creation endpoint.
     * @param num    purchase quantity
     * @param goodId commodity ID
     * @param userId user ID
     * @return "success" once the order has been saved
     */
    @GetMapping("save")
    public String makeOrder(
            @RequestParam("num") int num,
            @RequestParam("good_id") int goodId,
            @RequestParam("user_id") int userId) {

        orderService.save(UUID.randomUUID().toString(), num, goodId, userId);
        return "success";
    }

2. User pays and notifies inventory service to deduct inventory through MQ

OrderController::pay sends the MQ transaction message announcing that the order was paid. Note that it does not call OrderService::updatePayStatusByOrderId directly and then send an ordinary MQ message. Instead, it first sends a transaction message to MQ; once that message has been accepted, TransactionListener::executeLocalTransaction of the order service is called back, and the order status is updated there. This keeps sending the transaction message and updating the order status consistent with each other.

  @GetMapping("pay")
    public String pay(@RequestParam("order_id") String orderId)
            throws UnsupportedEncodingException, MQClientException, JsonProcessingException {
        transactionProducer.sendOrderPaySucessEvent(orderId);
        return "success";
    }

 

3. Transaction message producer and listener in the order service

@Component
public class TransactionProducer implements InitializingBean {

    private TransactionMQProducer producer;

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderDao orderDao;

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("order-pay-group");
        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);
        // Register the listener that runs the local transaction and answers status checks
        producer.setTransactionListener(new TransactionListener() {
            /**
             * Executes the local transaction after the transaction message has been sent.
             *
             * A callback to this method means the message has been successfully sent to MQ,
             * so the order status can be updated to "payment success".
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Commit or roll back the transaction message depending on whether the local transaction succeeds
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);

                    // MQ has received the transaction message sent by sendMessageInTransaction;
                    // run the local transaction now: mark the order as paid
                    orderService.updatePayStatusByOrderId(record.getOrderId());

                    state = LocalTransactionState.COMMIT_MESSAGE;
                } catch (IOException e) {
                    // Also covers UnsupportedEncodingException, which is a subclass of IOException
                    e.printStackTrace();
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return state;
            }
            /**
             * Check-back callback: tells the broker whether to deliver or discard the message,
             * based on whether the local transaction was executed successfully.
             *
             * @return COMMIT_MESSAGE if the order is paid, ROLLBACK_MESSAGE if it is not,
             *         UNKNOW if the status could not be determined
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                ObjectMapper objectMapper = new ObjectMapper();
                LocalTransactionState state = LocalTransactionState.UNKNOW;
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);

                    // The local transaction succeeded if the order exists and is marked as paid
                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());

                    state = isLocalSuccess
                            ? LocalTransactionState.COMMIT_MESSAGE
                            : LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (Exception e) {
                    // Parsing or lookup failed: the status is unknown, so neither commit nor roll back
                    e.printStackTrace();
                }
                return state;
            }
        });
        producer.start();
    }

    public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException {
        ObjectMapper objectMapper = new ObjectMapper();
        // findFirst() avoids the IndexOutOfBoundsException that get(0) throws on an empty list
        YzyOrder order = orderDao.findAll().stream()
                .filter(item -> item.getOrderId().equals(orderId))
                .findFirst()
                .orElse(null);
        if (order == null) {
            System.out.println("order not found: " + orderId);
            return; // nothing to send
        }
        // Build the transaction message to send
        OrderRecord record = new OrderRecord();
        record.setUserId(order.getUserId());
        record.setOrderId(orderId);
        record.setBuyNum(order.getBuyNum());
        record.setPayStatus(order.getPayStatus());
        record.setGoodId(order.getGoodId());

        // Topic "Order-Success", no tag, orderId as the message key
        Message message = new Message("Order-Success", "", record.getOrderId(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println("Sent transaction message, orderId = " + record.getOrderId() + " " + result.toString());
    }
}
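
The decision made in checkLocalTransaction has three possible outcomes, and it can help to isolate it as a pure function that is easy to unit-test. The following is a minimal sketch under that assumption; the class CheckBackDecisionSketch, the Verdict enum and the payStatusOf lookup are hypothetical names, not part of RocketMQ or of the project above.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical helper that mirrors the three outcomes of a transaction status check. */
class CheckBackDecisionSketch {
    enum Verdict { COMMIT, ROLLBACK, UNKNOWN }

    static final int PAY_DONE = 1;

    /**
     * @param orderId     business key carried in the message
     * @param payStatusOf assumed lookup: the order's pay status, or empty if there is no such order
     */
    static Verdict decide(String orderId, Function<String, Optional<Integer>> payStatusOf) {
        try {
            Optional<Integer> status = payStatusOf.apply(orderId);
            boolean paid = status.isPresent() && status.get() == PAY_DONE;
            // Paid means the local transaction happened; anything else means it did not
            return paid ? Verdict.COMMIT : Verdict.ROLLBACK;
        } catch (RuntimeException e) {
            // The lookup itself failed, so nothing is known about the local transaction
            return Verdict.UNKNOWN;
        }
    }
}
```

Keeping the lookup behind a function parameter means the decision can be tested with plain lambdas, without a database or a broker.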

 

4. Inventory service deducts inventory

Pay attention to the following issues:

1. Under concurrency, the inventory must not be deducted into negative numbers.

2. If inventory is updated by selecting first and updating afterwards, a distributed lock or a database optimistic lock is needed. The UPDATE statement also needs to be idempotent: it should only take effect if the value is still the one that was read, for example:

   UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;

3. Make message consumption idempotent, keyed by msgId or orderId.
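
Points 1 and 2 can be tried out without a database. The following is a minimal in-memory sketch, assuming a ConcurrentHashMap stands in for the inventory table; the class OptimisticStockSketch and its methods are hypothetical and not part of the project. Its reduce method retries a compare-and-set that plays the role of an UPDATE guarded by the previously read value.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the inventory table: goodId -> num (hypothetical, for illustration only). */
class OptimisticStockSketch {
    private final ConcurrentMap<Integer, Integer> stock = new ConcurrentHashMap<>();

    void put(int goodId, int num) {
        stock.put(goodId, num);
    }

    int get(int goodId) {
        return stock.getOrDefault(goodId, 0);
    }

    /**
     * Returns 1 when the stock was reduced, 0 when the good does not exist,
     * and -1 when the remaining stock is insufficient.
     */
    int reduce(int goodId, int buyNum) {
        while (true) {
            Integer oldNum = stock.get(goodId);
            if (oldNum == null) {
                return 0;
            }
            int newNum = oldNum - buyNum;
            if (newNum < 0) {
                return -1; // never go below zero
            }
            // Plays the role of: UPDATE ... SET num = newNum WHERE id = goodId AND num = oldNum
            if (stock.replace(goodId, oldNum, newNum)) {
                return 1;
            }
            // Another caller changed the value in between; re-read and retry
        }
    }
}
```

For instance, if 100 concurrent callers each try to buy one unit from a stock of 50, exactly 50 calls return 1 and the rest return -1. The inventory service's own method follows the same pattern against the database: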

    @Override
    public int reduce(Integer buyNum, Integer goodId) {

        // Under concurrency there are three ways to prevent the inventory from going negative:
        // 1. select for update (must run inside a transaction)
        // 2. wrap this logic in a distributed lock
        // 3. optimistic locking, e.g. a version field in the table (here the old num acts as the version)

        while (true) {
            Optional<YzyRepo> repoOption = repositoryDao.findById(goodId);
            if (!repoOption.isPresent()) {
                return 0;
            }

            YzyRepo repo = repoOption.get();
            // Capture both values before the update so the old value is not overwritten
            int oldNum = repo.getNum();
            int newNum = oldNum - buyNum;

            // Never let the inventory drop below zero
            if (newNum < 0) {
                return -1;
            }

            // Takes effect only if num is still oldNum; otherwise another caller won the race, so retry
            int affect = repositoryDao.updateGoodNum(newNum, oldNum, goodId);
            if (affect > 0) {
                return affect;
            }
        }
    }
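
Point 3 above, idempotent consumption, is not covered by the reduce method itself. A minimal sketch of one way to approach it follows, assuming deduplication by orderId; the class IdempotentConsumerSketch is hypothetical, and the in-memory set stands in for what would normally be a durable store such as a table with a unique key on the order id.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical wrapper that runs a deduction at most once per orderId. */
class IdempotentConsumerSketch {
    // In-memory for illustration only; a real service would need a durable, shared store
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the deduction once per orderId.
     *
     * @return true if the deduction ran, false if this orderId was already processed
     */
    boolean consume(String orderId, Runnable deductInventory) {
        if (!processedOrderIds.add(orderId)) {
            return false; // redelivered message: skip
        }
        try {
            deductInventory.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the id so that a redelivery can retry the deduction
            processedOrderIds.remove(orderId);
            throw e;
        }
    }
}
```

With this wrapper, delivering the same order message twice deducts the inventory only once, while a failed deduction remains eligible for retry.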


Added by tomm098 on Fri, 06 Sep 2019 10:51:18 +0300