Seata XA mode principles

1 XA mode example

Example reference: the seata-sample project on GitHub.
The business code is identical to that of AT mode; only the data source proxy is replaced with DataSourceProxyXA. For more details, refer to the example.

2 Architecture


[Image: Seata XA mode architecture diagram]
Note: This image is from the Seata website.

3 Source Analysis

3.1 TM opens the global transaction

This process is the same as in AT mode: the method is simply annotated with @GlobalTransactional.

3.2 RM performs branch transactions

Because the DataSource is proxied, all DB operations go through DataSourceProxyXA, and when a statement is executed the call is routed to ExecuteTemplateXA.

3.2.1 Execute SQL: ExecuteTemplateXA#execute

As the following code shows, the main logic is:

  • Record the connection's current commit mode (autocommit or non-autocommit).
  • If the connection is in autocommit mode, switch it to non-autocommit; see connectionProxyXA.setAutoCommit. This step registers the branch transaction with the TC.
  • Perform the DB operation.
  • If execution fails and the connection was originally in autocommit mode, roll back; see connectionProxyXA.rollback().
  • If execution succeeds and the connection was originally in autocommit mode, commit; see connectionProxyXA.commit().
  • Finally, restore autocommit mode if it was switched off in the second step.
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback<T, S> statementCallback, S targetStatement, Object... args) throws SQLException {
    boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
    if (autoCommitStatus) {
        // XA Start
        connectionProxyXA.setAutoCommit(false);
    }
    try {
        T res = null;
        try {
            // execute SQL
            res = statementCallback.execute(targetStatement, args);

        } catch (Throwable ex) {
            if (autoCommitStatus) {
                // XA End & Rollback
                try {
                    connectionProxyXA.rollback();
                } catch (SQLException sqle) {
                    // log and ignore the rollback failure.
                    LOGGER.warn(
                        "Failed to rollback xa branch of " + connectionProxyXA.xid +
                            "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                        sqle);
                }
            }

            if (ex instanceof SQLException) {
                throw ex;
            } else {
                throw new SQLException(ex);
            }

        }
        if (autoCommitStatus) {
            try {
                // XA End & Prepare
                connectionProxyXA.commit();
            } catch (Throwable ex) {
                LOGGER.warn(
                    "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),
                    ex);
                // XA End & Rollback
                if (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) {
                    try {
                        connectionProxyXA.rollback();
                    } catch (SQLException sqle) {
                        // log and ignore the rollback failure.
                        LOGGER.warn(
                            "Failed to rollback xa branch of " + connectionProxyXA.xid +
                                "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                            sqle);
                    }
                }

                if (ex instanceof SQLException) {
                    throw ex;
                } else {
                    throw new SQLException(ex);
                }

            }
        }
        return res;
    } finally {
        if (autoCommitStatus) {
            connectionProxyXA.setAutoCommit(true);
        }

    }
}
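
The control flow listed above can be tried out in isolation. The following is a minimal, simplified sketch and not Seata's code: RecordingConnection is a hypothetical stand-in for the connection proxy that only records the calls it receives, and the extra error handling around commit is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class ExecuteTemplateSketch {

    /** Hypothetical stand-in for the connection proxy; it only records calls. */
    static class RecordingConnection {
        final List<String> calls = new ArrayList<>();
        private boolean autoCommit = true;

        boolean getAutoCommit() { return autoCommit; }

        void setAutoCommit(boolean value) {
            calls.add("setAutoCommit(" + value + ")");
            autoCommit = value;
        }

        void commit() { calls.add("commit"); }

        void rollback() { calls.add("rollback"); }
    }

    /** Same shape as the template above: switch mode, run, commit or roll back, restore. */
    static <T> T execute(RecordingConnection conn, Callable<T> sql) throws Exception {
        boolean wasAutoCommit = conn.getAutoCommit();
        if (wasAutoCommit) {
            conn.setAutoCommit(false); // in Seata: branch registration and XA start
        }
        try {
            T result;
            try {
                result = sql.call();
            } catch (Exception ex) {
                if (wasAutoCommit) {
                    conn.rollback(); // in Seata: XA end with TMFAIL
                }
                throw ex;
            }
            if (wasAutoCommit) {
                conn.commit(); // in Seata: XA end with TMSUCCESS, then XA prepare
            }
            return result;
        } finally {
            if (wasAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
    }

    /** Runs one statement and returns the recorded call sequence. */
    static List<String> run(boolean failSql) {
        RecordingConnection conn = new RecordingConnection();
        try {
            execute(conn, () -> {
                if (failSql) {
                    throw new IllegalStateException("simulated SQL failure");
                }
                return 1;
            });
        } catch (Exception expected) {
            // the template rethrows the failure after rolling back
        }
        return conn.calls;
    }

    public static void main(String[] args) {
        System.out.println("success: " + run(false));
        System.out.println("failure: " + run(true));
    }
}
```

In both runs the connection ends up back in autocommit mode, which is why the restore step sits in the finally block.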

3.2.2 Set autocommit mode: ConnectionProxyXA#setAutoCommit

  • Register the branch transaction with the TC and obtain its branchId.
  • Build the XA Xid from xid and branchId, then bind it to the xaResource by calling xaResource.start.
public void setAutoCommit(boolean autoCommit) throws SQLException {
    if (currentAutoCommitStatus == autoCommit) {
        return;
    }
    if (autoCommit) {
        // According to JDBC spec:
        // If this method is called during a transaction and the
        // auto-commit mode is changed, the transaction is committed.
        if (xaActive) {
            commit();
        }
    } else {
        if (xaActive) {
            throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
        }
        // Start a XA branch
        long branchId = 0L;
        try {
            // 1. register branch to TC then get the branchId
            branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
                null);
        } catch (TransactionException te) {
            cleanXABranchContext();
            throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
        }
        // 2. build XA-Xid with xid and branchId
        this.xaBranchXid = XAXidBuilder.build(xid, branchId);
        try {
            // 3. XA Start
            xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
        } catch (XAException e) {
            cleanXABranchContext();
            throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
        }
        // 4. XA is active
        this.xaActive = true;

    }

    currentAutoCommitStatus = autoCommit;
}
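
Step 2 above turns the global xid and the branchId into an XA Xid. The sketch below shows one plausible way to do that with the JDK's javax.transaction.xa.Xid interface. It is an assumption for illustration only: SimpleBranchXid, its format id, the byte encoding and the sample xid value are hypothetical, and Seata's XAXidBuilder may encode the two parts differently.

```java
import java.nio.charset.StandardCharsets;
import javax.transaction.xa.Xid;

class BranchXidSketch {

    /** Hypothetical Xid: global transaction id = xid, branch qualifier = branchId. */
    static final class SimpleBranchXid implements Xid {
        private static final int FORMAT_ID = 1; // illustrative value, not Seata's
        private final byte[] globalId;
        private final byte[] branchQualifier;

        SimpleBranchXid(String xid, long branchId) {
            this.globalId = xid.getBytes(StandardCharsets.UTF_8);
            this.branchQualifier = Long.toString(branchId).getBytes(StandardCharsets.UTF_8);
            // The Xid interface caps both parts at 64 bytes.
            if (globalId.length > Xid.MAXGTRIDSIZE || branchQualifier.length > Xid.MAXBQUALSIZE) {
                throw new IllegalArgumentException("xid or branchId too long for an XA Xid");
            }
        }

        @Override public int getFormatId() { return FORMAT_ID; }
        @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
        @Override public byte[] getBranchQualifier() { return branchQualifier.clone(); }
    }

    /** Decodes the global part back into the xid string. */
    static String globalPart(Xid xid) {
        return new String(xid.getGlobalTransactionId(), StandardCharsets.UTF_8);
    }

    /** Decodes the branch qualifier back into the branchId. */
    static long branchPart(Xid xid) {
        return Long.parseLong(new String(xid.getBranchQualifier(), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Made-up xid and branchId, used only to exercise the round trip.
        Xid xid = new SimpleBranchXid("10.0.0.1:8091:42", 7L);
        System.out.println(globalPart(xid) + " / " + branchPart(xid));
    }
}
```

Because both parts can be recovered from the Xid, the same Xid can be rebuilt later from xid and branchId alone, which is what xaCommit and xaRollback rely on in phase two.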

3.2.3 Commit the branch: ConnectionProxyXA#commit

  • End the work performed on behalf of the xaBranchXid branch: xaResource.end with TMSUCCESS.
  • Ask the resource manager to prepare the xaBranchXid branch for commit: xaResource.prepare.
  • If either call fails, report PhaseOne_Failed for the branch to the TC.
public void commit() throws SQLException {
    if (currentAutoCommitStatus) {
        // Ignore the committing on an autocommit session.
        return;
    }
    if (!xaActive || this.xaBranchXid == null) {
        throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
    }
    try {
        // XA End: Success
        xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
        // XA Prepare
        xaResource.prepare(xaBranchXid);
        // Keep the Connection if necessary
        keepIfNecessary();
    } catch (XAException xe) {
        try {
            // Branch Report to TC: Failed
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
                BranchStatus.PhaseOne_Failed, null);
        } catch (TransactionException te) {
            LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()
                + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());

        }
        throw new SQLException(
            "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                .getMessage(), xe);
    } finally {
        cleanXABranchContext();
    }
}

3.2.4 Roll back the branch: ConnectionProxyXA#rollback

  • Report the branch's execution failure (PhaseOne_Failed) to the TC.
  • End the work performed on behalf of the xaBranchXid branch: xaResource.end with TMFAIL.
public void rollback() throws SQLException {
    if (currentAutoCommitStatus) {
        // Ignore the rollback on an autocommit session.
        return;
    }
    if (!xaActive || this.xaBranchXid == null) {
        throw new SQLException("should NOT rollback on an inactive session");
    }
    try {
        // Branch Report to TC
        DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);

    } catch (TransactionException te) {
        // log and ignore the report failure
        LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()
            + " since " + te.getCode() + ":" + te.getMessage());
    }
    try {
        // XA End: Fail
        xaResource.end(xaBranchXid, XAResource.TMFAIL);
    } catch (XAException xe) {
        throw new SQLException(
            "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                .getMessage(), xe);
    } finally {
        cleanXABranchContext();
    }
}

3.3 TM sends a request to TC to notify Global Commit/Rollback

This process is identical to AT mode.

3.4 TC sends Branch Commit/Rollback requests to branches

This process is identical to AT mode.

3.5 RM performs Branch Commit/Rollback

When AbstractRMHandler receives the request, it ultimately hands it over to ResourceManagerXA for processing. The actual commit and rollback are delegated to xaCommit and xaRollback, which directly call the commit and rollback methods of xaResource. commit is called with onePhase set to false because the branch was already prepared in phase one.

public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
    XAXid xaXid = XAXidBuilder.build(xid, branchId);
    xaResource.commit(xaXid, false);
    releaseIfNecessary();
}

public void xaRollback(String xid, long branchId, String applicationData) throws XAException {
    XAXid xaXid = XAXidBuilder.build(xid, branchId);
    xaResource.rollback(xaXid);
    releaseIfNecessary();
}
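
Putting sections 3.2 and 3.5 together, the calls a single branch makes on its XAResource can be traced with a small recorder. This is only a sketch under stated assumptions: RecordingXAResource and DEMO_XID are hypothetical, no database is involved, and the TC communication is reduced to the two boolean parameters.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaBranchLifecycleSketch {

    /** Hypothetical in-memory XAResource that only records the calls it receives. */
    static class RecordingXAResource implements XAResource {
        final List<String> calls = new ArrayList<>();
        public void start(Xid xid, int flags) { calls.add("start"); }
        public void end(Xid xid, int flags) { calls.add(flags == TMFAIL ? "end(TMFAIL)" : "end(TMSUCCESS)"); }
        public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
        public void commit(Xid xid, boolean onePhase) { calls.add("commit(onePhase=" + onePhase + ")"); }
        public void rollback(Xid xid) { calls.add("rollback"); }
        public void forget(Xid xid) { }
        public Xid[] recover(int flag) { return new Xid[0]; }
        public boolean isSameRM(XAResource other) { return other == this; }
        public int getTransactionTimeout() { return 0; }
        public boolean setTransactionTimeout(int seconds) { return false; }
    }

    /** Placeholder Xid; the recorder never inspects it. */
    static final Xid DEMO_XID = new Xid() {
        public int getFormatId() { return 1; }
        public byte[] getGlobalTransactionId() { return new byte[] {1}; }
        public byte[] getBranchQualifier() { return new byte[] {2}; }
    };

    /** Replays the XA calls of one branch for a given phase-one result and global decision. */
    static List<String> runBranch(boolean phaseOneOk, boolean globalCommit) {
        RecordingXAResource recorder = new RecordingXAResource();
        XAResource xa = recorder;
        try {
            xa.start(DEMO_XID, XAResource.TMNOFLAGS);       // setAutoCommit(false)
            if (phaseOneOk) {
                xa.end(DEMO_XID, XAResource.TMSUCCESS);     // commit(): end, then prepare
                xa.prepare(DEMO_XID);
            } else {
                xa.end(DEMO_XID, XAResource.TMFAIL);        // rollback(): end only
            }
            if (phaseOneOk && globalCommit) {
                xa.commit(DEMO_XID, false);                 // xaCommit in phase two
            } else {
                xa.rollback(DEMO_XID);                      // xaRollback in phase two
            }
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return recorder.calls;
    }

    public static void main(String[] args) {
        System.out.println(runBranch(true, true));
        System.out.println(runBranch(true, false));
        System.out.println(runBranch(false, false));
    }
}
```

The sketch makes one point visible: the connection-level commit() only reaches XA prepare, and the real XA commit or rollback happens later, when the TC delivers the global decision.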

3.6 Exception Compensation

The exception compensation process is identical to AT mode.

4 XAResource

As the source analysis above shows, Seata's XA mode is built on the JDK's XAResource interface. The following is a description of this interface from the JDK documentation.

4.1 Introduction to XAResource

The XAResource interface is a Java mapping of the industry standard XA interface based on the X/Open CAE Specification (Distributed Transaction Processing: The XA Specification).

In a distributed transaction processing (DTP) environment, the XA interface defines the protocol between a resource manager and a transaction manager. The JDBC driver or JMS provider implements this interface to support the association of global transactions with a database or messaging service connection.

Any transactional resource that an application may use in an environment where transactions are controlled by an external transaction manager can support the XAResource interface. A database management system is one such resource. An application may access data through multiple database connections. Each database connection is enlisted with the transaction manager as a transactional resource. The transaction manager obtains an XAResource for each connection that participates in a global transaction. It uses the start method to associate the global transaction with the resource, and the end method to remove that association. The resource manager is responsible for associating the global transaction with all work performed on its data between the start and end calls.

At transaction commit time, the transaction manager instructs the resource managers to prepare, commit, or roll back the transaction according to the two-phase commit protocol.
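
The two-phase decision described above can be sketched as follows. This is a simplified illustration, not a real transaction manager: Participant and VOTE_ABORT are hypothetical (a real XAResource signals a failed prepare by throwing XAException), while XA_OK and XA_RDONLY are the actual constants from XAResource. The sketch assumes that a participant that fails prepare has already rolled itself back.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAResource;

class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for a prepare call that fails with an XAException. */
    static final int VOTE_ABORT = -1;

    /** Hypothetical participant whose prepare result is fixed up front. */
    static class Participant {
        final String name;
        final int vote;
        String outcome = "pending";

        Participant(String name, int vote) {
            this.name = name;
            this.vote = vote;
        }
    }

    /** Phase one collects the votes, phase two applies one decision to every prepared branch. */
    static boolean complete(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (p.vote == VOTE_ABORT) {
                p.outcome = "aborted";
                allPrepared = false;
            } else if (p.vote == XAResource.XA_RDONLY) {
                p.outcome = "read-only";   // already complete, no phase two needed
            } else {
                p.outcome = "prepared";
            }
        }
        for (Participant p : participants) {
            if (p.vote == XAResource.XA_OK) {
                p.outcome = allPrepared ? "committed" : "rolled back";
            }
        }
        return allPrepared;
    }

    /** Convenience helper: runs the protocol and returns "name=outcome" per participant. */
    static List<String> outcomes(int... votes) {
        List<Participant> participants = new ArrayList<>();
        for (int i = 0; i < votes.length; i++) {
            participants.add(new Participant("rm" + i, votes[i]));
        }
        complete(participants);
        List<String> result = new ArrayList<>();
        for (Participant p : participants) {
            result.add(p.name + "=" + p.outcome);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(outcomes(XAResource.XA_OK, XAResource.XA_RDONLY));
        System.out.println(outcomes(XAResource.XA_OK, VOTE_ABORT));
    }
}
```

In Seata the TC plays the coordinating role: it gathers the phase-one reports from the branches and then sends Branch Commit or Branch Rollback to each RM.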

4.2 Method Summary

Return type  Method signature                    Description
void         commit(Xid xid, boolean onePhase)   Commits the global transaction specified by xid.
void         end(Xid xid, int flags)             Ends the work performed on behalf of a transaction branch.
void         forget(Xid xid)                     Tells the resource manager to forget about a heuristically completed transaction branch.
int          getTransactionTimeout()             Gets the current transaction timeout value set for this XAResource instance.
boolean      isSameRM(XAResource xares)          Determines whether the resource manager instance represented by the target object is the same as the one represented by the parameter xares.
int          prepare(Xid xid)                    Asks the resource manager to prepare for a commit of the transaction specified by xid.
Xid[]        recover(int flag)                   Obtains a list of prepared transaction branches from the resource manager.
void         rollback(Xid xid)                   Tells the resource manager to roll back the work performed on behalf of a transaction branch.
boolean      setTransactionTimeout(int seconds)  Sets the current transaction timeout value for this XAResource instance.
void         start(Xid xid, int flags)           Starts work on behalf of the transaction branch specified by xid.

4.3 Field Summary

Type        Field         Description
static int  TMENDRSCAN    Ends a recovery scan.
static int  TMFAIL        Disassociates the caller and marks the transaction branch rollback-only.
static int  TMJOIN        The caller is joining an existing transaction branch.
static int  TMNOFLAGS     Indicates that no flag value is selected.
static int  TMONEPHASE    The caller is using the one-phase optimization.
static int  TMRESUME      The caller is resuming its association with a suspended transaction branch.
static int  TMSTARTRSCAN  Starts a recovery scan.
static int  TMSUCCESS     Disassociates the caller from the transaction branch.
static int  TMSUSPEND     The caller is suspending (not ending) its association with a transaction branch.
static int  XA_OK         The transaction work has been prepared normally.
static int  XA_RDONLY     The transaction branch is read-only and has been committed.
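
The TM* fields are bit flags, so several can be combined in one int; for example, TMSTARTRSCAN | TMENDRSCAN asks recover to start and end a recovery scan in a single call. The helper below is a hypothetical utility (not part of the JDK or Seata) that decodes such a mask back into field names, using only the constants defined on XAResource.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAResource;

class XaFlagsSketch {

    /** Hypothetical helper: turns an XA flag mask into a readable list of field names. */
    static String describe(int flags) {
        if (flags == XAResource.TMNOFLAGS) {
            return "TMNOFLAGS";
        }
        List<String> names = new ArrayList<>();
        if ((flags & XAResource.TMJOIN) != 0) names.add("TMJOIN");
        if ((flags & XAResource.TMRESUME) != 0) names.add("TMRESUME");
        if ((flags & XAResource.TMSUSPEND) != 0) names.add("TMSUSPEND");
        if ((flags & XAResource.TMSUCCESS) != 0) names.add("TMSUCCESS");
        if ((flags & XAResource.TMFAIL) != 0) names.add("TMFAIL");
        if ((flags & XAResource.TMONEPHASE) != 0) names.add("TMONEPHASE");
        if ((flags & XAResource.TMSTARTRSCAN) != 0) names.add("TMSTARTRSCAN");
        if ((flags & XAResource.TMENDRSCAN) != 0) names.add("TMENDRSCAN");
        return String.join("|", names);
    }

    public static void main(String[] args) {
        System.out.println(describe(XAResource.TMNOFLAGS));
        System.out.println(describe(XAResource.TMSUCCESS));
        System.out.println(describe(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN));
    }
}
```

XA_OK and XA_RDONLY are not flags: they are the possible return values of prepare and are therefore not handled by this helper.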

5 Reference Documents

Seata XA mode.

Added by jesseledwards on Tue, 08 Feb 2022 20:34:14 +0200