概要
前回 kitchensink を完走してみました
が、FORK_JOIN_DYNAMIC
のタスクの実行に失敗しました
今回は動かすことに成功したのでその方法を紹介します
Dynamic Fork はその名の通り動的にタスクを動かすことができるタスクです
例えばある処理をした際にその結果に応じて動かすタスクを動的に変えたい場合があると思います
Dynamic Fork はそんなときに使えるタスクになります
環境
- macOS 10.13.4
- conductor 1.9.0
- docker 18.04.0-ce
- Python 3.6.4
Dynamic Fork させてみる
まず Dynamic Fork させる前のワークフローの状態は以下のようになります
Dynamic Fork のタスクは fanout1
になります
ここでタスクを動的に作るために task_4 の outputData の中に Dynamic Fork させるためのデータを格納する必要があります
fanout1 が Dynamic Fork するために必要なパラメータは以下のようになっています
{
"name": "dynamic_fanout",
"taskReferenceName": "fanout1",
"inputParameters": {
"dynamicTasks": "${task_4.output.dynamicTasks}",
"input": "${task_4.output.inputs}"
},
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "input",
"startDelay": 0
}
Dynamic Fork をするには type を FORK_JOIN_DYNAMIC
にし、かつ dynamicForkTasksParam
と dynamicForkTasksInputParamName
というパラメータの指定が必要です
今回この 2 つそれぞれ dynamicTasks
と input
と指定しています
このパラメータがどこを指しているかというと inputParameters
のキーの値になります
inputParameters
のこの値を使うというのを指定する感じになります
そしてそのキーに設定される値は何かというとそれぞれ ${task_4.output.dynamicTasks}
と ${task_4.output.inputs}
になります
これが何かというと task_4 の outputData
に設定されている値になります
ということを踏まえて taks_4 のステータスを COMPLETED に変更してみます
- vim update_task4.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskObj = {
"taskId": "457a1546-f9fd-4012-88cf-25555913742e",
"workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
"status": "COMPLETED",
"outputData":
{
"inputs": {
"task_5": {
"params": {
"recipe": "jpg"
}
}
},
"dynamicTasks": [
{
"name": "task_5",
"taskReferenceName": "task_5",
"type": "SIMPLE"
}
]
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main()
taskId と workflowInstanceId は適宜変更してください
先ほど fanout1 内で指定した inputs
と dynamicTasks
を outputData に指定します
そして dynamicTasks
には動的に動かしたいタスクを指定します
当然ですがここで指定するタスクは既に conductor に登録しておく必要があります
もし存在しないタスクを指定するとワークフローがエラーとなります
そして inputs
側には動的に動かすタスクに対して与えるパラメータを指定することができます
今回の場合は task_5 に対して "recipe": "jpg"
という値を渡すことになります
また変数名が複数形であることからもわかるように動かすタスクは複数動かすことが可能です
これで task_4 を更新するとワークフローが以下のようになると思います
task_5 が突如出現しました
あとはこれまで通りポーリング -> タスクの更新とすれば join1 につながり、その後 search_elasticsearch のタスクに流れます
もし複数のタスクを動かしたのであれば、それらすべてのタスクが COMPLETED になるまで join1 には進みません
これが Dynamic Fork の仕組みになります
最後に
conductor の Dynamic Fork の仕組みを試してみました
冒頭にも説明しましたが、動かすタスクの数が決まっていない場合や動かすタスクの種類が決まっていない場合などに使えます
今回も kitchensink のサンプルを使っていますが右側の分岐は普通の Fork タスクの流れになります
こちらは始めからタスクが 2 つだけ Fork していることがわかると思います
公式のドキュメントにもありましたが Dynamic Fork を使う場合には必ず Join タスクも含めるようにしてください
0 件のコメント:
コメントを投稿