MQTT new topic compose completed, old removed

This commit is contained in:
Bogdan Pilyugin 2022-08-19 09:32:19 +02:00
parent d6e2375fe1
commit 0ba0da77af
3 changed files with 64 additions and 111 deletions

View File

@ -52,8 +52,8 @@ typedef int mqtt_app_err_t;
typedef enum typedef enum
{ {
PUBLISH_CONTROL_DATA, PUBLISH_SYS_DATA,
PUBLISH_SCREEN_DATA PUBLISH_USER_DATA
} publish_data_type; } publish_data_type;

View File

@ -27,7 +27,6 @@
#define CH_MESSAGE_BUFER_LENTH 32 //size of mqtt queue #define CH_MESSAGE_BUFER_LENTH 32 //size of mqtt queue
#define MQTT_RECONNECT_CHANGE_ADAPTER 3 #define MQTT_RECONNECT_CHANGE_ADAPTER 3
#if CONFIG_WEBGUIAPP_MQTT_ENABLE #if CONFIG_WEBGUIAPP_MQTT_ENABLE
@ -73,10 +72,7 @@ static void log_error_if_nonzero(const char *message, int error_code)
} }
} }
static const char topic_tx[] = "/UPLINK/"; //subtopic for transmit static void ComposeTopic(char *topic, char *system_name, char *direct, char *client_name, char *service_name)
static const char topic_rx[] = "/DOWNLINK/"; //subtopic to receive
static void ComposeTopic(char *topic, char *system_name, char* direct, char *client_name, char *service_name)
{ {
char tmp[4]; char tmp[4];
char dev_rom_id[8]; char dev_rom_id[8];
@ -94,42 +90,6 @@ static void ComposeTopic(char *topic, char *system_name, char* direct, char *cli
} }
static void ComposeTopicControl(char *topic, char *roottopic, char *ident, uint8_t dir)
{
char id[4];
char id2[8];
strcpy((char*) topic, roottopic);
if (dir == 0)
strcat((char*) topic, topic_rx);
else
strcat((char*) topic, topic_tx);
GetChipId((uint8_t*) id);
BytesToStr((unsigned char*) id, (unsigned char*) id2, 4);
strcat((char*) topic, (const char*) id2);
strcat((char*) topic, "/");
strcat((char*) topic, ident);
strcat((char*) topic, (const char*) "/CONTROL");
}
static void ComposeTopicScreen(char *topic, char *roottopic, char *ident, uint8_t dir)
{
char id[4];
char id2[8];
strcpy((char*) topic, roottopic);
if (dir == 0)
strcat((char*) topic, topic_rx);
else
strcat((char*) topic, topic_tx);
GetChipId((uint8_t*) id);
BytesToStr((unsigned char*) id, (unsigned char*) id2, 4);
strcat((char*) topic, (const char*) id2);
strcat((char*) topic, "/");
strcat((char*) topic, ident);
strcat((char*) topic, (const char*) "/SCREEN");
}
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{ {
xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000)); xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000));
@ -140,24 +100,27 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
int msg_id; int msg_id;
static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts
char topic[64]; //TODO need define max topic length
switch ((esp_mqtt_event_id_t) event_id) switch ((esp_mqtt_event_id_t) event_id)
{ {
case MQTT_EVENT_CONNECTED: case MQTT_EVENT_CONNECTED:
ctx->is_connected = true; ctx->is_connected = true;
MQTTReconnectCounter = 0; MQTTReconnectCounter = 0;
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED client %d", ctx->mqtt_index); ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED client %d", ctx->mqtt_index);
char sub[64]; ComposeTopic(topic,
GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
char *system_name = GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic; "DOWNLINK",
char *client_name = GetSysConf()->mqttStation[ctx->mqtt_index].ClientID; GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
char direction[] = "DOWNLINK"; "SYSTEM");
msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0);
ComposeTopic(sub, system_name, direction, client_name, "CONTROL");
msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
ComposeTopic(sub, system_name, direction, client_name, "DATA"); ComposeTopic(topic,
msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0); GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
"DOWNLINK",
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
"USER");
msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
break; break;
@ -181,31 +144,28 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
break; break;
case MQTT_EVENT_DATA: case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA, client %d", ctx->mqtt_index); ESP_LOGI(TAG, "MQTT_EVENT_DATA, client %d", ctx->mqtt_index);
char topic[64]; //Check if topic is SYSTEM and pass data to handler
//Check if topic is CONTROL and pass data to handler ComposeTopic(topic,
ComposeTopicControl(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
"DOWNLINK",
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
0); "SYSTEM");
if (!memcmp(topic, event->topic, event->topic_len)) if (!memcmp(topic, event->topic, event->topic_len))
{ {
ControlDataHandler(event->data, event->data_len, ctx->mqtt_index); ControlDataHandler(event->data, event->data_len, ctx->mqtt_index);
ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index); ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index);
} }
//end of CONTROL //Check if topic is USER and pass data to handler
ComposeTopic(topic,
//Check if topic is SCREEN and pass data to handler GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
ComposeTopicScreen(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, "DOWNLINK",
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
0); "USER");
if (!memcmp(topic, event->topic, event->topic_len)) if (!memcmp(topic, event->topic, event->topic_len))
{ {
// ScreenDataHandler(event->data, event->data_len, ctx->mqtt_index); //Here TODO registered user define received data handler
ESP_LOGI(TAG, "Screen data handler on client %d", ctx->mqtt_index); ESP_LOGI(TAG, "Screen data handler on client %d", ctx->mqtt_index);
} }
//end of SCREEN
break; break;
case MQTT_EVENT_ERROR: case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index); ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index);
@ -271,47 +231,40 @@ void MQTTReconnect(void)
static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS) static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS)
{ {
char top[64]; char topic[64]; //TODO need to define max topic length
switch (DSS->dt) switch (DSS->dt)
{ {
case PUBLISH_SYS_DATA:
case PUBLISH_CONTROL_DATA: ComposeTopic(topic,
ComposeTopicControl(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
1); "SYSTEM");
if (mqtt) if (mqtt)
{ {
esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr, esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0,
DSS->data_lenth,
0,
0); 0);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top);
} }
else else
ESP_LOGE(TAG, "MQTT client not initialized"); ESP_LOGE(TAG, "MQTT client not initialized");
break; break;
case PUBLISH_USER_DATA:
case PUBLISH_SCREEN_DATA: ComposeTopic(topic,
ComposeTopicScreen(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
"UPLINK",
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
1); "USER");
if (mqtt) if (mqtt)
{ {
esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr, esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0,
DSS->data_lenth,
0,
0); 0);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic);
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top);
} }
else else
ESP_LOGE(TAG, "MQTT client not initialized"); ESP_LOGE(TAG, "MQTT client not initialized");
break; break;
} }
} }
@ -329,12 +282,12 @@ void MQTTTaskTransmit(void *pvParameter)
MQTTPublish(&mqtt[idx], &DSS); MQTTPublish(&mqtt[idx], &DSS);
switch (DSS.dt) switch (DSS.dt)
{ {
case PUBLISH_SCREEN_DATA: case PUBLISH_USER_DATA:
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
free(DSS.raw_data_ptr); free(DSS.raw_data_ptr);
break; break;
case PUBLISH_CONTROL_DATA: case PUBLISH_SYS_DATA:
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
free(DSS.raw_data_ptr); free(DSS.raw_data_ptr);
break; break;

View File

@ -119,7 +119,7 @@ static mqtt_app_err_t ResponceWithError(int idx,
{ {
memcpy(buf, JSONErrorMess, strlen(JSONErrorMess)); memcpy(buf, JSONErrorMess, strlen(JSONErrorMess));
DATA_SEND_STRUCT DSS; DATA_SEND_STRUCT DSS;
DSS.dt = PUBLISH_CONTROL_DATA; DSS.dt = PUBLISH_SYS_DATA;
DSS.raw_data_ptr = buf; DSS.raw_data_ptr = buf;
DSS.data_lenth = strlen(JSONErrorMess); DSS.data_lenth = strlen(JSONErrorMess);
if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)
@ -206,7 +206,7 @@ static mqtt_app_err_t ResponceWithFile(int idx, espfs_file_t *file,
strcat((fdata + readBytes), tail); strcat((fdata + readBytes), tail);
free(filebuf); free(filebuf);
DATA_SEND_STRUCT DSS; DATA_SEND_STRUCT DSS;
DSS.dt = PUBLISH_CONTROL_DATA; DSS.dt = PUBLISH_SYS_DATA;
DSS.raw_data_ptr = outbuf; DSS.raw_data_ptr = outbuf;
DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail); DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail);
if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)