Flink Core Programming

1. Environment

When a Flink job is submitted for execution, it first establishes a connection with the Flink framework, that is, the current Flink runtime environment. Only when this environment information is available can tasks be scheduled to different TaskManagers for execution. This environment object is simple to obtain.

Batch environment

ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();

Streaming Data Processing Environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
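
If needed, the environment can also be created explicitly, for example a local environment for debugging in the IDE, or a remote environment that submits to a specific cluster. A minimal sketch (the host, port, and jar path below are placeholders, not values from this tutorial):

// Local environment with an explicit parallelism, convenient for debugging in the IDE
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(1);

// Remote environment: host, port, and jar path are placeholders for your own cluster
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 8081, "path/to/job.jar");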

2. Source

The Flink framework can obtain data from different sources and submit the data to the framework for processing. We call the source of the data the data source.

Code demonstration:

// Define a sample class: Water level sensor: used to receive altitude data
// id:Sensor number
// ts:Timestamp
// vc:Altitude
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
}

1) Read data from a collection

In general, data can be temporarily stored in memory and, once organized into a suitable data structure, used as a data source. Collection types are commonly used for this purpose.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class Flink02_Source_Collection {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Specify Parallelism
        env.setParallelism(1);
        //Read data from a collection
        DataStreamSource<WaterSensor> collectionDs = env.fromCollection(Arrays.asList(
                new WaterSensor("ws_001", 1577844001L, 45),
                new WaterSensor("ws_002", 1577844015L, 45),
                new WaterSensor("ws_003", 1577844020L, 45)
        ));
        //Print
        collectionDs.print();
        //Execute
        env.execute();
    }
}
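
For quick tests, elements can also be passed to the environment directly with fromElements, without building a List first; for example:

DataStreamSource<WaterSensor> elementsDs = env.fromElements(
        new WaterSensor("ws_001", 1577844001L, 45),
        new WaterSensor("ws_002", 1577844015L, 45));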

2) Read data from files

In practice, data is usually read from storage media; log files are the most common file-based data source.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink03_Source_File {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //Read data from a file
        DataStreamSource<String> fileDs = env.readTextFile("input/sensor-data.log");
        //Print
        fileDs.print();
        //Execute
        env.execute();

    }
}

3) Read data from Kafka

As a message queue, Kafka is a distributed, high-throughput, easily scalable, topic-based publish/subscribe messaging system. In today's enterprise development, Kafka together with Flink has become the preferred choice for building real-time data processing systems.

(1) Introduce the Kafka connector dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
(2) Code implementation reference
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Flink04_Source_Kafka {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //Read data from Kafka
        Properties properties = new Properties();
        //kafka cluster address
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        //Consumer Group
        properties.setProperty("group.id","consumer-group");
        //deserializer
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Consumption strategy
        properties.setProperty("auto.offset.reset", "latest");
        
        DataStreamSource<String> kafkaDs = env.addSource(new FlinkKafkaConsumer011<String>(
                "sensor", 
                new SimpleStringSchema(), 
                properties));
        kafkaDs.print("kafka source");
        env.execute();
    }
}

4) Custom Data Source

In most cases, the data sources above already meet the need, but special cases inevitably arise, so Flink also provides a way to define custom data sources.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Flink05_Source_MySource {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 1.Source: Read from custom data sources
        DataStreamSource<WaterSensor> inputDS = env.addSource(new MySourceFunction());

        inputDS.print();

        env.execute();
    }


    /**
     * Custom Data Source
     * 1. Implement SourceFunction, specifying the type of output
     * 2. Override two methods
     * run():
     * cancel():
     */
    public static class MySourceFunction implements SourceFunction<WaterSensor> {
        // Define a flag bit to control the generation of data
        private boolean flag = true;

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                ctx.collect(
                        new WaterSensor(
                                "sensor_" + random.nextInt(3),
                                System.currentTimeMillis(),
                                random.nextInt(10) + 40
                        )
                );
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            this.flag = false;
        }
    }
}
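
Note that a plain SourceFunction always runs with parallelism 1. If the custom source needs to run in parallel, it can implement ParallelSourceFunction (org.apache.flink.streaming.api.functions.source.ParallelSourceFunction) instead, which declares the same run()/cancel() methods. A minimal sketch of the same generator written as a parallel source:

    public static class MyParallelSourceFunction implements ParallelSourceFunction<WaterSensor> {
        // volatile so that a cancel() call from another thread is seen by run()
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                ctx.collect(new WaterSensor("sensor_" + random.nextInt(3), System.currentTimeMillis(), random.nextInt(10) + 40));
                Thread.sleep(2000L);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }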

3. Transform

1)map

Mapping: converts each element of the data stream to form a new data stream; consumes one element and produces one element

Parameter: lambda expression or MapFunction implementation class

Return: DataStream

//lambda
SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS
                .map((MapFunction<String, WaterSensor>) value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                });
import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Flink06_Transform_Map {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Read data from files
        DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log");

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MyMapFunction());

        // 3. Printing
        sensorDS.print();

        env.execute();
    }

    /**
     * Implement the MapFunction interface and define generics (inputs, outputs)
     * Override map method
     */
    public static class MyMapFunction implements MapFunction<String, WaterSensor> {

        @Override
        public WaterSensor map(String value) throws Exception {
            String[] datas = value.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        }
    }
}

All Flink function classes have Rich versions. A Rich function differs from a regular function in that it can obtain the context of the runtime environment and has lifecycle methods, so more complex functionality can be implemented. For example: RichMapFunction.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink07_Transform_RichMapFunction {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Read data from files
        DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log");
//        DataStreamSource<String> inputDS = env.socketTextStream("localhost",9999);

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MyRichMapFunction());

        // 3. Printing
        sensorDS.print();

        env.execute();
    }

    /**
     * Inherit RichMapFunction, specify the type of input, return type
     * open() and close() lifecycle management methods are provided
     * Ability to get runtime context object= Ability to get environment information such as status, task information, etc.
     */
    public static class MyRichMapFunction extends RichMapFunction<String, WaterSensor> {
        @Override
        public WaterSensor map(String value) throws Exception {
            String[] datas = value.split(",");
            return new WaterSensor(getRuntimeContext().getTaskName() + datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        }


        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open...");
        }

        @Override
        public void close() throws Exception {
            System.out.println("close...");
        }
    }
}

Rich functions have a lifecycle concept. Typical lifecycle methods are:

The open() method is the initialization method of a rich function; it is called before an operator method such as map or filter is invoked

The close() method is the last method called in the lifecycle and performs cleanup work

The getRuntimeContext() method provides information about the function's RuntimeContext, such as the parallelism of the function execution, the name of the task, and the state
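
As a minimal sketch of how these lifecycle pieces fit together (the class below is only illustrative and not part of the examples above):

    public static class MyRichFilterFunction extends RichFilterFunction<Integer> {
        private int subtaskIndex;

        @Override
        public void open(Configuration parameters) throws Exception {
            // open(): one-time initialization, e.g. reading context info or opening a connection
            subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public boolean filter(Integer value) throws Exception {
            // called once per element after open(); keeps only even numbers here
            return value % 2 == 0;
        }

        @Override
        public void close() throws Exception {
            // close(): cleanup, e.g. closing whatever was opened in open()
        }
    }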

2)flatMap

Flat mapping: splits composite elements of the data stream into individual elements; consumes one element and produces zero or more elements

Parameter: lambda expression or FlatMapFunction implementation class

Return: DataStream

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

public class Flink08_Transform_FlatMap {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Read data from files
        DataStreamSource<List<Integer>> inputDS = env.fromCollection(
                Arrays.asList(
                        Arrays.asList(1, 2, 3, 4),
                        Arrays.asList(5, 6, 7, 8)
                )
        );
        // 2.Transform: Convert FlatMap to Entity Object
        inputDS
                .flatMap(new FlatMapFunction<List<Integer>, Integer>() {
                    @Override
                    // Each incoming element is one List, e.g. List(1, 2, 3, 4) or List(5, 6, 7, 8)
                    public void flatMap(List<Integer> value, Collector<Integer> out) throws Exception {
                       //Traverse through each List that comes in
                        for (Integer number : value) {
                            out.collect(number + 10);
                        }
                    }
                })
                .print();// 11 12 13 14 15 16 17 18
        env.execute();
    }
}

3)filter

Filter: retains data that meets the condition (true) according to the specified rule, and discards data that does not meet the condition (false)

Parameter: lambda expression or FilterFunction implementation class

Return: DataStream

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 * @date 2020/9/16 15:29
 */
public class Flink09_Transform_Filter {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Read data from files
        DataStreamSource<Integer> inputDS = env.fromCollection(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)
        );

        // TODO Transform: filter =>reserved for true, discarded for false
        inputDS
                .filter(new MyFilterFunction())
                .print(); // 2,4,6,8

        env.execute();
    }

    public static class MyFilterFunction implements FilterFunction<Integer> {

        @Override
        public boolean filter(Integer value) throws Exception {
            return value % 2 == 0;
        }
    }
}

4)keyBy

There is a groupBy operator in Spark that groups data according to specified rules; the similar function in Flink is keyBy, which partitions the data according to a specified key

Repartitioning: elements are sent to different partitions according to the hash code of the specified key, and elements with the same key end up in the same partition (here, a partition means one of the parallel instances of the downstream operator); keyBy() partitions by hashing

Parameter: lambda expression, POJO property, or tuple index; arrays cannot be used as keys

Return: KeyedStream

package com.atguigu.chapter05;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;


public class Flink10_Transform_KeyBy {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log");

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new Flink06_Transform_Map.MyMapFunction());

        // TODO Keyby: Grouping
        // With a position index or field name, the key type is not known at compile time, so a Tuple is returned, which is cumbersome to use later
        // By explicitly specifying the key (implementing a KeySelector or using a lambda), the key you get has a concrete type
        // Grouping is logical: each record is labeled with the group it belongs to; the parallelism is not changed

//        sensorDS.keyBy(0).print();
//        KeyedStream<WaterSensor, Tuple> sensorKSByFieldName = sensorDS.keyBy("id");
        KeyedStream<WaterSensor, String> sensorKSByKeySelector = sensorDS.keyBy(new MyKeySelector());

//        KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = sensorDS.keyBy(r -> r.getId());

        env.execute();
    }


    public static class MyKeySelector implements KeySelector<WaterSensor, String> {

        @Override
        public String getKey(WaterSensor value) throws Exception {
            return value.getId();
        }
    }
}

5)shuffle

Shuffle: randomly distributes the data to downstream partitions

Parameter: none

Return: DataStream

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink11_Transform_Shuffle {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log");
        inputDS.print("input");

        DataStream<String> resultDS = inputDS.shuffle();
        resultDS.print("shuffle");

        env.execute();
    }
}

6)split

In some cases, we need to split a data stream into two or more streams based on certain characteristics, tagging the data so that the individual streams can be selected later.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;


public class Flink12_Transform_Split {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .readTextFile("input/sensor-data.log")
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                });

        // TODO Split: water level below 50 normal, water level [50,80) warning, water level 80 and above alarm
        // split does not really separate the streams, it only tags the data
        SplitStream<WaterSensor> splitSS = sensorDS.split(new OutputSelector<WaterSensor>() {
                                                              @Override
                                                              public Iterable<String> select(WaterSensor value) {
                                                                  if (value.getVc() < 50) {
                                                                      return Arrays.asList("normal");
                                                                  } else if (value.getVc() < 80) {
                                                                      return Arrays.asList("warn");
                                                                  } else {
                                                                      return Arrays.asList("alarm");
                                                                  }
                                                              }
                                                          }
        );

        env.execute();
    }
}

7)select

After splitting a data stream, how do you pull out the streams carrying different tags? The select operator is needed.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;


public class Flink13_Transform_Select {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .readTextFile("input/sensor-data.log")
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                });

        // TODO Split: water level below 50 normal, water level [50,80) warning, water level 80 and above alarm
        // split does not really separate the streams
        SplitStream<WaterSensor> splitSS = sensorDS.split(new OutputSelector<WaterSensor>() {
                                                              @Override
                                                              public Iterable<String> select(WaterSensor value) {
                                                                  if (value.getVc() < 50) {
                                                                      return Arrays.asList("normal","happy","hi");
                                                                  } else if (value.getVc() < 80) {
                                                                      return Arrays.asList("warn","happy");
                                                                  } else {
                                                                      return Arrays.asList("alarm");
                                                                  }
                                                              }
                                                          }
        );

        //TODO select
        // Get the corresponding stream by the tag assigned earlier
        // A stream can carry more than one tag; any one of its tags can be used to select it
        splitSS.select("normal").print("normal");
//        splitSS.select("hi").print("normal");
        splitSS.select("happy").print("warn");
//        splitSS.select("warn").print("warn");
//        splitSS.select("alarm").print("alarm");


        env.execute();
    }
}

8)connect

In some cases, we need to connect data streams from two different sources to match their data, such as order payment information and third-party transaction information, which come from different data sources. After they are connected, the order payments and third-party transactions can be reconciled so that actual payments can be calculated.

The connect operator in Flink connects two streams while each keeps its own data type. After connect, the two streams are merely placed in the same stream; internally their data and forms remain unchanged, and the two streams stay independent of each other.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

public class Flink14_Transform_Connect {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .readTextFile("input/sensor-data.log")
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                    }
                });

        // Get another stream
        DataStreamSource<Integer> numDS = env.fromCollection(Arrays.asList(1, 2, 3, 4));

        // TODO use connect to join the two streams
        // The two streams can have different data types
        // Only two streams can be connected at a time
        // When processing the data, each stream is still handled separately
        ConnectedStreams<WaterSensor, Integer> sensorNumCS = sensorDS.connect(numDS);

        // Call other operators
        SingleOutputStreamOperator<Object> resultDS = sensorNumCS.map(
                new CoMapFunction<WaterSensor, Integer, Object>() {

                    @Override
                    public String map1(WaterSensor value) throws Exception {
                        return value.toString();
                    }

                    @Override
                    public Integer map2(Integer value) throws Exception {
                        return value + 10;
                    }
                });

        resultDS.print();
        env.execute();
    }
}

9)union

A union of two or more DataStreams produces a new DataStream containing the elements of all of them.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;


public class Flink15_Transform_Union {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Get Stream
        DataStreamSource<Integer> numDS = env.fromCollection(Arrays.asList(1, 2, 3, 4));
        DataStreamSource<Integer> numDS1 = env.fromCollection(Arrays.asList(11, 12, 13, 14));
        DataStreamSource<Integer> numDS2 = env.fromCollection(Arrays.asList(21, 22, 23, 24));

        //TODO Union the streams
        // Requires that the streams have the same data type
        // Can union more than two streams
        DataStream<Integer> unionDS = numDS
                .union(numDS1)
                .union(numDS2);

        unionDS
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return value * 10;
                    }
                })
                .print("union");
        env.execute();
    }
}
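
Since union accepts multiple streams in a single call, the two chained calls above could also be written as one, for example:

DataStream<Integer> unionDS = numDS.union(numDS1, numDS2);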

4. Operator

1) Rolling aggregate operator

These operators aggregate each partition (key) of a KeyedStream. The aggregated results are merged into a single stream and returned, so the result is a DataStream.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Flink16_Transform_RollingAgg {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
//                .readTextFile("input/sensor-data.log");
                .socketTextStream("localhost",9999 );

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map((MapFunction<String, WaterSensor>) value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                });

        // 3. Group by id
        KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS
                .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
                        return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
                    }
                })
                .keyBy( r -> r.f0);

        // TODO rolling aggregation operators: for each input record, one result is output
//        sensorKS.sum(2).print("sum");
        sensorKS.max(2).print("max");
//        sensorKS.min(2).print("min");

        env.execute();
    }


    public static class MyKeySelector implements KeySelector<WaterSensor, String> {

        @Override
        public String getKey(WaterSensor value) throws Exception {
            return value.getId();
        }
    }

}

2)reduce

An aggregation operation on a grouped data stream that combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, rather than only the final result of the last aggregation.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Flink17_Transform_Reduce {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
//                .readTextFile("input/sensor-data.log");
                .socketTextStream("localhost", 9999);

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new Flink06_Transform_Map.MyMapFunction());

        // 3. Group by id
        KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS
                .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
                        return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
                    }
                })
                .keyBy(r -> r.f0);

        // TODO Reduce
        // 1. Input and output types are the same
        // 2. The first record of each key does not go through reduce; it becomes the initial accumulated value
        // 3. Flink keeps the intermediate (accumulated) state for us
        sensorKS
                .reduce(
                        new ReduceFunction<Tuple3<String, Long, Integer>>() {
                            @Override
                            public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> value1, Tuple3<String, Long, Integer> value2) throws Exception {
                                System.out.println(value1.toString() + " <-> " + value2.toString());
                                return Tuple3.of("aaa", 123L, value1.f2 + value2.f2);
                            }
                        }
                )
                .print("reduce");

        env.execute();
    }

    public static class MyKeySelector implements KeySelector<WaterSensor, String> {

        @Override
        public String getKey(WaterSensor value) throws Exception {
            return value.getId();
        }
    }
}

3)process

Flink partitions the data stream with keyBy. If you need access to runtime context information while processing, you can use the process operator and customize the processing logic.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;


public class Flink18_Transform_Process {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
//                .socketTextStream("localhost", 9999);

        // 2.Transform: Map converts to entity objects
        SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map((MapFunction<String, WaterSensor>) value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                });

        // 3. Group by id
        KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS
                .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception {
                        return new Tuple3<>(value.getId(), value.getTs(), value.getVc());
                    }
                })
                .keyBy(r -> r.f0);

        // TODO Process
        // Runtime context information is available here
        sensorKS.process(
                new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() {
                    /**
                     * How to process data: Processing one at a time
                     * @param value One Data
                     * @param ctx   context
                     * @param out   collector
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("current key=" + ctx.getCurrentKey() + "current time=" + ctx.timestamp() + ",data=" + value);
                    }
                }
        )
                .print("process");


        env.execute();
    }

    public static class MyKeySelector implements KeySelector<WaterSensor, String> {

        @Override
        public String getKey(WaterSensor value) throws Exception {
            return value.getId();
        }
    }

}

5. Sink

In Flink, a Sink represents storing data or, more broadly, the output operation that sends processed data to a specified storage system. The print method we have been using all along is actually a Sink.

1)Kafka Sink

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;


public class Sink_Kafka {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
//                .socketTextStream("localhost", 9999);


        //TODO data Sink to Kafka
        // addSink is called on the DataStream => note that it is not called on env
        inputDS.addSink(
                new FlinkKafkaProducer011<String>(
                        "hadoop102:9092",
                        "sensor0421",
                        new SimpleStringSchema())
        );


        env.execute();
    }
}

2)Redis Sink

Let's send the processed data to the Redis cache database

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;


public class Flink20_Sink_Redis {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
//                .socketTextStream("localhost", 9999);

        //TODO data Sink to Redis

        FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .setPort(6379)
                .build();

        inputDS.addSink(
                new RedisSink<String>(
                        jedisConfig,
                        new RedisMapper<String>() {
                            // Command for redis: key is the outermost key
                            @Override
                            public RedisCommandDescription getCommandDescription() {
                                return new RedisCommandDescription(RedisCommand.HSET,"sensor0421");
                            }

                            // Hash type: This specifies the hash key
                            @Override
                            public String getKeyFromData(String data) {
                                String[] datas = data.split(",");
                                return datas[1];
                            }

                            // Hash type: This specifies the value of hash
                            @Override
                            public String getValueFromData(String data) {
                                String[] datas = data.split(",");
                                return datas[2];
                            }
                        }
                )
        );

        env.execute();
    }
}

3)ElasticSearch Sink

Let's send the processed data to the ElasticSearch search server.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Flink21_Sink_ES {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
//                .socketTextStream("localhost", 9999);


        //TODO data Sink to ES

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop102",9200));
        httpHosts.add(new HttpHost("hadoop103",9200));
        httpHosts.add(new HttpHost("hadoop104",9200));

        ElasticsearchSink<String> esSink = new ElasticsearchSink.Builder<String>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Place data in Map
                        Map<String, String> dataMap = new HashMap<>();
                        dataMap.put("data", element);
                        // Create IndexRequest = specify index, type, source
                        IndexRequest indexRequest = Requests.indexRequest("sensor0421").type("reading").source(dataMap);
                        // Add to RequestIndexer
                        indexer.add(indexRequest);
                    }
                }
        )
                .build();

        inputDS.addSink(esSink);


        env.execute();
    }
}
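
When testing against a small bounded input like the file above, data may not show up in Elasticsearch immediately because the sink buffers bulk requests. A sketch of how the builder call could be adjusted so each record is flushed right away (the builder is assigned to a variable before build(); esSinkFunction stands for the same ElasticsearchSinkFunction shown above, and the setting should be tuned for production):

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);
        // flush after every element so test data appears immediately
        esSinkBuilder.setBulkFlushMaxActions(1);
        ElasticsearchSink<String> esSink = esSinkBuilder.build();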

4) Custom Sink

What if Flink does not provide a connector we can use directly and we want to store the data in our own storage system? No problem: Flink supports custom Sinks, and you decide how to store the data yourself.

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class Flink22_Sink_MySQL {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
//                .socketTextStream("localhost", 9999);


        //TODO data Sink to custom MySQL
        inputDS.addSink(
                new RichSinkFunction<String>() {

                    private Connection conn = null;
                    private PreparedStatement pstmt = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000");
                        pstmt = conn.prepareStatement("INSERT INTO sensor VALUES (?,?,?)");
                    }

                    @Override
                    public void close() throws Exception {
                        pstmt.close();
                        conn.close();
                    }

                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        String[] datas = value.split(",");
                        pstmt.setString(1, datas[0]);
                        pstmt.setLong(2, Long.valueOf(datas[1]));
                        pstmt.setInt(3, Integer.valueOf(datas[2]));
                        pstmt.execute();
                    }
                }

        );

        env.execute();
    }

}

6. Case Practice

1) Network traffic statistics based on tracking (buried-point) log data

//Read log data and convert to JavaBean object for easy operation
public class UserBehavior {
    private Long userId;    //User ID
    private Long itemId;    //commodity
    private Integer categoryId; //Commodity Category ID
    private String behavior;    //Behavior Type
    private Long timestamp; //time stamp

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

Statistics of total site page views (PV)

One of the simplest metrics for measuring website traffic is the page view (PV) count. Every time a user opens a page, one PV is recorded; opening the same page multiple times accumulates the total view count. Generally speaking, PV is proportional to the number of visitors, but it does not directly reflect the actual number of visitors, since a single visitor can generate a very high PV by repeatedly refreshing a page. Next we will use the Flink operators learned earlier to implement PV statistics.

Implement PV statistics with WordCount ideas

import bean.UserBehavior;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Case_PV {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //Read data
        DataStreamSource<String> inputDs = env.readTextFile("input/UserBehavior.csv");

        //Convert read data into bean objects
        SingleOutputStreamOperator<UserBehavior> userBehaviorDs = inputDs.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                String[] datas = value.split(",");
                return new UserBehavior(
                        Long.valueOf(datas[0]),
                        Long.valueOf(datas[1]),
                        Integer.valueOf(datas[2]),
                        datas[3],
                        Long.valueOf(datas[4])
                );
            }
        });
        //Implement PV statistics by referring to wordcount ideas
        //Processing data

        //1. Filter out pv behavior
        SingleOutputStreamOperator<UserBehavior> userBehaviorDsfilter = userBehaviorDs.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior value) throws Exception {
                return "pv".equals(value.getBehavior());
            }
        });

        //2. Transforming data structures
        SingleOutputStreamOperator<Tuple2<String, Integer>> pvAndTuple2 = userBehaviorDsfilter.map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
                return Tuple2.of("pv", 1);
            }
        });

        //3. Grouping by elements in the first position
        KeyedStream<Tuple2<String, Integer>, Tuple> pvAndOneKs = pvAndTuple2.keyBy(0);

        //4. Sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> pvDs = pvAndOneKs.sum(1);

        //Print
        pvDs.print("pv");


        env.execute();
    }
}

Process

import bean.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Case_PV2 {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //get data
        DataStreamSource<String> inputDs = env.readTextFile("input/UserBehavior.csv");
        //Transform data structure
        SingleOutputStreamOperator<UserBehavior> userBeavirDs = inputDs.map((MapFunction<String, UserBehavior>) value -> {
            String[] datas = value.split(",");
            return new UserBehavior(
                    Long.valueOf(datas[0]),
                    Long.valueOf(datas[1]),
                    Integer.valueOf(datas[2]),
                    datas[3],
                    Long.valueOf(datas[4])
            );
        });
        //Filter data
        SingleOutputStreamOperator<UserBehavior> userBeavirfilter = userBeavirDs.filter(data -> "pv".equals(data.getBehavior()));

        //Grouping by dimension
        KeyedStream<UserBehavior, String> userBehaviorKs = userBeavirfilter.keyBy(data -> data.getBehavior());
        //Summation
        SingleOutputStreamOperator<Long> resultDS = userBehaviorKs.process(new KeyedProcessFunction<String, UserBehavior, Long>() {

            //Define a variable to calculate the number of statistical bars
            private Long pvcount = 0L;

            /**
             * Process one element at a time: increment the running count and emit it
             * @param value one input record
             * @param ctx   context
             * @param out   collector
             * @throws Exception
             */
            @Override
            public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
                pvcount++;
                out.collect(pvcount);
            }
        });
        inputDs.print("pv");

        env.execute();
    }
}

FlatMap

import com.atguigu.bean.UserBehavior;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Flink25_Case_PVByFlatmap {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from a file and convert it to a bean object
        env
                .readTextFile("input/UserBehavior.csv")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] datas = value.split(",");
                        if ("pv".equals(datas[3])) {
                            out.collect(Tuple2.of("pv", 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1)
                .print("pv by flatmap");


        env.execute();
    }

}
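
Note: keyBy(0) groups the tuples by position. In newer Flink releases the positional and field-name variants of keyBy are deprecated in favor of key selectors such as keyBy(data -> data.f0), which is the form used in the later examples.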

Statistics of the number of unique visitors (UV) to the website

In the previous case we counted every page view, which means repeated visits by the same user were counted multiple times. In practice we often want to know how many distinct users have visited the site, so another important traffic indicator is the number of unique visitors (Unique Visitor, UV).

import com.atguigu.bean.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

public class Flink26_Case_UV {
    public static void main(String[] args) throws Exception {

        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from a file and convert it to a bean object
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env
                .readTextFile("input/UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {

                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new UserBehavior(
                                Long.valueOf(datas[0]),
                                Long.valueOf(datas[1]),
                                Integer.valueOf(datas[2]),
                                datas[3],
                                Long.valueOf(datas[4])
                        );
                    }
                });

        // TODO UV statistics: deduplicate by userId, then count
        // => store each userId in a Set
        // => the number of elements in the Set is the UV value

        // 2. Processing data
        // 2.1 Keep only pv behavior => UV is PV deduplicated by user, so only pv records are counted
        SingleOutputStreamOperator<UserBehavior> userBehaviorFilter = userBehaviorDS.filter(data -> "pv".equals(data.getBehavior()));
        // 2.2 Convert to a Tuple2 ("uv", userId)
        // => the first field, "uv", is used for grouping so that sum or process can be called on the keyed stream
        // => the second field is the userId that will be stored in the Set
        // => only the userId is needed here; item and category are irrelevant
        SingleOutputStreamOperator<Tuple2<String, Long>> uvTuple2 = userBehaviorFilter.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                return Tuple2.of("uv", value.getUserId());
            }
        });

        // 2.3 Grouped by uv
        KeyedStream<Tuple2<String, Long>, String> uvKS = uvTuple2.keyBy(data -> data.f0);
        
        // 2.4 Processing with process
        SingleOutputStreamOperator<Integer> uvDS = uvKS.process(
                new KeyedProcessFunction<String, Tuple2<String, Long>, Integer>() {
                    // Define a Set that stores userIds for deduplication
                    private Set<Long> uvSet = new HashSet<>();

                    /**
                     * Called once for every element
                     * @param value the ("uv", userId) tuple
                     * @param ctx runtime context
                     * @param out collector used to emit the current UV value
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception {
                        // For each incoming record, add the userId to the Set
                        uvSet.add(value.f1);
                        // Emit the current UV value downstream => the number of elements in the Set
                        out.collect(uvSet.size());
                    }
                }
        );

        uvDS.print("uv");

        env.execute();
    }
}
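
Like the PV variant, the HashSet above lives only in operator memory: it grows without bound and is not checkpointed. A minimal sketch of the same deduplication backed by keyed state (the class name UvCountWithState is hypothetical; MapState is used as a set and ValueState holds the running count):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Drop-in replacement for the anonymous KeyedProcessFunction above:
// userIds live in keyed MapState (used as a set), the running UV count in ValueState
public class UvCountWithState extends KeyedProcessFunction<String, Tuple2<String, Long>, Integer> {

    private transient MapState<Long, Boolean> seenUserIds;
    private transient ValueState<Integer> uvCount;

    @Override
    public void open(Configuration parameters) {
        seenUserIds = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen-user-ids", Long.class, Boolean.class));
        uvCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("uv-count", Integer.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception {
        // Only emit a new UV value when this userId has not been seen before
        if (!seenUserIds.contains(value.f1)) {
            seenUserIds.put(value.f1, true);
            Integer current = uvCount.value();
            int updated = (current == null ? 0 : current) + 1;
            uvCount.update(updated);
            out.collect(updated);
        }
    }
}

It would be plugged in as uvKS.process(new UvCountWithState()) in place of the anonymous function above.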

2) Statistical Analysis of Marketing Business Indicators

With the popularity of smartphones, more and more users of today's e-commerce websites come from mobile. Compared with traditional browser access, the mobile APP has become the preferred way for many users to visit e-commerce sites. E-commerce companies therefore promote their APPs through various channels, and statistics about those channels (such as clicks on advertising links on different websites, or APP downloads) have become important marketing indicators.

public class MarketingUserBehavior {
        private Long userId;//User ID
        private String behavior;//Behavior: DOWNLOAD, INSTALL, UPDATE, UNINSTALL
        private String channel;//Channel: Xiaomi, Huawei, OPPO, VIVO
        private Long timestamp;//Timestamp

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }

        public String getBehavior() {
            return behavior;
        }

        public void setBehavior(String behavior) {
            this.behavior = behavior;
        }

        public String getChannel() {
            return channel;
        }

        public void setChannel(String channel) {
            this.channel = channel;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }

        public MarketingUserBehavior() {
        }

        public MarketingUserBehavior(Long userId, String behavior, String channel, Long timestamp) {
            this.userId = userId;
            this.behavior = behavior;
            this.channel = channel;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "MarketingUserBehavior{" +
                    "userId=" + userId +
                    ", behavior='" + behavior + '\'' +
                    ", channel='" + channel + '\'' +
                    ", timestamp=" + timestamp +
                    '}';
        }
}
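
The job below generates mock MarketingUserBehavior events with a custom SourceFunction and counts each channel/behavior combination: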
import bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class APPMarketingAnalysis {
    public static void main(String[] args) throws Exception {
        //Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //get data
        DataStreamSource<MarketingUserBehavior> inputDs = env.addSource(new MySourceFunction());
        //Processing data
        //Group by dimension of statistics: channels, behavior
        SingleOutputStreamOperator<Tuple2<String, Integer>> channelAndBehaviorTuple2 = inputDs.map(new MapFunction<MarketingUserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(MarketingUserBehavior value) throws Exception {
                return Tuple2.of(value.getChannel() + "_" + value.getBehavior(), 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> channelAndBehaviorKs = channelAndBehaviorTuple2.keyBy(data -> data.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDs = channelAndBehaviorKs.sum(1);

        resultDs.print();


        env.execute();
    }
    //Custom Data Source
    public static class MySourceFunction implements SourceFunction<MarketingUserBehavior> {

        private boolean flag = true;
        private List<String> behaviorList = Arrays.asList("DOWNLOAD","INSTALL","UPDATE","UNINSTALL");
        private List<String> channelList = Arrays.asList("Huawei","Xiaomi","OPPO","VIVO");
        @Override
        public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
            Random random = new Random();
            while(flag){
                ctx.collect(
                        new MarketingUserBehavior(
                            Long.valueOf(random.nextInt(10)),
                            behaviorList.get(random.nextInt(behaviorList.size())),
                            channelList.get(random.nextInt(channelList.size())),
                            System.currentTimeMillis()
                        )
                );
                Thread.sleep(1000L);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
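
A second variant drops the channel dimension and counts by behavior alone: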
import com.atguigu.bean.MarketingUserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;


public class Flink28_Case_APPMarketingAnalysisWithoutChannel {
    public static void main(String[] args) throws Exception {
        // 0 Execution Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data and convert to bean objects
        DataStreamSource<MarketingUserBehavior> appDS = env.addSource(new AppSource());

        // 2. Processing data: statistics of different behaviors (regardless of channel)
        // 2.1 Group by the statistical dimension: behavior
        // 2.1.1 Convert to a (behavior, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> behaviorTuple2 = appDS.map(new MapFunction<MarketingUserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(MarketingUserBehavior value) throws Exception {
                return Tuple2.of(value.getBehavior(), 1);
            }
        });
        // 2.1.2 Group by behavior
        KeyedStream<Tuple2<String, Integer>, String> behaviorKS = behaviorTuple2.keyBy(data -> data.f0);

        // 2.2 Sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = behaviorKS.sum(1);

        // 3. Output: Print
        resultDS.print("app marketing analysis by behavior");


        env.execute();
    }


    public static class AppSource implements SourceFunction<MarketingUserBehavior> {

        private boolean flag = true;
        private List<String> behaviorList = Arrays.asList("DOWNLOAD", "INSTALL", "UPDATE", "UNINSTALL");
        private List<String> channelList = Arrays.asList("XIAOMI", "HUAWEI", "OPPO", "VIVO");

        @Override
        public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
            while (flag) {
                Random random = new Random();
                ctx.collect(
                        new MarketingUserBehavior(
                                Long.valueOf(random.nextInt(10)),
                                behaviorList.get(random.nextInt(behaviorList.size())),
                                channelList.get(random.nextInt(channelList.size())),
                                System.currentTimeMillis()
                        )
                );
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
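
Both custom sources loop on a boolean flag and emit one random event per second; cancel() flips the flag so that run() exits cleanly when the job is stopped. Because run() and cancel() may be invoked from different threads, declaring the flag volatile would make the shutdown signal more reliable.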

3) Page Advertising Analysis

Among the marketing indicators of e-commerce websites, besides the promotion of their own APP, on-page advertising (for their own products as well as for other websites) is also taken into account. Statistical analysis of advertising is therefore another important marketing indicator.

For advertising, the simplest and most important statistic is the click-through volume of page advertisements. Websites often set pricing strategies and adjust promotion methods based on ad clicks, and the click data can also be used to learn user preferences. A more concrete application is to segment users by geographic location, so that the preferences of users in different provinces for different advertisements can be summarized, which makes ad placement more accurate.

public class AdClickLog {
    private Long userId;//User ID
    private Long adId;//Advertising ID
    private String province;//Province
    private String city;//City
    private Long timestamp;//time stamp

    public AdClickLog() {
    }

    public AdClickLog(Long userId, Long adId, String province, String city, Long timestamp) {
        this.userId = userId;
        this.adId = adId;
        this.province = province;
        this.city = city;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getAdId() {
        return adId;
    }

    public void setAdId(Long adId) {
        this.adId = adId;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "AdClickLog{" +
                "userId=" + userId +
                ", adId=" + adId +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
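
The job below reads the click log, keys the stream by a composite province_adId string, and sums the clicks per key: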

import bean.AdClickLog;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Case_AdClickAnalysis {
    public static void main(String[] args) throws Exception {
        //execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //Convert Data to Bean Object
        SingleOutputStreamOperator<AdClickLog> adClickDs = 
                env
                .readTextFile("backup_AdClickLog.csv")
                .map(new MapFunction<String, AdClickLog>() {
                    @Override
                    public AdClickLog map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new AdClickLog(
                                Long.valueOf(datas[0]),
                                Long.valueOf(datas[1]),
                                datas[2],
                                datas[3],
                                Long.valueOf(datas[4])
                        );
                    }
                });
        //Processing data: Real-time click-through statistics for different provinces and advertisements
        //Grouped by statistical dimension: provinces, advertisements
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDs = adClickDs
                .map(new MapFunction<AdClickLog, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(AdClickLog value) throws Exception {
                        return Tuple2.of(value.getProvince() + "_" + value.getAdId(), 1);
                    }
                })
                .keyBy(data -> data.f0)
                .sum(1);
        resultDs.print("ad");

        env.execute();

    }
}

4) Real-time monitoring of order payment

In an e-commerce website, order payment is the step that directly turns marketing into revenue, so it is very important in the business process. To keep the process under control and to encourage users to pay, websites generally set a payment expiration time, and orders that are not paid within that period are cancelled. In addition, we also have to verify that the user's payment is correct, which can be done by reconciling in real time against the transaction data of the third-party payment platform.

Matching order transactions from two streams

For an order payment event, the user's payment alone does not mean the transaction is complete; we still have to confirm that the money has actually arrived in the platform's account. These two pieces of information usually come from different logs, so we read data from two streams and merge them.

import com.atguigu.bean.OrderEvent;
import com.atguigu.bean.TxEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class Flink30_Case_OrderTxDetect {
    public static void main(String[] args) throws Exception {
        // 0 Execution Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 1. Read the data and convert it into a bean object
        SingleOutputStreamOperator<OrderEvent> orderDS = env
                .readTextFile("input/OrderLog.csv")
                .map(new MapFunction<String, OrderEvent>() {
                    @Override
                    public OrderEvent map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new OrderEvent(
                                Long.valueOf(datas[0]),
                                datas[1],
                                datas[2],
                                Long.valueOf(datas[3])
                        );
                    }
                });

        SingleOutputStreamOperator<TxEvent> txDS = env
                .readTextFile("input/ReceiptLog.csv")
                .map(new MapFunction<String, TxEvent>() {
                    @Override
                    public TxEvent map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new TxEvent(
                                datas[0],
                                datas[1],
                                Long.valueOf(datas[2])
                        );
                    }
                });

        // 2. Processing data: real-time reconciliation monitoring
        // Connect the two streams and match records by txId; a successful match means the reconciliation succeeded
        // For the same order, it is uncertain whether the business-system record or the payment-system record arrives first
        // TODO the two streams are normally keyed before connecting, so that records with the same key end up in the same subtask and can be matched
        // keyBy can be applied before connect, or after connect on the ConnectedStreams
        ConnectedStreams<OrderEvent, TxEvent> orderTxCS = (orderDS.keyBy(order -> order.getTxId()))
                .connect(txDS.keyBy(tx -> tx.getTxId()));

        // Grouping by txId allows data of the same txId to get together
//        ConnectedStreams<OrderEvent, TxEvent> orderTxKS = orderTxCS.keyBy(
//                order -> order.getTxId(),
//                tx -> tx.getTxId());

        // Processing with process
        SingleOutputStreamOperator<String> resultDS = orderTxCS.process(
                new CoProcessFunction<OrderEvent, TxEvent, String>() {
                    // Cache for records from the payment (transaction) system
                    private Map<String, TxEvent> txMap = new HashMap<>();
                    // Cache for records from the business (order) system
                    private Map<String, OrderEvent> orderMap = new HashMap<>();

                    /**
                     * Called once for every record from the business (order) system
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
                        // Entering this method means the record comes from the business (order) system
                        // Has the corresponding transaction record arrived yet?
                        // Look up the cached transaction data by txId => if it is not null, the transaction record has already arrived and they match
                        TxEvent txEvent = txMap.get(value.getTxId());
                        if (txEvent == null) {
                            // 1. The transaction data has not arrived yet => cache this order event and wait
                            orderMap.put(value.getTxId(), value);
                        } else {
                            // 2. The transaction data has already arrived => reconciliation succeeded
                            out.collect("Order " + value.getOrderId() + " reconciled successfully");
                            // Reconciliation succeeded, remove the cached transaction data
                            txMap.remove(value.getTxId());
                        }
                    }

                    /**
                     * Called once for every record from the payment (transaction) system
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception {
                        // Entering this method means the record comes from the payment (transaction) system
                        // Has the corresponding order record arrived yet?
                        OrderEvent orderEvent = orderMap.get(value.getTxId());
                        if (orderEvent == null) {
                            // 1. The order data has not arrived yet => cache this transaction event and wait
                            txMap.put(value.getTxId(), value);
                        } else {
                            // 2. The order data has already arrived => reconciliation succeeded
                            out.collect("Order " + orderEvent.getOrderId() + " reconciled successfully");
                            // Reconciliation succeeded, remove the cached order data
                            orderMap.remove(value.getTxId());
                        }
                    }
                }
        );
        resultDS.print();
        env.execute();
    }
}
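
The function above never reports records that stay unmatched, even though the business description mentions a payment expiration time. Because both inputs are keyed by txId, one possible extension is to register a processing-time timer whenever a record is cached and to report a mismatch if the timer fires before the match arrives. The sketch below is an assumption-laden illustration, not part of the original case: the class name OrderTxMatchWithTimeout and the 10-second timeout are invented, and KeyedCoProcessFunction is used so the current key is available in onTimer.

import com.atguigu.bean.OrderEvent;
import com.atguigu.bean.TxEvent;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

// Sketch: same matching logic as above, plus a processing-time timer per cached record,
// so that records which are never matched are eventually reported.
// The caches are still plain HashMaps, as in the original case, not keyed state.
public class OrderTxMatchWithTimeout extends KeyedCoProcessFunction<String, OrderEvent, TxEvent, String> {

    private static final long TIMEOUT_MS = 10_000L; // assumed timeout; adjust to the business rule

    private final Map<String, TxEvent> txMap = new HashMap<>();
    private final Map<String, OrderEvent> orderMap = new HashMap<>();

    @Override
    public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
        TxEvent txEvent = txMap.get(value.getTxId());
        if (txEvent == null) {
            // Transaction record not here yet: cache the order and start a timeout timer
            orderMap.put(value.getTxId(), value);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        } else {
            out.collect("Order " + value.getOrderId() + " reconciled successfully");
            txMap.remove(value.getTxId());
        }
    }

    @Override
    public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception {
        OrderEvent orderEvent = orderMap.get(value.getTxId());
        if (orderEvent == null) {
            // Order record not here yet: cache the transaction and start a timeout timer
            txMap.put(value.getTxId(), value);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        } else {
            out.collect("Order " + orderEvent.getOrderId() + " reconciled successfully");
            orderMap.remove(value.getTxId());
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // The timer fires per key (txId); anything still cached for this key is unmatched
        String txId = ctx.getCurrentKey();
        if (orderMap.containsKey(txId)) {
            out.collect("Order " + orderMap.remove(txId).getOrderId() + " was not reconciled within the timeout");
        } else if (txMap.containsKey(txId)) {
            out.collect("Transaction " + txId + " has no matching order within the timeout");
        }
    }
}

It would replace the anonymous function in the process() call, e.g. orderTxCS.process(new OrderTxMatchWithTimeout()).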
