Flink Practice Tutorial: Getting Started: Writing to Elasticsearch

Author: Tencent Cloud Flow Computing Oceanus Team

Introduction to Oceanus for Stream Computing

Flow computing Oceanus is a real-time analysis tool for the ecosystem of data products. It is an enterprise real-time large data analysis platform based on Apache Flink, which has the features of one-stop development, seamless connection, subsecond delay, low cost, security and stability. Stream computing Oceanus aims to maximize the value of enterprise data and accelerate the process of real-time digitization.

This article will show you how to use the datagen connector to generate random data, stream Oceanus, and ultimately store the calculated data in Elasticsearch.

Operational Video


Create Stream Computing Oceanus Cluster

Get into Flow Computing Oceanus Console , click Cluster Management on the left, and click Create Cluster on the top left to calculate Oceanus official documents with reference to streams Create Exclusive Cluster.

Create Elasticsearch Cluster

Get into Elasticsearch Console To create an Elasticsearch instance, click New on the top left, and visit Create Elasticsearch Cluster

! The VPC selected when creating the stream computing Oceanus and Elasticsearch clusters must be the same VPC.

Oceanus job for stream computing

1. Create Source

-- Datagen Connector Some data can be randomly generated for testing
-- See https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html

CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- Number of data bars generated per second
  'fields.f_sequence.kind'='sequence',   -- Bounded Sequence (stop output automatically when finished)
  'fields.f_sequence.start'='1',         -- The starting value of the sequence
  'fields.f_sequence.end'='10000',       -- Termination Value of Sequence
  'fields.f_random.kind'='random',       -- Unbounded Random Numbers
  'fields.f_random.min'='1',             -- Minimum value of random number
  'fields.f_random.max'='1000',          -- Maximum of Random Numbers
  'fields.f_random_str.length'='10'      -- Length of random string

2. Create Sink

-- Elasticsearch Can only be used as a data destination table ( Sink)Write in
-- Be careful! If you enable Elasticsearch User name password authentication function, Currently only available Flink 1.10 If authentication is not required, Then you can use Flink 1.11 New grammar.
-- See https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector

    `user_id`   INT,
    `user_name` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch', -- Output to Elasticsearch

    'connector.version' = '6',            -- Appoint Elasticsearch Version of, for example '6', '7'. Note the built-in that must be and selected Connector Version Consistency
    'connector.hosts' = '', --Connected address of Elasticsearch
    'connector.index' = 'Student',        -- Elasticsearch Of Index name
    'connector.document-type' = 'stu',    -- Elasticsearch Of Document type
    'connector.username' = 'elastic',     -- Optional parameters: Please replace with actual Elasticsearch User name
    'connector.password' = 'xxxxxxxxxx',  -- Optional parameters: Please replace with actual Elasticsearch Password

    'update-mode' = 'append',             -- Optional without primary key 'append' Mode, or with a primary key 'upsert' Pattern     
    'connector.key-delimiter' = '$',      -- Optional parameters, Connection characters for composite primary keys (Default is _ Symbol, for example key1_key2_key3)
    'connector.key-null-literal' = 'n/a',  -- The primary key is null Substitution string, default is 'null'
    'connector.failure-handler' = 'retry-rejected',   -- Optional error handling. Optional 'fail' (Throw an exception),'ignore'(Ignore any errors),'retry-rejected'(Retry)

    'connector.flush-on-checkpoint' = 'true',   -- Optional parameters, Bulk writes are not allowed on snapshots ( flush), Default to true
    'connector.bulk-flush.max-actions' = '42',  -- Optional parameters, Maximum number of bars per batch
    'connector.bulk-flush.max-size' = '42 mb',  -- Optional parameters, Cumulative maximum size per batch (Only supported mb)
    'connector.bulk-flush.interval' = '60000',  -- Optional parameters, Interval between bulk writes (ms)
    'connector.connection-max-retry-timeout' = '300',     -- Maximum time-out per request (ms)

    'format.type' = 'json'        -- Output Data Format, Currently only supported 'json'

3. Write business SQL

f_sequence   AS user_id,
f_random_str AS user_name
FROM random_source;

4. Select Connector

Click Job Parameters, select flink-connector-elasticsearch 6 in Built-in Connector, and click Save > Publish Draft to run the job.

? The new Flink 1.13 cluster does not require users to select a built-in Connector. For other versions of the cluster, select the corresponding Connector based on the version of Elasticsearch actually purchased.

5. Data Query

Get into Elasticsearch Console , click on the previously purchased Elasticsearch instance, click on the upper right corner [Kibana], and enter Kibana to query data. Refer to the specific query method Accessing clusters through Kibana


This example uses a Datagen connector to randomly generate data, streams Oceanus to perform the most basic data conversion functions, and finally Sink to Elasticsearch without the user having to create an index in Elasticsearch ahead of time.

Focus on "Tencent Cloud Big Data" public number, one stop for technical exchange, latest activities and services

Keywords: Big Data ElasticSearch flink

Added by splitinfo on Sun, 31 Oct 2021 23:51:54 +0200