logo

개요

ELK Stack을 이용한 www.hongsnet.net System 로그분석을 운영 한다.

ELK System Dashboard Overview

elk_system_dashboard

ELK Stack 구성 참조

ELK DEMO의 로그인 정보는 포트폴리오 참조

www.hongsnet.net System Log 구성

elk_stack_hongnset

  • Server rsyslog.conf 설정

조건 : Local System에도 로그는 기록되어야 한다.

# cat /etc/rsyslog.conf
...중략
*.info;mail.none;authpriv.none;cron.none                /var/log/messages
*.info;mail.none;authpriv.none;cron.none                @172.16.0.228:514

authpriv.*                                              /var/log/secure
authpriv.*                                              @172.16.0.228:514

mail.*                                                  -/var/log/maillog

cron.*                                                  /var/log/cron
cron.*                                                  @172.16.0.228:514

*.emerg                                                 :omusrmsg:*
*.emerg                                                 @172.16.0.228:514

uucp,news.crit                                          /var/log/spooler

local7.*                                                /var/log/boot.log
  • Network Switch syslog 설정

설정대상 스위치는 Cisco C3750/3650 이다.

# Switch(config)# loggin on
# Switch(config)# logging 172.16.0.228
# Switch(config)# logging trap warning

참고적으로 Switch의 Logging Level은 다음과 같다.

 - emergency
 - alert
 - critical
 - error
 - warning
 - notification
 - informational
 - debug

debug Level이 가장많은 로그를 남기게 된다.

  • Logstash 설정
# cat /etc/logstash/conf.d/logstash.conf
input {
  beats {
    port => 5444
    host => "0.0.0.0"
    client_inactivity_timeout => "1200"
    type => "filebeats"
  }

  udp {
    port => 514
    host => "0.0.0.0"
    type => "syslog"
  }
}

filter {

  if [type] == "messages" {

    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }

  }else if [type] == "syslog" {

    if "Received disconnect" in [message] {
       drop { }
    }else if "Disconnected from" in [message] {
       drop { }
    }else if "Removed slice User Slice of" in [message] {
       drop { }
    }else if "NetworkDB stats" in [message] {
       drop { }
    }else if "Created slice User Slice" in [message] {
       drop { }
    }else if "Started Session" in [message] {
       drop { }
    }else if "Removed slice User Slice" in [message] {
       drop { }
    }else if "authentication failure" in [message] {
       drop { }
    }else if "reverse mapping checking getaddrinfo for" in [message] {
       drop { }
    }else if "not met by user" in [message] {
       drop { }
    }else if "Removed session" in [message] {
       drop { }
    }else if "Invalid user" in [message] {
       drop { }
    }

    grok {
      add_tag => [ "sshd_fail" ]
      match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
    }

    grok {
      add_tag => [ "sshd_fail2" ]
      match => { "message" => "Failed %{WORD:sshd_auth_type} for invalid user %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
    }

    grok {
      add_tag => [ "sshd_accept" ]
      match => { "message" => "%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: Accepted password for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
    }

    grok {
      add_tag => [ "HDD_SMART_CHECK_ERROR" ]
      match => { "message" => "Error updating SMART data: Error sending ATA command CHECK" }
    }

    grok {
      add_tag => [ "DISK_ERROR" ]
      match => { "message" => "Buffer I/O error" }
    }

    mutate {
      convert => {"geoip.city_name" => "string"}
    }
    geoip {
      source => "sshd_client_ip"
    }


  }else if [type] == "filebeats" {

    if "ZABBIXDB" in [message] {
       drop { }
    }else if "ZABBIX_DEMO" in [message] {
       drop { }
    }else if "zabbix" in [message] {
       drop { }
    }

    grok {
      add_tag => [ "db_conn" ]
      match => { "message" => "%{YEAR:year}%{MONTHNUM:month}%{MONTHDAY:day} %{TIME:time},%{GREEDYDATA:host},%{GREEDYDATA:username},%{GREEDYDATA:client_hostname},%{INT:connection_id},%{INT:query_id},%{GREEDYDATA:operation},%{GREEDYDATA:schema},%{GREEDYDATA:object},%{INT:return_code}" }
    }

    grok {
        match => [ "message", "^# User@Host: %{USER:query_user}(?:\[[^\]]+\])?\s+@\s+%{HOSTNAME:query_host}?\s+\[%{IP:query_ip}?\]" ]
    }

    grok {
        match => [ "message", "^# Thread_id: %{NUMBER:thread_id:int}\s+Schema: %{USER:schema}\s+Last_errno: %{NUMBER:last_errno:int}\s+Killed: %{NUMBER:killed:int}"]
    }

    grok {
        match => [ "message", "^# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time}\s+ Rows_sent: %{NUMBER:rows_sent:int} \s+Rows_examined: %{NUMBER:rows_examined:int}\s+Rows_affected: %{NUMBER:rows_affected:int}\s+Rows_read: %{NUMBER:rows_read:int}"]
    }

    grok {  match => [ "message", "^# Bytes_sent: %{NUMBER:bytes_sent:float}"]   }
    grok {  match => [ "message", "^SET timestamp=%{NUMBER:timestamp}" ]      }
    grok {  match => [ "message", "^SET timestamp=%{NUMBER};\s+%{GREEDYDATA:query}" ]   }
    date {  match => [ "timestamp", "UNIX" ]  }

    mutate {
        remove_field => "timestamp"
    }

  }

}

output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => "172.16.0.228:9200"
      manage_template => false
      index => "rsyslog-%{+YYYY.MM.dd}"
    }
  }else if [type] == "filebeats" {
    elasticsearch {
      hosts => "172.16.0.228:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }
  }
}

핵심 내역

다음과 같이 Log Method에 따라 Type이라는 변수를 선언한다.

  • udp/514 : syslog

이는 시스템의 rsyslog.conf 또는 Switch의 syslog 설정에 따라 로그가 인입되는 경우에 해당된다.

  • tcp/5444 : filebeats

이는 시스템에 설치된 filebeats 설정에 따라 로그가 인입되는 경우에 해당된다.

  • Log DROP

logstash.conf 설정에 여려 가지 조건들이 존재하지만, 이해를 돕기위해 핵심적인 내역만을 명시한다.

    else if "authentication failure" in [message] {
       drop { }
    }else if "reverse mapping checking getaddrinfo for" in [message] {
       drop { }
    }else if "Invalid user" in [message] {
       drop { }
    }

위의 조건은 시스템에 비정상적인 로그인을 시도하는 로그에 대한 예외처리를 의미한다. 이 설정을 이해하기 위해 아래의 grok 조건문을 보면,

grok {
      add_tag => [ "sshd_fail" ]
      match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
    }

시스템의 /var/log/messages에 Failed ~ 로 시작되는 Log는 비정상적인 로그인시도가 맞다. 그런데, Failed 로 Prefix되는 로그만 남는것이 아닌, 위의 예외처리한 로그가 같이 기록된다.

즉, 하나의 행위에 한 개 이상의 로그가 기록되는 것이다. 따라서 Failed 로 시작되는 로그만 남기고 나머지 연관된 로그는 불필요하므로, drop 처리하는 것이다.

  • ElasticSearch 전송
  if [type] == "syslog" {
    elasticsearch {
      hosts => "172.16.0.228:9200"
      manage_template => false
      index => "rsyslog-%{+YYYY.MM.dd}"
    }
  }else if [type] == "filebeats" {
    elasticsearch {
      hosts => "172.16.0.228:9200"
      manage_template => false
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }
  }

syslog 로의 type이면, Elasticsearch의 INDEX Pattern이 "rsyslog**- 로 시작되고, filebeats면 metadata 형식(filebeat-)으로 생성된다.