callback support
This commit is contained in:
parent
9ed79466b2
commit
4269d87492
@ -219,11 +219,12 @@ int8_t Adafruit_MQTT::connect() {
|
|||||||
// TODO: The Server is permitted to start sending PUBLISH packets matching the
|
// TODO: The Server is permitted to start sending PUBLISH packets matching the
|
||||||
// Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada)
|
// Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada)
|
||||||
|
|
||||||
len = processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS);
|
//Serial.println("\t**looking for suback");
|
||||||
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
|
if (processPacketsUntil(buffer, MQTT_CTRL_SUBACK, SUBACK_TIMEOUT_MS)) {
|
||||||
continue; // retry!
|
success = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
success = true;
|
//Serial.println("\t**failed, retrying!");
|
||||||
}
|
}
|
||||||
if (! success) return -2; // failed to sub for some reason
|
if (! success) return -2; // failed to sub for some reason
|
||||||
}
|
}
|
||||||
@ -431,8 +432,48 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Adafruit_MQTT::processPackets(int16_t timeout) {
|
||||||
|
uint16_t len;
|
||||||
|
|
||||||
|
uint32_t elapsed = 0, endtime, starttime = millis();
|
||||||
|
|
||||||
|
while (elapsed < (uint32_t)timeout) {
|
||||||
|
Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed);
|
||||||
|
|
||||||
|
if (sub) {
|
||||||
|
//Serial.println("**** sub packet received");
|
||||||
|
if (sub->callback_uint32t != NULL) {
|
||||||
|
// huh lets do the callback in integer mode
|
||||||
|
uint32_t data = 0;
|
||||||
|
data = atoi((char *)sub->lastread);
|
||||||
|
//Serial.print("*** calling int callback with : "); Serial.println(data);
|
||||||
|
sub->callback_uint32t(data);
|
||||||
|
}
|
||||||
|
else if (sub->callback_double != NULL) {
|
||||||
|
// huh lets do the callback in doublefloat mode
|
||||||
|
double data = 0;
|
||||||
|
data = atof((char *)sub->lastread);
|
||||||
|
//Serial.print("*** calling double callback with : "); Serial.println(data);
|
||||||
|
sub->callback_double(data);
|
||||||
|
}
|
||||||
|
else if (sub->callback_buffer != NULL) {
|
||||||
|
// huh lets do the callback in buffer mode
|
||||||
|
//Serial.print("*** calling buffer callback with : "); Serial.println(data);
|
||||||
|
sub->callback_buffer((char *)sub->lastread, sub->datalen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep track over elapsed time
|
||||||
|
endtime = millis();
|
||||||
|
if (endtime < starttime) {
|
||||||
|
starttime = endtime; // looped around!")
|
||||||
|
}
|
||||||
|
elapsed += (endtime - starttime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
||||||
uint8_t i, topiclen, datalen;
|
uint16_t i, topiclen, datalen;
|
||||||
|
|
||||||
// Check if data is available to read.
|
// Check if data is available to read.
|
||||||
uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
|
uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
|
||||||
@ -801,7 +842,9 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
|
|||||||
topic = feed;
|
topic = feed;
|
||||||
qos = q;
|
qos = q;
|
||||||
datalen = 0;
|
datalen = 0;
|
||||||
callback = 0;
|
callback_uint32t = 0;
|
||||||
|
callback_buffer = 0;
|
||||||
|
callback_double = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
|
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
|
||||||
@ -810,13 +853,25 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
|
|||||||
topic = (const char *)feed;
|
topic = (const char *)feed;
|
||||||
qos = q;
|
qos = q;
|
||||||
datalen = 0;
|
datalen = 0;
|
||||||
callback = 0;
|
callback_uint32t = 0;
|
||||||
|
callback_buffer = 0;
|
||||||
|
callback_double = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackType cb) {
|
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) {
|
||||||
callback = cb;
|
callback_uint32t = cb;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackDoubleType cb) {
|
||||||
|
callback_double = cb;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackBufferType cb) {
|
||||||
|
callback_buffer = cb;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Adafruit_MQTT_Subscribe::removeCallback(void) {
|
void Adafruit_MQTT_Subscribe::removeCallback(void) {
|
||||||
callback = 0;
|
callback_uint32t = 0;
|
||||||
|
callback_buffer = 0;
|
||||||
|
callback_double = 0;
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Uncomment/comment to turn on/off debug output messages.
|
// Uncomment/comment to turn on/off debug output messages.
|
||||||
#define MQTT_DEBUG
|
//#define MQTT_DEBUG
|
||||||
// Uncomment/comment to turn on/off error output messages.
|
// Uncomment/comment to turn on/off error output messages.
|
||||||
#define MQTT_ERROR
|
#define MQTT_ERROR
|
||||||
|
|
||||||
@ -98,9 +98,12 @@
|
|||||||
// eg max-subscription-payload-size
|
// eg max-subscription-payload-size
|
||||||
#define SUBSCRIPTIONDATALEN 20
|
#define SUBSCRIPTIONDATALEN 20
|
||||||
|
|
||||||
//Function pointer called CallbackType that takes a float
|
//Function pointer that returns an int
|
||||||
//and returns an int
|
typedef void (*SubscribeCallbackUInt32Type)(uint32_t);
|
||||||
typedef void (*SubscribeCallbackType)(char *);
|
// returns a double
|
||||||
|
typedef void (*SubscribeCallbackDoubleType)(double);
|
||||||
|
// returns a chunk of raw data
|
||||||
|
typedef void (*SubscribeCallbackBufferType)(char *str, uint16_t len);
|
||||||
|
|
||||||
extern void printBuffer(uint8_t *buffer, uint16_t len);
|
extern void printBuffer(uint8_t *buffer, uint16_t len);
|
||||||
|
|
||||||
@ -186,6 +189,8 @@ class Adafruit_MQTT {
|
|||||||
// that subscribe should be called first for each topic that receives messages!
|
// that subscribe should be called first for each topic that receives messages!
|
||||||
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0);
|
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0);
|
||||||
|
|
||||||
|
void processPackets(int16_t timeout);
|
||||||
|
|
||||||
// Ping the server to ensure the connection is still alive.
|
// Ping the server to ensure the connection is still alive.
|
||||||
bool ping(uint8_t n = 1);
|
bool ping(uint8_t n = 1);
|
||||||
|
|
||||||
@ -264,7 +269,9 @@ class Adafruit_MQTT_Subscribe {
|
|||||||
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0);
|
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0);
|
||||||
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const __FlashStringHelper *feedname, uint8_t q=0);
|
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const __FlashStringHelper *feedname, uint8_t q=0);
|
||||||
|
|
||||||
void setCallback(SubscribeCallbackType callb);
|
void setCallback(SubscribeCallbackUInt32Type callb);
|
||||||
|
void setCallback(SubscribeCallbackDoubleType callb);
|
||||||
|
void setCallback(SubscribeCallbackBufferType callb);
|
||||||
void removeCallback(void);
|
void removeCallback(void);
|
||||||
|
|
||||||
const char *topic;
|
const char *topic;
|
||||||
@ -273,9 +280,13 @@ class Adafruit_MQTT_Subscribe {
|
|||||||
uint8_t lastread[SUBSCRIPTIONDATALEN];
|
uint8_t lastread[SUBSCRIPTIONDATALEN];
|
||||||
// Number valid bytes in lastread. Limited to SUBSCRIPTIONDATALEN-1 to
|
// Number valid bytes in lastread. Limited to SUBSCRIPTIONDATALEN-1 to
|
||||||
// ensure nul terminating lastread.
|
// ensure nul terminating lastread.
|
||||||
uint8_t datalen;
|
uint16_t datalen;
|
||||||
|
|
||||||
|
SubscribeCallbackUInt32Type callback_uint32t;
|
||||||
|
SubscribeCallbackDoubleType callback_double;
|
||||||
|
SubscribeCallbackBufferType callback_buffer;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SubscribeCallbackType callback;
|
|
||||||
Adafruit_MQTT *mqtt;
|
Adafruit_MQTT *mqtt;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user