本篇測評由電子工程世界的優(yōu)秀測評者“JerryZhen”提供。
本文將介紹基于米爾電子MYD-LT527開發(fā)板的網(wǎng)關方案測試。
一、系統(tǒng)概述
基于米爾-全志 T527設計一個簡易的物聯(lián)網(wǎng)網(wǎng)關,該網(wǎng)關能夠管理多臺MQTT設備,通過MQTT協(xié)議對設備進行讀寫操作,同時提供HTTP接口,允許用戶通過HTTP協(xié)議與網(wǎng)關進行交互,并對設備進行讀寫操作。
二、系統(tǒng)架構
- 網(wǎng)關服務:基于FastAPI框架構建的Web服務,提供HTTP接口。
- MQTT客戶端:負責與MQTT設備通信,管理設備連接、消息發(fā)布和訂閱。
- 設備管理:維護一個設備列表,記錄設備的基本信息和狀態(tài)。
數(shù)據(jù)存儲:使用內存或數(shù)據(jù)庫存儲設備數(shù)據(jù),確保數(shù)據(jù)持久化。
三、組件設計
MQTT組件:
- 負責與MQTT broker建立連接。
- 訂閱設備主題,接收設備發(fā)送的消息。
發(fā)布消息到設備,實現(xiàn)遠程控制。
設備管理組件:
- 維護一個設備列表,記錄設備的唯一標識符(如設備ID)、MQTT主題、連接狀態(tài)等信息。
提供設備增刪改查的方法。
HTTP組件:
- 基于FastAPI定義HTTP接口。
- 接收用戶請求,調用MQTT組件和設備管理組件進行相應操作。
返回操作結果給用戶。
四、接口設計
設備列表:
- GET /devices:返回所有設備的列表。
- POST /devices:添加新設備到網(wǎng)關。
DELETE /devices/{device_id}:從網(wǎng)關中刪除指定設備。
設備詳情:
GET /devices/{device_id}:返回指定設備的詳細信息。
設備數(shù)據(jù):
- GET /devices/{device_id}/data:獲取指定設備的最新數(shù)據(jù)。
POST /devices/{device_id}/data:發(fā)送數(shù)據(jù)到指定設備。
設備控制:
POST /devices/{device_id}/control:發(fā)送控制命令到指定設備。
五、數(shù)據(jù)結構設計
設備信息:
- 設備ID (device_id):唯一標識設備的字符串。
- MQTT主題 (mqtt_topic):設備在MQTT broker上的主題。
- 連接狀態(tài) (connection_status):表示設備是否在線的布爾值。
其他設備屬性(如名稱、描述等)。
設備數(shù)據(jù):
- 設備ID (device_id):關聯(lián)設備信息的設備ID。
- 時間戳 (timestamp):數(shù)據(jù)發(fā)送或接收的時間。
數(shù)據(jù)內容 (data):設備發(fā)送或接收的具體數(shù)據(jù),可以是JSON格式或其他格式。
六、安全性考慮
- 使用HTTPS協(xié)議提供安全的HTTP通信。
- 實現(xiàn)用戶認證和授權機制,確保只有授權用戶可以訪問和操作設備。
對于敏感操作(如刪除設備),要求用戶進行二次確認或提供額外的安全措施。
七、部署與擴展
- 使用Docker容器化部署網(wǎng)關服務,便于管理和擴展。
根據(jù)需要,可以水平擴展網(wǎng)關實例以處理更多的設備連接和請求。
八、實現(xiàn)步驟
- 安裝所需的Python庫:fastapi, uvicorn, paho-mqtt等。
- 創(chuàng)建FastAPI應用并定義路由。
- 實現(xiàn)MQTT組件,包括與MQTT broker的連接、訂閱、發(fā)布等功能。
- 實現(xiàn)設備管理組件,維護設備列表并提供增刪改查的方法。
- 實現(xiàn)HTTP組件,調用MQTT組件和設備管理組件處理用戶請求。
- 編寫測試代碼,驗證網(wǎng)關的各項功能是否正常工作。
部署網(wǎng)關服務并監(jiān)控其運行狀態(tài)。
該設計方案僅僅是概述,具體實現(xiàn)細節(jié)可能需要根據(jù)實際需求和項目環(huán)境進行調整和優(yōu)化。在實際開發(fā)中,還需要考慮異常處理、日志記錄、性能優(yōu)化等方面的問題。基于上述設計方案,以下是一個簡化版的參考代碼,展示了如何使用FastAPI和paho-mqtt庫來創(chuàng)建一個物聯(lián)網(wǎng)網(wǎng)關。需要注意,示例中不包含完整的錯誤處理、用戶認證和授權機制,這些在實際生產(chǎn)環(huán)境中都是必不可少的。依賴的主要庫版本:
fastapi==0.108.0
paho-mqtt==1.6.1
網(wǎng)關模擬代碼gateway.py:
from fastapi import FastAPI, HTTPException, Body, status from paho.mqtt.client import Client as MQTTClientfrom typing import List, Dict, Any import asyncio import json app = FastAPI() mqtt_client = None device_data = {}
subtopic="gateway/device/#"
# MQTT回調函數(shù) def on_message(client, userdata, msg): payload = msg.payload.decode() topic = msg.topic device_id = topic.split('/')[-1] device_data[device_id] = payload print(f"Received message from {device_id}: {payload}") # MQTT連接和訂閱 def mqtt_connect_and_subscribe(broker_url, broker_port): global mqtt_client mqtt_client = MQTTClient() mqtt_client.on_message = on_message mqtt_client.connect(broker_url, broker_port, 60) mqtt_client.subscribe(subtopic) mqtt_client.loop_start() # MQTT發(fā)布消息 async def mqtt_publish(topic: str, message: str): if mqtt_client is not None and mqtt_client.is_connected(): mqtt_client.publish(topic, message) else: print("MQTT client is not connected!") # 設備管理:添加設備 @app.post("/devices/", status_code=status.HTTP_201_CREATED) async def add_device(device_id: str): device_data[device_id] = None return {"message": f"Device {device_id} added"} # 設備管理:獲取設備列表 @app.get("/devices/") async def get_devices(): return list(device_data.keys()) # 設備管理:獲取設備數(shù)據(jù) @app.get("/devices/{device_id}/data") async def get_device_data(device_id: str): if device_id not in device_data: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Device {device_id} not found") return device_data.get(device_id) # 設備管理:發(fā)送數(shù)據(jù)到設備 @app.post("/devices/{device_id}/data") async def send_data_to_device(device_id: str, data: Dict[str, Any] = Body(...)): topic = f"devices/{device_id}" message = json.dumps(data) await mqtt_publish(topic, message) return {"message": f"Data sent to {device_id}"} # 設備控制:發(fā)送控制命令到設備 @app.post("/devices/{device_id}/control") async def control_device(device_id: str, command: str): topic = f"devices/device/{device_id}" await mqtt_publish(topic, command) return {"message": f"Control command sent to {device_id}"} # FastAPI啟動事件 @app.on_event("startup") async def startup_event(): mqtt_connect_and_subscribe("127.0.0.1", 1883) # FastAPI關閉事件 @app.on_event("shutdown") async def shutdown_event(): if mqtt_client is not None: mqtt_client.loop_stop() mqtt_client.disconnect() # 運行FastAPI應用 if __name__ == "__main__": import uvicorn uvicorn.run(app,host="127.0.0.1",port=8000)
設備1模擬代碼 dev1.py:
import paho.mqtt.client as mqtt
# 連接成功回調def on_connect(client, userdata, flags, rc): print('Connected with result code '+str(rc)) client.subscribe('devices/1')
# 消息接收回調def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client.publish('gateway/device/1',payload=f'echo {msg.payload}',qos=0)
client = mqtt.Client()
# 指定回調函數(shù)client.on_connect = on_connectclient.on_message = on_message
# 建立連接client.connect('127.0.0.1', 1883)# 發(fā)布消息client.publish('gateway/device/1',payload='Hello, I am device',qos=0)
client.loop_forever()
設備2模擬代碼 dev2.py
import paho.mqtt.client as mqtt# 連接成功回調def on_connect(client, userdata, flags, rc): print('Connected with result code '+str(rc)) client.subscribe('devices/2')# 消息接收回調def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client.publish('gateway/device/2',payload=f'echo {msg.payload}',qos=0)client = mqtt.Client()# 指定回調函數(shù)client.on_connect = on_connectclient.on_message = on_message# 建立連接client.connect('127.0.0.1', 1883)# 發(fā)布消息client.publish('gateway/device/2',payload='Hello, I am device',qos=0)client.loop_forever()
運行網(wǎng)關代碼,打開網(wǎng)頁得到api接口:? 通過api分別添加設備1和設備2, 在另外兩個控制臺中分別運行模擬設備1和模擬設備2的代碼通過網(wǎng)頁API向設備1發(fā)送數(shù)據(jù) 通過網(wǎng)頁API獲得設備回復的數(shù)據(jù),設備代碼中只是簡單的把網(wǎng)關發(fā)過來的數(shù)據(jù)進行回傳 我們在網(wǎng)關的后臺可以看到完整的數(shù)據(jù)流 至此一個簡易的網(wǎng)關已經(jīng)實現(xiàn)了,接下來將會嘗試實現(xiàn)樓宇里的最常見的bacnet設備進行通訊管理。
-
物聯(lián)網(wǎng)
+關注
關注
2909文章
44631瀏覽量
373248 -
開發(fā)板
+關注
關注
25文章
5047瀏覽量
97442 -
MQTT
+關注
關注
5文章
651瀏覽量
22502 -
全志T527
+關注
關注
0文章
15瀏覽量
124
發(fā)布評論請先 登錄
相關推薦
評論