EFK Architecture of Logging System

Reference: dapeng Log Collection, Processing and Query Application

Following dapeng's log management system, I recently tried to build an EFK stack myself. This post records the pitfalls I ran into along the way. Many thanks to Ever_00 and Yangyang_3720 for their support and help.

Technology Selection

Back to the topic: we again chose fluent-bit + fluentd + kafka + Elasticsearch as the log system stack. The dapeng service already integrates a single-node fluent-bit, which collects the log files from each docker container and sends them to fluentd. fluentd acts as a relay: it gathers all logs and sends them to kafka, which smooths out load peaks and troughs. After this buffering, the data is sent through fluentd to Elasticsearch for storage. Instead of Kibana, this setup uses elasticsearch-head as the log viewing interface.

For non-dapeng services, you need to modify the Dockerfile yourself so that the modified fluent-bit is packaged into the image and sh /opt/fluent-bit/fluent-bit.sh is run when the container starts.

fluent-bit Log Collection Configuration

fluent-bit-dapeng.conf

[SERVICE]
    Flush        5
    Daemon       On
    Log_Level    error
    Log_File     /fluent-bit/log/fluent-bit.log
    Parsers_File parse_dapeng.conf

[INPUT]
    Name tail
    Path /dapeng-container/logs/*.log
    Exclude_Path  /dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
    Tag  dapeng
    Multiline  on
    Buffer_Chunk_Size 2m
    buffer_max_size  30m
    Mem_Buf_Limit  32m
    DB.Sync  Normal
    db_count 400
    Parser_Firstline dapeng_multiline
    db  /fluent-bit/db/logs.db

[FILTER]
    Name record_modifier
    Match *
    Record hostname ${soa_container_ip}
    Record tag ${serviceName}

[OUTPUT]
    Name  Forward
    Match *
    Host  fluentd
    Port  24224
    HostStandby fluentdStandby
    PortStandby 24224

In dapeng services, serviceName, soa_container_ip, fluentd and fluentdStandby must be configured for each service. Path and Exclude_Path specify which logs to collect and which to skip; both can be overridden through environment variables:

fluentBitLogPath=/dapeng-container/logs/*.log
fluentBitLogPathExclude=/dapeng-container/logs/fluent*.log,/dapeng-container/logs/console.log,/dapeng-container/logs/gc*.log
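
To see how Path and Exclude_Path interact, here is a minimal Python sketch that applies the same glob patterns to a few file names. It uses Python's fnmatch as a stand-in for fluent-bit's own matching, which is an assumption, and the candidate file names and the is_collected helper are made up for illustration.

```python
from fnmatch import fnmatch

# Patterns copied from the environment variables above.
LOG_PATH = "/dapeng-container/logs/*.log"
LOG_PATH_EXCLUDE = (
    "/dapeng-container/logs/fluent*.log,"
    "/dapeng-container/logs/console.log,"
    "/dapeng-container/logs/gc*.log"
)


def is_collected(path, include=LOG_PATH, exclude=LOG_PATH_EXCLUDE):
    """Return True if `path` matches the include glob and no exclude glob."""
    if not fnmatch(path, include):
        return False
    return not any(fnmatch(path, pattern) for pattern in exclude.split(","))


# Hypothetical file names, for illustration only.
candidates = [
    "/dapeng-container/logs/detail-payment.log",
    "/dapeng-container/logs/fluent-bit.log",
    "/dapeng-container/logs/console.log",
    "/dapeng-container/logs/gc-2019-08-29.log",
]
collected = [p for p in candidates if is_collected(p)]
print(collected)
```

Only the first candidate survives: the other three each match one of the exclude patterns.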

At the same time, you need to mount the above fluent-bit-dapeng.conf to /opt/fluent-bit/etc/fluent-bit.conf:

    environment:
      - serviceName=payment
      - container_ip=${host_ip}
      - soa_container_port=${payment_port}
      - soa_container_ip=${host_ip}
      - host_ip=${host_ip}
      - soa_service_timeout=60000
      - JAVA_OPTS=-Dname=payment -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -Dio.netty.leakDetectionLevel=advanced
      - kafka_consumer_host=${kafka_host_ip}:9092
      - kafka_producer_host=${kafka_host_ip}:9092
    env_file:
      - .envs/application.env
      - .envs/common.env
    volumes:
      - "/data/logs/payment:/dapeng-container/logs"
      - "/data/var/fluent/order/:/fluent-bit/db/"
      - "./config/fluent-bit-dapeng.conf:/opt/fluent-bit/etc/fluent-bit.conf"
      - "/data/var/shm:/data/shm"
    ports:
      - "${payment_port}:${payment_port}"
    extra_hosts:
      - "fluentd:${fluentd_host}"
      - "fluentdStandby:${fluentdStandby_host}"
      - "db-master:${mysql_host_ip}"
      - "soa_zookeeper:${zookeeper_host_ip}"
      - "redis_host:${redis_host_ip}"

You can also find parse_dapeng.conf inside the dapeng service container. Its content is as follows:

[PARSER]
    Name        dapeng_multiline
    Format      regex
    Regex       (?<logtime>\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2} \d{1,3}) (?<threadPool>.*) (?<level>.*) \[(?<sessionTid>.*)\] - (?<message>.*)

Regex is a regular expression that is matched against each log entry to extract the fields we need, such as logtime and message.
The parsing expression can also be set through an environment variable:

fluentbitParserRegex=(?<logtime>^\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d{3}) (?<threadPool>[^ ]+|Check idle connection Thread) (?<level>[^ ]+) \[(?<sessionTid>\w*)\] - (?<message>.*)
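
As a quick way to check what this expression extracts, here is a minimal Python sketch. It reuses fluentbitParserRegex from above with only the named-group syntax changed, since Python writes (?P<name>...) instead of (?<name>...). Python's re module stands in for fluent-bit's regex engine, which is an assumption, and the sample log lines and the parse_line helper are made up for illustration.

```python
import re

# fluentbitParserRegex from above; only the named-group syntax is adapted.
PARSER_REGEX = re.compile(
    r"(?P<logtime>^\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d{3}) "
    r"(?P<threadPool>[^ ]+|Check idle connection Thread) "
    r"(?P<level>[^ ]+) "
    r"\[(?P<sessionTid>\w*)\] - "
    r"(?P<message>.*)"
)


def parse_line(line):
    """Return the extracted fields as a dict, or None if the line does not match."""
    match = PARSER_REGEX.match(line)
    return match.groupdict() if match else None


# Hypothetical log lines, for illustration only.
first_line = "08-29 13:02:03 123 dapeng-container-biz-pool-0 INFO [ac1f0001a5c2] - request received"
continuation = "\tat com.example.Foo.bar(Foo.java:42)"

fields = parse_line(first_line)
print(fields)
print(parse_line(continuation))
```

A line that does not start with a timestamp, such as a stack trace continuation, yields no fields on its own. This is why the expression is used as Parser_Firstline in multiline mode.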

Note: although dapeng integrates fluent-bit, it is disabled by default. To enable it, set the following environment variable:
fluent_bit_enable=true

Building the fluentd image

First, prepare the fluentd image. Here is its Dockerfile:

FROM fluent/fluentd:v1.2
# Add the elasticsearch and kafka plugins
RUN  fluent-gem install fluent-plugin-elasticsearch
RUN  fluent-gem install fluent-plugin-kafka
CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT

  1. Build the image (run this in the directory containing the Dockerfile, i.e. the build context)
    docker build -t docker.****.com:80/basic/fluentd:v1.2 .
  2. Push the image to the docker registry
    docker push docker.****.com:80/basic/fluentd:v1.2
  3. Configure fluentd in the dc-all.yml file (dapeng's source-compose wraps docker-compose)
  fluentd:
    container_name: fluentd
    image: docker.****.com:80/basic/fluentd:v1.2
    restart: on-failure:3
    volumes:
      - /data/var/fluentd/log:/fluentd/log
      - /data/var/fluentd/etc:/fluentd/etc
    environment:
      - LANG=zh_CN.UTF-8
      - TZ=CST-8
    ports:
      - "24224:24224"
    labels:
      - project.source=
      - project.extra=public-image
      - project.depends=
      - project.owner=

The fluentd configuration files are placed under /data/var/fluentd/etc.
fluent.conf configures fluentd as the relay.
In theory, two fluentd instances should be started, one for each of jobs 1 and 2 below. Here we merge them into a single service for now.

# 1. Collect logs and send them to kafka; topic is efk
# Start 8 workers; ports are assigned incrementally starting from 24225
<system>
        log_level error
        flush_thread_count 8
        workers 8
</system>
<source>
  @type  forward
  port  24224
</source>
<source>
  @type monitor_agent
  port 24225
</source>

<match dapeng>
  @type kafka_buffered
  brokers kafka server address:9092
  topic_key efk
  buffer_type file
  buffer_path /tmp/buffer
  flush_interval 5s
  default_topic efk
  output_data_type json
  compression_codec gzip
  max_send_retries 3
  required_acks -1
  discard_kafka_delivery_failed true
</match>
# End of 1: collect logs and send them to kafka (topic efk)

# 2. Consume log messages from kafka and send them to Elasticsearch; topic is efk, group is efk-consumer
#<system>
#        log_level error
#        flush_thread_count 2
#        workers 2
#</system>
#<source>
#  @type monitor_agent
#  port 24225
#</source>
<source>
  @type kafka_group
  brokers kafka server address:9092
  consumer_group efk-consumer
  topics efk
  format json
  start_from_beginning false
  max_wait_time 5
  max_bytes 1500000
</source>

<match>
    @type elasticsearch
    hosts elasticsearch server address:9200
    index_name dapeng_log_index
    type_name  dapeng_log
    #content_type application/x-ndjson
    buffer_type file
    buffer_path /tmp/buffer_file
    buffer_chunk_limit 10m
    buffer_queue_limit 512
    flush_mode interval
    flush_interval 5s
    request_timeout 5s
    flush_thread_count 2
    reload_on_failure true
    resurrect_after 30s
    reconnect_on_error true
    with_transporter_log true
    logstash_format true
    logstash_prefix dapeng_log_index
    template_name dapeng_log_index
    template_file  /fluentd/etc/template.json
    num_threads 2
    utc_index  false
</match>
# End of 2: consume log messages from kafka and send them to Elasticsearch
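
The comment in part 1 says that with 8 workers the ports count up from 24225. Assuming this refers to the monitor_agent source (which listens on 24225 in this config) and that each worker gets the base port plus its worker index, the monitoring endpoints could be listed as in the sketch below. The host name fluentd and both helper names are illustrative; /api/plugins.json is the JSON endpoint served by monitor_agent.

```python
import json
from urllib.request import urlopen


def monitor_urls(host, base_port=24225, workers=8):
    """Build one monitor_agent URL per worker, assuming port = base_port + worker index."""
    return [
        f"http://{host}:{base_port + worker}/api/plugins.json"
        for worker in range(workers)
    ]


def fetch_plugins(url, timeout=3):
    """Fetch the plugin list from one monitor_agent endpoint (needs a running fluentd)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)["plugins"]


# Only the URLs are printed here; fetch_plugins is left for a live environment.
urls = monitor_urls("fluentd")
for url in urls:
    print(url)
```

Note that the compose file above only publishes port 24224, so these endpoints would be reachable from inside the container or the docker network unless more ports are mapped.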

template.json defines the Elasticsearch index template that is applied when indices are created:

{
  "template": "dapeng_log_index-*",
  "mappings": {
    "dapeng_log": {
      "properties": {
        "logtime": {
          "type": "date",
          "format": "MM-dd HH:mm:ss SSS"
        },
        "threadPool": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "level": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "tag": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "message": {
          "type": "keyword",
          "ignore_above": 2048,
          "norms": false,
          "index_options": "docs"
        },
        "hostname": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "sessionTid": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "log": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        }
      }
    }
  },
  "settings": {
    "index": {
      "max_result_window": "100000000",
      "number_of_shards": "3",
      "number_of_replicas": "1",
      "codec": "best_compression",
      "translog": {
        "sync_interval": "60s",
        "durability": "async",
        "flush_threshold_size": "1024mb"
      },
      "merge":{
        "policy":{
          "max_merged_segment": "2gb"
        }
      },
      "refresh_interval": "10s"
    }
  },
  "warmers": {}
}
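
Because logstash_format is true in the fluentd output, fluent-plugin-elasticsearch writes to one index per day, named from logstash_prefix plus a date suffix (by default in the form -YYYY.MM.DD). The following minimal sketch shows why the "template" pattern above applies to those daily indices. fnmatch is used only as a stand-in for Elasticsearch's wildcard matching, and daily_index_name is a hypothetical helper.

```python
from datetime import date
from fnmatch import fnmatch

# "template" value from template.json above.
TEMPLATE_PATTERN = "dapeng_log_index-*"


def daily_index_name(day, prefix="dapeng_log_index"):
    """Build the daily index name: logstash_prefix, a dash, then the date as YYYY.MM.DD."""
    return f"{prefix}-{day.strftime('%Y.%m.%d')}"


name = daily_index_name(date(2019, 8, 29))
print(name)
print(fnmatch(name, TEMPLATE_PATTERN))
```

If logstash_prefix and the template pattern ever drift apart, new daily indices would be created without these mappings and settings, so the two are worth changing together.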

Preparing the elasticsearch-head image

  1. First, clone the elasticsearch-head project into the /data/workspace directory
    git clone https://github.com/mobz/elasticsearch-head.git
  2. Configure elasticsearch-head in the dc-all.yml file
  elasticsearch-head:
    image: mobz/elasticsearch-head:5
    container_name: elasticsearch-head
    restart: on-failure:3
    environment:
      - LANG=zh_CN.UTF-8
      - TZ=CST-8
    volumes:
      - /data/workspace/elasticsearch-head/Gruntfile.js:/usr/src/app/Gruntfile.js
      - /data/workspace/elasticsearch-head/_site/app.js:/usr/src/app/_site/app.js
    ports:
      - "9100:9100"
    labels:
      - project.source=
      - project.extra=public-image
      - project.depends=
      - project.owner=

In Gruntfile.js, line 97 needs to be changed as follows:

connect: {
        server: {
                options: {
                        hostname: '0.0.0.0',
                        port: 9100,
                        base: '.',
                        keepalive: true
                }
        }
}

In app.js, change line 4379: replace localhost with the Elasticsearch cluster address

/** Replace localhost with the Elasticsearch cluster address. In a Docker deployment this is usually the address of the host running Elasticsearch. */
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://elasticsearch server address:9200/";

Preparing the elasticsearch image

  1. Configure elasticsearch in the dc-all.yml file
  elasticsearch:
    image: elasticsearch:6.7.1
    container_name: elasticsearch
    restart: on-failure:3
    environment:
      - LANG=zh_CN.UTF-8
      - TZ=CST-8
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - /data/var/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
    ports:
      - "9200:9200"
      - "9300:9300"
    labels:
      - project.source=
      - project.extra=public-image
      - project.depends=
      - project.owner=

The elasticsearch.yml configuration enables CORS (cross-origin access), so that elasticsearch-head can access Elasticsearch:

cluster.name: "docker-cluster"
network.host: 0.0.0.0

http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, X-User"

Elasticsearch fails to start with this error:

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

That is, the kernel limit on the number of memory map areas a process may have (vm.max_map_count) is too low for Elasticsearch; it needs at least 262144.

Run sudo vi /etc/sysctl.conf and append the line vm.max_map_count=262144 to the end of the file, then run sudo sysctl -p to reload the configuration and restart Elasticsearch.
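
To confirm the setting took effect before restarting, the current value can be read back from procfs. This is a minimal sketch, assuming a Linux host where /proc/sys/vm/max_map_count is readable; is_sufficient is a hypothetical helper name.

```python
from pathlib import Path

REQUIRED_MAX_MAP_COUNT = 262144  # minimum quoted in the error message above
PROC_FILE = Path("/proc/sys/vm/max_map_count")  # Linux procfs entry for this setting


def is_sufficient(raw_value, required=REQUIRED_MAX_MAP_COUNT):
    """Return True if the value read from procfs meets the required minimum."""
    return int(raw_value.strip()) >= required


if PROC_FILE.exists():
    current = PROC_FILE.read_text()
    status = "ok" if is_sufficient(current) else "too low"
    print(f"vm.max_map_count = {current.strip()} ({status})")
else:
    print("vm.max_map_count is not available on this system")
```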

Service startup

Once all of the services above are running, visit http://elasticsearch-head server address:9100/ and you will see the elasticsearch-head interface (the cluster health is yellow because I did not set up replicas).

At first no logs are visible, because no index has been created yet. We can add a scheduled task that creates the index automatically every day and cleans up older indices:
autoIndex4DapengLog.sh: keeps indices for 7 days, keeps the indices of the last three days open, and creates the index for the next day

#!/bin/bash
#
# Index closure and deletion

# @ date 10 May 2018 18:00:00
# @description Copyright (c) 2015, github.com/dapeng-soa All Rights Reserved.


date=`date -d "2 days ago" +%Y.%m.%d`
date1=`date -d "6 days ago" +%Y.%m.%d`
echo $date
echo $date1
# Close the index
curl -H "Content-Type: application/json" -XPOST "http://elasticsearch server address:9200/dapeng_log_index-$date/_close"
# Delete the index
curl -H "Content-Type: application/json" -XDELETE "http://elasticsearch server address:9200/dapeng_log_index-$date1"
# Create the index
tomorrow=`date -d tomorrow +%Y.%m.%d`
# List of elastic search servers that need to be indexed
ipList=(elasticsearch server address:9200)
for i in ${ipList[@]};do
curl -H "Content-Type: application/json" -XPUT http://$i/dapeng_log_index-$tomorrow -d'
{
  "mappings": {
    "_default_": {
            "_all": {
                "enabled": "false"
            }
        },
    "dapeng_log": {
      "properties": {
        "logtime": {
          "type": "date",
          "format": "MM-dd HH:mm:ss SSS"
        },
        "threadPool": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "level": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "tag": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "message": {
          "type": "keyword",
          "ignore_above": 2048,
          "norms": false,
          "index_options": "docs"
        },
        "hostname": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "sessionTid": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        },
        "log": {
          "type": "keyword",
          "norms": false,
          "index_options": "docs"
        }
      }
    }
  },
  "settings": {
    "index": {
      "max_result_window": "100000000",
      "number_of_shards": "3",
      "number_of_replicas": "1",
      "codec": "best_compression",
      "translog": {
        "sync_interval": "60s",
        "durability": "async",
        "flush_threshold_size": "1024mb"
      },
      "merge":{
        "policy":{
          "max_merged_segment": "2gb"
        }
      },
      "refresh_interval": "10s"

    }
  },
  "warmers": {}
}'
response=`curl -H "Content-Type: application/json" -s "http://$i/_cat/indices?v" |grep open | grep dapeng_log_index-$tomorrow |wc -l`

echo -e "\n"

if [ "$response" == 1 ];then
    break
else
    continue
fi
done;

Use crontab -e to add this command as a scheduled task. It runs at 23:00 every day and creates the index for the next day:

0 23 * * *    (cd /data/workspace/elasticsearch-head/; bash autoIndex4DapengLog.sh) > /data/workspace/elasticsearch-head/autoIndex4DapengLog.log
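
The date arithmetic in autoIndex4DapengLog.sh can be checked in isolation. Here is a minimal Python sketch of the same plan for a given run date: close the index from 2 days ago, delete the one from 6 days ago, and create tomorrow's. The helper names are hypothetical, and no request is sent to Elasticsearch.

```python
from datetime import date, timedelta

PREFIX = "dapeng_log_index"  # same prefix as in the script above


def index_name(day):
    """Format an index name the way the script does: prefix-YYYY.MM.DD."""
    return f"{PREFIX}-{day.strftime('%Y.%m.%d')}"


def rotation_plan(today):
    """Return which index the script would close, delete and create when run on `today`."""
    return {
        "close": index_name(today - timedelta(days=2)),
        "delete": index_name(today - timedelta(days=6)),
        "create": index_name(today + timedelta(days=1)),
    }


plan = rotation_plan(date(2019, 8, 29))
for action, name in plan.items():
    print(action, name)
```

After a run, seven indices remain: three open (tomorrow, today and yesterday) and four closed (2 to 5 days ago).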

Now you can view the log data

If you do not want the fields that Elasticsearch adds by itself (such as _index, _id and _score) to be shown in the table, modify elasticsearch-head/_site/app.js and change line 2038 as follows:

_data_handler: function(store) {
        // Remove useless fields from the result set
        var customFields = ["logtime", "hostname", "tag", "sessionTid", "threadPool", "level", "message", "log"];
        store.columns = customFields;
        //store.columns = store.columns.filter(i => customFields.indexOf(i) > -1);
        this.tools.text(store.summary);
        this.headers.empty().append(this._header_template(store.columns));
        this.body.empty().append(this._body_template(store.data, store.columns));
        this._reflow();
},

Note that the fields in customFields must match the fields defined when the index is created; some of them are parsed out by fluent-bit.

todo

  1. Some fields in the result table are still empty. Pasting the value of the log field into https://regex101.com/ shows that it does not match the parser Regex above, so those fields were never populated. Such unparseable log content needs to be filtered out later.
  2. Develop a real-time production fault alerting system on top of the existing log system

Keywords: Linux ElasticSearch Docker kafka JSON

Added by soianyc on Thu, 29 Aug 2019 13:02:03 +0300