Merge pull request #193 from xdylanm/send_rcv_long_packets

Support publishing and receiving large messages.
This commit is contained in:
Brent Rubell 2021-05-14 12:01:40 -04:00 committed by GitHub
commit ea2fb46de3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 28 deletions

View File

@ -96,6 +96,22 @@ static uint8_t *stringprint(uint8_t *p, const char *s, uint16_t maxlen = 0) {
return p + len; return p + len;
} }
// packetAdditionalLen is a helper function used to figure out
// how bigger the payload needs to be in order to account for
// its variable length field. As per
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size
// See also readFullPacket
static uint16_t packetAdditionalLen(uint32_t currLen) {
/* Increase length field based on current length */
if (currLen < 128) // 7-bits
return 0;
if (currLen < 16384) // 14-bits
return 1;
if (currLen < 2097152) // 21-bits
return 2;
return 3;
}
// Adafruit_MQTT Definition //////////////////////////////////////////////////// // Adafruit_MQTT Definition ////////////////////////////////////////////////////
Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const char *cid, Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const char *cid,
@ -233,7 +249,7 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize,
// will read a packet and Do The Right Thing with length // will read a packet and Do The Right Thing with length
uint8_t *pbuff = buffer; uint8_t *pbuff = buffer;
uint8_t rlen; uint16_t rlen;
// read the packet type: // read the packet type:
rlen = readPacket(pbuff, 1, timeout); rlen = readPacket(pbuff, 1, timeout);
@ -267,7 +283,8 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize,
DEBUG_PRINT(F("Packet Length:\t")); DEBUG_PRINT(F("Packet Length:\t"));
DEBUG_PRINTLN(value); DEBUG_PRINTLN(value);
if (value > (maxsize - (pbuff - buffer) - 1)) { // maxsize is limited to 65536 by 16-bit unsigned
if (value > uint32_t(maxsize - (pbuff - buffer) - 1)) {
DEBUG_PRINTLN(F("Packet too big for buffer")); DEBUG_PRINTLN(F("Packet too big for buffer"));
rlen = readPacket(pbuff, (maxsize - (pbuff - buffer) - 1), timeout); rlen = readPacket(pbuff, (maxsize - (pbuff - buffer) - 1), timeout);
} else { } else {
@ -492,7 +509,9 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
} }
// Parse out length of packet. // Parse out length of packet.
topiclen = buffer[3]; uint16_t const topicoffset = packetAdditionalLen(len);
uint16_t const topicstart = topicoffset + 4;
topiclen = buffer[3 + topicoffset];
DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINT(F("Looking for subscription len "));
DEBUG_PRINTLN(topiclen); DEBUG_PRINTLN(topiclen);
@ -505,8 +524,8 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
continue; continue;
// Stop if the subscription topic matches the received topic. Be careful // Stop if the subscription topic matches the received topic. Be careful
// to make comparison case insensitive. // to make comparison case insensitive.
if (strncasecmp((char *)buffer + 4, subscriptions[i]->topic, topiclen) == if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic,
0) { topiclen) == 0) {
DEBUG_PRINT(F("Found sub #")); DEBUG_PRINT(F("Found sub #"));
DEBUG_PRINTLN(i); DEBUG_PRINTLN(i);
break; break;
@ -521,21 +540,21 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
// Check if it is QoS 1, TODO: we dont support QoS 2 // Check if it is QoS 1, TODO: we dont support QoS 2
if ((buffer[0] & 0x6) == 0x2) { if ((buffer[0] & 0x6) == 0x2) {
packet_id_len = 2; packet_id_len = 2;
packetid = buffer[topiclen + 4]; packetid = buffer[topiclen + topicstart];
packetid <<= 8; packetid <<= 8;
packetid |= buffer[topiclen + 5]; packetid |= buffer[topiclen + topicstart + 1];
} }
// zero out the old data // zero out the old data
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
datalen = len - topiclen - packet_id_len - 4; datalen = len - topiclen - packet_id_len - topicstart;
if (datalen > SUBSCRIPTIONDATALEN) { if (datalen > SUBSCRIPTIONDATALEN) {
datalen = SUBSCRIPTIONDATALEN - 1; // cut it off datalen = SUBSCRIPTIONDATALEN - 1; // cut it off
} }
// extract out just the data, into the subscription object itself // extract out just the data, into the subscription object itself
memmove(subscriptions[i]->lastread, buffer + 4 + topiclen + packet_id_len, memmove(subscriptions[i]->lastread,
datalen); buffer + topicstart + topiclen + packet_id_len, datalen);
subscriptions[i]->datalen = datalen; subscriptions[i]->datalen = datalen;
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINT(F("Data len: "));
DEBUG_PRINTLN(datalen); DEBUG_PRINTLN(datalen);
@ -669,21 +688,6 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
return len; return len;
} }
// packetAdditionalLen is a helper function used to figure out
// how bigger the payload needs to be in order to account for
// its variable length field. As per
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size
static uint16_t packetAdditionalLen(uint16_t currLen) {
/* Increase length field based on current length */
if (currLen < 128)
return 0;
if (currLen < 16384)
return 1;
if (currLen < 2097151)
return 2;
return 3;
}
// as per // as per
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718040 // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718040
uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,

View File

@ -83,17 +83,18 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen,
bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) { bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) {
uint16_t ret = 0; uint16_t ret = 0;
uint16_t offset = 0;
while (len > 0) { while (len > 0) {
if (client->connected()) { if (client->connected()) {
// send 250 bytes at most at a time, can adjust this later based on Client // send 250 bytes at most at a time, can adjust this later based on Client
uint16_t sendlen = len > 250 ? 250 : len; uint16_t sendlen = len > 250 ? 250 : len;
// Serial.print("Sending: "); Serial.println(sendlen); // Serial.print("Sending: "); Serial.println(sendlen);
ret = client->write(buffer, sendlen); ret = client->write(buffer + offset, sendlen);
DEBUG_PRINT(F("Client sendPacket returned: ")); DEBUG_PRINT(F("Client sendPacket returned: "));
DEBUG_PRINTLN(ret); DEBUG_PRINTLN(ret);
len -= ret; len -= ret;
offset += ret;
if (ret != sendlen) { if (ret != sendlen) {
DEBUG_PRINTLN("Failed to send packet."); DEBUG_PRINTLN("Failed to send packet.");

View File

@ -1,5 +1,5 @@
name=Adafruit MQTT Library name=Adafruit MQTT Library
version=2.2.0 version=2.3.0
author=Adafruit author=Adafruit
maintainer=Adafruit <info@adafruit.com> maintainer=Adafruit <info@adafruit.com>
sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware. sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware.