diff --git a/include/MQTT.h b/include/MQTT.h index ba2782e..16a3912 100644 --- a/include/MQTT.h +++ b/include/MQTT.h @@ -52,8 +52,8 @@ typedef int mqtt_app_err_t; typedef enum { - PUBLISH_CONTROL_DATA, - PUBLISH_SCREEN_DATA + PUBLISH_SYS_DATA, + PUBLISH_USER_DATA } publish_data_type; diff --git a/src/MQTT.c b/src/MQTT.c index c025df7..f1443d5 100644 --- a/src/MQTT.c +++ b/src/MQTT.c @@ -27,7 +27,6 @@ #define CH_MESSAGE_BUFER_LENTH 32 //size of mqtt queue - #define MQTT_RECONNECT_CHANGE_ADAPTER 3 #if CONFIG_WEBGUIAPP_MQTT_ENABLE @@ -73,10 +72,7 @@ static void log_error_if_nonzero(const char *message, int error_code) } } -static const char topic_tx[] = "/UPLINK/"; //subtopic for transmit -static const char topic_rx[] = "/DOWNLINK/"; //subtopic to receive - -static void ComposeTopic(char *topic, char *system_name, char* direct, char *client_name, char *service_name) +static void ComposeTopic(char *topic, char *system_name, char *direct, char *client_name, char *service_name) { char tmp[4]; char dev_rom_id[8]; @@ -94,42 +90,6 @@ static void ComposeTopic(char *topic, char *system_name, char* direct, char *cli } -static void ComposeTopicControl(char *topic, char *roottopic, char *ident, uint8_t dir) -{ - char id[4]; - char id2[8]; - strcpy((char*) topic, roottopic); - if (dir == 0) - strcat((char*) topic, topic_rx); - else - strcat((char*) topic, topic_tx); - GetChipId((uint8_t*) id); - BytesToStr((unsigned char*) id, (unsigned char*) id2, 4); - strcat((char*) topic, (const char*) id2); - strcat((char*) topic, "/"); - strcat((char*) topic, ident); - strcat((char*) topic, (const char*) "/CONTROL"); -} - -static void ComposeTopicScreen(char *topic, char *roottopic, char *ident, uint8_t dir) -{ - char id[4]; - char id2[8]; - strcpy((char*) topic, roottopic); - if (dir == 0) - strcat((char*) topic, topic_rx); - else - strcat((char*) topic, topic_tx); - GetChipId((uint8_t*) id); - BytesToStr((unsigned char*) id, (unsigned char*) id2, 4); - strcat((char*) topic, (const char*) id2); - strcat((char*) topic, "/"); - strcat((char*) topic, ident); - strcat((char*) topic, (const char*) "/SCREEN"); -} - - - static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000)); @@ -140,24 +100,27 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ int msg_id; static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts + char topic[64]; //TODO need define max topic length switch ((esp_mqtt_event_id_t) event_id) { case MQTT_EVENT_CONNECTED: ctx->is_connected = true; MQTTReconnectCounter = 0; ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED client %d", ctx->mqtt_index); - char sub[64]; - - char *system_name = GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic; - char *client_name = GetSysConf()->mqttStation[ctx->mqtt_index].ClientID; - char direction[] = "DOWNLINK"; - - ComposeTopic(sub, system_name, direction, client_name, "CONTROL"); - msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0); + ComposeTopic(topic, + GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, + "DOWNLINK", + GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, + "SYSTEM"); + msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - ComposeTopic(sub, system_name, direction, client_name, "DATA"); - msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0); + ComposeTopic(topic, + GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, + "DOWNLINK", + GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, + "USER"); + msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); break; @@ -181,31 +144,28 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA, client %d", ctx->mqtt_index); - char topic[64]; - //Check if topic is CONTROL and pass data to handler - ComposeTopicControl(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, - GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, - 0); + //Check if topic is SYSTEM and pass data to handler + ComposeTopic(topic, + GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, + "DOWNLINK", + GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, + "SYSTEM"); if (!memcmp(topic, event->topic, event->topic_len)) { ControlDataHandler(event->data, event->data_len, ctx->mqtt_index); ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index); - } - //end of CONTROL - - //Check if topic is SCREEN and pass data to handler - ComposeTopicScreen(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, - GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, - 0); + //Check if topic is USER and pass data to handler + ComposeTopic(topic, + GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic, + "DOWNLINK", + GetSysConf()->mqttStation[ctx->mqtt_index].ClientID, + "USER"); if (!memcmp(topic, event->topic, event->topic_len)) { - // ScreenDataHandler(event->data, event->data_len, ctx->mqtt_index); + //Here TODO registered user define received data handler ESP_LOGI(TAG, "Screen data handler on client %d", ctx->mqtt_index); } - - //end of SCREEN - break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index); @@ -271,47 +231,40 @@ void MQTTReconnect(void) static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS) { - char top[64]; + char topic[64]; //TODO need to define max topic length switch (DSS->dt) { + case PUBLISH_SYS_DATA: + ComposeTopic(topic, + GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, + "UPLINK", + GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, + "SYSTEM"); + if (mqtt) + { + esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0, + 0); + ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic); + } + else + ESP_LOGE(TAG, "MQTT client not initialized"); + break; - case PUBLISH_CONTROL_DATA: - ComposeTopicControl(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, - GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, - 1); - if (mqtt) - { - esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr, - DSS->data_lenth, - 0, - 0); - - ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top); - } - else - ESP_LOGE(TAG, "MQTT client not initialized"); - - break; - - - case PUBLISH_SCREEN_DATA: - ComposeTopicScreen(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, - GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, - 1); - if (mqtt) - { - esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr, - DSS->data_lenth, - 0, - 0); - - ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top); - } - else - ESP_LOGE(TAG, "MQTT client not initialized"); - - break; - + case PUBLISH_USER_DATA: + ComposeTopic(topic, + GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic, + "UPLINK", + GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID, + "USER"); + if (mqtt) + { + esp_mqtt_client_publish(mqtt->mqtt, (const char*) topic, (const char*) DSS->raw_data_ptr, DSS->data_lenth, 0, + 0); + ESP_LOGI(TAG, "TOPIC=%.*s", strlen(topic), topic); + } + else + ESP_LOGE(TAG, "MQTT client not initialized"); + break; } } @@ -329,12 +282,12 @@ void MQTTTaskTransmit(void *pvParameter) MQTTPublish(&mqtt[idx], &DSS); switch (DSS.dt) { - case PUBLISH_SCREEN_DATA: + case PUBLISH_USER_DATA: xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); free(DSS.raw_data_ptr); break; - case PUBLISH_CONTROL_DATA: + case PUBLISH_SYS_DATA: xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0); free(DSS.raw_data_ptr); break; diff --git a/src/MQTTSysHandler.c b/src/MQTTSysHandler.c index 0d2b634..14b3dc4 100644 --- a/src/MQTTSysHandler.c +++ b/src/MQTTSysHandler.c @@ -119,7 +119,7 @@ static mqtt_app_err_t ResponceWithError(int idx, { memcpy(buf, JSONErrorMess, strlen(JSONErrorMess)); DATA_SEND_STRUCT DSS; - DSS.dt = PUBLISH_CONTROL_DATA; + DSS.dt = PUBLISH_SYS_DATA; DSS.raw_data_ptr = buf; DSS.data_lenth = strlen(JSONErrorMess); if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS) @@ -206,7 +206,7 @@ static mqtt_app_err_t ResponceWithFile(int idx, espfs_file_t *file, strcat((fdata + readBytes), tail); free(filebuf); DATA_SEND_STRUCT DSS; - DSS.dt = PUBLISH_CONTROL_DATA; + DSS.dt = PUBLISH_SYS_DATA; DSS.raw_data_ptr = outbuf; DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail); if (xQueueSend(GetMQTTHandlesPool(idx)->mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)