#include #include "utils.h" #include "MQTT.h" Adafruit_MQTT_Client *mqtt; Adafruit_MQTT_Publish *mqtt_temp; Adafruit_MQTT_Publish *mqtt_pressure; Adafruit_MQTT_Publish *mqtt_dht_temp; Adafruit_MQTT_Publish *mqtt_dht_humidity; Adafruit_MQTT_Publish *mqtt_dry; Adafruit_MQTT_Publish *mqtt_ip; Adafruit_MQTT_Publish *mqttGpio[MAXSUBSCRIPTIONS] = {}; Adafruit_MQTT_Publish *mqttPwm[MAXSUBSCRIPTIONS] = {}; #define FEED_MAX_SIZE 96 //FEED have the following formats /feeds/USER/DEVICE_NAME/.... #define TEMPERATURE_FEED_FORMAT "/feeds/%s/%s/temperature" #define PRESSURE_FEED_FORMAT "/feeds/%s/%s/pressure" #define TEMPERATURE_DHT_FEED_FORMAT "/feeds/%s/%s/dht/temperature" #define HUMIDITY_DHT_FEED_FORMAT "/feeds/%s/%s/dht/humidity" #define DRY_FEED_FORMAT "/feeds/%s/%s/dry" #define GPIO_FEED_FORMAT "/feeds/%s/%s/gpio/%d" #define GPIO_SET_FEED_FORMAT "/feeds/%s/%s/gpio/%d/set" #define PWM_FEED_FORMAT "/feeds/%s/%s/gpio/%d" #define PWM_SET_FEED_FORMAT "/feeds/%s/%s/gpio/%d/set" #define IP_FEED_FORMAT "/feeds/%s/%s/configuration/ip/addr" // Should have less that MAXSUBSCRIPTIONS elements // MAXSUBSCRIPTIONS is defined is Adafruit_mqtt.h const int gpioWatched[] = CONFIG_MQTT_CONTROLLED_GPIO; const int pwmWatched[] = CONFIG_CONTROLLED_PWM; char *mqttId; bool isMqttConfigured = false; bool useMqtts = false; int MqttSetup(char *server, char *user, char *passwd, int port, char *hostname) { mqttId = hostname; useMqtts = (port == 8883); isMqttConfigured = server[0] != '\0'; if(!isMqttConfigured) return 0; if(useMqtts) mqtt = new Adafruit_MQTT_Client(new WiFiClientSecure(), server, port, user, passwd); else mqtt = new Adafruit_MQTT_Client(new WiFiClient(), server, port, user, passwd); mqtt_dht_temp = MqttCreatePublisher(TEMPERATURE_DHT_FEED_FORMAT, user, mqttId); mqtt_dht_humidity = MqttCreatePublisher(HUMIDITY_DHT_FEED_FORMAT, user, mqttId); mqtt_temp = MqttCreatePublisher(TEMPERATURE_FEED_FORMAT, user, mqttId); mqtt_pressure = MqttCreatePublisher(PRESSURE_FEED_FORMAT, user, mqttId); mqtt_dry = MqttCreatePublisher(DRY_FEED_FORMAT, user, mqttId); mqtt_ip = MqttCreatePublisher(IP_FEED_FORMAT, user, mqttId); if( NB_ELEMENTS(gpioWatched) + NB_ELEMENTS(pwmWatched) > MAXSUBSCRIPTIONS){ SKETCH_DEBUG_PRINTF("Too much gpio/pwm to control\n Nb gpio %d Nb pwm %d Max is %d", NB_ELEMENTS(gpioWatched), NB_ELEMENTS(pwmWatched), MAXSUBSCRIPTIONS); return -1; } for (uint i = 0 ; i < NB_ELEMENTS(gpioWatched) && i < MAXSUBSCRIPTIONS; i++) { mqtt->subscribe(MqttCreateSubscribe(GPIO_SET_FEED_FORMAT, user, mqttId, gpioWatched[i])); mqttGpio[i] = MqttCreatePublisher(GPIO_FEED_FORMAT, user, mqttId, gpioWatched[i]); } for (uint i = 0 ; i < NB_ELEMENTS(gpioWatched) && i < MAXSUBSCRIPTIONS; i++) { mqtt->subscribe(MqttCreateSubscribe(PWM_SET_FEED_FORMAT, user, mqttId, pwmWatched[i])); mqttPwm[i] = MqttCreatePublisher(PWM_FEED_FORMAT, user, mqttId, pwmWatched[i]); } return 0; } Adafruit_MQTT_Publish *MqttCreatePublisher(const char *fmt, ...){ char buf[FEED_MAX_SIZE]; va_list args; va_start (args, fmt); vsnprintf(buf, sizeof(buf), (const char *)fmt, args); va_end(args); return new Adafruit_MQTT_Publish(mqtt, strdup(buf)); } Adafruit_MQTT_Subscribe *MqttCreateSubscribe(const char *fmt, ...){ char buf[FEED_MAX_SIZE]; va_list args; va_start (args, fmt); vsnprintf(buf, sizeof(buf), (const char *)fmt, args); va_end(args); return new Adafruit_MQTT_Subscribe(mqtt, strdup(buf)); } int MqttIsConnected() { return (isMqttConfigured) ? mqtt->connected() : 0; } // Function to connect and reconnect as necessary to the MQTT server. // Should be called in the loop function and it will take care if connecting. int MqttConnect() { int8_t ret; if(!isMqttConfigured) return -1; // Stop if already connected. if (mqtt->connected()) { return 0; } uint8_t retries = 3; while ((ret = mqtt->connect()) != 0) { // connect will return 0 for connected SKETCH_DEBUG_PRINTLN(mqtt->connectErrorString(ret)); SKETCH_DEBUG_PRINTLN("Retrying MQTT connection ..."); mqtt->disconnect(); delay(100); // wait retries--; if (retries == 0) { return -1; } } return 0; } int MqttPublish(double temp, double pressure) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing !"); mqtt_temp->publish(temp); mqtt_pressure->publish(pressure); } return 0; } int MqttDryPublish(int dry) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing dry!"); mqtt_dry->publish((dry*100)/1024); } return 0; } int MqttIPPublish(const String &ip) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing IP!"); mqtt_ip->publish(ip.c_str()); } return 0; } int MqttDhtPublish(float temp, float humidity) { if (MqttConnect() == 0) { SKETCH_DEBUG_PRINTLN("publishing DHT!"); mqtt_dht_temp->publish(temp); mqtt_dht_humidity->publish(humidity); } return 0; } int getGpioFromSubscription(Adafruit_MQTT_Subscribe *subscription, const char *pattern) { char *temp = strstr(subscription->topic, pattern); if (!temp) return -1; String gpioStr(temp + strlen(pattern)); int idx = gpioStr.indexOf("/"); int gpio = gpioStr.substring(0, idx).toInt(); if (gpio >= 0 && gpio < 32 ) return gpio; else return -1; } void MqttChangeGpioValue(int gpio, int value) { pinMode(gpio, OUTPUT); digitalWrite(gpio, value); int watchIdx = findIndex(gpio, gpioWatched); if (watchIdx >= 0 ) { mqttGpio[watchIdx]->publish(value); } } void MqttChangePWMValue(int gpio, int value) { analogWrite(gpio, value); int watchIdx = findIndex(gpio, gpioWatched); if (watchIdx >= 0 ) { mqttGpio[watchIdx]->publish(value); } } void MqttCheckSubscription() { if (MqttConnect() == 0) { Adafruit_MQTT_Subscribe *subscription; while ((subscription = mqtt->readSubscription(0))) { int gpio = getGpioFromSubscription(subscription, "/gpio/"); if (gpio > 0 && findIndex(gpio, gpioWatched) >= 0) { SKETCH_DEBUG_PRINTF("Got Subscription for GPIO %d\n", gpio); char *value = (char *) subscription->lastread; SKETCH_DEBUG_PRINTF("Receive data: %s\n", value); MqttChangeGpioValue(gpio, atoi(value)); } gpio = getGpioFromSubscription(subscription, "/pwm/"); if (gpio > 0 && findIndex(gpio, pwmWatched) >= 0) { SKETCH_DEBUG_PRINTF("Got Subscription for PWM %d\n", gpio); char *value = (char *) subscription->lastread; SKETCH_DEBUG_PRINTF("Receive data: %s\n", value); MqttChangePWMValue(gpio, atoi(value)); } } } }