Apache Flink learning notes: windows in stream processing

Window concept

In most scenarios, the data streams we want to compute statistics over are unbounded, so we cannot wait for the whole stream to end. Usually we only need to analyze the data within a certain time range or element count: for example, every five minutes, count the clicks on all goods over the past hour; or, after every 1000 clicks, compute each product's share of those clicks. In Flink, windows provide this kind of functionality. Depending on the dimension used, windows in Flink can be divided into time windows and count windows.

A window is a way to cut an infinite stream into finite streams. It distributes the stream data into buckets of finite size for analysis.

Temporal semantics

In Flink, when window boundaries are defined by time periods, the notion of time that is used becomes extremely important.

There are three types of time in Flink, as shown in the figure below:

  • Event Time: the time when the event was created. It is usually described by the timestamp in the event. For example, in collected log data, each log entry records its generation time, and Flink accesses the event timestamp through a timestamp assigner.
  • Ingestion Time: the time when the data enters Flink at the source.
  • Processing Time: the local system time of the operator executing a time-based operation, so it depends on the machine. In Flink versions before 1.12, the default time characteristic is Processing Time; since 1.12 the default is Event Time.

We tend to care more about Event Time.

Window

Official explanation: a streaming engine is a data processing engine designed to process infinite data sets, that is, data sets that keep growing and are essentially unbounded, and a window is a means of cutting infinite data into finite blocks for processing.

Therefore, windows are at the core of infinite data stream processing. A window splits an infinite stream into "buckets" of limited size, and we can run computations on these buckets.

Note that window() can only be called on a keyed stream, that is, after keyBy; a non-keyed stream has to use windowAll() instead.

Windows can be divided in two ways:

Time Window

  • Tumbling time window
  • Sliding time window
  • Session window

Count Window

  • Tumbling count window
  • Sliding count window

Time Windows

Time windows aggregate data along the time dimension.

Tumbling window

Tumbling windows are windows that do not overlap each other. For example, if the number of clicks on goods in the past hour is counted every hour, a day is divided into exactly 24 windows with no overlap between them, as follows:

Illustration:

  • user1, user2 and user3 represent different groups.
  • The tumbling window assigner assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap.
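
How an element ends up in exactly one tumbling window can be sketched in plain Java, without Flink on the classpath. The class and method names below (TumblingAssignSketch, windowStart, assign) are made up for illustration, and the arithmetic assumes non-negative timestamps and windows aligned to time 0, so treat it as a rough model rather than Flink's actual assigner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: not Flink code, just the bucketing idea in plain Java.
class TumblingAssignSketch {

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    // Assumes timestamp >= 0 and size > 0.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Groups timestamps by their window start, keeping windows in first-seen order.
    static Map<Long, List<Long>> assign(long[] timestamps, long size) {
        Map<Long, List<Long>> buckets = new LinkedHashMap<>();
        for (long ts : timestamps) {
            buckets.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Timestamps in seconds, 15-second windows.
        long[] seconds = {1547718147L, 1547718201L, 1547718202L, 1547718203L, 1547718204L};
        assign(seconds, 15).forEach((start, elems) ->
                System.out.println("[" + start + ", " + (start + 15) + ") -> " + elems));
    }
}
```

Every timestamp maps to a single window start, which is why tumbling windows never overlap.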

TumblingTimeWindow API

Assume that the size of the tumbling window is 15s. The slide is then also 15s, that is, a result is computed and output every 15s, and then the next window begins.

input data:

sensor_1,1547718147,12
sensor_6,1547718201,15.0
sensor_6,1547718202,15.0
sensor_6,1547718203,6.7
sensor_8,1547718204,38.0
sensor_8,1547718201,38.0
sensor_8,1547718202,15.4
sensor_1,1547718203,6.7
sensor_6,1547718204,38.1
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.benchmark.SensorReading2;

public class TransFormTimeWindow {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from nc
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 7777);

        // map operation
        SingleOutputStreamOperator<SensorReading2> dataStream = stringDataStreamSource.map((String value) -> {
            String[] fields = value.split(",");
            return new SensorReading2(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).setParallelism(1);

        KeyedStream<SensorReading2, String> keyedStream = dataStream.
                keyBy((SensorReading2 value) -> {
            return value.getId();
        });

        keyedStream
                // Tumbling time window based on processing time: each key is counted and its result is output once every 15s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .aggregate(new AggregateFunction<SensorReading2, Integer, Integer>() {
            /*
             * @param <IN> The type of the values that are aggregated (input values)
             * @param <ACC> The type of the accumulator (intermediate aggregate state).
             * @param <OUT> The type of the aggregated result
             */

            /*
            Create an accumulator and give an initial value
             */
            @Override
            public Integer createAccumulator() {
                return 0;
            }

            /*
            Aggregate data
             */
            @Override
            public Integer add(SensorReading2 value, Integer accumulator) {
                return accumulator + 1;
            }

            /*
            Accumulator as output
             */
            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            /*
            Merge two accumulators. Generally only called for merging windows such as session windows, not for tumbling or sliding windows
             */
            @Override
            public Integer merge(Integer a, Integer b) {
                return a + b;
            }
        }).print();
        env.execute();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = -5224012503623543819L;

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception{
            // Split the line into words
            String[] words = value.split(" ");
            for (String word : words){
                out.collect(new Tuple2<String,Integer>(word,1));
            }
        }
    }

}

Sliding Windows

A sliding window is a more general form of the fixed (tumbling) window. It is defined by a fixed window length and a sliding interval (the slide). A tumbling window is the special case in which the slide equals the window length. In general, the slide is smaller than the window size, so consecutive windows overlap.

Illustration:

  • To understand sliding windows, it is most important to understand their initial state.
  • For a sliding time window, a result is computed each time one slide has elapsed. For example, with a 15s window and a 5s slide, the first result is produced roughly 5s after the start, even though a full 15s of data does not exist yet. The window is still treated as a 15s window; it simply contains only the data seen so far. After another 5s (10s in total) a second result is produced, again over a nominal 15s window that holds only 10s of data. After another 5s the first completely filled 15s window appears. From then on, every 5s a result is computed over a full 15s of data; the first few results cover less data and may therefore look off.
  • The same holds for a sliding count window. For example, with a window of 10 elements and a slide of 5 elements, the first result is computed as soon as 5 elements have arrived, although the window is not yet full; it is still treated as a window of 10. After that, every 5 elements a result is computed over the latest 10 elements.
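
The overlap described above means that one element belongs to several sliding time windows at once. A minimal plain-Java sketch of that assignment follows; SlidingAssignSketch and windowStarts are hypothetical names, and the code assumes non-negative timestamps, size >= slide > 0, and windows aligned to time 0. It models the idea only and is not Flink's own assigner.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the idea of a sliding time window assigner in plain Java.
class SlidingAssignSketch {

    // Returns the start of every window [start, start + size) that contains the timestamp,
    // from the most recently started window back to the oldest one.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window that began at or before the element.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the element.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window length 15, slide 5: each element falls into 15 / 5 = 3 windows.
        System.out.println(windowStarts(12, 15, 5)); // prints [10, 5, 0]
        // Slide equal to the window length: the tumbling special case, one window per element.
        System.out.println(windowStarts(12, 15, 15)); // prints [0]
    }
}
```

Windows whose start lies before the job began still exist in this model; they simply hold less data, which matches the partially filled first results described in the list above.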

SlidingTimeWindow API

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.benchmark.SensorReading2;

public class TransFormSlidingTimeWindow {
    public static void main(String[] args)throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from nc
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 7777);

        // map operation
        SingleOutputStreamOperator<SensorReading2> dataStream = stringDataStreamSource.map((String value) -> {
            String[] fields = value.split(",");
            return new SensorReading2(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).setParallelism(2);

        KeyedStream<SensorReading2, String> keyedStream = dataStream.keyBy((SensorReading2 value) -> {
            return value.getId();
        });

        keyedStream
                // 30s window sliding every 15s: a result is output every 15s, even at the start when less than 30s of data has arrived
                .window(SlidingProcessingTimeWindows.of(Time.seconds(30),Time.seconds(15)))
                // The incremental aggregation function counts the number of data
                .aggregate(new AggregateFunction<SensorReading2, Integer, Integer>() {
                    /*
                     * @param <IN> The type of the values that are aggregated (input values)
                     * @param <ACC> The type of the accumulator (intermediate aggregate state).
                     * @param <OUT> The type of the aggregated result
                     */

                    /*
                    Create an accumulator and give an initial value
                     */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /*
                    Aggregate data
                     */
                    @Override
                    public Integer add(SensorReading2 value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /*
                    Accumulator as output
                     */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /*
                    Merge two accumulators. Generally only called for merging windows such as session windows, not for tumbling or sliding windows
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                }).print();
        env.execute();
    }
}

Session window


The session window assigner groups elements by session activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed start and end time. Instead, a session window closes when it receives no elements for a fixed period of time, that is, when a gap of inactivity occurs. A session window is configured with a session gap, which defines the length of the inactive period. When that period elapses, the current session is closed and subsequent elements are assigned to a new session window.

Put simply: a time interval is set, and if no data arrives within this interval, the current window closes and the next element opens a new one.
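
The gap rule can be illustrated with a small plain-Java simulation for a single key. SessionSketch and sessions are hypothetical names; the input is assumed to be sorted in ascending order, and the choice to start a new session only when the distance is strictly greater than the gap is an assumption of this sketch, not a statement about Flink's exact boundary handling.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: groups timestamps of one key into sessions by inactivity gap.
class SessionSketch {

    // Splits timestamps (assumed sorted ascending) into sessions. A new session starts
    // whenever the distance to the previous element is greater than the gap.
    static List<List<Long>> sessions(long[] sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0;
        for (long ts : sortedTimestamps) {
            if (current == null || ts - previous > gap) {
                current = new ArrayList<>(); // inactivity exceeded the gap: open a new session
                result.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        long[] seconds = {1, 3, 10, 30, 32, 60};
        System.out.println(sessions(seconds, 15)); // prints [[1, 3, 10], [30, 32], [60]]
    }
}
```

Unlike the tumbling and sliding cases, the window boundaries here depend entirely on the data, so the number and length of sessions cannot be known in advance.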

SessionWindow API

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.benchmark.SensorReading2;

public class TransFormTimeSessionWindow {
    public static void main(String[] args)throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from nc
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 7777);

        // map operation
        SingleOutputStreamOperator<SensorReading2> dataStream = stringDataStreamSource.map((String value) -> {
            String[] fields = value.split(",");
            return new SensorReading2(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).setParallelism(2);

        KeyedStream<SensorReading2, String> keyedStream = dataStream.keyBy((SensorReading2 value) -> {
            return value.getId();
        });

        keyedStream
                // Session gap of 15s: once a key has received no data for 15s, its window closes and the result is computed
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // The incremental aggregation function counts the number of data
                .aggregate(new AggregateFunction<SensorReading2, Integer, Integer>() {
                    /*
                     * @param <IN> The type of the values that are aggregated (input values)
                     * @param <ACC> The type of the accumulator (intermediate aggregate state).
                     * @param <OUT> The type of the aggregated result
                     */

                    /*
                    Create an accumulator and give an initial value
                     */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /*
                    Aggregate data
                     */
                    @Override
                    public Integer add(SensorReading2 value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /*
                    Accumulator as output
                     */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /*
                    Merge two accumulators. Needed here because session windows are merging windows
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                }).print();
        env.execute();
    }
}

CountWindow

A CountWindow triggers execution according to the number of elements with the same key in the window. When it fires, only the results for keys whose element count has reached the window size are computed.

Tumbling window

The window_size of a CountWindow refers to the number of elements with the same key, not the total number of all incoming elements. For example, if the size of the tumbling window is 10, the stream for key 1 produces a result once it reaches 10 elements, while the stream for key 2, having reached only 5, produces nothing.
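
The per-key counting can be simulated in a few lines of plain Java. TumblingCountSketch and run are hypothetical names; the sketch only models the firing rule described above and does not use Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: per-key tumbling count windows in plain Java.
class TumblingCountSketch {

    // Feeds keys one by one. Emits "key=count" each time a key has collected `size`
    // elements, then starts a fresh window for that key.
    static List<String> run(String[] keys, int size) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> fired = new ArrayList<>();
        for (String key : keys) {
            int n = counts.merge(key, 1, Integer::sum); // count is tracked separately per key
            if (n == size) {
                fired.add(key + "=" + n);
                counts.remove(key); // window is full: reset this key only
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] keys = {"sensor_1", "sensor_6", "sensor_1", "sensor_1",
                         "sensor_6", "sensor_1", "sensor_6", "sensor_1"};
        // sensor_1 arrives 5 times and fires once; sensor_6 arrives 3 times and never fires.
        System.out.println(run(keys, 5)); // prints [sensor_1=5]
    }
}
```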

Tumbling CountWindow API

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.benchmark.SensorReading2;

public class TransFormTumblingCountWindow {
    public static void main(String[] args) throws Exception{
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from nc
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 7777);

        // map operation
        SingleOutputStreamOperator<SensorReading2> dataStream = stringDataStreamSource.map((String value) -> {
            String[] fields = value.split(",");
            return new SensorReading2(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).setParallelism(2);

        KeyedStream<SensorReading2, String> keyedStream = dataStream.keyBy((SensorReading2 value) -> {
            return value.getId();
        });

        keyedStream
                // A result is computed for every 5 elements of a key, so the output is always 5
                .countWindow(5)
                // The incremental aggregation function counts the number of data
                .aggregate(new AggregateFunction<SensorReading2, Integer, Integer>() {
                    /*
                     * @param <IN> The type of the values that are aggregated (input values)
                     * @param <ACC> The type of the accumulator (intermediate aggregate state).
                     * @param <OUT> The type of the aggregated result
                     */

                    /*
                    Create an accumulator and give an initial value
                     */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /*
                    Aggregate data
                     */
                    @Override
                    public Integer add(SensorReading2 value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /*
                    Accumulator as output
                     */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /*
                    Merge two accumulators. Generally only called for merging windows such as session windows, not for tumbling or sliding windows
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                }).print();
        env.execute();
    }
}

Sliding Windows

This again combines a window length with a slide: here the window length is 10 and the slide is 5. A result is computed every 5 elements, but at the start the first 5 elements do not fill the window length of 10. The initial window can therefore be regarded as a window of 10 elements in which the remaining positions are empty.

As shown in the following figure, window 1 holds only five elements, so the result is computed over those five. After one slide the window holds 10 elements, and from then on each result is computed over 10 elements.
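
The start-up behaviour can be checked with a small plain-Java simulation for one key. SlidingCountSketch and firedSizes are hypothetical names; the model (fire every `slide` elements, keep at most the last `size` elements) reflects my understanding of how a sliding count window behaves and is not taken from Flink's source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only: a sliding count window for a single key in plain Java.
class SlidingCountSketch {

    // Feeds elements 1..totalElements. Every `slide` elements the window fires and
    // reports how many elements it currently holds (never more than `size`).
    static List<Integer> firedSizes(int totalElements, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> fired = new ArrayList<>();
        for (int i = 1; i <= totalElements; i++) {
            buffer.addLast(i);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the latest `size` elements
            }
            if (i % slide == 0) {
                fired.add(buffer.size()); // fire once per slide
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Window length 10, slide 5: the first firing sees 5 elements, later ones see 10.
        System.out.println(firedSizes(20, 10, 5)); // prints [5, 10, 10, 10]
    }
}
```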

Sliding CountWindow API

The function below counts the number of elements in each window.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.benchmark.SensorReading2;

public class TransFormSlidingCountWindow {
    public static void main(String[] args)throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from nc
        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 7777);

        // map operation
        SingleOutputStreamOperator<SensorReading2> dataStream = stringDataStreamSource.map((String value) -> {
            String[] fields = value.split(",");
            return new SensorReading2(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).setParallelism(2);

        KeyedStream<SensorReading2, String> keyedStream = dataStream.keyBy((SensorReading2 value) -> {
            return value.getId();
        });

        keyedStream
                // The first result covers only the initial 5 elements; after that, each window holds 10 elements and a result is output every 5 elements
                .countWindow(10,5)
                // The incremental aggregation function counts the number of data
                .aggregate(new AggregateFunction<SensorReading2, Integer, Integer>() {
                    /*
                     * @param <IN> The type of the values that are aggregated (input values)
                     * @param <ACC> The type of the accumulator (intermediate aggregate state).
                     * @param <OUT> The type of the aggregated result
                     */

                    /*
                    Create an accumulator and give an initial value
                     */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /*
                    Aggregate data
                     */
                    @Override
                    public Integer add(SensorReading2 value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /*
                    Accumulator as output
                     */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /*
                    Merge two accumulators. Generally only called for merging windows such as session windows, not for tumbling or sliding windows
                     */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                }).print();
        env.execute();
    }
}

Window function

After the window is defined, a window function must also be specified to compute over its contents, as shown below:

Window functions fall into two categories:

Incremental aggregation functions

  • Each element is processed as it arrives, and only a simple state is kept
  • ReduceFunction, AggregateFunction

In addition, you can also directly use max, maxBy, sum, etc.
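
To show what "keeping a simple state" means, here is a plain-Java sketch of an average that mirrors the four steps of an AggregateFunction (create, add, get result, merge) without depending on Flink. AverageAggSketch and Acc are hypothetical names chosen for this example.

```java
// Illustrative sketch only: an incremental average in the style of AggregateFunction.
class AverageAggSketch {

    // Accumulator: a running sum and count. This is all the state the window keeps,
    // no matter how many elements arrive.
    static final class Acc {
        double sum;
        long count;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per arriving element; the element itself is not stored.
    static Acc add(double value, Acc acc) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    // Called when the window fires.
    static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    // Combines two partial results, as needed when two windows are merged.
    static Acc merge(Acc a, Acc b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (double temperature : new double[]{15.0, 15.0, 6.7, 38.1}) {
            add(temperature, acc);
        }
        System.out.println(getResult(acc));
    }
}
```

Because the accumulator has constant size, the cost of a window does not grow with the number of elements it receives.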

Full window functions

These are mainly used through the apply function (with a WindowFunction) or the process function (with a ProcessWindowFunction).

  • All data in the window is collected first, and the function iterates over all of it when the window fires
  • ProcessWindowFunction, WindowFunction

The basic functionality of ProcessWindowFunction and WindowFunction is the same, but ProcessWindowFunction can access more information.
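
Some results need every element of the window at once, which is where a full window function fits. The plain-Java sketch below computes a median over the buffered elements; FullWindowSketch and median are hypothetical names, and the method only imitates the shape of a function that receives the whole window as an Iterable when it fires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only: a computation that needs all window elements at once.
class FullWindowSketch {

    // Called once when the window fires, with every element buffered for that window.
    static double median(Iterable<Double> windowElements) {
        List<Double> sorted = new ArrayList<>();
        for (double value : windowElements) {
            sorted.add(value);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        // Odd count: middle element. Even count: mean of the two middle elements.
        return sorted.size() % 2 == 1
                ? sorted.get(mid)
                : (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
    }

    public static void main(String[] args) {
        List<Double> window = new ArrayList<>();
        Collections.addAll(window, 15.0, 6.7, 38.1);
        System.out.println(median(window)); // prints 15.0
    }
}
```

The trade-off is memory: the whole window has to be kept until it fires, whereas an incremental aggregation keeps only its accumulator.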

Other optional APIs

  • .trigger() -- trigger: defines when the window fires, that is, when the computation is triggered and the result is output
  • .evictor() -- evictor: defines the logic for removing some data from the window
  • .allowedLateness() -- allows late data to be processed
  • .sideOutputLateData() -- puts late data into a side output stream
  • .getSideOutput() -- gets the side output stream
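
How allowed lateness and the late-data side output relate to each other can be sketched as a simple decision rule for event-time windows. LatenessSketch, Fate and classify are hypothetical names, and the exact comparison against the watermark reflects my understanding of the behaviour rather than a quotation of Flink's implementation.

```java
// Illustrative sketch only: what happens to an element depending on how late it is.
class LatenessSketch {

    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    // windowEnd is exclusive; watermark is the current event-time progress;
    // allowedLateness is the extra time the window state is kept after firing.
    static Fate classify(long windowEnd, long watermark, long allowedLateness) {
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to the window
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME; // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_BUT_ACCEPTED; // window already fired, but its state still exists
        }
        // Window state is gone: the element goes to the late-data side output if one
        // is configured, otherwise it is dropped.
        return Fate.SIDE_OUTPUT;
    }

    public static void main(String[] args) {
        long windowEnd = 15_000;      // window [0, 15000) in milliseconds
        long allowedLateness = 5_000; // keep state 5 s longer
        System.out.println(classify(windowEnd, 10_000, allowedLateness)); // prints ON_TIME
        System.out.println(classify(windowEnd, 16_000, allowedLateness)); // prints LATE_BUT_ACCEPTED
        System.out.println(classify(windowEnd, 20_000, allowedLateness)); // prints SIDE_OUTPUT
    }
}
```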

window API overview

Keywords: flink

Added by phpr0ck5 on Sat, 29 Jan 2022 09:53:54 +0200