diff --git a/include/MQTT.h b/include/MQTT.h index 5258ea7..7097c87 100644 --- a/include/MQTT.h +++ b/include/MQTT.h @@ -76,5 +76,6 @@ void ComposeTopic(char *topic, int idx, char *service_name, char *direct); void SystemDataHandler(char *data, uint32_t len, int idx); mqtt_app_err_t PublicTestMQTT(int idx); +esp_err_t ExternalServiceMQTTSend(char *data, int len, int idx); #endif /* MAIN_INCLUDE_MQTT_H_ */ diff --git a/src/MQTT.c b/src/MQTT.c index f7114bf..8883c3a 100644 --- a/src/MQTT.c +++ b/src/MQTT.c @@ -28,6 +28,7 @@ #define TAG "MQTT" #define SERVICE_NAME "SYSTEM" // Dedicated service name +#define EXTERNAL_SERVICE_NAME "RS485" #define UPLINK_SUBTOPIC "UPLINK" // Device publish to this topic #define DOWNLINK_SUBTOPIC "DWLINK" // Device listen from this topic @@ -138,6 +139,29 @@ esp_err_t SysServiceMQTTSend(char *data, int len, int idx) return ESP_ERR_NO_MEM; } +esp_err_t ExternalServiceMQTTSend(char *data, int len, int idx) +{ + if (GetMQTTHandlesPool(idx)->mqtt_queue == NULL) + return ESP_ERR_NOT_FOUND; + char *buf = (char*) malloc(len); + if (buf) + { + memcpy(buf, data, len); + MQTT_DATA_SEND_STRUCT DSS; + ComposeTopic(DSS.topic, idx, EXTERNAL_SERVICE_NAME, UPLINK_SUBTOPIC); + DSS.raw_data_ptr = buf; + DSS.data_length = len; + if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) + return ESP_OK; + else + { + free(buf); + return ESP_ERR_TIMEOUT; + } + } + return ESP_ERR_NO_MEM; +} + #define MAX_ERROR_JSON 256 mqtt_app_err_t PublicTestMQTT(int idx) { @@ -192,9 +216,19 @@ static void mqtt_system_event_handler(int idx, void *handler_args, esp_event_bas ComposeTopic(topic, idx, SERVICE_NAME, DOWNLINK_SUBTOPIC); msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0); #if MQTT_DEBUG_MODE > 0 - ESP_LOGI(TAG, "Subscribe to %s", topic); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + ESP_LOGI(TAG, "Subscribe to %s", topic); #endif +#ifdef CONFIG_UART_TO_MQTT_BRIDGE_ENABLED + ComposeTopic(topic, idx, EXTERNAL_SERVICE_NAME, DOWNLINK_SUBTOPIC); + //Subscribe to the service called "APP" + msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0); +#if MQTT_DEBUG_MODE > 0 + ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + ESP_LOGI(TAG, "Subscribe to %s", topic); +#endif +#endif + break; case MQTT_EVENT_DISCONNECTED: @@ -255,6 +289,15 @@ static void mqtt_system_event_handler(int idx, void *handler_args, esp_event_bas ESP_LOGE(TAG, "Out of free RAM for MQTT API handle"); } +#ifdef CONFIG_UART_TO_MQTT_BRIDGE_ENABLED + ComposeTopic(topic, idx, EXTERNAL_SERVICE_NAME, DOWNLINK_SUBTOPIC); + if (!memcmp(topic, event->topic, event->topic_len)) + { + TransmitSerialPort(event->data, event->data_len); + } +#endif + + break; case MQTT_EVENT_ERROR: ESP_LOGE(TAG, "MQTT_EVENT_ERROR, client %d", idx); diff --git a/src/SerialPort.c b/src/SerialPort.c index 652445a..162996a 100644 --- a/src/SerialPort.c +++ b/src/SerialPort.c @@ -95,6 +95,8 @@ static void ReceiveHandlerAPI() } } + + void serial_RX_task(void *arg) { uart_event_t event; @@ -127,7 +129,7 @@ void serial_RX_task(void *arg) ESP_LOGI(TAG, "read of %d bytes: %s", buffered_size, rxbuf); #endif -#ifdef CONFIG_ UART_TO_MQTT_BRIDGE_ENABLED +#ifdef CONFIG_UART_TO_MQTT_BRIDGE_ENABLED ExternalServiceMQTTSend(rxbuf, buffered_size, 0); ExternalServiceMQTTSend(rxbuf, buffered_size, 1); #else diff --git a/src/SysComm.c b/src/SysComm.c index 9bfa9d9..e13f048 100644 --- a/src/SysComm.c +++ b/src/SysComm.c @@ -284,26 +284,21 @@ static sys_error_code DataHeaderParser(data_message_t *MSG) return SYS_ERROR_PARSE_PAYLOADTYPE; sys_error_code err = SYS_ERROR_HANDLER_NOT_SET; - - //ToDo move payload type from integer to string switch (MSG->parsedData.payloadType) { case PAYLOAD_DEFAULT: - err = CustomPayloadTypeHandler(MSG); + err = PayloadDefaultTypeHandler(MSG); break; } - if (err != SYS_ERROR_HANDLER_NOT_SET) return err; if (CustomPayloadTypeHandler) err = CustomPayloadTypeHandler(MSG); - if (err != SYS_ERROR_HANDLER_NOT_SET) return err; return PayloadDefaultTypeHandler(MSG); - } esp_err_t ServiceDataHandler(data_message_t *MSG) @@ -321,7 +316,10 @@ esp_err_t ServiceDataHandler(data_message_t *MSG) MSG->err_code = SYS_ERROR_UNKNOWN; } else - MSG->err_code = (int) DataHeaderParser(MSG); + { + int er = DataHeaderParser(MSG); + MSG->err_code = er; + } if (MSG->err_code == SYS_GOT_RESPONSE_MESSAGE) {