reworked mqtt queue data structure, added topic

This commit is contained in:
Bogdan Pilyugin 2022-08-24 14:13:15 +02:00
parent 5f3b1a7a19
commit 267a7de82a
3 changed files with 28 additions and 67 deletions

View File

@ -47,22 +47,12 @@ typedef int mqtt_app_err_t;
#define API_FILE_EMPTY_ERR 15
#define API_UNKNOWN_ERR 16
typedef enum
{
PUBLISH_SYS_DATA,
PUBLISH_USER_DATA
} publish_data_type;
typedef struct
{
publish_data_type dt;
char topic[CONFIG_WEBGUIAPP_MQTT_MAX_TOPIC_LENGTH];
char *raw_data_ptr;
uint32_t data_lenth;
} DATA_SEND_STRUCT;
int data_length;
}DATA_SEND_STRUCT;
/**
* @brief wrapper around esp_mqtt_client_handle_t with additional info

View File

@ -214,45 +214,6 @@ void MQTTReconnect(void)
}
}
static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS)
{
char topic[64]; //TODO need to define max topic length
switch (DSS->dt)
{
case PUBLISH_SYS_DATA:
ComposeTopic(topic,
GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
"SYSTEM");
if (mqtt)
{
esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0,
0);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic);
}
else
ESP_LOGE(TAG, "MQTT client not initialized");
break;
case PUBLISH_USER_DATA:
ComposeTopic(topic,
GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
"USER");
if (mqtt)
{
esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0,
0);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic);
}
else
ESP_LOGE(TAG, "MQTT client not initialized");
break;
}
}
void MQTTTaskTransmit(void *pvParameter)
{
DATA_SEND_STRUCT DSS;
@ -264,19 +225,21 @@ void MQTTTaskTransmit(void *pvParameter)
while (!mqtt[idx].is_connected)
vTaskDelay(pdMS_TO_TICKS(1000));
xQueuePeek(mqtt[idx].mqtt_queue, &DSS, portMAX_DELAY);
MQTTPublish(&mqtt[idx], &DSS);
switch (DSS.dt)
if (mqtt[idx].mqtt)
{
case PUBLISH_USER_DATA:
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
free(DSS.raw_data_ptr);
break;
case PUBLISH_SYS_DATA:
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
free(DSS.raw_data_ptr);
break;
esp_mqtt_client_publish(mqtt[idx].mqtt,
(const char*) DSS.topic,
(const char*) DSS.raw_data_ptr,
DSS.data_length, 0, 0);
}
else
ESP_LOGE(TAG, "MQTT client not initialized");
//Here, if need, can be added repeat transmission after delivery timeout.
//In this case, follow code must be skipped here and executed on delivery confirm or on exceeded retry attempts
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
free(DSS.raw_data_ptr);
}
}

View File

@ -119,9 +119,13 @@ static mqtt_app_err_t ResponceWithError(int idx,
{
memcpy(buf, JSONErrorMess, strlen(JSONErrorMess));
DATA_SEND_STRUCT DSS;
DSS.dt = PUBLISH_SYS_DATA;
ComposeTopic(DSS.topic,
GetSysConf()->mqttStation[idx].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[idx].ClientID,
"SYSTEM");
DSS.raw_data_ptr = buf;
DSS.data_lenth = strlen(JSONErrorMess);
DSS.data_length = strlen(JSONErrorMess);
if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)
return API_OK;
else
@ -206,9 +210,13 @@ static mqtt_app_err_t ResponceWithFile(int idx, espfs_file_t *file,
strcat((fdata + readBytes), tail);
free(filebuf);
DATA_SEND_STRUCT DSS;
DSS.dt = PUBLISH_SYS_DATA;
ComposeTopic(DSS.topic,
GetSysConf()->mqttStation[idx].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[idx].ClientID,
"SYSTEM");
DSS.raw_data_ptr = outbuf;
DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail);
DSS.data_length = (fdata - outbuf) + readBytes + strlen(tail);
if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)
return API_OK;
else