2023年1月27日金曜日

Celery の visibility timeout の挙動を確認してみた

Celery の visibility timeout の挙動を確認してみた

概要

タイトルの通りです

visibility timeout に設定した秒数が経過した場合にタスクがどうなるかの影響を試してみました

環境

  • macOS 11.7.2
  • Python 3.10.2
    • celery v5.2.7

サンプルコード

result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True
result_backend_transport_options = broker_transport_options = { 'visibility_timeout': 5 }
import time
from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True)
def add(self, x, y):
    return x + y


@app.task(bind=True)
def multi(self, x, y):
    time.sleep(1800)
    return x * y

起動

  • pipenv run celery -A tasks worker --loglevel=info
  • vim app.py

from celery import chain
from tasks import add, multi

tasks = chain(add.s(1, 2),
              multi.s(10),
              multi.s(2)).apply_async()

print(tasks.get())

起動

動作確認

実行時のログは以下のようになりました

[2023-01-27 11:50:19,854: INFO/MainProcess] Connected to redis://localhost:6379//
[2023-01-27 11:50:19,862: INFO/MainProcess] mingle: searching for neighbors
[2023-01-27 11:50:20,875: INFO/MainProcess] mingle: all alone
[2023-01-27 11:50:20,894: INFO/MainProcess] celery@node1.local ready.
[2023-01-27 11:50:24,696: INFO/MainProcess] Task tasks.add[f4ca797b-1a76-4800-8a5f-ce7e39ad727d] received
[2023-01-27 11:50:24,718: INFO/MainProcess] Task tasks.multi[322d8025-e7ea-43d6-8f7f-e54008d46db4] received
[2023-01-27 11:50:24,719: INFO/ForkPoolWorker-2] Task tasks.add[f4ca797b-1a76-4800-8a5f-ce7e39ad727d] succeeded in 0.019410155015066266s: 3
[2023-01-27 11:51:49,898: INFO/MainProcess] Task tasks.multi[322d8025-e7ea-43d6-8f7f-e54008d46db4] received
[2023-01-27 11:53:29,947: INFO/MainProcess] Task tasks.multi[322d8025-e7ea-43d6-8f7f-e54008d46db4] received

ちょっと解説

どうやら visibiity_timeout に設定した時間が経過してすぐに再キューされるわけではないようです

何度か試してみましたがまちまちで visibility_timeout = 5 のときは 1 分から 2 分後に再キューされるような挙動でした
なので visibility_timeout = 5 だからと言って time.speep(10) くらいにしてしまうと再キューが発生することなく正常に終了してしまいます

また ack_late = True のオプション (もしくは task_acks_late = True) に設定していないと visibility_timeout の時間が反映されないようなのでそこも注意が必要です

最後に

プリフェッチを無効にする場合に task_acks_late = True の設定が必須なのでその場合には visibiliti_timeout の設定も合わせて調整する必要がありそうです

参考サイト

2023年1月25日水曜日

pytest でコンストラクタをモックする際の注意事項

pytest でコンストラクタをモックする際の注意事項

概要

monkeypatch.setattr を使う場合に少し注意が必要です

環境

  • macOS 11.7.2
  • python 3.10.2
  • pytest 7.2.0

ディレクトリ構成

tree -a test module 

test
├── __init__.py
└── test_user.py
module
├── __init__.py
├── profile.py
└── user.py

サンプルコード

User クラスが Profile クラスをカプセル化しています
この Profile クラスに monkeypatch 当てることを想定します

  • vim module/profile.py
class Profile():

    def __init__(self, lang):
        self.lang = lang
  • vim module/user.py
from module.profile import Profile


class User():

    def __init__(self, name, age):
        self.name = name
        self.age = age
        self.profile = Profile("ruby")

    def show(self):
        print(self.name)
        print(self.age)
        print(self.profile.lang)

テストコード

  • vim test/test_user.py
from module.user import User

def test_show():
    user = User("hawksnowlog", 10)
    user.show()

上記が通常のテストコードになります
これで実行すると Profile.lang は「ruby」が表示されます

このテストコードに Profile に対して monkeypatch を当ててみます

from module.user import User


class DummyProfile():

    def __init__(self, _):
        self.lang = "python"


def test_show(monkeypatch):
    # monkeypatch.setattr("module.profile.Profile", DummyProfile)  # これだとうまく動かない
    monkeypatch.setattr("module.user.Profile", DummyProfile)
    user = User("hawksnowlog", 10)
    user.show()

これで pipenv run pytest -s を実行すると Profile.lang が python になっているのが確認できると思います

ポイント

monkeypatch.setattr する際のモジュールですが本来であれば実際に存在するモジュールパスを指定したくなりますが setattr を引数 2 つで指定する場合は モジュールを使用する側のパスを指定 します

これが謎な仕様ではあるのですがこうすることでパッチを当てられます

引数 3 つで setattr する場合には存在するクラスやモジュールのパスを指定するのですが 2 つの場合だけ使用する側のパスを指定する必要があるようです

参考サイト

2023年1月24日火曜日

python-mode 時のキーバンドを変更する方法

python-mode 時のキーバンドを変更する方法

概要

例えば run-python (Ctrl+c Ctrl+p) を別のキーに割り当てる方法を紹介します

環境

  • macOS 11.7.2
  • emacs 28.1

.emacs

(add-hook
 'python-mode-hook
 '(lambda ()
    (define-key python-mode-map "\C-c\C-p" 'treemacs)))

2023年1月23日月曜日

Celery カスタムクラスとして作成したタスクでハンドラの挙動を確認した

Celery カスタムクラスとして作成したタスクでハンドラの挙動を確認した

概要

前回 Celery のカスタムタスクを作成する方法を紹介しました
今回はカスタムタスクを使って各種イベントハンドラを使用する方法を紹介します
エラーやリトライ時の挙動をハンドリングできます

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

サンプルコード

  • vim tasks.py
from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


class AddTask(app.Task):

    def __init__(self):
        self.name = "AddTask"

    def run(self, x, y, *args, **kwargs):
        # raise RuntimeError  # call to on_failure
        # self.retry()  # call to on_retry
        return x + y

    def before_start(self, task_id, *args, **kwargs):
        print("before_start")
        print(task_id)

    def after_return(self, status, retval, task_id, einfo = None, *args, **kwargs):
        print("after_return")
        print(status)

    def on_failure(self, exc, task_id, einfo = None, *args, **kwargs):
        print("on_failure")
        print(exc)

    def on_retry(self, exc, task_id, einfo = None, *args, **kwargs):
        print("on_retry")
        print(exc)

    def on_success(self, retval, task_id, *args, **kwargs):
        print("on_success")
        print(retval)


class MultiTask(app.Task):

    def __init__(self):
        self.name = "MultiTask"

    def run(self, x, y, *args, **kwargs):
        return x * y


app.register_task(AddTask())
app.register_task(MultiTask())
  • vim celeryconfig.py
result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True

実行コード

  • vim app.py
from celery import chain
from tasks import AddTask, MultiTask


class Base():

    def __init__(self):
        first = AddTask().s(1, 2)
        second = MultiTask().s(10)
        third = MultiTask().s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = MultiTask().s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

ポイント

before_start と after_return はタスクが成功しようが失敗しようが必ずコールされます
他の on_success, on_failure, on_retry はその間で呼ばれるハンドラになります

リトライはデフォルトだと3回行います
失敗したあとに180秒待ったあとリトライを行います
on_retry が 3 回呼び出されて MaxRetriesExceededError になると最後に on_failure が呼び出されます

参考サイト

2023年1月21日土曜日

Celery5 でタスクをクラスとして定義する方法

Celery5 でタスクをクラスとして定義する方法

概要

過去 に紹介した方法と少し違った方法を紹介します

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

サンプルコード

  • vim tasks.py
from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


class AddTask(app.Task):

    def __init__(self):
        self.name = "AddTask"

    def run(self, x, y, *args, **kwargs):
        return x + y


class MultiTask(app.Task):

    def __init__(self):
        self.name = "MultiTask"

    def run(self, x, y, *args, **kwargs):
        return x * y


app.register_task(AddTask())
app.register_task(MultiTask())
  • vim celeryconfig.py
result_backend = "redis://localhost"
broker_url = "redis://localhost"
worker_prefetch_multiplier = 1
task_acks_late = True

実行コード

  • vim app.py
from celery import chain
from tasks import AddTask, MultiTask


class Base():

    def __init__(self):
        first = AddTask().s(1, 2)
        second = MultiTask().s(10)
        third = MultiTask().s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = MultiTask().s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

少し解説

Celery から作成した app オブジェクトの Task クラスを継承して作成します

name フィールドは必須です
run メソッドも必須です

定義したタスククラスは 実行する場合は app.register_task を使って登録します
この登録方法が前回少し異なっていました
app.tasks.register を使うとなぜか AttributeError: 'NoneType' object has no attribute 'push' というエラーが発生して request_stack.push ができなくタスクが登録できませんでした

タスクを呼び出す場合は普通にクラスを生成する感じで呼び出します
あとはこれまで通りと同じように s() や chain を使ってタスクを扱います

動作確認

  • pipenv run celery -A tasks worker --loglevel=info
  • pipenv run python app.py

最後に

Celery のタスクのクラス化はドキュメントが少ない印象があるのでちょくちょく紹介していこうかなと思っています

2023年1月20日金曜日

slots を使ったクラスの属性を kwargs から初期化する方法

slots を使ったクラスの属性を kwargs から初期化する方法

概要

slots を使ったクラスを定義している場合に使えるテクニックを紹介します

環境

  • macOS 11.7.2
  • Python 3.10.2

サンプルコード

class User():
    __slots__ = (
        "name",
        "age"
    )


class Test():
    def __init__(self, **kwargs):
        self.user = User()
        for k, v in kwargs.items():
            setattr(self.user, k, v)


t1 = Test(name="hawk", age=20)
print(t1.user.name)
t2 = Test(error=-1)

解説

__slots__ は属性を宣言することができる機能です
宣言するだけなのでコンストラクタで初期化したりすることはできずまた __dict__ が生成されないのでメモリの節約になります

kwargs などをそのまま slots を使ったクラスに展開する場合は setattr を使うと良いです
slots に定義されている属性はそのまま設定されますが slots にない属性を setattr しようとするとエラーになるので注意しましょう

2023年1月19日木曜日

emacs で選択した箇所を大文字に変換する方法

emacs で選択した箇所を大文字に変換する方法

概要

いつも忘れるのでメモ

環境

  • Ubuntu 18.04
  • emacs 27.1

コマンド

upcase-region を使います

  • 変換する箇所をリージョン選択
  • M-x upcase-region

2023年1月13日金曜日

macOS で標準入力待ちしている他のプロセスにテキストを送信する方法

macOS で標準入力待ちしている他のプロセスにテキストを送信する方法

概要

macOS の場合 /proc ディレクトリがなく file descriptor を参照できないので tty を使います

環境

  • macOS 11.7.2

ターミナル1: 標準入力受付

  • cat

これで標準入力受付状態になります

  • ps aux | grep cat

でプロセス番号を取得しておきます

ターミナル2: tty を確認する

  • lsof -p 26819

26819 は先程取得した cat のプロセス番号です
これで使用している tty がわかるのでメモします

COMMAND   PID         USER   FD   TYPE DEVICE SIZE/OFF                NODE NAME
cat     26819 username0000  cwd    DIR    1,9      192               34773 /Users/username0000/Downloads
cat     26819 username0000  txt    REG    1,9   121984 1152921500312764495 /bin/cat
cat     26819 username0000  txt    REG    1,9  2547856 1152921500312766735 /usr/lib/dyld
cat     26819 username0000    0u   CHR   16,2 0t270214                 667 /dev/ttys002
cat     26819 username0000    1u   CHR   16,2 0t270214                 667 /dev/ttys002
cat     26819 username0000    2u   CHR   16,2 0t270214                 667 /dev/ttys002

動作確認

  • echo "hello" > /dev/ttys002

これでターミナル1側にメッセージが出力されることが確認できます

2023年1月12日木曜日

solargraph に curl で jsonrpc を直接送信してみる

solargraph に curl で jsonrpc を直接送信してみる

概要

普段はエディタの拡張を使って裏で通信しています
拡張を書く場合などに curl を使ってテストしたいなと思ったので solargraph に curl で jsonrpc リクエストを送信してみました

環境

  • macOS 11.7.2
  • Ruby 3.1.2
  • solargraph 0.44.3

solargraph インストール

  • gem install solargraph

solargraph 起動 (ソケットモード)

  • solargraph socket
Solargraph is listening PORT=7658 PID=24943

curl でリクエストを送信

  • curl --http0.9 -XPOST -d '{"jsonrpc":"2.0","method":"$/solargraph/checkGemVersion","id":"1"}' localhost:7658

結果

Content-Length: 188

{"jsonrpc":"2.0","method":"window/showMessageRequest","params":{"type":3,"message":"Solargraph gem version 0.48.0 is available. (Current version: 0.44.3)","actions":["Update now"]},"id":1}

Content-Length: 79

188バイト送信して79バイト受信しています

最後に

stdio モードで起動した solargraph にエディタから通信する方法はあるのだろうか

2023年1月11日水曜日

celery のタスクで chain 作成後にタスクを挿入する方法

celery のタスクで chain 作成後にタスクを挿入する方法

概要

chain は celery でタスクを順番に実行することができるワークフロー機能です
chain で作成したタスクの一覧に対して実行する前にタスクの順番を変えたい場合があると思います
そんなときのテクニックを紹介します

環境

  • macOS 11.7.2
  • Python 3.10.2
  • celery 5.2.7

タスク

from celery import Celery

app = Celery("tasks")
app.config_from_object("celeryconfig")


@app.task(bind=True)
def add(self, x, y):
    return x + y


@app.task(bind=True)
def multi(self, x, y):
    return x * y

サンプルコード

from celery import chain
from tasks import add, multi

first = add.s(1, 2)
second = multi.s(10)
third = multi.s(2)
forth = multi.s(3)

tasks = chain(first,
              second,
              third)
# tasks.tasks.insert(2, forth)
tasks.tasks.append(forth)

result = tasks.apply_async()
print(result.get())

ちょっと解説

chain で生成されたオブジェクトは _chain クラスのオブジェクトになります
このクラスにある tasks プロパティは配列で管理されておりこの中にタスクの一覧が入っています
あとはこの配列に対してタスクの追加などを行うだけです

もう少し汎用的にしてみる

この仕組みを応用してもう少し汎用的な形にしてみます
基本となる chain のタスク一覧を管理するクラスを作成しそのタスク一覧をもとに別の chain を実行するワークフローを作成します

from celery import chain
from tasks import add, multi


class Base():

    def __init__(self):
        first = add.s(1, 2)
        second = multi.s(10)
        third = multi.s(2)
        self.tasks = chain(first,
                           second,
                           third)

    def run(self):
        return self.tasks.apply_async()

    def append_task(self, task):
        self.tasks.tasks.append(task)


class Workflow1(Base):

    def __init__(self):
        forth = multi.s(3)
        super().__init__()
        self.append_task(forth)


if __name__ == '__main__':
    base = Base()
    print(base.run().get())
    wf1 = Workflow1()
    print(wf1.run().get())

こんな感じに記載することで chain で実行するタスクのリストを重複することなく管理する ことができるようになります

最後に

さらにタスクの一覧と chain オブジェクトを別のクラスとして管理してもいいかなと思います

テストも書きやすくなるかなと思います

2023年1月10日火曜日

Ruby から vault にアクセスしてみる

Ruby から vault にアクセスしてみる

概要

前回 vault + docker でシークレットを登録するところまでやってみました
今回は Ruby からアクセスする方法を紹介します

環境

  • macOS 11.7.2
  • docker 20.10.21
  • vault 1.12.1
  • Ruby 3.1.2

サンプルコール

require 'vault'

Vault.configure do |config|
  config.address = "http://192.168.1.2:8200"
  config.token = "hvs.xxxxxxx"
  config.ssl_verify = false
end

Vault.with_retries(Vault::HTTPConnectionError) do
  # Vault.kv("kv").list
  secret1 = Vault.kv("kv").read("secret1")
  p secret1.data
end

ポイント

key-value ストアのエンジンを作成した場合は kv メソッドを使用します
kv メソッドに渡している「kv」は自分で作成したエンジンのパス名になるのでご自身のパスに合わせて修正してください

read でシークレットの情報を取得します
data 配下に登録した key-value のシークレット情報があるのでこれを使用します

もし read するパスがわからない場合は list というメソッドで一覧を取得することもできます

参考サイト

2023年1月6日金曜日

vault をコンテナで起動してみた

vault をコンテナで起動してみた

概要

シークレット管理ツールの vault を docker コンテンとして起動する方法を紹介します
過去 に CLI ベースで操作する方法を紹介しましたが今回は UI ベースで勧めます

起動後の初期設定と簡単なシークレットの作成方法も紹介します

環境

  • macOS 11.7.2
  • docker 20.10.21
  • vault 1.12.1

起動

  • docker run --cap-add=IPC_LOCK -e 'VAULT_LOCAL_CONFIG={"storage": {"file": {"path": "/vault/file"}}, "listener": [{"tcp": { "address": "0.0.0.0:8200", "tls_disable": true}}], "default_lease_ttl": "168h", "max_lease_ttl": "720h", "ui": true}' -p 8200:8200 vault server

開発用として起動します
UI 付きで SSL オフで起動します

UI アクセス

localhost:8200/ui で vault の管理 UI にアクセスできます

ルートキーとアンシールキーの数としきい値の設定

まずはルートキーを作成します
ルートキーは 1 つしかありません

ルートキーを解除するためにアンシールキーを作成します
アンシールキーは複数の管理者に配布するキーになっており例えば 5 人に配布してそのうち 3 人のアンシールキーを使わないとルートキーを解除することができません

簡単に説明すると

  • ルートキー・・・vault にアクセスするためのキー
  • アンシールキー・・・ルートキーを使用するためのキー、最低必要キー数を決められる

ルートキーを忘れたり配布したアンシールキーの最低数を分のキーを忘れてしまうと vault には永遠にアクセスできなくなってしまうので注意しましょう

キーの作成完了

どちらもメモしておきましょう
入力する際は基本的にアンシールキー -> ルートキーの順番になります

試しにアンシールする

配布したアンシールキーを使用して vault にアクセスできるようにします
今回は 1 つ分のキーでアンシールできますが threshold で指定した数分アンシールキーが必要になります

vault にログイン

Token を選択して発行されたルートキーを使ってログインしましょう
ちなみに Method のプルダウンを見ると他のサービスの OAuth などとも連携できるのでわざわざルートキーを使ってログインする必要もなくせそうです

シークレットエンジンの作成

とりあえずお試しなので key-value ストアで作成します

右上の「Enable new engine」を選択

Generic -> KV を選択

Path を入力して「Enable Engine」で作成

パスワード生成

作成した key-value ストアにシークレット情報を登録してみます

「Create secret」を選択

Path を入力
Secret data に「key」「value」でシークレット情報を入力
「Save」を選択

これでシークレットの登録ができます

最後に

docker コンテナで vault を立ち上げる方法を紹介しました
トークンは忘れないようにしましょう
今回はローカルファイル上に保存されるのでコンテナを削除されるとすべてのシークレット情報が削除されるので注意してください

次回は Ruby から登録したシークレット情報を取得する方法を紹介します

参考サイト

2023年1月5日木曜日

licensed で python を使う場合の .licensed.yml の設定

licensed で python を使う場合の .licensed.yml の設定

概要

Python の場合は少し設定ファイルの記載が特殊なので注意しましょう

環境

  • macOS 11.7.2
  • Ruby 3.1.2
    • licensed 3.9.1

サンプル .licensed.yml

expected_dependency: Jinja2
source_path: ./devops
python:
  virtual_env_dir: /home/user/.local/share/virtualenvs/app-Yn-xfOlh
sources:
  pip: true
allowed:
  - mit
  - apache-2.0
  - other
  - none

ポイントは virtual_env_dirpip: true の設定です

あとは cache -> status でいつも通り使えます