Hand-building a Flink connector for GBase8s

Brief introduction

This article first explains what a Flink connector and CDC are, then walks through hand-building a simple Flink connector for GBase8s, and finishes with a hands-on project: using that connector to synchronize data into GBase8s in real time from MySQL CDC.

What is a Flink connector

Flink ships with basic data sources and sinks that are always available. The predefined sources include files, MySQL, RabbitMQ, Kafka, Elasticsearch, and so on, and it likewise supports writing data out to files, MySQL, RabbitMQ, Kafka, Elasticsearch, and so on.

To put it simply: a Flink connector encapsulates the loading of a data source or the writing of output (a source or a sink). As long as we add the corresponding connector dependency, we can quickly wire up data input and output.
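For example, once the Kafka connector dependency is on the classpath, a source takes only a few lines. A minimal sketch, assuming a Flink 1.13-era classpath; the topic name, group id, and broker address below are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings; point these at a real Kafka cluster.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        // The connector dependency provides FlinkKafkaConsumer; no custom code is needed.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props));
        stream.print();

        env.execute("kafka-source-demo");
    }
}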

What is CDC (Change Data Capture)

First, what is CDC? It stands for Change Data Capture. With CDC we can capture committed changes from a database and deliver them downstream for consumption. These changes include INSERT, DELETE, UPDATE, and other operations.

Its main application scenarios:

  • Data synchronization or backup between heterogeneous databases / building data analysis and computation platforms

  • Sharing data state between microservices

  • Updating caches / CQRS query-view updates

CDC is a broad concept: anything that captures changed data can be called CDC. In the industry there are mainly query-based CDC and log-based CDC; the comparison below summarizes their capabilities and differences.

  • Concept: query-based CDC issues a SELECT against the whole table on every capture and filters out the rows that changed between queries; log-based CDC continuously monitors the data store's own log, such as the binlog in MySQL.

  • Open-source products: query-based — Sqoop, Kafka JDBC Source; log-based — Canal, Maxwell, Debezium.

  • Execution mode: query-based is batch; log-based is streaming.

  • Captures all data changes: query-based no; log-based yes.

  • Low latency without increasing database load: query-based no; log-based yes.

  • Non-intrusive to the business schema (no LastUpdated field required): query-based no; log-based yes.

  • Captures delete events: query-based no; log-based yes.

  • Captures the old state of a record: query-based no; log-based yes.
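To make the first point concrete, a query-based capture amounts to re-running a query like the sketch below on a schedule (the last_updated column and the lastPollTime watermark are hypothetical):

// Query-based CDC, sketched: re-scan the table and keep only the rows that changed
// since the previous poll. Both last_updated and lastPollTime are hypothetical.
String pollSql = "SELECT id, name, description FROM test_cdc"
        + " WHERE last_updated > '" + lastPollTime + "'";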

flink-connector-gbasedbt

We could, of course, write a Sink by hand to load the CDC data into our target database directly, but that is not very elegant. Can we instead write data into GBase8s through Flink SQL? The answer is yes. Next, let's implement a simple Flink connector for GBase8s. There are three steps:

  1. Build the row converter

  2. Build the dialect

  3. Register the dynamic table factory and the Sink

These three steps are enough for a simple connector. Let's look at each in turn.

Build the row converter

The row converter maps between Flink's internal RowData representation and JDBC values; here we simply inherit the default conversions from AbstractJdbcRowConverter and give the converter a name.

package wang.datahub.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
* @author lijiaqi
*/
public class GBasedbtRowConverter extends AbstractJdbcRowConverter {

   public GBasedbtRowConverter(RowType rowType) {
       super(rowType);
  }

   private static final long serialVersionUID = 1L;

   @Override
   public String converterName() {
       return "gbasedbt";
  }

}
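The converter is normally created by the connector from the table's physical row type. A minimal illustration of that wiring (the field list matches the demo table used later in this article):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.RowType;

public class ConverterDemo {
    public static void main(String[] args) {
        // Build the logical row type for the demo table and hand it to the converter.
        RowType rowType = (RowType) DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("description", DataTypes.STRING()))
                .getLogicalType();
        GBasedbtRowConverter converter = new GBasedbtRowConverter(rowType);
        System.out.println(converter.converterName()); // prints "gbasedbt"
    }
}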

Build the dialect

The dialect tells the JDBC connector how to recognize its URLs, which driver to load by default, how to quote identifiers, and how to generate the various statements; most methods below simply delegate to the JdbcDialect defaults.

package wang.datahub.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import wang.datahub.converter.GBasedbtRowConverter;

import java.util.Optional;

/**
*
* @author lijiaqi
*/
public class GBasedbtDialect implements JdbcDialect {

   private static final long serialVersionUID = 1L;

   @Override
   public String dialectName() {
       return "gbasedbt";
  }

   @Override
   public boolean canHandle(String url) {
       return url.startsWith("jdbc:gbasedbt-sqli:");
  }

   @Override
   public JdbcRowConverter getRowConverter(RowType rowType) {
       return new GBasedbtRowConverter(rowType);
  }

   @Override
   public String getLimitClause(long l) {
       // Not implemented in this demo; GBase 8s is Informix-compatible and would
       // use "SELECT FIRST n" rather than a trailing LIMIT clause.
       return null;
   }

   @Override
   public void validate(TableSchema schema) throws ValidationException {
       JdbcDialect.super.validate(schema);
  }

   @Override
   public Optional<String> defaultDriverName() {
       return Optional.of("com.gbasedbt.jdbc.Driver");
  }

   @Override
   public String quoteIdentifier(String identifier) {
       // Single quotes denote string literals in SQL, not identifiers. GBase 8s
       // (Informix-compatible) only treats double-quoted names as identifiers when
       // DELIMIDENT is enabled, so the safest default is to return the name unquoted.
       return identifier;
   }

   @Override
   public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
       return JdbcDialect.super.getUpsertStatement(tableName, fieldNames, uniqueKeyFields);
  }

   @Override
   public String getRowExistsStatement(String tableName, String[] conditionFields) {
       return JdbcDialect.super.getRowExistsStatement(tableName, conditionFields);
  }

   @Override
   public String getInsertIntoStatement(String tableName, String[] fieldNames) {
       return JdbcDialect.super.getInsertIntoStatement(tableName, fieldNames);
  }

   @Override
   public String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
       return JdbcDialect.super.getUpdateStatement(tableName, fieldNames, conditionFields);
  }

   @Override
   public String getDeleteStatement(String tableName, String[] conditionFields) {
       return JdbcDialect.super.getDeleteStatement(tableName, conditionFields);
  }

   @Override
   public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
       return JdbcDialect.super.getSelectFromStatement(tableName, selectFields, conditionFields);
  }

}
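With the dialect in place, the statement templates come from the JdbcDialect defaults. A small sketch of what they generate (exact formatting may differ across Flink versions; 1.13.x is assumed here):

public class DialectDemo {
    public static void main(String[] args) {
        GBasedbtDialect dialect = new GBasedbtDialect();
        // The default implementation quotes names via quoteIdentifier() and uses
        // :fieldName placeholders, roughly: INSERT INTO ta(id, name) VALUES (:id, :name)
        System.out.println(dialect.getInsertIntoStatement("ta", new String[]{"id", "name"}));
    }
}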

Register the dynamic table factory and the Sink

First, create GBasedbtSinkFunction, which receives RowData records and writes them into the configured database.

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
* @author lijiaqi
*/
public class GBasedbtSinkFunction extends RichSinkFunction<RowData> {

   private static final long serialVersionUID = 1L;

   private final JdbcOptions jdbcOptions;
   // Unused in this demo; a full connector would encode rows through a serialization format.
   private final SerializationSchema<RowData> serializationSchema = null;
   private DataType dataType;

   private Connection conn;
   private Statement stmt;

   public GBasedbtSinkFunction(JdbcOptions jdbcOptions) {
       this.jdbcOptions = jdbcOptions;
  }

   public GBasedbtSinkFunction(JdbcOptions jdbcOptions, DataType dataType) {
       this.jdbcOptions = jdbcOptions;
       this.dataType = dataType;
   }

   @Override
   public void open(Configuration parameters) {
       System.out.println("open connection !!!!!");
       try {
           if (null == conn) {
               Class.forName(jdbcOptions.getDriverName());
               conn = DriverManager.getConnection(jdbcOptions.getDbURL(),jdbcOptions.getUsername().orElse(null),jdbcOptions.getPassword().orElse(null));
          }
      } catch (Exception e) {
           e.printStackTrace();
      }
  }

   @Override
   public void invoke(RowData value, Context context) throws Exception {

       try {
           stmt = conn.createStatement();
           String sql = "insert into " + this.jdbcOptions.getTableName() + " values ( ";
           for (int i = 0; i < value.getArity(); i++) {
               // Dispatch on the field's conversion class; this demo only handles INT and STRING.
               if (dataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
                   sql += value.getInt(i) + " ,";
               } else {
                   sql += "'" + value.getString(i) + "' ,";
               }
           }
           // Drop the trailing comma and close the statement; JDBC drivers
           // generally reject a trailing semicolon.
           sql = sql.substring(0, sql.length() - 1);
           sql += " )";

           System.out.println("sql ==> " + sql);

           stmt.execute(sql);
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

   @Override
   public void close() throws Exception {
       if (stmt != null) {
           stmt.close();
      }
       if (conn != null) {
           conn.close();
      }
  }

}
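Building the INSERT by string concatenation, as above, is fine for a demo but fragile (quoting, SQL injection, type coverage). A minimal sketch of a parameterized alternative for invoke(), under the same INT/STRING-only schema assumption:

// A sketch only: bind values through a PreparedStatement instead of concatenating SQL.
String placeholders = String.join(",", java.util.Collections.nCopies(value.getArity(), "?"));
String sql = "insert into " + jdbcOptions.getTableName() + " values (" + placeholders + ")";
try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
    for (int i = 0; i < value.getArity(); i++) {
        if (dataType.getChildren().get(i).getConversionClass().equals(Integer.class)) {
            ps.setInt(i + 1, value.getInt(i));          // JDBC parameters are 1-based
        } else {
            ps.setString(i + 1, value.getString(i).toString());
        }
    }
    ps.execute();
}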

Build GBasedbtDynamicTableSink

Next, wrap the sink function in a DynamicTableSink so the planner can obtain it at runtime.

package wang.datahub.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

/**
* @author lijiaqi
*/
public class GBasedbtDynamicTableSink implements DynamicTableSink {

   private final JdbcOptions jdbcOptions;
   private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
   private final DataType dataType;

   public GBasedbtDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
       this.jdbcOptions = jdbcOptions;
       this.encodingFormat = encodingFormat;
       this.dataType = dataType;
  }

   @Override
   public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
       // Accept whatever changelog mode the planner requests (inserts, updates, deletes).
       return requestedMode;
   }

   @Override
   public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
       System.out.println("SinkRuntimeProvider");
       System.out.println(dataType);
       GBasedbtSinkFunction gbasedbtSinkFunction = new GBasedbtSinkFunction(jdbcOptions,dataType);
       return SinkFunctionProvider.of(gbasedbtSinkFunction);
  }

   @Override
   public DynamicTableSink copy() {
       return new GBasedbtDynamicTableSink(jdbcOptions, encodingFormat, dataType);
  }

   @Override
   public String asSummaryString() {
       return "gbasedbt Table Sink";
  }

}

Build GBasedbtDynamicTableFactory

Finally, the factory parses the WITH options from the DDL and creates the source or sink; its identifier is the value used for 'connector'.

package wang.datahub.table;


import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
import wang.datahub.dialect.GBasedbtDialect;

import java.util.HashSet;
import java.util.Set;

/**
* @author lijiaqi
*/
public class GBasedbtDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

   public static final String IDENTIFIER = "gbasedbt";

   private static final String DRIVER_NAME = "com.gbasedbt.jdbc.Driver";

   public static final ConfigOption<String> URL = ConfigOptions
          .key("url")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc database url.");

   public static final ConfigOption<String> DRIVER = ConfigOptions
          .key("driver")
          .stringType()
          .defaultValue(DRIVER_NAME)
          .withDescription("the jdbc driver.");

   public static final ConfigOption<String> TABLE_NAME = ConfigOptions
          .key("table-name")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc table name.");

   public static final ConfigOption<String> USERNAME = ConfigOptions
          .key("username")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc user name.");

   public static final ConfigOption<String> PASSWORD = ConfigOptions
          .key("password")
          .stringType()
          .noDefaultValue()
          .withDescription("the jdbc password.");

//   public static final ConfigOption<String> FORMAT = ConfigOptions
//           .key("format")
//           .stringType()
//           .noDefaultValue()
//           .withDescription("the format.");

   @Override
   public String factoryIdentifier() {
       return IDENTIFIER;
  }

   @Override
   public Set<ConfigOption<?>> requiredOptions() {
       Set<ConfigOption<?>> requiredOptions = new HashSet<>();
       requiredOptions.add(URL);
       requiredOptions.add(TABLE_NAME);
       requiredOptions.add(USERNAME);
       requiredOptions.add(PASSWORD);
//       requiredOptions.add(FORMAT);
       return requiredOptions;
  }

   @Override
   public Set<ConfigOption<?>> optionalOptions() {
       return new HashSet<>();
  }

   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {

       final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

       final ReadableConfig config = helper.getOptions();

       helper.validate();

       JdbcOptions jdbcOptions = getJdbcOptions(config);

       TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

       // GBasedbtDynamicTableSource is not shown in this article; only the sink path is exercised below.
       return new GBasedbtDynamicTableSource(jdbcOptions, physicalSchema);

   }

   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
       
       final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

//       final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
//               SerializationFormatFactory.class,
//               FactoryUtil.FORMAT);

       final ReadableConfig config = helper.getOptions();

       helper.validate();

       JdbcOptions jdbcOptions = getJdbcOptions(config);

       final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

       return new GBasedbtDynamicTableSink(jdbcOptions, null, dataType);
  }

   private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
       final String url = readableConfig.get(URL);
       final JdbcOptions.Builder builder = JdbcOptions.builder()
              .setDriverName(DRIVER_NAME)
              .setDBUrl(url)
              .setTableName(readableConfig.get(TABLE_NAME))
              .setDialect(new GBasedbtDialect());

       readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
       readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
       return builder.build();
  }

}

Next, register the factory through SPI: create the file resources/META-INF/services/org.apache.flink.table.factories.Factory, whose content is the fully qualified name of the factory class.
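The service file consists of a single line naming the factory class:

# resources/META-INF/services/org.apache.flink.table.factories.Factory
wang.datahub.table.GBasedbtDynamicTableFactory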

So far, our Flink connector is complete. Next, let's use it in a real project.
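For reference, a sketch of the build dependencies the project relies on; the artifact names and versions below are assumptions (Flink 1.13.x with Scala 2.11 artifacts, Flink CDC 2.x) and should be matched to your own environment:

<!-- A sketch only: verify artifacts and versions against your Flink distribution. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.1.1</version>
</dependency>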

Hands-on project

The overall architecture of the project is simple: we capture change data from MySQL through Flink CDC, then sink it into GBase8s through Flink SQL.

Next, let's see how to implement CDC through Flink SQL. Only three SQL statements are required.

Create data source table

       // Data source table
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING\n" +
                       ") WITH (\n" +
                       " 'connector' = 'mysql-cdc',\n" +
                       " 'hostname' = 'localhost',\n" +
                       " 'port' = '3306',\n" +
                       " 'username' = 'flinkcdc',\n" +
                       " 'password' = '123456',\n" +
                       " 'database-name' = 'test',\n" +
                       " 'table-name' = 'test_cdc'\n" +
                       ")";

Create the output table, writing to GBase8s; note that the connector is set to gbasedbt

       String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
       String userName = "gbasedbt";
       String password = "123456";
       String gbasedbtSinkTable = "ta";
       // Output target table
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING,\n" +
                       " PRIMARY KEY (id) NOT ENFORCED \n " +
                       ") WITH (\n" +
                       " 'connector' = 'gbasedbt',\n" +
//                       " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                       " 'url' = '" + url + "',\n" +
                       " 'username' = '" + userName + "',\n" +
                       " 'password' = '" + password + "',\n" +
                       " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
                       ")";

Finally, move the data across with a single INSERT

       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";

Full reference code

package wang.datahub.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToGBasedbtlMain {
   public static void main(String[] args) throws Exception {
       EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build();
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);



       tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);


       // Data source table
       String sourceDDL =
               "CREATE TABLE mysql_binlog (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING\n" +
                       ") WITH (\n" +
                       " 'connector' = 'mysql-cdc',\n" +
                       " 'hostname' = 'localhost',\n" +
                       " 'port' = '3306',\n" +
                       " 'username' = 'flinkcdc',\n" +
                       " 'password' = '123456',\n" +
                       " 'database-name' = 'test',\n" +
                       " 'table-name' = 'test_cdc'\n" +
                       ")";


       String url = "jdbc:gbasedbt-sqli://172.31.95.133:9088/t1:GBASEDBTSERVER=myserver;NEWCODESET=UTF8,zh_cn.UTF8,57372;DATABASE=mydb;DB_LOCALE=en_US.819;";
       String userName = "gbasedbt";
       String password = "123456";
       String gbasedbtSinkTable = "ta";
       // Output target table
       String sinkDDL =
               "CREATE TABLE test_cdc_sink (\n" +
                       " id INT NOT NULL,\n" +
                       " name STRING,\n" +
                       " description STRING,\n" +
                       " PRIMARY KEY (id) NOT ENFORCED \n " +
                       ") WITH (\n" +
                       " 'connector' = 'gbasedbt',\n" +
//                       " 'driver' = 'com.gbasedbt.jdbc.Driver',\n" +
                       " 'url' = '" + url + "',\n" +
                       " 'username' = '" + userName + "',\n" +
                       " 'password' = '" + password + "',\n" +
                       " 'table-name' = '" + gbasedbtSinkTable + "' \n" +
                       ")";

       String transformSQL =
               "insert into test_cdc_sink select * from mysql_binlog";

       tableEnv.executeSql(sourceDDL);
       tableEnv.executeSql(sinkDDL);
       TableResult result = tableEnv.executeSql(transformSQL);

       result.print();
       // executeSql already submits the INSERT job; env.execute() only matters when
       // DataStream operators are defined and may otherwise report an empty topology.
       env.execute("sync-flink-cdc");
  }

}

Running results

Run the job, then query the target table: the change data has been written into GBase8s.

