Using logstash to synchronize MySQL data to ES

Summary

   Absent special business requirements, the official Logstash has its advantages.
   Before using Logstash, understand its characteristics and then decide whether it fits your needs:

  • No development is needed: just install and configure Logstash;
  • Anything that can be expressed in SQL can be synchronized (data is fetched via SQL queries);
  • Supports full synchronization, or incremental synchronization based on a specific field (such as an auto-increment ID or a modification time);
  • The synchronization frequency is configurable, but the fastest schedule is once per minute (use with caution if near-real-time freshness is required);
  • Physically deleted rows cannot be propagated as deletions in ES (add a logical-delete field such as IsDelete to the table design to mark deleted data).
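Since physical deletes cannot be propagated, a common workaround (a sketch only, assuming the IsDelete logical-delete column suggested above is selected by the statement SQL) is to turn the flag into an ES delete action:

filter {
	# Route logically deleted rows to a delete action; everything else is indexed.
	if [IsDelete] == 1 {
		mutate { add_field => { "[@metadata][es_action]" => "delete" } }
	} else {
		mutate { add_field => { "[@metadata][es_action]" => "index" } }
	}
}
output {
	elasticsearch {
		hosts => ["192.168.1.1:9200"]
		index => "consumption"
		document_id => "%{KeyId}"
		# Index or delete depending on the flag computed above
		action => "%{[@metadata][es_action]}"
	}
}

Deleting a document that has already been removed from ES logs a 404, which is harmless for this use case.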

1. Installation

   Download Logstash from the official website: https://www.elastic.co/downloads/logstash . The zip package is about 160 MB (if the official download is slow, you can also download it from @zxiaofan's CSDN page).
   Program directory: [windows] G:\ELK\logstash-6.5.4; [linux] /tomcat/logstash/logstash-6.5.4.
                   

2. Configuration

2.1 Create a new directory for the configuration file and the MySQL driver package

                         
   Create a new jdbc.conf file in the [program directory]\mysql directory. This file holds the core configuration, such as the database connection information and the query SQL;
   refer to the inline comments below for details.
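Once created, the layout under the program directory looks roughly like this (the jdbc.sql file is only needed if statement_filepath is used; your driver version may differ):

logstash-6.5.4/
	bin/
	mysql/
		jdbc.conf                        (pipeline configuration)
		jdbc.sql                         (optional SQL statement file)
		mysql-connector-java-5.1.34.jar  (MySQL JDBC driver)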

2.2 Single-table synchronization configuration

input {
	stdin {}
	jdbc {
		type => "jdbc"
		 # Database connection address
		jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true"
		 # Database connection account password;
		jdbc_user => "username"
		jdbc_password => "pwd"
		 # MySQL depends on the package path;
		jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
		 # the name of the driver class for mysql
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		 # Database reconnection attempts
		connection_retry_attempts => "3"
		 # Validate the database connection before use; disabled (false) by default.
		jdbc_validate_connection => "true"
		 # Connection validation timeout, 3600 s by default
		jdbc_validation_timeout => "3600"
		 # Turn on paging query (false is not turned on by default);
		jdbc_paging_enabled => "true"
		 # The number of single page queries (100000 by default, if there are many fields and the update frequency is high, it is recommended to lower this value);
		jdbc_page_size => "500"
		 # statement is the query SQL. If the SQL is complex, it is recommended to store it in a file and configure its path via statement_filepath instead.
		 # :sql_last_value is a built-in variable holding the tracking-column value from the previous run (here, ModifyTime).
		 # statement_filepath => "mysql/jdbc.sql"
		statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime >= :sql_last_value ORDER BY ModifyTime ASC"
		 # Whether to convert field names to lowercase; true by default (set to false if serialization/deserialization needs the original casing);
		lowercase_column_names => false
		 # Value can be any of: fatal,error,warn,info,debug, default info;
		sql_log_level => warn
		 #
		 # Whether to record the last run. If true, the tracking-column value from the last run is saved to the file specified by last_run_metadata_path.
		record_last_run => true
		 # Set to true to track a column from the query result; otherwise the @timestamp of the last run is tracked.
		use_column_value => true
		 # The column to track; for incremental synchronization it must be a real database column
		tracking_column => "ModifyTime"
		 # Value can be any of: numeric,timestamp,Default value is "numeric"
		tracking_column_type => timestamp
		 # last_run_metadata_path: file in which the last tracked value is stored;
		last_run_metadata_path => "mysql/last_id.txt"
		 # Whether to clear the last_run_metadata_path record on startup; must be false for incremental synchronization.
		clean_run => false
		 #
		 # Schedule in cron-like syntax (minute hour day month weekday); "* * * * *" runs once per minute, the fastest possible;
		schedule => "* * * * *"
	}
}

filter {
	json {
		source => "message"
		remove_field => ["message"]
	}
	# convert: field type conversion; cast the TotalMoney field to float;
	mutate {
		convert => {
			"TotalMoney" => "float"
		}
	}
}
output {
	elasticsearch {
		 # host => "192.168.1.1"
		 # port => "9200"
		 # Configure ES cluster address
		hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
		 # Index name, must be lowercase
		index => "consumption"
		 # Data unique index (database KeyID is recommended)
		document_id => "%{KeyId}"
	}
	stdout {
		codec => json_lines
	}
}
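For reference, after the first run the file configured by last_run_metadata_path contains the last tracked value serialized as YAML; with tracking_column_type => timestamp it looks roughly like this (the exact format can vary between versions):

--- 2019-10-22 10:10:10.000000000 +08:00

On the next run this value is substituted for :sql_last_value in the statement SQL.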

2.3 Multi-table synchronization

The difference from the single-table configuration is that the input section contains one jdbc block per table, each with its own type, and the output section branches on those types.

input {
	stdin {}
	jdbc {
		 # When synchronizing multiple tables, type distinguishes the tables. A "DatabaseName_TableName" naming scheme is recommended; each jdbc block needs its own type.
		type => "TestDB_DetailTab"
		
		 # Other configurations are omitted here, refer to single table configuration
		 # ...
		 # ...
		 # last_run_metadata_path: file in which the last tracked value is stored;
		last_run_metadata_path => "mysql/last_id.txt"
		 # Whether to clear the last_run_metadata_path record on startup; must be false for incremental synchronization.
		clean_run => false
		 #
		 # Schedule in cron-like syntax (minute hour day month weekday); "* * * * *" runs once per minute, the fastest possible;
		schedule => "* * * * *"
	}
	jdbc {
		 # When synchronizing multiple tables, type distinguishes the tables; each jdbc block needs its own type.
		type => "TestDB_Tab2"
		# For multi-table synchronization, each jdbc block must use a different last_run_metadata_path so the tables do not affect each other.
		 # Other configurations are omitted here
		 # ...
		 # ...
	}
}

filter {
	json {
		source => "message"
		remove_field => ["message"]
	}
}

output {
	# The type values here must match those of the jdbc blocks above
	if [type] == "TestDB_DetailTab" {
		elasticsearch {
			 # host => "192.168.1.1"
			 # port => "9200"
			 # Configure ES cluster address
			hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
			 # Index name, must be lowercase
			index => "detailtab1"
			 # Data unique index (database KeyID is recommended)
			document_id => "%{KeyId}"
		}
	}
	if [type] == "TestDB_Tab2" {
		elasticsearch {
			# host => "192.168.1.1"
			# port => "9200"
			# Configure ES cluster address
			hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
			# Index name, must be lowercase
			index => "detailtab2"
			# Data unique index (database KeyID is recommended)
			document_id => "%{KeyId}"
		}
	}
	stdout {
		codec => json_lines
	}
}

3. Startup

   Execute the following command from the [program directory] to start:

[windows]
bin\logstash.bat -f mysql\jdbc.conf
[linux]
nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &
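Before a real start, the configuration can be syntax-checked first; --config.test_and_exit only parses the pipeline file and exits without connecting to MySQL or ES:

[windows]
bin\logstash.bat -f mysql\jdbc.conf --config.test_and_exit
[linux]
./bin/logstash -f mysql/jdbc.conf --config.test_and_exit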

                  
                   

Note:
   Logstash 5.x/6.x/7.x requires JDK 8. If the default JDK version is not JDK 8, add two environment variables at the top of bin/logstash or bin/logstash.lib.sh:

export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"


4. Problems and Solutions

4.1 No data appears in ES after synchronization

The index in the output.elasticsearch section must be all lowercase.

4.2 The last_run_metadata_path file does not change during incremental synchronization

   If lowercase_column_names is not set to false, the tracking_column value must be written in all lowercase.

4.3 The JDBC driver library cannot be found

2032 com.mysql.jdbc.Driver not loaded.
Are you sure you've included the correct jdbc driver in :jdbc_driver_library?

   Check that the configured path is correct. In a Linux environment, note that the path separator is '/', not '\'.
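If a relative path keeps failing, an absolute path is a safe fallback (shown here with the Linux program directory from section 1; adjust to your environment):

jdbc_driver_library => "/tomcat/logstash/logstash-6.5.4/mysql/mysql-connector-java-5.1.34.jar"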

4.4 Data loss

If the statement SQL compares the tracking field with ">", data may be lost.
   For example, if a row's ModifyTime is exactly equal to the saved :sql_last_value, the condition "ModifyTime > :sql_last_value" skips it on the next run.
   Solution: use ">=" (greater than or equal) in the comparison.

4.5 Repeated data updates

   The ">=" comparison has the opposite problem: rows whose tracking value equals the stored :sql_last_value are re-queried on every run. When many rows share that maximum value and no newer data arrives for a long time, this batch of rows is repeatedly re-synchronized to ES.
This occurs when: ① the tracking field is not auto-incrementing; ② the tracking field values are generated by the program at insertion time.
Solution:

  • ① Make the tracking field monotonic with no (or minimal) duplicates, for example the database auto-increment ID or a database-generated timestamp; this avoids most abnormal cases;
  • ② If the program inserts large batches with identical update times and no new data arrives for a long time, consider periodically updating a dedicated test row in the database so that a large batch of rows does not remain stuck at the maximum value.
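Option ② can be as simple as a periodically executed statement that touches one dedicated dummy row (the KeyId = 0 row here is an assumed placeholder) so the maximum ModifyTime keeps advancing past the stuck batch:

-- Run periodically (e.g. from a cron job or MySQL event) against the source table;
-- the dummy row pushes the maximum ModifyTime forward.
UPDATE `DetailTab` SET ModifyTime = NOW() WHERE KeyId = 0;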

4.6 Disaster tolerance

4.7 Synchronizing large volumes of data

Why is it slow? Logstash pages through a derived table: each page's SQL wraps the whole result set in a subquery and then applies LIMIT/OFFSET to it, so every page forces a scan of the full result set of the main table.

SELECT * FROM (SELECT * FROM `ImageCN1`
 WHERE ModifyTime>= '1970-01-01 08:00:00'
 order by ModifyTime asc) AS `t1`
 LIMIT 5000 OFFSET 10000000;

   How can a very large first-time synchronization be done safely?
   First test and verify with a small amount of data; then, based on the results, add interval conditions to the SQL and start Logstash to synchronize range by range. For example, modify the SQL to:

SELECT
	* 
FROM
	`ImageCN1` 
WHERE
	ModifyTime < '2018-10-10 10:10:10' AND ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
	ModifyTime ASC

   Once the data in the ModifyTime < '2018-10-10 10:10:10' range has been synchronized, adjust the interval conditions and synchronize the remaining range.
   To keep each page fast, the SQL can also be given a LIMIT:

SELECT
	* 
FROM
	`ImageCN1` 
WHERE
	ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
	ModifyTime ASC 
	LIMIT 100000

   A quick timing comparison of paging with and without the inner LIMIT:

[SQL]USE XXXDataDB;
//Rows affected: 0
//Time: 0.001s

[SQL]
SELECT
	* 
FROM
	( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '1970-01-01 08:00:00' ORDER BY ModifyTime ASC ) AS `t1` 
	LIMIT 5000 OFFSET 900000;
//Rows affected: 0
//Time: 7.229s

[SQL]
SELECT
	* 
FROM
	( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '2018-07-18 19:35:10' ORDER BY ModifyTime ASC LIMIT 100000 ) AS `t1` 
	LIMIT 5000 OFFSET 90000;
//Rows affected: 0
//Time: 1.778s

The test shows that without LIMIT in the inner query, paging gets slower the further in it goes, approaching 8 s per page, while the SQL with the LIMIT condition stays stable within 2 s.

Good luck!
Life is all about choices!
Your future self will thank you!


Added by ztealmax on Tue, 22 Oct 2019 21:17:59 +0300