Flink_ 09_ CEP (personal summary)

Use a pattern to match events in the data stream, similar to regular expressions

Patterns are rules for handling events

Pattern API:

API example & Select & flatselect:

Pattern<LoginEvent,LoginEvent> patternStart = Pattern.<LoginEvent>begin("start");
Pattern<LoginEvent,LoginEvent> wherePattern = patternStart.where((event)->return event.getName.startWith("foo"))
PatternStream<LoginEvent> patternStream = CEP.pattern(dataDs, wherePattern)

// Select the desired event according to the name of the individual mode and process it
patternStream.select(new PatternSelectFunction<LoginEvent,ComplexEvent>{
    public ComplexEvent select(Map<String,List<LoginEvent> map) throws Exception {
        // Map < string, List < loginevent > > i.e. "patternname" - > List set. For the singleton mode of individual mode, there is only one element in the List
        return xxx;

// Enhancement: timeout data beyond the within time range (whether it can be matched or not) can be put into the side output stream
        // When processing the timeout data, the return value data will be thrown into the side output stream
        new PatternTimeoutFunction<LoginEvent, ComplexEvent>( ) {
            public ComplexEvent timeout(Map<String, List<LoginEvent>> map, long ts) throws Exception {
                return null;
        // Process normal data
        new PatternSelectFunction<LoginEvent, ComplexEvent>( ) {
            public ComplexEvent select(Map<String, List<LoginEvent>> pattern) throws Exception {
                return null;
// For flatSelect, enhancements: out Collect to output any number of elements

Individual mode:

You can match an event directly

It is divided into single case mode and circular mode: single case mode can only receive one event, and circular mode can receive multiple events

Quantifier: Specifies the number of cycles

  1. .times(4)

    Match occurs 4 times

  2. .times(4).optional

    Matches occur 0 or 4 times

  3. .times(2,4)

    Matching occurs 2-4 times

  4. .times(2,4).greedy

    Matching occurs 2-4 times, and greedily matches as many as possible

  5. .oneorMore

    If the match occurs at least once, the termination condition until() should be brought

  6. .timesOrMore(2).optional.greedy

    Matches occur 0, 2, or more times, and greedily match as many times as possible, with the termination condition until()

Simple condition: pass where() to filter

patternStart.where((event)->return event.getName.startWith("foo"))

Combination condition: or or where combination of simple conditions, where continue Where is essentially an and operation

patternStart.where((event)->return event.getName.startWith("foo")).or((event)->return event.getName.startWith("stupid"))

Termination condition: for the specified number of cycles of xxxMore, the termination condition until() should be carried

Iteration condition: it can handle the previously received events

ctx.getEventsForPattern("name").where(( )->{ Logical processing })

Mode sequence (combined mode):

Combine many individual patterns

Must start with "start" mode begin(“start”)

**Strict nearest neighbor: * * all events appear in strict order, and other data cannot be interspersed in the middle; For example, a next b, then b must follow a to match

​ . next("otherPattern"), the parameter is another individual pattern

Loose close proximity: allow other data to be interspersed in the middle; For example, a followedby, B, as long as B is behind a, it doesn't have to follow closely

​ .followedBy("otherPattern")

Non deterministic relaxed nearest neighbor: further relax the conditions, and the previously matched events can be used again;

For example, a followedbany B, the event sequence [a,c,b1,b2] can be matched to {a,b1},{a,b2}

​ .followedByAny("otherPattern")

Other relationships:

  1. .notNext("otherPattern")

  2. .notFollowedBy("otherPattern")

be careful:

  • . notFollowedBy cannot be used at the end, otherwise it will never terminate

  • You can specify a time constraint for the pattern, so that you can have the opportunity to match within the specified time;


    • within does not care about the grouping of keyBy. It only recognizes the time and does not recognize whether it is the time of the same key; next and other conditions in the pattern sequence recognize key
    • The next and other conditions in the pattern sequence are calculated after the end of within time
    • It can be regarded as a window whose window size is within the length of time period. The difference is that it will be automatically sorted by timestamp and then calculated
  • Patterns of type notXXX cannot be decorated with optional

Mode group:

Generally, it's not used. It's a dolly. A pattern sequence is placed in the individual pattern as a condition

