Flink practice tutorial: introduction 9-Jar job development

Introduction to flow computing Oceanus

Stream computing Oceanus is a powerful tool for real-time analysis of big data product ecosystem. It is an enterprise level real-time big data analysis platform based on Apache Flink with the characteristics of one-stop development, seamless connection, sub second delay, low cost, security and stability. Stream computing Oceanus aims to maximize the value of enterprise data and accelerate the construction process of real-time digitization of enterprises.

Flink Jar job supports both DataStream API programming and Table API/SQL programming. Table API and SQL can also be easily integrated and embedded into DataStream programs. See Integration with DataStream API [1] In this chapter, learn how to convert DataStream to Table.

Flow computing Oceanus supports Flink Jar jobs and Flink SQL jobs. This article will introduce you in detail how to use Flink DataStream API to develop Jar jobs and run them on the flow computing Oceanus platform.

Pre preparation

Create flow computing Oceanus cluster

On the flow calculation Oceanus product activity page 1 yuan to buy Oceanus cluster.

get into Oceanus console [2] , click cluster management on the left and create cluster on the top left. For details, please refer to the official Oceanus documentation Create an exclusive cluster [3].

Create message queue CKafka

get into CKafka console [4] , click [new] in the upper left corner to complete the creation of CKafka. For details, please refer to CKafka create instance [5].

Create Topic:

Enter the CKafka instance and click [Topic Management] > [new] to complete the creation of topic. For details, please refer to CKafka create Topic [6].

Developing DataStream jobs

1. New Maven project.

Create a new Maven Project in the local IDEA and configure the pom.xml file. Pom.xml file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
​
    <groupId>com.oceanus</groupId>
    <artifactId>jar_demos</artifactId>
    <version>1.0-SNAPSHOT</version>
​
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Oceanus The platform comes with it flink-java,flink-streaming Equal dependence -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- use Oceanus built-in Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
​
        <!-- test -->
        <!-- flink-clients For local debugging -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
​
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <!-- Set main class -->
                    <archive>
                        <manifestEntries>
                            <Main-Class>com.demos.HelloWorld</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Coding

Flink DataStream job code is as follows:

package com.demos;
​
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
​
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
​
​
public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 1. Set the operating environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
​
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }
​
        // 2. Configure the data source to read data
        // Predefined data sources support reading data from files, sockets and collections; The user-defined data source supports Kafka, MySQL, etc. to read data using the addSource() function
        DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data);
​
        // 3. Data processing
        DataStream ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() {
            @Override
            public void flatMap(List<Integer> value, Collector<String> out) throws Exception {
                value.forEach(v -> out.collect(v.toString()));
            }
        });
​
        // 4. Data output
        // The predefined destination supports writing data to files, standard output (stdout), standard error output (stderr) and socket s; The user-defined destination supports Kafka, MySQL, etc. to write out data using the addSink() function
        Properties sinkProps = new Properties();
        String hosts = "10.0.0.29:9092";
        sinkProps.setProperty("bootstrap.servers", hosts);
        String outTopic = "flink-demo9";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps);
        ds.addSink(producer);
        // ds.print();
​
        // 5. Implementation procedures
        sEnv.execute("helloworld");
    }
}

Pack Jar bag

Use the built-in packaging tool Build Artifacts of IDEA or the command line to package. Command line packaging command:

mvn clean package

The Jar package generated after the command line packaging can be found in the project target directory. The Jar name is jar_demos-1.0-SNAPSHOT.jar.

Flow calculation Oceanus job

1. Upload dependency

On the Oceanus console, click dependency management on the left, click New in the upper left corner to create a dependency, and upload the local Jar package.

2. Create job

On the Oceanus console, click [job management] on the left, click [new] in the upper left corner to create a new job, select Jar job as the job type, and click [development debugging] to enter the job editing page.

[main package] select the dependency just uploaded and select the latest version. Refer to pom.xml file to fill in the main class, and fill in com.demos.HelloWorld here.

3. Operation

Click [publish draft] to run. You can view the operation information through the [log] panel TaskManager or Flink UI.

summary

  1. DataStream jobs support various heterogeneous data sources and data destinations. The user-defined data source supports Kafka, MySQL, etc., and uses the addSource() function to read data; The custom destination supports Kafka, MySQL, etc., and writes out data using the addSink() function.
  2. There is no need to package the flink core dependency when packaging, and the flow computing Oceanus platform has provided it.

Reading reference

[1] Integration with DataStream API: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/

[2] Oceanus console: https://console.cloud.tencent.com/oceanus

[3] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298

[4] CKafka console: https://console.cloud.tencent.com/ckafka

[5] CKafka create instance: https://cloud.tencent.com/document/product/597/54839

[6] Ckafka create Topic: https://cloud.tencent.com/document/product/597/54854

Keywords: flink

Added by rubio on Tue, 30 Nov 2021 04:42:23 +0200