// Create an ESP8266 WiFiClient class to connect to the MQTT server. WiFiClient client; Adafruit_MQTT_Client *mqtt; Adafruit_MQTT_Publish *mqtt_temp; Adafruit_MQTT_Publish *mqtt_pressure; #define NB_ELEMENTS(x) (sizeof(x)/ sizeof(x[0])) const char TEMPERATURE_FEED[] = "/feeds/temperature"; const char PRESSURE_FEED[] = "/feeds/pressure"; // Should have less that MAXSUBSCRIPTIONS elements // MAXSUBSCRIPTIONS is defined is Adafruit_mqtt.h const int gpioWatched[] = {12, 13}; #define GPIO_BASE "/feeds/gpio/" const char *GPIO_FEED[] = { GPIO_BASE"12", GPIO_BASE"13"}; const char *GPIO_SET_FEED[] = { GPIO_BASE"12/set", GPIO_BASE"13/set"}; Adafruit_MQTT_Publish * mqttGpio[MAXSUBSCRIPTIONS] = {}; int MqttSetup(char *server, char *user, char *passwd, int port) { // Setup the MQTT client class by passing in the WiFi client and MQTT server and login details. mqtt = new Adafruit_MQTT_Client(&client, server, port, user, passwd); mqtt_temp = new Adafruit_MQTT_Publish(mqtt, TEMPERATURE_FEED); mqtt_pressure = new Adafruit_MQTT_Publish(mqtt, PRESSURE_FEED); for (int i = 0 ; i < NB_ELEMENTS(gpioWatched) && i < MAXSUBSCRIPTIONS; i++) { Adafruit_MQTT_Subscribe *gpioSet = new Adafruit_MQTT_Subscribe(mqtt, GPIO_SET_FEED[i]); mqtt->subscribe(gpioSet); Adafruit_MQTT_Publish *gpio = new Adafruit_MQTT_Publish(mqtt, GPIO_FEED[i]); mqttGpio[i] = gpio; } return 0; } int MqttIsConnected() { return mqtt->connected(); } // Function to connect and reconnect as necessary to the MQTT server. // Should be called in the loop function and it will take care if connecting. int MqttConnect() { int8_t ret; // Stop if already connected. if (mqtt->connected()) { return 0; } uint8_t retries = 3; while ((ret = mqtt->connect()) != 0) { // connect will return 0 for connected Serial.println(mqtt->connectErrorString(ret)); Serial.println("Retrying MQTT connection ..."); mqtt->disconnect(); delay(100); // wait retries--; if (retries == 0) { return -1; } } return 0; } int MqttPublish(double temp, double pressure) { if (MqttConnect() == 0) { Serial.println("publishing !"); mqtt_temp->publish(temp); mqtt_pressure->publish(pressure); } return 0; } int getGpioFromSubscription(Adafruit_MQTT_Subscribe *subscription) { if (!strstr(subscription->topic, GPIO_BASE)) return -1; String gpioStr(subscription->topic + strlen(GPIO_BASE)); int idx = gpioStr.indexOf("/"); int gpio = gpioStr.substring(0, idx).toInt(); if (gpio >= 0 && gpio < 32 ) return gpio; else return -1; } int getGpioWatchedIndex(int gpio) { for ( int i = 0; i < NB_ELEMENTS(gpioWatched); i++) { if (gpio == gpioWatched[i]) return i; } return -1; } void MqttChangeGpioValue(int gpio, int value) { pinMode(gpio, OUTPUT); digitalWrite(gpio, value); int watchIdx = getGpioWatchedIndex(gpio); if (watchIdx >= 0 ) { mqttGpio[watchIdx]->publish(value); } } void MqttCheckSubscription() { if (MqttConnect() == 0) { Adafruit_MQTT_Subscribe *subscription; while (subscription = mqtt->readSubscription(0)) { int gpio = getGpioFromSubscription(subscription); Serial.print("Got Subscription for gpio "); Serial.println(gpio); if (gpio > 0 && getGpioWatchedIndex(gpio) >= 0) { char *value = (char *) subscription->lastread; Serial.print("Receive data: "); Serial.println(value); MqttChangeGpioValue(gpio, atoi(value)); } } } }