Flink SQL introduction
In actual development I personally focus on the Stream API and have not written much Flink SQL. Here I mainly follow the original project code, and I am not deeply familiar with every detail, so I suggest studying Flink SQL on its own. To put it simply: a SQL statement pulls the required fields from the corresponding source table (mainly the DWM-layer order wide table) into a dynamic table, the dynamic table is converted into a stream, and the stream is then saved to ClickHouse.
Points to note:
1. Flink SQL syntax differs slightly from MySQL syntax;
2. When converting a table to a stream, pay attention to the mode you choose. Flink has three modes: Append, Retract, and Upsert (Upsert mode is not used in this project); the two relevant here are described below.
Convert Table to DataStream
Append Mode
– used in scenarios where the table is only modified by Insert operations
DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
Retract Mode
– can be used in any scenario. It is somewhat similar to the Retract mode among the update modes; it has only Insert and Delete operations.
– a Boolean flag (the first field returned) is added to each emitted record, indicating whether it is new data (Insert, true) or retracted data (Delete, false)
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);
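To make the distinction concrete, here is a minimal runnable sketch (the orders table and its fields are made up for illustration, and the datagen connector stands in for the project's real Kafka source): a plain projection can use Append mode, while an aggregation must use Retract mode, whose Boolean flag lets you filter out retracted rows before writing to a sink.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class TableToStreamDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Hypothetical source table; in the project this would be a Kafka-backed dynamic table.
            tableEnv.executeSql("CREATE TABLE orders (product_id INT, amount DOUBLE) " +
                    "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

            // Insert-only query: Append mode is sufficient.
            Table resultTable = tableEnv.sqlQuery("SELECT product_id, amount FROM orders");
            DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

            // Aggregation keeps updating earlier results: Retract mode is required.
            Table aggResultTable = tableEnv.sqlQuery(
                    "SELECT product_id, SUM(amount) AS total FROM orders GROUP BY product_id");
            DataStream<Tuple2<Boolean, Row>> aggResultStream =
                    tableEnv.toRetractStream(aggResultTable, Row.class);

            // f0 == true marks Insert records; false marks retracted (Delete) records.
            aggResultStream.filter(t -> t.f0).print();

            env.execute();
        }
    }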
Regional theme wide table production
I won't discuss this table at length, because its design is relatively simple and I have commented the thinking behind the code very clearly in the project: it performs partitioned statistics by province and city. In the project this corresponds to the heat-map distribution on the map of China, and the total order amount is the headline figure on the large screen.
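For reference, the province-level aggregation can be sketched like this, continuing from an environment set up as in the earlier sketch (assumptions: the DWM order wide table is registered as order_wide with an event-time attribute rowtime; the field names follow the project's order wide table but are illustrative):

    // 10-second tumbling-window statistics per province: order count and order amount.
    Table provinceStatsTable = tableEnv.sqlQuery(
            "SELECT " +
            "  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS stt, " +
            "  TUMBLE_END(rowtime, INTERVAL '10' SECOND)   AS edt, " +
            "  province_id, province_name, " +
            "  COUNT(DISTINCT order_id) AS order_count, " +
            "  SUM(split_total_amount)  AS order_amount " +
            "FROM order_wide " +
            "GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND), province_id, province_name");

    // Window aggregation results are append-only, so Append mode works here.
    DataStream<Row> provinceStatsStream = tableEnv.toAppendStream(provinceStatsTable, Row.class);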
Keyword wide table production
Keyword display is also the result of dimension aggregation: how large a keyword is rendered depends on the size of its aggregated count. The first and most important source of keywords is what users type into the search bar; the other source is keywords extracted from the commodity-theme statistics.

The search-bar keywords come from dwd_page_log, e.g. "page_id":"good_list","item":"book". We therefore need to segment long text into individual words. This word-segmentation technique is also used in search engines; for Chinese word segmentation, search engines today basically all rely on third-party segmenters, so we can use the same segmenter a search engine would: IK.

How do we perform word segmentation from within a SQL statement? With a user-defined function. Example of a UDTF definition:

    @FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
    public static class OverloadedFunction extends TableFunction<Row> {
        public void eval(int a, int b) {
            collect(Row.of("Sum", a + b));
        }
        // overloading of arguments is still possible
        public void eval() {
            collect(Row.of("Empty args", -1));
        }
    }

The annotation @FunctionHint(output = @DataTypeHint("ROW<word STRING>")) specifies the output type. This decouples type inference from the evaluation methods, so that type inference depends entirely on the FunctionHint:

    @FunctionHint(
        input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
        output = @DataTypeHint("INT")
    )

Process flow:
1. Create the environment and register the user-defined function (note that a UDTF is used here; as the aggregation idea shown later makes clear, one input row produces multiple output rows — a sketch of such a function follows this list);
2. Create a dynamic table;
3. Query the data from the dynamic table (where page['page_id']='good_list' and page['item'] IS NOT NULL), i.e. the search terms on the goods-list page;
4. Aggregate;
5. Convert to a stream;
6. Write to ClickHouse.
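As a sketch of what the UDTF from step 1 might look like with the IK analyzer (assumptions: the IK analyzer dependency is on the classpath, and the class and function names here are illustrative, not the project's exact code):

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;

    import java.io.IOException;
    import java.io.StringReader;

    // One input string explodes into one output row per segmented word.
    @FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
    public class KeywordUDTF extends TableFunction<Row> {
        public void eval(String text) {
            try {
                // useSmart = true asks IK to merge overlapping segments.
                IKSegmenter segmenter = new IKSegmenter(new StringReader(text), true);
                Lexeme lexeme;
                while ((lexeme = segmenter.next()) != null) {
                    collect(Row.of(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
                // Fall back to emitting the raw text if segmentation fails.
                collect(Row.of(text));
            }
        }
    }

It would then be registered in step 1 with something like tableEnv.createTemporarySystemFunction("ik_analyze", KeywordUDTF.class).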
Create the keyword-theme wide table in ClickHouse:

    create table keyword_stats_0709 (
        stt     DateTime,
        edt     DateTime,
        keyword String,
        source  String,
        ct      UInt64,
        ts      UInt64
    ) engine = ReplacingMergeTree(ts)
    partition by toYYYYMMDD(stt)
    order by (stt, edt, keyword, source);

General join usage in a SELECT statement:

    SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id

Joining a table function in a SELECT:

    SELECT order_id, res
    FROM Orders,                                     -- Orders is a table
         LATERAL TABLE(table_func(order_id)) t(res)  -- lateral table association

dwd_page_log data — what we need from it is the "page_id":"good_list","item":"books" content:

    page:2> {"common":{"ar":"530000","uid":"36","os":"Android 11.0","ch":"wandoujia","is_new":"0",
    "md":"Xiaomi 10 Pro","mid":"mid_9","vc":"v2.1.134","ba":"Xiaomi"},
    "page":{"page_id":"good_list","item":"books","during_time":7183,"item_type":"keyword","last_page_id":"search"},
    "displays":[{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":5,"order":1},
    {"display_type":"recommend","item":"5","item_type":"sku_id","pos_id":2,"order":2},
    {"display_type":"query","item":"2","item_type":"sku_id","pos_id":2,"order":3},
    {"display_type":"promotion","item":"5","item_type":"sku_id","pos_id":3,"order":4},
    {"display_type":"promotion","item":"9","item_type":"sku_id","pos_id":4,"order":5},
    {"display_type":"query","item":"9","item_type":"sku_id","pos_id":5,"order":6}],"ts":1626684732000}

Overall test:
➢ Start ZK, Kafka, logger.sh, ClickHouse
➢ Run BaseLogApp
➢ Run KeywordStatsApp
➢ Run the jar package under the rt_applog directory
➢ View the console output
➢ Check the keyword_stats_0709 table data in ClickHouse

The result type looks like:

    >>>>: 4 > KeywordStats(keyword=box, ct=2, source=SEARCH, stt=2021-07-19 12:58:30, edt=2021-07-19 12:58:40, ts=1626757129000)

The data in ClickHouse:

    ┌─────────────────stt─┬─────────────────edt─┬─keyword────┬─source─┬─ct─┬────────────ts─┐
    │ 2021-07-19 13:00:50 │ 2021-07-19 13:01:00 │ television │ SEARCH │  1 │ 1626757293000 │
    │ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ books      │ SEARCH │  2 │ 1626757293000 │
    │ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ television │ SEARCH │  1 │ 1626757293000 │
    │ 2021-07-19 13:01:00 │ 2021-07-19 13:01:10 │ box        │ SEARCH │  1 │ 1626757293000 │
    │ 2021-07-19 13:01:10 │ 2021-07-19 13:01:20 │ television │ SEARCH │  1 │ 1626757293000 │
    └─────────────────────┴─────────────────────┴────────────┴────────┴────┴───────────────┘
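Putting steps 3–5 of the flow together, the search-keyword query might look roughly like this (a sketch under assumptions: page_view is the dynamic table registered over dwd_page_log with page as a MAP column and rowtime as the event-time attribute, and ik_analyze is the UDTF registered in step 1; these names are illustrative, not the project's verbatim code):

    // Step 3: pull the search terms typed on the goods-list page.
    Table fullwordTable = tableEnv.sqlQuery(
            "SELECT page['item'] AS fullword, rowtime " +
            "FROM page_view " +
            "WHERE page['page_id'] = 'good_list' AND page['item'] IS NOT NULL");
    tableEnv.createTemporaryView("fullword_table", fullwordTable);

    // Step 4: explode each phrase into words via a lateral join against the UDTF,
    // then count per word in 10-second tumbling windows.
    Table keywordStatsTable = tableEnv.sqlQuery(
            "SELECT keyword, 'SEARCH' AS source, COUNT(*) AS ct, " +
            "  TUMBLE_START(rowtime, INTERVAL '10' SECOND) AS stt, " +
            "  TUMBLE_END(rowtime, INTERVAL '10' SECOND)   AS edt " +
            "FROM fullword_table, LATERAL TABLE(ik_analyze(fullword)) AS t(keyword) " +
            "GROUP BY keyword, TUMBLE(rowtime, INTERVAL '10' SECOND)");

    // Step 5: window results are append-only, so Append mode suffices before the ClickHouse sink.
    DataStream<Row> keywordStatsStream = tableEnv.toAppendStream(keywordStatsTable, Row.class);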
Getting keywords from the commodity-theme statistics: from the commodity theme we obtain a statistical table of commodity keywords with their click counts, order counts and cart-add counts, and save it to the ClickHouse table keyword_stats_0709.

Display of the aggregation calculation idea. After the commodity-theme statistics are complete, the UDTF design idea is as follows. The product_stats_0709 table holds:

    SPU name     | click count | order count | cart-add count
    Xiaomi phone | 50          | 80          | 100

After word segmentation, the result is:

    keyword | click count | order count | cart-add count
    Xiaomi  | 50          | 80          | 100
    phone   | 50          | 80          | 100

The goal is the keyword-heat statistics (Keyword_Stats_0820):

    keyword | operation count | source
    Xiaomi  | 50              | Click
    Xiaomi  | 80              | Order
    Xiaomi  | 100             | Cart
    phone   | 50              | Click
    phone   | 80              | Order
    phone   | 100             | Cart

To get hot words, each source is weighted: Search 10, Click 9, Order 8, Cart 7:

    keyword | operation count | source | weight
    Xiaomi  | 90              | Search | 10
    Xiaomi  | 50              | Click  | 9
    Xiaomi  | 80              | Order  | 8
    Xiaomi  | 100             | Cart   | 7
    phone   | 50              | Click  | 9
    phone   | 80              | Order  | 8
    phone   | 100             | Cart   | 7

so the heat of a keyword such as "Xiaomi" is the weighted sum 90 * 10 + 50 * 9 + 80 * 8 + 100 * 7.

Overall test:
➢ Start ZK, Kafka, logger.sh, ClickHouse, Redis, HDFS, HBase, Maxwell
➢ Run BaseLogApp
➢ Run BaseDBApp
➢ Run OrderWideApp
➢ Run PaymentWideApp
➢ Run ProductsStatsApp
➢ Run KeywordStats4ProductApp
➢ Run the jar package under the rt_applog directory
➢ Run the jar package under the rt_dblog directory
➢ View the console output
➢ Check the keyword_stats_0709 table data in ClickHouse

The keywordStatsProductDataStream data type, corresponding to click, order and cart respectively:

    1> KeywordStats(keyword=10x, ct=2, source=CLICK, stt=2021-07-20 13:29:10, edt=2021-07-20 13:29:20, ts=1626758982000)
    1> KeywordStats(keyword=redmi, ct=12, source=ORDER, stt=2021-07-20 13:28:50, edt=2021-07-20 13:29:00, ts=1626758978000)
    1> KeywordStats(keyword=clear, ct=178, source=CART, stt=2021-07-19 21:58:10, edt=2021-07-19 21:58:20, ts=1626758976000)

The data in ClickHouse:

    ┌─────────────────stt─┬─────────────────edt─┬─keyword─┬─source─┬─ct─┬────────────ts─┐
    │ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ 10      │ CLICK  │  1 │ 1626758976000 │
    │ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ 10x     │ CLICK  │  1 │ 1626758977000 │
    │ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ redmi   │ CLICK  │  1 │ 1626758977000 │
    │ 2021-07-20 13:27:40 │ 2021-07-20 13:27:50 │ Xiaomi  │ CLICK  │  1 │ 1626758976000 │
    │ 2021-07-20 13:27:50 │ 2021-07-20 13:28:00 │ 10      │ CLICK  │  2 │ 1626758977000 │
    │ 2021-07-20 13:27:50 │ 2021-07-20 13:28:00 │ 10      │ ORDER  │  9 │ 1626758977000 │
    └─────────────────────┴─────────────────────┴─────────┴────────┴────┴───────────────┘

There is little purchase and order data, so CART rows are hard to spot here, but all three kinds of data are present.
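For the row-to-rows step, a UDTF along these lines could explode one commodity-stats row into the (ct, source) rows shown above (an illustrative sketch, not the project's verbatim code; the class name and source constants are assumed):

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    // Row-to-rows UDTF: turns (clickCt, cartCt, orderCt) into one (ct, source) row each.
    @FunctionHint(output = @DataTypeHint("ROW<ct BIGINT, source STRING>"))
    public class KeywordProductC2R extends TableFunction<Row> {
        public void eval(Long clickCt, Long cartCt, Long orderCt) {
            if (clickCt > 0L) {
                collect(Row.of(clickCt, "CLICK"));
            }
            if (cartCt > 0L) {
                collect(Row.of(cartCt, "CART"));
            }
            if (orderCt > 0L) {
                collect(Row.of(orderCt, "ORDER"));
            }
        }
    }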
Summary
The whole project basically ends here. I won't explain the subsequent visualization step or the data-interface code, because they only involve simple use of Spring Boot; if you are interested, you can check the code yourself. It is all a simple data-statistics process that converts the results into the JSON format required by Sugar. The technology is not difficult; it is all careful, painstaking work, and worth studying if you are interested.
If you are interested in this project or in Flink, you can keep following me. Going forward I will gradually publish more on the Flink framework and on learning methods and roadmaps for real-time stream processing. I hope we can make progress together~~