備忘録/にわかエンジニアが好きなように書く

個人的にとりあえず仕組みを知るためにとりあえず動くまで構築や動作をみただけの単なる操作ログです。個人用の備忘録となり、最新の導入手順は個別に確認してください。 ※変な内容や間違いを書いているなどありましたらコメントやご指摘いただけると幸いです。

ELK(Elasticsearch/Logstash/Kibana)を使用したSRXファイアウォールのログ分析

 

目的

SRXから出力されたsyslogを収集、フィールドを分離し分析/可視化するために

 ・Syslogサーバ:ログを収集する。
 ・Logstash:Syslogサーバ上のファイルからフィルタリングとラベル付け
 ・Elasticsearch:データのインデックス作成と保存
 ・kibana:データの可視化と分析

を行えるようにします。

構成

f:id:pocket01:20180630013815p:plain

確認環境

 ・ELK環境 (Version:6.3)

 ・Syslogサーバ (flow.logというファイルにログ書き込みを行う)

 ・SRX (ログ分析対象のネットワークデバイス)

        →通信全許可+UTMポリシ全許可+logオプション

前提条件

ELKが動作する環境は構築済みとする。

logstash設定

フィルタ作成用Syslogについて

以下のSRXから出力されるsyslogメッセージ2種類を解析して処理するフィルタ作成する。

■通信ポリシーログ(sample)
<14>1 2018-06-30T02:17:22.753Z fw0001 RT_FLOW - RT_FLOW_SESSION_CLOSE [junos@2636.1.1.1.2.41 reason="idle Timeout" source-address="192.168.255.2" source-port="62047" destination-address="8.8.8.8" destination-port="53" service-name="junos-dns-udp" nat-source-address="192.168.0.47" nat-source-port="20215" nat-destination-address="8.8.8.8" nat-destination-port="53" src-nat-rule-type="source rule" src-nat-rule-name="rule001" dst-nat-rule-type="N/A" dst-nat-rule-name="N/A" protocol-id="17" policy-name="trust-to-untrust-001" source-zone-name="trust" destination-zone-name="untrust" session-id-32="9621" packets-from-client="1" bytes-from-client="67" packets-from-server="1" bytes-from-server="116" elapsed-time="3" application="UNKNOWN" nested-application="UNKNOWN" username="N/A" roles="N/A" packet-incoming-interface="fe-0/0/1.0" encrypted="UNKNOWN"]

■UTMポリシログ(sample)
<12>1 2018-06-26T21:48:53.478Z fw0001 RT_UTM - WEBFILTER_URL_PERMITTED [junos@2636.1.1.1.2.41 source-address="192.168.255.2" source-port="50336" destination-address="59.106.194.16" destination-port="443" category="N/A" reason="BY_LOCAL_DEFAULT" profile="SNI-Profile" url="b.hatena.ne.jp" obj="/" username="N/A" roles="N/A"]

フィルター作成/設定 

フィルター概要

■input

 - path : syslogのファイルパス指定

 - type : syslog 

■filter

 - grok条件

  →上記のログ2種類がmatch条件に適用できる設定

  →syslogメッセージのフィールドを分離するフィルタ作成)

 - 宛先IPアドレスにGeoIP(位置情報を取得する)を適用させる。

■output

 - elasticsearchのアドレス指定

 - indexパターンを指定

 

Grokのmatch条件を作成するときに参考にしたサイト

logstash-patterns-core/grok-patterns at master · logstash-plugins/logstash-patterns-core · GitHub

Grok Debugger 

→生ログに対してGrokをリアルタイムで判断しの解析するので作成時に役立ちます。

フィルターの内容

/etc/logstash/conf.d/01-srx-url.conf

input {
file {
path => "/var/log/flow.log"
type => "srx"
}
}
filter {
if [type] == "srx" {
grok {
match => [ "message", "%{DATA:label} %{TIMESTAMP_ISO8601:timestamp} %{DATA:sysloghost} %{DATA:firewall_category} %{WORD:flowtype}%{SPACE}\[%{DATA:emib} reason=\"%{DATA:reason}\" source-address=\"%{IPORHOST:source_address}\" source-port=\"%{DATA:source_port}\" destination-address=\"%{IPORHOST:destination_address}\" destination-port=\"%{DATA:destination_port}\" service-name=\"%{DATA:service_name}\" nat-source-address=\"%{IPORHOST:nat_source_address}\" nat-source-port=\"%{DATA:nat_source_port}\" nat-destination-address=\"%{IPORHOST:nat_destination_address}\" nat-destination-port=\"%{DATA:nat_destination_port}\" src-nat-rule-type=\"%{DATA:src_nat_rule_type}\" src-nat-rule-name=\"%{DATA:src_nat_rule_name}\" dst-nat-rule-type=\"%{DATA:dst_nat_rule_type}\" dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\" protocol-id=\"%{DATA:protocol_id}\" policy-name=\"%{DATA:policy_name}\" source-zone-name=\"%{DATA:source_zone_name}\" destination-zone-name=\"%{DATA:destination_zone_name}\" session-id-32=\"%{DATA:session_id_32}\" packets-from-client=\"%{DATA:packets_from_client}\" bytes-from-client=\"%{DATA:bytes_from_client}\" packets-from-server=\"%{DATA:packets_from_server}\" bytes-from-server=\"%{DATA:bytes_from_server}\" elapsed-time=\"%{DATA:elapsed_time}\" application=\"%{DATA:application}\" nested-application=\"%{DATA:nested_application}\" username=\"%{DATA:username}\" roles=\"%{DATA:roles}\" packet-incoming-interface=\"%{DATA:packet_incoming_interface}\" encrypted=\"%{DATA:encrypted}\"\]" ]
match => [ "message", "%{DATA:label} %{TIMESTAMP_ISO8601:timestamp} %{DATA:sysloghost} %{DATA:utm_firewall_category} %{WORD:utm_firewall_flowtype} \[%{DATA:utm_emib} source-address=\"%{IPORHOST:utm_source_address}\" source-port=\"%{DATA:utm_source_port}\" destination-address=\"%{IPORHOST:utm_destination_address}\" destination-port=\"%{DATA:utm_destination_port}\" category=\"%{DATA:utm_category}\" reason=\"%{DATA:utm_reason}\" profile=\"%{DATA:utm_profile}\" url=\"%{DATA:utm_url}\" obj=\"%{DATA:utm_obj}\" username=\"%{DATA:utm_username}\" roles=\"%{DATA:utm_roles}\"\]" ]
}
geoip {
source => "destination_address"
target => "dest_address_geoip"
}
geoip {
source => "utm_destination_address"
target => "utm_dest_address_geoip"
}
}
}
output {
if [type] == "srx" {
elasticsearch {
hosts => ["172.16.100.30:9200"]
index => "log-junoslogs-%{+YYYY.MM}"
}
}
}

match条件は、とりあえずフィールド分離だけを目的としてデータ型は適当なので、数値とか変更したり、環境に合わせたindex名に修正する必要ありました。

あと、messageフィールドは不要なら削除しても問題ないかも。

#elasticsearchのアドレスは環境ごとに修正が必要です。

logstash再起動

■再起動
# systemctl restart logstash

■起動時ログ確認
# tail -f /var/log/logstash/logstash-plain.log
→フィルタの文法や構文が間違えているとエラーで起動しないが、データ型指定が間違えていると正常に起動し、kibanaで確認したときにデータがフィルタされない

kibanaでelasticsearchのindex情報確認

①"Management" > "Index Management" を選択

f:id:pocket01:20180630032642p:plain

②index情報の確認

f:id:pocket01:20180707221559p:plain

→elasticsearchのindex情報が無事に取得できていることが分かる

kibanaで可視化

Indexパターンの取り込み(未実施の場合)

kibanaのダッシュボードで

①"Management" > "Index Patterns" > "Create Index Pattern"

②Index patternでフィルタのoutputで指定したindex名を入力

で、作成する。

Discoverでログ取得確認

f:id:pocket01:20180630030014p:plain

 →無事にsyslogを取り込むことができ、ログ情報やグラフが表示された。

 

ログの内容確認

取り込んだログから通信ポリシーとUTMポリシーの詳細情報を確認し、ラベル付与ができていることを確認

■通信ポリシーログ(DNSとの通信)

f:id:pocket01:20180630025853p:plain

■UTMポリシーログ(www.yahoo.co.jpへのアクセス時)

f:id:pocket01:20180707221939p:plain

無事にアドレスやポート情報、URLがラベルごとに分割されていることが分かる。

 

分離できたフィールドを使用した可視化

アクセス先のIPアドレス統計 , 接続先のサービス情報+送信元アドレス , 通信フロークローズ時の条件と宛先アドレス 

色々と組み合わせでいろんな統計や使用者の傾向が見れるかも。

f:id:pocket01:20180630035155p:plain

 

 

データ分析基盤構築入門[Fluentd,Elasticsearch,Kibanaによるログ収集と可視化]

データ分析基盤構築入門[Fluentd,Elasticsearch,Kibanaによるログ収集と可視化]