Building a quasi real-time data warehouse with Flink 1.13.3 and Hudi 0.10.0-release

This article builds a quasi real-time data warehouse based on Flink SQL and Hudi. After Flink ingests the data from Kafka, all data is stored in Hudi, including all intermediate processing data and the final data. The article "Real-time data warehouse | Exploration and practice of building a real-time data warehouse based on Flink 1.11 SQL" (qq.com) describes a real-time data warehouse based on Flink SQL and Kafka; this article builds on it.

While working through this article, you can refer to that earlier article in parallel.

Final result:

Background introduction

Taking e-commerce business as an example, this paper shows the data processing flow of quasi real-time data warehouse.

Component and configuration description

Flink 1.13.3

flink cdc 2.0.2

hudi 0.10.0 (latest release on 2021.12.08, address: https://github.com/apache/hudi/releases/tag/release-0.10.0 )

hadoop 3.2.0

zeppelin 0.10.0

mysql 5.7 (enable binlog)

kafka 2.5.0
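Flink CDC reads MySQL's row-based binlog, so MySQL must be started with binlog enabled, as noted above. A minimal my.cnf sketch (the values here are assumptions; adjust server-id and the log name for your installation):

```ini
# Assumed example settings -- enable row-based binlog for Flink CDC
[mysqld]
server-id        = 1
log-bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
```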

Because Zeppelin is convenient, this article submits tasks through Zeppelin. If you cannot use Zeppelin, you can refer to: https://lrting-top.blog.csdn.net/article/details/120681666 . Of course, if you don't want to use Zeppelin, submitting with the Flink SQL Client also works.

In this experiment, Flink checkpointing is enabled with an interval of 60s.
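One way to set the 60s checkpoint interval is in Flink's configuration file; with Zeppelin, the same key can be placed in the Flink interpreter settings (a sketch, assuming the default flink-conf.yaml location):

```yaml
# flink-conf.yaml -- checkpoint every 60 s
execution.checkpointing.interval: 60s
```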

Before completing the following tasks, make sure you have done the following:

  • Deploy Flink 1.13.3, build the Hudi Flink bundle jar and place it in Flink's lib directory, together with the Flink CDC connector jar.
  • Deploy and start zeppelin 0.10.0, and set FLINK_HOME and HADOOP_CLASSPATH on Zeppelin's Flink interpreter.
  • At the same time, start hadoop, mysql and kafka
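As a sketch, placing the jars could look like the following; the exact bundle file names depend on how you packaged Hudi and on your Flink build's Scala version, so treat them as assumptions:

```shell
# Assumed jar names -- adjust to your actual builds
cp hudi-flink-bundle_2.11-0.10.0.jar       $FLINK_HOME/lib/
cp flink-sql-connector-mysql-cdc-2.0.2.jar $FLINK_HOME/lib/
# Restart the Flink cluster so the new jars are picked up
$FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh
```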

Processing flow

MySQL table creation and raw data loading

First, obtain the mysql table creation statement and simulation data from the following address:

https://obs-githubhelper.obs.cn-east-3.myhuaweicloud.com/blog-images/category/bigdata/hudi/flink-hudi-realtime/realtime_table.zip

After downloading the archive above, log in to MySQL, create a new database realtime_dw_demo_1, switch to it, and initialize it:

mysql -u root -p

create database realtime_dw_demo_1;

use realtime_dw_demo_1;

source realtime_table.sql

Synchronize mysql table data to kafka

Synchronize the MySQL data to Kafka using Flink CDC. The related SQL statements are as follows:

Read mysql source table data

%flink.ssql

drop table if exists base_category1;
drop table if exists base_category2;
drop table if exists base_category3;
drop table if exists base_province;
drop table if exists base_region;
drop table if exists base_trademark;
drop table if exists date_info;
drop table if exists holiday_info;
drop table if exists holiday_year;
drop table if exists order_detail;
drop table if exists order_info;
drop table if exists order_status_log;
drop table if exists payment_info;
drop table if exists sku_info;
drop table if exists user_info;

---mysql table

CREATE TABLE `base_category1` (
  `id` bigint NOT NULL,
  `name` string NOT NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category1'
);

CREATE TABLE `base_category2` (
  `id` bigint NOT NULL COMMENT 'number',
  `name` string NOT NULL COMMENT 'Secondary classification name',
  `category1_id` bigint  NULL COMMENT 'Primary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category2'
);


CREATE TABLE `base_category3` (
  `id` bigint NOT NULL COMMENT 'number',
  `name` string NOT NULL COMMENT 'Three level classification name',
  `category2_id` bigint  NULL COMMENT 'Secondary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_category3'
);

CREATE TABLE `base_province` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT 'Province name',
  `region_id` int  NULL COMMENT 'Large area id',
  `area_code` string  NULL COMMENT 'Administrative location code',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_province'
);

CREATE TABLE `base_region` (
  `id` int NOT NULL COMMENT 'Large area id',
  `region_name` string  NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_region'
);

CREATE TABLE `base_trademark` (
  `tm_id` string  NULL COMMENT 'brand id',
  `tm_name` string  NULL COMMENT 'Brand name',
  PRIMARY KEY (`tm_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'base_trademark'
);


CREATE TABLE `date_info` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'date_info'
);


CREATE TABLE `holiday_info` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_info'
);

CREATE TABLE `holiday_year` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL,
  PRIMARY KEY (`end_date_id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'holiday_year'
);

CREATE TABLE `order_detail` (
  `id` bigint NOT NULL COMMENT 'number',
  `order_id` bigint  NULL COMMENT 'Order No',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku Name (redundant))',
  `img_url` string  NULL COMMENT 'Picture name (redundant))',
  `order_price` decimal(10,2)  NULL COMMENT 'Purchase price(When placing an order sku Price)',
  `sku_num` string  NULL COMMENT 'Number of purchases',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_detail'
);


CREATE TABLE `order_info` (
  `id` bigint NOT NULL COMMENT 'number',
  `consignee` string  NULL COMMENT 'consignee',
  `consignee_tel` string  NULL COMMENT 'Recipient phone',
  `total_amount` decimal(10,2)  NULL COMMENT 'Total amount',
  `order_status` string  NULL COMMENT 'Order status',
  `user_id` bigint  NULL COMMENT 'user id',
  `payment_way` string  NULL COMMENT 'payment method',
  `delivery_address` string  NULL COMMENT 'Shipping address',
  `order_comment` string  NULL COMMENT 'Order remarks',
  `out_trade_no` string  NULL COMMENT 'Order transaction number (for third party payment))',
  `trade_body` string  NULL COMMENT 'Order description(For third party payment)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  `operate_time` timestamp(3)  NULL COMMENT 'Operation time',
  `expire_time` timestamp(3)  NULL COMMENT 'Failure time',
  `tracking_no` string  NULL COMMENT 'Logistics order No',
  `parent_order_id` bigint  NULL COMMENT 'Parent order number',
  `img_url` string  NULL COMMENT 'Picture path',
  `province_id` int  NULL COMMENT 'region',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_info'
);


CREATE TABLE `order_status_log` (
  `id` int NOT NULL,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'order_status_log'
);


CREATE TABLE `payment_info` (
  `id` bigint NOT NULL COMMENT 'number',
  `out_trade_no` string  NULL COMMENT 'External business No',
  `order_id` string  NULL COMMENT 'Order No',
  `user_id` string  NULL COMMENT 'User number',
  `alipay_trade_no` string  NULL COMMENT 'Alipay transaction flow code',
  `total_amount` decimal(16,2)  NULL COMMENT 'Payment amount',
  `subject` string  NULL COMMENT 'Transaction content',
  `payment_type` string  NULL COMMENT 'Payment method',
  `payment_time` string  NULL COMMENT 'Payment time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'payment_info'
);


CREATE TABLE `sku_info` (
  `id` bigint NOT NULL COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT 'Price',
  `sku_name` string  NULL COMMENT 'sku name',
  `sku_desc` string  NULL COMMENT 'Product specification description',
  `weight` decimal(10,2)  NULL COMMENT 'weight',
  `tm_id` bigint  NULL COMMENT 'brand(redundancy)',
  `category3_id` bigint  NULL COMMENT 'Three level classification id(redundancy)',
  `sku_default_img` string  NULL COMMENT 'Default display picture(redundancy)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'sku_info'
);

CREATE TABLE `user_info` (
  `id` bigint NOT NULL COMMENT 'number',
  `login_name` string  NULL COMMENT 'User name',
  `nick_name` string  NULL COMMENT 'User nickname',
  `passwd` string  NULL COMMENT 'User password',
  `name` string  NULL COMMENT 'User name',
  `phone_num` string  NULL COMMENT 'cell-phone number',
  `email` string  NULL COMMENT 'mailbox',
  `head_img` string  NULL COMMENT 'head portrait',
  `user_level` string  NULL COMMENT 'User level',
  `birthday` date  NULL COMMENT 'User birthday',
  `gender` string  NULL COMMENT 'Gender M male,F female',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'mysql-cdc',
  'hostname' = 'vhost-118-23',
  'port' = '3306',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'server-time-zone'= 'Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name' = 'realtime_dw_demo_1',
  'table-name' = 'user_info'
);

Kafka sink table creation statements

%flink.ssql

drop table if exists base_category1_topic;
drop table if exists base_category2_topic;
drop table if exists base_category3_topic;
drop table if exists base_province_topic;
drop table if exists base_region_topic;
drop table if exists base_trademark_topic;
drop table if exists date_info_topic;
drop table if exists holiday_info_topic;
drop table if exists holiday_year_topic;
drop table if exists order_detail_topic;
drop table if exists order_info_topic;
drop table if exists order_status_log_topic;
drop table if exists payment_info_topic;
drop table if exists sku_info_topic;
drop table if exists user_info_topic;

CREATE TABLE `base_category1_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_category2_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Secondary classification name',
  `category1_id` bigint  NULL COMMENT 'Primary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `base_category3_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Three level classification name',
  `category2_id` bigint  NULL COMMENT 'Secondary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_province_topic` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT 'Province name',
  `region_id` int  NULL COMMENT 'Large area id',
  `area_code` string  NULL COMMENT 'Administrative location code'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_region_topic` (
  `id` int NOT NULL COMMENT 'Large area id',
  `region_name` string  NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `base_trademark_topic` (
  `tm_id` string  NULL COMMENT 'brand id',
  `tm_name` string  NULL COMMENT 'Brand name',
  PRIMARY KEY (`tm_id`) NOT ENFORCED

)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_trademark'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `date_info_topic` (
  `date_id` int NOT NULL,
  `week_id` int  NULL,
  `week_day` int  NULL,
  `day` int  NULL,
  `month` int  NULL,
  `quarter` int  NULL,
  `year` int  NULL,
  `is_workday` int  NULL,
  `holiday_id` int  NULL,
  PRIMARY KEY (`date_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.date_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `holiday_info_topic` (
  `holiday_id` int NOT NULL,
  `holiday_name` string  NULL,
  PRIMARY KEY (`holiday_id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `holiday_year_topic` (
  `year_id` int  NULL,
  `holiday_id` int  NULL,
  `start_date_id` int  NULL,
  `end_date_id` int  NULL
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.holiday_year'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `order_detail_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `order_id` bigint  NULL COMMENT 'Order No',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku Name (redundant))',
  `img_url` string  NULL COMMENT 'Picture name (redundant))',
  `order_price` decimal(10,2)  NULL COMMENT 'Purchase price(When placing an order sku Price)',
  `sku_num` string  NULL COMMENT 'Number of purchases',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_info_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `consignee` string  NULL COMMENT 'consignee',
  `consignee_tel` string  NULL COMMENT 'Recipient phone',
  `total_amount` decimal(10,2)  NULL COMMENT 'Total amount',
  `order_status` string  NULL COMMENT 'Order status',
  `user_id` bigint  NULL COMMENT 'user id',
  `payment_way` string  NULL COMMENT 'payment method',
  `delivery_address` string  NULL COMMENT 'Shipping address',
  `order_comment` string  NULL COMMENT 'Order remarks',
  `out_trade_no` string  NULL COMMENT 'Order transaction number (for third party payment))',
  `trade_body` string  NULL COMMENT 'Order description(For third party payment)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  `operate_time` timestamp(3)  NULL COMMENT 'Operation time',
  `expire_time` timestamp(3)  NULL COMMENT 'Failure time',
  `tracking_no` string  NULL COMMENT 'Logistics order No',
  `parent_order_id` bigint  NULL COMMENT 'Parent order number',
  `img_url` string  NULL COMMENT 'Picture path',
  `province_id` int  NULL COMMENT 'region',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `order_status_log_topic` (
  `id` int NOT NULL ,
  `order_id` int  NULL,
  `order_status` int  NULL,
  `operate_time` timestamp(3)  NULL,
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_status_log'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `payment_info_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `out_trade_no` string  NULL COMMENT 'External business No',
  `order_id` string  NULL COMMENT 'Order No',
  `user_id` string  NULL COMMENT 'User number',
  `alipay_trade_no` string  NULL COMMENT 'Alipay transaction flow code',
  `total_amount` decimal(16,2)  NULL COMMENT 'Payment amount',
  `subject` string  NULL COMMENT 'Transaction content',
  `payment_type` string  NULL COMMENT 'Payment method',
  `payment_time` string  NULL COMMENT 'Payment time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.payment_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);


CREATE TABLE `sku_info_topic` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT 'Price',
  `sku_name` string  NULL COMMENT 'sku name',
  `sku_desc` string  NULL COMMENT 'Product specification description',
  `weight` decimal(10,2)  NULL COMMENT 'weight',
  `tm_id` bigint  NULL COMMENT 'brand(redundancy)',
  `category3_id` bigint  NULL COMMENT 'Three level classification id(redundancy)',
  `sku_default_img` string  NULL COMMENT 'Default display picture(redundancy)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

CREATE TABLE `user_info_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `login_name` string  NULL COMMENT 'User name',
  `nick_name` string  NULL COMMENT 'User nickname',
  `passwd` string  NULL COMMENT 'User password',
  `name` string  NULL COMMENT 'User name',
  `phone_num` string  NULL COMMENT 'cell-phone number',
  `email` string  NULL COMMENT 'mailbox',
  `head_img` string  NULL COMMENT 'head portrait',
  `user_level` string  NULL COMMENT 'User level',
  `birthday` date  NULL COMMENT 'User birthday',
  `gender` varchar(1)  NULL COMMENT 'Gender M male,F female',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.user_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
);

Insert statements to import the MySQL binlog data into the corresponding Kafka topics

%flink.ssql(runAsOne=true)
insert into base_category1_topic select * from base_category1;
insert into base_category2_topic select * from base_category2;
insert into base_category3_topic select * from base_category3;
insert into base_province_topic select * from base_province;
insert into base_region_topic select * from base_region;
insert into base_trademark_topic select * from base_trademark;
insert into date_info_topic select * from date_info;
insert into holiday_info_topic select * from holiday_info;
insert into holiday_year_topic select * from holiday_year;
insert into order_detail_topic select * from order_detail;
insert into order_info_topic select * from order_info;
insert into order_status_log_topic select * from order_status_log;
insert into payment_info_topic select * from payment_info;
insert into sku_info_topic select * from sku_info;
insert into user_info_topic select * from user_info;
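After the insert statements are running, you can spot-check that the binlog data actually reaches Kafka with the console consumer, using the broker address and topic names from above:

```shell
# Read a few debezium-json records from one of the topics
bin/kafka-console-consumer.sh \
  --bootstrap-server 172.21.73.53:6667 \
  --topic my5.order_info \
  --from-beginning \
  --max-messages 5
```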

Import dimension table data into hudi

Write the two region dimension tables my5.base_province and my5.base_region into Hudi COW tables

%flink.ssql
drop table if exists base_province_topic_source;
drop table if exists base_province_hudi;

CREATE TABLE `base_province_topic_source` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT 'Province name',
  `region_id` int  NULL COMMENT 'Large area id',
  `area_code` string  NULL COMMENT 'Administrative location code'
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_province'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_province_hudi` (
  `id` int  NULL COMMENT 'id',
  `name` string  NULL COMMENT 'Province name',
  `region_id` int  NULL COMMENT 'Large area id',
  `area_code` string  NULL COMMENT 'Administrative location code',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_province_hudi select * from base_province_topic_source;
%flink.ssql
drop table if exists base_region_topic_source;
drop table if exists base_region_hudi;

CREATE TABLE `base_region_topic_source` (
  `id` int NOT NULL COMMENT 'Large area id',
  `region_name` string  NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_region'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_region_hudi` (
  `id` int NOT NULL COMMENT 'Large area id',
  `region_name` string  NULL COMMENT 'Region name',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_region_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_region_hudi select * from base_region_topic_source;

Use the above two dimension tables to create the dim_province table

%flink.ssql
DROP TABLE IF EXISTS dim_province_hudi;
create table dim_province_hudi (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) with (
    'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/dim_province_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'province_id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dim_province_hudi
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  bp.area_code AS area_code,
  br.id AS region_id,
  br.region_name AS region_name
FROM base_region_hudi br 
     JOIN base_province_hudi bp ON br.id= bp.region_id
;
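Because `read.streaming.enabled` is set to `true` on the Hudi table, you can verify the result with a continuously updating query in Zeppelin (a sketch; `type=update` keeps the displayed result set refreshing):

```sql
%flink.ssql(type=update)
select * from dim_province_hudi;
```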

Write the commodity dimension tables my5.base_category1, my5.base_category2 and my5.base_category3 into Hudi COW tables

%flink.ssql
drop table if exists base_category1_topic_source;
drop table if exists base_category1_hudi;
CREATE TABLE `base_category1_topic_source` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category1'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category1_hudi` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category1_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category1_hudi select * from base_category1_topic_source;
%flink.ssql
drop table if exists base_category2_topic_source;
drop table if exists base_category2_hudi;
CREATE TABLE `base_category2_topic_source` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  `category1_id` bigint NULL COMMENT 'Primary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category2'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category2_hudi` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  `category1_id` bigint NULL COMMENT 'Primary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category2_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category2_hudi select * from base_category2_topic_source;
%flink.ssql
drop table if exists base_category3_topic_source;
drop table if exists base_category3_hudi;
CREATE TABLE `base_category3_topic_source` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  `category2_id` bigint NULL COMMENT 'Secondary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.base_category3'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `base_category3_hudi` (
  `id` bigint NOT NULL  COMMENT 'number',
  `name` string NOT NULL COMMENT 'Classification name',
  `category2_id` bigint NULL COMMENT 'Secondary classification number',
  PRIMARY KEY (`id`) NOT ENFORCED
) 
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/base_category3_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into base_category3_hudi select * from base_category3_topic_source;

Import the sku_info (item) table into hudi

%flink.ssql
drop table if exists sku_info_topic_source;
drop table if exists sku_info_topic_hudi;

CREATE TABLE `sku_info_topic_source` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT 'Price',
  `sku_name` string  NULL COMMENT 'sku name',
  `sku_desc` string  NULL COMMENT 'Product specification description',
  `weight` decimal(10,2)  NULL COMMENT 'weight',
  `tm_id` bigint  NULL COMMENT 'brand(redundancy)',
  `category3_id` bigint  NULL COMMENT 'Three level classification id(redundancy)',
  `sku_default_img` string  NULL COMMENT 'Default display picture(redundancy)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.sku_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset'
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `sku_info_topic_hudi` (
  `id` bigint NOT NULL  COMMENT 'skuid(itemID)',
  `spu_id` bigint  NULL COMMENT 'spuid',
  `price` decimal(10,0)  NULL COMMENT 'Price',
  `sku_name` string  NULL COMMENT 'sku name',
  `sku_desc` string  NULL COMMENT 'Product specification description',
  `weight` decimal(10,2)  NULL COMMENT 'weight',
  `tm_id` bigint  NULL COMMENT 'brand(redundancy)',
  `category3_id` bigint  NULL COMMENT 'Three level classification id(redundancy)',
  `sku_default_img` string  NULL COMMENT 'Default display picture(redundancy)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://host117:8020/realtime-demo-2/sku_info_topic_hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.precombine.field' = 'id',
  'read.streaming.enabled' = 'true'
);
%flink.ssql
insert into sku_info_topic_hudi select * from sku_info_topic_source;

Through the above steps, we have synchronized the basic data of the commodity dimension tables to hudi. Similarly, we use these commodity dimension tables to create the dim_sku_info view:

%flink.ssql
drop view if exists dim_sku_info;
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
  sku_info_topic_hudi si 
  JOIN base_category3_hudi c3 ON si.category3_id = c3.id
  JOIN base_category2_hudi c2 ON c3.category2_id =c2.id
  JOIN base_category1_hudi c1 ON c2.category1_id = c1.id
;

DWD layer data processing

After the above steps, the dimension tables are ready for use. Next, we process the raw ODS data into a DWD-layer detail wide table.

%flink.ssql
drop table if exists ods_order_detail_topic;
drop table if exists ods_order_info_topic;
drop table if exists dwd_paid_order_detail_hudi;

CREATE TABLE `ods_order_detail_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `order_id` bigint  NULL COMMENT 'Order No',
  `sku_id` bigint  NULL COMMENT 'sku_id',
  `sku_name` string  NULL COMMENT 'sku name (redundant)',
  `img_url` string  NULL COMMENT 'Picture name (redundant)',
  `order_price` decimal(10,2)  NULL COMMENT 'Purchase price(When placing an order sku Price)',
  `sku_num` int  NULL COMMENT 'Number of purchases',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_detail'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE `ods_order_info_topic` (
  `id` bigint NOT NULL  COMMENT 'number',
  `consignee` string  NULL COMMENT 'consignee',
  `consignee_tel` string  NULL COMMENT 'Recipient phone',
  `total_amount` decimal(10,2)  NULL COMMENT 'Total amount',
  `order_status` string  NULL COMMENT 'Order status',
  `user_id` bigint  NULL COMMENT 'user id',
  `payment_way` string  NULL COMMENT 'payment method',
  `delivery_address` string  NULL COMMENT 'Shipping address',
  `order_comment` string  NULL COMMENT 'Order remarks',
  `out_trade_no` string  NULL COMMENT 'Order transaction number (for third-party payment)',
  `trade_body` string  NULL COMMENT 'Order description(For third party payment)',
  `create_time` timestamp(3)  NULL COMMENT 'Creation time',
  `operate_time` timestamp(3)  NULL COMMENT 'Operation time',
  `expire_time` timestamp(3)  NULL COMMENT 'Failure time',
  `tracking_no` string  NULL COMMENT 'Logistics order No',
  `parent_order_id` bigint  NULL COMMENT 'Parent order number',
  `img_url` string  NULL COMMENT 'Picture path',
  `province_id` int  NULL COMMENT 'region',
  PRIMARY KEY (`id`) NOT ENFORCED
)
with(
  'connector' = 'kafka'
  ,'topic' = 'my5.order_info'
  ,'properties.bootstrap.servers' = '172.21.73.53:6667,172.21.73.54:6667,172.21.73.55:6667'
  ,'format' = 'debezium-json'
  ,'scan.startup.mode' = 'earliest-offset' 
  ,'properties.group.id' = 'hudiGroup'
);

CREATE TABLE dwd_paid_order_detail_hudi
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time TIMESTAMP(3),
  pay_time  TIMESTAMP(3),
  PRIMARY KEY (detail_id) NOT ENFORCED
 ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/dwd_paid_order_detail_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

insert into dwd_paid_order_detail_hudi
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info_topic
    WHERE order_status = '2' -- Paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail_topic
    ) od 
    ON oi.id = od.order_id;
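Before moving on, the wide table can be spot-checked with a streaming query (a sketch; it assumes the insert job above is running and at least one checkpoint has committed data to hudi):

```sql
%flink.ssql

-- Spot-check the DWD wide table; rows appear once the first checkpoint commits
select * from dwd_paid_order_detail_hudi;
```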

ADS layer data

After the above steps, we have created the dwd_paid_order_detail_hudi detail wide table and stored it in hudi. Next, we join this wide table with the dimension tables to obtain our ADS application-layer data.

ads_province_index_hudi

%flink.ssql
drop table if exists ads_province_index_hudi;
drop table if exists tmp_province_index_hudi;

-- ---------------------------------
-- Create the ADS-layer table in hudi with DDL
-- Indicators: 1. Daily order count per province
--             2. Daily order amount per province
-- ---------------------------------

CREATE TABLE ads_province_index_hudi(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_province_index
-- Order summary temporary table
-- ---------------------------------

CREATE TABLE tmp_province_index_hudi(
    province_id INT,
    order_count BIGINT, -- Number of orders
    order_amount DECIMAL(10,2), -- Order amount
    pay_date DATE,
    PRIMARY KEY (province_id, pay_date) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_province_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql
-- ---------------------------------
-- tmp_province_index
-- Order summary temporary table data loading
-- ---------------------------------
INSERT INTO tmp_province_index_hudi
SELECT
      province_id,
      count(distinct order_id) order_count, -- Number of orders
      sum(order_price * sku_num) order_amount, -- Order amount
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY province_id, TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));
%flink.ssql

INSERT INTO ads_province_index_hudi
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_hudi pc
  JOIN dim_province_hudi as dp 
  ON dp.province_id = pc.province_id;

View the data of the ADS-layer ads_province_index_hudi table:
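The result can be inspected with a streaming query against the hudi table, in the same way as the ads_sku_index_hudi query later in this article (a minimal sketch; it assumes the insert job above is running):

```sql
%flink.ssql

-- Continuously read the ADS-layer province indicators from hudi
select * from ads_province_index_hudi;
```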

ads_sku_index_hudi

%flink.ssql

-- ---------------------------------
-- Create the ADS-layer table in hudi with DDL
-- Indicators: 1. Daily order count per item
--             2. Daily order amount per item
--             3. Daily quantity sold per item
-- ---------------------------------


drop table if exists ads_sku_index_hudi;
CREATE TABLE ads_sku_index_hudi
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/ads_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);

-- ---------------------------------
-- tmp_sku_index
-- Commodity index statistics
-- ---------------------------------
drop table if exists tmp_sku_index_hudi;
CREATE TABLE tmp_sku_index_hudi(
    sku_id BIGINT,
    order_count BIGINT, -- Number of orders
    order_amount DECIMAL(10,2), -- Order amount
    order_sku_num BIGINT,
    pay_date DATE,
    PRIMARY KEY (sku_id, pay_date) NOT ENFORCED
)WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://host117:8020/realtime-demo-2/tmp_sku_index_hudi',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'false',
    'read.streaming.enabled' = 'true'
);
%flink.ssql

-- ---------------------------------
-- tmp_sku_index
-- Data loading
-- ---------------------------------

INSERT INTO tmp_sku_index_hudi
SELECT
      sku_id,
      count(distinct order_id) order_count, -- Number of orders
      sum(order_price * sku_num) order_amount, -- Order amount
      sum(sku_num) order_sku_num,
      TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd')) pay_date
FROM dwd_paid_order_detail_hudi
GROUP BY sku_id, TO_DATE(DATE_FORMAT(pay_time,'yyyy-MM-dd'));
%flink.ssql
INSERT INTO ads_sku_index_hudi
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_hudi sc
  JOIN dim_sku_info ds
  ON ds.id = sc.sku_id;
%flink.ssql
select * from ads_sku_index_hudi;

This is an original article by the blogger "xiaozhch5" (Big Data to Artificial Intelligence), licensed under CC 4.0 BY-SA. Please include the original source link and this statement when reproducing it.

Original link: https://lrting.top/uncategorized/2744/
