2018年5月8日火曜日

conductor のサンプルワークフロー kitchensink を完走してみた

概要

前回 conductor に入門した際にサンプルのワークフローである kitchensink を使ってみました
ただ、すべてのタスクを COMPLETED にしていません
なので、今回はすべてのタスクを COMPLETED にしてみました
クライアントは Python SDK を使っています

環境

  • macOS 10.13.4
  • conductor 1.9.0
  • docker 18.04.0-ce
  • Python 3.6.4

ワークフローの定義を追加する

docker で動作させている場合既存の kitchensink は必ず失敗します
なぜなら ElasticSearch に問い合わせる際に localhost を見ているため conductor_server のコンテナから ElasticSearch にアクセスできません

  • vim docker/docker-compose.yaml
version: '2'
services:
  conductor-server:
    environment:
      - CONFIG_PROP=config.properties
    image: conductor:server
    build:
      context: ../
      dockerfile: docker/server/Dockerfile
    ports:
      - 8080:8080
    links:
      - elasticsearch:es
      - dynomite:dyno1

  conductor-ui:
    environment:
      - WF_SERVER=http://conductor-server:8080/api/
    image: conductor:ui
    build:
      context: ../
      dockerfile: docker/ui/Dockerfile
    ports:
      - 5000:5000
    links:
      - conductor-server

  dynomite:
    image: v1r3n/dynomite

  elasticsearch:
    image: elasticsearch:2.4
    ports:
      - 9200:9200

最後の ports の定義を追加しているだけです
これでホストマシン経由で ElasticSearch にアクセスできるようにします
docker-compose.yaml で links を使っているので、そこで指定していコンテナ名を指定しても OK だと思います

これで docker-compose up したあとにタスク定義を新規で追加します
タスク定義を追加するスクリプトは以下の通りです
また、Python SDK が使える環境の構築はこちらを参考にしてください

  • cd client/python
  • vim register_workflow.py
from conductor import conductor

def main():
    tc = conductor.MetadataClient('http://localhost:8080/api')
    wfdObj = {
      "createTime": 1525651296485,
      "name": "my_kitchensink",
      "description": "kitchensink workflow",
      "version": 1,
      "tasks": [
        {
          "name": "task_1",
          "taskReferenceName": "task_1",
          "inputParameters": {
            "mod": "${workflow.input.mod}",
            "oddEven": "${workflow.input.oddEven}",
            "env": {
              "taskId": "${CPEWF_TASK_ID}",
              "workflowId": "${HOSTNAME}"
            }
          },
          "type": "SIMPLE",
          "startDelay": 0
        },
        {
          "name": "event_task",
          "taskReferenceName": "event_0",
          "inputParameters": {
            "mod": "${workflow.input.mod}",
            "oddEven": "${workflow.input.oddEven}"
          },
          "type": "EVENT",
          "startDelay": 0,
          "sink": "conductor"
        },
        {
          "name": "dyntask",
          "taskReferenceName": "task_2",
          "inputParameters": {
            "taskToExecute": "${workflow.input.task2Name}"
          },
          "type": "DYNAMIC",
          "dynamicTaskNameParam": "taskToExecute",
          "startDelay": 0
        },
        {
          "name": "oddEvenDecision",
          "taskReferenceName": "oddEvenDecision",
          "inputParameters": {
            "oddEven": "${task_2.output.oddEven}"
          },
          "type": "DECISION",
          "caseValueParam": "oddEven",
          "decisionCases": {
            "0": [
              {
                "name": "task_4",
                "taskReferenceName": "task_4",
                "inputParameters": {
                  "mod": "${task_2.output.mod}",
                  "oddEven": "${task_2.output.oddEven}"
                },
                "type": "SIMPLE",
                "startDelay": 0
              },
              {
                "name": "dynamic_fanout",
                "taskReferenceName": "fanout1",
                "inputParameters": {
                  "dynamicTasks": "${task_4.output.dynamicTasks}",
                  "input": "${task_4.output.inputs}"
                },
                "type": "FORK_JOIN_DYNAMIC",
                "dynamicForkTasksParam": "dynamicTasks",
                "dynamicForkTasksInputParamName": "input",
                "startDelay": 0
              },
              {
                "name": "dynamic_join",
                "taskReferenceName": "join1",
                "type": "JOIN",
                "startDelay": 0
              }
            ],
            "1": [
              {
                "name": "fork_join",
                "taskReferenceName": "forkx",
                "type": "FORK_JOIN",
                "forkTasks": [
                  [
                    {
                      "name": "task_10",
                      "taskReferenceName": "task_10",
                      "type": "SIMPLE",
                      "startDelay": 0
                    },
                    {
                      "name": "sub_workflow_x",
                      "taskReferenceName": "wf3",
                      "inputParameters": {
                        "mod": "${task_1.output.mod}",
                        "oddEven": "${task_1.output.oddEven}"
                      },
                      "type": "SUB_WORKFLOW",
                      "startDelay": 0,
                      "subWorkflowParam": {
                        "name": "sub_flow_1",
                        "version": 1
                      }
                    }
                  ],
                  [
                    {
                      "name": "task_11",
                      "taskReferenceName": "task_11",
                      "type": "SIMPLE",
                      "startDelay": 0
                    },
                    {
                      "name": "sub_workflow_x",
                      "taskReferenceName": "wf4",
                      "inputParameters": {
                        "mod": "${task_1.output.mod}",
                        "oddEven": "${task_1.output.oddEven}"
                      },
                      "type": "SUB_WORKFLOW",
                      "startDelay": 0,
                      "subWorkflowParam": {
                        "name": "sub_flow_1",
                        "version": 1
                      }
                    }
                  ]
                ],
                "startDelay": 0
              },
              {
                "name": "join",
                "taskReferenceName": "join2",
                "type": "JOIN",
                "startDelay": 0,
                "joinOn": [
                  "wf3",
                  "wf4"
                ]
              }
            ]
          },
          "startDelay": 0
        },
        {
          "name": "search_elasticsearch",
          "taskReferenceName": "get_es_1",
          "inputParameters": {
            "http_request": {
              "uri": "http://172.21.0.1:9200/conductor/_search?size=10",
              "method": "GET"
            }
          },
          "type": "HTTP",
          "startDelay": 0
        },
        {
          "name": "task_30",
          "taskReferenceName": "task_30",
          "inputParameters": {
            "statuses": "${get_es_1.output..status}",
            "workflowIds": "${get_es_1.output..workflowId}"
          },
          "type": "SIMPLE",
          "startDelay": 0
        }
      ],
      "outputParameters": {
        "statues": "${get_es_1.output..status}",
        "workflowIds": "${get_es_1.output..workflowId}"
      },
      "schemaVersion": 2
    }
    ret = tc.createWorkflowDef(wfdObj)
    print(ret)

if __name__ == '__main__':
    main()

長ったらしいですが既存の kitchensink の JSON をほぼ使い回しています
変更しているのは search_elasticsearch タスクの inputParameters -> http_request -> url の部分です
先ほど ElasticSearch をホストマシン経由でアクセスできるようにしたので localhost だった部分をホストマシンの IP に変更しています
これでワークフローを追加してあげましょう

  • python3 register_workflow.py

すると以下のようにワークフローが追加されると思います
kitchensink_goal1.png

作成したワークフローを開始する

  • vim start_workflow.py
from conductor import conductor

def main():
    wfc = conductor.WorkflowClient('http://localhost:8080/api')
    wfName = 'my_kitchensink'
    input = { "task2Name": "task_5" }
    workflowId = wfc.startWorkflow(wfName, input, 1, None)
    print(workflowId)

if __name__ == '__main__':
    main()
  • python3 start_workflow.py

これで新規で作成した my_kitchensink ワークフローが RUNNING になります
kitchensink_goal2.png

workflowInstanceId は後で使うのでメモしておきます

各タスクを更新していく

あとはここから各タスクを更新していくだけです
ステータスを更新対象のタスクの taskId は本来であればポーリングして取得しますが今回は UI から取得して設定しています

task_1 を COMPLETED にする

  • vim update_task1.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskObj = {
      "taskId": "a9a38752-7382-4b0b-a1ca-2200636af57c",
      "workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
      "status": "COMPLETED",
      "outputData": {
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main()

こんな感じになります
kitchensink_goal3.png

task_2 を更新する

続けて task_2 を更新します
コードは先ほどの taskId の部分を変更するだけです
実行すると以下のようになります
kitchensink_goal4.png

search_elasticsearch の部分は自動で COMPLETED になるはずです
もしここで失敗する場合は以下のエラーになっていると思います

Failed to invoke http task due to: com.sun.jersey.api.client.ClientHandlerException: java.net.ConnectException: Connection refused (Connection refused)

これは冒頭で説明したとおり conductor_server から ElasticServer にアクセスできないのが原因なので docker-compose の設定やワークフローの定義の IP などが間違っていないか確認してみてください

また search_elasticsearch の inputParameters の定義は以下のようになっています
"type": "HTTP" にし http_request というパラメータを使うとタスクが SCHEDULED になったときに指定の uri に自動でリクエストを送信してくれるようです
また、そのレスポンスを outputData に自動で格納してくれるようです

"inputParameters": {
  "http_request": {
    "uri": "http://172.21.0.1:9200/conductor/_search?size=10",
    "method": "GET"
  }
}

task_30 を更新する

あとは最後のタスクを更新すれば OK です
これも先ほどのコードで taskId の部分だけ書き換えれば OK です
完了すると以下のようになります
kitchensink_goal5.png

ワークフローのステータスも COMPLETED に変わっています
これで最後まで完走できました

分岐させるには

無事完走はできましたが実は分岐の部分を完全にスルーしています
ちゃんと分岐させるには task_2 を更新する際に以下の用に outputData を設定します

from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskObj = {
      "taskId": "f707da00-5fec-462b-ac07-1f2e2205866d",
      "workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
      "status": "COMPLETED",
      "outputData": {
        "oddEven": 0
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main()

分岐を制御している oddEvenDecision のタスクを見るとわかりますが条件分岐の判定を以下のように定義しています

"inputParameters": {
  "oddEven": "${task_2.output.oddEven}"
}

task_2 の output (outputData) の oddEvent というキーの値を見ています
なのでこれを設定してあげることで分岐に入ることができます
kitchensink_goal6.png

あとは同様に task_4 を更新してあげれば次のタスクに進むことができるようになります
ただなぜか Input to the dynamically forked tasks is not a map -> expecting a map of K,V but found null のエラーになり先に進めませんでした
おそらく FORK_JOIN_DYNAMIC タスクの扱い方がおかしいんだと思います

最後に

kitchensink のタスクを完走してみました
とりあえず完走はできましたが一部動作しなかったタスクもありました
FORK_JOIN_DYNAMIC など、タスクの種類は他にもありタスクをうまく使いこなせないと conductor は使いこなせないと思います

参考サイト

0 件のコメント:

コメントを投稿