diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index c319384..7a87f16 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -26,7 +26,7 @@ void printBuffer(uint8_t *buffer, uint8_t len) { for (uint8_t i=0; itopic, subscriptions[i]->qos); if (!sendPacket(buffer, len)) return -1; - - // Get SUBACK - len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); - DEBUG_PRINT(F("SUBACK:\t")); - DEBUG_PRINTBUFFER(buffer, len); - if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { - return 6; // failure to subscribe - } + + // Check for SUBACK if using MQTT 3.1.1 or higher + // TODO: The Server is permitted to start sending PUBLISH packets matching the + // Subscription before the Server sends the SUBACK Packet. + // if(MQTT_PROTOCOL_LEVEL > 3) { + // len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); + // DEBUG_PRINT(F("SUBACK:\t")); + // DEBUG_PRINTBUFFER(buffer, len); + // if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { + // return 6; // failure to subscribe + // } + // } + } return 0; @@ -169,12 +209,24 @@ const __FlashStringHelper* Adafruit_MQTT::connectErrorString(int8_t code) { } } +bool Adafruit_MQTT::disconnect() { + + // Construct and send disconnect packet. + uint8_t len = disconnectPacket(buffer); + if (! sendPacket(buffer, len)) + DEBUG_PRINTLN(F("Unable to send disconnect packet")); + + return disconnectServer(); + +} + + bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) { // Construct and send publish packet. uint8_t len = publishPacket(buffer, topic, data, qos); if (!sendPacket(buffer, len)) return false; - + // If QOS level is high enough verify the response packet. if (qos > 0) { len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS); @@ -186,6 +238,22 @@ bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) { return true; } +bool Adafruit_MQTT::will(const char *topic, const char *payload, uint8_t qos, uint8_t retain) { + + if (connected()) { + DEBUG_PRINT(F("Will defined after connect")); + return false; + } + + will_topic = topic; + will_payload = payload; + will_qos = qos; + will_retain = retain; + + return true; + +} + bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) { uint8_t i; // see if we are already subscribed @@ -209,6 +277,50 @@ bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) { return false; } +bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) { + uint8_t i; + + // see if we are already subscribed + for (i=0; itopic); + + // sending unsubscribe failed + if (! sendPacket(buffer, len)) + return false; + + // if QoS for this subscription is 1 or 2, we need + // to wait for the unsuback to confirm unsubscription + if(subscriptions[i]->qos > 0 && MQTT_PROTOCOL_LEVEL > 3) { + + // wait for UNSUBACK + len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); + DEBUG_PRINT(F("UNSUBACK:\t")); + DEBUG_PRINTBUFFER(buffer, len); + + if ((len != 5) || (buffer[0] != (MQTT_CTRL_UNSUBACK << 4))) { + return false; // failure to unsubscribe + } + + } + + subscriptions[i] = 0; + return true; + + } + + } + + // subscription not found, so we are unsubscribed + return true; + +} + Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { uint8_t i, topiclen, datalen; @@ -233,7 +345,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { // to make comparison case insensitive. if (strncasecmp_P((char*)buffer+4, subscriptions[i]->topic, topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); - break; + break; } } } @@ -256,18 +368,27 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { return subscriptions[i]; } -bool Adafruit_MQTT::ping(uint8_t times) { - while (times) { +void Adafruit_MQTT::flushIncoming(uint16_t timeout) { + // flush input! + DEBUG_PRINTLN(F("Flushing input buffer")); + while (readPacket(buffer, MAXBUFFERSIZE, timeout)); +} + +bool Adafruit_MQTT::ping(uint8_t num) { + flushIncoming(100); + + while (num--) { // Construct and send ping packet. uint8_t len = pingPacket(buffer); if (!sendPacket(buffer, len)) - return false; + continue; // Process ping reply. len = readPacket(buffer, 2, PING_TIMEOUT_MS); if (buffer[0] == (MQTT_CTRL_PINGRESP << 4)) - return true; + return true; } + return false; } @@ -287,31 +408,64 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) { p+=2; // fill in packet[1] last -#if (MQTT_PROTOCOL_LEVEL == 4) - p = stringprint_P(p, PSTR("MQTT")); -#elif (MQTT_PROTOCOL_LEVEL == 3) - p = stringprint_P(p, PSTR("MQIsdp")); +#if MQTT_PROTOCOL_LEVEL == 3 + p = stringprint_P(p, PSTR("MQIsdp")); +#elif MQTT_PROTOCOL_LEVEL == 4 + p = stringprint_P(p, PSTR("MQTT")); #else - #error "No MQTT version selected!" + #error "MQTT level not supported" #endif p[0] = MQTT_PROTOCOL_LEVEL; p++; + // always clean the session p[0] = MQTT_CONN_CLEANSESSION; + + // set the will flags if needed + if (will_topic && pgm_read_byte(will_topic) != 0) { + + p[0] |= MQTT_CONN_WILLFLAG; + + if(will_qos == 1) + p[0] |= MQTT_CONN_WILLQOS_1; + else if(will_qos == 2) + p[0] |= MQTT_CONN_WILLQOS_2; + + if(will_retain == 1) + p[0] |= MQTT_CONN_WILLRETAIN; + + } + if (pgm_read_byte(username) != 0) p[0] |= MQTT_CONN_USERNAMEFLAG; if (pgm_read_byte(password) != 0) p[0] |= MQTT_CONN_PASSWORDFLAG; p++; - // TODO: add WILL support? p[0] = MQTT_CONN_KEEPALIVE >> 8; p++; p[0] = MQTT_CONN_KEEPALIVE & 0xFF; p++; - p = stringprint_P(p, clientid, 23); // Limit client ID to first 23 characters. + if(MQTT_PROTOCOL_LEVEL == 3) { + p = stringprint_P(p, clientid, 23); // Limit client ID to first 23 characters. + } else { + if (pgm_read_byte(clientid) != 0) { + p = stringprint_P(p, clientid); + } else { + p[0] = 0x0; + p++; + p[0] = 0x0; + p++; + DEBUG_PRINTLN(F("SERVER GENERATING CLIENT ID")); + } + } + + if (will_topic && pgm_read_byte(will_topic) != 0) { + p = stringprint_P(p, will_topic); + p = stringprint_P(p, will_payload); + } if (pgm_read_byte(username) != 0) { p = stringprint_P(p, username); @@ -337,6 +491,16 @@ uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, // fill in packet[1] last p+=2; + // add packet identifier. used for checking PUBACK in QOS > 0 + if(qos > 0) { + p[0] = (packet_id_counter >> 8) & 0xFF; + p[1] = packet_id_counter & 0xFF; + p+=2; + + // increment the packet id + packet_id_counter++; + } + p = stringprint_P(p, topic); memcpy(p, data, strlen(data)); @@ -357,11 +521,14 @@ uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic, // fill in packet[1] last p+=2; - // put in a message id, - p[0] = 0xAD; - p[1] = 0xAF; + // packet identifier. used for checking SUBACK + p[0] = (packet_id_counter >> 8) & 0xFF; + p[1] = packet_id_counter & 0xFF; p+=2; + // increment the packet id + packet_id_counter++; + p = stringprint_P(p, topic); p[0] = qos; @@ -374,6 +541,33 @@ uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic, return len; } +uint8_t Adafruit_MQTT::unsubscribePacket(uint8_t *packet, const char *topic) { + + uint8_t *p = packet; + uint16_t len; + + p[0] = MQTT_CTRL_UNSUBSCRIBE << 4 | 0x1; + // fill in packet[1] last + p+=2; + + // packet identifier. used for checking UNSUBACK + p[0] = (packet_id_counter >> 8) & 0xFF; + p[1] = packet_id_counter & 0xFF; + p+=2; + + // increment the packet id + packet_id_counter++; + + p = stringprint_P(p, topic); + + len = p - packet; + packet[1] = len-2; // don't include the 2 bytes of fixed header data + DEBUG_PRINTLN(F("MQTT unsubscription packet:")); + DEBUG_PRINTBUFFER(buffer, len); + return len; + +} + uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) { packet[0] = MQTT_CTRL_PINGREQ << 4; packet[1] = 0; @@ -382,6 +576,13 @@ uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) { return 2; } +uint8_t Adafruit_MQTT::disconnectPacket(uint8_t *packet) { + packet[0] = MQTT_CTRL_DISCONNECT << 4; + packet[1] = 0; + DEBUG_PRINTLN(F("MQTT disconnect packet:")); + DEBUG_PRINTBUFFER(buffer, 2); + return 2; +} // Adafruit_MQTT_Publish Definition //////////////////////////////////////////// diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 9d9fc71..7bba0a4 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -48,20 +48,27 @@ // Use 3 (MQTT 3.0) or 4 (MQTT 3.1.1) #define MQTT_PROTOCOL_LEVEL 4 -#define MQTT_CTRL_CONNECT 0x01 -#define MQTT_CTRL_CONNECTACK 0x02 -#define MQTT_CTRL_PUBLISH 0x03 -#define MQTT_CTRL_SUBSCRIBE 0x08 -#define MQTT_CTRL_SUBACK 0x09 -#define MQTT_CTRL_PINGREQ 0x0C -#define MQTT_CTRL_PINGRESP 0x0D +#define MQTT_CTRL_CONNECT 0x1 +#define MQTT_CTRL_CONNECTACK 0x2 +#define MQTT_CTRL_PUBLISH 0x3 +#define MQTT_CTRL_PUBACK 0x4 +#define MQTT_CTRL_PUBREC 0x5 +#define MQTT_CTRL_PUBREL 0x6 +#define MQTT_CTRL_PUBCOMP 0x7 +#define MQTT_CTRL_SUBSCRIBE 0x8 +#define MQTT_CTRL_SUBACK 0x9 +#define MQTT_CTRL_UNSUBSCRIBE 0xA +#define MQTT_CTRL_UNSUBACK 0xB +#define MQTT_CTRL_PINGREQ 0xC +#define MQTT_CTRL_PINGRESP 0xD +#define MQTT_CTRL_DISCONNECT 0xE #define MQTT_QOS_1 0x1 #define MQTT_QOS_0 0x0 #define CONNECT_TIMEOUT_MS 3000 #define PUBLISH_TIMEOUT_MS 500 -#define PING_TIMEOUT_MS 500 +#define PING_TIMEOUT_MS 500 // Adjust as necessary, in seconds. Default to 5 minutes. #define MQTT_CONN_KEEPALIVE 300 @@ -71,15 +78,15 @@ // 23 char client ID. #define MAXBUFFERSIZE (125) -#define MQTT_CONN_USERNAMEFLAG 0x80 -#define MQTT_CONN_PASSWORDFLAG 0x40 -#define MQTT_CONN_WILLRETAIN 0x20 -#define MQTT_CONN_WILLQOS 0x08 -#define MQTT_CONN_WILLFLAG 0x04 -#define MQTT_CONN_CLEANSESSION 0x02 +#define MQTT_CONN_USERNAMEFLAG 0x80 +#define MQTT_CONN_PASSWORDFLAG 0x40 +#define MQTT_CONN_WILLRETAIN 0x20 +#define MQTT_CONN_WILLQOS_1 0x08 +#define MQTT_CONN_WILLQOS_2 0x18 +#define MQTT_CONN_WILLFLAG 0x04 +#define MQTT_CONN_CLEANSESSION 0x02 -// how many subscriptions we want to be able to -// track +// how many subscriptions we want to be able to track #define MAXSUBSCRIPTIONS 5 // how much data we save in a subscription object @@ -93,10 +100,24 @@ class Adafruit_MQTT_Subscribe; // forward decl class Adafruit_MQTT { public: - Adafruit_MQTT(const char *server, uint16_t port, const char *cid, - const char *user, const char *pass); - Adafruit_MQTT(const __FlashStringHelper *server, uint16_t port, const __FlashStringHelper *cid, - const __FlashStringHelper *user, const __FlashStringHelper *pass); + Adafruit_MQTT(const char *server, + uint16_t port, + const char *cid, + const char *user, + const char *pass); + Adafruit_MQTT(const __FlashStringHelper *server, + uint16_t port, + const __FlashStringHelper *cid, + const __FlashStringHelper *user, + const __FlashStringHelper *pass); + Adafruit_MQTT(const char *server, + uint16_t port, + const char *user, + const char *pass); + Adafruit_MQTT(const __FlashStringHelper *server, + uint16_t port, + const __FlashStringHelper *user, + const __FlashStringHelper *pass); virtual ~Adafruit_MQTT() {} // Connect to the MQTT server. Returns 0 on success, otherwise an error code @@ -118,13 +139,20 @@ class Adafruit_MQTT { // Serial.println without any further processing. const __FlashStringHelper* connectErrorString(int8_t code); - // Disconnect from the MQTT server. Returns true if disconnected, false - // otherwise. - virtual bool disconnect() = 0; // Subclasses need to fill this in! + // Sends MQTT disconnect packet and calls disconnectServer() + bool disconnect(); // Return true if connected to the MQTT server, otherwise false. virtual bool connected() = 0; // Subclasses need to fill this in! + // Set MQTT last will topic, payload, QOS, and retain. This needs + // to be called before connect() because it is sent as part of the + // connect control packet. + bool will(const char *topic, const char *payload, uint8_t qos = 0, uint8_t retain = 0); + bool will(const __FlashStringHelper *topic, const char *payload, uint8_t qos = 0, uint8_t retain = 0) { + return will((const char *)topic, payload, qos, retain); + } + // Publish a message to a topic using the specified QoS level. Returns true // if the message was published, false otherwise. // The topic must be stored in PROGMEM. It can either be a @@ -140,15 +168,17 @@ class Adafruit_MQTT { // is made is not currently supported. bool subscribe(Adafruit_MQTT_Subscribe *sub); + // Unsubscribe from a previously subscribed MQTT topic. + bool unsubscribe(Adafruit_MQTT_Subscribe *sub); + // Check if any subscriptions have new messages. Will return a reference to // an Adafruit_MQTT_Subscribe object which has a new message. Should be called // in the sketch's loop function to ensure new messages are recevied. Note // that subscribe should be called first for each topic that receives messages! Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0); - // Ping the server to ensure the connection is still alive. Returns true if - // successful, otherwise false. - bool ping(uint8_t t); + // Ping the server to ensure the connection is still alive. + bool ping(uint8_t n = 1); protected: // Interface that subclasses need to implement: @@ -156,6 +186,9 @@ class Adafruit_MQTT { // Connect to the server and return true if successful, false otherwise. virtual bool connectServer() = 0; + // Disconnect from the MQTT server. Returns true if disconnected, false otherwise. + virtual bool disconnectServer() = 0; // Subclasses need to fill this in! + // Send data to the server specified by the buffer and length of data. virtual bool sendPacket(uint8_t *buffer, uint8_t len) = 0; @@ -172,15 +205,24 @@ class Adafruit_MQTT { const char *clientid; const char *username; const char *password; + const char *will_topic; + const char *will_payload; + uint8_t will_qos; + uint8_t will_retain; uint8_t buffer[MAXBUFFERSIZE]; // one buffer, used for all incoming/outgoing + uint16_t packet_id_counter; private: Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS]; + void flushIncoming(uint16_t timeout); + // Functions to generate MQTT packets. uint8_t connectPacket(uint8_t *packet); + uint8_t disconnectPacket(uint8_t *packet); uint8_t publishPacket(uint8_t *packet, const char *topic, const char *payload, uint8_t qos); uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); + uint8_t unsubscribePacket(uint8_t *packet, const char *topic); uint8_t pingPacket(uint8_t *packet); }; diff --git a/Adafruit_MQTT_CC3000.h b/Adafruit_MQTT_CC3000.h index a1f6ea6..277a0fb 100644 --- a/Adafruit_MQTT_CC3000.h +++ b/Adafruit_MQTT_CC3000.h @@ -43,6 +43,12 @@ class Adafruit_MQTT_CC3000 : public Adafruit_MQTT { cc3000(cc3k) {} + Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, + const char *user, const char *pass): + Adafruit_MQTT(server, port, user, pass), + cc3000(cc3k) + {} + bool connectServer() { uint32_t ip = 0; @@ -81,7 +87,7 @@ class Adafruit_MQTT_CC3000 : public Adafruit_MQTT { return mqttclient.connected(); } - bool disconnect() { + bool disconnectServer() { if (connected()) { return (mqttclient.close() == 0); } diff --git a/Adafruit_MQTT_Client.cpp b/Adafruit_MQTT_Client.cpp index e0686d9..6612c8f 100644 --- a/Adafruit_MQTT_Client.cpp +++ b/Adafruit_MQTT_Client.cpp @@ -33,7 +33,7 @@ bool Adafruit_MQTT_Client::connectServer() { return r != 0; } -bool Adafruit_MQTT_Client::disconnect() { +bool Adafruit_MQTT_Client::disconnectServer() { // Stop connection if connected and return success (stop has no indication of // failure). if (client->connected()) { diff --git a/Adafruit_MQTT_Client.h b/Adafruit_MQTT_Client.h index e78c81b..c9b4366 100644 --- a/Adafruit_MQTT_Client.h +++ b/Adafruit_MQTT_Client.h @@ -36,14 +36,19 @@ class Adafruit_MQTT_Client : public Adafruit_MQTT { public: Adafruit_MQTT_Client(Client *client, const char *server, uint16_t port, - const char *cid, const char *user, - const char *pass): + const char *cid, const char *user, const char *pass): Adafruit_MQTT(server, port, cid, user, pass), client(client) {} + Adafruit_MQTT_Client(Client *client, const char *server, uint16_t port, + const char *user, const char *pass): + Adafruit_MQTT(server, port, user, pass), + client(client) + {} + bool connectServer(); - bool disconnect(); + bool disconnectServer(); bool connected(); uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, bool checkForValidPubPacket = false); diff --git a/Adafruit_MQTT_FONA.h b/Adafruit_MQTT_FONA.h index 17d73fe..a65b99d 100644 --- a/Adafruit_MQTT_FONA.h +++ b/Adafruit_MQTT_FONA.h @@ -41,6 +41,12 @@ class Adafruit_MQTT_FONA : public Adafruit_MQTT { fona(f) {} + Adafruit_MQTT_FONA(Adafruit_FONA *f, const char *server, uint16_t port, + const char *user, const char *pass): + Adafruit_MQTT(server, port, user, pass), + fona(f) + {} + bool connectServer() { char server[40]; strncpy_P(server, servername, 40); @@ -51,7 +57,7 @@ class Adafruit_MQTT_FONA : public Adafruit_MQTT { return fona->TCPconnect(server, portnum); } - bool disconnect() { + bool disconnectServer() { return fona->TCPclose(); } diff --git a/examples/mqtt_cc3k/mqtt_cc3k.ino b/examples/mqtt_cc3k/mqtt_cc3k.ino index bce77ed..c7db7ee 100644 --- a/examples/mqtt_cc3k/mqtt_cc3k.ino +++ b/examples/mqtt_cc3k/mqtt_cc3k.ino @@ -45,18 +45,14 @@ // Setup the main CC3000 class, just like a normal CC3000 sketch. Adafruit_CC3000 cc3000 = Adafruit_CC3000(ADAFRUIT_CC3000_CS, ADAFRUIT_CC3000_IRQ, ADAFRUIT_CC3000_VBAT); -// Store the MQTT server, client ID, username, and password in flash memory. +// Store the MQTT server, username, and password in flash memory. // This is required for using the Adafruit MQTT library. const char MQTT_SERVER[] PROGMEM = AIO_SERVER; -// Set a unique MQTT client ID using the AIO key + the date and time the sketch -// was compiled (so this should be unique across multiple devices for a user, -// alternatively you can manually set this to a GUID or other random value). -const char MQTT_CLIENTID[] PROGMEM = __TIME__ AIO_USERNAME; const char MQTT_USERNAME[] PROGMEM = AIO_USERNAME; const char MQTT_PASSWORD[] PROGMEM = AIO_KEY; // Setup the CC3000 MQTT class by passing in the CC3000 class and MQTT server and login details. -Adafruit_MQTT_CC3000 mqtt(&cc3000, MQTT_SERVER, AIO_SERVERPORT, MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD); +Adafruit_MQTT_CC3000 mqtt(&cc3000, MQTT_SERVER, AIO_SERVERPORT, MQTT_USERNAME, MQTT_PASSWORD); // You don't need to change anything below this line! #define halt(s) { Serial.println(F( s )); while(1); } @@ -108,16 +104,6 @@ void loop() { // connection and automatically reconnect when disconnected). See the MQTT_connect // function definition further below. MQTT_connect(); - - // Try to ping the MQTT server - /* - if (! mqtt.ping(3) ) { - // MQTT pings failed, let's reconnect by forcing a watchdog reset. - Serial.println("Ping fail! Resetting..."); - Watchdog.enable(8000); - delay(10000); - } - */ // this is our 'wait for incoming subscription packets' busy subloop Adafruit_MQTT_Subscribe *subscription; @@ -137,6 +123,12 @@ void loop() { } else { Serial.println(F("OK!")); } + + // ping the server to keep the mqtt connection alive + if(! mqtt.ping()) { + Serial.println(F("MQTT Ping failed.")); + } + } // Function to connect and reconnect as necessary to the MQTT server. diff --git a/examples/mqtt_esp8266/mqtt_esp8266.ino b/examples/mqtt_esp8266/mqtt_esp8266.ino index 8f06265..8defec1 100644 --- a/examples/mqtt_esp8266/mqtt_esp8266.ino +++ b/examples/mqtt_esp8266/mqtt_esp8266.ino @@ -35,18 +35,14 @@ // Create an ESP8266 WiFiClient class to connect to the MQTT server. WiFiClient client; -// Store the MQTT server, client ID, username, and password in flash memory. +// Store the MQTT server, username, and password in flash memory. // This is required for using the Adafruit MQTT library. const char MQTT_SERVER[] PROGMEM = AIO_SERVER; -// Set a unique MQTT client ID using the AIO key + the date and time the sketch -// was compiled (so this should be unique across multiple devices for a user, -// alternatively you can manually set this to a GUID or other random value). -const char MQTT_CLIENTID[] PROGMEM = __TIME__ AIO_USERNAME; const char MQTT_USERNAME[] PROGMEM = AIO_USERNAME; const char MQTT_PASSWORD[] PROGMEM = AIO_KEY; // Setup the MQTT client class by passing in the WiFi client and MQTT server and login details. -Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, AIO_SERVERPORT, MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD); +Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, AIO_SERVERPORT, MQTT_USERNAME, MQTT_PASSWORD); /****************************** Feeds ***************************************/ @@ -94,14 +90,6 @@ void loop() { // function definition further below. MQTT_connect(); - // Try to ping the MQTT server - /* - if (! mqtt.ping(3) ) { - // MQTT pings failed, lets reconnect - Serial.println("Ping fail!"); - } - */ - // this is our 'wait for incoming subscription packets' busy subloop Adafruit_MQTT_Subscribe *subscription; while ((subscription = mqtt.readSubscription(1000))) { @@ -121,7 +109,13 @@ void loop() { Serial.println(F("OK!")); } + // ping the server to keep the mqtt connection alive + if(! mqtt.ping()) { + mqtt.disconnect(); + } + delay(1000); + } // Function to connect and reconnect as necessary to the MQTT server. diff --git a/examples/mqtt_ethernet/mqtt_ethernet.ino b/examples/mqtt_ethernet/mqtt_ethernet.ino index c27f8a8..ecb1a24 100644 --- a/examples/mqtt_ethernet/mqtt_ethernet.ino +++ b/examples/mqtt_ethernet/mqtt_ethernet.ino @@ -91,13 +91,6 @@ void loop() { // connection and automatically reconnect when disconnected). See the MQTT_connect // function definition further below. MQTT_connect(); - - // Try to ping the MQTT server - if (! mqtt.ping(3) ) { - // MQTT pings failed, let's reconnect by forcing a watchdog reset. - Serial.println("Ping fail! Resetting..."); - delay(10000); - } // this is our 'wait for incoming subscription packets' busy subloop Adafruit_MQTT_Subscribe *subscription; @@ -117,6 +110,12 @@ void loop() { } else { Serial.println(F("OK!")); } + + // ping the server to keep the mqtt connection alive + if(! mqtt.ping()) { + mqtt.disconnect(); + } + } // Function to connect and reconnect as necessary to the MQTT server. diff --git a/examples/mqtt_fona/mqtt_fona.ino b/examples/mqtt_fona/mqtt_fona.ino index 4af766a..a7b00aa 100644 --- a/examples/mqtt_fona/mqtt_fona.ino +++ b/examples/mqtt_fona/mqtt_fona.ino @@ -52,18 +52,14 @@ Adafruit_FONA fona = Adafruit_FONA(FONA_RST); /************ Global State (you don't need to change this!) ******************/ -// Store the MQTT server, client ID, username, and password in flash memory. +// Store the MQTT server, username, and password in flash memory. // This is required for using the Adafruit MQTT library. const char MQTT_SERVER[] PROGMEM = AIO_SERVER; -// Set a unique MQTT client ID using the AIO key + the date and time the sketch -// was compiled (so this should be unique across multiple devices for a user, -// alternatively you can manually set this to a GUID or other random value). -const char MQTT_CLIENTID[] PROGMEM = __TIME__ AIO_USERNAME; const char MQTT_USERNAME[] PROGMEM = AIO_USERNAME; const char MQTT_PASSWORD[] PROGMEM = AIO_KEY; // Setup the FONA MQTT class by passing in the FONA class and MQTT server and login details. -Adafruit_MQTT_FONA mqtt(&fona, MQTT_SERVER, AIO_SERVERPORT, MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD); +Adafruit_MQTT_FONA mqtt(&fona, MQTT_SERVER, AIO_SERVERPORT, MQTT_USERNAME, MQTT_PASSWORD); // You don't need to change anything below this line! #define halt(s) { Serial.println(F( s )); while(1); } @@ -117,30 +113,10 @@ void loop() { // Make sure to reset watchdog every loop iteration! Watchdog.reset(); - // check if we're still connected - if (!fona.TCPconnected() || (txfailures >= MAXTXFAILURES)) { - Serial.println(F("Connecting to MQTT...")); - int8_t ret, retries = 5; - while (retries && (ret = mqtt.connect()) != 0) { - Serial.println(mqtt.connectErrorString(ret)); - Serial.println(F("Retrying MQTT connection")); - retries--; - if (retries == 0) halt("Resetting system"); - delay(5000); - } - Serial.println(F("MQTT Connected!")); - txfailures = 0; - } - - - // Try to ping the MQTT server - /* - if (! mqtt.ping(3) ) { - // MQTT pings failed, lets reconnect - Serial.println("Ping fail!"); - } - */ - + // Ensure the connection to the MQTT server is alive (this will make the first + // connection and automatically reconnect when disconnected). See the MQTT_connect + // function definition further below. + MQTT_connect(); // this is our 'wait for incoming subscription packets' busy subloop Adafruit_MQTT_Subscribe *subscription; @@ -162,4 +138,31 @@ void loop() { Serial.println(F("OK!")); txfailures = 0; } + + // ping the server to keep the mqtt connection alive + if(! mqtt.ping()) { + Serial.println(F("MQTT Ping failed.")); + } + +} + +// Function to connect and reconnect as necessary to the MQTT server. +// Should be called in the loop function and it will take care if connecting. +void MQTT_connect() { + int8_t ret; + + // Stop if already connected. + if (mqtt.connected()) { + return; + } + + Serial.print("Connecting to MQTT... "); + + while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected + Serial.println(mqtt.connectErrorString(ret)); + Serial.println("Retrying MQTT connection in 5 seconds..."); + mqtt.disconnect(); + delay(5000); // wait 5 seconds + } + Serial.println("MQTT Connected!"); } diff --git a/examples/mqtt_yun/mqtt_yun.ino b/examples/mqtt_yun/mqtt_yun.ino index cea49d6..b02d06d 100644 --- a/examples/mqtt_yun/mqtt_yun.ino +++ b/examples/mqtt_yun/mqtt_yun.ino @@ -35,18 +35,14 @@ // Create a YunClient instance to communicate using the Yun's brighe & Linux OS. YunClient client; -// Store the MQTT server, client ID, username, and password in flash memory. +// Store the MQTT server, username, and password in flash memory. // This is required for using the Adafruit MQTT library. const char MQTT_SERVER[] PROGMEM = AIO_SERVER; -// Set a unique MQTT client ID using the AIO key + the date and time the sketch -// was compiled (so this should be unique across multiple devices for a user, -// alternatively you can manually set this to a GUID or other random value). -const char MQTT_CLIENTID[] PROGMEM = __TIME__ AIO_USERNAME; const char MQTT_USERNAME[] PROGMEM = AIO_USERNAME; const char MQTT_PASSWORD[] PROGMEM = AIO_KEY; // Setup the MQTT client class by passing in the WiFi client and MQTT server and login details. -Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, AIO_SERVERPORT, MQTT_CLIENTID, MQTT_USERNAME, MQTT_PASSWORD); +Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, AIO_SERVERPORT, MQTT_USERNAME, MQTT_PASSWORD); /****************************** Feeds ***************************************/ @@ -78,14 +74,6 @@ void loop() { // function definition further below. MQTT_connect(); - // Try to ping the MQTT server - /* - if (! mqtt.ping(3) ) { - // MQTT pings failed, lets reconnect - Console.println("Ping fail!"); - } - */ - // this is our 'wait for incoming subscription packets' busy subloop Adafruit_MQTT_Subscribe *subscription; while ((subscription = mqtt.readSubscription(1000))) { @@ -105,7 +93,13 @@ void loop() { Console.println(F("OK!")); } + // ping the server to keep the mqtt connection alive + if(! mqtt.ping()) { + Serial.println(F("MQTT Ping failed.")); + } + delay(1000); + } // Function to connect and reconnect as necessary to the MQTT server.