match =>{"message"=>"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"}
add_field =>["received_at", "%{@timestamp}"]
add_field =>["received_from", "%{host}"]
}
syslog_pri {}
date{
match =>["syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss"]
}
}else if[type]=="syslog"{
if"Received disconnect"in[message] {
drop {}
}else if"Disconnected from"in[message] {
drop {}
}else if"Removed slice User Slice of"in[message] {
match =>{"message"=>"Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}"}
}
grok {
add_tag =>["sshd_fail2"]
match =>{"message"=>"Failed %{WORD:sshd_auth_type} for invalid user %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}"}
}
grok {
add_tag =>["sshd_accept"]
match =>{"message"=>"%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: Accepted password for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}"}
}
mutate {
convert =>{"geoip.city_name"=>"string"}
}
geoip {
source=>"sshd_client_ip"
}
}else if[type]=="filebeats"{
if"ZABBIXDB"in[message] {
drop {}
}else if"ZABBIX_DEMO"in[message] {
drop {}
}else if"zabbix"in[message] {
drop {}
}
grok {
add_tag =>["db_conn"]
match =>{"message"=>"%{YEAR:year}%{MONTHNUM:month}%{MONTHDAY:day} %{TIME:time},%{GREEDYDATA:host},%{GREEDYDATA:username},%{GREEDYDATA:client_hostname},%{INT:connection_id},%{INT:query_id},%{GREEDYDATA:operation},%{GREEDYDATA:schema},%{GREEDYDATA:object},%{INT:return_code}"}
}
grok {
match =>["message", "^# User@Host: %{USER:query_user}(?:\[[^\]]+\])?\s+@\s+%{HOSTNAME:query_host}?\s+\[%{IP:query_ip}?\]"]
}
grok {
match =>["message", "^# Thread_id: %{NUMBER:thread_id:int}\s+Schema: %{USER:schema}\s+Last_errno: %{NUMBER:last_errno:int}\s+Killed: %{NUMBER:killed:int}"]
grok { match =>["message", "^# Bytes_sent: %{NUMBER:bytes_sent:float}"]}
grok { match =>["message", "^SET timestamp=%{NUMBER:timestamp}"]}
grok { match =>["message", "^SET timestamp=%{NUMBER};\s+%{GREEDYDATA:query}"]}
date{ match =>["timestamp", "UNIX"]}
mutate {
remove_field =>"timestamp"
}
}
}
output {
if[type]=="syslog"{
elasticsearch {
hosts =>"172.16.0.228:9200"
manage_template =>false
index =>"rsyslog-%{+YYYY.MM.dd}"
}
}else if[type]=="filebeats"{
elasticsearch {
hosts =>"172.16.0.228:9200"
manage_template =>false
index =>"%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
}
```
# 핵심 내역
다음과 같이 Log Method에 따라 `Type`이라는 변수를 선언한다.
- udp/514 : **syslog**
> 이는 시스템의 rsyslog.conf 또는 Switch의 syslog 설정에 따라 로그가 인입되는 경우에 해당된다.
- tcp/5444 : **filebeats**
> 이는 시스템에 설치된 filebeats 설정에 따라 로그가 인입되는 경우에 해당된다.
-**Log DROP**
logstash.conf 설정에 여려 가지 조건들이 존재하지만, 이해를 돕기위해 핵심적인 내역만을 명시한다.
```plaintext
else if "authentication failure" in [message] {
drop { }
}else if "reverse mapping checking getaddrinfo for" in [message] {
drop { }
}else if "Invalid user" in [message] {
drop { }
}
```
위의 조건은 시스템에 **비정상적인 로그인**을 시도하는 로그에 대한 예외처리를 의미한다. 이 설정을 이해하기 위해 아래의 grok 조건문을 보면,
```plaintext
grok {
add_tag => [ "sshd_fail" ]
match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
}
```
시스템의 /var/log/messages에 Failed ~ 로 시작되는 Log는 비정상적인 로그인시도가 맞다. 그런데, **Failed** 로 Prefix되는 로그만 남는것이 아닌, `위의 예외처리한 로그가 같이 기록`된다.
즉, **하나의 행위에 한 개 이상의 로그가 기록되는 것**이다. 따라서 Failed 로 시작되는 로그만 남기고 나머지 연관된 로그는 불필요하므로, drop 처리하는 것이다.
-**ElasticSearch 전송**
```plaintext
if [type] == "syslog" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "rsyslog-%{+YYYY.MM.dd}"
}
}else if [type] == "filebeats" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
```
> syslog 로의 type이면, Elasticsearch의 INDEX Pattern이 "rsyslog**- 로 시작되고, filebeats면 metadata 형식(filebeat-)으로 생성된다.