2023年6月30日金曜日

Python paho-mqtt ライブラリを独自クラスとして定義する

Python paho-mqtt ライブラリを独自クラスとして定義する

概要

mqtt.Client をそのまま使わず継承して使います
その場合 on_connect などをインスタンスメソッドとして定義できます

環境

  • macOS 13.4.1
  • Python 3.11.3
  • paho-mqtt 1.6.1
  • mosquitto 2.0.15

mosquittoの起動

  • brew install mosquitto
  • brew services run mosquitto
  • mosquitto_sub -h localhost -p 1883 -t default

publish するサンプルコード

  • vim ./app.py
from paho.mqtt.client import Client


class CustomClient(Client):
    def __init__(self):
        super().__init__()

    def on_disconnect(self, client, userdata, rc):
        print("on_disconnect")
        print(client)
        print(userdata)
        print(rc)

    def on_publish(self, client, userdata, mid):
        print("on_publish")
        print(client)
        print(userdata)
        print(mid)
        self.disconnect()


if __name__ == "__main__":
    print("start")
    client = CustomClient()
    client.connect("localhost", port=1883, keepalive=60, bind_address="")
    client.publish(topic="default", payload='{"payload": "hoge"}', qos=0, retain=False)
    print("end")
  • pipenv run python app.py
  • mosquitto_sub -h localhost -p 1883 -t default

subscribe するサンプルコード

  • vim ./app.py
from paho.mqtt.client import Client


class CustomClient(Client):
    def __init__(self):
        super().__init__()

    def on_connect(self, client, userdata, flags, rc):
        print("on_connect")
        print(client)
        print(userdata)
        print(flags)
        print(rc)

    def on_disconnect(self, client, userdata, rc):
        print("on_disconnect")
        print(client)
        print(userdata)
        print(rc)

    def on_message(self, client, userdata, message):
        print("on_message")
        print(client)
        print(userdata)
        print(message.topic)
        print(message.payload)
        self.disconnect()

    def on_subscribe(self, client, userdata, mid, granted_qos):
        print("on_subscribe")
        print(client)
        print(userdata)
        print(mid)
        print(granted_qos)


if __name__ == "__main__":
    print("start")
    client = CustomClient()
    client.connect("localhost", port=1883, keepalive=60, bind_address="")
    client.subscribe("default", qos=0)
    client.loop_forever()
    print("end")

loop_forever (loop_start/loop_stop) はメッセージを受信するときに使います
publish するときには不要です

  • pipenv run python app.py
  • mosquitto_sub -h localhost -p 1883 -t default
  • mosquitto_pub -h localhost -p 1883 -t default -m "hoge"

最後に

クラス化したほうが管理は楽になるかなと思います

参考サイト

0 件のコメント:

コメントを投稿