Preface: other articles in this Hudi series:
Flinkcdc Hudi: introduction to MySQL data entering the lake in real time 1: preliminary test
Flinkcdc Hudi: full introduction to MySQL data entering the lake in real time II: exceptions and solutions encountered during the integration of Hudi and Spark
1, Background
In production, MySQL is usually deployed with a master and one or more slaves for data backup, disaster recovery, read-write separation and so on. When FlinkCDC is used to bring MySQL data into the lake, it inevitably has to deal with this master-slave setup. How well FlinkCDC supports switching between the MySQL master and slaves, how the databases must be configured, and how operations and development of the synchronization jobs need to cooperate are all items that must be checked before FlinkCDC goes into production.
This article records the main steps of verifying MySQL master-slave data synchronization with FlinkCDC, for reference.
2, Environmental preparation before verification
2.1 FlinkCDC+Hudi environment preparation
For the FlinkCDC+Hudi environment preparation, see Flinkcdc Hudi: introduction to MySQL data entering the lake in real time 1: preliminary test; it is not repeated here.
2.2 preparation of MySQL master-slave database environment
Prepare one MySQL master and two MySQL slaves.
Master: 192.168.2.100
Slave 1: 192.168.2.101
Slave 2: 192.168.2.102
The basic MySQL installation commands are as follows:
# Ubuntu install command
sudo apt install mysql-server -y
# Install command for yum-based Linux distributions
yum install mysql-server
For more on installation, see:
Ubuntu18.04 installing MySQL
Installing Mysql for Linux
2.2 master slave library configuration
After installing MySQL, configure the master and the slaves. The default MySQL configuration file is /etc/mysql/my.cnf. The master and slave configurations are as follows.
Note: only the simplest master-slave configuration is given here, for testing only. Do not apply it directly to a production environment.
2.2.1 master configuration
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/

[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
collation-server = utf8mb4_unicode_ci
init-connect='SET NAMES utf8mb4'
character-set-server = utf8mb4
bind-address = 0.0.0.0
server_id = 1
log-bin = /var/lib/mysql/mysql-bin
#binlog-do-db = *
log-slave-updates
sync_binlog = 1
auto_increment_offset = 1
auto_increment_increment = 1
log_bin_trust_function_creators = 1
2.2.2 slave installation and configuration
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/

[client]
default-character-set = utf8mb4

[mysql]
default-character-set = utf8mb4

[mysqld]
collation-server = utf8mb4_unicode_ci
init-connect='SET NAMES utf8mb4'
character-set-server = utf8mb4
bind-address = 0.0.0.0
## server_id must be unique per server (the binlog output later in this article shows server id 3 for slave 2)
server_id = 2
log-bin = /var/lib/mysql/mysql-bin
log-slave-updates
sync_binlog = 0
## Specify which database the slave replicates
replicate-do-db = flink_cdc
## During master-slave replication the network between master and slave may be interrupted without either side detecting it (for example because of firewall or routing problems). The slave waits slave_net_timeout seconds before it considers the connection failed; it then reconnects and catches up on the master's data from that period.
slave-net-timeout = 60
log_bin_trust_function_creators = 1
read_only = 1
2.2.3 initialization of slave database data
A newly installed master and slave need no slave data initialization, because there is no data yet.
If a slave is added to an existing master, the slave's data must be initialized first.
1. Lock the database to be synchronized on the master
mysql> use flink_cdc;
mysql> flush tables with read lock;
2. Dump all data of the database on the master
mysqldump -uroot -proot_password flink_cdc > flink_cdc.sql
3. After the data dump in step 2 has completed, release the lock
mysql> unlock tables;
4. Import the dumped data on the slave
mysql> source /path/flink_cdc.sql;
2.2.4 master authorization
# Create the slave account user_test with password user_test_password
mysql> grant select,replication slave,replication client on *.* to 'user_test'@'%' identified by 'user_test_password';
# Refresh privileges
mysql> flush privileges;
# View the master binlog position
### When initializing a slave, run this before unlock tables.
mysql> show master status\G;
*************************** 1. row ***************************
File: mysql-bin.000037 //Current binlog file
Position: 700 //Current binlog file position
Binlog_Do_DB:
Binlog_Ignore_DB:
1 row in set (0.00 sec)
2.2.5 enable slave synchronization
# Execute the synchronization command: set the master server ip, synchronization account, password and synchronization position
mysql> change master to master_host='192.168.2.100',master_user='user_test',master_password='user_test_password',master_log_file='mysql-bin.000037',master_log_pos=700;
# Enable synchronization
mysql> start slave;
# View the slave synchronization status
mysql> show slave status\G;
*************************** 1. row ***************************
Slave_IO_State: Connecting to master
Master_Host: 192.168.2.100
Master_User: user_test
Master_Port: 3306
Connect_Retry: 60
Master_Log_File: mysql-bin.000037
Read_Master_Log_Pos: 700
Relay_Log_File: node-142-relay-bin.000001
Relay_Log_Pos: 4
Relay_Master_Log_File: mysql-bin.000037
Slave_IO_Running: Yes
Slave_SQL_Running: Yes
Replicate_Do_DB: flink_cdc
Replicate_Ignore_DB:
Replicate_Do_Table:
Replicate_Ignore_Table:
Replicate_Wild_Do_Table:
Replicate_Wild_Ignore_Table:
Last_Errno: 0
Last_Error:
Skip_Counter: 0
Exec_Master_Log_Pos: 700
Relay_Log_Space: 154
Until_Condition: None
Until_Log_File:
Until_Log_Pos: 0
Master_SSL_Allowed: No
Master_SSL_CA_File:
Master_SSL_CA_Path:
Master_SSL_Cert:
Master_SSL_Cipher:
Master_SSL_Key:
Seconds_Behind_Master: NULL
Master_SSL_Verify_Server_Cert: No
Last_IO_Errno: 0
Last_IO_Error:
Last_SQL_Errno: 0
Last_SQL_Error:
Replicate_Ignore_Server_Ids:
Master_Server_Id: 0
Master_UUID:
Master_Info_File: /var/lib/mysql/master.info
SQL_Delay: 0
SQL_Remaining_Delay: NULL
Slave_SQL_Running_State:
Master_Retry_Count: 86400
Master_Bind:
Last_IO_Error_Timestamp:
Last_SQL_Error_Timestamp:
Master_SSL_Crl:
Master_SSL_Crlpath:
Retrieved_Gtid_Set:
Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61167
Auto_Position: 0
Replicate_Rewrite_DB:
Channel_Name:
Master_TLS_Version:
1 row in set (0.00 sec)
In the show slave status output, both Slave_IO_Running and Slave_SQL_Running must be Yes for synchronization to work normally. When something is wrong, Last_IO_Errno, Last_IO_Error, Last_SQL_Errno and Last_SQL_Error report the cause.
Once this is set up, you can insert or delete data on the master and verify on the slave that it is synchronized. This is not covered in detail here.
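The status check described above can be scripted. The following is a minimal sketch, assuming the vertical-format output of show slave status has already been captured as text; the function names parse_slave_status and replication_problems are illustrative and not part of any MySQL tooling.

```python
def parse_slave_status(text):
    """Parse vertical-format SHOW SLAVE STATUS output into a dict."""
    status = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, row separators and anything without a "key: value" shape.
        if not line or line.startswith("*") or ":" not in line:
            continue
        key, _, value = line.partition(":")
        status[key.strip()] = value.strip()
    return status


def replication_problems(status):
    """Return a list of problems; an empty list means both threads are healthy."""
    problems = []
    for thread in ("Slave_IO_Running", "Slave_SQL_Running"):
        if status.get(thread) != "Yes":
            problems.append(f"{thread} is {status.get(thread, 'missing')}")
    for errno_key, error_key in (("Last_IO_Errno", "Last_IO_Error"),
                                 ("Last_SQL_Errno", "Last_SQL_Error")):
        if status.get(errno_key, "0") != "0":
            problems.append(f"{error_key}: {status.get(error_key, '')}")
    return problems


# Hypothetical excerpt of a failing slave, used only to exercise the functions.
sample = """
Slave_IO_Running: No
Slave_SQL_Running: Yes
Last_IO_Errno: 1236
Last_IO_Error: Got fatal error 1236 from master when reading data from binary log
Last_SQL_Errno: 0
Last_SQL_Error:
"""
for problem in replication_problems(parse_slave_status(sample)):
    print(problem)
```

Splitting on the first colon only keeps values such as Executed_Gtid_Set, which themselves contain colons, intact.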
3, Verifying FlinkCDC synchronization from a slave into the lake
3.1 FlinkCDC+Hudi runtime environment
It is assumed that the reader has already built the FlinkCDC+Hudi environment. This section only records the steps needed to verify that FlinkCDC synchronizes MySQL master-slave data.
For the FlinkCDC+Hudi environment preparation, see Flinkcdc Hudi: introduction to MySQL data entering the lake in real time 1: preliminary test
3.1.1 mysql test table preparation
Create the test table test_1 on the master
mysql> use flink_cdc;
mysql> CREATE TABLE `test_1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `data` varchar(10) DEFAULT NULL,
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
## Repeat to insert multiple rows
mysql> insert into test_1(data) values('data');
3.1.2 start synchronization job using FlinkSql
3.1.2.1 start FlinkSql
$FLINK_HOME/bin/yarn-session.sh -s 4 -jm 1024 -tm 2048 -nm flink-hudi-0.10 -d
$FLINK_HOME/bin/sql-client.sh embedded -s yarn-session -j ./lib/hudi-flink-bundle_2.11-0.10.0.jar shell
3.1.2.2 start job in FlinkSql
Flink SQL> set execution.checkpointing.interval=30sec;

## Create the FlinkSQL mysql-cdc table
Flink SQL> create table mysql_test_1(
  id bigint primary key not enforced,
  data String,
  create_time Timestamp(3)
) with (
  'connector'='mysql-cdc',
  'hostname'='192.168.2.101', -- start synchronizing from slave 1; change to the corresponding ip when switching to the master or slave 2
  'port'='3306',
  'server-id'='5600-5604',
  'username'='user_test',
  'password'='user_test_password',
  'server-time-zone'='Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name'='flink_cdc',
  'table-name'='test_1'
);

## Create the FlinkSQL hudi table
Flink SQL> create table hudi_test_1(
  id bigint,
  data String,
  create_time Timestamp(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) with(
  'connector'='hudi',
  'path'='hdfs:///tmp/flink/cdcata/hudi_test_1',
  'hoodie.datasource.write.recordkey.field'='id',
  'hoodie.parquet.max.file.size'='268435456',
  'write.precombine.field'='create_time',
  'write.tasks'='1',
  'write.bucket_assign.tasks'='1',
  'write.task.max.size'='1024',
  'write.rate.limit'='30000',
  'table.type'='MERGE_ON_READ',
  'compaction.tasks'='1',
  'compaction.async.enabled'='true',
  'compaction.delta_commits'='1',
  'compaction.max_memory'='500',
  'changelog.enabled'='true',
  'read.streaming.enabled'='true',
  'read.streaming.check.interval'='3',
  'hive_sync.enable'='true',
  'hive_sync.mode'='hms',
  'hive_sync.metastore.uris'='thrift://hiveserver2:9083',
  'hive_sync.db'='test',
  'hive_sync.table'='hudi_test_1',
  'hive_sync.username'='flinkcdc',
  'hive_sync.support_timestamp'='true'
);

## Start the Flink job
Flink SQL> set pipeline.name = flinkcdc_test_1;
Flink SQL> set execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION;
Flink SQL> insert into hudi_test_1 select * from mysql_test_1;
3.1.2.3 successful start of operation
3.1.2.4 verify hudi landing data in FlinkSQL.
Flink SQL> select * from hudi_test_1;
3.2 simulating a slave synchronization failure
The environment built here has one master and two slaves. We verify a failure of slave 1 followed by a switch to slave 2. Switching to the master is also possible and the verification process is the same, but in production data is generally not synchronized directly from the master.
3.2.1 simulating a slave failure
To simulate slave downtime, we simply stop slave 1 by shutting down MySQL on it.
service mysql stop
The Flink job then fails.
Looking at the exception records of the Flink job, two exceptions can be observed.
The first is that the connection timed out without receiving a response from the MySQL server.
-- The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
The second is "One or more fetchers have encountered exception", caused by the continuous restarting of the Flink job. FlinkCDC synchronizes the binlog by acting as a slave, and every slave has a server id (configured in the FlinkSQL MySQL table definition, 'server-id'='5600-5604'). The same server id is reused when the job restarts, so this exception is reported when the job restarts faster than the connection between Flink and MySQL times out.
The detailed stack traces of the two exceptions are as follows:
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:231)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:872)
    at io.debezium.connector.mysql.MySqlConnection.connection(MySqlConnection.java:79)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:867)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:550)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:498)
    at io.debezium.connector.mysql.MySqlConnection.querySystemVariables(MySqlConnection.java:125)
    ... 15 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
    at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
    at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:91)
    at com.mysql.cj.NativeSession.connect(NativeSession.java:144)
    at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:956)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:826)
    ... 25 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:155)
    at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:65)
    ... 28 more
2022-02-15 19:00:54
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: io.debezium.DebeziumException: Error reading MySQL variables: Communications link failure
3.2.2 manually making the slaves inconsistent
Now keep writing rows to the master, so that the states of the two slaves diverge.
a. Check the position up to which the slave has synchronized from the master with show slave status\G.
Master_Log_File: mysql-bin.000039
Read_Master_Log_Pos: 2759
Relay_Log_File: node-142-relay-bin.000006
Relay_Log_Pos: 2885
Relay_Master_Log_File: mysql-bin.000039
b. Only the status of slave 2 can be queried this way, because slave 1 is down. However, the synchronization state of the two slaves can still be compared through the binlog files they generate. A simple way is to compare the modification times of the binlog files.
Query the directory where MySQL stores its data:
mysql> show variables like '%dir%';
+-----------------------------------------+----------------------------+
| Variable_name                           | Value                      |
+-----------------------------------------+----------------------------+
| datadir                                 | /var/lib/mysql/            |
List the data directory with ll to see which binlog files have been generated.
ll /var/lib/mysql/
Binlog files of slave 1:
-rw-r----- 1 mysql mysql  241 Feb 15 00:10 mysql-bin.000035
-rw-r----- 1 mysql mysql  723 Feb 15 17:08 mysql-bin.000036
-rw-r----- 1 mysql mysql 2786 Feb 15 18:57 mysql-bin.000037
-rw-r----- 1 mysql mysql  488 Feb 16 10:22 mysql-bin.000038
-rw-r----- 1 mysql mysql  759 Feb 16 10:37 mysql-bin.000039
-rw-r----- 1 mysql mysql  247 Feb 16 10:29 mysql-bin.index
Binlog files of slave 2:
-rw-r----- 1 mysql mysql  177 Feb 15 15:12 mysql-bin.000001
-rw-r----- 1 mysql mysql 2770 Feb 16 00:10 mysql-bin.000002
-rw-r----- 1 mysql mysql 2593 Feb 16 10:50 mysql-bin.000003
-rw-r----- 1 mysql mysql   96 Feb 16 00:10 mysql-bin.index
mysql-bin.000039 on slave 1 stopped being updated at 10:37, while mysql-bin.000003 on slave 2 is still being updated.
The difference can also be read from the binlog files with mysqlbinlog. Slave 1 was built much earlier than slave 2, so its binlog file numbers are larger.
Slave 1:
mysqlbinlog --start-datetime='2022-02-16 10:00:00' /var/lib/mysql/mysql-bin.000039
# at 705
#220216 10:37:15 server id 1 end_log_pos 736 CRC32 0x8523929e Xid = 183
COMMIT/*!*/;
# at 736
#220216 10:37:48 server id 2 end_log_pos 759 CRC32 0x238de33d Stop
Slave 2:
mysqlbinlog --no-defaults --start-datetime='2022-02-16 10:00:00' --stop-datetime='2022-02-16 11:00:00' /var/lib/mysql/mysql-bin.000003
#220216 10:50:42 server id 1 end_log_pos 2562 CRC32 0x5e8d4964 Write_rows: table id 156 flags: STMT_END_F
BINLOG '
gmYMYhMBAAAAOwAAAM0JAAAAAJwAAAAAAAEACWZsaW5rX2NkYwAGdGVzdF8xAAMIDxEDKAAAAhD4
QQc=
gmYMYh4BAAAANQAAAAIKAAAAAJwAAAAAAAEAAgAD//gQAAAAAAAAAARkYXRhYgxmgmRJjV4=
'/*!*/;
# at 2562
#220216 10:50:42 server id 1 end_log_pos 2593 CRC32 0xbc78edd3 Xid = 486
COMMIT/*!*/;
# at 2593
#220216 11:39:28 server id 3 end_log_pos 2616 CRC32 0x3fe31a47 Stop
3.3 first verification of high availability of flinkcdc MySQL
3.3.1 obtain the last checkpoint information from the job checkpoint information
3.3.2 switch to slave 2
Change the hostname of mysql_test_1 to slave 2.
Flink SQL> drop table mysql_test_1;
Flink SQL> create table mysql_test_1(
> id bigint primary key not enforced,
> data String,
> create_time Timestamp(3)
> ) with (
> 'connector'='mysql-cdc',
> 'hostname'='192.168.2.102',
> 'port'='3306',
> 'server-id'='5600-5604',
> 'username'='user_flink',
> 'password'='flink@testdb',
> 'server-time-zone'='Asia/Shanghai',
> 'debezium.snapshot.mode'='initial',
> 'database-name'='flink_cdc',
> 'table-name'='test_1'
> );
3.3.3 restart from the checkpoint
Flink SQL> set 'execution.savepoint.path'='hdfs:///tmp/flink/checkpoints/88541fd8a08e1cee71aac55d2f39951f/chk-3';
Flink SQL> insert into hudi_test_1 select * from mysql_test_1;
Checking the newly started job in the Flink Web UI shows that the restart failed. The exception stack reports that the binlog file no longer exists, so the job cannot start normally.
The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644982889087,db=,server_id=0,file=mysql-bin.000039,pos=530,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
The "file=mysql-bin.000039,pos=530" in the exception message is the binlog file generated by slave 1 and the position up to which FlinkCDC had synchronized it. This is the synchronization state saved in the checkpoint.
From the earlier binlog file analysis we know that the latest binlog on slave 2 is mysql-bin.000003. The binlog files of slave 1 and slave 2 are therefore inconsistent, and a master-slave switch cannot be performed directly from the checkpoint.
The exception stack is as follows:
2022-02-16 11:41:29
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644982889087,db=,server_id=0,file=mysql-bin.000039,pos=530,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113)
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140)
    ... 6 more
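The mismatch behind this exception can be reproduced offline. The sketch below is only an illustration: it pulls the binlog file name and position out of the exception message with a regular expression and compares the file name with the binlog files listed earlier for slave 2. The helper name parse_binlog_offset is hypothetical.

```python
import re


def parse_binlog_offset(message):
    """Extract (file, pos) from the 'Struct{...}' part of the exception message."""
    match = re.search(r"file=([^,}]*),pos=(\d+)", message)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


message = (
    "The connector is trying to read binlog starting at "
    "Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,"
    "ts_ms=1644982889087,db=,server_id=0,file=mysql-bin.000039,pos=530,row=0}, "
    "but this is no longer available on the server."
)

# Binlog files seen on slave 2 in the directory listing earlier in this article.
slave2_binlogs = ["mysql-bin.000001", "mysql-bin.000002", "mysql-bin.000003"]

offset = parse_binlog_offset(message)
print(offset)
print(offset[0] in slave2_binlogs)
```

The checkpointed file name comes from slave 1, so it is not found among the files of slave 2, which is the situation the exception describes.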
3.4 second verification of high availability of flinkcdc MySQL
According to the latest official documentation of the FlinkCDC MySQL connector, mysql-cdc supports high availability. So why did the verification fail? Reading the official documentation more closely shows that, to achieve high availability, gtid must be enabled in MySQL.
3.4.1 introduction to gtid
3.4.1.1 what is gtid
GTID is a feature that MySQL introduced in 5.6 and enhanced in 5.7. It addresses a weakness of master-slave synchronization based on binlog file name and position: as verified above, that mechanism cannot cope with a high-availability switch.
GTID (Global Transaction ID) is a unique identifier that is generated on the master and bound to a transaction. The identifier is unique not only on the master but across the whole MySQL replication cluster.
A GTID has the form server_uuid:transaction_id. server_uuid is the UUID of the MySQL instance, and transaction_id is the sequence number of the transaction on that instance, which increases as transactions are executed.
3.4.1.2 GTID example
Here is an example of a gtid:
d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
Here d667b1cd-778a-11ec-aa60-6c92bf64e18c is the server uuid, and 1-61169 means that this instance has executed 61169 transactions, with IDs from 1 to 61169.
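To make the format concrete, here is a small sketch that splits a gtid set string into its server uuid and transaction intervals and counts the transactions. It assumes the textual form shown above (comma-separated uuid entries, colon-separated inclusive intervals); the function names are illustrative.

```python
def parse_gtid_set(text):
    """Parse 'uuid:1-5:7,uuid2:3' into {uuid: [(start, end), ...]} with inclusive ends."""
    result = {}
    for part in text.replace("\n", "").split(","):
        part = part.strip()
        if not part:
            continue
        # The uuid itself contains '-', so split on ':' only.
        uuid, *intervals = part.split(":")
        ranges = result.setdefault(uuid.lower(), [])
        for interval in intervals:
            start, _, end = interval.partition("-")
            # A single transaction such as '7' is the interval (7, 7).
            ranges.append((int(start), int(end or start)))
    return result


def transaction_count(gtid_set):
    """Total number of transactions covered by all intervals."""
    return sum(end - start + 1
               for ranges in gtid_set.values()
               for start, end in ranges)


gtids = parse_gtid_set("d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169")
print(gtids)
print(transaction_count(gtids))
```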
3.4.1.3 addressing mode of gtid
After gtid is enabled, the header of each binlog file records the gtid range of the previous binlog file. To locate a gtid, MySQL compares it with these ranges to find the binlog file that contains it. Schematically:
mysql-binlog.00001 previous-gtids:empty
mysql-binlog.00002 previous-gtids:1-100
mysql-binlog.00003 previous-gtids:101-200
To find gtid=50, start from the latest binlog file, mysql-binlog.00003, and read its previous-gtids, 101-200. Since 50 is smaller than 101, the gtid lies further back, so continue with the earlier files: mysql-binlog.00002 records previous-gtids 1-100, which means gtid=50 is found in mysql-binlog.00001.
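The lookup in this schematic can be written down as a short sketch. It follows the simplified model of the diagram, in which each file header holds only the gtid range of the file immediately before it and a gtid is a plain integer; it is not MySQL's actual implementation, and locate_binlog is a hypothetical name.

```python
def locate_binlog(files, gtid):
    """Find the binlog file that contains `gtid`.

    `files` is a list of (name, previous_range) ordered oldest first, where
    previous_range is None (empty) or an inclusive (start, end) tuple that
    describes the file immediately before this one.
    """
    # Walk from the newest file towards the oldest one.
    for index in range(len(files) - 1, -1, -1):
        name, previous = files[index]
        if previous is None:
            # Nothing precedes this file. If it is also the newest file, the
            # gtid can only be here; otherwise the gtid was not found.
            return name if index == len(files) - 1 else None
        start, end = previous
        if gtid > end:
            # Newer than everything in the previous file: it lives in this file.
            return name
        if start <= gtid <= end:
            # Covered by the header range, so it lives in the previous file.
            return files[index - 1][0]
        # gtid < start: keep looking in older files.
    return None


binlogs = [
    ("mysql-binlog.00001", None),
    ("mysql-binlog.00002", (1, 100)),
    ("mysql-binlog.00003", (101, 200)),
]
print(locate_binlog(binlogs, 50))
```

Scanning from the newest file backwards mirrors the order described in the text and stops as soon as a header range covers the gtid.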
Supplement:
For more information on gtid, see:
MySQL5.7 new killer features: GTID principle and Practice
Next, let's embark on the road of gtid high availability verification.
3.4.2 mysql enable gtid
Add the following configuration to the MySQL configuration file (/etc/mysql/my.cnf):
Master configuration:
gtid_mode = on
enforce_gtid_consistency = on
Slave configuration:
gtid_mode = on
enforce_gtid_consistency = on
log-slave-updates = 1
After modifying the configuration, restart the mysql service:
service mysql restart
Connect to the slave with the MySQL CLI and run show slave status\G to view the slave's synchronization status.
The slave resumes synchronizing from the master as before, but binlog written with gtid enabled is parsed differently from binlog written without it. If the slave still has to read transactions that were written before gtid was enabled, it shows Slave_IO_Running=No and Last_IO_Error reports the failure.
Slave_IO_Running: No
Slave_SQL_Running: Yes
Last_IO_Error: Got fatal error 1236 from master when reading data from binary log: 'Cannot replicate anonymous transaction when @@GLOBAL.GTID_MODE = ON, at file /var/lib/mysql/mysql-bin.000039, position 1049.; the first event 'mysql-bin.000039' at 1049, the last event read from '/var/lib/mysql/mysql-bin.000039' at 1114, the last byte read from '/var/lib/mysql/mysql-bin.000039' at 1114.'
In that case, roll back the gtid configuration on the master and the slave, restart slave 1 so that it catches up with gtid_mode off, and then apply the gtid configuration again.
If gtid_mode is not the same on the master and the slave, the following error is reported. Set gtid_mode to the same value on both.
Slave_IO_Running: No
Slave_SQL_Running: Yes
Last_IO_Error: The replication receiver thread cannot start because the master has GTID_MODE = ON and this server has GTID_MODE = OFF.
Once gtid mode is enabled, a gtid is generated every time a statement that updates data is executed. The gtid synchronization status can be viewed with show master status and show slave status.
Master status:
mysql> show master status\G;
*************************** 1. row ***************************
File: mysql-bin.000042
Position: 764
Binlog_Do_DB:
Binlog_Ignore_DB:
Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
Slave 1 status:
mysql> show slave status\G;
## Only the gtid-related fields are shown
*************************** 1. row ***************************
Master_UUID: d667b1cd-778a-11ec-aa60-6c92bf64e18c
Retrieved_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
1 row in set (0.00 sec)
Slave 2 status:
mysql> show slave status\G;
*************************** 1. row ***************************
Master_UUID: d667b1cd-778a-11ec-aa60-6c92bf64e18c
Retrieved_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:61168-61169
Executed_Gtid_Set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169
The master and both slaves have all reached the latest gtid set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61169.
3.4.3 restart the job to generate a checkpoint with gtid
Set the hostname of mysql_test_1 to slave 1 and start the job, so that it generates checkpoints from slave 1 (now containing gtid information) from which it can later be recovered. Once the job is running, kill slave 1. The job then reports an error; the behaviour is the same as described above and is not repeated here.
Note:
In production, if the failed slave cannot be recovered, the usual approach is to synchronize from a new slave.
3.4.4 recovering from checkpoint with gtid
We take the checkpoint containing gtid from the Web UI and restart the job from it. The job still reports that the binlog file is not available.
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1645074443471,db=,server_id=0,file=mysql-bin.000044,pos=194,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
    at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113)
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
    at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:140)
    ... 6 more
3.4.5 abnormal source code analysis
Is the gtid not taking effect? Check the taskmanager log. The checkpoint already contains the gtid information, and it is passed on to SourceReaderBase.
2022-02-17 13:00:54,205 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.000044, pos=194, gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61239, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}}]
Check the code in the exception stack again. In StatefulTaskContext, loadStartingOffsetState calls isBinlogAvailable; if the binlog is not available, it throws the exception.
private MySqlOffsetContext loadStartingOffsetState(
        OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
    BinlogOffset offset =
            mySqlSplit.isSnapshotSplit()
                    ? BinlogOffset.INITIAL_OFFSET
                    : mySqlSplit.asBinlogSplit().getStartingOffset();

    MySqlOffsetContext mySqlOffsetContext =
            (MySqlOffsetContext) loader.load(offset.getOffset());

    if (!isBinlogAvailable(mySqlOffsetContext)) {
        throw new IllegalStateException(
                "The connector is trying to read binlog starting at "
                        + mySqlOffsetContext.getSourceInfo()
                        + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.");
    }
    return mySqlOffsetContext;
}
isBinlogAvailable only checks whether the binlog file name exists on the server; it does not look at the gtid set at all.
private boolean isBinlogAvailable(MySqlOffsetContext offset) {
    String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
    if (binlogFilename == null) {
        return true; // start at current position
    }
    if (binlogFilename.equals("")) {
        return true; // start at beginning
    }

    // Accumulate the available binlog filenames ...
    List<String> logNames = connection.availableBinlogFiles();

    // And compare with the one we're supposed to use ...
    boolean found = logNames.stream().anyMatch(binlogFilename::equals);
    if (!found) {
        LOG.info(
                "Connector requires binlog file '{}', but MySQL only has {}",
                binlogFilename,
                String.join(", ", logNames));
    } else {
        LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
    }
    return found;
}
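To make the failure mode concrete, the following is a minimal standalone sketch of a file-name-only check. It is not the connector code: the class name BinlogNameCheck and the file list of the new server are assumptions made up for illustration; only mysql-bin.000044 comes from the log above. Binlog file names are local to each MySQL instance, so a name recorded against one library usually does not exist on another.

```java
import java.util.List;

/** Illustrative only: mimics a check that looks at the binlog file name and nothing else. */
class BinlogNameCheck {

    /** Returns true when the required binlog file name exists on the server. */
    static boolean isAvailableByName(String required, List<String> serverFiles) {
        if (required == null || required.isEmpty()) {
            return true; // no specific file requested
        }
        return serverFiles.contains(required);
    }

    public static void main(String[] args) {
        // File name stored in the checkpoint taken against slave library 1 (from the log above).
        String fromCheckpoint = "mysql-bin.000044";
        // Hypothetical file list on the library the job reconnects to after the switch.
        List<String> newServerFiles = List.of("mysql-bin.000004", "mysql-bin.000005");
        System.out.println(isAvailableByName(fromCheckpoint, newServerFiles));
    }
}
```

With these assumed inputs the check fails even though the new library may hold every transaction the job still needs, which is the situation hit in 3.4.4.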
3.4.6 gtid bugfix
The gtid addressing mode is described in 3.4.1.3. In MySQL high availability mode, when the master is switched, the slave uses its already-synchronized gtid set and compares it with the Previous_gtids of each binlog file on the new master to decide which binlog file to start synchronizing from. The connector source code has no such addressing step at all.
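As a rough illustration of that addressing step, the sketch below walks the binlog files from newest to oldest and picks the first one whose Previous_gtids is already covered by what the replica has executed. It is a simplification under stated assumptions: a single source UUID, GTID sets of the contiguous form 1-N, and hypothetical names (GtidAddressing, BinlogFile, findStartFile) and file data. It is not MySQL's or FlinkCDC's actual implementation.

```java
import java.util.List;

/** Simplified GTID addressing: one server UUID, GTID sets of the form 1-N. */
class GtidAddressing {

    /** A binlog file and the last transaction number in its Previous_gtids header (0 if empty). */
    static final class BinlogFile {
        final String name;
        final long previousGtidsEnd;

        BinlogFile(String name, long previousGtidsEnd) {
            this.name = name;
            this.previousGtidsEnd = previousGtidsEnd;
        }
    }

    /**
     * Finds the binlog file to start from.
     *
     * @param files binlog files ordered from oldest to newest
     * @param executedEnd last transaction number already applied by the replica
     * @return the file name, or null if the needed transactions are no longer in any file
     */
    static String findStartFile(List<BinlogFile> files, long executedEnd) {
        for (int i = files.size() - 1; i >= 0; i--) {
            BinlogFile file = files.get(i);
            // Everything before this file is already applied, so it is safe to start here.
            if (file.previousGtidsEnd <= executedEnd) {
                return file.name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical files on the new master; the Previous_gtids values are made up.
        List<BinlogFile> files = List.of(
                new BinlogFile("mysql-bin.000004", 61103),
                new BinlogFile("mysql-bin.000005", 61200));
        System.out.println(findStartFile(files, 61282));
    }
}
```

The point of the sketch is that the starting file is derived from the gtid set, so the file name stored in the checkpoint does not need to exist on the new master.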
The missing gtid addressing step is therefore a FlinkCDC bug. Searching the issues on GitHub turns up a matching report: "gtid is not valid".
According to the discussion on GitHub, a contributor found the bug at the end of 2021 and submitted a bugfix in January: "[mysql] update check gtid set #761". However, the modified code has not yet been merged into the master branch.
Looking at the code changed in that bugfix, it adds a checkGtidSet call to isBinlogAvailable:
private boolean isBinlogAvailable(MySqlOffsetContext offset) {
    String gtidStr = offset.gtidSet();
    if (gtidStr != null) {
        return checkGtidSet(offset);
    }
    return checkBinlogFilename(offset);
}
checkGtidSet does the following:
1. Query the gtid set known to the master (availableGtidStr).
2. Compare it with the gtid set stored in the checkpoint and calculate the gtid range that still needs to be synchronized (gtidSetToReplicate).
3. Query the gtid set that has already been purged (purgedGtidSet) and subtract it from gtidSetToReplicate to get the range that has not been purged. If nothing in the required range has been purged, synchronization resumes from the checkpoint; otherwise the check fails and a full synchronization has to be restarted.
private boolean checkGtidSet(MySqlOffsetContext offset) {
    String gtidStr = offset.gtidSet();
    if (gtidStr.trim().isEmpty()) {
        return true; // start at beginning ...
    }

    String availableGtidStr = connection.knownGtidSet();
    if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
        // Last offsets had GTIDs but the server does not use them ...
        LOG.warn(
                "Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
        return false;
    }

    // GTIDs are enabled
    GtidSet gtidSet = new GtidSet(gtidStr);
    // Get the GTID set that is available in the server ...
    GtidSet availableGtidSet = new GtidSet(availableGtidStr);
    if (gtidSet.isContainedWithin(availableGtidSet)) {
        LOG.info(
                "MySQL current GTID set {} does contain the GTID set {} required by the connector.",
                availableGtidSet,
                gtidSet);
        // The replication is concept of mysql master-slave replication protocol ...
        final GtidSet gtidSetToReplicate =
                connection.subtractGtidSet(availableGtidSet, gtidSet);
        final GtidSet purgedGtidSet = connection.purgedGtidSet();
        LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
        final GtidSet nonPurgedGtidSetToReplicate =
                connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
        LOG.info(
                "GTID set {} known by the server but not processed yet, for replication are available only GTID set {}",
                gtidSetToReplicate,
                nonPurgedGtidSetToReplicate);
        if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
            LOG.warn("Some of the GTIDs needed to replicate have been already purged");
            return false;
        }
        return true;
    }
    LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
    return false;
}
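The set arithmetic behind those three steps can be tried out in isolation. The sketch below is a simplified stand-in, not the connector's GtidSet class: it assumes a single server UUID and models each gtid set as one closed interval of transaction numbers. The names GtidRangeCheck, Range, subtract and canResume are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the checkGtidSet logic for one server UUID. */
class GtidRangeCheck {

    /** Closed interval [start, end] of transaction numbers. */
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(Range other) {
            return start <= other.start && other.end <= end;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Range && ((Range) o).start == start && ((Range) o).end == end;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(start) * 31 + Long.hashCode(end);
        }

        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    /** Returns the parts of a that are not covered by b. */
    static List<Range> subtract(Range a, Range b) {
        List<Range> out = new ArrayList<>();
        if (b.end < a.start || b.start > a.end) {
            out.add(a); // no overlap, a is unchanged
            return out;
        }
        if (b.start > a.start) {
            out.add(new Range(a.start, b.start - 1));
        }
        if (b.end < a.end) {
            out.add(new Range(b.end + 1, a.end));
        }
        return out;
    }

    /** Subtracts b from every range in the list. */
    static List<Range> subtract(List<Range> ranges, Range b) {
        List<Range> out = new ArrayList<>();
        for (Range r : ranges) {
            out.addAll(subtract(r, b));
        }
        return out;
    }

    /** True when the job can resume from the checkpoint without a full synchronization. */
    static boolean canResume(Range checkpoint, Range available, Range purged) {
        if (!available.contains(checkpoint)) {
            return false; // the server does not know everything the checkpoint has seen
        }
        List<Range> toReplicate = subtract(available, checkpoint);
        List<Range> nonPurged = subtract(toReplicate, purged);
        return toReplicate.equals(nonPurged); // nothing still needed has been purged
    }

    public static void main(String[] args) {
        // Hypothetical values: checkpoint at 1-100, server knows 1-120, server purged 1-50.
        System.out.println(canResume(new Range(1, 100), new Range(1, 120), new Range(1, 50)));
    }
}
```

The real GtidSet handles multiple UUIDs and multiple intervals per UUID; the single-interval model is only meant to show why the comparison between gtidSetToReplicate and nonPurgedGtidSetToReplicate decides whether a resume is safe.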
3.4.7 merge the bugfix code to verify high availability
Download the code from GitHub (https://github.com/ververica/flink-cdc-connectors.git), merge the "gtid is not valid" bugfix into master, compile and package it, and deploy it to the Flink environment. Repeating the verification process above, we observe:
1. When switching between master and slave, the missing binlog file error is no longer reported, and no other exception occurs.
2. The job resumes from the checkpoint instead of running a full synchronization.
This shows that the mysqlcdc high availability switchover succeeds.
We check the run log to verify further:
1. Get the initial gtid set from the checkpoint: gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282.
2. Confirm that MySQL's current GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290 contains the initial GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282.
3. Calculate the GTID set still to be synchronized: d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290.
4. Set the synchronization starting point and start synchronizing.
The detailed log is as follows:
2022-02-17 20:05:55,961 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.000005, pos=31359, gtids=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}, isSuspended=false}]
2022-02-17 20:05:55,966 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
2022-02-17 20:05:55,970 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, mysql_test_1]], fields=[id, data, create_time]) -> NotNullEnforcer(fields=[id]) -> Map (1/1)#0 (7fdfa75135e212f596f80bd2716e0837) switched from INITIALIZING to RUNNING.
com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader [] - BinlogSplitReader is created.
2022-02-17 20:05:56,237 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - MySQL current GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290 does contain the GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282 required by the connector.
2022-02-17 20:05:56,247 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Server has already purged d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61103 GTIDs
2022-02-17 20:05:56,248 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290 known by the server but not processed yet, for replication are available only GTID set d667b1cd-778a-11ec-aa60-6c92bf64e18c:61283-61290
2022-02-17 20:05:56,248 INFO io.debezium.relational.history.DatabaseHistoryMetrics [] - Started database history recovery
2022-02-17 20:05:56,249 INFO io.debezium.relational.history.DatabaseHistoryMetrics [] - Finished database history recovery of 0 change(s) in 1 ms
2022-02-17 20:05:56,277 INFO io.debezium.util.Threads [] - Requested thread factory for connector MySqlConnector, id = mysql_binlog_source named = binlog-client
2022-02-17 20:05:56,287 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - GTID set purged on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61103
2022-02-17 20:05:56,287 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Attempting to generate a filtered GTID set
2022-02-17 20:05:56,287 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - GTID set from previous recorded offset: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,288 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - GTID set available on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290
2022-02-17 20:05:56,288 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Using first available positions for new GTID channels
2022-02-17 20:05:56,288 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Relevant GTID set available on server: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61290
2022-02-17 20:05:56,289 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Final merged GTID set to use when connecting to MySQL: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,289 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Registering binlog reader with GTID set: d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282
2022-02-17 20:05:56,289 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Skip 0 events on streaming start
2022-02-17 20:05:56,289 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Skip 0 rows on streaming start
2022-02-17 20:05:56,290 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-02-17 20:05:56,293 INFO io.debezium.util.Threads [] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-02-17 20:05:56,302 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource [] - Connected to MySQL binlog at 10.130.49.141:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000005, currentBinlogPosition=31359, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, currentGtidSet=d667b1cd-778a-11ec-aa60-6c92bf64e18c:1-61282, restartBinlogFilename=mysql-bin.000005, restartBinlogPosition=31359, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]
4, Summary
So far, three rounds of verification have confirmed that FlinkCDC can support MySQL high availability. The required conditions are:
1. The MySQL service must have gtid_mode enabled.
2. At present, the official version (2.2-SNAPSHOT) has a bug and cannot support high availability directly. It can be used after the bugfix is merged and the connector is compiled manually.