Flink practice tutorial - Advanced: CEP complex event processing

Author: Tencent cloud flow computing Oceanus team

Introduction to Stream Computing Oceanus

Stream Computing Oceanus is a real-time analytics tool in the big data product ecosystem. It is an enterprise-grade real-time big data analysis platform built on Apache Flink, offering one-stop development, seamless data connectivity, sub-second latency, low cost, security and stability. Oceanus aims to help enterprises get the most value from their data and speed up their move to real-time, digital operations.

Flink CEP [1] is a complex event processing library built on top of Flink. This article walks through how to use Flink CEP to process complex events. The sample program uses the DataStream API to read stock data from Kafka, detects low points in the stock price, and writes the matches to another Kafka topic.

Preparation

Create a Stream Computing Oceanus cluster

Open the Oceanus console [2], click [cluster management] on the left, then click [Create cluster] in the upper left corner. For details, see the Oceanus documentation on creating an exclusive cluster [3].

Create Kafka topics

Open the CKafka console [4], click [new] in the upper left corner to create a CKafka instance, then create two topics: demo6-cep-source and demo6-cep-dest.

Develop a DataStream job

1. Create a Maven project

Create a Maven project in your local IntelliJ IDEA and configure the pom.xml file with the following contents:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demos</groupId>
    <artifactId>DemoCEP</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.demos.CEPTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Coding

In the Flink DataStream job, the Stock POJO class receives the JSON data read from Kafka, the StockSerializerDeserializer class handles serialization and deserialization, and CEPTest is the main program class.

Stock class
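A minimal sketch of the Stock POJO follows. It assumes the fields mirror the sample JSON records used later in this tutorial (symbol, rowtime, price, tax) and that price and tax are numeric; these types are assumptions, not the author's original class. The sketch follows Flink's POJO rules (public class, public no-argument constructor, getters and setters) and overrides equals() and hashCode(), which Flink CEP relies on when comparing events.

```java
// Sketch only: in the project this would live in src/main/java/com/demos/Stock.java
// with "package com.demos;" at the top. Field names/types are assumed from the sample JSON.
import java.util.Objects;

public class Stock {
    private String symbol;   // e.g. "ACME"
    private String rowtime;  // kept as a plain string, e.g. "2020-12-11 10:00:00"
    private double price;
    private double tax;

    // Public no-argument constructor: needed by Jackson and by Flink's POJO rules
    public Stock() {}

    public Stock(String symbol, String rowtime, double price, double tax) {
        this.symbol = symbol;
        this.rowtime = rowtime;
        this.price = price;
        this.tax = tax;
    }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public String getRowtime() { return rowtime; }
    public void setRowtime(String rowtime) { this.rowtime = rowtime; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public double getTax() { return tax; }
    public void setTax(double tax) { this.tax = tax; }

    // Flink CEP compares events with equals()/hashCode(), so both are overridden consistently
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Stock)) return false;
        Stock other = (Stock) o;
        return Double.compare(price, other.price) == 0
                && Double.compare(tax, other.tax) == 0
                && Objects.equals(symbol, other.symbol)
                && Objects.equals(rowtime, other.rowtime);
    }

    @Override
    public int hashCode() {
        return Objects.hash(symbol, rowtime, price, tax);
    }

    // Used when CEPTest appends matched events to the output string
    @Override
    public String toString() {
        return "Stock{symbol=" + symbol + ", rowtime=" + rowtime
                + ", price=" + price + ", tax=" + tax + "}";
    }
}
```

Because Jackson fails on unknown JSON properties by default, the POJO should declare every field that appears in the input records.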


StockSerializerDeserializer class

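package com.demos;

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
// Assumption: Flink's shaded Jackson, available transitively through the Flink dependencies.
// Plain com.fasterxml.jackson also works if jackson-databind is added to pom.xml.
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

// Serialization and deserialization schema: converts Stock objects to and from JSON bytes
public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {

    private final ObjectMapper mapper = new ObjectMapper();

    // Stock -> JSON bytes
    @Override
    public byte[] serialize(Stock stock) {
        try {
            return mapper.writeValueAsBytes(stock);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    // JSON bytes read from Kafka -> Stock
    @Override
    public Stock deserialize(byte[] bytes) throws IOException {
        return mapper.readValue(bytes, Stock.class);
    }

    // The Kafka source is unbounded, so no record ends the stream
    @Override
    public boolean isEndOfStream(Stock nextElement) {
        return false;
    }

    // Tells Flink which type this schema produces
    @Override
    public TypeInformation<Stock> getProducedType() {
        return TypeExtractor.getForClass(Stock.class);
    }
}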

CEPTest main program class

package com.demos;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class CEPTest {
    public static void main(String[] args) {
        // Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection settings (replace the address with your CKafka instance)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "test");
        String inTopic = "demo6-cep-source";
        FlinkKafkaConsumer<Stock> consumer =
                new FlinkKafkaConsumer<Stock>(inTopic, new StockSerializerDeserializer(), properties);
        consumer.setStartFromLatest();

        // Add the data source
        DataStream<Stock> input = streamEnv.addSource(consumer);

        // Define the pattern to match, i.e. a low point in the stock price:
        // a price above 10, then immediately one below 10, then immediately one above 10
        Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() > 10;
                    }
                }
        ).next("bottom").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() < 10;
                    }
                }
        ).next("up").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() > 10;
                    }
                }
        );

        // Apply the pattern in processing time and turn each match into one string
        DataStream<String> result = CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            StringBuilder builder = new StringBuilder();
                            builder.append("\n");
                            builder.append(p.get("start").get(0))
                                    .append(",\n")
                                    .append(p.get("bottom").get(0))
                                    .append(",\n")
                                    .append(p.get("up").get(0));
                            o.collect(builder.toString());
                        },
                        Types.STRING);

        // Output to the Kafka topic
        String topicOut = "demo6-cep-dest";
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties);
        result.addSink(producer);

        try {
            streamEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
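To make the semantics of the pattern concrete, the sketch below reproduces its matching logic in plain Java, without Flink. It is an illustration only: the class LowPointScan is hypothetical, and it assumes a single ordered sequence of prices checked against the same thresholds as the SimpleCondition filters. Because next() requires strict contiguity, the three events must be adjacent; overlapping matches are all reported, which mirrors Flink CEP's default skip strategy (noSkip).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java illustration of the "start" -> "bottom" -> "up" pattern.
public class LowPointScan {

    // Thresholds mirror the SimpleCondition filters in CEPTest
    static boolean isStart(double price)  { return price > 10; }
    static boolean isBottom(double price) { return price < 10; }
    static boolean isUp(double price)     { return price > 10; }

    // Returns the index of the "start" event of every match, including overlapping ones
    public static List<Integer> findMatches(double[] prices) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + 2 < prices.length; i++) {
            // next() = strict contiguity: no other event may sit between two stages
            if (isStart(prices[i]) && isBottom(prices[i + 1]) && isUp(prices[i + 2])) {
                starts.add(i);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // Prices from the sample records in this tutorial
        System.out.println(findMatches(new double[] {12, 9, 12})); // prints [0]
    }
}
```

With this logic, the sequence 12, 11, 9, 12 yields a single match starting at 11, because next() does not allow the 11 to sit between the first 12 and the bottom; relaxed contiguity via followedBy() would accept that case as well. A price of exactly 10 satisfies none of the conditions.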

3. Project packaging

Package the project with IDEA's built-in Build Artifacts tool or from the command line. The command-line packaging command is:

mvn clean package

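After command-line packaging, the Jar package is in the project's target directory. Because maven-assembly-plugin uses the jar-with-dependencies descriptor, the file to upload is DemoCEP-1.0-SNAPSHOT-jar-with-dependencies.jar, which bundles the Kafka connector and CEP libraries.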

Create the Stream Computing Oceanus job

1. Upload the dependency

On the Oceanus console, click [dependency management] on the left, click [new] in the upper left corner to create a dependency, and upload the local Jar package.

2. Create a job

On the Oceanus console, click [job management] on the left, then click [new] in the upper left corner to create a job. Select Jar job as the job type and click [development debugging] to open the job editing page. For [main package], select the dependency you just uploaded and its latest version. For the main class, enter the mainClass configured in pom.xml: com.demos.CEPTest.

3. Run the job

Click [publish draft] to start the job. You can monitor it through the TaskManager logs in the [log] panel or through the Flink UI.

4. Simulate data

Send data to the topic demo6-cep-source with the Kafka client:

[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties

Sample test data (one JSON record per line):

{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}
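To try longer price sequences, records in the same shape can be generated rather than typed by hand. The sketch below is a hypothetical helper (TestDataGenerator is not part of the tutorial); the symbol, start time, one-second spacing and fixed tax value are assumptions chosen to match the sample records.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds JSON lines in the sample format for kafka-console-producer.sh
public class TestDataGenerator {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static List<String> records(String symbol, LocalDateTime start, int[] prices, int tax) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < prices.length; i++) {
            // One record per second, like the rowtime values in the sample data
            String rowtime = start.plusSeconds(i).format(FMT);
            lines.add(String.format(
                    "{\"symbol\": \"%s\", \"rowtime\": \"%s\", \"price\": %d, \"tax\": %d}",
                    symbol, rowtime, prices[i], tax));
        }
        return lines;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2020, 12, 11, 10, 0, 0);
        // Two dips below 10: the job should emit two overlapping matches for this sequence
        for (String line : records("ACME", start, new int[] {12, 9, 12, 9, 12}, 1)) {
            System.out.println(line);
        }
    }
}
```

The printed lines can be pasted into the console producer one per line.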

For other ways to send messages, refer to CKafka sends and receives messages [5].

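5. View the results

Consume the topic demo6-cep-dest to view the matched results. For the sample data above, the three records (prices 12, 9, 12) form one match, which is written to the topic as a single message.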

Summary

  1. When using CEP with the DataStream API, the POJO class must implement equals() and hashCode(), because Flink CEP uses these methods to compare and match events.  
  2. To use CEP in Table API & SQL, refer to pattern detection [6].  
  3. The Flink core dependencies do not need to be packaged into the Jar (hence the provided scope in pom.xml), because the Stream Computing Oceanus platform already provides them.  

References

[1] Flink CEP (complex event processing): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/

[2] Oceanus console: https://console.cloud.tencent.com/oceanus/overview

[3] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298

[4] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1

[5] CKafka sends and receives messages: https://cloud.tencent.com/document/product/597/54834

[6] Pattern detection: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/


Added by JustFoo on Sat, 15 Jan 2022 15:18:12 +0200