Reference resources: dapeng Log Collection, Processing and Query Application
Recently, taking dapeng's log management system as a reference, I decided to try building an EFK stack myself. Here I record the pitfalls I ran into along the way. Many thanks to Ever_00 and Yangyang_3720 for their support and help.
Technology Selection
Back to the topic: we chose fluent-bit + fluentd + Kafka + Elasticsearch as the log system stack. The dapeng service already integrates a single-node fluent-bit that collects the log files of the various Docker containers and forwards them to fluentd. fluentd acts as a relay, aggregating all the logs and sending them to Kafka for peak shaving and valley filling; after that, the data is sent through fluentd again to Elasticsearch for storage. Instead of Kibana, this setup uses elasticsearch-head as the log viewing interface.
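The fluentd configuration further down writes to a Kafka topic named efk. If your brokers are not set up to auto-create topics, you may want to create the topic up front. This is only a sketch, assuming Kafka's bundled CLI and a ZooKeeper reachable at zookeeper:2181 (both placeholders for your own environment):

# Hypothetical: pre-create the "efk" topic used by the log pipeline
# (partition and replication counts are illustrative; adjust to your cluster)
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic efk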
For non-dapeng services, you need to modify the Dockerfile yourself, bundle fluent-bit into the image, and run sh /opt/fluent-bit/fluent-bit.sh when the container service starts; a sketch follows below.
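As an illustration only (not dapeng's actual entrypoint), a minimal startup wrapper under that assumption could look like this; the Java command is a placeholder for your service's real start command:

#!/bin/sh
# Hypothetical entrypoint for a non-dapeng container:
# start the bundled fluent-bit in the background, then start the application itself.
sh /opt/fluent-bit/fluent-bit.sh &
exec java -jar /app/your-service.jar   # placeholder: replace with your service's start command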
fluent-bit Log Collection Configuration
fluent-bit-dapeng.conf
[SERVICE]
    Flush        5
    Daemon       On
    Log_Level    error
    Log_File     /fluent-bit/log/fluent-bit.log
    Parsers_File parse_dapeng.conf

[INPUT]
    Name              tail
    Path              /dapeng-container/logs/*.log
    Exclude_Path      /dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
    Tag               dapeng
    Multiline         on
    Buffer_Chunk_Size 2m
    buffer_max_size   30m
    Mem_Buf_Limit     32m
    DB.Sync           Normal
    db_count          400
    Parser_Firstline  dapeng_multiline
    db                /fluent-bit/db/logs.db

[FILTER]
    Name   record_modifier
    Match  *
    Record hostname ${soa_container_ip}
    Record tag ${serviceName}

[OUTPUT]
    Name        Forward
    Match       *
    Host        fluentd
    Port        24224
    HostStandby fluentdStandby
    PortStandby 24224
In dapeng services, serviceName, soa_container_ip, fluentd, and fluentdStandby must be configured for every service. Path and Exclude_Path specify which logs should be collected and which should be skipped; they can be overridden through environment variables:
fluentBitLogPath=/dapeng-container/logs/*.log
fluentBitLogPathExclude=/dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
At the same time, the fluent-bit-dapeng.conf above needs to be mounted to /opt/fluent-bit/etc/fluent-bit.conf inside the container:
environment:
  - serviceName=payment
  - container_ip=${host_ip}
  - soa_container_port=${payment_port}
  - soa_container_ip=${host_ip}
  - host_ip=${host_ip}
  - soa_service_timeout=60000
  - JAVA_OPTS=-Dname=payment -Dfile.encoding=UTF-8 -Dsun.jun.encoding=UTF-8 -Dio.netty.leakDetectionLevel=advanced
  - kafka_consumer_host=${kafka_host_ip}:9092
  - kafka_producer_host=${kafka_host_ip}:9092
env_file:
  - .envs/application.env
  - .envs/common.env
volumes:
  - "/data/logs/payment:/dapeng-container/logs"
  - "/data/var/fluent/order/:/fluent-bit/db/"
  - "./config/fluent-bit-dapeng.conf:/opt/fluent-bit/etc/fluent-bit.conf"
  - "/data/var/shm:/data/shm"
ports:
  - "${payment_port}:${payment_port}"
extra_hosts:
  - "fluentd:${fluentd_host}"
  - "fluentdStandby:${fluentdStandby_host}"
  - "db-master:${mysql_host_ip}"
  - "soa_zookeeper:${zookeeper_host_ip}"
  - "redis_host:${redis_host_ip}"
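If no logs show up later, one quick check is whether fluent-bit actually started inside the service container. The command below is a sketch that tails fluent-bit's own log file (the path comes from Log_File in the [SERVICE] section above; the container name payment comes from the compose snippet):

docker exec payment tail -n 50 /fluent-bit/log/fluent-bit.log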
You can also find parse_dapeng.conf inside the dapeng service container; it looks like this:
[PARSER]
    Name   dapeng_multiline
    Format regex
    Regex  (?<logtime>\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2} \d{1,3}) (?<threadPool>.*) (?<level>.*) \[(?<sessionTid>.*)\] - (?<message>.*)
The Regex entry matches each log line and extracts the fields we need, such as logtime, message, and so on.
The parsing expression can also be set through an environment variable:
fluentbitParserRegex=(?<logtime>^\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d{3}) (?<threadPool>[^ ]+|Check idle connection Thread) (?<level>[^ ]+) \[(?<sessionTid>\w*)\] - (?<message>.*)
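As an illustration (the log line below is made up rather than taken from a real service), a line in this format would be split by the parser roughly as follows:

# Hypothetical input line:
#   05-20 14:30:25 123 dapeng-eventbus INFO [ac1f4e5d9b3c] - handle message ok
# Parsed fields:
#   logtime    -> 05-20 14:30:25 123
#   threadPool -> dapeng-eventbus
#   level      -> INFO
#   sessionTid -> ac1f4e5d9b3c
#   message    -> handle message ok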
Note: although dapeng integrates fluent-bit, it is not enabled by default; the following environment variable has to be set:
fluent_bit_enable=true
Preparing the fluentd image
First, prepare the fluentd image. Here is the fluentd Dockerfile:
FROM fluent/fluentd:v1.2
# Add the Elasticsearch and Kafka plugins
RUN fluent-gem install fluent-plugin-elasticsearch
RUN fluent-gem install fluent-plugin-kafka
CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
- Build the image (run this in the directory containing the Dockerfile, i.e. the Docker build context):
docker build -t docker.****.com:80/basic/fluentd:v1.2 .
- Push the image to the registry:
docker push docker.****.com:80/basic/fluentd:v1.2
- Configure fluentd in the dc-all.yml file (dapeng's source-compose is a wrapper around docker-compose):
fluentd:
  container_name: fluentd
  image: docker.****.com:80/basic/fluentd:v1.2
  restart: on-failure:3
  volumes:
    - /data/var/fluentd/log:/fluentd/log
    - /data/var/fluentd/etc:/fluentd/etc
  environment:
    - LANG=zh_CN.UTF-8
    - TZ=CST-8
  ports:
    - "24224:24224"
  labels:
    - project.source=
    - project.extra=public-image
    - project.depends=
    - project.owner=
The fluentd configuration files are placed under /data/var/fluentd/etc.
fluent.conf: the fluentd relay configuration
In theory we should run two separate fluentd instances to handle jobs 1 and 2 below; here they are merged into a single service for now.
# 1. Collect logs and send them to Kafka; the topic is efk
#    Start 8 workers, with ports counting up from 24225
<system>
  log_level error
  flush_thread_count 8
  workers 8
</system>
<source>
  @type forward
  port 24224
</source>
<source>
  @type monitor_agent
  port 24225
</source>
<match dapeng>
  @type kafka_buffered
  brokers kafka-server-address:9092
  topic_key efk
  buffer_type file
  buffer_path /tmp/buffer
  flush_interval 5s
  default_topic efk
  output_data_type json
  compression_codec gzip
  max_send_retries 3
  required_acks -1
  discard_kafka_delivery_failed true
</match>
# End of 1. Collect logs and send them to Kafka (topic: efk)

# 2. Consume log messages from Kafka and send them to Elasticsearch; the topic is efk, the consumer group is efk-consumer
#<system>
#  log_level error
#  flush_thread_count 2
#  workers 2
#</system>
#<source>
#  @type monitor_agent
#  port 24225
#</source>
<source>
  @type kafka_group
  brokers kafka-server-address:9092
  consumer_group efk-consumer
  topics efk
  format json
  start_from_beginning false
  max_wait_time 5
  max_bytes 1500000
</source>
<match>
  @type elasticsearch
  hosts elasticsearch-server-address:9200
  index_name dapeng_log_index
  type_name dapeng_log
  #content_type application/x-ndjson
  buffer_type file
  buffer_path /tmp/buffer_file
  buffer_chunk_limit 10m
  buffer_queue_limit 512
  flush_mode interval
  flush_interval 5s
  request_timeout 5s
  flush_thread_count 2
  reload_on_failure true
  resurrect_after 30s
  reconnect_on_error true
  with_transporter_log true
  logstash_format true
  logstash_prefix dapeng_log_index
  template_name dapeng_log_index
  template_file /fluentd/etc/template.json
  num_threads 2
  utc_index false
</match>
# End of 2. Consume log messages from Kafka and send them to Elasticsearch
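Since monitor_agent is enabled on port 24225 above, a quick health check is possible: fluentd's monitor_agent exposes plugin and buffer metrics over HTTP at /api/plugins.json. The hostname fluentd below is a placeholder for wherever the container is reachable:

curl -s http://fluentd:24225/api/plugins.json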
template.json defines the Elasticsearch index template used when the indices are created:
{ "template": "dapeng_log_index-*", "mappings": { "dapeng_log": { "properties": { "logtime": { "type": "date", "format": "MM-dd HH:mm:ss SSS" }, "threadPool": { "type": "keyword", "norms": false, "index_options": "docs" }, "level": { "type": "keyword", "norms": false, "index_options": "docs" }, "tag": { "type": "keyword", "norms": false, "index_options": "docs" }, "message": { "type": "keyword", "ignore_above": 2048, "norms": false, "index_options": "docs" }, "hostname": { "type": "keyword", "norms": false, "index_options": "docs" }, "sessionTid": { "type": "keyword", "norms": false, "index_options": "docs" }, "log": { "type": "keyword", "norms": false, "index_options": "docs" } } } }, "settings": { "index": { "max_result_window": "100000000", "number_of_shards": "3", "number_of_replicas": "1", "codec": "best_compression", "translog": { "sync_interval": "60s", "durability": "async", "flush_threshold_size": "1024mb" }, "merge":{ "policy":{ "max_merged_segment": "2gb" } }, "refresh_interval": "10s" } }, "warmers": {} }
Preparing the elasticsearch-head image
- First, clone the elasticsearch-head project into the /data/workspace directory:
git clone git://github.com/mobz/elasticsearch-head.git
- Configure elasticsearch-head in the dc-all.yml file:
elasticsearch-head:
  image: mobz/elasticsearch-head:5
  container_name: elasticsearch-head
  restart: on-failure:3
  environment:
    - LANG=zh_CN.UTF-8
    - TZ=CST-8
  volumes:
    - /data/workspace/elasticsearch-head/Gruntfile.js:/usr/src/app/Gruntfile.js
    - /data/workspace/elasticsearch-head/_site/app.js:/usr/src/app/_site/app.js
  ports:
    - "9100:9100"
  labels:
    - project.source=
    - project.extra=public-image
    - project.depends=
    - project.owner=
In Gruntfile.js, line 97 needs to be changed as follows:
connect: {
    server: {
        options: {
            hostname: '0.0.0.0',
            port: 9100,
            base: '.',
            keepalive: true
        }
    }
}
In app.js, line 4379 needs to be changed: replace localhost with the Elasticsearch cluster address.
/** Change localhost to the Elasticsearch cluster address; in a Docker deployment this is usually the Elasticsearch host address. */
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://elasticsearch-server-address:9200/";
Preparing the Elasticsearch image
- Configure Elasticsearch in the dc-all.yml file:
elasticsearch:
  image: elasticsearch:6.7.1
  container_name: elasticsearch
  restart: on-failure:3
  environment:
    - LANG=zh_CN.UTF-8
    - TZ=CST-8
    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  volumes:
    - /data/var/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
  ports:
    - "9200:9200"
    - "9300:9300"
  labels:
    - project.source=
    - project.extra=public-image
    - project.depends=
    - project.owner=
The elasticsearch.yml configuration enables CORS so that elasticsearch-head can access Elasticsearch from another origin:
cluster.name: "docker-cluster"
network.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, X-User"
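After the container is up, a quick sanity check is to query the cluster health and confirm the CORS headers are returned; the hosts below are placeholders for your own addresses:

curl -s "http://elasticsearch-server-address:9200/_cluster/health?pretty"
curl -s -I -H "Origin: http://elasticsearch-head-address:9100" "http://elasticsearch-server-address:9200/"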
If Elasticsearch fails to start with the following error:
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
This means the kernel limit vm.max_map_count on the host is too low for Elasticsearch; it must be at least 262144.
Edit /etc/sysctl.conf with sudo and append the line vm.max_map_count=262144, then run sudo sysctl -p to reload the configuration and restart Elasticsearch.
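The commands below sketch that fix; docker restart elasticsearch assumes the container name from the compose snippet above (with source-compose you would restart the service through the wrapper instead):

# Raise the kernel limit on memory map areas required by Elasticsearch
echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
# Restart the Elasticsearch container so it starts with the new limit
docker restart elasticsearch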
Service startup
After all of the above services are started, visit http://<elasticsearch-head server address>:9100/ and you will see the interface below (the cluster health value is yellow because I did not set up replicas).
Of course, no logs are visible at first because the index has not been created yet. We can add a scheduled task that automatically creates each day's index and cleans up the older ones:
autoIndex4DapengLog.sh: keep indices for 7 days, keep the last 3 days' indices open, and create the index for the next day.
#!/bin/bash
#
# Close and delete old indices, then create tomorrow's index
# @date 10 May 2018 18:00:00
# @description Copyright (c) 2015, github.com/dapeng-soa All Rights Reserved.

date=`date -d "2 days ago" +%Y.%m.%d`
date1=`date -d "6 days ago" +%Y.%m.%d`
echo $date
echo $date1

# Close the index from 2 days ago
curl -H "Content-Type: application/json" -XPOST http://elasticsearch-server-address:9200/dapeng_log_index-$date/_close

# Delete the index from 6 days ago
curl -H "Content-Type: application/json" -XDELETE "http://elasticsearch-server-address:9200/dapeng_log_index-$date1"

# Create tomorrow's index
tomorrow=`date -d tomorrow +%Y.%m.%d`
# List of Elasticsearch servers that need the index
ipList=(elasticsearch-server-address:9200)
for i in ${ipList[@]};do
curl -H "Content-Type: application/json" -XPUT http://$i/dapeng_log_index-$tomorrow -d'
{
  "mappings": {
    "_default_": {
      "_all": { "enabled": "false" }
    },
    "dapeng_log": {
      "properties": {
        "logtime":    { "type": "date", "format": "MM-dd HH:mm:ss SSS" },
        "threadPool": { "type": "keyword", "norms": false, "index_options": "docs" },
        "level":      { "type": "keyword", "norms": false, "index_options": "docs" },
        "tag":        { "type": "keyword", "norms": false, "index_options": "docs" },
        "message":    { "type": "keyword", "ignore_above": 2048, "norms": false, "index_options": "docs" },
        "hostname":   { "type": "keyword", "norms": false, "index_options": "docs" },
        "sessionTid": { "type": "keyword", "norms": false, "index_options": "docs" },
        "log":        { "type": "keyword", "norms": false, "index_options": "docs" }
      }
    }
  },
  "settings": {
    "index": {
      "max_result_window": "100000000",
      "number_of_shards": "3",
      "number_of_replicas": "1",
      "codec": "best_compression",
      "translog": { "sync_interval": "60s", "durability": "async", "flush_threshold_size": "1024mb" },
      "merge": { "policy": { "max_merged_segment": "2gb" } },
      "refresh_interval": "10s"
    }
  },
  "warmers": {}
}'
response=`curl -H "Content-Type: application/json" -s "http://$i/_cat/indices?v" | grep open | grep dapeng_log_index-$tomorrow | wc -l`
echo -e "\n"
if [ "$response" == 1 ];then
    break
else
    continue
fi
done;
Add this command to the scheduled tasks with crontab -e; it runs at 23:00 every day and creates the index for the following day:
0 23 * * * (cd /data/workspace/elasticsearch-head/; sh autoIndex4DapengLog.sh) > /data/workspace/elasticsearch-head/autoIndex4DapengLog.log
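To confirm that the job worked, you can list the dapeng indices (the host below is a placeholder); the newly created index for the next day should show up as open:

curl -s "http://elasticsearch-server-address:9200/_cat/indices/dapeng_log_index-*?v"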
Now the log data can be viewed in elasticsearch-head.
If you want to hide the metadata fields that Elasticsearch adds by itself (such as _index, _id, _score, etc.) from the table view, you need to modify elasticsearch-head/_site/app.js and change line 2038 as follows:
_data_handler: function(store) {
    // Keep only our own fields; drop the built-in ones from the result table
    var customFields = ["logtime", "hostname", "tag", "sessionTid", "threadPool", "level", "message", "log"];
    store.columns = customFields;
    //store.columns = store.columns.filter(i => customFields.indexOf(i) > -1);
    this.tools.text(store.summary);
    this.headers.empty().append(this._header_template(store.columns));
    this.body.empty().append(this._body_template(store.data, store.columns));
    this._reflow();
},
Note that the fields in customFields must match the fields defined when the index is created; some of them are parsed out of the log line by fluent-bit.
todo
- As the figure shows, some fields are still empty. Pasting the value of the log field into https://regex101.com/ reveals that the parsing Regex above does not match those lines, so some fields get no value and parts of the log cannot be parsed; these cases will need to be filtered out or handled later.
- Build a real-time production fault alerting system on top of the existing log system.