Flink Notes-Delayed Data Processing



When Flink processes streaming data in windows, it provides a Watermark mechanism based on EventTime. However, watermarks only solve the out-of-order problem to a certain extent. In some extreme cases data arrives so late that, even with watermarks, it is impossible to wait until all of it has entered the window before processing. By default, Flink discards such severely late data. If you still want late data to be processed and reflected in the output, you can use the Allowed Lateness mechanism to handle it separately.
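To make "to a certain extent" concrete, here is a toy model in plain Java (not Flink code) of a bounded-out-of-orderness watermark, similar in spirit to Flink's BoundedOutOfOrdernessTimestampExtractor. It assumes the watermark is the largest timestamp seen so far minus a fixed delay; a record whose timestamp is already at or behind that watermark is one the watermark could not wait for. The class name and the 500 ms / 2000 ms delays are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Flink code) of a bounded-out-of-orderness watermark:
 * watermark = highest timestamp seen so far - maxOutOfOrderness.
 */
public class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxSeen = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Current watermark; Long.MIN_VALUE until the first record has been seen. */
    public long currentWatermark() {
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxOutOfOrderness;
    }

    /** Returns true if the record is at or behind the watermark that existed before it arrived. */
    public boolean observe(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        maxSeen = Math.max(maxSeen, timestamp);
        return late;
    }

    /** Feeds the timestamps in arrival order and collects the ones the watermark missed. */
    public static List<Long> lateRecords(long[] timestamps, long maxOutOfOrderness) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(maxOutOfOrderness);
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (wm.observe(ts)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 1200, 900, 3000, 1500, 2600};
        System.out.println(lateRecords(arrivals, 500)); // prints [1500]
    }
}
```

With a 500 ms delay the out-of-order record at 900 is still covered, but the record at 1500, arriving after 3000, falls behind the watermark. A 2000 ms delay would absorb it too, at the cost of delaying every window's result.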

Out-of-Order & Late

Both concepts exist to deal with disorder. They differ as follows:

  • Handling out-of-order data with the watermark mechanism is the first layer of protection, and it applies to the stream as a whole. When people talk about solving the out-of-order problem, they usually mean this kind of protection.
  • Handling disorder with the allowedLateness mechanism on a window is the second layer of protection, and it applies only to a specific window operator. The late-element problem refers to this kind of protection.

AllowedLateness & OutputTag

The DataStream API provides an allowedLateness method that specifies how long late data is still processed. When it is set, Flink takes the window's end time plus the allowed lateness as the window's final cleanup time. If a record arrives after the watermark has passed the window's end time, but before the watermark has passed the cleanup time, the record is still added to the window and the window calculation is triggered again immediately. Conversely, once the watermark has passed the cleanup time (the maximum lateness), the window state is gone and the record can only be discarded, or sent to a side output with sideOutputLateData.
By default, the allowed lateness is 0. For GlobalWindows, the window's maximum timestamp is Long.MAX_VALUE, so its cleanup time is effectively Long.MAX_VALUE, i.e. there is no timeout: data keeps accumulating in the window, waiting to be triggered.
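The rules in the two paragraphs above can be modelled in a few lines of plain Java. This is a sketch, not Flink's code: it assumes the event-time rule used by Flink's WindowOperator, where the cleanup time is window.maxTimestamp() + allowedLateness, maxTimestamp() is the window end minus 1, and an overflowing sum is clamped to Long.MAX_VALUE (which is why a GlobalWindow effectively has no timeout). The class, enum and method names are hypothetical.

```java
/**
 * Plain-Java model (not Flink code) of how an event-time window with
 * allowedLateness treats a record that is assigned to it.
 */
public class AllowedLatenessSketch {

    public enum Outcome { BUFFERED, LATE_FIRING, DROPPED }

    /** cleanupTime = maxTimestamp + allowedLateness, clamped to Long.MAX_VALUE on overflow. */
    public static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    /**
     * @param windowEnd       exclusive end of the window, so maxTimestamp = windowEnd - 1
     * @param allowedLateness how long the window state is kept after the window end
     * @param watermark       current watermark when the record arrives
     */
    public static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (cleanupTime(maxTimestamp, allowedLateness) <= watermark) {
            return Outcome.DROPPED;     // window state already purged: discard or side-output
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_FIRING; // window already fired: fire again including this record
        }
        return Outcome.BUFFERED;        // on time: wait for the watermark to reach the window end
    }

    public static void main(String[] args) {
        // Window [0, 10) with 10 ms allowed lateness, the same values as the demo.
        System.out.println(classify(10, 10, 5));   // BUFFERED
        System.out.println(classify(10, 10, 12));  // LATE_FIRING
        System.out.println(classify(10, 10, 19));  // DROPPED
        // GlobalWindow-like case: maxTimestamp is Long.MAX_VALUE, so the sum overflows and is clamped.
        System.out.println(cleanupTime(Long.MAX_VALUE, 10) == Long.MAX_VALUE); // true
    }
}
```

Note that the drop decision depends on the watermark and the window's cleanup time, not on how far the record's own timestamp lags.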

demo

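import java.util.ArrayList;
import java.util.List;

import javax.annotation.Nullable;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author qingh.yxb
 * @since 2019/9/7
 */
public class AllowLateness {
    // Side-output tag for late records; the anonymous subclass ({}) lets Flink capture the generic type
    private static final OutputTag<Tuple2<String, Integer>> myTag = new OutputTag<Tuple2<String, Integer>>("myTag") {
    };

    public static void main(String[] args) throws Exception {
        List<Tuple2<String, Integer>> source = new ArrayList<>();
        source.add(new Tuple2<>("qingh1", 1));
        source.add(new Tuple2<>("qingh2", 2));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection(source);

        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
        //env.enableCheckpointing(20000);
        // Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart up to 3 times, waiting 10 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<String> result = dataStreamSource.assignTimestampsAndWatermarks(
                new AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {
                        // Emit a watermark after every element: wall-clock time minus 500 ms
                        return new Watermark(System.currentTimeMillis() - 500);
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // Event time = wall-clock time minus 1000 ms, which lags the watermark
                        // emitted after the previous element
                        return System.currentTimeMillis() - 1000;
                    }
                }
        ).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return "key"; // every record goes to the same key
            }
        }).timeWindow(Time.milliseconds(10))
                .allowedLateness(Time.milliseconds(10))
                .sideOutputLateData(myTag) // records dropped for lateness go to myTag
                //.sum(1);
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                        for (Tuple2<String, Integer> element : elements) {
                            out.collect(element.f0);
                        }
                    }
                });
        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(myTag);
        // Print only the late records from the side output
        sideOutput.print();
        env.execute("qinghh Demo");
    }
}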

Test results:

Why isn't the first record shown? The cause seems to lie in org.apache.flink.streaming.api.operators.InternalTimeServiceManager#advanceWatermark. When testing, you can also manually set isSkippedElement to true (for example in a debugger) to mock a late window. In other words, isSkippedElement reflects whether the current window is late, that is, whether the window's cleanup time (for event time: the window's maximum timestamp, i.e. its end time minus 1, plus allowedLateness) is less than or equal to the current watermark.
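The check described above can be sketched in plain Java. This is a simplified model of the late-data branch at the end of WindowOperator#processElement, not Flink's source: isWindowLate compares the window's cleanup time with the watermark, isElementLate compares the record's timestamp + allowedLateness with the watermark, and a record goes to the side output only if every window it belongs to is late. Sliding windows are an assumption for illustration; with them one record belongs to several windows, which shows why the flag is tracked per element rather than per window.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Flink's source) of the late-data branch at the end of
 * WindowOperator#processElement, for event-time sliding windows.
 */
public class SkippedElementSketch {

    public enum Route { PROCESSED, SIDE_OUTPUT, DROPPED }

    /** Window is late once its cleanup time (maxTimestamp + allowedLateness) <= watermark. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime < maxTimestamp) {
            cleanupTime = Long.MAX_VALUE; // overflow guard
        }
        return cleanupTime <= watermark;
    }

    /** Element is late once timestamp + allowedLateness <= watermark. */
    static boolean isElementLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    /** End timestamps of all sliding windows (size, slide, offset 0) that contain the timestamp. */
    static List<Long> windowEnds(long timestamp, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            ends.add(start + size);
        }
        return ends;
    }

    public static Route route(long timestamp, long size, long slide,
                              long allowedLateness, long watermark, boolean hasLateTag) {
        boolean isSkippedElement = true; // "no window accepted this element"
        for (long end : windowEnds(timestamp, size, slide)) {
            if (isWindowLate(end, allowedLateness, watermark)) {
                continue; // this window's state has already been cleaned up
            }
            isSkippedElement = false; // at least one window still takes the element
        }
        if (!isSkippedElement) {
            return Route.PROCESSED;
        }
        // Every window was late: side-output if a late-data tag is set, otherwise drop.
        boolean late = isElementLate(timestamp, allowedLateness, watermark);
        return late && hasLateTag ? Route.SIDE_OUTPUT : Route.DROPPED;
    }

    public static void main(String[] args) {
        // 10 ms windows sliding by 5 ms: the record at 12 belongs to [5, 15) and [10, 20).
        System.out.println(route(12, 10, 5, 0, 16, true));  // PROCESSED: [10, 20) is still open
        System.out.println(route(12, 10, 5, 0, 20, true));  // SIDE_OUTPUT: both windows are late
        System.out.println(route(12, 10, 5, 10, 20, true)); // PROCESSED: lateness keeps both windows alive
    }
}
```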

After adding result.print();, the results are as follows:

On OutputTag

OutputTag is a side-output identifier that carries a name and type information. Flink allows functions such as ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction to emit side output: their Context provides an output(OutputTag<X> outputTag, X value) method that sends elements to a side output.
SingleOutputStreamOperator provides a getSideOutput method, which returns the side output emitted by the upstream function for a given OutputTag. At the end of WindowOperator's processElement method, if isSkippedElement is true, isElementLate is true and lateDataOutputTag is not null, the late element is emitted to the side output.
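The OutputTag / getSideOutput pattern can be illustrated with a toy model in plain Java. Tag, Result, output and getSideOutput below are hypothetical stand-ins written for this note, not Flink's classes; they only mirror the idea that a function writes to its main collector and, through a tag, to typed side outputs that are later looked up by the same tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of side outputs. Tag and Result are hypothetical stand-ins for
 * Flink's OutputTag and SingleOutputStreamOperator; they are not Flink's API.
 */
public class SideOutputSketch {

    /** Names a side output and records its element type; equality is by name in this toy. */
    public static final class Tag<T> {
        final String id;
        final Class<T> type;

        public Tag(String id, Class<T> type) {
            this.id = id;
            this.type = type;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tag && ((Tag<?>) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    /** Output of one operator: a main output plus tagged side outputs. */
    public static final class Result<OUT> {
        public final List<OUT> main = new ArrayList<>();
        private final Map<Tag<?>, List<Object>> sides = new HashMap<>();

        /** Plays the role of Context#output(OutputTag<X>, X); checks the value against the tag's type. */
        public <X> void output(Tag<X> tag, X value) {
            sides.computeIfAbsent(tag, t -> new ArrayList<>()).add(tag.type.cast(value));
        }

        /** Plays the role of getSideOutput(OutputTag<X>): look up elements by the same tag. */
        @SuppressWarnings("unchecked")
        public <X> List<X> getSideOutput(Tag<X> tag) {
            List<?> values = sides.get(tag);
            if (values == null) {
                return new ArrayList<>();
            }
            return (List<X>) values;
        }
    }

    /** Example "process function": even numbers go to the main output, odd ones to a side output. */
    public static Result<Integer> process(List<Integer> input, Tag<String> oddTag) {
        Result<Integer> result = new Result<>();
        for (int x : input) {
            if (x % 2 == 0) {
                result.main.add(x);
            } else {
                result.output(oddTag, "odd:" + x);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Tag<String> oddTag = new Tag<>("odd", String.class);
        Result<Integer> r = process(Arrays.asList(1, 2, 3, 4), oddTag);
        System.out.println(r.main);                  // prints [2, 4]
        System.out.println(r.getSideOutput(oddTag)); // prints [odd:1, odd:3]
    }
}
```

The side output's element type (String) is independent of the main output's type (Integer), just as the demo's side output carries Tuple2 records while the window function emits String.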

The demo is as follows:

SingleOutputStreamOperator<String> result = ... // same source, timestamps and keyBy as in the first demo
        .timeWindow(Time.milliseconds(10))
        .allowedLateness(Time.milliseconds(10))
        .sideOutputLateData(myTag)
        .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                for (Tuple2<String, Integer> element : elements) {
                    // Normal processing logic omitted
                    // Emit the element to the myTag side output
                    context.output(myTag, element);
                }
            }
        });
DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(myTag);

The key line is context.output(myTag, element);, which sends each window element to the myTag side output from inside the window function.


Added by Amanda1998 on Sat, 07 Sep 2019 14:34:19 +0300