Flink_ 08_ SQL (personal summary)

Statement: 1 ***
              2. Because it is a personal summary, write the article with the most concise words
              3. If there is any mistake or improper place, please point out

FlinkSQL & TableAPI:

Create a table environment and create tables. The table API is used for the conversion of table data and stream data

For writing SQL:

FlinkSQL writes SQL directly, and the query uses the select keyword of SQL directly,

The query of TableAPI uses the select () API;

API is a stream batch integration


The table is composed of CataLog + database + table name. CataLog and database have default values

CataLogSchema > DataBaseSchema > Table

CataLogSchema stores metadata information, such as database, table and view information

Create table environment:

// Direct creation, based on stream processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// How to bring the EnvironmentSettings
// Stream based processing
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance( )
    .useBlinkPlanner( )
    .inStreamingMode( )
    .build( );
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

// Batch based
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance( )
    .useBlinkPlanner( )
    .inBatchMode( ).
    build( );
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);


1. Convert DataStream to table (common):

The data of this kind of table is constantly changing, which is a dynamic table

Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

2. Create a Table according to the external system:

You need to specify the file format (used to split fields) and field name

Define TableSchema:


tableEnv.connect(new FileSystem( ).path("sensor.txt")) 
    .withFormat(new Csv( ))  
    .withSchema(new Schema( )
    .field("id", DataTypes.STRING( ))
    .field("timestamp", DataTypes.BIGINT( ))
    .field("temperature", DataTypes.DOUBLE( ))


    new Kafka( )
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Csv( ))
    .withSchema(new Schema( )
    .field("id", DataTypes.STRING( ))
    .field("timestamp", DataTypes.BIGINT( ))
    .field("temperature", DataTypes.DOUBLE( ))

Write SQL DDL statements:

String sinkDDL = "create table dataTable (" +
 " id varchar(20) not null, " +
 " ts bigint, " +
 " temperature double"
  ) with (" +
 " 'connector.type' = 'filesystem', " +
 " 'connector.path' = '/sensor.txt', " +
 " 'format.type' = 'csv')";

It also supports ElasticSearch, MySQL, HBase and Hive

Specify time field & time semantics:

  1. Specified when converting DataStream to Table

    // Event time. Specify the watermark delay time in DataStream
     .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
    	 public long extractTimestamp(SensorReading element) {
    		 return element.getTimestamp( ) * 1000L;
    Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp.rowtime");
    //Processing time, add a field as the time field
    Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");
  2. Specified when defining Table Schema table structure

    // Event time
     new FileSystem( ).path("sensor.txt"))
     .withFormat(new Csv( ))
     .withSchema(new Schema( )
     .field("id", DataTypes.STRING( ))
     .field("timestamp", DataTypes.BIGINT( ))
     new Rowtime( )
     .field("temperature", DataTypes.DOUBLE( ))
    // processing time 
     new FileSystem( ).path("..\\sensor.txt"))
     .withFormat(new Csv( ))
     .withSchema(new Schema( )
     .field("id", DataTypes.STRING( ))
     .field("timestamp", DataTypes.BIGINT( ))
     .field("temperature", DataTypes.DOUBLE( ))
     .field("pt", DataTypes.TIMESTAMP(3))
     .proctime( ) 
  3. Specified in the DDL statement that writes SQL when creating the table

    // Event time
    String sinkDDL = "create table dataTable (" +
     " id varchar(20) not null, " +
     " ts bigint, " +
     " temperature double, " +
     " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
     " watermark for rt as rt - interval '1' second" +
     ") with (" +
     " 'connector.type' = 'filesystem', " +
     " 'connector.path' = '/sensor.txt', " +
     " 'format.type' = 'csv')";
    // processing time 
    String sinkDDL = "create table dataTable (" +
     " id varchar(20) not null, " +
     " ts bigint, " +
     " temperature double, " +
     " pt AS PROCTIME( ) " +
     ") with (" +
     " 'connector.type' = 'filesystem', " +
     " 'connector.path' = '/sensor.txt', " +
     " 'format.type' = 'csv')";


1. Convert Table to DataStream:

If there is no aggregation operation 𞓜 aggregation + window, the append mode is generally used (the calculation is carried out only when the window is closed, which is also the append mode)

Aggregation + non window, generally in withdrawal mode (with update operation)

  1. append mode

    DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class);
  2. Withdrawal mode

    DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);

2. Output to external system:


// Building TableSink table information
tableEnv.connect(new FileSystem( ).path("...\\resources\\out.txt")) 
   		 .withFormat(new Csv( ))  
  	        .withSchema(new Schema( )
              .field("id", DataTypes.STRING( ))
              .field("temp", DataTypes.DOUBLE( ))

// Output to TableSink

Update mode:

  1. Append mode

    Only insert

  2. Retract mode

    insert, delete

  3. Update insert mode

    With upsert

    Can only be used when exporting to an external file system


  1. scroll window

    TUMBLE(time_attr, interval)

    TUMBLE_PROCTIME(time_attr, interval)

    TUMBLE_ROWTIME(time_attr, interval)

  2. sliding window

    HOP(time_attr, interval, interval)

    HOP_PROCTIME(time_attr, interval, interval)

    HOP_ROWTIME(time_attr, interval, interval)

    Here, the parameters are opposite to the DataStream. The first interval is the window sliding step size, and the second interval is the window size

  3. Session window

    SESSION(time_attr, interval)

    SESSION_PROCTIME(time_attr, interval)

    SESSION_ROWTIME(time_attr, interval)

Other related functions:

TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)


System functions:




date '2020-06-14'

timestamp '2020-06-14'

to_ The default format of timestamp (from_unixtime (TS' format ') is' YYYY-MM-DD hh:mm:ss'


Function over:

partitionBy optional

oorderBy is required, and the specified field is a field with time semantics (an additional time field is generated when processing time)

preceding optional

Custom UDF:

  1. Inherit ScalarFunction
  2. Write eval method

Custom UDTF:

  1. Inherit tablefunction < out >
  2. Write eval method

In SQL, the field can be enclosed by single quotation marks


-- Equivalent to implicit profiling split(id) as id
select id, word from movie, lateral table(split(id))

Customize UDAF:

  1. Inherit aggregatefunction < out, state >

    STATE is an intermediate data type

  2. Override the createAccumulator method

  3. Override getValue method

  4. Write the calculate method

Custom UDTAF

This can only be done by using TableAPI, not FlinkSQL

Commonly used to find topN

In fact, it is to group first and then aggregate the profile (this aggregate profile is to output the topn in each group)

  1. Inherit tableaggregatefunction < out, state >
  2. Override the createAccumulator method
  3. Write emitValue method
  4. Write the calculate method


The state must be used when join ing. You can specify the maximum time of the state

// 0 means the status will never be cleared
tableEnv.getConfig( ).setIdleStateRetention(Duration.ofSeconds(10))

This maximum time refers to how long it takes to clear after the last use, similar to the concept of Session

The bottom layer is the timer:

Execute the task of clearing status in the timer. Whenever there is a data join: delete the current timer and re register the timer

