add support for reading retained subscriptions (only works for one subscription since the pub packet gets eaten otherwise

This commit is contained in:
ladyada 2016-02-01 18:42:28 -05:00
parent 19b581c2b7
commit 79dd62468f
2 changed files with 41 additions and 13 deletions

View File

@ -189,15 +189,15 @@ int8_t Adafruit_MQTT::connect() {
// Check for SUBACK if using MQTT 3.1.1 or higher
// TODO: The Server is permitted to start sending PUBLISH packets matching the
// Subscription before the Server sends the SUBACK Packet.
// if(MQTT_PROTOCOL_LEVEL > 3) {
// len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
// DEBUG_PRINT(F("SUBACK:\t"));
// DEBUG_PRINTBUFFER(buffer, len);
// if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
// return 6; // failure to subscribe
// }
// }
// Subscription before the Server sends the SUBACK Packet. (will really need to use callbacks - ada)
if(MQTT_PROTOCOL_LEVEL > 3) {
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
DEBUG_PRINT(F("SubAck:\t"));
DEBUG_PRINTBUFFER(buffer, len);
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
return 6; // failure to subscribe
}
}
}
@ -329,12 +329,10 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) {
if ((len != 5) || (buffer[0] != (MQTT_CTRL_UNSUBACK << 4))) {
return false; // failure to unsubscribe
}
}
subscriptions[i] = 0;
return true;
}
}
@ -374,19 +372,38 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
}
if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ???
uint8_t packet_id_len = 0;
uint16_t packetid;
// Check if it is QoS 1, TODO: we dont support QoS 2
if ((buffer[0] & 0x6) == 0x2) {
packet_id_len = 2;
packetid = buffer[topiclen+4];
packetid <<= 8;
packetid |= buffer[topiclen+5];
}
// zero out the old data
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
datalen = len - topiclen - 4;
datalen = len - topiclen - packet_id_len - 4;
if (datalen > SUBSCRIPTIONDATALEN) {
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
}
// extract out just the data, into the subscription object itself
memmove(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
memmove(subscriptions[i]->lastread, buffer+4+topiclen+packet_id_len, datalen);
subscriptions[i]->datalen = datalen;
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);
if ((MQTT_PROTOCOL_LEVEL > 3) &&(buffer[0] & 0x6) == 0x2) {
uint8_t ackpacket[4];
// Construct and send puback packet.
uint8_t len = pubackPacket(ackpacket, packetid);
if (!sendPacket(ackpacket, len))
DEBUG_PRINT(F("Failed"));
}
// return the valid matching subscription
return subscriptions[i];
}
@ -605,6 +622,16 @@ uint8_t Adafruit_MQTT::pingPacket(uint8_t *packet) {
return 2;
}
uint8_t Adafruit_MQTT::pubackPacket(uint8_t *packet, uint16_t packetid) {
packet[0] = MQTT_CTRL_PUBACK << 4;
packet[1] = 2;
packet[2] = packetid >> 8;
packet[3] = packetid;
DEBUG_PRINTLN(F("MQTT puback packet:"));
DEBUG_PRINTBUFFER(buffer, 4);
return 4;
}
uint8_t Adafruit_MQTT::disconnectPacket(uint8_t *packet) {
packet[0] = MQTT_CTRL_DISCONNECT << 4;
packet[1] = 0;

View File

@ -227,6 +227,7 @@ class Adafruit_MQTT {
uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos);
uint8_t unsubscribePacket(uint8_t *packet, const char *topic);
uint8_t pingPacket(uint8_t *packet);
uint8_t pubackPacket(uint8_t *packet, uint16_t packetid);
};