Two or Three Things About Fluentd-Elasticsearch Configuration

Last weekend I had nothing to do at home, so I set up an EFK environment for the project team to collect logs. The goal of the deployment is as follows:

On each application machine, a Fluentd instance serves as the agent: it tails the specified application log files and forwards them to another Fluentd instance acting as the aggregator. The aggregator parses the log content into structured records and stores them in Elasticsearch. Kibana handles the presentation of the logs.
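Sketched as a flow (plugin names per the Fluentd docs):

  application log files
    └─ in_tail ──> Fluentd agent
         └─ out_forward ──> Fluentd aggregator (parses logs into structured records)
              └─ out_elasticsearch ──> Elasticsearch ──> Kibana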

What is Fluentd? Fluentd is a free, fully open-source log collector that works with more than 125 systems, enabling a "log everything" architecture.

Since Elasticsearch and Kibana had already been deployed on our Kubernetes platform, the weekend went to installing and configuring Fluentd. We had never used it before and were learning as we went, so naturally we hit several problems along the way. Let me record them here.

Output plug-in request timeout

Our deployment uses only two Fluentd output plugins: out_forward, which forwards log content downstream on the agent side, and out_elasticsearch, which stores the data in ES.

  • Agent-side out_forward plugin configuration (initial)
  <match **>
    @type             forward
    require_ack_response true
    <server>
      host             10.xxx.xx.xx
      port             24224
    </server>
    <buffer tag>
      @type            file
      path             /xxx/xxx/buffer
      flush_interval   10s
      total_limit_size 1G
    </buffer>
  </match>

  • Aggregator-side out_elasticsearch plugin configuration (initial)
  <match xxxxx.xxxxx.**>
    @type              elasticsearch
    host               10.xxx.xx.xxx
    port               30172
    logstash_format    true
    logstash_prefix    log.xxxxx.xxxxx.xxxxx-
    <buffer tag>
      @type            file
      path             "/xxx/xxx/fluentd/aggregator/xxxx/buffer"
      total_limit_size 20G
      flush_interval   10s
    </buffer>
  </match>

After the configuration passed a simple test run, we happily fed in the logs generated between January and March on one application machine. In the Kibana interface, the count in the green column kept climbing: thousands... tens of thousands... a million... two million. We sent screenshots to the project team to show off. Then it hit us: there couldn't possibly be that many log records!? Logging in to the application server and running wc over the log directory showed only 1.5 million lines in total. Stunned!

Digging into the Kibana interface, we found duplicates among the log records; one record, for example, appeared 233 times. At the time we did not know the exact reason. We speculated that the out_elasticsearch plugin might have retransmitted content to Elasticsearch after a response timed out, so we looked for the corresponding parameter and added request_timeout to the aggregator configuration. Its default value is 5s.

<match xxxxx.xxxxx.**>
  @type              elasticsearch
  ...
  request_timeout    30s
  <buffer tag>
    ...
  </buffer>
</match>

We sent SIGHUP to the aggregator-side Fluentd process to reload the modified configuration and tried again: no more duplicate records. However, the timeout was only a symptom; the real cause lay in the buffer configuration, as the next section explains.
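For reference, the reload looked something like this (the pid file path depends on how Fluentd was launched and is only illustrative here):

kill -HUP "$(cat /path/to/fluentd.pid)"   # tells Fluentd to reload its configuration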

Request Entity Too Large

Before explaining this problem, first a look at Fluentd's buffer mechanism, which sits behind the output plugins. When the output destination (e.g. Elasticsearch) is unavailable, Fluentd temporarily caches the pending output in a file or in memory and keeps retrying delivery to the destination.
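The retry behavior is tunable in the same <buffer> section; a minimal sketch with illustrative values and paths (not our production settings):

<buffer tag>
  @type              file                  # persist chunks on disk, surviving restarts
  path               /path/to/buffer       # illustrative path
  flush_interval     10s                   # how often chunks are handed to the output
  retry_type         exponential_backoff   # the default: wait longer after each failure
  retry_wait         1s                    # delay before the first retry
  retry_max_interval 60s                   # cap on the backoff delay
</buffer>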

What? So if the destination service never recovers and data keeps flowing in without going out, won't the Fluentd buffer eventually fill up and log data be lost?

When the buffer is full, a BufferOverflowError exception is raised by default, and the input plugin handles it on its own: the in_tail plugin we use stops reading the log file, while the in_forward plugin returns the error to the upstream out_forward plugin. Besides raising an exception, overflow_action (>= 1.0) or buffer_queue_full_action (< 1.0) can control the buffer's behavior when it is full (a sketch follows below):

  • throw_exception
  • block
  • drop_oldest_chunk

For more details, see the official Output Plugin Overview (>= 1.0) or the Buffer Plugin Overview.
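For example, a minimal sketch (illustrative values and path) that makes the agent block instead of throwing when its file buffer fills up:

<buffer tag>
  @type            file
  path             /path/to/buffer   # illustrative path
  total_limit_size 1G
  overflow_action  block             # emits from in_tail stall until space frees up
</buffer>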

Why bring up the buffer? Because this problem is tied to it. After modifying the request_timeout parameter, we often saw error messages like this:

2018-03-04 15:10:12 +0800 [warn]: #0 fluent/log.rb:336:warn: Could not push logs to Elasticsearch, resetting connection and trying again. Connection reset by peer (Errno::ECONNRESET)

This shows the connection being forcibly closed by the Elasticsearch side. What was going on? We started fluentd from the command line with the trace log level forced on, and found nothing in the screenfuls of output except ECONNRESET. The Elasticsearch logs showed nothing abnormal either. We had to turn to another trusty tool, tcpdump, to capture the traffic between Fluentd and ES.

sudo tcpdump -A -nn -s 0 'tcp port 9200' -i eth1  # 9200 is the port on which ES accepts requests

Sure enough, the request header:

User-Agent: Faraday v0.13.1
Content-Type: application/json
Host: 10.210.39.136:30172
Content-Length: 129371575

Response Packet Header:

HTTP/1.1 413 Request Entity Too Large
content-length: 0

Request Entity Too Large! Checking the Elasticsearch documentation: the maximum request body ES accepts is 100MB by default, controlled by the `http.max_content_length` parameter. And the payload we uploaded, at Content-Length: 129371575 (about 123MB), is clearly over that limit.
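For the record, the ES-side limit itself can be raised in elasticsearch.yml; we kept the default and shrank the Fluentd chunks instead, so this is only a sketch:

http.max_content_length: 200mb    # in elasticsearch.yml; default is 100mb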

What caused this? From Fluentd's buffer mechanism described above, the output plugin sends data to the destination in units of chunks, and the chunk size is set by the chunk_limit_size parameter (description document here; default value: 8MB for the memory buffer / 256MB for the file buffer). We had not set it in our initial configuration and were running with the default, so Request Entity Too Large errors were hardly surprising. We added chunk_limit_size to the buffer configuration on the agent and aggregator sides respectively:

# agent side
<buffer tag>
  ...
  chunk_limit_size        10M
  ...
</buffer>

# aggregator side
<buffer tag>
  ...
  chunk_limit_size        15M
  ...
</buffer>

Looking back at the previous section, the request timeouts with ES were related to the default chunk size as well. With the file buffer's 256MB default, even a chunk that squeaks in under ES's 100MB limit is still large enough to keep ES busy for a long time, hence the request timeouts.
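As a rough back-of-the-envelope (numbers purely illustrative): if ES absorbs a bulk request at around 10MB/s, a chunk near the 100MB limit needs about 10 seconds to process, well past the 5s default request_timeout, whereas a 10-15MB chunk finishes in a second or two.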

During configuration I misread the old version of the documentation and set this with the buffer_chunk_limit parameter instead. It had no effect and confused me for a while.

Incidentally, can the aggregator-side chunk size be set smaller than the agent-side one? Yes, but warning logs will appear on the aggregator. From a rough read of the code, my understanding (not necessarily correct) is: when Fluentd receives chunk data from the upstream stage, it does not check the size right away. When sending to the next stage, it re-encodes the chunk according to the downstream format and then compares it against the configured chunk size; if it exceeds the setting, the chunk is split into several pieces before being sent, and a warning log is emitted along the way.

Adding deduplication for Elasticsearch log data

To keep duplicate log records from recurring, in addition to the configuration changes above, it is better to generate a key for each record before it is sent to Elasticsearch. When ES receives a duplicate, it can then update the existing document for that primary key instead of inserting a new one.

<filter aplus.batch.**>
  @type            elasticsearch_genid
  hash_id_key      _hash    # storing generated hash id key (default is _hash)
</filter>

<match aplus.batch.**>
  @type              elasticsearch
  ...
  id_key             _hash # specify same key name which is specified in hash_id_key
  remove_keys        _hash # Elasticsearch doesn't like keys that start with _
  ...
</match>
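As I understand it (not verified against the plugin source), this works because the id is generated in the filter stage, before the record enters the output buffer: a retransmitted chunk carries exactly the same _hash values, so ES updates the existing documents instead of inserting duplicates.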

The timestamp must be in Unix time format!

In the log records sent to Elasticsearch, the field holding the log time is spliced together from two fields and converted to a date:

 ${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }

Note that you must call the to_i method at the end to convert the value to Unix time. Otherwise an error occurs when the data is encoded for ES, along the lines of "the msgpack method is not found for the Time type".
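A quick check in irb (timestamp chosen arbitrarily) shows the difference:

require 'date'

t = DateTime.parse("2018-03-04 15:10:12 +0800").to_time
t      # => 2018-03-04 15:10:12 +0800 -- a Time object, which the msgpack encoder rejects
t.to_i # => 1520147412 -- a plain Integer, which encodes cleanly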

One more thing: the out_elasticsearch plugin documentation on the official website is incomplete; the full version is here.

Finally, the complete configuration files. Agent side:

<system>
  workers           2
  root_dir          /xxxx/xxxxx/fluentd/agent
  log_level         info
</system>

<source>
  @type             tail
  tag               xxxxx.xxxxx.agent.log
  path_key          path
  path              /xxxx/xxxxx/agent/logs/xxxxx/*.batch*.log
  pos_file          /xxxx/xxxxx/fluentd/agent/pos/batch_agent.db
  read_from_head    true

  <parse>
    @type             multiline
    format_firstline  /^(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|/
    format1           /^(?<log>(\d+-\d+-\d+\s+)?\d+:\d+:\d+\|.*)$/
  </parse>
  #multiline_flush_interval 5s
</source>

<filter foo.bar>
  @type             record_transformer
  <record>
    hostname        "#{Socket.gethostname}"
  </record>
</filter>

<match **>
  @type             forward
  require_ack_response true
  <server>
    host             xx.xxx.xx.xx
    port             24224
  </server>
  <buffer tag>
    @type            file
    path             /xxxx/xxxxx/fluentd/agent/buffer
    flush_interval   10s
    total_limit_size 1G
    chunk_limit_size 10M
  </buffer>
</match>

Aggregator side:

<source>
  @type               forward
  bind                0.0.0.0
  source_address_key  hostIP
</source>

<filter  xxxxx.xxxxx.agent.log>
   @type           parser
   reserve_data    true
   key_name        log
   <parse>
      @type        regexp
      expression   /(?mx)^(?<systemDate>\d+-\d+-\d+)?\s?(?<systemTime>\d+:\d+:\d+)\|(?<bizDate>.*)\|(?<artiPersonCode>.*)\|(?<invokeSeqNo>.*)\|(?<orgCode>.*)\|(?<tranCode>.*)\|(?<bizNo>.*)\|(?<dueNum>.*)\|(?<jobId>.*)\|(?<jobRunId>.*)\|(?<message>.*)\|(?<file>.*)\|(?<line>.*)\|(?<thread>.*)\|(?<logLevel>.*)\|(?<misc>.*)$/
   </parse>
</filter>

<filter xxxxx.xxxxx.agent.log>
   @type           record_transformer
   enable_ruby     true
   <record>
     systemDate    ${ if !(o=record["systemDate"]).nil? and o.length!=0 then o else if (d=record["path"].scan(/\d{4}-\d{2}-\d{2}/).last).nil? then Time.new.strftime("%Y-%m-%d") else d end  end }
   </record>
</filter>

<filter xxxxx.xxxxx.agent.log>
   @type          record_transformer
   enable_ruby    true
   <record>
     logtime      ${ DateTime.parse(record["systemDate"] + " " + record["systemTime"] + " +0800").to_time.to_i }
   </record>
   renew_time_key logtime
   remove_keys    logtime,systemDate,systemTime
</filter>

<filter xxxxx.xxxxx.**>
  @type            elasticsearch_genid
  hash_id_key      _hash    # storing generated hash id key (default is _hash)
</filter>

<match aplus.batch.**>
  @type              elasticsearch
  host               xx.xxx.xx.xxxx
  port               9200
  id_key             _hash # specify same key name which is specified in hash_id_key
  remove_keys        _hash # Elasticsearch doesn't like keys that start with _
  logstash_format    true
  logstash_prefix    log.xxxx.xxxxx.agent-
  request_timeout    30s
# reload_connections false
  slow_flush_log_threshold 30s
  <buffer tag>
    @type            file
    path             "/xxxx/xxxxx/fluentd/aggregator/#{ENV['HOSTNAME']}/buffer"
    total_limit_size 20G
    chunk_limit_size 15M
    flush_interval   10s
    retry_wait       10.0
  </buffer>
</match>
