使用Python與MQTT實(shí)現(xiàn)異步通信功能
什么是MQTT協(xié)議?
MQTT是一種輕量級(jí)的發(fā)布/訂閱消息傳輸協(xié)議,設(shè)計(jì)用于低帶寬和高延遲的網(wǎng)絡(luò)環(huán)境,非常適合物聯(lián)網(wǎng)設(shè)備之間的通信。其主要特點(diǎn)包括:
- 發(fā)布/訂閱模型:支持多對(duì)多的消息傳遞。
- 輕量級(jí)設(shè)計(jì):較低的網(wǎng)絡(luò)開(kāi)銷。
- 支持QoS等級(jí):提供不同的消息傳遞可靠性。
項(xiàng)目背景
本文的示例代碼實(shí)現(xiàn)了一個(gè)基于Python的MQTT客戶端。以下功能涵蓋在代碼中:
- 通過(guò)SSL安全連接到MQTT代理。
- 支持動(dòng)態(tài)訂閱多個(gè)主題。
- 異步處理消息,提高性能和擴(kuò)展性。
- 提供自定義消息處理功能。
核心代碼解析
以下是代碼中的主要功能與模塊解析:
MQTT 客戶端類
class MQTTClient:
def __init__(self, broker, port, username, password, ca_cert, topics):
self.client = mqtt.Client()
self.client.username_pw_set(self.username, self.password)
self.client.tls_set(ca_certs=self.ca_cert)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
tls_set:?jiǎn)⒂肧SL/TLS以確保通信安全。主題訂閱:在連接成功時(shí),自動(dòng)訂閱指定的主題。
自定義消息處理
def set_message_handler(self, handler):
self.custom_message_handler = handler
用戶可通過(guò)該方法傳入自定義的回調(diào)函數(shù),從而根據(jù)業(yè)務(wù)邏輯處理消息。
異步啟動(dòng)客戶端
async def start_async(self):
self.connect()
await asyncio.get_event_loop().run_in_executor(None, self.client.loop_forever)
通過(guò)異步事件循環(huán)確保消息的高效處理,同時(shí)避免阻塞主線程。
示例代碼集成
在主文件main.py中,定義了如下流程:
- 初始化MQTT客戶端并傳入必要的參數(shù)。
- 注冊(cè)一個(gè)自定義的消息處理函數(shù)。
- 利用
asyncio實(shí)現(xiàn)消息處理和其他任務(wù)的并發(fā)執(zhí)行。
async def on_mqtt_message(topic, payload):
print(f"Custom handler: {topic} -> {payload}")
mqtt_client.set_message_handler(on_mqtt_message)
await mqtt_client.start_async()
使用指南
安裝依賴
確保安裝了paho-mqtt庫(kù):
pip install paho-mqtt
配置MQTT代理
更新代碼中的代理地址、端口、用戶名、密碼和證書(shū)路徑。
運(yùn)行程序
使用以下命令運(yùn)行程序:
python main.py
總結(jié)
快速搭建一個(gè)基于MQTT協(xié)議的實(shí)時(shí)通信系統(tǒng)。這種架構(gòu)不僅適用于物聯(lián)網(wǎng)場(chǎng)景,也可以在各種需要實(shí)時(shí)數(shù)據(jù)推送的應(yīng)用中發(fā)揮作用,例如聊天應(yīng)用和實(shí)時(shí)監(jiān)控系統(tǒng)。
示例代碼
mqtt.py
import paho.mqtt.client as mqtt
from datetime import datetime
import asyncio
class MQTTClient:
def __init__(self, broker, port, username, password, ca_cert, topics):
"""
初始化 MQTT 客戶端
"""
self.broker = broker
self.port = port
self.username = username
self.password = password
self.ca_cert = ca_cert
self.topics = topics
self.client = mqtt.Client()
# 配置 MQTT 客戶端
self.client.username_pw_set(self.username, self.password)
self.client.tls_set(ca_certs=self.ca_cert)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.custom_message_handler = None # 自定義消息處理器
def set_message_handler(self, handler):
"""
設(shè)置自定義消息處理回調(diào)函數(shù)
"""
self.custom_message_handler = handler
def on_connect(self, client, userdata, flags, rc):
"""
連接成功時(shí)的回調(diào)
"""
if rc == 0:
print("SSL連接成功")
for topic in self.topics:
client.subscribe(topic)
print(f"已訂閱主題: {topic}")
else:
print(f"連接失敗,返回碼: {rc}")
def on_message(self, client, userdata, msg):
"""
收到消息時(shí)的回調(diào)
"""
current_time = datetime.now()
payload = msg.payload.decode()
print(f"收到消息: {msg.topic} -> {payload} 時(shí)間: {current_time}")
if self.custom_message_handler and self.event_loop:
asyncio.run_coroutine_threadsafe(
self.custom_message_handler(msg.topic, payload),
self.event_loop
)
def connect(self):
"""
連接到 MQTT 服務(wù)器
"""
self.client.connect(self.broker, self.port, keepalive=60)
async def start_async(self):
"""
異步運(yùn)行 MQTT 客戶端
"""
self.connect() # 確保連接到 MQTT 服務(wù)器
print("Starting MQTT client loop...")
# 異步運(yùn)行 MQTT 客戶端的事件循環(huán)
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.client.loop_forever)
main.py
import asyncio
from mqtt import MQTTClient
# MQTT 配置
MQTT_BROKER = "你的服務(wù)器地址"
MQTT_PORT = 8883 # 使用 SSL 的端口
MQTT_USERNAME = "用戶名"
MQTT_PASSWORD = "密碼"
CA_CERT = "./emqxsl-ca.crt" # CA 證書(shū)路徑
TOPICS = ["clients/disconnect", "uhome/esp32"] # 訂閱的主題列表
async def main():
loop = asyncio.get_running_loop()
mqtt_client = MQTTClient(
broker=MQTT_BROKER,
port=MQTT_PORT,
username=MQTT_USERNAME,
password=MQTT_PASSWORD,
ca_cert=CA_CERT,
topics=TOPICS
)
async def on_mqtt_message(topic, payload):
print(f"Custom handler: {topic} -> {payload}")
mqtt_client.set_message_handler(on_mqtt_message)
mqtt_client.event_loop = loop # 將事件循環(huán)傳遞給 MQTT 客戶端
await mqtt_client.start_async()
await asyncio.gather(websocket_task, periodic_task)
if __name__ == "__main__":
asyncio.run(main())
到此這篇關(guān)于使用Python與MQTT實(shí)現(xiàn)異步通信功能的文章就介紹到這了,更多相關(guān)Python MQTT異步通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python抽樣方法解讀及實(shí)現(xiàn)過(guò)程
這篇文章主要介紹了python抽樣方法解讀及實(shí)現(xiàn)過(guò)程講解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
詳解Python中__new__和__init__的區(qū)別與聯(lián)系
在Python中,每個(gè)對(duì)象都有兩個(gè)特殊的方法:__new__和__init__,本文將詳細(xì)介紹這兩個(gè)方法的不同之處以及它們之間的聯(lián)系,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12
Django?Rest?Framework實(shí)現(xiàn)身份認(rèn)證源碼詳解
這篇文章主要為大家介紹了Django?Rest?Framework實(shí)現(xiàn)身份認(rèn)證源碼詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Python數(shù)據(jù)分析之繪制ppi-cpi剪刀差圖形
這篇文章主要介紹了Python數(shù)據(jù)分析之繪制ppi-cpi剪刀差圖形,ppi-cp剪刀差是通過(guò)這個(gè)指標(biāo)可以了解當(dāng)前的經(jīng)濟(jì)運(yùn)行狀況,下文更多詳細(xì)內(nèi)容介紹需要的小伙伴可以參考一下2022-05-05
5道關(guān)于python基礎(chǔ) while循環(huán)練習(xí)題
這篇文章主要給大家分享的是5道關(guān)于python基礎(chǔ) while循環(huán)練習(xí)題,無(wú)論學(xué)習(xí)什么語(yǔ)言,練習(xí)都是必不可少的,下面文章的練習(xí)題挺精湛的,需要的朋友可以參考一下2021-11-11
在Linux中通過(guò)Python腳本訪問(wèn)mdb數(shù)據(jù)庫(kù)的方法
這篇文章主要介紹了在Linux中通過(guò)Python腳本訪問(wèn)mdb數(shù)據(jù)庫(kù)的方法,本文示例基于debian系的Linux系統(tǒng),需要的朋友可以參考下2015-05-05
淺析Python中將單詞首字母大寫(xiě)的capitalize()方法
這篇文章主要介紹了淺析Python中將單詞首字母大寫(xiě)的capitalize()方法,是Python入門中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-05-05

