2026年2月9日月曜日

外部の Redis にアクセスする場合はちゃんと pipeline を使おう

外部の Redis にアクセスする場合はちゃんと pipeline を使おう

概要

ループごとに GET するような処理はネットワークの負荷が重くなり結果としてアプリのレスポンス遅延にもつながるので可能な限り1回でほしい結果はすべて取得するようにしましょう

環境

  • Python 3.12.11
  • redis-py 7.1.0
  • redis 8.4.0

ダメなコード

def get_index_pairs(redis_client, hand_images: dict, ai_images: dict) -> list:
    profiles = get_profiles()
    pairs = []
    for num in sorted(
        set(hand_images.keys()) | set(ai_images.keys()),
        key=lambda x: int(x),
        reverse=True,
    ):
        hand_like_key = f"{num}_hand"
        ai_like_key = f"{num}_ai"
        hand_like = int(redis_client.get(hand_like_key) or 0)  # type: ignore
        ai_like = int(redis_client.get(ai_like_key) or 0)  # type: ignore
        pair = {
            "num": num,
            "hand": hand_images.get(num),
            "ai": ai_images.get(num),
            "hand_like": hand_like,
            "ai_like": ai_like,
            "name_hand": profiles.get(hand_like_key, {}).get("name", "名無し"),
            "character_hand": profiles.get(hand_like_key, {}).get(
                "character", "考え中"
            ),
            "name_ai": profiles.get(ai_like_key, {}).get("name", "名無し"),
            "character_ai": profiles.get(ai_like_key, {}).get("character", "考え中"),
            "winner": (
                "hand" if hand_like > ai_like else "ai" if ai_like > hand_like else None
            ),
        }
        pairs.append(pair)
    return pairs

良いコード

def get_index_pairs(redis_client, hand_images: dict, ai_images: dict) -> list:
    profiles = get_profiles()
    sorted_nums = sorted(
        set(hand_images.keys()) | set(ai_images.keys()),
        key=lambda x: int(x),
        reverse=True,
    )

    # --- Pipeline の開始 ---
    pipe = redis_client.pipeline()
    for num in sorted_nums:
        pipe.get(f"{num}_hand")
        pipe.get(f"{num}_ai")

    # 1回の通信で全データを取得
    raw_results = pipe.execute()
    # --- Pipeline の終了 ---

    pairs = []
    # 取得した結果を 2 つずつ取り出す
    for i, num in enumerate(sorted_nums):
        hand_like_bytes = raw_results[i * 2]
        ai_like_bytes = raw_results[i * 2 + 1]

        # decode_responses=False の場合を考慮
        hand_like = int(hand_like_bytes.decode() if hand_like_bytes else 0)
        ai_like = int(ai_like_bytes.decode() if ai_like_bytes else 0)

        pair = {
            "num": num,
            "hand": hand_images.get(num),
            "ai": ai_images.get(num),
            "hand_like": hand_like,
            "ai_like": ai_like,
            "name_hand": profiles.get(f"{num}_hand", {}).get("name", "名無し"),
            "character_hand": profiles.get(f"{num}_hand", {}).get(
                "character", "考え中"
            ),
            "name_ai": profiles.get(f"{num}_ai", {}).get("name", "名無し"),
            "character_ai": profiles.get(f"{num}_ai", {}).get("character", "考え中"),
            "winner": (
                "hand" if hand_like > ai_like else "ai" if ai_like > hand_like else None
            ),
        }
        pairs.append(pair)
    return pairs

比較

ネットワーク・レイテンシの削減

これが最も大きな改善点です。

ダメなコード(逐次処理): ループの中で毎回 redis_client.get() を呼び出しています。もし num が100個あれば、200回の通信(リクエストとレスポンスの往復)が発生します。

良いコード(パイプライン): すべての get コマンドを一旦溜めてから、pipe.execute() で 1回だけ Redisサーバーに送ります。通信回数が1回(往復1回)で済むため、ネットワークの遅延(レイテンシ)の影響を最小限に抑えられます。

スループット(処理能力)の向上

Redisサーバー側の負荷も軽減されます。

ダメなコード: サーバーは「リクエスト受信 → 解析 → 実行 → レスポンス送信」というサイクルを200回繰り返します。

良いコード: サーバーは「大きなリクエストを1つ受信 → まとめて実行 → 大きなレスポンスを1つ送信」という挙動になります。これにより、コンテキストスイッチやパケット処理のオーバーヘッドが減り、全体のスループットが向上します。

型の安全性と堅牢な実装

細かい部分ですが、データの扱いも丁寧になっています。

デコード処理: hand_like_bytes.decode() if hand_like_bytes else 0 のように、Redisから返ってくる値が None(キーが存在しない)場合や、バイト列(bytes型)である可能性を考慮して明示的にデコードしています。

マジックナンバーの排除: i * 2 と i * 2 + 1 を使ってパイプラインの結果から正しくデータを取り出しており、インデックス管理が明確です。

最後に

開発初期ではデータが少なく問題にならないですがデータが増えてくるとアプリの挙動がおかしくなるので注意しましょう

gunicorn などを使っていると Redis の応答よりもワーカーが先に死亡し以下のようなエラーになります

Traceback (most recent call last):
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/gunicorn/workers/sync.py", line 142, in handle
    self.handle_request(listener, req, client, addr)
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/gunicorn/workers/sync.py", line 185, in handle_request
    respiter = self.wsgi(environ, resp.start_response)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/flask/app.py", line 1536, in __call__
    return self.wsgi_app(environ, start_response)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/werkzeug/middleware/proxy_fix.py", line 183, in __call__
    return self.app(environ, start_response)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/flask/app.py", line 1511, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/flask/app.py", line 917, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/flask/app.py", line 902, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/data/repo/ai-gallery/app.py", line 162, in index
    pairs = get_pairs()
            ^^^^^^^^^^^
  File "/Users/username/data/repo/ai-gallery/app.py", line 143, in get_pairs
    pairs = get_index_pairs(redis_client, hand_images, ai_images)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/data/repo/ai-gallery/lib/pair.py", line 45, in get_index_pairs
    ai_like = int(redis_client.get(ai_like_key) or 0)  # type: ignore
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/commands/core.py", line 1923, in get
    return self.execute_command("GET", name, keys=[name])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/client.py", line 657, in execute_command
    return self._execute_command(*args, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/client.py", line 668, in _execute_command
    return conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/retry.py", line 116, in call_with_retry
    return do()
           ^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/client.py", line 669, in <lambda>
    lambda: self._send_command_parse_response(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/client.py", line 640, in _send_command_parse_response
    return self.parse_response(conn, command_name, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/client.py", line 691, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/connection.py", line 1133, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/redis/_parsers/socket.py", line 65, in _read_from_socket
    data = self._sock.recv(socket_read_size)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/username/.local/share/virtualenvs/ai-gallery-nT0ArqSL/lib/python3.12/site-packages/gunicorn/workers/base.py", line 198, in handle_abort
    sys.exit(1)
SystemExit: 1

0 件のコメント:

コメントを投稿