properly process packets until you get the one you want (prep for callback w/good retain support)

This commit is contained in:
ladyada 2016-02-08 23:36:25 -05:00
parent 79dd62468f
commit 8f595cdb95
2 changed files with 79 additions and 4 deletions

View File

@ -191,7 +191,7 @@ int8_t Adafruit_MQTT::connect() {
// TODO: The Server is permitted to start sending PUBLISH packets matching the
// Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada)
if(MQTT_PROTOCOL_LEVEL > 3) {
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
len = processPacketsUntil(buffer, MQTT_CTRL_SUBACK, CONNECT_TIMEOUT_MS);
DEBUG_PRINT(F("SubAck:\t"));
DEBUG_PRINTBUFFER(buffer, len);
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
@ -204,6 +204,61 @@ int8_t Adafruit_MQTT::connect() {
return 0;
}
uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout) {
uint16_t len;
while (len = readFullPacket(buffer, timeout)) {
// TODO: add subscription reading & processing here
if ((buffer[0] >> 4) == waitforpackettype) {
//DEBUG_PRINTLN(F("Found right packet"));
return len;
}
}
return 0;
}
uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t timeout) {
// will read a packet and Do The Right Thing with length
uint8_t *pbuff = buffer;
uint8_t rlen;
// read the packet type:
rlen = readPacket(pbuff, 1, timeout);
if (rlen != 1) return 0;
//DEBUG_PRINT(F("Packet Type:\t")); DEBUG_PRINTBUFFER(pbuff, rlen);
pbuff++;
uint32_t value = 0;
uint32_t multiplier = 1;
uint8_t encodedByte;
do {
rlen = readPacket(pbuff, 1, timeout);
if (rlen != 1) return 0;
encodedByte = pbuff[0]; // save the last read val
pbuff++; // get ready for reading the next byte
uint32_t intermediate = encodedByte & 0x7F;
intermediate *= multiplier;
value += intermediate;
multiplier *= 128;
if (multiplier > 128*128*128) {
DEBUG_PRINT(F("Malformed packet len\n"));
return 0;
}
} while (encodedByte & 0x80);
//DEBUG_PRINT(F("Packet Length:\t")); DEBUG_PRINTLN(value);
rlen = readPacket(pbuff, value, timeout);
//DEBUG_PRINT(F("Remaining packet:\t")); DEBUG_PRINTBUFFER(pbuff, rlen);
return ((pbuff - buffer)+rlen);
}
const __FlashStringHelper* Adafruit_MQTT::connectErrorString(int8_t code) {
switch (code) {
case 1: return F("Wrong protocol");
@ -415,7 +470,7 @@ void Adafruit_MQTT::flushIncoming(uint16_t timeout) {
}
bool Adafruit_MQTT::ping(uint8_t num) {
flushIncoming(100);
//flushIncoming(100);
while (num--) {
// Construct and send ping packet.
@ -424,7 +479,7 @@ bool Adafruit_MQTT::ping(uint8_t num) {
continue;
// Process ping reply.
len = readPacket(buffer, 2, PING_TIMEOUT_MS);
len = processPacketsUntil(buffer, MQTT_CTRL_PINGRESP, PING_TIMEOUT_MS);
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
return true;
}
@ -692,6 +747,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
mqtt = mqttserver;
topic = feed;
qos = q;
callback = 0;
}
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
@ -699,4 +755,13 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
mqtt = mqttserver;
topic = (const char *)feed;
qos = q;
callback = 0;
}
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackType cb) {
callback = cb;
}
void Adafruit_MQTT_Subscribe::removeCallback(void) {
callback = 0;
}

View File

@ -95,6 +95,9 @@
// eg max-subscription-payload-size
#define SUBSCRIPTIONDATALEN 20
//Function pointer called CallbackType that takes a float
//and returns an int
typedef void (*SubscribeCallbackType)(char *);
extern void printBuffer(uint8_t *buffer, uint8_t len);
@ -202,6 +205,11 @@ class Adafruit_MQTT {
virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
bool checkForValidPubPacket = false) = 0;
// Read a full packet, keeping note of the correct length
uint16_t readFullPacket(uint8_t *buffer, uint16_t timeout);
// Properly process packets until you get to one you want
uint16_t processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout);
// Shared state that subclasses can use:
const char *servername;
int16_t portnum;
@ -255,7 +263,8 @@ class Adafruit_MQTT_Subscribe {
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0);
Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const __FlashStringHelper *feedname, uint8_t q=0);
bool setCallback(void (*callback)(char *));
void setCallback(SubscribeCallbackType callb);
void removeCallback(void);
const char *topic;
uint8_t qos;
@ -265,6 +274,7 @@ class Adafruit_MQTT_Subscribe {
// ensure nul terminating lastread.
uint8_t datalen;
private:
SubscribeCallbackType callback;
Adafruit_MQTT *mqtt;
};