By default, once the watermark passes the end of a window, any data for that window that arrives afterwards is discarded.
Allowed lateness was introduced so that such late data is not simply thrown away.
In short, allowed lateness applies to event time. After the watermark passes the end of a window, the window is kept for an additional period (also measured in event time) so that late data can still arrive and be processed.
If allowedLateness is not specified, it defaults to 0: as soon as the watermark passes the end of the window, late data for that window is discarded.
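Note: with the default EventTimeTrigger, every late element that arrives within the allowed lateness triggers the window computation again. The elements of the window stay buffered until the watermark passes end of window + allowedLateness. Because the watermark itself lags behind the largest timestamp seen so far by the configured delay, this corresponds to an event timestamp of end of window + watermark delay + allowedLateness. These repeated firings are accumulating in the sense of the Dataflow model: each firing covers all elements the window has received so far.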
Under what circumstances will data be discarded or not calculated?
There are two situations:
a. allowedLateness is not set: an element belongs to a window, but the watermark has already passed the end time of that window, so the element is discarded;
b. allowedLateness is set: an element belongs to a window, but the watermark has already passed the end time of that window plus the allowed lateness, so the element is discarded.
In other words, if the data for one key is delayed too much, it is discarded. The reason is that data for other keys keeps arriving, which advances the watermark and eventually triggers the window computation.
Late data that would otherwise be lost can be captured with a side output stream.
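The two cases above reduce to a single comparison. The following is a minimal plain-Java sketch of that rule, not Flink's own code: the class and method names are hypothetical, and it assumes that a window with exclusive end time `end` has `end - 1` as its last timestamp, all in milliseconds.

```java
/** Plain-Java model of the late-data rule described above; not Flink's own code. */
final class LateDataRule {

    /** Last timestamp (ms) that still belongs to a window whose exclusive end is windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True once the watermark has reached the window's last timestamp plus the allowed lateness. */
    static boolean isDiscarded(long windowEnd, long allowedLatenessMs, long watermark) {
        return maxTimestamp(windowEnd) + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 50_000L; // hypothetical window [45 s, 50 s), in milliseconds

        // Case a: no allowed lateness, the watermark has reached the end of the window.
        System.out.println(isDiscarded(windowEnd, 0L, 49_999L));     // prints true
        // Case b: 5 s of allowed lateness keeps the window open at the same watermark ...
        System.out.println(isDiscarded(windowEnd, 5_000L, 49_999L)); // prints false
        // ... until the watermark reaches end + allowed lateness.
        System.out.println(isDiscarded(windowEnd, 5_000L, 54_999L)); // prints true
    }
}
```

Case a is simply case b with an allowed lateness of 0, which is why one predicate covers both.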
Code test:
package com.cuichunchi.watermark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;

/**
 * Sample input:
 * 01,1586489566000
 * 01,1586489567000
 * 01,1586489568000
 * 01,1586489569000
 * 01,1586489570000
 * 01,1586489571000
 * 01,1586489572000
 * 01,1586489573000
 *
 * Corresponding time:
 * 2020-04-10 11:32:46
 * 2020-04-10 11:32:47
 * 2020-04-10 11:32:48
 * 2020-04-10 11:32:49
 * 2020-04-10 11:32:50
 */
public class TestWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.disableOperatorChaining();
        DataStreamSource<String> socketTextStream = env.socketTextStream("s201", 9099);
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Parse "key,timestamp" into (key, 1, timestamp)
        SingleOutputStreamOperator<Tuple3<String,Integer,Long>> tuple2Map =
                socketTextStream.map(new MapFunction<String, Tuple3<String,Integer,Long>>() {
                    @Override
                    public Tuple3<String,Integer, Long> map(String value) throws Exception {
                        return new Tuple3<>(value.split(",")[0],1, Long.parseLong(value.split(",")[1]));
                    }
                });

        // Extract timestamps and assign watermarks
        SingleOutputStreamOperator<Tuple3<String,Integer,Long>> tuple2WMDS = tuple2Map.assignTimestampsAndWatermarks(
                // The watermark lags the largest timestamp seen by 2 seconds
                WatermarkStrategy.<Tuple3<String,Integer,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String,Integer,Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String,Integer,Long> element, long recordTimestamp) {
                                System.out.println("-------Extract timestamp:" + dateFormat.format( element.f2 )+
                                        ",Default processing timestamp:" + recordTimestamp);
                                return element.f2;
                            }
                        })
                        .withIdleness(Duration.ofMillis(3000))
        );

        // Simple real-time aggregation
        /*SingleOutputStreamOperator<Tuple3<String, Integer, Long>> sum = tuple2WMDS.keyBy(value -> value.f0)
//                .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
                // Tolerate data that is late by several seconds
//                .allowedLateness()
                .sum(1)
                // After aggregation, output
                .process(new ProcessFunction<Tuple3<String, Integer, Long>, Tuple3<String, Integer, Long>>() {
                    @Override
                    public void processElement(Tuple3<String, Integer, Long> value, Context ctx,
                                               Collector<Tuple3<String, Integer, Long>> out) throws Exception {
                        System.out.println("------------"+value);
                        out.collect(new Tuple3<>("21",1111,2222L));
                    }
                })
                ;*/

        // TODO: see the source of TimeWindow#getWindowStartWithOffset() for how the window start is computed
        // TODO: org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows()
        // TODO: org.apache.flink.streaming.api.windowing.windows.TimeWindow.getWindowStartWithOffset()

        // Windowed aggregation
        OutputTag<Tuple3<String,Integer,Long>> diltyData = new OutputTag<Tuple3<String,Integer,Long>>("data"){};
        SingleOutputStreamOperator<Tuple2<String, Integer>> processDS = tuple2WMDS.keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Tolerate data that is late by up to 5 seconds
                .allowedLateness(Time.seconds(5))
                // Write data that is later than that to the side output stream
                .sideOutputLateData(diltyData)
                .process(new ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple3<String, Integer, Long>> elements,
                                        Collector<Tuple2<String, Integer>> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        long currentWatermark = context.currentWatermark();
                        System.out.println("======Parameter printing:"+s+ ",elements:"+elements.toString()
                                +",Current start window: ["+dateFormat.format(start)+"],Current end window: ["+dateFormat.format(end)+
                                "],current watermark:"+dateFormat.format(currentWatermark));
                        // Count the elements currently held by the window
                        Iterator<Tuple3<String, Integer, Long>> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()){
                            Tuple3<String, Integer, Long> next = it.next();
                            sum += next.f1;
                        }
                        out.collect(new Tuple2<>(s,sum));
                    }
                });

        processDS.print("Result output====>");
        processDS.getSideOutput(diltyData).print("Late data measurement output stream====>");

        env.execute();
    }
}
The watermark delay is set to 2 seconds, allowedLateness to 5 seconds, and the window size to 5 seconds. The first window is [45, 50). Because the watermark lags by 2 seconds, the first window is triggered only when an element with a timestamp of 52 seconds arrives. With 5 seconds of allowed lateness, the first window is cleared only when an element with a timestamp of 50 + 2 + 5 = 57 seconds arrives.
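The arithmetic above can be checked with a small plain-Java sketch. It is a model under stated assumptions rather than Flink code: the class and method names are hypothetical, windows are tumbling with no offset, the bounded-out-of-orderness watermark is taken to be the largest timestamp seen minus the delay minus 1 ms, and a window fires once the watermark reaches its last timestamp (end - 1).

```java
/** Plain-Java model of the window timeline for the test settings; not Flink's own code. */
final class WindowTimeline {

    /** Start of the tumbling window (no offset) that contains ts. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Watermark after the largest timestamp seen so far is maxSeenTs. */
    static long watermark(long maxSeenTs, long delayMs) {
        return maxSeenTs - delayMs - 1;
    }

    /** Smallest event timestamp whose watermark fires the window ending at windowEnd. */
    static long firstFiringEventTime(long windowEnd, long delayMs) {
        return windowEnd + delayMs;
    }

    /** Smallest event timestamp whose watermark clears the window ending at windowEnd. */
    static long cleanupEventTime(long windowEnd, long delayMs, long allowedLatenessMs) {
        return windowEnd + delayMs + allowedLatenessMs;
    }

    public static void main(String[] args) {
        long size = 5_000L, delay = 2_000L, lateness = 5_000L;
        long ts = 1586489566000L; // first sample record, 11:32:46 in the listing above

        long start = windowStart(ts, size);
        long end = start + size;
        System.out.println("window start: " + start);                                  // second 45
        System.out.println("window end:   " + end);                                    // second 50
        System.out.println("fires at:     " + firstFiringEventTime(end, delay));       // second 52
        System.out.println("cleared at:   " + cleanupEventTime(end, delay, lateness)); // second 57
    }
}
```

The element at 52 seconds produces a watermark of 49.999 seconds, which is exactly the last timestamp of [45, 50), so that is the first element able to fire the window.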
Test results:
-------Extract timestamp:2020-04-10 11:32:46,Default processing timestamp:-9223372036854775808
-------Extract timestamp:2020-04-10 11:32:47,Default processing timestamp:-9223372036854775808
-------Extract timestamp:2020-04-10 11:32:52,Default processing timestamp:-9223372036854775808
======Parameter printing:01,elements:[(01,1,1586489566000), (01,1,1586489567000)],Current start window: [2020-04-10 11:32:45],Current end window: [2020-04-10 11:32:50],current watermark:2020-04-10 11:32:49
Result output====> (01,2) (the first window ends at 50, so its computation is first triggered when the element at 52 seconds arrives)
-------Extract timestamp:2020-04-10 11:32:49,Default processing timestamp:-9223372036854775808
======Parameter printing:01,elements:[(01,1,1586489566000), (01,1,1586489567000), (01,1,1586489569000)],Current start window: [2020-04-10 11:32:45],Current end window: [2020-04-10 11:32:50],current watermark:2020-04-10 11:32:49
Result output====> (01,3) (still within the allowed lateness of 5 seconds, 50 + 2 + 5 = 57. Within this range the computation is triggered again every time late data arrives; beyond 57 seconds the state of this window is cleared and the computation is no longer triggered)
-------Extract timestamp:2020-04-10 11:32:57,Default processing timestamp:-9223372036854775808
======Parameter printing:01,elements:[(01,1,1586489572000)],Current start window: [2020-04-10 11:32:50],Current end window: [2020-04-10 11:32:55],current watermark:2020-04-10 11:32:54
Result output====> (01,1) (at 57 seconds the second window is triggered and the state of the first window is cleared)
-------Extract timestamp:2020-04-10 11:32:47,Default processing timestamp:-9223372036854775808
Late data measurement output stream====> (01,1,1586489567000) (an element for the first window is sent again; because the first window has already been cleared, it ends up in the side output stream)
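To see why the results come out in this order, the run can be replayed in a toy single-key model written in plain Java. This is only a sketch of the semantics, not Flink itself: the class name, method name and output strings are hypothetical, timestamps are given as milliseconds within the minute (46 s becomes 46000), and it assumes each element is processed against the watermark produced by the elements before it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy single-key replay of the test run; a model of the semantics, not Flink code. */
final class LatenessReplay {
    static final long SIZE = 5_000L, DELAY = 2_000L, ALLOWED_LATENESS = 5_000L;

    /** Feeds the timestamps in arrival order and returns what would be emitted. */
    static List<String> replay(long... timestamps) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> buffered element count
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, SIZE);
            long maxTs = start + SIZE - 1; // last timestamp belonging to the window

            if (maxTs + ALLOWED_LATENESS <= watermark) {
                // The window has already been cleared: the element goes to the side output.
                out.add("side output: " + ts);
            } else {
                int count = counts.merge(start, 1, Integer::sum);
                if (maxTs <= watermark) {
                    // Late but within the allowed lateness: the window fires again at once.
                    out.add("window " + start + ": count=" + count);
                }
            }

            // Advance the watermark, then fire and clear the windows it has passed.
            maxSeen = Math.max(maxSeen, ts);
            long newWatermark = maxSeen - DELAY - 1;
            for (Map.Entry<Long, Integer> e : counts.entrySet()) {
                long last = e.getKey() + SIZE - 1;
                if (last > watermark && last <= newWatermark) {
                    out.add("window " + e.getKey() + ": count=" + e.getValue());
                }
            }
            counts.keySet().removeIf(s -> s + SIZE - 1 + ALLOWED_LATENESS <= newWatermark);
            watermark = newWatermark;
        }
        return out;
    }

    public static void main(String[] args) {
        // Same arrival order as in the test: 46, 47, 52, 49, 57, 47 seconds.
        for (String line : replay(46_000L, 47_000L, 52_000L, 49_000L, 57_000L, 47_000L)) {
            System.out.println(line);
        }
    }
}
```

With this input the model emits a count of 2 and then 3 for the window starting at 45000, a count of 1 for the window starting at 50000, and finally sends 47000 to the side output, which matches the order of the results above.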