概要
celery でワークフローを実現するための仕組みとして Canvas という仕組みがあります
これはワークフローを実現するための機能で様々なメソッドが提供されています
今回は chain や group と言ったワークフローを実現するためのメソッドを試してみました
環境
- macOS 10.15.5
- Python 3.7.3
chain を試す
まずは chain を試してみます
chain は簡単に言えば前に実行したタスクの結果を使って次のタスクを実行することができる機能です
まずはいくつかのタスクを作成します
ここで定義したタスクを chain でつなぎ合わせてみます
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task
def add(x, y):
return x + y
@app.task
def multi(x, y):
return x * y
ワーカーとして起動しておきましょう
pipenv run celery -A sub_tasks worker -l info
次にタスクを chain でつなぎ合わせ実際に実行するスクリプトを作成します
from celery import chain
from sub_tasks import add, multi
tasks = chain(add.s(1, 2), multi.s(10)).apply_async()
print(tasks.get())
ポイントは multi
というタスクの引数が 1 つになっているところです
これは add の結果 (3) を 1 つ目の引数として渡すためで multi の引数を 2 つにしてしまうと引数が合わないよというエラーになってしまいます
あとは実行すればワーカーで実行された 2 つのタスクの結果が返ってくると思います
pipenv run python3 chain_tasks.py
=> 30
[2020-06-02 15:37:59,798: INFO/ForkPoolWorker-2] Task sub_tasks.add[5e35ecc1-19e5-405a-b355-21578f1d85cd] succeeded in 0.03750605799999107s: 3
[2020-06-02 15:37:59,814: INFO/ForkPoolWorker-4] Task sub_tasks.multi[081a469e-03ea-4273-8884-2a43a4988dcb] succeeded in 0.014889055999987022s: 30
group を試す
group はまとめたタスクを並列で実行することができる機能です
先程の chain は前のタスクの結果に依存するため前のタスクが終わらないとダメですが group はすべてのタスクが並列に実行されます
sub_task.py
は先程作成したものを使います
from celery import group
from sub_tasks import add, multi
tasks = group(add.s(1, 2), multi.s(10, 10)).apply_async()
print(tasks.get())
group の場合前のタスクの結果を参照しないのでちゃんとそれぞれのタスクの引数は 2 つ指定してあげます
これで実行すると各タスクで実行された結果が配列で取得できます
pipenv run python3 group_test.py
=> [3, 100]
[2020-06-02 15:41:21,307: INFO/ForkPoolWorker-4] Task sub_tasks.multi[129a3b43-ab45-4578-b659-f50c6bd8de5c] succeeded in 0.0007359960000030696s: 100
[2020-06-02 15:41:21,307: INFO/ForkPoolWorker-2] Task sub_tasks.add[c6d50e58-abb5-469f-8106-0fbe219cbe11] succeeded in 0.000807351000048584s: 3
chord を試してみる
コードと読みます
chord は group に似ていますが並列処理の最後に指定したコールバックメソッド用のタスクを実行することができます
sub_tasks.py
を少し書き換えます
from celery import Celery
app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')
@app.task
def add(x, y):
return x + y
@app.task
def multi(x, y):
return x * y
@app.task
def total(ary):
return sum(ary)
from celery import chord
from sub_tasks import add, multi, total
tasks = chord((add.s(1, 2) for i in range(5)), total.s()).apply_async()
print(tasks.get())
1 + 2 = 3 を 5 回繰り返して生成される配列に対してそれぞれの要素を足し合わせた結果 (15) を返します
pipenv run python3 chord_test.py
=> 15
結果を確認するとわかりますが add タスクがすべて実行されたあとに total タスクが実行されていることがわかります
[2020-06-02 15:59:52,309: INFO/MainProcess] Received task: sub_tasks.add[18d99f43-02bf-4fac-a051-20ad627eaa73]
[2020-06-02 15:59:52,312: INFO/ForkPoolWorker-2] Task sub_tasks.add[18d99f43-02bf-4fac-a051-20ad627eaa73] succeeded in 0.0014969600000256378s: 3
[2020-06-02 15:59:52,313: INFO/MainProcess] Received task: sub_tasks.add[262d1e52-a80f-49f4-a486-5dd43b24866d]
[2020-06-02 15:59:52,316: INFO/ForkPoolWorker-2] Task sub_tasks.add[262d1e52-a80f-49f4-a486-5dd43b24866d] succeeded in 0.001473454999995738s: 3
[2020-06-02 15:59:52,317: INFO/MainProcess] Received task: sub_tasks.add[e0d01505-7791-4336-aeee-75dad07babd6]
[2020-06-02 15:59:52,320: INFO/MainProcess] Received task: sub_tasks.add[f9bd8bc1-054c-4efb-838e-80f69d3fe128]
[2020-06-02 15:59:52,320: INFO/ForkPoolWorker-2] Task sub_tasks.add[e0d01505-7791-4336-aeee-75dad07babd6] succeeded in 0.0011679110000102355s: 3
[2020-06-02 15:59:52,323: INFO/MainProcess] Received task: sub_tasks.add[600bc612-8d29-4950-aaa1-d019b26176d8]
[2020-06-02 15:59:52,325: INFO/ForkPoolWorker-4] Task sub_tasks.add[600bc612-8d29-4950-aaa1-d019b26176d8] succeeded in 0.001488192000010713s: 3
[2020-06-02 15:59:52,362: INFO/ForkPoolWorker-2] Task sub_tasks.add[f9bd8bc1-054c-4efb-838e-80f69d3fe128] succeeded in 0.038503546000015376s: 3
[2020-06-02 15:59:52,363: INFO/MainProcess] Received task: sub_tasks.total[0b704079-30b5-4a00-9083-8027f010603e]
[2020-06-02 15:59:52,365: INFO/ForkPoolWorker-2] Task sub_tasks.total[0b704079-30b5-4a00-9083-8027f010603e] succeeded in 0.0006079719999831923s: 15
map を試してみる
ここから紹介する map と starmap はタスクに対して実行する機能です
map は引数が 1 つのタスクに対して引数で与えた Iteratable な値を順番に実行します
from sub_tasks import total
ret = total.map([(1, 2, 3, 4, 5)]).apply_async()
print(ret.get())
pipenv run python3 map_test.py
=> [15]
starmap を試してみる
map が単一の引数のタスクに対する命令でしたが starmap は引数が複数あるタスクに対して実行できます
from sub_tasks import add
ret = add.starmap([(1, 2), (3, 4)]).apply_async()
print(ret.get())
pipenv run python3 starmap_test.py
=> [3, 7]
chunks を試してみる
chunks は与えられた Iteratable な引数から実行するべきタスク数を算出し最後に与えられた引数分エンキューするジョブを分割します
from sub_tasks import add
ret = add.chunks([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)], 2).apply_async()
print(ret.get())
# ret = add.starmap([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)]).apply_async()
# print(ret.get())
pipenv run python3 starmap_test.py
=> [[3, 7], [11, 15], [19, 23]]
結果を見るとわかるのですが 1 つのタスクの結果としてではなく 3 つのタスクの結果として受け取れます (2 つずつに分割したため)
ちなみに starmap 側を実行するとわかりますが 1 つのタスクの結果としてすべての結果が受け取れます
=> [3, 7, 11, 15, 19, 23]
最後に
celery の Canvas 機能を試してみました
chain や group, chord と言った便利なメソッドとタスクを組み合わせることでワークフローを実現することができます
map, starmap, chunks はワークフローとは少し毛色が違いますがタスクを反復的に実行したい場合に使えるメソッドです
Canvas は 1 つのシンプルなタスクのみを実行する場合には気にする必要はありませんが複数のタスクを組み合わせて 1 つの処理にする場合には必須な機能だと思います
参考サイト