Flink Built-in Triggers and Custom Window Triggers
Flink version: 1.10.1
GitHub: https://github.com/shirukai/flink-examples-trigger.git
Some time ago, a colleague ran into the following requirement: count records in 10-minute event-time windows and, as long as new data keeps arriving, emit the current result every minute. Counting per window is easy: define a window and apply an aggregate function. Emitting the result every minute while new data arrives is harder. A regular event-time window only fires once the watermark passes the end of the window, that is, after the full 10 minutes. There is a second problem. If no data arrives after the 10-minute window, the watermark stops advancing and the window never fires, so we never see its result. Triggers are the tool for solving both problems. This article first introduces Flink's built-in triggers and then uses a custom window trigger to solve the problems above.
1 Create a sample project
Use Maven to create a project from Flink's official quickstart archetype:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DgroupId=flink.examples -DartifactId=flink-examples-trigger -Dversion=1.0 -Dpackage=flink.trigger.example -DinteractiveMode=false
2 Flink built-in triggers
Before using a built-in trigger, we need to know when it fires the window computation. Flink ships with the following triggers:
Trigger | Description |
---|---|
CountTrigger | Fires when the number of elements in the window reaches a given count |
EventTimeTrigger | Fires when the watermark passes the end of the window |
ProcessingTimeTrigger | Fires when the system (processing) time passes the end of the window |
ContinuousEventTimeTrigger | Fires repeatedly at a given event-time interval |
ContinuousProcessingTimeTrigger | Fires repeatedly at a given processing-time interval |
PurgingTrigger | Wraps any other trigger and purges the window contents whenever that trigger fires |
DeltaTrigger | Fires when the delta computed by a user-supplied DeltaFunction exceeds a given threshold |
2.1 CountTrigger
Given a maximum count, this trigger fires the window computation when the number of elements in the window reaches that count.
The implementation is simple. A ReducingState holds the count, and its ReduceFunction sums the values. Each element adds 1 to the count. When the count reaches maxCount, the counter is cleared and the trigger returns FIRE.
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        // one more element in this window
        count.add(1L);
        if (count.get() >= maxCount) {
            // reset the counter and evaluate the window
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
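The following plain-Java simulation isolates the fire-and-reset cycle. It is not Flink code. The class `CountTriggerSimulation` and its method are hypothetical, and a local `long` stands in for the `ReducingState<Long>` that Flink keeps per key and window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of CountTrigger.onElement for one key and window.
class CountTriggerSimulation {

    // Returns the 1-based positions of the elements whose arrival made the trigger FIRE.
    static List<Integer> firingPositions(int numberOfElements, long maxCount) {
        List<Integer> positions = new ArrayList<>();
        long count = 0; // stands in for ReducingState<Long> "count"
        for (int position = 1; position <= numberOfElements; position++) {
            count += 1;                  // count.add(1L)
            if (count >= maxCount) {     // count.get() >= maxCount
                count = 0;               // count.clear()
                positions.add(position); // TriggerResult.FIRE
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        System.out.println(firingPositions(7, 3)); // [3, 6]
    }
}
```

With seven elements and `maxCount = 3`, the window fires on the 3rd and 6th element. The 7th element only brings the counter back to 1.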
2.2 EventTimeTrigger
This trigger is based on event time. It fires the window computation once the watermark reaches the window's maximum timestamp (window.maxTimestamp(), i.e. the window end minus 1 ms).
The code below shows how it works. When an element arrives, the trigger checks whether the window's maximum timestamp is less than or equal to the current watermark. If it is, the window fires immediately. Otherwise the trigger registers an event-time timer at the window's maximum timestamp and returns CONTINUE. When that timer fires, onEventTime returns FIRE.
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }
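The decision logic can be sketched without Flink as shown below. This is a hypothetical simulation, and `EventTimeTriggerSimulation` is not a Flink class. It assumes a window `[start, end)` whose `maxTimestamp()` is `end - 1`, as for Flink's `TimeWindow`. It also assumes that an event-time timer fires once the watermark reaches its timestamp.

```java
// Hypothetical, Flink-free sketch of EventTimeTrigger's decisions for a window [start, end).
class EventTimeTriggerSimulation {

    enum Result { FIRE, CONTINUE }

    // Same convention as TimeWindow.maxTimestamp(): the last millisecond inside the window.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Mirrors onElement: FIRE if the watermark already reached maxTimestamp,
    // otherwise (in Flink) register an event-time timer at maxTimestamp and CONTINUE.
    static Result onElement(long windowEnd, long currentWatermark) {
        return maxTimestamp(windowEnd) <= currentWatermark ? Result.FIRE : Result.CONTINUE;
    }

    // An event-time timer fires as soon as the watermark reaches its timestamp.
    static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long end = 600_000L; // window [0, 10 min)
        System.out.println(onElement(end, 300_000L));                // CONTINUE
        System.out.println(timerFires(maxTimestamp(end), 599_998L)); // false
        System.out.println(timerFires(maxTimestamp(end), 599_999L)); // true
    }
}
```

With the default allowed lateness of 0, the window operator drops an element whose event-time window the watermark has already passed before the trigger sees it. In practice, therefore, the immediate `FIRE` branch in `onElement` is reached only when `allowedLateness` is greater than 0.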
2.3 ProcessingTimeTrigger
This trigger is based on processing time. It fires when the system clock passes the window's maximum timestamp.
The implementation is as follows. When an element arrives, the trigger only registers a processing-time timer at the window's maximum timestamp. When that timer fires, onProcessingTime returns FIRE and the window is computed.
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }
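A minimal timer-service simulation makes this concrete. It is a hypothetical plain-Java sketch, and `ProcessingTimeTriggerSimulation` is not Flink API. Each element registers a timer at `windowEnd - 1`. A `TreeSet` keeps one timer per timestamp, mirroring Flink's timer service, which holds at most one timer per key, window and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free sketch of ProcessingTimeTrigger for a single key.
class ProcessingTimeTriggerSimulation {

    private final TreeSet<Long> timers = new TreeSet<>(); // duplicates collapse into one timer
    private final List<Long> firedAt = new ArrayList<>();

    // onElement: only register a processing-time timer at the window's maxTimestamp.
    void onElement(long windowEnd) {
        timers.add(windowEnd - 1);
    }

    // Advance the simulated wall clock; every due timer leads to onProcessingTime -> FIRE.
    void advanceProcessingTime(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            firedAt.add(timers.pollFirst());
        }
    }

    List<Long> firedAt() {
        return firedAt;
    }

    public static void main(String[] args) {
        ProcessingTimeTriggerSimulation sim = new ProcessingTimeTriggerSimulation();
        sim.onElement(60_000L);
        sim.onElement(60_000L);             // same window: still a single timer
        sim.advanceProcessingTime(59_000L); // nothing due yet
        sim.advanceProcessingTime(61_000L); // the timer at 59 999 fires once
        System.out.println(sim.firedAt());  // [59999]
    }
}
```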
2.4 ContinuousEventTimeTrigger
As the name suggests, this trigger fires continuously on event time. You give it a time interval, and event-time timers fire the window once per interval.
The code is shown below. When an element arrives, the first part of the logic is the same as in EventTimeTrigger. If the current watermark has already passed the window's maximum timestamp, the window fires immediately. Otherwise an event-time timer is registered at the window's maximum timestamp. Next, the trigger reads the next interval fire timestamp from state. If none is stored, it computes the next interval boundary after the element's timestamp, registers an event-time timer for it, and saves that timestamp in state. When an event-time timer fires, onEventTime fires the window if it is the window-end timer. If it is the interval timer, onEventTime fires the window and registers the next timer one interval later.
    private ContinuousEventTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            // align the first interval timer to the next multiple of interval
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
        Long fireTimestamp = fireTimestampState.get();
        if (fireTimestamp != null && fireTimestamp == time) {
            // interval timer: fire and chain the next one
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
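A small plain-Java sketch can check the timer arithmetic. The class `ContinuousEventTimeSimulation` is hypothetical. It assumes non-negative timestamps, a single window ending at `windowEnd` that contains the first element, allowed lateness 0, and timers that fire in timestamp order once the watermark reaches them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of ContinuousEventTimeTrigger's timers for one window.
class ContinuousEventTimeSimulation {

    // First interval timer registered by onElement for an element with this event timestamp.
    static long firstFireTimestamp(long timestamp, long interval) {
        long start = timestamp - (timestamp % interval);
        return start + interval;
    }

    // Timestamps of the firings that have happened once the watermark reaches `watermark`.
    static List<Long> firings(long firstElementTs, long interval, long windowEnd, long watermark) {
        long maxTimestamp = windowEnd - 1;
        List<Long> fired = new ArrayList<>();
        // Interval timers chain every `interval`; ones past maxTimestamp never fire the window.
        for (long t = firstFireTimestamp(firstElementTs, interval); t <= watermark && t <= maxTimestamp; t += interval) {
            fired.add(t);
        }
        // Window-end timer registered at maxTimestamp.
        if (watermark >= maxTimestamp) {
            fired.add(maxTimestamp);
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firings(30_000L, 60_000L, 600_000L, 200_000L)); // [60000, 120000, 180000]
    }
}
```

For a 10-minute window `[0, 600000)` with a 1-minute interval and a first element at 30 s, a watermark of 200 000 ms yields firings at 60 000, 120 000 and 180 000 ms. Once the watermark passes the window end, the interval firings stop at 540 000 ms and the window-end timer fires at 599 999 ms.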
2.5 ContinuousProcessingTimeTrigger
This trigger fires continuously on processing time. You give it a time interval, and processing-time timers fire the window once per interval.
The code is shown below. When an element arrives, the trigger reads the timestamp of the pending processing-time timer from state. If none is stored, it computes the next interval boundary after the current processing time, registers a processing-time timer for it, and saves it in state. When the processing-time timer fires, the window is computed and a new timer is registered one interval later.
    private ContinuousProcessingTimeTrigger(long interval) {
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        // the next fire time is derived from the current processing time, not the element timestamp
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get().equals(time)) {
            // fire and chain the next timer one interval later
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
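The consequence is easiest to see in a sketch. Once the first element has set the state, timers keep chaining every interval whether or not more data arrives. The class below is a hypothetical plain-Java simulation (not Flink API). It models only the timer chain and ignores when the window is eventually cleaned up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of ContinuousProcessingTimeTrigger's timer chain.
class ContinuousProcessingTimeSimulation {

    // First timer registered by onElement when the first element arrives at processing time `now`.
    static long firstFireTimestamp(long now, long interval) {
        return now - (now % interval) + interval;
    }

    // Firings up to `clock`. Later elements are irrelevant: the state is already set,
    // and each onProcessingTime call re-registers the next timer by itself.
    static List<Long> firings(long firstArrival, long interval, long clock) {
        List<Long> fired = new ArrayList<>();
        for (long t = firstFireTimestamp(firstArrival, interval); t <= clock; t += interval) {
            fired.add(t); // FIRE, then registerProcessingTimeTimer(t + interval)
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firings(10_000L, 60_000L, 250_000L)); // [60000, 120000, 180000, 240000]
    }
}
```

With a first element at 10 s and a 1-minute interval, the window has fired at 60 000, 120 000, 180 000 and 240 000 ms by the time the clock reaches 250 000 ms, even if no further element arrives.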
2.6 DeltaTrigger
You specify a DeltaFunction that computes the difference between two elements, plus a threshold. The trigger fires only when the difference is greater than the threshold.
The implementation is as follows. The constructor takes the threshold and the DeltaFunction, plus a serializer for the state. When an element arrives, the trigger checks whether a previous element is stored in state. If not, it stores the current element and returns CONTINUE. Otherwise it calls the DeltaFunction with the stored element and the current one. If the difference exceeds the threshold, it stores the current element and fires.
    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer);
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
        if (lastElementState.value() == null) {
            // first element: remember it and wait
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        }
        if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            // the stored element is only replaced when the trigger fires
            lastElementState.update(element);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
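One detail is worth illustrating: the stored element is only replaced when the trigger fires. The delta is therefore measured from the last firing element, not from the immediately preceding one. The sketch below is a plain-Java simulation. `DeltaTriggerSimulation` is hypothetical, and `java.util.function.ToDoubleBiFunction` stands in for Flink's `DeltaFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

// Hypothetical, Flink-free sketch of DeltaTrigger.onElement for one key and window.
class DeltaTriggerSimulation {

    // Returns the elements whose arrival made the trigger FIRE.
    static <T> List<T> firingElements(List<T> elements, double threshold, ToDoubleBiFunction<T, T> deltaFunction) {
        List<T> fired = new ArrayList<>();
        T lastElement = null; // stands in for ValueState<T> "last-element"
        for (T element : elements) {
            if (lastElement == null) {
                lastElement = element; // first element: store it, CONTINUE
                continue;
            }
            if (deltaFunction.applyAsDouble(lastElement, element) > threshold) {
                lastElement = element; // replaced only on FIRE
                fired.add(element);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Double> readings = Arrays.asList(0.0, 3.0, 6.0, 9.0, 12.0);
        System.out.println(firingElements(readings, 5.0, (oldV, newV) -> Math.abs(newV - oldV))); // [6.0, 12.0]
    }
}
```

With readings 0, 3, 6, 9, 12, a threshold of 5 and an absolute-difference delta, consecutive readings are only 3 apart. The trigger still fires at 6 and 12, because each is more than 5 away from the last stored element.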
2.7 PurgingTrigger
PurgingTrigger wraps any other trigger. Whenever the nested trigger fires, PurgingTrigger returns FIRE_AND_PURGE instead, so the window contents are cleared after each firing.
    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }
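The effect of the wrapper can be simulated in plain Java. This is a hypothetical sketch, and `PurgingTriggerSimulation` is not Flink API. The enum mirrors the four `TriggerResult` values and their fire/purge flags, a count-of-N rule plays the nested trigger, and an integer stands in for the window contents.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of wrapping a count trigger in PurgingTrigger.
class PurgingTriggerSimulation {

    // Same four outcomes and fire/purge flags as Flink's TriggerResult.
    enum Result {
        CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // The PurgingTrigger rule: any firing result of the nested trigger becomes FIRE_AND_PURGE.
    static Result purging(Result nested) {
        return nested.fire ? Result.FIRE_AND_PURGE : nested;
    }

    // Returns the number of elements the window function sees at each firing.
    static List<Integer> windowSizesAtFiring(int elements, int maxCount, boolean wrapInPurging) {
        List<Integer> sizes = new ArrayList<>();
        int windowSize = 0; // stands in for the window contents
        int count = 0;      // the nested count trigger's state
        for (int i = 0; i < elements; i++) {
            windowSize++;   // element added to the window before the trigger runs
            count++;
            Result result = Result.CONTINUE;
            if (count >= maxCount) {
                count = 0;
                result = Result.FIRE;
            }
            if (wrapInPurging) {
                result = purging(result);
            }
            if (result.fire) {
                sizes.add(windowSize); // window function evaluates the current contents
            }
            if (result.purge) {
                windowSize = 0;        // contents discarded
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(windowSizesAtFiring(6, 3, false)); // [3, 6]
        System.out.println(windowSizesAtFiring(6, 3, true));  // [3, 3]
    }
}
```

Without the wrapper, six elements with a count of 3 give window sizes `[3, 6]` at the two firings. With it, they give `[3, 3]`. Flink's `KeyedStream.countWindow(size)` is built the same way, as `GlobalWindows` with `PurgingTrigger.of(CountTrigger.of(size))`.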
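3 Custom trigger
Comparing the triggers above, ContinuousProcessingTimeTrigger meets the requirement of emitting a result every minute, but it has a drawback. Once the window has received its first element, the trigger emits a result every minute whether or not new data has arrived. To fire only when there is new data, we can adapt this idea. The trigger below starts from EventTimeTrigger, so the window still fires when the watermark passes its end. In addition, it registers a processing-time timer at the next interval boundary only when an element arrives. Processing-time timers do not depend on the watermark, so these intermediate results are emitted even when the watermark stalls.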
    package flink.trigger.example.custom;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    import java.util.Objects;

    /**
     * Event interval trigger.
     * Copied from {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger}, rewriting
     * {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger#onEventTime(long, TimeWindow, TriggerContext)}
     * and {@link org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger#onProcessingTime(long, TimeWindow, TriggerContext)}:
     * 1. onElement additionally registers a processing-time timer at the next interval boundary
     * 2. onProcessingTime fires the window when that timer goes off
     *
     * @author shirukai
     */
    public class EventTimeIntervalTrigger<T> extends Trigger<T, TimeWindow> {

        private final long interval;

        /**
         * Holds the pending processing-time fire timestamp (null when no timer is pending).
         * When merging we take the lowest of all fire timestamps as the new fire timestamp.
         */
        private final ReducingStateDescriptor<Long> stateDesc =
                new ReducingStateDescriptor<>("fire-time", new EventTimeIntervalTrigger.Min(), LongSerializer.INSTANCE);

        private EventTimeIntervalTrigger(long interval) {
            this.interval = interval;
        }

        /**
         * Create a trigger instance.
         *
         * @param interval interval between two processing-time firings
         * @param <T>      data type
         * @return EventTimeIntervalTrigger
         */
        public static <T> EventTimeIntervalTrigger<T> of(Time interval) {
            return new EventTimeIntervalTrigger<>(interval.toMilliseconds());
        }

        @Override
        public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            // Register a processing-time timer only if none is pending for this window
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            if (Objects.isNull(fireTimestamp.get())) {
                long currentTimestamp = ctx.getCurrentProcessingTime();
                long nextFireTimestamp = currentTimestamp - (currentTimestamp % interval) + interval;
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
                fireTimestamp.add(nextFireTimestamp);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // The pending timer has fired: clear the state so the next element registers a new timer.
            // (Adding Long.MIN_VALUE to a Min-reducing state would pin it at MIN_VALUE for good.)
            ctx.getPartitionedState(stateDesc).clear();
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
            // Clean up the event-time timer
            ctx.deleteEventTimeTimer(window.maxTimestamp());
            // Clean up the pending processing-time timer, if any (never unbox a null timestamp)
            Long pending = fireTimestamp.get();
            if (Objects.nonNull(pending)) {
                ctx.deleteProcessingTimeTimer(pending);
            }
            // Clean up the state
            fireTimestamp.clear();
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
            // only register a timer if the watermark is not yet past the end of the merged window
            // this is in line with the logic in onElement(). If the watermark is past the end of
            // the window onElement() will fire and setting a timer here would fire the window twice.
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(windowMaxTimestamp);
            }
            // Merge the pending fire timestamps (Min keeps the earliest) and re-register that timer
            ctx.mergePartitionedState(stateDesc);
            Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
            if (Objects.nonNull(nextFireTimestamp)) {
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }
        }

        private static class Min implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return Math.min(value1, value2);
            }
        }

        @Override
        public String toString() {
            return "EventTimeIntervalTrigger()";
        }
    }
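The processing-time half of the trigger can be simulated in plain Java to check the intended behaviour without a cluster. The class `EventTimeIntervalSimulation` is hypothetical. It assumes sorted arrival times no later than `clock`, a watermark that has not yet passed the window end, and that a timer due before an arrival fires first. It models only when the window fires, not how the pending timer is stored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of when EventTimeIntervalTrigger fires on processing time.
class EventTimeIntervalSimulation {

    // arrivalTimes: processing times at which elements reach the window, sorted ascending.
    // clock: processing time up to which timers may fire.
    static List<Long> firings(long[] arrivalTimes, long interval, long clock) {
        List<Long> fired = new ArrayList<>();
        Long pendingTimer = null; // stands in for the "fire-time" state (null = no pending timer)
        for (long arrival : arrivalTimes) {
            // A timer that became due before this arrival fires first (onProcessingTime -> FIRE).
            if (pendingTimer != null && pendingTimer <= arrival) {
                fired.add(pendingTimer);
                pendingTimer = null;
            }
            // onElement: register a timer at the next interval boundary only if none is pending.
            if (pendingTimer == null) {
                pendingTimer = arrival - (arrival % interval) + interval;
            }
        }
        if (pendingTimer != null && pendingTimer <= clock) {
            fired.add(pendingTimer);
        }
        return fired;
    }

    public static void main(String[] args) {
        // Data at 0:05, 0:20 and 2:10; nothing arrives between 1:00 and 2:00.
        long[] arrivals = {5_000L, 20_000L, 130_000L};
        System.out.println(firings(arrivals, 60_000L, 300_000L)); // [60000, 180000]
    }
}
```

With elements arriving at 5 s, 20 s and 130 s and a 1-minute interval, the window fires at 60 000 and 180 000 ms only, and the minutes without data produce no output. ContinuousProcessingTimeTrigger would also fire at 120 000, 240 000 and 300 000 ms. In a job, the trigger would be attached with `.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(EventTimeIntervalTrigger.of(Time.minutes(1)))`.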