概要
conductor はワークフローツールで Netflix が開発している OSS です
今回はインストールと挙動の確認までやってみました
ドキュメントがまだ少ないので不確かな情報が多いです
環境
- macOS 10.13.4
- conductor 1.9.0
- docker 18.04.0-ce
インストール
- git clone https://github.com/Netflix/conductor.git
- cd conductor
でコードが取得できたらまずはビルドします
docker を使ってビルドできるので docker を使います
docker build -f docker/server/Dockerfile.build -t conductor:server-build .
docker run -v $(pwd):/conductor conductor:server-build
docker build -f docker/server/Dockerfile -t conductor:server .
docker build -f docker/ui/Dockerfile -t conductor:ui .
これでサーバと UI のイメージが作成されます
あとはコンテナを起動すれば OK です
この 2 つでローカル上で動作サーバと UI を動作するための jar ファイルなどが生成されます
これを元にサーバと UI の docker イメージを作成します
cd conductor/dockerdocker-compose build
でビルドが完了したら
- cd docker
- docker-compose up -d
で UI とサーバを起動します
これで localhost:5000
にアクセスすると conductor の UI が表示されます
また localhost:8080
が API サーバのアクセスポイントになります
docker-compose を使わないのであれば
cd conductor/dockerdocker build -t conductor:server ./server
docker build -t conductor:ui ./ui
でビルドしてから up でも OK です
個別に run すると依存している elasticsearch と dynamite も手動で上げる必要があるので起動するときは docker-compose を素直に使うことをおすすめします
kitchensink というサンプルのワークフローを使って挙動を確認してみる
conductor server が起動すると kitchensink というワークフローが登録されているのが確認できます
まだワークフローだけで実際に動作するワーカーがないので手動 (curl) でワークフローを辿ってみたいと思います
まずワークフローを実行させましょう
実行は UI からはできず API から行います
UI はステータスの確認を行うのに使います
curl -X POST --header 'Content-Type: application/json' --header 'Accept: text/plain' 'http://localhost:8080/api/workflow/kitchensink' -d '{ "task2Name": "task_5" }'
これでワークフローが起動します
他のワークフローはいろいろ試した残骸なので無視してください
更にワークフローの詳細を確認すると task_1
というタスクが SCHEDULED
になっていることが確認できます
要するにワークフローが開始され一番初めのタスクが待ち状態になりました
待ち状態というのは具体的にいうとワーカーさんに「処理してください」という状態になったということです (おそらく)
task_1 の情報を取得しましょう
以下の API は所謂ポーリング処理になり実際はワーカーから定期的に投げられる処理になります
curl http://localhost:8080/api/tasks/poll/task_1
すると以下のような JSON が返ってくると思います
{
"taskType": "task_1",
"status": "IN_PROGRESS",
"inputData": {
"mod": null,
"oddEven": null,
"env": {
"taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
"workflowId": "14c8631506a5"
}
},
"referenceTaskName": "task_1",
"retryCount": 0,
"seq": 1,
"pollCount": 1,
"taskDefName": "task_1",
"scheduledTime": 1524795601602,
"startTime": 1524795883025,
"endTime": 0,
"updateTime": 1524795883025,
"startDelayInSeconds": 0,
"retried": false,
"callbackFromWorker": true,
"responseTimeoutSeconds": 3600,
"workflowInstanceId": "54313f13-199c-497c-9732-843e20a86b67",
"taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
"callbackAfterSeconds": 0,
"workflowTask": {
"name": "task_1",
"taskReferenceName": "task_1",
"inputParameters": {
"mod": "${workflow.input.mod}",
"oddEven": "${workflow.input.oddEven}",
"env": {
"taskId": "${CPEWF_TASK_ID}",
"workflowId": "${HOSTNAME}"
}
},
"type": "SIMPLE",
"startDelay": 0
},
"queueWaitTime": 281423,
"taskStatus": "IN_PROGRESS"
}
長ったらしいので自分も全部は理解していませんがタスクの状態や実行時間、入力情報などがあります
ポイントは status
でこれが IN_PROGRESS
になっているのがわかります
要するにワーカーが一度ポーリングするとステータスが IN_PROGRESS
になりどれかのワーカーに握られたタスクだとということを明示しています
本来はここでタスクに紐付いたロジックをワーカーが実行します
そして処理が完了したらタスクのステータスを完了にします
ここでは成功した状態にします
curl -H 'Content-Type:application/json' -H 'Accept:application/json' -X POST http://localhost:8080/api/tasks/ -d '
{
"taskId": "eb934c01-731f-48b2-a5e7-a9ee81162256",
"workflowInstanceId": "54313f13-199c-497c-9732-843e20a86b67",
"status": "COMPLETED",
"output": {
"mod": 5,
"taskToExecute": "task_1",
"oddEven": 0,
"dynamicTasks": [
{
"name": "task_1",
"taskReferenceName": "task_1_1",
"type": "SIMPLE"
},
{
"name": "sub_workflow_4",
"taskReferenceName": "wf_dyn",
"type": "SUB_WORKFLOW",
"subWorkflowParam": {
"name": "sub_flow_1"
}
}
],
"inputs": {
"task_1_1": {},
"wf_dyn": {}
}
}
}'
API 的には /tasks
に対して POST を投げているだけですがボディの JSON がやたら長ったらしいです
おそらくちゃんと運用するようになったら各パラメータが何を意味するかちゃんと理解する必要はあると思います
ここでは冒頭の workflowInstanceId
と taskId
を変更します
先ほどポーリングした際に task_1
から取得したものを設定してください
そしてリクエストすると task_1
のタスクが終了し次の処理に移行しているのがわかると思います
途中でイベントが挟まっていますがイベントは本来別の外部キューシステムなどにエンキューしたりするのができるようです
https://netflix.github.io/conductor/metadata/systask/#event
次のタスクは task_5
となっているので再度ポーリングするとステータスが SCHEDULED
-> IN_PROGRESS
に変わるのが確認できると思います
だいたいこんな感じで、タスクポーリング -> ワーカー処理 -> タスクにフィードバックという流れでフローが進んでいくようです
ことあとで更に Decision
タスクや Fork
タスクなどが出てくるようですがそれはまた別の機会に試してみたいと思います
最後に
Netflix/conductor の起動方法と簡単な使い方の確認をしてみました
本格的に使うにはワーカーが必要です
どうやら SDK として Java or Python があるようなので次回は Python SDK を使ってワーカーと連携させてみたいと思います
まだ利便性が見えてきているわけではないですが、感じとしては Amazon SQS のようなキューイングシステムになるのでワーカー側で結構頑張らないとダメな印象はあります
イベントパイプラインのような pub/sub システムではないのでその辺りの区別も必要かなと思います
0 件のコメント:
コメントを投稿