2019年2月13日水曜日

Sinatra + Sidekiq でレジューム可能なワーカーを考える

概要

Sidekiq にはワーカーのプロセスを制御する API はありますがワーカー内で動作するジョブを制御する API はありません
なので例えば特定のジョブを停止させたい場合はワーカー自体を停止するしかありません
ただその場合はワーカーがリトライ用のジョブを再登録するための再度起動後に同じジョブを 1 から実行してしまいます

そうではなくジョブを停止して再実行した場合に停止した箇所から再実行できるようなワーカーを考えてみたいと思います

環境

  • macOS 10.14.2
  • Redis 4.0.9
  • Sidekiq 5.2.5

仕組みを検討する

Sidekiq::Process はワーカーのプロセスを制御する API です
これで stop! を掛けてしまうとワーカーが停止してしまいます
なのでこれは使わず特定のキーを Redis に登録することで「ジョブの停止」
「どこまで実行したかのステート」「実行中のジョブ」を管理したいと思います

Worker

まずはワーカーです
ポイントは 2 つです

  • すでに「どこまで実行したかのステート」がある場合はそのステートを使う
  • ジョブ実行中に停止用のキーが Redis にセットされたら「どこまで実行したかのステート」をセットしてジョブを終了する

になります
それらを考慮したコードは以下の通りです

require 'sidekiq'

class ResumeWorker
  include Sidekiq::Worker
  COUNT = 20
  KEY_PREFIX = 'resume'
  STOP = -1
  RUN = -3

  def perform(jid = nil)
    jid, counter = init(jid)
    while
      sleep 1
      counter += 1
      logger.info counter
      Sidekiq.redis { |c|
        v = c.get(jid).to_i
        if v == STOP
          c.set(jid, counter)
          return
        end
      }
      break if counter == COUNT
    end
    Sidekiq.redis { |c| c.del(jid) }
  rescue Interrupt
    logger.warn "stop worker by SIGTERM"
  end

  def init(jid)
    jid = self.jid if jid.nil?
    jid = "#{KEY_PREFIX}:#{jid}"
    counter = 0
    Sidekiq.redis { |c|
      v = c.get(jid)
      counter = v.to_i unless v.nil?
      c.set(jid, RUN)
      return jid, counter
    }
  end
end

登録したキーを識別できるように prefix を設けています
init 内で「どこまで実行したかのステート」をチェックし設定しています
今回ワーカーは単純なカウントをするだけのワーカーになっています
20 まで行ったら終了します
なのでステートはカウントした数字になります

ワーカーのメイン処理「perform」内でカウントするたびに Redis 内に停止用のキー (-1) が登録されたかをチェックします
もし登録されていたらどこまでカウントしたかを Redis に登録し return しています
カウントごとに Redis を read するので少しパフォーマンスが気になりますが今回は単純なカウントなので毎回チェックしています
やり方はいろいろありますが、ある一定のタイミングでチェックするような仕組みであれば別の方法でも良いと思います

API

次に API です
今回は API を叩くことでジョブの新規登録 (/start)、再開 (/start/:id)、停止 (/stop/:id) をできるようにします
フレームワークは Sinatra を使っています
ポイントは 2 つです

  • 再開時に停止中のジョブがあるかないかチェックする
  • 停止するため条件は「すでに実行中である」「停止の信号をすでに送信している場合は何もしない」「実行中の Sidekiq::Workers に該当のジョブ ID が存在している」

特に停止時は注意が必要です
コードは以下の通りです

require 'sinatra/base'
require 'sidekiq/api'
require 'logger'
require './resume_worker.rb'

class ResumeApp < Sinatra::Base
  logger = Logger.new(STDOUT)

  get '/start' do
    ResumeWorker.perform_async
  end

  get '/start/:id' do
    Sidekiq.redis { |c|
      v = c.get("#{ResumeWorker::KEY_PREFIX}:#{params['id']}")
      logger.info v
      return "no resumed id: #{params['id']}" if v.nil? || v.to_i ==  ResumeWorker::RUN
      ResumeWorker.perform_async(params['id'])
      return params['id']
    }
  end

  get '/stop/:id' do
    Sidekiq.redis { |c|
      v = c.get("#{ResumeWorker::KEY_PREFIX}:#{params['id']}")
      logger.info v
      return c.set("#{ResumeWorker::KEY_PREFIX}:#{params['id']}", ResumeWorker::STOP) if v.to_i == ResumeWorker::RUN
      return "alreay send stop signal #{params['id']}" if v.to_i != ResumeWorker::RUN
      workers = Sidekiq::Workers.new
      return "worker does no running #{params['id']}" unless workers.any? { |pid, tid, info| info['payload']['jid'] == params['id'] }
      c.set("#{ResumeWorker::KEY_PREFIX}:#{params['id']}", ResumeWorker::STOP)
    }
  end
end

ジョブを停止する信号を送信するとワーカー側でそれを検知してステートを保存します
その際にすでに信号は登録されているがワーカー自体はまだジョブを手放していない状態になる可能性があります
その状態で /stop をコールした際に保存したステートを上書きしないようにチェックする必要があります

新規でジョブが開始された場合とジョブを再開した場合の戻り値はジョブ ID を返します
ジョブを停止した場合は「OK」だけを返しています
それ以外のエラー系に関してはジョブ ID+メッセージを返しています

考慮すべき点

今回の場合サンプルレベルなのでそこまでいろいろ考えていません
実際に運用する場合は更に以下を考慮したほうが良いと思います

  • /stop を全くジョブが動作していない状況でもできるようにする
  • レジューム中のジョブの一覧などを取得できるようにする
  • ワーカーが強制停止してしまった場合の対応

あたりは考えたほうが良いと思います
最後の強制停止 (SIGTERM) は rescue Interrupt で拾えるのは拾えるのですがこの中で Sidekiq.redis が使えません
なのでまた別の方法でステートを保存する必要が出てきます

最後に

Sinatra + Sidekiq でジョブをレジュームできるワーカーを簡単ですが実装してみました
やってみるとレジューム機能がかなり考えることが多く難しい機能だということがわかりました
機能的に便利なので余裕があれば実装してみると良いかなと思います

参考サイト

0 件のコメント:

コメントを投稿