概要
fluentd から kafka にメッセージを送信することができるプラグインがあるので使ってみました
kafka の構築に関してはこちらを参考にしてください
また今回 fluentd はコンテナで動作させます
環境
- macOS 10.15.7
- fluentd
- kafka 2.6.0
事前準備
kafka と zookeeper を起動させておきましょう
brew services start zookeeper
brew services start kafka
fluent-plugin-kafka がインストールされたイメージの作成
fluent/fluentd には fluent-plugin-kafka がインストールされていないのでインストールされているイメージを作成します
vim Dockerfile
FROM fluent/fluentd
RUN apk add --update --virtual .build-deps \
sudo build-base ruby-dev \
&& sudo gem install \
fluent-plugin-kafka zookeeper \
&& sudo gem sources --clear-all \
&& apk del .build-deps \
&& rm -rf /var/cache/apk/* \
/home/fluent/.gem/ruby/*/cache/*.gem
docker build -t my_fluentd .
fluent.conf の作成
次に作成した fluentd イメージ上で動作させる設定ファイルを作成します
コンテナを作成する場合にホストマシン上のファイルをマウントして動作させます
今回はわかりやすいように copy を使って kafka にログを流すのと同時に fluentd コンテナの標準出力にもログを出しています
vim fluent.conf
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<match docker.**>
@type copy
<store>
@type stdout
</store>
<store>
@type kafka2
brokers 192.168.1.2:9092
zookeeper 192.168.1.2:2181
default_topic test
<format>
@type json
</format>
</store>
</match>
fluentd コンテナの起動
作成したイメージと設定ファイルを使ってコンテナ起動します
問題なくコンテナが起動していることを確認しましょう
docker run -d -p 24224:24224 -p 24224:24224/udp -v $(pwd):/fluentd/etc -e FLUENTD_CONF=fluent.conf my_fluentd
動作確認用コンテナの作成
何でも OK です
今回は JSON の情報を echo で 3 秒おきに出力するコンテナにしています
ロギングドライバだけ fluentd を指定しましょう
docker run --rm --log-driver=fluentd --log-opt fluentd-address=192.168.1.2:24224 --log-opt tag="docker.{{.Name}}" alpine /bin/sh -c "while :;do echo \"{\"timestamp\":\"$(date)\"}\"; sleep 3; done;"
動作確認
kafka-console-consumer
を使って fluentd からログが飛んできているか確認しましょう
fluentd がデフォルトで 10s バッファするのでログが飛んでくるのは 10s おきになっているのが確認できると思います
kafka-console-consumer --bootstrap-server 192.168.1.2:9092 --topic test --from-beginning
また fluentd コンテナで logs を確認しても良いと思います
そもそも fluentd コンテナに出力されていない場合は kafka にも当然ログは飛んできません
トラブルシューティング
zookeeper が localhost でしか LISTEN していない場合には設定ファイルを編集しましょう
vim /usr/local/etc/kafka/zookeeper.properties
clientPort=2181
clientPortAddress=192.168.1.2
最後に
fluent-plugin-kafka を使ってみました
今回は Output プラグインを使いましたが Input プラグインもあり kafka からの入力を受け取ることもできます
kafka がすでにあればかなり簡単に使える印象です
0 件のコメント:
コメントを投稿