⛄ After learning some basic DataStream APIs yesterday, let's continue with the Table API and SQL, which sit on top of DataStream. Both APIs work with relational data and lower the barrier to developing Flink applications. Readers interested in the earlier installments can refer to the following 👇:
- Hadoop topics: Hadoop series articles.
- Spark topics: Spark series.
- Flink topics: Flink series.
💦 The Table API and SQL let you process data streams in a more intuitive way, supporting selection, filtering, grouping, aggregation, and multi-table joins, as well as window operations. Let's start today's study!
1. Setting up the development environment
We already introduced the DataStream API in the previous chapter, so the environment setup is only briefly recapped here:
- Prepare a machine with Java 8 or 11 and Python 3.6, 3.7, or 3.8.
- Run the following command 👇

```bash
python3 -m pip install apache-flink
```
Once the TableEnvironment development environment is set up, you can write Table API and SQL code in IDEA.
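As a quick sanity check that the installation works (and a small preview of the APIs covered below), here is a minimal sketch that builds a tiny in-memory table and prints it; the column names and sample rows are made up for illustration:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment, build a small in-memory table and print it
table_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
table = table_env.from_elements([(1, 'hello'), (2, 'pyflink')], ['id', 'word'])
table.execute().print()
```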
2. Create TableEnvironment
To develop with the Table API and SQL, you must first declare the execution context, a TableEnvironment. It is an interface with the following capabilities:
- Create Table
- Register Table as a temporary Table
- Execute SQL queries (for more details, please refer to the SQL documentation)
- Register user-defined (scalar, table valued, or aggregate) functions
- Configure jobs
- Manage Python dependencies
- Submit job execution
For example, creating a streaming and a batch TableEnvironment (a second sketch after the code below touches on a few of the other capabilities):
```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
```
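To illustrate a few of the other capabilities listed above, here is a hedged sketch (the view name "demo", the sample rows, and the chosen parallelism are made up for illustration) that sets a configuration option, registers a temporary view, and runs a SQL query against it:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Configure the job, e.g. the default parallelism (a standard Flink option key)
table_env.get_config().get_configuration().set_string("parallelism.default", "2")

# Create a Table and register it as a temporary view so SQL can reference it
tab = table_env.from_elements([(1, 'a'), (2, 'b')], ['id', 'name'])
table_env.create_temporary_view("demo", tab)

# Execute a SQL query against the registered view and print the result
table_env.execute_sql("SELECT id, name FROM demo").print()
```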
3. Connect external data sources
The Flink Table API and SQL can interact with external systems through connectors to read streaming and batch data. External systems include traditional relational databases, key-value stores, message queues, and distributed file systems.
3.1 Connecting to files on a file system
```python
# Environment configuration
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())
```

```sql
-- Connect to files on the file system
create table test01(
    id int,
    name string
) WITH (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = '/Users/liuxiaocong/data01.csv'
)
```
The options inside WITH(...) configure the external connector. Different connectors take different options, and which options are required or optional also differs.
Explanation: 'connector' = 'filesystem' indicates that the external connector type is the file system, 'path' = '/Users/liuxiaocong/data01.csv' specifies the file path of the file system connector, and 'format' = 'csv' indicates that the file data is in CSV format.
3.2 Connecting to the Kafka message queue
```sql
create table test02(
    id int,
    name string
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'my_group_id',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
)
```
Explanation: 'connector.type' = 'kafka' indicates that the external connector type is Kafka, 'connector.topic' = 'topic_name' specifies the Kafka topic, 'connector.properties.bootstrap.servers' = 'localhost:9092' specifies the address of the Kafka service, and 'format.type' = 'json' specifies that the data format is JSON.
3.3 Connecting to Elasticsearch (ES)
```sql
create table test03(
    id int,
    name string
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://host1:9092;http://host2:9093',
    'connector.index' = 'myusers',
    'connector.document-type' = 'user',
    'format.type' = 'json',
    'update-mode' = 'append',
    'connector.key-delimiter' = '$',
    'connector.key-null-literal' = 'n/a',
    'connector.failure-handler' = 'fail',
    'connector.flush-on-checkpoint' = 'true',
    'connector.bulk-flush.max-actions' = '42',
    'connector.bulk-flush.max-size' = '42 mb',
    'connector.bulk-flush.interval' = '60000',
    'connector.bulk-flush.backoff.type' = 'disabled',
    'connector.bulk-flush.backoff.max-retries' = '3',
    'connector.bulk-flush.backoff.delay' = '30000',
    'connector.connection-max-retry-timeout' = '3'
)
```
Explanation: 'connector.type' = 'elasticsearch' indicates that the external connector type is Elasticsearch, 'connector.hosts' = 'http://host1:9092;http://host2:9093' specifies the addresses of the Elasticsearch service, and 'format.type' = 'json' specifies that the data format is JSON.
3.4 Creating Table objects from SQL with the Table API
You may be wondering after reading the above: it's all SQL, so what's the relationship between SQL and Table? Good question. execute_sql can run CREATE TABLE DDL statements to register a table in the table environment, and from_path then turns the registered table into a Table object.
For example:
```python
from pyflink.table import *

# Environment configuration
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())

# Register the Orders table in the table environment
source_data_path = "/Users/liuxiaocong/test/"
source_ddl = f"""
    create table Orders(
        a VARCHAR,
        b BIGINT,
        c BIGINT,
        rowtime TIMESTAMP(3),
        WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{source_data_path}'
    )
"""

# Create the table with execute_sql
t_env.execute_sql(source_ddl)

# Get the Orders table
orders = t_env.from_path("Orders")
```
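Once the table is registered, the same query can be written either as SQL or with the Table API, which is exactly the relationship described above. A small hedged sketch (the filter condition is made up for illustration; the column names follow the Orders schema defined above):

```python
# 1. Plain SQL via sql_query, which returns a Table
sql_result = t_env.sql_query("SELECT a, b FROM Orders WHERE b > 10")

# 2. The equivalent Table API expression on the Table object
orders = t_env.from_path("Orders")
api_result = orders.where(orders.b > 10).select(orders.a, orders.b)
```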
4. Table operations
Some common operations are described below. All of the following examples are written in Python and work in both streaming and batch mode.
4.1 from_path
Similar to the FROM clause of an SQL query, a scan of a registered table is performed.
```python
orders = t_env.from_path("Orders")
```
4.2 from_elements
Similar to the VALUES clause of an SQL query: generates an inline table from the provided rows.
```python
from pyflink.table import DataTypes

# 1. Without specifying types, the schema is inferred automatically
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')])

# 2. With explicitly specified types
table = t_env.from_elements(
    [(1, 'ABC'), (2, 'ABCDE')],
    schema=DataTypes.ROW([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)),
                          DataTypes.FIELD('name', DataTypes.STRING())]))
```
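To check which schema was actually inferred or declared, you can print it, continuing from the snippet above (a small sketch):

```python
# Inspect the schema of the inline table
table.print_schema()
```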
4.3 select
Similar to the SELECT clause of SQL. Perform a selection operation.
```python
from pyflink.table.expressions import col

# 1. Basic usage of select
orders = t_env.from_path("Orders")
result = orders.select(orders.a, orders.c.alias('d'))

# 2. You can also select all columns
result = orders.select(col("*"))
```
4.4 as
Renames the fields of a table.
```python
orders = t_env.from_path("Orders")
result = orders.alias("x, y, z, t")
```
4.5 where and filter
Similar to the WHERE clause of SQL. Filters out rows that do not satisfy the filter predicate.
```python
# 1. where
orders = t_env.from_path("Orders")
result = orders.where(orders.a == 'red')

# 2. filter
orders = t_env.from_path("Orders")
result = orders.filter(orders.a == 'red')
```
4.6 add_columns
Perform field addition. If the added field already exists, an exception will be thrown.
```python
from pyflink.table.expressions import concat

orders = t_env.from_path("Orders")
result = orders.add_columns(concat(orders.c, 'sunny'))
```
4.7 add_or_replace_columns
Performs a field addition. If an added column has the same name as an existing column, the existing field is replaced. If there are duplicate field names among the added fields, the last one is used.
```python
from pyflink.table.expressions import concat

orders = t_env.from_path("Orders")
result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
```
4.8 drop_columns
Drops one or more columns.
```python
orders = t_env.from_path("Orders")
result = orders.drop_columns(orders.b, orders.c)
```
4.9 rename_columns
Performs a field rename. Each field expression should be an alias expression, and a field can only be renamed if it already exists.
```python
orders = t_env.from_path("Orders")
result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
```
4.10 group_by
Similar to the GROUP BY clause of SQL: groups the rows by the given key and then aggregates each group.
```python
orders = t_env.from_path("Orders")
result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
```
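For comparison, the same aggregation could be expressed in SQL against the registered Orders table (a sketch; it assumes Orders was registered as in section 3.4):

```python
# Equivalent SQL aggregation on the registered Orders table
result = t_env.sql_query("SELECT a, SUM(b) AS d FROM Orders GROUP BY a")
```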
4.11 Window group_by
Groups and aggregates a table over a group window, optionally combined with one or more grouping keys.
```python
from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
               .group_by(orders.a, col('w')) \
               .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))
```
4.12 over_window
Similar to the OVER clause of SQL. All aggregations must be defined over the same window, i.e. the same partitioning, ordering, and range. Currently, only windows that range from PRECEDING (unbounded or bounded) to the current row are supported; ranges with FOLLOWING are not yet supported. The ORDER BY operation must specify a single time attribute.
```python
from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE

orders = t_env.from_path("Orders")
result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
                                .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                                .alias("w")) \
               .select(orders.a, orders.b.avg.over(col('w')),
                       orders.b.max.over(col('w')), orders.b.min.over(col('w')))
```
4.13 distinct
Similar to SQL DISTINCT aggregates such as COUNT(DISTINCT a). An aggregate function declared with distinct (built-in or user-defined) is applied only to distinct input values. distinct can also be applied to a whole table to remove duplicate rows.
```python
from pyflink.table.window import Over, Tumble
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE

# 1. distinct used together with aggregate functions
orders = t_env.from_path("Orders")

# Distinct (de-duplicated) aggregation after grouping by an attribute
group_by_distinct_result = orders.group_by(orders.a) \
    .select(orders.a, orders.b.sum.distinct.alias('d'))

# Distinct aggregation after grouping by an attribute and a time window
group_by_window_distinct_result = orders.window(
        Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
    .group_by(orders.a, col('w')) \
    .select(orders.a, orders.b.sum.distinct.alias('d'))

# Distinct aggregation on an over window
result = orders.over_window(Over
        .partition_by(orders.a)
        .order_by(orders.rowtime)
        .preceding(UNBOUNDED_RANGE)
        .alias("w")) \
    .select(orders.a, orders.b.avg.distinct.over(col('w')),
            orders.b.max.over(col('w')), orders.b.min.over(col('w')))

# 2. De-duplicate an entire table
orders = t_env.from_path("Orders")
result = orders.distinct()
```
4.14 Inner Join
Similar to the JOIN clause of SQL: joins two tables. The two tables must have distinct field names, and at least one equality join predicate must be defined, either through the join operator or with a where or filter operator.
```python
from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)
```
4.15 Outer Join
Similar to the SQL LEFT/RIGHT/FULL OUTER JOIN clauses: joins two tables. The two tables must have distinct field names, and at least one equality join predicate must be defined.
```python
from pyflink.table.expressions import col

left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))

# Left outer join
left_outer_result = left.left_outer_join(right, left.a == right.d) \
    .select(left.a, left.b, right.e)

# Right outer join
right_outer_result = left.right_outer_join(right, left.a == right.d) \
    .select(left.a, left.b, right.e)

# Full outer join
full_outer_result = left.full_outer_join(right, left.a == right.d) \
    .select(left.a, left.b, right.e)
```
5. Summary
This chapter was originally intended to cover Flink's Table API together with SQL, but the Table API alone turned out to be more than enough material. Topics such as time attributes, conversion between streams and tables, window operations, and conversion to and from pandas have not been covered here and will be supplemented in a future installment.
6. References
- PyDocs (PyFlink official documentation)
- Flink Introduction and Practice
- Kafka: The Definitive Guide
- Apache Flink Must-Know Essentials
- Getting Started with Apache Flink from Zero
- Flink Basics Tutorial