Detailed explanation of Flink watermarks

Watermark introduction

In Flink, the watermark is the mechanism Apache Flink uses to handle event-time window computation. It is essentially a timestamp that flows with the data and is used to deal with out-of-order events in a real-time stream; in practice it is usually combined with windows.
From the moment a device produces an event, through Flink's source and then through the many operators that process the data, the stream is affected by network latency, backpressure and other factors, so records can arrive out of order. During window processing it is impossible to wait indefinitely for delayed data to arrive. When a certain watermark is reached, Flink assumes that all data with timestamps before the watermark has already arrived (even if some late records still show up afterwards), and the window computation can be triggered. This mechanism is the watermark.



As shown in the figure above:
● w(11): indicates that all data with timestamps before 11 have arrived, so the data before 11 can be computed.
● w(20): indicates that all data with timestamps before 20 have arrived, so the data before 20 can be computed.

Use of watermark

Generation timing

A watermark can be generated immediately after the data is received from the DataSource, or after a map or filter operation that follows the source.
The best place to generate watermarks is as close to the data source as possible, because the watermark generator makes assumptions about element order and relative timestamps. Since sources are read in parallel, every operation that causes Flink to redistribute records across stream partitions (such as changing the parallelism, keyBy, etc.) shuffles element timestamps out of order. Initial filter or map operations that do not cause redistribution, however, can safely be placed before the watermark generation, as in the sketch below.
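A minimal sketch of this placement, using the same socket source as the full example later in this article (the parsing of each line into a (key, eventTime) tuple is only illustrative):

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.socketTextStream("127.0.0.1", 10010)            // text source, one "key eventTime" line per event
  .filter(_.nonEmpty)                               // stateless, keeps partitions intact
  .map(line => { val a = line.split(" "); (a(0), a(1).toLong) })
  .assignTimestampsAndWatermarks(                   // assign watermarks as close to the source as possible
    WatermarkStrategy
      .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(2))
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        override def extractTimestamp(e: (String, Long), recordTimestamp: Long): Long = e._2
      }))
  .keyBy(_._1)                                      // redistribution only happens after this point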

Calculation of watermark

watermark = maximum event time seen so far by the operator (maxEventTime) - specified delay / allowed out-of-orderness (t)
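As a quick sanity check, here is the formula applied (as a small Scala sketch) to the first record of the sample run shown later, with the 2-second delay used in that example:

val maxEventTime = 1639100010955L   // largest event time seen so far (2021-12-10 09:33:30)
val t            = 2000L            // specified delay / allowed out-of-orderness, in milliseconds
val watermark    = maxEventTime - t
// watermark == 1639100008955L, i.e. 2021-12-10 09:33:28, matching the log output below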

Generation mode

First: With Periodic Watermarks

This mode triggers the generation and emission of watermarks periodically.
Periodically assigned watermarks are the most commonly used in practice: we instruct the system to emit a watermark at a fixed time interval.
When the time characteristic is set to event time, this interval defaults to 200 ms; you can change it yourself if you need a different value.

Set the task's time characteristic and the watermark interval:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
//Use event time (this call is deprecated since Flink 1.12, where event time is already the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//Set the parallelism to 1
env.setParallelism(1)
//Generate watermarks periodically every 1000 ms (the default interval is 200 ms)
env.getConfig.setAutoWatermarkInterval(1000)

Set the timestamps and the watermark:

     
import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

//Get data from a local socket port
val dataStream = env.socketTextStream("127.0.0.1", 10010)
//Convert each line to a (key, eventTime) tuple
val tupStream = dataStream.map(line => {
  val arr = line.split(" ")
  (arr(0), arr(1).toLong)
})

//Set the watermark
val waterDataStream = tupStream.assignTimestampsAndWatermarks(
  //Allow events to be at most 2 seconds out of order
  WatermarkStrategy.forBoundedOutOfOrderness[Tuple2[String, Long]](Duration.ofSeconds(2))
    //Extract the event-time timestamp from each record
    .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
      //Maximum event time seen so far
      var currentMaxNum = 0L
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      override def extractTimestamp(t: Tuple2[String, Long], recordTimestamp: Long): Long = {
        val eTime = t._2
        currentMaxNum = Math.max(eTime, currentMaxNum)
        //The current maximum minus the allowed out-of-orderness is the current watermark value.
        //Note: this logging is only for local observation; it is not needed in normal development.
        val waterMark = currentMaxNum - 2000
        println("data:" + t.toString() + "  ," + sdf.format(eTime) + " ,  current watermark: " + sdf.format(waterMark))
        eTime
      }
    })
)

import org.apache.flink.streaming.api.windowing.time.Time

//Key by the first field, apply a 3-second event-time tumbling window, sum the values and print
waterDataStream.keyBy(_._1).timeWindow(Time.seconds(3)).reduce((e1, e2) => {
  (e1._1, e1._2 + e2._2)
}).print()
env.execute("watermark demo")  // the job only runs once execute() is called

Input and output:

--------------------input
s3 1639100010955
s2 1639100009955
s1 1639100008955
s0 1639100007955
s4 1639100011955
s5 1639100012955
s6 1639100013955
s7 1639100016955
          
          
--------------------output
          
data:(s3,1639100010955)  ,2021-12-10 09:33:30 ,  current watermark: 2021-12-10 09:33:28
data:(s2,1639100009955)  ,2021-12-10 09:33:29 ,  current watermark: 2021-12-10 09:33:28
data:(s1,1639100008955)  ,2021-12-10 09:33:28 ,  current watermark: 2021-12-10 09:33:28
data:(s0,1639100007955)  ,2021-12-10 09:33:27 ,  current watermark: 2021-12-10 09:33:28
data:(s4,1639100011955)  ,2021-12-10 09:33:31 ,  current watermark: 2021-12-10 09:33:29
data:(s5,1639100012955)  ,2021-12-10 09:33:32 ,  current watermark: 2021-12-10 09:33:30
(s2,1639100009955)
(s0,1639100007955)
(s1,1639100008955)
data:(s6,1639100013955)  ,2021-12-10 09:33:33 ,  current watermark: 2021-12-10 09:33:31
data:(s7,1639100016955)  ,2021-12-10 09:33:36 ,  current watermark: 2021-12-10 09:33:34
(s3,1639100010955)
(s5,1639100012955)
(s4,1639100011955)

Explanation:

  • When timeWindow is used, the windows within one minute are aligned to the epoch and divided into 3-second ranges (given in seconds, end inclusive):
    0-2, 3-5, 6-8, 9-11, 12-14, 15-17, 18-20, 21-23, 24-26, 27-29, 30-32, 33-35, ... (see the sketch after this list)
  • The watermark is the maximum timestamp of the input data seen so far minus the allowed out-of-orderness. Data with timestamps before the watermark is considered complete and can be computed by the window.
  • While s3 and s4 are being received in the program above, the watermark only reaches second 28 and then 29, which still falls inside the 27-29 window, so no computation is triggered. Only when s5 (second 32) arrives does the watermark advance to 30, past the end of the 27-29 window, so that window fires and the values of s2, s0 and s1 are output.
  • Similarly, when s7 arrives the watermark advances to 34, past the end of the 30-32 window, so the computation of that window is triggered and s3, s5 and s4 are output.
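These boundaries can be reproduced with a small sketch; windowOf is a made-up helper name, but the arithmetic is the same epoch-aligned rounding Flink uses for tumbling windows:

val windowSizeMs = 3000L

def windowOf(eventTimeMs: Long): (Long, Long) = {
  val start = eventTimeMs - (eventTimeMs % windowSizeMs)   // round down to a multiple of the window size
  (start, start + windowSizeMs - 1)                        // inclusive end, as in the list above
}

println(windowOf(1639100007955L))  // s0, 09:33:27 -> (1639100007000,1639100009999), the 27-29 window
println(windowOf(1639100010955L))  // s3, 09:33:30 -> (1639100010000,1639100012999), the 30-32 window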

Second: With Punctuated Watermarks

Punctuated (marked) watermarks are not commonly used. They are meant for streams that contain special elements or markers indicating the progress of the system, which makes it possible to generate a watermark directly from the input elements.
In this mode, a watermark can be emitted for every qualifying element in the data flow.
In production, the punctuated mode generates a very large number of watermarks under high TPS, which puts a certain amount of pressure on the downstream operators. It should therefore be chosen only in scenarios with very strict real-time requirements.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// MyEvent is the example event type from the Flink documentation; it carries a marker flag
// and a watermark timestamp.
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Emit a watermark only for the special marker elements
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: watermarks are already emitted in onEvent
    }
}
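To actually use such a generator, it has to be wrapped in a WatermarkStrategy. A minimal Scala sketch, assuming an existing eventStream of MyEvent elements (eventStream is a placeholder; MyEvent and PunctuatedAssigner come from the example above):

import org.apache.flink.api.common.eventtime.{WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}

// Wrap the punctuated generator in a WatermarkStrategy and assign it to the stream.
// A timestamp assigner can additionally be attached with withTimestampAssigner if the
// source does not already provide record timestamps.
val strategy: WatermarkStrategy[MyEvent] =
  WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier[MyEvent] {
    override def createWatermarkGenerator(
        context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[MyEvent] =
      new PunctuatedAssigner()
  })

val withPunctuatedWatermarks = eventStream.assignTimestampsAndWatermarks(strategy)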

Handling of delayed data

There are three ways to handle data that arrives too late:

  1. Discard it (the default)
  2. allowedLateness: specify how long late data is still accepted by the window
  3. sideOutputLateData: collect the late data in a side output (see the sketch after this list)

  • By default, data that arrives too late is discarded and the window is not triggered again, because the window the record belongs to has already fired. Flink's default way of dealing with such late records is simply to drop them.

  • Data that is only slightly late, whose window has not fired yet, is not discarded. Whether this happens depends on the window size and the maximum allowed out-of-orderness.
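A minimal sketch combining options 2 and 3, building on the waterDataStream from the periodic-watermark example above (the tag name "late-data" and the 5-second lateness are arbitrary choices):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Side output tag for records that are too late even for allowedLateness
val lateTag = OutputTag[(String, Long)]("late-data")

val summed = waterDataStream
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .allowedLateness(Time.seconds(5))     // keep window state 5 s longer and fire again for late records
  .sideOutputLateData(lateTag)          // anything later than watermark + allowed lateness goes here
  .reduce((e1, e2) => (e1._1, e1._2 + e2._2))

summed.print()                          // normal (and late-updated) window results
summed.getSideOutput(lateTag).print()   // records that were too late to be included in any window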

The official Flink documentation on generating watermarks:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/
