#ifndef CONFIG_DISABLE_MQTT #include #include "utils.h" #include "MQTT.h" #define MAX_PIN 15 #define MAX_GPIO_OBSERVED (MAXSUBSCRIPTIONS*2) Adafruit_MQTT_Client *mqtt; Adafruit_MQTT_Publish *mqtt_ip; Adafruit_MQTT_Publish *mqttGpio[MAXSUBSCRIPTIONS] = {}; Adafruit_MQTT_Publish *mqttPwm[MAXSUBSCRIPTIONS] = {}; Adafruit_MQTT_Publish *mqttGpioObserved[MAX_GPIO_OBSERVED] = {}; gpioInfo mqttIRQ[MAX_PIN + 1] = {}; #define FEED_MAX_SIZE 96 bool isMqttConfigured = false; bool useMqtts = false; int MqttSetup(const char *server, const char *user, const char *passwd, int port, const char *hostname) { useMqtts = (port == 8883); isMqttConfigured = server[0] != '\0'; if (!isMqttConfigured) return 0; #ifndef CONFIG_DISABLE_SSL if (useMqtts) mqtt = new Adafruit_MQTT_Client(new WiFiClientSecure(), server, port, user, passwd); else #endif mqtt = new Adafruit_MQTT_Client(new WiFiClient(), server, port, user, passwd); mqtt_ip = MqttCreatePublisher(0, 1, IP_FEED_FORMAT, user, hostname); if (NB_ELEMENTS(gpioControlled) + NB_ELEMENTS(pwmControlled) > MAXSUBSCRIPTIONS) { SKETCH_DEBUG_PRINTF("Too much gpio/pwm to control\n Nb gpio %d Nb pwm %d Max is %d", NB_ELEMENTS(gpioControlled), NB_ELEMENTS(pwmControlled), MAXSUBSCRIPTIONS); return -1; } if (NB_ELEMENTS(gpioObserved) > MAX_GPIO_OBSERVED) { SKETCH_DEBUG_PRINTF("Too much gpio observed\n Nb gpio %d Nb is %d", NB_ELEMENTS(gpioObserved), MAX_GPIO_OBSERVED); return -1; } for (uint i = 0 ; i < NB_ELEMENTS(gpioControlled); i++) { mqtt->subscribe(MqttCreateSubscribe(GPIO_SET_FEED_FORMAT, user, hostname, gpioControlled[i])); mqttGpio[i] = MqttCreatePublisher(0, 0, GPIO_FEED_FORMAT, user, hostname, gpioControlled[i]); } for (uint i = 0 ; i < NB_ELEMENTS(gpioObserved) && i < MAX_GPIO_OBSERVED ; i++) { mqttGpioObserved[i] = MqttCreatePublisher(0, 0, GPIO_FEED_FORMAT, user, hostname, gpioObserved[i]); new HIB(gpioObserved[i], HIGH, MqttNofityIRQ , MqttNofityIRQ, NULL ); } for (uint i = 0 ; i < NB_ELEMENTS(pwmControlled); i++) { mqtt->subscribe(MqttCreateSubscribe(PWM_SET_FEED_FORMAT, user, hostname, pwmControlled[i])); mqttPwm[i] = MqttCreatePublisher(0, 0, PWM_FEED_FORMAT, user, hostname, pwmControlled[i]); } return 0; } Adafruit_MQTT_Publish *MqttCreatePublisher( uint8_t qos, uint8_t retain, const char *fmt, ...) { char buf[FEED_MAX_SIZE]; va_list args; va_start (args, fmt); vsnprintf(buf, sizeof(buf), (const char *)fmt, args); va_end(args); return new Adafruit_MQTT_Publish(mqtt, strdup(buf), qos, retain); } int MqttBatchPublish(std::vector tab, ...) { if (MqttConnect()) { SKETCH_DEBUG_PRINTLN("cannot connect to mqtt"); return -1; } for (auto info : tab) { char buf[FEED_MAX_SIZE]; va_list args; va_start (args, tab); vsnprintf(buf, sizeof(buf), (const char *)info.topic, args); va_end(args); // SKETCH_DEBUG_PRINTF("publishing %f for %s\n", info.value, buf); Adafruit_MQTT_Publish client(mqtt, buf, info.qos, info.retain); if (!client.publish(info.value)) SKETCH_DEBUG_PRINTLN("Fail :("); } return 0; } Adafruit_MQTT_Subscribe *MqttCreateSubscribe(const char *fmt, ...) { char buf[FEED_MAX_SIZE]; va_list args; va_start (args, fmt); vsnprintf(buf, sizeof(buf), (const char *)fmt, args); va_end(args); return new Adafruit_MQTT_Subscribe(mqtt, strdup(buf)); } int MqttIsConnected() { return (isMqttConfigured && (mode == BOOTMODE_NORMAL)) ? mqtt->connected() : 0; } // Function to connect and reconnect as necessary to the MQTT server. // Should be called in the loop function and it will take care if connecting. int MqttConnect() { int8_t ret; if (!isMqttConfigured || mode != BOOTMODE_NORMAL) return -1; // Stop if already connected. if (mqtt->connected()) { return 0; } uint8_t retries = 3; while ((ret = mqtt->connect()) != 0) { // connect will return 0 for connected SKETCH_DEBUG_PRINTLN(mqtt->connectErrorString(ret)); SKETCH_DEBUG_PRINTLN("Retrying MQTT connection ..."); mqtt->disconnect(); delay(100); // wait retries--; if (retries == 0) { return -1; } } return 0; } template int MqttPublish(Adafruit_MQTT_Publish *publisher, T value) { if (MqttConnect() == 0) { publisher->publish(value); return 0; } return -1; } int MqttPublishIP(const String &ip) { return MqttPublish(mqtt_ip, ip.c_str()); } int getGpioFromSubscription(Adafruit_MQTT_Subscribe *subscription, const char *pattern) { char *temp = strstr(subscription->topic, pattern); if (!temp) return -1; String gpioStr(temp + strlen(pattern)); int idx = gpioStr.indexOf("/"); int gpio = gpioStr.substring(0, idx).toInt(); if (gpio >= 0 && gpio < 32 ) return gpio; else return -1; } void MqttNofityIRQ(uint8_t gpio, int value) { mqttIRQ[gpio].updated = 1; mqttIRQ[gpio].value = value; } void MqttNofity(int gpio, int value) { if (MqttIsConnected()) { int watchIdx = findIndex(gpio, gpioControlled); if (watchIdx >= 0 ) { mqttGpio[watchIdx]->publish(value); } else { watchIdx = findIndex(gpio, gpioObserved); if (watchIdx >= 0 ) mqttGpioObserved[watchIdx]->publish(value); } } } void MqttChangeGpioValue(int gpio, int value) { pinMode(gpio, OUTPUT); digitalWrite(gpio, value); MqttNofity(gpio, value); } void MqttChangePWMValue(int gpio, int value) { analogWrite(gpio, value); MqttNofity(gpio, value); } void MqttCheckIRQ() { for (uint i = 0 ; i < NB_ELEMENTS(mqttIRQ); i++) { if (mqttIRQ[i].updated == 1) { mqttIRQ[i].updated = 0; MqttNofity(i, mqttIRQ[i].value); } } } void MqttCheckSubscription() { if (MqttConnect() == 0) { Adafruit_MQTT_Subscribe *subscription; while ((subscription = mqtt->readSubscription(0))) { int gpio = getGpioFromSubscription(subscription, "/gpio/"); if (gpio > 0 && findIndex(gpio, gpioControlled) >= 0) { SKETCH_DEBUG_PRINTF("Got Subscription for GPIO %d\n", gpio); char *value = (char *) subscription->lastread; SKETCH_DEBUG_PRINTF("Receive data: %s\n", value); MqttChangeGpioValue(gpio, atoi(value)); } gpio = getGpioFromSubscription(subscription, "/pwm/"); if (gpio > 0 && findIndex(gpio, pwmControlled) >= 0) { SKETCH_DEBUG_PRINTF("Got Subscription for PWM %d\n", gpio); char *value = (char *) subscription->lastread; SKETCH_DEBUG_PRINTF("Receive data: %s\n", value); MqttChangePWMValue(gpio, atoi(value)); } } } } #endif