2020年12月25日金曜日

Python のソースコードをフォーマットする yapf を使ってみた

概要

yapf は Python のコードを自動フォーマットしてれる静的解析ツールです
今回は簡単なサンプルを使っていろいろとフォーマットしてみました

環境

  • macOS 11.1
  • Python 3.8.5

インストール

  • pipenv install yapf

試すコード

  • vim test.py
test_list = ["Apple", "Orange", "Banana", "Grape", "Strawberry", "Pineapple", "Cherry", "Pear", "Watermelon"]

test_dict = {
    "name": "hawksnowlog",
    "age": 10,
    "langs": ["Ruby", "Swift", "Python", "Golang", "JavaScript", "Java"]
}

test_tuple = (
    test_list,
    test_dict,
    {
        "1": "January",
        "2": "February",
        "3": "March",
        "4": "April",
        "5": "May"
    }
)

実行

  • PYTHONPATH=./ pipenv run yapf test.py

これでフォーマット後のコードが表示されます
上書きする場合は -i オプションを使います

  • PYTHONPATH=./ pipenv run yapf -i test.py

結果

配列など 1 行が長くなる場合には自動で改行されます
dict はなぜか key/value が改行されるようです
tuple に関しては要素を 1 行ごとに書いていたのが修正されてしまいました

test_list = [
    "Apple", "Orange", "Banana", "Grape", "Strawberry", "Pineapple", "Cherry",
    "Pear", "Watermelon"
]

test_dict = {
    "name":
    "hawksnowlog",
    "age":
    10,
    "langs": [
        "Ruby", "Swift", "Python", "Golang", "JavaScript", "Java", "HTML5",
        "CSS"
    ]
}

test_tuple = (test_list, test_dict, {
    "1": "January",
    "2": "February",
    "3": "March",
    "4": "April",
    "5": "May"
})

スタイルを変更してみる

デフォルトでは pep8 が使われているみたいです
google スタイルも使えるので使ってみます
スタイルを変更するには --style=google を指定します
また style は辞書形式で指定することもでき複数のスタイルを適用する場合は辞書形式で指定しましょう

  • PYTHONPATH=./ pipenv run yapf --style=google test.py


test_list = [
    "Apple", "Orange", "Banana", "Grape", "Strawberry", "Pineapple", "Cherry",
    "Pear", "Watermelon"
]

test_dict = {
    "name":
        "hawksnowlog",
    "age":
        10,
    "langs": [
        "Ruby", "Swift", "Python", "Golang", "JavaScript", "Java", "HTML5",
        "CSS"
    ]
}

test_tuple = (test_list, test_dict, {
    "1": "January",
    "2": "February",
    "3": "March",
    "4": "April",
    "5": "May"
})

dict に関しては若干スタイルが変わっていました

yapf を適用しない

特定の行だけ yapf を適用しない方法もあります
#yapf disable というコメントを使います

行末で使った場合はその行だけを適用しない範囲にします
行頭で使った場合は複数行に対して yapf を適用しないことができます
#yapf enable が出現する行までは適用しないようにすることができます

test_list = ["Apple", "Orange", "Banana", "Grape", "Strawberry", "Pineapple", "Cherry", "Pear", "Watermelon"] # yapf: disable

test_dict = {
    "name":
        "hawksnowlog",
    "age":
        10,
    "langs": [
        "Ruby", "Swift", "Python", "Golang", "JavaScript", "Java", "HTML5",
        "CSS"
    ]
}

# yapf: disable
test_tuple = (
    test_list,
    test_dict,
    {
        "1": "January",
        "2": "February",
        "3": "March",
        "4": "April",
        "5": "May"
    }
)
# yapf: enable

1 行の最大文字数を設定する

column_limit を使います
デフォルトでは 80 行以上にならないように改行されます

  • PYTHONPATH=./ pipenv run yapf --style='{based_on_style:google,column_limit:100}' test.py
test_list = [
    "Apple", "Orange", "Banana", "Grape", "Strawberry", "Pineapple", "Cherry", "Pear", "Watermelon"
]

test_dict = {
    "name": "hawksnowlog",
    "age": 10,
    "langs": ["Ruby", "Swift", "Python", "Golang", "JavaScript", "Java", "HTML5", "CSS"]
}

test_tuple = (test_list, test_dict, {
    "1": "January",
    "2": "February",
    "3": "March",
    "4": "April",
    "5": "May"
})

設定ファイルを使う

.style.yapf というファイルを作成するとオプションの指定をファイルに記載することができます

  • vim .style.yapf
[style]
based_on_style = google
column_limit = 100


  • PYTHONPATH=./ pipenv run yapf test.py


これで先程と同じ結果になります

最後に

公式にも記載していますが手動のフォーマットのほうがきれいに見える場合があるのでその場合は disable を積極的に使っていきましょう

参考サイト

2020年12月24日木曜日

Flask-Migrate でカラム名を変更すると既存のカラムのデータがなくなってしまう

概要

flask-migrate を使ってカラム名を変更すると既存のカラムの情報が新規のカラムに引き継がれません
原因はマイグレーションスクリプト内で drop_column -> add_column しているからです
今回はカラム名を変更してもデータをロストせずにマイグレートする方法を紹介します

環境

  • macOS 11.1
  • Python 3.8.5
    • flask 1.1.2
    • flask-migrate 2.5.3

事象の確認

まずは本当にカラム名を変更したら既存のカラムのデータが引き継がれないのか確認します

Flask アプリ

  • vim my_app/__init__.py
from flask import Flask
from flask_migrate import Migrate
from my_app.database import db

app = Flask(__name__)
app.debug = True
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+mysqldb://root:@localhost/test?charset=utf8"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['SQLALCHEMY_ECHO'] = False

db.init_app(app)
migrate = Migrate(app, db)

database.py

  • vim my_app/database.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128))
    age = db.Column(db.Integer)

    def __repr__(self):
        return "%s,%s,%i" % (self.id, self.name, self.age)

データベース作成

  • mysql> create database test;

初期マイグレート

  • FLASK_APP=my_app pipenv run flask db init
  • FLASK_APP=my_app pipenv run flask db migrate -m "Initial migration."
  • FLASK_APP=my_app pipenv run flask db upgrade

    作成されるテーブル情報は以下の通りです
mysql> show create table user \G *************************** 1. row *************************** Table: user Create Table: CREATE TABLE `user` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(128) COLLATE utf8mb4_general_ci DEFAULT NULL, `age` int DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 1 row in set (0.00 sec)

テストデータ登録

  • mysql> insert into user values (null, 'hawk', 10);
  • mysql> insert into user values (null, 'snowlog', 20);1
  • mysql> insert into user values (null, 'hawksnowlog', 30);

テーブル構成変更

name カラムを first_name というカラム名に変更してみます

  • vim my_app/database.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    first_name = db.Column(db.String(128))
    age = db.Column(db.Integer)

    def __repr__(self):
        return "%s,%s,%i" % (self.id, self.name, self.age)
  • FLASK_APP=my_app pipenv run flask db migrate -m "A name column to first_name."
  • FLASK_APP=my_app pipenv run flask db upgrade


これで user テーブルを確認すると first_name カラムのデータが NULL になってしまっているのが確認できます

mysql> select * from user; +—-+——+————+ | id | age | first_name | +—-+——+————+ | 1 | 10 | NULL | | 2 | 20 | NULL | | 3 | 30 | NULL | +—-+——+————+

対策

flask db migarte 時に自動生成されるマイグレートスクリプトを修正することで対応してみます

マイグレートスクリプトの編集

まずは migrate するところまで進めましょう
upgrade までしてしまうと適用されてしまうので upgrade はまだしません

  • FLASK_APP=my_app pipenv run flask db migrate -m "A name column to first_name."

そして生成されるマイグレートスクリプトを編集します
リビジョン番号はそのたびに違うので自信の環境に併せて書き換えてください

upgrade と downgrade 関数を以下のように書き換えます
ポイントは alter_column という関数を使ってカラムの情報を書き換えるようにします
また existing_typetype_ を指定しましょう
前者は変更前のカラムのタイプで後者は変更後のカラムのタイプになります
今回はタイプは変更しないので同じものを指定しています


* vim migrations/versions/be9a17e479a6_a_name_column_to_first_name.py

def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column('user', 'name', nullable=True, new_column_name='first_name', type_=sa.String(128), existing_type=sa.String(128))
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column('user', 'first_name', nullable=True, new_column_name='name', type_=sa.String(128), existing_type=sa.String(128))
    # ### end Alembic commands ###

動作確認: upgrade してみる

書き換えたマイグレートスクリプトを使って upgrade してみます

  • FLASK_APP=my_app pipenv run flask db upgrade

これでテーブルを確認するとちゃんと前のデータが引き継がれていることが確認できると思います

mysql> select * from user; +—-+————-+——+ | id | first_name | age | +—-+————-+——+ | 1 | hawk | 10 | | 2 | snowlog | 20 | | 3 | hawksnowlog | 30 | +—-+————-+——+ 3 rows in set (0.00 sec)



また upgrade 後のテーブル情報は以下のようになっています

mysql> show create table user \G *************************** 1. row *************************** Table: user Create Table: CREATE TABLE `user` ( `id` int NOT NULL AUTO_INCREMENT, `first_name` varchar(128) COLLATE utf8mb4_general_ci DEFAULT NULL, `age` int DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci 1 row in set (0.00 sec)

最後に

flask-migrate を愚直に使うと drop_column してデータをロストしかねないのでマイグレートするときは必ず alter_column を使うようにしましょう

参考サイト

2020年12月22日火曜日

Youtube のライブ中のコメントを curl でサクっと取得する方法

概要

Youtube Data API v3 と Youtube Live Streaming API を使います
コメントではなくチャットを取得します
コメントは動画の下に表示される非同期のメッセージでチャットはライブ中に右に表示されるリアルタイムのメッセージになります

当然ですが今回取得する方法はライブ中でなければ取得できないので注意してください

環境

  • macOS 11.1
  • Youtube Data API v3
  • Youtube Live Streaming API

事前準備: Youtube API Key の取得

こちらの方法で取得しましょう
GCP の登録が必須です

事前準備: Youtube Data API v3 の有効化

こちらの方法で有効化してください

Youtube Data API v3 を使って activeLiveChatId を取得する

curl \
  'https://youtube.googleapis.com/youtube/v3/videos?part=liveStreamingDetails&id=xxxxxxxxxxx&key=AIzaxxxxxxxxxxxxxxxxxxxxxxxxxxxx' \
  --header 'Accept: application/json' \
  --compressed

id はライブ配信中の動画 ID になります
ブラウザアクセスして URL バーに表示される ID になります
レスポンス内の JSON に含まれる activeLiveChatId をメモしておきます

なおライブ配信中でないと activeLiveChatId は含まれないので activeLiveChatId がない場合はライブ配信中になっているか確認しましょう

Youtube Streaming API と activeLiveChatId を使ってチャット情報を取得する

curl \
  'https://youtube.googleapis.com/youtube/v3/liveChat/messages?liveChatId=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx&part=authorDetails%2Csnippet&key=AIzaxxxxxxxxxxxxxxxxxxxxxxxxxxxx' \
  --header 'Accept: application/json' \
  --compressed

part は authorDetailssnippet をカンマ区切りで指定しましょう
これで

トラブルシューティング: quotaExceeded になる場合は

なぜか quota を超えていないのに quotaExceeded になる場合があります
自分は新規でプロジェクトを作成し直してキーを作成しました

おまけ: API Explorer を活用しよう

今回紹介した curl コマンドは API Explorer を使えば簡単に作成できます
curl 以外にも Python や Java のコードも作成できるのでプログラムとして使うこともできます

Youtube Data API v3: Videos: list
https://developers.google.com/youtube/v3/docs/videos/list

Youtube Live Streaming API: LiveChatMessages: list
https://developers.google.com/youtube/v3/live/docs/liveChatMessages/list?hl=en

最後に

コードに落とす前に curl で手動で動作確認しておくとプログラミングするときに流れを理解した状態で書けるので結果的に書きやすくなると思います

SDK があれば SDK を使っても良いと思います

参考サイト

2020年12月21日月曜日

Flask でファイルを配置したらメンテナスモードにする

概要

ファイルを配置するだけでメンテナスモードにできる Flask アプリを試してみました

環境

  • macOS 11.1
  • Python 3.8.5
    • flask 1.1.2

サンプルコード

  • vim app.py
import os
from flask import Flask, abort, make_response

app = Flask(__name__)

@app.before_request
def check_under_maintenance():
    if os.path.exists("maintenance"):
        abort(503)

@app.route('/')
def index():
    return "ok"

@app.errorhandler(503)
def error_503(error):
    return make_response({"code":"server_maintenance_mode"}, 503)

動作確認

  • FLASK_APP=app.py pipenv run flask run

    で起動します

  • curl localhost:5000

    => ok


ファイルを配置するとメンテナンスモードになります
アプリの再起動は不要です

  • touch maintenance
  • curl localhost:5000

    => {"code":"server_maintenance_mode"}

最後に

アプリ層が大量にある場合は辛いので前段にリバースプロキシや LB などを配置してそこでメンテナスモードにするほうが良いと思います

参考サイト

2020年12月18日金曜日

flask.g を使う場合は同一アプリコンテキスト内でないと参照できないので注意

概要

前回 flask.g の使い方を紹介しました
前回のサンプルは暗黙的に同一アプリコンテキスト配下で g を使っていました
flask は複数のアプリを 1 つのコードで書ける仕組みになっているためコンテキストを意識しないとコンテキスト配下にある g をうまく参照できないので注意が必要です

環境

  • macOS 11.1
  • Python 3.8.5
    • flask 1.1.2

アプリコード

例えば @app.route 外で profile.set して profile.get@app.route 内で参照するとします
この場合 set と get は同じアプリコンテキストから g を参照しないと同一の値が取得できません

  • vim app.py
from lib.profile import Profile

from flask import Flask
from flask import g

app = Flask(__name__)

profile = Profile(app)
profile.set()

@app.route("/")
def index():
    name, age, time = profile.get()
    return "{} - {} - {}".format(name, age, time)

@app.route("/update")
def update():
    profile.update_time()
    return "updated"

flask.g を管理するクラス

クラスにしてアプリケーションコンテキストを属性として管理します
こうすることで同一のアプリケーションコンテキストを参照することができます

  • vim lib/profile.py
import datetime
from flask import g


class Profile:
    def __init__(self, app):
        self.app = app
        self.ctx = app.app_context()

    def set(self, name='hawksnowlog', age=10, time=None):
        with self.ctx:
            if 'name' not in g:
                g.name = name 
            if 'age' not in g:
                g.age = age
            if 'time' not in g:
                g.time = datetime.datetime.now().isoformat()

    def get(self):
        with self.ctx:
            return g.name, g.age, g.time

    def update_time(self):
        with self.ctx:
            g.time = datetime.datetime.now().isoformat()

動作確認

  • FLASK_APP=app.py pipenv run flask run
  • curl localhost:5000/
  • curl localhost:5000/update

2020年12月17日木曜日

docker swram 上で使える swarm-cronjob をサクっと使ってみる

概要

swarm-cronjob は docker swarm 環境で定期的に実行させるコンテナを管理することができます
今回は簡単なサンプルを紹介します

環境

  • Ubuntu 18.04
  • docker 19.03.13

swarm-cronjob を swarm 環境にデプロイする

このコンテンなスケジューリング管理してくれます
とりあえず manager ノードで動作させます
指定可能な環境変数はこちらが参考になります

  • vim docker-compose.yml
version: "3.2"
services:
  swarm-cronjob:
    image: crazymax/swarm-cronjob
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    environment:
      - "LOG_LEVEL=info"
    deploy:
      placement:
        constraints:
          - node.role == manager
  • docker stack deploy -c docker-compose.yml swarm_cronjob

cron で実行するコンテナのデプロイ

ラベルを指定するだけです
あとは好きなコンテナをデプロイすれば OK です

  • vim docker-compose2.yml
version: "3.2"
services:
  test:
    image: busybox
    command: date
    deploy:
      mode: replicated
      replicas: 0
      labels:
        - "swarm.cronjob.enable=true"
        - "swarm.cronjob.schedule=* * * * *"
        - "swarm.cronjob.skip-running=false"
      restart_policy:
        condition: none
  • docker stack deploy -c docker-compose2.yml test

    デプロイ後にログを確認すると1分ごとに date が実行されているのが確認できると思います

  • docker service logs -f test_test

参考サイト

2020年12月16日水曜日

flask でリクエストのコンテキストに独自の変数を格納する方法

概要

一番簡単なのは flask.g を使うことです
リクエストを跨いで参照することはできないのでリクエスト単位で値を保存したい場合に便利です

環境

  • macOS 10.15.7
  • Python 3.8.5
    • flask 1.1.2

サンプルコード

  • vim app.py
from lib.profile import get_profile

from flask import Flask
from flask import g

app = Flask(__name__)

@app.route("/")
def index():
    set_profile()
    name, age, time = get_profile()
    return "{} - {} - {}".format(name, age, time)

@app.teardown_appcontext
def teardown_profile(exception):
    name = g.pop('name', None)
    age = g.pop('age', None)
    time = g.pop('time', None)
    address = g.pop('address', None)
    print("{} - {} - {} - {}".format(name, age, time, address))
  • vim lib/profile.py
import datetime
from flask import g

def set_profile(name='hawksnowlog', age=10, time=None):
    if 'name' not in g:
        g.name = name 
    if 'age' not in g:
        g.age = age
    if 'time' not in g:
        g.time = datetime.datetime.now().isoformat()

def get_profile():
    return g.name, g.age, g.time

解説

命名規則は get_X/set_X という感じで X にリソース名を指定します
そうすることで teardown_X も使えるようになります
これは teardown はリクエストコンテキストのスコープがなくなる直前にコールされるシグナルになります


値を取得するときも値を保存するときも from flask import g をするだけで OK です

動作確認

  • FLASK_APP=app.py pipenv run flask run

注意点

グローバル変数になるので Flask の Extension なども使えます
なので X のリソース名の部分が被ったり g.name のように name の部分が被ることがあります
基本は被らないように注意する感じですがアプリのコンテキスト

参考サイト

2020年12月15日火曜日

Python の logging で JSON 内にある日本語を表示するテクニック

概要

JSON のバリュー側に日本語があるとうまく表示されないケースがあります
json モジュールを使って dict - string の変換をうまく行う必要があります

環境

  • macOS 10.15.7
  • Python 3.8.5

通常の logger の場合

# -*- coding: utf-8 -*-
import logging
import json

logging.basicConfig(level = logging.DEBUG)
logger = logging.getLogger(__name__)

d = '{"msg":"こんにちわ"}'
dd = json.loads(d)

logger.info(d) # ちゃんと日本語が表示される
logger.info(type(d))

logger.info(dd)
logger.info(dd["msg"]) # ちゃんと日本語が表示される
logger.info(type(dd))

logger.info(json.dumps(dd))
logger.info(json.dumps(dd, ensure_ascii=False)) # ちゃんと日本語が表示される


まず日本語の JSON 文字列を json.loads で dict に変換する場合は冒頭のマジックコメントが必要です
これがないと SyntaxError: Non-ASCII character '\xe3' のエラーになります

logger を使った場合に正常に表示されるパターンは 3 種類で


* 日本語を含む文字列の JSON をそのまま logger.info した場合
* dict に変換後日本語のバリューに直接アクセスした場合
* json.dumps で ensure_ascii=False を指定して dict を文字列に戻した場合

になります
特に最後の ensure_ascii=False が重要で一度 json.loads で dict に変換した場合内部的には unicode で扱われているので普通に戻しても unicode で戻ってしまうので注意しましょう

python-json-logger を使う方法

過去に紹介した python-json-logger を使った場合には以下のようになります

import logging
from flask import Flask
from pythonjsonlogger import jsonlogger

app = Flask(__name__)

handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(json_ensure_ascii=False)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)

@app.route("/")
def index():
    app.logger.setLevel(logging.INFO)
    app.logger.info("��スシ")
    return 'ok'

ちゃんと json_ensure_ascii のオプションを指定できるようにしてくれているのでこれに False を指定すれば OK です

参考サイト

2020年12月14日月曜日

fluent-plugin-kafka を使ってみた

概要

fluentd から kafka にメッセージを送信することができるプラグインがあるので使ってみました
kafka の構築に関してはこちらを参考にしてください
また今回 fluentd はコンテナで動作させます

環境

  • macOS 10.15.7
  • fluentd
  • kafka 2.6.0

事前準備

kafka と zookeeper を起動させておきましょう

  • brew services start zookeeper
  • brew services start kafka

fluent-plugin-kafka がインストールされたイメージの作成

fluent/fluentd には fluent-plugin-kafka がインストールされていないのでインストールされているイメージを作成します

  • vim Dockerfile
FROM fluent/fluentd

RUN apk add --update --virtual .build-deps \
        sudo build-base ruby-dev \
 && sudo gem install \
        fluent-plugin-kafka zookeeper \
 && sudo gem sources --clear-all \
 && apk del .build-deps \
 && rm -rf /var/cache/apk/* \
           /home/fluent/.gem/ruby/*/cache/*.gem


  • docker build -t my_fluentd .

fluent.conf の作成

次に作成した fluentd イメージ上で動作させる設定ファイルを作成します
コンテナを作成する場合にホストマシン上のファイルをマウントして動作させます

今回はわかりやすいように copy を使って kafka にログを流すのと同時に fluentd コンテナの標準出力にもログを出しています

  • vim fluent.conf
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match docker.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type kafka2
    brokers 192.168.1.2:9092
    zookeeper 192.168.1.2:2181
    default_topic test
    <format>
      @type json
    </format>
  </store>
</match>

fluentd コンテナの起動

作成したイメージと設定ファイルを使ってコンテナ起動します
問題なくコンテナが起動していることを確認しましょう

  • docker run -d -p 24224:24224 -p 24224:24224/udp -v $(pwd):/fluentd/etc -e FLUENTD_CONF=fluent.conf my_fluentd

動作確認用コンテナの作成

何でも OK です
今回は JSON の情報を echo で 3 秒おきに出力するコンテナにしています
ロギングドライバだけ fluentd を指定しましょう

  • docker run --rm --log-driver=fluentd --log-opt fluentd-address=192.168.1.2:24224 --log-opt tag="docker.{{.Name}}" alpine /bin/sh -c "while :;do echo \"{\"timestamp\":\"$(date)\"}\"; sleep 3; done;"

動作確認

kafka-console-consumer を使って fluentd からログが飛んできているか確認しましょう
fluentd がデフォルトで 10s バッファするのでログが飛んでくるのは 10s おきになっているのが確認できると思います

  • kafka-console-consumer --bootstrap-server 192.168.1.2:9092 --topic test --from-beginning


また fluentd コンテナで logs を確認しても良いと思います
そもそも fluentd コンテナに出力されていない場合は kafka にも当然ログは飛んできません

トラブルシューティング

zookeeper が localhost でしか LISTEN していない場合には設定ファイルを編集しましょう

  • vim /usr/local/etc/kafka/zookeeper.properties
clientPort=2181
clientPortAddress=192.168.1.2

最後に

fluent-plugin-kafka を使ってみました
今回は Output プラグインを使いましたが Input プラグインもあり kafka からの入力を受け取ることもできます

kafka がすでにあればかなり簡単に使える印象です

参考サイト

2020年12月12日土曜日

MacOS 上で kafka をインストールして使ってみる

概要

kafka はストリームでメッセージのやり取りをするための基盤です
今回は MacOS 上で簡単に動かす方法を紹介します

環境

  • macOS 10.15.7
  • kafka 2.6.0
  • zookeeper 3.6.2

kafka/zookeeper インストール

  • brew install kafka

一緒に zookeeper もインストールされます

kafka/zookeeper 起動

  • brew services start zookeeper
  • brew services start kafka

zookeeper は localhost:2181, kafka は localhost:9092 で起動します

トピックの作成

  • kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    すべて必須のパラメータになります

  • --create でトピックの作成

  • --zookeeper localhost:2181 で zookeeper の LISTEN アドレスの指定
  • --replication-factor 1 でレプリカを 1 つのみ生成
  • --partitions 1 でパーティションを 1 つのみ生成
  • --topic でトピック名を指定

replication-factor と partitions についてはこのあたりがイメージしやすいかなと思います

トピック確認

  • kafka-topics --list --zookeeper localhost:2181

    => test

  • kafka-topics --describe --zookeeper localhost:2181 --topic test

Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

メッセージの受信準備

  • kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

    メッセージ待受状態になります

メッセージの送信

  • kafka-console-producer --broker-list localhost:9092 --topic test

    インタラクティブモードになった適当に文字列を入力しエンターします
    そして受信状態のコンソールにメッセージが表示されれば OK です

最後に

次回は fluentd から kafka にメッセージを送信してみたいと思います

参考サイト

2020年12月11日金曜日

Celery の before_task_publish を使ってみた

概要

before_task_publish タスクが実行される前にある処理を実行することができるハンドラです
今回は簡単な使い方を紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • celery 4.4.7

とりあえず使ってみる

  • vim sub_tasks.py
from celery import Celery
from celery.signals import before_task_publish

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@before_task_publish.connect
def before_task_publish_test(**kwargs):
    print("before_task_publish")

@app.task
def add(x, y):
    return x + y
  • pipenv run celery -A sub_tasks worker --loglevel=info

実行してみるとわかりますが before_task_publish で表示される print 文はクライアント側で表示されます
これはデバッグ時に結構ハマりポイントなので注意しましょう

header を使ってみる

Celery のタスクには header というカスタムフィールドがあり辞書形式で値を設定することができます
例えば before_task_publish で header に値を設定しタスク側で取得するといったことができるようになります

import datetime

from celery import Celery, current_task
from celery.signals import before_task_publish

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@before_task_publish.connect
def before_task_publish_test(headers, **kwargs):
    headers['time'] = datetime.datetime.now().isoformat()
    # headers['username'] = 'hawksnowlog'

@app.task
def add(x, y):
    time = current_task.request.get('time', None)
    print(time)
    return x + y

実行メイン

  • vim main.py
from sub_tasks import add

add.delay(100, 1)
  • pipenv run python main.py

参考サイト

2020年12月10日木曜日

flask でデフォルトのエラーログをカスタマイズする方法

概要

Flask で raise された場合デフォルトのエラーログはすべて ERROR レベルで出力されます
またエラーのトレース情報もそのまま表示されます

エラーレベルやトレース情報をカスタマイズする方法を紹介します

環境

  • macOS 10.15.07
  • Python 3.8.5
    • flask 1.1.2

log_execption を実装しログレベルを変更する

Flask クラスを継承して独自の Flask クラスを作成しその中で log_execption メソッドを実装することでエラーログの内容をカスタマイズできます

例えばエラーレベルを WARN にする場合は以下のようにします

import logging
from flask import Flask, request

class MyFlask(Flask):
    def log_exception(self, exc_info):
        self.logger.warn(f"Exception on {request.path} [{request.method}]", exc_info=exc_info)

app = MyFlask(__name__)

@app.route('/error')
def error():
    raise Exception

@app.errorhandler(500)
def abort(error):
    return 'abort'

トレース情報を変更する

exc_info が長いのでエラーが発生した箇所だけログに出したいという場合には以下のようにします

import logging
import traceback
from flask import Flask, request

class MyFlask(Flask):
    def log_exception(self, exc_info):
        tb = exc_info[2]
        tb_str = traceback.format_exception(exc_info[0], exc_info[1], exc_info[2])
        error_line = tb_str[-2].replace("\n", " ")
        self.logger.warn(f"Exception on {request.path} [{request.method}]{error_line}", exc_info=False)

app = MyFlask(__name__)

@app.route('/error')
def error():
    raise Exception

@app.errorhandler(500)
def abort(error):
    return 'abort'

JsonFormatter と組み合わせる

python-json-logger を組み合わせると以下のような感じです

import logging
import traceback
from flask import Flask, request
from pythonjsonlogger import jsonlogger

class MyFlask(Flask):
    def log_exception(self, exc_info):
        tb = exc_info[2]
        tb_str = traceback.format_exception(exc_info[0], exc_info[1], exc_info[2])
        error_line = tb_str[-2].replace("\n", " ")
        self.logger.warn(f"Exception on {request.path} [{request.method}]{error_line}", exc_info=False)

app = MyFlask(__name__)

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)

handler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(asctime)s %(name)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)

@app.route('/error')
def error():
    raise Exception

@app.errorhandler(500)
def abort(error):
    return 'abort'

参考サイト

2020年12月9日水曜日

Celery で flask のリクエスト ID を出力する方法

概要

前回 flask で request-id を出力する方法を紹介しました
今回は生成した request-id をバックグラウンドで動作する celery でも出力する方法を紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • flask 1.1.2
    • flask-log-request-id 0.10.1
    • celery 5.0.3

サンプルアプリとワーカー

  • vim app.py
import logging
from celery import Celery
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation

# flask 用の celery 作成
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379',
        broker='redis://localhost:6379'
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# ロギングハンドラ作成
def make_handler():
    handler = logging.StreamHandler()
    handler.addFilter(RequestIDLogFilter())
    handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
    return handler

# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)

my_celery = make_celery(app)
enable_request_id_propagation(my_celery)

@app.route('/')
def hello():
    ret = add.delay(100, 1).get()
    app.logger.info(ret)
    return 'ok'

# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

# celery タスク
@my_celery.task
def add(x, y):
    logger = get_task_logger(__name__)
    logging.info("add")
    return x + y

アプリ起動

  • FLASK_APP=app.py pipenv run celery -A app.my_celery worker --loglevel=info 1>/dev/null

    ワーカー起動

  • FLASK_APP=app.py pipenv run flask run 1>/dev/null

サンプルログ

2020-12-07 10:39:07,019 - celery.app.trace - level=INFO - request_id=100e1cfc-a7a2-44b6-9530-572b3313ddc0 - Task app.add[1ca2cdcc-80f8-4e16-8037-a6d30c9eca1d] succeeded in 0.011482631999996329s: 101

current_request_id が動作していないっぽい

ワーカ側で試しに current_request_id() をコールしてみたところ常に None が返ってきました
しかし celery.app.trace のログにはちゃんと request_id が乗ってきていることから celery 側で参照はできるようになっているものの current_request_id だけはうまく動作しないような状況なのかもしれません

追記: 回避策

一応あったので紹介します
自分で before_task_publish を使って header にリクエスト ID を設定する感じです
これで current_task からリクエスト ID が辿れるようになります
また request_id が null でかつ x_request_id が extra で指定されている場合には既存の request_id フィールドを x_request_id フィールドで上書きする対応もしています

import logging
from celery import Celery, current_task
from celery.utils.log import get_task_logger
from celery.signals import after_setup_task_logger, after_setup_logger, before_task_publish
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id
from flask_log_request_id.extras.celery import enable_request_id_propagation
from pythonjsonlogger import jsonlogger

# flask 用の celery 作成
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend='redis://localhost:6379',
        broker='redis://localhost:6379',
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

# ロギングハンドラ作成
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        if not log_record.get('request_id') and log_record.get('x_request_id'):
            log_record['request_id'] = log_record.get('x_request_id')
            del log_record['x_request_id']

def make_handler():
    handler = logging.StreamHandler()
    handler.addFilter(RequestIDLogFilter())
    formatter = CustomJsonFormatter("%(asctime)s %(name)s %(levelname)s %(request_id)s %(message)s")
    handler.setFormatter(formatter)
    return handler

# flask アプリ作成
app = Flask(__name__)
RequestID(app)
logging.getLogger().addHandler(make_handler())
app.logger.setLevel(logging.INFO)

@app.route('/')
def hello():
    ret = add.delay(100, 1).get()
    app.logger.info(ret, extra={"username":"hawksnowlog"})
    return str(ret)

# celery 作成
celery = make_celery(app)
logger = get_task_logger(__name__)
enable_request_id_propagation(celery)
_CELERY_X_HEADER = 'x_request_id'

# celery ログ設定
@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = make_handler()
    logger.addHandler(handler)

# celery タスク
@before_task_publish.connect
def insert_request_id_header(headers, **kwargs):
    if _CELERY_X_HEADER not in headers:
        request_id = current_request_id()
        headers[_CELERY_X_HEADER] = request_id

@celery.task
def add(x, y):
    req_id = current_task.request.get(_CELERY_X_HEADER, None)
    logger.info("success add task", extra={'x_request_id':req_id})
    return x + y

あまり良い方法ではないかなと感じているので参考まで

参考サイト

2020年12月8日火曜日

jq で標準エラーをパイプする方法

概要

標準エラーは基本的にパイプできないので標準入力にリダイレクトしてあげましょう

環境

  • macOS 10.15.7
  • jq 1.5

動作する

  • echo '{"key":"value"}' | jq .

動作しない

  • ruby -e "STDERR.puts '{\"key\":\"value\"}'" | jq .

対処する

  • ruby -e "STDERR.puts '{\"key\":\"value\"}'" 2>&1 | jq .

2>&1 で標準エラーを標準リクエストにリダイレクトできます

おまけ: 標準エラーの tail -f を jq する場合は

  • ruby hoge.rb 2>&1 | while read line; do echo $line | jq -R '. as $line | try fromjson catch "not json"'; done

-R は入力された情報をパースせずにそのまま使用するオプションです
ダブルクォートなどが削除されません
as 変数格納で . で受け取った情報を $line という変数に格納して次のパイプで参照できるようにします
あとは try .. catch で受け取った $line を評価します
JSON ではない場合は「not json」が表示されます

テストスクリプトは以下です

loop do
  STDERR.puts '{"key":"value"}'
  STDERR.puts 'hoge'
  sleep(1)
end

参考サイト

2020年12月7日月曜日

Celery のロギングをカスタマイズする方法

概要

Celery のロギングにはワーカの開始ログなどを出力するルートロガーとタスクが出力するタスクロガーがあります
それぞれのロガーは個別に設定するためのメソッドが用意されているのでそれを使ってカスタマイズします

今回はおすすめのロガーカスタマイズを簡単に紹介します

環境

  • macOS 10.15.7
  • Python 3.8.5
    • Celery 4.4.7

after_setup_logger と after_setup_task_logger を使う

前者がルートロガーをカスタマイズできるメソッドで後者がタスクロガーをカスタマイズできるロガーです
それを踏まえた上で以下のようにロガーをカスタマイズします

  • vim sub_tasks.py
import logging

from celery import Celery
from celery.signals import after_setup_logger, after_setup_task_logger
from celery.utils.log import get_task_logger

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
logger = get_task_logger(__name__)


@after_setup_logger.connect
def setup_root_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


@after_setup_task_logger.connect
def setup_task_logger(logger, *args, **kwargs):
    logger.handlers = []
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


@app.task
def add(x, y):
    logger.info("add")
    return x + y

解説

setup_root_loggersetup_task_logger の引数で受け取れる logger はそれぞれ Celery がデフォルトで用意しているルートロガーとタスクロガーになります
それぞれハンドラが登録されているので一旦空にしましょう
そしてそのあとで独自のハンドラを登録することでデフォルトのログを非表示しつつ独自のログを出力できるようになります

今回はルートロガーにもタスクロガーにも同じハンドラを設定していますがルートロガーはファイルハンドラを使うとかもできます
ちなみにルートロガーは celery.worker などのモジュール用のロガーになります
ワーカーの起動やタスクの開始と終了時に表示されるデフォルトのログなどを管理しています

タスク側で使用するロガーは get_task_logger を使う

上記のサンプルでは実際にタスク内でログを出力する際のロガーは get_task_logger を使って取得しています
こうすることで after_setup_task_logger で設定したタスクロガーを適切に取得することができます

動作確認

  • vim main.py
from sub_tasks import add

add.delay(100, 1)


これでタスクを起動して動作確認しましょう

  • pipenv run celery -A sub_tasks worker --loglevel=info
  • pipenv run python main.py


以下のようなログが表示されるようになります

2020-12-04 11:40:29,144 - celery.worker.consumer.connection - INFO - Connected to redis://localhost:6379// 2020-12-04 11:40:29,155 - celery.worker.consumer.mingle - INFO - mingle: searching for neighbors 2020-12-04 11:40:30,211 - celery.worker.consumer.mingle - INFO - mingle: all alone 2020-12-04 11:40:30,308 - celery.apps.worker - INFO - celery@local ready. 2020-12-04 11:40:33,235 - celery.worker.strategy - INFO - Received task: sub_tasks.add[ec404b76-2e26-4ed9-a266-0031f351c0f5] 2020-12-04 11:40:33,238 - sub_tasks - INFO - add 2020-12-04 11:40:33,246 - celery.app.trace - INFO - Task sub_tasks.add[ec404b76-2e26-4ed9-a266-0031f351c0f5] succeeded in 0.009275625999999981s: 101

最後に

Celery のロガーをカスタマイズする方法を紹介しました
getLogger などを使ってもできそうですがちゃんと Celery 側で設定するためのメソッドを用意してくれているのでそれを使ったほうが良いと思います

参考サイト

2020年12月5日土曜日

flask で JSON 形式のログを出力する方法

概要

前回リクエスト ID をログに埋め込む方法を紹介しました
今回はそれを JSON で出力してみます
使用するライブラリは python-json-logger です

環境

  • macOS 10.15.7
  • Python 3.8.5
  • flask 1.1.2
  • python-json-logger 2.0.1

インストール

  • pipenv install python-json-logger

サンプルコード

jsonlogger.JsonFormatter をフォーマッタに使います
指定された文字列や dict 情報を JSON に変換して表示します

import logging
from flask import Flask
from pythonjsonlogger import jsonlogger

app = Flask(__name__)

handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'

=> {"message": "hello"}

これだと message しか表示されません

日付やログレベルも出力してみる

flask の場合はすでにいくつかの情報が record に設定されているのでそれを参照するだけです
asctime や name, levelname が使えます (参考)

import logging
from flask import Flask
from pythonjsonlogger import jsonlogger

app = Flask(__name__)

handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter('%(asctime)s %(name)s %(levelname)s %(message)s')
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'

=> {"asctime": "2020-12-03 14:44:39,950", "name": "app", "levelname": "INFO", "message": "hello"}

これで flask でデフォルトで表示されている情報も JSON で表示されるようになれます

カスタムフィールドを追加する

独自のフィールドも追加できます
その場合は jsonlogger.JsonFormatter を継承したクラスを作成して add_fields メソッドを実装します
log_record の dict に key/value を追加していき Formatter を作成する際に追加したフィールドを参照します

import logging
from flask import Flask
from pythonjsonlogger import jsonlogger

app = Flask(__name__)

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        log_record["user"] = "hawksnowlog"

handler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(asctime)s %(name)s %(levelname)s %(message)s %(user)s')
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'

参考サイト

2020年12月4日金曜日

docker stack deploy で logging driver の fluentd を使う方法

概要

Swarm クラスタ上にデプロイしたコンテナで fluentd のロギングドライバを使う方法を紹介します
デプロイには docker-compose を使います

環境

  • macOS 10.15.7
  • docker-machine 0.16.2

Swarm クラスタ構築

こちらを参考に事前に構築してください
今回は上記で構築した想定で docker-machine + vagrant 上で確認します

fluentd コンテナの起動

まずは fluentd コンテナを起動します
この fluentd に Swarm クラスタ上で動作しているコンテナからログを投げて動作確認します

  • vim fluent.conf
<source>
  @type forward
</source>

<match docker.**>
  @type stdout
</match>


  • docker run -d -p 24224:24224 -p 24224:24224/udp -v $(pwd):/fluentd/etc -e FLUENTD_CONF=fluent.conf fluent/fluentd

docker-machine で構築したのであればリモートの docker を操作してもちゃんとローカルにあるファイルを転送してくれるようです

docker-compose の準備

次に Swarm 上にデプロイするコンテナを定義します
今回は簡単に確認できるように nginx のコンテナでも立ててみます
192.168.100.109 は fluentd コンテナを立ち上げたホストになります

version: '3.7'
services:
  web:
    image: nginx
    ports:
      - "80:80"
    deploy:
      replicas: 2
    logging:
      driver: fluentd
      options:
        fluentd-address: 192.168.99.109:24224
        tag: docker.{{.ID}}


  • docker stack deploy -c docker-compose.yml test

動作確認

各ホストにデプロイされた nginx にアクセスしてログがちゃんと fluentd コンテナに飛んでいるか確認します

  • docker logs -f angry_pare

angry_pare は fluentd コンテナの名前になります

  • curl $(docker-machine ip master)
  • curl $(docker-machine ip node1)

これで fluentd 側にログが流れていることが確認できれば OK です

2020-12-03 03:17:25.000000000 +0000 docker.bc58810c5e27: {"log":"10.0.0.2 - - [03/Dec/2020:03:17:25 +0000] \"GET / HTTP/1.1\" 200 612 \"-\" \"curl/7.69.1\" \"-\"","container_id":"bc58810c5e2769927422c23098fbe800c8e94b3d7e00a95434057b6642bdc4b8","container_name":"/test_web.1.wm9thorpotwi3v71l064v6b7f","source":"stdout"} 2020-12-03 03:17:32.000000000 +0000 docker.5a3def862387: {"container_id":"5a3def8623872a47750b3a66bf01a7776ddd130c65530f3d26d309a781cc58f5","container_name":"/test_web.2.zd5jilpjax5qk1qkwdr8c021a","source":"stdout","log":"10.0.0.3 - - [03/Dec/2020:03:17:32 +0000] \"GET / HTTP/1.1\" 200 612 \"-\" \"curl/7.69.1\" \"-\""}

最後に

fluentd コンテナは fluent.conf ファイルがうまく転送できれば docker-compose に混ぜても良いと思います
あとは転送したログを好きな場所に飛ばせば OK です

参考サイト

2020年12月3日木曜日

macOS 上の docker-machine で Swarm クラスタを構築してみた

概要

MacOS 上に docker-compose をインストールして Vagrant 上に docker swarm 環境を構築してみました

環境

  • macOS 10.15.7
  • docker-machine 0.16.2

インストール

  • brew install docker-machine

マスタノード構築

  • docker-machine create master

作成できたマスタノードの IP は 192.168.99.109 とします

init とトークンの取得

  • docker-machine ssh master
  • docker swarm init --advertise-addr 192.168.99.109

これでノードを join させるためのコマンドが取得できるのでメモしておきます

ワーカノード構築

  • docker-machine create node1

join

  • docker-machine ssh node1
  • docker swarm join --token SWMTKN-1-3xj5zbof09ut20zpbtzrja6jls9hk35f2vz9yq46yal9b098ed-1jmee8ltbsacdghmjxc4esx7e 192.168.99.109:2377

動作確認

  • docker-machine ls
NAME ACTIVE DRIVER STATE URL SWARM DOCKER ERRORS master - virtualbox Running tcp://192.168.99.109:2376 v19.03.12 node1 - virtualbox Running tcp://192.168.99.110:2376 v19.03.12
  • eval "$(docker-machine env master)"
  • docker node ls
ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION r5u75mh9bt2krfnvodr4m4kxp * master Ready Active Leader 19.03.12 wgpe9ktvo7dcd6tcbqha56hyn node1 Ready Active 19.03.12

おまけ: 環境変数のリセット

  • eval "$(docker-machine env -u)"

トラブルシューティング

docker run swarm create でトークンを生成しようとすると以下のエラーになり取得できません
なので今回は docker-machine ssh を使ってトークンを取得しています

2020/12/03 02:50:12 Post https://discovery.hub.docker.com/v1/clusters: dial tcp: lookup discovery.hub.docker.com on 10.0.2.3:53: no such host

最後に

docker-machine を使った場合でもちゃんと init -> join の流れで構築しましょう

参考サイト

2020年12月1日火曜日

flask でリクエスト ID をログに埋め込む方法

概要

flask でリクエスト ID を生成しログに含める方法を紹介します
flask-log-request-id というモジュールがあるのでこれを使うと比較的簡単に実現できます

環境

  • macOS 10.15.7
  • Python 3.8.5
    • flask 1.1.2
    • flask-log-request-id 0.10.1

インストール

  • pipenv install flask-log-request-id

現在のリクエスト ID を取得する

current_request_id() を参照するだけです

  • vim app.py
import logging
from flask import Flask
from flask_log_request_id import RequestID, current_request_id

app = Flask(__name__)
RequestID(app)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info(current_request_id())
    return 'ok'
  • FLASK_APP=app.py pipenv run flask run 1>/dev/null

ロガーにリクエスト ID を追加する

ハンドラを作成して RequestIDLogFilter を追加します

  • vim app.py
import logging
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter

app = Flask(__name__)
RequestID(app)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
handler.addFilter(RequestIDLogFilter())
logging.getLogger().addHandler(handler)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'
  • FLASK_APP=app.py pipenv run flask run 1>/dev/null

レスポンスのヘッダに含める

@app.after_request で response オブジェクトに追加するだけです

  • vim app.py
import logging
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id

app = Flask(__name__)
RequestID(app)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
handler.addFilter(RequestIDLogFilter())
logging.getLogger().addHandler(handler)

@app.after_request
def append_request_id(response):
    response.headers.add('X-Request-Id', current_request_id())
    return response

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'

おまけ: werkzeug のログを表示させない

  • vim app.py
import logging
from flask import Flask
from flask_log_request_id import RequestID, RequestIDLogFilter, current_request_id

app = Flask(__name__)
RequestID(app)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - level=%(levelname)s - request_id=%(request_id)s - %(message)s"))
handler.addFilter(RequestIDLogFilter())
logging.getLogger().addHandler(handler)

wlogger = logging.getLogger('werkzeug')
wlogger.setLevel(logging.ERROR)

@app.route('/')
def hello():
    app.logger.setLevel(logging.INFO)
    app.logger.info("hello")
    return 'ok'

最後に

celery とのシナジーもあるモジュールなので celery を使っている場合にはオススメです

参考サイト