1 XA pattern example
Example reference: on github seata-sample
Business code level and xa are identical, just replace the database proxy level with DataSourceProxyXA, more, refer to the example.
2 Architecture
Note: This image is from the seata website.
3 Source Analysis
3.1 TM Open Global Transactions
This process is the same as AT mode, using @GlobalTransactional ly.
3.2 RM performs branch transactions
Since DataSource uses a proxy, all DB operations are handed over to DataSourceProxyXA to complete, and when the DB operation is executed, the request will be executed by ExecuteTemplateXA.
3.2.1 Execute sql:ExecuteTemplateXA#execute
As you can see in the following code, the main logic is as follows:
- Record transaction commits, autocommit or non-autocommit
- Transaction commit mode changed to non-automatic commit, see: connectionProxyXA.setAutoCommit method. This step will register the branch transaction with the TC.
- Perform DB operations.
- If the execution fails, rollback, see: connectionProxyXA.rollback()
- If the transaction starts with automatic commit, commit, see: connectionProxyXA.commit()
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA, StatementCallback<T, S> statementCallback, S targetStatement, Object... args) throws SQLException { boolean autoCommitStatus = connectionProxyXA.getAutoCommit(); if (autoCommitStatus) { // XA Start connectionProxyXA.setAutoCommit(false); } try { T res = null; try { // execute SQL res = statementCallback.execute(targetStatement, args); } catch (Throwable ex) { if (autoCommitStatus) { // XA End & Rollback try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } if (autoCommitStatus) { try { // XA End & Prepare connectionProxyXA.commit(); } catch (Throwable ex) { LOGGER.warn( "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(), ex); // XA End & Rollback if (!(ex instanceof SQLException) || !AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END.equalsIgnoreCase(((SQLException) ex).getSQLState())) { try { connectionProxyXA.rollback(); } catch (SQLException sqle) { // log and ignore the rollback failure. LOGGER.warn( "Failed to rollback xa branch of " + connectionProxyXA.xid + "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(), sqle); } } if (ex instanceof SQLException) { throw ex; } else { throw new SQLException(ex); } } } return res; } finally { if (autoCommitStatus) { connectionProxyXA.setAutoCommit(true); } } }
3.2.2 Set autocommit mode: ConnectionProxyXA#setAutoCommit
- Register branch transactions with TC.
- Bind the current branch transaction id with xaResource, xaResource.start
public void setAutoCommit(boolean autoCommit) throws SQLException { if (currentAutoCommitStatus == autoCommit) { return; } if (autoCommit) { // According to JDBC spec: // If this method is called during a transaction and the // auto-commit mode is changed, the transaction is committed. if (xaActive) { commit(); } } else { if (xaActive) { throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active"); } // Start a XA branch long branchId = 0L; try { // 1. register branch to TC then get the branchId branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null); } catch (TransactionException te) { cleanXABranchContext(); throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te); } // 2. build XA-Xid with xid and branchId this.xaBranchXid = XAXidBuilder.build(xid, branchId); try { // 3. XA Start xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS); } catch (XAException e) { cleanXABranchContext(); throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e); } // 4. XA is active this.xaActive = true; } currentAutoCommitStatus = autoCommit; }
3.2.3 Set autocommit mode: ConnectionProxyXA#commit
- Terminate the work performed by the xaBranchXid branch, xaResource.end
- Request the resource manager to prepare the submission of the xaBranchXid transaction, that is, xaResource.prepare
public void commit() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END); } try { // XA End: Success xaResource.end(xaBranchXid, XAResource.TMSUCCESS); // XA Prepare xaResource.prepare(xaBranchXid); // Keep the Connection if necessary keepIfNecessary(); } catch (XAException xe) { try { // Branch Report to TC: Failed DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage()); } throw new SQLException( "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } }
3.2.4 Set autocommit mode: ConnectionProxyXA#rollback
- Report transaction execution failure to TC.
- Terminate the work performed by the xaBranchXid branch, xaResource.end
public void rollback() throws SQLException { if (currentAutoCommitStatus) { // Ignore the committing on an autocommit session. return; } if (!xaActive || this.xaBranchXid == null) { throw new SQLException("should NOT rollback on an inactive session"); } try { // Branch Report to TC DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null); } catch (TransactionException te) { // log and ignore the report failure LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage()); } try { // XA End: Fail xaResource.end(xaBranchXid, XAResource.TMFAIL); } catch (XAException xe) { throw new SQLException( "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe .getMessage(), xe); } finally { cleanXABranchContext(); } }
3.3 TM sends a request to TC to notify Global Commit/Rollback
This process is identical to AT mode.
3.4 TC sends Branch Commit/Rollback requests to branches
This process is identical to AT mode.
3.5 RM for Branch Commit/Rollback
When AbstractRMHandler receives the request, it is ultimately handed over to ResourceManagerXA for processing. The specific Commit and Rollback are handed over to xaCommit and xaRollback, which directly trigger the commit and rollback methods of xaResource.
public void xaCommit(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.commit(xaXid, false); releaseIfNecessary(); } public void xaRollback(String xid, long branchId, String applicationData) throws XAException { XAXid xaXid = XAXidBuilder.build(xid, branchId); xaResource.rollback(xaXid); releaseIfNecessary(); }
3.6 Exception Compensation
The exception compensation process and AT mode are identical.
4 XAResource
From the source level above, Seata's XA schema is based on jdk's XAResource interface. The following is a description of this interface in the JDK documentation.
Introduction to 4.1 XAResource
The XAResource interface is a Java mapping of the industry standard XA interface based on the X/Open CAE specification (Distributed Transaction: XA specification).
In a distributed transaction processing (DTP) environment, the XA interface defines the protocol between a resource manager and a transaction manager. The JDBC driver or JMS provider implements this interface to support the association of global transactions with a database or messaging service connection.
Any transaction resource that can be used by an application in an environment where transactions are controlled by an external transaction manager can support the XAResource interface. A database management system is one such resource. Applications can access data through multiple database connections. Each database connection is added to the list as a transactional resource through the transaction manager. The transaction manager acquires an XAResource for each connection that participates in a global transaction. The transaction manager uses the start method to establish associations between global transactions and resources, and the end method to remove associations between transactions and resources. Resource Manager is responsible for associating global transactions with all work performed on its data between start and end method calls.
When a transaction commits, the transaction manager tells the resource manager to prepare, commit, or roll back the transaction based on the two-phase commit protocol.
4.2 Method Summary
Return results | Method Signature | describe |
---|---|---|
void | commit(Xid xid, boolean onePhase) | Commits the global transaction specified by xid. |
void | end(Xid xid, int flags) | Terminates work performed on behalf of a transaction branch. |
void | forget(Xid xid) | Tell the resource manager to ignore heuristically completed transaction branches. |
int | getTransactionTimeout() | Gets the current transaction timeout value set for this XAResource instance. |
boolean | isSameRM(XAResource xares) | Call this method to determine if the resource manager instance represented by the target object is the same as that represented by the parameter xares. |
int | prepare(Xid xid) | Request the resource manager to prepare the transaction commit for the transaction specified in xid. |
Xid[] | recover(int flag) | Get a list of prepared transaction branches from Resource Manager. |
void | rollback(Xid xid) | Notify the resource manager to roll back the work performed on behalf of the transaction branch. |
boolean | setTransactionTimeout(int seconds) | Set the current transaction timeout value for this XAResource instance. |
void | start(Xid xid, int flags) | Starts work on behalf of the transaction branch specified in xid. |
4.3 Field Summary
type | attribute | describe |
---|---|---|
static int | TMENDRSCAN | Terminate recovery scan. |
static int | TMFAIL | Unassociate the caller and mark the transaction branch as rollback-only. |
static int | TMJOIN | The caller is connecting to an existing transaction branch. |
static int | TMNOFLAGS | Use TMNOFLAGS to indicate that no flag value is selected. |
static int | TMONEPHASE | The caller is using a phase optimization. |
static int | TMRESUME | The caller is recovering association with a pending transaction branch. |
static int | TMSTARTRSCAN | Start the recovery scan. |
static int | TMSUCCESS | Unassociate the caller from the transaction branch. |
static int | TMSUSPEND | The caller is suspending (not terminating) its association with a transaction branch. |
static int | XA_OK | Transaction is properly prepared. |
static int | XA_RDONLY | The transaction branch is read-only and committed. |