2020-01-23

[ELK] Fortigate Firewall 로그 수집 및 분석 - Logstash filter


이번시간에는 ELK 를 활용하여  Fortigate 방화벽 로그를 수집하고 구성하는 방법에 대해서 알아보도록 하겠습니다.
 로그 수집시 중요한 것은 장비 밴더사나 로그 특성등에 따라서 형식이 다 다르기 때문에 수집한 로그에 대해서 적절한 필터링을 통해 정형화 해 주어야 로그 분석 및 탐지가 용이하게 됩니다. 
 여기서는 방화벽에서 ELK Stack으로 로그를 전송을 Logstash.conf 필터를 통해서 로그 데이터 파싱하고 인덱싱하는 부분에 대해서 살펴보도록 하겠습니다. .
1. Fortigate 방화벽 Syslog 설정.
  우선 Fortigate 방화벽 장비 CLI 접속하여 Logstash 서버로 로그 데이터를 전송하도록 설정을 해 줍니다.
  syslog 설정은 최대 4개까지 설정이 가능하며, Fortigate Web UI 에서는 첫번채 syslogd 만 세팅이 가능하여 중복설정등이 필요할 경우 CLI를 통해 설정하는 것을 추천 드립니다. 
config log syslogd2 setting
 set status enable
 set server "192.168.0.100"    
 set port 5514  
 저는 syslogd2에 설정하였으며, 로그 수집 서버 IP와 포트를 지정을 해 주고 enable 활성화 해 주시면 됩니다.
 syslog 포트는 default 514(udp) 인데 udp 514는 다른 로그 수집용 포트로 사용하고 있어서 구분을 위해서 5514로 지정하였습니다. 
  - syslog 설정 확인.
Fortigate # config log syslogd3 setting

Fortigate (setting) # show
config log syslogd3 setting
    set status enable
    set server "192.168.0.100"
    set port 5514
end

Fortigate (setting) # 

2. ELK 로그 서버 Logstash.conf 필터 설정
  방화벽에서 로그 전송 설정을 했으면 이제 Logstash.conf 설정 파일을 통해 로그를 수집하여 필터링 해 주시면 됩니다.
  logstash.conf 파일은 /etc/logstash/conf.d/logstash.conf 에 만들었으며 아래와 같은 형식으로 만들었습니다.  
Input
  - udp 플러그인을 사용.
  - 0.0.0.0 네트워크 바인딩.
  - 5514 포트 활성화
  - 로그 수집시 Tag 필드에 에 "FWlog" 생성. ( Kibana 대시보드 구성시 tag를 활용하여 로그 데이터 구분 )
input { 
    udp { 
        tags => "FWlog" 
        host => "0.0.0.0" 
        port =>  5514 
        #codec => "json" 
    } 
} 

Filter 
 -  필터에는 다양한 플러그인 이 있으며, 아래와 같이 grok, data, mutate, geoip 등 플러그인을 활용하여 로그 데이터 변환을 해 주었습니다. 
filter { 

    if "FWlog" in [tags] { 
        grok { 
            match => ["message", "%{SYSLOG5424PRI:syslog_index}%{GREEDYDATA:message}"] 
            overwrite => [ "message" ] 
        } 

        date { 
            match => ["timestamp" , "yyyy-MM-dd'T'HH:mm:ss.SSSZ"] 
            target => "@timestamp" 
            #add_field => { "debug" => "timestampMatched"} 
        } 

        kv { 
            source => "message" 
            exclude_keys => [ "type", "tags"] 
        } 

        geoip { source => "dst" } 
        geoip { source => "dstip" } 
        geoip { source => "src" } 
        geoip { source => "srcip" } 

        mutate { 
            rename => [ "dst", "dst_ip" ] 
            rename => [ "dstip", "dst_ip" ] 
            rename => [ "dstport", "dst_port" ] 
            rename => [ "devname", "device_id" ] 
            rename => [ "status", "action" ] 
            rename => [ "src", "src_ip" ] 
            rename => [ "srcip", "src_ip" ] 
            rename => [ "zone", "src_intf" ] 
            rename => [ "srcintf", "src_intf" ] 
            rename => [ "srcport", "src_port" ] 
            rename => [ "rcvd", "byte_recieved" ] 
            rename => [ "rcvdbyte", "bytes_recieved" ] 
            rename => [ "sentbyte", "bytes_sent" ] 
            rename => [ "sent", "bytes_sent" ] 
            rename => [ "cpu", "cpu_usage" ] 
            rename => [ "mem", "mem_usage" ] 
            rename => [ "disk", "disk_usage" ] 
            convert => ["bytes_recieved", "integer"] 
            convert => ["bytes_sent", "integer"] 
            convert => ["cpu_usage", "integer"] 
            convert => ["mem_usage", "integer"] 
            convert => ["disk_usage", "integer"] 
            convert => ["disklograte", "integer"] 
        } 
    } 
} 

Output

 - 필터링된 데이터를 elasticsearch로 전송해 주는 설정입니다.
  - 로그 전송시 index를 day로 구분하도록 하였으며, 인덱스를 분산해 주시면 Elasticsearch에서 로그 검색 및 분석 성능 향상에 도움이 됩니다. 
output { 

       if "FWlog" in [tags] { 
           elasticsearch { 
             hosts => ["localhost:9200"] 
             index => "fwlog-%{+YYYY.MM.dd}" 
           } 
           stdout { codec => rubydebug } 
       } 
}

3. Logstash 시작
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf  --path.settings /etc/logstash &

4. Kibana 접속 및 로그 확인. 
 - Kibana 접속 후 discover 항목을 보면 아래 와 같이 fields(column) 로 구분되어 데이터가 수집이 되는 것을 확인 할 수 있습니다. 

댓글 없음:

댓글 쓰기

Elasticsearch Heap Size

  Elasticsearch는 Java 기반으로 동작을 합니다. Java는 가비시 컬렉터 (garbage-collected)에 의해서 관리가 되며, Java 객체는 힙(Heap) 이라고하는 메모리의 런타임 영역에 상주합니다.  Elasticsear...