FlinkCDC + Hudi: MySQL Data Entering the Lake in Real Time III: Exploring and Implementing High Availability for FlinkCDC MySQL Master-Slave Synchronization

Preface: articles in the Hudi series so far:

FlinkCDC + Hudi: MySQL Data Entering the Lake in Real Time I: Preliminary Test
FlinkCDC + Hudi: MySQL Data Entering the Lake in Real Time II: Exceptions and Solutions Encountered During the Integration of Hudi and Spark

1, Background

In a production environment, MySQL is usually deployed with master and slave libraries to cover data backup, disaster recovery, read-write separation and so on. When FlinkCDC is used to ingest MySQL data into the lake, dealing with the master-slave setup is unavoidable. How far FlinkCDC supports switching between the MySQL master and slave libraries, how the databases should be configured, and how the synchronization jobs need to be operated and developed to cooperate with a switch are items that must be verified before FlinkCDC goes into production.
This article records the main steps of verifying MySQL master-slave data synchronization with FlinkCDC, for reference.

2, Environment preparation before verification

2.1 FlinkCDC+Hudi environment preparation

For the detailed FlinkCDC + Hudi environment setup, see FlinkCDC + Hudi: MySQL Data Entering the Lake in Real Time I: Preliminary Test; it is not repeated here.

2.2 MySQL master-slave environment preparation

Prepare one MySQL master and two MySQL slaves.
Master library: 192.168.2.100
Slave library 1: 192.168.2.101
Slave library 2: 192.168.2.102

*Basic MySQL installation commands:

#ubuntu install command
sudo apt install mysql-server -y
#CentOS/RHEL install command
yum install mysql-server

For more detailed installation guides, see:
Ubuntu 18.04 installing MySQL
Installing MySQL on Linux*

2.3 Master-slave library configuration

After MySQL is installed, configure the master and slave libraries. The default MySQL configuration file is /etc/mysql/my.cnf. The corresponding master and slave configurations are listed below.
Note: only the simplest master-slave configuration is given here, purely for testing. Do not apply these configurations directly to a production environment.

2.3.1 Master library configuration

!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/

[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
collation-server = utf8mb4_unicode_ci
init-connect='SET NAMES utf8mb4'
character-set-server = utf8mb4

bind-address = 0.0.0.0
server_id = 1
log-bin = /var/lib/mysql/mysql-bin
#binlog-do-db = *
log-slave-updates
sync_binlog = 1
auto_increment_offset = 1
auto_increment_increment = 1
log_bin_trust_function_creators = 1
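
As a quick optional sanity check, you can confirm from the MySQL CLI that binary logging and the server id took effect after the restart:

#Run on the master after restarting MySQL
mysql> show variables like 'server_id';
mysql> show variables like 'log_bin';
#Shows the current binlog file and position
mysql> show master status;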

2.3.2 Slave library configuration

!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/
[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
collation-server = utf8mb4_unicode_ci
init-connect='SET NAMES utf8mb4'
character-set-server = utf8mb4

bind-address = 0.0.0.0
server_id = 2
log-bin = /var/lib/mysql/mysql-bin
log-slave-updates
sync_binlog = 0     
##Specify which database the slave should replicate
replicate-do-db = flink_cdc
##If the network between master and slave is interrupted but neither side detects it (for example a firewall or routing problem), the slave waits slave_net_timeout seconds, then treats the connection as broken, reconnects, and catches up with the master's data for that period
slave-net-timeout = 60
log_bin_trust_function_creators = 1
read_only = 1

2.3.3 Initializing slave data

A freshly installed master-slave setup does not need slave data initialization, because there is no data yet.
If the master library already exists and a slave library is being added, the data must be initialized first.
1. In the master library, lock the database to be synchronized

mysql> use flink_cdc;
mysql> flush tables with read lock; 

2. Dump all data of the database from the master

mysqldump -uroot -proot_password flink_cdc> flink_cdc.sql

3. After the data dump is completed in step 2, release the lock

mysql> unlock tables;

4. Import all the data on the slave library

mysql> source /path/flink_cdc.sql;

2.3.4 Master library authorization

#Create the replication account user_test with password user_test_password
mysql> grant select,replication slave,replication client on *.* to 'user_test'@'%' identified by 'user_test_password';
#Update database permissions
mysql> flush privileges;
#View the master's binlog status
### When initializing a slave library, run this before unlock tables
mysql> show master status\G;
*************************** 1. row ***************************
             File: mysql-bin.000037   //Current binlog file
         Position: 700                //Current binlog file offset
     Binlog_Do_DB: 
 Binlog_Ignore_DB: 
1 row in set (0.00 sec)

2.3.5 Enable slave synchronization

#Execute the synchronization command, set the master server ip, synchronization account, password and synchronization location
mysql> change master to master_host='192.168.2.100',master_user='user_test',master_password='user_test_password',master_log_file='mysql-bin.000037',master_log_pos=700;
#Enable synchronization function
mysql> start slave;
#View slave synchronization status
mysql> show slave status\G;
*************************** 1. row ***************************
               Slave_IO_State: Connecting to master
                  Master_Host: 192.168.2.100
                  Master_User: user_test
                  Master_Port: 3306
                Connect_Retry: 60
              Master_Log_File: mysql-bin.000037
          Read_Master_Log_Pos: 700
               Relay_Log_File: node-142-relay-bin.000001
                Relay_Log_Pos: 4
        Relay_Master_Log_File: mysql-bin.000037
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
              Replicate_Do_DB: flink_cdc
          Replicate_Ignore_DB: 
           Replicate_Do_Table: 
       Replicate_Ignore_Table: 
      Replicate_Wild_Do_Table: 
  Replicate_Wild_Ignore_Table: 
                   Last_Errno: 0
                   Last_Error: 
                 Skip_Counter: 0
          Exec_Master_Log_Pos: 700
              Relay_Log_Space: 154
              Until_Condition: None
               Until_Log_File: 
                Until_Log_Pos: 0
           Master_SSL_Allowed: No
           Master_SSL_CA_File: 
           Master_SSL_CA_Path: 
              Master_SSL_Cert: 
            Master_SSL_Cipher: 
               Master_SSL_Key: 
        Seconds_Behind_Master: NULL
Master_SSL_Verify_Server_Cert: No
                Last_IO_Errno: 0
                Last_IO_Error: 
               Last_SQL_Errno: 0
               Last_SQL_Error: 
  Replicate_Ignore_Server_Ids: 
             Master_Server_Id: 0
                  Master_UUID: 
             Master_Info_File: /var/lib/mysql/master.info
                    SQL_Delay: 0
          SQL_Remaining_Delay: NULL
      Slave_SQL_Running_State: 
           Master_Retry_Count: 86400
                  Master_Bind: 
      Last_IO_Error_Timestamp: 
     Last_SQL_Error_Timestamp: 
               Master_SSL_Crl: 
           Master_SSL_Crlpath: 
           Retrieved_Gtid_Set: 
            Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61167
                Auto_Position: 0
         Replicate_Rewrite_DB: 
                 Channel_Name: 
           Master_TLS_Version: 
1 row in set (0.00 sec)

In the show slave status output, Slave_IO_Running and Slave_SQL_Running must both be Yes for synchronization to work properly. In abnormal cases, Last_IO_Errno, Last_IO_Error, Last_SQL_Errno and Last_SQL_Error will show the corresponding details.
Once this is set up, you can add or delete data in the master library and verify the synchronization status on the slaves; this is not expanded here, but a quick check is sketched below.
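
As a minimal sanity check (the repl_check table below is a throwaway table used only for this test), insert a row on the master and then query it on the slave:

#On the master (192.168.2.100)
mysql> use flink_cdc;
mysql> create table repl_check(id int primary key, note varchar(20));
mysql> insert into repl_check values (1, 'hello');
#On a slave (e.g. 192.168.2.101) the row should appear almost immediately
mysql> use flink_cdc;
mysql> select * from repl_check;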

3, Verifying FlinkCDC synchronization from the slave library into the lake

3.1 FlinkCDC + Hudi runtime environment

It is assumed that the reader has already built the FlinkCDC + Hudi environment; this section only records the steps needed to verify FlinkCDC synchronization of MySQL master-slave data.
For the detailed FlinkCDC + Hudi environment setup, see FlinkCDC + Hudi: MySQL Data Entering the Lake in Real Time I: Preliminary Test.

3.1.1 mysql test table preparation

Create test table 1 in the main library

mysql> use flink_cdc;
mysql> CREATE TABLE `test_1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `data` varchar(10) DEFAULT NULL,
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 
## Repeat the insert to create multiple rows
mysql> insert into test_1(data) values('data');

3.1.2 Start the synchronization job using Flink SQL

3.1.2.1 Start the Flink SQL client

$FLINK_HOME/bin/yarn-session.sh -s 4 -jm 1024 -tm 2048 -nm flink-hudi-0.10 -d
$FLINK_HOME/bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.11-0.10.0.jar shell

3.1.2.2 Start the job in Flink SQL

Flink SQL> set execution.checkpointing.interval=30sec;

##Create the Flink SQL mysql-cdc source table
Flink SQL> create table mysql_test_1(
id bigint primary key not enforced,
data String,
create_time Timestamp(3)
) with (
'connector'='mysql-cdc',
'hostname'='192.168.2.101',    --Start by synchronizing from slave library 1; change to the corresponding ip when switching to the master or to slave library 2
'port'='3306',
'server-id'='5600-5604',
'username'='user_test',
'password'='user_test_password',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'='flink_cdc',
'table-name'='test_1'
);

##Create the Flink SQL hudi sink table
Flink SQL> create table hudi_test_1(
id bigint,
data String,
create_time Timestamp(3),
PRIMARY KEY (`id`) NOT ENFORCED
)
with(
'connector'='hudi',
'path'='hdfs:///tmp/flink/cdcata/hudi_test_1',
'hoodie.datasource.write.recordkey.field'='id',
'hoodie.parquet.max.file.size'='268435456',
'write.precombine.field'='create_time',
'write.tasks'='1',
'write.bucket_assign.tasks'='1',
'write.task.max.size'='1024',
'write.rate.limit'='30000',
'table.type'='MERGE_ON_READ',
'compaction.tasks'='1',
'compaction.async.enabled'='true',
'compaction.delta_commits'='1',
'compaction.max_memory'='500',
'changelog.enabled'='true',
'read.streaming.enabled'='true',
'read.streaming.check.interval'='3',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.metastore.uris'='thrift://hiveserver2:9083',
'hive_sync.db'='test',
'hive_sync.table'='hudi_test_1',
'hive_sync.username'='flinkcdc',
'hive_sync.support_timestamp'='true'
);
##Start the Flink job
Flink SQL> set pipeline.name = flinkcdc_test_1;
Flink SQL> set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;
Flink SQL> insert into hudi_test_1 select * from mysql_test_1;

3.1.2.3 Job started successfully

3.1.2.4 Verify the data landed in Hudi via Flink SQL

Flink SQL> select * from hudi_test_1;

3.2 Simulating a slave synchronization failure

The environment we built is one master with two slaves. Here we simulate a failure of slave library 1 and switch to slave library 2. You could also switch over to the master, and the verification process is the same; however, a production environment usually does not synchronize data directly from the master library.

3.2.1 Simulating the slave failure

Here we directly stop slave library 1 to simulate slave downtime. Shut down MySQL on slave library 1:

service mysql stop

At this point the Flink job fails.

Looking at the Flink job's exception records, two exceptions can be observed.
One is a connection failure: the connection timed out and the driver received no response from the MySQL server.

-- The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

The other is "One or more fetchers have encountered exception", caused by the Flink job restarting repeatedly. FlinkCDC synchronizes the binlog by emulating a MySQL slave, so each (pseudo) slave needs a server id (configured in the Flink SQL MySQL table definition, 'server-id'='5600-5604'). The same server id is reused when the job restarts; if the interval between restarts is shorter than the timeout between Flink and MySQL, this exception is reported.
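
The same mechanism matters when several CDC jobs read from the same MySQL instance: each job should get its own non-overlapping 'server-id' range. A minimal sketch (the table name mysql_test_other and the range 5610-5614 are purely illustrative):

##A second mysql-cdc table for another job, using a server-id range that does not overlap 5600-5604
Flink SQL> create table mysql_test_other(
id bigint primary key not enforced,
data String,
create_time Timestamp(3)
) with (
'connector'='mysql-cdc',
'hostname'='192.168.2.101',
'port'='3306',
'server-id'='5610-5614',
'username'='user_test',
'password'='user_test_password',
'server-time-zone'='Asia/Shanghai',
'database-name'='flink_cdc',
'table-name'='test_1'
);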

The detailed exception stack information of these two exceptions is as follows:

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
	at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
	at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:231)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:872)
	at io.debezium.connector.mysql.MySqlConnection.connection(MySqlConnection.java:79)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:867)
	at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:550)
	at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:498)
	at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:125)
	... 15 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
	at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
	at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:91)
	at com.mysql.cj.NativeSession.connect(NativeSession.java:144)
	at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:956)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:826)
	... 25 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:155)
	at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:65)
	... 28 more
2022-02-15 19:00:54
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure

3.2.2 Manually making the slaves' synchronization state inconsistent

Now keep writing rows to the master so that the replication states of the two slave libraries diverge.
a. View the position synchronized from the master with show slave status\G.

Master_Log_File: mysql-bin.000039
Read_Master_Log_Pos: 2759
Relay_Log_File: node-142-relay-bin.000006
Relay_Log_Pos: 2885
Relay_Master_Log_File: mysql-bin.000039

b. Only the status of slave library 2 can be queried this way, because slave library 1 is down. Instead, we compare the synchronization state through the binlog files generated on each slave; simply looking at the modification times of the binlog files already shows the difference.
Query the directory where MySQL data is stored:

mysql> show variables like '%dir%';
+-----------------------------------------+----------------------------+
| Variable_name                           | Value                      |
+-----------------------------------------+----------------------------+
| datadir                                 | /var/lib/mysql/            |

Run ll on the data directory to see the generated binlog files.

ll /var/lib/mysql/

To view the binlog status of slave library 1:

-rw-r----- 1 mysql mysql      241 Feb 15 00:10 mysql-bin.000035
-rw-r----- 1 mysql mysql      723 Feb 15 17:08 mysql-bin.000036
-rw-r----- 1 mysql mysql     2786 Feb 15 18:57 mysql-bin.000037
-rw-r----- 1 mysql mysql      488 Feb 16 10:22 mysql-bin.000038
-rw-r----- 1 mysql mysql      759 Feb 16 10:37 mysql-bin.000039
-rw-r----- 1 mysql mysql      247 Feb 16 10:29 mysql-bin.index

View binlog status of slave library 2:

-rw-r----- 1 mysql mysql      177 Feb 15 15:12 mysql-bin.000001
-rw-r----- 1 mysql mysql     2770 Feb 16 00:10 mysql-bin.000002
-rw-r----- 1 mysql mysql     2593 Feb 16 10:50 mysql-bin.000003
-rw-r----- 1 mysql mysql       96 Feb 16 00:10 mysql-bin.index

mysql-bin.000039 on slave library 1 stopped updating at 10:37, while mysql-bin.000003 on slave library 2 keeps being updated.
The difference can be read with mysqlbinlog. Slave library 1 was built much earlier than slave library 2, which is why its binlog file numbers are larger.
Slave library 1:

mysqlbinlog --start-datetime='2022-02-16 10:00:00' /var/lib/mysql/mysql-bin.000039

# at 705
#220216 10:37:15 server id 1  end_log_pos 736 CRC32 0x8523929e 	Xid = 183
COMMIT/*!*/;
# at 736
#220216 10:37:48 server id 2  end_log_pos 759 CRC32 0x238de33d 	Stop

Slave library 2:

mysqlbinlog --start-datetime='2022-02-16 10:00:00'  --stop-datetime='2022-02-16 11:00:00'  --no-defaults /var/lib/mysql/mysql-bin.000003

#220216 10:50:42 server id 1  end_log_pos 2562 CRC32 0x5e8d4964 	Write_rows: table id 156 flags: STMT_END_F

BINLOG '
gmYMYhMBAAAAOwAAAM0JAAAAAJwAAAAAAAEACWZsaW5rX2NkYwAGdGVzdF8xAAMIDxEDKAAAAhD4
QQc=
gmYMYh4BAAAANQAAAAIKAAAAAJwAAAAAAAEAAgAD//gQAAAAAAAAAARkYXRhYgxmgmRJjV4=
'/*!*/;
# at 2562
#220216 10:50:42 server id 1  end_log_pos 2593 CRC32 0xbc78edd3 	Xid = 486
COMMIT/*!*/;
# at 2593
#220216 11:39:28 server id 3  end_log_pos 2616 CRC32 0x3fe31a47 	Stop

3.3 First verification of FlinkCDC MySQL high availability

3.3.1 Obtain the latest checkpoint from the job's checkpoint information

3.3.2 Switch to slave library 2

Change the hostname of mysql_test_1 to slave library 2.

Flink SQL> drop table mysql_test_1;
Flink SQL> create table mysql_test_1(
> id bigint primary key not enforced,
> data String,
> create_time Timestamp(3)
> ) with (
> 'connector'='mysql-cdc',
> 'hostname'='192.168.2.102',
> 'port'='3306',
> 'server-id'='5600-5604',
> 'username'='user_flink',
> 'password'='flink@testdb',
> 'server-time-zone'='Asia/Shanghai',
> 'debezium.snapshot.mode'='initial',
> 'database-name'='flink_cdc',
> 'table-name'='test_1'
> );

3.3.3 Restart from the checkpoint

Flink SQL> set 'execution.savepoint.path'='hdfs:///tmp/flink/checkpoints/88541fd8a08e1cee71aac55d2f39951f/chk-3';
Flink SQL> insert into hudi_test_1 select * from mysql_test_1;

Checking the newly started job on the Flink Web UI, we find that the restart fails. The exception stack reports that the binlog file no longer exists and the job cannot start normally.

 The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644982889087,db=,server_id=0,file=mysql-bin.000039,pos=530,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

The "file=mysql-bin.000039,pos=530" prompted in the exception message is actually the point to which the binlog file generated from library 1 and flinkcdc have been synchronized. This is the synchronization status saved during checkpoint.
As we know from the previous binlog file analysis, the latest binlog from library 2 is MySQL bin 000003. In this way, we find that the binlog files of slave library 1 and slave library 2 are inconsistent, and the master-slave switch cannot be performed directly from the checkpoint.
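
This can be confirmed directly on slave library 2: the binlog file recorded in the checkpoint simply does not exist there (the file list matches the ll output above):

#On slave library 2 (192.168.2.102)
mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       177 |
| mysql-bin.000002 |      2770 |
| mysql-bin.000003 |      2593 |
+------------------+-----------+
#mysql-bin.000039, the file saved in the checkpoint, is not in the list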

The details of exception stack are as follows:

2022-02-16 11:41:29
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644982889087,db=,server_id=0,file=mysql-bin.000039,pos=530,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
	at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113)
	at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
	at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140)
	... 6 more

3.4 Second verification of FlinkCDC MySQL high availability

According to the latest official documentation of the FlinkCDC MySQL connector, mysql-cdc supports high availability. So why did the verification fail? Digging deeper into the documentation, we find that to achieve high availability, MySQL must have GTID enabled.

3.4.1 Introduction to GTID

3.4.1.1 What is GTID

GTID is a feature introduced in MySQL 5.6. It was designed to fix a weakness of master-slave synchronization based on the binlog file + position mechanism, which, as we verified above, cannot cope with a high-availability switch.

GTID (Global Transaction ID) is a globally unique identifier generated on the master and bound to each transaction. It is unique not only on the master but across the whole MySQL cluster.
A GTID is composed of server_uuid:transaction_id. server_uuid is the UUID of the MySQL instance, and transaction_id is the sequence number of the transaction executed on that instance, increasing as transactions are executed.

3.4.1.2 GTID example

Here is an example GTID set:

d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169

Here, d667b1cd-778a-11ec-aa60-6c92bf64e18c is the server_uuid, and 1-61169 is the transaction range: this instance has executed 61169 transactions, with IDs from 1 to 61169.
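
Both parts can be inspected directly on a running instance (the values shown are the ones from this test environment):

mysql> select @@server_uuid;
+--------------------------------------+
| @@server_uuid                        |
+--------------------------------------+
| d667b1cd-778a-11ec-aa60-6c92bf64e18c |
+--------------------------------------+
#Returns the executed GTID set once GTID mode is enabled, e.g. d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
mysql> select @@global.gtid_executed;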

3.4.1.3 How GTID addressing works

After GTID is enabled, each binlog file starts with the GTID range covered by the earlier binlog files (the previous-gtids header). During addressing, the GTID to synchronize from is compared against these ranges to determine which binlog file to read. A simplified schematic:

mysql-binlog.00001 previous-gtids:empty
mysql-binlog.00002 previous-gtids:1-100
mysql-binlog.00003 previous-gtids:101-200

To look up gtid=50: read previous-gtids 101-200 from the newest binlog file mysql-binlog.00003; 50 is smaller than 101, so step back through the earlier files and finally start reading from mysql-binlog.00001.
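
Once GTID mode is enabled, the previous-gtids header of a binlog file can be inspected directly; a quick way to look at it (the file name is just an example from this environment):

mysql> show binlog events in 'mysql-bin.000003' limit 3;
#The Previous_gtids event near the top lists the GTID set contained in the earlier binlog files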

Supplement:
For more information on gtid, see:
MySQL5.7 new killer features: GTID principle and Practice

Next, let's move on to verifying high availability with GTID.

3.4.2 Enable GTID in MySQL

Add the following settings to the corresponding MySQL configuration file (/etc/mysql/my.cnf).
Master library configuration:

gtid_mode = on
enforce_gtid_consistency = on

Slave library configuration:

gtid_mode = on
enforce_gtid_consistency = on
log-slave-updates = 1

After modifying the configuration, restart the mysql service:

service mysql restart
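
After the restart, it is worth confirming on each instance that GTID actually took effect:

mysql> show variables like 'gtid_mode';
mysql> show variables like 'enforce_gtid_consistency';
#Both should report ON on the master and on the slaves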

Connect to the slave library through the MySQL CLI and run show slave status\G to check its replication status.
Binlog events written with GTID enabled are parsed differently from those written without it, so right after the switch the slave shows Slave_IO_Running=No and Last_IO_Error reports that replication failed.

Slave_IO_Running: No
Slave_SQL_Running: Yes
Last_IO_Error: Got fatal error 1236 from master when reading data from binary log: 'Cannot replicate anonymous transaction when @@GLOBAL.GTID_MODE = ON, at file /var/lib/mysql/mysql-bin.000039, position 1049.; the first event 'mysql-bin.000039' at 1049, the last event read from '/var/lib/mysql/mysql-bin.000039' at 1114, the last byte read from '/var/lib/mysql/mysql-bin.000039' at 1114.'

At this point you can roll back the configuration, restart slave library 1 with gtid_mode disabled so it can catch up, and then apply the GTID configuration again.
If gtid_mode is not in the same state on the master and the slave, the following error is reported; change them to the same state.

Slave_IO_Running: No
Slave_SQL_Running: Yes
Last_IO_Error: The replication receiver thread cannot start because the master has GTID_MODE = ON and this server has GTID_MODE = OFF.

After GTID mode is successfully enabled, a GTID is generated for every data-modifying statement. We can check the GTID synchronization status with show master status / show slave status.
Master status:

mysql> show master status\G;
*************************** 1. row ***************************
             File: mysql-bin.000042
         Position: 764
     Binlog_Do_DB: 
 Binlog_Ignore_DB: 
Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169

Slave library 1 status:

mysql> show slave status\G;
##Only GTID-related fields are shown
*************************** 1. row ***************************
                  Master_UUID: d667b1cd-778a-11ec-aa60-6c92bf64e18c
           Retrieved_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
            Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
1 row in set (0.00 sec)

Slave library 2 status:

mysql> show slave status\G;
*************************** 1. row ***************************
                  Master_UUID: d667b1cd-778a-11ec-aa60-6c92bf64e18c
           Retrieved_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:61168-61169
            Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169

It can be seen that master-slave synchronization has caught up to the latest GTID set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169.

3.4.3 Restart the job to generate a checkpoint containing the GTID

Set the hostname of mysql_test_1 back to slave library 1 so that the job produces a checkpoint against slave library 1. After the job runs successfully, kill slave library 1; the job then fails with the same symptoms as described above, so this is not repeated.

Note:
In a production environment, if the failed slave cannot be recovered, the more common action is to switch synchronization to another slave.

3.4.4 Recovering from the checkpoint containing the GTID

We take the checkpoint containing the GTID from the Web UI and restart the job. The binlog-file exception is still reported.

Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1645074443471,db=,server_id=0,file=mysql-bin.000044,pos=194,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
	at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113)
	at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
	at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140)
	... 6 more

3.4.5 Source code analysis of the exception

Is the GTID not taking effect? Checking the TaskManager log, the checkpoint does contain the GTID information, and it is passed to SourceReaderBase.

2022-02-17 13:00:54,205 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.000044, pos=194, gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61239, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}}]

Looking again at the code in the exception stack: in StatefulTaskContext, loadStartingOffsetState calls isBinlogAvailable, and throws the exception if the binlog is not available.

    private MySqlOffsetContext loadStartingOffsetState(
            OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
        BinlogOffset offset =
                mySqlSplit.isSnapshotSplit()
                        ? BinlogOffset.INITIAL_OFFSET
                        : mySqlSplit.asBinlogSplit().getStartingOffset();

        MySqlOffsetContext mySqlOffsetContext =
                (MySqlOffsetContext) loader.load(offset.getOffset());

        if (!isBinlogAvailable(mySqlOffsetContext)) {
            throw new IllegalStateException(
                    "The connector is trying to read binlog starting at "
                            + mySqlOffsetContext.getSourceInfo()
                            + ", but this is no longer "
                            + "available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return mySqlOffsetContext;
    }

isBinlogAvailable only checks whether the binlog file name still exists on the server; it does not check the GTID set.

    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
        if (binlogFilename == null) {
            return true; // start at current position
        }
        if (binlogFilename.equals("")) {
            return true; // start at beginning
        }

        // Accumulate the available binlog filenames ...
        List<String> logNames = connection.availableBinlogFiles();

        // And compare with the one we're supposed to use ...
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (!found) {
            LOG.info(
                    "Connector requires binlog file '{}', but MySQL only has {}",
                    binlogFilename,
                    String.join(", ", logNames));
        } else {
            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        }
        return found;
    }

3.4.6 GTID bugfix

The GTID addressing process was described in 3.4.1.3: in a MySQL high-availability setup, when the master is switched, the slave compares its synchronized GTID set against the previous-gtids of the new master's binlog files to decide which binlog file to start synchronizing from. The connector source code has no such addressing step at all.

So this is a FlinkCDC bug. Searching the issues on GitHub confirms it: gtid is not valid.

According to the feedback on GitHub, someone found this bug at the end of 2021 and submitted a bugfix in January: [mysql] update check gtid set #761. However, the fix has not yet been merged into the master branch.

Looking at the modified code, a checkGtidSet check is added in isBinlogAvailable:

    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        String gtidStr = offset.gtidSet();
        if (gtidStr != null) {
            return checkGtidSet(offset);
        }
        return checkBinlogFilename(offset);
    }

checkGtidSet does the following:
1. Query the GTID set available on the server (availableGtidStr).
2. Compare it with the checkpoint's GTID set and compute the GTID range that still needs to be replicated (gtidSetToReplicate).
3. Query the purged GTID set (purgedGtidSet) and subtract it from gtidSetToReplicate; if none of the required GTIDs have been purged, continue synchronization from the checkpoint position, otherwise restart a full synchronization.

    private boolean checkGtidSet(MySqlOffsetContext offset) {
        String gtidStr = offset.gtidSet();

        if (gtidStr.trim().isEmpty()) {
            return true; // start at beginning ...
        }

        String availableGtidStr = connection.knownGtidSet();
        if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
            // Last offsets had GTIDs but the server does not use them ...
            LOG.warn(
                    "Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
            return false;
        }
        // GTIDs are enabled
        GtidSet gtidSet = new GtidSet(gtidStr);
        // Get the GTID set that is available in the server ...
        GtidSet availableGtidSet = new GtidSet(availableGtidStr);
        if (gtidSet.isContainedWithin(availableGtidSet)) {
            LOG.info(
                    "MySQL current GTID set {} does contain the GTID set {} required by the connector.",
                    availableGtidSet,
                    gtidSet);
            // The replication is concept of mysql master-slave replication protocol ...
            final GtidSet gtidSetToReplicate =
                    connection.subtractGtidSet(availableGtidSet, gtidSet);
            final GtidSet purgedGtidSet = connection.purgedGtidSet();
            LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
            final GtidSet nonPurgedGtidSetToReplicate =
                    connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
            LOG.info(
                    "GTID set {} known by the server but not processed yet, for replication are available only GTID set {}",
                    gtidSetToReplicate,
                    nonPurgedGtidSetToReplicate);
            if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
                LOG.warn("Some of the GTIDs needed to replicate have been already purged");
                return false;
            }
            return true;
        }
        LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
        return false;
    }

3.4.7 Merge the bugfix and verify high availability

Download the code from GitHub (https://github.com/ververica/flink-cdc-connectors.git), merge the bugfix for "gtid is not valid" into master, compile and package it, and deploy it into the Flink environment. Repeating the verification process above, we observe that:
1. When switching between slaves, the "binlog no longer available" exception is no longer reported, and no other exception appears.
2. The job resumes synchronization from the checkpoint instead of doing a full re-sync.
This shows that the mysql-cdc high-availability switch succeeds.

We check the job log to verify further:
1. The initial GTID set is gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282.
2. MySQL's current GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290 is confirmed to contain the initial GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282.
3. The GTID set still to be replicated is computed as d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290.
4. The synchronization starting point is set and synchronization begins.

The detailed log is as follows:

2022-02-17 20:05:55,961 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.000005, pos=31359, gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}, isSuspended=false}]
2022-02-17 20:05:55,966 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2022-02-17 20:05:55,970 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, mysql_test_1]], fields=[id, data, create_time]) -> NotNullEnforcer(fields=[id]) -> Map (1/1)#0 (7fdfa75135e212f596f80bd2716e0837) switched from INITIALIZING to RUNNING.
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader [] - BinlogSplitReader is created.
2022-02-17 20:05:56,237 INFO  com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - MySQL current GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290 does contain the GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282 required by the connector.
2022-02-17 20:05:56,247 INFO  com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Server has already purged d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61103 GTIDs
2022-02-17 20:05:56,248 INFO  com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290 known by the server but not processed yet, for replication are available only GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290
2022-02-17 20:05:56,248 INFO  io.debezium.relational.history.DatabaseHistoryMetrics        [] - Started database history recovery
2022-02-17 20:05:56,249 INFO  io.debezium.relational.history.DatabaseHistoryMetrics        [] - Finished database history recovery of 0 change(s) in 1 ms
2022-02-17 20:05:56,277 INFO  io.debezium.util.Threads                                     [] - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2022-02-17 20:05:56,287 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set purged on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61103
2022-02-17 20:05:56,287 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Attempting to generate a filtered GTID set
2022-02-17 20:05:56,287 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set from previous recorded offset: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,288 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - GTID set available on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290
2022-02-17 20:05:56,288 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Using first available positions for new GTID channels
2022-02-17 20:05:56,288 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Relevant GTID set available on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290
2022-02-17 20:05:56,289 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Final merged GTID set to use when connecting to MySQL: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,289 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Registering binlog reader with GTID set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,289 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Skip 0 events on streaming start
2022-02-17 20:05:56,289 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Skip 0 rows on streaming start
2022-02-17 20:05:56,290 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-02-17 20:05:56,293 INFO  io.debezium.util.Threads                                     [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-02-17 20:05:56,302 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource  [] - Connected to MySQL binlog at 10.130.49.141:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000005, currentBinlogPosition=31359, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, currentGtidSet=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, restartBinlogFilename=mysql-bin.000005, restartBinlogPosition=31359, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]

4, Summary

So far, three rounds of verification have confirmed that FlinkCDC can support MySQL high availability. The required conditions are:
1. The MySQL service must have gtid_mode enabled.
2. The currently released version (2.2-SNAPSHOT) has a bug and cannot support high availability directly; you need to merge the bugfix and compile the connector yourself before using it.
