Elasticsearch in practice: preprocessing data synced from MySQL to ES with an ingest pipeline

0. Introduction

Recently, while synchronizing part of the company's MySQL data to ES, I used the Logstash JDBC input for full synchronization and canal for incremental synchronization. Another problem then came up: the data structure in ES had to be redesigned, so some MySQL fields need to be transformed before they are written to ES.

canal supports writing a custom client, which requires the dependency below. That approach suits scenarios with complex conversion rules and heavy customization. However, since I also need to synchronize data through Logstash, I want a more general way to handle the conversion, so I use an ES ingest pipeline for preprocessing.

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

1. Data preprocessing with a pipeline

An ingest pipeline preprocesses documents before they are written into the index, and it supports the Java-like Painless scripting syntax, which covers our current business needs.

I will use the processing of the user table as an example. To keep the demonstration simple and the data desensitized, some fields have been removed.

1.1 user structure in MySQL

MySQL 8.0

id: Long
code: varchar
real_name: varchar
role_id: varchar, multiple ids separated by commas
dept_id: varchar, multiple ids separated by commas
post_id: varchar, multiple ids separated by commas
create_time: datetime

1.2 user structure in ES

The following demonstration is based on ES 7.13.

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      }, 
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },   
      "roleId": {
        "type": "long"
      },
      "deptId": {
        "type": "keyword"
      },
      "postId": {
        "type": "long"
      },  
      "userSource": {
        "type": "integer"
      }
    }
  }
}

1.3 objectives

What we need to do:
1. Convert role_id, dept_id and post_id from comma-separated strings to arrays.
2. Data from a separate WeChat user table is also synchronized to ES. To tell WeChat users apart from PC users, we check the nickName field, which only exists in the WeChat user table: when it is present, the record comes from the WeChat table and userSource is set to 1, otherwise it is set to 0. A before-and-after sketch follows this list.
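
As a rough illustration (the field values below are invented, and I assume the sync layer already maps the MySQL column names to the camelCase field names used later in the pipeline), a synchronized user row and the document we want indexed look roughly like this:

// Document as it arrives from MySQL (made-up values)
{
  "code": "u1001",
  "realName": "Zhang San",
  "roleId": "1,2,3",
  "deptId": "d01,d02",
  "postId": "7"
}

// Document we want in ES: split produces string arrays, and ES coerces
// "1" to 1 for long fields such as roleId; nickName is absent, so userSource is 0
{
  "code": "u1001",
  "realName": "Zhang San",
  "roleId": ["1", "2", "3"],
  "deptId": ["d01", "d02"],
  "postId": ["7"],
  "userSource": 0
}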

1.4 writing pipeline

In the pipeline below, the comma-separated strings are converted directly into arrays by split processors, and a custom script sets the value of userSource.

For more details on pipelines, please refer to the official documentation: ingest pipeline

For the Painless syntax, you can also refer to the official documentation: painless guide

If anything about the pipeline or the custom script is unclear, feel free to leave a comment for discussion.

PUT _ingest/pipeline/user_mysql_pipeline
{
  "description": "user data mysql Import convert to es structure",
  "processors": [
    {
      "split": {
        "field": "roleId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "deptId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "postId",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.containsKey('nickName')) {
            // nickName only exists in the WeChat user table
            ctx.name = ctx.nickName;
            ctx.remove('nickName');
            ctx.userSource = 1;
          } else {
            ctx.userSource = 0;
          }
        """
      }
    }
  ]
}
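
Before wiring the pipeline into the index, it can be verified with the simulate API. A quick check with a made-up document might look like this:

POST _ingest/pipeline/user_mysql_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "code": "u1001",
        "realName": "Zhang San",
        "roleId": "1,2,3",
        "deptId": "d01,d02",
        "postId": "7",
        "nickName": "zs"
      }
    }
  ]
}

The response contains the transformed _source, so you can confirm that roleId has become an array and userSource is 1 before any real data flows through.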

1.5 calling pipeline

1. To use a pipeline, at least one node must have the ingest role. By default every node already has it; if node.roles is set explicitly in the ES configuration file, make sure ingest is included alongside whatever other roles the node needs, e.g.

node.roles: [ ingest ]
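
Whether a node actually has the ingest role can be checked with the _cat API; the role column should contain an i:

GET _cat/nodes?v&h=name,node.role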

2. Specify the pipeline in the settings of the user index

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "userType": {
        "type": "long"
      },
      "account": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "phone": {
        "type": "keyword"
      },
      "sex": {
        "type": "integer"
      },
      "roleIds": {
        "type": "long"
      },
      "deptIds": {
        "type": "keyword"
      },
      "postIds": {
        "type": "long"
      },
      "parentDeptIds": {
        "type": "keyword"
      },
      "thirdPlatformUserId": {
        "type": "keyword"
      },
      "tenantUserId": {
        "type": "long"
      },
      "userSource": {
        "type": "integer"
      },
      "tenantId": {
        "type": "keyword"
      },
      "createUser": {
        "type": "long"
      },
      "createDept": {
        "type": "keyword"
      },
      "createTime": {
        "type": "date"
      }
    }
  },
  "settings": {
    "default_pipeline": "user_mysql_pipeline",
    "number_of_replicas": 0,  // Because I use a single node for testing, I set replica sharding to 0
    "number_of_shards": 1
  }
}
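
If the user index already exists, the same setting can also be applied without recreating the index, for example:

PUT user/_settings
{
  "index.default_pipeline": "user_mysql_pipeline"
}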

Alternatively, the pipeline can be specified per request when inserting a document. That approach does not fit here because synchronization is automatic, but for reference:

PUT user/_doc/1?pipeline=user_mysql_pipeline
{
   ...
}

3. After executing the statements above in Kibana (or another ES client), start canal and Logstash to synchronize the data; ES will preprocess each document through the pipeline.

4. After testing, you can see that the data conversion succeeded:

GET user/_search?size=100
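
To spot-check the userSource flag specifically, a simple term query also works:

GET user/_search
{
  "query": {
    "term": {
      "userSource": 1
    }
  }
}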
