Statement:
1. ***
2. This is a personal summary, so it is written as concisely as possible
3. If there are any mistakes or improper points, please point them out
FlinkSQL & TableAPI:
Create a table environment, then create tables. The Table API is used to convert between tables and DataStreams
Writing queries:
Flink SQL writes SQL text directly and queries with SQL's select keyword;
the Table API queries with the select() method
Both APIs unify stream and batch processing
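A small sketch comparing the two styles, reusing the tableEnv, sensorTable, and inputTable defined in the sections below (the field values are assumptions):
// Table API: query with the select()/filter() methods
Table apiResult = sensorTable
        .select("id, temperature")
        .filter("id === 'sensor_1'");

// Flink SQL: query with plain SQL text
Table sqlResult = tableEnv.sqlQuery(
        "select id, temperature from inputTable where id = 'sensor_1'");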
Table:
A table is identified by catalog + database + table name; the catalog and database have default values
Catalog > Database > Table
The catalog stores metadata, such as database, table, and view information
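A minimal sketch of the three-level name, assuming a tableEnv like the ones created below; default_catalog and default_database are Flink's built-in default names, and inputTable is the temporary table registered later in these notes:
tableEnv.useCatalog("default_catalog");
tableEnv.useDatabase("default_database");
// A table can also be referenced by its fully qualified name:
Table t = tableEnv.sqlQuery("select * from default_catalog.default_database.inputTable");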
Create table environment:
// Create directly, based on stream processing
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// With EnvironmentSettings
// Stream mode
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

// Batch mode
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
Input:
1. Convert a DataStream to a Table (common):
The data of such a table changes continuously; it is a dynamic table
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");
2. Create a table from an external system:
You need to specify the format (used to split fields) and the field names
Define TableSchema:
CSV:
tableEnv.connect(new FileSystem().path("sensor.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("inputTable");
Kafka:
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic("sensor")
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("kafkaInputTable");
Write SQL DDL statements:
String sinkDDL = "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double" +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
Elasticsearch, MySQL, HBase, and Hive are also supported
Specify time field & time semantics:
- Specified when converting a DataStream to a Table
// Event time: assign the watermark delay on the DataStream first
dataStream = dataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp.rowtime");

// Processing time: append an extra field as the time attribute
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");
- Specified when defining the table schema
// Event time
tableEnv.connect(new FileSystem().path("sensor.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .rowtime(new Rowtime()
            .timestampsFromField("timestamp")
            .watermarksPeriodicBounded(1000)
        )
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("inputTable");

// Processing time
tableEnv.connect(new FileSystem().path("..\\sensor.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
        .field("pt", DataTypes.TIMESTAMP(3)).proctime()
    )
    .createTemporaryTable("inputTable");
- Specified in the SQL DDL statement when creating the table
// Event time
String sinkDDL = "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
    " watermark for rt as rt - interval '1' second" +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);

// Processing time
String sinkDDL = "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " pt AS PROCTIME() " +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
tableEnv.sqlUpdate(sinkDDL);
Output:
1. Convert Table to DataStream:
No aggregation, or aggregation + window: append mode is generally used (the result is only emitted when the window closes, so the output is still append-only)
Aggregation without a window: retract mode is generally used (results get updated)
- Append mode
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable,Row.class);
- Retract mode
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);
resultStream.print("result");
aggResultStream.print("aggResult");
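For reference, the Boolean flag in the retract stream marks the change type: true is an insert (add), false is a retract (delete); an update of a key is emitted as a retract of the old row followed by an insert of the new one. Roughly (sample values are made up):
(true,  sensor_1, 35.8)
(false, sensor_1, 35.8)
(true,  sensor_1, 36.2)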
2. Output to external system:
File:
// Define the TableSink table
tableEnv.connect(new FileSystem().path("...\\resources\\out.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE())
    )
    .createTemporaryTable("outputTable");

// Write the result to the TableSink
resultSqlTable.insertInto("outputTable");
Update mode:
- Append mode
insert only
- Retract mode
insert, delete
- Upsert mode
update, insert
Only available when writing to an external system (it is not supported for Table-to-DataStream conversion), and the connector must support upsert
Window:
- Tumbling window
TUMBLE(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval)
- Sliding window
HOP(time_attr, interval, interval)
HOP_PROCTIME(time_attr, interval, interval)
HOP_ROWTIME(time_attr, interval, interval)
Note: the parameter order is the opposite of the DataStream API; the first interval is the slide (step size) and the second is the window size (see the example after this list)
- Session window
SESSION(time_attr, interval)
SESSION_PROCTIME(time_attr, interval)
SESSION_ROWTIME(time_attr, interval)
Other related functions:
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)
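A minimal sketch of a group-window query in SQL, reusing the dataTable / rt fields from the DDL example above:
Table windowResult = tableEnv.sqlQuery(
        "select id, count(id) as cnt, " +
        "  tumble_start(rt, interval '10' second) as w_start, " +
        "  tumble_end(rt, interval '10' second) as w_end " +
        "from dataTable " +
        "group by id, tumble(rt, interval '10' second)");
// For a sliding window, use hop(rt, interval '5' second, interval '10' second) instead:
// slide first, then window size.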
Function:
System functions:
power(x,y)
upper("hello")
char_length("hello")
date '2020-06-14'
timestamp '2020-06-14'
to_timestamp(from_unixtime(ts [, format])), whose default format is 'yyyy-MM-dd HH:mm:ss'
current_time
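A quick sketch applying a few of the built-in functions above to the dataTable defined earlier (field names follow that DDL):
Table funcResult = tableEnv.sqlQuery(
        "select id, upper(id), char_length(id), power(temperature, 2), " +
        "  to_timestamp(from_unixtime(ts)) as t " +
        "from dataTable");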
Over windows:
partitionBy is optional
orderBy is required, and the ordering field must have time semantics (for processing time, an extra time attribute field is appended first); see the sketch below
preceding is optional
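A minimal over-window sketch in SQL, again assuming the dataTable / rt fields from the earlier DDL; the 2-rows-preceding bound is arbitrary:
Table overResult = tableEnv.sqlQuery(
        "select id, temperature, " +
        "  avg(temperature) over (" +
        "    partition by id order by rt rows between 2 preceding and current row" +
        "  ) as avg_temp " +
        "from dataTable");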
Custom UDF:
- Extend ScalarFunction
- Write an eval method (see the sketch below)
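A minimal UDF sketch, using the pre-1.11 registerFunction API to match the rest of these notes; the class name HashCode and the factor 13 are arbitrary:
// org.apache.flink.table.functions.ScalarFunction
public static class HashCode extends ScalarFunction {
    // eval() carries the function logic; the method name must be "eval"
    public int eval(String s) {
        return s.hashCode() * 13;
    }
}

// Register and call it from SQL (dataTable is the table defined earlier)
tableEnv.registerFunction("hashCode", new HashCode());
Table udfResult = tableEnv.sqlQuery("select id, hashCode(id) from dataTable");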
Custom UDTF:
- Extend TableFunction<Out>
- Write an eval method
In SQL, string arguments are enclosed in single quotes
Lateral table:
-- Equivalent to an implicit lateral join with the table function split(id)
select id, word from movie, lateral table(split(id))
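A matching UDTF sketch for the split(id) call above; the underscore delimiter and the column alias are assumptions:
// org.apache.flink.table.functions.TableFunction
public static class Split extends TableFunction<String> {
    public void eval(String str) {
        for (String word : str.split("_")) {
            collect(word);   // each collect() emits one row
        }
    }
}

tableEnv.registerFunction("split", new Split());
// Aliasing the produced column as "word" so it can be selected by name
Table udtfResult = tableEnv.sqlQuery(
        "select id, word from movie, lateral table(split(id)) as t(word)");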
Custom UDAF:
- Extend AggregateFunction<Out, Acc>
Acc is the type of the intermediate accumulator (state)
- Override the createAccumulator method
- Override the getValue method
- Write an accumulate method (see the sketch below)
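A minimal UDAF sketch computing the average temperature per sensor; the class names are arbitrary:
// org.apache.flink.table.functions.AggregateFunction
public static class AvgTempAcc {
    public double sum = 0.0;
    public int count = 0;
}

public static class AvgTemp extends AggregateFunction<Double, AvgTempAcc> {
    @Override
    public AvgTempAcc createAccumulator() {
        return new AvgTempAcc();
    }

    @Override
    public Double getValue(AvgTempAcc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    // accumulate() is resolved by reflection and must use this name
    public void accumulate(AvgTempAcc acc, Double temp) {
        acc.sum += temp;
        acc.count += 1;
    }
}

tableEnv.registerFunction("avgTemp", new AvgTemp());
Table udafResult = tableEnv.sqlQuery(
        "select id, avgTemp(temperature) from dataTable group by id");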
Custom UDTAF:
Only possible with the Table API, not Flink SQL
Commonly used to compute a top N
In essence: group first, then apply the table aggregation, which emits multiple rows per group (e.g. the top N within each group)
- Extend TableAggregateFunction<Out, Acc>
- Override the createAccumulator method
- Write an emitValue method
- Write an accumulate method (see the sketch below)
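A minimal UDTAF sketch emitting the top-2 temperatures per sensor; the class names are arbitrary, and the call uses the old string-expression Table API to match these notes:
// org.apache.flink.table.functions.TableAggregateFunction
public static class Top2Acc {
    public double first = Double.MIN_VALUE;
    public double second = Double.MIN_VALUE;
}

public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Acc> {
    @Override
    public Top2Acc createAccumulator() {
        return new Top2Acc();
    }

    public void accumulate(Top2Acc acc, Double temp) {
        if (temp > acc.first) {
            acc.second = acc.first;
            acc.first = temp;
        } else if (temp > acc.second) {
            acc.second = temp;
        }
    }

    public void emitValue(Top2Acc acc, Collector<Tuple2<Double, Integer>> out) {
        out.collect(Tuple2.of(acc.first, 1));
        out.collect(Tuple2.of(acc.second, 2));
    }
}

tableEnv.registerFunction("top2Temp", new Top2Temp());
Table topResult = sensorTable
        .groupBy("id")
        .flatAggregate("top2Temp(temperature) as (temp, rank)")
        .select("id, temp, rank");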
Join:
State is required when joining; you can configure the maximum idle time of that state
// 0 means the state is never cleared
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
This maximum time means how long after the state was last used it is cleared, similar to a session timeout
The underlying mechanism is a timer:
The cleanup task runs in a timer; whenever a record arrives for a key and joins, the current timer is deleted and a new one is registered (see the sketch below)
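A conceptual sketch of that timer pattern (not Flink's actual internal code), written as a KeyedProcessFunction; SensorReading and the 10-second retention are assumptions:
public static class IdleStateCleanup
        extends KeyedProcessFunction<String, SensorReading, SensorReading> {

    private static final long RETENTION_MS = 10_000L;

    private transient ValueState<SensorReading> joinState; // per-key state used by the join
    private transient ValueState<Long> cleanupTs;          // timestamp of the registered timer

    @Override
    public void open(Configuration parameters) {
        joinState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("join-state", SensorReading.class));
        cleanupTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-ts", Long.class));
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out)
            throws Exception {
        // Whenever the state is used: delete the current timer and register a new one
        Long oldTs = cleanupTs.value();
        if (oldTs != null) {
            ctx.timerService().deleteProcessingTimeTimer(oldTs);
        }
        long newTs = ctx.timerService().currentProcessingTime() + RETENTION_MS;
        ctx.timerService().registerProcessingTimeTimer(newTs);
        cleanupTs.update(newTs);

        joinState.update(value);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorReading> out)
            throws Exception {
        // The timer only fires if it was never re-registered, i.e. the key has been idle
        joinState.clear();
        cleanupTs.clear();
    }
}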