Author: Tencent Cloud Stream Computing Oceanus Team
Introduction to Stream Computing Oceanus
Stream Computing Oceanus is a powerful tool for real-time analysis in the big data product ecosystem. It is an enterprise-grade real-time big data analysis platform built on Apache Flink, offering one-stop development, seamless connectivity, sub-second latency, low cost, security, and stability. Stream Computing Oceanus aims to maximize the value of enterprise data and accelerate the real-time digital transformation of enterprises.
Flink Jar jobs support both DataStream API programming and Table API/SQL programming; the Table API and SQL can also be easily integrated into and embedded in DataStream programs. See Integrating with DataStream API [1] to learn how to convert a DataStream into a Table. Stream Computing Oceanus supports both Flink Jar jobs and Flink SQL jobs. This article walks through how to develop a Jar job with the Flink DataStream API and run it on the Stream Computing Oceanus platform.
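As a quick illustration of that integration, here is a minimal sketch of converting between a DataStream and a Table with the Flink 1.13 bridge API (this additionally assumes the flink-table-api-java-bridge_2.11 dependency, which is not part of the pom.xml used below; the class name DataStreamToTable is illustrative only):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataStreamToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The StreamTableEnvironment bridges the DataStream API and the Table API
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<String> ds = env.fromElements("a", "b", "c");
        // Convert the DataStream into a Table and query it with SQL
        Table table = tEnv.fromDataStream(ds);
        tEnv.createTemporaryView("words", table);
        Table result = tEnv.sqlQuery("SELECT * FROM words");

        // Convert the result back into a DataStream and print it
        tEnv.toDataStream(result).print();
        env.execute("datastream-to-table");
    }
}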
Prerequisites
Create a Stream Computing Oceanus cluster
Enter the Stream Computing Oceanus console [2], click [Cluster Management] on the left, and click [Create Cluster] in the upper left corner. For details, refer to the Stream Computing Oceanus documentation on creating an exclusive cluster [3].
Create a CKafka message queue instance
Enter the CKafka console [4] and click [New] in the upper left corner to create a CKafka instance. For details, refer to CKafka: Create Instance [5].
Create a topic:
Enter the CKafka instance and click [Topic Management] > [New] to create a topic. For details, refer to CKafka: Create Topic [6].
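Alternatively, since CKafka is compatible with the open-source Kafka protocol, the topic can be created with the standard Kafka CLI (a sketch; the address 10.0.0.29:9092 and topic flink-demo9 match the job code below and should be replaced with your own):

kafka-topics.sh --bootstrap-server 10.0.0.29:9092 --create --topic flink-demo9 --partitions 1 --replication-factor 1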
Developing DataStream jobs
1. New Maven project
Create a new Maven project in your local IDEA and configure the pom.xml file as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanus</groupId>
    <artifactId>jar_demos</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- The Oceanus platform already provides flink-java, flink-streaming-java, and other core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- Use the Kafka connector built into Oceanus -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- test -->
        <!-- flink-clients is used for local debugging -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <!-- Set the main class -->
                    <archive>
                        <manifestEntries>
                            <Main-Class>com.demos.HelloWorld</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Coding
The Flink DataStream job code is as follows:
package com.demos;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }

        // 2. Configure the data source.
        // Predefined sources read from files, sockets, and collections;
        // custom sources (Kafka, MySQL, etc.) read data via the addSource() function.
        DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data);

        // 3. Process the data
        DataStream<String> ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() {
            @Override
            public void flatMap(List<Integer> value, Collector<String> out) throws Exception {
                value.forEach(v -> out.collect(v.toString()));
            }
        });

        // 4. Write the data out.
        // Predefined sinks write to files, standard output (stdout), standard error (stderr), and sockets;
        // custom sinks (Kafka, MySQL, etc.) write data via the addSink() function.
        Properties sinkProps = new Properties();
        String hosts = "10.0.0.29:9092";
        sinkProps.setProperty("bootstrap.servers", hosts);
        String outTopic = "flink-demo9";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outTopic, new SimpleStringSchema(), sinkProps);
        ds.addSink(producer);
        // ds.print();

        // 5. Execute the program
        sEnv.execute("helloworld");
    }
}
3. Package the Jar
Package the project with IDEA's built-in Build Artifacts tool or from the command line. The command-line packaging command is:
mvn clean package
After the command-line build completes, the generated Jar package can be found in the project's target directory; it is named jar_demos-1.0-SNAPSHOT.jar.
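Optionally, you can check that the main class was written into the manifest before uploading (a quick sanity check using standard tooling; the path assumes the build above):

unzip -p target/jar_demos-1.0-SNAPSHOT.jar META-INF/MANIFEST.MF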
Stream Computing Oceanus job
1. Upload dependency
On the Stream Computing Oceanus console, click [Dependency Management] on the left, click [New] in the upper left corner to create a dependency, and upload the local Jar package.
2. Create job
On the Stream Computing Oceanus console, click [Job Management] on the left, click [New] in the upper left corner to create a job, select Jar job as the job type, and click [Development & Debugging] to enter the job editing page. For [Main Package], select the dependency just uploaded and choose the latest version. Fill in the main class according to the pom.xml file; here it is com.demos.HelloWorld.
3. Run the job
Click [Publish Draft] to run the job. You can view the job's runtime information through TaskManager in the [Log] panel or through the Flink UI.
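Once the job is running, you can also verify its output on the Kafka side. Since CKafka is compatible with the open-source Kafka protocol, the standard console consumer works (a sketch; adjust the address to your CKafka instance). The topic should contain the strings 0 through 99 produced by the job:

kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic flink-demo9 --from-beginning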
Summary
- DataStream jobs support a wide range of heterogeneous data sources and sinks. Custom sources such as Kafka and MySQL read data through the addSource() function, and custom sinks such as Kafka and MySQL write data out through the addSink() function (see the sketch after this list).
- There is no need to bundle the Flink core dependencies when packaging; the Stream Computing Oceanus platform already provides them.
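As an illustration of the addSource()/addSink() pattern from the first point, here is a minimal sketch that reads from one Kafka topic and writes to another, using the same Flink 1.13 Kafka connector as the job above (the broker address, group id, and the topic names input-topic/output-topic are placeholders to replace with your own):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.0.0.29:9092");
        props.setProperty("group.id", "oceanus-demo");

        // addSource(): read from a custom data source, here a Kafka consumer
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        // addSink(): write to a custom destination, here a Kafka producer
        source.addSink(
                new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("kafka-to-kafka");
    }
}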
References
[1] Integrating with DataStream API: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/
[2] Stream Computing Oceanus console: https://console.cloud.tencent.com/oceanus/overview
[3] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298
[4] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1
[5] CKafka: Create Instance: https://cloud.tencent.com/document/product/597/54839
[6] CKafka: Create Topic: https://cloud.tencent.com/document/product/597/54854