2018年6月18日月曜日

conductor のタスクの timeoutSeconds と responseTimeoutSeconds の挙動を確認してみた

概要

タスクにはタイムアウト機能が備わっているようで一定の時間タスクが完了にならない場合には自動的にタイムアウトのステータスにしてくれるようです
今回はその機能を試してみました
で、結果的にはデフォルトでは動作せる自分でコードを書き換えてあげることでタイムアウト機能が使えるようになりました
その辺りの対処方法を紹介します

環境

  • macOS 10.13.5
  • docker 18.03.1-ce
  • conductor 1.0.0
  • Python 3.6.5

事前準備

conductor サーバの準備および Python SDK が使えるようにしておいてください

timeoutSeconds を 60 に設定

公式 を見ると timeoutSeconds は指定の時間の間ステータスが IN_PROGRESS であった場合に TIMED_OUT のステータスに変更してくれる機能です

まずは timeoutSeconds を 60 に設定したタスクおよびそれを使うワークフローを登録します
また、作成したワークフローを開始するスクリプトと開始したワークフローをポーリングするスクリプトも作成します

  • タスク (sample1_task.json)
{
  "name": "Sample_1",
  "retryCount": 0,
  "timeoutSeconds": 60,
  "timeoutPolicy": "TIME_OUT_WF",
  "retryLogic": "FIXED",
  "retryDelaySeconds": 0,
  "responseTimeoutSeconds": 0
}
  • ワークフロー (sample1_wf.json)
{
  "name": "sample1",
  "description": "sample1 wf",
  "version": 1,
  "tasks": [
    {
      "name": "Sample_1",
      "taskReferenceName": "s1",
      "type": "SIMPLE",
      "start_delay": 0
    }
  ],
  "schemaVersion": 2
}

それぞれ conductor にインポートするには registerTaskDefscreateWorkflowDef メソッドを使ってください
使い方はこちらで紹介しています

ワークフローは以下のような感じで登録できれば OK です
conductor_timeout3.png

ワークフローまで登録できたらワークフローをスタートするスクリプトとポーリングするスクリプトを準備します

  • vim start_wf1.py
from conductor import conductor

def main():
    wfc = conductor.WorkflowClient('http://localhost:8080/api')
    wfName = 'sample1'
    workflowId = wfc.startWorkflow(wfName, {}, 1, None)
    print(workflowId)

if __name__ == '__main__':
    main()

スタートするワークフロー名を指定します

  • vim polling1.py
from conductor import conductor

def main():
    tc = conductor.TaskClient('http://localhost:8080/api')
    taskType = 'Sample_1'
    workerId = 'python_1'
    ret = tc.pollForTask(taskType, workerId)
    print(ret)

if __name__ == '__main__':
    main()

ポーリングするタスク名を指定します

responseTimeoutSeconds を 60 に設定

responseTimeoutSeconds は指定の時間ステータスに変化がない場合に再度ステータスを SCHEDULED に変更してくれる機能です

それぞれの定義ですが先ほどの timeoutResponse とほぼ同様です
違うのはタスク用の JSON だけです
あとは Sample_1 となっているところを Sample_2 などに変更して同様にインポートしましょう

以下のようにワークフローが作成されれば OK です
conductor_timeout5.png

動作確認

それぞれワークフローを開始して、タスクをポーリングしてみましょう
タスクの詳細が以下のような感じになります

  • timeoutSeconds 60

conductor_timeout1.png

  • responseTimeoutSeconds 60

conductor_timeout2.png

これの Last Polled/Updated の 60 秒後にタスクのステータスに何かしらの変化が起こるはずです

が、、、なんと何も起きません
待てど待てどステータスはずっと IN_PROGRESS のままで本来であれば TIMED_OUT などになるはずです

なぜか

どうやらデフォルトではタイムアウト機能は disable になっているようです (理由は全くわかりません)
タイムタウト機能を enable にするには直接コードを編集しないとダメそうです
https://github.com/Netflix/conductor/issues/362

で、やってみました

まずは ServiceModule.java を開きます

  • vim conductor/server/src/main/java/com/netflix/conductor/server/ServiceModule.java

まずヘッダ部分に import 文を 1 行追加します

import com.netflix.conductor.core.execution.WorkflowSweeper;

次に 103 行目あたり、configure() メソッドの最後の行に以下を追記します

bind(WorkflowSweeper.class).asEagerSingleton();

これで OK です
あとは再度ソースをビルドして、jar と war を作成しそこから docker のイメージを作成し docker-compose up すれば OK です
ソースのビルド方法はこちらで紹介しています

再度動作確認してみた

ソースを変更後に再度 timeoutSeconds を試してみたところ正常に動作しました
conductor_timeout4.png

メッセージを見るとぴったり 60 秒ではないようです
また単位はミリ秒で表示されるようです

ちなみに responseTimeoutSeconds はステータスは IN_PROGRESS ままだったのですが指定時間後に再度ポーリングすると取得できたのでうまく動作しているんだと思います

最後に

conductor の timeoutSeconds と responseTimeoutSeconds の挙動を試してみました
試してみたところデフォルトでは ON になっていないようで現状はソースコードを修正する必要があるようです
なぜデフォルトで OFF になっているかは不明です
現状 Pull Request もまだないようなので、出しちゃってもいいかなと思っています

0 件のコメント:

コメントを投稿