Flink Multi-Parallelism and Watermarks

Recently, while reviewing Flink, I noticed that all the demos I had written before used a parallelism of 1. It suddenly occurred to me to ask: is a window triggered the same way under multiple parallelism as under single parallelism? That question led to the experiments below.

First, I set the allowed data delay (out-of-orderness) to 2 s, and then defined a sliding window with a length of 5 s and a slide of 1 s.
Test data
a 1
b 2
c 3
a 4
f 5
z 6
v 7

The number on each line is the event timestamp, in seconds.

Single parallelism
  • When a 4 enters, the watermark becomes 4 - 2 = 2, which reaches the end of window [-3 ~ 2] and triggers its calculation (the window start is calculated as follows)
Window start time calculation logic
// Located in org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
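  • For a sliding window, the assigner calls this method with the slide (1 s) as windowSize and the window offset as offset (0 here; it is not the 2 s data delay). That gives the latest window start, and the assigner then steps back one slide at a time while the start is greater than event time - window length. For the first element this gives:

    latest start: 1 - (1 - 0 + 1) % 1 = 1
    window starts: 1, 0, -1, -2, -3

    # 1 event time
    # 0 window offset
    # 1 slide
    # 5 window length

    So the earliest window containing event time 1 is [-3 ~ 2].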
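To check the window boundaries by hand, here is a minimal sketch in plain Java with no Flink dependency. It copies the arithmetic of getWindowStartWithOffset quoted above and wraps it in a loop that, as far as I understand SlidingEventTimeWindows, matches how the sliding assigner enumerates windows. The class name SlidingWindowStarts and the helper windowStarts are made up for this illustration, and timestamps are in milliseconds as in Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowStarts {

    // Same arithmetic as TimeWindow.getWindowStartWithOffset quoted above.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Hypothetical helper: start times of every sliding window that contains
    // the timestamp, newest window first (assumed to mirror the assigner loop).
    static List<Long> windowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        // The slide, not the window length, is passed as windowSize here.
        long lastStart = getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Event time 1 s, window length 5 s, slide 1 s, offset 0.
        System.out.println(windowStarts(1000L, 5000L, 1000L, 0L));
        // prints [1000, 0, -1000, -2000, -3000]
    }
}
```

The last entry, -3000 ms, is the start of the window written as [-3 ~ 2] in this post.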
    
Multi parallelism

When the parallelism is set to 4, something "weird" happens. When a 4 is sent, no calculation is triggered, so I continue to send data:

f 5

z 6

v 7

Only at this point is the calculation of window [-3 ~ 2] triggered. That puzzled me! After further study, I came to the following conclusion.

The window operator uses the minimum of the watermarks from all upstream partitions as its own watermark, so a window closes only after every partition has seen a large enough event time. With a 2 s delay, closing window [-3 ~ 2] requires a watermark of 2, that is, an event time >= 4 in every partition. The test data is spread over the partitions in turn (1 to p1, 2 to p2, 3 to p3, 4 to p4, then 5 to p1, and so on), as follows:

  • The first window is [-3 ~ 2], and every partition must reach event time 4 to close it. However, when the data with event time 4 arrived, the window did not close, because each partition maintains its own watermark.

    At this time, only p4 has reached the standard for closing window [-3 ~ 2]. Therefore, the [-3 ~ 2] window is not closed.

  • Next, p1 receives data with event time 5 and now also meets the standard for closing window [-3 ~ 2]. However, p2 and p3 still do not meet it, so the window is still not closed.

  • Only when event time 6 has entered p2 and event time 7 enters p3 have all partitions reached an event time of at least 4, and window [-3 ~ 2] is triggered and closed.
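The steps above can be replayed with a small plain-Java simulation. This is only a sketch under stated assumptions, not Flink code: records are dealt to partitions round-robin starting at p1, each partition's watermark is its largest event time minus the 2 s delay, every record updates the watermark immediately (Flink actually emits watermarks periodically and subtracts an extra 1 ms), and the downstream watermark is the minimum over all partitions. The class MinWatermarkDemo and the method firingEvent are hypothetical names.

```java
import java.util.Arrays;

public class MinWatermarkDemo {
    static final long DELAY = 2;       // allowed out-of-orderness, in seconds
    static final long WINDOW_END = 2;  // end of window [-3 ~ 2], in seconds

    // Returns the event time whose arrival lets window [-3 ~ 2] fire,
    // or -1 if the window never fires for the given input.
    static long firingEvent(long[] timestamps, int parallelism) {
        long[] maxSeen = new long[parallelism];
        // Partitions that have seen nothing yet hold the watermark back.
        Arrays.fill(maxSeen, Long.MIN_VALUE / 2);
        for (int i = 0; i < timestamps.length; i++) {
            int p = i % parallelism;  // assumed round-robin distribution
            maxSeen[p] = Math.max(maxSeen[p], timestamps[i]);
            // Downstream watermark = minimum of the per-partition watermarks.
            long watermark = Arrays.stream(maxSeen).min().getAsLong() - DELAY;
            if (watermark >= WINDOW_END) {
                return timestamps[i];
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] data = {1, 2, 3, 4, 5, 6, 7};
        System.out.println(firingEvent(data, 1)); // prints 4
        System.out.println(firingEvent(data, 4)); // prints 7
    }
}
```

With one partition the window fires as soon as event time 4 arrives. With four partitions the minimum is held back by whichever partition has seen the smallest event time, so the window fires only when 7 arrives.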

Extension: suppose that after 6 enters p2, p3 keeps receiving out-of-order data with event times below 4 (for example, 3), and this happens frequently. The watermark of p3 then stays too low, window [-3 ~ 2] cannot be closed for a long time, and all the data piles up. How can this be solved?

Test code attached
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

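/**
 * Reads "name timestamp" lines from a socket and prints the bounds of each
 * sliding event-time window (5 s length, 1 s slide, 2 s out-of-orderness).
 *
 * @Author:admin
 * @Date:Created in 16:04 2022/1/4
 */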
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Uncomment to reproduce the multi-parallelism case.
        // executionEnvironment.setParallelism(4);

        DataStreamSource<String> source = executionEnvironment.socketTextStream("node28.testbigdate", 9991);

        SingleOutputStreamOperator<JSONObject> map = source.map(line -> {
            JSONObject json = new JSONObject();
            String[] s = line.split(" ");
            json.put("name", s[0]);
            json.put("date", s[1]);
            return json;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("date") * 1000L;
                    }
                }));
        map.print("===========>");

        KeyedStream<JSONObject, String> name = map.keyBy(t -> t.getString("name"));
        name.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .process(new ProcessWindowFunction<JSONObject, Object, String, TimeWindow>() {

                    @Override
                    public void process(String s, Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
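                        // Window bounds are in epoch milliseconds.
                        System.out.println("start time: " + start);
                        System.out.println("end time: " + end);
                        // estimateSize() is only an estimate of the element count.
                        System.out.println("total: " + elements.spliterator().estimateSize());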
                    }
                });

        executionEnvironment.execute();

    }
}

Keywords: Big Data flink

Added by craige on Thu, 06 Jan 2022 12:16:23 +0200