diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 4e98c4a..7d5141b 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -152,10 +152,12 @@ int8_t Adafruit_MQTT::connect() { // Read connect response packet and verify it len = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS); - if (len != 4) + if (len != 4) { return -1; - if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) + } + if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) { return -1; + } if (buffer[3] != 0) return buffer[3]; @@ -182,7 +184,6 @@ int8_t Adafruit_MQTT::connect() { // the Subscription before the Server sends the SUBACK Packet. (will // really need to use callbacks - ada) - // Serial.println("\t**looking for suback"); if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) { success = true; break; @@ -213,10 +214,15 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, break; } - if ((buffer[0] >> 4) == waitforpackettype) { + uint8_t packetType = (buffer[0] >> 4); + if (packetType == waitforpackettype) { return len; } else { - ERROR_PRINTLN(F("Dropped a packet")); + if (packetType == MQTT_CTRL_PUBLISH) { + handleSubscriptionPacket(len); + } else { + ERROR_PRINTLN(F("Dropped a packet")); + } } } return 0; @@ -432,30 +438,21 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { while (elapsed < (uint32_t)timeout) { Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed); if (sub) { - // Serial.println("**** sub packet received"); if (sub->callback_uint32t != NULL) { // huh lets do the callback in integer mode uint32_t data = 0; data = atoi((char *)sub->lastread); - // Serial.print("*** calling int callback with : "); - // Serial.println(data); sub->callback_uint32t(data); } else if (sub->callback_double != NULL) { // huh lets do the callback in doublefloat mode double data = 0; data = atof((char *)sub->lastread); - // Serial.print("*** calling double callback with : "); - // Serial.println(data); sub->callback_double(data); } else if (sub->callback_buffer != NULL) { // huh lets do the callback in buffer mode - // Serial.print("*** calling buffer callback with : "); - // Serial.println((char *)sub->lastread); sub->callback_buffer((char *)sub->lastread, sub->datalen); } else if (sub->callback_io != NULL) { // huh lets do the callback in io mode - // Serial.print("*** calling io instance callback with : "); - // Serial.println((char *)sub->lastread); ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, sub->datalen); } @@ -469,23 +466,29 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { elapsed += (endtime - starttime); } } - Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - uint16_t i, topiclen, datalen; - // Check if data is available to read. uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet - if (!len) + return handleSubscriptionPacket(len); +} + +Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { + uint16_t i, topiclen, datalen; + + if (!len) { return NULL; // No data available, just quit. + } DEBUG_PRINT("Packet len: "); DEBUG_PRINTLN(len); DEBUG_PRINTBUFFER(buffer, len); - if (len < 3) + if (len < 3) { return NULL; - if ((buffer[0] & 0xF0) != (MQTT_CTRL_PUBLISH) << 4) + } + if ((buffer[0] & 0xF0) != (MQTT_CTRL_PUBLISH) << 4) { return NULL; + } // Parse out length of packet. topiclen = buffer[3]; diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 33c4c96..089d0d4 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -206,6 +206,10 @@ public: // messages! Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0); + // Handle any data coming in for subscriptions and fires them off to the + // appropriate callback + Adafruit_MQTT_Subscribe *handleSubscriptionPacket(uint16_t len); + void processPackets(int16_t timeout); // Ping the server to ensure the connection is still alive. diff --git a/Adafruit_MQTT_Client.cpp b/Adafruit_MQTT_Client.cpp index 454952b..560ef46 100644 --- a/Adafruit_MQTT_Client.cpp +++ b/Adafruit_MQTT_Client.cpp @@ -55,6 +55,10 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen, uint16_t len = 0; int16_t t = timeout; + if (maxlen == 0) { // handle zero-length packets + return 0; + } + while (client->connected() && (timeout >= 0)) { // DEBUG_PRINT('.'); while (client->available()) { @@ -65,10 +69,6 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen, // DEBUG_PRINTLN((uint8_t)c, HEX); len++; - if (maxlen == 0) { // handle zero-length packets - return 0; - } - if (len == maxlen) { // we read all we want, bail DEBUG_PRINT(F("Read data:\t")); DEBUG_PRINTBUFFER(buffer, len); diff --git a/library.properties b/library.properties index dc4c4ee..957bb7e 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=Adafruit MQTT Library -version=2.0.0 +version=2.1.0 author=Adafruit maintainer=Adafruit sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware.