概要
前回 conductor のインストールを行いました
今回は Python SDK を使ってワーカーを作成しました
環境
- macOS 10.13.4
- conductor 1.9.0
- docker 18.04.0-ce
ライブラリインストール
- git clone https://github.com/Netflix/conductor.git
- cd client/python
pip3 install ./
また pypi で公開されていないようなので自分でビルドしてインストールする必要があります
client/python
に setup.py があるのでそれを利用します
https://github.com/Netflix/conductor/issues/273
ワークフロースタート
- vim start_workflow.py
from conductor import conductor
def main():
wfc = conductor.WorkflowClient('http://localhost:8080/api')
wfName = 'kitchensink'
input = { "task2Name": "task_5" }
workflowId = wfc.startWorkflow(wfName, input, 1, None)
print(workflowId)
if __name__ == '__main__':
main()
- python3 start_workflow.py
ポーリングする
- vim polling_task.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskType = 'task_1'
workerId = 'python_1'
ret = tc.pollForTask(taskType, workerId)
print(ret)
if __name__ == '__main__':
main()
- python3 polling_task.py
ここでステータスを見て IN_PROGRESS
なら他のワーカーが握っているので処理をしないなど分岐条件を記載します
また、タスクの workerId を python_1
が設定されておりどのワーカーが処理しているかわかるようになっています
ステータスを更新する
- vim update_task.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskObj = {
"taskId": "aa33aaca-1ffd-4852-ab37-00ec140cdd1d",
"workflowInstanceId": "f16446f1-eb5f-4b99-b631-68bce95e3146",
"status": "COMPLETED",
"output": {
"mod": 5,
"taskToExecute": "task_1",
"oddEven": 0,
"dynamicTasks": [
{
"name": "task_1",
"taskReferenceName": "task_1_1",
"type": "SIMPLE"
},
{
"name": "sub_workflow_4",
"taskReferenceName": "wf_dyn",
"type": "SUB_WORKFLOW",
"subWorkflowParam": {
"name": "sub_flow_1"
}
}
],
"inputs": {
"task_1_1": {},
"wf_dyn": {}
}
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main()
- python3 update_task.py
taskId と workflowInstanceId が必須です
taskId はポーリングしたときに取得できます
"status": "COMPLETED"
にすることでタスクを完了のステータスにすることができます
再度 IN_PROGRESS
にしようとしてもできません
レスポンスは None が返ってきます
見て分かる通り実はただ POST /tasks
の JSON をそのまま渡しているだけになります
なので、この JSON の各パラメータの意味をしっかり理解する必要があります
おまけ1 ワークフローを削除する
- vim remove_workflow.py
from conductor import conductor
def main():
wfc = conductor.WorkflowClient('http://localhost:8080/api')
wfId = '7257b875-2646-458b-8ab2-8708cf9e8f0f'
ret = wfc.removeWorkflow(wfId, False)
print(ret)
if __name__ == '__main__':
main()
これは執筆時点での master では動作しませんがいずれできるようになると思います
おまけ2 swagger-ui を表示する
http://localhost:8080/?url=localhost:8080
でいけます
各 API の JSON のパラメータの説明などはこれを見るのが手っ取り早いと思います
最後に
conductor の Python SDK を使ってみました
まだまだ開発中という感じがあってドキュメントも少なく始めるにはまだ敷居が高そうです
直接 client/python/conductor/conductor.py
などのコードを眺める or swagger-ui を見るしかありません
JSON パラメータの説明もほぼ無いので try & error で用途を探るしかなさそうです
SDK であれば Java のほうが開発は進んでいそうです
今回紹介したメソッドも全 API の一部でしかありません
他にもたくさんの API があるので興味があれば試してみてください
0 件のコメント:
コメントを投稿