目的
SRXから出力されたsyslogを収集、フィールドを分離し分析/可視化するために
・Syslogサーバ:ログを収集する。
・Logstash:Syslogサーバ上のファイルからフィルタリングとラベル付け
・Elasticsearch:データのインデックス作成と保存
・kibana:データの可視化と分析
を行えるようにします。
構成
確認環境
・ELK環境 (Version:6.3)
・Syslogサーバ (flow.logというファイルにログ書き込みを行う)
・SRX (ログ分析対象のネットワークデバイス)
→通信全許可+UTMポリシ全許可+logオプション
前提条件
ELKが動作する環境は構築済みとする。
logstash設定
フィルタ作成用Syslogについて
以下のSRXから出力されるsyslogメッセージ2種類を解析して処理するフィルタ作成する。
■通信ポリシーログ(sample)
<14>1 2018-06-30T02:17:22.753Z fw0001 RT_FLOW - RT_FLOW_SESSION_CLOSE [junos@2636.1.1.1.2.41 reason="idle Timeout" source-address="192.168.255.2" source-port="62047" destination-address="8.8.8.8" destination-port="53" service-name="junos-dns-udp" nat-source-address="192.168.0.47" nat-source-port="20215" nat-destination-address="8.8.8.8" nat-destination-port="53" src-nat-rule-type="source rule" src-nat-rule-name="rule001" dst-nat-rule-type="N/A" dst-nat-rule-name="N/A" protocol-id="17" policy-name="trust-to-untrust-001" source-zone-name="trust" destination-zone-name="untrust" session-id-32="9621" packets-from-client="1" bytes-from-client="67" packets-from-server="1" bytes-from-server="116" elapsed-time="3" application="UNKNOWN" nested-application="UNKNOWN" username="N/A" roles="N/A" packet-incoming-interface="fe-0/0/1.0" encrypted="UNKNOWN"]
■UTMポリシログ(sample)
<12>1 2018-06-26T21:48:53.478Z fw0001 RT_UTM - WEBFILTER_URL_PERMITTED [junos@2636.1.1.1.2.41 source-address="192.168.255.2" source-port="50336" destination-address="59.106.194.16" destination-port="443" category="N/A" reason="BY_LOCAL_DEFAULT" profile="SNI-Profile" url="b.hatena.ne.jp" obj="/" username="N/A" roles="N/A"]
フィルター作成/設定
フィルター概要
■input
- path : syslogのファイルパス指定
- type : syslog
■filter
- grok条件
→上記のログ2種類がmatch条件に適用できる設定
→syslogメッセージのフィールドを分離するフィルタ作成)
- 宛先IPアドレスにGeoIP(位置情報を取得する)を適用させる。
■output
- elasticsearchのアドレス指定
- indexパターンを指定
Grokのmatch条件を作成するときに参考にしたサイト
logstash-patterns-core/grok-patterns at master · logstash-plugins/logstash-patterns-core · GitHub
→生ログに対してGrokをリアルタイムで判断しの解析するので作成時に役立ちます。
フィルターの内容
/etc/logstash/conf.d/01-srx-url.conf
input {
file {
path => "/var/log/flow.log"
type => "srx"
}
}
filter {
if [type] == "srx" {
grok {
match => [ "message", "%{DATA:label} %{TIMESTAMP_ISO8601:timestamp} %{DATA:sysloghost} %{DATA:firewall_category} %{WORD:flowtype}%{SPACE}\[%{DATA:emib} reason=\"%{DATA:reason}\" source-address=\"%{IPORHOST:source_address}\" source-port=\"%{DATA:source_port}\" destination-address=\"%{IPORHOST:destination_address}\" destination-port=\"%{DATA:destination_port}\" service-name=\"%{DATA:service_name}\" nat-source-address=\"%{IPORHOST:nat_source_address}\" nat-source-port=\"%{DATA:nat_source_port}\" nat-destination-address=\"%{IPORHOST:nat_destination_address}\" nat-destination-port=\"%{DATA:nat_destination_port}\" src-nat-rule-type=\"%{DATA:src_nat_rule_type}\" src-nat-rule-name=\"%{DATA:src_nat_rule_name}\" dst-nat-rule-type=\"%{DATA:dst_nat_rule_type}\" dst-nat-rule-name=\"%{DATA:dst_nat_rule_name}\" protocol-id=\"%{DATA:protocol_id}\" policy-name=\"%{DATA:policy_name}\" source-zone-name=\"%{DATA:source_zone_name}\" destination-zone-name=\"%{DATA:destination_zone_name}\" session-id-32=\"%{DATA:session_id_32}\" packets-from-client=\"%{DATA:packets_from_client}\" bytes-from-client=\"%{DATA:bytes_from_client}\" packets-from-server=\"%{DATA:packets_from_server}\" bytes-from-server=\"%{DATA:bytes_from_server}\" elapsed-time=\"%{DATA:elapsed_time}\" application=\"%{DATA:application}\" nested-application=\"%{DATA:nested_application}\" username=\"%{DATA:username}\" roles=\"%{DATA:roles}\" packet-incoming-interface=\"%{DATA:packet_incoming_interface}\" encrypted=\"%{DATA:encrypted}\"\]" ]
match => [ "message", "%{DATA:label} %{TIMESTAMP_ISO8601:timestamp} %{DATA:sysloghost} %{DATA:utm_firewall_category} %{WORD:utm_firewall_flowtype} \[%{DATA:utm_emib} source-address=\"%{IPORHOST:utm_source_address}\" source-port=\"%{DATA:utm_source_port}\" destination-address=\"%{IPORHOST:utm_destination_address}\" destination-port=\"%{DATA:utm_destination_port}\" category=\"%{DATA:utm_category}\" reason=\"%{DATA:utm_reason}\" profile=\"%{DATA:utm_profile}\" url=\"%{DATA:utm_url}\" obj=\"%{DATA:utm_obj}\" username=\"%{DATA:utm_username}\" roles=\"%{DATA:utm_roles}\"\]" ]
}
geoip {
source => "destination_address"
target => "dest_address_geoip"
}
geoip {
source => "utm_destination_address"
target => "utm_dest_address_geoip"
}
}
}
output {
if [type] == "srx" {
elasticsearch {
hosts => ["172.16.100.30:9200"]
index => "log-junoslogs-%{+YYYY.MM}"
}
}
}
match条件は、とりあえずフィールド分離だけを目的としてデータ型は適当なので、数値とか変更したり、環境に合わせたindex名に修正する必要ありました。
あと、messageフィールドは不要なら削除しても問題ないかも。
#elasticsearchのアドレスは環境ごとに修正が必要です。
logstash再起動
■再起動
# systemctl restart logstash
■起動時ログ確認
# tail -f /var/log/logstash/logstash-plain.log
→フィルタの文法や構文が間違えているとエラーで起動しないが、データ型指定が間違えていると正常に起動し、kibanaで確認したときにデータがフィルタされない
kibanaでelasticsearchのindex情報確認
①"Management" > "Index Management" を選択
②index情報の確認
→elasticsearchのindex情報が無事に取得できていることが分かる
kibanaで可視化
Indexパターンの取り込み(未実施の場合)
kibanaのダッシュボードで
①"Management" > "Index Patterns" > "Create Index Pattern"
②Index patternでフィルタのoutputで指定したindex名を入力
で、作成する。
Discoverでログ取得確認
→無事にsyslogを取り込むことができ、ログ情報やグラフが表示された。
ログの内容確認
取り込んだログから通信ポリシーとUTMポリシーの詳細情報を確認し、ラベル付与ができていることを確認
■通信ポリシーログ(DNSとの通信)
■UTMポリシーログ(www.yahoo.co.jpへのアクセス時)
無事にアドレスやポート情報、URLがラベルごとに分割されていることが分かる。
分離できたフィールドを使用した可視化
アクセス先のIPアドレス統計 , 接続先のサービス情報+送信元アドレス , 通信フロークローズ時の条件と宛先アドレス
色々と組み合わせでいろんな統計や使用者の傾向が見れるかも。
データ分析基盤構築入門[Fluentd,Elasticsearch,Kibanaによるログ収集と可視化]
- 作者: 鈴木健太,吉田健太郎,大谷純,道井俊介
- 出版社/メーカー: 技術評論社
- 発売日: 2017/09/21
- メディア: Kindle版
- この商品を含むブログ (1件) を見る