概要
fluentd -> kafka -> logstash -> elasticsearch という順番でログを確認する方法を紹介します
fluentd, kafka, elasticsearch の構築方法は過去に紹介しているので別記事を参照しています
環境
- kafka 2.6.0
- logstash 7.10.1
- elasticsearch 6.4.0
fluent-kafka-plugin が動作する環境の構築
こちらを参考に構築してください
kafka と fluent-kafka-plugin がインストールされた fluent コンテナが起動している状態になれば OK です
elasticsearch の構築
こちらを参考に構築してください
9200 ポートで elasticsearch にアクセスできれば OK です
logstash のインストール
brew install logstash
logstash kafka input plugin のインストールと設定
ここが今回の肝になる部分です
logstash-integration-kafka を使います
logstash-plugin install logstash-integration-kafka
インストールが完了したら設定ファイルを作成していきます
kafka からデータを受け取って elasticsearch に流す設定を定義します
cp /usr/local/etc/logstash/logstash-sample.conf /usr/local/etc/logstash/logstash.conf
vim /usr/local/etc/logstash/logstash.conf
input {
kafka {
bootstrap_servers => "192.168.1.2:9092"
topics => ["test"]
decorate_events => true
}
}
filter {
json {
source => "message"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.2:9200"]
index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
}
}
decorate_events を true にしないと [@metadata][kafka][topic]
が output セクションで使えないので true にしています
また filter で json を使っています
どうやらデフォルトでは kafka から受け取ったログはすべて「message」というフィールドに格納されています
なのでそれぞれのフィールドに分割するために filter を挟んでいます
logstash -f /usr/local/etc/logstash/logstash.conf
起動に少し時間がかかりますが ERROR がでなければ OK です
動作確認
まずは適当なログを fluentd コンテナに送ります
docker run --rm --log-driver=fluentd --log-opt fluentd-address=192.168.1.2:24224 --log-opt tag="docker.{{.Name}}" alpine /bin/sh -c "while :;do echo \"{\\\"timestamp\\\":\\\"$(date)\\\",\\\"msg\\\":\\\"hello\\\"}\"; sleep 3; done;"
ある程度待った後に elasticsearch にインデックスが作成されているか確認しましょう
curl 'http://192.168.1.2:9200/_cat/indices?v'
curl 'http://192.168.1.2:9200/test-2021.01.06?pretty'
最後に
必ずしも kafka は必要ではないですがスケールによっては必要になります
0 件のコメント:
コメントを投稿