Reliable-message eventual consistency with RocketMQ

Business description

This example uses RocketMQ to implement a distributed transaction based on reliable-message eventual consistency, simulating a transfer between two accounts.
The two accounts are held at different banks (Zhang San at bank 1, Li Si at bank 2), and bank 1 and bank 2 are two separate microservices.
In the transaction, Zhang San transfers a specified amount to Li Si.

In this flow, deducting the amount from Zhang San's account and sending the transfer message to bank 2 must form a single atomic operation: either both happen or neither does.
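
To make the atomicity requirement concrete, here is a minimal in-memory sketch of the half-message pattern that RocketMQ transactional messages follow: the message is first sent in a state that consumers cannot see, the local transaction runs, and the message is then committed or rolled back according to the result. It does not use the RocketMQ API; the class `HalfMessageDemo` and its members are hypothetical names chosen for illustration, and the list stands in for the broker.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of the half-message flow; this is not RocketMQ code.
class HalfMessageDemo {
    // Zhang San's balance in bank 1 (same value as the seed data: 10000).
    BigDecimal zhangSanBalance = new BigDecimal("10000");
    // Messages that were committed and are therefore visible to bank 2.
    final List<String> visibleMessages = new ArrayList<>();

    /** Returns true if the transfer committed, false if it was rolled back. */
    boolean transfer(String txNo, BigDecimal amount) {
        // Step 1: prepare a half message; consumers cannot see it yet.
        String halfMessage = txNo + ":" + amount.toPlainString();
        // Step 2: run the local transaction (deduct from Zhang San).
        boolean localOk = deduct(amount);
        // Step 3: commit makes the message visible; rollback discards it.
        if (localOk) {
            visibleMessages.add(halfMessage);
        }
        return localOk;
    }

    private boolean deduct(BigDecimal amount) {
        if (amount.signum() <= 0 || zhangSanBalance.compareTo(amount) < 0) {
            return false; // invalid amount or insufficient funds: nothing changes
        }
        zhangSanBalance = zhangSanBalance.subtract(amount);
        return true;
    }
}
```

Because the message only becomes visible after the deduction succeeds, bank 2 never sees a transfer that bank 1 did not actually perform.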

Program components

Database: MySQL 8.x (with the bank1 and bank2 databases)
JDK: 1.8.x
Microservice framework: Spring Boot 2.3, Spring Cloud Hoxton
RocketMQ: rocketmq-spring-boot-starter 2.0.4
Relationship between microservices and databases:
dtx/rocket-bank-mq/bank1-server-mq: bank 1, operates Zhang San's account, connects to the bank1 database
dtx/rocket-bank-mq/bank2-server-mq: bank 2, operates Li Si's account, connects to the bank2 database

Create database

Bank 1, including Zhang San's account

DROP DATABASE IF  EXISTS  `bank1`;
CREATE DATABASE `bank1` CHARACTER SET 'utf8mb4';
USE `bank1`;

DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
		id bigint ( 20 ) NOT NULL  AUTO_INCREMENT COMMENT 'Primary key',
		account_name VARCHAR ( 100 ) COMMENT 'Account name',
		account_no VARCHAR ( 100 ) COMMENT 'Account Card No',
		account_password VARCHAR ( 100 ) COMMENT 'Account password',
		account_balance DECIMAL ( 10,2) COMMENT 'Account balance',
		PRIMARY KEY ( id ) 
) COMMENT = 'Account table';

INSERT INTO `account_info` VALUES (1, 'Zhang San''s account', '1', '', 10000);

Bank 2, including Li Si's account

DROP DATABASE IF  EXISTS  `bank2`;
CREATE DATABASE `bank2` CHARACTER SET 'utf8mb4';
USE `bank2`;

DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
		id bigint ( 20 ) NOT NULL  AUTO_INCREMENT COMMENT 'Primary key',
		account_name VARCHAR ( 100 ) COMMENT 'Account name',
		account_no VARCHAR ( 100 ) COMMENT 'Account Card No',
		account_password VARCHAR ( 100 ) COMMENT 'Account password',
		account_balance DECIMAL ( 10,2)  COMMENT 'Account balance',
		PRIMARY KEY ( id ) 
) COMMENT = 'Account table';

INSERT INTO `account_info` VALUES (1, 'Li Si''s account', '2', '', 0);

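Create the de_duplication table in each database. It records the transaction numbers that have already been processed, so that a repeated request or message is not applied twice.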

DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` (
		tx_no VARCHAR ( 64) NOT NULL COMMENT 'Transaction ID',
		create_time datetime COMMENT 'Creation time',
		PRIMARY KEY ( tx_no ) 
) COMMENT = 'Transaction execution log';
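
As a rough illustration of how this table can make the consumer idempotent, the sketch below credits Li Si's account at most once per transaction number. It is an in-memory stand-in, not the project's code: the `Set` plays the role of the de_duplication table, and `IdempotentCreditDemo` and `credit` are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

// In-memory sketch of idempotent consumption; the Set stands in for the de_duplication table.
class IdempotentCreditDemo {
    // Li Si's balance in bank 2 (same value as the seed data: 0).
    BigDecimal liSiBalance = BigDecimal.ZERO;
    private final Set<String> processedTxNos = new HashSet<>();

    /** Credits the amount once per txNo; returns false for a duplicate delivery. */
    synchronized boolean credit(String txNo, BigDecimal amount) {
        // Set.add returns false when txNo is already present,
        // much like a primary-key conflict on tx_no would signal a duplicate.
        if (!processedTxNos.add(txNo)) {
            return false;
        }
        liSiBalance = liSiBalance.add(amount);
        return true;
    }
}
```

With a real database, the insert into de_duplication and the balance update would need to run in the same local transaction, so that a failure leaves neither change behind.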

Project setup

Start RocketMQ

(1) Download RocketMQ server
Download address: http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin-release.zip
(2) Unzip and start
Start nameserver:

set ROCKETMQ_HOME=[rocketmq server decompression path]
start [rocketmq server decompression path]/bin/mqnamesrv.cmd

Start broker:

set ROCKETMQ_HOME=[rocketmq server decompression path]
start [rocketmq server decompression path]/bin/mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

**I run RocketMQ in Docker containers instead**

docker-compose.yml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    environment:
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

broker.conf
The RocketMQ broker needs a configuration file. According to the Compose file above, create a file named broker.conf in the ./data/brokerconf/ directory with the following content:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


# Cluster name
brokerClusterName=DefaultCluster

# Broker name. Use a different name in each configuration file, for example
# broker-a in broker-a.properties and broker-b in broker-b.properties
brokerName=broker-a

# 0 for Master, > 0 for Slave
brokerId=0

# NameServer addresses, separated by semicolons
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# Broker IP. If a client of the Docker-hosted broker reports
# com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed,
# solution 1: add producer.setVipChannelEnabled(false); solution 2: set brokerIP1 to the host IP instead of the container's internal IP
# brokerIP1=192.168.0.253

# Default number of queues for a topic that is created automatically when a message is sent to a topic that does not exist on the server
defaultTopicQueueNums=4

# Whether the broker may create topics automatically. Recommended: enable for development and testing, disable (false) in production
autoCreateTopicEnable=true

# Whether the broker may create subscription groups automatically. Recommended: enable for development and testing, disable in production
autoCreateSubscriptionGroup=true

# Broker listening port for external services
listenPort=10911

# Time of day at which expired files are deleted (04 = 4 a.m., the default)
deleteWhen=04

# File retention time in hours (set to 120 here)
fileReservedTime=120

# Size of each commitLog file, 1 GB by default
mapedFileSizeCommitLog=1073741824

# Number of entries stored in each ConsumeQueue file, 300,000 by default; adjust to the workload
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# Disk usage threshold for the store, as a percentage
diskMaxUsedSpaceRatio=88
# Storage path
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog storage path
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# ConsumeQueue storage path
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# Message index storage path
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint file storage path
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort file storage path
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# Message size limit
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker's role
# - ASYNC_MASTER asynchronous replication master
# - SYNC_MASTER synchronous dual write master
# - SLAVE
brokerRole=ASYNC_MASTER

# Disk flush mode
# - ASYNC_FLUSH asynchronous flush
# - SYNC_FLUSH synchronous flush
flushDiskType=ASYNC_FLUSH

# Size of the send-message thread pool
# sendMessageThreadPoolNums=128
# Size of the pull-message thread pool
# pullMessageThreadPoolNums=128

RocketMQ console
Open http://rmqIP:8080 in a browser to access the console (replace rmqIP with the address of the host running the containers).

Create the project

  1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wry.dtx</groupId>
    <artifactId>bank1-server-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>bank1-server-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
        <spring-cloud-alibaba.version>2.2.0.RELEASE</spring-cloud-alibaba.version>
        <mybatis-plus.version>3.3.2</mybatis-plus.version>
        <mysql.version>8.0.19</mysql.version>
        <rocketmq.version>2.0.4</rocketmq.version>
    </properties>

    <dependencies>
        <!--rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <!--web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud-alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  2. application.yml
server:
  port: 8005

spring:
  application:
    name: bank1-server-mq
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://39.96.3.100:3306/bank1?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC&nullCatalogMeansCurrent=true
    username: root
    password: 123456

logging:
  level:
    root: info

mybatis-plus:
  mapper-locations: classpath:/mapper/*.xml
  typeAliasesPackage: com.wry.dtx.bank1.entity
  global-config:
    db-config:
      field-strategy: not-empty
      id-type: auto
      db-type: mysql

ribbon:
  ReadTimeout: 3000
  ConnectTimeout: 3000

rocketmq:
  name-server: 39.96.3.100:9876
  producer:
    group: producer_bank1

  3. Startup class
package com.wry.dtx.bank1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Bank1ServerMqApplication {

    public static void main(String[] args) {
        SpringApplication.run(Bank1ServerMqApplication.class, args);
    }

}

For the complete code, see GitHub: https://github.com/hobbyWang/DistributedTransaction/tree/master/rocket-bank-mq

Summary

Reliable-message eventual consistency means guaranteeing that a message travels consistently from the producer, through the message middleware, to the consumer. This example uses RocketMQ as the message middleware, which mainly solves two problems:
1. Atomicity of local transaction and message sending.
2. Reliability of messages received by transaction participants.
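
For the first point, RocketMQ transactional messages rely on a check-back: if the broker never learns whether a half message should be committed or rolled back, it asks the producer for the outcome of the local transaction. A minimal sketch of how bank 1 could answer that question is shown below, assuming it records each committed transaction number in its de_duplication table. The code is an in-memory illustration only; `TxCheckDemo`, `TxState`, `recordCommitted`, and `check` are hypothetical names, not the RocketMQ API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of a transaction check-back; this is not RocketMQ code.
class TxCheckDemo {
    enum TxState { COMMIT, UNKNOWN }

    // Stands in for bank 1's de_duplication table:
    // tx_no values whose local transaction has committed.
    private final Set<String> committedTxNos = ConcurrentHashMap.newKeySet();

    /** Called as part of the local transaction, together with the balance update. */
    void recordCommitted(String txNo) {
        committedTxNos.add(txNo);
    }

    /** Answers the broker's question about the outcome of transaction txNo. */
    TxState check(String txNo) {
        // A record proves the deduction committed; without one the outcome is not known yet.
        return committedTxNos.contains(txNo) ? TxState.COMMIT : TxState.UNKNOWN;
    }
}
```

Returning an unknown state rather than a rollback when no record exists is a design choice in this sketch: the local transaction may still be running, so the broker can ask again later.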

Reliable-message eventual-consistency transactions suit scenarios with long execution cycles and low real-time requirements. Once messaging is introduced, the synchronous transactional operation becomes an asynchronous, message-driven one, which avoids the impact of synchronous blocking in a distributed transaction and decouples the two services.

Keywords: Spring MySQL Apache Database

Added by legomez2000 on Sun, 21 Jun 2020 09:31:11 +0300