Merge pull request #170 from Fapiko/better-ping-handling

Better ping handling
This commit is contained in:
Brent Rubell 2020-11-06 10:39:12 -05:00 committed by GitHub
commit f63a37b6fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 32 additions and 25 deletions

View File

@ -152,10 +152,12 @@ int8_t Adafruit_MQTT::connect() {
// Read connect response packet and verify it // Read connect response packet and verify it
len = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS); len = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS);
if (len != 4) if (len != 4) {
return -1; return -1;
if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) }
if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) {
return -1; return -1;
}
if (buffer[3] != 0) if (buffer[3] != 0)
return buffer[3]; return buffer[3];
@ -182,7 +184,6 @@ int8_t Adafruit_MQTT::connect() {
// the Subscription before the Server sends the SUBACK Packet. (will // the Subscription before the Server sends the SUBACK Packet. (will
// really need to use callbacks - ada) // really need to use callbacks - ada)
// Serial.println("\t**looking for suback");
if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) { if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) {
success = true; success = true;
break; break;
@ -213,10 +214,15 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer,
break; break;
} }
if ((buffer[0] >> 4) == waitforpackettype) { uint8_t packetType = (buffer[0] >> 4);
if (packetType == waitforpackettype) {
return len; return len;
} else { } else {
ERROR_PRINTLN(F("Dropped a packet")); if (packetType == MQTT_CTRL_PUBLISH) {
handleSubscriptionPacket(len);
} else {
ERROR_PRINTLN(F("Dropped a packet"));
}
} }
} }
return 0; return 0;
@ -432,30 +438,21 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
while (elapsed < (uint32_t)timeout) { while (elapsed < (uint32_t)timeout) {
Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed); Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed);
if (sub) { if (sub) {
// Serial.println("**** sub packet received");
if (sub->callback_uint32t != NULL) { if (sub->callback_uint32t != NULL) {
// huh lets do the callback in integer mode // huh lets do the callback in integer mode
uint32_t data = 0; uint32_t data = 0;
data = atoi((char *)sub->lastread); data = atoi((char *)sub->lastread);
// Serial.print("*** calling int callback with : ");
// Serial.println(data);
sub->callback_uint32t(data); sub->callback_uint32t(data);
} else if (sub->callback_double != NULL) { } else if (sub->callback_double != NULL) {
// huh lets do the callback in doublefloat mode // huh lets do the callback in doublefloat mode
double data = 0; double data = 0;
data = atof((char *)sub->lastread); data = atof((char *)sub->lastread);
// Serial.print("*** calling double callback with : ");
// Serial.println(data);
sub->callback_double(data); sub->callback_double(data);
} else if (sub->callback_buffer != NULL) { } else if (sub->callback_buffer != NULL) {
// huh lets do the callback in buffer mode // huh lets do the callback in buffer mode
// Serial.print("*** calling buffer callback with : ");
// Serial.println((char *)sub->lastread);
sub->callback_buffer((char *)sub->lastread, sub->datalen); sub->callback_buffer((char *)sub->lastread, sub->datalen);
} else if (sub->callback_io != NULL) { } else if (sub->callback_io != NULL) {
// huh lets do the callback in io mode // huh lets do the callback in io mode
// Serial.print("*** calling io instance callback with : ");
// Serial.println((char *)sub->lastread);
((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread,
sub->datalen); sub->datalen);
} }
@ -469,23 +466,29 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
elapsed += (endtime - starttime); elapsed += (endtime - starttime);
} }
} }
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
uint16_t i, topiclen, datalen;
// Check if data is available to read. // Check if data is available to read.
uint16_t len = uint16_t len =
readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
if (!len) return handleSubscriptionPacket(len);
}
Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
uint16_t i, topiclen, datalen;
if (!len) {
return NULL; // No data available, just quit. return NULL; // No data available, just quit.
}
DEBUG_PRINT("Packet len: "); DEBUG_PRINT("Packet len: ");
DEBUG_PRINTLN(len); DEBUG_PRINTLN(len);
DEBUG_PRINTBUFFER(buffer, len); DEBUG_PRINTBUFFER(buffer, len);
if (len < 3) if (len < 3) {
return NULL; return NULL;
if ((buffer[0] & 0xF0) != (MQTT_CTRL_PUBLISH) << 4) }
if ((buffer[0] & 0xF0) != (MQTT_CTRL_PUBLISH) << 4) {
return NULL; return NULL;
}
// Parse out length of packet. // Parse out length of packet.
topiclen = buffer[3]; topiclen = buffer[3];

View File

@ -206,6 +206,10 @@ public:
// messages! // messages!
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0); Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0);
// Handle any data coming in for subscriptions and fires them off to the
// appropriate callback
Adafruit_MQTT_Subscribe *handleSubscriptionPacket(uint16_t len);
void processPackets(int16_t timeout); void processPackets(int16_t timeout);
// Ping the server to ensure the connection is still alive. // Ping the server to ensure the connection is still alive.

View File

@ -55,6 +55,10 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen,
uint16_t len = 0; uint16_t len = 0;
int16_t t = timeout; int16_t t = timeout;
if (maxlen == 0) { // handle zero-length packets
return 0;
}
while (client->connected() && (timeout >= 0)) { while (client->connected() && (timeout >= 0)) {
// DEBUG_PRINT('.'); // DEBUG_PRINT('.');
while (client->available()) { while (client->available()) {
@ -65,10 +69,6 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen,
// DEBUG_PRINTLN((uint8_t)c, HEX); // DEBUG_PRINTLN((uint8_t)c, HEX);
len++; len++;
if (maxlen == 0) { // handle zero-length packets
return 0;
}
if (len == maxlen) { // we read all we want, bail if (len == maxlen) { // we read all we want, bail
DEBUG_PRINT(F("Read data:\t")); DEBUG_PRINT(F("Read data:\t"));
DEBUG_PRINTBUFFER(buffer, len); DEBUG_PRINTBUFFER(buffer, len);

View File

@ -1,5 +1,5 @@
name=Adafruit MQTT Library name=Adafruit MQTT Library
version=2.0.0 version=2.1.0
author=Adafruit author=Adafruit
maintainer=Adafruit <info@adafruit.com> maintainer=Adafruit <info@adafruit.com>
sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware. sentence=MQTT library that supports the FONA, ESP8266, Yun, and generic Arduino Client hardware.