2018年5月9日水曜日

conductor の Dynamic Fork を試してみる

概要

前回 kitchensink を完走してみました
が、FORK_JOIN_DYNAMIC のタスクの実行に失敗しました
今回は動かすことに成功したのでその方法を紹介します

Dynamic Fork はその名の通り動的にタスクを動かすことができるタスクです
例えばある処理をした際にその結果に応じて動かすタスクを動的に変えたい場合があると思います
Dynamic Fork はそんなときに使えるタスクになります

環境

  • macOS 10.13.4
  • conductor 1.9.0
  • docker 18.04.0-ce
  • Python 3.6.4

Dynamic Fork させてみる

まず Dynamic Fork させる前のワークフローの状態は以下のようになります
dynamic_fork1.png

Dynamic Fork のタスクは fanout1 になります
ここでタスクを動的に作るために task_4 の outputData の中に Dynamic Fork させるためのデータを格納する必要があります

fanout1 が Dynamic Fork するために必要なパラメータは以下のようになっています

{
  "name": "dynamic_fanout",
  "taskReferenceName": "fanout1",
  "inputParameters": {
    "dynamicTasks": "${task_4.output.dynamicTasks}",
    "input": "${task_4.output.inputs}"
  },
  "type": "FORK_JOIN_DYNAMIC",
  "dynamicForkTasksParam": "dynamicTasks",
  "dynamicForkTasksInputParamName": "input",
  "startDelay": 0
}

Dynamic Fork をするには type を FORK_JOIN_DYNAMIC にし、かつ dynamicForkTasksParamdynamicForkTasksInputParamName というパラメータの指定が必要です
今回この 2 つそれぞれ dynamicTasksinput と指定しています
このパラメータがどこを指しているかというと inputParameters のキーの値になります
inputParameters のこの値を使うというのを指定する感じになります

そしてそのキーに設定される値は何かというとそれぞれ ${task_4.output.dynamicTasks}${task_4.output.inputs} になります
これが何かというと task_4 の outputData に設定されている値になります

ということを踏まえて taks_4 のステータスを COMPLETED に変更してみます

  • vim update_task4.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskObj = {
      "taskId": "457a1546-f9fd-4012-88cf-25555913742e",
      "workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
      "status": "COMPLETED",
      "outputData": 
        {
          "inputs": {
            "task_5": {
              "params": {
                "recipe": "jpg"
              }
            }
        },
        "dynamicTasks": [
          {
            "name": "task_5",
              "taskReferenceName": "task_5",
              "type": "SIMPLE"
          }
        ]
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main()

taskId と workflowInstanceId は適宜変更してください
先ほど fanout1 内で指定した inputsdynamicTasks を outputData に指定します
そして dynamicTasks には動的に動かしたいタスクを指定します
当然ですがここで指定するタスクは既に conductor に登録しておく必要があります
もし存在しないタスクを指定するとワークフローがエラーとなります
そして inputs 側には動的に動かすタスクに対して与えるパラメータを指定することができます
今回の場合は task_5 に対して "recipe": "jpg" という値を渡すことになります

また変数名が複数形であることからもわかるように動かすタスクは複数動かすことが可能です

これで task_4 を更新するとワークフローが以下のようになると思います
dynamic_fork2.png

task_5 が突如出現しました
あとはこれまで通りポーリング -> タスクの更新とすれば join1 につながり、その後 search_elasticsearch のタスクに流れます

もし複数のタスクを動かしたのであれば、それらすべてのタスクが COMPLETED になるまで join1 には進みません
これが Dynamic Fork の仕組みになります

最後に

conductor の Dynamic Fork の仕組みを試してみました
冒頭にも説明しましたが、動かすタスクの数が決まっていない場合や動かすタスクの種類が決まっていない場合などに使えます

今回も kitchensink のサンプルを使っていますが右側の分岐は普通の Fork タスクの流れになります
こちらは始めからタスクが 2 つだけ Fork していることがわかると思います

公式のドキュメントにもありましたが Dynamic Fork を使う場合には必ず Join タスクも含めるようにしてください

参考サイト

0 件のコメント:

コメントを投稿