"Sure enough fresh" e-commerce project (44) - use Logstash to automatically synchronize database content to ES

1. How to synchronize database with ES

We all know that ES is a search engine. Its search speed is much faster than that of the database. How to synchronize the contents of the database to es? The middleware Logstash is needed at this time.

Logstash is an open source data collection engine with real-time pipeline function. Logstash can dynamically unify the data from different data sources and standardize the data to the destination you choose.

I drew a schematic diagram of database synchronization with ES, as follows:

First, configure relevant information in Logstash as follows:

1. MySQL configuration information includes:

Database connection information
 Database query latest data SQL

2. Configure timer:

Configure the timing rules of the timer (e.g. once every minute from the database)

3. Configure ES:

to configure ES Connection information
ES file id Other information

2. Database data entry

The table design of the database has been explained in the previous design of commodity search service database. In order to have the demonstration effect of this article and the explanation of blog in the future, insert false data first

category table:

INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (1, 0, 'Fresh fruit', 0, 0, NULL, NULL, '2020-03-02 15:00:57', '2020-03-02 15:00:57', '2020-03-02 15:00:57');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (2, 1, 'Hot fruit', 0, 0, NULL, NULL, '2020-03-02 15:02:08', '2020-03-02 15:02:08', '2020-03-02 15:02:08');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (3, 1, 'fruits in season', 0, 0, NULL, NULL, '2020-03-02 15:02:08', '2020-03-02 15:02:08', '2020-03-02 15:02:08');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (4, 1, 'Tropical fruits', 0, 0, NULL, NULL, '2020-03-02 15:02:08', '2020-03-02 15:02:08', '2020-03-02 15:02:08');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (5, 1, 'Landmark fruit', 0, 0, NULL, NULL, '2020-03-02 15:02:08', '2020-03-02 15:02:08', '2020-03-02 15:02:08');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (6, 2, 'Apple', 0, 0, NULL, NULL, '2020-03-02 15:03:19', '2020-03-02 15:03:19', '2020-03-02 15:03:19');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (7, 2, 'orange', 0, 0, NULL, NULL, '2020-03-02 15:03:19', '2020-03-02 15:03:19', '2020-03-02 15:03:19');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (8, 0, 'Seafood and aquatic products', 0, 0, NULL, NULL, '2020-03-02 15:28:39', '2020-03-02 15:28:39', '2020-03-02 15:28:39');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (9, 8, 'fish', 0, 0, NULL, NULL, '2020-03-02 15:28:49', '2020-03-02 15:28:49', '2020-03-02 15:28:49');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (10, 8, 'Shrimp', 0, 0, NULL, NULL, '2020-03-02 15:28:49', '2020-03-02 15:28:49', '2020-03-02 15:28:49');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (11, 9, 'salmon', 0, 0, NULL, NULL, '2020-03-02 15:30:10', '2020-03-02 15:30:10', '2020-03-02 15:30:10');
INSERT INTO `guoranxinxian-goods`.`category`(`ID`, `PARENT_ID`, `NAME`, `STATUS`, `SORT_ORDER`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (12, 9, 'Cod', 0, 0, NULL, NULL, '2020-03-02 15:30:10', '2020-03-02 15:30:10', '2020-03-02 15:30:10');

attribute_key table:

INSERT INTO `guoranxinxian-goods`.`attribute_key`(`ID`, `CATEGORY_ID`, `ATTRIBUTE_NAME`, `NAME_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (1, 6, 'weight', '0', 1, NULL, '2020-03-02 15:34:35', '2020-03-02 15:34:35', '2020-03-02 15:34:35');
INSERT INTO `guoranxinxian-goods`.`attribute_key`(`ID`, `CATEGORY_ID`, `ATTRIBUTE_NAME`, `NAME_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (2, 6, 'category', '0', 1, NULL, '2020-03-02 15:34:35', '2020-03-02 15:34:35', '2020-03-02 15:34:35');
INSERT INTO `guoranxinxian-goods`.`attribute_key`(`ID`, `CATEGORY_ID`, `ATTRIBUTE_NAME`, `NAME_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (3, 6, 'country of origin', '0', 1, NULL, '2020-03-02 15:34:35', '2020-03-02 15:34:35', '2020-03-02 15:34:35');
INSERT INTO `guoranxinxian-goods`.`attribute_key`(`ID`, `CATEGORY_ID`, `ATTRIBUTE_NAME`, `NAME_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (4, 6, 'Mode of sale', '0', 1, NULL, '2020-03-02 15:34:35', '2020-03-02 15:34:35', '2020-03-02 15:34:35');

attribute_vallue table:

INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (1, '1', '3kg-4kg', '0', 1, NULL, '2020-03-02 15:36:27', '2020-03-02 15:36:27', '2020-03-02 15:36:27');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (2, '1', '1000g following', '0', 1, NULL, '2020-03-02 15:36:42', '2020-03-02 15:36:42', '2020-03-02 15:36:42');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (3, '1', '1000-1999g', '0', 1, NULL, '2020-03-02 15:36:43', '2020-03-02 15:36:43', '2020-03-02 15:36:43');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (4, '1', '2000-3999g', '0', 1, NULL, '2020-03-02 15:36:43', '2020-03-02 15:36:43', '2020-03-02 15:36:43');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (5, '2', 'Red Fuji', '0', 1, NULL, '2020-03-02 15:38:55', '2020-03-02 15:38:55', '2020-03-02 15:38:55');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (6, '2', 'Snow lotus fruit', '0', 1, NULL, '2020-03-02 15:38:55', '2020-03-02 15:38:55', '2020-03-02 15:38:55');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (7, '2', 'Xu Xiang', '0', 1, NULL, '2020-03-02 15:38:55', '2020-03-02 15:38:55', '2020-03-02 15:38:55');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (8, '3', 'Shaanxi', '0', 1, NULL, '2020-03-02 15:38:57', '2020-03-02 15:38:57', '2020-03-02 15:38:57');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (9, '3', 'Sichuan', '0', 1, NULL, '2020-03-02 15:38:57', '2020-03-02 15:38:57', '2020-03-02 15:38:57');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (10, '3', 'Thailand', '0', 1, NULL, '2020-03-02 15:38:57', '2020-03-02 15:38:57', '2020-03-02 15:38:57');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (11, '3', 'Hainan', '0', 1, NULL, '2020-03-02 15:38:59', '2020-03-02 15:38:59', '2020-03-02 15:38:59');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (12, '3', 'Shandong', '0', 1, NULL, '2020-03-02 15:38:59', '2020-03-02 15:38:59', '2020-03-02 15:38:59');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (13, '3', 'Guangxi', '0', 1, NULL, '2020-03-02 15:38:59', '2020-03-02 15:38:59', '2020-03-02 15:38:59');
INSERT INTO `guoranxinxian-goods`.`attribute_value`(`ID`, `ATTRIBUTE_ID`, `ATTRIBUTE_VALUE`, `VALUE_SORT`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (14, '3', 'Vietnam', '0', 1, NULL, '2020-03-02 15:38:59', '2020-03-02 15:38:59', '2020-03-02 15:38:59');

product table:

INSERT INTO `guoranxinxian-goods`.`product`(`ID`, `CATEGORY_ID`, `NAME`, `SUBTITLE`, `MAIN_IMAGE`, `SUB_IMAGES`, `DETAIL`, `ATTRIBUTE_LIST`, `PRICE`, `STOCK`, `STATUS`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (1, 6, 'Yantai Red Fuji apple', 'Red Fuji apple', 'https://ss1. bdstatic. com/70cFvXSh_ Q1YnxGkpoWK1HF6hhy/it/u=1296845473,2218916964&fm=26&gp=0. jpg', '{\"imgages\":[{\" https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=3443692772 ,1819655971&fm=26&gp=0. jpg\"},{\" https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=1435166030 ,2305236542&fm=26&gp=0. Jpg \ "}]} ',' 12 Yantai Fuji apples with a net weight of more than 2.6kg ',' 1,2,3,4 ', null, null, 0, 1, null,' 2020-03-02 16:02:40 ', null,' 2020-03-02 16:02:40 ');

product_specs table:

INSERT INTO `guoranxinxian-goods`.`product_specs`(`ID`, `PRODUCT_ID`, `PRODUCT_SPECS`, `SPECS_SEQ`, `PRODUCT_STOCK`, `PRODUCT_PRICE`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (1, 1, '{\"Memory\":\"4G\",\"colour\":\"gules\",\"particular year\":\"2019\",\"size\":\"16 inch\"}', 0, 30, 3699.00000000, 1, NULL, '2019-03-02 15:50:04', '2019-03-02 15:50:04', '2019-03-02 15:50:04');
INSERT INTO `guoranxinxian-goods`.`product_specs`(`ID`, `PRODUCT_ID`, `PRODUCT_SPECS`, `SPECS_SEQ`, `PRODUCT_STOCK`, `PRODUCT_PRICE`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (2, 1, '{\"Memory\":\"8G\",\"colour\":\"white\",\"particular year\":\"2019\",\"size\":\"16 inch\"}', 0, 30, 3899.00000000, 1, NULL, '2019-03-02 15:50:04', '2019-03-02 15:50:04', '2019-03-02 15:50:04');
INSERT INTO `guoranxinxian-goods`.`product_specs`(`ID`, `PRODUCT_ID`, `PRODUCT_SPECS`, `SPECS_SEQ`, `PRODUCT_STOCK`, `PRODUCT_PRICE`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (3, 1, '{\"Memory\":\"16G\",\"colour\":\"white\",\"particular year\":\"2019\",\"size\":\"16 inch\"}', 0, 30, 4199.00000000, 1, NULL, '2019-03-02 15:50:04', '2019-03-02 15:50:04', '2019-03-02 15:50:04');

3. logstash synchronizes database contents to ES

3.1 logstash installation plug-in

3.1.1 installation of logstash input JDBC plug-in

If you want to realize logstash to synchronize MySQL data to ES, the first premise is to establish a connection between logstash and mysql. How to establish it? In fact, you need to install the jdbc plug-in. The installation method is as follows:

1. Start logstash:

docker start logstash

2. Enter the logstash plug-in Directory:

docker exec -it logstash /bin/bash
cd /usr/share/logstash/bin

3. Install jdbc plug-in online:

./logstash-plugin install logstash-input-jdbc


Error: the installation is terminated, and the plug-in "logstash input JDBC" has been supported by 'logstash integration JDBC'. It means that it has been installed and does not need to be installed again.

3.1.2 installation of logstash output elastic search plug-in

To enable logstash and es to interact with each other, you also need to install the "logstash output elastic search" plug-in.
After entering the logstash directory, execute the installation command:

./logstash-plugin install logstash-output-elasticsearch


Installation succeeded!

3.2 logstash configuration

3.2.1 explanation of logstash configuration file

First paste the logstash configuration content (mysql.conf):

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.18.166:3306/taodong-goods"
    jdbc_user => "root"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "SELECT * FROM product WHERE UPDATED_TIME >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updated_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        hosts => ["192.168.162.134:9200"]
        index => "product"
        document_id => "%{id}"
        document_type => "product"
    }
    stdout {
        codec => json_lines
    }
}

Configure field resolution:

3.2.2 uploading configuration files

Note: the MySQL driver package is configured in the above configuration file, so you need to download the driver package. Download address: https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.46/mysql-connector-java-5.1.46.jar

First, upload the msql driver package to the Lib directory of logstash / usr/share/logstash/lib

Then upload the configuration file mysql Conf to / usr/share/logstash/config directory (the method is the same as above):

4. Test

1. Start elasticsearch successfully:

docker start elasticsearch


2. Start kibana successfully:

docker start kibana


3. On kibana console, search all and you can see three pieces of data:

4. Modify logstash Conf, exit and restart logstash

## Enter container
docker exec -it logstash /bin/bash
cd /usr/share/logstash/config/

## Modify mysql Conf name
mv mysql.conf logstash.conf

## Replace default startup profile
rm -rf /usr/share/logstash/pipeline/logstash.conf 
mv logstash.conf /usr/share/logstash/pipeline

exit
docker restart logstash

5. View log:

docker logs -f logstash

Wait patiently, and you can see the contents of the regular query database, as shown in the following figure:

6. Check es and you can see that it has been queried:

GET product/_search

7. Insert a piece of data into the local database again

INSERT INTO `guoranxinxian-goods`.`product`(`ID`, `CATEGORY_ID`, `NAME`, `SUBTITLE`, `MAIN_IMAGE`, `SUB_IMAGES`, `DETAIL`, `ATTRIBUTE_LIST`, `PRICE`, `STOCK`, `STATUS`, `REVISION`, `CREATED_BY`, `CREATED_TIME`, `UPDATED_BY`, `UPDATED_TIME`) VALUES (2, 6, 'Gannan navel orange', 'Sweet orange', 'https://ss1. bdstatic. com/70cFvXSh_ Q1YnxGkpoWK1HF6hhy/it/u=1296845473,2218916964&fm=26&gp=0. jpg', '{\"imgages\":[{\" https://ss1.bdstatic.com/70cFuXSh_Q1YnxGkpoWK1HF6hhy/it/u=3443692772 ,1819655971&fm=26&gp=0. jpg\"},{\" https://ss0.bdstatic.com/70cFuHSh_Q1YnxGkpoWK1HF6hhy/it/u=1435166030 ,2305236542&fm=26&gp=0. Jpg \ "}]} ',' Gannan navel orange fresh sweet orange 2.5kg single fruit 160g-200g fresh fruit ',' 1,2,3,4 ', null, null, 0, 1, null,' 2020-03-31 17:00:40 ', null,' 2020-03-31 17:00:40 ');


You can see the new data queried in the log

es can also see new data written:

5. Summary

This article mainly explains "how to use Logstash to automatically synchronize database content to ES", and explains the principle and attention details.

Keywords: Database ElasticSearch

Added by shwanky on Sat, 12 Feb 2022 04:21:11 +0200