2023年8月31日木曜日

fastapiでリスト内にデータ構造が含まれるようなフォームデータを扱う方法

fastapiでリスト内にデータ構造が含まれるようなフォームデータを扱う方法

概要

リクエストでいうと以下のようなリクエストが来ることを想定して作ります

  • curl -XPOST -H "Action: Hoge" localhost:8000/submit/ -H "content-type: application/x-www-form-urlencoded" -d "data[0][lang]=ruby&data[0][framework]=sinatra&data[1][lang]=python&data[1][framework]=fastapi"

data はリストになっていてその中に lang, framework というデータ構造を持つオブジェクトが入っているイメージです

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pydantic 2.1.1

サンプルコード

from fastapi import Depends, FastAPI, Request
from pydantic import BaseModel, field_validator

app = FastAPI()


# リスト内のデータ構造を管理するクラス
class Profile(BaseModel):
    lang: str
    framework: str

    @field_validator("lang")
    @classmethod
    def validate_lang(cls, v: str):
        if v not in ["ruby", "python"]:
            raise ValueError()
        return v


# リクエストのリスト部分を管理するクラス
class RequestData(BaseModel):
    profile: list[Profile]


# 独自のフォームパーサ
async def parse_form_data(request: Request) -> RequestData:
    form_data = await request.form()
    print(form_data)
    request_data: list[Profile] = []
    # とりあえず 100 リストまで対応
    for i in range(100):
        # lang が含まれているか確認、0から始まっていないもしくは指定のデータ構造がない場合などは終了
        lang = form_data.get(f"data[{i}][lang]")
        if lang is None:
            break
        # framework が含まれているか確認、0から始まっていないもしくは指定のデータ構造がない場合などは終了
        framework = form_data.get(f"data[{i}][framework]")
        if framework is None:
            break
        # データ構造の作成
        profile = Profile(lang=str(lang), framework=str(framework))
        # リストへの追加
        request_data.append(profile)
    # Depends の返り値を指定
    return RequestData(profile=request_data)


@app.post("/submit/")
async def submit_form(form_data: RequestData = Depends(parse_form_data)):
    # Depends で parse_form_data を指定することでリクエストデータを独自でパースする
    return form_data

動作確認

  • curl -XPOST -H "Action: Hoge" localhost:8000/submit/ -H "content-type: application/x-www-form-urlencoded" -d "data[0][lang]=ruby&data[0][framework]=sinatra&data[1][lang]=python&data[1][framework]=fastapi"

は通ります

  • curl -XPOST -H "Action: Hoge" localhost:8000/submit/ -H "content-type: application/x-www-form-urlencoded" -d "data[0][lang]=ruby&data[0][framework]=sinatra&data[1][lang]=python3&data[1][framework]=fastapi"

はエラーになります

最後に

そもそもこういったリクエストは form ではなく json で受け取るべきだと思います
ファイルを送信したくどうしても form を使わざるを得ない場合は仕方ないかなと思います

2023年8月30日水曜日

fastapiでヘッダのアクション名に応じてコールするルーティングを変更する方法

fastapiでヘッダのアクション名に応じてコールするルーティングを変更する方法

概要

前回の続きです
ヘッダの情報に合わせて適切なルーティングをコールするようにしてみました

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pydantic 2.1.1

サンプルコード

from typing import Self

from fastapi import Depends, FastAPI, Form, Request
from pydantic import BaseModel, field_validator

app = FastAPI()


async def get_action(request: Request) -> str:
    action = request.headers.get("Action", "default")
    return action


class UserForm(BaseModel):
    username: str
    password: str

    @classmethod
    def as_form(cls, username: str = Form(...), password: str = Form(...)) -> Self:
        return cls(username=username, password=password)

    @field_validator("username")
    def validate_username(cls, v):
        if v != "hawksnowlog":
            raise ValueError("Invalid username.")
        return v


@app.get("/")
async def root(request: Request, action: str = Depends(get_action)):
    form_data = await request.form()
    if action == "Hoge":
        # FormData は immutableDict なのでアンパックできる
        return await hoge(action, UserForm(**form_data))  # type: ignore
    return action


@app.post("/hoge")
async def hoge(
    action: str = Depends(get_action), form_data: UserForm = Depends(UserForm.as_form)
):
    return action, form_data.username

動作確認

  • pipenv run uvicorn app:app --reload

POST で送信できる

  • curl -XPOST -H "Action: Hoge" localhost:8000/hoge -H "content-type: application/x-www-form-urlencoded" -d "username=hawksnowlog&password=pass"

GET からも送信できる

  • curl -XGET -H "Action: Hoge" localhost:8000 -H "content-type: application/x-www-form-urlencoded" -d "username=hawksnowlog&password=pass"

どちらも username のバリデーションが動作していることも確認できると思います

最後に

同一パスに対する異なる処理を書きたい場合には使えるテクニックかなと思います

2023年8月29日火曜日

fastapiでヘッダ情報とフォームデータを同時に扱う方法

fastapiでヘッダ情報とフォームデータを同時に扱う方法

概要

ヘッダは Depends で処理してフォームデータは BaseModel でリクエストとして処理します

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pydantic 2.1.1

サンプルコード

from typing import Self

from fastapi import Depends, FastAPI, Form, Request
from pydantic import BaseModel, field_validator

app = FastAPI()


async def get_action(request: Request) -> str:
    action = request.headers.get("Action", "default")
    return action


class UserForm(BaseModel):
    username: str
    password: str

    @classmethod
    def as_form(cls, username: str = Form(...), password: str = Form(...)) -> Self:
        return cls(username=username, password=password)

    @field_validator("username")
    def validate_username(cls, v):
        if v != "hawksnowlog":
            raise ValueError("Invalid username.")
        return v


@app.get("/")
async def root(
    action: str = Depends(get_action), form_data: UserForm = Depends(UserForm.as_form)
):
    return action, form_data

動作確認

  • pipenv run uvicorn app:app --reload
  • curl -H "Action: Hoge" localhost:8000 -XGET -H "content-type: application/x-www-form-urlencoded" -d "username=hawksnowlog&password=pass"

=> ["Hoge",{"username":"hawksnowlog","password":"pass"}]

最後に

更にアクションに応じて処理する場合は一度ヘッダを取得したあとに別のルーティングをコールするなどの処理が必要なのかもしれない

2023年8月21日月曜日

PydanticV2 で Xml なレスポンスを扱う方法を考える

PydanticV2 で Xml なレスポンスを扱う方法を考える

概要

過去 に Pydantic V1 で検討しましたが V2 でも検討してみます
ydantic-xml が V1 にしか対応していないので pydantic-xml を使わない方針で作成してみます

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pypdantic 2.1.1

方針

  • Xml レスポンスは fastapi のレスポンスクラスを継承して作成
  • Xml 用のモデル定義は pydantic の BaseModel を継承する
    • レスポンスを Xml に変換するためのシリアライザを持たせる
    • Xml フィールドのキーはパスカルケースに変換する

サンプルコード

import textwrap
from typing import LiteralString, Union

import dicttoxml
from fastapi import FastAPI, Response
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_pascal

app = FastAPI()


class User(BaseModel):
    # alias_generator を指定することで Xml のキー情報をパスカルケースにする
    model_config = ConfigDict(populate_by_name=True, alias_generator=to_pascal)

    name: str = Field()
    age: int = Field()

    # model_dump (dict) を xml (str) に変換する
    def to_xml(self) -> LiteralString | bytes:
        return dicttoxml.dicttoxml(
            self.model_dump(by_alias=True),
            custom_root=to_pascal("some_custom_root"),
            attr_type=False,
        )


# fastapi で Xml レスポンスを生成するためのクラス
class XmlResponse(Response):
    def __init__(self, content: Union[str, bytes], status_code: int = 200):
        super().__init__(content, status_code=status_code, media_type="application/xml")


@app.get(
    "/",
    response_class=XmlResponse,
    response_model=User,
    # openapi-doc はここで生成する
    responses={
        200: {
            "description": "Success",
            "content": {
                "application/xml": {
                    "schema": {"$ref": "#/components/schemas/User"},
                    "example": textwrap.dedent(
                        """
                        <?xml version="1.0" encoding="UTF-8"?>
                        <User>
                          <Name>hawk</Name>
                          <Age>20</Age>
                        </User>
                    """
                    )[1:-1],
                },
                "application/json": None,
            },
        },
    },
)
def xml():
    result = User(name="hawksnowlog", age=10)
    return XmlResponse(content=result.to_xml())

動作確認

  • curl localhost:8000
<?xml version="1.0" encoding="UTF-8" ?><SomeCustomRoot><Name>hawksnowlog</Name><Age>10</Age></SomeCustomRoot>

localhost:8000/docs

最後に

pydantic V2 で Xml レスポンスを表現する方法を検討してみました
to_xml などはシリアライザを指定して自動で呼び出してくれるようになるともっといい感じなるかなと思います

参考サイト

2023年8月16日水曜日

Python3 エンジニア認定実践試験に合格しました

Python3 エンジニア認定実践試験に合格しました

概要

8/16に受験し合格したので勉強方法などを紹介します

結果

  • 875/1000 合格

40問で1000点満点なので一問25点で計算すると

  • 35/40

の正解率になります

コンテキスト

  • 普段からPython3を使った業務をしている
  • Python経験は2年ほど
  • Python 基礎認定試験保持

勉強期間

  • 約1ヶ月

勉強方法

実践レシピはキャンペーンでTwitterで受験宣言するともらえるのでそれを利用しました

ExamApp は少ないですが100%回答できるまで繰り返しましょう
実践レシピは内容をほぼ暗記するくらいじゃないと厳しいです (関数の使い方、オプションの指定方法、レスポンスの型使い方など)

それだけでも少し足りない気もしていてやはり「ちゃんと実践的な使い方ができる」レベルまで求められている感じもしました

ただ自分は ExamApp の繰り返しと実践レシピの暗記で合格できました

全体的な所感

  • ExamApp だけだと厳しい
    • 過去問からそのまま出題された問題は一問もなかった
  • 教本もしっかり読んで理解しないとダメ
    • モジュールの使い方やクラスの使い方はもちろん関数のオプションの使い方も理解している必要がある
  • 問題はすべて選択式の4択
    • 消去法でも何とかなるので最悪2択くらいまで絞れればいけるかも
  • 引っ掛けっぽい問題がある
    • 結構あったイメージ
    • 説明の文言など
    • オプションのあるなしなど
  • 単純な問題ではなく複数の要素を組み合わせた問題もある
    • 内包記法+ジェネレータ+参照方法など
  • 基礎認定試験は難しい

2023年8月12日土曜日

Python でクラスに対してデコレートするとどうなるか

Python でクラスに対してデコレートするとどうなるか

概要

クラスからオブジェクトを作成する前後や作成するときに好きな処理を追加することができます

環境

  • Python 3.11.3

サンプルコード

from typing import Type


class User:
    def __init__(self, name: str) -> None:
        self.name = name


def deco(User: Type[User]):
    # これがコンストラクタの代わりのように振る舞う
    def func(name):
        return User(name + "!")

    return func


if __name__ == "__main__":
    # これだけだと内部の func はコールされない
    DecoUser = deco(User)
    # 実際にオブジェクトを生成する (デコレートされたクラスを使用する) と func を通る
    user = DecoUser("hawk")
    # ! マークが付与されていることが確認できる
    print(user.name)

最後に

デコレータは関数だけではなくクラス自体にも使えます
デコレータというよりかは内部関数 (クロージャ) の使い方かもしれませんが

2023年8月11日金曜日

rqscheduler でジョブの実行結果を保存する期間を指定する

rqscheduler でジョブの実行結果を保存する期間を指定する

概要

result_ttl をジョブ登録時に指定します
ただ最初に注意事項として記載しますが result_ttl を設定するとジョブのキー自体が削除されて再スケジュールされなくなるので注意してください

例えば1分おきのジョブを登録した場合に result_ttl=10 (10秒) に設定すると一度ジョブが実行されて10秒後にジョブ自体が消えてしまうので次の1分後にジョブが実行されなくなります

環境

  • Python 3.11.3
  • flask-rq2 18.3
  • rq-scheduler 0.13.1

サンプルコード

前回からのを流用しているのでインタフェースは Flask になります
ポイントは scheduler.cron 時に result_ttl を設定するところです

  • vim ./app.py
from flask import Flask, request
from flask_rq2 import RQ
from rq.job import Job

app = Flask(__name__)
app.config["RQ_REDIS_URL"] = "redis://localhost:6379/0"

rq = RQ(app)


def hello():
    return "hello"


@app.route("/update")
def update_job():
    job_id = request.args["job_id"]
    cron_string = request.args.get("cron_string", "*/2 * * * *")
    scheduler = rq.get_scheduler(queue="bar")
    job = Job.fetch(job_id, connection=scheduler.connection)
    job.meta["cron_string"] = cron_string
    job.save_meta()
    return {"id": job.id, "cron_string": job.meta.get("cron_string")}


@app.route("/")
def get_jobs():
    scheduler = rq.get_scheduler(queue="bar")
    jobs = scheduler.get_jobs()
    return [
        {
            "id": job.id,
            "cron_string": job.meta.get("cron_string"),
            "results": [
                f"{r.type}:{r.return_value}:{r.created_at}" for r in job.results()
            ],
        }
        for job in jobs
    ]


@app.route("/register")
def register_job():
    scheduler = rq.get_scheduler(queue="bar")
    job = scheduler.cron(
        "*/1 * * * *",
        func=hello,
        args=[],
        result_ttl=10,  # ここを追加、単位は秒
    )
    return job.get_id()

ちなみに redis 上では rq:results:xxx というキーに stream 型で保存されます
return_value は base64 エンコードされています

127.0.0.1:6379> type rq:results:2064a66f-b2cf-4e42-962d-e71d9313a4bd
stream
127.0.0.1:6379> xrange rq:results:2064a66f-b2cf-4e42-962d-e71d9313a4bd - +
1) 1) "1691107781067-0"
   2) 1) "type"
      2) "1"
      3) "return_value"
      4) "gAWVCQAAAAAAAACMBWhlbGxvlC4="
2) 1) "1691107841069-0"
   2) 1) "type"
      2) "1"
      3) "return_value"
      4) "gAWVCQAAAAAAAACMBWhlbGxvlC4="
3) 1) "1691107901019-0"
   2) 1) "type"
      2) "1"
      3) "return_value"
      4) "gAWVCQAAAAAAAACMBWhlbGxvlC4="

対策方法

幸いにも結果を保存するキーは別なので単純にそれだけ定期的に削除するようにすればいいかなと思います

127.0.0.1:6379> del rq:results:2064a66f-b2cf-4e42-962d-e71d9313a4bd

もしかするとそれをやってくれる API が rq にあるかもしれません

最後に

冒頭も記載しましたがジョブ (redis のキー自体) がなくなるので注意しましょう
公式にもありますが cron ジョブでは基本的には result_ttl=-1 で無期限かかなり長めの値を設定するのがいいようです

参考サイト

2023年8月10日木曜日

pydantic の BaseModel でデータベースに接続してバリデーションを行うケースを考える

pydantic の BaseModel でデータベースに接続してバリデーションを行うケースを考える

概要

結論からすると pydantic の BaseModel 内で DB を参照するのは好ましくないので context を使いましょう

コードは前回のものを流用します

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pypdantic 2.1.1

サンプルコード

from enum import Enum

from fastapi import Depends, FastAPI, Query, Request
from fastapi.datastructures import QueryParams
from pydantic import BaseModel, Field, FieldValidationInfo, field_validator
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker

app = FastAPI()


engine = create_engine("mysql+pymysql://root@localhost/test?charset=utf8mb4")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class User(Base):
    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)


def get_db():
    db: Session = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
    finally:
        db.close()


# 各種アクション名を管理する列挙型のクラス
class ActionName(Enum):
    CREATE_INSTANCE = "CreateInstance"


# CreateInstance(例) という名前のアクションのバリデーションを管理するモデル
class CreateInstance(BaseModel):
    instance_name: str = Field(alias="InstanceName")

    @field_validator("instance_name")
    @classmethod
    def validate_instance_name(cls, v: str, info: FieldValidationInfo) -> str:
        # context は FieldValidationInfo に格納されているので使用する場合は引数に追加する
        context = info.context
        # データベースから取得した値が取得できていることが確認できる
        print(context)
        if v != "ins01":
            raise ValueError()
        return v


# 起点となる Action 名のバリデーションを管理するモデル
class Action(BaseModel):
    action: str = Field(Query(alias="Action"))

    @field_validator("action")
    @classmethod
    def validate_action(cls, v: str) -> str:
        # ここでは許可するバリデーション名など基本的なバリデーションのみを行う
        if v not in [ActionName.CREATE_INSTANCE.value]:
            raise ValueError()
        return v

    # アクション名に応じて各種バリデーションをコールする
    def validate(self, query_params: QueryParams, context: dict):
        if self.action == ActionName.CREATE_INSTANCE.value:
            CreateInstance.model_validate(query_params, context=context)


@app.get("/")
async def root(
    request: Request, action: Action = Depends(), db: Session = Depends(get_db)
):
    # データベースの値はここで取得してバリデーション対応の値を context として渡す
    users = db.query(User).all()
    names = [user.name for user in users]
    # ルーティングの引数で Request オブジェクトを参照できるのでこれを使って各種アクションのバリデーションをコールする
    action.validate(request.query_params, context={"names": names})
    return action

解説

ポイントは context です
データベースへの参照はルーティング内でのみ行います
そして必要な値をデータベース側で渡すようにしましょう

このケースの場合は model_validate を使うことが前提となっているのでルーティングの引数でモデルを指定して自動でバリデーションしているような場合は少し改修が必要になります

最後に

一応データベースを参照することもできますがその場合はグローバルにアクセスできる関数やデータベースのコンテキストを考える必要があるので少し面倒なのとコードの可読性が下がるかなと思います

参考サイト

2023年8月9日水曜日

fastapi で AWS のような Action 名ベースの API をどう構築するか考える (ルーティング編)

fastapi で AWS のような Action 名ベースの API をどう構築するか考える (ルーティング編)

概要

前回バリデーションをどうするか検討しました
今回はさらにルーティングをどう管理するか考えます

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pypdantic 2.1.1

方針

  • ユーザインタフェースとなるルーティングは一つだが内部では Action ごとにルーティングを分けたい
  • root で一旦すべてのルーティングを受けて Action 名で判断し適切なルーティングをコールする

サンプルコード

from enum import Enum

from fastapi import Depends, FastAPI, Query, Request
from fastapi.datastructures import QueryParams
from pydantic import BaseModel, Field, FieldValidationInfo, field_validator
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker

app = FastAPI()


engine = create_engine("mysql+pymysql://root@localhost/test?charset=utf8mb4")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class User(Base):
    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)


def get_db():
    db: Session = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
    finally:
        db.close()


# 各種アクション名を管理する列挙型のクラス
class ActionName(Enum):
    CREATE_INSTANCE = "CreateInstance"


# CreateInstance(例) という名前のアクションのバリデーションを管理するモデル
class CreateInstance(BaseModel):
    instance_name: str = Field(alias="InstanceName")

    @field_validator("instance_name")
    @classmethod
    def validate_instance_name(cls, v: str, info: FieldValidationInfo) -> str:
        # context は FieldValidationInfo に格納されているので使用する場合は引数に追加する
        context = info.context
        # データベースから取得した値が取得できていることが確認できる
        print(context)
        if v != "ins01":
            raise ValueError()
        return v


# 起点となる Action 名のバリデーションを管理するモデル
class Action(BaseModel):
    action: str = Field(Query(alias="Action"))

    @field_validator("action")
    @classmethod
    def validate_action(cls, v: str) -> str:
        # ここでは許可するバリデーション名など基本的なバリデーションのみを行う
        if v not in [ActionName.CREATE_INSTANCE.value]:
            raise ValueError()
        return v

    # アクション名に応じて各種バリデーションをコールする
    def validate(self, query_params: QueryParams, context: dict):
        if self.action == ActionName.CREATE_INSTANCE.value:
            CreateInstance.model_validate(query_params, context=context)


@app.get("/")
async def root(
    request: Request, action: Action = Depends(), db: Session = Depends(get_db)
):
    if action == ActionName.CREATE_INSTANCE.value:
        return await create_instance(request, action, db)


async def create_instance(
    request: Request, action: Action = Depends(), db: Session = Depends(get_db)
):
    # データベースの値はここで取得してバリデーション対応の値を context として渡す
    users = db.query(User).all()
    names = [user.name for user in users]
    # ルーティングの引数で Request オブジェクトを参照できるのでこれを使って各種アクションのバリデーションをコールする
    action.validate(request.query_params, context={"names": names})
    return action

解説

CreateInstance アクションの場合は create_instance のルーティングを使います
@app.get などは付与せずにただの非同期関数として定義します

もし create_instance をインタフェースとして公開したい場合には @app.get などを付与して公開することができるようにしておきます

root ルーティングでは Action 名に応じてルーティングを決定しています
バリデーション時も同じような判定をしているのでここはもう少しうまくやればキレイに書けるかもしれません

最後に

Action 名ごとにルーティングを定義する場合には Action 名ごとにルーティングを作成するとコードがキレイになるかもしれません

2023年8月8日火曜日

fastapi で AWS のような Action 名ベースの API をどう構築するか考える (バリデーション編)

fastapi で AWS のような Action 名ベースの API をどう構築するか考える (バリデーション編)

概要

クエリストリングで来る値をどうやってルーティングしバリデーションするか考えます

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pypdantic 2.1.1

方針

  • ルーティングのパラメータでは Action のみバリデーションする
  • ルーティング内で Request オブジェクトを参照して Action 名に応じた各種バリデーションをコールする (mode_validate)

上記方針の理由としては BaseModel に Request オブジェクトが渡せないためです

サンプルコード

from enum import Enum

from fastapi import Depends, FastAPI, Query, Request
from fastapi.datastructures import QueryParams
from pydantic import BaseModel, Field, field_validator

app = FastAPI()


# 各種アクション名を管理する列挙型のクラス
class ActionName(Enum):
    CREATE_INSTANCE = "CreateInstance"


# CreateInstance(例) という名前のアクションのバリデーションを管理するモデル
class CreateInstance(BaseModel):
    instance_name: str = Field(alias="InstanceName")

    @field_validator("instance_name")
    @classmethod
    def validate_instance_name(cls, v: str) -> str:
        if v != "ins01":
            raise ValueError()
        return v


# 起点となる Action 名のバリデーションを管理するモデル
class Action(BaseModel):
    action: str = Field(Query(alias="Action"))

    @field_validator("action")
    @classmethod
    def validate_action(cls, v: str) -> str:
        # ここでは許可するバリデーション名など基本的なバリデーションのみを行う
        if v not in [ActionName.CREATE_INSTANCE.value]:
            raise ValueError()
        return v

    # アクション名に応じて各種バリデーションをコールする
    def validate(self, query_params: QueryParams):
        if self.action == ActionName.CREATE_INSTANCE.value:
            CreateInstance.model_validate(query_params)


@app.get("/")
async def root(request: Request, action: Action = Depends()):
    # ルーティングの引数で Request オブジェクトを参照できるのでこれを使って各種アクションのバリデーションをコールする
    action.validate(request.query_params)
    return action

解説

クエリストリングはキャメルケースで来ることを想定しているので alias プロパティを使ってクエリストリング側のキー名を指定します

BaseModel を使って起点となる Action 名を管理するクラスと Action 名に基づく各種属性を管理するクラスを作成します
モデルクラスは必ず Field を使って定義しましょう、そうしないとうまくバリデーションメソッドがコールされません

Action クラスでは単純な名前のチェックだけを行います
細かいバリデーションはそれぞれで定義したアクション用のクラスを使います

pydantic は V2 を使っているので model_validate や field_validator を使っています

最後に

とりあえずこれをベースに更にデータベースとの連携やバリデーション用のオブジェクトの受け渡しなども考えてみたいと思います

参考サイト

2023年8月7日月曜日

rqscheduler で登録したジョブの cron 情報を変更する方法

rqscheduler で登録したジョブの cron 情報を変更する方法

概要

ジョブの meta 情報を書き換えます
ジョブの操作は Flask-RQ2 を使っています

環境

  • Python 3.11.3
  • flask-rq2 18.3
  • rq-scheduler 0.13.1

サンプルコード

from flask import Flask, request
from flask_rq2 import RQ
from rq.job import Job

app = Flask(__name__)
app.config["RQ_REDIS_URL"] = "redis://localhost:6379/0"

rq = RQ(app)


def hello():
    return "hello"


@app.route("/update")
def update_job():
    job_id = request.args["job_id"]
    cron_string = request.args.get("cron_string", "*/2 * * * *")
    scheduler = rq.get_scheduler(queue="bar")
    job = Job.fetch(job_id, connection=scheduler.connection)
    job.meta["cron_string"] = cron_string
    job.save_meta()
    return {"id": job.id, "cron_string": job.meta.get("cron_string")}


@app.route("/")
def get_jobs():
    scheduler = rq.get_scheduler(queue="bar")
    jobs = scheduler.get_jobs()
    return [{"id": job.id, "cron_string": job.meta.get("cron_string")} for job in jobs]


@app.route("/register")
def register_job():
    scheduler = rq.get_scheduler(queue="bar")
    job = scheduler.cron(
        "*/1 * * * *",
        func=hello,
        args=[],
    )
    return job.get_id()

update でジョブのメタ情報を更新しています
Job.fetch で job_id を指定して取得しその後 job.meta を更新することで cron 情報を変更することができます

動作確認

  • pipenv run rq worker bar
  • pipenv run rqscheduler
  • pipenv run flask run flask

で各種プロセスを起動しておきます

  • curl localhost:5000/register

で一度ジョブを登録し

  • curl "localhost:5000/update?job_id=b6498c8c-f78b-4be6-a3e4-857c663f9d18&cron_string=%2A%2F3+%2A+%2A+%2A+%2A"

で cron 情報を書き換えます
そしてワーカーのログで更新した時間間隔で実行されていることを確認しましょう

念のため redis 側の情報も書き換わっているか確認しましょう
meta のフィールドが cron 情報になります

127.0.0.1:6379> hgetall rq:job:b6498c8c-f78b-4be6-a3e4-857c663f9d18
 1) "meta"
 2) "\x80\x05\x957\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x0bcron_string\x94\x8c\x0b*/3 * * * *\x94\x8c\x12use_local_timezone\x94\x89u."
 3) "timeout"
 4) "180"
 5) "result_ttl"
 6) "-1"
 7) "created_at"
 8) "2023-08-03T06:20:34.186347Z"
 9) "started_at"
10) "2023-08-03T06:54:52.683199Z"
11) "status"
12) "finished"
13) "stopped_callback_name"
14) ""
15) "failure_callback_name"
16) ""
17) "success_callback_name"
18) ""
19) "ended_at"
20) "2023-08-03T06:54:52.801018Z"
21) "data"
22) "x\x9ck`\x9d*\xca\x00\x01\x1a=\x9c\x89\x05\x05z\x19\xa999\xf9S\xfcb\xa7\xd4N)\x99\xa2\a\x00y^\t\x8a"
23) "enqueued_at"
24) "2023-08-03T06:54:52.671411Z"
25) "last_heartbeat"
26) "2023-08-03T06:54:52.801019Z"
27) "description"
28) "app.hello()"
29) "worker_name"
30) "e3bb9dd7b1f544ba9042ea2b1675cc50"
31) "origin"
32) "bar"

最後に

一度登録したジョブでもメタ情報を更新することで実行時間を変更することができました

参考サイト

2023年8月4日金曜日

fastapi でクエリストリングを BaseModel を使って定義する方法

fastapi でクエリストリングを BaseModel を使って定義する方法

概要

Query クラスを使って属性として定義することでクエリストリングをクラスベースで定義できます

環境

  • Python 3.11.3
  • fastapi 0.100.1
  • pypdantic 2.1.1

サンプルコード

  • vim ./app.py
from typing import Optional, Type

from fastapi import Depends, FastAPI, Query, Request
from pydantic import BaseModel, field_validator

app = FastAPI()


class User(BaseModel):
    name: str = Query()
    height: Optional[int] = Query(default=100)

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        if v != "hawksnowlog":
            raise ValueError()
        return v


def validate(model: Type[User]):
    async def func(
        request: Request,
        content: model = Depends(),
        name: str = "default",
        height: Optional[int] = 100,
        age: int = Depends(get_age),
    ) -> model:
        print(request)
        print(content)
        if age > 10:
            raise ValueError()
        return User(name=name, height=height)

    return func


def get_age():
    return 10


@app.get("/")
async def query_test(user: User = Depends(validate(User))):
    return user

解説

ちょっと余計な validate が入っていますがこれは普通は不要です

BasModel を継承して Query でフィールドを定義すれば OK です
pydantic V2 からは field_validator というデコレータを使ってクラスメソッドとして各種フィールドのバリデーションを定義することができます

request と content が BaseModel 生成時に渡されるようです

最後に

pydantic V2 を今回は使っています
使い方はあまり変わらないですが使用するデコレータ名が変わっているので注意が必要です

参考サイト