PostgreSQL 10.0 preview sharding enhancement - support for distributed transactions

Labels

PostgreSQL, 10.0, sharding, distributed transaction, 2pc, two-phase transaction

Background

A complete sharded (distributed) database is not viable without distributed transaction support.

What is a distributed transaction? As an analogy, think of each database as a child, and suppose each child starts with 100 yuan. Child A gives child B 20 yuan, so A should end up with 80 yuan and B with 120 yuan. What happens if B accidentally loses the 20 yuan somewhere in the middle of the transaction?

In theory, the transaction should fail and both A and B should return to their original 100 yuan. There must be no intermediate state.

PostgreSQL 10.0 has a built-in sharding capability based on postgres_fdw, and a 2PC module has been added to postgres_fdw, so you can specify whether a foreign server supports 2PC.

This is specified with the two_phase_commit option when the foreign server is created.
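For example (the full deployment steps appear later in this article; the host and database names here follow that example):

=# CREATE SERVER shard_node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'shard1', dbname 'postgres', port '5432', two_phase_commit 'on');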

Principle of Distributed Transaction Implementation

When two-phase transactions are used

A two-phase transaction is used when a write transaction involves write operations on two or more shard nodes with two_phase_commit = on.

When a transaction involves only a single shard (plus local writes), 2PC is not required; local and remote consistency is ensured by notifying the shard to commit during the local pre-commit phase.

How to handle an unknown two-phase state after a crash

If the database crashes in the middle of a two-phase transaction's second phase (after PREPARE has succeeded but before COMMIT PREPARED has completed), how is this handled?

PostgreSQL provides two approaches

1. Call the pg_fdw_xact_resolve() function to handle it manually.

2. Use the pg_fdw_xact_resolver module to resolve it automatically.

pg_fdw_xact_resolver is a background worker process that detects unresolved 2PC transactions and resolves them automatically.
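For manual handling, a minimal sketch of calling the resolver function from psql (the exact signature and arguments are an assumption here; see the patch discussion for details):

=# SELECT pg_fdw_xact_resolve();  -- resolve unknown foreign 2PC transactions manually (arguments assumed optional)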

API

To support distributed transactions, the following new FDW APIs are added:

GetPreparedID(): called during the pre-commit phase of a two-phase transaction to obtain the transaction identifier generated for the PREPARE step on the shard node.

EndForeignTransaction(): called during the commit phase of a non-two-phase transaction; executes either COMMIT or ROLLBACK on the foreign server.

PrepareForeignTransaction(): called during the pre-commit phase of a two-phase transaction; executes PREPARE TRANSACTION on the foreign server.

ResolvePrepareForeignTransaction(): called during the commit phase of a two-phase transaction; executes either COMMIT PREPARED or ROLLBACK PREPARED with the given transaction identifier on the foreign server.

If the foreign data wrapper is not capable of the two-phase commit protocol, the last two APIs are not required.
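Conceptually, the last three callbacks issue the following SQL on the foreign server (the transaction identifier shown is illustrative, taken from the shard logs later in this article):

-- EndForeignTransaction(), non-two-phase path:
COMMIT TRANSACTION;            -- or ROLLBACK TRANSACTION
-- PrepareForeignTransaction(), first phase:
PREPARE TRANSACTION 'px_1389361800_16388_10';
-- ResolvePrepareForeignTransaction(), second phase:
COMMIT PREPARED 'px_1389361800_16388_10';
-- or, if the transaction must be aborted:
ROLLBACK PREPARED 'px_1389361800_16388_10';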

Current limitations

Note that at present, as long as a transaction involves two or more shards with two_phase_commit enabled, a two-phase transaction is used.

This has a performance cost, so enable the two_phase_commit switch only where appropriate.

The two-phase commit protocol is used even when the transaction involves multiple servers but does not modify data.
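For example, per the limitation above, a read-only transaction that touches foreign tables on both shards still runs the two-phase protocol (ft1 and ft2 are the foreign tables used in the tests below):

=# BEGIN;
=# SELECT count(*) FROM ft1;
=# SELECT count(*) FROM ft2;
=# COMMIT;  -- PREPARE TRANSACTION / COMMIT PREPARED are still executed on both shards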

Principle Analysis of Distributed 2PC Transaction Processing

In order to implement distributed two-phase transactions, the coordinator node needs to track the transactions of the shard nodes. PostgreSQL records the transaction state of the shard nodes in a shared memory area (the KnownFDWXact list). Changes to this area are written to the redo (WAL) log, and at checkpoint time the state is persisted to the $PGDATA/fdw_xact directory. Each 2PC transaction corresponds to one file in this directory, named from the triplet (xid, foreign server oid, user oid).

Distributed 2PC transaction processing works as follows (a condensed SQL view of the same flow is shown after the list):

1. For a foreign server with two_phase_commit = on, the connection is registered with the MyFDWConnection structure via RegisterXactForeignServer() when the transaction is opened.

2. The pre-commit phase performs the following actions (for the shard nodes involved in the transaction):

2.1 Get the xact id of this transaction for each foreign server with two_phase_commit = on.

2.2 For nodes with two_phase_commit = off, the shard is notified to commit now, before the local commit actually happens.

2.3 Write the xact ids into the fdw_xact shared memory structure and write the XLOG_FDW_XACT_INSERT redo record.

2.4 Execute PREPARE TRANSACTION on each node with two_phase_commit = on.

2.5 Call RecordTransactionCommit() locally to commit, completing the first phase of 2PC. If the first phase fails, all nodes, including the local node, can still roll back (including shards with two_phase_commit = off).

3. If the first phase succeeds, the transactions on nodes with two_phase_commit = off have committed and the local transaction has committed, and processing enters the second phase:

3.1 Look up the xact ids in shared memory and resolve the prepared foreign transactions on the shard (foreign) nodes.

3.2 Remove the foreign transaction entries from shared memory and write the XLOG_FDW_XACT_REMOVE WAL record.
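Condensed to the SQL that a shard with two_phase_commit = on actually receives, the two phases look like this (statements taken from the shard logs in the test section below; the transaction identifier is illustrative):

-- phase 1, during the coordinator's pre-commit:
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
INSERT INTO public.ft1(c) VALUES ('2');
PREPARE TRANSACTION 'px_1389361800_16388_10';
-- phase 2, after the coordinator's local commit succeeds:
COMMIT PREPARED 'px_1389361800_16388_10';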

If the first phase succeeds but the second phase fails, or a master/standby switchover occurs before the second phase completes, the two-phase transaction must be recovered.

Analysis of the Recovery Process of Distributed Two-Phase Transactions

The recovery process may occur after a crash of the primary, or on a standby. The principle of distributed two-phase transaction recovery is similar to that of ordinary two-phase transaction recovery.

During recovery, the XLOG_FDW_XACT_INSERT and XLOG_FDW_XACT_REMOVE records are replayed from the WAL, and the 2PC state files in the fdw_xact directory are restored.

At the end of redo recovery, PostgreSQL scans the pg_fdw_xact directory; any files found there indicate 2PC transactions in an unknown state. Each unresolved 2PC transaction records the transaction ID, foreign server OID, and user OID, and the database must resolve it by either committing or rolling it back.

Crash recovery  
  
During crash recovery, fdw_xact entries are inserted into or removed from the KnownFDWXact list as the corresponding WAL records are replayed.

After redo is done, the fdw_xact files are re-created and the pg_fdw_xact directory is scanned for unresolved foreign prepared transactions. The files in this directory are named using the triplet (xid, foreign server oid, user oid), which gives each file a unique name.

This scan also emits the oldest transaction id that has an unresolved prepared foreign transaction. This affects the oldest active transaction id, since the status of that transaction id is required to decide the fate of the unresolved prepared foreign transaction.

On a standby, files are simply inserted or removed during WAL replay. If the standby is required to finish recovery and take over as master, pg_fdw_xact is scanned to read unresolved foreign prepared transactions into shared memory.

Much of the fdw_xact.c code is inspired by twophase.c, so the recovery mechanism and process are almost the same as for ordinary two-phase commit. The patch also incorporates recent optimizations made to twophase.c.
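After recovery (or after a standby has been promoted), remaining unresolved foreign prepared transactions can be inspected and resolved; a minimal sketch, assuming the pg_fdw_xacts view and pg_fdw_xact_resolve() function described above:

=# SELECT * FROM pg_fdw_xacts;      -- list unresolved foreign prepared transactions
=# SELECT pg_fdw_xact_resolve();    -- resolve them manually (or leave it to pg_fdw_xact_resolver)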

Example environment deployment

1. Coordinator database settings

To support 2PC, the coordinator database must keep some 2PC state, so it has a parameter, max_prepared_foreign_transactions, that controls the maximum number of foreign 2PC transactions allowed.

The formula is as follows

max_prepared_foreign_transactions =  (max_connections) * (# of foreign server with two_phase_commit = on)  

At the same time, the coordinator database also needs the following parameter set to support 2PC transactions:

max_prepared_transactions = 10  # recommended: set equal to max_connections
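For example (values are illustrative), with max_connections = 100 and two foreign servers that have two_phase_commit = on, the coordinator could be configured as:

max_connections = 100
max_prepared_foreign_transactions = 200   # 100 connections * 2 two-phase foreign servers
max_prepared_transactions = 100           # equal to max_connections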

2. The shard databases also need the following settings

max_prepared_transactions = 100 # same as max_connections  
log_statement = all  # Convenient observation  
log_line_prefix = '[S1]' # on shard2 server, we can set '[S2]'  

3. Create the postgres_fdw extension on the coordinator database
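For example, on the coordinator:

$ psql  
=# CREATE EXTENSION postgres_fdw;  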

4. Create the foreign servers, assuming there are two shard databases

$ psql  
=# CREATE SERVER shard_node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'shard1', dbname 'postgres', port '5432', two_phase_commit 'on');  
CREATE SERVER  
=# CREATE SERVER shard_node2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'shard2', dbname 'postgres', port '5432', two_phase_commit 'on');  
CREATE SERVER  
=# SELECT * FROM pg_foreign_server;  
   srvname   | srvowner | srvfdw | srvtype | srvversion | srvacl |                   srvoptions                      
-------------+----------+--------+---------+------------+--------+-------------------------------------------------  
 shard_node1 |       10 |  16387 |         |            |        | {host=shard1,dbname=postgres,port=5432,two_phase_commit=on}  
 shard_node2 |       10 |  16387 |         |            |        | {host=shard2,dbname=postgres,port=5432,two_phase_commit=on}  
(2 rows)  

5. Create user mappings for the foreign servers
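A minimal sketch (user name and password are illustrative; the tests below also assume foreign tables ft1/ft2 mapped to same-named tables on shard1/shard2 plus a local table lt, each with a single column c, since the original discussion does not give their definitions):

=# CREATE USER MAPPING FOR CURRENT_USER SERVER shard_node1 OPTIONS (user 'postgres', password 'secret');  
=# CREATE USER MAPPING FOR CURRENT_USER SERVER shard_node2 OPTIONS (user 'postgres', password 'secret');  
=# CREATE FOREIGN TABLE ft1 (c int) SERVER shard_node1;   -- remote table public.ft1 must exist on shard1  
=# CREATE FOREIGN TABLE ft2 (c int) SERVER shard_node2;   -- remote table public.ft2 must exist on shard2  
=# CREATE TABLE lt (c int);                               -- local table used in test 4  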

Tests

1. Start with a non-two-phase transaction (involving only one foreign server, so two-phase commit is not triggered)

=# BEGIN;  
=# INSERT INTO ft1 VALUES(1);  
=# COMMIT;  

The log on shard1 shows:

[S1] LOG:  statement: SET search_path = pg_catalog  
[S1] LOG:  statement: SET timezone = 'UTC'  
[S1] LOG:  statement: SET datestyle = ISO  
[S1] LOG:  statement: SET intervalstyle = postgres  
[S1] LOG:  statement: SET extra_float_digits = 3  
[S1] LOG:  statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ  
[S1] LOG:  execute pgsql_fdw_prep_1: INSERT INTO public.ft1(c) VALUES ($1)  
[S1] DETAIL:  parameters: $1 = '1'  
[S1] LOG:  statement: DEALLOCATE pgsql_fdw_prep_1  
[S1] LOG:  statement: COMMIT TRANSACTION  

2. A transaction involving multiple foreign servers with two_phase_commit = on automatically uses a two-phase transaction

$ psql  
=# BEGIN;  
=# INSERT INTO ft1 VALUES(2);  
=# INSERT INTO ft2 VALUES(2);  
=# COMMIT;  

The logs are as follows; you can see that a two-phase transaction was used:

[S1] LOG:  statement: SET search_path = pg_catalog  
[S1] LOG:  statement: SET timezone = 'UTC'  
[S1] LOG:  statement: SET datestyle = ISO  
[S1] LOG:  statement: SET intervalstyle = postgres  
[S1] LOG:  statement: SET extra_float_digits = 3  
[S1] LOG:  statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ  
[S1] LOG:  execute pgsql_fdw_prep_1: INSERT INTO public.ft1(c) VALUES ($1)  
[S1] DETAIL:  parameters: $1 = '2'  
[S1] LOG:  statement: DEALLOCATE pgsql_fdw_prep_1  
[S2] LOG:  statement: SET search_path = pg_catalog  
[S2] LOG:  statement: SET timezone = 'UTC'  
[S2] LOG:  statement: SET datestyle = ISO  
[S2] LOG:  statement: SET intervalstyle = postgres  
[S2] LOG:  statement: SET extra_float_digits = 3  
[S2] LOG:  statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ  
[S2] LOG:  execute pgsql_fdw_prep_2: INSERT INTO public.ft2(c) VALUES ($1)  
[S2] DETAIL:  parameters: $1 = '2'  
[S2] LOG:  statement: DEALLOCATE pgsql_fdw_prep_2  
[S1] LOG:  statement: PREPARE TRANSACTION 'px_1389361800_16388_10'  
[S2] LOG:  statement: PREPARE TRANSACTION 'px_53866648_16389_10'  
[S1] LOG:  statement: COMMIT PREPARED 'px_1389361800_16388_10'  
[S2] LOG:  statement: COMMIT PREPARED 'px_53866648_16389_10'  

3. Using two-phase transactions on the coordinator node

=# BEGIN;  
=# INSERT INTO ft1 VALUES (3);  
=# INSERT INTO ft2 VALUES (3);  
=# PREPARE TRANSACTION 'gxid';  

At this point, you can see that the 2PC has been pushed down to the shard nodes:

=# SELECT * FROM pg_fdw_xacts;  
 dbid  | transaction | serverid | userid |  status  |      identifier         
-------+-------------+----------+--------+----------+-----------------------  
 13182 |         564 |    16389 |     10 | prepared | px_450388264_16389_10  
 13182 |         564 |    16388 |     10 | prepared | px_569713952_16388_10  
(2 rows)  

When COMMIT PREPARED 'gxid' is executed, the remote prepared transactions are committed as well.
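That is, committing the locally prepared transaction also resolves the prepared transactions on the shards:

=# COMMIT PREPARED 'gxid';  -- the shards then receive COMMIT PREPARED for their respective px_* identifiers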

4. Two-phase transaction rollback ensures distributed consistency

=# BEGIN;  
=# INSERT INTO lt VALUES(4);  
=# INSERT INTO ft1 VALUES(4);  
=# INSERT INTO ft2 VALUES(4);  

Shut down shard1; the commit then fails.

=# COMMIT; -- error  

Check data consistency: no node has the row.

=# SELECT * FROM lt WHERE c = '4'; -- data on local server  
 c   
---  
(0 rows)  
=# SELECT * FROM ft2 WHERE c = '4'; -- data on shard2 server  
 c   
---  
(0 rows)  
  
-- When shard1 is restored, you can see that the insert was automatically rolled back  
=# SELECT * FROM ft1 WHERE c = '4'; -- data on shard1 server  
 c   
---  
(0 rows)  

For the discussion of this patch, see the mailing list thread; the URL is given at the end of this article.

The PostgreSQL community is very rigorous: a patch may be discussed on the mailing list for months or even years and revised repeatedly based on feedback, so by the time it is merged into master it is very mature. This is why PostgreSQL's stability is so well known.

Reference resources

https://wiki.postgresql.org/wiki/2PC_on_FDW

https://www.postgresql.org/message-id/flat/CAFjFpRfQaCTt1vD9E35J%2BXxfCnZC5HONqgJgGpUjfktJdoYZVw%40mail.gmail.com#CAFjFpRfQaCTt1vD9E35J+XxfCnZC5HONqgJgGpUjfktJdoYZVw@mail.gmail.com
