ESP32 MQTT的庫(kù)有很多,凌順實(shí)驗(yàn)室(lingshunlab.com)這次主要使用AsyncMQTT_ESP32,以后有機(jī)會(huì)再更多的MQTT其他庫(kù)的使用方法。
前提條件
樹莓派部署本地的MQTT服務(wù)端,具體安裝請(qǐng)查看以下連接:
https://lingshunlab.com/book/raspberry-pi/raspberry-pi-install-mosquitto-mqtt-server-and-test-mqtt
ESP32和樹莓派在同一WIFI網(wǎng)絡(luò)里面
效果實(shí)現(xiàn)
凌順實(shí)驗(yàn)室(lingshunlab.com)在本示例展示了使用兩個(gè)ESP32,分別實(shí)現(xiàn)發(fā)布MQTT的主題消息和訂閱并輸出MQTT的主題內(nèi)容。當(dāng)然,可能會(huì)問能不能一個(gè)ESP32同時(shí)又是發(fā)布者,又是訂閱者?答案是可以的,因?yàn)樽鳛榭蛻舳耍际菍?duì)中間商做信息交換。
BOM
需要準(zhǔn)備2個(gè)ESP32,
一個(gè)ESP32用于發(fā)布,
一個(gè)ESP32用于訂閱。
庫(kù)的安裝
可以在Arduino IDE的庫(kù)管理里搜索并安裝:
點(diǎn)擊菜單欄的「工具」---> 「庫(kù)管理」,然后在搜索框中輸入“AsyncMQTT_ESP32”,點(diǎn)擊安裝即可
下圖在我本地已經(jīng)安裝好了:
又或者在Github中下載,并安裝到Arduino的 "libraries"文件夾里
Github 地址:
https://github.com/khoih-prog/AsyncMQTT_ESP32
程序提點(diǎn)
1, 首先,需要加載AsyncMQTT_ESP32的庫(kù)
#include
2,配置MQTT的服務(wù)器信息,可以是IP或者域名的方式
//#define MQTT_HOST IPAddress(192, 168, 100, 100) #define MQTT_HOST "broker.emqx.io" // Broker address #define MQTT_PORT 1883
3,設(shè)置主題,發(fā)布需要主題,訂閱也需要主題
const char *Topic = "lingshunlab/ESP32"; // 主題
4,創(chuàng)建MQTT客戶端的實(shí)例
// 創(chuàng)建MQTT客戶端的實(shí)例,名為mqttClient AsyncMqttClient mqttClient;
5,認(rèn)識(shí)mqttClient的可用的回調(diào)函數(shù)
當(dāng)MQTT觸發(fā)特定事件的時(shí)候,可以配置自定義的函數(shù)
mqttClient.onConnect(onMqttConnect); // 設(shè)置 當(dāng)MQTT連接時(shí)的回調(diào)函數(shù) mqttClient.onDisconnect(onMqttDisconnect); // 設(shè)置 當(dāng)MQTT斷開連接時(shí)的回調(diào)函數(shù) mqttClient.onSubscribe(onMqttSubscribe); // 設(shè)置 當(dāng)MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onUnsubscribe(onMqttUnsubscribe); // 設(shè)置 當(dāng)MQTT取消訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onMessage(onMqttMessage); // 設(shè)置 當(dāng)MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onPublish(onMqttPublish); // 設(shè)置 當(dāng)取消MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 設(shè)置 MQTT服務(wù)器信息
6, 連接MQTT服務(wù)器
mqttClient.setServer(MQTT_HOST, MQTT_PORT); //連接MQTT服務(wù)器
7,發(fā)布主題
通過以下代碼,可以對(duì)配置好的主題發(fā)布消息
// 發(fā)布主題消息 uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com"); Serial.print("Publisshing at QoS 2, packetId: "); Serial.println(packetIdPub); delay(2000);
8,訂閱主題
通過以下代碼,可以訂閱配置好的主題
// 訂閱MQTT主題,并QoS設(shè)置為2 uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2); Serial.print("Subscribing at QoS 2, packetId: "); Serial.println(packetIdSub);
9,當(dāng)發(fā)生主題消息變化的時(shí)候的回調(diào)函數(shù)
mqttClient的回調(diào)函數(shù)有很多種,請(qǐng)仔細(xì)學(xué)習(xí)查看例子中其他的回調(diào)函數(shù)。在這里,特別說明一下onMessage的回調(diào)函數(shù)onMqttMessage(這個(gè)函數(shù)名稱你可以自己定義,隨喜),里面有不少參數(shù),例如topic,payload等,其中payload即是消息的內(nèi)容,可以通過輸出顯示。
void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties, const size_t& len, const size_t& index, const size_t& total) { (void) payload; Serial.println("=====On MQTT Message====="); Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); Serial.print(" qos: "); Serial.println(properties.qos); Serial.print(" dup: "); Serial.println(properties.dup); Serial.print(" retain: "); Serial.println(properties.retain); Serial.print(" len: "); Serial.println(len); Serial.print(" index: "); Serial.println(index); Serial.print(" total: "); Serial.println(total); Serial.print("payload: "); Serial.println(payload); // 輸出消息內(nèi)容 }
10,請(qǐng)查看AsyncMQTT_ESP32的官方例子
可以學(xué)習(xí)到FreeRTOS的多線程如何應(yīng)用。
發(fā)布主題de完整代碼
#include// 配置 WIFI #define WIFI_SSID "***your wifi***" #define WIFI_PASSWORD "***your wifi password***" // 加載AsyncMQTT_ESP32庫(kù) #include // 配置MQTT服務(wù)器地址和端口 #define MQTT_HOST IPAddress(192,168,100,100) // Broker IP // #define MQTT_HOST "broker.emqx.io" // Broker address #define MQTT_PORT 1883 const char *PubTopic = "lingshunlab/ESP32"; // 發(fā)布消息的主題 AsyncMqttClient mqttClient; // 創(chuàng)建 MQTT客戶端實(shí)例 void onMqttConnect(bool sessionPresent) // 編寫對(duì)應(yīng)的回調(diào)函數(shù) { Serial.println("=====On MQTT Connect====="); Serial.print("Connected to MQTT broker: "); Serial.print(MQTT_HOST); Serial.print(", port: "); Serial.println(MQTT_PORT); Serial.print("PubTopic: "); Serial.println(PubTopic); } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { (void) reason; Serial.println("Disconnected from MQTT."); } void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos) { Serial.println("Subscribe acknowledged."); Serial.print(" packetId: "); Serial.println(packetId); Serial.print(" qos: "); Serial.println(qos); } void onMqttUnsubscribe(const uint16_t& packetId) { Serial.println("Unsubscribe acknowledged."); Serial.print(" packetId: "); Serial.println(packetId); } void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties, const size_t& len, const size_t& index, const size_t& total) { (void) payload; Serial.println("=====On MQTT Message====="); Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); Serial.print(" qos: "); Serial.println(properties.qos); Serial.print(" dup: "); Serial.println(properties.dup); Serial.print(" retain: "); Serial.println(properties.retain); Serial.print(" len: "); Serial.println(len); Serial.print(" index: "); Serial.println(index); Serial.print(" total: "); Serial.println(total); Serial.print("payload: "); Serial.println(payload); // 輸出消息內(nèi)容 } void onMqttPublish(const uint16_t& packetId) { Serial.println("Publish acknowledged."); Serial.print(" packetId: "); Serial.println(packetId); } void setup() { Serial.begin(115200); while (!Serial && millis() < 5000); delay(500); // 連接WIFI WiFi.begin(WIFI_SSID, WIFI_PASSWORD); while (WiFi.waitForConnectResult() != WL_CONNECTED) { Serial.println("Connection Failed! Rebooting..."); delay(5000); ESP.restart(); // 重啟esp32 } delay(500); mqttClient.onConnect(onMqttConnect); // 設(shè)置 當(dāng)MQTT連接時(shí)的回調(diào)函數(shù) mqttClient.onDisconnect(onMqttDisconnect); // 設(shè)置 當(dāng)MQTT斷開連接時(shí)的回調(diào)函數(shù) mqttClient.onMessage(onMqttMessage); // 設(shè)置 當(dāng)MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onPublish(onMqttPublish); // 設(shè)置 當(dāng)取消MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 設(shè)置 MQTT服務(wù)器信息 mqttClient.connect(); // 連接 MQTT delay(500); } void loop() { // 發(fā)布主題消息 uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com"); Serial.print("Publisshing at QoS 2, packetId: "); Serial.println(packetIdPub); delay(2000); }
上傳代碼后,程序?qū)?huì)先連接WIFI,然后連接MQTT服務(wù)器,再之后每隔2秒發(fā)布一個(gè)對(duì)應(yīng)主題的消息
訂閱主題de完整代碼
#include// 配置 WIFI #define WIFI_SSID "***your wifi***" #define WIFI_PASSWORD "***your wifi password***" // 加載 AsyncMQTT_ESP32 庫(kù) #include // 配置MQTT服務(wù)器地址和端口 #define MQTT_HOST IPAddress(192,168,1,55) // Broker IP // #define MQTT_HOST "broker.emqx.io" // Broker address #define MQTT_PORT 1883 const char *SubTopic = "lingshunlab/ESP32"; // 訂閱的主題 AsyncMqttClient mqttClient; void onMqttConnect(bool sessionPresent) { Serial.println("=====On MQTT Connect====="); Serial.print("Connected to MQTT broker: "); Serial.print(MQTT_HOST); Serial.print(", port: "); Serial.println(MQTT_PORT); Serial.print("PubTopic: "); Serial.println(SubTopic); // 訂閱MQTT主題,并QoS設(shè)置為2 uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2); Serial.print("Subscribing at QoS 2, packetId: "); Serial.println(packetIdSub); } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { (void) reason; Serial.println("Disconnected from MQTT."); } void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos) { Serial.println("=====On MQTT Subscribe====="); Serial.println("Subscribe acknowledged."); Serial.print(" packetId: "); Serial.println(packetId); Serial.print(" qos: "); Serial.println(qos); } void onMqttUnsubscribe(const uint16_t& packetId) { Serial.println("Unsubscribe acknowledged."); Serial.print(" packetId: "); Serial.println(packetId); } void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties, const size_t& len, const size_t& index, const size_t& total) { (void) payload; Serial.println("=====On MQTT Message====="); Serial.println("Publish received."); Serial.print(" topic: "); Serial.println(topic); Serial.print(" qos: "); Serial.println(properties.qos); Serial.print(" dup: "); Serial.println(properties.dup); Serial.print(" retain: "); Serial.println(properties.retain); Serial.print(" len: "); Serial.println(len); Serial.print(" index: "); Serial.println(index); Serial.print(" total: "); Serial.println(total); Serial.print("payload: "); Serial.println(payload); } void setup() { Serial.begin(115200); while (!Serial && millis() < 5000); delay(500); // 連接WIFI WiFi.begin(WIFI_SSID, WIFI_PASSWORD); while (WiFi.waitForConnectResult() != WL_CONNECTED) { Serial.println("Connection Failed! Rebooting..."); delay(5000); ESP.restart(); } delay(500); mqttClient.onConnect(onMqttConnect); // 設(shè)置 當(dāng)MQTT連接時(shí)的回調(diào)函數(shù) mqttClient.onDisconnect(onMqttDisconnect); // 設(shè)置 當(dāng)MQTT斷開連接時(shí)的回調(diào)函數(shù) mqttClient.onSubscribe(onMqttSubscribe); // 設(shè)置 當(dāng)MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onUnsubscribe(onMqttUnsubscribe); // 設(shè)置 當(dāng)取消MQTT訂閱主題時(shí)的回調(diào)函數(shù) mqttClient.onMessage(onMqttMessage); // 設(shè)置 當(dāng)MQTT收到主題消息時(shí)的回調(diào)函數(shù) mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 設(shè)置 MQTT服務(wù)器信息 mqttClient.connect(); // 連接 MQTT delay(500); } void loop() { }
上傳代碼后,程序?qū)?huì)先連接WIFI,然后連接MQTT服務(wù)器,當(dāng)連接MQTT時(shí),則會(huì)訂閱主題,之后每隔2秒就會(huì)收到主題發(fā)布的消息
審核編輯:劉清
-
回調(diào)函數(shù)
+關(guān)注
關(guān)注
0文章
87瀏覽量
11562 -
Arduino
+關(guān)注
關(guān)注
188文章
6469瀏覽量
187077 -
樹莓派
+關(guān)注
關(guān)注
116文章
1707瀏覽量
105633 -
MQTT
+關(guān)注
關(guān)注
5文章
651瀏覽量
22506 -
ESP32
+關(guān)注
關(guān)注
18文章
971瀏覽量
17261
原文標(biāo)題:ESP32 運(yùn)行MQTT客戶端進(jìn)行主題的發(fā)布和訂閱
文章出處:【微信號(hào):凌順實(shí)驗(yàn)室,微信公眾號(hào):凌順實(shí)驗(yàn)室】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論