Flink Core Programming
1. Environment
When a Flink job is submitted for execution, it first establishes a connection with the Flink framework, that is, the current Flink runtime environment. Only when this environment information is available can tasks be scheduled to different TaskManagers for execution. This environment object is quite simple to obtain.
Batch environment
ExecutionEnvironment benv = ExecutionEnvironment.getExecutionEnvironment();
Streaming Data Processing Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. Source
The Flink framework can obtain data from different sources and submit the data to the framework for processing. We call the source of the data the data source.
Code demonstration:
// Define a sample class: water level sensor, used to receive water level (altitude) data
// id: sensor number
// ts: timestamp
// vc: water level (altitude)
public class WaterSensor {
    private String id;
    private Long ts;
    private Integer vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }
}
1) Read data from a collection
In general, data can be temporarily held in memory and used as a data source once it has been organized into a suitable data structure. Collection types are commonly used for this.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class Flink02_Source_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Specify the parallelism
        env.setParallelism(1);

        // Read data from a collection
        DataStreamSource<WaterSensor> collectionDs = env.fromCollection(Arrays.asList(
                new WaterSensor("ws_001", 1577844001L, 45),
                new WaterSensor("ws_002", 1577844015L, 45),
                new WaterSensor("ws_003", 1577844020L, 45)
        ));

        // Print
        collectionDs.print();

        // Execute
        env.execute();
    }
}
2) Read data from files
Usually we obtain data from storage media, and log files are the most common kind of data source.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink03_Source_File {
    public static void main(String[] args) throws Exception {
        // Create the execution environment and specify the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Read data from a file
        DataStreamSource<String> fileDs = env.readTextFile("input/sensor-data.log");

        // Print
        fileDs.print();

        // Execute
        env.execute();
    }
}
3) Read data from Kafka
As a message queue, Kafka is a distributed, high-throughput, easily scalable, topic-based publish/subscribe messaging system. In today's enterprise development, Kafka and Flink have become the preferred combination for building real-time data processing systems.
(1) Add the Kafka connector dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
(2) Code implementation reference
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Flink04_Source_Kafka {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Read data from Kafka
        Properties properties = new Properties();
        // Kafka cluster address
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        // Consumer group
        properties.setProperty("group.id", "consumer-group");
        // Deserializers
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumption (offset reset) strategy
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> kafkaDs = env.addSource(new FlinkKafkaConsumer011<String>(
                "sensor",
                new SimpleStringSchema(),
                properties));

        kafkaDs.print("kafka source");

        env.execute();
    }
}
4) Custom Data Source
In most cases, the data sources above already meet our needs, but there will inevitably be special cases, so Flink also provides a way to define custom data sources.
import com.atguigu.bean.WaterSensor; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; public class Flink05_Source_MySource { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO 1.Source: Read from custom data sources DataStreamSource<WaterSensor> inputDS = env.addSource(new MySourceFunction()); inputDS.print(); env.execute(); } /** * Custom Data Source * 1. Implement SourceFunction, specifying the type of output * 2. Override two methods * run(): * cancel(): */ public static class MySourceFunction implements SourceFunction<WaterSensor> { // Define a flag bit to control the generation of data private boolean flag = true; @Override public void run(SourceContext<WaterSensor> ctx) throws Exception { Random random = new Random(); while (flag) { ctx.collect( new WaterSensor( "sensor_" + random.nextInt(3), System.currentTimeMillis(), random.nextInt(10) + 40 ) ); Thread.sleep(2000L); } } @Override public void cancel() { this.flag = false; } } }
3. Transform
1)map
Mapping: transforms each element of a data stream to form a new data stream; it consumes one element and produces one element.
Parameter: lambda expression or MapFunction implementation class
Returns: DataStream
// lambda
SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS
        .map((MapFunction<String, WaterSensor>) value -> {
            String[] datas = value.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        });
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; public class Flink06_Transform_Map { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log"); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MyMapFunction()); // 3. Printing sensorDS.print(); env.execute(); } /** * Implement the MapFunction interface and define generics (inputs, outputs) * Override map method */ public static class MyMapFunction implements MapFunction<String, WaterSensor> { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } } }
All Flink function classes have Rich versions. A rich function differs from a regular function in that it can obtain the context of the runtime environment and has lifecycle methods, so more complex functionality can be implemented. For example: RichMapFunction.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Flink07_Transform_RichMapFunction { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log"); // DataStreamSource<String> inputDS = env.socketTextStream("localhost",9999); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MyRichMapFunction()); // 3. Printing sensorDS.print(); env.execute(); } /** * Inherit RichMapFunction, specify the type of input, return type * open() and close() lifecycle management methods are provided * Ability to get runtime context object= Ability to get environment information such as status, task information, etc. */ public static class MyRichMapFunction extends RichMapFunction<String, WaterSensor> { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(getRuntimeContext().getTaskName() + datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } @Override public void open(Configuration parameters) throws Exception { System.out.println("open..."); } @Override public void close() throws Exception { System.out.println("close..."); } } }
Rich functions have a lifecycle concept. The typical lifecycle methods are:
The open() method is the initialization method of a rich function; it is called once before the operator's processing method (such as map or filter) is invoked.
The close() method is the last method called in the lifecycle and performs cleanup work.
The getRuntimeContext() method provides information about the function's RuntimeContext, such as the parallelism of the function, the name of the task, and the current state.
2)flatMap
Flat mapping: splits composite elements of the data stream into individual elements; it consumes one element and produces zero or more elements.
Parameter: lambda expression or FlatMapFunction implementation class
Returns: DataStream
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.List; public class Flink08_Transform_FlatMap { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<List<Integer>> inputDS = env.fromCollection( Arrays.asList( Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6, 7, 8) ) ); // 2.Transform: Convert FlatMap to Entity Object inputDS .flatMap(new FlatMapFunction<List<Integer>, Integer>() { @Override //The data coming in is List{List(1, 2, 3, 4),List(5, 6, 7, 8)} public void flatMap(List<Integer> value, Collector<Integer> out) throws Exception { //Traverse through each List that comes in for (Integer number : value) { out.collect(number + 10); } } }) .print();//10 20 30 40 50 .......... env.execute(); } }
3)filter
Filter: keeps the data that satisfies the condition (true) according to the specified rule and discards the data that does not (false)
Parameter: lambda expression or FilterFunction implementation class
Returns: DataStream
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.List; /** * TODO * * @author cjp * @version 1.0 * @date 2020/9/16 15:29 */ public class Flink09_Transform_Filter { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<Integer> inputDS = env.fromCollection( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8) ); // TODO Transform: filter =>reserved for true, discarded for false inputDS .filter(new MyFilterFunction()) .print(); // 2,4,6,8 env.execute(); } public static class MyFilterFunction implements FilterFunction<Integer> { @Override public boolean filter(Integer value) throws Exception { return value % 2 == 0; } } }
4)keyBy
Spark has a groupBy operator that groups data according to specified rules; the similar function in Flink is keyBy, which repartitions data according to the specified key.
Repartition: elements are sent to different partitions according to the hash code of the specified key, and records with the same key end up in the same partition (here, a partition means one of the parallel instances of the downstream operator). keyBy() partitions by hashing.
Parameter: a KeySelector (or lambda expression), a POJO field name, or a tuple index; an array cannot be used as a key
Returns: KeyedStream
package com.atguigu.chapter05; import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; public class Flink10_Transform_KeyBy { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log"); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new Flink06_Transform_Map.MyMapFunction()); // TODO Keyby: Grouping // The type of key returned by the location index or field name is undetermined, so it returns to Tuple, which is cumbersome to use later on // By explicitly specifying the key, the key you get is the specific type=>Implement KeySelector or lambda // Grouping is a logical grouping, i.e. labeling each data (which grouping it belongs to), rather than changing the degree of parallelism // sensorDS.keyBy(0).print(); // KeyedStream<WaterSensor, Tuple> sensorKSByFieldName = sensorDS.keyBy("id"); KeyedStream<WaterSensor, String> sensorKSByKeySelector = sensorDS.keyBy(new MyKeySelector()); // KeyedStream<WaterSensor, String> waterSensorStringKeyedStream = sensorDS.keyBy(r -> r.getId()); env.execute(); } public static class MyKeySelector implements KeySelector<WaterSensor, String> { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } } }
5)shuffle
Shuffle: distributes data randomly to the downstream partitions
Parameter: none
Returns: DataStream
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink11_Transform_Shuffle {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env.readTextFile("input/sensor-data.log");
        inputDS.print("input");

        DataStream<String> resultDS = inputDS.shuffle();
        resultDS.print("shuffle");

        env.execute();
    }
}
6)split
In some cases, we need to split one data stream into two or more streams according to certain characteristics, tagging the data so that the different sub-streams can later be extracted.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; public class Flink12_Transform_Split { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files SingleOutputStreamOperator<WaterSensor> sensorDS = env .readTextFile("input/sensor-data.log") .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // TODO Split: Water level below 50 normal, water level [50,80] warning, water level above 80 warning // split does not really separate streams, it tags data SplitStream<WaterSensor> splitSS = sensorDS.split(new OutputSelector<WaterSensor>() { @Override public Iterable<String> select(WaterSensor value) { if (value.getVc() < 50) { return Arrays.asList("normal"); } else if (value.getVc() < 80) { return Arrays.asList("warn"); } else { return Arrays.asList("alarm"); } } } ); env.execute(); } }
7)select
After splitting a data stream, how do we extract the differently tagged sub-streams? This is what the select operator is for.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Arrays; public class Flink13_Transform_Select { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files SingleOutputStreamOperator<WaterSensor> sensorDS = env .readTextFile("input/sensor-data.log") .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // TODO Split: Water level below 50 normal, water level [50,80] warning, water level above 80 warning // split does not really separate streams SplitStream<WaterSensor> splitSS = sensorDS.split(new OutputSelector<WaterSensor>() { @Override public Iterable<String> select(WaterSensor value) { if (value.getVc() < 50) { return Arrays.asList("normal","happy","hi"); } else if (value.getVc() < 80) { return Arrays.asList("warn","happy"); } else { return Arrays.asList("alarm"); } } } ); //TODO select // Get the corresponding stream from the previous label signature // A stream can have more than one name, and when it's taken out, a given name will do splitSS.select("normal").print("normal"); // splitSS.select("hi").print("normal"); splitSS.select("happy").print("warn"); // splitSS.select("warn").print("warn"); // splitSS.select("alarm").print("alarm"); env.execute(); } }
8)connect
In some cases, we need to connect data streams from two different sources in order to match their data, for example order payment records and third-party transaction records, which come from different data sources. After the connection, order payments and third-party transactions are reconciled so that the genuinely completed payments can be determined.
The connect operator in Flink connects two streams while each keeps its own data type. After they are connected, the two streams are merely placed inside the same stream; internally each keeps its own data and form, and they remain independent of each other.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import java.util.Arrays; public class Flink14_Transform_Connect { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files SingleOutputStreamOperator<WaterSensor> sensorDS = env .readTextFile("input/sensor-data.log") .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } }); // Get another stream DataStreamSource<Integer> numDS = env.fromCollection(Arrays.asList(1, 2, 3, 4)); // TODO uses connect to connect two streams // Two stream data types can be different // Only two streams can be connected // Separate processing is also used when processing data ConnectedStreams<WaterSensor, Integer> sensorNumCS = sensorDS.connect(numDS); // Call other operators SingleOutputStreamOperator<Object> resultDS = sensorNumCS.map( new CoMapFunction<WaterSensor, Integer, Object>() { @Override public String map1(WaterSensor value) throws Exception { return value.toString(); } @Override public Integer map2(Integer value) throws Exception { return value + 10; } }); resultDS.print(); env.execute(); } }
9)union
A union of two or more DataStreams produces a new DataStream containing all the elements of the input streams.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import java.util.Arrays; public class Flink15_Transform_Union { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // Get Stream DataStreamSource<Integer> numDS = env.fromCollection(Arrays.asList(1, 2, 3, 4)); DataStreamSource<Integer> numDS1 = env.fromCollection(Arrays.asList(11, 12, 13, 14)); DataStreamSource<Integer> numDS2 = env.fromCollection(Arrays.asList(21, 22, 23, 24)); //TODO Union Connection Flow // Require that streams have the same data type // Can connect multiple streams DataStream<Integer> unionDS = numDS .union(numDS1) .union(numDS2); unionDS .map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value * 10; } }) .print("union"); env.execute(); } }
4. Operator
1) Rolling aggregate operator
These operators aggregate within each partition of a KeyedStream. The aggregated results of the partitions are merged into one stream and returned, so the result is a DataStream.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Flink16_Transform_RollingAgg { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env // .readTextFile("input/sensor-data.log"); .socketTextStream("localhost",9999 ); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map((MapFunction<String, WaterSensor>) value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }); // 3. Group by id KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() { @Override public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception { return new Tuple3<>(value.getId(), value.getTs(), value.getVc()); } }) .keyBy( r -> r.f0); // TODO rolling aggregation operator: one, one, one output // sensorKS.sum(2).print("sum"); sensorKS.max(2).print("max"); // sensorKS.min(2).print("min"); env.execute(); } public static class MyKeySelector implements KeySelector<WaterSensor, String> { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } } }
2)reduce
An aggregation operation on a keyed data stream that combines the current element with the result of the previous aggregation to produce a new value. It returns a stream containing the result of every aggregation step, rather than only the final aggregated result.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Flink17_Transform_Reduce { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env // .readTextFile("input/sensor-data.log"); .socketTextStream("localhost", 9999); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new Flink06_Transform_Map.MyMapFunction()); // 3. Group by id KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() { @Override public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception { return new Tuple3<>(value.getId(), value.getTs(), value.getVc()); } }) .keyBy(r -> r.f0); // TODO Reduce // 1. Enter the same type and output the same type // 2. Data from the first item will not enter reduce // 3. Help us save the intermediate state sensorKS .reduce( new ReduceFunction<Tuple3<String, Long, Integer>>() { @Override public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> value1, Tuple3<String, Long, Integer> value2) throws Exception { System.out.println(value1.toString() + " <-> " + value2.toString()); return Tuple3.of("aaa", 123L, value1.f2 + value2.f2); } } ) .print("reduce"); env.execute(); } public static class MyKeySelector implements KeySelector<WaterSensor, String> { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } } }
3)process
Flink partitions the data stream with keyBy. If we need access to runtime information while processing the data, we can use the process operator for custom processing.
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; public class Flink18_Transform_Process { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env .readTextFile("input/sensor-data.log"); // .socketTextStream("localhost", 9999); // 2.Transform: Map converts to entity objects SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map((MapFunction<String, WaterSensor>) value -> { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); }); // 3. Group by id KeyedStream<Tuple3<String, Long, Integer>, String> sensorKS = sensorDS .map(new MapFunction<WaterSensor, Tuple3<String, Long, Integer>>() { @Override public Tuple3<String, Long, Integer> map(WaterSensor value) throws Exception { return new Tuple3<>(value.getId(), value.getTs(), value.getVc()); } }) .keyBy(r -> r.f0); // TODO Process // Some environmental information is available sensorKS.process( new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() { /** * How to process data: Processing one at a time * @param value One Data * @param ctx context * @param out collector * @throws Exception */ @Override public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception { out.collect("current key=" + ctx.getCurrentKey() + "current time=" + ctx.timestamp() + ",data=" + value); } } ) .print("process"); env.execute(); } public static class MyKeySelector implements KeySelector<WaterSensor, String> { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } } }
5. Sink
In Flink, a Sink is where processed data ends up: it stores the data in, or more generally sends it to, a specified external system. Even the print method that we have been using all along is in fact a Sink.
1)Kafka Sink
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Sink_Kafka {
    public static void main(String[] args) throws Exception {
        // 0. Create an execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from files
        DataStreamSource<String> inputDS = env
                .readTextFile("input/sensor-data.log");
        //         .socketTextStream("localhost", 9999);

        // TODO Sink the data to Kafka
        // addSink is called on the DataStream => note that it is not called on env
        inputDS.addSink(
                new FlinkKafkaProducer011<String>(
                        "hadoop102:9092",
                        "sensor0421",
                        new SimpleStringSchema())
        );

        env.execute();
    }
}
2)Redis Sink
Let's send the processed data to the Redis cache database.
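The RedisSink used below comes from the Bahir connector for Flink. If the project does not already include it, a dependency along the following lines is typically needed (the artifact and version are stated as an assumption based on the imports in the example; adjust them to your environment):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>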
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; public class Flink20_Sink_Redis { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env .readTextFile("input/sensor-data.log"); // .socketTextStream("localhost", 9999); //TODO data Sink to Redis FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder() .setHost("hadoop102") .setPort(6379) .build(); inputDS.addSink( new RedisSink<String>( jedisConfig, new RedisMapper<String>() { // Command for redis: key is the outermost key @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET,"sensor0421"); } // Hash type: This specifies the hash key @Override public String getKeyFromData(String data) { String[] datas = data.split(","); return datas[1]; } // Hash type: This specifies the value of hash @Override public String getValueFromData(String data) { String[] datas = data.split(","); return datas[2]; } } ) ); env.execute(); } }
3)ElasticSearch Sink
Let's send the processed data to the ElasticSearch search server.
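The example below uses the Elasticsearch 6 connector (note the elasticsearch6 package in the imports). If it is not already in the project, a dependency roughly like the following is needed (shown as an assumption; match the artifact to your Elasticsearch and Scala versions):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>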
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class Flink21_Sink_ES { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env .readTextFile("input/sensor-data.log"); // .socketTextStream("localhost", 9999); //TODO data Sink to ES List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("hadoop102",9200)); httpHosts.add(new HttpHost("hadoop103",9200)); httpHosts.add(new HttpHost("hadoop104",9200)); ElasticsearchSink<String> esSink = new ElasticsearchSink.Builder<String>( httpHosts, new ElasticsearchSinkFunction<String>() { @Override public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { // Place data in Map Map<String, String> dataMap = new HashMap<>(); dataMap.put("data", element); // Create IndexRequest = specify index, type, source IndexRequest indexRequest = Requests.indexRequest("sensor0421").type("reading").source(dataMap); // Add to RequestIndexer indexer.add(indexRequest); } } ) .build(); inputDS.addSink(esSink); env.execute(); } }
4) Custom Sink
If Flink does not provide a connector that we can use directly, what do we do when we want to store the data in our own storage system? No problem: Flink offers custom Sinks, so we can decide ourselves how the data is stored.
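The custom MySQL sink below writes through plain JDBC, so the MySQL driver must be on the classpath. A dependency along these lines is usually required (the version shown is only an example):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>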
import com.atguigu.bean.WaterSensor; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; import org.apache.http.HttpHost; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class Flink22_Sink_MySQL { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from files DataStreamSource<String> inputDS = env .readTextFile("input/sensor-data.log"); // .socketTextStream("localhost", 9999); //TODO data Sink to custom MySQL inputDS.addSink( new RichSinkFunction<String>() { private Connection conn = null; private PreparedStatement pstmt = null; @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000"); pstmt = conn.prepareStatement("INSERT INTO sensor VALUES (?,?,?)"); } @Override public void close() throws Exception { pstmt.close(); conn.close(); } @Override public void invoke(String value, Context context) throws Exception { String[] datas = value.split(","); pstmt.setString(1, datas[0]); pstmt.setLong(2, Long.valueOf(datas[1])); pstmt.setInt(3, Integer.valueOf(datas[2])); pstmt.execute(); } } ); env.execute(); } }
6. Case Practice
1) Website traffic statistics based on tracking (embedded-point) log data
//Read log data and convert to JavaBean object for easy operation public class UserBehavior { private Long userId; //User ID private Long itemId; //commodity private Integer categoryId; //Commodity Category ID private String behavior; //Behavior Type private Long timestamp; //time stamp public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public Long getItemId() { return itemId; } public void setItemId(Long itemId) { this.itemId = itemId; } public Integer getCategoryId() { return categoryId; } public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) { this.userId = userId; this.itemId = itemId; this.categoryId = categoryId; this.behavior = behavior; this.timestamp = timestamp; } @Override public String toString() { return "UserBehavior{" + "userId=" + userId + ", itemId=" + itemId + ", categoryId=" + categoryId + ", behavior='" + behavior + '\'' + ", timestamp=" + timestamp + '}'; } }
Statistics of total site page views (PV)
One of the simplest metrics for measuring website traffic is the number of page views (PV). Each time a user opens a page, one PV is recorded, and opening the same page several times accumulates the view count. Generally speaking, PV is proportional to the number of visitors, but it does not directly reflect the actual number of visitors to a page: a single visitor can generate a very high PV simply by refreshing a page over and over. Next we will use the Flink operators learned earlier to implement PV statistics.
Implement PV statistics with WordCount ideas
import bean.UserBehavior; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Case_PV { public static void main(String[] args) throws Exception { //Create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); //Read data DataStreamSource<String> inputDs = env.readTextFile("input/UserBehavior.csv"); //Convert read data into bean objects SingleOutputStreamOperator<UserBehavior> userBehaviorDs = inputDs.map(new MapFunction<String, UserBehavior>() { @Override public UserBehavior map(String value) throws Exception { String[] datas = value.split(","); return new UserBehavior( Long.valueOf(datas[0]), Long.valueOf(datas[1]), Integer.valueOf(datas[2]), datas[3], Long.valueOf(datas[4]) ); } }); //Implement PV statistics by referring to wordcount ideas //Processing data //1. Filter out pv behavior SingleOutputStreamOperator<UserBehavior> userBehaviorDsfilter = userBehaviorDs.filter(new FilterFunction<UserBehavior>() { @Override public boolean filter(UserBehavior value) throws Exception { return "pv".equals(value.getBehavior()); } }); //2. Transforming data structures SingleOutputStreamOperator<Tuple2<String, Integer>> pvAndTuple2 = userBehaviorDsfilter.map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(UserBehavior value) throws Exception { return Tuple2.of("pv", 1); } }); //3. Grouping by elements in the first position KeyedStream<Tuple2<String, Integer>, Tuple> pvAndOneKs = pvAndTuple2.keyBy(0); //4. Sum SingleOutputStreamOperator<Tuple2<String, Integer>> pvDs = pvAndOneKs.sum(1); //Print pvDs.print("pv"); env.execute(); } }
Implement PV statistics with process
import bean.UserBehavior;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Case_PV2 {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Get data
        DataStreamSource<String> inputDs = env.readTextFile("input/UserBehavior.csv");

        // Transform the data structure
        SingleOutputStreamOperator<UserBehavior> userBeavirDs = inputDs.map((MapFunction<String, UserBehavior>) value -> {
            String[] datas = value.split(",");
            return new UserBehavior(
                    Long.valueOf(datas[0]),
                    Long.valueOf(datas[1]),
                    Integer.valueOf(datas[2]),
                    datas[3],
                    Long.valueOf(datas[4])
            );
        });

        // Filter the data
        SingleOutputStreamOperator<UserBehavior> userBeavirfilter = userBeavirDs.filter(data -> "pv".equals(data.getBehavior()));

        // Group by the statistical dimension
        KeyedStream<UserBehavior, String> userBehaviorKs = userBeavirfilter.keyBy(data -> data.getBehavior());

        // Sum with a process function
        SingleOutputStreamOperator<Long> resultDS = userBehaviorKs.process(new KeyedProcessFunction<String, UserBehavior, Long>() {
            // Counter for the number of records seen so far
            private Long pvcount = 0L;

            @Override
            public void processElement(UserBehavior value, Context ctx, Collector<Long> out) throws Exception {
                pvcount++;
                out.collect(pvcount);
            }
        });

        // Print the result stream
        resultDS.print("pv");

        env.execute();
    }
}
Implement PV statistics with flatMap
import com.atguigu.bean.UserBehavior; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; /** * TODO * * @author cjp * @version 1.0 * @date 2020/9/16 15:29 */ public class Flink25_Case_PVByFlatmap { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from a file and convert it to a bean object env .readTextFile("input/UserBehavior.csv") .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] datas = value.split(","); if ("pv".equals(datas[3])) { out.collect(Tuple2.of("pv", 1)); } } }) .keyBy(0) .sum(1) .print("pv by flatmap"); env.execute(); } }
Statistics of the number of unique visitors (UV) to the website
In the previous case we counted every browsing action of every user on the pages, which means repeated visits by the same user are counted repeatedly. In practice we often care about how many distinct users visited the site, so another important traffic indicator is the number of unique visitors (Unique Visitor, UV).
import com.atguigu.bean.UserBehavior; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import java.util.HashSet; import java.util.Set; public class Flink26_Case_UV { public static void main(String[] args) throws Exception { // 0. Create an execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data from a file and convert it to a bean object SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env .readTextFile("input/UserBehavior.csv") .map(new MapFunction<String, UserBehavior>() { @Override public UserBehavior map(String value) throws Exception { String[] datas = value.split(","); return new UserBehavior( Long.valueOf(datas[0]), Long.valueOf(datas[1]), Integer.valueOf(datas[2]), datas[3], Long.valueOf(datas[4]) ); } }); // TODO for UV statistics: weighting userId, statistics // => userId exists in a Set // =>Calculate the number of elements in the Set, which is the UV value // 2. Processing data // 2.1 Filter out pv behavior=> UV is the weight of pv, so behavior is pv SingleOutputStreamOperator<UserBehavior> userBehaviorFilter = userBehaviorDS.filter(data -> "pv".equals(data.getBehavior())); // 2.2 to binary ("uv",userId) // =>First given to uv for grouping, grouping for calling methods such as sum or process // =>The second one is given to userId to save userId in Set // =>Here we just need userId, no other goods, no categories SingleOutputStreamOperator<Tuple2<String, Long>> uvTuple2 = userBehaviorFilter.map(new MapFunction<UserBehavior, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(UserBehavior value) throws Exception { return Tuple2.of("uv", value.getUserId()); } }); // 2.3 Grouped by uv KeyedStream<Tuple2<String, Long>, String> uvKS = uvTuple2.keyBy(data -> data.f0); // 2.4 Processing with process SingleOutputStreamOperator<Integer> uvDS = uvKS.process( new KeyedProcessFunction<String, Tuple2<String, Long>, Integer>() { // Define a Set for removing and storing userId private Set<Long> uvSet = new HashSet<>(); /** * To process data one at a time * @param value * @param ctx * @param out * @throws Exception */ @Override public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Integer> out) throws Exception { // To get a piece of data, save the userId in the Set uvSet.add(value.f1); // Through the collector, send the UV value downstream=> the number of elements in the Set, which is the UV value out.collect(uvSet.size()); } } ); uvDS.print("uv"); env.execute(); } }
2) Statistical Analysis of Marketing Business Indicators
With the popularity of smartphones, more and more users of today's e-commerce websites come from mobile. Compared with traditional browser access, the mobile APP has become the preferred way for many users to visit e-commerce sites. E-commerce enterprises therefore generally promote their own APP through various channels, and the statistics for these channels (such as click-throughs of advertising links on different websites and APP downloads) have become important marketing business indicators.
public class MarketingUserBehavior { private Long userId;//user private String behavior;//Behavior: Download, Install, Update, Uninstall private String channel;//Channel: Millet, Huawei, OPPO, VIVO private Long timestamp;//time stamp public Long getUserId() { return userId; } public void setUserId(Long userId) { this.userId = userId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public String getChannel() { return channel; } public void setChannel(String channel) { this.channel = channel; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } public MarketingUserBehavior() { } public MarketingUserBehavior(Long userId, String behavior, String channel, Long timestamp) { this.userId = userId; this.behavior = behavior; this.channel = channel; this.timestamp = timestamp; } @Override public String toString() { return "MarketingUserBehavior{" + "userId=" + userId + ", behavior='" + behavior + '\'' + ", channel='" + channel + '\'' + ", timestamp=" + timestamp + '}'; } }
import bean.MarketingUserBehavior; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Arrays; import java.util.List; import java.util.Random; public class APPMarketingAnalysis { public static void main(String[] args) throws Exception { //Create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); //get data DataStreamSource<MarketingUserBehavior> inputDs = env.addSource(new MysourceFunction()); //Processing data //Group by dimension of statistics: channels, behavior SingleOutputStreamOperator<Tuple2<String, Integer>> chanalAndBehaviorTuple2 = inputDs.map(new MapFunction<MarketingUserBehavior, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(MarketingUserBehavior value) throws Exception { return Tuple2.of(value.getChannel() + "_" + value.getBehavior(), 1); } }); KeyedStream<Tuple2<String, Integer>, String> chanalAndBehaviorKs = chanalAndBehaviorTuple2.keyBy(data -> data.f0); SingleOutputStreamOperator<Tuple2<String, Integer>> resultDs = chanalAndBehaviorKs.sum(1); resultDs.print(); env.execute(); } //Custom Data Source public static class MysourceFunction implements SourceFunction<MarketingUserBehavior> { private boolean flag = true; private List<String> behaviorList = Arrays.asList("DOWNLOAD","INSTALL","UPDATE","UNINSTALL"); private List<String> channelList = Arrays.asList("Huawei","XioaMi","OPPO","VIVO"); @Override public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception { Random random = new Random(); while(flag){ ctx.collect( new MarketingUserBehavior( Long.valueOf(random.nextInt(10)), behaviorList.get(random.nextInt(behaviorList.size())), channelList.get(random.nextInt(channelList.size())), System.currentTimeMillis() ) ); Thread.sleep(1000L); } } @Override public void cancel() { flag = false; } } }
import com.atguigu.bean.MarketingUserBehavior; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Arrays; import java.util.List; import java.util.Random; public class Flink28_Case_APPMarketingAnalysisWithoutChannel { public static void main(String[] args) throws Exception { // 0 Execution Environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1. Read data, convert bean s DataStreamSource<MarketingUserBehavior> appDS = env.addSource(new AppSource()); // 2. Processing data: statistics of different behaviors (regardless of channel) // 2.1 Grouping by statistical dimension: behavior // 2.1.1 Convert to (Behavior, 1) Binary SingleOutputStreamOperator<Tuple2<String, Integer>> channelAndBehaviorTuple2 = appDS.map(new MapFunction<MarketingUserBehavior, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(MarketingUserBehavior value) throws Exception { return Tuple2.of(value.getBehavior(), 1); } }); // 2.1.2 Grouped by Behavior KeyedStream<Tuple2<String, Integer>, String> channelAndBehaviorKS = channelAndBehaviorTuple2.keyBy(data -> data.f0); // 2.2 Sum SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = channelAndBehaviorKS.sum(1); // 3. Output: Print resultDS.print("app marketing analysis by behavior"); env.execute(); } public static class AppSource implements SourceFunction<MarketingUserBehavior> { private boolean flag = true; private List<String> behaviorList = Arrays.asList("DOWNLOAD", "INSTALL", "UPDATE", "UNINSTALL"); private List<String> channelList = Arrays.asList("XIAOMI", "HUAWEI", "OPPO", "VIVO"); @Override public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception { while (flag) { Random random = new Random(); ctx.collect( new MarketingUserBehavior( Long.valueOf(random.nextInt(10)), behaviorList.get(random.nextInt(behaviorList.size())), channelList.get(random.nextInt(channelList.size())), System.currentTimeMillis() ) ); Thread.sleep(1000L); } } @Override public void cancel() { flag = false; } } }
3) Page Advertising Analysis
Among the marketing metrics of an e-commerce website, besides promotion of its own APP, the advertisements placed on its pages (both for its own products and for other websites) must also be considered. Statistical analysis of advertising is therefore another important marketing indicator.
For advertising statistics, the simplest and most important metric is the number of clicks on page advertisements. Websites set pricing strategies and adjust promotion methods based on ad click volumes, which also provide information about user preferences. A more concrete application is to segment users by geographic location and summarize how users in different provinces respond to different advertisements, which helps target ad placement more accurately.
// Page advertisement click log
public class AdClickLog {
    private Long userId;      // User ID
    private Long adId;        // Advertisement ID
    private String province;  // Province
    private String city;      // City
    private Long timestamp;   // Timestamp

    public AdClickLog() {
    }

    public AdClickLog(Long userId, Long adId, String province, String city, Long timestamp) {
        this.userId = userId;
        this.adId = adId;
        this.province = province;
        this.city = city;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public Long getAdId() { return adId; }
    public void setAdId(Long adId) { this.adId = adId; }
    public String getProvince() { return province; }
    public void setProvince(String province) { this.province = province; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "AdClickLog{" +
                "userId=" + userId +
                ", adId=" + adId +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
import bean.AdClickLog;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Case_AdClickAnalysis {
    public static void main(String[] args) throws Exception {
        // Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // Read the data and convert it into bean objects
        SingleOutputStreamOperator<AdClickLog> adClickDs = env
                .readTextFile("backup_AdClickLog.csv")
                .map(new MapFunction<String, AdClickLog>() {
                    @Override
                    public AdClickLog map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new AdClickLog(
                                Long.valueOf(datas[0]),
                                Long.valueOf(datas[1]),
                                datas[2],
                                datas[3],
                                Long.valueOf(datas[4])
                        );
                    }
                });

        // Process data: real-time click counts per province and advertisement
        // Group by the statistical dimensions: province and advertisement
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDs = adClickDs
                .map(new MapFunction<AdClickLog, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(AdClickLog value) throws Exception {
                        return Tuple2.of(value.getProvince() + "_" + value.getAdId(), 1);
                    }
                })
                .keyBy(data -> data.f0)
                .sum(1);

        resultDs.print("ad");

        env.execute();
    }
}
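For reference, the job above assumes that every line of the input file contains five comma-separated fields in the order userId, adId, province, city, timestamp. The records below are made up purely to illustrate the expected format; they are not from the actual dataset:

1001,101,zhejiang,hangzhou,1609459200
1002,102,beijing,beijing,1609459260
1001,101,zhejiang,hangzhou,1609459320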
4) Real-time monitoring of order payment
On an e-commerce website, order payment is the step that directly generates revenue, so it is a critical part of the business process. To keep the process under control and to encourage users to pay, the website usually sets a payment expiration time: orders that are not paid within that period are cancelled. In addition, the correctness of user payments must be verified, which can be done by reconciling in real time against the transaction data of the third-party payment platform.
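The example in the next section focuses on the reconciliation part; payment timeouts are not handled there. As a minimal sketch of how such a timeout could be detected, assuming an OrderEvent bean that exposes getOrderId() and getEventType() accessors with event types such as "create" and "pay" (these names are assumptions for illustration only), one could key the order stream by order ID and register a timer per order in a KeyedProcessFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal sketch: flag orders that are not paid within 15 minutes of creation.
// Assumes OrderEvent exposes getOrderId() and getEventType() ("create" / "pay").
public class PayTimeoutMonitor extends KeyedProcessFunction<Long, OrderEvent, String> {

    private ValueState<Long> timerState; // timestamp of the registered timeout timer

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer-ts", Long.class));
    }

    @Override
    public void processElement(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
        if ("create".equals(value.getEventType())) {
            // Register a processing-time timer 15 minutes from now
            long timeoutTs = ctx.timerService().currentProcessingTime() + 15 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(timeoutTs);
            timerState.update(timeoutTs);
        } else if ("pay".equals(value.getEventType())) {
            // Payment arrived in time: cancel the pending timer
            Long timeoutTs = timerState.value();
            if (timeoutTs != null) {
                ctx.timerService().deleteProcessingTimeTimer(timeoutTs);
                timerState.clear();
            }
            out.collect("Order " + value.getOrderId() + " paid in time");
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // The timer fired before a pay event arrived => the order timed out
        out.collect("Order " + ctx.getCurrentKey() + " payment timed out");
        timerState.clear();
    }
}

It could be attached with something like orderDS.keyBy(OrderEvent::getOrderId).process(new PayTimeoutMonitor()).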
Matching order transactions from two streams
For order payment events, the user's payment action alone does not mean the transaction is actually complete; we also need to confirm that the money has arrived in the platform's account. These two pieces of information usually come from different logs, so we read data from two streams and merge them.
import com.atguigu.bean.OrderEvent;
import com.atguigu.bean.TxEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class Flink30_Case_OrderTxDetect {
    public static void main(String[] args) throws Exception {
        // 0. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 1. Read the data and convert it into bean objects
        SingleOutputStreamOperator<OrderEvent> orderDS = env
                .readTextFile("input/OrderLog.csv")
                .map(new MapFunction<String, OrderEvent>() {
                    @Override
                    public OrderEvent map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new OrderEvent(
                                Long.valueOf(datas[0]),
                                datas[1],
                                datas[2],
                                Long.valueOf(datas[3])
                        );
                    }
                });

        SingleOutputStreamOperator<TxEvent> txDS = env
                .readTextFile("input/ReceiptLog.csv")
                .map(new MapFunction<String, TxEvent>() {
                    @Override
                    public TxEvent map(String value) throws Exception {
                        String[] datas = value.split(",");
                        return new TxEvent(
                                datas[0],
                                datas[1],
                                Long.valueOf(datas[2])
                        );
                    }
                });

        // 2. Process data: real-time reconciliation monitoring
        //    The two streams are connected and matched by txId; a match means reconciliation succeeded.
        //    For the same transaction it is not certain whether the business-system record or the
        //    payment-system record arrives first, so each side has to buffer unmatched events.
        //    Keying both streams by txId ensures that records with the same txId end up in the same
        //    parallel instance and can be matched. You can keyBy before connecting, or connect first
        //    and then keyBy the connected streams.
        ConnectedStreams<OrderEvent, TxEvent> orderTxCS = orderDS.keyBy(order -> order.getTxId())
                .connect(txDS.keyBy(tx -> tx.getTxId()));

        // Alternative: connect first, then key the connected streams by txId
        // ConnectedStreams<OrderEvent, TxEvent> orderTxKS = orderTxCS.keyBy(
        //         order -> order.getTxId(),
        //         tx -> tx.getTxId());

        // Match the two streams with a CoProcessFunction
        SingleOutputStreamOperator<String> resultDS = orderTxCS.process(
                new CoProcessFunction<OrderEvent, TxEvent, String>() {
                    // Buffer for payment-system (transaction) events that have not been matched yet
                    private Map<String, TxEvent> txMap = new HashMap<>();
                    // Buffer for business-system (order) events that have not been matched yet
                    private Map<String, OrderEvent> orderMap = new HashMap<>();

                    /**
                     * Called once for every business-system (order) event.
                     */
                    @Override
                    public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
                        // Has the matching transaction event already arrived?
                        TxEvent txEvent = txMap.get(value.getTxId());
                        if (txEvent == null) {
                            // 1. Transaction event not seen yet => buffer the order and wait
                            orderMap.put(value.getTxId(), value);
                        } else {
                            // 2. Transaction event already arrived => reconciliation succeeded
                            out.collect("Order " + value.getOrderId() + " reconciled successfully");
                            // Remove the buffered transaction event
                            txMap.remove(value.getTxId());
                        }
                    }

                    /**
                     * Called once for every payment-system (transaction) event.
                     */
                    @Override
                    public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception {
                        // Has the matching order event already arrived?
                        OrderEvent orderEvent = orderMap.get(value.getTxId());
                        if (orderEvent == null) {
                            // 1. Order event not seen yet => buffer the transaction event and wait
                            txMap.put(value.getTxId(), value);
                        } else {
                            // 2. Order event already arrived => reconciliation succeeded
                            out.collect("Order " + orderEvent.getOrderId() + " reconciled successfully");
                            // Remove the buffered order event
                            orderMap.remove(value.getTxId());
                        }
                    }
                }
        );

        resultDS.print();

        env.execute();
    }
}
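Note that the two HashMaps above live in ordinary operator memory, so buffered events are lost if the job restarts. Because both streams are keyed by txId before being connected, an alternative worth considering (not part of the original example) is to keep the pending events in Flink keyed state, which is checkpointed and scoped per key automatically. A minimal sketch, reusing the same OrderEvent and TxEvent beans:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of the same matching logic, but backed by keyed state instead of HashMaps.
public class StatefulTxMatcher extends CoProcessFunction<OrderEvent, TxEvent, String> {

    private ValueState<OrderEvent> pendingOrder; // order waiting for its receipt
    private ValueState<TxEvent> pendingTx;       // receipt waiting for its order

    @Override
    public void open(Configuration parameters) {
        pendingOrder = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-order", OrderEvent.class));
        pendingTx = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-tx", TxEvent.class));
    }

    @Override
    public void processElement1(OrderEvent value, Context ctx, Collector<String> out) throws Exception {
        TxEvent tx = pendingTx.value();
        if (tx == null) {
            // Receipt not seen yet: remember the order for this txId
            pendingOrder.update(value);
        } else {
            out.collect("Order " + value.getOrderId() + " reconciled successfully");
            pendingTx.clear();
        }
    }

    @Override
    public void processElement2(TxEvent value, Context ctx, Collector<String> out) throws Exception {
        OrderEvent order = pendingOrder.value();
        if (order == null) {
            // Order not seen yet: remember the receipt for this txId
            pendingTx.update(value);
        } else {
            out.collect("Order " + order.getOrderId() + " reconciled successfully");
            pendingOrder.clear();
        }
    }
}

It would be plugged in by replacing the anonymous CoProcessFunction above, e.g. orderTxCS.process(new StatefulTxMatcher()).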