How to ensure consistency:
Solving Distributed Transactions with RocketMQ (Reliable-Message Eventual Consistency Scheme)
1. System A sends a prepared message to MQ. If sending the prepared message fails, cancel the operation and do not execute anything further.
2. If the prepared message is sent successfully, execute the local transaction. If the local transaction succeeds, send a confirmation message to MQ. If it fails, send a rollback message to MQ.
3. Once the confirmation message has been sent, system B receives the message and executes its own local transaction.
4. If sending the confirmation or rollback message in step 2 fails, for example because of a network problem, the broker's polling mechanism takes over. MQ periodically scans all prepared messages and calls back your interface (checkLocalTransaction), using the unique id to query the status of the local transaction. In effect it asks whether the message should be committed, rolled back, or checked again later. Typically, you check the database to see whether the local transaction was executed: if it succeeded, commit the message; if it was rolled back, roll the message back as well. This covers the case where the local transaction succeeded but the confirmation message failed to send.
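The four steps above can be sketched with a toy in-memory broker. This is only an illustration of the protocol, not the RocketMQ API: ToyBroker, its method names, and the transaction ids are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy in-memory broker, for illustration only; this is not the RocketMQ API. */
class ToyBroker {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }

    private final Map<String, State> messages = new LinkedHashMap<>();
    private final List<String> delivered = new ArrayList<>();

    /** Step 1: store a prepared message; consumers cannot see it yet. */
    void prepare(String txId) {
        messages.put(txId, State.PREPARED);
    }

    /** Step 2: the producer reports that its local transaction succeeded. */
    void confirm(String txId) {
        resolve(txId, true);
    }

    /** Step 2: the producer reports that its local transaction failed. */
    void rollback(String txId) {
        resolve(txId, false);
    }

    /** Step 4: ask the producer about every message that is still prepared. */
    void checkBack(Predicate<String> localTransactionSucceeded) {
        for (String txId : new ArrayList<>(messages.keySet())) {
            if (messages.get(txId) == State.PREPARED) {
                resolve(txId, localTransactionSucceeded.test(txId));
            }
        }
    }

    private void resolve(String txId, boolean commit) {
        if (messages.get(txId) != State.PREPARED) {
            return; // unknown id or already resolved
        }
        messages.put(txId, commit ? State.COMMITTED : State.ROLLED_BACK);
        if (commit) {
            delivered.add(txId); // Step 3: the message becomes visible to system B
        }
    }

    List<String> delivered() {
        return delivered;
    }

    State stateOf(String txId) {
        return messages.get(txId);
    }
}
```

If a confirmation is lost, the message stays in the PREPARED state until checkBack runs, at which point the producer's answer decides whether it is delivered or discarded.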
Order-Inventory-Distributed Transaction
The following example shows how to implement a distributed transaction with RocketMQ in code.
Scenario: In the order placing scenario, the order service generates an order. When the order is successfully paid, the order status is modified and the inventory service is notified to deduct the inventory.
Database design:
CREATE TABLE `yzy_order` (
  `id` int(11) NOT NULL,
  `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Order id',
  `buy_num` int(11) DEFAULT NULL COMMENT 'Purchase Quantity',
  `good_id` int(11) DEFAULT NULL COMMENT 'commodity ID',
  `user_id` int(11) DEFAULT NULL COMMENT 'user ID',
  `pay_status` int(11) DEFAULT NULL COMMENT 'Payment status, 0: no payment, 1: paid',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `yzy_repo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT 'Name of commodity',
  `num` int(11) NOT NULL DEFAULT '0' COMMENT 'Inventory Quantity',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='Testing, Inventory Tables';
Hands-on implementation
Main methods of order service
package com.transaction.order;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class OrderService {

    @Autowired
    OrderDao orderDao;

    public static final int PAY_DONE = 1;

    /**
     * Check whether the order exists and its status is "payment completed"
     **/
    public boolean checkOrderPaySuccess(String orderId) {
        List<YzyOrder> allOrders = orderDao.findAll();
        return allOrders.stream()
                .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE);
    }

    /**
     * Mark the order as paid
     **/
    public void updatePayStatusByOrderId(String orderId) {
        orderDao.updatePayStatusByOrderId(orderId, PAY_DONE);
    }

    /**
     * Create an order; the status defaults to unpaid
     **/
    public void save(String orderId, int num, int goodId, int userId) {
        YzyOrder yzyOrder = new YzyOrder();
        yzyOrder.setOrderId(orderId);
        yzyOrder.setBuyNum(num);
        yzyOrder.setGoodId(goodId);
        yzyOrder.setUserId(userId);
        orderDao.save(yzyOrder);
    }
}
Operation flow
1. Create an order in the order table with the status "unpaid"
curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'
/**
 * Order creation interface
 * @param num
 * @param goodId
 * @param userId
 * @return
 */
@GetMapping("save")
public String makeOrder(
        @RequestParam("num") int num,
        @RequestParam("good_id") int goodId,
        @RequestParam("user_id") int userId) {
    orderService.save(UUID.randomUUID().toString(), num, goodId, userId);
    return "success";
}
2. The user pays, and the inventory service is notified through MQ to deduct inventory
OrderController::pay sends an MQ transaction message announcing that the order was paid successfully. Note that it does not call OrderService::updatePayStatusByOrderId directly and then send an ordinary MQ message. Instead, it first sends a transaction message to MQ. RocketMQ then calls back TransactionListener::executeLocalTransaction in the order service, which updates the order status. This keeps sending the transaction message and updating the order status consistent with each other.
@GetMapping("pay")
public String pay(@RequestParam("order_id") String orderId)
        throws UnsupportedEncodingException, MQClientException, JsonProcessingException {
    transactionProducer.sendOrderPaySucessEvent(orderId);
    return "success";
}
3. Transaction message listener in the order service
@Component
public class TransactionProducer implements InitializingBean {

    private TransactionMQProducer producer;

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderDao orderDao;

    // ObjectMapper is thread-safe once configured, so a single shared instance is enough
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        producer = new TransactionMQProducer("order-pay-group");
        producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);
        // Register the transaction callbacks
        producer.setTransactionListener(new TransactionListener() {
            /**
             * Executes the local transaction.
             *
             * A callback to this method means the transaction message has been sent to MQ
             * successfully, so the order status can be updated to "payment success".
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);
                    // Local transaction: mark the order as paid
                    orderService.updatePayStatusByOrderId(record.getOrderId());
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (IOException e) {
                    // The message body could not be parsed, so nothing was updated
                    e.printStackTrace();
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            /**
             * Check-back from the broker: report whether the message should be committed,
             * based on whether the local transaction was executed successfully.
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                try {
                    OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class);
                    // The local transaction succeeded if the order is marked as paid
                    boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());
                    return isLocalSuccess
                            ? LocalTransactionState.COMMIT_MESSAGE
                            : LocalTransactionState.ROLLBACK_MESSAGE;
                } catch (Exception e) {
                    // Status could not be determined; let the broker check again later
                    e.printStackTrace();
                    return LocalTransactionState.UNKNOW;
                }
            }
        });
        producer.start();
    }

    public void sendOrderPaySucessEvent(String orderId)
            throws JsonProcessingException, UnsupportedEncodingException, MQClientException {
        YzyOrder order = orderDao.findAll().stream()
                .filter(item -> item.getOrderId().equals(orderId))
                .findFirst()
                .orElse(null);
        if (order == null) {
            System.out.println("not found order " + orderId);
            return;
        }
        // Build the transaction message to send
        OrderRecord record = new OrderRecord();
        record.setUserId(order.getUserId());
        record.setOrderId(orderId);
        record.setBuyNum(order.getBuyNum());
        record.setPayStatus(order.getPayStatus());
        record.setGoodId(order.getGoodId());
        Message message = new Message("Order-Success", "", record.getOrderId(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));
        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println("Send Transaction Messages, orderId = " + record.getOrderId() + " " + result.toString());
    }
}
4. Inventory service deducts inventory
Attention should be paid to the following issues:
1. Under concurrency, inventory must not be deducted below zero.
2. If inventory is updated by selecting first and updating afterwards, add a distributed lock or a database optimistic lock. With an optimistic lock, the UPDATE statement takes effect only if the value is still the one that was read, so a stale write changes nothing:
UPDATE yzy_repo SET num=$new_num WHERE id=$good_id AND num=$old_num;
3. Make message consumption idempotent, keyed by msgId or orderId.
@Override
public int reduce(Integer buyNum, Integer goodId) {
    // Under concurrency, there are three ways to keep inventory from going negative:
    // 1. SELECT ... FOR UPDATE (must run inside a transaction)
    // 2. Wrap this logic in a distributed lock
    // 3. Optimistic lock, e.g. a version column; here the old quantity plays that role
    while (true) {
        Optional<YzyRepo> repoOption = repositoryDao.findById(goodId);
        if (!repoOption.isPresent()) {
            return 0;
        }
        YzyRepo repo = repoOption.get();
        int oldNum = repo.getNum();
        int newNum = oldNum - buyNum;
        // Refuse to deduct inventory below zero
        if (newNum < 0) {
            return -1;
        }
        // Compare-and-set: succeeds only if num is still oldNum; otherwise re-read and retry
        int affect = repositoryDao.updateGoodNum(newNum, oldNum, goodId);
        if (affect > 0) {
            return affect;
        }
    }
}
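Point 3 above, idempotent consumption, is not covered by the reduce method itself. A minimal sketch, assuming the consumer deduplicates by orderId, could look like the following. IdempotentInventoryConsumer and onOrderPaid are hypothetical names, and the in-memory set and counter only stand in for what would normally be database state (for example a processed-orders table with a unique key, written in the same transaction as the deduction).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for the inventory consumer; for illustration only. */
class IdempotentInventoryConsumer {
    private final Set<String> processedOrderIds = new HashSet<>();
    private int stock;

    IdempotentInventoryConsumer(int initialStock) {
        this.stock = initialStock;
    }

    /**
     * Handles an "order paid" message.
     * Returns true if stock was deducted, false for a duplicate or insufficient stock.
     */
    synchronized boolean onOrderPaid(String orderId, int buyNum) {
        if (processedOrderIds.contains(orderId)) {
            return false; // duplicate delivery: this order was already deducted
        }
        if (stock - buyNum < 0) {
            return false; // never deduct below zero
        }
        stock -= buyNum;
        processedOrderIds.add(orderId); // remember the order only after a successful deduction
        return true;
    }

    synchronized int stock() {
        return stock;
    }
}
```

Because the orderId is recorded only after a successful deduction, a redelivered message for the same order changes nothing, while an order rejected for insufficient stock can still be retried later.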