stop loosing messages

This commit is contained in:
Ricardo Gorosito 2021-05-23 15:52:35 -03:00
parent e524b5f3ae
commit 4cb40f87d2
2 changed files with 37 additions and 6 deletions

View File

@ -351,13 +351,13 @@ bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen,
// If QOS level is high enough verify the response packet. // If QOS level is high enough verify the response packet.
if (qos > 0) { if (qos > 0) {
len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS); len = processPacketsUntil(buffer, MQTT_CTRL_PUBACK, PUBLISH_TIMEOUT_MS);
DEBUG_PRINT(F("Publish QOS1+ reply:\t")); DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
DEBUG_PRINTBUFFER(buffer, len); DEBUG_PRINTBUFFER(buffer, len);
if (len != 4) if (len != 4)
return false; return false;
if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK)
return false;
uint16_t packnum = buffer[2]; uint16_t packnum = buffer[2];
packnum <<= 8; packnum <<= 8;
packnum |= buffer[3]; packnum |= buffer[3];
@ -508,10 +508,32 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
} }
} }
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
// Check if data is available to read.
uint16_t len = // Sync or Async subscriber with message
Adafruit_MQTT_Subscribe* s=0;
// Check if are unread messages
for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] && subscriptions[i]->new_message) {
s=subscriptions[i];
break;
}
}
// not unread message
if ( ! s ) {
// Check if data is available to read.
uint16_t len =
readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
return handleSubscriptionPacket(len); s=handleSubscriptionPacket(len);
}
// it there is a message, mark it as not pending
if ( s ) {
s->new_message=false;
}
return s;
} }
Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
@ -551,6 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
topiclen) == 0) { topiclen) == 0) {
DEBUG_PRINT(F("Found sub #")); DEBUG_PRINT(F("Found sub #"));
DEBUG_PRINTLN(i); DEBUG_PRINTLN(i);
if ( subscriptions[i]->new_message ) {
DEBUG_PRINTLN(F("Lost previous message"));
} else {
subscriptions[i]->new_message=true;
}
break; break;
} }
} }
@ -910,6 +938,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
callback_double = 0; callback_double = 0;
callback_io = 0; callback_io = 0;
io_mqtt = 0; io_mqtt = 0;
new_message = false;
} }
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) { void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) {

View File

@ -320,6 +320,8 @@ public:
AdafruitIO_MQTT *io_mqtt; AdafruitIO_MQTT *io_mqtt;
bool new_message;
private: private:
Adafruit_MQTT *mqtt; Adafruit_MQTT *mqtt;
}; };