Flink practice tutorial: advanced 2 - extracting data from complex formats

Introduction to Stream Computing Oceanus

Stream Computing Oceanus is a real-time analytics tool in the big data product ecosystem. It is an enterprise-grade real-time big data analytics platform built on Apache Flink, featuring one-stop development, seamless integration, sub-second latency, low cost, security, and stability. Stream Computing Oceanus aims to maximize the value of enterprise data and accelerate the real-time digital transformation of enterprises.

This article describes in detail how to read JSON-format data from CKafka in real time, extract, flatten, and convert it, and then store it in MySQL.

Prerequisites

Create a Stream Computing Oceanus cluster

An Oceanus cluster can be purchased for 1 yuan on the Stream Computing Oceanus promotion page.

Go to the Oceanus console [1], click Cluster Management on the left, and then click Create Cluster in the upper left. For details, see the official Oceanus document Create an exclusive cluster [2].

Create a CKafka message queue

Go to the CKafka console [3] and click [New] in the upper left corner to create a CKafka instance. For details, see CKafka create instance [4].

Create Topic

Open the CKafka instance and click [Topic Management] > [New] to create a topic. For details, see CKafka create Topic [5].

Data preparation

Log in to a CVM in the same subnet and start a Kafka client to simulate sending data. For the specific steps, see Run Kafka client [6].

// Data format
{
  "id": 1,
  "message": "Stream Computing Oceanus 1 yuan limited flash sale",
  "userInfo": {
    "name": "Zhang San",
    "phone": ["12345678910", "8547942"]
  },
  "companyInfo": {
    "name": "Tencent",
    "address": "Shenzhen Tencent building"
  }
}

Create MySQL instance

Go to the MySQL console [7] and click New. For details, see the official document Create MySQL instance [8].

-- Create table statement
CREATE TABLE `oceanus_advanced2` (
  `id`              int(100)     NOT NULL,
  `message`         varchar(100) NULL DEFAULT '',
  `name`            varchar(50)  NULL DEFAULT '',
  `phone`           varchar(11)  NULL DEFAULT '',
  `company_name`    varchar(100) NULL DEFAULT '',
  `company_address` varchar(100) NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE = InnoDB;

Stream Computing Oceanus job

1. Create Source

CREATE TABLE `kafka_json_source_table` (
    `id`             INT,
    `message`        STRING,
    `userInfo`       ROW<`name` STRING,`phone` ARRAY<STRING>>,  -- Use ROW with a nested ARRAY to receive the JSON object field
    `companyInfo`    MAP<STRING,STRING>    -- Use MAP to receive the JSON object field
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced2',                      -- Replace with the Topic you want to consume
  'scan.startup.mode' = 'earliest-offset',            -- One of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- Replace with your Kafka connection address
  'properties.group.id' = 'testGroup',                -- Required parameter; be sure to specify a Group ID
  'format' = 'json',                                  -- Declare the JSON format; some other formats may not support extraction and flattening
  'json.fail-on-missing-field' = 'false',             -- If false, a missing field does not raise an error
  'json.ignore-parse-errors' = 'true'                 -- If true, any parsing errors are ignored
);
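
Whether to model `companyInfo` as MAP or ROW is a design choice. MAP<STRING,STRING> suits objects whose keys are not known in advance, but every value must share one type; ROW declares each key explicitly with its own type. As a minimal sketch, assuming the same placeholder connection values as above (the table name `kafka_json_source_table_row` is hypothetical and not part of the original job), the same topic could be declared with `companyInfo` as a ROW:

```sql
CREATE TABLE `kafka_json_source_table_row` (   -- hypothetical table name
    `id`             INT,
    `message`        STRING,
    `userInfo`       ROW<`name` STRING, `phone` ARRAY<STRING>>,
    `companyInfo`    ROW<`name` STRING, `address` STRING>   -- ROW instead of MAP
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced2',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '10.0.0.29:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
```

With this declaration the company fields would be read with member access (`companyInfo.name`, `companyInfo.address`) rather than with a key lookup such as `companyInfo['name']`.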

2. Create Sink

CREATE TABLE `jdbc_upsert_sink_table` (
    `id`                INT,
    `message`           STRING,
    `name`              STRING,
    `phone`             STRING,
    `company_name`      STRING,
    `company_address`   STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',  -- Replace with your actual MySQL connection parameters
    'table-name' = 'oceanus_advanced2',    -- Data table to write to
    'username' = 'root',                   -- Database user name (requires INSERT permission)
    'password' = 'Tencent123$',            -- Database password
    'sink.buffer-flush.max-rows' = '200',  -- Maximum number of rows per batch
    'sink.buffer-flush.interval' = '2s'    -- Interval between batch flushes
);
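
Note that the MySQL table has a primary key on `id`, while the Flink sink table above declares none. According to the Flink JDBC connector documentation, the connector writes in upsert mode only when a primary key is declared in the Flink DDL, and in append mode otherwise. If messages that repeat an `id` should update the existing row, the sink could be declared as in the following sketch. It is a variant for illustration, not the declaration used by the original job, and it reuses the same placeholder connection values:

```sql
CREATE TABLE `jdbc_upsert_sink_table` (
    `id`                INT,
    `message`           STRING,
    `name`              STRING,
    `phone`             STRING,
    `company_name`      STRING,
    `company_address`   STRING,
    PRIMARY KEY (`id`) NOT ENFORCED   -- declares the upsert key; Flink itself does not validate uniqueness
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'oceanus_advanced2',
    'username' = 'root',
    'password' = 'Tencent123$',
    'sink.buffer-flush.max-rows' = '200',
    'sink.buffer-flush.interval' = '2s'
);
```

The key declared here should match the primary key of the MySQL table, since the upsert statement relies on the database to detect the conflict.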

3. Write business SQL

INSERT INTO `jdbc_upsert_sink_table`
SELECT
id                        AS  id,
message                   AS  message,
userInfo.name             AS  name,              -- Access a ROW member with .member
userInfo.phone[1]         AS  phone,             -- Access an ARRAY element with [index]; Flink SQL array indexes start at 1
companyInfo['name']       AS  company_name,      -- Access a MAP value with ['key']
companyInfo['address']    AS  company_address
FROM `kafka_json_source_table`;
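
The statement above keeps only the first phone number. When every element of the array is needed, Flink SQL offers CROSS JOIN UNNEST, which expands an array into one row per element. The query below is a sketch only: the aliases `s`, `t` and `phone_number` are illustrative names, and the query has not been verified on an Oceanus cluster.

```sql
-- One output row per phone number; for the sample message this gives two rows with id = 1
SELECT
    s.id              AS id,
    s.userInfo.name   AS name,
    t.phone_number    AS phone
FROM `kafka_json_source_table` AS s
CROSS JOIN UNNEST(s.userInfo.phone) AS t (phone_number);
```

Because this produces several rows with the same `id`, the result would need a sink of its own; writing it to `oceanus_advanced2` would collide with that table's primary key on `id`.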

On the new Flink 1.13 clusters, users do not need to select the built-in Connector; the platform matches and loads it automatically.
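
Once the job is running and the sample message has been sent, the result can be checked directly in MySQL. The query below is a minimal sketch that assumes the database `testdb` from the JDBC URL and the sample message with `id` 1:

```sql
-- Run in the MySQL instance (database testdb), not in the Oceanus job
SELECT `id`, `message`, `name`, `phone`, `company_name`, `company_address`
FROM `oceanus_advanced2`
WHERE `id` = 1;
```

For the sample message, the row would be expected to hold Zhang San as `name`, 12345678910 (the first array element) as `phone`, Tencent as `company_name`, and Shenzhen Tencent building as `company_address`.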

Summary

This article describes in detail how to define MAP, ARRAY, and ROW type data and access their members through SQL. For more built-in operators and functions, see the official Oceanus documentation [9].

Reference links

[1] Oceanus console: https://console.cloud.tencent.com/oceanus/overview

[2] Create an exclusive cluster: https://cloud.tencent.com/document/product/849/48298

[3] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1

[4] CKafka create instance: https://cloud.tencent.com/document/product/597/54839

[5] CKafka create Topic: https://cloud.tencent.com/document/product/597/54854

[6] Run Kafka client: https://cloud.tencent.com/document/product/597/56840

[7] MySQL console: https://console.cloud.tencent.com/cdb

[8] Create MySQL instance: https://cloud.tencent.com/document/product/236/46433

[9] Built-in operators and functions: https://cloud.tencent.com/document/product/849/18083

Keywords: flink

Added by markjreed on Sat, 04 Dec 2021 21:35:49 +0200