備忘録/にわかエンジニアが好きなように書く

個人的にとりあえず仕組みを知るために、触りたように好きにとりあえず動くような構築してみる 個人用の備忘録となるので内容の保証はないのでその点はご了承ください。 ※変な内容や間違いを書いているなどありましたらコメントやご指摘いただけると幸いです。

RTX1210ログをFluentd/Elasticsearch/Kibanaで可視化

目的 

RTX1210のパケットフィルタ機能で出力されたSyslogを可視化してみる。

#ファイアーウォールではないのでフロー単位では確認できない。
また、グローバルからのアクセス元アドレスをfluentdのプラグイン(GeoIP)を使用して位置情報を取得してみる。

構成

f:id:pocket01:20190131062756p:plain

接続イメージ

f:id:pocket01:20190131062928p:plain

※最終的なkibanaでの画面1※

f:id:pocket01:20190203140435p:plain

※最終的なkibanaでの画面2※

フィルター条件で、Passしたものを抽出したダッシュボード

f:id:pocket01:20190203140540p:plain 

環境

Fluentd(td-agent) 

+syslogサーバとして使用

1.2.6
elasticsearch  6.5.4
kibana 6.5.4

RTX1210のSyslog設定

Syslog設定

GUIの管理画面からfacility設定ができないためコマンドにより設定する。

syslog host 172.16.100.231
syslog facility local7
syslog notice on

 ※GUIからのSyslog画面

f:id:pocket01:20190131051730p:plain

出力確認

 syslogサーバでSyslog確認を受信していることを確認する。

# tail -f syslog.log
Jan 21 06:02:21 192.168.180.2 [INSPECT] LAN1[out][100] TCP ◆.●.■.▲:56460 > 192.168.181.10:22 (2019/02/01 07:29:15)
Jan 21 06:02:21 192.168.180.2 [INSPECT] LAN2[in][100] TCP ◆.●.■.▲:56460 > 192.168.181.10:22 (2019/02/01 07:29:15)
Jan 21 06:02:21 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:62318 > 192.168.181.10:2200
Jan 21 06:02:21 LAN1 Passed at OUT(1000) filter: TCP ◆.●.■.▲:62318 > 192.168.181.10:2200
Jan 21 06:02:21 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:53996 > 192.168.181.10:22

 ※◆.●.■.▲:グローバルIPアドレス

IPフィルター設定(LAN2)

全ての条件で"ログ出力あり"として受信側(IN)と送信側(OUT)で設定とした。

  - Reject:ssh(22) , telnet(23) , http(80) , https(443) ,5060

  - Pass : 残りすべて許可

f:id:pocket01:20190203121353p:plain

Fluentd設定

以下のRTX1210ログ(以下の3タイプ)をFluentdでログ加工を行うような設定となる。

※今回のログは、全てLAN2のインターフェースログとなるためVPNやPP関連でfileterさせないログは多数あるので環境に合わせたfilter条件作成が必要になる。

filterタイプ①

Jan 21 06:02:36 LAN2 Passed at IN(1000) filter: TCP ◆.●.■.▲:47787 > 192.168.181.10:18056

 ※◆.●.■.▲:グローバルIPアドレス

フィルター方法
キー
time

2019/01/21 06:02:36 +0000

target LAN2
filter Passed
direction IN
filter_num 1000
proto TCP
src_ip ◆.●.■.▲
src_port 47787
dest_ip 192.168.181.10
dest_port 18056

filterタイプ②

Feb 3 07:53:35 LAN2 Passed at IN(101100) filter: ICMP ◆.●.■.▲ > 192.168.181.10 : unreachable other

 ※◆.●.■.▲:グローバルIPアドレス

フィルター方法
キー
time 2019/02/03 07:53:35 +0000
target LAN2
filter Passed
direction IN
filter_num 101100
proto ICMP
src_ip ◆.●.■.▲
dest_ip 192.168.181.10
display_type unreachable other

filterタイプ③ ※今回はデータ取得のみ※

Feb 3 04:12:28 192.168.180.2 [INSPECT] LAN2[out][100] TCP 192.168.181.10:39294 > ◆.●.■.▲:443 (2019/02/03 04:12:24)

 ※◆.●.■.▲:グローバルIPアドレス

フィルター方法
キー
time 2019/02/03 04:12:28 +0000
host 192.168.180.2
logtype INSPECT
target LAN2
direction out
filter_num 100
in_proto TCP
in_src_ip 192.168.181.10
in_src_port 39294
in_dest_ip ◆.●.■.▲
in_dest_port 443
in_time 2019/2/3 4:12

 

プラグイン追加

GeoIP関連

以下の内容を参考にプラグインと関連パッケージ追加を実施

github.com

 

fluentd設定ファイル修正

上記のフィルター方法に合わせてFilterの正規表現を作成

#YAMAHA RTX-log
<source>
  @type tail
  @id input_rtx_syslog
  path /var/log/yamaha/syslog.log
  pos_file /var/log/yamaha/rtx.log.pos
  tag rtx.syslog.raw
  <parse>
  @type multi_format
  #Flow
   <pattern>
      format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<host>.+) \[(?<logtype>.+)\] (?<target>[^ ]*) *\[(?<direction>.+)\]\[(?<filter_num>\d+)\] (?<in_proto>[^ ]*) *(?<in_src_ip>[^ ]*):(?<in_src_port>.+) > *(?<in_dest_ip>[^ ]*):(?<in_dest_port>.+) \((?<in_time>.+)\)/
      time_format %b %d %H:%M:%S
   </pattern>
   <pattern>
      format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<target>[^ ]*) (?<filter>.+) at (?<direction>.+)\((?<filter_num>\d+)\) filter: (?<proto>[^ ]*) *(?<src_ip>[^ ]*):(?<src_port>.+) > *(?<dest_ip>[^ ]*):(?<dest_port>.+)/
      time_format %b %d %H:%M:%S
   </pattern>
   <pattern>
      format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<target>[^ ]*) (?<filter>.+) at (?<direction>.+)\((?<filter_num>\d+)\) filter: (?<proto>[^ ]*) *(?<src_ip>[^ ]*) > *(?<dest_ip>[^ ]*) : (?<display_type>.+)/
      time_format %b %d %H:%M:%S
   </pattern>
   #other
  <pattern>
      format /^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) *(?<message>.*)/
        time_format %b %d %H:%M:%S
   </pattern>
  </parse>
</source>

#rewrite
## GeoIP
<match rtx.syslog.raw>
  @type rewrite_tag_filter
  #rewriteruleN <key> <pattern> <new_tag>
  <rule>
  key filter
  pattern ^Passed$
tag rtx.syslog.geo
</rule>
<rule>
key filter
pattern ^Rejected$
tag rtx.syslog.geo
</rule>
<rule>
key logtype
pattern ^INSPECT$
tag rtx.syslog.geo.inspect
</rule>
</match>

# match tag=rtx YAMAHA(RTX)-log
## GeoIP filter1
<filter rtx.syslog.geo>
@type geoip
geoip_lookup_keys src_ip
backend_library geoip2_c

<record>
location_properties '{ "lat" : ${location.latitude["src_ip"]}, "lon" : ${location.longitude["src_ip"]} }'
location_string ${location.latitude["src_ip"]},${location.longitude["src_ip"]}
location_array '[${location.longitude["src_ip"]},${location.latitude["src_ip"]}]'
city ${city.names.en["src_ip"]}
country ${country.code["src_ip"]}
country_name ${country.names.en["src_ip"]}
postal_code ${postal.code["src_ip"]}
</record>
skip_adding_null_record true
</filter>

## GeoIP filter2
<filter rtx.syslog.geo.inspect>
@type geoip
geoip_lookup_keys in_dest_ip
backend_library geoip2_c

<record>
location_properties '{ "lat" : ${location.latitude["in_dest_ip"]}, "lon" : ${location.longitude["in_dest_ip"]} }'
location_string ${location.latitude["in_dest_ip"]},${location.longitude["in_dest_ip"]}
location_array '[${location.longitude["in_dest_ip"]},${location.latitude["in_dest_ip"]}]'
city ${city.names.en["in_dest_ip"]}
country ${country.code["in_dest_ip"]}
country_name ${country.names.en["in_dest_ip"]}
postal_code ${postal.code["in_dest_ip"]}
</record>
skip_adding_null_record true
</filter>

## Send to Elasticsearch
## match tag rtx.syslog.**
<match rtx.syslog.**>
@type elasticsearch
host 172.16.100.211
port 9200
index_name rtx_syslog
logstash_prefix yamahalog
type_name ylog
logstash_format true
<buffer tag>
@type memory # or file
flush_mode interval
retry_type exponential_backoff
flush_thread_count 5
flush_interval 1s
retry_forever
retry_max_interval 30
chunk_limit_size 256M
queue_limit_length 4096
overflow_action block
</buffer>
</match>

 ※matchに関しての記述は、設定の意味や動作がよくわっていないので適当に動いていそうな設定を流用して(IPやtag名のみ修正とした)いいるだけ。bufferのタイプを"memory"としているがリアルタイム性や速度等は求めていないのでタイプはfile のほうがよさそう。 

 

elasticsearch設定

(重要)日付毎の名前でIndex作成し登録しているキーに紐づく型(Type)情報で、fluentdから届く値が数値のみの場合だと、Elasticsearchで自動判定している型がFloat , text などが変化し値が取得できない日付のIndexが出てきたため、Type設定は極力全てのキーで行ったほうが無難かもしれない。

 

GeoIPを使用してマップ化するために elasticsearch側のINDEX typeを修正する。

elasticsearchへ putを行いIndex typeを登録するが、この方法が通所のMapping変更の方法となるのかは不明。

curl -XPUT http://172.16.100.211:9200/_template/yamahalog -H 'Content-Type: application/json' -d'{
"template": "yamahalog-*",
"mappings": {
"_def_": {
"numeric_detection" : true,
"properties": {
"location_properties": { "type": "geo_point" }
}
}
}
}'

※他のキーのタイプでelasticsearchで自動でタイプ判定するらいしい。

※今回は、"Date" , "text"(string) , "geo_poing"の3タイプで問題ないがポート番号等のフィールドのタイプは数値側とことなることがあったので、その場合は手動によりタイプ登録を"float" -> "text"に変更を実施している。

www.elastic.co

 

discuss.elastic.co

kibana確認

表示確認

Discover

f:id:pocket01:20190203133756p:plain

Index情報

 とりあえずlocation_propertiesのtypeが "geo_point"であることが確認できた

f:id:pocket01:20190203134144p:plain

 

あとは、いろいろな条件での一覧取得やグラフ化(上位数件の表示など)を行い、

 -visualization作成で WAN側⇒内部 / WAN側⇒内部 の Pass / Reject した一覧

 -フィルター番号と該当ポート(IN/OUT)でのグラフ化

 -国別マップを作成

グラフ化した情報をもとにダッシュボードの作成を行えば、導入直後の可視化まで一通り終了と思う。

適宜必要に応じて必要な情報の可視化用でvisualization作成やフィルター条件修正を行いつつ、アラート連携、不正なアクセスと思われるものを抽出しつつRTXへの破棄フィルター条件の追加していくといいかな。

 

セキュリティのためのログ分析入門 サイバー攻撃の痕跡を見つける技術 (Software Design plusシリーズ)

セキュリティのためのログ分析入門 サイバー攻撃の痕跡を見つける技術 (Software Design plusシリーズ)

  • 作者: 折原慎吾,鐘本楊,神谷和憲,松橋亜希子,阿部慎司,永井信弘,羽田大樹,朝倉浩志,田辺英昭
  • 出版社/メーカー: 技術評論社
  • 発売日: 2018/09/07
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る
 
データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]