From 8f595cdb955adb561c5d1cafcdf737ec5e68d88c Mon Sep 17 00:00:00 2001 From: ladyada Date: Mon, 8 Feb 2016 23:36:25 -0500 Subject: [PATCH] properly process packets until you get the one you want (prep for callback w/good retain support) --- Adafruit_MQTT.cpp | 71 +++++++++++++++++++++++++++++++++++++++++++++-- Adafruit_MQTT.h | 12 +++++++- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 3a27aab..c4ff4e2 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -191,7 +191,7 @@ int8_t Adafruit_MQTT::connect() { // TODO: The Server is permitted to start sending PUBLISH packets matching the // Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada) if(MQTT_PROTOCOL_LEVEL > 3) { - len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); + len = processPacketsUntil(buffer, MQTT_CTRL_SUBACK, CONNECT_TIMEOUT_MS); DEBUG_PRINT(F("SubAck:\t")); DEBUG_PRINTBUFFER(buffer, len); if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { @@ -204,6 +204,61 @@ int8_t Adafruit_MQTT::connect() { return 0; } +uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout) { + uint16_t len; + while (len = readFullPacket(buffer, timeout)) { + + // TODO: add subscription reading & processing here + + if ((buffer[0] >> 4) == waitforpackettype) { + //DEBUG_PRINTLN(F("Found right packet")); + return len; + } + } + return 0; +} + +uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t timeout) { + // will read a packet and Do The Right Thing with length + uint8_t *pbuff = buffer; + + uint8_t rlen; + + // read the packet type: + rlen = readPacket(pbuff, 1, timeout); + if (rlen != 1) return 0; + + //DEBUG_PRINT(F("Packet Type:\t")); DEBUG_PRINTBUFFER(pbuff, rlen); + pbuff++; + + uint32_t value = 0; + uint32_t multiplier = 1; + uint8_t encodedByte; + + do { + rlen = readPacket(pbuff, 1, timeout); + if (rlen != 1) return 0; + encodedByte = pbuff[0]; // save the last read val + pbuff++; // get ready for reading the next byte + + uint32_t intermediate = encodedByte & 0x7F; + intermediate *= multiplier; + value += intermediate; + multiplier *= 128; + if (multiplier > 128*128*128) { + DEBUG_PRINT(F("Malformed packet len\n")); + return 0; + } + } while (encodedByte & 0x80); + + //DEBUG_PRINT(F("Packet Length:\t")); DEBUG_PRINTLN(value); + + rlen = readPacket(pbuff, value, timeout); + //DEBUG_PRINT(F("Remaining packet:\t")); DEBUG_PRINTBUFFER(pbuff, rlen); + + return ((pbuff - buffer)+rlen); +} + const __FlashStringHelper* Adafruit_MQTT::connectErrorString(int8_t code) { switch (code) { case 1: return F("Wrong protocol"); @@ -415,7 +470,7 @@ void Adafruit_MQTT::flushIncoming(uint16_t timeout) { } bool Adafruit_MQTT::ping(uint8_t num) { - flushIncoming(100); + //flushIncoming(100); while (num--) { // Construct and send ping packet. @@ -424,7 +479,7 @@ bool Adafruit_MQTT::ping(uint8_t num) { continue; // Process ping reply. - len = readPacket(buffer, 2, PING_TIMEOUT_MS); + len = processPacketsUntil(buffer, MQTT_CTRL_PINGRESP, PING_TIMEOUT_MS); if (buffer[0] == (MQTT_CTRL_PINGRESP << 4)) return true; } @@ -692,6 +747,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, mqtt = mqttserver; topic = feed; qos = q; + callback = 0; } Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, @@ -699,4 +755,13 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, mqtt = mqttserver; topic = (const char *)feed; qos = q; + callback = 0; +} + +void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackType cb) { + callback = cb; +} + +void Adafruit_MQTT_Subscribe::removeCallback(void) { + callback = 0; } diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 6898752..a747216 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -95,6 +95,9 @@ // eg max-subscription-payload-size #define SUBSCRIPTIONDATALEN 20 +//Function pointer called CallbackType that takes a float +//and returns an int +typedef void (*SubscribeCallbackType)(char *); extern void printBuffer(uint8_t *buffer, uint8_t len); @@ -202,6 +205,11 @@ class Adafruit_MQTT { virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, bool checkForValidPubPacket = false) = 0; + // Read a full packet, keeping note of the correct length + uint16_t readFullPacket(uint8_t *buffer, uint16_t timeout); + // Properly process packets until you get to one you want + uint16_t processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout); + // Shared state that subclasses can use: const char *servername; int16_t portnum; @@ -255,7 +263,8 @@ class Adafruit_MQTT_Subscribe { Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0); Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const __FlashStringHelper *feedname, uint8_t q=0); - bool setCallback(void (*callback)(char *)); + void setCallback(SubscribeCallbackType callb); + void removeCallback(void); const char *topic; uint8_t qos; @@ -265,6 +274,7 @@ class Adafruit_MQTT_Subscribe { // ensure nul terminating lastread. uint8_t datalen; private: + SubscribeCallbackType callback; Adafruit_MQTT *mqtt; };