Flink custom window trigger

Flink built-in triggers and custom window triggers

Flink version: 1.10.1

GitHub: https://github.com/shirukai/flink-examples-trigger.git

Some time ago, a colleague ran into a requirement during development: count events in 10-minute event-time windows and, whenever new data has arrived, output the current result once per minute. Counting per window is easy: define a window and apply an aggregate function. Emitting a result every minute when new data arrives is harder, because a regular event-time window fires only once, when the watermark passes the end of the window, i.e. after the 10 minutes are over. There is a second problem: if no data arrives after the 10-minute window, the watermark does not advance, the window never fires, and the result is never emitted. Both problems come down to triggers. This article first introduces Flink's built-in triggers and then uses a custom window trigger to solve them.

1 Create a sample project

Use Maven to create a project from Flink's official quickstart archetype:

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DgroupId=flink.examples -DartifactId=flink-examples-trigger -Dversion=1.0 -Dpackage=flink.trigger.example -DinteractiveMode=false

2 Flink built-in triggers

Before using a built-in trigger, we need to know when it fires the window computation. Flink's built-in triggers are:

Trigger | Description
--- | ---
CountTrigger | Fires when the number of elements in the window reaches a given count
EventTimeTrigger | Fires when the watermark passes the end of the window
ProcessingTimeTrigger | Fires when the system (processing) time passes the end of the window
ContinuousEventTimeTrigger | Fires repeatedly at a given event-time interval
ContinuousProcessingTimeTrigger | Fires repeatedly at a given processing-time interval
PurgingTrigger | Wraps any trigger and purges the window contents after each firing
DeltaTrigger | Fires when the delta computed by a DeltaFunction exceeds a given threshold

2.1 CountTrigger

Given a maximum count, CountTrigger fires the window computation when the number of elements in the window is greater than or equal to that count.

The implementation is simple. A ReducingState with a summing ReduceFunction serves as the counter: every element adds 1, and when the count reaches maxCount the counter is cleared and the trigger returns FIRE.

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}
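
The counting logic can be tried out without a Flink job. The following is a minimal plain-Java simulation, not Flink code: the class `CountTriggerSim` and its methods are hypothetical, and a nullable `Long` field stands in for the `ReducingState`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of CountTrigger.onElement (not Flink code). */
final class CountTriggerSim {
    private final long maxCount;
    private Long count; // stands in for the ReducingState<Long>; null means "cleared"

    CountTriggerSim(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true where the real trigger would return FIRE, false for CONTINUE. */
    boolean onElement() {
        count = (count == null) ? 1L : count + 1; // count.add(1L) with a summing ReduceFunction
        if (count >= maxCount) {
            count = null;                          // count.clear()
            return true;                           // TriggerResult.FIRE
        }
        return false;                              // TriggerResult.CONTINUE
    }

    /** Feeds n elements into a fresh trigger and records the decision for each one. */
    static List<Boolean> decisions(int n, long maxCount) {
        CountTriggerSim trigger = new CountTriggerSim(maxCount);
        List<Boolean> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add(trigger.onElement());
        }
        return result;
    }

    public static void main(String[] args) {
        // With maxCount = 3, the 3rd and 6th elements fire the window.
        System.out.println(decisions(7, 3)); // prints [false, false, true, false, false, true, false]
    }
}
```

In a real job the trigger is set with `.trigger(CountTrigger.of(3))`. Since CountTrigger returns FIRE rather than FIRE_AND_PURGE, only the counter is reset; the window contents keep accumulating between firings.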

2.2 EventTimeTrigger

EventTimeTrigger is based on event time: it fires the window computation when the watermark reaches the window's maximum timestamp.

As the code below shows, when an element arrives the trigger checks whether the window's maximum timestamp is already less than or equal to the current watermark. If it is, the trigger fires immediately; otherwise it registers an event-time timer at the window's maximum timestamp and returns CONTINUE.

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}
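
The timer timestamp comes from `TimeWindow.maxTimestamp()`, which is `end - 1` because a window's end is exclusive. A minimal plain-Java sketch of the onElement decision for a 10-minute window [0, 600000); the class and method names are hypothetical and there is no Flink dependency.

```java
/** Hypothetical plain-Java model of EventTimeTrigger's onElement decision (not Flink code). */
final class EventTimeTriggerSim {
    /** Same rule as TimeWindow.maxTimestamp(): the window end is exclusive. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /**
     * true: onElement returns FIRE at once.
     * false: onElement registers an event-time timer at maxTimestamp and returns CONTINUE.
     */
    static boolean firesImmediately(long windowEnd, long currentWatermark) {
        return maxTimestamp(windowEnd) <= currentWatermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10 * 60 * 1000L;                          // window [0, 600000)
        System.out.println(maxTimestamp(windowEnd));               // timer timestamp
        System.out.println(firesImmediately(windowEnd, 125_000L)); // register timer, CONTINUE
        System.out.println(firesImmediately(windowEnd, 599_999L)); // FIRE immediately
    }
}
```

With the default zero allowed lateness, the immediate-FIRE branch is effectively reached only by late elements admitted through allowed lateness, because the window operator drops elements for windows that are already past their cleanup time. Everything else waits for the watermark to reach 599999, which is why, as noted at the beginning, a stalled watermark means the window is never emitted.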

2.3 ProcessingTimeTrigger

ProcessingTimeTrigger is based on processing time: it fires when the system clock passes the window's maximum timestamp.

The implementation is as follows: when an element arrives, the trigger only registers a processing-time timer at the window's maximum timestamp. When that timer goes off, onProcessingTime fires the window computation.

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}
	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}
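
onElement registers the same timer for every element. This is cheap because Flink keeps at most one timer per key, window and timestamp, so repeated registrations collapse into one and the window fires once. The sketch below models that de-duplication in plain Java; the `TreeSet` is only an illustrative stand-in for Flink's timer service (an assumption, not its real implementation), and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of ProcessingTimeTrigger for one key and one window (not Flink code). */
final class ProcessingTimeTriggerSim {
    // Stand-in for the timer service: a set, so equal timestamps are stored only once.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** onElement: register a processing-time timer at window.maxTimestamp(), then CONTINUE. */
    void onElement(long windowMaxTimestamp) {
        timers.add(windowMaxTimestamp);
    }

    /** Advances the processing-time clock; every timer that becomes due is one FIRE. */
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        ProcessingTimeTriggerSim trigger = new ProcessingTimeTriggerSim();
        for (int i = 0; i < 1000; i++) {
            trigger.onElement(599_999L); // 1000 elements for the window [0, 600000)
        }
        System.out.println(trigger.advanceTo(600_000L)); // prints [599999]: a single firing
    }
}
```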

2.4 ContinuousEventTimeTrigger

As the name suggests, ContinuousEventTimeTrigger fires continuously on event time. It takes a time interval and uses event-time timers to fire the window once per interval.

The code is shown below. When an element arrives, onElement first behaves like EventTimeTrigger: if the current watermark has already passed the window's maximum timestamp, it fires; otherwise it registers an event-time timer at the window's maximum timestamp. It then reads the next periodic fire timestamp from state. If there is none, it rounds the element's timestamp down to a multiple of the interval, adds one interval, registers an event-time timer for that time and stores it in state. In onEventTime, the timer at the window's maximum timestamp fires the window; a timer that matches the stored fire timestamp also fires it, after the state is advanced to time + interval and a timer is registered for that next time.

	private ContinuousEventTimeTrigger(long interval) {
		this.interval = interval;
	}
	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
		}

		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
		if (fireTimestamp.get() == null) {
			long start = timestamp - (timestamp % interval);
			long nextFireTimestamp = start + interval;
			ctx.registerEventTimeTimer(nextFireTimestamp);
			fireTimestamp.add(nextFireTimestamp);
		}

		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

		if (time == window.maxTimestamp()){
			return TriggerResult.FIRE;
		}

		ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

		Long fireTimestamp = fireTimestampState.get();

		if (fireTimestamp != null && fireTimestamp == time) {
			fireTimestampState.clear();
			fireTimestampState.add(time + interval);
			ctx.registerEventTimeTimer(time + interval);
			return TriggerResult.FIRE;
		}

		return TriggerResult.CONTINUE;
	}
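
To make the schedule concrete, the sketch below computes the event times at which a window fires under this logic. It is plain Java with hypothetical names, and it assumes the watermark eventually passes the end of the window and allowed lateness is zero, so the window is cleaned up (and the remaining periodic timer deleted by clear()) when the end-of-window timer fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the ContinuousEventTimeTrigger schedule for one window (not Flink code). */
final class ContinuousEventTimeSchedule {
    /**
     * @param firstElementTs event timestamp of the first element in the window
     * @param interval       firing interval in milliseconds
     * @param windowMaxTs    window.maxTimestamp(), i.e. window end - 1
     * @return event times at which the window fires, in order
     */
    static List<Long> fireTimes(long firstElementTs, long interval, long windowMaxTs) {
        List<Long> fires = new ArrayList<>();
        // onElement: round down to a multiple of the interval, then add one interval
        long next = firstElementTs - (firstElementTs % interval) + interval;
        // onEventTime: each periodic timer fires and registers time + interval.
        // A periodic timer equal to windowMaxTs would coincide with the end timer and fire only once.
        while (next < windowMaxTs) {
            fires.add(next);
            next += interval;
        }
        // The end-of-window timer produces the final firing.
        fires.add(windowMaxTs);
        return fires;
    }

    public static void main(String[] args) {
        // 10-minute window [0, 600000), 1-minute interval, first element at 00:02:05
        System.out.println(fireTimes(125_000L, 60_000L, 599_999L));
    }
}
```

All of these firings are driven by the watermark: if no new data arrives, the watermark stalls and neither the periodic nor the final firing happens, so this trigger alone does not solve the second problem from the introduction.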

2.5 ContinuousProcessingTimeTrigger

As the name suggests, ContinuousProcessingTimeTrigger fires continuously on processing time. It takes a time interval and uses processing-time timers to fire the window once per interval.

The code is shown below. When an element arrives, the trigger reads the pending fire timestamp from state. If there is none, it rounds the current processing time down to a multiple of the interval, adds one interval, registers a processing-time timer for that time and stores it in state. When the processing-time timer goes off, the trigger fires the window computation, stores time + interval as the next fire timestamp and registers a new timer for it.

	private ContinuousProcessingTimeTrigger(long interval) {
		this.interval = interval;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

		timestamp = ctx.getCurrentProcessingTime();

		if (fireTimestamp.get() == null) {
			long start = timestamp - (timestamp % interval);
			long nextFireTimestamp = start + interval;

			ctx.registerProcessingTimeTimer(nextFireTimestamp);

			fireTimestamp.add(nextFireTimestamp);
			return TriggerResult.CONTINUE;
		}
		return TriggerResult.CONTINUE;
	}
	
	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

		if (fireTimestamp.get().equals(time)) {
			fireTimestamp.clear();
			fireTimestamp.add(time + interval);
			ctx.registerProcessingTimeTimer(time + interval);
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}
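
The key property here is that onProcessingTime re-registers its timer unconditionally, so once the first element has arrived the window fires at every interval boundary whether or not more data comes in. A minimal plain-Java sketch (hypothetical names, no Flink dependency) that lists those firing times up to a given processing time:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of ContinuousProcessingTimeTrigger firings for one window (not Flink code). */
final class ContinuousProcessingTimeSchedule {
    /**
     * @param firstArrival processing time at which the first element arrived
     * @param interval     firing interval in milliseconds
     * @param until        last processing time to simulate (inclusive)
     */
    static List<Long> fireTimes(long firstArrival, long interval, long until) {
        List<Long> fires = new ArrayList<>();
        // onElement: only the first element registers a timer, because the state is still empty
        long next = firstArrival - (firstArrival % interval) + interval;
        // onProcessingTime: FIRE, then register time + interval, regardless of new data
        while (next <= until) {
            fires.add(next);
            next += interval;
        }
        return fires;
    }

    public static void main(String[] args) {
        // One element at 5 s, nothing afterwards, 1-minute interval, observed for 5 minutes
        System.out.println(fireTimes(5_000L, 60_000L, 300_000L)); // prints [60000, 120000, 180000, 240000, 300000]
    }
}
```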

2.6 DeltaTrigger

DeltaTrigger takes a DeltaFunction, which computes the difference between two elements, and a threshold. It fires only when the computed difference is greater than the threshold.

The implementation is as follows. The constructor takes the threshold and the DeltaFunction (plus a serializer for the stored element). When an element arrives, the trigger checks whether a previous element is stored in state. If not, it stores the current element and continues. Otherwise it calls the DeltaFunction with the stored element and the current element; if the delta is greater than the threshold, it stores the current element and fires.

	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
		this.deltaFunction = deltaFunction;
		this.threshold = threshold;
		stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer);

	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
		if (lastElementState.value() == null) {
			lastElementState.update(element);
			return TriggerResult.CONTINUE;
		}
		if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
			lastElementState.update(element);
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}
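
Note that the stored element is replaced only when the trigger fires, so the delta is always measured against the first element or the last element that caused a firing, not against the immediately preceding one. A minimal plain-Java sketch of that logic; `DeltaTriggerSim` and its nested `Delta` interface are hypothetical stand-ins, the latter mirroring the shape of Flink's `DeltaFunction.getDelta(old, new)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java model of DeltaTrigger.onElement (not Flink code). */
final class DeltaTriggerSim<T> {
    /** Stand-in with the same shape as Flink's DeltaFunction#getDelta. */
    interface Delta<D> {
        double getDelta(D oldDataPoint, D newDataPoint);
    }

    private final double threshold;
    private final Delta<T> deltaFunction;
    private T lastElement; // stands in for the "last-element" ValueState

    DeltaTriggerSim(double threshold, Delta<T> deltaFunction) {
        this.threshold = threshold;
        this.deltaFunction = deltaFunction;
    }

    /** Returns true where DeltaTrigger would return FIRE. */
    boolean onElement(T element) {
        if (lastElement == null) {
            lastElement = element;   // the first element becomes the reference value
            return false;
        }
        if (deltaFunction.getDelta(lastElement, element) > threshold) {
            lastElement = element;   // the reference moves only when the trigger fires
            return true;
        }
        return false;
    }

    /** Runs readings through a trigger that uses the absolute difference as the delta. */
    static List<Boolean> decisions(List<Double> values, double threshold) {
        DeltaTriggerSim<Double> trigger = new DeltaTriggerSim<Double>(
                threshold, (oldValue, newValue) -> Math.abs(newValue - oldValue));
        List<Boolean> result = new ArrayList<>();
        for (Double value : values) {
            result.add(trigger.onElement(value));
        }
        return result;
    }

    public static void main(String[] args) {
        // 14 is within 5 of 10; 18 is compared with 10 (not 14), so it fires
        System.out.println(decisions(Arrays.asList(10.0, 14.0, 18.0), 5.0)); // prints [false, false, true]
    }
}
```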

2.7 PurgingTrigger

PurgingTrigger wraps any other trigger and turns every FIRE returned by the nested trigger into FIRE_AND_PURGE, so the window contents are cleared after each firing.

	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
		this.nestedTrigger = nestedTrigger;
	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}
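
The difference between FIRE and FIRE_AND_PURGE shows up in what each firing emits. For example, Flink's `KeyedStream.countWindow(size)` is built as a global window with `PurgingTrigger.of(CountTrigger.of(size))`, so each firing sees only the elements since the previous one. A minimal plain-Java sketch (hypothetical names, no Flink dependency) of a summing window fired every 3 elements, with and without purging:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a summing window fired every `count` elements (not Flink code). */
final class PurgingSim {
    static List<Integer> firedSums(int[] elements, int count, boolean purge) {
        List<Integer> emitted = new ArrayList<>();
        List<Integer> windowContents = new ArrayList<>(); // elements kept by the window
        int seen = 0;                                      // CountTrigger's counter
        for (int element : elements) {
            windowContents.add(element);
            if (++seen >= count) {
                seen = 0;                                  // CountTrigger clears its counter
                int sum = 0;
                for (int x : windowContents) {
                    sum += x;
                }
                emitted.add(sum);                          // FIRE: evaluate the window contents
                if (purge) {
                    windowContents.clear();                // FIRE_AND_PURGE: also drop the contents
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] elements = {1, 2, 3, 4, 5, 6};
        System.out.println(firedSums(elements, 3, false)); // prints [6, 21]
        System.out.println(firedSums(elements, 3, true));  // prints [6, 15]
    }
}
```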

3 Custom trigger

Comparing the triggers above, ContinuousProcessingTimeTrigger can output a result every minute, but it has a problem: it fires every minute whether or not new data has arrived. To fire only when new data has arrived, we can take EventTimeTrigger and add processing-time timer logic similar to ContinuousProcessingTimeTrigger, registering a timer only when an element arrives and no timer is pending. Because these intermediate firings are driven by processing time, they also happen when the watermark stalls.

package flink.trigger.example.custom;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Objects;

/**
 * Event-time interval trigger.
 * Copied from {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger},
 * with the following changes:
 * 1. onElement also registers a processing-time timer at the next interval boundary,
 *    unless one is already pending for the window
 * 2. onProcessingTime fires the window when that timer goes off
 * 3. clear also deletes the pending processing-time timer
 *
 * @author shirukai
 */
public class EventTimeIntervalTrigger<T> extends Trigger<T, TimeWindow> {
    private final long interval;

    /**
     * When merging we take the lowest of all fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new EventTimeIntervalTrigger.Min(), LongSerializer.INSTANCE);

    private EventTimeIntervalTrigger(long interval) {
        this.interval = interval;
    }

    /**
     * Create trigger instance
     *
     * @param interval processing-time interval at which the window fires between watermark-driven firings
     * @param <T>      type of the elements in the window
     * @return EventTimeIntervalTrigger
     */
    public static <T> EventTimeIntervalTrigger<T> of(Time interval) {
        return new EventTimeIntervalTrigger<>(interval.toMilliseconds());
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // Register a processing-time timer only if none is pending for this window
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            if (Objects.isNull(fireTimestamp.get())) {
                // Align the timer to the next interval boundary of the processing-time clock
                long currentTimestamp = ctx.getCurrentProcessingTime();
                long nextFireTimestamp = currentTimestamp - (currentTimestamp % interval) + interval;
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
                fireTimestamp.add(nextFireTimestamp);
            }
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // The pending timer has fired: clear the state so that the next element registers a new timer
        ctx.getPartitionedState(stateDesc).clear();
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        // Clean up the event-time timer
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        // Clean up the pending processing-time timer, if there is one
        Long pendingTimestamp = fireTimestamp.get();
        if (Objects.nonNull(pendingTimestamp)) {
            ctx.deleteProcessingTimeTimer(pendingTimestamp);
        }
        // Clean up the state
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }

    @Override
    public String toString() {
        return "EventTimeIntervalTrigger()";
    }
}
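
To check the behaviour without a cluster, the processing-time part of EventTimeIntervalTrigger can be modelled in plain Java: a timer at the next interval boundary is registered when an element arrives and no timer is pending, and firing that timer resets the pending state. This is a simplified sketch with hypothetical names; it ignores the end-of-window event-time firing and assumes arrival times are given in ascending processing-time order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical plain-Java model of the processing-time firings of EventTimeIntervalTrigger
 * for one window (not Flink code). The end-of-window event-time firing is not modelled.
 */
final class IntervalTriggerSim {
    /**
     * @param arrivals processing times at which elements arrive, in ascending order
     * @param interval firing interval in milliseconds
     * @return processing times at which the trigger fires the window
     */
    static List<Long> fireTimes(List<Long> arrivals, long interval) {
        List<Long> fires = new ArrayList<>();
        Long pending = null; // stands in for the "fire-time" state; null = no timer pending
        for (long t : arrivals) {
            // onProcessingTime: a pending timer that is due by this arrival fires and resets the state
            if (pending != null && pending <= t) {
                fires.add(pending);
                pending = null;
            }
            // onElement: register a timer at the next interval boundary only if none is pending
            if (pending == null) {
                pending = t - (t % interval) + interval;
            }
        }
        if (pending != null) {
            fires.add(pending); // the last pending timer fires once the clock reaches it
        }
        return fires;
    }

    public static void main(String[] args) {
        // Elements at 5 s, 20 s and 2 min 10 s with a 1-minute interval
        System.out.println(fireTimes(Arrays.asList(5_000L, 20_000L, 130_000L), 60_000L)); // prints [60000, 180000]
    }
}
```

For the same input, ContinuousProcessingTimeTrigger would also fire at 120000, 240000 and every later minute boundary, although nothing arrived in those minutes. In a job, the custom trigger replaces the window's default trigger, for example `.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(EventTimeIntervalTrigger.of(Time.minutes(1)))` on a keyed stream, followed by your own aggregate function.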


Added by twistisking on Wed, 02 Feb 2022 01:17:05 +0200