From 2ea8a0b3fce916fa87b6a3730125153965c61f25 Mon Sep 17 00:00:00 2001 From: xdylanm Date: Wed, 12 May 2021 21:31:22 -0700 Subject: [PATCH 1/3] Support publishing and receiving large messages. Send multiple 250 byte packets (ref PR#113) for larger messages. Correctly identify topic start position for >127byte messages (ref PR#166). Resolves issue #102 --- Adafruit_MQTT.cpp | 32 +++++++++++++++++++++++++------- Adafruit_MQTT_Client.cpp | 7 ++++--- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 7d5141b..70a849d 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -233,7 +233,7 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize, // will read a packet and Do The Right Thing with length uint8_t *pbuff = buffer; - uint8_t rlen; + uint16_t rlen; // read the packet type: rlen = readPacket(pbuff, 1, timeout); @@ -473,6 +473,22 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { return handleSubscriptionPacket(len); } +namespace { + +uint16_t topicOffsetFromLenth(uint16_t const len) +{ + if (len < 128) { // 7 bits (+1 continuation bit) + return 0; + } else if (len < 16384) { // 14 bits (+2 continuation bits) + return 1; + } else if (len < 2097152) { // 21 bits + return 2; + } + return 3; +} + +} // anon + Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { uint16_t i, topiclen, datalen; @@ -491,7 +507,9 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { } // Parse out length of packet. - topiclen = buffer[3]; + uint16_t const topicoffset = topicOffsetFromLength(len); + uint16_t const topicstart = topicoffset + 4; + topiclen = buffer[3+topicoffset]; DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen); @@ -504,7 +522,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { continue; // Stop if the subscription topic matches the received topic. Be careful // to make comparison case insensitive. - if (strncasecmp((char *)buffer + 4, subscriptions[i]->topic, topiclen) == + if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic, topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); @@ -520,20 +538,20 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { // Check if it is QoS 1, TODO: we dont support QoS 2 if ((buffer[0] & 0x6) == 0x2) { packet_id_len = 2; - packetid = buffer[topiclen + 4]; + packetid = buffer[topiclen + topicstart]; packetid <<= 8; - packetid |= buffer[topiclen + 5]; + packetid |= buffer[topiclen + topicstart + 1]; } // zero out the old data memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); - datalen = len - topiclen - packet_id_len - 4; + datalen = len - topiclen - packet_id_len - topicstart; if (datalen > SUBSCRIPTIONDATALEN) { datalen = SUBSCRIPTIONDATALEN - 1; // cut it off } // extract out just the data, into the subscription object itself - memmove(subscriptions[i]->lastread, buffer + 4 + topiclen + packet_id_len, + memmove(subscriptions[i]->lastread, buffer + topicstart + topiclen + packet_id_len, datalen); subscriptions[i]->datalen = datalen; DEBUG_PRINT(F("Data len: ")); diff --git a/Adafruit_MQTT_Client.cpp b/Adafruit_MQTT_Client.cpp index 560ef46..2720ea9 100644 --- a/Adafruit_MQTT_Client.cpp +++ b/Adafruit_MQTT_Client.cpp @@ -83,18 +83,19 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen, bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) { uint16_t ret = 0; - + uint16_t offset = 0; while (len > 0) { if (client->connected()) { // send 250 bytes at most at a time, can adjust this later based on Client uint16_t sendlen = len > 250 ? 250 : len; // Serial.print("Sending: "); Serial.println(sendlen); - ret = client->write(buffer, sendlen); + ret = client->write(buffer + offset, sendlen); DEBUG_PRINT(F("Client sendPacket returned: ")); DEBUG_PRINTLN(ret); len -= ret; - + offset += ret; + if (ret != sendlen) { DEBUG_PRINTLN("Failed to send packet."); return false; From 93a35d1a3e63b67a4fa5cbeaa4524312289c3581 Mon Sep 17 00:00:00 2001 From: xdylanm Date: Wed, 12 May 2021 21:54:08 -0700 Subject: [PATCH 2/3] fix typo --- Adafruit_MQTT.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 70a849d..84e4ad1 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -475,7 +475,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { namespace { -uint16_t topicOffsetFromLenth(uint16_t const len) +uint16_t topicOffsetFromLength(uint16_t const len) { if (len < 128) { // 7 bits (+1 continuation bit) return 0; From db003d30d1c167f8029e746069e801026d3ee1be Mon Sep 17 00:00:00 2001 From: xdylanm Date: Fri, 14 May 2021 08:09:07 -0700 Subject: [PATCH 3/3] Addressing comments from PR #193 with merge from PR#166. Clang-format, version ticked to 2.3.0, replaced duplicate topicOffsetFromLength with packetAdditionalLen. --- Adafruit_MQTT.cpp | 62 ++++++++++++++++------------------------ Adafruit_MQTT_Client.cpp | 2 +- library.properties | 2 +- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 4c4dd96..a38ec80 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -96,6 +96,22 @@ static uint8_t *stringprint(uint8_t *p, const char *s, uint16_t maxlen = 0) { return p + len; } +// packetAdditionalLen is a helper function used to figure out +// how bigger the payload needs to be in order to account for +// its variable length field. As per +// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size +// See also readFullPacket +static uint16_t packetAdditionalLen(uint32_t currLen) { + /* Increase length field based on current length */ + if (currLen < 128) // 7-bits + return 0; + if (currLen < 16384) // 14-bits + return 1; + if (currLen < 2097152) // 21-bits + return 2; + return 3; +} + // Adafruit_MQTT Definition //////////////////////////////////////////////////// Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const char *cid, @@ -267,7 +283,8 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize, DEBUG_PRINT(F("Packet Length:\t")); DEBUG_PRINTLN(value); - if (value > (maxsize - (pbuff - buffer) - 1)) { + // maxsize is limited to 65536 by 16-bit unsigned + if (value > uint32_t(maxsize - (pbuff - buffer) - 1)) { DEBUG_PRINTLN(F("Packet too big for buffer")); rlen = readPacket(pbuff, (maxsize - (pbuff - buffer) - 1), timeout); } else { @@ -474,22 +491,6 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { return handleSubscriptionPacket(len); } -namespace { - -uint16_t topicOffsetFromLength(uint16_t const len) -{ - if (len < 128) { // 7 bits (+1 continuation bit) - return 0; - } else if (len < 16384) { // 14 bits (+2 continuation bits) - return 1; - } else if (len < 2097152) { // 21 bits - return 2; - } - return 3; -} - -} // anon - Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { uint16_t i, topiclen, datalen; @@ -508,9 +509,9 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { } // Parse out length of packet. - uint16_t const topicoffset = topicOffsetFromLength(len); + uint16_t const topicoffset = packetAdditionalLen(len); uint16_t const topicstart = topicoffset + 4; - topiclen = buffer[3+topicoffset]; + topiclen = buffer[3 + topicoffset]; DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen); @@ -523,8 +524,8 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { continue; // Stop if the subscription topic matches the received topic. Be careful // to make comparison case insensitive. - if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic, topiclen) == - 0) { + if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic, + topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); break; @@ -552,8 +553,8 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) { datalen = SUBSCRIPTIONDATALEN - 1; // cut it off } // extract out just the data, into the subscription object itself - memmove(subscriptions[i]->lastread, buffer + topicstart + topiclen + packet_id_len, - datalen); + memmove(subscriptions[i]->lastread, + buffer + topicstart + topiclen + packet_id_len, datalen); subscriptions[i]->datalen = datalen; DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen); @@ -687,21 +688,6 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) { return len; } -// packetAdditionalLen is a helper function used to figure out -// how bigger the payload needs to be in order to account for -// its variable length field. As per -// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size -static uint16_t packetAdditionalLen(uint16_t currLen) { - /* Increase length field based on current length */ - if (currLen < 128) - return 0; - if (currLen < 16384) - return 1; - if (currLen < 2097151) - return 2; - return 3; -} - // as per // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718040 uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, diff --git a/Adafruit_MQTT_Client.cpp b/Adafruit_MQTT_Client.cpp index 2720ea9..9aa136c 100644 --- a/Adafruit_MQTT_Client.cpp +++ b/Adafruit_MQTT_Client.cpp @@ -95,7 +95,7 @@ bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) { DEBUG_PRINTLN(ret); len -= ret; offset += ret; - + if (ret != sendlen) { DEBUG_PRINTLN("Failed to send packet."); return false; diff --git a/library.properties b/library.properties index 230e502..d544748 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=Adafruit MQTT Library -version=2.2.0 +version=2.3.0 author=Adafruit maintainer=Adafruit sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware.