2018年5月10日木曜日

conductor を使ってそれっぽいワークフロー処理を実装してみた

概要

conductor を使ってそれっぽいワークフローを作成してみました
ワークフローの代表的なユースケースである承認システムを想定しています
conductor のインストール方法や基本的な使い方はこれまでの記事を参考にしてください

環境

  • macOS 10.13.4
  • conductor 1.9.0
  • docker 18.04.0-ce
  • Python 3.6.4

必要なタスクを登録する

今回は 3 つのタスクを登録します
2 つは承認用のタスクで 1 つはエラー処理用のタスクとします

  • vim create_task.py
from conductor import conductor

def main():
    tc = conductor.MetadataClient('http://localhost:8080/api')
    listOfTaskDefObj = [
      {
        "createTime": 1525672041,
        "name": "approval_1",
        "description": "Low level approval",
        "retryCount": 1,
        "timeoutSeconds": 0,
        "timeoutPolicy": "TIME_OUT_WF",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 60,
        "responseTimeoutSeconds": 3600
      },
      {
        "createTime": 1525672041,
        "name": "approval_2",
        "description": "High level approval",
        "retryCount": 1,
        "timeoutSeconds": 0,
        "timeoutPolicy": "TIME_OUT_WF",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 60,
        "responseTimeoutSeconds": 3600
      },
      {
        "createTime": 1525672041,
        "name": "error_1",
        "description": "Your approval has been denied",
        "retryCount": 0,
        "timeoutSeconds": 0,
        "timeoutPolicy": "TIME_OUT_WF",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 0,
        "responseTimeoutSeconds": 0
      }
    ]
    ret = tc.registerTaskDefs(listOfTaskDefObj)
    print(ret)

if __name__ == '__main__':
    main()

これをベースにワークフローを作成していきます
作成済みのタスクの一覧を取得するには tc.getAllTaskDefs() を使います
my_workflow1.png

my_workflow2.png

ワークフローを作成する

先ほど作成したタスクを用いてワークフローを作成します

  • vim create_workflow.py
from conductor import conductor

def main():
    tc = conductor.MetadataClient('http://localhost:8080/api')
    wfdObj = {
      "createTime": 1525673798,
      "name": "approval_workflow",
      "description": "Get the money throught the approvals",
      "version": 1,
      "tasks": [
        {
          "name": "approval_1",
          "taskReferenceName": "approval_1",
          "inputParameters": {
            "money": "${workflow.input.money}",
          },
          "type": "SIMPLE",
          "startDelay": 0
        },
        {
          "name": "check_1",
          "taskReferenceName": "check_1",
          "inputParameters": {
            "status": "${approval_1.output.status}"
          },
          "type": "DECISION",
          "caseValueParam": "status",
          "decisionCases": {
            "ng": [
              {
                "name": "error_1",
                "taskReferenceName": "error_1",
                "type": "SIMPLE",
                "startDelay": 0
              },
            ],
            "ok": [
              {
                "name": "approval_2",
                "taskReferenceName": "approval_2",
                "inputParameters": {
                  "money": "${workflow.input.money}",
                },
                "type": "SIMPLE",
                "startDelay": 0
              },
              {
                "name": "check_2",
                "taskReferenceName": "check_2",
                "inputParameters": {
                  "status": "${approval_2.output.status}"
                },
                "type": "DECISION",
                "caseValueParam": "status",
                "decisionCases": {
                  "ng": [
                    {
                      "name": "error_1",
                      "taskReferenceName": "error_1",
                      "type": "SIMPLE",
                      "startDelay": 0
                    },
                  ],
                  "ok": [
                    {
                      "name": "event_1",
                      "taskReferenceName": "event_1",
                      "type": "EVENT",
                      "startDelay": 0,
                      "sink": "conductor"
                    },
                  ]
                },
                "startDelay": 0
              }
            ]
          },
          "startDelay": 0
        }
      ],
      "outputParameters": {
        "your_got_the_money": "${approval_2.output.money}"
      },
      "schemaVersion": 2
    }
    ret = tc.createWorkflowDef(wfdObj)
    print(ret)

if __name__ == '__main__':
    main()

これを実行すると以下のワークフローが作成されます
my_workflow3.png

少し流れを解説します
全体の流れとしては 2 つの承認が両方共 OK ならば依頼した金額を得ることができるというワークフローになります
後ほど紹介しますが金額はワークフロー開始時に入力情報として指定します
もし NG になった場合は error_1 のタスクに入ります
ここでは強制的にステータスを FAILED にします
なので、error_1 の流れに来た場合は最後までたどり着かずに必ずフローが失敗するようになっています
conductor の場合タスクを途中でぶった切って終了させるというタスクがないのでこのようにしています

最終的にワークフローが COMPLETED になった場合はいくら金額が取得できたのかを表示します
outputParametersyour_got_the_money がそれになります
あとは 2 つ目の承認が完了した際にイベントタスクを挟んでいます
今回は特にこのイベントに対する処理は書いていませんが、キューに情報が入るのでその情報を元に通知するなどの用途を考えています

だいたいの流れは上記となります
あとはそれぞれのタスクのワーカーを作成していきます

作成したワークフローを削除することは現状できないので、updateWorkflowDefs を使って更新するようにしてください

ワークフローをスタートする

まずはワークフローをスタートします
先ほど説明したように申請する金額の情報を設定します

  • vim start_workflow.py
from conductor import conductor

def main():
    wfc = conductor.WorkflowClient('http://localhost:8080/api')
    wfName = 'approval_workflow'
    input = { "money": 1000 }
    workflowId = wfc.startWorkflow(wfName, input, 1, None)
    print(workflowId)

if __name__ == '__main__':
    main()

これでワークフローの定義で金額を ${workflow.input.money} で参照できるようになります
ワークフロースタートすると RUNNING の状態になります
my_workflow4.png

approval_1 のワーカーを作成する

ワークフローをスタートすると 1 つ目のタスク approval_1SCHEDULED のステータスになります
1 つ目の承認用ののワーカーを作成しましょう

  • vim a1_worker.py
import sys
from conductor import conductor

def main(argv):
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskType = 'approval_1'
    workerId = 'approval_worker_1'
    ret = tc.pollForTask(taskType, workerId)
    print(ret)
    val = "ok" if len(argv) <= 1 else argv[1]
    taskObj = {
      "taskId": ret['taskId'],
      "workflowInstanceId": ret['workflowInstanceId'],
      "status": "COMPLETED",
      "outputData": {
        "approver": "user001",
        "status": val,
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main(sys.argv)

やっていることの概要は「ポーリング」->「ステータス設定」->「タスクの状態を更新」になります
ステータス情報はワーカーを起動するときの引数で okng を切り替えられるようになっています
今回はそれっぽくするために承認者である approver も設定しています

ポーリングとタスクのステータス更新を同じワーカー内で行っていますが本来であればポーリングしてから何かしらのアクションがあってステータスを更新するのが自然の流れかなと思います
例えば承認者に対して通知をしてから承認ボタンを押すことでステータスが更新されるといった流れが自然かなと思います

分岐について

今回は a1_worker.pyoutputData.status に応じてどちらに分岐するか決まります
conductor の DECISION タスクはその前のタスクの outputData を使って自動的にどちらに分岐するか決定してくれます
分岐が決まるとその先にあるタスクを自動的に SCHEDULED にしてくれるためワーカーなどを用意する必要はありません

タスクが SCHEDULED になったそのタスクに対するワーカーをまた動かすだけです

approval_2 のワーカーを作成する

2 つ目の承認のワーカーを作成しましょう
大まかな処理は approval_1 と変わりません

  • vim a2_worker.py
import sys
from conductor import conductor

def main(argv):
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskType = 'approval_2'
    workerId = 'approval_worker_2'
    ret = tc.pollForTask(taskType, workerId)
    print(ret)
    val = "ok" if len(argv) <= 1 else argv[1]
    money = 0
    if val == "ok":
      money = tc.getTask(ret['taskId'])['inputData']['money']
    taskObj = {
      "taskId": ret['taskId'],
      "workflowInstanceId": ret['workflowInstanceId'],
      "status": "COMPLETED",
      "outputData": {
        "approver": "admin001",
        "status": val,
        "money": money
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main(sys.argv)

違うのは金額情報 (money) をタスクの詳細から取得している点です
もし ok の場合はワークフロー実行時に設定した money を取得して設定します
そしてそれを outputData.money に設定することで最終的にこの値をワークフローの output として表示しています

error_1 のワーカーを作成する

エラーに遷移した場合にエラー処理をするワーカーが必要なので作成します
基本的にはタスクを FAILED にして次に遷移しないようにしかつエラーメッセージを表示します

  • vim error_worker.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskType = 'error_1'
    workerId = 'error_worker_1'
    ret = tc.pollForTask(taskType, workerId)
    print(ret)
    taskObj = {
      "taskId": ret['taskId'],
      "workflowInstanceId": ret['workflowInstanceId'],
      "status": "FAILED",
      "outputData": {
        "message": "It's too high."
      }
    }
    ret = tc.updateTask(taskObj)
    print(ret)

if __name__ == '__main__':
    main()

outputData.message は正直何でも OK です
とりあえず承認を拒否した理由なんかがあると良いかなと思いメッセージを指定できるようにしています
大事なのは "status": "FAILED" になります
こうすることでワークフロー全体が失敗の状態になり次に進まずここで終了になります

先ほども軽く説明しましたが conductor には後ろに繋げず強制終了するタスクの種類がないのでこのようなことをしています

また方法としては別の方法もありエラーに入った場合にエラー内容をセットし最終的に合流したタスクでそのエラー内容を判断して金額を表示しないという処理でも問題ないです
その場合にはワークフロー全体が FAILED にならず未承認の場合でもワークフローは必ず COMPLETED になると思います

動作確認

動作確認するにはタスクおよびワークフローを作成したら以下のように実行すれば OK です

  • python3 start_workflow.py
  • python3 a1_worker.py
  • python3 a2_worker.py

これで以下のようなフローになります
my_workflow5.png

失敗のフローに入れたい場合には

  • python3 start_workflow.py
  • python3 a1_worker.py ng
  • python3 error_worker.py

という感じで実行すれば以下のようになります
もちろん approval_2 で ng にしても OK です
my_workflow6.png

最後までいった場合はワークフロー全体の出力が { "your_got_the_money": 1000 } という感じになりエラーになった場合は { "your_got_the_money": null } になります

最後に

conductor を使って承認システムっぽいものを実装してみました
もし、たくさんの承認をしなければいけない場合はワーカーを増やせば簡単にスケールできます

今回はかなりワーカーを簡単に書きました
本来は DB や他のサービスとの連携などもう少し考えることはあるかなと思います
またロールバック処理も考えていないので本来であればそのフローも考える必要があるかなと思います

0 件のコメント:

コメントを投稿