Introduction to Stream Compute Oceanus
Stream Compute Oceanus is a powerful tool for real-time analysis within the big data product ecosystem. It is an enterprise-grade real-time big data analytics platform built on Apache Flink, offering one-stop development, seamless integration, sub-second latency, low cost, security, and stability. Stream Compute Oceanus aims to maximize the value of enterprise data and accelerate enterprises' transition to real-time digital operations.
This article describes in detail how to consume JSON-formatted data from CKafka in real time and, after extracting, flattening, and converting the nested fields, store the results in MySQL.
Prerequisites
Create a Stream Compute Oceanus cluster
On the Stream Compute Oceanus promotional activity page, you can purchase an Oceanus cluster for 1 yuan.
Enter the Oceanus console [1], click Cluster Management on the left, then click Create Cluster in the upper left corner. For details, see the official Oceanus document Create an exclusive cluster [2].
Create a CKafka message queue
Enter the CKafka console [3] and click [New] in the upper left corner to create a CKafka instance. For details, see CKafka create instance [4].
Create Topic
Enter the CKafka instance and click [Topic Management] > [New] to create a topic. For details, see CKafka create Topic [5].
Data preparation
Log in to a CVM instance in the same subnet and start the Kafka client to simulate sending data. For the detailed procedure, see Run Kafka client [6].
// Sample message
{
  "id": 1,
  "message": "Stream Compute Oceanus 1-yuan flash sale",
  "userInfo": {
    "name": "Zhang San",
    "phone": ["12345678910", "8547942"]
  },
  "companyInfo": {
    "name": "Tencent",
    "address": "Shenzhen Tencent Building"
  }
}
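As an alternative to the console client, the same record can also be produced by a Flink SQL job. The following is only a minimal sketch, not part of the original walkthrough: the helper table name kafka_json_producer_sink is made up for illustration, while the topic and broker address match the Source table defined later; depending on the Flink version, the ROW constructor may need an explicit CAST to the target ROW type.

-- Hedged sketch: produce the sample message from a Flink SQL job
CREATE TABLE `kafka_json_producer_sink` (
  `id`          INT,
  `message`     STRING,
  `userInfo`    ROW<`name` STRING, `phone` ARRAY<STRING>>,
  `companyInfo` MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced2',                     -- same topic the Source table reads
  'properties.bootstrap.servers' = '10.0.0.29:9092', -- same broker address as the Source table
  'format' = 'json'
);

INSERT INTO `kafka_json_producer_sink`
SELECT
  t.id,
  t.message,
  ROW('Zhang San', ARRAY['12345678910', '8547942']),              -- ROW constructor with a nested ARRAY
  MAP['name', 'Tencent', 'address', 'Shenzhen Tencent Building']  -- MAP[key1, value1, key2, value2]
FROM (VALUES (1, 'Stream Compute Oceanus 1-yuan flash sale')) AS t(id, message);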
Create a MySQL instance
Enter the MySQL console [7] and click New. For details, see the official document Create MySQL instance [8]. After the instance is available, create the destination table with the following statement:
-- Table creation statement
CREATE TABLE `oceanus_advanced2` (
  `id`              INT          NOT NULL,
  `message`         VARCHAR(100) NULL DEFAULT '',
  `name`            VARCHAR(50)  NULL DEFAULT '',
  `phone`           VARCHAR(11)  NULL DEFAULT '',
  `company_name`    VARCHAR(100) NULL DEFAULT '',
  `company_address` VARCHAR(100) NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE = InnoDB;
Stream Compute Oceanus job
1. Create Source
CREATE TABLE `kafka_json_source_table` (
  `id`          INT,
  `message`     STRING,
  `userInfo`    ROW<`name` STRING, `phone` ARRAY<STRING>>, -- receive the nested JSON object with a ROW containing an ARRAY
  `companyInfo` MAP<STRING, STRING>                        -- receive the JSON object with a MAP
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced2',                     -- replace with the topic you want to consume
  'scan.startup.mode' = 'earliest-offset',           -- one of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
  'properties.bootstrap.servers' = '10.0.0.29:9092', -- replace with your Kafka connection address
  'properties.group.id' = 'testGroup',               -- required parameter: always specify a Group ID
  'format' = 'json',                                 -- JSON format; some other formats do not support extraction and flattening
  'json.fail-on-missing-field' = 'false',            -- when false, a missing field does not raise an error
  'json.ignore-parse-errors' = 'true'                -- when true, any parse error is ignored
);
2. Create Sink
CREATE TABLE `jdbc_upsert_sink_table` (
  `id`              INT,
  `message`         STRING,
  `name`            STRING,
  `phone`           STRING,
  `company_name`    STRING,
  `company_address` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED -- declare the key so the JDBC connector writes in upsert mode, matching the MySQL primary key
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- replace with your actual MySQL connection parameters
  'table-name' = 'oceanus_advanced2',   -- the table to write to
  'username' = 'root',                  -- database user (requires INSERT permission)
  'password' = 'Tencent123$',           -- database password
  'sink.buffer-flush.max-rows' = '200', -- maximum number of rows per batch write
  'sink.buffer-flush.interval' = '2s'   -- batch write interval
);
3. Write business SQL
INSERT INTO `jdbc_upsert_sink_table`
SELECT
  id                     AS id,
  message                AS message,
  userInfo.name          AS name,            -- access a ROW field with dot notation: row.field
  userInfo.phone[1]      AS phone,           -- access an ARRAY element with a 1-based subscript: array[n]
  companyInfo['name']    AS company_name,    -- access a MAP value by key: map['key']
  companyInfo['address'] AS company_address
FROM `kafka_json_source_table`;
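Once the job is running, you can verify the end-to-end result in MySQL. This is a minimal check, assuming the sample message shown earlier was sent to the topic; the expected values are derived from that sample.

-- Run against testdb in MySQL
SELECT * FROM oceanus_advanced2 WHERE id = 1;
-- Expected row:
--   id              = 1
--   message         = 'Stream Compute Oceanus 1-yuan flash sale'
--   name            = 'Zhang San'
--   phone           = '12345678910'   -- first element of the phone array
--   company_name    = 'Tencent'
--   company_address = 'Shenzhen Tencent Building'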
Note: clusters on the new Flink 1.13 version no longer require users to select built-in Connectors; the platform matches and loads them automatically.
Summary
This article has described in detail how to define and access MAP, ARRAY, and ROW type data in SQL jobs. For more built-in operators and functions, see the official Oceanus documentation [9].
Reference links
[1] Oceanus console: https://console.cloud.tencent.com/oceanus/overview
[2] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298
[3] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka create instance: https://cloud.tencent.com/document/product/597/54839
[5] CKafka create Topic: https://cloud.tencent.com/document/product/597/54854
[6] Run Kafka client: https://cloud.tencent.com/document/product/597/56840
[7] MySQL console: https://console.cloud.tencent.com/cdb
[8] Create MySQL instance: https://cloud.tencent.com/document/product/236/46433
[9] Built-in operators and functions: https://cloud.tencent.com/document/product/849/18083