diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index bb7f8cf..ddf2d82 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -351,13 +351,13 @@ bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, // If QOS level is high enough verify the response packet. if (qos > 0) { - len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS); + len = processPacketsUntil(buffer, MQTT_CTRL_PUBACK, PUBLISH_TIMEOUT_MS); + DEBUG_PRINT(F("Publish QOS1+ reply:\t")); DEBUG_PRINTBUFFER(buffer, len); if (len != 4) return false; - if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK) - return false; + uint16_t packnum = buffer[2]; packnum <<= 8; packnum |= buffer[3]; @@ -508,10 +508,32 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { } } Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - // Check if data is available to read. - uint16_t len = - readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet - return handleSubscriptionPacket(len); + + // Sync or Async subscriber with message + Adafruit_MQTT_Subscribe *s = 0; + + // Check if are unread messages + for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i] && subscriptions[i]->new_message) { + s = subscriptions[i]; + break; + } + } + + // not unread message + if (!s) { + // Check if data is available to read. + uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, + timeout); // return one full packet + s = handleSubscriptionPacket(len); + } + + // it there is a message, mark it as not pending + if (s) { + s->new_message = false; + } + + return s; } Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { @@ -551,6 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); + if (subscriptions[i]->new_message) { + DEBUG_PRINTLN(F("Lost previous message")); + } else { + subscriptions[i]->new_message = true; + } + break; } } @@ -910,6 +938,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, callback_double = 0; callback_io = 0; io_mqtt = 0; + new_message = false; } void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) { diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 1307753..7249aa1 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -320,6 +320,8 @@ public: AdafruitIO_MQTT *io_mqtt; + bool new_message; + private: Adafruit_MQTT *mqtt; };