diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index ee59af7..8247146 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -8,6 +8,10 @@ Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const PROGMEM ch clientid = cid; username = user; password = pass; + + for (uint8_t i=0; ipublish(topic, payload, qos); } + +Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feed, uint8_t q) { + mqtt = mqttserver; + topic = feed; + qos = q; +} diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index d7d6031..24e0835 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -12,6 +12,8 @@ #define MQTT_CTRL_CONNECT 0x01 #define MQTT_CTRL_CONNECTACK 0x02 #define MQTT_CTRL_PUBLISH 0x03 +#define MQTT_CTRL_SUBSCRIBE 0x08 +#define MQTT_CTRL_SUBACK 0x09 #define MQTT_CTRL_PINGREQ 0x0C #define MQTT_CTRL_PINGRESP 0x0D @@ -39,7 +41,18 @@ #define MQTT_CONN_CLEANSESSION 0x02 #define MQTT_CONN_KEEPALIVE 15 // in seconds -#define MAXBUFFERSIZE (60) +#define MAXBUFFERSIZE (85) +#define MAXSUBSCRIPTIONS 5 +#define SUBSCRIPTIONDATALEN 20 + + +//#define DEBUG_MQTT_CONNECT +//#define DEBUG_MQTT_SUBSCRIBE +//#define DEBUG_MQTT_READSUB +//#define DEBUG_MQTT_PUBLISH +//#define DEBUG_MQTT_PACKETREAD + +class Adafruit_MQTT_Subscribe; // forward decl class Adafruit_MQTT { public: @@ -53,6 +66,11 @@ class Adafruit_MQTT { virtual boolean ping(uint8_t t) {} uint8_t pingPacket(uint8_t *packet); + virtual boolean subscribe(Adafruit_MQTT_Subscribe *sub) {} + uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); + + virtual Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0) {}; + protected: int8_t errno; const char *servername; @@ -62,6 +80,7 @@ class Adafruit_MQTT { const char *username; const char *password; + Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS]; uint8_t buffer[MAXBUFFERSIZE]; }; @@ -82,10 +101,16 @@ private: class Adafruit_MQTT_Subscribe { public: - Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, char *feedname); + Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0); bool setCallback(void (*callback)(char *)); + const char *topic; + uint8_t qos; + + uint8_t * lastread[SUBSCRIPTIONDATALEN]; + private: + Adafruit_MQTT *mqtt; }; diff --git a/Adafruit_MQTT_CC3000.cpp b/Adafruit_MQTT_CC3000.cpp index dd6f0cc..b3993c9 100644 --- a/Adafruit_MQTT_CC3000.cpp +++ b/Adafruit_MQTT_CC3000.cpp @@ -2,6 +2,22 @@ #include "Adafruit_MQTT_CC3000.h" #include +static void printBuffer(uint8_t *buffer, uint8_t len) { + for (uint8_t i=0; iconnectTCP(serverip, portnum); uint8_t len = connectPacket(buffer); - Serial.println(F("MQTT connection packet:")); - for (uint8_t i=0; itopic, subscriptions[i]->qos); + +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("MQTT subscription packet:")); printBuffer(buffer, len); +#endif + + if (mqttclient.connected()) { + uint16_t ret = mqttclient.write(buffer, len); +#ifdef DEBUG_MQTT_CONNECT + Serial.print("returned: "); Serial.println(ret); +#endif + if (ret != len) return -1; + } else { +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("Connection failed")); +#endif + return -1; + } + + // Get SUBACK + len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); +#ifdef DEBUG_MQTT_CONNECT + Serial.print(F("SUBACK:\t")); printBuffer(buffer, len); +#endif + if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { + return 6; // failure to subscribe + } } - return -1; + return 0; } -uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout) { +uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket) { /* Read data until either the connection is closed, or the idle timeout is reached. */ uint16_t len = 0; int16_t t = timeout; - while (mqttclient.connected() && (timeout > 0)) { - Serial.print('.'); + while (mqttclient.connected() && (timeout >= 0)) { + //Serial.print('.'); while (mqttclient.available()) { - Serial.print('!'); + //Serial.print('!'); char c = mqttclient.read(); timeout = t; // reset the timeout buffer[len] = c; //Serial.print((uint8_t)c,HEX); len++; if (len == maxlen) { // we read all we want, bail - - Serial.print(F("Read packet:\t")); - for (uint8_t i=0; i 0) { - Serial.println(F("Reply:")); len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS); - for (uint8_t i=0; itopic+k) ) + flag = false; + } + if (flag) { +#ifdef DEBUG_MQTT_READSUB + Serial.println((char *)buffer+4); + Serial.print(F("Found sub #")); Serial.println(i); +#endif + break; + } + } + } + if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ??? + + // zero out the old data + memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); + + datalen = len - topiclen - 4; + if (datalen > SUBSCRIPTIONDATALEN) { + datalen = SUBSCRIPTIONDATALEN-1; // cut it off + } + // extract out just the data, into the subscription object itself + memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen); + +#ifdef DEBUG_MQTT_READSUB + Serial.print(F("Data len: ")); Serial.println(datalen); + Serial.print("Data: "); Serial.println((char *)subscriptions[i]->lastread); +#endif + + // return the valid matching subscription + return subscriptions[i]; +} diff --git a/Adafruit_MQTT_CC3000.h b/Adafruit_MQTT_CC3000.h index 5077757..fae0279 100644 --- a/Adafruit_MQTT_CC3000.h +++ b/Adafruit_MQTT_CC3000.h @@ -6,16 +6,24 @@ #include "Adafruit_MQTT_CC3000.h" #include +// delay in ms between calls of available() +#define MQTT_CC3000_INTERAVAILDELAY 10 + + class Adafruit_MQTT_CC3000 : public Adafruit_MQTT { public: Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, const char *cid, const char *user, const char *pass); int8_t connect(void); - uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout); + uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket = false); int32_t close(void); boolean publish(const char *topic, char *payload, uint8_t qos); boolean ping(uint8_t time); + boolean subscribe(Adafruit_MQTT_Subscribe *sub); + + Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0); + private: Adafruit_CC3000 *cc3000; Adafruit_CC3000_Client mqttclient;