2018年5月7日月曜日

conductor の Python SDK を使ってみた

概要

前回 conductor のインストールを行いました
今回は Python SDK を使ってワーカーを作成しました

環境

  • macOS 10.13.4
  • conductor 1.9.0
  • docker 18.04.0-ce

ライブラリインストール

また pypi で公開されていないようなので自分でビルドしてインストールする必要があります
client/python に setup.py があるのでそれを利用します
https://github.com/Netflix/conductor/issues/273

ワークフロースタート

  • vim start_workflow.py
from conductor import conductor

def main():
    wfc = conductor.WorkflowClient('http://localhost:8080/api')
    wfName = 'kitchensink'
    input = { "task2Name": "task_5" }
    workflowId = wfc.startWorkflow(wfName, input, 1, None)
    print(workflowId)

if __name__ == '__main__':
    main()
  • python3 start_workflow.py

ポーリングする

  • vim polling_task.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskType = 'task_1'
    workerId = 'python_1'
    ret = tc.pollForTask(taskType, workerId)
    print(ret)

if __name__ == '__main__':
    main()
  • python3 polling_task.py

ここでステータスを見て IN_PROGRESS なら他のワーカーが握っているので処理をしないなど分岐条件を記載します
また、タスクの workerId を python_1 が設定されておりどのワーカーが処理しているかわかるようになっています

ステータスを更新する

  • vim update_task.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskObj = {
      "taskId": "aa33aaca-1ffd-4852-ab37-00ec140cdd1d",
      "workflowInstanceId": "f16446f1-eb5f-4b99-b631-68bce95e3146",
      "status": "COMPLETED",
      "output": {
        "mod": 5,
        "taskToExecute": "task_1",
        "oddEven": 0,
        "dynamicTasks": [
          {
            "name": "task_1",
            "taskReferenceName": "task_1_1",
            "type": "SIMPLE"
          },
          {
            "name": "sub_workflow_4",
            "taskReferenceName": "wf_dyn",
            "type": "SUB_WORKFLOW",
            "subWorkflowParam": {
              "name": "sub_flow_1"
            }
          }
        ],
        "inputs": {
          "task_1_1": {},
          "wf_dyn": {}
        }
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main()
  • python3 update_task.py

taskId と workflowInstanceId が必須です
taskId はポーリングしたときに取得できます
"status": "COMPLETED" にすることでタスクを完了のステータスにすることができます
再度 IN_PROGRESS にしようとしてもできません
レスポンスは None が返ってきます

見て分かる通り実はただ POST /tasks の JSON をそのまま渡しているだけになります
なので、この JSON の各パラメータの意味をしっかり理解する必要があります

おまけ1 ワークフローを削除する

  • vim remove_workflow.py
from conductor import conductor

def main():
    wfc = conductor.WorkflowClient('http://localhost:8080/api')
    wfId = '7257b875-2646-458b-8ab2-8708cf9e8f0f'
    ret = wfc.removeWorkflow(wfId, False)
    print(ret)

if __name__ == '__main__':
    main()

これは執筆時点での master では動作しませんがいずれできるようになると思います

おまけ2 swagger-ui を表示する

http://localhost:8080/?url=localhost:8080 でいけます
各 API の JSON のパラメータの説明などはこれを見るのが手っ取り早いと思います

最後に

conductor の Python SDK を使ってみました
まだまだ開発中という感じがあってドキュメントも少なく始めるにはまだ敷居が高そうです
直接 client/python/conductor/conductor.py などのコードを眺める or swagger-ui を見るしかありません
JSON パラメータの説明もほぼ無いので try & error で用途を探るしかなさそうです
SDK であれば Java のほうが開発は進んでいそうです

今回紹介したメソッドも全 API の一部でしかありません
他にもたくさんの API があるので興味があれば試してみてください

参考サイト

0 件のコメント:

コメントを投稿