[Flink] [Chapter 6 Window]

Window overview

Streaming computing is a data processing engine designed to process infinite data sets, which refers to a growing essentially infinite data set, and window is a means to cut infinite data into finite blocks.
Window is the core of infinite data stream processing. Window splits an infinite stream into "buckets" of finite size. We can perform calculation operations on these buckets.
The window in the flash can be understood as a bucket, and the data can be understood as water flow. The water flow is continuous. For DataStream, it is a data processing process. After having a bucket, the data is loaded into the bucket for processing, and the calculation results are output when the window is closed

Conclusion: window = data bucket; Window essence = data union

Window type

Window in Flink can be divided into two categories

(1) CountWindow: generates a Window according to the specified number of data pieces, regardless of time.

  • Scroll count window
  • Sliding count window

(2) TimeWindow: generates a Window by time.

  • scroll window
  • sliding window
  • session Window

There is only a time window in Spark

1. Scrolling windows

(1) Slice the data according to the fixed window length.
(2) Features: fixed window length, no data overlap between windows; The window is closed on the left and open on the right; Window alignment
The rolling window allocator allocates each element to a window with a specified window size. The rolling window has a fixed size and will not overlap. For example, if you specify a 5-minute scrolling window, the creation of the window is shown in the following figure:

2. Sliding Windows

Features: time alignment, fixed window length, data overlap between windows; Left closed right open; Window alignment
For example, if you have a 10 minute window and a 5-minute slide, the 5-minute window in each window contains the data generated in the last 10 minutes, as shown in the following figure:

Generally, the window size is an integer multiple of the step size, otherwise some data will belong to n windows and some data will belong to n+1 windows; The larger the step size, the less overlapping data between windows.
Window 1h, step 30min, each data belongs to two windows
Window 1h, step 5min, each data belongs to 12 windows

3. Session window (Session Windows)

The session window belongs to the time window
Features: the length of the window is not fixed, but when no data is generated after a period of time, the window is closed; Time no alignment

  • You need to specify the session gap time interval. Session gap time is the minimum time interval between windows. When no data is generated in gap time, the window will be intercepted
  • It is composed of a series of events combined with a timeout gap of a specified length of time, which is similar to the session of a web application, that is, if new data is not received for a period of time, a new window will be generated.
  • The session window allocator groups elements through session activities. Compared with rolling windows and sliding windows, the session window does not have overlapping and fixed start time and end time. On the contrary, when it no longer receives elements within a fixed time period, that is, the inactive interval is generated, the window will be closed. A session window is configured through a session interval, which defines the length of the inactive cycle. When the inactive cycle is generated, the current session will be closed and subsequent elements will be allocated to the new session window.

Window API

Windowing operations are based on keyedStream, because aggregation is often based on class aggregation. The only common dataStream is windowsall

1. Window creation

Flink offers timeWindow and The countWindow method is used to define the time window and count window
① Scrolling time window

.timeWindow(Time.seconds(5))

② Sliding time window

.timeWindow(Time.seconds(5),Time.seconds(2))

③ session window

.window(EventTimeSessionWindows.withGap(Time.seconds(5));)

④ Scrolling count window

.countWindow(5)

⑤ Sliding count windowslidingcountwindow

.countWindow(10,2)

1.1 TimeWindow

TimeWindow is used to compose all data within the specified time range into one window and calculate all data in one window at a time

1.1.1 scrolling window

Flink's default time window is divided into windows according to Processing Time, and the data obtained by Flink is divided into different windows according to the time of entering Flink.

DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
                .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow( Time.seconds(15) )
                .minBy(1);

(1) The time interval can be through time milliseconds(x),Time.seconds(x),Time.minutes(x) and so on.
(2) Scroll the window, calculate and output once according to the window size and time

1.1.2 sliding window

  • The function names of sliding window and rolling window are exactly the same, but two parameters need to be passed in when passing parameters, one is window_size, one is sliding_size.
  • Sliding in the following code_ The size is set to 5S, that is, the output result is calculated every 5S, and the window range of each calculation is all elements within 15s.
DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .timeWindow( Time.seconds(15), Time.seconds(5) )
                .minBy("temperature");

(1) Sliding window, output once according to step time, and output window / step times in total
(2) The rolling window is essentially a sliding window. The step size of the rolling window is consistent with the window size. It is output only once at a time, and the output interval is the window size

1.1.3 Session Window

  1. Put the continuously accessed data in a window. When the stop time exceeds the specified time, start window cutting calculation.
  2. The creation of session window is not abbreviated. It must be through the window() method. The parameter is the window allocator

Execution effect: continuous input will not see the result. Stop for 5s and the result can be output.

1.2 CountWindow

CountWindow triggers execution according to the number of the same key elements in the window. During execution, only the result corresponding to the key whose number of elements reaches the window size is calculated

1.2.1 scrolling window

The default CountWindow is a scrolling window. You only need to specify the window size. When the number of elements reaches the window size, the execution of the window will be triggered.

DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5 )
                .minBy("temperature");

Note: CountWindow is based on the number of messages and is a keyedStream, so the window of CountWindow is_ Size and step_size refers to the number of elements with the same Key, not the total number of all elements entered.

1.2.2 sliding window

The function names of sliding window and rolling window are exactly the same, but two parameters need to be passed in when passing parameters, one is window_size, one is sliding_size.
Sliding step size: sliding in the following code_ The size is set to 2, that is, it is calculated every time two data with the same key are received, and the window range of each calculation is 5 elements

DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5, 2 )
                .minBy("temperature");

2. Window based aggregation

1. The purpose of collecting data in the window is for calculation. The window function is provided in the flash to define the calculation operation to be performed on the data collected in the window;

2. The calculation and output of the flick window can be carried out independently;
In other words, you can only calculate without output, and wait until a certain time to trigger the output

2.1 incremental aggregation functions

Features: a piece of data is calculated once, but not output. It will be output only when the window reaches the closing condition

Typical incremental aggregation functions are:

  • ReduceFunction
  • Aggregatefunction (can only be used after keyedStream is opened)
  • Max,maxBy,min,minBy

2.2 full window functions

Features: collect all the data of the window without calculation. When the window is closed, an iterator is provided to store all the data in the window. When the calculation is triggered, it will traverse all the data for calculation.

Full window functions include:

  • ProcessWindowFunction
  • apply()
/ 
* @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
WindowFunction: Is an interface with four parameters.
 
* @param <IN> The type of the input value.
 * @param <OUT> The type of the output value.
 * @param <KEY> The type of the key.
 * @param <W> The type of {@code Window} that this window function can be applied on.
 */
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> kStream = map.keyBy(0);
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> ttwindow = kStream.timeWindow(Time.seconds(5));
       //todo uses the full function as the wordCount full function of the window. There are two kinds of full functions: apply and ProcessWindowFunction
       // The use of apply() is shown here
       // The parameter of apply is windowfunction, which is an interface that inherits Function and serialization
        SingleOutputStreamOperator<Tuple2<String, Integer>> apply = ttwindow.apply(new myWindowFunction());

        apply.print();
        
        env.execute();

    }
    //todo needs four generics to do the wordcount windowfunction
//        * @param <IN> The type of the input value.
//        * @param <OUT> The type of the output value.
//        * @param <KEY> The type of the key. (key type of keydStream)
//        * @param <W> The type of {@code Window} that this window function can be applied on.
//
    public static class myWindowFunction
            implements WindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
            Tuple,TimeWindow>{

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
             //TODO implements the apply method with four parameters:
            //  The first is the key of KeyedStream
            //  The second is that window can obtain window information, such as the start and end time of the window
            //  The third is the iterator composed of all the data in the window
            //  The fourth is the Collector, which is used to write out the data

            //todo 1. Get the key and change it to String type
            String key = tuple.getField(0);
            // todo 2. The full function will load the same data as the window key into an iterator
            //  Traversal iterators process the same dataset as the key
            Iterator<Tuple2<String, Integer>> inputIter = input.iterator();
            int count = 0;
            while(inputIter.hasNext()){
                count += inputIter.next().f1;
            }
            // todo 3. After processing, the collector is used for output, which is the same as flatMap
            out.collect(new Tuple2<String,Integer>(key,count));
        }


Additional notes:

  • The incremental function is more efficient than the total quantity
  • The full application scenario is that all data must be obtained before calculation.

apply function: the parameter is WindowFunction

2.3 other optional API s

Trigger

  • Define when the window closes, trigger calculation and output results
  • Controls when the window closes and when the data is calculated.
  • Window closing, calculation and output can be operated independently.

Monitor () -- remover

  • Define the logic to remove some data
  • Generally not, it has been filtered in front of the window

The following three functions can only be used in event time semantics

  • . allowedLateness() -- allow processing of late data
  • . sideOutputLateData() -- put the late data into the side output stream
  • . getSideOutput() -- get side output stream
    If the waterMark has reached the window closing time, but late data is allowed, the late data will be put into the side output stream

Complete window call process

Additional note: brackets in the figure are optional, and aggregation function is required (the window is opened for aggregation calculation)

(1) Keyed Windows

(2) Noe-Keyed Windows

  • For non keyed windowall, there must be a trigger, otherwise it will not be calculated and data will be collected all the time.
  • TimeWindowAll can trigger calculation by itself
  • The windowsall operation is generally not used

Time offset of the time window

Both the time window in Flink and the time window in Spark are integral points. For example, if the step size is 3s, the start and end of the window must be an integral multiple of 3s. The reasons are as follows:

In the window allocator of a scrolling window:

  • There will be a default parameter offset of 0 in the window allocator of the scrolling window
  • By default, all time windows are created in integer time. For example, if the window is 5s, the window range is:
    [x min 0s,x min 5s), [x min 5s,x min 10s).
  • The function of time offset is to adjust the offset of the window. For example, if offset is set to 1s, the window range becomes: [x min 1s,x min 6s), [x min 6s,x min 11s)

Application scenario: the underlying time is UTC. For East 8 District of China, it is 8h earlier than UTC standard time. In China, if the window size is: time Days (1), the window range is from 8:00 a.m. to 8:00 p.m., not 0:00 to 24:00. offset needs to be set to - 8

Start time of time window

(1) Scroll time window

The assignWindows method is used to assign windows

Where: now is the current time of the system, start is the start time of the window, and the start time of the window is calculated as follows:

Current time - (current time - offset + window size)% window size

  • For example, the window size is 5s and the current time is 9:00:01
  • 9: 00:01 - (9:00:01 - 0 + 5s)% 5 = 01 - (06)% 5 = 0, so the window range is [9:00:00,9:00:05)
  • It can also be seen from the above that the offset can move the offset of the window

+Purpose of windowSize: timestamp - offset may be negative. When the time is 1970, timestamp is 0, which will lead to A negative timestamp - number, that is, the starting position of the window opened at time A will be behind the opening time A

The start time of the window in Spark is also determined in this way

(3) Sliding time window


In the slidingProcessingTimeWindows class:

For a sliding window, the start time of the window is determined through the slide.

  • Start time = timestamp - (timestamp step)% step

For example, the time of a piece of data is 9:01, the window size is 15min, and the step size is 5min = >
1.window array: Windows = new ArrayList < > (3)
2. Start time of the last window: lastStart = 9:00,
3. Use the window size and step traversal to create a new window and add it to the window array:

for(long start = lastStart;start>timestamp - size;start -= slide){
Windows.add(new TimeWindow(start,start+size))
}  

The window is closed to the left and opened to the right

In the TimeWindow class:

Get the maximum time of the current window:

Subtract 1ms from the end of the window, that is, the window size is 5s, but the window only accepts data within the time range of 9:00:0000 to 9:00:0499

Sliding window problem

(1) Sliding window, how many windows does a data belong to

List: for sliding windows, creating windows is actually creating multiple windows. Number of windows = size/slide
(2) Window size and sliding step size are not integer multiples
Suppose the time is 9:01, the window size is 5min, and the step size is 2min

It can be seen from the figure that the data between 00-01s falls in three windows and the data between 01-02 falls in two windows
Conclusion: the window size and step size of sliding window are not integer multiples, which will lead to some data falling in n+1 windows and some falling in n windows; n = WindowSize / SlideSize

Keywords: flink

Added by s_ainley87 on Thu, 13 Jan 2022 18:28:15 +0200