data receive handler replaced by the more general and more flexible event handler

This commit is contained in:
Bogdan Pilyugin 2022-08-23 17:43:52 +02:00
parent fb0fc0a30a
commit 8914422aac
5 changed files with 61 additions and 73 deletions

27
Kconfig
View File

@ -551,43 +551,48 @@ menu "WebGuiApp configuration"
default y default y
if WEBGUIAPP_MQTT_ENABLE if WEBGUIAPP_MQTT_ENABLE
config MQTT_CLIENTS_NUM config WEBGUIAPP_MQTT_CLIENTS_NUM
int "Number of MQTT clients" int "Number of MQTT clients"
range 1 2 range 1 2
default 2 default 2
config MQTT_ON config WEBGUIAPP_MQTT_ON
bool "Enable MQTT client" bool "Enable MQTT client"
default y default y
config MQTT_SERVER_URL config WEBGUIAPP_MQTT_MAX_TOPIC_LENGTH
int "Max topic length"
range 32 512
default 64
config WEBGUIAPP_MQTT_SERVER_URL
string "MQTT server URL" string "MQTT server URL"
default "myfirstmqttserver.com" default "myfirstmqttserver.com"
config MQTT_SERVER_PORT config WEBGUIAPP_MQTT_SERVER_PORT
int "MQTT server port" int "MQTT server port"
range 1 65535 range 1 65535
default 1883 default 1883
config MQTT_CLIENT_ID_1 config WEBGUIAPP_MQTT_CLIENT_ID_1
string "MQTT_1 client ID" string "MQTT_1 client ID"
default "DEVID1" default "DEVID1"
if MQTT_CLIENTS_NUM > 1 if WEBGUIAPP_MQTT_CLIENTS_NUM > 1
config MQTT_CLIENT_ID_2 config WEBGUIAPP_MQTT_CLIENT_ID_2
string "MQTT_2 client ID" string "MQTT_2 client ID"
default "DEVID2" default "DEVID2"
endif endif
config MQTT_ROOT_TOPIC config WEBGUIAPP_MQTT_ROOT_TOPIC
string "MQTT root topic" string "MQTT root topic"
default "ROOTTOPIC" default "ROOTTOPIC"
config MQTT_USERNAME config WEBGUIAPP_MQTT_USERNAME
string "MQTT user name" string "MQTT user name"
default "username" default "username"
config MQTT_PASSWORD config WEBGUIAPP_MQTT_PASSWORD
string "MQTT user password" string "MQTT user password"
default "password" default "password"

View File

@ -79,8 +79,8 @@ typedef struct
mqtt_client_t* GetMQTTHandlesPool(int idx); mqtt_client_t* GetMQTTHandlesPool(int idx);
QueueHandle_t GetMQTTSendQueue(int idx); QueueHandle_t GetMQTTSendQueue(int idx);
void ComposeTopic(char *topic, char *system_name, char *direct, char *client_name, char *service_name);
void regUserDataHandler(void (*data_handler)(char *data, uint32_t len, int idx)); void regUserEventHandler(void (*event_handler)(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data));
void SystemDataHandler(char *data, uint32_t len, int idx); void SystemDataHandler(char *data, uint32_t len, int idx);
#endif /* MAIN_INCLUDE_MQTT_H_ */ #endif /* MAIN_INCLUDE_MQTT_H_ */

View File

@ -149,7 +149,7 @@
char b6 :1; char b6 :1;
char bIsGlobalEnabled :1; char bIsGlobalEnabled :1;
} Flags1; } Flags1;
} mqttStation[CONFIG_MQTT_CLIENTS_NUM]; } mqttStation[CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM];
#endif #endif
#if CONFIG_WEBGUIAPP_ETHERNET_ENABLE #if CONFIG_WEBGUIAPP_ETHERNET_ENABLE

View File

@ -37,17 +37,16 @@ static StaticQueue_t xStaticMQTT2MessagesQueue;
uint8_t MQTT1MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)]; uint8_t MQTT1MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)];
uint8_t MQTT2MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)]; uint8_t MQTT2MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)];
mqtt_client_t mqtt[CONFIG_MQTT_CLIENTS_NUM] = { 0 }; mqtt_client_t mqtt[CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM] = { 0 };
#define TAG "MQTTApp" #define TAG "MQTTApp"
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data); static void mqtt_system_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
void (*UserDataHandler)(char *data, uint32_t len, int idx); void (*UserEventHandler)(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
void regUserEventHandler(void (*event_handler)(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data))
void regUserDataHandler(void (*data_handler)(char *data, uint32_t len, int idx))
{ {
UserDataHandler = data_handler; UserEventHandler = event_handler;
} }
mqtt_client_t* GetMQTTHandlesPool(int idx) mqtt_client_t* GetMQTTHandlesPool(int idx)
@ -78,25 +77,25 @@ static void log_error_if_nonzero(const char *message, int error_code)
} }
} }
static void ComposeTopic(char *topic, char *system_name, char *direct, char *client_name, char *service_name) void ComposeTopic(char *topic, char *system_name, char *direct, char *client_name, char *service_name)
{ {
char tmp[4]; char tmp[4];
char dev_rom_id[8]; char dev_rom_id[8];
GetChipId((uint8_t*) tmp); GetChipId((uint8_t*) tmp);
BytesToStr((unsigned char*) tmp, (unsigned char*) dev_rom_id, 4); BytesToStr((unsigned char*) tmp, (unsigned char*) dev_rom_id, 4);
strcpy((char*) topic, system_name); // Global system name strcpy((char*) topic, system_name); // Global system name
strcat((char*) topic, "/"); strcat((char*) topic, "/");
strcat((char*) topic, direct); // Data direction UPLINK or DOWNLINK strcat((char*) topic, direct); // Data direction UPLINK or DOWNLINK
strcat((char*) topic, "/"); strcat((char*) topic, "/");
strcat((char*) topic, (const char*) dev_rom_id); // Unique device ID (based on ROM chip id) strcat((char*) topic, (const char*) dev_rom_id); // Unique device ID (based on ROM chip id)
strcat((char*) topic, "/"); strcat((char*) topic, "/");
strcat((char*) topic, client_name); // Device client name (for multiclient devices) strcat((char*) topic, client_name); // Device client name (for multiclient devices)
strcat((char*) topic, "/"); strcat((char*) topic, "/");
strcat((char*) topic, (const char*) service_name); // Device service name strcat((char*) topic, (const char*) service_name); // Device service name
} }
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) static void mqtt_system_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{ {
xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000)); xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000));
ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
@ -106,7 +105,7 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
int msg_id; int msg_id;
static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts
char topic[64]; //TODO need define max topic length char topic[CONFIG_WEBGUIAPP_MQTT_MAX_TOPIC_LENGTH]; //TODO need define max topic length
switch ((esp_mqtt_event_id_t) event_id) switch ((esp_mqtt_event_id_t) event_id)
{ {
case MQTT_EVENT_CONNECTED: case MQTT_EVENT_CONNECTED:
@ -120,15 +119,6 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
"SYSTEM"); "SYSTEM");
msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0); msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
ComposeTopic(topic,
GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
"DOWNLINK",
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
"USER");
msg_id = esp_mqtt_client_subscribe(client, (const char*) topic, 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
break; break;
case MQTT_EVENT_DISCONNECTED: case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED client %d", ctx->mqtt_index); ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED client %d", ctx->mqtt_index);
@ -161,18 +151,6 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
SystemDataHandler(event->data, event->data_len, ctx->mqtt_index); SystemDataHandler(event->data, event->data_len, ctx->mqtt_index);
ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index); ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index);
} }
//Check if topic is USER and pass data to handler
ComposeTopic(topic,
GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
"DOWNLINK",
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
"USER");
if (!memcmp(topic, event->topic, event->topic_len))
{
if (UserDataHandler != NULL)
UserDataHandler(event->data, event->data_len, ctx->mqtt_index);
ESP_LOGI(TAG, "Screen data handler on client %d", ctx->mqtt_index);
}
break; break;
case MQTT_EVENT_ERROR: case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index); ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index);
@ -196,7 +174,7 @@ static void reconnect_MQTT_handler(void *arg, esp_event_base_t event_base,
int32_t event_id, int32_t event_id,
void *event_data) void *event_data)
{ {
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i) for (int i = 0; i < CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM; ++i)
{ {
if (mqtt[i].mqtt) if (mqtt[i].mqtt)
{ {
@ -208,7 +186,7 @@ static void reconnect_MQTT_handler(void *arg, esp_event_base_t event_base,
void MQTTStart(void) void MQTTStart(void)
{ {
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i) for (int i = 0; i < CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM; ++i)
{ {
if (mqtt[i].mqtt) if (mqtt[i].mqtt)
esp_mqtt_client_reconnect(mqtt[i].mqtt); esp_mqtt_client_reconnect(mqtt[i].mqtt);
@ -217,7 +195,7 @@ void MQTTStart(void)
void MQTTStop(void) void MQTTStop(void)
{ {
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i) for (int i = 0; i < CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM; ++i)
{ {
if (mqtt[i].mqtt) if (mqtt[i].mqtt)
esp_mqtt_client_disconnect(mqtt[i].mqtt); esp_mqtt_client_disconnect(mqtt[i].mqtt);
@ -226,7 +204,7 @@ void MQTTStop(void)
void MQTTReconnect(void) void MQTTReconnect(void)
{ {
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i) for (int i = 0; i < CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM; ++i)
{ {
if (mqtt[i].mqtt) if (mqtt[i].mqtt)
{ {
@ -309,7 +287,7 @@ static void start_mqtt()
char url[40]; char url[40];
char tmp[40]; char tmp[40];
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i) for (int i = 0; i < CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM; ++i)
{ {
if (GetSysConf()->mqttStation[i].Flags1.bIsGlobalEnabled) if (GetSysConf()->mqttStation[i].Flags1.bIsGlobalEnabled)
{ {
@ -334,8 +312,9 @@ static void start_mqtt()
mqtt[i].mqtt_index = i; mqtt[i].mqtt_index = i;
mqtt_cfg.user_context = (void*) &mqtt[i]; mqtt_cfg.user_context = (void*) &mqtt[i];
mqtt[i].mqtt = esp_mqtt_client_init(&mqtt_cfg); mqtt[i].mqtt = esp_mqtt_client_init(&mqtt_cfg);
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ /* The last argument may be used to pass data to the event handler, in this example mqtt_system_event_handler */
esp_mqtt_client_register_event(mqtt[i].mqtt, ESP_EVENT_ANY_ID, mqtt_event_handler, &mqtt[i].mqtt); esp_mqtt_client_register_event(mqtt[i].mqtt, ESP_EVENT_ANY_ID, mqtt_system_event_handler, &mqtt[i].mqtt);
esp_mqtt_client_register_event(mqtt[i].mqtt, ESP_EVENT_ANY_ID, UserEventHandler, &mqtt[i].mqtt);
esp_mqtt_client_start(mqtt[i].mqtt); esp_mqtt_client_start(mqtt[i].mqtt);
xTaskCreate(MQTTTaskTransmit, "MQTTTaskTransmit", 1024 * 4, (void*) &mqtt[i].mqtt_index, 3, NULL); xTaskCreate(MQTTTaskTransmit, "MQTTTaskTransmit", 1024 * 4, (void*) &mqtt[i].mqtt_index, 3, NULL);
} }
@ -348,20 +327,24 @@ void MQTTRun(void)
xSemaphoreGive(xSemaphoreMQTTHandle); xSemaphoreGive(xSemaphoreMQTTHandle);
MQTT1MessagesQueueHandle = NULL; MQTT1MessagesQueueHandle = NULL;
MQTT2MessagesQueueHandle = NULL;
if (GetSysConf()->mqttStation[0].Flags1.bIsGlobalEnabled) if (GetSysConf()->mqttStation[0].Flags1.bIsGlobalEnabled)
MQTT1MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH, MQTT1MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH,
sizeof(DATA_SEND_STRUCT), sizeof(DATA_SEND_STRUCT),
MQTT1MessagesQueueStorageArea, MQTT1MessagesQueueStorageArea,
&xStaticMQTT1MessagesQueue); &xStaticMQTT1MessagesQueue);
MQTT2MessagesQueueHandle = NULL; mqtt[0].mqtt_queue = MQTT1MessagesQueueHandle;
#if CONFIG_WEBGUIAPP_MQTT_CLIENTS_NUM == 2
if (GetSysConf()->mqttStation[1].Flags1.bIsGlobalEnabled) if (GetSysConf()->mqttStation[1].Flags1.bIsGlobalEnabled)
MQTT2MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH, MQTT2MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH,
sizeof(DATA_SEND_STRUCT), sizeof(DATA_SEND_STRUCT),
MQTT2MessagesQueueStorageArea, MQTT2MessagesQueueStorageArea,
&xStaticMQTT2MessagesQueue); &xStaticMQTT2MessagesQueue);
mqtt[0].mqtt_queue = MQTT1MessagesQueueHandle;
mqtt[1].mqtt_queue = MQTT2MessagesQueueHandle; mqtt[1].mqtt_queue = MQTT2MessagesQueueHandle;
#endif
start_mqtt(); start_mqtt();
} }

View File

@ -314,21 +314,21 @@ static void ResetSysConfig(SYS_CONFIG *Conf)
#endif #endif
#if CONFIG_WEBGUIAPP_MQTT_ENABLE #if CONFIG_WEBGUIAPP_MQTT_ENABLE
Conf->mqttStation[0].Flags1.bIsGlobalEnabled = CONFIG_MQTT_ON; Conf->mqttStation[0].Flags1.bIsGlobalEnabled = CONFIG_WEBGUIAPP_MQTT_ON;
memcpy(Conf->mqttStation[0].ServerAddr, CONFIG_MQTT_SERVER_URL, sizeof(CONFIG_MQTT_SERVER_URL)); memcpy(Conf->mqttStation[0].ServerAddr, CONFIG_WEBGUIAPP_MQTT_SERVER_URL, sizeof(CONFIG_WEBGUIAPP_MQTT_SERVER_URL));
Conf->mqttStation[0].ServerPort = CONFIG_MQTT_SERVER_PORT; Conf->mqttStation[0].ServerPort = CONFIG_WEBGUIAPP_MQTT_SERVER_PORT;
memcpy(Conf->mqttStation[0].ClientID, CONFIG_MQTT_CLIENT_ID_1, sizeof(CONFIG_MQTT_CLIENT_ID_1)); memcpy(Conf->mqttStation[0].ClientID, CONFIG_WEBGUIAPP_MQTT_CLIENT_ID_1, sizeof(CONFIG_WEBGUIAPP_MQTT_CLIENT_ID_1));
memcpy(Conf->mqttStation[0].RootTopic, CONFIG_MQTT_ROOT_TOPIC, sizeof(CONFIG_MQTT_ROOT_TOPIC)); memcpy(Conf->mqttStation[0].RootTopic, CONFIG_WEBGUIAPP_MQTT_ROOT_TOPIC, sizeof(CONFIG_WEBGUIAPP_MQTT_ROOT_TOPIC));
memcpy(Conf->mqttStation[0].UserName, CONFIG_MQTT_USERNAME, sizeof(CONFIG_MQTT_USERNAME)); memcpy(Conf->mqttStation[0].UserName, CONFIG_WEBGUIAPP_MQTT_USERNAME, sizeof(CONFIG_WEBGUIAPP_MQTT_USERNAME));
memcpy(Conf->mqttStation[0].UserPass, CONFIG_MQTT_PASSWORD, sizeof(CONFIG_MQTT_PASSWORD)); memcpy(Conf->mqttStation[0].UserPass, CONFIG_WEBGUIAPP_MQTT_PASSWORD, sizeof(CONFIG_WEBGUIAPP_MQTT_PASSWORD));
#if CONFIG_MQTT_CLIENTS_NUM == 2 #if CONFIG_MQTT_CLIENTS_NUM == 2
Conf->mqttStation[1].Flags1.bIsGlobalEnabled = CONFIG_MQTT_ON; Conf->mqttStation[1].Flags1.bIsGlobalEnabled = CONFIG_WEBGUIAPP_MQTT_ON;
memcpy(Conf->mqttStation[1].ServerAddr, CONFIG_MQTT_SERVER_URL, sizeof(CONFIG_MQTT_SERVER_URL)); memcpy(Conf->mqttStation[1].ServerAddr, CONFIG_WEBGUIAPP_MQTT_SERVER_URL, sizeof(CONFIG_WEBGUIAPP_MQTT_SERVER_URL));
Conf->mqttStation[1].ServerPort = CONFIG_MQTT_SERVER_PORT; Conf->mqttStation[1].ServerPort = CONFIG_WEBGUIAPP_MQTT_SERVER_PORT;
memcpy(Conf->mqttStation[1].ClientID, CONFIG_MQTT_CLIENT_ID_2, sizeof(CONFIG_MQTT_CLIENT_ID_2)); memcpy(Conf->mqttStation[1].ClientID, CONFIG_WEBGUIAPP_MQTT_CLIENT_ID_2, sizeof(CONFIG_WEBGUIAPP_MQTT_CLIENT_ID_2));
memcpy(Conf->mqttStation[1].RootTopic, CONFIG_MQTT_ROOT_TOPIC, sizeof(CONFIG_MQTT_ROOT_TOPIC)); memcpy(Conf->mqttStation[1].RootTopic, CONFIG_WEBGUIAPP_MQTT_ROOT_TOPIC, sizeof(CONFIG_WEBGUIAPP_MQTT_ROOT_TOPIC));
memcpy(Conf->mqttStation[1].UserName, CONFIG_MQTT_USERNAME, sizeof(CONFIG_MQTT_USERNAME)); memcpy(Conf->mqttStation[1].UserName, CONFIG_WEBGUIAPP_MQTT_USERNAME, sizeof(CONFIG_WEBGUIAPP_MQTT_USERNAME));
memcpy(Conf->mqttStation[1].UserPass, CONFIG_MQTT_PASSWORD, sizeof(CONFIG_MQTT_PASSWORD)); memcpy(Conf->mqttStation[1].UserPass, CONFIG_WEBGUIAPP_MQTT_PASSWORD, sizeof(CONFIG_WEBGUIAPP_MQTT_PASSWORD));
#endif #endif
#endif #endif
GetChipId(Conf->imei); GetChipId(Conf->imei);