Commit e80c8b86 authored by JooHan Hong's avatar JooHan Hong

elk www.hongsnet.net dbms, dashboard add

parent 80e3e619
Pipeline #5290 passed with stages
in 45 seconds
[![logo](https://www.hongsnet.net/images/logo.gif)](https://www.hongsnet.net)
# 개요
ELK Stack을 이용한 www.hongsnet.net `DBMS` 로그분석을 운영 한다.
# ELK DBMS Dashboard Overview
![elk_dbms_dashboard](./images/elk_dbms_dashboard_overview.png)
# ELK Stack 구성 참조
- [ELK Stack 구성내역 바로가기](https://gitlab.hongsnet.net/joohan.hong/joohanhong/tree/master/LOG)
- [ELK System Dashboard 바로가기](https://elk-demo.hongsnet.net/app/dashboards#/view/eb4eeb10-8b7e-11eb-8686-cb443298d4ed?_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:now-24h,to:now))&_a=(description:'',filters:!(),fullScreenMode:!f,options:(hidePanelTitles:!f,useMargins:!t),query:(language:kuery,query:''),timeRestore:!f,title:'www.hongsnet.net%20DBMS%20%EC%9A%B4%EC%9A%A9%ED%98%84%ED%99%A9',viewMode:view))
> ELK DEMO의 **로그인 정보**는 포트폴리오 참조
# www.hongsnet.net DBMS Log 구성
![elk_stack_hongnset](./images/elk-stack-dbms.png)
- **Filebeats 설정**
> 조건 : 사전에 `MariaDB audit Log`가 적용되어야 한다.
- **MariaDB audit Log 설정 (`MariaDB-10.3.28`)**
리눅스에서 audit은 감사(감시하는) Daemon 이다. MySQL에서도 이런 역할을 하는 기능이 있는데, MySQL에서는 Plugin 형태로 제공한다. 단, 외부의 패키지를 별도로 설치할 필요가 없으며 자체 쿼리로 설치가 가능하다.
> **참고** : MariaDB 5.5.37 이후와 10.0.10 이후부터는 Default로 Audit Plugin이 내장되어 있다. 즉, 별도의 패키지를 설치할 필요가 없다.
먼저 다음과 같이 플러그인의 경로를 확인한다.
```bash
MariaDB [(none)]> show variables like 'plugin_dir';
+---------------+--------------------------+
| Variable_name | Value |
+---------------+--------------------------+
| plugin_dir | /usr/lib64/mysql/plugin/ |
+---------------+--------------------------+
1 row in set (0.001 sec)
```
다음은 현재 설치되어 있는 Plugin을 확인한 결과이다.
```bash
MariaDB [(none)]> show plugins;
+-------------------------------+----------+--------------------+---------+---------+
| Name | Status | Type | Library | License |
+-------------------------------+----------+--------------------+---------+---------+
| binlog | ACTIVE | STORAGE ENGINE | NULL | GPL |
...중략
+-------------------------------+----------+--------------------+---------+---------+
53 rows in set (0.002 sec)
```
다음과 같이 확인해보면, 아직 Audit Plugin이 설치되지 않은 상태이다.
```bash
MariaDB [(none)]> show global variables like "server_audit%";
Empty set (0.001 sec)
```
그럼 이제 다음과 같이 Audit Plugin을 설치한다.
```bash
MariaDB [(none)]> install plugin server_audit soname 'server_audit.so';
Query OK, 0 rows affected (0.001 sec)
```
위와 같이 설치가 완료되면, 다음과 같이 Plugin Table에 등록되었는지를 확인한다.
```bash
MariaDB [(none)]> select * from mysql.plugin;
+--------------+-----------------+
| name | dl |
+--------------+-----------------+
| server_audit | server_audit.so |
+--------------+-----------------+
1 row in set (0.038 sec)
```
또한 다음과 같이 설정이 존재하는지도 확인한다.
```bash
MariaDB [(none)]> show global variables like "server_audit%";
+-------------------------------+-----------------------+
| Variable_name | Value |
+-------------------------------+-----------------------+
| server_audit_events | |
| server_audit_excl_users | |
| server_audit_file_path | server_audit.log |
| server_audit_file_rotate_now | OFF |
| server_audit_file_rotate_size | 1000000 |
| server_audit_file_rotations | 9 |
| server_audit_incl_users | |
| server_audit_logging | OFF |
| server_audit_mode | 0 |
| server_audit_output_type | file |
| server_audit_query_log_limit | 1024 |
| server_audit_syslog_facility | LOG_USER |
| server_audit_syslog_ident | mysql-server_auditing |
| server_audit_syslog_info | |
| server_audit_syslog_priority | LOG_INFO |
+-------------------------------+-----------------------+
15 rows in set (0.001 sec)
```
만약 실시간으로 반영하려면, 다음과 같이 쿼리를 수행한다(왜냐하면, 위의 server_audit_logging 값이 OFF이기 때문).
```bash
MariaDB [(none)]> set global server_audit_logging=ON;
Query OK, 0 rows affected (0.000 sec)
```
마지막으로 MariaDB 재기동 시에도 실행될 수 있도록 my.cnf 파일에 등록해주자.
```bash
[mysqld]
...
server_audit_logging = 1
server_audit_output_type='FILE'
server_audit_file_rotate_size = 1024000000
server_audit_file_rotations = 10
server_audit_events='CONNECT,QUERY,TABLE,QUERY_DDL,QUERY_DML,QUERY_DCL,QUERY_DML_NO_SELECT'
```
- **Filebeats 설정**
위의 MariaDB audit LOG 설정에 따라 다음과 같이 로그를 Logstash에 전송할 Filebeats 설정을 진행해야 한다.
```bash
# cat /etc/filebeat/filebeat.yml
logging.level: error
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/lib/mysql/server_audit.log
tags: ["db_audit"]
- type: log
enabled: true
paths:
- /var/lib/mysql/slow-query.log
tags: ["db_slow"]
output.logstash:
hosts: [ "172.16.0.228:5444" ]
```
- **Logstash 설정**
```bash
# cat /etc/logstash/conf.d/logstash.conf
input {
beats {
port => 5444
host => "0.0.0.0"
client_inactivity_timeout => "1200"
type => "filebeats"
}
udp {
port => 514
host => "0.0.0.0"
type => "syslog"
}
}
filter {
if [type] == "messages" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}else if [type] == "syslog" {
if "Received disconnect" in [message] {
drop { }
}else if "Disconnected from" in [message] {
drop { }
}else if "Removed slice User Slice of" in [message] {
drop { }
}else if "NetworkDB stats" in [message] {
drop { }
}else if "Created slice User Slice" in [message] {
drop { }
}else if "Started Session" in [message] {
drop { }
}else if "Removed slice User Slice" in [message] {
drop { }
}else if "authentication failure" in [message] {
drop { }
}else if "reverse mapping checking getaddrinfo for" in [message] {
drop { }
}else if "not met by user" in [message] {
drop { }
}else if "Removed session" in [message] {
drop { }
}else if "Invalid user" in [message] {
drop { }
}
grok {
add_tag => [ "sshd_fail" ]
match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
}
grok {
add_tag => [ "sshd_fail2" ]
match => { "message" => "Failed %{WORD:sshd_auth_type} for invalid user %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
}
grok {
add_tag => [ "sshd_accept" ]
match => { "message" => "%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: Accepted password for %{USERNAME:sshd_invalid_user} from %{IPORHOST:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
}
grok {
add_tag => [ "HDD_SMART_CHECK_ERROR" ]
match => { "message" => "Error updating SMART data: Error sending ATA command CHECK" }
}
grok {
add_tag => [ "DISK_ERROR" ]
match => { "message" => "Buffer I/O error" }
}
mutate {
convert => {"geoip.city_name" => "string"}
}
geoip {
source => "sshd_client_ip"
}
}else if [type] == "filebeats" {
if "ZABBIXDB" in [message] {
drop { }
}else if "ZABBIX_DEMO" in [message] {
drop { }
}else if "zabbix" in [message] {
drop { }
}
grok {
add_tag => [ "db_conn" ]
match => { "message" => "%{YEAR:year}%{MONTHNUM:month}%{MONTHDAY:day} %{TIME:time},%{GREEDYDATA:host},%{GREEDYDATA:username},%{GREEDYDATA:client_hostname},%{INT:connection_id},%{INT:query_id},%{GREEDYDATA:operation},%{GREEDYDATA:schema},%{GREEDYDATA:object},%{INT:return_code}" }
}
grok {
match => [ "message", "^# User@Host: %{USER:query_user}(?:\[[^\]]+\])?\s+@\s+%{HOSTNAME:query_host}?\s+\[%{IP:query_ip}?\]" ]
}
grok {
match => [ "message", "^# Thread_id: %{NUMBER:thread_id:int}\s+Schema: %{USER:schema}\s+Last_errno: %{NUMBER:last_errno:int}\s+Killed: %{NUMBER:killed:int}"]
}
grok {
match => [ "message", "^# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time}\s+ Rows_sent: %{NUMBER:rows_sent:int} \s+Rows_examined: %{NUMBER:rows_examined:int}\s+Rows_affected: %{NUMBER:rows_affected:int}\s+Rows_read: %{NUMBER:rows_read:int}"]
}
grok { match => [ "message", "^# Bytes_sent: %{NUMBER:bytes_sent:float}"] }
grok { match => [ "message", "^SET timestamp=%{NUMBER:timestamp}" ] }
grok { match => [ "message", "^SET timestamp=%{NUMBER};\s+%{GREEDYDATA:query}" ] }
date { match => [ "timestamp", "UNIX" ] }
mutate {
remove_field => "timestamp"
}
}
}
output {
if [type] == "syslog" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "rsyslog-%{+YYYY.MM.dd}"
}
}else if [type] == "filebeats" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
}
```
# 핵심 내역
- **Log DROP**
logstash.conf 설정에 여려 가지 조건들이 존재하지만, 이해를 돕기위해 핵심적인 내역만을 명시한다.
```
if "ZABBIXDB" in [message] {
drop { }
}else if "ZABBIX_DEMO" in [message] {
drop { }
}else if "zabbix" in [message] {
drop { }
}
```
위의 조건은 `현재 운용중인 Zabbix Monitoring 솔루션에 대한 audit 로그를 DROP하는 설정`이다. Zabbix는 지속적으로 등록된 Host를 Discovery하기 때문에 상당히 많은 쿼리가 쌓이게 되는데, 이는 예외처리하는 것이다.
- **ElasticSearch 전송**
```plaintext
if [type] == "syslog" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "rsyslog-%{+YYYY.MM.dd}"
}
}else if [type] == "filebeats" {
elasticsearch {
hosts => "172.16.0.228:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
```
> syslog 로의 type이면, Elasticsearch의 INDEX Pattern이 "rsyslog**- 로 시작되고, filebeats면 metadata 형식(filebeat-)으로 생성된다.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment