ELK: filtering the Linux system login log with grok regular expressions

Filter the Linux system login log /var/log/secure

Successful login

Jan  6 17:11:47 localhost sshd[3324]: Received disconnect from 172.16.0.13: 11: disconnected by user
Jan  6 17:11:47 localhost sshd[3324]: pam_unix(sshd:session): session closed for user root
Jan  6 17:11:48 localhost sshd[3358]: Address 172.16.0.13 maps to localhost, but this does not map back to the address - POSSIBLE BREAK-IN ATTEMPT!
Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
Jan  6 17:11:51 localhost sshd[3358]: pam_unix(sshd:session): session opened for user root by (uid=0)

Failed login

Jan  6 17:13:10 localhost sshd[3380]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=172.16.0.39  user=root
Jan  6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2

Of the lines above, only the ones that report whether a login succeeded or failed are of interest:

Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
// or
Jan  6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2
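
Before writing any Logstash configuration, the selection can be tried outside the pipeline. The following is a minimal Python sketch, not part of the ELK setup itself; the pattern LOGIN_RESULT and the helper login_result_lines are hypothetical names introduced only for illustration, and the sample lines are copied from the excerpts above.

```python
import re

# Sample lines copied from the log excerpts above.
SAMPLE_LINES = [
    "Jan  6 17:11:47 localhost sshd[3324]: pam_unix(sshd:session): session closed for user root",
    "Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2",
    "Jan  6 17:13:10 localhost sshd[3380]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=172.16.0.39  user=root",
    "Jan  6 17:13:12 localhost sshd[3380]: Failed password for root from 172.16.0.39 port 58481 ssh2",
]

# Hypothetical helper pattern: an sshd line whose message starts with
# "Accepted password" or "Failed password".
LOGIN_RESULT = re.compile(r"sshd\[\d+\]: (Accepted|Failed) password ")


def login_result_lines(lines):
    """Return only the lines that report a login result."""
    return [line for line in lines if LOGIN_RESULT.search(line)]


for line in login_result_lines(SAMPLE_LINES):
    print(line)
```

Only the "Accepted password" and "Failed password" lines are printed; the pam_unix lines are skipped.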

Logstash configuration

input {
    file {
        path => "/var/log/secure"
    }
}

filter {
    grok {
        match => {
            "message" => ".* sshd\[\d+\]: (?<status>\S+) .* (?<ClientIP>(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?) .*"
        }
        overwrite => ["message"]
    }
}

output {
    if [ClientIP] =~ /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/ and ([status] == "Accepted" or [status] == "Failed") {
        elasticsearch {
            hosts => "172.16.11.199"
            index => "logstash-%{+YYYY.MM.dd}"
        }
    }
}

Configuration explanation:

  • The input section uses the file plugin to read the log file.
  • The filter section uses the grok plugin to match the relevant log lines.
    1. The pattern applied to message defines two fields, status and ClientIP, which capture the login status and the login source IP respectively.
    2. overwrite lists the fields whose existing value grok may replace with a captured value. The pattern here captures nothing named message, so the original line is kept as is, as the final output shows.
  • The output section specifies where the filtered events are sent, here Elasticsearch.
    1. A conditional checks the two fields defined in the filter: ClientIP must look like an IPv4 address and status must be Accepted or Failed. Events that pass are sent to Elasticsearch; all others are discarded.
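
The filter and output logic described above can be approximated in a few lines of Python to check which log lines would reach Elasticsearch. This is only a sketch under stated assumptions: Python writes named groups as (?P<name>...) where grok uses (?<name>...), the function names filter_stage and should_index are hypothetical, and the handling of empty captures mirrors grok's default of not keeping them.

```python
import re

# Same expression as in the grok filter, with Python's (?P<name>...) spelling.
GROK_LIKE = re.compile(
    r".* sshd\[\d+\]: (?P<status>\S+) .* "
    r"(?P<ClientIP>(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?) .*"
)
# Same expression as in the output conditional.
IP_SHAPE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")


def filter_stage(message):
    """Mimic the grok filter: add status/ClientIP when the pattern matches."""
    event = {"message": message}
    match = GROK_LIKE.search(message)
    if match:
        # Assumption: empty captures are dropped, as grok does by default.
        event.update({k: v for k, v in match.groupdict().items() if v})
    return event


def should_index(event):
    """Mimic the conditional that guards the elasticsearch output."""
    return (
        IP_SHAPE.search(event.get("ClientIP", "")) is not None
        and event.get("status") in ("Accepted", "Failed")
    )


accepted = filter_stage(
    "Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root "
    "from 172.16.0.13 port 38604 ssh2"
)
closed = filter_stage(
    "Jan  6 17:11:47 localhost sshd[3324]: pam_unix(sshd:session): "
    "session closed for user root"
)
print(should_index(accepted), should_index(closed))
```

The "Accepted password" line yields both fields and passes the conditional, while the "session closed" line does not match the pattern at all and is therefore not indexed.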

Regular expression explained

Jan  6 17:11:51 localhost sshd[3358]: Accepted password for root from 172.16.0.13 port 38604 ssh2
  • .* matches Jan  6 17:11:51 localhost
  • sshd\[\d+\]: matches sshd[3358]: where \d+ matches one or more digits
  • (?<status>\S+):
    1. (?<xxx>regular expression) defines a named capture: the text matched by the expression is stored in field xxx, similar to {xxx: matched text}, so the conditional in the output section above can use it.
    2. \S+ matches one or more non-whitespace characters, here Accepted or Failed.

  • (?<ClientIP>(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?)
    1. This defines the ClientIP field.
    2. (?:...) is a non-capturing group: it matches an IP address without saving it as a separate numbered group. A plain (...) would capture it, and the value could then be referenced later as $1. The trailing ? makes the group optional (zero or one occurrence). It is not a non-greedy marker; non-greedy matching is written as a ? placed directly after a quantifier, as in +? or *?.
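
The difference between a named capture, an optional group, and a non-greedy quantifier can be seen in a short experiment. The sketch below uses Python's re module as a stand-in for grok's regex engine, so named groups are written (?P<name>...) instead of (?<name>...); the test strings are made up for illustration.

```python
import re

IP = r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}"

# Named capture: the matched text is retrievable by field name.
named = re.search(r"(?P<status>\S+) password", "Accepted password for root")
print(named.group("status"))

# A ? after a group makes the whole group optional (zero or one occurrence).
optional = re.compile(r"from (?P<ClientIP>(?:" + IP + r")?)")
with_ip = optional.search("from 172.16.0.13 port 38604").group("ClientIP")
without_ip = optional.search("from somewhere else").group("ClientIP")
print(repr(with_ip), repr(without_ip))

# A ? after a quantifier is what makes it non-greedy (as few as possible).
greedy = re.search(r"\d+", "3358").group()
lazy = re.search(r"\d+?", "3358").group()
print(greedy, lazy)
```

When no address follows "from", the optional group still lets the overall pattern match but leaves ClientIP empty, which is why the output conditional checks that ClientIP actually looks like an IP address.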

Final output:

{
       "message" => "Mar 22 10:16:51 k8s-n2 sshd[27997]: Failed password for root from 10.201.1.10 port 39302 ssh2",
      "@version" => "1",
    "@timestamp" => "2019-03-22T02:16:51.813Z",
          "path" => "/var/log/secure",
          "host" => "k8s-n2",
        "status" => "Failed",
      "ClientIP" => "10.201.1.10"
}

Keywords: Linux Session ssh ElasticSearch

Added by sheephat on Sun, 01 Dec 2019 22:06:30 +0200