From 4269d874925e1b56f2d9d2ea51488d2a49766d4b Mon Sep 17 00:00:00 2001 From: ladyada Date: Fri, 8 Jul 2016 17:01:40 -0400 Subject: [PATCH] callback support --- Adafruit_MQTT.cpp | 75 ++++++++++++++++++++++++++++++++++++++++------- Adafruit_MQTT.h | 25 +++++++++++----- 2 files changed, 83 insertions(+), 17 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 861bc1f..6106a07 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -219,11 +219,12 @@ int8_t Adafruit_MQTT::connect() { // TODO: The Server is permitted to start sending PUBLISH packets matching the // Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada) - len = processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS); - if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { - continue; // retry! + //Serial.println("\t**looking for suback"); + if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) { + success = true; + break; } - success = true; + //Serial.println("\t**failed, retrying!"); } if (! success) return -2; // failed to sub for some reason } @@ -431,8 +432,48 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) { } +void Adafruit_MQTT::processPackets(int16_t timeout) { + uint16_t len; + + uint32_t elapsed = 0, endtime, starttime = millis(); + + while (elapsed < (uint32_t)timeout) { + Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed); + + if (sub) { + //Serial.println("**** sub packet received"); + if (sub->callback_uint32t != NULL) { + // huh lets do the callback in integer mode + uint32_t data = 0; + data = atoi((char *)sub->lastread); + //Serial.print("*** calling int callback with : "); Serial.println(data); + sub->callback_uint32t(data); + } + else if (sub->callback_double != NULL) { + // huh lets do the callback in doublefloat mode + double data = 0; + data = atof((char *)sub->lastread); + //Serial.print("*** calling double callback with : "); Serial.println(data); + sub->callback_double(data); + } + else if (sub->callback_buffer != NULL) { + // huh lets do the callback in buffer mode + //Serial.print("*** calling buffer callback with : "); Serial.println(data); + sub->callback_buffer((char *)sub->lastread, sub->datalen); + } + } + + // keep track over elapsed time + endtime = millis(); + if (endtime < starttime) { + starttime = endtime; // looped around!") + } + elapsed += (endtime - starttime); + } +} + Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - uint8_t i, topiclen, datalen; + uint16_t i, topiclen, datalen; // Check if data is available to read. uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet @@ -801,7 +842,9 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, topic = feed; qos = q; datalen = 0; - callback = 0; + callback_uint32t = 0; + callback_buffer = 0; + callback_double = 0; } Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, @@ -810,13 +853,25 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, topic = (const char *)feed; qos = q; datalen = 0; - callback = 0; + callback_uint32t = 0; + callback_buffer = 0; + callback_double = 0; } -void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackType cb) { - callback = cb; +void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) { + callback_uint32t = cb; +} + +void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackDoubleType cb) { + callback_double = cb; +} + +void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackBufferType cb) { + callback_buffer = cb; } void Adafruit_MQTT_Subscribe::removeCallback(void) { - callback = 0; + callback_uint32t = 0; + callback_buffer = 0; + callback_double = 0; } diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 23431c7..4813541 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -30,7 +30,7 @@ #endif // Uncomment/comment to turn on/off debug output messages. -#define MQTT_DEBUG +//#define MQTT_DEBUG // Uncomment/comment to turn on/off error output messages. #define MQTT_ERROR @@ -98,9 +98,12 @@ // eg max-subscription-payload-size #define SUBSCRIPTIONDATALEN 20 -//Function pointer called CallbackType that takes a float -//and returns an int -typedef void (*SubscribeCallbackType)(char *); +//Function pointer that returns an int +typedef void (*SubscribeCallbackUInt32Type)(uint32_t); +// returns a double +typedef void (*SubscribeCallbackDoubleType)(double); +// returns a chunk of raw data +typedef void (*SubscribeCallbackBufferType)(char *str, uint16_t len); extern void printBuffer(uint8_t *buffer, uint16_t len); @@ -186,6 +189,8 @@ class Adafruit_MQTT { // that subscribe should be called first for each topic that receives messages! Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0); + void processPackets(int16_t timeout); + // Ping the server to ensure the connection is still alive. bool ping(uint8_t n = 1); @@ -264,7 +269,9 @@ class Adafruit_MQTT_Subscribe { Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0); Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const __FlashStringHelper *feedname, uint8_t q=0); - void setCallback(SubscribeCallbackType callb); + void setCallback(SubscribeCallbackUInt32Type callb); + void setCallback(SubscribeCallbackDoubleType callb); + void setCallback(SubscribeCallbackBufferType callb); void removeCallback(void); const char *topic; @@ -273,9 +280,13 @@ class Adafruit_MQTT_Subscribe { uint8_t lastread[SUBSCRIPTIONDATALEN]; // Number valid bytes in lastread. Limited to SUBSCRIPTIONDATALEN-1 to // ensure nul terminating lastread. - uint8_t datalen; + uint16_t datalen; + + SubscribeCallbackUInt32Type callback_uint32t; + SubscribeCallbackDoubleType callback_double; + SubscribeCallbackBufferType callback_buffer; + private: - SubscribeCallbackType callback; Adafruit_MQTT *mqtt; };