概要
rq-scheduler は Python で使えるスケジューラツールです
特定の時間や条件に応じて何か処理を実行させることができます
バックエンドには redis を使います
今回はインストールから簡単なサンプルコードを動かしてみます
環境
- macOS 11.7.6
- Python 3.10.2
- rq 1.15.0
- rq-scheduler 0.13.1
インストール
- pipenv install rq-scheduler
redis 起動
- brew services run redis
サンプルコード
流れとしては
- ジョブを redis に登録
- 登録したジョブをワーカーが受け取り実行
という感じになります
ジョブを登録する
rq-scheduler の cron 機能を使ってジョブを登録してみます
-
vim enqueue.py
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
from datetime import timedelta
from lib.util import say
queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)
# 指定の時間に実行するジョブを登録
# scheduler.enqueue_at(datetime(2023, 6, 6, 14, 36), say, "hello")
# 指定の間隔で実行するジョブを登録
# scheduler.enqueue_in(timedelta(seconds=5), say, "hello")
# cron 形式でジョブを登録
scheduler.cron(
"*/1 * * * *",
func=say,
args=["hello"],
)
scheduler.run() # なくてもいい
-
pipenv run app.py
まずこれを実行すると以下のようなキーが redis に登録されます
redis-cli keys '*'
1) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
2) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"
3) "rq:scheduler:scheduled_jobs"
rq:job から始まるキーが登録されたジョブの実態になります
さらにしばらくすると
redis-cli keys '*'
1) "rq:queue:bar"
2) "rq:queues"
3) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
4) "rq:scheduler:scheduled_jobs"
5) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"
となっておりジョブが実際に bar キューに登録された状態になります (rq:queue:bar )
ジョブを実行するワーカー
ワーカーは rq モジュールだけで OK です
実際にキューイングされたジョブを実行します
キューイングした際に指定したモジュールの関数はこのワーカーからも参照できる必要があるので注意しましょう (lib.util.say は自動でワーカーが参照してコールする)
-
vim worker.py
from rq import Worker, Queue
from redis import Redis
redis = Redis()
queue = Queue('bar', connection=redis)
if __name__ == "__main__":
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
-
pipenv run python worker.py
実行すると以下のようなログが流れます
15:50:00 Worker rq:worker:9f0f761294194e79a4bb9b87c0e35436 started with PID 90351, version 1.15.0
15:50:00 Subscribing to channel rq:pubsub:9f0f761294194e79a4bb9b87c0e35436
15:50:00 *** Listening on bar...
15:50:00 Cleaning registries for queue: bar
15:50:00 bar: lib.util.say('hello') (53acc2fb-aa88-4627-9a4c-183eab6ee07a)
hello
15:50:00 bar: Job OK (53acc2fb-aa88-4627-9a4c-183eab6ee07a)
15:50:00 Result will never expire, clean up result key manually
redis 内は以下のようになりました
keys *
1) "rq:clean_registries:bar"
2) "rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
3) "rq:finished:bar"
4) "rq:scheduler_instance:6f65031c38a2416bb1352d1fca46dfa9"
5) "rq:worker:9f0f761294194e79a4bb9b87c0e35436"
6) "rq:scheduler-lock:bar"
7) "rq:queues"
8) "rq:workers"
9) "rq:results:53acc2fb-aa88-4627-9a4c-183eab6ee07a"
10) "rq:workers:bar"
11) "rq:scheduler:scheduled_jobs"
いろいろ増えていますが完了したジョブは rq:finished:bar にあります
結果は rq:results:53acc2fb-aa88-4627-9a4c-183eab6ee07a に格納されており今回の場合はあれば永遠に保存されます
ワーカーは実装しなくても動かせる
今回のように簡単なワーカーであれば以下のコマンドだけでも OK です
- pipenv run rq worker -q bar
エンキューの際に実行するべきメソッドが指定されているのでワーカーはどのメソッドを知る必要がないためコマンドだけでもワーカーが起動できるようになっています
再度エンキューを実行した場合に新たにジョブが登録される
もし app.py
を終了して再度 app.py
を実行すると新たにジョブが登録されます
つまり前に登録した cron ジョブは残ったままになるので app.py
を実行するたびに redis には新しいジョブが登録されます
ジョブの登録を解除する
もう前のジョブはいらないということであれば登録を解除してあげましょう
-
vim delete.py
from redis import Redis
from rq import Queue
from rq_scheduler import Scheduler
queue = Queue('bar', connection=Redis())
scheduler = Scheduler(queue=queue, connection=queue.connection)
jobs = scheduler.get_jobs()
for job in jobs:
if job.get_id() == "53acc2fb-aa88-4627-9a4c-183eab6ee07a":
print(scheduler.cancel(job))
これで再度 redis を確認すると rq:job:53acc2fb-aa88-4627-9a4c-183eab6ee07a というジョブが削除されているのが確認できると思います
最後に
rq-scheduler を試してみました
特定の時間に実行するジョブを登録しておけば定期的にワーカーが実行してくれるシステムを作りたいときに使えるかなと思います
実際はワーカーからのフィードバックやワーカーのタイムアウト、動的なジョブの登録に対応できるようにする必要がありそうです
0 件のコメント:
コメントを投稿