From 267a7de82ac60a1d0a4e8caed0324e8c5600bfd9 Mon Sep 17 00:00:00 2001 From: Bogdan Date: Wed, 24 Aug 2022 14:13:15 +0200 Subject: [PATCH] reworked mqtt queue data structure, added topic --- include/MQTT.h | 16 +++-------- src/MQTT.c | 63 +++++++++----------------------------------- src/MQTTSysHandler.c | 16 ++++++++--- 3 files changed, 28 insertions(+), 67 deletions(-) diff --git a/include/MQTT.h b/include/MQTT.h index fdc3dfc..b9c0fa0 100644 --- a/include/MQTT.h +++ b/include/MQTT.h @@ -47,22 +47,12 @@ typedef int mqtt_app_err_t; #define API_FILE_EMPTY_ERR 15 #define API_UNKNOWN_ERR 16 - - - -typedef enum -{ - PUBLISH_SYS_DATA, - PUBLISH_USER_DATA -} publish_data_type; - - typedef struct { - publish_data_type dt; + char topic[CONFIG_WEBGUIAPP_MQTT_MAX_TOPIC_LENGTH]; char *raw_data_ptr; - uint32_t data_lenth; -} DATA_SEND_STRUCT; + int data_length; +}DATA_SEND_STRUCT; /** * @brief wrapper around esp_mqtt_client_handle_t with additional info diff --git a/src/MQTT.c b/src/MQTT.c index 387ac08..81726c6 100644 --- a/src/MQTT.c +++ b/src/MQTT.c @@ -214,45 +214,6 @@ void MQTTReconnect(void) } } -static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS) -{ - char topic[64]; //TODO need to define max topic length - switch (DSS->dt) - { - case PUBLISH_SYS_DATA: - ComposeTopic(topic, - GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, - "UPLINK", - GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, - "SYSTEM"); - if (mqtt) - { - esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0, - 0); - ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic); - } - else - ESP_LOGE(TAG, "MQTT client not initialized"); - break; - - case PUBLISH_USER_DATA: - ComposeTopic(topic, - GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, - "UPLINK", - GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, - "USER"); - if (mqtt) - { - esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0, - 0); - ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic); - } - else - ESP_LOGE(TAG, "MQTT client not initialized"); - break; - } -} - void MQTTTaskTransmit(void *pvParameter) { DATA_SEND_STRUCT DSS; @@ -264,19 +225,21 @@ void MQTTTaskTransmit(void *pvParameter) while (!mqtt[idx].is_connected) vTaskDelay(pdMS_TO_TICKS(1000)); xQueuePeek(mqtt[idx].mqtt_queue, &DSS, portMAX_DELAY); - MQTTPublish(&mqtt[idx], &DSS); - switch (DSS.dt) + if (mqtt[idx].mqtt) { - case PUBLISH_USER_DATA: - xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); - free(DSS.raw_data_ptr); - break; - - case PUBLISH_SYS_DATA: - xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); - free(DSS.raw_data_ptr); - break; + esp_mqtt_client_publish(mqtt[idx].mqtt, + (const char*) DSS.topic, + (const char*) DSS.raw_data_ptr, + DSS.data_length, 0, 0); } + else + ESP_LOGE(TAG, "MQTT client not initialized"); + + //Here, if need, can be added repeat transmission after delivery timeout. + //In this case, follow code must be skipped here and executed on delivery confirm or on exceeded retry attempts + xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); + free(DSS.raw_data_ptr); + } } diff --git a/src/MQTTSysHandler.c b/src/MQTTSysHandler.c index 901354b..4398c4b 100644 --- a/src/MQTTSysHandler.c +++ b/src/MQTTSysHandler.c @@ -119,9 +119,13 @@ static mqtt_app_err_t ResponceWithError(int idx, { memcpy(buf, JSONErrorMess, strlen(JSONErrorMess)); DATA_SEND_STRUCT DSS; - DSS.dt = PUBLISH_SYS_DATA; + ComposeTopic(DSS.topic, + GetSysConf()->mqttStation[idx].RootTopic, + "UPLINK", + GetSysConf()->mqttStation[idx].ClientID, + "SYSTEM"); DSS.raw_data_ptr = buf; - DSS.data_lenth = strlen(JSONErrorMess); + DSS.data_length = strlen(JSONErrorMess); if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) return API_OK; else @@ -206,9 +210,13 @@ static mqtt_app_err_t ResponceWithFile(int idx, espfs_file_t *file, strcat((fdata + readBytes), tail); free(filebuf); DATA_SEND_STRUCT DSS; - DSS.dt = PUBLISH_SYS_DATA; + ComposeTopic(DSS.topic, + GetSysConf()->mqttStation[idx].RootTopic, + "UPLINK", + GetSysConf()->mqttStation[idx].ClientID, + "SYSTEM"); DSS.raw_data_ptr = outbuf; - DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail); + DSS.data_length = (fdata - outbuf) + readBytes + strlen(tail); if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) return API_OK; else