Several pits synchronized to ES using logstash

1. Preface

Record several issues encountered when using logstash to synchronize data from sqlserver to ES.Version used is es6.8.3+logstash6.8.3

2.logstash profile

2.1input

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#Driver jar package for sqlserver
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;"
        jdbc_user => "sa"
        jdbc_password => "123456"
        jdbc_default_timezone => "Asia/Shanghai"
		jdbc_paging_enabled => "true"#paging
		record_last_run => true#Record last run value
		use_column_value => true#Use field tracking in the database
		tracking_column => "update_time"#Tracked field name
		tracking_column_type => "timestamp"#Tracked field type
		last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#File Address of Value Store of Last Run
		clean_run => false#Use field tracking in the database
		statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql statement
		schedule => "*/5 * * * * *"#Execute every 5s
    }
}
  • statement

Since the data to be looked up is table-related data, the initial idea is to create multiple jdbc s, store the data in different indexes of es, and use parent-child documents for associated queries.

Later, it was found that this method was inefficient and affected the performance of ES, so the solution was to set up a number of views joined by tables in sqlserver.

The v_test in the statement here is the view created.

Since incremental Logstash updates are set, you must use the restrictions update_time>: sql_last_value and update_time<GETDATE() to ensure that the data is not lost or duplicated

For specific reasons see: How to use Logstash to synchronize data between relational databases and ElasticSearch

  • schedule

Many tutorials on the Internet say that the minimum interval is 1 minute, which is actually seconds.

Schedule =>'*/5 * * * * * * * * *'Just add one more * unit before it is the second, which is executed every 5s

2.2filter

filter {
	if ![test]{ruby{code =>'event.set("test","")'}}	
	mutate{
		convert => { "id" => "integer" }
		remove_field => ["@timestamp"]
		remove_field => ["@version"]
	}
}

This is mainly about some processing of the data found from sqlserver database. I have deleted most of the content here, leaving only some representative.

  • if ![test]{ruby{code =>'event.set("test","")'}}

This means that when the test field is null, it is handled in the language ruby, where code =>''is where the code is written

event.set("test") means to set the content of the test field to ""

Of course, we can also get event.get("test") first, get the contents of the test field, and then do a series of processing before event.set, so that we can save the value of the processed field.

The specific syntax of the ruby language can refer to this: Ruby Tutorial

  • convert => { "id" => "integer" }

This means converting the type of the id field to integer, or timestamp if a field is a time type

2.3output

output {
		elasticsearch {
			hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"]
			index => "consumer_statistics"#Index Name
			document_id => "%{id}"#id of index
			document_type => "consumer_statistics"#The type of index, which has been obsolete since version 6.x, can be ignored
			template_name => "consumer_statistics"#Name of index template
		}
}
  • document_id => "%{id}"

The id of the document is the id of the imported data so that the setting can be idempotent

  • template_name => "consumer_statistics"

The name of the index template is consumer_statistics, and ES calls the template name to create an index for consumer_statistics.

You have to create this template first, of course

3. Creation of Index Template

  • instructions

curl -H "Content-Type: application/json" -XPUT http://Htkj101:9200/_template/consumer_statistics-d'Enter the template you created here'
  • Template

{
	"template": "consumer_statistics",
	"order": 2,
	"version": 60001,
	"index_patterns": ["consumer_statistics"],
	"settings": {
		"index": {
			"refresh_interval": "5s",
			"max_result_window": "2147483647"#Set maximum from+size
		}
	},
	"mappings": {
		"_default_": {
			"dynamic_templates": [{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			}, {
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"fields": {
							"keyword": {
								"ignore_above": 1024,#Set field length not indexed
								"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}],
			"properties": {
				"@timestamp": {
					"type": "date"
				},
				"geoip": {
					"dynamic": true,
					"properties": {
						"ip": {
							"type": "ip"
						},
						"latitude": {
							"type": "half_float"
						},
						"location": {
							"type": "geo_point"
						},
						"longitude": {
							"type": "half_float"
						}
					}
				},
				"@version": {
					"type": "keyword"
				}
			}
		}
	},
	"aliases": {}
}
  • "max_result_window": "2147483647"

Paging is often required in business processing. The JAVA-API of ES sets the number of pages and the number of pages per page by from,size.

By default from+size must be less than 10000, but if actual demand is greater than 10000, it must be set here

I set the max_result_window maximum here, which is not really necessary.

Because ES sorts in memory, if the result returned at one time is too large, it may cause service downtime.

  • "ignore_above": 1024

By default, 256 means that if a field has more than 256 bytes of content, it will not be indexed.

That is, you can see the existence of this data from ES, but if you specify a query criteria, it will not be found.

For example, there are now two fields in ES: id,test, and a total of 100 pieces of data

Only one piece of data in the test field exceeds 256 bytes. Now I query the test field for data containing "1".

Even if this data exceeds 256 bytes and contains 1, it will not be queried.

To allow him to be indexed, change 256 to 1024, meaning that only more than 1024 bytes will not be indexed.

  • Complete command
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '
{"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'

When you create a template, you find that it always fails to create, and then you find that you can make two lines like this without making a mistake.

Keywords: Big Data Ruby JDBC Database ElasticSearch

Added by jarvishr on Sat, 11 Jan 2020 05:04:10 +0200