2020年6月6日土曜日

celery でワークフローやパイプラインを実現するための chain や group を試してみた

概要

celery でワークフローを実現するための仕組みとして Canvas という仕組みがあります
これはワークフローを実現するための機能で様々なメソッドが提供されています
今回は chain や group と言ったワークフローを実現するためのメソッドを試してみました

環境

  • macOS 10.15.5
  • Python 3.7.3
    • celery 4.4.3

chain を試す

まずは chain を試してみます
chain は簡単に言えば前に実行したタスクの結果を使って次のタスクを実行することができる機能です

まずはいくつかのタスクを作成します
ここで定義したタスクを chain でつなぎ合わせてみます

  • vim sub_tasks.py
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y

ワーカーとして起動しておきましょう

  • pipenv run celery -A sub_tasks worker -l info

次にタスクを chain でつなぎ合わせ実際に実行するスクリプトを作成します

  • vim chain_test.py
from celery import chain
from sub_tasks import add, multi

tasks = chain(add.s(1, 2), multi.s(10)).apply_async()

print(tasks.get())

ポイントは multi というタスクの引数が 1 つになっているところです
これは add の結果 (3) を 1 つ目の引数として渡すためで multi の引数を 2 つにしてしまうと引数が合わないよというエラーになってしまいます
あとは実行すればワーカーで実行された 2 つのタスクの結果が返ってくると思います

  • pipenv run python3 chain_tasks.py

=> 30

[2020-06-02 15:37:59,798: INFO/ForkPoolWorker-2] Task sub_tasks.add[5e35ecc1-19e5-405a-b355-21578f1d85cd] succeeded in 0.03750605799999107s: 3 [2020-06-02 15:37:59,814: INFO/ForkPoolWorker-4] Task sub_tasks.multi[081a469e-03ea-4273-8884-2a43a4988dcb] succeeded in 0.014889055999987022s: 30

group を試す

group はまとめたタスクを並列で実行することができる機能です
先程の chain は前のタスクの結果に依存するため前のタスクが終わらないとダメですが group はすべてのタスクが並列に実行されます
sub_task.py は先程作成したものを使います

  • vim group_test.py
from celery import group
from sub_tasks import add, multi

tasks = group(add.s(1, 2), multi.s(10, 10)).apply_async()

print(tasks.get())

group の場合前のタスクの結果を参照しないのでちゃんとそれぞれのタスクの引数は 2 つ指定してあげます
これで実行すると各タスクで実行された結果が配列で取得できます

  • pipenv run python3 group_test.py

=> [3, 100]

[2020-06-02 15:41:21,307: INFO/ForkPoolWorker-4] Task sub_tasks.multi[129a3b43-ab45-4578-b659-f50c6bd8de5c] succeeded in 0.0007359960000030696s: 100 [2020-06-02 15:41:21,307: INFO/ForkPoolWorker-2] Task sub_tasks.add[c6d50e58-abb5-469f-8106-0fbe219cbe11] succeeded in 0.000807351000048584s: 3

chord を試してみる

コードと読みます
chord は group に似ていますが並列処理の最後に指定したコールバックメソッド用のタスクを実行することができます
sub_tasks.py を少し書き換えます

  • vim sub_tasks.py
from celery import Celery

app = Celery('sub_tasks', backend='redis://localhost', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def multi(x, y):
    return x * y

@app.task
def total(ary):
    return sum(ary)
  • vim chord_test.py
from celery import chord
from sub_tasks import add, multi, total

tasks = chord((add.s(1, 2) for i in range(5)), total.s()).apply_async()

print(tasks.get())

1 + 2 = 3 を 5 回繰り返して生成される配列に対してそれぞれの要素を足し合わせた結果 (15) を返します

  • pipenv run python3 chord_test.py

=> 15

結果を確認するとわかりますが add タスクがすべて実行されたあとに total タスクが実行されていることがわかります

[2020-06-02 15:59:52,309: INFO/MainProcess] Received task: sub_tasks.add[18d99f43-02bf-4fac-a051-20ad627eaa73] [2020-06-02 15:59:52,312: INFO/ForkPoolWorker-2] Task sub_tasks.add[18d99f43-02bf-4fac-a051-20ad627eaa73] succeeded in 0.0014969600000256378s: 3 [2020-06-02 15:59:52,313: INFO/MainProcess] Received task: sub_tasks.add[262d1e52-a80f-49f4-a486-5dd43b24866d] [2020-06-02 15:59:52,316: INFO/ForkPoolWorker-2] Task sub_tasks.add[262d1e52-a80f-49f4-a486-5dd43b24866d] succeeded in 0.001473454999995738s: 3 [2020-06-02 15:59:52,317: INFO/MainProcess] Received task: sub_tasks.add[e0d01505-7791-4336-aeee-75dad07babd6] [2020-06-02 15:59:52,320: INFO/MainProcess] Received task: sub_tasks.add[f9bd8bc1-054c-4efb-838e-80f69d3fe128] [2020-06-02 15:59:52,320: INFO/ForkPoolWorker-2] Task sub_tasks.add[e0d01505-7791-4336-aeee-75dad07babd6] succeeded in 0.0011679110000102355s: 3 [2020-06-02 15:59:52,323: INFO/MainProcess] Received task: sub_tasks.add[600bc612-8d29-4950-aaa1-d019b26176d8] [2020-06-02 15:59:52,325: INFO/ForkPoolWorker-4] Task sub_tasks.add[600bc612-8d29-4950-aaa1-d019b26176d8] succeeded in 0.001488192000010713s: 3 [2020-06-02 15:59:52,362: INFO/ForkPoolWorker-2] Task sub_tasks.add[f9bd8bc1-054c-4efb-838e-80f69d3fe128] succeeded in 0.038503546000015376s: 3 [2020-06-02 15:59:52,363: INFO/MainProcess] Received task: sub_tasks.total[0b704079-30b5-4a00-9083-8027f010603e] [2020-06-02 15:59:52,365: INFO/ForkPoolWorker-2] Task sub_tasks.total[0b704079-30b5-4a00-9083-8027f010603e] succeeded in 0.0006079719999831923s: 15

map を試してみる

ここから紹介する map と starmap はタスクに対して実行する機能です
map は引数が 1 つのタスクに対して引数で与えた Iteratable な値を順番に実行します

  • vim map_test.py
from sub_tasks import total

ret = total.map([(1, 2, 3, 4, 5)]).apply_async()
print(ret.get())
  • pipenv run python3 map_test.py

=> [15]

starmap を試してみる

map が単一の引数のタスクに対する命令でしたが starmap は引数が複数あるタスクに対して実行できます

  • vim starmap_test.py
from sub_tasks import add

ret = add.starmap([(1, 2), (3, 4)]).apply_async()
print(ret.get())
  • pipenv run python3 starmap_test.py

=> [3, 7]

chunks を試してみる

chunks は与えられた Iteratable な引数から実行するべきタスク数を算出し最後に与えられた引数分エンキューするジョブを分割します

  • vim chunks_test.py
from sub_tasks import add

ret = add.chunks([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)], 2).apply_async()
print(ret.get())
# ret = add.starmap([(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)]).apply_async()
# print(ret.get())
  • pipenv run python3 starmap_test.py

=> [[3, 7], [11, 15], [19, 23]]

結果を見るとわかるのですが 1 つのタスクの結果としてではなく 3 つのタスクの結果として受け取れます (2 つずつに分割したため)

ちなみに starmap 側を実行するとわかりますが 1 つのタスクの結果としてすべての結果が受け取れます

=> [3, 7, 11, 15, 19, 23]

最後に

celery の Canvas 機能を試してみました
chain や group, chord と言った便利なメソッドとタスクを組み合わせることでワークフローを実現することができます
map, starmap, chunks はワークフローとは少し毛色が違いますがタスクを反復的に実行したい場合に使えるメソッドです

Canvas は 1 つのシンプルなタスクのみを実行する場合には気にする必要はありませんが複数のタスクを組み合わせて 1 つの処理にする場合には必須な機能だと思います

参考サイト

0 件のコメント:

コメントを投稿