概要
Sidekiq にはワーカーのプロセスを制御する API はありますがワーカー内で動作するジョブを制御する API はありません
なので例えば特定のジョブを停止させたい場合はワーカー自体を停止するしかありません
ただその場合はワーカーがリトライ用のジョブを再登録するための再度起動後に同じジョブを 1 から実行してしまいます
そうではなくジョブを停止して再実行した場合に停止した箇所から再実行できるようなワーカーを考えてみたいと思います
環境
- macOS 10.14.2
- Redis 4.0.9
- Sidekiq 5.2.5
仕組みを検討する
Sidekiq::Process
はワーカーのプロセスを制御する API です
これで stop!
を掛けてしまうとワーカーが停止してしまいます
なのでこれは使わず特定のキーを Redis に登録することで「ジョブの停止」
「どこまで実行したかのステート」「実行中のジョブ」を管理したいと思います
Worker
まずはワーカーです
ポイントは 2 つです
- すでに「どこまで実行したかのステート」がある場合はそのステートを使う
- ジョブ実行中に停止用のキーが Redis にセットされたら「どこまで実行したかのステート」をセットしてジョブを終了する
になります
それらを考慮したコードは以下の通りです
require 'sidekiq'
class ResumeWorker
include Sidekiq::Worker
COUNT = 20
KEY_PREFIX = 'resume'
STOP = -1
RUN = -3
def perform(jid = nil)
jid, counter = init(jid)
while
sleep 1
counter += 1
logger.info counter
Sidekiq.redis { |c|
v = c.get(jid).to_i
if v == STOP
c.set(jid, counter)
return
end
}
break if counter == COUNT
end
Sidekiq.redis { |c| c.del(jid) }
rescue Interrupt
logger.warn "stop worker by SIGTERM"
end
def init(jid)
jid = self.jid if jid.nil?
jid = "#{KEY_PREFIX}:#{jid}"
counter = 0
Sidekiq.redis { |c|
v = c.get(jid)
counter = v.to_i unless v.nil?
c.set(jid, RUN)
return jid, counter
}
end
end
登録したキーを識別できるように prefix を設けています
init
内で「どこまで実行したかのステート」をチェックし設定しています
今回ワーカーは単純なカウントをするだけのワーカーになっています
20 まで行ったら終了します
なのでステートはカウントした数字になります
ワーカーのメイン処理「perform」内でカウントするたびに Redis 内に停止用のキー (-1
) が登録されたかをチェックします
もし登録されていたらどこまでカウントしたかを Redis に登録し return
しています
カウントごとに Redis を read するので少しパフォーマンスが気になりますが今回は単純なカウントなので毎回チェックしています
やり方はいろいろありますが、ある一定のタイミングでチェックするような仕組みであれば別の方法でも良いと思います
API
次に API です
今回は API を叩くことでジョブの新規登録 (/start
)、再開 (/start/:id
)、停止 (/stop/:id
) をできるようにします
フレームワークは Sinatra を使っています
ポイントは 2 つです
- 再開時に停止中のジョブがあるかないかチェックする
- 停止するため条件は「すでに実行中である」「停止の信号をすでに送信している場合は何もしない」「実行中の
Sidekiq::Workers
に該当のジョブ ID が存在している」
特に停止時は注意が必要です
コードは以下の通りです
require 'sinatra/base'
require 'sidekiq/api'
require 'logger'
require './resume_worker.rb'
class ResumeApp < Sinatra::Base
logger = Logger.new(STDOUT)
get '/start' do
ResumeWorker.perform_async
end
get '/start/:id' do
Sidekiq.redis { |c|
v = c.get("#{ResumeWorker::KEY_PREFIX}:#{params['id']}")
logger.info v
return "no resumed id: #{params['id']}" if v.nil? || v.to_i == ResumeWorker::RUN
ResumeWorker.perform_async(params['id'])
return params['id']
}
end
get '/stop/:id' do
Sidekiq.redis { |c|
v = c.get("#{ResumeWorker::KEY_PREFIX}:#{params['id']}")
logger.info v
return c.set("#{ResumeWorker::KEY_PREFIX}:#{params['id']}", ResumeWorker::STOP) if v.to_i == ResumeWorker::RUN
return "alreay send stop signal #{params['id']}" if v.to_i != ResumeWorker::RUN
workers = Sidekiq::Workers.new
return "worker does no running #{params['id']}" unless workers.any? { |pid, tid, info| info['payload']['jid'] == params['id'] }
c.set("#{ResumeWorker::KEY_PREFIX}:#{params['id']}", ResumeWorker::STOP)
}
end
end
ジョブを停止する信号を送信するとワーカー側でそれを検知してステートを保存します
その際にすでに信号は登録されているがワーカー自体はまだジョブを手放していない状態になる可能性があります
その状態で /stop
をコールした際に保存したステートを上書きしないようにチェックする必要があります
新規でジョブが開始された場合とジョブを再開した場合の戻り値はジョブ ID を返します
ジョブを停止した場合は「OK」だけを返しています
それ以外のエラー系に関してはジョブ ID+メッセージを返しています
考慮すべき点
今回の場合サンプルレベルなのでそこまでいろいろ考えていません
実際に運用する場合は更に以下を考慮したほうが良いと思います
/stop
を全くジョブが動作していない状況でもできるようにする- レジューム中のジョブの一覧などを取得できるようにする
- ワーカーが強制停止してしまった場合の対応
あたりは考えたほうが良いと思います
最後の強制停止 (SIGTERM
) は rescue Interrupt
で拾えるのは拾えるのですがこの中で Sidekiq.redis
が使えません
なのでまた別の方法でステートを保存する必要が出てきます
最後に
Sinatra + Sidekiq でジョブをレジュームできるワーカーを簡単ですが実装してみました
やってみるとレジューム機能がかなり考えることが多く難しい機能だということがわかりました
機能的に便利なので余裕があれば実装してみると良いかなと思います
0 件のコメント:
コメントを投稿