K8s log collection using Fluent Bit + ClickHouse

Preparation and understanding

Fluent Bit is a pluggable, lightweight, multi-platform, open-source log collector written in C. It collects data from different sources and sends it to multiple destinations, and it is fully compatible with Docker and Kubernetes environments.

(Figure: overview of the Fluent Bit workflow)

The figure above gives an overview of its workflow: logs enter through an input and then pass through parsing (transformation), filtering, buffering and routing before reaching one or more outputs.

By configuring Fluent Bit's input and output plug-ins, you can collect logs from different sources and send them to the target destinations.

Fluent Bit itself is written in C, which makes extending it with new plug-ins difficult. The project officially provides fluent-bit-go, which lets you write plug-ins in Go; at the time of writing it only supports output plug-ins.

To collect Kubernetes container logs with Fluent Bit, add the following input configuration to the Fluent Bit configuration file. It points at the log files of the containers running in Docker; Fluent Bit reads the logs from these files and automatically performs simple filtering and cleaning:

[INPUT]
    # tail plug-in: behaves like the shell command tail -f; the keys below depend on the plug-in chosen here
    Name              tail
    # tag attached to the records that are read
    Tag               kube.*
    # path of a single log file, or of several files selected with wildcards
    Path              /var/log/containers/*.log
    # name of the parser applied to each line
    Parser            docker
    # database file that tracks the monitored files and their offsets
    DB                /var/log/flb_kube.db
    # memory limit the tail plug-in may use
    Mem_Buf_Limit     5MB
    # by default a file stops being monitored when a line exceeds the buffer; On skips such lines instead
    Skip_Long_Lines   On
    # interval, in seconds, between refreshes of the list of monitored files
    Refresh_Interval  10

The output section refers to your own output plug-in (here, a ClickHouse plug-in):

[OUTPUT]
    # plug-in name; must match the name registered by the plug-in
    Name            clickhouse
    # route records with any tag to this output
    Match           *

At the time of writing, Fluent Bit has no official ClickHouse output plug-in, so you have to develop one yourself.

Build the output plug-in as a shared object (.so, for example with go build -buildmode=c-shared) and package it together with Fluent Bit into a Docker image. Run Fluent Bit as a container in Kubernetes, with its configuration files mounted from a ConfigMap, and run one pod on each node. Fluent Bit collects logs from the locations set in the configuration file, processes them, and stores them in ClickHouse through the configured output plug-in.

Depending on your needs, you can modify the output plug-in to send the logs to other databases or middleware (such as MySQL or Redis).

The code of the ClickHouse output plug-in is shown below.

Fluent Bit outputs log data by calling four functions written by the user: FLBPluginRegister (registers the plug-in; the name must match Name in the [OUTPUT] section of the Fluent Bit configuration), FLBPluginInit (initializes the plug-in), FLBPluginFlush (the output logic, which can lightly process the log records before writing them out) and FLBPluginExit (called when the plug-in exits).

package main

import (
   "C"
   "database/sql"
   "fmt"
   "os"
   "strconv"
   "sync"
   "time"
   "unsafe"
   "github.com/fluent/fluent-bit-go/output"
   "github.com/kshvakov/clickhouse"
   klog "k8s.io/klog"
)

var (
   client *sql.DB

   database  string
   table     string
   batchSize int

   insertSQL = "INSERT INTO %s.%s(date, cluster, namespace, app, pod_name, container_name, host, log, ts) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"

   rw     sync.RWMutex
   buffer = make([]Log, 0)
)

const (
   DefaultWriteTimeout string = "20"
   DefaultReadTimeout  string = "10"

   DefaultBatchSize int = 1024
)

type Log struct {
   Cluster   string
   Namespace string
   App       string
   Pod       string
   Container string
   Host      string
   Log       string
   Ts        time.Time
}

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
   return output.FLBPluginRegister(ctx, "clickhouse", "ClickHouse output plugin")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

   var host string
   if v := os.Getenv("CLICKHOUSE_HOST"); v != "" {
      host = v
   } else {
      klog.Error("you must set host of clickhouse!")
      return output.FLB_ERROR
   }

   var user string
   if v := os.Getenv("CLICKHOUSE_USER"); v != "" {
      user = v
   } else {
      klog.Error("you must set user of clickhouse!")
      return output.FLB_ERROR
   }

   var password string
   if v := os.Getenv("CLICKHOUSE_PASSWORD"); v != "" {
      password = v
   } else {
      klog.Error("you must set password of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_DATABASE"); v != "" {
      database = v
   } else {
      klog.Error("you must set database of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_TABLE"); v != "" {
      table = v
   } else {
      klog.Error("you must set table of clickhouse!")
      return output.FLB_ERROR
   }

   if v := os.Getenv("CLICKHOUSE_BATCH_SIZE"); v != "" {
      size, err := strconv.Atoi(v)
      if err != nil || size <= 0 {
         // fall back to the default instead of using an invalid (zero) size
         klog.Infof("invalid batch_size %q, using the default: %d", v, DefaultBatchSize)
         batchSize = DefaultBatchSize
      } else {
         batchSize = size
      }
   } else {
      klog.Infof("using the default batch_size: %d", DefaultBatchSize)
      batchSize = DefaultBatchSize
   }

   var writeTimeout string
   if v := os.Getenv("CLICKHOUSE_WRITE_TIMEOUT"); v != "" {
      writeTimeout = v
   } else {
      klog.Infof("you set the default write_timeout: %s", DefaultWriteTimeout)
      writeTimeout = DefaultWriteTimeout
   }

   var readTimeout string
   if v := os.Getenv("CLICKHOUSE_READ_TIMEOUT"); v != "" {
      readTimeout = v
   } else {
      klog.Infof("you set the default read_timeout: %s", DefaultReadTimeout)
      readTimeout = DefaultReadTimeout
   }

   dsn := "tcp://" + host + "?username=" + user + "&password=" + password + "&database=" + database + "&write_timeout=" + writeTimeout + "&read_timeout=" + readTimeout

   db, err := sql.Open("clickhouse", dsn)
   if err != nil {
      klog.Errorf("opening clickhouse connection: %v", err)
      return output.FLB_ERROR
   }

   if err := db.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   // keep the connection pool for FLBPluginFlush
   client = db

   return output.FLB_OK
}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data needs to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
   rw.Lock()
   defer rw.Unlock()
   if err := client.Ping(); err != nil {
      if exception, ok := err.(*clickhouse.Exception); ok {
         klog.Errorf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
      } else {
         klog.Errorf("Failed to ping clickhouse: %v", err)
      }
      return output.FLB_ERROR
   }
   var ret int
   var timestampData interface{}
   var mapData map[interface{}]interface{}
   dec := output.NewDecoder(data, int(length))

   for {
      ret, timestampData, mapData = output.GetRecord(dec)
      if ret != 0 {
         break
      }
      var timestamp time.Time
      switch t := timestampData.(type) {
      case output.FLBTime:
         timestamp = t.Time
      case uint64:
         timestamp = time.Unix(int64(t), 0)
      default:
         timestamp = time.Now()
      }

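      // Flatten and UnderscoreStyle are not part of this listing and must be defined
      // elsewhere in package main; nested keys are joined with "_"
      // (e.g. kubernetes.pod_name -> kubernetes_pod_name).
      flattenData, err := Flatten(mapData, "", UnderscoreStyle)
      if err != nil {
         // skip this record only, instead of abandoning the rest of the chunk
         klog.Errorf("flatten record failure: %v", err)
         continue
      }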

      log := Log{}
      for k, v := range flattenData {
         value := ""
         switch t := v.(type) {
         case string:
            value = t
         case []byte:
            value = string(t)
         default:
            value = fmt.Sprintf("%v", v)
         }

         switch k {
         case "cluster":
            log.Cluster = value
         case "kubernetes_namespace_name":
            log.Namespace = value
         case "kubernetes_labels_app":
            log.App = value
         case "kubernetes_labels_k8s-app":
            log.App = value
         case "kubernetes_pod_name":
            log.Pod = value
         case "kubernetes_container_name":
            log.Container = value
         case "kubernetes_host":
            log.Host = value
         case "log":
            log.Log = value
         }

      }

      log.Ts = timestamp
      buffer = append(buffer, log)
   }

   // sink data
   if len(buffer) < batchSize {
      return output.FLB_OK
   }

   // named query so it does not shadow the database/sql package
   query := fmt.Sprintf(insertSQL, database, table)

   tx, err := client.Begin()
   if err != nil {
      klog.Errorf("begin transaction failure: %s", err.Error())
      return output.FLB_ERROR
   }

   // build statements
   smt, err := tx.Prepare(query)
   if err != nil {
      klog.Errorf("prepare statement failure: %s", err.Error())
      tx.Rollback()
      return output.FLB_ERROR
   }
   defer smt.Close()
   for _, l := range buffer {
      // the first value fills the Date column "date" from the record timestamp
      _, err = smt.Exec(l.Ts, l.Cluster, l.Namespace, l.App, l.Pod, l.Container, l.Host,
         l.Log, l.Ts)

      if err != nil {
         klog.Errorf("statement exec failure: %s", err.Error())
         tx.Rollback()
         return output.FLB_ERROR
      }
   }

   // commit; on any failure above the buffer is kept and retried with the next flush
   if err = tx.Commit(); err != nil {
      klog.Errorf("commit failure: %s", err.Error())
      return output.FLB_ERROR
   }

   buffer = make([]Log, 0)

   return output.FLB_OK
}

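//export FLBPluginExit
func FLBPluginExit() int {
   // records still in buffer (fewer than batchSize) are not written here
   if client != nil {
      client.Close()
   }
   return output.FLB_OK
}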

func main() {
}
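FLBPluginFlush calls a Flatten helper with UnderscoreStyle, neither of which is included in the listing. The following is a hypothetical implementation of such a helper, not the plug-in's own: it assumes decoded records are map[interface{}]interface{} with string keys and nested maps (for example the kubernetes metadata added by the filter), and the SeparatorStyle type is an assumption as well. Nested keys are joined with "_", which produces keys such as kubernetes_namespace_name and kubernetes_labels_app that the switch statement matches.

```go
package main

import (
	"fmt"
	"sort"
)

// SeparatorStyle is a hypothetical type: Middle is placed between parent and child keys.
type SeparatorStyle struct{ Middle string }

// UnderscoreStyle joins nested keys with "_" (kubernetes.pod_name -> kubernetes_pod_name).
var UnderscoreStyle = SeparatorStyle{Middle: "_"}

// Flatten turns nested maps into a single-level map keyed by the joined key path.
func Flatten(nested map[interface{}]interface{}, prefix string, style SeparatorStyle) (map[string]interface{}, error) {
	flat := make(map[string]interface{})
	if err := flattenInto(flat, nested, prefix, style); err != nil {
		return nil, err
	}
	return flat, nil
}

func flattenInto(flat map[string]interface{}, nested map[interface{}]interface{}, prefix string, style SeparatorStyle) error {
	for k, v := range nested {
		name, ok := k.(string)
		if !ok {
			return fmt.Errorf("unsupported key type %T", k)
		}
		key := name
		if prefix != "" {
			key = prefix + style.Middle + name
		}
		// descend into nested maps such as "kubernetes" and "kubernetes.labels"
		if child, ok := v.(map[interface{}]interface{}); ok {
			if err := flattenInto(flat, child, key, style); err != nil {
				return err
			}
			continue
		}
		flat[key] = v // leaf values ([]byte, string, numbers) are kept as-is
	}
	return nil
}

func main() {
	record := map[interface{}]interface{}{
		"log": []byte("hello\n"),
		"kubernetes": map[interface{}]interface{}{
			"namespace_name": "default",
			"labels":         map[interface{}]interface{}{"app": "web"},
		},
	}
	flat, err := Flatten(record, "", UnderscoreStyle)
	if err != nil {
		panic(err)
	}
	keys := make([]string, 0, len(flat))
	for k := range flat {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for stable output
	fmt.Println(keys)  // [kubernetes_labels_app kubernetes_namespace_name log]
}
```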

Data structure of ClickHouse

Table creation SQL

The table structure can be modified as needed. The actual insert logic lives in the output plug-in, so the column list in its insertSQL must match the table.

CREATE DATABASE IF NOT EXISTS scmp;
CREATE TABLE IF NOT EXISTS scmp.logs(
    date Date DEFAULT toDate(0),
    cluster String,
    namespace String,
    app String,
    pod_name String,
    container_name String,
    host String,
    log String,
    ts DateTime)
ENGINE = MergeTree() PARTITION BY toYYYYMMDD(date)
ORDER BY (cluster, namespace, app, pod_name, container_name, host, ts);

Table structure

(Figure: structure of the scmp.logs table)

Deploy corresponding K8s resources

ConfigMap holding the mounted configuration files:

apiVersion: v1
kind: ConfigMap
metadata:
  name: k8s-log-agent-config
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
     Flush         1
     Log_Level     error
     Daemon        off
     Parsers_File  parsers.conf
     HTTP_Server   On
     HTTP_Listen   0.0.0.0
     HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-kubernetes.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
     Name                kubernetes
     Match               *
     Kube_URL            https://kubernetes.default.svc.cluster.local:443
     Merge_Log           On
     Annotations         Off
     Kube_Tag_Prefix     kube.var.log.containers.
     Merge_Log_Key       log_processed

    [FILTER]
     Name                modify
     Match               *
     Set  cluster  ${CLUSTER_NAME}
  output-kubernetes.conf: |
    [OUTPUT]
     Name            clickhouse
     Match           *
  parsers.conf: |
    [PARSER]
     Name   apache
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache2
     Format regex
     Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   apache_error
     Format regex
     Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
     Name   nginx
     Format regex
     Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name   json
     Format json
     Time_Key time
     Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
     Name         docker
     Format       json
     Time_Key     time
     Time_Format  %Y-%m-%dT%H:%M:%S.%L
     Time_Keep    On

    [PARSER]
     Name        syslog
     Format      regex
     Regex       ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
     Time_Key    time
     Time_Format %b %d %H:%M:%S

RBAC resources (resource permission control):

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-log-agent-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: k8s-log-agent-read
subjects:
  - kind: ServiceAccount
    name: k8s-log-agent
    namespace: kube

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-log-agent-read
rules:
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
    verbs: ["get", "list", "watch"]

---


apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-log-agent
  namespace: kube

Start the container on each node:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: k8s-log-agent
  namespace: kube
  labels:
    k8s-app: k8s-log-agent
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: k8s-log-agent
  template:
    metadata:
      labels:
        k8s-app: k8s-log-agent
        version: v1
        kubernetes.io/cluster-service: "true"
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "2020"
        prometheus.io/path: /api/v1/metrics/prometheus
    spec:
      containers:
        - name: fluent-bit
          image: iyacontrol/fluent-bit-ck:1.2.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2020
          resources:
            limits:
              cpu: 200m
              memory: 200Mi
            requests:
              cpu: 200m
              memory: 200Mi
          env:
            - name: CLUSTER_NAME
              value: "clickhouse"
            - name: CLICKHOUSE_HOST
              value: "10.1.62.62:9150"
            - name: CLICKHOUSE_USER
              value: "oms"
            - name: CLICKHOUSE_PASSWORD
              value: "EBupt123"
            - name: CLICKHOUSE_DATABASE
              value: "scmp"
            - name: CLICKHOUSE_TABLE
              value: "logs"
            - name: NODENAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: k8s-log-agent-config
              mountPath: /fluent-bit/etc/
      terminationGracePeriodSeconds: 10
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: k8s-log-agent-config
        configMap:
          name: k8s-log-agent-config
      serviceAccountName: k8s-log-agent
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule

Database storage results

(Figures: log records stored in ClickHouse)

Fluent Bit simple performance test

Test preparation

FLB input plug-in configuration:

[INPUT]
    Name              tail
    # initial buffer size used to read file data (default 32k)
    Buffer_Chunk_Size 32k
    # buffer size limit for each monitored file (default 32k)
    Buffer_Max_Size   32k
    # memory limit the tail plug-in may use when appending data to the engine; when it is reached, reading pauses and resumes once the data is flushed (no default)
    Mem_Buf_Limit     5MB
    # Off (default): stop monitoring a file whose line exceeds Buffer_Max_Size; On: skip the long line and keep processing lines that fit the buffer
    Skip_Long_Lines   On
    # interval, in seconds, for refreshing the list of monitored files (default 60)
    Refresh_Interval  10
    # for files discovered at start-up without a database offset, read from the head instead of the tail (default False)
    Read_from_Head    False
    # extra seconds to keep monitoring a rotated file so pending data can be flushed (default 5)
    Rotate_Wait       5
    # database file that records monitored files and offsets
    DB                /var/log/flb_kube.db
    # synchronization (I/O) method of the database (default normal)
    DB.sync           normal
    # if true, only Fluent Bit can access the database (default false)
    DB.locking        false
    # journal mode of the database (default WAL)
    DB.journal_mode   WAL
    # exit as soon as the end of the file is reached (default false)
    exit_on_eof       false

FLB filter configuration:

[FILTER]
    # adds Kubernetes metadata (pod, namespace, labels, ...) to each record
    Name                kubernetes
    Match               *
    # Kubernetes API server endpoint
    Kube_URL            https://kubernetes.default.svc.cluster.local:443
    # if the log field holds a JSON string, parse it and add its fields to the record
    Merge_Log           On
    # include Kubernetes annotations in the added metadata (default On)
    Annotations         On
    # tag prefix used by the tail input; needed when records come from the tail plug-in
    Kube_Tag_Prefix     kube.var.log.containers.
    # with Merge_Log enabled, the fields parsed from the log content go under this key instead of next to the log field
    Merge_Log_Key       log_processed

No CPU or memory limits were set for the Fluent Bit runtime environment.

Test start

Over 10 minutes, we recorded the range of CPU and memory usage with Fluent Bit idle and at log output rates of 5000, 1000, 500, 200, 50 and 1 lines/s. For each case we also recorded the time from the start of logging until logs began to back up, and the number of backlogged entries some time after the backlog started.

CPU and memory were observed directly with htop.

A backlog after 3 minutes that was smaller than the number of logs generated in 10 seconds was not recorded.

(Figure: test results)

An interesting finding is that at 1000 and 500 lines/s, CPU usage is relatively stable and fluctuates by no more than 10%. At 5000, 100 and 50 lines/s, however, CPU usage fluctuates widely, peaking at 52 and dropping to 35, and such outliers occur more often than at 1000 and 500 lines/s.

Combined with the earlier tests, the stable CPU range at 1000 lines/s is lower than at 500 lines/s.

Added by kishanprasad on Sun, 02 Jan 2022 12:14:20 +0200