概要
タスクにはタイムアウト機能が備わっているようで一定の時間タスクが完了にならない場合には自動的にタイムアウトのステータスにしてくれるようです
今回はその機能を試してみました
で、結果的にはデフォルトでは動作せる自分でコードを書き換えてあげることでタイムアウト機能が使えるようになりました
その辺りの対処方法を紹介します
環境
- macOS 10.13.5
- docker 18.03.1-ce
- conductor 1.0.0
- Python 3.6.5
事前準備
conductor サーバの準備および Python SDK が使えるようにしておいてください
timeoutSeconds を 60 に設定
公式 を見ると timeoutSeconds
は指定の時間の間ステータスが IN_PROGRESS
であった場合に TIMED_OUT
のステータスに変更してくれる機能です
まずは timeoutSeconds を 60 に設定したタスクおよびそれを使うワークフローを登録します
また、作成したワークフローを開始するスクリプトと開始したワークフローをポーリングするスクリプトも作成します
- タスク (sample1_task.json)
{
"name": "Sample_1",
"retryCount": 0,
"timeoutSeconds": 60,
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 0,
"responseTimeoutSeconds": 0
}
- ワークフロー (sample1_wf.json)
{
"name": "sample1",
"description": "sample1 wf",
"version": 1,
"tasks": [
{
"name": "Sample_1",
"taskReferenceName": "s1",
"type": "SIMPLE",
"start_delay": 0
}
],
"schemaVersion": 2
}
それぞれ conductor にインポートするには registerTaskDefs
と createWorkflowDef
メソッドを使ってください
使い方はこちらで紹介しています
ワークフローは以下のような感じで登録できれば OK です
ワークフローまで登録できたらワークフローをスタートするスクリプトとポーリングするスクリプトを準備します
- vim start_wf1.py
from conductor import conductor
def main():
wfc = conductor.WorkflowClient('http://localhost:8080/api')
wfName = 'sample1'
workflowId = wfc.startWorkflow(wfName, {}, 1, None)
print(workflowId)
if __name__ == '__main__':
main()
スタートするワークフロー名を指定します
- vim polling1.py
from conductor import conductor
def main():
tc = conductor.TaskClient('http://localhost:8080/api')
taskType = 'Sample_1'
workerId = 'python_1'
ret = tc.pollForTask(taskType, workerId)
print(ret)
if __name__ == '__main__':
main()
ポーリングするタスク名を指定します
responseTimeoutSeconds を 60 に設定
responseTimeoutSeconds
は指定の時間ステータスに変化がない場合に再度ステータスを SCHEDULED
に変更してくれる機能です
それぞれの定義ですが先ほどの timeoutResponse とほぼ同様です
違うのはタスク用の JSON だけです
あとは Sample_1 となっているところを Sample_2 などに変更して同様にインポートしましょう
以下のようにワークフローが作成されれば OK です
動作確認
それぞれワークフローを開始して、タスクをポーリングしてみましょう
タスクの詳細が以下のような感じになります
- timeoutSeconds 60
- responseTimeoutSeconds 60
これの Last Polled/Updated
の 60 秒後にタスクのステータスに何かしらの変化が起こるはずです
が、、、なんと何も起きません
待てど待てどステータスはずっと IN_PROGRESS
のままで本来であれば TIMED_OUT
などになるはずです
なぜか
どうやらデフォルトではタイムアウト機能は disable になっているようです (理由は全くわかりません)
タイムタウト機能を enable にするには直接コードを編集しないとダメそうです
https://github.com/Netflix/conductor/issues/362
で、やってみました
まずは ServiceModule.java
を開きます
vim conductor/server/src/main/java/com/netflix/conductor/server/ServiceModule.java
まずヘッダ部分に import 文を 1 行追加します
import com.netflix.conductor.core.execution.WorkflowSweeper;
次に 103 行目あたり、configure()
メソッドの最後の行に以下を追記します
bind(WorkflowSweeper.class).asEagerSingleton();
これで OK です
あとは再度ソースをビルドして、jar と war を作成しそこから docker のイメージを作成し docker-compose up すれば OK です
ソースのビルド方法はこちらで紹介しています
再度動作確認してみた
ソースを変更後に再度 timeoutSeconds を試してみたところ正常に動作しました
メッセージを見るとぴったり 60 秒ではないようです
また単位はミリ秒で表示されるようです
ちなみに responseTimeoutSeconds はステータスは IN_PROGRESS
ままだったのですが指定時間後に再度ポーリングすると取得できたのでうまく動作しているんだと思います
最後に
conductor の timeoutSeconds と responseTimeoutSeconds の挙動を試してみました
試してみたところデフォルトでは ON になっていないようで現状はソースコードを修正する必要があるようです
なぜデフォルトで OFF になっているかは不明です
現状 Pull Request もまだないようなので、出しちゃってもいいかなと思っています
0 件のコメント:
コメントを投稿