Source Code Analysis: Seata Client Principles and Processes for Distributed Transactions

Preface

Distributed transactions are a problem that every distributed system must solve, and at present most solutions settle for eventual consistency. Since Alibaba open-sourced Fescar at the beginning of this year (it was renamed Seata in early April), the project has received tremendous attention and is now approaching 8,000 stars. Seata aims to solve distributed transaction problems in the microservices field with high performance and zero intrusion into business code. It is currently iterating rapidly, and its near-term goal is a production-ready version for MySQL.

This article builds a distributed system demo on spring cloud + spring jpa + spring cloud alibaba fescar + mysql + seata. Using seata's debug logs and source code, it analyzes the workflow and principles from the client side (RM, TM). (Example project: https://github.com/fescar-group/fescar-samples/tree/master/springcloud-jpa-seata)

To better understand the full text, let's familiarize ourselves with the concepts:

  • XID: Unique identification of a global transaction, consisting of ip:port:sequence;
  • Transaction Coordinator (TC): Maintains the running state of global transactions, and coordinates and drives the commit or rollback of a global transaction;
  • Transaction Manager (TM): Controls the boundaries of a global transaction: it opens the global transaction and ultimately initiates the global commit or rollback decision;
  • Resource Manager (RM): Controls branch transactions: it is responsible for branch registration and status reporting, and receives instructions from the transaction coordinator to drive the commit and rollback of branch (local) transactions.
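
As a small illustration of the XID format, the sketch below splits an XID of the form ip:port:sequence into its three parts. The class XidParts and its parse method are hypothetical helpers written for this article; they are not part of fescar.

```java
// Hypothetical helper for illustration; not part of fescar.
final class XidParts {
    final String ip;
    final int port;
    final long sequence;

    private XidParts(String ip, int port, long sequence) {
        this.ip = ip;
        this.port = port;
        this.sequence = sequence;
    }

    // Splits from the right, so the last two fields are always port and sequence.
    static XidParts parse(String xid) {
        int lastColon = xid.lastIndexOf(':');
        int portColon = lastColon > 0 ? xid.lastIndexOf(':', lastColon - 1) : -1;
        if (portColon <= 0) {
            throw new IllegalArgumentException("not an ip:port:sequence XID: " + xid);
        }
        String ip = xid.substring(0, portColon);
        int port = Integer.parseInt(xid.substring(portColon + 1, lastColon));
        long sequence = Long.parseLong(xid.substring(lastColon + 1));
        return new XidParts(ip, port, sequence);
    }

    public static void main(String[] args) {
        XidParts parts = XidParts.parse("192.168.224.93:8091:2008502699");
        System.out.println(parts.ip + " | " + parts.port + " | " + parts.sequence);
    }
}
```

In the logs quoted later in this article, the sequence part of the XID is the value that appears as transactionId in the messages exchanged with TC.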

Tip: The code in this article is based on fescar-0.4.1. Because the project was renamed to seata only recently, some package names, class names, and jar names have not yet been replaced, so the text below still uses fescar to describe them.

Distributed Framework Support

Fescar uses the XID to represent a distributed transaction. The XID must be passed to every system involved in a distributed transaction request, so that each system can report the processing of its branch transaction to fescar-server and receive commit and rollback instructions from fescar-server. Fescar officially supports all versions of the dubbo protocol, and the community provides an implementation for spring cloud (spring-boot) distributed projects:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-alibaba-fescar</artifactId>
    <version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>

This component implements XID delivery when communicating based on RestTemplate and Feign.
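
The idea behind XID delivery can be sketched roughly as follows: the caller copies the XID bound to its current thread into a request header, and the callee binds it for the duration of the request and unbinds it afterwards. This is a simplified sketch and not the component's actual code. The classes XidContext and XidPropagationSketch and the header name X-Demo-XID are placeholders invented for this example, and a plain Map stands in for the HTTP headers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified stand-in for a thread-bound transaction context (hypothetical).
final class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }
    static String get() { return XID.get(); }
    static void unbind() { XID.remove(); }
}

final class XidPropagationSketch {
    // Placeholder header name chosen for this sketch only.
    static final String XID_HEADER = "X-Demo-XID";

    // Caller side: copy the current XID (if any) into the outgoing headers.
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = XidContext.get();
        if (xid != null) {
            headers.put(XID_HEADER, xid);
        }
        return headers;
    }

    // Callee side: bind the XID while the request is handled, then unbind it.
    static <T> T handle(Map<String, String> headers, Supplier<T> businessLogic) {
        String xid = headers.get(XID_HEADER);
        if (xid != null) {
            XidContext.bind(xid);
        }
        try {
            return businessLogic.get();
        } finally {
            if (xid != null) {
                XidContext.unbind();
            }
        }
    }

    public static void main(String[] args) {
        XidContext.bind("192.168.0.2:8091:2008546211");
        Map<String, String> headers = outgoingHeaders();
        XidContext.unbind(); // pretend the request now arrives in another service
        System.out.println("XID seen by callee: " + handle(headers, XidContext::get));
    }
}
```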

Business logic

The business logic is the classic flow of placing an order, deducting the balance, and reducing inventory. It is split into three separate services, each connected to its own database:

  • Order: order-server
  • Account: account-server
  • Inventory: storage-server

There is also a business system that initiates the distributed transaction:

  • Business: business-server

The project structure is as follows

Normal Business:

  1. business initiates the purchase request
  2. storage deducts inventory
  3. order creates the order
  4. account deducts the balance

Exceptional Business:

  1. business initiates the purchase request
  2. storage deducts inventory
  3. order creates the order
  4. account throws an exception while deducting the balance

In the normal flow, the data updated in steps 2, 3, and 4 is committed globally. In the exceptional flow, the exception in step 4 causes all of the data to be rolled back globally.

Configuration File

The configuration entry file for fescar is registry.conf. The ConfigurationFactory code shows that the configuration file cannot currently be specified, so its name can only be registry.conf.

private static final String REGISTRY_CONF = "registry.conf";
public static final Configuration FILE_INSTANCE = new FileConfiguration(REGISTRY_CONF);

The concrete configuration source is specified in registry.conf. The file type is used by default, and file.conf contains three parts:

  1. transport: the transport section corresponds to the NettyServerConfig class, which defines the Netty-related parameters. TM and RM communicate with fescar-server over Netty.
  2. service

    service {
     #vgroup->rgroup
     vgroup_mapping.my_test_tx_group = "default"
     #Configure Client Connection TC Address
     default.grouplist = "127.0.0.1:8091"
     #degrade current not support
     enableDegrade = false
     #Whether to disable seata's global transactions
     disableGlobalTransaction = false
    }
  3. client

    client {
      #Upper buffer limit after RM receives TC commit notification
      async.commit.buffer.limit = 10000
      lock {
        retry.internal = 10
        retry.times = 30
      }
    }
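
To make the service section more concrete, the sketch below shows one way the lookup from transaction group to TC address could work. It is based only on the comments in the configuration above (vgroup->rgroup, then the group list). The class TcAddressResolver is hypothetical and does not reproduce fescar's own configuration code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resolver that mirrors the two-step lookup in the service section.
final class TcAddressResolver {

    static String resolve(Map<String, String> serviceConfig, String txServiceGroup) {
        // Step 1: transaction group -> cluster name (vgroup -> rgroup).
        String cluster = serviceConfig.get("vgroup_mapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroup_mapping for " + txServiceGroup);
        }
        // Step 2: cluster name -> TC address list.
        String addresses = serviceConfig.get(cluster + ".grouplist");
        if (addresses == null) {
            throw new IllegalStateException("no grouplist for cluster " + cluster);
        }
        return addresses;
    }

    public static void main(String[] args) {
        Map<String, String> service = new HashMap<>();
        service.put("vgroup_mapping.my_test_tx_group", "default");
        service.put("default.grouplist", "127.0.0.1:8091");
        System.out.println(resolve(service, "my_test_tx_group"));
    }
}
```

If this reading is right, a transaction group name that has no vgroup_mapping entry cannot be resolved to a TC address, so the group name configured on the client has to match an entry in file.conf.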

Data Source Proxy

Besides the configuration files above, the only place where fescar needs a small amount of code in AT mode is the data source proxy, which is currently based only on DruidDataSource. (Note: the latest release, version 0.4.2, supports any data source type.)

@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
    DruidDataSource druidDataSource = new DruidDataSource();
    return druidDataSource;
}

@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
    return new DataSourceProxy(druidDataSource);
}

The purpose of DataSourceProxy is to introduce ConnectionProxy. One aspect of fescar's non-intrusiveness lies in the implementation of ConnectionProxy: the point at which a branch transaction joins the global transaction is the commit phase of the local transaction. This design ensures that the business data and the undo_log are written in the same local transaction.

undo_log is a table that must be created in each business database. fescar relies on it to record the status of each branch transaction and the replay data needed for the phase-two rollback. There is no need to worry that a large volume of data in this table will create a single-point problem, because in the global commit scenario the undo_log records of the transaction are deleted asynchronously.

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
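
The role of the rollback data can be illustrated with a minimal in-memory sketch: before a row is updated, its before image is captured, and a rollback restores that image. The classes UndoRecord and UndoSketch are hypothetical and a nested Map stands in for a table. The real undo_log stores serialized images in rollback_info, and this sketch leaves out any checks a real implementation would perform before restoring data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical undo record holding the row state before and after an update.
final class UndoRecord {
    final String tableName;
    final Integer primaryKey;
    final Map<String, Object> beforeImage;
    final Map<String, Object> afterImage;

    UndoRecord(String tableName, Integer primaryKey,
               Map<String, Object> beforeImage, Map<String, Object> afterImage) {
        this.tableName = tableName;
        this.primaryKey = primaryKey;
        this.beforeImage = beforeImage;
        this.afterImage = afterImage;
    }
}

final class UndoSketch {

    // Applies an update to an in-memory "table" and returns the undo record for it.
    static UndoRecord update(Map<Integer, Map<String, Object>> table, String tableName,
                             Integer primaryKey, String column, Object newValue) {
        Map<String, Object> row = table.get(primaryKey);
        Map<String, Object> before = new HashMap<>(row); // copy before the change
        row.put(column, newValue);
        Map<String, Object> after = new HashMap<>(row);  // copy after the change
        return new UndoRecord(tableName, primaryKey, before, after);
    }

    // Phase-two rollback in this sketch: put the before image back.
    static void rollback(Map<Integer, Map<String, Object>> table, UndoRecord undo) {
        table.put(undo.primaryKey, new HashMap<>(undo.beforeImage));
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Object>> storageTbl = new HashMap<>();
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("count", 994);
        storageTbl.put(1, row);

        UndoRecord undo = update(storageTbl, "storage_tbl", 1, "count", 993);
        System.out.println("after update:   " + storageTbl.get(1).get("count"));
        rollback(storageTbl, undo);
        System.out.println("after rollback: " + storageTbl.get(1).get("count"));
    }
}
```

The values 994 and 993 are taken from the "Flushing UNDO LOG" entry in the RM log quoted later in this article, where the before image and after image of storage_tbl carry exactly these counts.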

Start Server

Go to https://github.com/seata/seata/releases and download the fescar-server that matches the client version, to avoid protocol inconsistencies between versions. After decompressing it, enter the bin directory and execute:

./fescar-server.sh 8091 ../data

Output on successful startup:

2019-04-09 20:27:24.637 INFO [main]c.a.fescar.core.rpc.netty.AbstractRpcRemotingServer.start:152 -Server started ... 

Start Client

fescar's loading entry class is GlobalTransactionAutoConfiguration. Projects based on spring boot load it automatically; GlobalTransactionScanner can of course also be instantiated in other ways.

@Configuration
@EnableConfigurationProperties({FescarProperties.class})
public class GlobalTransactionAutoConfiguration {
    private final ApplicationContext applicationContext;
    private final FescarProperties fescarProperties;

    public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, FescarProperties fescarProperties) {
        this.applicationContext = applicationContext;
        this.fescarProperties = fescarProperties;
    }

    /**
     * Instantiates GlobalTransactionScanner,
     * the initialization entry class for the client
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
        String txServiceGroup = this.fescarProperties.getTxServiceGroup();
        if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            this.fescarProperties.setTxServiceGroup(txServiceGroup);
        }
        
        return new GlobalTransactionScanner(applicationName, txServiceGroup);
    }
}

As you can see, the FescarProperties configuration class can be used to configure the transaction group name:

spring.cloud.alibaba.fescar.tx-service-group=my_test_tx_group

If no service group is specified, the name defaults to spring.application.name + "-fescar-service-group", so starting the application without spring.application.name set causes an error.

@ConfigurationProperties("spring.cloud.alibaba.fescar")
public class FescarProperties {
    private String txServiceGroup;

    public FescarProperties() {
    }

    public String getTxServiceGroup() {
        return this.txServiceGroup;
    }

    public void setTxServiceGroup(String txServiceGroup) {
        this.txServiceGroup = txServiceGroup;
    }
}
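
The fallback and the resulting startup error can be sketched in a few lines. The class TxServiceGroupSketch is hypothetical. Its resolveGroup method mirrors the fallback in GlobalTransactionAutoConfiguration shown above, and its validate method mirrors the argument check at the start of initClient shown below.

```java
// Hypothetical sketch of the group-name fallback and the startup check.
final class TxServiceGroupSketch {

    // Use the configured group if present, otherwise derive it from the application name.
    static String resolveGroup(String applicationName, String configuredGroup) {
        if (configuredGroup == null || configuredGroup.isEmpty()) {
            return applicationName + "-fescar-service-group";
        }
        return configuredGroup;
    }

    // Reject a missing application id or group, as the client does at startup.
    static void validate(String applicationId, String txServiceGroup) {
        if (applicationId == null || applicationId.isEmpty()
                || txServiceGroup == null || txServiceGroup.isEmpty()) {
            throw new IllegalArgumentException(
                "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
        }
    }

    public static void main(String[] args) {
        String group = resolveGroup("business-service", null);
        validate("business-service", group);
        System.out.println(group);
    }
}
```

Note that in this sketch the derived group name is never empty, even when the application name is missing. The failure comes from the application id itself being null when it reaches the check.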

After applicationId and txServiceGroup are obtained, the GlobalTransactionScanner object is created. The key part is the initClient method in that class.

private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup);
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup);

    //init RM
    RMClient.init(applicationId, txServiceGroup);
  
}

The initClient method initializes both TMClient and RMClient, so a service can act as both TM and RM. Which role it plays in a given global transaction depends on where the @GlobalTransactional annotation is placed. Each client creates a Netty connection to TC, so the startup log shows two Netty channels, whose transactionRole values are TMROLE and RMROLE respectively.

2019-04-09 13:42:57.417  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":101,"version":"0.4.1"},"transactionRole":"TMROLE"}
2019-04-09 13:42:57.505  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"127.0.0.1:8091","message":{"applicationId":"business-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"transactionServiceGroup":"my_test_tx_group","typeCode":103,"version":"0.4.1"},"transactionRole":"RMROLE"}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:RegisterTMRequest{applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.629 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:RegisterRMRequest{resourceIds='null', applicationId='business-service', transactionServiceGroup='my_test_tx_group'}
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:1
2019-04-09 13:42:57.699 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:2
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : com.alibaba.fescar.core.rpc.netty.RmRpcClient@3b06d101 msgId:1, future :com.alibaba.fescar.core.protocol.MessageFuture@28bb1abd, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.701 DEBUG 93715 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : com.alibaba.fescar.core.rpc.netty.TmRpcClient@65fc3fb7 msgId:2, future :com.alibaba.fescar.core.protocol.MessageFuture@9a1e3df, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 13:42:57.710  INFO 93715 --- [imeoutChecker_1] c.a.fescar.core.rpc.netty.RmRpcClient    : register RM success. server version:0.4.1,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.710  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : register success, cost 114 ms, version:0.4.1,role:TMROLE,channel:[id: 0xd22fe0c5, L:/127.0.0.1:57398 - R:/127.0.0.1:8091]
2019-04-09 13:42:57.711  INFO 93715 --- [imeoutChecker_1] c.a.f.c.rpc.netty.NettyPoolableFactory   : register success, cost 125 ms, version:0.4.1,role:RMROLE,channel:[id: 0xe6468995, L:/127.0.0.1:57397 - R:/127.0.0.1:8091]

The log shows the following steps:

  1. Create Netty Connection
  2. Send Registration Request
  3. Get response results
  4. Successful instantiation of RmRpcClient, TmRpcClient

TM Processing Flow

In this example the TM role is played by business-service, and the purchase method of BusinessService carries the @GlobalTransactional annotation:

@Service
public class BusinessService {

    @Autowired
    private StorageFeignClient storageFeignClient;
    @Autowired
    private OrderFeignClient orderFeignClient;

    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount){
        storageFeignClient.deduct(commodityCode, orderCount);

        orderFeignClient.create(userId, commodityCode, orderCount);
    }
}

Calling this method creates a global transaction. First, the call to the method annotated with @GlobalTransactional is intercepted and handled by GlobalTransactionalInterceptor.

/**
 * AOP Intercept method calls
 */
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

    //Get method GlobalTransactional annotation
    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    
    //If the method has a GlobalTransactional annotation, intercept it and handle it accordingly
    if (globalTransactionalAnnotation != null) {
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}

The handleGlobalTransaction method calls execute on TransactionalTemplate. As the class name suggests, this is a standard template method: it defines the standard steps TM follows for a global transaction, and the comments make them clear.

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    try {
        // 2. begin transaction
        try {
            triggerBeforeBegin();
            tx.begin(business.timeout(), business.name());
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
        Object rs = null;
        try {
            // Do Your Business
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. any business exception, rollback.
            try {
                triggerBeforeRollback();
                tx.rollback();
                triggerAfterRollback();
                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);
            }
        }
        // 4. everything is fine, commit.
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
        return rs;
    } finally {
        //5. clear
        triggerAfterCompletion();
        cleanUp();
    }
}
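
Stripped of hooks and error codes, the control flow of the template above reduces to: begin, run the business code, roll back on any exception, otherwise commit, and always clean up. The sketch below reproduces only that flow. The class TxTemplateSketch is hypothetical and records event names in a list instead of talking to TC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified version of the begin/commit/rollback template.
final class TxTemplateSketch {
    // Records the steps taken, in order, so the flow can be inspected.
    final List<String> events = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            events.add("begin");
            T result;
            try {
                result = business.get();   // run the business code
            } catch (RuntimeException ex) {
                events.add("rollback");    // any business exception: roll back
                throw ex;
            }
            events.add("commit");          // everything is fine: commit
            return result;
        } finally {
            events.add("cleanUp");         // always clean up
        }
    }

    public static void main(String[] args) {
        TxTemplateSketch ok = new TxTemplateSketch();
        ok.execute(() -> "order created");
        System.out.println(ok.events);

        TxTemplateSketch failing = new TxTemplateSketch();
        try {
            failing.execute(() -> { throw new IllegalStateException("balance too low"); });
        } catch (IllegalStateException expected) {
            System.out.println(failing.events);
        }
    }
}
```

This sketch rethrows the business exception unchanged, whereas the original wraps it in an ExecutionException that carries a code such as RollbackDone or RollbackFailure.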

A global transaction is opened through the begin method of DefaultGlobalTransaction.

public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        check();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
        }
        return;
    }
    if (xid != null) {
        throw new IllegalStateException();
    }
    if (RootContext.getXID() != null) {
        throw new IllegalStateException();
    }
    //This is where the transaction is actually opened; TC returns the XID
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Begin a NEW global transaction [" + xid + "]");
    }
}

The check if (role != GlobalTransactionRole.Launcher) at the beginning of the method is the key role judgment: it indicates whether the current caller is the launcher (Launcher) or a participant (Participant) of the global transaction. If a downstream method of the distributed transaction also carries the @GlobalTransactional annotation, its role is Participant, so begin is ignored and returns directly. Whether the role is Launcher or Participant is decided by whether an XID exists in the current context: no XID means Launcher, an existing XID means Participant. Thus only the Launcher can create the global transaction, and a distributed transaction has exactly one Launcher.
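
The role decision can be sketched as follows. The class GlobalTxRoleSketch is hypothetical: a thread-local variable stands in for the current context, and a local counter stands in for the XID that TC would assign.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the role is decided by whether an XID is already bound.
final class GlobalTxRoleSketch {
    enum Role { LAUNCHER, PARTICIPANT }

    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();
    private static final AtomicLong SEQUENCE = new AtomicLong(1); // stand-in for TC

    final Role role;
    String xid;

    private GlobalTxRoleSketch(Role role, String xid) {
        this.role = role;
        this.xid = xid;
    }

    // No XID in the context: launcher. XID present: participant.
    static GlobalTxRoleSketch getCurrentOrCreate() {
        String bound = CURRENT_XID.get();
        return bound == null
            ? new GlobalTxRoleSketch(Role.LAUNCHER, null)
            : new GlobalTxRoleSketch(Role.PARTICIPANT, bound);
    }

    // Returns true only if a new global transaction was actually started.
    boolean begin() {
        if (role != Role.LAUNCHER) {
            return false; // participant: ignore begin, just take part
        }
        xid = "127.0.0.1:8091:" + SEQUENCE.getAndIncrement();
        CURRENT_XID.set(xid);
        return true;
    }

    static void clear() {
        CURRENT_XID.remove();
    }

    public static void main(String[] args) {
        GlobalTxRoleSketch outer = getCurrentOrCreate();
        System.out.println(outer.role + " started: " + outer.begin());
        GlobalTxRoleSketch inner = getCurrentOrCreate();
        System.out.println(inner.role + " started: " + inner.begin());
        clear();
    }
}
```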

DefaultTransactionManager is responsible for the communication between TM and TC, sending the begin, commit, and rollback instructions.

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

Receiving the XID returned by fescar-server at this point indicates that the global transaction was created successfully. The log reflects this process.

2019-04-09 13:46:57.417 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.417 DEBUG 31326 --- [geSend_TMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : write message:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int), channel:[id: 0xa148545e, L:/127.0.0.1:56120 - R:/127.0.0.1:8091],active?true,writable?true,isopen?true
2019-04-09 13:46:57.418 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:FescarMergeMessage timeout=60000,transactionName=purchase(java.lang.String,java.lang.String,int)
2019-04-09 13:46:57.421 DEBUG 31326 --- [lector_TMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.GlobalBeginResponse@2dc480dc,messageId:1196
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.fescar.core.context.RootContext      : bind 192.168.224.93:8091:2008502699
2019-04-09 13:46:57.421 DEBUG 31326 --- [nio-8084-exec-1] c.a.f.tm.api.DefaultGlobalTransaction    : Begin a NEW global transaction [192.168.224.93:8091:2008502699]

After the global transaction is created, business.execute() runs the business code. The call storageFeignClient.deduct(commodityCode, orderCount) enters the RM processing flow; the business logic here is calling the inventory deduction interface of storage-service.

RM Processing Flow

@GetMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count){
    storageService.deduct(commodityCode,count);
    return true;
}

@Transactional
public void deduct(String commodityCode, int count){
    Storage storage = storageDAO.findByCommodityCode(commodityCode);
    storage.setCount(storage.getCount()-count);

    storageDAO.save(storage);
}

Neither the storage interface nor the service method contains any fescar-related code or annotations, which reflects fescar's non-intrusive nature. So how does it join this global transaction? The answer is in ConnectionProxy, which is also why DataSourceProxy must be used: through it, fescar registers the branch transaction with TC and reports the RM's processing result when the local transaction of the business code commits.

Because the transaction commit of the business code is itself proxied by ConnectionProxy, committing a local transaction actually executes the commit method of ConnectionProxy.

public void commit() throws SQLException {
    //If currently in a global transaction, perform the global transaction commit
    //Whether it is a global transaction is determined by whether an XID exists in the current context
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}
    
private void processGlobalTransactionCommit() throws SQLException {
    try {
        //First, register RM with TC to get the branchId assigned by TC
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e);
    }

    try {
        if (context.hasUndoLog()) {
            //Write undolog
            UndoLogManager.flushUndoLogs(this);
        }

        //Commit local transaction, write undo_log and business data in the same local transaction
        targetConnection.commit();
    } catch (Throwable ex) {
        //Send notification of RM's transaction failure to TC
        report(false);
        if (ex instanceof SQLException) {
            throw new SQLException(ex);
        }
    }
    //Send notification of RM's transaction success to TC
    report(true);
    context.reset();
}
    
private void register() throws TransactionException {
    //Register RM, build request to send registration instructions to TC via netty
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
    //Store the returned branchId in the context
    context.setBranchId(branchId);
}
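
The order of operations in the code above can be condensed into a small simulation: register the branch, flush the undo log, commit locally, and report the result. The class BranchCommitSketch is hypothetical. It records the steps in a list rather than talking to a database or to TC, and unlike the excerpt it always rethrows after a failed local commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the phase-one commit sequence of a branch.
final class BranchCommitSketch {
    // Records the steps taken, in order.
    final List<String> events = new ArrayList<>();
    private long nextBranchId = 1; // stand-in for the branchId assigned by TC

    // xid == null means the connection is not part of a global transaction.
    void commit(String xid, boolean hasUndoLog, boolean localCommitFails) {
        if (xid == null) {
            events.add("local commit");
            return;
        }
        long branchId = nextBranchId++;
        events.add("register branch " + branchId);
        try {
            if (hasUndoLog) {
                events.add("flush undo_log");
            }
            if (localCommitFails) {
                throw new IllegalStateException("simulated local commit failure");
            }
            // undo_log and business data are committed in the same local transaction
            events.add("local commit");
        } catch (RuntimeException ex) {
            events.add("report(false)");
            throw ex; // simplification: this sketch always rethrows
        }
        events.add("report(true)");
    }

    public static void main(String[] args) {
        BranchCommitSketch sketch = new BranchCommitSketch();
        sketch.commit("192.168.0.2:8091:2008546211", true, false);
        System.out.println(sketch.events);
    }
}
```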

The log verifies the process above.

2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : xid in RootContext null xid in RpcContext 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext      : bind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.341 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : bind 192.168.0.2:8091:2008546211 to RootContext
2019-04-09 21:57:48.386  INFO 38933 --- [nio-8081-exec-1] o.h.h.i.QueryTranslatorFactoryInitiator  : HHH000397: Using ASTQueryTranslatorFactory
Hibernate: select storage0_.id as id1_0_, storage0_.commodity_code as commodit2_0_, storage0_.count as count3_0_ from storage_tbl storage0_ where storage0_.commodity_code=?
Hibernate: update storage_tbl set count=? where id=?
2019-04-09 21:57:48.673  INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient    : will connect to 192.168.0.2:8091
2019-04-09 21:57:48.673  INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient    : RM will register :jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false
2019-04-09 21:57:48.673  INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory   : NettyPool create channel to {"address":"192.168.0.2:8091","message":{"applicationId":"storage-service","byteBuffer":{"char":"\u0000","direct":false,"double":0.0,"float":0.0,"int":0,"long":0,"readOnly":false,"short":0},"resourceIds":"jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false","transactionServiceGroup":"hello-service-fescar-service-group","typeCode":103,"version":"0.4.0"},"transactionRole":"RMROLE"}
2019-04-09 21:57:48.677 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false', applicationId='storage-service', transactionServiceGroup='hello-service-fescar-service-group'}
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null,messageId:9
2019-04-09 21:57:48.680 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:9, future :com.alibaba.fescar.core.protocol.MessageFuture@186cd3e0, body:version=0.4.1,extraData=null,identified=true,resultCode=null,msg=null
2019-04-09 21:57:48.680  INFO 38933 --- [nio-8081-exec-1] c.a.fescar.core.rpc.netty.RmRpcClient    : register RM success. server version:0.4.1,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680  INFO 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.NettyPoolableFactory   : register success, cost 3 ms, version:0.4.1,role:RMROLE,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:48.680 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.681 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : write message:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.681 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:FescarMergeMessage transactionId=2008546211,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,lockKey=storage_tbl:1
2019-04-09 21:57:48.687 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:MergeResultMessage BranchRegisterResponse: transactionId=2008546211,branchId=2008546212,result code =Success,getMsg =null,messageId:11
2019-04-09 21:57:48.702 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.rm.datasource.undo.UndoLogManager  : Flushing UNDO LOG: {"branchId":2008546212,"sqlUndoLogs":[{"afterImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":993}]}],"tableName":"storage_tbl"},"beforeImage":{"rows":[{"fields":[{"keyType":"PrimaryKey","name":"id","type":4,"value":1},{"keyType":"NULL","name":"count","type":4,"value":994}]}],"tableName":"storage_tbl"},"sqlType":"UPDATE","tableName":"storage_tbl"}],"xid":"192.168.0.2:8091:2008546211"}
2019-04-09 21:57:48.755 DEBUG 38933 --- [nio-8081-exec-1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : offer message: transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.755 DEBUG 38933 --- [geSend_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : write message:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null, channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091],active?true,writable?true,isopen?true
2019-04-09 21:57:48.756 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:FescarMergeMessage transactionId=2008546211,branchId=2008546212,resourceId=null,status=PhaseOne_Done,applicationData=null
2019-04-09 21:57:48.758 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:MergeResultMessage com.alibaba.fescar.core.protocol.transaction.BranchReportResponse@582a08cf,messageId:13
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] c.a.fescar.core.context.RootContext      : unbind 192.168.0.2:8091:2008546211
2019-04-09 21:57:48.799 DEBUG 38933 --- [nio-8081-exec-1] o.s.c.a.f.web.FescarHandlerInterceptor   : unbind 192.168.0.2:8091:2008546211 from RootContext

The log shows the following steps:

  1. Get the XID passed from business-service
  2. Bind the XID to the current context
  3. Execute the business logic sql
  4. Create a Netty connection to TC for this RM
  5. Send the branch transaction information to TC
  6. Get the branchId returned by TC
  7. Record the Undo Log data
  8. Send TC the result of the PhaseOne phase of this transaction
  9. Unbind the XID from the current context

Steps 1 and 9 are completed in FescarHandlerInterceptor. This class does not belong to fescar itself but to spring-cloud-alibaba-fescar mentioned earlier, which binds the xid to and unbinds it from the current request context when communicating via feign or rest. At this point the RM has completed the PhaseOne phase; next comes the PhaseTwo processing logic.

Transaction Commit

After each branch transaction has executed, TC summarizes the results reported by each RM and sends a commit or rollback instruction to each RM.

2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Receive:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null,messageId:1
2019-04-09 21:57:49.813 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.AbstractRpcRemoting    : com.alibaba.fescar.core.rpc.netty.RmRpcClient@7d61f5d4 msgId:1, body:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.814  INFO 38933 --- [atch_RMROLE_1_8] c.a.f.core.rpc.netty.RmMessageListener   : onMessage:xid=192.168.0.2:8091:2008546211,branchId=2008546212,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false,applicationData=null
2019-04-09 21:57:49.816  INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler  : Branch committing: 192.168.0.2:8091:2008546211 2008546212 jdbc:mysql://127.0.0.1:3306/db_storage?useSSL=false null
2019-04-09 21:57:49.816  INFO 38933 --- [atch_RMROLE_1_8] com.alibaba.fescar.rm.AbstractRMHandler  : Branch commit result: PhaseTwo_Committed
2019-04-09 21:57:49.817  INFO 38933 --- [atch_RMROLE_1_8] c.a.fescar.core.rpc.netty.RmRpcClient    : RmRpcClient sendResponse branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null
2019-04-09 21:57:49.817 DEBUG 38933 --- [atch_RMROLE_1_8] c.a.f.c.rpc.netty.AbstractRpcRemoting    : send response:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null,channel:[id: 0xd40718e3, L:/192.168.0.2:62607 - R:/192.168.0.2:8091]
2019-04-09 21:57:49.817 DEBUG 38933 --- [lector_RMROLE_1] c.a.f.c.rpc.netty.MessageCodecHandler    : Send:branchStatus=PhaseTwo_Committed,result code =Success,getMsg =null

As the log shows:

  1. RM received commit notification with XID=192.168.0.2:8091:2008546211 and branchId=2008546212;
  2. Perform the commit action;
  3. Send commit results to TC, branchStatus is PhaseTwo_Committed;
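Since an XID consists of ip:port:sequence, the value in the log above can be split into its parts. The following is only an illustrative sketch; XidParts is a hypothetical helper, not a fescar class.

```java
// Hypothetical helper that splits an XID of the form ip:port:sequence.
final class XidParts {
    final String ip;
    final int port;
    final long sequence;

    private XidParts(String ip, int port, long sequence) {
        this.ip = ip;
        this.port = port;
        this.sequence = sequence;
    }

    static XidParts parse(String xid) {
        // Split from the right: the last field is the sequence, the one before it the port.
        int lastColon = xid.lastIndexOf(':');
        int prevColon = xid.lastIndexOf(':', lastColon - 1);
        if (prevColon <= 0) {
            throw new IllegalArgumentException("not in ip:port:sequence form: " + xid);
        }
        String ip = xid.substring(0, prevColon);
        int port = Integer.parseInt(xid.substring(prevColon + 1, lastColon));
        long sequence = Long.parseLong(xid.substring(lastColon + 1));
        return new XidParts(ip, port, sequence);
    }
}
```

For the XID in the log, the ip and port parts (192.168.0.2 and 8091) match the remote address of the channel that the RM uses to talk to the TC.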

The phase-two commit itself is executed in the doBranchCommit method of the AbstractRMHandler class:

/**
 * Reads the key parameters of the notification, such as xid and branchId,
 * then calls the RM's branchCommit
 */
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setBranchStatus(status);
    LOGGER.info("Branch commit result: " + status);
}

The branchCommit request eventually reaches the branchCommit method of AsyncWorker. The way AsyncWorker handles it is a key part of the fescar architecture. Because most transactions commit normally, the work is effectively done once the PhaseOne phase ends, which allows locks to be released as early as possible. After the commit command arrives in the PhaseTwo phase, asynchronous processing is sufficient, so the time spent in PhaseTwo is excluded from the duration of the distributed transaction.

private static final List<Phase2Context> ASYNC_COMMIT_BUFFER = Collections.synchronizedList( new ArrayList<Phase2Context>());
        
/**
 * Adds the branch that needs to be committed to the buffer list
 */
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
        ASYNC_COMMIT_BUFFER.add(new Phase2Context(branchType, xid, branchId, resourceId, applicationData));
    } else {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}
    
/**
 * A scheduled task consumes the entries in the list
 */
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
    timerExecutor = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("AsyncWorker", 1, true));
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                doBranchCommits();
            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... " + e.getMessage());
            }
        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
    
private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.size() == 0) {
        return;
    }
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
    Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
    
    //The scheduled task drains all pending entries from ASYNC_COMMIT_BUFFER in one loop
    //and groups them by resourceId, which is the connection url of a database
    //(visible in the log above); grouping this way supports applications with multiple data sources
    while (iterator.hasNext()) {
        Phase2Context commitContext = iterator.next();
        List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
        if (contextsGroupedByResourceId == null) {
            contextsGroupedByResourceId = new ArrayList<>();
            mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
        }
        contextsGroupedByResourceId.add(commitContext);

        iterator.remove();

    }

    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
        Connection conn = null;
        try {
            try {
                //Look up the data source by resourceId and get a connection
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get(entry.getKey());
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                continue;
            }
            List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                try {
                    //Clean up the undo log by deleting the record for this xid and branchId
                    UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                } catch (Exception ex) {
                    LOGGER.warn(
                        "Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex);
                }
            }

        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}

So to process a commit, the RM only needs to delete the undo_log record for the given xid and branchId.
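The buffer-then-drain pattern used by AsyncWorker can be reduced to a small self-contained model. This is a simplified sketch, not the fescar implementation: AsyncCommitSketch, Ctx, accept and drainGrouped are hypothetical names, and the sketch guards the buffer with synchronized methods instead of Collections.synchronizedList.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of "accept now, clean up later"; names are illustrative only.
final class AsyncCommitSketch {
    static final class Ctx {
        final String xid;
        final long branchId;
        final String resourceId;

        Ctx(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    private final List<Ctx> buffer = new ArrayList<>();
    private final int limit;

    AsyncCommitSketch(int limit) {
        this.limit = limit;
    }

    // Phase-two commit: only enqueue the branch. Returns false when the
    // buffer is full, in which case the entry is left for later housekeeping.
    synchronized boolean accept(Ctx ctx) {
        if (buffer.size() >= limit) {
            return false;
        }
        buffer.add(ctx);
        return true;
    }

    // Called by a timer: take everything queued so far and group it by
    // resourceId, so that one connection per database can delete all of
    // that database's undo_log rows.
    synchronized Map<String, List<Ctx>> drainGrouped() {
        Map<String, List<Ctx>> grouped = new LinkedHashMap<>();
        for (Ctx ctx : buffer) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }
        buffer.clear();
        return grouped;
    }
}
```

In a real worker, drainGrouped would be invoked from something like ScheduledThreadPoolExecutor.scheduleAtFixedRate, and each group would be processed with a single connection, as in the source above.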

Transaction Rollback

A rollback can be triggered in two scenarios:

  1. A branch transaction fails, which is the report(false) case in ConnectionProxy;
  2. The TM catches an exception thrown by a downstream system, that is, an exception caught by the method that initiated the global transaction and is annotated with @GlobalTransactional. In the execute template method of the TransactionalTemplate class shown earlier, the call to business.execute() is wrapped in a try block; rollback is called in the catch block, and the TM notifies the TC that the transaction with this XID needs to be rolled back.

public void rollback() throws TransactionException {
   //Only the Launcher (the initiator of the global transaction) can trigger this rollback
   if (role == GlobalTransactionRole.Participant) {
       // Participant has no responsibility of committing
       if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
       }
       return;
   }
   if (xid == null) {
       throw new IllegalStateException();
   }

   status = transactionManager.rollback(xid);
   if (RootContext.getXID() != null) {
       if (xid.equals(RootContext.getXID())) {
           RootContext.unbind();
       }
   }
}
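The template flow from scenario 2 (begin, run the business logic, roll back on an exception, otherwise commit) can be sketched as follows. TemplateSketch, GlobalTx and RecordingTx are hypothetical names, and rethrowing the original exception is an assumption of this sketch rather than a statement about what TransactionalTemplate does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of a begin / execute / commit-or-rollback template.
final class TemplateSketch {
    interface GlobalTx {
        void begin();
        void commit();
        void rollback();
    }

    static <T> T execute(GlobalTx tx, Supplier<T> business) {
        tx.begin();
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            tx.rollback(); // the initiator asks for a global rollback
            throw ex;      // assumption: the caller still sees the failure
        }
        tx.commit();
        return result;
    }

    // Records the calls it receives so the order of operations can be inspected.
    static final class RecordingTx implements GlobalTx {
        final List<String> calls = new ArrayList<>();

        @Override
        public void begin() { calls.add("begin"); }

        @Override
        public void commit() { calls.add("commit"); }

        @Override
        public void rollback() { calls.add("rollback"); }
    }
}
```

With a RecordingTx, a successful business call records begin then commit, while a business call that throws records begin then rollback.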

After summarizing, the TC sends the rollback command to the participants, and the RM receives the rollback notification in the doBranchRollback method of the AbstractRMHandler class.

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    LOGGER.info("Branch rolling back: " + xid + " " + branchId + " " + resourceId);
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setBranchStatus(status);
    LOGGER.info("Branch rollback result: " + status);
}

The rollback request is then passed to the branchRollback method of the DataSourceManager class.

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    //Get the corresponding data source from resourceId
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

This eventually executes the undo method of the UndoLogManager class. Because it is plain jdbc code and relatively long, it is not pasted here; you can view the source on github. The undo process is as follows:

  1. Find the undo_log committed in the PhaseOne phase by xid and branchId;
  2. If it is found, generate the compensating sql from the data recorded in undo_log and execute it, which restores the data modified in the PhaseOne phase;
  3. After step 2 completes, delete the undo_log record;
  4. If no undo_log is found in step 1, insert an undo_log record with the status GlobalFinished. The record may be missing because the local transaction in the PhaseOne phase hit an exception and was never written. Because xid and branchId form a unique index, the record inserted in this step makes a late PhaseOne write fail on the unique index, so the PhaseOne local transaction fails, its business data is never committed, and the outcome is the same as if the data had been rolled back.
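The four steps above can be modelled in memory to make the role of the GlobalFinished marker concrete. This is a sketch under simplifying assumptions: a HashMap stands in for the undo_log table and its unique index on (xid, branchId), one undo record covers a single row, and all names (UndoSketch, phaseOneWrite, and so on) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the undo steps; not the UndoLogManager implementation.
final class UndoSketch {
    enum State { NORMAL, GLOBAL_FINISHED }

    static final class UndoRecord {
        final State state;
        final String rowKey;
        final String beforeValue;

        UndoRecord(State state, String rowKey, String beforeValue) {
            this.state = state;
            this.rowKey = rowKey;
            this.beforeValue = beforeValue;
        }
    }

    private final Map<String, UndoRecord> undoLog = new HashMap<>(); // key models the unique index
    private final Map<String, String> rows = new HashMap<>();        // business data

    private static String key(String xid, long branchId) {
        return xid + "/" + branchId;
    }

    void seed(String rowKey, String value) { rows.put(rowKey, value); }

    String read(String rowKey) { return rows.get(rowKey); }

    // Phase one: the undo record and the business update succeed or fail together.
    // A duplicate key models the unique-index violation that aborts the local transaction.
    boolean phaseOneWrite(String xid, long branchId, String rowKey, String newValue) {
        UndoRecord record = new UndoRecord(State.NORMAL, rowKey, rows.get(rowKey));
        if (undoLog.putIfAbsent(key(xid, branchId), record) != null) {
            return false;
        }
        rows.put(rowKey, newValue);
        return true;
    }

    // Phase-two rollback, following steps 1 to 4. Returns true if data was restored.
    boolean undo(String xid, long branchId) {
        String k = key(xid, branchId);
        UndoRecord record = undoLog.get(k);                       // step 1
        if (record == null) {
            undoLog.put(k, new UndoRecord(State.GLOBAL_FINISHED, null, null)); // step 4
            return false;
        }
        if (record.state == State.GLOBAL_FINISHED) {
            return false;                                         // marker only, nothing to restore
        }
        if (record.beforeValue == null) {                         // step 2
            rows.remove(record.rowKey);
        } else {
            rows.put(record.rowKey, record.beforeValue);
        }
        undoLog.remove(k);                                        // step 3
        return true;
    }
}
```

If undo runs before the phase-one write for the same xid and branchId, the later phaseOneWrite returns false and the row keeps its old value, which is the protection that step 4 describes.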

Summary

Using a local demo and a distributed business scenario, this article analyzed the main processing flow on the fescar client side and the main source code of the TM and RM roles, in the hope of helping readers understand how fescar works.

With the rapid iteration of fescar and the continued refinement of its Roadmap, we believe fescar can become a benchmark solution for open source distributed transactions over time.


Keywords: Java Netty Spring MySQL JDBC

Added by unkwntech on Sat, 18 May 2019 13:06:43 +0300