1, Before learning Seata: a brief look at two-phase commit (2PC)
1. Prepare phase
The transaction coordinator (transaction manager) sends a Prepare message to each participant (resource manager). Each participant either returns a failure directly (for example, when a permission check fails) or executes the transaction locally and writes local redo and undo logs, but does not commit.
2. Commit phase
If the coordinator receives a failure message from any participant, or a participant times out, it sends a rollback message to every participant; otherwise, it sends a commit message. Participants commit or roll back as instructed by the coordinator and then release the locks held during the transaction. (Note: lock resources can only be released in this final phase.)
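The two phases can be sketched as a minimal, single-threaded coordinator. `Participant` and `TwoPhaseCoordinator` are hypothetical names for illustration; timeouts, coordinator crashes, and persistent logs are not modeled, and an exception thrown by `prepare()` is simply treated as a failure vote.

```java
import java.util.List;

// Hypothetical participant (resource manager) in classic two-phase commit.
interface Participant {
    // Execute locally and write redo/undo logs, but do not commit; return false to vote "no".
    boolean prepare();
    void commit();
    void rollback();
}

class TwoPhaseCoordinator {
    // Returns true if every participant prepared and the transaction was committed.
    static boolean run(List<? extends Participant> participants) {
        // Phase one: send Prepare, stopping at the first failure.
        boolean allPrepared = true;
        for (Participant p : participants) {
            boolean ok;
            try {
                ok = p.prepare();
            } catch (RuntimeException e) {
                ok = false; // an error while preparing counts as a failure vote
            }
            if (!ok) {
                allPrepared = false;
                break;
            }
        }
        // Phase two: one decision sent to every participant, which then releases its locks.
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }
}
```

Sending the rollback to every participant, including those that never received a Prepare, keeps the coordinator simple; in this sketch such a participant is expected to treat it as a no-op.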
TCC (Try-Confirm-Cancel) bloats business code considerably, because every operation needs separate Try, Confirm, and Cancel logic, so it is not discussed here.
2, Introduction to Seata
Seata provides four transaction modes: AT, TCC, SAGA, and XA. AT is the mode Alibaba promoted first. The open-source AT mode is a cut-down version; the full-featured commercial counterpart is Alibaba Cloud's GTS (Global Transaction Service).
Many AT-mode features depend on the client SDK. In the cloud-native era, AT mode is heading toward a lightweight, standardized SDK: most capabilities sink into an agent layer (deployed as an agent or sidecar), so that an application only needs a very thin SDK and standard SQL. AT mode also has prerequisites: the database must support local transactions, and every table must define a primary key, which is used to build the before image and after image for rollback.
Take the figure above as an example. When a user places an order, the order service is both a TM and an RM: it starts the global transaction and registers its own branch transaction with the Seata server (TC). The order service then calls the product service and the account service. Both called services are RMs, and each registers its own branch transaction with the Seata server, which tracks the outcome of every branch. If any service throws an exception or times out, the phase-one changes are rolled back using the before and after images.
Before image: query the row before the business SQL runs:
SELECT amount FROM account WHERE user_id = 1;
Assume the amount is 1000 before the business SQL runs.
Execute the business SQL:
UPDATE account SET amount = 900 WHERE user_id = 1;
After image: query the row again after the update:
SELECT amount FROM account WHERE user_id = 1;
After the business SQL runs, the amount is 900.
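To make the two images concrete, here is a minimal in-memory sketch. It is not Seata code: the `account` table is modeled as a `Map` from `user_id` to `amount`, and `UndoRecord` is a hypothetical stand-in for one `undo_log` entry holding the before and after image of the touched row.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for one undo_log entry: the row key plus its before and after images.
final class UndoRecord {
    final long userId;
    final long beforeAmount;
    final long afterAmount;

    UndoRecord(long userId, long beforeAmount, long afterAmount) {
        this.userId = userId;
        this.beforeAmount = beforeAmount;
        this.afterAmount = afterAmount;
    }
}

class ImageDemo {
    // The "account" table as user_id -> amount (in-memory assumption, not a real database).
    static final Map<Long, Long> account = new HashMap<>();

    // Phase one of a branch: snapshot the row, run the business update, snapshot it again.
    static UndoRecord updateAmount(long userId, long newAmount) {
        long before = account.get(userId); // before image: SELECT amount ... WHERE user_id = ?
        account.put(userId, newAmount);    // business SQL:  UPDATE account SET amount = ? ...
        long after = account.get(userId);  // after image:  SELECT amount ... WHERE user_id = ?
        return new UndoRecord(userId, before, after);
    }

    // Rollback: restore the before image, but only if the row still equals the after image.
    static void rollback(UndoRecord undo) {
        long current = account.get(undo.userId);
        if (current != undo.afterAmount) {
            throw new IllegalStateException("Row changed outside the global transaction");
        }
        account.put(undo.userId, undo.beforeAmount);
    }

    public static void main(String[] args) {
        account.put(1L, 1000L);
        UndoRecord undo = updateAmount(1L, 900L);
        System.out.println(undo.beforeAmount + " -> " + undo.afterAmount); // 1000 -> 900
        rollback(undo);
        System.out.println(account.get(1L)); // 1000
    }
}
```

Comparing the current row with the after image before restoring is what lets a rollback notice that someone else modified the row in the meantime.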
3, Seata's distributed transaction components
- Transaction Coordinator (TC): the Seata server. It stores global transactions, branch transactions, global locks, and so on, and decides whether each transaction participant commits or rolls back, which is why it is often said to have a God's-eye view.
- Resource Manager (RM): every microservice that takes part, that is, each participant in the transaction.
- Transaction Manager (TM): also a microservice, but one that acts as the leader and decides when the global transaction starts, commits, or rolls back. Any microservice method annotated with @GlobalTransactional can be understood as the TM, and that service is also an RM.
4, Seata demo
1. Set up the Seata server
Download the server from GitHub at https://github.com/seata/seata/releases. I use Spring Cloud Alibaba 2.2.5.RELEASE, whose corresponding Seata client dependency is 1.3.0, so I downloaded the 1.3.0 server. After downloading, extract the archive.
Enter the conf folder of the extracted directory.
Then edit registry.conf. I use Nacos from Spring Cloud Alibaba as the registry, so the registry type is changed to nacos; the registry is what the servers use to communicate in cluster mode, and for a single standalone server you can use the file (direct connection) mode instead. For the configuration, I use file mode, so the config type is changed to file; the corresponding file is file.conf, in the same directory as registry.conf.
Next, edit file.conf. The store mode currently supports three options: file, db and redis. I use db mode here and configure the database connection in the same file.
In db mode the server needs three tables (global_table, branch_table and lock_table); the script that creates them, seata.sql, is in the script folder of the demo source code linked below. In addition, each microservice database needs its own business tables and an undo_log table.
Leave the conf directory, go into the bin directory, and start Seata:
./seata-server.sh -h ip -p port -m db -n 1 >log.out &
- -h: The IP address of your server.
- -p: The port of the server, 8091 by default.
- -m: The global transaction storage mode (file/db/redis).
- -n: The node number. When there are multiple servers, it distinguishes the nodes so that they do not generate conflicting transaction IDs.
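Why a node number prevents ID conflicts is easiest to see with a Snowflake-style generator, where the node number occupies fixed bits of every ID. As far as I know, the Seata server feeds this node number into a Snowflake-style transaction ID generator, but the sketch below is a generic illustration with an assumed bit layout (10 node bits, 12 sequence bits, custom epoch), not Seata's implementation; `NodeIdGenerator` is a hypothetical name.

```java
// Hypothetical Snowflake-style generator: [timestamp | node ID | per-millisecond sequence].
class NodeIdGenerator {
    private static final long EPOCH = 1_600_000_000_000L;               // custom epoch (Sept 2020)
    private static final int NODE_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_NODE = (1L << NODE_BITS) - 1;          // 1023
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1; // 4095

    private final long nodeId;
    private long lastMillis = -1L;
    private long sequence = 0L;

    NodeIdGenerator(long nodeId) {
        if (nodeId < 0 || nodeId > MAX_NODE) {
            throw new IllegalArgumentException("nodeId must be in [0, " + MAX_NODE + "]");
        }
        this.nodeId = nodeId;
    }

    synchronized long nextId() {
        long now = Math.max(System.currentTimeMillis(), lastMillis); // never move backwards
        if (now == lastMillis) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // 4096 IDs already issued in this millisecond: wait for the next one
                while (now <= lastMillis) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0;
        }
        lastMillis = now;
        return ((now - EPOCH) << (NODE_BITS + SEQUENCE_BITS)) | (nodeId << SEQUENCE_BITS) | sequence;
    }

    // Extract the node ID back out of a generated ID.
    static long nodeOf(long id) {
        return (id >>> SEQUENCE_BITS) & MAX_NODE;
    }
}
```

Two servers started with different -n values differ in the node bits, so even IDs generated in the same millisecond with the same sequence number cannot collide.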
If you see this screen after startup, the server has started successfully.
2. The microservice code is not walked through here; the repository is at https://gogs.tianqingzhao.com/tianqingzhao/seata-demo. If anything is unclear, feel free to message me.
5, How Seata works
Diagram: https://www.processon.com/view/link/6108ddc3637689310e61f4ed
Seata obtains the database connection through a proxy data source and registers the RM with TC. Before the business SQL runs, it captures the before image (the affected rows as they were before the business SQL); after the business SQL runs, it captures the after image (the same rows afterwards). If any branch transaction then fails, the global transaction is rolled back by compensating each branch in reverse, using the records in that microservice's undo_log table. If all three branch transactions succeed, a global commit is performed. Note that Seata's default AT mode differs from classic 2PC: in 2PC, phase one executes without committing and only phase two commits, whereas in Seata AT each branch executes and commits its local transaction already in phase one.
Key Seata terms to remember:
- Open global transaction
- Branch transaction registration
- Global transaction commit
- Global transaction rollback
6, Seata phase one: starting the global transaction
Diagram: https://www.processon.com/view/link/6108de13e0b34d3e35b8e4ef
Summary: Seata scans for the @GlobalTransactional annotation; the TM sends a request to TC over Netty to obtain a global transaction XID; TC generates the XID and stores the global transaction in the global transaction table, global_table.
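A toy version of the TC side of this step, for illustration only. `SimpleTc` is hypothetical: the Netty request is replaced by a direct method call, global_table by a map, and the transaction ID generator by a counter. The XID is built as `ip:port:transactionId` of the TC node, which is, as far as I know, the format Seata uses; treat it as an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified TC: hands out XIDs and records them in a global_table stand-in.
class SimpleTc {
    enum GlobalStatus { BEGIN, ASYNC_COMMITTING, ROLLBACKING }

    private final String address;                                   // this TC node's "ip:port"
    private final AtomicLong nextTransactionId = new AtomicLong(1); // stands in for the real ID generator
    // Stand-in for global_table: XID -> current status of the global transaction.
    final Map<String, GlobalStatus> globalTable = new ConcurrentHashMap<>();

    SimpleTc(String ip, int port) {
        this.address = ip + ":" + port;
    }

    // Handles a TM's "begin global transaction" request and returns the new XID.
    String begin() {
        long transactionId = nextTransactionId.getAndIncrement();
        String xid = address + ":" + transactionId; // ip:port:transactionId
        globalTable.put(xid, GlobalStatus.BEGIN);
        return xid;
    }
}
```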
7, Seata phase one: branch transaction registration
Diagram: https://www.processon.com/view/link/6108de4be401fd6714ba761d
Summary: the RM prepares the before image, executes the target SQL without committing, prepares the after image, and assembles the undo_log. It then registers the branch transaction with TC. On the TC side, TC acquires the global lock, stores the branch transaction information in branch_table, and stores the global lock information in lock_table. Back on the RM side, the undo_log (containing the before and after images) is written to the service's own undo_log table for use in rollback, and the local transaction is committed together with it.
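The global lock TC takes during branch registration can be pictured as a map from a row key to the XID that owns it. `GlobalLockTable` is a simplified, hypothetical stand-in for lock_table: Seata's real row-key format and its all-or-nothing locking of several rows per branch are not reproduced. The point is only that a second global transaction cannot lock a row until the first releases it in phase two.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for lock_table: one entry per locked row, owned by an XID.
class GlobalLockTable {
    private final Map<String, String> rowOwners = new ConcurrentHashMap<>();

    // Key format "resourceId^table^pk" is an illustrative choice, not Seata's exact row key.
    private static String rowKey(String resourceId, String table, String pk) {
        return resourceId + "^" + table + "^" + pk;
    }

    // Branch registration: succeed only if the row is free or already held by the same XID.
    boolean acquire(String xid, String resourceId, String table, String pk) {
        String owner = rowOwners.putIfAbsent(rowKey(resourceId, table, pk), xid);
        return owner == null || owner.equals(xid);
    }

    // Phase two (commit or rollback): drop every row lock held by this XID.
    void releaseAll(String xid) {
        rowOwners.values().removeIf(xid::equals);
    }
}
```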
8, Seata phase two: global transaction commit
Diagram: https://www.processon.com/view/link/61079f72e401fd7c4ed48a80
Summary: TC first deletes the corresponding rows in lock_table and changes the global transaction status to AsyncCommitting. An asynchronous commit task, initialized when the server starts, scans for global transactions in the AsyncCommitting status and sends a request to the RM over Netty to delete the undo_log records. The RM replies that the commit succeeded and deletes the records through a queue. Based on the status returned by the RM, TC deletes the branch transaction, that is, deletes its branch_table row and releases its lock. Finally, TC deletes the global transaction, that is, its global_table row.
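The RM side of this step can be sketched as a queue drained by a background task. Everything here is hypothetical: `AsyncCommitWorker`, the `xid:branchId` keys, and the in-memory `undoLogTable` stand in for the real client classes and the undo_log table, and the scheduler that would call `flush()` periodically is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical RM-side phase-two commit handler: answer TC at once, clean up undo_log later.
class AsyncCommitWorker {
    // Stand-in for this service's undo_log table: one "xid:branchId" entry per branch.
    final Set<String> undoLogTable = ConcurrentHashMap.newKeySet();
    // Branches whose undo_log rows still have to be deleted.
    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();

    // Called when TC sends "commit branch". The business data was already committed in
    // phase one, so the RM only records which undo_log rows to delete and replies success.
    boolean branchCommit(String xid, long branchId) {
        return pending.offer(xid + ":" + branchId); // unbounded queue: always true
    }

    // Run periodically by a background task: delete all queued undo_log rows in one batch.
    int flush() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        batch.forEach(undoLogTable::remove);
        return batch.size();
    }
}
```

Replying before the cleanup is safe because, in AT mode, nothing is left to commit in phase two; deleting the undo_log is housekeeping.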
9, Seata phase two: global transaction rollback
Diagram: https://www.processon.com/view/link/6108e656e0b34d3e35b90ed4
Summary: TC changes the global status to Rollbacking and sends a request to the RM to roll back the data. The RM receives the request, compensates the data in reverse, deletes its undo_log records, and returns a successful rollback status. TC then releases the lock resources of the branch transaction and deletes the branch transaction, that is, deletes its rows in lock_table and branch_table. Finally, it deletes the global transaction.
Let's take a brief look at the RM-side undo and executeOn methods.
undo method:
```java
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            // Get a plain (non-proxied) connection to the database
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction,
            // so switch to manual commit
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find the UNDO LOG: precompile the query and bind the WHERE conditions
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

                // The server may send the rollback request for the same branch transaction
                // repeatedly (possibly to several processes), so only undo_log rows in the
                // normal state are processed. Read the log_status column of the row.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                // Not in the normal state: do not roll back
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }

                // The context column holds the serialization type, serializer=jackson by default
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                // No serializer recorded: use the default (jackson);
                // otherwise load the matching parser through SPI
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                // Decode into a BranchUndoLog (xid, branchId, sqlUndoLogs). Each SQLUndoLog holds
                // sqlType (INSERT/UPDATE/DELETE), tableName, beforeImage and afterImage
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        // Undo the statements in the reverse order of their execution
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType())
                            .getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        // Pick the undo executor by database type and SQL type
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }

            // If undo_log exists, the branch transaction has completed phase one,
            // so we can roll back directly and delete the undo_log records.
            // Otherwise, the branch transaction is in an abnormal state and its undo_log
            // was never written to the database. For example, the business processing
            // timed out and the initiator of the global transaction rolled back.
            // To ensure data consistency, we insert an undo_log with the GlobalFinished
            // state, which prevents the phase-one local transaction of the other process
            // from committing successfully.
            // See https://github.com/seata/seata/issues/489
            if (exists) {
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                // No undo_log yet: insert a GlobalFinished marker. If the timed-out branch keeps
                // running, its processGlobalTransactionCommit step later tries to insert its own
                // undo_log, hits a unique-key conflict on (xid, branch_id) and fails, so its
                // business changes are never committed as dirty data. Typical case: the call from
                // the order service to the product service times out, and TC tells the product
                // service to roll back.
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // The undo_log may have been inserted by another process in the meantime: retry the rollback
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                    branchId, e.getMessage()), e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}
```
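The `exists` / `else` handling in `undo` is easiest to see with a toy model. `UndoLogStore` is hypothetical: a `ConcurrentHashMap` with `putIfAbsent` plays the role of the unique key on (xid, branch_id) in the undo_log table, and the compensation of the business data itself is omitted. It shows why inserting a GlobalFinished marker blocks a phase-one insert that arrives after the rollback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the undo_log table with a unique key on (xid, branch_id).
class UndoLogStore {
    enum State { NORMAL, GLOBAL_FINISHED }

    private final Map<String, State> rows = new ConcurrentHashMap<>();

    private static String key(String xid, long branchId) {
        return xid + "#" + branchId;
    }

    // Phase one: the RM writes its undo_log together with the business SQL.
    // Returns false on a "unique key conflict", i.e. a rollback already left a marker.
    boolean insertNormal(String xid, long branchId) {
        return rows.putIfAbsent(key(xid, branchId), State.NORMAL) == null;
    }

    // Phase-two rollback, mirroring the exists / not-exists branches of undo().
    String rollback(String xid, long branchId) {
        String k = key(xid, branchId);
        State state = rows.get(k);
        if (state == State.NORMAL) {
            rows.remove(k); // compensate the business data (omitted), then delete the undo_log row
            return "compensated";
        }
        if (state == State.GLOBAL_FINISHED) {
            return "ignored"; // not in the normal state: nothing to undo
        }
        // No undo_log yet: leave a GlobalFinished marker so a late phase-one insert fails.
        if (rows.putIfAbsent(k, State.GLOBAL_FINISHED) != null) {
            return rollback(xid, branchId); // phase one won the race: retry, as undo() does
        }
        return "marker inserted";
    }
}
```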
executeOn method:
```java
public void executeOn(Connection conn) throws SQLException {
    // Optional data validation: compare the current data with the images before undoing
    if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
        return;
    }
    try {
        // Build the undo SQL, e.g. UPDATE t_product SET count = ? WHERE id = ?
        String undoSQL = buildUndoSQL();
        // Precompile the SQL
        PreparedStatement undoPST = conn.prepareStatement(undoSQL);
        // Get the rows to restore; for an UPDATE these are the before-image rows,
        // i.e. the values before the update
        TableRecords undoRows = getUndoRows();
        for (Row undoRow : undoRows.getRows()) {
            ArrayList<Field> undoValues = new ArrayList<>();
            // Separate the primary key values from the fields to restore
            List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
            for (Field field : undoRow.getFields()) {
                if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                    undoValues.add(field);
                }
            }
            // Bind the parameters to the SQL
            undoPrepare(undoPST, undoValues, pkValueList);
            // Reverse compensation
            undoPST.executeUpdate();
        }
    } catch (Exception ex) {
        if (ex instanceof SQLException) {
            throw (SQLException) ex;
        } else {
            throw new SQLException(ex);
        }
    }
}
```
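Two details of the undo path work together: `undo` reverses `sqlUndoLogs` before executing them, and `executeOn` can first validate the current data against the images. The sketch below is a hypothetical, single-column model of that behavior over an in-memory table (`RowUndo` and `UndoExecutorSketch` are illustrative names). It performs only the dirty-write check; as far as I can tell, Seata's `dataValidationAndGoOn` also skips the undo when the current data already matches the before image.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical single-column undo entry for an UPDATE: row key, before value, after value.
final class RowUndo {
    final long pk;
    final long before;
    final long after;

    RowUndo(long pk, long before, long after) {
        this.pk = pk;
        this.before = before;
        this.after = after;
    }
}

class UndoExecutorSketch {
    // Roll back one branch against an in-memory table (pk -> value).
    static void undoBranch(Map<Long, Long> table, List<RowUndo> undoLogs) {
        List<RowUndo> reversed = new ArrayList<>(undoLogs);
        Collections.reverse(reversed); // the last statement is undone first
        for (RowUndo undo : reversed) {
            Long current = table.get(undo.pk);
            // Data validation: the row must still hold this statement's after image,
            // otherwise someone wrote to it outside the global transaction (dirty write).
            if (current == null || current != undo.after) {
                throw new IllegalStateException("Dirty write detected on pk " + undo.pk);
            }
            table.put(undo.pk, undo.before); // like UPDATE t SET value = before WHERE pk = ?
        }
    }
}
```

With two UPDATEs on the same row (1000 to 900, then 900 to 850), undoing in the original order would compare 850 against the first after image (900) and report a false dirty write; reversing the list restores 1000.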
10, Two bugs in Seata
1. Bug 1
Locking in file mode:
In the io.seata.server.storage.file.session.FileSessionManager#lockAndExecute method, the globalSession object is newly loaded by a database query each time. The global transaction commit therefore works with globalSession object 1, while the branch transaction registration works with globalSession object 2, and the locks of the two objects are independent of each other. So if thread 1 holds the lock while committing the global transaction, thread 2 can still acquire a lock when registering a branch transaction.
```java
@Override
public <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
    throws TransactionException {
    globalSession.lock();
    try {
        return lockCallable.call();
    } finally {
        globalSession.unlock();
    }
}
```
2. Bug 2
```
io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit
  -> io.seata.server.coordinator.DefaultCore#commit
    -> globalSession.closeAndClean();
      -> close();
        -> lifecycleListener.onClose(this);
          -> globalSession.setActive(false);
```
When the global transaction is committed, it is closed, and setting the active field to false prevents further branch transactions from registering.
But look back at how a branch transaction is registered:
```
io.seata.server.coordinator.DefaultCoordinator#doBranchRegister
  -> io.seata.server.coordinator.DefaultCore#branchRegister
    -> io.seata.server.coordinator.AbstractCore#branchRegister
      -> io.seata.server.coordinator.AbstractCore#globalSessionStatusCheck
```
The globalSessionStatusCheck method checks the status of the global transaction. However, when a branch transaction is registered, the global transaction is queried in real time by its XID, which produces a new GlobalSession object, while closing the global transaction only sets the flag in memory on a different object. Because the two operations look at the state of two different objects, the check here is ineffective. At root, it is the same problem as bug 1.
```java
if (!globalSession.isActive()) {
    throw new GlobalTransactionException(GlobalTransactionNotActive, String.format(
        "Could not register branch into global session xid = %s status = %s, cause by globalSession not active",
        globalSession.getXid(), globalSession.getStatus()));
}
```
3. Let's simulate these two Seata bugs with simplified code. First, create the GlobalSession class. It has an inner class, GlobalSessionLock, which is the lock object, and an active field whose default value is true.
```java
package com.tqz.seata;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <p>Global transaction information
 *
 * @author tianqingzhao
 * @since 2021/7/25 17:40
 */
public class GlobalSession {

    private static final Logger logger = LoggerFactory.getLogger(GlobalSession.class);

    private GlobalSessionLock globalSessionLock = new GlobalSessionLock();

    // Set to false when the global transaction is closed
    private boolean active = true;

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    public GlobalSessionLock getGlobalSessionLock() {
        return globalSessionLock;
    }

    public void setGlobalSessionLock(GlobalSessionLock globalSessionLock) {
        this.globalSessionLock = globalSessionLock;
    }

    public void lock() throws InterruptedException {
        globalSessionLock.lock();
    }

    public void unlock() {
        globalSessionLock.unlock();
    }

    private static class GlobalSessionLock {

        private static final int GLOBAL_SESSION_LOCK_TIME_OUT_MILLS = 2 * 1000;

        // Each GlobalSession instance owns its own ReentrantLock
        private Lock globalSessionLock = new ReentrantLock();

        public Lock getGlobalSessionLock() {
            return globalSessionLock;
        }

        public void setGlobalSessionLock(Lock globalSessionLock) {
            this.globalSessionLock = globalSessionLock;
        }

        public void lock() throws InterruptedException {
            try {
                if (globalSessionLock.tryLock(GLOBAL_SESSION_LOCK_TIME_OUT_MILLS, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted error", e);
                // Restore the interrupt status for callers
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Lock timeout");
        }

        public void unlock() {
            globalSessionLock.unlock();
        }
    }
}
```
Next, test it by creating and starting two threads: the first commits the global transaction and the second registers a branch transaction.
```java
package com.tqz.seata;

/**
 * <p>Simulates a global commit racing with a branch registration
 *
 * @author tianqingzhao
 * @since 2021/7/25 17:40
 */
public class Test {

    public static void main(String[] args) {
        Thread commitThread = new Thread(new Runnable() {
            @Override
            public void run() {
                commit();
            }
        });
        commitThread.setName("commitThread");

        Thread branchRegThread = new Thread(new Runnable() {
            @Override
            public void run() {
                branchRegister();
            }
        });
        branchRegThread.setName("branchRegThread");

        commitThread.start();
        branchRegThread.start();
    }

    /**
     * Commit the global transaction
     */
    public static void commit() {
        // Simulate loading the GlobalSession from the database: a new object every time
        GlobalSession globalSession = new GlobalSession();
        try {
            globalSession.lock();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        // Lock acquired: only now is it safe to unlock in finally
        try {
            // Close the global transaction
            globalSession.setActive(false);
            System.out.println("Simulated execution business method");
        } finally {
            globalSession.unlock();
        }
    }

    /**
     * Register a branch transaction
     */
    public static void branchRegister() {
        // Simulate loading the GlobalSession from the database: another new object
        GlobalSession globalSession = new GlobalSession();
        try {
            globalSession.lock();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        try {
            if (!globalSession.isActive()) {
                throw new RuntimeException("Global transaction closed, failed to register branch transaction");
            }
        } finally {
            globalSession.unlock();
        }
    }
}
```
In IDEA, debug with thread-level breakpoints: right-click each of the two breakpoints and select Thread.
Start the main method. Execution first stops in commitThread, where we simulate loading the GlobalSession object from the database and then lock it.
Then switch to branchRegThread. Here is the first bug, locking in file mode: commitThread and branchRegThread lock two different objects, so each thread acquires its lock successfully.
Now the second bug. Switch back to commitThread and set the active field to false.
Then switch to branchRegThread. The active flag that commitThread changed belongs to its own GlobalSession object in memory; branchRegThread checks the active flag of a different GlobalSession object, which is still true, so the check here cannot detect that the global transaction has been closed. To my surprise, this bug has been present from the 0.x versions all the way to 1.4.x without being fixed.
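If you would rather not step through breakpoints, the same interleaving can be forced with two `CountDownLatch`es. This is a self-contained sketch, not Seata code: `Session` is a trimmed-down, hypothetical copy of the demo's GlobalSession, and each thread creates its own instance to mimic loading the session by XID.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, trimmed-down GlobalSession: one lock and one active flag per object.
class Session {
    final ReentrantLock lock = new ReentrantLock();
    volatile boolean active = true;
}

class RaceReproduction {
    // Returns true if branch registration saw active == true while the commit still held "its" lock.
    static boolean reproduce() throws InterruptedException {
        CountDownLatch commitLocked = new CountDownLatch(1);
        CountDownLatch registerDone = new CountDownLatch(1);
        boolean[] registered = new boolean[1];

        Thread commit = new Thread(() -> {
            Session s = new Session();               // "loaded from storage": a fresh object
            s.lock.lock();
            try {
                s.active = false;                    // close the global transaction
                commitLocked.countDown();
                registerDone.await(5, TimeUnit.SECONDS); // keep holding the lock meanwhile
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                s.lock.unlock();
            }
        }, "commitThread");

        Thread register = new Thread(() -> {
            try {
                commitLocked.await();                // run only after the commit holds its lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            Session s = new Session();               // another fresh object for the same XID
            if (s.lock.tryLock()) {                  // succeeds: it is a different lock (bug 1)
                try {
                    registered[0] = s.active;        // still true on this object (bug 2)
                } finally {
                    s.lock.unlock();
                }
            }
            registerDone.countDown();
        }, "branchRegThread");

        commit.start();
        register.start();
        commit.join();
        register.join();
        return registered[0];
    }
}
```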
11, BASE theory of distributed transactions
BASE theory
- BA: Basically Available
- S: Soft state
- E: Eventually consistent
BASE theory evolved from the CAP theorem; it is the result of trading off consistency against availability in CAP. Its core idea: even if strong consistency cannot be achieved, each business can adopt methods suited to its own characteristics to make the system eventually consistent. In short, something has to be given up.
Distributed transactions face a similar trade-off:
- SAGA mode meets the requirements of low service intrusion and high performance.
- Seata's AT mode meets the requirements of low service intrusion and strong isolation.
- TCC mode meets the requirements of high performance and strong isolation.
Distributed transactions have always been one of the hardest problems in the industry, and there is no 100% solution. In short, avoid using distributed transactions wherever you can!