Flink in Practice: Time & Windows Programming

Goals: master Flink's three commonly used notions of time, master the use of tumbling and sliding windows in Flink, and understand watermarking in Flink.

Flink supports different notions of time in stream processing programs.

1 Processing time

Processing time is the system time of the machine that is executing the respective operator.

When a streaming program runs on processing time, all time-based operations (such as time windows) will use the system clock of the machine that runs the respective operator. An hourly processing-time window will include all records that arrived at a given operator during the hour indicated by the system clock.

For example, if an application begins running at 9:15 a.m., the first hourly processing-time window will include events processed between 9:15 a.m. and 10:00 a.m., and the next window will include events processed between 10:00 a.m. and 11:00 a.m.

Processing time is the simplest notion of time and requires no coordination between streams and machines.

It provides the best performance and the lowest latency. However, in distributed and asynchronous environments processing time does not provide determinism, because it is susceptible to the speed at which records arrive in the system (e.g., from a message queue), to the speed at which records flow between operators inside the system, and to outages (scheduled or otherwise).

2 Event time

Event time is the time at which each individual event occurred on its producing device.

This time is typically embedded in the records before they enter Flink, and the event timestamp can be extracted from each record.

In event time, the progress of time depends on the data, not on any wall clock.

Event-time programs must specify how to generate event-time watermarks, which are the mechanism that signals progress in event time.

In a perfect world, event-time processing would yield completely consistent and deterministic results, regardless of when events arrive or in what order.

However, unless the events are known to arrive in order (by timestamp), event-time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event-time applications can be.

Assuming all of the data has arrived, event-time operations will behave as expected and produce correct and consistent results, even when working with out-of-order or late events or when reprocessing historical data.

For example, an hourly event-time window will contain all records that carry an event timestamp falling into that hour, regardless of the order in which they arrive or when they are processed. (See the section on late events for more information.)

Note that sometimes, when event-time programs process live data in real time, they use some processing-time operations in order to guarantee that they progress in a timely fashion.

3 Ingestion time

Ingestion time is the time at which an event enters Flink.

At the source operator, each record gets the source's current time as a timestamp, and time-based operations (such as time windows) refer to that timestamp.

Conceptually, it lies between event time and processing time.

  • Compared with processing time, it is slightly more expensive, but it gives more predictable results. Because ingestion time uses stable timestamps (assigned once at the source), different window operations over a record will refer to the same timestamp, whereas in processing time each window operator may assign the record to a different window (based on the local system clock and any transport delay).
  • Compared with event time, it cannot handle any out-of-order events or late data, but the program does not have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.

4 Setting the Time Characteristic

The first part of a Flink DataStream program usually sets the base time characteristic.

  • Note that processing time is the default in Flink's streaming environment.

This setting defines the behavior of data stream sources (for example, whether they will assign timestamps) and the notion of time that window operations should use, such as

KeyedStream.timeWindow(Time.seconds(30)). 

The following example shows a Flink program that aggregates events in hourly time windows. The behavior of the windows adapts to the time characteristic.

  • Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Optional:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);
  • Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))

stream
    .keyBy( _.getUser )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) => a.add(b) )
    .addSink(...)

Note that in order to run this example on event time, the program needs to either use sources that directly define event time for the data and emit watermarks themselves, or inject a timestamp assigner and watermark generator after the source. Those functions describe how to access the event timestamps, and what degree of out-of-orderness the event stream exhibits.
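
For instance, here is a sketch of injecting a timestamp assigner with a watermark generator after the source. BoundedOutOfOrdernessTimestampExtractor is one of Flink's built-in assigners; MyEvent.getCreationTime() is an assumed accessor for the timestamp embedded in the event:

  • Java
// Assign timestamps and watermarks, tolerating up to 10 seconds of out-of-orderness.
DataStream<MyEvent> withTimestampsAndWatermarks = stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                // Assumed accessor: the creation timestamp (epoch millis) embedded in the event.
                return event.getCreationTime();
            }
        });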

5 Windows

5.1 Brief Introduction

Windows are at the heart of processing infinite streams. Windows split the stream into finite-size "buckets" over which we can apply computations. We focus on how windowing is performed in Flink and how programmers can benefit to the maximum from what it has to offer.

The general structure of a windowed Flink program is shown below.

  • The first fragment refers to keyed streams.
  • The second fragment refers to non-keyed streams.

As you can see, the only difference is that keyed streams call keyBy(...) followed by window(...), while non-keyed streams call windowAll(...). This will also serve as a roadmap for the rest of the page.

Keyed Windows
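
A minimal sketch of the keyed fragment, following the Flink documentation (the calls in square brackets are optional):

stream
       .keyBy(...)                //  keyed versus non-keyed windows
       .window(...)               //  required: window assigner
      [.trigger(...)]             //  optional: trigger (a default is supplied otherwise)
      [.evictor(...)]             //  optional: evictor (no evictor by default)
      [.allowedLateness(...)]     //  optional: allowed lateness (zero by default)
       .reduce/aggregate/fold/apply()   //  required: window function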

Non-Keyed Windows
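
And the non-keyed fragment, with the same conventions:

stream
       .windowAll(...)            //  required: window assigner
      [.trigger(...)]             //  optional: trigger (a default is supplied otherwise)
      [.evictor(...)]             //  optional: evictor (no evictor by default)
      [.allowedLateness(...)]     //  optional: allowed lateness (zero by default)
       .reduce/aggregate/fold/apply()   //  required: window function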

Above, the calls in square brackets ([...]) are optional. This shows that Flink allows you to customize your window logic in many different ways, so that it best fits your needs.

5.2 Window Life Cycle

In a nutshell, a window is created as soon as the first element that should belong to it arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g., global windows (see Window Assigners). For example, with an event-time-based windowing strategy that creates non-overlapping (tumbling) windows every 5 minutes and has an allowed lateness of 1 minute, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove the window when the watermark passes the 12:06 timestamp.
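
In code, that windowing strategy might look like the following sketch, in the placeholder style used throughout this page:

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))  // non-overlapping 5-minute event-time windows
    .allowedLateness(Time.minutes(1))                      // keep each window for 1 extra minute of lateness
    .<windowed transformation>(<window function>);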

In addition, each window will have a Trigger and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) attached to it. The function contains the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied.

A triggering policy might be something like "when the number of elements in the window is more than 4", or "when the watermark passes the end of the window".

A trigger can also decide to purge a window's contents at any time between its creation and removal. Purging here refers only to the elements in the window, not to the window metadata. This means that new data can still be added to that window.

In addition to the above, you can specify an Evictor that will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.
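
As an illustrative sketch of a custom trigger plus evictor, using Flink's built-in CountTrigger and CountEvictor and the placeholder style of the snippets below:

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())   // global windows never fire on their own
    .trigger(CountTrigger.of(4))      // fire once a window holds 4 elements
    .evictor(CountEvictor.of(4))      // keep at most the last 4 elements
    .<windowed transformation>(<window function>);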

5.3 Keyed and Non-Keyed Windows

The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Use keyBy(...) to split your infinite stream into logically keyed streams; if keyBy(...) is not called, your stream is not keyed.

In the case of keyed streams, any attribute of your incoming events can be used as a key (more details here). Having a keyed stream allows your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently of the rest. All elements referring to the same key are sent to the same parallel task.

In the case of non-keyed streams, your original stream is not split into multiple logical streams, and all the windowing logic is performed by a single task, i.e., with a parallelism of 1.

6 Window Assigners

After specifying whether your stream is keyed or not, the next step is to define a window assigner.

The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) call (for keyed streams) or the windowAll(...) call (for non-keyed streams).

A WindowAssigner is responsible for assigning each incoming element to one or more windows.

Flink comes with pre-defined window assigners for the most common use cases, namely

  • Tumbling windows
  • Sliding windows
  • Session windows
  • Global windows

You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can be either processing time or event time. Please take a look at our section on event time to learn about the difference between processing time and event time, and how timestamps and watermarks are generated.

Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window.

In code, Flink uses TimeWindow when working with time-based windows, which has methods for querying the start and end timestamps, as well as an additional method maxTimestamp() that returns the largest allowed timestamp within a given window.
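
For illustration, here is a hypothetical sketch assuming a ProcessWindowFunction over TimeWindow (Flink's window class for time-based windows):

// Inside a ProcessWindowFunction<IN, OUT, KEY, TimeWindow>:
TimeWindow w = context.window();
long start = w.getStart();      // inclusive start timestamp
long end   = w.getEnd();        // exclusive end timestamp
long maxTs = w.maxTimestamp();  // largest timestamp inside the window, i.e. end - 1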

The figure below shows the workings of each assigner. The purple circles represent elements of the stream, which are partitioned by some key (in this case user 1, user 2 and user 3). The x-axis shows the progress of time.

6.1 Tumbling Windows

The tumbling window assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.

For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes, as illustrated by the following figure.

The following code snippets show how to use tumbling windows.

  • Java
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
  • Scala
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

6.2 Sliding Windows

The sliding window assigner assigns elements to windows of fixed length. Similar to the tumbling window assigner, the size of the windows is configured by the window size parameter.

An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can overlap if the slide is smaller than the window size. In this case, elements are assigned to multiple windows.

For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get a window every 5 minutes that contains the events that arrived during the last 10 minutes, as depicted by the following figure.

The following code snippets show how to use sliding windows.

  • Java
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
  • Scala
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

7 Window Functions

After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing.

The window function can be a ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction. The first two can be executed more efficiently because Flink can incrementally aggregate the elements of each window as they arrive.

A ProcessWindowFunction gets an Iterable over all the elements contained in a window, as well as additional meta-information about the window to which the elements belong.

A windowed transformation with a ProcessWindowFunction cannot be executed as efficiently as the other cases, because Flink has to buffer all elements of a window internally before invoking the function. This cost can be mitigated by combining a ProcessWindowFunction with a ReduceFunction, AggregateFunction or FoldFunction, which yields both incremental aggregation of the window elements and the additional window metadata that the ProcessWindowFunction receives. We will look at examples for each of these variants.
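
A sketch of such a combination, in the placeholder style used below; MyReduceFunction and MyProcessWindowFunction stand for user-defined classes:

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new MyReduceFunction(), new MyProcessWindowFunction());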

7.1 ReduceFunction

A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type.

Flink uses a ReduceFunction to incrementally aggregate the elements of a window.

A ReduceFunction can be defined and used like this:

  • Java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
  • Scala
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

The raw data passed in here would be strings; we use numeric types to demonstrate the incremental effect.

Instead of waiting until all the data in the window has arrived and processing it in one pass, the data is reduced pairwise as it arrives. For example, if the elements ("a", 1), ("a", 2) and ("a", 3) fall into one window, the reduce incrementally computes ("a", 3) and then ("a", 6) rather than summing everything when the window fires.


7.2 AggregateFunction

An AggregateFunction is a generalized version of a ReduceFunction that has three types: an input type (IN), an accumulator type (ACC) and an output type (OUT). The input type is the type of elements in the input stream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one accumulator, and for extracting an output (of type OUT) from an accumulator. We will see how this works in the example below.

Same as with ReduceFunction, Flink will incrementally aggregate the input elements of a window as they arrive.

An AggregateFunction can be defined and used like this:

  • Java

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
  • Scala
/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // toDouble avoids integer division when computing the average
  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

7.3 ProcessWindowFunction

A ProcessWindowFunction gets an Iterable containing all the elements of the window, as well as a Context object with access to time and state information, which enables it to provide more flexibility than the other window functions. This comes at the cost of performance and resource consumption, because the elements cannot be incrementally aggregated; instead, they need to be buffered internally until the window is considered ready for processing.

The signature of ProcessWindowFunction looks as follows:

  • Java

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

       /**
        * The context holding window metadata.
        */
       public abstract class Context implements java.io.Serializable {
           /**
            * Returns the window that is being evaluated.
            */
           public abstract W window();

           /** Returns the current processing time. */
           public abstract long currentProcessingTime();

           /** Returns the current event-time watermark. */
           public abstract long currentWatermark();

           /**
            * State accessor for per-key and per-window state.
            *
            * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
            * by implementing {@link ProcessWindowFunction#clear(Context)}.
            */
           public abstract KeyedStateStore windowState();

           /**
            * State accessor for per-key global state.
            */
           public abstract KeyedStateStore globalState();
       }

}
  • Scala
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}

The key parameter is the key that is extracted via the KeySelector that was specified for the keyBy() call. In the case of tuple-index keys or string-field references, this key type is always Tuple, and you have to manually cast it to a tuple of the correct size to extract the key fields.
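
A hypothetical sketch of that cast, assuming the stream was keyed with keyBy(0) on Tuple2<String, Long> elements:

// With keyBy(0), the declared key type is Tuple; at runtime it is a Tuple1<String>.
@Override
public void process(Tuple key, Context context,
                    Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
    String user = ((Tuple1<String>) key).f0;  // manual cast to the tuple of the correct size
    // ...
}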

A ProcessWindowFunction can be defined and used like this:

  • Java

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
  • Scala
val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

This example shows a ProcessWindowFunction that counts the elements in a window. In addition, the window function adds information about the window to the output.

Note that using a ProcessWindowFunction for simple aggregates such as count is quite inefficient; prefer combining it with an incremental aggregation function (ReduceFunction or AggregateFunction), as mentioned at the start of this section.

8 Watermarks

  • Recommended reading: "Flink Stream Computing Programming: A Brief Introduction to Watermarks"

Reference resources

  • Event Time
  • Windows
