目的
RTX1210のパケットフィルタ機能で出力されたSyslogを可視化してみる。
#ファイアーウォールではないのでフロー単位では確認できない。
また、グローバルからのアクセス元アドレスをfluentdのプラグイン(GeoIP)を使用して位置情報を取得してみる。
構成
接続イメージ
※最終的なkibanaでの画面1※
※最終的なkibanaでの画面2※
フィルター条件で、Passしたものを抽出したダッシュボード
環境
Fluentd(td-agent) +syslogサーバとして使用 |
1.2.6 |
elasticsearch | 6.5.4 |
kibana | 6.5.4 |
RTX1210のSyslog設定
Syslog設定
GUIの管理画面からfacility設定ができないためコマンドにより設定する。
syslog host 172.16.100.231
syslog facility local7
syslog notice on
※GUIからのSyslog画面
出力確認
syslogサーバでSyslog確認を受信していることを確認する。
# tail -f syslog.log
Jan 21 06:02:21 192.168.180.2 [INSPECT] LAN1[out][100] TCP ◆.●.■.▲:56460 > 192.168.181.10:22 (2019/02/01 07:29:15)
Jan 21 06:02:21 192.168.180.2 [INSPECT] LAN2[in][100] TCP ◆.●.■.▲:56460 > 192.168.181.10:22 (2019/02/01 07:29:15)
Jan 21 06:02:21 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:62318 > 192.168.181.10:2200
Jan 21 06:02:21 LAN1 Passed at OUT(1000) filter: TCP ◆.●.■.▲:62318 > 192.168.181.10:2200
Jan 21 06:02:21 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:53996 > 192.168.181.10:22
※◆.●.■.▲:グローバルIPアドレス
IPフィルター設定(LAN2)
全ての条件で"ログ出力あり"として受信側(IN)と送信側(OUT)で設定とした。
- Reject:ssh(22) , telnet(23) , http(80) , https(443) ,5060
- Pass : 残りすべて許可
Fluentd設定
以下のRTX1210ログ(以下の3タイプ)をFluentdでログ加工を行うような設定となる。
※今回のログは、全てLAN2のインターフェースログとなるためVPNやPP関連でfileterさせないログは多数あるので環境に合わせたfilter条件作成が必要になる。
filterタイプ①
Jan 21 06:02:36 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:47787 > 192.168.181.10:18056
※◆.●.■.▲:グローバルIPアドレス
フィルター方法
キー | 値 |
time |
2019/01/21 06:02:36 +0000 |
target | LAN2 |
filter | Passed |
direction | IN |
filter_num | 1000 |
proto | TCP |
src_ip | ◆.●.■.▲ |
src_port | 47787 |
dest_ip | 192.168.181.10 |
dest_port | 18056 |
filterタイプ②
Feb 3 07:53:35 LAN2 Passed at IN(101100) filter: ICMP ◆.●.■.▲ > 192.168.181.10 : unreachable other
※◆.●.■.▲:グローバルIPアドレス
フィルター方法
キー | 値 |
time | 2019/02/03 07:53:35 +0000 |
target | LAN2 |
filter | Passed |
direction | IN |
filter_num | 101100 |
proto | ICMP |
src_ip | ◆.●.■.▲ |
dest_ip | 192.168.181.10 |
display_type | unreachable other |
filterタイプ③ ※今回はデータ取得のみ※
Feb 3 04:12:28 192.168.180.2 [INSPECT] LAN2[out][100] TCP 192.168.181.10:39294 > ◆.●.■.▲:443 (2019/02/03 04:12:24)
※◆.●.■.▲:グローバルIPアドレス
フィルター方法
キー | 値 |
time | 2019/02/03 04:12:28 +0000 |
host | 192.168.180.2 |
logtype | INSPECT |
target | LAN2 |
direction | out |
filter_num | 100 |
in_proto | TCP |
in_src_ip | 192.168.181.10 |
in_src_port | 39294 |
in_dest_ip | ◆.●.■.▲ |
in_dest_port | 443 |
in_time | 2019/2/3 4:12 |
プラグイン追加
GeoIP関連
以下の内容を参考にプラグインと関連パッケージ追加を実施
fluentd設定ファイル修正
上記のフィルター方法に合わせてFilterの正規表現を作成
#YAMAHA RTX-log
<source>
@type tail
@id input_rtx_syslog
path /var/log/yamaha/syslog.log
pos_file /var/log/yamaha/rtx.log.pos
tag rtx.syslog.raw
<parse>
@type multi_format
#Flow
<pattern>
format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<host>.+) \[(?<logtype>.+)\] (?<target>[^ ]*) *\[(?<direction>.+)\]\[(?<filter_num>\d+)\] (?<in_proto>[^ ]*) *(?<in_src_ip>[^ ]*):(?<in_src_port>.+) > *(?<in_dest_ip>[^ ]*):(?<in_dest_port>.+) \((?<in_time>.+)\)/
time_format %b %d %H:%M:%S
</pattern>
<pattern>
format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<target>[^ ]*) (?<filter>.+) at (?<direction>.+)\((?<filter_num>\d+)\) filter: (?<proto>[^ ]*) *(?<src_ip>[^ ]*):(?<src_port>.+) > *(?<dest_ip>[^ ]*):(?<dest_port>.+)/
time_format %b %d %H:%M:%S
</pattern>
<pattern>
format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<target>[^ ]*) (?<filter>.+) at (?<direction>.+)\((?<filter_num>\d+)\) filter: (?<proto>[^ ]*) *(?<src_ip>[^ ]*) > *(?<dest_ip>[^ ]*) : (?<display_type>.+)/
time_format %b %d %H:%M:%S
</pattern>
#other
<pattern>
format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<message>.*)/
time_format %b %d %H:%M:%S
</pattern>
</parse>
</source>
#rewrite
## GeoIP
<match rtx.syslog.raw>
@type rewrite_tag_filter
#rewriteruleN <key> <pattern> <new_tag>
<rule>
key filter
pattern ^Passed$
tag rtx.syslog.geo
</rule>
<rule>
key filter
pattern ^Rejected$
tag rtx.syslog.geo
</rule>
<rule>
key logtype
pattern ^INSPECT$
tag rtx.syslog.geo.inspect
</rule>
</match>
# match tag=rtx YAMAHA(RTX)-log
## GeoIP filter1
<filter rtx.syslog.geo>
@type geoip
geoip_lookup_keys src_ip
backend_library geoip2_c
<record>
location_properties '{ "lat" : ${location.latitude["src_ip"]}, "lon" : ${location.longitude["src_ip"]} }'
location_string ${location.latitude["src_ip"]},${location.longitude["src_ip"]}
location_array '[${location.longitude["src_ip"]},${location.latitude["src_ip"]}]'
city ${city.names.en["src_ip"]}
country ${country.code["src_ip"]}
country_name ${country.names.en["src_ip"]}
postal_code ${postal.code["src_ip"]}
</record>
skip_adding_null_record true
</filter>
## GeoIP filter2
<filter rtx.syslog.geo.inspect>
@type geoip
geoip_lookup_keys in_dest_ip
backend_library geoip2_c
<record>
location_properties '{ "lat" : ${location.latitude["in_dest_ip"]}, "lon" : ${location.longitude["in_dest_ip"]} }'
location_string ${location.latitude["in_dest_ip"]},${location.longitude["in_dest_ip"]}
location_array '[${location.longitude["in_dest_ip"]},${location.latitude["in_dest_ip"]}]'
city ${city.names.en["in_dest_ip"]}
country ${country.code["in_dest_ip"]}
country_name ${country.names.en["in_dest_ip"]}
postal_code ${postal.code["in_dest_ip"]}
</record>
skip_adding_null_record true
</filter>
## Send to Elasticsearch
## match tag rtx.syslog.**
<match rtx.syslog.**>
@type elasticsearch
host 172.16.100.211
port 9200
index_name rtx_syslog
logstash_prefix yamahalog
type_name ylog
logstash_format true
<buffer tag>
@type memory # or file
flush_mode interval
retry_type exponential_backoff
flush_thread_count 5
flush_interval 1s
retry_forever
retry_max_interval 30
chunk_limit_size 256M
queue_limit_length 4096
overflow_action block
</buffer>
</match>
※matchに関しての記述は、設定の意味や動作がよくわっていないので適当に動いていそうな設定を流用して(IPやtag名のみ修正とした)いいるだけ。bufferのタイプを"memory"としているがリアルタイム性や速度等は求めていないのでタイプはfile のほうがよさそう。
elasticsearch設定
(重要)日付毎の名前でIndex作成し登録しているキーに紐づく型(Type)情報で、fluentdから届く値が数値のみの場合だと、Elasticsearchで自動判定している型がFloat , text などが変化し値が取得できない日付のIndexが出てきたため、Type設定は極力全てのキーで行ったほうが無難かもしれない。
GeoIPを使用してマップ化するために elasticsearch側のINDEX typeを修正する。
elasticsearchへ putを行いIndex typeを登録するが、この方法が通所のMapping変更の方法となるのかは不明。
curl -XPUT http://172.16.100.211:9200/_template/yamahalog -H 'Content-Type: application/json' -d'{
"template": "yamahalog-*",
"mappings": {
"_def_": {
"numeric_detection" : true,
"properties": {
"location_properties": { "type": "geo_point" }
}
}
}
}'
※他のキーのタイプでelasticsearchで自動でタイプ判定するらいしい。
※今回は、"Date" , "text"(string) , "geo_poing"の3タイプで問題ないがポート番号等のフィールドのタイプは数値側とことなることがあったので、その場合は手動によりタイプ登録を"float" -> "text"に変更を実施している。
kibana確認
表示確認
Discover
Index情報
とりあえずlocation_propertiesのtypeが "geo_point"であることが確認できた
あとは、いろいろな条件での一覧取得やグラフ化(上位数件の表示など)を行い、
-visualization作成で WAN側⇒内部 / WAN側⇒内部 の Pass / Reject した一覧
-フィルター番号と該当ポート(IN/OUT)でのグラフ化
-国別マップを作成
グラフ化した情報をもとにダッシュボードの作成を行えば、導入直後の可視化まで一通り終了と思う。
適宜必要に応じて必要な情報の可視化用でvisualization作成やフィルター条件修正を行いつつ、アラート連携、不正なアクセスと思われるものを抽出しつつRTXへの破棄フィルター条件の追加していくといいかな。
セキュリティのためのログ分析入門 サイバー攻撃の痕跡を見つける技術 (Software Design plusシリーズ)
- 作者: 折原慎吾,鐘本楊,神谷和憲,松橋亜希子,阿部慎司,永井信弘,羽田大樹,朝倉浩志,田辺英昭
- 出版社/メーカー: 技術評論社
- 発売日: 2018/09/07
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]
- 作者: 鈴木健太,吉田健太郎,大谷純,道井俊介
- 出版社/メーカー: 技術評論社
- 発売日: 2017/09/21
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (2件) を見る