Article Directory
- 1. Requirements
- 2. Analysis
- 3. Technical Points
- 4. Join Architecture
- 5. Utility Class and Bean Classes
- 6. Main Line Code
- 1. Loading the data sources from Kafka
- 2. Parse the JSON data pulled from Kafka
- 3. Extract the EventTime to generate WaterMarks
- 4. Left Outer JOIN with OrderDetail as the left table
- 5. Handle the special cases where no join occurs
- 5.1. Processing late data for the left table (orderDetailWithWaterMark)
- 5.2. Processing late data for the right table, i.e. the right side is null after the join
- 5.3. Union the two streams together
- 6. Helper method for querying the MySQL database
1. Requirements
1. Requirement Description
- Order data tables exist in the various business systems
- Order tables: an order master table and an order detail table
  - Order master table: order id, order status, total order amount, order time, user id
  - Order detail table: id of the master order, commodity id, commodity category id, commodity unit price, commodity quantity
- Compute the transaction amount per commodity category (a short sketch follows this list)
  - The order status is "transacted" (in the master table)
  - Group by the commodity's category id (in the detail table)
  - The commodity amount is unit price * quantity (in the detail table)
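As a quick illustration of the target metric, here is a minimal plain-Java sketch using the sample values from the data preparation below (the HashMap accumulation is only for illustration; the real pipeline computes this in Flink):

// Per-category total = sum(unit price * quantity) over transacted orders.
// Values taken from the sample Canal messages in the next section.
// Requires java.util.HashMap and java.util.Map.
Map<Integer, Double> categoryTotal = new HashMap<>();
int categoryId = 2;      // category id of the commodity (order detail)
double money = 1000.0;   // unit price of the commodity
int amount = 1;          // quantity of the commodity
categoryTotal.merge(categoryId, money * amount, Double::sum);
System.out.println(categoryTotal); // {2=1000.0}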
2. Data preparation
- OrderMain imports data into Kafka:
{"data":[{"oid":"29001","create_time":"2020-03-16 00:38:01","total_money":"2000.0","status":"1", "update_time":"2020-03-16 00:38:01","uid":"9999","province":"Beijing"}],"database":"doit12", "es":1584333481000,"id":4,"isDdl":false,"mysqlType":{"oid":"bigint(20)","create_time":"timestamp", "total_money":"double","status":"int(11)","update_time":"timestamp","uid":"varchar(20)", "province":"varchar(50)"},"old":null,"pkNames":["oid"],"sql":"","sqlType":{"oid":-5,"create_time":93, "total_money":8,"status":4,"update_time":93,"uid":12,"province":12},"table":"ordermain", "ts":1584333481540,"type":"INSERT"}
- OrderDetail imports data into Kafka:
{"data":[{"id":"10","order_id":"29001","category_id":"2","sku":"20001","money":"1000.0","amount":"1", "create_time":"2020-03-16 00:38:01","update_time":"2020-03-16 00:38:01"}],"database":"doit12", "es":1584333481000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","order_id":"bigint(20)", "category_id":"int(11)","sku":"varchar(50)","money":"double","amount":"int(11)", "create_time":"timestamp","update_time":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id ":-5,"order_id":-5,"category_id":4,"sku":12,"money":8,"amount":4,"create_time":93,"update_time":93} ,"table":"orderdetail","ts":1584333481600,"type":"INSERT"}
2. Analysis
To compute the statistics, the data from both tables has to be pulled into Flink and joined:
- Data in business systems is usually stored in relational databases such as MySQL
- With Canal, the business data can be imported from MySQL into Kafka (Canal masquerades as a MySQL slave and reads the binlog)
- Flink pulls the business data from Kafka via a KafkaSource
- The pulled data is parsed, and the master table and the detail table are joined
- Then the transaction amount statistics are computed
3. Technical Points
- How Canal is used and how it works
- Kafka producers, consumers, and topics
- Flink's event-time tumbling windows
- Flink's two-stream left outer join (coGroup)
- Flink's side output for late data
- MySQL connections and queries
4. Join Architecture
There are two scenarios in which no join occurs:
- (1) The left table's data arrives late
  -> Use the side output to pull the late records out separately, then query the order master table (the right table) from the database and associate its data
- (2) The right table's data arrives late
  -> Query the right table's information using the left table's key, from the database or an API interface
- Then union the two result streams together
5. Utility Class and Bean Classes
1. FlinkUtilsV2: Utility Class for Creating a KafkaSource
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtilsV2 {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Overload 1: pass in the config file and the data type; topics and group id are read from the config
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String topics = parameters.getRequired("kafka.topics");
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }

    // Overload 2: pass in the config file, the topics and the data type; group id is read from the config
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }

    // Overload 3: pass in the config file, the topics, the group id and the data type
    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, String groupId, Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        // Register the config file as the global job parameters
        env.getConfig().setGlobalJobParameters(parameters);

        // A KafkaSink would need this configured
        //parameters.getProperties().setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // Enable checkpointing
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 10000L), CheckpointingMode.EXACTLY_ONCE);

        // Set the restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(parameters.getInt("restart.times", 10), Time.seconds(3)));

        // Set the state backend
        String path = parameters.get("state.backend.path");
        if (path != null) {
            // The better place to configure the state backend is flink-conf.yaml, Flink's global configuration file
            env.setStateBackend(new FsStateBackend(path));
        }

        // Do not delete the checkpoint automatically when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Set the maximum number of concurrent checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

        // When there are multiple topics, put them into a list
        List<String> topicList = Arrays.asList(topics.split(","));

        Properties properties = parameters.getProperties();
        properties.setProperty("group.id", groupId);

        // Create the FlinkKafkaConsumer
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                properties
        );

        // Whether to commit the Kafka offsets back to Kafka's special topic on checkpoint (the default is true)
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        // Return the Kafka DataStream (lines)
        return env.addSource(kafkaConsumer);
    }

    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
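A minimal usage sketch of the utility class (the topic and group id are placeholders; the real calls are in section 6). It assumes the same imports as the main-line code and runs inside a main(String[] args) method:

        // Load the properties file passed as the first program argument
        ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
        // 4-argument overload: config, topic(s), group id, deserialization schema class
        DataStream<String> lines = FlinkUtilsV2.createKafkaDataStream(params, "ordermain", "g011", SimpleStringSchema.class);
        lines.print();
        FlinkUtilsV2.getEnv().execute("KafkaSourceDemo");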
2. OrderMainBean Order Master Table Class
import java.util.Date;

public class OrderMainBean {

    private Long oid;
    private Date create_time;
    private Double total_money;
    private int status;
    private Date update_time;
    private String province;
    private String uid;
    // Type of the database operation: INSERT, UPDATE
    private String type;

    public OrderMainBean() {
    }

    public OrderMainBean(Long oid, Date create_time, Double total_money, int status, Date update_time, String province, String uid, String type) {
        this.oid = oid;
        this.create_time = create_time;
        this.total_money = total_money;
        this.status = status;
        this.update_time = update_time;
        this.province = province;
        this.uid = uid;
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderMainBean{" +
                "oid=" + oid +
                ", create_time=" + create_time +
                ", total_money=" + total_money +
                ", status=" + status +
                ", update_time=" + update_time +
                ", province='" + province + '\'' +
                ", uid='" + uid + '\'' +
                ", type='" + type + '\'' +
                '}';
    }

    public Long getOid() { return oid; }
    public void setOid(Long oid) { this.oid = oid; }
    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
    public Double getTotal_money() { return total_money; }
    public void setTotal_money(Double total_money) { this.total_money = total_money; }
    public int getStatus() { return status; }
    public void setStatus(int status) { this.status = status; }
    public Date getUpdate_time() { return update_time; }
    public void setUpdate_time(Date update_time) { this.update_time = update_time; }
    public String getProvince() { return province; }
    public void setProvince(String province) { this.province = province; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}
3. OrderDetailBean Order Detail Table Class
import java.util.Date;

public class OrderDetailBean {

    private Long id;
    private Long order_id;
    private int category_id;
    private Long sku;
    private Double money;
    private int amount;
    private Date create_time;
    private Date update_time;
    // Type of the database operation: INSERT, UPDATE
    private String type;

    public OrderDetailBean() {
    }

    public OrderDetailBean(Long id, Long order_id, int category_id, Long sku, Double money, int amount, Date create_time, Date update_time, String type) {
        this.id = id;
        this.order_id = order_id;
        this.category_id = category_id;
        this.sku = sku;
        this.money = money;
        this.amount = amount;
        this.create_time = create_time;
        this.update_time = update_time;
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderDetailBean{" +
                "id=" + id +
                ", order_id=" + order_id +
                ", category_id=" + category_id +
                ", sku=" + sku +
                ", money=" + money +
                ", amount=" + amount +
                ", create_time=" + create_time +
                ", update_time=" + update_time +
                ", type='" + type + '\'' +
                '}';
    }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public Long getOrder_id() { return order_id; }
    public void setOrder_id(Long order_id) { this.order_id = order_id; }
    public int getCategory_id() { return category_id; }
    public void setCategory_id(int category_id) { this.category_id = category_id; }
    public Long getSku() { return sku; }
    public void setSku(Long sku) { this.sku = sku; }
    public Double getMoney() { return money; }
    public void setMoney(Double money) { this.money = money; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
    public Date getUpdate_time() { return update_time; }
    public void setUpdate_time(Date update_time) { this.update_time = update_time; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}
6. Main Line Code
- configuration file
checkpoint.interval=30000
restart.times=3
state.backend.path=file:///D:\\doit12logs
bootstrap.servers=linux01:9092,linux02:9092,linux03:9092
group.id=g22
auto.offset.reset=earliest
1. Loading the data sources from Kafka
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.*;

/**
 * @date: 2020/3/16 21:22
 * @site: www.ianlou.cn
 * @author: lekko Six Water
 * @qq: 496208110
 * @description:
 */
public class OrderJoinAdv {

    public static void main(String[] args) throws Exception {

        // Read the configuration file
        ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(args[0]);

        StreamExecutionEnvironment env = FlinkUtilsV2.getEnv();

        // Use EventTime as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Load the data sources from Kafka
        DataStream<String> orderMainLines = FlinkUtilsV2.createKafkaDataStream(propertiesFile, "ordermain", "g011", SimpleStringSchema.class);
        DataStream<String> orderDetailLines = FlinkUtilsV2.createKafkaDataStream(propertiesFile, "orderdetail", "g011", SimpleStringSchema.class);
2. Parse the JSON data pulled from Kafka
        // 2. Parse the JSON data pulled from Kafka
        // Parse the OrderMain data
        SingleOutputStreamOperator<OrderMainBean> orderMainBeanDS = orderMainLines.process(new ProcessFunction<String, OrderMainBean>() {
            @Override
            public void processElement(String input, Context ctx, Collector<OrderMainBean> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(input);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        // Take the objects out of the data[] array
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMainBean orderMain = jsonArray.getObject(i, OrderMainBean.class);
                            orderMain.setType(type); // Set the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the bad data
                }
            }
        });

        // Parse the OrderDetail data
        SingleOutputStreamOperator<OrderDetailBean> orderDetailBeanDS = orderDetailLines.process(new ProcessFunction<String, OrderDetailBean>() {
            @Override
            public void processElement(String input, Context ctx, Collector<OrderDetailBean> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(input);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetailBean orderDetail = jsonArray.getObject(i, OrderDetailBean.class);
                            orderDetail.setType(type); // Set the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the bad data
                }
            }
        });
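For reference, a standalone sketch of how the fastjson calls above unpack the sample OrderDetail message from section 1.2 (the message string is abbreviated here):

        // Abbreviated sample Canal message (full version in section 1.2)
        String sample = "{\"data\":[{\"id\":\"10\",\"order_id\":\"29001\",\"category_id\":\"2\","
                + "\"sku\":\"20001\",\"money\":\"1000.0\",\"amount\":\"1\","
                + "\"create_time\":\"2020-03-16 00:38:01\",\"update_time\":\"2020-03-16 00:38:01\"}],"
                + "\"type\":\"INSERT\"}";
        JSONObject obj = JSON.parseObject(sample);
        JSONArray data = obj.getJSONArray("data");
        OrderDetailBean detail = data.getObject(0, OrderDetailBean.class); // fastjson maps the JSON fields onto the bean
        detail.setType(obj.getString("type"));
        System.out.println(detail); // order_id=29001, category_id=2, money=1000.0, amount=1, ...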
3. Extract the EventTime to generate WaterMarks
        int delaySeconds = 2;
        int windowSize = 5;

        SingleOutputStreamOperator<OrderMainBean> orderMainWithWaterMark = orderMainBeanDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderMainBean>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderMainBean element) {
                        return element.getCreate_time().getTime();
                    }
                });

        SingleOutputStreamOperator<OrderDetailBean> orderDetailWithWaterMark = orderDetailBeanDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderDetailBean>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderDetailBean element) {
                        return element.getCreate_time().getTime();
                    }
                });
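BoundedOutOfOrdernessTimestampExtractor belongs to Flink's older timestamp/watermark API. On Flink 1.11+ the same 2-second bounded out-of-orderness watermark can be written with WatermarkStrategy; a minimal sketch of that alternative (not required by the rest of the code):

        // Requires: import java.time.Duration;
        //           import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        SingleOutputStreamOperator<OrderDetailBean> orderDetailWithWm = orderDetailBeanDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderDetailBean>forBoundedOutOfOrderness(Duration.ofSeconds(delaySeconds))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getCreate_time().getTime()));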
4. Left Outer JOIN with OrderDetail as the left table
        DataStream<Tuple2<OrderDetailBean, OrderMainBean>> joined = orderDetailWithWaterMark.coGroup(orderMainWithWaterMark)
                .where(new KeySelector<OrderDetailBean, Long>() {
                    @Override
                    public Long getKey(OrderDetailBean value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMainBean, Long>() {
                    @Override
                    public Long getKey(OrderMainBean value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new CoGroupFunction<OrderDetailBean, OrderMainBean, Tuple2<OrderDetailBean, OrderMainBean>>() {
                    // Emit the result of the (left outer) join
                    @Override
                    public void coGroup(Iterable<OrderDetailBean> first, Iterable<OrderMainBean> second, Collector<Tuple2<OrderDetailBean, OrderMainBean>> out) throws Exception {
                        for (OrderDetailBean orderDetailBean : first) {
                            boolean isJoined = false;
                            for (OrderMainBean orderMainBean : second) {
                                out.collect(Tuple2.of(orderDetailBean, orderMainBean));
                                isJoined = true;
                            }
                            if (!isJoined) {
                                // No matching OrderMain in this window: emit the detail with a null right side
                                out.collect(Tuple2.of(orderDetailBean, null));
                            }
                        }
                    }
                });
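Why coGroup rather than Flink's windowed join: the windowed join only invokes its JoinFunction for matched pairs, so detail records without a matching OrderMain in the window would simply be dropped, while coGroup hands both (possibly empty) sides to the function and lets the code emit a (detail, null) pair. For comparison, a sketch of the inner-join form that is not used here:

        // Inner-join sketch for comparison only: unmatched details are dropped,
        // which is exactly why the pipeline above uses coGroup instead.
        // Requires: import org.apache.flink.api.common.functions.JoinFunction;
        DataStream<Tuple2<OrderDetailBean, OrderMainBean>> innerJoined = orderDetailWithWaterMark
                .join(orderMainWithWaterMark)
                .where(new KeySelector<OrderDetailBean, Long>() {
                    @Override
                    public Long getKey(OrderDetailBean value) { return value.getOrder_id(); }
                })
                .equalTo(new KeySelector<OrderMainBean, Long>() {
                    @Override
                    public Long getKey(OrderMainBean value) { return value.getOid(); }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<OrderDetailBean, OrderMainBean, Tuple2<OrderDetailBean, OrderMainBean>>() {
                    @Override
                    public Tuple2<OrderDetailBean, OrderMainBean> join(OrderDetailBean detail, OrderMainBean main) {
                        return Tuple2.of(detail, main);
                    }
                });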
5. Handle the special cases where no join occurs
As described in section 4, there are two scenarios in which no join occurs:
- (1) The left table's data arrives late
  -> Use the side output to pull the late records out separately, then query the order master table (the right table) from the database and associate its data
- (2) The right table's data arrives late
  -> Query the right table's information using the left table's key, from the database or an API interface
- Then union the two result streams together
5.1. Processing late data for the left table (orderDetailWithWaterMark)
        OutputTag<OrderDetailBean> outputTag = new OutputTag<OrderDetailBean>("leftLate-date") {};

        // Use the same window size and window type as the join above
        SingleOutputStreamOperator<OrderDetailBean> orderDetailLateData = orderDetailWithWaterMark
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .sideOutputLateData(outputTag)
                .apply(new AllWindowFunction<OrderDetailBean, OrderDetailBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<OrderDetailBean> values, Collector<OrderDetailBean> out) throws Exception {
                        // Nothing to do here: the window only exists so that late records get routed to the side output.
                        // apply is used because it gives access to the full window contents if that ever becomes necessary.
                    }
                });

        // Get the left table's late data from the side output
        DataStream<OrderDetailBean> leftLateDate = orderDetailLateData.getSideOutput(outputTag);

        // Look up and associate the right table (OrderMain) data
        SingleOutputStreamOperator<Tuple2<OrderDetailBean, OrderMainBean>> lateOrderDetailAndOrderMain =
                leftLateDate.map(new RichMapFunction<OrderDetailBean, Tuple2<OrderDetailBean, OrderMainBean>>() {

                    private transient Connection conn = null;

                    // Create the JDBC connection
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection(
                                "jdbc:mysql://linux04:3306/doit12?characterEncoding=utf8",
                                "root",
                                "123456"
                        );
                    }

                    // Query the database for the matching OrderMain record
                    @Override
                    public Tuple2<OrderDetailBean, OrderMainBean> map(OrderDetailBean value) throws Exception {
                        Long order_id = value.getOrder_id();
                        String type = value.getType();
                        OrderMainBean orderMainB = queryOrderMainFromMySQL(order_id, type, conn);
                        return Tuple2.of(value, orderMainB);
                    }

                    // Close the JDBC connection
                    @Override
                    public void close() throws Exception {
                        conn.close();
                    }
                });
5.2. Processing late data for the right table, i.e. the right side is null after the join
        SingleOutputStreamOperator<Tuple2<OrderDetailBean, OrderMainBean>> lateOrderMainAndOrderMain =
                joined.map(new RichMapFunction<Tuple2<OrderDetailBean, OrderMainBean>, Tuple2<OrderDetailBean, OrderMainBean>>() {

                    private transient Connection conn = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection(
                                "jdbc:mysql://linux04:3306/doit12?characterEncoding=utf8",
                                "root",
                                "123456"
                        );
                    }

                    // If the right side is null after the join, query the right table's data using the left table's key;
                    // otherwise keep the OrderMain record that was already joined
                    @Override
                    public Tuple2<OrderDetailBean, OrderMainBean> map(Tuple2<OrderDetailBean, OrderMainBean> value) throws Exception {
                        OrderMainBean orderMainB = value.f1;
                        if (value.f1 == null) {
                            Long order_id = value.f0.getOrder_id();
                            String type = value.f0.getType();
                            orderMainB = queryOrderMainFromMySQL(order_id, type, conn);
                        }
                        return Tuple2.of(value.f0, orderMainB);
                    }

                    @Override
                    public void close() throws Exception {
                        conn.close();
                    }
                });
5.3. Union the two streams together
        DataStream<Tuple2<OrderDetailBean, OrderMainBean>> allOrderStream =
                lateOrderMainAndOrderMain.union(lateOrderDetailAndOrderMain);

        allOrderStream.print();

        FlinkUtilsV2.getEnv().execute("OrderJoinAdv");
    }
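The main-line code stops at printing the joined stream; the actual requirement from section 1 (transaction amount per commodity category for transacted orders) still needs one aggregation step. Below is a hedged sketch of what that step could look like, placed before env.execute(); the status code that marks an order as "transacted" is an assumption and has to be replaced with the real value:

        // Hypothetical final aggregation (not part of the original code).
        // TRANSACTED is an assumed status code for "transacted" orders.
        // Requires: import org.apache.flink.api.common.functions.MapFunction;
        final int TRANSACTED = 2;
        allOrderStream
                // keep only records whose order master data is present and whose status is "transacted"
                .filter(t -> t.f1 != null && t.f1.getStatus() == TRANSACTED)
                // (category_id, money * amount) per order detail record
                .map(new MapFunction<Tuple2<OrderDetailBean, OrderMainBean>, Tuple2<Integer, Double>>() {
                    @Override
                    public Tuple2<Integer, Double> map(Tuple2<OrderDetailBean, OrderMainBean> t) {
                        return Tuple2.of(t.f0.getCategory_id(), t.f0.getMoney() * t.f0.getAmount());
                    }
                })
                // running total of the transaction amount per category id
                .keyBy(new KeySelector<Tuple2<Integer, Double>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, Double> t) {
                        return t.f0;
                    }
                })
                .sum(1)
                .print();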
6. Helper method for querying the MySQL database
    private static OrderMainBean queryOrderMainFromMySQL(Long order_id, String type, Connection conn) throws Exception {
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            String sql = "select oid, create_time, total_money, status, update_time, province, uid from ordermain" +
                    " where oid = ?";
            preparedStatement = conn.prepareStatement(sql);
            preparedStatement.setLong(1, order_id);
            resultSet = preparedStatement.executeQuery();

            OrderMainBean orderMain = null;
            // Read the result before the statement and result set are closed
            if (resultSet.next()) {
                long oid = resultSet.getLong("oid");
                Date create_time = resultSet.getDate("create_time");
                double total_money = resultSet.getDouble("total_money");
                int status = resultSet.getInt("status");
                Date update_time = resultSet.getDate("update_time");
                String province = resultSet.getString("province");
                String uid = resultSet.getString("uid");
                orderMain = new OrderMainBean(oid, create_time, total_money, status, update_time, province, uid, type);
            }
            return orderMain;
        } finally {
            // Close only the result set and the statement here; the shared connection
            // is closed in the RichMapFunction's close() method
            if (resultSet != null) {
                resultSet.close();
            }
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }
}