#include // Create an ESP8266 WiFiClient class to connect to the MQTT server. WiFiClient client; Adafruit_MQTT_Client *mqtt; Adafruit_MQTT_Publish *mqtt_temp; Adafruit_MQTT_Publish *mqtt_pressure; Adafruit_MQTT_Publish *mqtt_dht_temp; Adafruit_MQTT_Publish *mqtt_dht_humidity; #define NB_ELEMENTS(x) (sizeof(x)/ sizeof(x[0])) #define FEED_MAX_SIZE 64 #define TEMPERATURE_FEED_FORMAT "/feeds/%s/temperature" #define PRESSURE_FEED_FORMAT "/feeds/%s/pressure" #define TEMPERATURE_DHT_FEED_FORMAT "/feeds/%s/dht/temperature" #define HUMIDITY_DHT_FEED_FORMAT "/feeds/%s/dht/humidity" char temperatureFeed[FEED_MAX_SIZE] = {}; char pressureFeed[FEED_MAX_SIZE] = {}; char temperatureDhtFeed[FEED_MAX_SIZE] = {}; char humidityDhtFeed[FEED_MAX_SIZE] = {}; // Should have less that MAXSUBSCRIPTIONS elements // MAXSUBSCRIPTIONS is defined is Adafruit_mqtt.h const int gpioWatched[] = CONFIG_MQTT_CONTROLLED_GPIO; #define GPIO_FEED_FORMAT "/feeds/%s/gpio/%d" #define GPIO_SET_FEED_FORMAT "/feeds/%s/gpio/%d/set" char *mqttId; char GPIO_FEED[MAXSUBSCRIPTIONS][FEED_MAX_SIZE] = {}; char GPIO_SET_FEED[MAXSUBSCRIPTIONS][FEED_MAX_SIZE] = {}; Adafruit_MQTT_Publish * mqttGpio[MAXSUBSCRIPTIONS] = {}; bool isMqttConfigured = false; int MqttSetup(char *server, char *user, char *passwd, int port, char * hostname) { mqttId = hostname; snprintf(temperatureFeed, FEED_MAX_SIZE, TEMPERATURE_FEED_FORMAT, mqttId); snprintf(pressureFeed, FEED_MAX_SIZE, PRESSURE_FEED_FORMAT, mqttId); snprintf(temperatureDhtFeed, FEED_MAX_SIZE, TEMPERATURE_DHT_FEED_FORMAT, mqttId); snprintf(humidityDhtFeed, FEED_MAX_SIZE, HUMIDITY_DHT_FEED_FORMAT, mqttId); mqtt = new Adafruit_MQTT_Client(&client, server, port, user, passwd); mqtt_temp = new Adafruit_MQTT_Publish(mqtt, temperatureFeed); mqtt_pressure = new Adafruit_MQTT_Publish(mqtt, pressureFeed); mqtt_dht_temp = new Adafruit_MQTT_Publish(mqtt, temperatureDhtFeed); mqtt_dht_humidity = new Adafruit_MQTT_Publish(mqtt, humidityDhtFeed); isMqttConfigured = server[0] != '\0'; for (uint i = 0 ; i < NB_ELEMENTS(gpioWatched) && i < MAXSUBSCRIPTIONS; i++) { snprintf(GPIO_FEED[i], FEED_MAX_SIZE, GPIO_FEED_FORMAT, mqttId, gpioWatched[i]); snprintf(GPIO_SET_FEED[i], FEED_MAX_SIZE, GPIO_SET_FEED_FORMAT, mqttId, gpioWatched[i]); Adafruit_MQTT_Subscribe *gpioSet = new Adafruit_MQTT_Subscribe(mqtt, GPIO_SET_FEED[i]); mqtt->subscribe(gpioSet); Adafruit_MQTT_Publish *gpio = new Adafruit_MQTT_Publish(mqtt, GPIO_FEED[i]); mqttGpio[i] = gpio; } return 0; } Adafruit_MQTT_Publish *MqttCreatePublisher(const char *fmt, ...){ char buf[128]; va_list args; va_start (args, fmt); vsnprintf(buf, sizeof(buf), (const char *)fmt, args); va_end(args); String *url = new String(buf); return new Adafruit_MQTT_Publish(mqtt, url->c_str()); } bool MqttIsConfigured() { return isMqttConfigured; } int MqttIsConnected() { return (mqtt != NULL) ? mqtt->connected() : 0; } // Function to connect and reconnect as necessary to the MQTT server. // Should be called in the loop function and it will take care if connecting. int MqttConnect() { int8_t ret; if(!isMqttConfigured) return -1; // Stop if already connected. if (mqtt->connected()) { return 0; } uint8_t retries = 3; while ((ret = mqtt->connect()) != 0) { // connect will return 0 for connected SKETCH_DEBUG_PRINTLN(mqtt->connectErrorString(ret)); SKETCH_DEBUG_PRINTLN("Retrying MQTT connection ..."); mqtt->disconnect(); delay(100); // wait retries--; if (retries == 0) { return -1; } } return 0; } int MqttPublish(double temp, double pressure) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing !"); mqtt_temp->publish(temp); mqtt_pressure->publish(pressure); } return 0; } int MqttDhtPublish(float temp, float humidity) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing DHT!"); mqtt_dht_temp->publish(temp); mqtt_dht_humidity->publish(humidity); } return 0; } int getGpioFromSubscription(Adafruit_MQTT_Subscribe *subscription) { char *temp = strstr(subscription->topic, "/gpio/"); if (!temp) return -1; String gpioStr(temp + strlen("/gpio/")); int idx = gpioStr.indexOf("/"); int gpio = gpioStr.substring(0, idx).toInt(); if (gpio >= 0 && gpio < 32 ) return gpio; else return -1; } int getGpioWatchedIndex(int gpio) { for ( uint i = 0; i < NB_ELEMENTS(gpioWatched); i++) { if (gpio == gpioWatched[i]) return i; } return -1; } void MqttChangeGpioValue(int gpio, int value) { pinMode(gpio, OUTPUT); digitalWrite(gpio, value); int watchIdx = getGpioWatchedIndex(gpio); if (watchIdx >= 0 ) { mqttGpio[watchIdx]->publish(value); } } void MqttCheckSubscription() { if (MqttConnect() == 0) { Adafruit_MQTT_Subscribe *subscription; while ((subscription = mqtt->readSubscription(0))) { int gpio = getGpioFromSubscription(subscription); SKETCH_DEBUG_PRINT("Got Subscription for gpio "); SKETCH_DEBUG_PRINTLN(gpio); if (gpio > 0 && getGpioWatchedIndex(gpio) >= 0) { char *value = (char *) subscription->lastread; SKETCH_DEBUG_PRINT("Receive data: "); SKETCH_DEBUG_PRINTLN(value); MqttChangeGpioValue(gpio, atoi(value)); } } } }