2023年6月8日木曜日

Pythonのrq-schedulerを試す

Pythonのrq-schedulerを試す

概要

rq-scheduler は Python で使えるスケジューラツールです
特定の時間や条件に応じて何か処理を実行させることができます
バックエンドには redis を使います
今回はインストールから簡単なサンプルコードを動かしてみます

環境

  • macOS 11.7.6
  • Python 3.10.2
    • rq 1.15.0
    • rq-scheduler 0.13.1

インストール

  • pipenv install rq-scheduler

redis 起動

  • brew services run redis

サンプルコード

流れとしては

  1. ジョブを redis に登録
  2. 登録したジョブをワーカーが受け取り実行

という感じになります

ジョブを登録する

rq-scheduler の cron 機能を使ってジョブを登録してみます

  • vim enqueue.py
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
from datetime import timedelta
from lib.util import say

queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)


# 指定の時間に実行するジョブを登録
# scheduler.enqueue_at(datetime(2023, 6, 6, 14, 36), say, "hello")

# 指定の間隔で実行するジョブを登録
# scheduler.enqueue_in(timedelta(seconds=5), say, "hello")

# cron 形式でジョブを登録
scheduler.cron(
    "*/1 * * * *",
    func=say,
    args=["hello"],
)

scheduler.run()  # なくてもいい
  • pipenv run app.py

まずこれを実行すると以下のようなキーが redis に登録されます

redis-cli keys '*'
1) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
2) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"
3) "rq:scheduler:scheduled_jobs"

rq:job から始まるキーが登録されたジョブの実態になります
さらにしばらくすると

redis-cli keys '*'
1) "rq:queue:bar"
2) "rq:queues"
3) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
4) "rq:scheduler:scheduled_jobs"
5) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"

となっておりジョブが実際に bar キューに登録された状態になります (rq:queue:bar )

ジョブを実行するワーカー

ワーカーは rq モジュールだけで OK です
実際にキューイングされたジョブを実行します

キューイングした際に指定したモジュールの関数はこのワーカーからも参照できる必要があるので注意しましょう (lib.util.say は自動でワーカーが参照してコールする)

  • vim worker.py
from rq import Worker, Queue
from redis import Redis

redis = Redis()
queue = Queue('bar', connection=redis)


if __name__ == "__main__":
    worker = Worker(queues=[queue], connection=redis)
    worker.work(with_scheduler=True)
  • pipenv run python worker.py

実行すると以下のようなログが流れます

15:50:00 Worker rq:worker:9f0f761294194e79a4bb9b87c0e35436 started with PID 90351, version 1.15.0
15:50:00 Subscribing to channel rq:pubsub:9f0f761294194e79a4bb9b87c0e35436
15:50:00 *** Listening on bar...
15:50:00 Cleaning registries for queue: bar
15:50:00 bar: lib.util.say('hello') (53acc2fb-aa88-4627-9a4c-183eab6ee07a)
hello
15:50:00 bar: Job OK (53acc2fb-aa88-4627-9a4c-183eab6ee07a)
15:50:00 Result will never expire, clean up result key manually

redis 内は以下のようになりました

keys *
 1) "rq:clean_registries:bar"
 2) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
 3) "rq:finished:bar"
 4) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"
 5) "rq:worker:9f0f761294194e79a4bb9b87c0e35436"
 6) "rq:scheduler-lock:bar"
 7) "rq:queues"
 8) "rq:workers"
 9) "rq:results:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
10) "rq:workers:bar"
11) "rq:scheduler:scheduled_jobs"

いろいろ増えていますが完了したジョブは rq:finished:bar にあります
結果は rq:results:53acc2fb-aa88-4627-9a4c-183eab6ee07a に格納されており今回の場合はあれば永遠に保存されます

ワーカーは実装しなくても動かせる

今回のように簡単なワーカーであれば以下のコマンドだけでも OK です

  • pipenv run rq worker -q bar

エンキューの際に実行するべきメソッドが指定されているのでワーカーはどのメソッドを知る必要がないためコマンドだけでもワーカーが起動できるようになっています

再度エンキューを実行した場合に新たにジョブが登録される

もし app.py を終了して再度 app.pyを実行すると新たにジョブが登録されます
つまり前に登録した cron ジョブは残ったままになるので app.py を実行するたびに redis には新しいジョブが登録されます

ジョブの登録を解除する

もう前のジョブはいらないということであれば登録を解除してあげましょう

  • vim delete.py
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler

queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)

jobs = scheduler.get_jobs()
for job in jobs:
    if job.get_id() == "53acc2fb-aa88-4627-9a4c-183eab6ee07a":
        print(scheduler.cancel(job))

これで再度 redis を確認すると rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a というジョブが削除されているのが確認できると思います

最後に

rq-scheduler を試してみました
特定の時間に実行するジョブを登録しておけば定期的にワーカーが実行してくれるシステムを作りたいときに使えるかなと思います

実際はワーカーからのフィードバックやワーカーのタイムアウト、動的なジョブの登録に対応できるようにする必要がありそうです

参考サイト

0 件のコメント:

コメントを投稿