3. Introduction to Flink Programming

1 Introduction to Flink programming

1.1 Initialize a Flink project template

1.1.1 Preparations

Maven 3.0.4 or later and JDK 8 are required


1.1.2 Create a Java project template with the Maven command
  • Execute the following Maven command. If the local Maven repository does not already contain the dependent jars, network access is required
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.12.0 \
-DgroupId=cn._51doit.flink \
-DartifactId=flink-java \
-Dversion=1.0 \
-Dpackage=cn._51doit.flink \
-DinteractiveMode=false

After the command finishes, the project appears under D:\code\java_code


  • Or execute the following command on the command line (network access required):

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0


1.1.3 Create a Scala project template with the Maven command
  • Execute the following Maven command. If the local Maven repository does not already contain the dependent jars, network access is required
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.12.0 \
-DgroupId=cn._51doit.flink \
-DartifactId=flink-scala \
-Dversion=1.0 \
-Dpackage=cn._51doit.flink \
-DinteractiveMode=false



  • Or execute the following command on the command line (network access required):

curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0


1.1.4 Import the Maven project into IDEA or Eclipse


1.2 DataFlow programming model

Flink provides several levels of programming abstraction. By applying operators to abstract data sets we build a DataFlow, which can express both streaming and offline (batch) computation over distributed data.

DataSet is the abstract data set for batch processing, and DataStream is the abstract data set for stream processing. A program built on either consists of three parts: Source, Transformation, and Sink:

  • Source is responsible for reading the input data
  • Transformation is responsible for transforming the data
  • Sink is responsible for outputting the final results
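The three roles can be sketched with a plain-Java analogy over an in-memory list (this is ordinary Java, not the Flink API; the class name and values are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plain-Java analogy of Source -> Transformation -> Sink.
public class DataFlowSketch {
    public static List<Integer> run() {
        // Source: read (here, a fixed list stands in for a socket or file)
        List<String> source = Arrays.asList("1", "2", "3");
        // Transformation: convert each record
        List<Integer> transformed = source.stream()
                .map(Integer::parseInt)
                .map(n -> n * 10)
                .collect(Collectors.toList());
        // Sink: output the final results
        transformed.forEach(System.out::println);
        return transformed;
    }

    public static void main(String[] args) {
        run();
    }
}
```

In real Flink programs the same three stages appear as source operators (e.g. socketTextStream), transformation operators (flatMap, map, keyBy, sum), and sink operators (e.g. print), as the WordCount examples below show.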


1.3 A first Flink program

1.3.1 Real-time WordCount

Read data from a socket port in real time and continuously count the occurrences of each word. The program runs until it is stopped. Before starting it, run nc -lk 8888 to open a socket that sends data.

package cn._51doit.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* Read data from the specified socket and calculate the word
*
*/
public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        //Create an execution environment for Flink streaming computation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Create a DataStream
        //Source
        DataStream<String> lines = env.socketTextStream("cs-28-86", 8888);

        //Start of Transformations
        SingleOutputStreamOperator<String> wordsDataStream = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });

        //Combine each word with 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordsDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        //Grouping
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });

        //Aggregation
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = keyed.sum(1);
        //End of Transformations

        //Call the Sink
        summed.print();

        //Start execution
        env.execute("StreamingWordCount");
    }
}


Running the Flink program in IntelliJ IDEA may fail with: Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.functions.FlatMapFunction

Solution: in the IDEA run configuration, check "Include dependencies with 'Provided' scope" (the quickstart POM declares the Flink dependencies with provided scope, so they are not on the runtime classpath by default).


Local run output:

The prefix before each printed record (e.g. 4>, 2>, 1>) is the index of the parallel subtask that produced it. The number of subtasks corresponds to the slots; when running locally, it defaults to the number of logical CPU cores of the machine.
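When running locally, Flink's default parallelism is derived from the number of logical CPU cores, which can be inspected with standard Java (a minimal check, independent of Flink):

```java
public class LocalParallelism {
    public static int logicalCores() {
        // The number of logical CPU cores; Flink's local default parallelism is based on this
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        System.out.println("Logical cores (default local parallelism): " + logicalCores());
    }
}
```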


1.3.2 LambdaWordCount

package cn._51doit.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Lambda Write WordCount
 */

public class LambdaStreamingWordCount {
    public static void main(String[] args) throws Exception {
        //LocalStreamEnvironment can only run in local mode and is usually used for local testing
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);

        DataStreamSource<String> lines = env.socketTextStream("cs-28-86", 8888);

        //Lambda expressions using java8
        SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //When using lambda expressions, the return type must be declared with returns()

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);

        //Printout
        summed.print();

        //Start execution (execute() may throw, hence the throws Exception on main)
        env.execute();

    }
}
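What the pipeline above computes per word can be sketched in plain Java with a HashMap standing in for Flink's keyed state (this is an illustration of the semantics, not Flink code; the class name is made up):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the WordCount semantics: flatMap -> (word, 1) -> keyBy -> sum.
public class WordCountCore {
    public static Map<String, Integer> count(String[] lines) {
        // Plays the role of the keyed state maintained by keyBy(...).sum(1)
        Map<String, Integer> state = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {   // flatMap: line -> words
                state.merge(word, 1, Integer::sum); // map to (word, 1), then sum per key
            }
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(count(new String[]{"a b a", "b c"}));
    }
}
```

The difference in Flink is that the input is unbounded and the counts are emitted incrementally as each record arrives, rather than once at the end.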


1.3.3 Scala WordCount

Install the Scala plug-in in IDEA:

Then restart IDEA;

Add the corresponding Scala SDK and click OK:

----------------

package cn._51doit.flink

import org.apache.flink.streaming.api.scala._

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStream[String] = env.socketTextStream("cs-28-86", 8888)

    val wordAndOne: DataStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    val result = wordAndOne.keyBy(_._1).sum(1)

    result.print()

    env.execute()
  }

}


1.3.4 Submit and run in the cluster

Change the host name and port to be taken from command-line arguments:

package cn._51doit.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Lambda Write WordCount
 */

public class LambdaStreamingWordCount {
    public static void main(String[] args) throws Exception {
        //LocalStreamEnvironment can only run in local mode and is usually used for local testing
        //LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));

        //Use Java 8 lambda expressions
        SingleOutputStreamOperator<String> words = lines.flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING); //When using lambda expressions, the return type must be declared with returns()

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);

        //Printout
        summed.print();

        //Start execution (execute() may throw, hence the throws Exception on main)
        env.execute();

    }
}


Package the program (mvn clean package):


  • Upload the jar through the Flink Web UI

First start an nc (nc -lk 8888):


Submit:


Execution result:


Added by dtasman7 on Thu, 27 Jan 2022 21:02:53 +0200