0. Introduction
Recently I needed to synchronize part of the company's MySQL data to ES, using the Logstash JDBC input for full synchronization and canal for incremental synchronization. However, the data structure in ES had to be redesigned, which means some MySQL fields need to be transformed before they are written to ES.
canal supports custom clients, which requires the dependency below. That approach suits scenarios with complex conversion rules and heavy customization. But since I also need to handle the data coming through Logstash, I want a more general way to transform the data, so I use an ES ingest pipeline for preprocessing.
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
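For reference, here is a minimal sketch of what such a custom client might look like. The server address 127.0.0.1:11111, the destination example, and the database name mydb are assumptions for illustration; the actual field conversion and the write to ES are only indicated by comments.

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalUserSync {
    public static void main(String[] args) {
        // Connect to the canal server (address and destination are assumptions for this sketch)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            // Only subscribe to the user table (the schema name "mydb" is an assumption)
            connector.subscribe("mydb\\.user");
            connector.rollback();
            while (true) {
                // Pull a batch of binlog entries without auto-acknowledging
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;
                    }
                    // Parse the row changes; the custom conversion (e.g. splitting role_id)
                    // and the ES bulk write would go here
                    CanalEntry.RowChange rowChange =
                            CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    // ... convert columns and index the documents into ES
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}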
1. Using a pipeline to preprocess data
An ingest pipeline preprocesses documents before they enter the index, and it supports the Painless scripting language (with Java-like syntax), which is enough for our current business needs.
Below is an example based on the user table. To simplify the demonstration and desensitize the data, some fields have been removed.
1.1 user structure in MySQL
MySQL 8.0

id: Long
code: varchar
real_name: varchar
role_id: varchar, multiple ids separated by commas
dept_id: varchar, multiple ids separated by commas
post_id: varchar, multiple ids separated by commas
create_time: datetime
1.2 user structure in ES
The following demonstration is based on ES 7.13.
PUT user
{
  "mappings": {
    "properties": {
      "code": { "type": "keyword" },
      "realName": { "type": "text", "analyzer": "ik_smart" },
      "roleId": { "type": "long" },
      "deptId": { "type": "keyword" },
      "postId": { "type": "long" },
      "userSource": { "type": "integer" }
    }
  }
}
1.3 objectives
What we need to do includes:
1. Convert role_id, dept_id, and post_id from comma-separated strings to arrays
2. Data from a separate WeChat user table is also synchronized to ES. To distinguish whether a user comes from WeChat or from the PC side, we check the nickName field, which only exists in the WeChat user table: when it is present, the user comes from the WeChat table and userSource is set to 1, otherwise it is set to 0.
1.4 writing pipeline
Below you can see that the strings are converted to arrays with the split processor, and a custom script sets the value of userSource.
For more on ingest pipelines, see the official documentation: ingest pipeline
For Painless syntax, you can also refer to the official documentation: painless guide
If anything about the pipeline or the custom script is unclear, feel free to leave a comment for discussion.
PUT _ingest/pipeline/user_mysql_pipeline
{
  "description": "Convert user data imported from MySQL to the ES structure",
  "processors": [
    {
      "split": {
        "field": "roleId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "deptId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "postId",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          // nickName only exists for WeChat users: move it to name and mark the source
          if (ctx.containsKey('nickName')) {
            ctx.name = ctx.nickName;
            ctx.remove('nickName');
            ctx.userSource = 1;
          } else {
            ctx.userSource = 0;
          }
        """
      }
    }
  ]
}
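Before hooking this up to canal and Logstash, you can verify the pipeline with the _simulate API; the document below uses made-up sample values:

POST _ingest/pipeline/user_mysql_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "code": "u001",
        "realName": "Test User",
        "roleId": "1,2,3",
        "deptId": "10,11",
        "postId": "100",
        "nickName": "wx_test"
      }
    }
  ]
}

The response should show roleId, deptId, and postId as arrays, nickName moved to name, and userSource set to 1.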
1.5 calling pipeline
1. To use a pipeline, the node must have the ingest role (nodes include it by default); if you configure node.roles explicitly in the ES configuration file, make sure ingest is in the list:

node.roles: [ ingest ]
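If you are unsure which roles the node currently has, one way to check is the _cat API:

GET _cat/nodes?v&h=name,node.role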
2. Specify the pipeline in the settings of the user index
PUT user
{
  "mappings": {
    "properties": {
      "code": { "type": "keyword" },
      "userType": { "type": "long" },
      "account": { "type": "text", "analyzer": "ik_smart" },
      "realName": { "type": "text", "analyzer": "ik_smart" },
      "email": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "phone": { "type": "keyword" },
      "sex": { "type": "integer" },
      "roleIds": { "type": "long" },
      "deptIds": { "type": "keyword" },
      "postIds": { "type": "long" },
      "parentDeptIds": { "type": "keyword" },
      "thirdPlatformUserId": { "type": "keyword" },
      "tenantUserId": { "type": "long" },
      "userSource": { "type": "integer" },
      "tenantId": { "type": "keyword" },
      "createUser": { "type": "long" },
      "createDept": { "type": "keyword" },
      "createTime": { "type": "date" }
    }
  },
  "settings": {
    "default_pipeline": "user_mysql_pipeline",
    "number_of_replicas": 0, // single node for testing, so replicas are set to 0
    "number_of_shards": 1
  }
}
Alternatively, you can specify the pipeline when inserting a document, but that does not apply here because the data is synchronized automatically:
PUT user/_doc/1?pipeline=user_mysql_pipeline
{
  ...
}
3. After executing the statements above in Kibana or another ES client, start canal and Logstash to synchronize the data; ES will preprocess it as it is indexed.
4. After testing, you can see that the data conversion succeeded:
GET user/_search?size=100
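As an extra sanity check, you can also index a single hand-written document (the values below are made up) and fetch it back to confirm that the default pipeline was applied, i.e. that roleId, deptId, and postId are now arrays and userSource is set:

PUT user/_doc/test-1
{
  "code": "u001",
  "realName": "Test User",
  "roleId": "1,2,3",
  "deptId": "10,11",
  "postId": "100",
  "nickName": "wx_test"
}

GET user/_doc/test-1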