概要
conductor を使ってそれっぽいワークフローを作成してみました
ワークフローの代表的なユースケースである承認システムを想定しています
conductor のインストール方法や基本的な使い方はこれまでの記事を参考にしてください
環境
- macOS 10.13.4
- conductor 1.9.0
- docker 18.04.0-ce
- Python 3.6.4
必要なタスクを登録する
今回は 3 つのタスクを登録します
2 つは承認用のタスクで 1 つはエラー処理用のタスクとします
- vim create_task.py
from conductor import conductor
def main():
tc = conductor.MetadataClient('http://localhost:8080/api')
listOfTaskDefObj = [
{
"createTime": 1525672041,
"name": "approval_1",
"description": "Low level approval",
"retryCount": 1,
"timeoutSeconds": 0,
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 60,
"responseTimeoutSeconds": 3600
},
{
"createTime": 1525672041,
"name": "approval_2",
"description": "High level approval",
"retryCount": 1,
"timeoutSeconds": 0,
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 60,
"responseTimeoutSeconds": 3600
},
{
"createTime": 1525672041,
"name": "error_1",
"description": "Your approval has been denied",
"retryCount": 0,
"timeoutSeconds": 0,
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 0,
"responseTimeoutSeconds": 0
}
]
ret = tc.registerTaskDefs(listOfTaskDefObj)
print(ret)
if __name__ == '__main__':
main()
これをベースにワークフローを作成していきます
作成済みのタスクの一覧を取得するには tc.getAllTaskDefs()
を使います
ワークフローを作成する
先ほど作成したタスクを用いてワークフローを作成します
vim create_workflow.py
from conductor import conductor
def main():
tc = conductor.MetadataClient('http://localhost:8080/api')
wfdObj = {
"createTime": 1525673798,
"name": "approval_workflow",
"description": "Get the money throught the approvals",
"version": 1,
"tasks": [
{
"name": "approval_1",
"taskReferenceName": "approval_1",
"inputParameters": {
"money": "${workflow.input.money}",
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "check_1",
"taskReferenceName": "check_1",
"inputParameters": {
"status": "${approval_1.output.status}"
},
"type": "DECISION",
"caseValueParam": "status",
"decisionCases": {
"ng": [
{
"name": "error_1",
"taskReferenceName": "error_1",
"type": "SIMPLE",
"startDelay": 0
},
],
"ok": [
{
"name": "approval_2",
"taskReferenceName": "approval_2",
"inputParameters": {
"money": "${workflow.input.money}",
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "check_2",
"taskReferenceName": "check_2",
"inputParameters": {
"status": "${approval_2.output.status}"
},
"type": "DECISION",
"caseValueParam": "status",
"decisionCases": {
"ng": [
{
"name": "error_1",
"taskReferenceName": "error_1",
"type": "SIMPLE",
"startDelay": 0
},
],
"ok": [
{
"name": "event_1",
"taskReferenceName": "event_1",
"type": "EVENT",
"startDelay": 0,
"sink": "conductor"
},
]
},
"startDelay": 0
}
]
},
"startDelay": 0
}
],
"outputParameters": {
"your_got_the_money": "${approval_2.output.money}"
},
"schemaVersion": 2
}
ret = tc.createWorkflowDef(wfdObj)
print(ret)
if __name__ == '__main__':
main()
これを実行すると以下のワークフローが作成されます
少し流れを解説します
全体の流れとしては 2 つの承認が両方共 OK ならば依頼した金額を得ることができるというワークフローになります
後ほど紹介しますが金額はワークフロー開始時に入力情報として指定します
もし NG になった場合は error_1
のタスクに入ります
ここでは強制的にステータスを FAILED
にします
なので、error_1
の流れに来た場合は最後までたどり着かずに必ずフローが失敗するようになっています
conductor の場合タスクを途中でぶった切って終了させるというタスクがないのでこのようにしています
最終的にワークフローが COMPLETED になった場合はいくら金額が取得できたのかを表示します
outputParameters
の your_got_the_money
がそれになります
あとは 2 つ目の承認が完了した際にイベントタスクを挟んでいます
今回は特にこのイベントに対する処理は書いていませんが、キューに情報が入るのでその情報を元に通知するなどの用途を考えています
だいたいの流れは上記となります
あとはそれぞれのタスクのワーカーを作成していきます
作成したワークフローを削除することは現状できないので、updateWorkflowDefs
を使って更新するようにしてください
ワークフローをスタートする
まずはワークフローをスタートします
先ほど説明したように申請する金額の情報を設定します
- vim start_workflow.py
from conductor import conductor
def main():
wfc = conductor.WorkflowClient('http://localhost:8080/api')
wfName = 'approval_workflow'
input = { "money": 1000 }
workflowId = wfc.startWorkflow(wfName, input, 1, None)
print(workflowId)
if __name__ == '__main__':
main()
これでワークフローの定義で金額を ${workflow.input.money}
で参照できるようになります
ワークフロースタートすると RUNNING の状態になります
approval_1 のワーカーを作成する
ワークフローをスタートすると 1 つ目のタスク approval_1
が SCHEDULED
のステータスになります
1 つ目の承認用ののワーカーを作成しましょう
- vim a1_worker.py
import sys
from conductor import conductor
def main(argv):
tc = conductor.TaskClient('http://localhost:8080/api')
taskType = 'approval_1'
workerId = 'approval_worker_1'
ret = tc.pollForTask(taskType, workerId)
print(ret)
val = "ok" if len(argv) <= 1 else argv[1]
taskObj = {
"taskId": ret['taskId'],
"workflowInstanceId": ret['workflowInstanceId'],
"status": "COMPLETED",
"outputData": {
"approver": "user001",
"status": val,
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main(sys.argv)
やっていることの概要は「ポーリング」->「ステータス設定」->「タスクの状態を更新」になります
ステータス情報はワーカーを起動するときの引数で ok
、ng
を切り替えられるようになっています
今回はそれっぽくするために承認者である approver
も設定しています
ポーリングとタスクのステータス更新を同じワーカー内で行っていますが本来であればポーリングしてから何かしらのアクションがあってステータスを更新するのが自然の流れかなと思います
例えば承認者に対して通知をしてから承認ボタンを押すことでステータスが更新されるといった流れが自然かなと思います
分岐について
今回は a1_worker.py
の outputData.status
に応じてどちらに分岐するか決まります
conductor の DECISION
タスクはその前のタスクの outputData
を使って自動的にどちらに分岐するか決定してくれます
分岐が決まるとその先にあるタスクを自動的に SCHEDULED
にしてくれるためワーカーなどを用意する必要はありません
タスクが SCHEDULED
になったそのタスクに対するワーカーをまた動かすだけです
approval_2 のワーカーを作成する
2 つ目の承認のワーカーを作成しましょう
大まかな処理は approval_1
と変わりません
- vim a2_worker.py
import sys
from conductor import conductor
def main(argv):
tc = conductor.TaskClient('http://localhost:8080/api')
taskType = 'approval_2'
workerId = 'approval_worker_2'
ret = tc.pollForTask(taskType, workerId)
print(ret)
val = "ok" if len(argv) <= 1 else argv[1]
money = 0
if val == "ok":
money = tc.getTask(ret['taskId'])['inputData']['money']
taskObj = {
"taskId": ret['taskId'],
"workflowInstanceId": ret['workflowInstanceId'],
"status": "COMPLETED",
"outputData": {
"approver": "admin001",
"status": val,
"money": money
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main(sys.argv)
違うのは金額情報 (money) をタスクの詳細から取得している点です
もし ok
の場合はワークフロー実行時に設定した money を取得して設定します
そしてそれを outputData.money
に設定することで最終的にこの値をワークフローの output として表示しています
error_1 のワーカーを作成する
エラーに遷移した場合にエラー処理をするワーカーが必要なので作成します
基本的にはタスクを FAILED
にして次に遷移しないようにしかつエラーメッセージを表示します
- vim error_worker.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskType = 'error_1'
workerId = 'error_worker_1'
ret = tc.pollForTask(taskType, workerId)
print(ret)
taskObj = {
"taskId": ret['taskId'],
"workflowInstanceId": ret['workflowInstanceId'],
"status": "FAILED",
"outputData": {
"message": "It's too high."
}
}
ret = tc.updateTask(taskObj)
print(ret)
if __name__ == '__main__':
main()
outputData.message
は正直何でも OK です
とりあえず承認を拒否した理由なんかがあると良いかなと思いメッセージを指定できるようにしています
大事なのは "status": "FAILED"
になります
こうすることでワークフロー全体が失敗の状態になり次に進まずここで終了になります
先ほども軽く説明しましたが conductor には後ろに繋げず強制終了するタスクの種類がないのでこのようなことをしています
また方法としては別の方法もありエラーに入った場合にエラー内容をセットし最終的に合流したタスクでそのエラー内容を判断して金額を表示しないという処理でも問題ないです
その場合にはワークフロー全体が FAILED
にならず未承認の場合でもワークフローは必ず COMPLETED
になると思います
動作確認
動作確認するにはタスクおよびワークフローを作成したら以下のように実行すれば OK です
- python3 start_workflow.py
- python3 a1_worker.py
- python3 a2_worker.py
これで以下のようなフローになります
失敗のフローに入れたい場合には
- python3 start_workflow.py
- python3 a1_worker.py ng
- python3 error_worker.py
という感じで実行すれば以下のようになります
もちろん approval_2 で ng にしても OK です
最後までいった場合はワークフロー全体の出力が { "your_got_the_money": 1000 }
という感じになりエラーになった場合は { "your_got_the_money": null }
になります
最後に
conductor を使って承認システムっぽいものを実装してみました
もし、たくさんの承認をしなければいけない場合はワーカーを増やせば簡単にスケールできます
今回はかなりワーカーを簡単に書きました
本来は DB や他のサービスとの連携などもう少し考えることはあるかなと思います
またロールバック処理も考えていないので本来であればそのフローも考える必要があるかなと思います
0 件のコメント:
コメントを投稿