From 20b64405542865a90971729216157a9eb681e94c Mon Sep 17 00:00:00 2001 From: air <30444667+sszdot@users.noreply.github.com> Date: Fri, 23 May 2025 18:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E7=B1=BB=20=20=E5=9E=8B=E3=80=91?= =?UTF-8?q?=EF=BC=9Arefactor=20=E3=80=90=E5=8E=9F=20=20=E5=9B=A0=E3=80=91?= =?UTF-8?q?=EF=BC=9Amqtt=E4=B8=8D=E8=83=BD=E5=8F=91=E9=80=81Qos=E5=8F=82?= =?UTF-8?q?=E6=95=B0=20=E3=80=90=E8=BF=87=20=20=E7=A8=8B=E3=80=91=EF=BC=9A?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E4=BD=BF=E7=94=A8AsyncMqttClient=20=E5=BA=93?= =?UTF-8?q?=20=E3=80=90=E5=BD=B1=20=20=E5=93=8D=E3=80=91=EF=BC=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # 类型 包含: # feat:新功能(feature) # fix:修补bug # docs:文档(documentation) # style: 格式(不影响代码运行的变动) # refactor:重构(即不是新增功能,也不是修改bug的代码变动) # test:增加测试 # chore:构建过程或辅助工具的变动 --- src/FoodDeliveryBase.cpp | 108 +++++++++++++-------------------------- src/FoodDeliveryBase.h | 33 ++++++------ src/commser.cpp | 87 ++++++++++++++++++++++++------- src/commser.h | 1 + src/main.cpp | 51 ++++++++++++++---- 5 files changed, 164 insertions(+), 116 deletions(-) diff --git a/src/FoodDeliveryBase.cpp b/src/FoodDeliveryBase.cpp index 88d93c3..8c15d1b 100644 --- a/src/FoodDeliveryBase.cpp +++ b/src/FoodDeliveryBase.cpp @@ -5,7 +5,7 @@ * @description: 初始化 */ static const char *MOUDLENAME = "FC"; -FoodCube::FoodCube(char *userSsid, char *userPassword, char *userMqttServer, int userMqttPort, char *userMqttName, char *userMqttPassword, uint8_t userMavlinkSerial, uint8_t userVoiceSerial, char *userUdpServerIP, uint32_t userUdpServerPort) +FoodCube::FoodCube(const char *userSsid, const char *userPassword, const char *userMqttServer, int userMqttPort, const char *userMqttName, const char *userMqttPassword, String userMqttTopic, uint8_t userMavlinkSerial, uint8_t userVoiceSerial, const char *userUdpServerIP, uint32_t userUdpServerPort) { /*初始化*/ ssid = userSsid; // wifi帐号 @@ -14,6 +14,7 @@ FoodCube::FoodCube(char *userSsid, char *userPassword, char *userMqttServer, int mqttPort = userMqttPort; // mqtt服务器端口 mqttName = userMqttName; // mqtt帐号 mqttPassword = userMqttPassword; // mqtt密码 + mqttTopic = userMqttTopic; // mqtt主题 mavlinkSerial = userMavlinkSerial; // mavlink用的串口 voiceSerial = userVoiceSerial; // 声音模块用的串口 udpServerIP = userUdpServerIP; // 云台相机ip @@ -54,8 +55,19 @@ FoodCube::FoodCube(char *userSsid, char *userPassword, char *userMqttServer, int break; } /*初始化 连接mqtt对象 赋予mqttClient指针*/ - mqttClient = new PubSubClient(mqttServer, mqttPort, wifiClient); - SetplayvolMax(); + mqttClient = new AsyncMqttClient(); + mqttClient->setServer(mqttServer, mqttPort); + mqttClient->setCredentials(mqttName, mqttPassword); +} +/*析构*/ +FoodCube::~FoodCube() +{ + /*释放mqttClient*/ + if (mqttClient != nullptr) + { + delete mqttClient; + mqttClient = nullptr; + } } /** * @description: 日志打印 @@ -65,7 +77,7 @@ void FoodCube::log(String val) { Serial.print(val); } -void FoodCube::log(char *val) +void FoodCube::log(const char *val) { Serial.print(val); } @@ -85,7 +97,7 @@ void FoodCube::logln(String val) { ESP_LOGD(MOUDLENAME, "%s", val); } -void FoodCube::logln(char *val) +void FoodCube::logln(const char *val) { ESP_LOGD(MOUDLENAME, "%s", val); } @@ -136,10 +148,10 @@ bool FoodCube::checkWiFiStatus() { wificonnected = true; // 获取局域网ip - logln(""); - logln("WiFi connected"); - log("IP address: "); - logln(WiFi.localIP()); + // logln(""); + // logln("WiFi connected"); + // log("IP address: "); + // logln(WiFi.localIP()); localIp = WiFi.localIP(); // 设置开发板为无线终端 获取物理mac地址 WiFi.mode(WIFI_STA); @@ -151,58 +163,6 @@ bool FoodCube::checkWiFiStatus() } return wificonnected; } -/** - * @description: 连接mqtt - * @param {String} topicSub 主题 - */ -void FoodCube::connectMqtt(String topicSub) -{ - if (mqttClient->connected()) - return; - /*尝试连接mqtt*/ - if ((millis() - _tm_mqttconnect) > 3000) - { - logln("connect_mqtt"); - if (mqttClient->connect(macAdd.c_str(), mqttName, mqttPassword)) - { - logln("MQTT Server Connected."); - log("Server Address: "); - logln(mqttServer); - log("ClientId :"); - logln(macAdd); - /*连接成功 订阅主题*/ - // 订阅主题 PS:连接上mqtt服务器 订阅主题 收到信息触发回调函数 receiveCallback - subscribeTopic(topicSub, 1); - delay(500); - playText("服务器已连接"); - } - else - { - // 失败返回状态码 - log("MQTT Server Connect Failed. Client State:"); - logln(mqttClient->state()); - _tm_mqttconnect = millis(); - playText("服务器断开尝试重新连接"); - // delay(3000); - } - } -} - -/** - * @description: 写在loop函数里 检测mqtt连接情况 并保持心跳 - * @param {String} topicSub 主题 - */ -void FoodCube::mqttLoop(String topicSub) -{ - if (mqttClient->connected()) - { // 检测 如果开发板成功连接服务器 - mqttClient->loop(); // 保持心跳 - } - else - { // 如果开发板未能成功连接服务器 - connectMqtt(topicSub); // 则尝试连接服务器 - } -} /** * @description: 订阅指定主题 @@ -220,21 +180,23 @@ void FoodCube::subscribeTopic(String topicString, int Qos = 1) } /** - * @description: 向指定主题 发布信息 - * @param {String} topicString 主题名称 - * @param {String} messageString 发布的内容 + * @brief 向指定主题发布信息(基于 AsyncMqttClient,支持 QoS 设置) + * + * @param topicString MQTT主题名称(不含MAC后缀) + * @param messageString 发布的内容 + * @param qos QoS等级(0, 1 或 2)0:最多一次,1:至少一次,2:只有一次(速度慢) */ -void FoodCube::pubMQTTmsg(String topicString, String messageString) +void FoodCube::pubMQTTmsg(String topicString, String messageString, uint8_t Qos) { - // 处理主题字符串 + // 构造完整主题(拼接设备MAC地址) topicString = topicString + "/" + macAdd; - char publishTopic[topicString.length() + 1]; - strcpy(publishTopic, topicString.c_str()); - // 处理发布内容字符串 - char publishMsg[messageString.length() + 1]; - strcpy(publishMsg, messageString.c_str()); - // 向指定主题 发布信息 - mqttClient->publish(publishTopic, publishMsg); + + // 转换为 const char*,适配 AsyncMqttClient 接口 + const char *publishTopic = topicString.c_str(); + const char *publishMsg = messageString.c_str(); + + // 发布消息,retain 为 false(可按需修改) + mqttClient->publish(publishTopic, Qos, false, publishMsg); } /** diff --git a/src/FoodDeliveryBase.h b/src/FoodDeliveryBase.h index aa3b68a..d654835 100644 --- a/src/FoodDeliveryBase.h +++ b/src/FoodDeliveryBase.h @@ -4,7 +4,6 @@ /*wifi*/ #include "WiFi.h" /*mqtt*/ -#include "PubSubClient.h" #include "AsyncMqttClient.h" /*mavlink*/ @@ -35,29 +34,32 @@ public: /*前端模拟遥控的油门通道*/ uint16_t channels[4] = {1500, 1500, 1500, 1500}; /*初始化*/ - FoodCube(char *userSsid, char *userPassword, char *userMqttServer, int userMqttPort, char *userMqttName, char *userMqttPassword, uint8_t userMavlinkSerial, uint8_t userVoiceSerial, char *userUdpServerIP, uint32_t userUdpServerPort); + FoodCube(const char *userSsid, const char *userPassword, const char *userMqttServer, int userMqttPort, const char *userMqttName, const char *userMqttPassword, String userMqttTopic, uint8_t userMavlinkSerial, uint8_t userVoiceSerial, const char *userUdpServerIP, uint32_t userUdpServerPort); + ~FoodCube(); /*日志打印*/ void log(String val); - void log(char *val); + void log(const char *val); void log(int val); void log(IPAddress val); void log(bool val); void logln(String val); - void logln(char *val); + void logln(const char *val); void logln(int val); void logln(IPAddress val); void logln(bool val); /*get set value*/ String getMacAdd(); /*wifi*/ + String macAdd; // 板子的物理地址(已去掉冒号分隔符) void connectWifi(); bool checkWiFiStatus(); /*mqtt*/ - PubSubClient *mqttClient; // 指向 mqtt服务器连接 对象 + String mqttTopic; // 订阅mqtt主题 + AsyncMqttClient *mqttClient; // 指向 mqtt服务器连接 对象 void connectMqtt(String topicSub); void mqttLoop(String topicSub); void subscribeTopic(String topicString, int Qos); - void pubMQTTmsg(String topicString, String messageString); + void pubMQTTmsg(String topicString, String messageString, uint8_t qos); /*声音模块控制*/ // 枚举,音量 @@ -102,22 +104,23 @@ public: void Camera_action_zoom(uint8_t vzoom); private: - char *ssid; // wifi帐号 - char *password; // wifi密码 - char *mqttServer; // mqtt服务器地址 - int mqttPort; // mqtt服务器端口 - char *mqttName; // mqtt帐号 - char *mqttPassword; // mqtt密码 + const char *ssid; // wifi帐号 + const char *password; // wifi密码 + const char *mqttServer; // mqtt服务器地址 + int mqttPort; // mqtt服务器端口 + const char *mqttName; // mqtt帐号 + const char *mqttPassword; // mqtt密码 + uint8_t mavlinkSerial; // 飞控占用的串口号 uint8_t voiceSerial; // 飞控占用的串口号 WiFiClient wifiClient; // 网络客户端 IPAddress localIp; // 板子的IP地址 - String macAdd; // 板子的物理地址(已去掉冒号分隔符) - bool wificonnected; // 网络是否连接 + + bool wificonnected; // 网络是否连接 /*云台相机控制*/ WiFiUDP udp; // udp信息操作对象 - char *udpServerIP; // 云台相机ip地址 + const char *udpServerIP; // 云台相机ip地址 uint32_t udpServerPort; // 云台相机端口 unsigned long _tm_mqttconnect; // mqtt上次连接时间 diff --git a/src/commser.cpp b/src/commser.cpp index ce9537b..cf18cad 100644 --- a/src/commser.cpp +++ b/src/commser.cpp @@ -22,18 +22,19 @@ static const char *MOUDLENAME = "COMMSER"; // char* password = "63587839ab"; //wifi密码 // char *ssid = "flicube"; // wifi帐号 // char *password = "fxmf0622"; // wifi密码 -char *ssid = "fxmf_sc02"; // 4g wifi帐号 -char *password = "12345678"; // 4g wifi密码 -char *mqttServer = "szdot.top"; // mqtt地址 -int mqttPort = 1883; // mqtt端口 -char *mqttName = "admin"; // mqtt帐号 -char *mqttPassword = "123456"; // mqtt密码 -uint8_t mavlinkSerial = 2; // 飞控占用的串口号 -uint8_t voiceSerial = 1; // 声音模块占用串口 -char *udpServerIP = "192.168.3.92"; // 云台相机ip -uint32_t udpServerPort = 37260; // 云台相机端口 +const char *ssid = "fxmf_sc02"; // 4g wifi帐号 +const char *password = "12345678"; // 4g wifi密码 +const char *mqttServer = "szdot.top"; // mqtt地址 +int mqttPort = 1883; // mqtt端口 +const char *mqttName = "admin"; // mqtt帐号 +const char *mqttPassword = "123456"; // mqtt密码 +String mqttTopic = "cmd"; // mqtt主题前缀 (cmd/macAdd) +uint8_t mavlinkSerial = 2; // 飞控占用的串口号 +uint8_t voiceSerial = 1; // 声音模块占用串口 +const char *udpServerIP = "192.168.3.92"; // 云台相机ip +uint32_t udpServerPort = 37260; // 云台相机端口 -FoodCube fc(ssid, password, mqttServer, mqttPort, mqttName, mqttPassword, mavlinkSerial, voiceSerial, udpServerIP, udpServerPort); // 创建项目对象 +FoodCube fc(ssid, password, mqttServer, mqttPort, mqttName, mqttPassword, mqttTopic, mavlinkSerial, voiceSerial, udpServerIP, udpServerPort); // 创建项目对象 /* 发布 主题 ps:以下是登记发布json内容的组成元素 */ // 登记 json成员名字 @@ -66,6 +67,61 @@ Ticker mavTicker; // 定时 指定飞控mavlink 串口返回的数据类别 防 Ticker funTicker; // 延迟执行 // Ticker chanTicker; //定时向飞控 发送油门指定 portMUX_TYPE espMutex = portMUX_INITIALIZER_UNLOCKED; + +/** + * @brief 异步MQTT消息接收回调(处理分片拼包) + * + * 由于消息可能被拆分成多段(分片)发送,函数负责把分片数据拼接成完整消息, + * 拼接完成后调用业务处理函数 mqtt_receiveCallback 进行解析和处理。 + * + * @param topic MQTT主题字符串 + * @param payload 当前分片消息数据指针 + * @param properties MQTT消息属性(本函数未使用) + * @param len 当前分片消息长度 + * @param index 当前分片在整个消息中的起始偏移位置 + * @param total 当前完整消息总长度 + */ +void async_mqtt_receiveCallback(char *topic, char *payload, AsyncMqttClientMessageProperties properties, + size_t len, size_t index, size_t total) +{ + static uint8_t buffer[2048]; // 静态缓存区,存放整条消息,大小需根据最大消息长度调整 + static size_t expected_total = 0; // 记录期望接收的完整消息长度 + + // 如果是新消息开始(第一个分片) + if (index == 0) + { + expected_total = total; // 记录当前消息总长度 + + // 检查消息长度是否超出缓存区大小,防止溢出 + if (total > sizeof(buffer)) + { + Serial.println("Payload too large"); + expected_total = 0; // 重置,放弃此次消息 + return; + } + } + + // 防止超出缓存区边界,避免溢出 + if (index + len > sizeof(buffer)) + { + Serial.println("Payload overflow"); + expected_total = 0; // 重置状态 + return; + } + + // 将当前分片数据复制到缓存区对应位置 + memcpy(buffer + index, payload, len); + + // 判断是否已经接收完所有分片(index + len == total) + if ((index + len) == expected_total) + { + // 调用原业务处理函数,传入完整拼接的消息数据和长度 + mqtt_receiveCallback(topic, buffer, expected_total); + + // 清除状态,准备接收下一条消息 + expected_total = 0; + } +} /** * @description: mqtt订阅主题 收到信息 的回调函数 * @param {char*} topic 主题名称 msg/macadd @@ -110,11 +166,6 @@ void mqtt_receiveCallback(char *topic, byte *payload, unsigned int length) fc.playText(todo, volume); // 播放声音 } - else if (key == "questAss") - { // 飞行航点任务 questAss - String todo = value; // 转换值 - writeRoute(todo); // 写入航点 - } else if (key == "setQuestState") { // 设置飞机任务状态 @@ -127,7 +178,7 @@ void mqtt_receiveCallback(char *topic, byte *payload, unsigned int length) uint8_t state = obj["state"]; // 标记飞机状态 0 or 1 // 标记飞机状态 topicPubMsg[10] = fc.setNBit(topicPubMsg[10], n, state); - if (n = 5 && state == 1) // 执行任务 ps:切自动 + if (n == 5 && state == 1) // 执行任务 ps:切自动 { funTicker.once(3, []() { @@ -1011,7 +1062,7 @@ void pubThread() // 将JSON对象序列化为JSON字符串 String jsonString; serializeJson(doc, jsonString); - fc.pubMQTTmsg("planeState", jsonString); + fc.pubMQTTmsg("planeState", jsonString, 1); } // 添加日志 到日志队列 void addLogMessage(const String &msg) diff --git a/src/commser.h b/src/commser.h index 906069a..8fc7846 100644 --- a/src/commser.h +++ b/src/commser.h @@ -19,6 +19,7 @@ enum FlightMode LAND = 9, // 降落模式 POSHOLD = 16, // 位置保持模式 }; +extern void async_mqtt_receiveCallback(char *topic, char *payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total); extern void mqtt_receiveCallback(char *topic, byte *payload, unsigned int length); extern void pubThread(); extern void addLogMessage(const String &msg); diff --git a/src/main.cpp b/src/main.cpp index 4d5a50a..f09056d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -120,7 +120,7 @@ void setup() button_down.attachLongPressStart(downbtn_pressstart); button_down.attachLongPressStop(downbtn_pressend); - button_checktop.setPressTicks(10); // 10毫秒就产生按下事件,用于顶部按钮检测 + button_checktop.setPressTicks(10); // 10毫秒就产生按下事件,用于顶部按钮检测 改为button_checktop.setPressMs(10); button_checktop.attachLongPressStart(ctbtn_pressstart); // 按下马上产生事件, button_checktop.attachLongPressStop(ctbtn_pressend); // 抬起 @@ -148,11 +148,8 @@ void setup() /*初始化*/ Serial1.begin(115200, SERIAL_8N1, SERIAL_REPORT_RX, SERIAL_REPORT_TX); // 声音模块引串口脚映射 fc.playText("开始启动"); - fc.connectWifi(); // 连接wifi - // fc.playText("正在连接服务器"); - // fc.connectMqtt("cmd"); // 连接mqtt - fc.mqttClient->setCallback(mqtt_receiveCallback); // 设置订阅成功 回调 - fc.mav_request_data(); // 指定飞控串口返回的数据类别(飞控启动之后发送才有意义) + fc.connectWifi(); // 连接wifi + fc.mav_request_data(); // 指定飞控串口返回的数据类别(飞控启动之后发送才有意义) /*异步线程,在核心0上回调*/ tksendinfo.attach(1, sendinfo); // 发送状态 @@ -468,6 +465,9 @@ void Task1(void *pvParameters) scale.set_scale(WEIGHT_SCALE); // 这是缩放值,根据砝码实测516.f scale.tare(); // 重置为0 + // mqtt初始化 + fc.mqttClient->onMessage(async_mqtt_receiveCallback); // 设置mqtt的订阅回调函数 + // 在这里可以添加一些代码,这样的话这个任务执行时会先执行一次这里的内容(当然后面进入while循环之后不会再执行这部分了) while (1) { @@ -483,9 +483,40 @@ void Task1(void *pvParameters) /*从飞控拿数据*/ fc.comm_receive(mavlink_receiveCallback); - // vTaskDelay(1); - if (fc.checkWiFiStatus()) - fc.mqttLoop("cmd"); // 保持mqtt心跳,如果Mqtt没有连接会自动连接 + + // 保持wifi mqtt的链接 + if (fc.checkWiFiStatus() && !fc.mqttClient->connected()) + { + // 用 static 防止每次 loop 都重新设置 onConnect 和重复订阅 + static bool mqttTopicSet = false; + // 控制 MQTT 重连尝试的时间间隔(避免频繁重连) + static unsigned long lastMqttAttempt = 0; + const unsigned long mqttRetryInterval = 5000; // 每隔 5 秒尝试一次 + + unsigned long now = millis(); // 获取当前时间 + + // 如果距离上次尝试连接的时间超过了设定间隔 + if (now - lastMqttAttempt > mqttRetryInterval) + { + lastMqttAttempt = now; // 记录本次尝试连接的时间 + + // 链接wifi后获取macAdd 再绑定如果还没有绑定 MQTT 的 onConnect 订阅 + if (!mqttTopicSet) + { + // 链接成功时设置订阅主题 + fc.mqttClient->onConnect( + [](bool sessionPresent) + { + fc.subscribeTopic(fc.mqttTopic, 1); // 订阅主题 ps:subscribeTopic内会拼接主题 cmd/macAdd 之所以写在这里因为需要先链接wifi之后才能获取到macAdd + }); + + mqttTopicSet = true; // 设置标志,避免重复绑定 + } + + // 尝试连接 MQTT 服务器(只在未连接状态下尝试) + fc.mqttClient->connect(); + } + } vTaskDelay(10); _looptm_core0 = millis() - _tm_core0; @@ -742,7 +773,7 @@ void btn_presssatonce() addLogMessage("UP_presssatonce"); led_show(255, 255, 255); // 高亮一下 fc.playText("发送对频信息"); - fc.pubMQTTmsg(topicHandle[0], fc.getMacAdd()); + fc.pubMQTTmsg(topicHandle[0], fc.getMacAdd(), 0); } // 收线按钮-长按 void upbtn_pressstart()