Table API for Flink learning (python version)

⛄ After covering the basics of the DataStream API yesterday, let's continue with the Table API and SQL, which sit on top of DataStream. Both work with relational data and lower the barrier to developing Flink applications. If you are interested in the earlier installments, you can refer to them 👇:

💦 The Table API and SQL let you process data streams in a more intuitive way, with operations such as selection, filtering, grouping, aggregation and multi-table joins. They also support window operations. Let's start today's study!

1. Setting up the development environment

We already set this up for the DataStream API in the previous chapter, so here is just a brief recap:

  • Prepare a machine with Java 8 or 11 and Python (3.6, 3.7 or 3.8)
  • Run the following command 👇
python3 -m pip install apache-flink

After the development environment is set up, you can write Table API and SQL code in an IDE such as IDEA.

2. Create TableEnvironment

To develop with the Table API and SQL, you must first declare the execution context, a TableEnvironment. This interface is responsible for:

  • Creating Tables
  • Registering Tables as temporary tables
  • Executing SQL queries (see the SQL documentation for more details)
  • Registering user-defined (scalar, table-valued, or aggregate) functions
  • Configuring the job
  • Managing Python dependencies
  • Submitting the job for execution

For example, creating a streaming and a batch TableEnvironment:

from pyflink.table import EnvironmentSettings, TableEnvironment
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Create batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
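
Besides creating the environment, the TableEnvironment is also where the job configuration and Python dependency management listed above happen. The following is only a minimal sketch; the option value and the requirements file path are illustrative and not part of the original example:

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Job configuration: set options on the underlying Flink configuration
# (the value here is only an example)
table_env.get_config().get_configuration().set_string("parallelism.default", "2")

# Manage Python dependencies, e.g. ship a requirements file with the job
# (the path below is hypothetical)
# table_env.set_python_requirements("/tmp/requirements.txt")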

3. Connect external data sources

The Flink Table API and SQL interact with external systems through connectors to read streaming and batch data. These external systems include traditional relational databases, key-value stores, message queues and distributed file systems.

3.1 Connecting to files in a file system

from pyflink.table import EnvironmentSettings, TableEnvironment

# Environment configuration
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())

# Connect to a file in the file system
t_env.execute_sql("""
    create table test01(
        id int,
        name string
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/Users/liuxiaocong/data01.csv'
    )
""")

The content inside WITH (...) is the configuration of the external connector. Different connectors involve different options, and which options are required or optional varies between them.

Explanation: 'connector' = 'filesystem' indicates that the external connector type is the file system, 'path' = '/Users/liuxiaocong/data01.csv' specifies the path of the file in the file system, and 'format' = 'csv' indicates that the data format of the file is CSV.
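
Once test01 is registered, it can be queried like any other table. A minimal sketch, assuming the CSV file above actually exists at that path and contains id,name rows:

# Query the registered file system table and print the result to stdout
t_env.execute_sql("SELECT id, name FROM test01").print()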

3.2 Connecting to the Kafka message queue

create table test02(
    id int,
    name string
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = 'topic_name',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'my_group_id',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
)

Explanation: 'connector.type' = 'kafka' indicates that the type of this external connector is Kafka, 'connector.topic' = 'topic_name' specifies the Kafka topic, 'connector.properties.bootstrap.servers' = 'localhost:9092' specifies the address of the Kafka service, and 'format.type' = 'json' specifies that the data format is JSON.
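
Note that the options above use the legacy connector.* style. On newer Flink releases (1.11 and later) the same table is usually declared with the unified connector options instead. The following is only a sketch; the topic, group id and broker address are placeholders:

# Sketch of the same Kafka source with the unified connector options (newer Flink versions)
t_env.execute_sql("""
    create table test02_new(
        id int,
        name string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'topic_name',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my_group_id',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")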

3.3 Connecting to Elasticsearch (ES)

create table test02(
    id int,
    name string
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://host1:9092;http://host2:9093',
    'connector.index' = 'myusers',
    'connector.document-type' = 'user',
    'format.type' = 'json',
    'update-mode' = 'append',
    'connector.key-delimiter' = '$',
    'connector.key-null-literal' = 'n/a',
    'connector.failure-handler' = 'fail',
    'connector.flush-on-checkpoint' = 'true',
    'connector.bulk-flush.max-actions' = '42',
    'connector.bulk-flush.max-size' = '42 mb',
    'connector.bulk-flush.interval' = '60000',
    'connector.bulk-flush.backoff.type' = 'disabled',
    'connector.bulk-flush.backoff.max-retries' = '3',
    'connector.bulk-flush.backoff.delay' = '30000',
    'connector.connection-max-retry-timeout' = '3'
)

Explanation: 'connector.type' = 'elasticsearch' indicates that the type of this external connector is Elasticsearch, 'connector.hosts' = 'http://host1:9092;http://host2:9093' specifies the addresses of the Elasticsearch service, and 'format.type' = 'json' specifies that the data format is JSON.

3.4 Creating Table objects from SQL with the Table API

You may be wondering at this point: the examples above are all SQL, so what is the relationship between SQL and Table objects? That is a good question. execute_sql can run CREATE TABLE DDL statements to register a table, and from_path then turns the registered table into a Table object.

For example:

from pyflink.table import *

# Environment configuration
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())

# Register the Orders table in the table environment
source_data_path = "/Users/liuxiaocong/test/"
source_ddl = f"""
        create table Orders(
            a VARCHAR,
            b BIGINT,
            c BIGINT,
            rowtime TIMESTAMP(3),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{source_data_path}'
        )
        """
# Create the table with execute_sql
t_env.execute_sql(source_ddl)
# Get the Orders table as a Table object
orders = t_env.from_path("Orders") 
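
The relationship also works in the other direction: sql_query runs a SELECT statement against registered tables and returns a Table, so SQL and the Table API can be mixed freely. A minimal sketch against the Orders table registered above (the filter condition is just an example):

# Run a SQL query on the registered table and get back a Table object
revenue = t_env.sql_query("SELECT a, b FROM Orders WHERE b > 100")
# The resulting Table can be refined further with Table API calls
result = revenue.group_by(revenue.a).select(revenue.a, revenue.b.sum.alias('total'))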

4. Table operation

Some common operations are described below. All of them are written in Python and are supported in both streaming and batch mode.

4.1 from_path

Similar to the FROM clause of an SQL query: it scans a registered table.

orders = t_env.from_path("Orders")
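
To inspect the contents of a Table while developing, you can execute it and print the rows. A minimal sketch, assuming the Orders table from section 3.4 is registered and its source data is readable:

# Execute the table and print its rows to stdout
orders.execute().print()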

4.2 from_elements

Similar to the VALUES clause of an SQL query: it generates an inline table from the provided rows.

from pyflink.table import DataTypes

# 1. Without specifying a schema, the field types are inferred automatically
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')])
# 2. Specify the schema explicitly
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')],
  schema=DataTypes.ROW([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD('name', DataTypes.STRING())]))

4.3 select

Similar to the SELECT clause of SQL: performs a selection operation.

# 1. Basic usage of select
orders = t_env.from_path("Orders")
result = orders.select(orders.a, orders.c.alias('d'))
# 2. You can also select *
from pyflink.table.expressions import col
result = orders.select(col("*"))

4.4 alias (as)

Renames fields, similar to AS in SQL.

orders = t_env.from_path("Orders")
result = orders.alias("x, y, z, t")

4.5 where and filter

Similar to the WHERE clause of SQL. Filters out rows that do not satisfy the filter predicate.

# 1. where
orders = t_env.from_path("Orders")
result = orders.where(orders.a == 'red')
# 2. filter
orders = t_env.from_path("Orders")
result = orders.filter(orders.a == 'red')

4.6 add_columns

Adds fields. If an added field already exists, an exception is thrown.

from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_columns(concat(orders.c, 'sunny'))

4.7 add_or_replace_columns

Adds fields. If an added column name is the same as an existing column name, the existing field is replaced. In addition, if the added fields themselves contain duplicate names, the last one is used.

from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))

4.8 drop_columns

Drops the specified columns.

orders = t_env.from_path("Orders")
result = orders.drop_columns(orders.b, orders.c)

4.9 rename_columns

Performs a field rename operation. The field expressions should be alias expressions, and only existing fields can be renamed.

orders = t_env.from_path("Orders")
result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))

4.10 group_by

Similar to the GROUP BY clause of SQL: groups the rows by the grouping key and aggregates each group.

orders = t_env.from_path("Orders")
result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))

4.11 Window group_by

Groups and aggregates a table using a group window, possibly combined with one or more grouping keys.

from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
               .group_by(orders.a, col('w')) \
               .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))

4.12 over_window

Similar to the OVER clause of SQL. All aggregations must be defined over the same window, i.e. the same partitioning, ordering and range. At present, only windows that range from PRECEDING (unbounded or bounded) to the current row are supported; ranges with FOLLOWING are not yet supported. The ORDER BY operation must be specified on a single time attribute.

from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE

orders = t_env.from_path("Orders")
result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
                            .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                            .alias("w")) \
               .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

4.13 distinct

Similar to SQL DISTINCT aggregates such as COUNT(DISTINCT a). An aggregate function (built-in or user-defined) declared as distinct is applied only to distinct input values.

# 1. distinct used with aggregate functions
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
from pyflink.table.window import Tumble, Over
orders = t_env.from_path("Orders")
# Distinct (deduplicated) aggregation after grouping by attribute
group_by_distinct_result = orders.group_by(orders.a) \
                                 .select(orders.a, orders.b.sum.distinct.alias('d'))
# Distinct (deduplicated) aggregation after grouping by attribute and time window
group_by_window_distinct_result = orders.window(
    Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \
    .select(orders.a, orders.b.sum.distinct.alias('d'))
# Distinct (deduplicated) aggregation on an over window
result = orders.over_window(Over
                       .partition_by(orders.a)
                       .order_by(orders.rowtime)
                       .preceding(UNBOUNDED_RANGE)
                       .alias("w")) \
                       .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

# 2. Deduplicating rows with distinct()
orders = t_env.from_path("Orders")
result = orders.distinct()

4.14 Inner Join

Similar to the JOIN clause of SQL: joins two tables. The two tables must have distinct field names, and at least one equality join predicate must be defined, either through the join operator or with a where or filter operator.

from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)

4.15 Outer Join

Similar to the SQL LEFT/RIGHT/FULL OUTER JOIN clauses: joins two tables. The two tables must have distinct field names, and at least one equality join predicate must be defined.

from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
# Left outer join
left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
# Right outer join
right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
# Full outer join
full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
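
The Source1 and Source2 tables above are assumed to be already registered. For a self-contained end-to-end run, the same joins can be tried out with inline tables built by from_elements; the following sketch uses made-up data purely for illustration:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Two small inline tables with made-up rows (illustrative only)
left = t_env.from_elements([(1, 'apple'), (2, 'pear')], ['a', 'b'])
right = t_env.from_elements([(1, 'red'), (3, 'green')], ['d', 'e'])

# Inner join keeps only matching keys; left outer join keeps all rows of `left`
inner = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)
outer = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)

inner.execute().print()
outer.execute().print()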

5. Summary

This chapter was originally intended to cover the Flink Table API together with SQL, but the Table API alone turned out to be more than enough content. The time concepts, stream conversion, window operations and conversion to/from pandas in the Table API have not been covered yet and will be supplemented in the future.

