FlinkCEP Pattern API: introduction and getting-started examples

The code in this article runs on Flink 1.13.3.

1. Introduction to the Pattern API

The FlinkCEP Pattern API lets you define a complex pattern sequence to be extracted from the input stream.
Each complex pattern sequence consists of multiple simple patterns.

  • Individual pattern: a simple pattern, such as one that looks for a single event with the same properties
  • Pattern sequence: the final complex pattern sequence that is searched for in the stream
    • Combines multiple individual patterns, starting with an initial pattern (.begin(...))
    • Can be viewed as a directed graph of simple patterns, in which the transition from one pattern to the next is based on user-specified conditions
  • Match: a sequence of input events that visits all patterns of the complex pattern sequence through a series of valid pattern transitions

Note: each pattern must have a unique name, which is later used to identify the events it matched. A pattern name cannot contain the character ':'.

2. Individual patterns

An individual pattern is either a singleton pattern or a looping pattern. Each pattern can have one or more conditions that decide which events it accepts.
optional() marks a pattern as optional.
For example, in the pattern "a b+ c? d", a, c? and d are singleton patterns, and b+ is a looping pattern.

  • Singleton pattern: accepts exactly one event. By default, every pattern is a singleton pattern
  • Looping pattern: can accept more than one event. A singleton pattern with a quantifier becomes a looping pattern

2.1. Quantifiers

Quantifiers apply to looping patterns.
The functions in the table below define how many events a looping pattern accepts.
greedy() makes a looping pattern greedy, so that it repeats as many times as possible.

Note: by default, relaxed internal contiguity (between subsequent events) is used.

Function                     Meaning                  State cleanup
times(4)                     exactly 4 times
times(4).optional()          0 or 4 times
times(2, 4)                  2, 3 or 4 times
times(2, 4).optional()       0, 2, 3 or 4 times
oneOrMore()                  1 or more times          use either until() or within() to enable state clearing
oneOrMore().optional()       0 or more times          use either until() or within() to enable state clearing
timesOrMore(2)               2 or more times          use either until() or within() to enable state clearing
timesOrMore(2).optional()    0, or 2 or more times    use either until() or within() to enable state clearing
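
To make the repetition counts in the table concrete, here is a minimal plain-Java sketch. It does not use the Flink API: the class QuantifierSketch and its methods are hypothetical names that only mirror the FlinkCEP quantifiers, and each one is modelled as a predicate over the number of events a pattern has accepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Plain-Java model of the quantifier table. This is NOT the Flink API;
// the method names merely mirror the FlinkCEP quantifiers.
public class QuantifierSketch {
    // Each method returns a predicate over "number of events accepted by the pattern".
    public static IntPredicate times(int n) {
        return count -> count == n;
    }

    public static IntPredicate times(int from, int to) {
        return count -> count >= from && count <= to;
    }

    public static IntPredicate timesOrMore(int n) {
        return count -> count >= n;
    }

    public static IntPredicate oneOrMore() {
        return timesOrMore(1);
    }

    // optional() additionally allows the pattern to accept no event at all.
    public static IntPredicate optional(IntPredicate quantifier) {
        return count -> count == 0 || quantifier.test(count);
    }

    /** Lists the counts in [0, max] that the given quantifier accepts. */
    public static List<Integer> acceptedCounts(IntPredicate quantifier, int max) {
        List<Integer> counts = new ArrayList<>();
        for (int c = 0; c <= max; c++) {
            if (quantifier.test(c)) {
                counts.add(c);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(acceptedCounts(times(2, 4), 6));              // [2, 3, 4]
        System.out.println(acceptedCounts(optional(times(2, 4)), 6));    // [0, 2, 3, 4]
        System.out.println(acceptedCounts(optional(timesOrMore(2)), 6)); // [0, 2, 3, 4, 5, 6]
    }
}
```

The unbounded quantifiers (oneOrMore(), timesOrMore(n)) have no upper limit in this model either, which is why the table recommends until() or within() for them.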

2.2. Conditions

For each pattern, you can specify a condition that decides whether an incoming event is accepted into the pattern.

  • Iterative condition: the most general type of condition. It decides whether to accept a subsequent event based on the properties of previously accepted events or on a statistic over a subset of them
    • Iterative conditions are powerful, especially in combination with looping patterns such as oneOrMore().
    • A call to ctx.getEventsForPattern(...) finds all previously accepted events for a given potential match. The cost of this operation can vary, so keep its use to a minimum when implementing a condition.
  • Simple condition: extends the IterativeCondition class described above and decides whether to accept an event based only on the properties of the event itself.
  • Combined conditions: several conditions combined into one; any kinds of condition can be combined
    • Calling where() repeatedly combines the conditions with logical AND
    • Calling or() combines the conditions with logical OR
  • Stop condition (until): for looping patterns (oneOrMore() and timesOrMore()), you can specify a stop condition; without one, state can grow without bound and the job may run out of memory (OOM). Once an event that satisfies the stop condition arrives, no more events are accepted into the pattern
    • Restriction: until() can only be applied to looping patterns.
    • Note: it allows the state of the corresponding pattern to be cleaned up based on an event-based condition.
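
The four kinds of condition can be sketched in plain Java with java.util.function.Predicate. This is only an illustration of the logic, not the Flink API: the class ConditionSketch, its Event type and its helper methods are hypothetical, and Flink evaluates conditions inside its own matching engine rather than over a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of FlinkCEP condition kinds. All names here are hypothetical.
public class ConditionSketch {
    /** Hypothetical stand-in for an event; only the fields the conditions read. */
    public static final class Event {
        public final String name;
        public final String type;
        public final double price;

        public Event(String name, String type, double price) {
            this.name = name;
            this.type = type;
            this.price = price;
        }
    }

    // Simple conditions: look only at the event itself.
    public static final Predicate<Event> IS_ERROR = e -> e.type.equals("error");
    public static final Predicate<Event> IS_CRITICAL = e -> e.type.equals("critical");
    public static final Predicate<Event> NAME_STARTS_WITH_AB = e -> e.name.startsWith("AB");

    // Combined conditions: where(A).where(B) acts like A AND B, where(A).or(B) like A OR B.
    public static final Predicate<Event> ERROR_AND_AB = IS_ERROR.and(NAME_STARTS_WITH_AB);
    public static final Predicate<Event> ERROR_OR_CRITICAL = IS_ERROR.or(IS_CRITICAL);

    // Iterative condition: also looks at the events already accepted for this potential match.
    public static boolean priceSumExceeds(Event candidate, List<Event> accepted, double limit) {
        double sum = candidate.price;
        for (Event e : accepted) {
            sum += e.price;
        }
        return sum > limit;
    }

    // Stop condition: accept matching events until the first event that satisfies `stop`.
    public static List<Event> acceptUntil(List<Event> stream, Predicate<Event> accept, Predicate<Event> stop) {
        List<Event> accepted = new ArrayList<>();
        for (Event e : stream) {
            if (stop.test(e)) {
                break;
            }
            if (accept.test(e)) {
                accepted.add(e);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Event> accepted = Arrays.asList(
                new Event("AB", "error", 2.00),
                new Event("AB", "error", 2.00));
        System.out.println(ERROR_AND_AB.test(new Event("AA", "error", 1.00)));              // false
        System.out.println(ERROR_OR_CRITICAL.test(new Event("AA", "critical", 4.00)));      // true
        System.out.println(priceSumExceeds(new Event("AB", "error", 2.30), accepted, 6.0)); // true

        List<Event> stream = Arrays.asList(
                new Event("AB", "error", 2.00),
                new Event("AB", "error", 2.00),
                new Event("AB", "critical", 1.00),
                new Event("AB", "error", 2.30));
        System.out.println(acceptUntil(stream, IS_ERROR, IS_CRITICAL).size());              // 2
    }
}
```

The stop condition is what keeps the list of accepted events bounded: once the "critical" event arrives, the later "error" event is no longer collected.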

2.3. Comprehensive code example

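import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyCEPTest {
    public static void main(String[] args) throws Exception {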
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        DataStream<EventMsg> dataStream =
                env.fromElements(
                        new EventMsg(1L, LocalDateTime.parse("2020-04-15 08:05:01", dateTimeFormatter), "AA", "info", 1.00),
                        new EventMsg(2L, LocalDateTime.parse("2020-04-15 08:06:11", dateTimeFormatter), "AA", "error", 1.00),
                        new EventMsg(3L, LocalDateTime.parse("2020-04-15 08:07:21", dateTimeFormatter), "AA", "critical", 4.00),
                        new EventMsg(4L, LocalDateTime.parse("2020-04-15 08:08:21", dateTimeFormatter), "AA", "info", 1.00),
                        new EventMsg(5L, LocalDateTime.parse("2020-04-15 08:09:21", dateTimeFormatter), "AB", "error", 2.00),
                        new EventMsg(6L, LocalDateTime.parse("2020-04-15 08:11:51", dateTimeFormatter), "AB", "error", 2.00),
                        new EventMsg(7L, LocalDateTime.parse("2020-04-15 08:12:20", dateTimeFormatter), "AB", "critical", 1.00),
                        new EventMsg(8L, LocalDateTime.parse("2020-04-15 08:15:22", dateTimeFormatter), "AB", "error", 2.30),
                        new EventMsg(9L, LocalDateTime.parse("2020-04-15 08:17:34", dateTimeFormatter), "AB", "error", 1.00));

        SingleOutputStreamOperator<EventMsg> watermarks = dataStream.assignTimestampsAndWatermarks(
                // Tolerate events that arrive up to 3 seconds out of order
                WatermarkStrategy.<EventMsg>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<EventMsg>) (element, recordTimestamp) -> toEpochMilli(element.getEventTime()))
        );

        Pattern<EventMsg, ?> pattern = Pattern.<EventMsg>begin("start")
                .where(new SimpleCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventType().equals("error");
                    }
                }).where(new SimpleCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventName().startsWith("AB");
                    }
                }).timesOrMore(2)
                .followedBy("end")
                .where(new IterativeCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value, Context<EventMsg> ctx) throws Exception {
                        double sum = value.getPrice();
                        for (EventMsg event : ctx.getEventsForPattern("start")) {
                            sum += event.getPrice();
                        }
                        return Double.compare(sum, 6.0) > 0;
                    }
                });


        PatternStream<EventMsg> patternStream = CEP.pattern(watermarks, pattern);

        DataStream<String> alerts = patternStream.select(new PatternSelectFunction<EventMsg, String>() {
            @Override
            public String select(Map<String, List<EventMsg>> msgs) throws Exception {
                StringBuilder sb = new StringBuilder();
                msgs.forEach((k, v) -> {
                    sb.append(k + ",");
                    sb.append(v.toString() + "\n");
                });
                return sb.toString();
            }
        });


        alerts.print();
        env.execute("Flink CEP Test");
    }

    public static final ZoneOffset zoneOffset8 = ZoneOffset.of("+8");

    public static long toEpochMilli(LocalDateTime dt) {
        return dt.toInstant(zoneOffset8).toEpochMilli();
    }

    // Must be a static nested class that exposes its fields as properties (Lombok generates the accessors)
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EventMsg {
        public long eventId;
        public LocalDateTime eventTime;
        public String eventName;
        public String eventType;
        public double price;

        @Override
        public String toString() {
            return String.format("%s-%s-%s-%s-%s", eventId, eventName, eventType, eventTime, price);
        }
    }
}

3. Combining patterns

Pattern sequence: the final complex pattern sequence that is searched for in the stream

  • Combines multiple individual patterns, starting with an initial pattern (.begin(...))
  • Can be viewed as a directed graph of simple patterns, in which the transition from one pattern to the next is based on user-specified conditions
  • A pattern sequence can have only one time constraint. If several are defined on different individual patterns, the smallest one is applied. Example: .within(Time.seconds(10));
  • Note: a pattern sequence cannot end with notFollowedBy(), and a NOT pattern cannot be preceded by an optional pattern.

3.1. Contiguity conditions

  • Strict contiguity: all matching events are expected to appear strictly one after the other, without any non-matching events in between.
    • next(): the next matching event must follow immediately
    • notNext(): use it when one event type must not directly follow another
    • It can solve the disorder problem
  • Relaxed contiguity: non-matching events that appear between matching events are ignored.
    • followedBy()
    • notFollowedBy(): use it when an event type must not appear anywhere between two other event types
  • Non-deterministic relaxed contiguity: relaxes contiguity further and allows additional matches that ignore some matching events.
    • followedByAny()

Example:
For the pattern "a b" and the event sequence "a", "c", "b1", "b2", the results are:

  • Strict contiguity between "a" and "b": {} (no match); the "c" after "a" causes "a" to be discarded
  • Relaxed contiguity between "a" and "b": {a b1}; relaxed contiguity is read as "skip non-matching events until the next matching one"
  • Non-deterministic relaxed contiguity between "a" and "b": {a b1}, {a b2}
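
The three results for this example can be reproduced with a small plain-Java enumeration. This is a simplified model written for this example only, not Flink's matching engine: the class ContiguitySketch is hypothetical, it handles only the two-step pattern "a b", and it treats any event whose name starts with "a" or "b" as satisfying the corresponding step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the three contiguity modes for the two-step pattern "a b".
// NOT the Flink implementation; it ignores time windows and skip strategies.
public class ContiguitySketch {
    public enum Contiguity { STRICT, RELAXED, NON_DETERMINISTIC_RELAXED }

    /** Returns every match of "a b" in the event list under the given contiguity mode. */
    public static List<String> match(List<String> events, Contiguity mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue; // only an "a" event can start a match
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).startsWith("b")) {
                    matches.add(events.get(i) + " " + events.get(j));
                    if (mode != Contiguity.NON_DETERMINISTIC_RELAXED) {
                        break; // strict and relaxed stop at the first matching "b"
                    }
                } else if (mode == Contiguity.STRICT) {
                    break; // a non-matching event right after "a" discards the partial match
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        System.out.println(match(events, Contiguity.STRICT));                    // []
        System.out.println(match(events, Contiguity.RELAXED));                   // [a b1]
        System.out.println(match(events, Contiguity.NON_DETERMINISTIC_RELAXED)); // [a b1, a b2]
    }
}
```

In the Pattern API the three modes correspond to next(), followedBy() and followedByAny() respectively.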

3.2. Contiguity within looping patterns

For the pattern "a b+ c" and the event sequence "a", "b1", "d1", "b2", "d2", "b3", "c", the results are:

  • Strict Contiguity: {a b1 b2 b3 c}
  • Relaxed Contiguity: {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}
    • If combination is allowed, add the result: {a b1 b3 c}
  • Non-Deterministic Relaxed Contiguity: {a b1 c}, {a b2 c}, {a b3 c}, {a b1 b2 c}, {a b2 b3 c}, {a b1 b2 b3 c}
    • If combination is allowed, add the result: {a b1 b3 c}

3.3. Important functions

  • consecutive(): works with oneOrMore() and times() and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as with next()). If not applied, relaxed contiguity (as with followedBy()) is used.
  • allowCombinations(): works with oneOrMore() and times() and imposes non-deterministic relaxed contiguity (as with followedByAny()) between the matching events. If not applied, relaxed contiguity (as with followedBy()) is used.

3.4. Comprehensive code example

The example code is as follows:

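import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyCEPTest {
    public static void main(String[] args) throws Exception {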
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        DataStream<EventMsg> dataStream =
                env.fromElements(
                        new EventMsg(1L, LocalDateTime.parse("2020-04-15 08:05:01", dateTimeFormatter), "a", "info", 1.00),
                        new EventMsg(2L, LocalDateTime.parse("2020-04-15 08:06:11", dateTimeFormatter), "b", "error", 1.00),
                        new EventMsg(3L, LocalDateTime.parse("2020-04-15 08:07:21", dateTimeFormatter), "d", "critical", 4.00),
                        new EventMsg(4L, LocalDateTime.parse("2020-04-15 08:08:21", dateTimeFormatter), "b", "info", 1.00),
                        new EventMsg(5L, LocalDateTime.parse("2020-04-15 08:09:21", dateTimeFormatter), "d", "error", 2.00),
                        new EventMsg(6L, LocalDateTime.parse("2020-04-15 08:11:51", dateTimeFormatter), "b", "error", 2.00),
                        new EventMsg(7L, LocalDateTime.parse("2020-04-15 08:12:20", dateTimeFormatter), "c", "critical", 1.00),
                        new EventMsg(8L, LocalDateTime.parse("2020-04-15 08:15:22", dateTimeFormatter), "f", "error", 2.30),
                        new EventMsg(9L, LocalDateTime.parse("2020-04-15 08:17:34", dateTimeFormatter), "f", "error", 1.00));

        SingleOutputStreamOperator<EventMsg> watermarks = dataStream.assignTimestampsAndWatermarks(
                // Tolerate events that arrive up to 3 seconds out of order
                WatermarkStrategy.<EventMsg>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner<EventMsg>) (element, recordTimestamp) -> toEpochMilli(element.getEventTime()))
        );

        Pattern<EventMsg, ?> pattern = Pattern.<EventMsg>begin("start")
                .where(new SimpleCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventName().equals("a");
                    }
                })
                .followedByAny("middle")
                .where(new SimpleCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventName().equals("b");
                    }
                }).oneOrMore()
                .followedByAny("end")
                .where(new SimpleCondition<EventMsg>() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventName().equals("c");
                    }
                });


        PatternStream<EventMsg> patternStream = CEP.pattern(watermarks, pattern);

        DataStream<String> alerts = patternStream.select(new PatternSelectFunction<EventMsg, String>() {
            @Override
            public String select(Map<String, List<EventMsg>> msgs) throws Exception {
                StringBuilder sb = new StringBuilder();
                msgs.forEach((k, v) -> {
                    sb.append(k + ",");
                    sb.append(v.toString() + "\n");
                });
                return sb.toString();
            }
        });


        alerts.print();
        env.execute("Flink CEP Test");
    }

    public static final ZoneOffset zoneOffset8 = ZoneOffset.of("+8");

    public static long toEpochMilli(LocalDateTime dt) {
        return dt.toInstant(zoneOffset8).toEpochMilli();
    }

    // Must be a static nested class that exposes its fields as properties (Lombok generates the accessors)
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EventMsg {
        public long eventId;
        public LocalDateTime eventTime;
        public String eventName;
        public String eventType;
        public double price;

        @Override
        public String toString() {
            return String.format("%s-%s-%s-%s-%s", eventId, eventName, eventType, eventTime, price);
        }
    }
}

4. Groups of patterns

  • You can define a pattern sequence as the condition for begin, followedBy, followedByAny and next
  • The pattern sequence is then logically treated as the matching condition, and a GroupPattern is returned
  • You can apply oneOrMore(), times(#ofTimes), times(#fromTimes, #toTimes), optional(), consecutive() and allowCombinations() to the GroupPattern

Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();

Keywords: flink

Added by mikes127 on Wed, 08 Dec 2021 21:23:19 +0200