From 4cb40f87d27a10e0f84aa7f911ac94deac58cf5e Mon Sep 17 00:00:00 2001 From: Ricardo Gorosito Date: Sun, 23 May 2021 15:52:35 -0300 Subject: [PATCH 1/3] stop loosing messages --- Adafruit_MQTT.cpp | 41 +++++++++++++++++++++++++++++++++++------ Adafruit_MQTT.h | 2 ++ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index bb7f8cf..e35c0b8 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -351,13 +351,13 @@ bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, // If QOS level is high enough verify the response packet. if (qos > 0) { - len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS); + len = processPacketsUntil(buffer, MQTT_CTRL_PUBACK, PUBLISH_TIMEOUT_MS); + DEBUG_PRINT(F("Publish QOS1+ reply:\t")); DEBUG_PRINTBUFFER(buffer, len); if (len != 4) return false; - if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK) - return false; + uint16_t packnum = buffer[2]; packnum <<= 8; packnum |= buffer[3]; @@ -508,10 +508,32 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { } } Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - // Check if data is available to read. - uint16_t len = + + // Sync or Async subscriber with message + Adafruit_MQTT_Subscribe* s=0; + + // Check if are unread messages + for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i] && subscriptions[i]->new_message) { + s=subscriptions[i]; + break; + } + } + + // not unread message + if ( ! s ) { + // Check if data is available to read. + uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet - return handleSubscriptionPacket(len); + s=handleSubscriptionPacket(len); + } + + // it there is a message, mark it as not pending + if ( s ) { + s->new_message=false; + } + + return s; } Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { @@ -551,6 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); + if ( subscriptions[i]->new_message ) { + DEBUG_PRINTLN(F("Lost previous message")); + } else { + subscriptions[i]->new_message=true; + } + break; } } @@ -910,6 +938,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, callback_double = 0; callback_io = 0; io_mqtt = 0; + new_message = false; } void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) { diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 1307753..7249aa1 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -320,6 +320,8 @@ public: AdafruitIO_MQTT *io_mqtt; + bool new_message; + private: Adafruit_MQTT *mqtt; }; From 986c1b405cc76953b3ece64b3a5405955f07e696 Mon Sep 17 00:00:00 2001 From: Ricardo Gorosito Date: Tue, 25 May 2021 13:16:43 -0300 Subject: [PATCH 2/3] fix c formatting --- Adafruit_MQTT.cpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index e35c0b8..8ba4726 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -510,27 +510,27 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { // Sync or Async subscriber with message - Adafruit_MQTT_Subscribe* s=0; + Adafruit_MQTT_Subscribe *s=0; // Check if are unread messages for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) { if (subscriptions[i] && subscriptions[i]->new_message) { - s=subscriptions[i]; + s = subscriptions[i]; break; } } // not unread message - if ( ! s ) { + if (!s) { // Check if data is available to read. - uint16_t len = - readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet - s=handleSubscriptionPacket(len); + uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, + timeout); // return one full packet + s = handleSubscriptionPacket(len); } // it there is a message, mark it as not pending - if ( s ) { - s->new_message=false; + if (s) { + s->new_message = false; } return s; @@ -573,12 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); - if ( subscriptions[i]->new_message ) { + if (subscriptions[i]->new_message) { DEBUG_PRINTLN(F("Lost previous message")); } else { - subscriptions[i]->new_message=true; + subscriptions[i]->new_message = true; } - + break; } } From ab78b291b2ac9c447ef70faac31f641d818379cf Mon Sep 17 00:00:00 2001 From: Ricardo Gorosito Date: Tue, 25 May 2021 13:23:51 -0300 Subject: [PATCH 3/3] more c formatting fixing --- Adafruit_MQTT.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 8ba4726..ddf2d82 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -510,7 +510,7 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { // Sync or Async subscriber with message - Adafruit_MQTT_Subscribe *s=0; + Adafruit_MQTT_Subscribe *s = 0; // Check if are unread messages for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) { @@ -524,7 +524,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { if (!s) { // Check if data is available to read. uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, - timeout); // return one full packet + timeout); // return one full packet s = handleSubscriptionPacket(len); }