RocketMQ for reliable message final consistency
Business description
In this case, RocketMQ middleware is used to realize the final consistent distributed transaction of reliable messages and simulate the transfer transaction process of two accounts.
Two accounts are in different banks (Zhang San is in bank 1, Li Si is in bank 2), bank 1 and bank 2 are two micro services.
The transaction process is that Zhang San transfers the specified amount to Li Si.
In the above transaction steps, the two operations of Zhang San's deduction amount and sending a transfer message to bank 2 must be an integrated transaction.
Program components
Database: MySQL-8.X (including bank 1 and bank 2 databases)
JDK:1.8.X
Microservice framework: SpringBoot2.3, SpringCloudHoxton
RocketMQ: rocketmq-spring-boot-starter.2.0.4
Relationship between microservices and databases:
DTX / hacker bank MQ / bank1 server MQ bank 1 operation Zhang San user, connect bank1
DTX / hacker bank MQ / bank2 server MQ bank 2 operate user Li Si and connect to bank2
Create database
Bank 1, including Zhang San's account
DROP DATABASE IF EXISTS `bank1`; CREATE DATABASE `bank1` CHARACTER SET 'utf8mb4';
USE `bank1`; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( id bigint ( 20 ) NOT NULL AUTO_INCREMENT COMMENT 'Primary key', account_name VARCHAR ( 100 ) COMMENT 'Account name', account_no VARCHAR ( 100 ) COMMENT 'Account Card No', account_password VARCHAR ( 100 ) COMMENT 'Account password', account_balance DECIMAL ( 10,2) COMMENT 'Account balance', PRIMARY KEY ( id ) ) COMMENT = 'Account table'; INSERT INTO `account_info` VALUES ('1' , 'Zhang San's account','1', '', 10000 );
Bank 2, including Li Si's account
DROP DATABASE IF EXISTS `bank2`; CREATE DATABASE `bank2` CHARACTER SET 'utf8mb4';
USE `bank2`; DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( id bigint ( 20 ) NOT NULL AUTO_INCREMENT COMMENT 'Primary key', account_name VARCHAR ( 100 ) COMMENT 'Account name', account_no VARCHAR ( 100 ) COMMENT 'Account Card No', account_password VARCHAR ( 100 ) COMMENT 'Account password', account_balance DECIMAL ( 10,2) COMMENT 'Account balance', PRIMARY KEY ( id ) ) COMMENT = 'Account table'; INSERT INTO `account_info` VALUES ('1' , 'Li Si's account','2', '', 0);
Create de for each database_ duplication
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( tx_no VARCHAR ( 64) NOT NULL COMMENT 'affair ID', create_time datetime COMMENT 'Creation time', PRIMARY KEY ( tx_no ) ) COMMENT = 'Transaction execution log';
Construction project
Start RocketMQ
(1) Download RocketMQ server
Download address: http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin- release.zip
(2) Unzip and start
Start nameserver:
set ROCKETMQ_HOME=[rocketmq server decompression path] start [rocketmq server decompression path] / bin/mqnamesrv.cmd
Start broker:
set ROCKETMQ_HOME=[rocketmq server decompression path] start [rocketmq server decompression path] / bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
**I use the rockerMQ created by the docker container**
docker-compose.yml
version: '3.5' services: rmqnamesrv: image: foxiswho/rocketmq:server container_name: rmqnamesrv ports: - 9876:9876 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store environment: JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" networks: rmq: aliases: - rmqnamesrv rmqbroker: image: foxiswho/rocketmq:broker container_name: rmqbroker ports: - 10909:10909 - 10911:10911 volumes: - ./data/logs:/opt/logs - ./data/store:/opt/store - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPTS: " -Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" command: mqbroker -c /etc/rocketmq/broker.conf depends_on: - rmqnamesrv networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rmqconsole ports: - 8080:8080 environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - rmqnamesrv networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge
broker.conf
RocketMQ Broker needs a configuration file. According to the above Compose configuration, we need to create a file named. / data/brokerconf / under the directory broker.conf The configuration file of is as follows:
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Cluster name brokerClusterName=DefaultCluster # Broker name. Note that different configuration files fill in different names. If you use: broker-a in broker-a.properties, # Use: broker-b in broker-b.properties brokerName=broker-a # 0 for Master, > 0 for Slave brokerId=0 # nameServer address, semicolon split # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 # Start IP, if docker reports com.alibaba.rocketmq . remoting.exception.RemotingConnectException : connect to <192.168.0.120:10909> failed # Add a sentence to solution 1 producer.setVipChannelEnabled(false);, solution 2: set the host IP of the broker IP1 instead of using the internal IP of the docker # brokerIP1=192.168.0.253 # When sending messages, topic s that do not exist in the server are automatically created. The number of queues created by default defaultTopicQueueNums=4 # Whether Broker is allowed to automatically create Topic? It is recommended to open offline and close online!!! Look at it carefully. It's false, false, false autoCreateTopicEnable=true # Whether Broker is allowed to automatically create subscription groups. It is recommended to enable offline and close Online autoCreateSubscriptionGroup=true # Broker listening port for external services listenPort=10911 # Delete file at 4 a.m. by default deleteWhen=04 # File retention time, default 48 hours fileReservedTime=120 # commitLog the size of each file is 1G by default mapedFileSizeCommitLog=1073741824 # Each file of ConsumeQueue saves 30W by default, which is adjusted according to the business situation mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # Detect physical file disk space diskMaxUsedSpaceRatio=88 # Storage path # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store # commitLog storage path # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog # Consumption queue storage # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue # Message index storage path # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index # checkpoint file storage path # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint # abort file storage path # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort # Message size limit maxMessageSize=65536 # flushCommitLogLeastPages=4 # flushConsumeQueueLeastPages=2 # flushCommitLogThoroughInterval=10000 # flushConsumeQueueThoroughInterval=60000 # Broker's role # - ASYNC_MASTER asynchronous replication master # - SYNC_MASTER synchronous dual write master # - SLAVE brokerRole=ASYNC_MASTER # Brush disk mode # - ASYNC_FLUSH asynchronous disc brush # - SYNC_FLUSH synchronous brush disk flushDiskType=ASYNC_FLUSH # Number of message thread pools # sendMessageThreadPoolNums=128 # Number of pull message thread pools # pullMessageThreadPoolNums=128
RocketMQ console
visit http://rmqIP:8080 log in to the console
Construction project
- pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.wry.dtx</groupId> <artifactId>bank1-server-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>bank1-server-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR5</spring-cloud.version> <spring-cloud-alibaba.version>2.2.0.RELEASE</spring-cloud-alibaba.version> <mybatis-plus.version>3.3.2</mybatis-plus.version> <mysql.version>8.0.19</mysql.version> <rocketmq.version>2.0.4</rocketmq.version> </properties> <dependencies> <!--rocketmq--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq.version}</version> </dependency> <!--web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>${mybatis-plus.version}</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring-cloud-alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
- application.yml
server: port: 8005 spring: application: name: bank1-server-mq datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://39.96.3.100:3306/bank1?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC&nullCatalogMeansCurrent=true username: root password: 123456 logging: level: root: info mybatis-plus: mapper-locations: classpath:/mapper/*.xml typeAliasesPackage: com.wry.dtx.bank1.entity global-config: db-config: field-strategy: not-empty id-type: auto db-type: mysql ribbon: ReadTimeout: 3000 ConnectTimeout: 3000 rocketmq: name-server: 39.96.3.100:9876 producer: group: producer_bank1
- Startup class
package com.wry.dtx.bank1; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Bank1ServerMqApplication { public static void main(String[] args) { SpringApplication.run(Bank1ServerMqApplication.class, args); } }
For specific code, please refer to Github: https://github.com/hobbyWang/DistributedTransaction/tree/master/rocket-bank-mq
Summary
The final consistency of reliable messages is to ensure the consistency of messages from the producer through the message middleware to the consumer. In this case, RocketMQ is used as the message middleware,
It mainly solves two functions:
1. Atomicity of local transaction and message sending.
2. Reliability of messages received by transaction participants.
Reliable message final consistent transactions are suitable for scenarios with long execution cycle and low real-time requirements. After the introduction of message mechanism, the synchronous transaction operation becomes asynchronous operation based on message execution, which avoids the influence of synchronous blocking operation in distributed transaction, and realizes the decoupling of two services.