Refactor Adafruit_MQTT to have a simple interface for packet sending & receiving that subclasses implement.

This commit is contained in:
Tony DiCola 2015-06-04 21:16:08 -07:00
parent ae0b4778a1
commit ac74ef4fd6
4 changed files with 362 additions and 323 deletions

View File

@ -1,10 +1,57 @@
#include "Adafruit_MQTT.h" #include "Adafruit_MQTT.h"
Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const PROGMEM char *cid, const PROGMEM char *user, const PROGMEM char *pass) { void printBuffer(uint8_t *buffer, uint8_t len) {
for (uint8_t i=0; i<len; i++) {
if (isprint(buffer[i]))
DEBUG_PRINTER.write(buffer[i]);
else
DEBUG_PRINTER.print(" ");
DEBUG_PRINTER.print(F(" [0x"));
if (buffer[i] < 0x10)
DEBUG_PRINTER.print("0");
DEBUG_PRINTER.print(buffer[i],HEX);
DEBUG_PRINTER.print("], ");
if (i % 8 == 7) DEBUG_PRINTER.println();
}
DEBUG_PRINTER.println();
}
static uint8_t *stringprint(uint8_t *p, char *s) {
uint16_t len = strlen(s);
p[0] = len >> 8; p++;
p[0] = len & 0xFF; p++;
memcpy(p, s, len);
return p+len;
}
static uint8_t *stringprint_P(uint8_t *p, const char *s, uint16_t maxlen=0) {
// If maxlen is specified (has a non-zero value) then use it as the maximum
// length of the source string to write to the buffer. Otherwise write
// the entire source string.
uint16_t len = strlen_P(s);
if (maxlen > 0 && len > maxlen) {
len = maxlen;
}
/*
for (uint8_t i=0; i<len; i++) {
Serial.write(pgm_read_byte(s+i));
}
*/
p[0] = len >> 8; p++;
p[0] = len & 0xFF; p++;
strncpy_P((char *)p, s, len);
return p+len;
}
// Adafruit_MQTT Definition ////////////////////////////////////////////////////
Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port,
const PROGMEM char *cid, const PROGMEM char *user,
const PROGMEM char *pass) {
servername = server; servername = server;
portnum = port; portnum = port;
serverip = 0;
clientid = cid; clientid = cid;
username = user; username = user;
password = pass; password = pass;
@ -32,34 +79,169 @@ Adafruit_MQTT::Adafruit_MQTT(char *server, uint16_t port, char *cid, char *user,
} }
*/ */
uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) { int8_t Adafruit_MQTT::connect() {
packet[0] = MQTT_CTRL_PINGREQ << 4; // Connect to the server.
packet[1] = 0; if (!connectServer())
return 2; return -1;
}
// Construct and send connect packet.
uint8_t len = connectPacket(buffer);
if (!sendPacket(buffer, len))
return -1;
static uint8_t *stringprint(uint8_t *p, char *s) { // Read connect response packet and verify it
uint16_t len = strlen(s); len = readPacket(buffer, 4, CONNECT_TIMEOUT_MS);
p[0] = len >> 8; p++; if (len != 4)
p[0] = len & 0xFF; p++; return -1;
memcpy(p, s, len); if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2))
return p+len; return -1;
} if (buffer[3] != 0)
return buffer[3];
static uint8_t *stringprint_P(uint8_t *p, const char *s) { // Setup subscriptions once connected.
uint16_t len = strlen_P(s); for (uint8_t i=0; i<MAXSUBSCRIPTIONS; i++) {
/* // Ignore subscriptions that aren't defined.
for (uint8_t i=0; i<len; i++) { if (subscriptions[i] == 0) continue;
Serial.write(pgm_read_byte(s+i));
// Construct and send subscription packet.
uint8_t len = subscribePacket(buffer, subscriptions[i]->topic, subscriptions[i]->qos);
if (!sendPacket(buffer, len))
return -1;
// Get SUBACK
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
DEBUG_PRINT(F("SUBACK:\t"));
DEBUG_PRINTBUFFER(buffer, len);
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
return 6; // failure to subscribe
}
} }
*/
p[0] = len >> 8; p++; return 0;
p[0] = len & 0xFF; p++;
strncpy_P((char *)p, s, len);
return p+len;
} }
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 bool Adafruit_MQTT::publish(const char *topic, char *data, uint8_t qos) {
// Construct and send publish packet.
uint8_t len = publishPacket(buffer, topic, data, qos);
if (!sendPacket(buffer, len))
return false;
// If QOS level is high enough verify the response packet.
if (qos > 0) {
len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS);
DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
DEBUG_PRINTBUFFER(buffer, len);
//TODO: Verify response packet?
}
return true;
}
bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) {
uint8_t i;
// see if we are already subscribed
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] == sub) {
DEBUG_PRINTLN(F("Already subscribed"));
break;
}
}
if (i==MAXSUBSCRIPTIONS) { // add to subscriptionlist
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] == 0) {
DEBUG_PRINT(F("Added sub ")); DEBUG_PRINTLN(i);
subscriptions[i] = sub;
break;
}
}
}
if (i==MAXSUBSCRIPTIONS) {
DEBUG_PRINTLN(F("no more subscription space :("));
return false;
}
}
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
uint8_t i, topiclen, datalen;
// Check if data is available to read.
uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, true); // return one full packet
if (!len)
return NULL; // No data available, just quit.
DEBUG_PRINTBUFFER(buffer, len);
// Parse out length of packet.
topiclen = buffer[3];
DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen);
// Find subscription associated with this packet.
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i]) {
// Skip this subscription if its name length isn't the same as the
// received topic name.
if (strlen_P(subscriptions[i]->topic) != topiclen)
continue;
// Stop if the subscription topic matches the received topic. Be careful
// to make comparison case insensitive.
if (strncasecmp_P((char*)buffer+4, subscriptions[i]->topic, topiclen) == 0) {
DEBUG_PRINTLN(subscriptions[i]->topic);
DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i);
break;
}
// bool flag = true;
// // TODO: REPLACE WITH MEMCMP?
// for (uint8_t k=0; k<topiclen; k++) {
// if ( buffer[4+k] != pgm_read_byte(subscriptions[i]->topic+k) )
// flag = false;
// }
// bool flag = strcmp_P()
// if (flag) {
// DEBUG_PRINTLN_VERBOSE((char *)buffer+4);
// DEBUG_PRINT_VERBOSE(F("Found sub #")); DEBUG_PRINTLN_VERBOSE(i);
// break;
// }
}
}
if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ???
// zero out the old data
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
datalen = len - topiclen - 4;
if (datalen > SUBSCRIPTIONDATALEN) {
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
}
// extract out just the data, into the subscription object itself
memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);
// return the valid matching subscription
return subscriptions[i];
}
bool Adafruit_MQTT::ping(uint8_t times) {
while (times) {
// Construct and send ping packet.
uint8_t len = pingPacket(buffer);
if (!sendPacket(buffer, len))
return false;
// Process ping reply.
len = readPacket(buffer, 2, PING_TIMEOUT_MS);
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
return true;
}
return false;
}
// Packet Generation Functions /////////////////////////////////////////////////
// The current MQTT spec is 3.1.1 and available here:
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028
// However this connect packet and code follows the MQTT 3.1 spec here (some
// small differences in the protocol):
// http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#connect
uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) { uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
uint8_t *p = packet; uint8_t *p = packet;
uint16_t len; uint16_t len;
@ -69,7 +251,7 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
p+=2; p+=2;
// fill in packet[1] last // fill in packet[1] last
p = stringprint(p, "MQIsdp"); p = stringprint_P(p, PSTR("MQIsdp"));
p[0] = MQTT_PROTOCOL_LEVEL; p[0] = MQTT_PROTOCOL_LEVEL;
p++; p++;
@ -87,7 +269,7 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
p[0] = MQTT_CONN_KEEPALIVE & 0xFF; p[0] = MQTT_CONN_KEEPALIVE & 0xFF;
p++; p++;
p = stringprint_P(p, clientid); p = stringprint_P(p, clientid, 23); // Limit client ID to first 23 characters.
if (pgm_read_byte(username) != 0) { if (pgm_read_byte(username) != 0) {
p = stringprint_P(p, username); p = stringprint_P(p, username);
@ -99,11 +281,13 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
len = p - packet; len = p - packet;
packet[1] = len-2; // don't include the 2 bytes of fixed header data packet[1] = len-2; // don't include the 2 bytes of fixed header data
DEBUG_PRINTLN(F("MQTT connect packet:"));
DEBUG_PRINTBUFFER(buffer, len);
return len; return len;
} }
uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, char *data, uint8_t qos) { uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
char *data, uint8_t qos) {
uint8_t *p = packet; uint8_t *p = packet;
uint16_t len; uint16_t len;
@ -117,10 +301,13 @@ uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, char *d
p+=strlen(data); p+=strlen(data);
len = p - packet; len = p - packet;
packet[1] = len-2; // don't include the 2 bytes of fixed header data packet[1] = len-2; // don't include the 2 bytes of fixed header data
DEBUG_PRINTLN(F("MQTT publish packet:"));
DEBUG_PRINTBUFFER(buffer, len);
return len; return len;
} }
uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic, uint8_t qos) { uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic,
uint8_t qos) {
uint8_t *p = packet; uint8_t *p = packet;
uint16_t len; uint16_t len;
@ -140,19 +327,29 @@ uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic, uint8
len = p - packet; len = p - packet;
packet[1] = len-2; // don't include the 2 bytes of fixed header data packet[1] = len-2; // don't include the 2 bytes of fixed header data
DEBUG_PRINTLN(F("MQTT subscription packet:"));
DEBUG_PRINTBUFFER(buffer, len);
return len; return len;
} }
uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) {
packet[0] = MQTT_CTRL_PINGREQ << 4;
packet[1] = 0;
DEBUG_PRINTLN(F("MQTT ping packet:"));
DEBUG_PRINTBUFFER(buffer, 2);
return 2;
}
Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t q) { // Adafruit_MQTT_Publish Definition ////////////////////////////////////////////
Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
const char *feed, uint8_t q) {
mqtt = mqttserver; mqtt = mqttserver;
topic = feed; topic = feed;
qos = q; qos = q;
} }
bool Adafruit_MQTT_Publish::publish(int32_t i) { bool Adafruit_MQTT_Publish::publish(int32_t i) {
char payload[18]; char payload[18];
itoa(i, payload, 10); itoa(i, payload, 10);
@ -164,12 +361,16 @@ bool Adafruit_MQTT_Publish::publish(uint32_t i) {
itoa(i, payload, 10); itoa(i, payload, 10);
return mqtt->publish(topic, payload, qos); return mqtt->publish(topic, payload, qos);
} }
bool Adafruit_MQTT_Publish::publish(char *payload) { bool Adafruit_MQTT_Publish::publish(char *payload) {
return mqtt->publish(topic, payload, qos); return mqtt->publish(topic, payload, qos);
} }
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feed, uint8_t q) { // Adafruit_MQTT_Subscribe Definition //////////////////////////////////////////
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
const char *feed, uint8_t q) {
mqtt = mqttserver; mqtt = mqttserver;
topic = feed; topic = feed;
qos = q; qos = q;

View File

@ -7,6 +7,23 @@
#include "WProgram.h" #include "WProgram.h"
#endif #endif
// Uncomment/comment to turn on/off debug output messages.
//#define MQTT_DEBUG
// Set where debug messages will be printed.
#define DEBUG_PRINTER Serial
// Define actual debug output functions when necessary.
#ifdef MQTT_DEBUG
#define DEBUG_PRINT(...) { DEBUG_PRINTER.print(__VA_ARGS__); }
#define DEBUG_PRINTLN(...) { DEBUG_PRINTER.println(__VA_ARGS__); }
#define DEBUG_PRINTBUFFER(buffer, len) { printBuffer(buffer, len); }
#else
#define DEBUG_PRINT(...) {}
#define DEBUG_PRINTLN(...) {}
#define DEBUG_PRINTBUFFER(buffer, len) {}
#endif
#define MQTT_PROTOCOL_LEVEL 3 #define MQTT_PROTOCOL_LEVEL 3
#define MQTT_CTRL_CONNECT 0x01 #define MQTT_CTRL_CONNECT 0x01
@ -22,13 +39,12 @@
#define SERVERNAME_SIZE 25 #define SERVERNAME_SIZE 25
#define PASSWORD_SIZE 25 #define PASSWORD_SIZE 50 // Need to be able to store an AIO key which is 41 chars.
#define USERNAME_SIZE 41 #define USERNAME_SIZE 41
#define CLIENTID_SIZE 23 #define CLIENTID_SIZE 23
#define FEEDNAME_SIZE 40 #define FEEDNAME_SIZE 40
#define CONNECT_TIMEOUT_MS 3000 #define CONNECT_TIMEOUT_MS 3000
#define PUBLISH_TIMEOUT_MS 500 #define PUBLISH_TIMEOUT_MS 500
#define PING_TIMEOUT_MS 500 #define PING_TIMEOUT_MS 500
@ -41,49 +57,90 @@
#define MQTT_CONN_CLEANSESSION 0x02 #define MQTT_CONN_CLEANSESSION 0x02
#define MQTT_CONN_KEEPALIVE 15 // in seconds #define MQTT_CONN_KEEPALIVE 15 // in seconds
#define MAXBUFFERSIZE (85) #define MAXBUFFERSIZE (105) // Need to be able to store at least ~90 chars
// for a connect packet with full 23 char client ID.
#define MAXSUBSCRIPTIONS 5 #define MAXSUBSCRIPTIONS 5
#define SUBSCRIPTIONDATALEN 20 #define SUBSCRIPTIONDATALEN 20
//#define DEBUG_MQTT_CONNECT extern void printBuffer(uint8_t *buffer, uint8_t len);
//#define DEBUG_MQTT_SUBSCRIBE
//#define DEBUG_MQTT_READSUB
//#define DEBUG_MQTT_PUBLISH
//#define DEBUG_MQTT_PACKETREAD
class Adafruit_MQTT_Subscribe; // forward decl class Adafruit_MQTT_Subscribe; // forward decl
class Adafruit_MQTT { class Adafruit_MQTT {
public: public:
Adafruit_MQTT(const char *server, uint16_t port, const PROGMEM char *cid, const PROGMEM char *user, const PROGMEM char *pass); Adafruit_MQTT(const char *server, uint16_t port, const PROGMEM char *cid,
const PROGMEM char *user, const PROGMEM char *pass);
virtual ~Adafruit_MQTT() {}
uint8_t connectPacket(uint8_t *packet); // Connect to the MQTT server. Returns 0 on success, otherwise an error code
// that indicates something went wrong:
// -1 = Error connecting to server
// 1 = Wrong protocol
// 2 = ID rejected
// 3 = Server unavailable
// 4 = Bad username or password
// 5 = Not authenticated
// 6 = Failed to subscribe
int8_t connect();
virtual boolean publish(const char *topic, char *payload, uint8_t qos) {} // Disconnect from the MQTT server. Returns true if disconnected, false
uint8_t publishPacket(uint8_t *packet, const char *topic, char *payload, uint8_t qos); // otherwise.
virtual bool disconnect() = 0; // Subclasses need to fill this in!
virtual boolean ping(uint8_t t) {} // Publish a message to a topic using the specified QoS level. Returns true
uint8_t pingPacket(uint8_t *packet); // if the message was published, false otherwise.
bool publish(const char *topic, char *payload, uint8_t qos);
// Add a subscription to receive messages for a topic. Returns true if the
// subscription could be added, false otherwise.
bool subscribe(Adafruit_MQTT_Subscribe *sub);
virtual boolean subscribe(Adafruit_MQTT_Subscribe *sub) {} // Check if any subscriptions have new messages. Will return a reference to
uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); // an Adafruit_MQTT_Subscribe object which has a new message. Should be called
// in the sketch's loop function to ensure new messages are recevied. Note
// that subscribe should be called first for each topic that receives messages!
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0);
virtual Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0) {}; // Ping the server to ensure the connection is still alive. Returns true if
// successful, otherwise false.
bool ping(uint8_t t);
protected: protected:
int8_t errno; // Interface that subclasses need to implement:
// Connect to the server and return true if successful, false otherwise.
virtual bool connectServer() = 0;
// Send data to the server specified by the buffer and length of data.
virtual bool sendPacket(uint8_t *buffer, uint8_t len) = 0;
// Read MQTT packet from the server. Will read up to maxlen bytes and store
// the data in the provided buffer. Waits up to the specified timeout (in
// milliseconds) for data to be available. If checkForValidPubPacket is true
// then the received data is verified to make sure it's a complete packet.
virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
bool checkForValidPubPacket = false) = 0;
// Shared state that subclasses can use:
const char *servername; const char *servername;
uint32_t serverip;
int16_t portnum; int16_t portnum;
const char *clientid; const char *clientid;
const char *username; const char *username;
const char *password; const char *password;
Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS];
uint8_t buffer[MAXBUFFERSIZE]; uint8_t buffer[MAXBUFFERSIZE];
private:
Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS];
// Functions to generate MQTT packets.
uint8_t connectPacket(uint8_t *packet);
uint8_t publishPacket(uint8_t *packet, const char *topic, char *payload, uint8_t qos);
uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos);
uint8_t pingPacket(uint8_t *packet);
}; };
class Adafruit_MQTT_Publish { class Adafruit_MQTT_Publish {
public: public:
Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t qos = 0); Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t qos = 0);
@ -114,4 +171,4 @@ class Adafruit_MQTT_Subscribe {
}; };
#endif /* header guard */ #endif

View File

@ -1,29 +1,9 @@
#include <Adafruit_SleepyDog.h>
#include "Adafruit_MQTT.h" #include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_CC3000.h" #include "Adafruit_MQTT_CC3000.h"
#include <Adafruit_SleepyDog.h>
static void printBuffer(uint8_t *buffer, uint8_t len) {
for (uint8_t i=0; i<len; i++) {
if (isprint(buffer[i]))
Serial.write(buffer[i]);
else
Serial.print(" ");
Serial.print(F(" [0x"));
if (buffer[i] < 0x10)
Serial.print("0");
Serial.print(buffer[i],HEX);
Serial.print("], ");
if (i % 8 == 7) Serial.println();
}
Serial.println();
}
Adafruit_MQTT_CC3000::Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, const char *cid, const char *user, const char *pass) : Adafruit_MQTT(server, port, cid, user, pass), cc3000(cc3k) bool Adafruit_MQTT_CC3000::connectServer(void) {
{
// nothin doin
}
int8_t Adafruit_MQTT_CC3000::connect(void) {
uint32_t ip = 0; uint32_t ip = 0;
Watchdog.reset(); Watchdog.reset();
@ -38,12 +18,12 @@ int8_t Adafruit_MQTT_CC3000::connect(void) {
Watchdog.reset(); Watchdog.reset();
while (ip == 0) { while (ip == 0) {
if (! cc3000->getHostByName((char *)buffer, &ip)) { if (! cc3000->getHostByName((char *)buffer, &ip)) {
Serial.println(F("Couldn't resolve!")); Serial.println(F("Couldn't resolve!"));
dnsretries--; dnsretries--;
Watchdog.reset(); Watchdog.reset();
} }
//Serial.println("OK"); Serial.println(ip, HEX); //Serial.println("OK"); Serial.println(ip, HEX);
if (!dnsretries) return -1; if (!dnsretries) return false;
delay(500); delay(500);
} }
@ -55,107 +35,46 @@ int8_t Adafruit_MQTT_CC3000::connect(void) {
Watchdog.reset(); Watchdog.reset();
// connect to server // connect to server
#ifdef DEBUG_MQTT_CONNECT DEBUG_PRINTLN(F("Connecting to TCP"));
Serial.println(F("Connecting to TCP"));
#endif
mqttclient = cc3000->connectTCP(serverip, portnum); mqttclient = cc3000->connectTCP(serverip, portnum);
uint8_t len = connectPacket(buffer);
#ifdef DEBUG_MQTT_CONNECT
Serial.println(F("MQTT connection packet:")); printBuffer(buffer, len);
#endif
if (mqttclient.connected()) {
uint16_t ret = mqttclient.write(buffer, len);
#ifdef DEBUG_MQTT_CONNECT
Serial.print("returned: "); Serial.println(ret);
#endif
if (ret != len) return -1;
} else {
#ifdef DEBUG_MQTT_CONNECT
Serial.println(F("Connection failed"));
#endif
return -1;
}
len = readPacket(buffer, 4, CONNECT_TIMEOUT_MS); return mqttclient.connected();
if (len != 4) return -1;
if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) {
return -1;
}
if (buffer[3] != 0) return buffer[3];
/**************** subscription time! */
for (uint8_t i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] == 0) continue;
#ifdef DEBUG_MQTT_CONNECT
Serial.print(F("Subscribing..."));
#endif
uint8_t len = subscribePacket(buffer, subscriptions[i]->topic, subscriptions[i]->qos);
#ifdef DEBUG_MQTT_CONNECT
Serial.println(F("MQTT subscription packet:")); printBuffer(buffer, len);
#endif
if (mqttclient.connected()) {
uint16_t ret = mqttclient.write(buffer, len);
#ifdef DEBUG_MQTT_CONNECT
Serial.print("returned: "); Serial.println(ret);
#endif
if (ret != len) return -1;
} else {
#ifdef DEBUG_MQTT_CONNECT
Serial.println(F("Connection failed"));
#endif
return -1;
}
// Get SUBACK
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
#ifdef DEBUG_MQTT_CONNECT
Serial.print(F("SUBACK:\t")); printBuffer(buffer, len);
#endif
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
return 6; // failure to subscribe
}
}
return 0;
} }
uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket) { bool Adafruit_MQTT_CC3000::disconnect(void) {
return (mqttclient.close() == 0);
}
uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen,
int16_t timeout,
bool checkForValidPubPacket) {
/* Read data until either the connection is closed, or the idle timeout is reached. */ /* Read data until either the connection is closed, or the idle timeout is reached. */
uint16_t len = 0; uint16_t len = 0;
int16_t t = timeout; int16_t t = timeout;
while (mqttclient.connected() && (timeout >= 0)) { while (mqttclient.connected() && (timeout >= 0)) {
//Serial.print('.'); //DEBUG_PRINT('.');
while (mqttclient.available()) { while (mqttclient.available()) {
//Serial.print('!'); //DEBUG_PRINT('!');
char c = mqttclient.read(); char c = mqttclient.read();
timeout = t; // reset the timeout timeout = t; // reset the timeout
buffer[len] = c; buffer[len] = c;
//Serial.print((uint8_t)c,HEX); //DEBUG_PRINTLN((uint8_t)c, HEX);
len++; len++;
if (len == maxlen) { // we read all we want, bail if (len == maxlen) { // we read all we want, bail
#ifdef DEBUG_MQTT_PACKETREAD DEBUG_PRINT(F("Read packet:\t"));
Serial.print(F("Read packet:\t")); printBuffer(buffer, len); DEBUG_PRINTBUFFER(buffer, len);
#endif return len;
return len;
} }
// special case where we just one one publication packet at a time // special case where we just one one publication packet at a time
if (checkForValidPubPacket) { if (checkForValidPubPacket) {
if ((buffer[0] == (MQTT_CTRL_PUBLISH << 4)) && (buffer[1] == len-2)) { if ((buffer[0] == (MQTT_CTRL_PUBLISH << 4)) && (buffer[1] == len-2)) {
// oooh a valid publish packet! // oooh a valid publish packet!
#ifdef DEBUG_MQTT_PACKETREAD DEBUG_PRINT(F("Read PUBLISH packet:\t"));
Serial.print(F("PUBLISH packet:\t")); printBuffer(buffer, len); DEBUG_PRINTBUFFER(buffer, len);
#endif return len;
return len; }
}
} }
} }
Watchdog.reset(); Watchdog.reset();
@ -165,154 +84,17 @@ uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16
return len; return len;
} }
boolean Adafruit_MQTT_CC3000::ping(uint8_t times) { bool Adafruit_MQTT_CC3000::sendPacket(uint8_t *buffer, uint8_t len) {
while (times) {
uint8_t len = pingPacket(buffer);
Serial.print(F("Sending:\t")); printBuffer(buffer, len);
if (mqttclient.connected()) {
uint16_t ret = mqttclient.write(buffer, len);
//Serial.print("returned: "); Serial.println(ret);
if (ret != len) return false;
} else {
Serial.println(F("Connection failed"));
return false;
}
// process ping reply
len = readPacket(buffer, 2, PING_TIMEOUT_MS);
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
return true;
}
return false;
}
int32_t Adafruit_MQTT_CC3000::close(void) {
return mqttclient.close();
}
boolean Adafruit_MQTT_CC3000::publish(const char *topic, char *data, uint8_t qos) {
uint8_t len = publishPacket(buffer, topic, data, qos);
#ifdef DEBUG_MQTT_PUBLISH
Serial.println(F("MQTT publish packet:")); printBuffer(buffer, len);
#endif
if (mqttclient.connected()) { if (mqttclient.connected()) {
uint16_t ret = mqttclient.write(buffer, len); uint16_t ret = mqttclient.write(buffer, len);
#ifdef DEBUG_MQTT_PUBLISH DEBUG_PRINT(F("sendPacket returned: ")); DEBUG_PRINTLN(ret);
Serial.print("returned: "); Serial.println(ret); if (ret != len) {
#endif DEBUG_PRINTLN("Failed to send complete packet.")
if (ret != len) return false; return false;
}
} else { } else {
#ifdef DEBUG_MQTT_PUBLISH DEBUG_PRINTLN(F("Connection failed!"));
Serial.println(F("Connection failed"));
#endif
return false; return false;
} }
return true;
if (qos > 0) {
len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS);
#ifdef DEBUG_MQTT_PUBLISH
Serial.print(F("Reply:\t")); printBuffer(buffer, len);
#endif
return true;
} else {
return true;
}
}
boolean Adafruit_MQTT_CC3000::subscribe(Adafruit_MQTT_Subscribe *sub) {
uint8_t i;
// see if we are already subscribed
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] == sub) {
#ifdef DEBUG_MQTT_SUBSCRIBE
Serial.println(F("Already subscribed"));
#endif
break;
}
}
if (i==MAXSUBSCRIPTIONS) { // add to subscriptionlist
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] == 0) {
#ifdef DEBUG_MQTT_SUBSCRIBE
Serial.print(F("Added sub ")); Serial.println(i);
#endif
subscriptions[i] = sub;
break;
}
}
}
if (i==MAXSUBSCRIPTIONS) {
#ifdef DEBUG_MQTT_SUBSCRIBE
Serial.println(F("no more space :("));
#endif
return false;
}
}
Adafruit_MQTT_Subscribe *Adafruit_MQTT_CC3000::readSubscription(int16_t timeout) {
uint8_t i, topiclen, datalen;
#ifdef DEBUG_MQTT_READSUB
Serial.println(F("reading..."));
#endif
uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, true); // return one full packet
#ifdef DEBUG_MQTT_READSUB
printBuffer(buffer, len);
#endif
if (!len) return NULL;
topiclen = buffer[3];
#ifdef DEBUG_MQTT_READSUB
Serial.print(F("Looking for subscription len ")); Serial.println(topiclen);
#endif
// figure out what subscription this is!
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i]) {
//Serial.print(i);
boolean flag = true;
// TODO: REPLACE WITH MEMCMP?
for (uint8_t k=0; k<topiclen; k++) {
if ( buffer[4+k] != pgm_read_byte(subscriptions[i]->topic+k) )
flag = false;
}
if (flag) {
#ifdef DEBUG_MQTT_READSUB
Serial.println((char *)buffer+4);
Serial.print(F("Found sub #")); Serial.println(i);
#endif
break;
}
}
}
if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ???
// zero out the old data
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
datalen = len - topiclen - 4;
if (datalen > SUBSCRIPTIONDATALEN) {
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
}
// extract out just the data, into the subscription object itself
memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
#ifdef DEBUG_MQTT_READSUB
Serial.print(F("Data len: ")); Serial.println(datalen);
Serial.print("Data: "); Serial.println((char *)subscriptions[i]->lastread);
#endif
// return the valid matching subscription
return subscriptions[i];
} }

View File

@ -1,10 +1,8 @@
#ifndef _ADAFRUIT_MQTT_CC3000_H_ #ifndef _ADAFRUIT_MQTT_CC3000_H_
#define _ADAFRUIT_MQTT_CC3000_H_ #define _ADAFRUIT_MQTT_CC3000_H_
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_CC3000.h"
#include <Adafruit_CC3000.h> #include <Adafruit_CC3000.h>
#include "Adafruit_MQTT.h"
// delay in ms between calls of available() // delay in ms between calls of available()
#define MQTT_CC3000_INTERAVAILDELAY 10 #define MQTT_CC3000_INTERAVAILDELAY 10
@ -12,19 +10,20 @@
class Adafruit_MQTT_CC3000 : public Adafruit_MQTT { class Adafruit_MQTT_CC3000 : public Adafruit_MQTT {
public: public:
Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, const char *cid, const char *user, const char *pass); Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port,
int8_t connect(void); const char *cid, const char *user, const char *pass):
uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket = false); Adafruit_MQTT(server, port, cid, user, pass),
int32_t close(void); cc3000(cc3k)
{}
boolean publish(const char *topic, char *payload, uint8_t qos); bool connectServer();
boolean ping(uint8_t time); bool disconnect();
uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
boolean subscribe(Adafruit_MQTT_Subscribe *sub); bool checkForValidPubPacket = false);
bool sendPacket(uint8_t *buffer, uint8_t len);
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0);
private: private:
uint32_t serverip;
Adafruit_CC3000 *cc3000; Adafruit_CC3000 *cc3000;
Adafruit_CC3000_Client mqttclient; Adafruit_CC3000_Client mqttclient;
}; };