Support publishing and receiving large messages. Send multiple 250 byte packets (ref PR#113) for larger messages. Correctly identify topic start position for >127byte messages (ref PR#166). Resolves issue #102
This commit is contained in:
parent
e8922f60a9
commit
2ea8a0b3fc
@ -233,7 +233,7 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize,
|
||||
// will read a packet and Do The Right Thing with length
|
||||
uint8_t *pbuff = buffer;
|
||||
|
||||
uint8_t rlen;
|
||||
uint16_t rlen;
|
||||
|
||||
// read the packet type:
|
||||
rlen = readPacket(pbuff, 1, timeout);
|
||||
@ -473,6 +473,22 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
||||
return handleSubscriptionPacket(len);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
uint16_t topicOffsetFromLenth(uint16_t const len)
|
||||
{
|
||||
if (len < 128) { // 7 bits (+1 continuation bit)
|
||||
return 0;
|
||||
} else if (len < 16384) { // 14 bits (+2 continuation bits)
|
||||
return 1;
|
||||
} else if (len < 2097152) { // 21 bits
|
||||
return 2;
|
||||
}
|
||||
return 3;
|
||||
}
|
||||
|
||||
} // anon
|
||||
|
||||
Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
|
||||
uint16_t i, topiclen, datalen;
|
||||
|
||||
@ -491,7 +507,9 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
|
||||
}
|
||||
|
||||
// Parse out length of packet.
|
||||
topiclen = buffer[3];
|
||||
uint16_t const topicoffset = topicOffsetFromLength(len);
|
||||
uint16_t const topicstart = topicoffset + 4;
|
||||
topiclen = buffer[3+topicoffset];
|
||||
DEBUG_PRINT(F("Looking for subscription len "));
|
||||
DEBUG_PRINTLN(topiclen);
|
||||
|
||||
@ -504,7 +522,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
|
||||
continue;
|
||||
// Stop if the subscription topic matches the received topic. Be careful
|
||||
// to make comparison case insensitive.
|
||||
if (strncasecmp((char *)buffer + 4, subscriptions[i]->topic, topiclen) ==
|
||||
if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic, topiclen) ==
|
||||
0) {
|
||||
DEBUG_PRINT(F("Found sub #"));
|
||||
DEBUG_PRINTLN(i);
|
||||
@ -520,20 +538,20 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
|
||||
// Check if it is QoS 1, TODO: we dont support QoS 2
|
||||
if ((buffer[0] & 0x6) == 0x2) {
|
||||
packet_id_len = 2;
|
||||
packetid = buffer[topiclen + 4];
|
||||
packetid = buffer[topiclen + topicstart];
|
||||
packetid <<= 8;
|
||||
packetid |= buffer[topiclen + 5];
|
||||
packetid |= buffer[topiclen + topicstart + 1];
|
||||
}
|
||||
|
||||
// zero out the old data
|
||||
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
|
||||
|
||||
datalen = len - topiclen - packet_id_len - 4;
|
||||
datalen = len - topiclen - packet_id_len - topicstart;
|
||||
if (datalen > SUBSCRIPTIONDATALEN) {
|
||||
datalen = SUBSCRIPTIONDATALEN - 1; // cut it off
|
||||
}
|
||||
// extract out just the data, into the subscription object itself
|
||||
memmove(subscriptions[i]->lastread, buffer + 4 + topiclen + packet_id_len,
|
||||
memmove(subscriptions[i]->lastread, buffer + topicstart + topiclen + packet_id_len,
|
||||
datalen);
|
||||
subscriptions[i]->datalen = datalen;
|
||||
DEBUG_PRINT(F("Data len: "));
|
||||
|
@ -83,17 +83,18 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen,
|
||||
|
||||
bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) {
|
||||
uint16_t ret = 0;
|
||||
|
||||
uint16_t offset = 0;
|
||||
while (len > 0) {
|
||||
if (client->connected()) {
|
||||
// send 250 bytes at most at a time, can adjust this later based on Client
|
||||
|
||||
uint16_t sendlen = len > 250 ? 250 : len;
|
||||
// Serial.print("Sending: "); Serial.println(sendlen);
|
||||
ret = client->write(buffer, sendlen);
|
||||
ret = client->write(buffer + offset, sendlen);
|
||||
DEBUG_PRINT(F("Client sendPacket returned: "));
|
||||
DEBUG_PRINTLN(ret);
|
||||
len -= ret;
|
||||
offset += ret;
|
||||
|
||||
if (ret != sendlen) {
|
||||
DEBUG_PRINTLN("Failed to send packet.");
|
||||
|
Loading…
Reference in New Issue
Block a user