Learning Flink: the DataStream API (Python version)

💦 Today, let's look at Flink's lower-level DataStream API, which is used to process streaming data. This article explains it through PyFlink. For readers interested in the earlier posts in this series, see 👇:

💛 All APIs in this post are Python. For each stage of stream processing, I looked up the corresponding Python API in the official PyFlink documentation and summarized it here. If anything is missing, corrections are welcome.

1. Install pyflink

PyFlink supports Python 3.6, 3.7, and 3.8, and since Flink 1.11 it also runs on Windows. To install it, just run the following command.

#Installation command
python3 -m pip install apache-flink -i

I installed it on Ubuntu. Remember to install Java 8 or 11 first. If pip finishes with a success message, the installation worked.

2. DataStream API

The DataStream API is Flink's main interface for processing unbounded data streams. As mentioned earlier, every complete Flink application consists of the following three parts:

  • Data source.
  • Transformation.
  • DataSink.

2.1 Data sources (data input)

  1. Read data from file
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')

  1. Read data from Collection
env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
  1. Custom data source
env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
  1. Other commonly used data sources are also supported.

2.2 DataStream transformations

After a Flink application has created its data source, it applies a series of transformations to the elements of the stream, according to the business requirements, to produce the final result.

  1. map

Sometimes we need to process every element in the data stream, for example to convert a line of text into a tuple. This is a 1-to-1 transformation, and the map operation does exactly that.

datastreamsource.map(func, output_type) 
#func – The MapFunction that is called for each element of the DataStream.
#output_type – The type information of the MapFunction output data.
#Returns: the transformed DataStream.
  1. flat_map

In some cases each element of the data stream has to produce multiple outputs, that is, a 1-to-N transformation. The flat_map operation is used for this.

datastreamsource.flat_map(func, output_type) 
#func – The FlatMapFunction that is called for each element of the DataStream.
#output_type – The type information of output data.
#Returns: the transformed DataStream.
  1. filter

To keep only the expected elements of a data stream, apply the filter transformation: an element stays in the stream only if the function returns True for it.

datastreamsource.filter(func) 
#func – The FilterFunction that is called for each element of the DataStream.
#Returns: the filtered DataStream.
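
The three element-wise operations above differ only in how many outputs each input element produces. Here is a plain-Python model of that difference. It is not PyFlink code: the helper names `model_map`, `model_flat_map`, and `model_filter` are made up for this sketch, and they work on ordinary lists so the example runs without a Flink installation.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def model_map(func: Callable[[T], U], stream: Iterable[T]) -> List[U]:
    """1-to-1: every input element yields exactly one output element."""
    return [func(element) for element in stream]


def model_flat_map(func: Callable[[T], Iterable[U]], stream: Iterable[T]) -> List[U]:
    """1-to-N: every input element yields zero or more output elements."""
    return [out for element in stream for out in func(element)]


def model_filter(func: Callable[[T], bool], stream: Iterable[T]) -> List[T]:
    """1-to-(0 or 1): an element is kept only if the predicate returns True."""
    return [element for element in stream if func(element)]


lines = ["hello flink", "hello python"]

# map: one line in, one (line, length) tuple out
lengths = model_map(lambda line: (line, len(line)), lines)

# flat_map: one line in, several words out
words = model_flat_map(lambda line: line.split(" "), lines)

# filter: keep only the words that are not "hello"
kept = model_filter(lambda word: word != "hello", words)

print(lengths)  # [('hello flink', 11), ('hello python', 12)]
print(words)    # ['hello', 'flink', 'hello', 'python']
print(kept)     # ['flink', 'python']
```

In a real job the same lambdas would be passed to map, flat_map, and filter on the DataStream itself.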
  1. key_by

Sometimes the elements of a stream must be processed in parallel, partitioned by a key taken from some field value. The key_by transformation does this: it converts a DataStream into a KeyedStream.

datastreamsource.key_by(key_selector, key_type) 
#key_selector – The KeySelector to be used for extracting the key for partitioning.
#key_type – The type information describing the key type.
#Returns: the DataStream with partitioned state (i.e. KeyedStream).
  1. reduce

reduce aggregates a partitioned data stream: it repeatedly combines two input elements into one output element. It is an operation on a KeyedStream.

datastreamsource.key_by(key_selector).reduce(func) 
#func – The ReduceFunction that is called for each element of the DataStream.
#Returns: the transformed DataStream.

For example:

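#env is a StreamExecutionEnvironment
ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')])
#Sum the first field of all records that share the same key (the second field)
ds.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))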
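
To see what this reduce produces, it helps to remember that in streaming mode a keyed reduce emits a running result for every incoming element, not one final value per key. The sketch below models that behaviour in plain Python. The helper `model_keyed_reduce` is hypothetical and only stands in for Flink's per-key state, and it ignores parallelism, so the output order of a real job may differ.

```python
from typing import Callable, Dict, Hashable, Iterable, List, Tuple

Record = Tuple[int, str]


def model_keyed_reduce(
    records: Iterable[Record],
    key_selector: Callable[[Record], Hashable],
    reduce_func: Callable[[Record, Record], Record],
) -> List[Record]:
    """Emit the running reduced value per key, one output per input element."""
    state: Dict[Hashable, Record] = {}  # last reduced value for each key
    emitted: List[Record] = []
    for record in records:
        key = key_selector(record)
        if key in state:
            # Combine the stored value with the new element
            state[key] = reduce_func(state[key], record)
        else:
            # The first element of a key passes through unchanged
            state[key] = record
        emitted.append(state[key])
    return emitted


data = [(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')]
result = model_keyed_reduce(
    data,
    key_selector=lambda x: x[1],
    reduce_func=lambda a, b: (a[0] + b[0], b[1]),
)
print(result)  # [(1, 'a'), (3, 'a'), (6, 'a'), (4, 'b')]
```

Note that the reduce function has to return a value of the same shape as its inputs, here an (int, str) tuple, because its result is fed back in as the first argument for the next element.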
  1. union

In stream processing, several streams sometimes need to be merged into a single stream. The union transformation does this. It accepts one or more other streams, and all of them must have the same element type as the stream they are merged into.

#Merge stream 1 with streams 2 and 3
datastreamsource1.union(datastreamsource2, datastreamsource3) 
#datastreamsource2, datastreamsource3 – The DataStreams to union output with.
#Returns: the merged DataStream.

  1. connect

Besides union, connect can also combine two data streams, and the element types of the two streams may differ.

datastreamsource.connect(ds) 
#ds – The DataStream with which this stream will be connected.
#Returns: the ConnectedStreams.
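
The difference between union and connect can be modelled in a few lines of plain Python. This is only a sketch: `model_union` and `model_connect_map` are hypothetical helpers, the parameter names `map1` and `map2` are borrowed from PyFlink's CoMapFunction, and the model simply concatenates lists, so it says nothing about the order in which elements of real streams arrive.

```python
from typing import Any, Callable, Iterable, List


def model_union(*streams: Iterable[Any]) -> List[Any]:
    """Merge any number of streams of the same element type into one."""
    merged: List[Any] = []
    for stream in streams:
        merged.extend(stream)
    return merged


def model_connect_map(
    first: Iterable[Any],
    second: Iterable[Any],
    map1: Callable[[Any], Any],
    map2: Callable[[Any], Any],
) -> List[Any]:
    """Keep two differently typed streams apart and map each with its own function."""
    return [map1(x) for x in first] + [map2(y) for y in second]


numbers = [1, 2, 3]
more_numbers = [4, 5]
labels = ["a", "b"]

# union: all inputs are ints, the result is one stream of ints
unioned = model_union(numbers, more_numbers, [6])

# connect: ints and strs stay distinct until each is mapped to a common type
connected = model_connect_map(
    numbers,
    labels,
    map1=lambda n: f"number:{n}",
    map2=lambda s: f"label:{s}",
)

print(unioned)    # [1, 2, 3, 4, 5, 6]
print(connected)  # ['number:1', 'number:2', 'number:3', 'label:a', 'label:b']
```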
  1. project
#datastreamsource.project(1, 0) selects two fields from the stream datastreamsource, the fields at indexes 1 and 0, and thereby also reorders the columns.
datastreamsource.project(*field_indexes: int) 
#field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
#Returns: the projected DataStream.
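
What project(1, 0) does to each tuple can be shown with a small plain-Python model. `model_project` is a hypothetical helper for illustration, not part of PyFlink.

```python
from typing import Any, Iterable, List, Tuple


def model_project(rows: Iterable[Tuple[Any, ...]], *field_indexes: int) -> List[Tuple[Any, ...]]:
    """Keep only the given fields of each tuple, in the order the indexes are listed."""
    return [tuple(row[i] for i in field_indexes) for row in rows]


rows = [(1, 'a', 3.0), (2, 'b', 4.5)]

# Select fields 1 and 0: the third field is dropped and the first two are swapped
projected = model_project(rows, 1, 0)
print(projected)  # [('a', 1), ('b', 2)]
```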
  1. partition_custom

The partition_custom transformation lets you define your own partitioning rules. partition_custom can only partition on a single key; composite keys are not supported.

datastreamsource.partition_custom(partitioner, key_selector) 
#partitioner – The partitioner to assign partitions to keys.
#key_selector – The KeySelector with which the DataStream is partitioned.
#Returns: the partitioned DataStream.
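
The interplay of key_selector and partitioner can be modelled as follows. This is a plain-Python sketch under the assumption that the partitioner is a function of the key and the number of partitions that returns a partition index. `model_partition_custom` and its `num_partitions` argument are hypothetical; in a real job the number of partitions comes from the parallelism of the downstream operator.

```python
from typing import Any, Callable, Dict, Iterable, List


def model_partition_custom(
    records: Iterable[Any],
    partitioner: Callable[[Any, int], int],
    key_selector: Callable[[Any], Any],
    num_partitions: int,
) -> Dict[int, List[Any]]:
    """Route every record to the partition chosen by partitioner(key, num_partitions)."""
    partitions: Dict[int, List[Any]] = {i: [] for i in range(num_partitions)}
    for record in records:
        key = key_selector(record)               # extract the single key
        index = partitioner(key, num_partitions)  # decide the target partition
        partitions[index].append(record)
    return partitions


records = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]

# Custom rule: odd keys go to partition 1, even keys to partition 0
partitioned = model_partition_custom(
    records,
    partitioner=lambda key, n: key % n,
    key_selector=lambda r: r[0],
    num_partitions=2,
)
print(partitioned)  # {0: [(2, 'b'), (4, 'd')], 1: [(1, 'a'), (3, 'c')]}
```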

  1. window conversion operation

Flink uses the window mechanism to divide an unbounded data stream into multiple bounded chunks, so that statistics can be computed over each bounded chunk. Windows offer many transformations, such as max for the maximum value in a window and sum for the sum of the elements in a window. When the built-in window transformations cannot meet the business requirements, you can define the processing logic yourself by passing a custom WindowFunction to the apply method.

#CountWindow represents a count window; each count window is identified by a unique id
pyflink.datastream.window.CountWindow(id: int)
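
How a count window cuts an unbounded stream into bounded pieces can be modelled in plain Python. The sketch assumes a tumbling count window per key: once a key has collected `size` elements, the window fires and starts over. `model_count_window`, `size`, and `window_func` are hypothetical names for this illustration and are not the PyFlink API.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Hashable, Iterable, List, Tuple

Reading = Tuple[str, int]


def model_count_window(
    records: Iterable[Reading],
    key_selector: Callable[[Reading], Hashable],
    size: int,
    window_func: Callable[[Hashable, List[Reading]], Any],
) -> List[Any]:
    """Fire window_func each time a key has collected `size` elements."""
    buffers: DefaultDict[Hashable, List[Reading]] = defaultdict(list)
    results: List[Any] = []
    for record in records:
        key = key_selector(record)
        buffers[key].append(record)
        if len(buffers[key]) == size:
            results.append(window_func(key, buffers[key]))
            buffers[key] = []  # start a new window for this key
    return results  # incomplete windows are never emitted


readings = [('a', 1), ('a', 5), ('b', 2), ('a', 3), ('b', 7), ('a', 4)]

# For every window of 2 readings per key, compute (key, sum, max)
windowed = model_count_window(
    readings,
    key_selector=lambda r: r[0],
    size=2,
    window_func=lambda key, window: (
        key,
        sum(value for _, value in window),
        max(value for _, value in window),
    ),
)
print(windowed)  # [('a', 6, 5), ('b', 9, 7), ('a', 7, 4)]
```

The `window_func` argument plays the role of the custom WindowFunction: it receives all elements of one finished window at once and can compute anything from them.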

2.3 Data sinks (data output)

After a series of transformations, the data stream has to output its results. The operator responsible for writing out the results is called a sink.

  1. sink_to
datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink) 
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the execute() method is called.
#sink – The user defined sink.
#Returns: the closed DataStream.
  1. add_sink
datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction) 
#Adds the given sink to this DataStream. Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.
#sink_func – The SinkFunction object.
#Returns: the closed DataStream.

3. DataSet

The sections above covered data sources, transformations, and data sinks for the streaming DataStream API. Batch data in Flink is called a DataSet. Batch processing can be summarized as follows:

  • Data source: similar to DataStream
  • Transformations: similar to Spark's batch API
  • Data sink: similar to DataStream

DataSet will not be described here.

4. References

PyDocs (pyflink official document)
Flink introduction and Practice
Kafka authoritative guide
Apache Flink must know
Getting started with Apache Flink zero Basics
Flink basics tutorial

Keywords: Python flink

Added by urgido on Thu, 17 Feb 2022 19:55:58 +0200