概要
前回 conductor に入門した際にサンプルのワークフローである kitchensink を使ってみました
ただ、すべてのタスクを COMPLETED にしていません
なので、今回はすべてのタスクを COMPLETED にしてみました
クライアントは Python SDK を使っています
環境
- macOS 10.13.4
- conductor 1.9.0
- docker 18.04.0-ce
- Python 3.6.4
ワークフローの定義を追加する
docker で動作させている場合既存の kitchensink は必ず失敗します
なぜなら ElasticSearch に問い合わせる際に localhost を見ているため conductor_server
のコンテナから ElasticSearch にアクセスできません
- vim docker/docker-compose.yaml
version: '2'
services:
conductor-server:
environment:
- CONFIG_PROP=config.properties
image: conductor:server
build:
context: ../
dockerfile: docker/server/Dockerfile
ports:
- 8080:8080
links:
- elasticsearch:es
- dynomite:dyno1
conductor-ui:
environment:
- WF_SERVER=http://conductor-server:8080/api/
image: conductor:ui
build:
context: ../
dockerfile: docker/ui/Dockerfile
ports:
- 5000:5000
links:
- conductor-server
dynomite:
image: v1r3n/dynomite
elasticsearch:
image: elasticsearch:2.4
ports:
- 9200:9200
最後の ports の定義を追加しているだけです
これでホストマシン経由で ElasticSearch にアクセスできるようにします
docker-compose.yaml で links を使っているので、そこで指定していコンテナ名を指定しても OK だと思います
これで docker-compose up したあとにタスク定義を新規で追加します
タスク定義を追加するスクリプトは以下の通りです
また、Python SDK が使える環境の構築はこちらを参考にしてください
- cd client/python
- vim register_workflow.py
from conductor import conductor
def main():
tc = conductor.MetadataClient('http://localhost:8080/api')
wfdObj = {
"createTime": 1525651296485,
"name": "my_kitchensink",
"description": "kitchensink workflow",
"version": 1,
"tasks": [
{
"name": "task_1",
"taskReferenceName": "task_1",
"inputParameters": {
"mod": "${workflow.input.mod}",
"oddEven": "${workflow.input.oddEven}",
"env": {
"taskId": "${CPEWF_TASK_ID}",
"workflowId": "${HOSTNAME}"
}
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "event_task",
"taskReferenceName": "event_0",
"inputParameters": {
"mod": "${workflow.input.mod}",
"oddEven": "${workflow.input.oddEven}"
},
"type": "EVENT",
"startDelay": 0,
"sink": "conductor"
},
{
"name": "dyntask",
"taskReferenceName": "task_2",
"inputParameters": {
"taskToExecute": "${workflow.input.task2Name}"
},
"type": "DYNAMIC",
"dynamicTaskNameParam": "taskToExecute",
"startDelay": 0
},
{
"name": "oddEvenDecision",
"taskReferenceName": "oddEvenDecision",
"inputParameters": {
"oddEven": "${task_2.output.oddEven}"
},
"type": "DECISION",
"caseValueParam": "oddEven",
"decisionCases": {
"0": [
{
"name": "task_4",
"taskReferenceName": "task_4",
"inputParameters": {
"mod": "${task_2.output.mod}",
"oddEven": "${task_2.output.oddEven}"
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "dynamic_fanout",
"taskReferenceName": "fanout1",
"inputParameters": {
"dynamicTasks": "${task_4.output.dynamicTasks}",
"input": "${task_4.output.inputs}"
},
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "input",
"startDelay": 0
},
{
"name": "dynamic_join",
"taskReferenceName": "join1",
"type": "JOIN",
"startDelay": 0
}
],
"1": [
{
"name": "fork_join",
"taskReferenceName": "forkx",
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "task_10",
"taskReferenceName": "task_10",
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "sub_workflow_x",
"taskReferenceName": "wf3",
"inputParameters": {
"mod": "${task_1.output.mod}",
"oddEven": "${task_1.output.oddEven}"
},
"type": "SUB_WORKFLOW",
"startDelay": 0,
"subWorkflowParam": {
"name": "sub_flow_1",
"version": 1
}
}
],
[
{
"name": "task_11",
"taskReferenceName": "task_11",
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "sub_workflow_x",
"taskReferenceName": "wf4",
"inputParameters": {
"mod": "${task_1.output.mod}",
"oddEven": "${task_1.output.oddEven}"
},
"type": "SUB_WORKFLOW",
"startDelay": 0,
"subWorkflowParam": {
"name": "sub_flow_1",
"version": 1
}
}
]
],
"startDelay": 0
},
{
"name": "join",
"taskReferenceName": "join2",
"type": "JOIN",
"startDelay": 0,
"joinOn": [
"wf3",
"wf4"
]
}
]
},
"startDelay": 0
},
{
"name": "search_elasticsearch",
"taskReferenceName": "get_es_1",
"inputParameters": {
"http_request": {
"uri": "http://172.21.0.1:9200/conductor/_search?size=10",
"method": "GET"
}
},
"type": "HTTP",
"startDelay": 0
},
{
"name": "task_30",
"taskReferenceName": "task_30",
"inputParameters": {
"statuses": "${get_es_1.output..status}",
"workflowIds": "${get_es_1.output..workflowId}"
},
"type": "SIMPLE",
"startDelay": 0
}
],
"outputParameters": {
"statues": "${get_es_1.output..status}",
"workflowIds": "${get_es_1.output..workflowId}"
},
"schemaVersion": 2
}
ret = tc.createWorkflowDef(wfdObj)
print(ret)
if __name__ == '__main__':
main()
長ったらしいですが既存の kitchensink の JSON をほぼ使い回しています
変更しているのは search_elasticsearch
タスクの inputParameters
-> http_request
-> url
の部分です
先ほど ElasticSearch をホストマシン経由でアクセスできるようにしたので localhost だった部分をホストマシンの IP に変更しています
これでワークフローを追加してあげましょう
- python3 register_workflow.py
すると以下のようにワークフローが追加されると思います
作成したワークフローを開始する
- vim start_workflow.py
from conductor import conductor
def main():
wfc = conductor.WorkflowClient('http://localhost:8080/api')
wfName = 'my_kitchensink'
input = { "task2Name": "task_5" }
workflowId = wfc.startWorkflow(wfName, input, 1, None)
print(workflowId)
if __name__ == '__main__':
main()
- python3 start_workflow.py
これで新規で作成した my_kitchensink ワークフローが RUNNING になります
workflowInstanceId は後で使うのでメモしておきます
各タスクを更新していく
あとはここから各タスクを更新していくだけです
ステータスを更新対象のタスクの taskId は本来であればポーリングして取得しますが今回は UI から取得して設定しています
task_1 を COMPLETED にする
- vim update_task1.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskObj = {
"taskId": "a9a38752-7382-4b0b-a1ca-2200636af57c",
"workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
"status": "COMPLETED",
"outputData": {
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main()
こんな感じになります
task_2 を更新する
続けて task_2 を更新します
コードは先ほどの taskId の部分を変更するだけです
実行すると以下のようになります
search_elasticsearch
の部分は自動で COMPLETED になるはずです
もしここで失敗する場合は以下のエラーになっていると思います
Failed to invoke http task due to: com.sun.jersey.api.client.ClientHandlerException: java.net.ConnectException: Connection refused (Connection refused)
これは冒頭で説明したとおり conductor_server から ElasticServer にアクセスできないのが原因なので docker-compose の設定やワークフローの定義の IP などが間違っていないか確認してみてください
また search_elasticsearch の inputParameters
の定義は以下のようになっています
"type": "HTTP"
にし http_request
というパラメータを使うとタスクが SCHEDULED になったときに指定の uri に自動でリクエストを送信してくれるようです
また、そのレスポンスを outputData
に自動で格納してくれるようです
"inputParameters": {
"http_request": {
"uri": "http://172.21.0.1:9200/conductor/_search?size=10",
"method": "GET"
}
}
task_30 を更新する
あとは最後のタスクを更新すれば OK です
これも先ほどのコードで taskId の部分だけ書き換えれば OK です
完了すると以下のようになります
ワークフローのステータスも COMPLETED に変わっています
これで最後まで完走できました
分岐させるには
無事完走はできましたが実は分岐の部分を完全にスルーしています
ちゃんと分岐させるには task_2 を更新する際に以下の用に outputData
を設定します
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskObj = {
"taskId": "f707da00-5fec-462b-ac07-1f2e2205866d",
"workflowInstanceId": "13d73f0d-202a-4b54-a11c-b44dbe0893c3",
"status": "COMPLETED",
"outputData": {
"oddEven": 0
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main()
分岐を制御している oddEvenDecision
のタスクを見るとわかりますが条件分岐の判定を以下のように定義しています
"inputParameters": {
"oddEven": "${task_2.output.oddEven}"
}
task_2 の output (outputData) の oddEvent というキーの値を見ています
なのでこれを設定してあげることで分岐に入ることができます
あとは同様に task_4 を更新してあげれば次のタスクに進むことができるようになります
ただなぜか Input to the dynamically forked tasks is not a map -> expecting a map of K,V but found null
のエラーになり先に進めませんでした
おそらく FORK_JOIN_DYNAMIC
タスクの扱い方がおかしいんだと思います
最後に
kitchensink のタスクを完走してみました
とりあえず完走はできましたが一部動作しなかったタスクもありました
FORK_JOIN_DYNAMIC
など、タスクの種類は他にもありタスクをうまく使いこなせないと conductor は使いこなせないと思います
0 件のコメント:
コメントを投稿