Merge pull request #28 from stuthedew/Arbitrary_data_publish

Arbitrary data publish
This commit is contained in:
Limor "Ladyada" Fried 2016-02-01 15:09:20 -05:00
commit 19b581c2b7
8 changed files with 449 additions and 17 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.bts*

View File

@ -51,7 +51,7 @@ static uint8_t *stringprint(uint8_t *p, char *s) {
uint16_t len = strlen(s);
p[0] = len >> 8; p++;
p[0] = len & 0xFF; p++;
memcpy(p, s, len);
memmove(p, s, len);
return p+len;
}
*/
@ -230,8 +230,12 @@ bool Adafruit_MQTT::disconnect() {
bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) {
return publish(topic, (uint8_t*)(data), strlen(data), qos);
}
bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint8_t bLen, uint8_t qos) {
// Construct and send publish packet.
uint8_t len = publishPacket(buffer, topic, data, qos);
uint8_t len = publishPacket(buffer, topic, data, bLen, qos);
if (!sendPacket(buffer, len))
return false;
@ -240,17 +244,17 @@ bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) {
len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS);
DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
DEBUG_PRINTBUFFER(buffer, len);
if (len != 4)
if (len != 4)
return false;
if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK)
if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK)
return false;
uint16_t packnum = buffer[2];
packnum <<= 8;
packnum |= buffer[3];
// we increment the packet_id_counter right after publishing so inc here too to match
packnum++;
if (packnum != packet_id_counter)
packnum++;
if (packnum != packet_id_counter)
return false;
}
@ -378,7 +382,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
}
// extract out just the data, into the subscription object itself
memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
memmove(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
subscriptions[i]->datalen = datalen;
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);
@ -401,13 +405,13 @@ bool Adafruit_MQTT::ping(uint8_t num) {
uint8_t len = pingPacket(buffer);
if (!sendPacket(buffer, len))
continue;
// Process ping reply.
len = readPacket(buffer, 2, PING_TIMEOUT_MS);
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
return true;
}
return false;
}
@ -501,9 +505,10 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
return len;
}
// as per http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718040
uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
const char *data, uint8_t qos) {
uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
uint8_t *data, uint8_t bLen, uint8_t qos) {
uint8_t *p = packet;
uint16_t len;
@ -524,16 +529,17 @@ uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
packet_id_counter++;
}
memcpy(p, data, strlen(data));
p+=strlen(data);
memmove(p, data, bLen);
p+= bLen;
len = p - packet;
packet[1] = len-2; // don't include the 2 bytes of fixed header data
DEBUG_PRINTLN(F("MQTT publish packet:"));
DEBUG_PRINTBUFFER(buffer, len);
return len;
}
uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic,
uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic,
uint8_t qos) {
uint8_t *p = packet;
uint16_t len;
@ -562,6 +568,8 @@ uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic,
return len;
}
uint8_t Adafruit_MQTT::unsubscribePacket(uint8_t *packet, const char *topic) {
uint8_t *p = packet;
@ -607,7 +615,7 @@ uint8_t Adafruit_MQTT::disconnectPacket(uint8_t *packet) {
// Adafruit_MQTT_Publish Definition ////////////////////////////////////////////
Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
const char *feed, uint8_t q) {
mqtt = mqttserver;
topic = feed;
@ -643,10 +651,16 @@ bool Adafruit_MQTT_Publish::publish(const char *payload) {
return mqtt->publish(topic, payload, qos);
}
//publish buffer of arbitrary length
bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint8_t bLen) {
return mqtt->publish(topic, payload, bLen, qos);
}
// Adafruit_MQTT_Subscribe Definition //////////////////////////////////////////
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
const char *feed, uint8_t q) {
mqtt = mqttserver;
topic = feed;

View File

@ -160,6 +160,7 @@ class Adafruit_MQTT {
// The topic must be stored in PROGMEM. It can either be a
// char*, or a __FlashStringHelper* (the result of the F() macro).
bool publish(const char *topic, const char *payload, uint8_t qos = 0);
bool publish(const char *topic, uint8_t *payload, uint8_t bLen, uint8_t qos = 0);
bool publish(const __FlashStringHelper *topic, const char *payload, uint8_t qos = 0) {
return publish((const char *)topic, payload, qos);
}
@ -222,7 +223,7 @@ class Adafruit_MQTT {
// Functions to generate MQTT packets.
uint8_t connectPacket(uint8_t *packet);
uint8_t disconnectPacket(uint8_t *packet);
uint8_t publishPacket(uint8_t *packet, const char *topic, const char *payload, uint8_t qos);
uint8_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload, uint8_t bLen, uint8_t qos);
uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos);
uint8_t unsubscribePacket(uint8_t *packet, const char *topic);
uint8_t pingPacket(uint8_t *packet);
@ -239,6 +240,8 @@ class Adafruit_MQTT_Publish {
// This might be ignored and a higher precision value sent.
bool publish(int32_t i);
bool publish(uint32_t i);
bool publish(uint8_t *b, uint8_t bLen);
private:
Adafruit_MQTT *mqtt;

View File

@ -0,0 +1,108 @@
# Adafruit MQTT Library Arbitrary Data Publish Example
This example illustrates publishing an arbitrary data packet using the Adafruit MQTT library to an MQTT feed which can then be parsed by the included python subscriber client. Possible usage cases include adding metadata (collection time, sensor info etc) to a datapoint.
![alt-text](https://raw.githubusercontent.com/stuthedew/Adafruit_MQTT_Library/Arbitrary_data_publish/examples/mqtt_arbitrary_data/python_subscriber/mqtt_figure.png "Arbitrary data flow diagram")
My motivation for this was wanting to be able to include metadata to a post.
Specifically, I was playing around with a [Teviso RD3024 radiation sensor](http://www.teviso.com/en/products/radiation-sensor-rd3024.htm), and a salvaged Americium radiation source from a smoke detector, at varying distances from the sensor. I wanted a way to associate the collection time, and distance between the source and sensor with the actual radiation reading itself.
---
## Installing and configuring Mosquitto broker (minimal working setup):
####_Installing on Raspberry Pi/Linux:_
```bash
sudo apt-get install mosquitto
cd /etc/mosquitto/
#See "Configuring Mosquitto Broker below"
```
####_Installing On a Mac:_
```bash
brew install mosquitto
cd /usr/local/etc/mosquitto
#See "Configuring Mosquitto Broker below"
```
---
####Configuring Mosquitto broker
```bash
sudo nano mosquitto.conf
```
Now we have to enable a password file to correctly interface with the Adafruit MQTT library. Scroll about two thirds of the way down until you see:
```bash
# -----------------------------------------------------------------
# Default authentication and topic access control
# -----------------------------------------------------------------
```
You should see `#password_file` about a paragraph after that.
Change
```bash
#password_file
```
To
```bash
password_file pwfile
```
Now `ctrl-x` to save and exit.
You're almost done! We just have to create and populate the password file we just configured. The default user info is:
* **Arduino Subscriber:**
* Username: TestUser
* Password: TestUser
* **Python Subscriber:**
* Username: TestPy
* Password: TestPy
```bash
touch pwfile #create the password file
mosquitto_passwd pwfile TestUser #Enter and confirm password when prompted
mosquitto_passwd pwfile TestPy #Enter and confirm password when prompted
```
####Running Mosquitto broker
Now run Mosquitto broker to allow Arduino publisher and Python subscriber to communicate
```bash
mosquitto
```
---
## Using Example Python Subscriber:
####Installing Python subscriber
Install dependencies if you haven't already
```bash
cd ../Adafruit_MQTT_Library/examples/mqtt_arbitrary_buffer/python_subscriber
pip install -r requirements.txt
```
####Installing Python subscriber
Run python script with default values and watch your parsed data print out.
```bash
python subscriber.py #Add -h flag to see modifiable options
```
Assuming that the Mosquitto broker is running in the background and the Adafruit_MQTT client (Arduino) is publishing, you should see the example data print out every 10 seconds.
```bash
MQTT: Connection successful
Connection successful
Subscribed to /feeds/arb_packet
Received char Array: "Hello!", val1: -4533, val2: 73102, val3: 3354...
Received char Array: "Hello!", val1: -4533, val2: 83611, val3: 3354...
Received char Array: "Hello!", val1: -4533, val2: 94115, val3: 3354...
```

View File

@ -0,0 +1,190 @@
/***************************************************
Adafruit MQTT Library Arbitrary Data Example
Must use ESP8266 Arduino from:
https://github.com/esp8266/Arduino
Works great with Adafruit's Huzzah ESP board & Feather
----> https://www.adafruit.com/product/2471
----> https://www.adafruit.com/products/2821
Adafruit invests time and resources providing this open source code,
please support Adafruit and open-source hardware by purchasing
products from Adafruit!
Written by Stuart Feichtinger
Modifed from the mqtt_esp8266 example written by Tony DiCola for Adafruit Industries.
MIT license, all text above must be included in any redistribution
****************************************************/
#include <ESP8266WiFi.h>
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
/************************* WiFi Access Point *********************************/
#define WLAN_SSID "...your SSID..."
#define WLAN_PASS "...your password..."
/************************* Adafruit.io Setup *********************************/
#define ARB_SERVER "...host computer ip address..."
#define ARB_SERVERPORT 1883 // use 8883 for SSL
#define ARB_USERNAME "TestUser"
#define ARB_PW "TestUser"
/************ Global State (you don't need to change this!) ******************/
// Create an ESP8266 WiFiClient class to connect to the MQTT server.
WiFiClient client;
// or... use WiFiFlientSecure for SSL
//WiFiClientSecure client;
// Store the MQTT server, username, and password in flash memory.
// This is required for using the Adafruit MQTT library.
const char MQTT_SERVER[] PROGMEM = ARB_SERVER;
const char MQTT_USERNAME[] PROGMEM = ARB_USERNAME;
const char MQTT_PASSWORD[] PROGMEM = ARB_PW;
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, ARB_SERVERPORT, MQTT_USERNAME, MQTT_PASSWORD);
/****************************** Feeds ***************************************/
// Setup a feed called 'arb_packet' for publishing.
// Notice MQTT paths for AIO follow the form: <username>/feeds/<feedname>
const char ARB_FEED[] PROGMEM = "/feeds/arb_packet";
Adafruit_MQTT_Publish ap = Adafruit_MQTT_Publish(&mqtt, ARB_FEED);
// Arbitrary Payload
// Union allows for easier interaction of members in struct form with easy publishing
// of "raw" bytes
typedef union{
//Customize struct with whatever variables/types you like.
struct __attribute__((__packed__)){ // packed to eliminate padding for easier parsing.
char charAry[10];
int16_t val1;
unsigned long val2;
uint16_t val3;
}s;
uint8_t raw[sizeof(s)]; // For publishing
/*
// Alternate Option with anonymous struct, but manual byte count:
struct __attribute__((__packed__)){ // packed to eliminate padding for easier parsing.
char charAry[10]; // 10 x 1 byte = 10 bytes
int16_t val1; // 1 x 2 bytes = 2 bytes
unsigned long val2; // 1 x 4 bytes = 4 bytes
uint16_t val3; // 1 x 2 bytes = 2 bytes
-------------------
TOTAL = 18 bytes
};
uint8_t raw[18]; // For publishing
*/
} packet_t;
/*************************** Sketch Code ************************************/
// Bug workaround for Arduino 1.6.6, it seems to need a function declaration
// for some reason (only affects ESP8266, likely an arduino-builder bug).
void MQTT_connect();
void setup() {
Serial.begin(115200);
delay(10);
Serial.println(F("Adafruit MQTT demo"));
// Connect to WiFi access point.
Serial.println(); Serial.println();
Serial.print(F("Connecting to "));
Serial.println(WLAN_SSID);
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(F("."));
}
Serial.println();
Serial.println(F("WiFi connected"));
Serial.println(F("IP address: ")); Serial.println(WiFi.localIP());
}
packet_t arbPac;
const char strVal[] PROGMEM = "Hello!";
void loop() {
// Ensure the connection to the MQTT server is alive (this will make the first
// connection and automatically reconnect when disconnected). See the MQTT_connect
// function definition further below.
MQTT_connect();
//Update arbitrary packet values
strcpy_P(arbPac.s.charAry, strVal);
arbPac.s.val1 = -4533;
arbPac.s.val2 = millis();
arbPac.s.val3 = 3354;
/*
// Alternate Union with anonymous struct
// (see union declaration above)
strcpy_P(arbPac.charAry, strVal);
arbPac.val1 = -4533;
arbPac.val2 = millis();
arbPac.val3 = 3354;
*/
if (! ap.publish(arbPac.raw, sizeof(packet_t)))
Serial.println(F("Publish Failed."));
else {
Serial.println(F("Publish Success!"));
delay(500);
}
delay(10000);
// ping the server to keep the mqtt connection alive
// NOT required if you are publishing once every KEEPALIVE seconds
/*
if(! mqtt.ping()) {
mqtt.disconnect();
}
*/
}
// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
int8_t ret;
// Stop if already connected.
if (mqtt.connected()) {
return;
}
Serial.print(F("Connecting to MQTT... "));
uint8_t retries = 3;
while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
Serial.println(mqtt.connectErrorString(ret));
Serial.println(F("Retrying MQTT connection in 5 seconds..."));
mqtt.disconnect();
delay(5000); // wait 5 seconds
retries--;
if (retries == 0) {
// basically die and wait for WDT to reset me
while (1);
}
}
Serial.println(F("MQTT Connected!"));
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 865 KiB

View File

@ -0,0 +1 @@
paho-mqtt>=1.1

View File

@ -0,0 +1,114 @@
'''MQTT subscriber for Adafruit MQTT library mqtt_arbitrary_buffer example'''
import paho.mqtt.client as mqtt
import argparse
import struct
import array
import sys
return_str =[
"Connection successful",
"incorrect protocol version",
"invalid client identifier",
"server unavailable",
"bad username or password",
"not authorised"
]
args = None
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
"""callback function on connect. Subscribes or exits depending on outcome"""
print("MQTT: "),
print(return_str[rc])
if(rc > 1):
print("Connection refused - " + return_str[rc])
sys.exit(rc)
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
else:
print(return_str[rc])
client.subscribe(args.topic)
print("Subscribed to {}".format(args.topic))
def on_disconnect(client, userdata, rc):
"""Callback for disconnect"""
if rc != 0:
print("Unexpected disconnection.")
client.reconnect()
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
try:
pMsg = parseMsg(msg.payload)
print("Received char Array: \"{}\", val1: {}, val2: {}, val3: {}...".format(pMsg[0], pMsg[1], pMsg[2], pMsg[3]))
except Exception as err:
print err
def argBegin():
parser = argparse.ArgumentParser(description='MQTT subscriber for Adafruit MQTT library mqtt_arbitrary_buffer example')
parser.add_argument("--host", default="localhost", help='mqtt host to connect to. Defaults to localhost.')
parser.add_argument("-p", "--port", default=1883, help='network port to connect to. Defaults to 1883.')
parser.add_argument("-t", "--topic", nargs='*', default="/feeds/arb_packet", help="mqtt topic to subscribe to. May be repeated multiple times.")
parser.add_argument("-u", "--username", default="testPy", help="provide a username (requires MQTT 3.1 broker)")
parser.add_argument("-P", "--password", default="testPy", help="provide a password (requires MQTT 3.1 broker)")
parser.add_argument("-k", "--keepalive", default=60, help="keep alive in seconds for this client. Defaults to 60.")
return parser.parse_args()
def parseMsg(payload):
"""Parses C struct from MQTT publisher Adafruit MQTT client to Python list"""
arr = array.array('B', payload) #convert python list to C-like array of unsigned char (B)
parsedStruct = struct.Struct('< 10s h L H') #define struct template (see below)
'''
Format of Struct from Adafruit MQTT client, Arduino, etc:
Adafruit MQTT client == Little endian (<)
Var NAME | C TYPE (python symbol) | size of member x bytes
-------------------------------------------------------------------
"charAry" | uchar (s) | 10s x 1 = 10 bytes
"val1" | int16 / short (h) | 1h x 2 = 2 bytes
"val2" | unsigned long (L) | 1L x 4 = 4 bytes
"val3" | uint16/unsigned short(H)| 1H x 2 = 2 bytes
------------------------------------------------------------------
Total packet size = | 18 bytes |
See Section 7.3.2 of Python struct module documentation for complete format list
https://docs.python.org/2/library/struct.html
'''
charAry, val1, val2, val3 = parsedStruct.unpack_from(arr) #convert byte array to formatted struct
charAry = charAry.rstrip(' \t\r\n\0') #remove trailing white space from buffer
return charAry, val1, val2, val3
def main():
"""Wait for incoming message published by Adafruit MQTT client"""
global args
args = argBegin()
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(args.username, args.password)
client.connect(args.host, args.port, args.keepalive)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
if __name__ == '__main__':
sys.exit(main())