Brief introduction
This article first explains what a Flink connector and CDC are, then walks through building a simple Flink connector for GBase8s by hand and finishing with a practical project: synchronizing data from MySQL to GBase8s in real time via CDC, using that connector.
What is a Flink connector
Flink ships with basic data sources and sinks that are always available. The predefined sources include files, MySQL, RabbitMQ, Kafka, Elasticsearch, etc., and it likewise supports writing data out to files, MySQL, RabbitMQ, Kafka, Elasticsearch, etc.
To put it simply: a Flink connector encapsulates the loading of a data source and the writing of output. As long as we pull in the corresponding connector dependency, we can load from and write to that system with almost no code.
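For example, here is a minimal sketch of using the built-in JDBC connector (assuming the same Flink version as this article plus the flink-connector-jdbc and MySQL driver dependencies; the table name and credentials are made up for illustration):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BuiltinJdbcConnectorDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // 'jdbc' is the identifier registered by the built-in JDBC connector;
        // declaring the table is all it takes to read from (or write to) MySQL
        tEnv.executeSql(
                "CREATE TABLE users (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/test',\n" +
                "  'table-name' = 'users',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456'\n" +
                ")");
        tEnv.executeSql("SELECT * FROM users").print();
    }
}
```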
What is CDC (Change Data Capture)
First, what is CDC? It is short for Change Data Capture. With CDC we can capture committed changes from a database and deliver them downstream for consumption. These changes include INSERT, DELETE, and UPDATE operations.
Its main application scenarios:
- Data synchronization or backup between heterogeneous databases / building data analysis and computation platforms
- Sharing data state between microservices
- Updating caches / CQRS query view updates
CDC is a broad concept: anything that can capture changed data can be called CDC. In industry there are mainly query-based CDC and log-based CDC; the table below compares their capabilities and differences.
| | Query-based CDC | Log-based CDC |
| --- | --- | --- |
| Concept | Each capture issues a SELECT that scans the whole table and filters out the rows changed since the last query | Continuously monitors the data store's log, e.g. the binlog in MySQL |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures all data changes | ❌ | ✅ |
| Low latency without increasing database load | ❌ | ✅ |
| Non-intrusive to the business (no LastUpdated field) | ❌ | ✅ |
| Captures delete events and the old state of records | ❌ | ✅ |
| Captures the old state of records | ❌ | ✅ |
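In Flink's Table/SQL runtime, these change types are represented by `org.apache.flink.types.RowKind` attached to each row. A tiny sketch (illustration only, not part of the connector) to make that concrete:

```java
// Flink tags every row with a RowKind that encodes the CDC change type:
// +I insert, -U update-before, +U update-after, -D delete.
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString());
        }
    }
}
```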
flink-connector-gbasedbt
Of course, we could push CDC data into the target database with a hand-written Sink, but that is not particularly elegant. Can we instead write data into GBase8s through Flink SQL? The answer is yes. Next, let's implement a simple Flink connector for GBase8s. It takes three steps:
- Build the row converter
- Build the dialect
- Register the dynamic table factory and the related Sink classes
After these three steps we have a simple connector. Let's look at each step in turn:
Build row converter
```java
package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * Row converter for GBase8s; the JDBC defaults inherited from
 * AbstractJdbcRowConverter are reused unchanged.
 *
 * @author lijiaqi
 */
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public GBasedbtRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "gbasedbt";
    }
}
```
Build Dialect
```java
package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
 * @author lijiaqi
 */
public class GBasedbtDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "gbasedbt";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:gbasedbt-sqli:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new GBasedbtRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return null;
    }

    @Override
    public void validate(TableSchema schema) throws ValidationException {
        JdbcDialect.super.validate(schema);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.gbasedbt.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "'" + identifier + "'";
    }

    @Override
    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
    }

    @Override
    public String getRowExistsStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
    }

    @Override
    public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
    }
}
```
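Not much can go wrong here, but a quick sanity check of the URL matching never hurts. A trivial sketch using only the class above:

```java
// The dialect should claim GBase8s JDBC URLs and nothing else.
import wang.datahub.dialect.GBasedbtDialect;

public class DialectCheck {
    public static void main(String[] args) {
        GBasedbtDialect dialect = new GBasedbtDialect();
        System.out.println(dialect.canHandle("jdbc:gbasedbt-sqli://host:9088/db")); // true
        System.out.println(dialect.canHandle("jdbc:mysql://host:3306/db"));         // false
    }
}
```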
Register dynamic table factory and related Sink programs
First, create GBasedbtSinkFunction, which accepts RowData input and sinks it into the configured database:
```java
package wang.datahub.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author lijiaqi
 */
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    private final JdbcOptions jdbcOptions;
    private DataType dataType;

    private Connection conn;
    private Statement stmt;

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
        this.jdbcOptions = jdbcOptions;
    }

    public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public void open(Configuration parameters) {
        System.out.println("open connection !!!!!");
        try {
            if (null == conn) {
                Class.forName(jdbcOptions.getDriverName());
                conn = DriverManager.getConnection(
                        jdbcOptions.getDbURL(),
                        jdbcOptions.getUsername().orElse(null),
                        jdbcOptions.getPassword().orElse(null));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        try {
            stmt = conn.createStatement();
            String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
            for (int i = 0; i < value.getArity(); i++) {
                // Format each field according to its declared conversion class
                if (dataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    sql += value.getInt(i) + " ,";
                } else {
                    sql += "'" + value.getString(i) + "' ,";
                }
            }
            sql = sql.substring(0, sql.length() - 1);
            sql += " ); ";
            System.out.println("sql ==> " + sql);
            stmt.execute(sql);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) {
            stmt.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
```
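Note that `invoke()` builds the INSERT by string concatenation, which is fragile around quoting. A hedged alternative sketch using a PreparedStatement (the helper class name is made up; it assumes the same field types as the sink function above):

```java
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collections;

/** Sketch only: a PreparedStatement-based write that avoids manual quoting. */
public class PreparedWriteSketch {
    static void write(Connection conn, String tableName, RowData row, DataType rowDataType)
            throws Exception {
        // One "?" placeholder per field
        String placeholders = String.join(",", Collections.nCopies(row.getArity(), "?"));
        String sql = "insert into " + tableName + " values (" + placeholders + ")";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < row.getArity(); i++) {
                if (rowDataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                    ps.setInt(i + 1, row.getInt(i));
                } else {
                    ps.setString(i + 1, row.getString(i).toString());
                }
            }
            ps.executeUpdate();
        }
    }
}
```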
Build GBasedbtDynamicTableSink
```java
package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions,
                                    EncodingFormat<SerializationSchema<RowData>> encodingFormat,
                                    DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Accept whatever changelog the planner offers
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        System.out.println("SinkRuntimeProvider");
        System.out.println(dataType);
        GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions, dataType);
        return SinkFunctionProvider.of(gbasedbtSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "gbasedbt Table Sink";
    }
}
```
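One design note: `getChangelogMode` simply echoes `requestedMode`, so the planner may hand this insert-only sink UPDATE and DELETE rows as well. A sketch of declaring an explicit mode instead (ChangelogMode and RowKind are standard Flink APIs; whether to exclude UPDATE_BEFORE is a design choice, not something this article prescribes):

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeSketch {
    /** An upsert-style mode: inserts, update-afters, and deletes only. */
    public static ChangelogMode upsertMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }
}
```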
Build GBasedbtDynamicTableFactory
```java
package wang.datahub.table;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
 * @author lijiaqi
 */
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "gbasedbt";
    private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> DRIVER = ConfigOptions
            .key("driver")
            .stringType()
            .defaultValue(DRIVER_NAME)
            .withDescription("the jdbc driver.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // GBasedbtDynamicTableSource is the matching source implementation (not shown in this article)
        return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new GBasedbtDialect());
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
```
Next, register the factory through SPI: create the file `resources/META-INF/services/org.apache.flink.table.factories.Factory` containing the fully qualified name of the factory class.
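That is, the services file consists of a single line:

```
wang.datahub.table.GBasedbtDynamicTableFactory
```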
At this point our Flink connector is built. Next, we will use it to complete a real project.
Hands-on project
The overall architecture of the project is straightforward: Flink CDC captures change data from MySQL, and Flink SQL sinks that data into GBase8s.
Next, let's take a look at how to implement CDC through Flink SQL. Only three SQL statements are required.
Create the data source table
```java
// Data source table
String sourceDDL =
        "CREATE TABLE mysql_binlog (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '3306',\n" +
        " 'username' = 'flinkcdc',\n" +
        " 'password' = '123456',\n" +
        " 'database-name' = 'test',\n" +
        " 'table-name' = 'test_cdc'\n" +
        ")";
```
Then create the output table that writes to GBase8s; here the connector is set to gbasedbt:
```java
String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
String userName = "gbasedbt";
String password = "123456";
String gbasedbtSinkTable = "ta";

// Output target table
String sinkDDL =
        "CREATE TABLE test_cdc_sink (\n" +
        " id INT NOT NULL,\n" +
        " name STRING,\n" +
        " description STRING,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'gbasedbt',\n" +
        " 'url' = '" + url + "',\n" +
        " 'username' = '" + userName + "',\n" +
        " 'password' = '" + password + "',\n" +
        " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
        ")";
```
Finally, wire the source and sink together; the data is imported with a single INSERT:
```java
String transformSQL = "insert into test_cdc_sink select * from mysql_binlog";
```
Full reference code
```java
package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Data source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkcdc',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        String userName = "gbasedbt";
        String password = "123456";
        String gbasedbtSinkTable = "ta";

        // Output target table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'gbasedbt',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + gbasedbtSinkTable + "'\n" +
                ")";

        String transformSQL = "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql already submits the INSERT job; a separate env.execute()
        // would fail here because no DataStream operators are defined
        TableResult result = tableEnv.executeSql(transformSQL);
        result.print();
    }
}
```
Results
Checking the target table confirms that the data has been written into the database.
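To reproduce the check yourself, here is a plain JDBC sketch (assumptions: the same hosts and credentials as the DDL above, with the MySQL and GBase8s drivers on the classpath). It inserts a row on the MySQL side and reads the GBase8s side back a few seconds later:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SyncCheck {
    public static void main(String[] args) throws Exception {
        // 1. Write a row into the MySQL source table
        try (Connection mysql = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "flinkcdc", "123456");
             Statement st = mysql.createStatement()) {
            st.executeUpdate("insert into test_cdc values (1, 'hello', 'from mysql')");
        }

        Thread.sleep(3000); // give the pipeline a moment to propagate the change

        // 2. Read it back from the GBase8s sink table
        String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;"
                + "NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
        try (Connection gbase = DriverManager.getConnection(url, "gbasedbt", "123456");
             Statement st = gbase.createStatement();
             ResultSet rs = st.executeQuery("select * from ta")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1) + " | " + rs.getString(2) + " | " + rs.getString(3));
            }
        }
    }
}
```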