Design ideas
In the previous stage we split the data into independent Kafka topics by diverting and processing it. The next step is to decide which metrics used in real-time calculation should be prepared in advance. Timeliness is what a real-time data warehouse pursues, so in some scenarios we do not need a middle layer as large and complete as that of an offline data warehouse; the middle layer only needs to keep some intermediate calculation results that make the subsequent computation more convenient.
Therefore, we collect the real-time metric requirements and output them as wide tables in Kafka topics; these topics form the DWS layer.
Some of the metrics listed here mainly serve visualization and large-screen dashboards. So why has the DWM layer not appeared yet? The DWM layer exists mainly to serve the DWS layer: some requirements that go straight from the DWD layer to the DWS layer involve fairly complex calculations, and the results of those calculations are likely to be reused by several DWS topics. The DWD layer therefore gives rise to a DWM layer, which mainly covers the following business topics:
- Visitor UV calculation
- Bounce (jump-out) detail calculation
- Order wide table
- Payment wide table
Visitor UV calculation
UV (Unique Visitor), i.e. an independent visitor, is also called DAU (Daily Active User) when counted per day.
From the user behavior log we identify the day's visitors in two steps:
- Identify the first page a visitor opens, which marks the visitor entering the application
- Since a visitor may enter the application many times a day, deduplicate within the same day
Data flow: web/app -> Nginx -> SpringBoot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWM)
Programs: MockLog -> Nginx -> logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka -> UniqueVisitApp -> Kafka
//TODO 1. Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the dwd_page_log topic data from Kafka
String groupId = "unique_visit_app_210325";
String sourceTopic = "dwd_page_log";
String sinkTopic = "dwm_unique_visit";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each row of data into a JSON object
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

//TODO 4. Filter with keyed state: only keep the first visit of each mid per day
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

    private ValueState<String> dateState;
    private SimpleDateFormat simpleDateFormat;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("date-state", String.class);
        //Set the TTL of the state and how its timestamp is updated
        StateTtlConfig stateTtlConfig = new StateTtlConfig
                .Builder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        valueStateDescriptor.enableTimeToLive(stateTtlConfig);
        dateState = getRuntimeContext().getState(valueStateDescriptor);
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        //Take out the previous page id
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        //Only a page with no previous page can mark the start of a visit
        if (lastPageId == null || lastPageId.length() <= 0) {
            //Take out the date saved in state
            String lastDate = dateState.value();
            //Take out today's date
            String curDate = simpleDateFormat.format(value.getLong("ts"));
            //If the two dates differ, this is the first visit of the day
            if (!curDate.equals(lastDate)) {
                dateState.update(curDate);
                return true;
            }
        }
        return false;
    }
});

//TODO 5. Write the data to Kafka
uvDS.print();
uvDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

//TODO 6. Start the task
env.execute("UniqueVisitApp");
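All of the jobs in this chapter read from and write to Kafka through the helper class MyKafkaUtil, whose implementation is not shown here. The following is only a minimal sketch of what such a helper might look like, assuming the flink-connector-kafka FlinkKafkaConsumer/FlinkKafkaProducer classes and a placeholder broker address that you would replace with your own cluster.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;

//Minimal sketch of the Kafka helper assumed by the jobs in this chapter
public class MyKafkaUtil {

    //Assumption: placeholder broker list, replace with your own Kafka cluster
    private static final String KAFKA_SERVER = "hadoop102:9092";

    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
        properties.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
    }

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(KAFKA_SERVER, topic, new SimpleStringSchema());
    }
}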
Bounce (jump-out) detail calculation
A bounce is a visit in which the user enters the application, views only the entry page (a page with no last_page_id) and does not open any other page within the timeout window.
Data flow: web/app -> Nginx -> SpringBoot -> Kafka (ODS) -> FlinkApp -> Kafka (DWD) -> FlinkApp -> Kafka (DWM)
Programs: MockLog -> Nginx -> logger.sh -> Kafka (ZK) -> BaseLogApp -> Kafka -> UserJumpDetailApp -> Kafka
//TODO 1. Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //In the production environment, keep this consistent with the number of Kafka partitions

//TODO 2. Read the Kafka topic data and create a stream
String sourceTopic = "dwd_page_log";
String groupId = "userJumpDetailApp";
String sinkTopic = "dwm_user_jump_detail";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each row of data into a JSON object and extract the timestamp to generate a watermark
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                }));

//TODO 4. Define the pattern sequence
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).next("next").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).within(Time.seconds(10));

//Equivalent definition using a looping pattern (shown for reference, not used below)
Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        })
        .times(2)
        .consecutive() //Strict contiguity
        .within(Time.seconds(10));

//TODO 5. Apply the pattern sequence to the stream
PatternStream<JSONObject> patternStream = CEP
        .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                , pattern);

//TODO 6. Extract the matched events and the timed-out events
OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("timeOut") {
};
SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(timeOutTag,
        new PatternTimeoutFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject timeout(Map<String, List<JSONObject>> map, long ts) throws Exception {
                return map.get("start").get(0);
            }
        }, new PatternSelectFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                return map.get("start").get(0);
            }
        });
DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);

//TODO 7. Union the two streams
DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

//TODO 8. Write the data to Kafka
unionDS.print();
unionDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

//TODO 9. Start the task
env.execute("UserJumpDetailApp");
Order wide table design
//TODO 1. Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the Kafka topic data, convert it into JavaBean objects & extract the timestamp to generate the watermark
String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group_0325";

SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderInfoSourceTopic, groupId))
        .map(line -> {
            OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);

            String create_time = orderInfo.getCreate_time();
            String[] dateTimeArr = create_time.split(" ");
            orderInfo.setCreate_date(dateTimeArr[0]);
            orderInfo.setCreate_hour(dateTimeArr[1].split(":")[0]);

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderInfo.setCreate_ts(sdf.parse(create_time).getTime());

            return orderInfo;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));

SingleOutputStreamOperator<OrderDetail> orderDetailDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderDetailSourceTopic, groupId))
        .map(line -> {
            OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
            String create_time = orderDetail.getCreate_time();

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderDetail.setCreate_ts(sdf.parse(create_time).getTime());
            return orderDetail;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));

//TODO 3. Double-stream JOIN
SingleOutputStreamOperator<OrderWide> orderWideWithNoDimDS = orderInfoDS.keyBy(OrderInfo::getId)
        .intervalJoin(orderDetailDS.keyBy(OrderDetail::getOrder_id))
        .between(Time.seconds(-5), Time.seconds(5)) //In the production environment, set this to the maximum expected delay
        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                out.collect(new OrderWide(orderInfo, orderDetail));
            }
        });

//Print test
orderWideWithNoDimDS.print("orderWideWithNoDimDS>>>>>>>>>");

//TODO 4. Associate the dimension information stored in HBase (Phoenix)
//A plain synchronous map() lookup would also work, but every record would block on the Phoenix query:
//orderWideWithNoDimDS.map(orderWide -> {
//    //Associate the user dimension
//    Long user_id = orderWide.getUser_id();
//    //Query the user information from Phoenix by user_id
//    //Add the user information to orderWide
//    //Area
//    //SKU
//    //SPU
//    //...
//    //Return the result
//    return orderWide;
//});

//4.1 Associate the user dimension
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
        orderWideWithNoDimDS,
        new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getUser_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setUser_gender(dimInfo.getString("GENDER"));

                String birthday = dimInfo.getString("BIRTHDAY");
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

                long currentTs = System.currentTimeMillis();
                long ts = sdf.parse(birthday).getTime();
                long age = (currentTs - ts) / (1000 * 60 * 60 * 24 * 365L);
                orderWide.setUser_age((int) age);
            }
        },
        60, TimeUnit.SECONDS);

//Print test
//orderWideWithUserDS.print("orderWideWithUserDS");

//4.2 Associate the province dimension
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getProvince_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setProvince_name(dimInfo.getString("NAME"));
                orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
            }
        }, 60, TimeUnit.SECONDS);

//4.3 Associate the SKU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
        orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                orderWide.setTm_id(jsonObject.getLong("TM_ID"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSku_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.4 Associate the SPU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
        orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSpu_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.5 Associate the trademark (TM) dimension
SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
        orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setTm_name(jsonObject.getString("TM_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getTm_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.6 Associate the category dimension
SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
        orderWideWithTmDS, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setCategory3_name(jsonObject.getString("NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getCategory3_id());
            }
        }, 60, TimeUnit.SECONDS);

orderWideWithCategory3DS.print("orderWideWithCategory3DS>>>>>>>>>>>");

//TODO 5. Write the data to Kafka
orderWideWithCategory3DS
        .map(JSONObject::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(orderWideSinkTopic));

//TODO 6. Start the task
env.execute("OrderWideApp");
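The dimension association above relies on DimAsyncFunction, whose implementation is not part of this section. As a rough, hedged sketch of the idea (not the project's actual code): it can be built on Flink's RichAsyncFunction plus a thread pool, where getKey() extracts the dimension key from the stream element, the Phoenix dimension table in HBase is queried asynchronously, and join() writes the returned dimension fields back into the element. The DimUtil.getDimInfo helper used below is hypothetical and stands in for a (preferably cached) Phoenix lookup.

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//Sketch of an asynchronous dimension-lookup function; DimUtil.getDimInfo is a hypothetical helper
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private final String tableName;           //Phoenix dimension table, e.g. "DIM_USER_INFO"
    private transient ExecutorService executor;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    public abstract String getKey(T input);                                   //extract the dimension key from the element
    public abstract void join(T input, JSONObject dimInfo) throws Exception;  //merge the dimension fields into the element

    @Override
    public void open(Configuration parameters) {
        //Run the blocking lookups in a separate thread pool so the task thread is not blocked
        executor = Executors.newFixedThreadPool(8);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) {
        executor.submit(() -> {
            try {
                //Hypothetical cached lookup against the Phoenix dimension table
                JSONObject dimInfo = DimUtil.getDimInfo(tableName, getKey(input));
                if (dimInfo != null) {
                    join(input, dimInfo);
                }
                resultFuture.complete(Collections.singletonList(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }
}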
Payment wide table design
//TODO 1. Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the Kafka topic data, create streams, convert them into JavaBean objects, extract the timestamp and generate the watermark
String groupId = "payment_wide_group";
String paymentInfoSourceTopic = "dwd_payment_info";
String orderWideSourceTopic = "dwm_order_wide";
String paymentWideSinkTopic = "dwm_payment_wide";

SingleOutputStreamOperator<OrderWide> orderWideDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, OrderWide.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                    @Override
                    public long extractTimestamp(OrderWide element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return recordTimestamp;
                        }
                    }
                }));

SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentInfoSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, PaymentInfo.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<PaymentInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                    @Override
                    public long extractTimestamp(PaymentInfo element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return recordTimestamp;
                        }
                    }
                }));

//TODO 3. Double-stream JOIN
SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoDS.keyBy(PaymentInfo::getOrder_id)
        .intervalJoin(orderWideDS.keyBy(OrderWide::getOrder_id))
        .between(Time.minutes(-15), Time.seconds(5)) //A payment is expected within 15 minutes after the order is created
        .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
            @Override
            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                out.collect(new PaymentWide(paymentInfo, orderWide));
            }
        });

//TODO 4. Write the data to Kafka
paymentWideDS.print(">>>>>>>>>");
paymentWideDS
        .map(JSONObject::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(paymentWideSinkTopic));

//TODO 5. Start the task
env.execute("PaymentWideApp");
DWM layer summary
The DWM layer mainly converts one kind of detail data into another through computation, preparing it for the subsequent statistics. The key techniques covered in this chapter:
- Deduplication with keyed state (UV calculation)
- Filtering and matching a group of events with CEP (bounce detail)
- Joining two streams with interval join (order and payment wide tables)
- Dimension association, with performance optimized through caching and asynchronous queries (see the sketch below)
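The caching mentioned in the last point is not shown anywhere in the code above. Purely as an illustration, a dimension lookup helper such as the DimUtil.getDimInfo assumed earlier could cache Phoenix query results in Redis so that repeated keys skip the HBase round trip. The sketch below makes that assumption; the Redis host, the Phoenix URL and the 24-hour TTL are placeholders, and the query itself is plain JDBC against Phoenix.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

//Sketch of a cached dimension lookup: try Redis first, fall back to Phoenix, then write the result back to the cache
public class DimUtil {

    private static final String PHOENIX_URL = "jdbc:phoenix:zk-host:2181"; //assumption: your ZooKeeper quorum
    private static final String REDIS_HOST = "redis-host";                 //assumption: your Redis address

    public static JSONObject getDimInfo(String tableName, String key) throws Exception {
        String redisKey = "DIM:" + tableName + ":" + key;

        try (Jedis jedis = new Jedis(REDIS_HOST, 6379)) {
            //1. Try the cache first
            String cached = jedis.get(redisKey);
            if (cached != null) {
                jedis.expire(redisKey, 24 * 60 * 60); //refresh the TTL on a hit
                return JSON.parseObject(cached);
            }

            //2. Cache miss: query the Phoenix dimension table
            JSONObject dimInfo = new JSONObject();
            String sql = "select * from " + tableName + " where id='" + key + "'";
            try (Connection connection = DriverManager.getConnection(PHOENIX_URL);
                 PreparedStatement ps = connection.prepareStatement(sql);
                 ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData meta = rs.getMetaData();
                if (rs.next()) {
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        dimInfo.put(meta.getColumnName(i), rs.getObject(i));
                    }
                }
            }

            //3. Write the result back to the cache with a 24-hour TTL
            jedis.setex(redisKey, 24 * 60 * 60, dimInfo.toJSONString());
            return dimInfo;
        }
    }
}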