diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 253d633..3a27aab 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -189,15 +189,15 @@ int8_t Adafruit_MQTT::connect() { // Check for SUBACK if using MQTT 3.1.1 or higher // TODO: The Server is permitted to start sending PUBLISH packets matching the - // Subscription before the Server sends the SUBACK Packet. - // if(MQTT_PROTOCOL_LEVEL > 3) { - // len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); - // DEBUG_PRINT(F("SUBACK:\t")); - // DEBUG_PRINTBUFFER(buffer, len); - // if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { - // return 6; // failure to subscribe - // } - // } + // Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada) + if(MQTT_PROTOCOL_LEVEL > 3) { + len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); + DEBUG_PRINT(F("SubAck:\t")); + DEBUG_PRINTBUFFER(buffer, len); + if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { + return 6; // failure to subscribe + } + } } @@ -329,12 +329,10 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) { if ((len != 5) || (buffer[0] != (MQTT_CTRL_UNSUBACK << 4))) { return false; // failure to unsubscribe } - } subscriptions[i] = 0; return true; - } } @@ -374,19 +372,38 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { } if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ??? + uint8_t packet_id_len = 0; + uint16_t packetid; + // Check if it is QoS 1, TODO: we dont support QoS 2 + if ((buffer[0] & 0x6) == 0x2) { + packet_id_len = 2; + packetid = buffer[topiclen+4]; + packetid <<= 8; + packetid |= buffer[topiclen+5]; + } + // zero out the old data memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); - datalen = len - topiclen - 4; + datalen = len - topiclen - packet_id_len - 4; if (datalen > SUBSCRIPTIONDATALEN) { datalen = SUBSCRIPTIONDATALEN-1; // cut it off } // extract out just the data, into the subscription object itself - memmove(subscriptions[i]->lastread, buffer+4+topiclen, datalen); + memmove(subscriptions[i]->lastread, buffer+4+topiclen+packet_id_len, datalen); subscriptions[i]->datalen = datalen; DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen); DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread); + if ((MQTT_PROTOCOL_LEVEL > 3) &&(buffer[0] & 0x6) == 0x2) { + uint8_t ackpacket[4]; + + // Construct and send puback packet. + uint8_t len = pubackPacket(ackpacket, packetid); + if (!sendPacket(ackpacket, len)) + DEBUG_PRINT(F("Failed")); + } + // return the valid matching subscription return subscriptions[i]; } @@ -605,6 +622,16 @@ uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) { return 2; } +uint8_t Adafruit_MQTT::pubackPacket(uint8_t *packet, uint16_t packetid) { + packet[0] = MQTT_CTRL_PUBACK << 4; + packet[1] = 2; + packet[2] = packetid >> 8; + packet[3] = packetid; + DEBUG_PRINTLN(F("MQTT puback packet:")); + DEBUG_PRINTBUFFER(buffer, 4); + return 4; +} + uint8_t Adafruit_MQTT::disconnectPacket(uint8_t *packet) { packet[0] = MQTT_CTRL_DISCONNECT << 4; packet[1] = 0; diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 6940947..6898752 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -227,6 +227,7 @@ class Adafruit_MQTT { uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); uint8_t unsubscribePacket(uint8_t *packet, const char *topic); uint8_t pingPacket(uint8_t *packet); + uint8_t pubackPacket(uint8_t *packet, uint16_t packetid); };