Verification of incremental collection of mysql by logstash

Recently, there is a need to synchronize the data in a table in mysql to es. After analysis, use the jdbc plug-in of logstash or get the data in mysql. output to es. There are two collection situations: first, full collection, then incremental collection.

Process verified as follows:

1. Install longstash and mysql

The installation of mysql and logstash can be found on the Internet.

Note:

1) may need to be installed: bin / plugin install logstash input JDBC

2) MySQL driver download https://cdn.mysql.com/downloads/connector-j/mysql-connector-java-5.1.48.tar.gz

2. Create mysql tables and data

create table test.zy  (
id int,
str varchar(20)
) ;

insert into test.zy   values('1','a1');
insert into test.zy   values('2','a2');
insert into test.zy   values('3','a3');
insert into test.zy   values('4','a4');
insert into test.zy   values('5','a5');
insert into test.zy   values('6','a6');
insert into test.zy   values('7','a7');
insert into test.zy   values('8','a8');
insert into test.zy   values('9','a9');
insert into test.zy   values('10','a10');
insert into test.zy   values('11','a11');
insert into test.zy   values('12','a12');
insert into test.zy   values('13','a13');
insert into test.zy   values('14','a14');
#Incremental collection verification of inserted data
insert into test.zy   values('15','a15');
insert into test.zy   values('16','a16');

3.logstash collects mysql configuration files

#Incremental collection by field
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://ip:3306/test"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "select * from zy where id > :sql_last_value"
	        use_column_value => true
	        tracking_column => "id"
	        record_last_run => true
	        last_run_metadata_path => "/root/test.log"
	        schedule => "*/2 * * * *"
     }
}

output {
  file {
   path => "./mysql/test-%{+YYYY-MM-dd}.txt"
  }
}

#Collect based on time stamp increment
input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://ip:port/test"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_default_timezone =>"Asia/Shanghai"  #Set the time zone of the SQL last value record, otherwise it will affect the effect of incremental collection
        statement => "select * from zy1 where time > :sql_last_value"
        use_column_value => false
        record_last_run => true
        last_run_metadata_path => "/root/test.log"
        schedule => "*/2 * * * *"
     }
}

output {
  file {
   path => "./db2/test-%{+YYYY-MM-dd}.txt"
  }
}

input Configuration resolution for
statement  implement myqsl Can also be statementpath Heel sql Path to file
use_column_value 
    //Whether to use the column value as the basis to record the last running position.
    //If set to true, the column defined by tracking column is used as: SQL last value
    //If it is set to false, then: sql_last_value reflects the last SQL run time.
tracking_column   Field name by which incremental collection is based   If use_colomn_value by false Can not write
record_last_run   Whether to record the location of the collected data
last_run_metadata_path   Set up a file to record the location of collected data
schedule         sql Frequency of script execution

4. Acquisition verification

Start logstash

    cd /usr/local/logstash

    bin/logstash -f conf/logtash.conf

logstash collection output log:

    

   

Output file content view

Record of SQL last value

Keywords: Linux MySQL JDBC SQL Java

Added by Flying Sagittarius on Sun, 24 Nov 2019 16:17:10 +0200