2021年1月9日土曜日

fluentd -> kafka -> logstash -> elasticsearch の順番でログを格納する方法

概要

fluentd -> kafka -> logstash -> elasticsearch という順番でログを確認する方法を紹介します
fluentd, kafka, elasticsearch の構築方法は過去に紹介しているので別記事を参照しています

環境

  • kafka 2.6.0
  • logstash 7.10.1
  • elasticsearch 6.4.0

fluent-kafka-plugin が動作する環境の構築

こちらを参考に構築してください
kafka と fluent-kafka-plugin がインストールされた fluent コンテナが起動している状態になれば OK です

elasticsearch の構築

こちらを参考に構築してください
9200 ポートで elasticsearch にアクセスできれば OK です

logstash のインストール

  • brew install logstash

logstash kafka input plugin のインストールと設定

ここが今回の肝になる部分です
logstash-integration-kafka を使います

  • logstash-plugin install logstash-integration-kafka

インストールが完了したら設定ファイルを作成していきます
kafka からデータを受け取って elasticsearch に流す設定を定義します

  • cp /usr/local/etc/logstash/logstash-sample.conf /usr/local/etc/logstash/logstash.conf
  • vim /usr/local/etc/logstash/logstash.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.2:9092"
    topics => ["test"]
    decorate_events => true
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.1.2:9200"]
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
  }
}


decorate_events を true にしないと [@metadata][kafka][topic] が output セクションで使えないので true にしています

また filter で json を使っています
どうやらデフォルトでは kafka から受け取ったログはすべて「message」というフィールドに格納されています
なのでそれぞれのフィールドに分割するために filter を挟んでいます

  • logstash -f /usr/local/etc/logstash/logstash.conf

起動に少し時間がかかりますが ERROR がでなければ OK です

動作確認

まずは適当なログを fluentd コンテナに送ります

  • docker run --rm --log-driver=fluentd --log-opt fluentd-address=192.168.1.2:24224 --log-opt tag="docker.{{.Name}}" alpine /bin/sh -c "while :;do echo \"{\\\"timestamp\\\":\\\"$(date)\\\",\\\"msg\\\":\\\"hello\\\"}\"; sleep 3; done;"


ある程度待った後に elasticsearch にインデックスが作成されているか確認しましょう

  • curl 'http://192.168.1.2:9200/_cat/indices?v'
  • curl 'http://192.168.1.2:9200/test-2021.01.06?pretty'

最後に

必ずしも kafka は必要ではないですがスケールによっては必要になります

参考サイト

0 件のコメント:

コメントを投稿