759 lines
24 KiB
C
759 lines
24 KiB
C
/* Copyright 2022 Bogdan Pilyugin
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
* File name: MQTT.c
|
|
* Project: ChargePointMainboard
|
|
* Created on: 2022-07-21
|
|
* Author: Bogdan Pilyugin
|
|
* Description:
|
|
*/
|
|
#include "esp_log.h"
|
|
#include "Helpers.h"
|
|
#include "SystemConfiguration.h"
|
|
#include "jWrite.h"
|
|
#include "jRead.h"
|
|
#include "NetTransport.h"
|
|
#include "HTTPServer.h"
|
|
#include "romfs.h"
|
|
#include "MQTT.h"
|
|
|
|
#define CH_MESSAGE_BUFER_LENTH 32 //size of mqtt queue
|
|
|
|
#define MESSAGE_LENGTH 32 //base message length, mainly depended by radio requirements
|
|
#define MAX_JSON_MESSAGE 256 //max size of mqtt message to publish
|
|
#define MAX_FILE_PUBLISH 4096 //bufer for mqtt data publish
|
|
#define MAX_DYNVAR_LENTH 64
|
|
#define MAX_FILENAME_LENTH 15
|
|
#define MAX_ERROR_MESSAGE 32
|
|
#define MAX_ERROR_JSON 256
|
|
#define MAX_MESSAGE_ID 15
|
|
|
|
#define MAX_POST_DATA_LENTH 512
|
|
#define MQTT_RECONNECT_CHANGE_ADAPTER 3
|
|
|
|
#if CONFIG_WEBGUIAPP_MQTT_ENABLE
|
|
|
|
static SemaphoreHandle_t xSemaphoreMQTTHandle = NULL;
|
|
static StaticSemaphore_t xSemaphoreMQTTBuf;
|
|
static StaticQueue_t xStaticMQTT1MessagesQueue;
|
|
static StaticQueue_t xStaticMQTT2MessagesQueue;
|
|
uint8_t MQTT1MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)];
|
|
uint8_t MQTT2MessagesQueueStorageArea[CH_MESSAGE_BUFER_LENTH * sizeof(DATA_SEND_STRUCT)];
|
|
|
|
mqtt_client_t mqtt[CONFIG_MQTT_CLIENTS_NUM] = { 0 };
|
|
|
|
|
|
|
|
const char apiver[] = "2.0";
|
|
const char tagGet[] = "GET";
|
|
const char tagPost[] = "POST";
|
|
|
|
#define TAG "MQTTApp"
|
|
|
|
const char* mqtt_app_err_descr[] = {
|
|
"Operation OK",
|
|
"Internal error",
|
|
"Wrong json format",
|
|
"Key 'idmess' not found",
|
|
"Key 'idmess' value too long",
|
|
"Key 'api' not found",
|
|
"API version not supported",
|
|
"Key 'request' not found",
|
|
"Unsupported HTTP method",
|
|
"Key 'url' not found",
|
|
"Key 'url' value too long",
|
|
"URL not found",
|
|
"Key 'postdata' not found",
|
|
"Key 'postdata' too long",
|
|
"File size too big",
|
|
"File is empty",
|
|
"Unknown error"
|
|
};
|
|
|
|
const char* mqtt_app_err_breef[] = {
|
|
"OK",
|
|
"INTERNAL_ERR",
|
|
"WRONG_JSON_ERR",
|
|
"NO_ID_ERR",
|
|
"ID_OVERSIZE_ERR",
|
|
"NO_API_ERR",
|
|
"VERSION_ERR",
|
|
"NO_REQUEST_ERR",
|
|
"UNSUPPORTED_METHOD_ERR",
|
|
"NO_URL_ERR",
|
|
"URL_OVERSIZE_ERR",
|
|
"URL_NOT_FOUND_ERR",
|
|
"NO_POSTDATA_ERR",
|
|
"POSTDATA_OVERSIZE_ERR",
|
|
"FILE_OVERSIZE_ERR",
|
|
"FILE_EMPTY_ERR",
|
|
"UNKNOWN_ERR"
|
|
};
|
|
|
|
static void ControlDataHandler(char *data, uint32_t len, int idx);
|
|
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data);
|
|
|
|
mqtt_client_t* GetMQTTHandlesPool(int idx)
|
|
{
|
|
return &mqtt[idx];
|
|
}
|
|
|
|
QueueHandle_t GetMQTTSendQueue(int idx)
|
|
{
|
|
return mqtt[idx].mqtt_queue;
|
|
}
|
|
|
|
bool GetMQTT1Connected(void)
|
|
{
|
|
return mqtt[0].is_connected;
|
|
}
|
|
|
|
bool GetMQTT2Connected(void)
|
|
{
|
|
return mqtt[1].is_connected;
|
|
}
|
|
|
|
static void log_error_if_nonzero(const char *message, int error_code)
|
|
{
|
|
if (error_code != 0)
|
|
{
|
|
ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code);
|
|
}
|
|
}
|
|
|
|
static const char topic_tx[] = "/UPLINK/"; //subtopic for transmit
|
|
static const char topic_rx[] = "/DOWNLINK/"; //subtopic to receive
|
|
|
|
static void ComposeTopic(char *topic, char *system_name, char* direct, char *client_name, char *service_name)
|
|
{
|
|
char tmp[4];
|
|
char dev_rom_id[8];
|
|
GetChipId((uint8_t*) tmp);
|
|
BytesToStr((unsigned char*) tmp, (unsigned char*) dev_rom_id, 4);
|
|
strcpy((char*) topic, system_name); // Global system name
|
|
strcat((char*) topic, "/");
|
|
strcpy((char*) topic, direct); // Data direction UPLINK or DOWNLINK
|
|
strcat((char*) topic, "/");
|
|
strcat((char*) topic, (const char*) dev_rom_id); // Unique device ID (based on ROM chip id)
|
|
strcat((char*) topic, "/");
|
|
strcat((char*) topic, client_name); // Device client name (for multiclient devices)
|
|
strcat((char*) topic, "/");
|
|
strcat((char*) topic, (const char*) service_name); // Device service name
|
|
}
|
|
|
|
|
|
static void ComposeTopicControl(char *topic, char *roottopic, char *ident, uint8_t dir)
|
|
{
|
|
char id[4];
|
|
char id2[8];
|
|
strcpy((char*) topic, roottopic);
|
|
if (dir == 0)
|
|
strcat((char*) topic, topic_rx);
|
|
else
|
|
strcat((char*) topic, topic_tx);
|
|
GetChipId((uint8_t*) id);
|
|
BytesToStr((unsigned char*) id, (unsigned char*) id2, 4);
|
|
strcat((char*) topic, (const char*) id2);
|
|
strcat((char*) topic, "/");
|
|
strcat((char*) topic, ident);
|
|
strcat((char*) topic, (const char*) "/CONTROL");
|
|
}
|
|
|
|
static void ComposeTopicScreen(char *topic, char *roottopic, char *ident, uint8_t dir)
|
|
{
|
|
char id[4];
|
|
char id2[8];
|
|
strcpy((char*) topic, roottopic);
|
|
if (dir == 0)
|
|
strcat((char*) topic, topic_rx);
|
|
else
|
|
strcat((char*) topic, topic_tx);
|
|
GetChipId((uint8_t*) id);
|
|
BytesToStr((unsigned char*) id, (unsigned char*) id2, 4);
|
|
strcat((char*) topic, (const char*) id2);
|
|
strcat((char*) topic, "/");
|
|
strcat((char*) topic, ident);
|
|
strcat((char*) topic, (const char*) "/SCREEN");
|
|
}
|
|
|
|
typedef enum
|
|
{
|
|
UNSUPPORTED = 0,
|
|
GET,
|
|
POST
|
|
} mqtt_api_request_t;
|
|
|
|
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
|
|
{
|
|
xSemaphoreTake(xSemaphoreMQTTHandle, pdMS_TO_TICKS(1000));
|
|
ESP_LOGI(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
|
|
esp_mqtt_event_handle_t event = event_data;
|
|
esp_mqtt_client_handle_t client = event->client;
|
|
mqtt_client_t *ctx = (mqtt_client_t*) event->user_context;
|
|
|
|
int msg_id;
|
|
static int MQTTReconnectCounter = 0; //Change network adapter every MQTT_RECONNECT_CHANGE_ADAPTER number attempts
|
|
switch ((esp_mqtt_event_id_t) event_id)
|
|
{
|
|
case MQTT_EVENT_CONNECTED:
|
|
ctx->is_connected = true;
|
|
MQTTReconnectCounter = 0;
|
|
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED client %d", ctx->mqtt_index);
|
|
char sub[64];
|
|
|
|
char *system_name = GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic;
|
|
char *client_name = GetSysConf()->mqttStation[ctx->mqtt_index].ClientID;
|
|
char direction[] = "DOWNLINK";
|
|
|
|
ComposeTopic(sub, system_name, direction, client_name, "CONTROL");
|
|
msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0);
|
|
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
|
|
|
|
ComposeTopic(sub, system_name, direction, client_name, "DATA");
|
|
msg_id = esp_mqtt_client_subscribe(client, (const char*) sub, 0);
|
|
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
|
|
|
|
break;
|
|
case MQTT_EVENT_DISCONNECTED:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED client %d", ctx->mqtt_index);
|
|
if (++MQTTReconnectCounter > MQTT_RECONNECT_CHANGE_ADAPTER)
|
|
{
|
|
MQTTReconnectCounter = 0;
|
|
NextDefaultNetIF();
|
|
}
|
|
ctx->is_connected = false;
|
|
break;
|
|
case MQTT_EVENT_SUBSCRIBED:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
|
|
break;
|
|
case MQTT_EVENT_UNSUBSCRIBED:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
|
|
break;
|
|
case MQTT_EVENT_PUBLISHED:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
|
|
break;
|
|
case MQTT_EVENT_DATA:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_DATA, client %d", ctx->mqtt_index);
|
|
char topic[64];
|
|
//Check if topic is CONTROL and pass data to handler
|
|
ComposeTopicControl(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
|
|
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
|
|
0);
|
|
if (!memcmp(topic, event->topic, event->topic_len))
|
|
{
|
|
ControlDataHandler(event->data, event->data_len, ctx->mqtt_index);
|
|
ESP_LOGI(TAG, "Control data handler on client %d", ctx->mqtt_index);
|
|
|
|
}
|
|
//end of CONTROL
|
|
|
|
//Check if topic is SCREEN and pass data to handler
|
|
ComposeTopicScreen(topic, GetSysConf()->mqttStation[ctx->mqtt_index].RootTopic,
|
|
GetSysConf()->mqttStation[ctx->mqtt_index].ClientID,
|
|
0);
|
|
if (!memcmp(topic, event->topic, event->topic_len))
|
|
{
|
|
// ScreenDataHandler(event->data, event->data_len, ctx->mqtt_index);
|
|
ESP_LOGI(TAG, "Screen data handler on client %d", ctx->mqtt_index);
|
|
}
|
|
|
|
//end of SCREEN
|
|
|
|
break;
|
|
case MQTT_EVENT_ERROR:
|
|
ESP_LOGI(TAG, "MQTT_EVENT_ERROR, client %d", ctx->mqtt_index);
|
|
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT)
|
|
{
|
|
log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
|
|
log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
|
|
log_error_if_nonzero("captured as transport's socket errno",
|
|
event->error_handle->esp_transport_sock_errno);
|
|
ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
|
|
}
|
|
break;
|
|
default:
|
|
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
|
|
break;
|
|
}
|
|
xSemaphoreGive(xSemaphoreMQTTHandle);
|
|
}
|
|
|
|
static void reconnect_MQTT_handler(void *arg, esp_event_base_t event_base,
|
|
int32_t event_id,
|
|
void *event_data)
|
|
{
|
|
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i)
|
|
{
|
|
if (mqtt[i].mqtt)
|
|
{
|
|
esp_mqtt_client_disconnect(mqtt[i].mqtt);
|
|
esp_mqtt_client_reconnect(mqtt[i].mqtt);
|
|
}
|
|
}
|
|
}
|
|
|
|
static mqtt_app_err_t ResponceWithError(int idx,
|
|
espfs_file_t *file,
|
|
char *id,
|
|
char *url,
|
|
mqtt_app_err_t api_err)
|
|
{
|
|
espfs_fclose(file);
|
|
char JSONErrorMess[MAX_ERROR_JSON];
|
|
jwOpen(JSONErrorMess, MAX_ERROR_JSON, JW_OBJECT, JW_PRETTY);
|
|
if (id[0])
|
|
jwObj_string("messid", (char*) id);
|
|
else
|
|
jwObj_string("messid", "ERROR");
|
|
jwObj_string("api", (char*) apiver);
|
|
jwObj_string("responce", (char*) mqtt_app_err_breef[api_err]);
|
|
jwObj_string("descript", (char*) mqtt_app_err_descr[api_err]);
|
|
if (url[0])
|
|
jwObj_string("url", (char*) url);
|
|
else
|
|
jwObj_string("url", "ERROR");
|
|
jwEnd();
|
|
jwClose();
|
|
char *buf = (char*) malloc(strlen(JSONErrorMess) + 1);
|
|
if (buf)
|
|
{
|
|
memcpy(buf, JSONErrorMess, strlen(JSONErrorMess));
|
|
DATA_SEND_STRUCT DSS;
|
|
DSS.dt = PUBLISH_CONTROL_DATA;
|
|
DSS.raw_data_ptr = buf;
|
|
DSS.data_lenth = strlen(JSONErrorMess);
|
|
if (xQueueSend(mqtt[idx].mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)
|
|
return API_OK;
|
|
else
|
|
{
|
|
free(buf);
|
|
return API_INTERNAL_ERR;
|
|
}
|
|
}
|
|
else
|
|
{ // ERR internal error on publish error
|
|
return API_INTERNAL_ERR;
|
|
}
|
|
}
|
|
|
|
static mqtt_app_err_t ResponceWithFile(int idx, espfs_file_t *file,
|
|
char *id,
|
|
char *url)
|
|
{
|
|
struct espfs_stat_t stat;
|
|
char *filebuf = NULL;
|
|
char *outbuf = NULL;
|
|
uint32_t fileLenth, filePtr, readBytes;
|
|
espfs_fstat(file, &stat);
|
|
fileLenth = stat.size;
|
|
mqtt_app_err_t api_err = API_UNKNOWN_ERR;
|
|
if (fileLenth > MAX_FILE_PUBLISH)
|
|
{
|
|
ESP_LOGE(TAG, "File is too big");
|
|
api_err = API_FILE_OVERSIZE_ERR;
|
|
goto file_send_err;
|
|
}
|
|
outbuf = (char*) malloc(MAX_FILE_PUBLISH + 256);
|
|
filebuf = (char*) malloc(fileLenth);
|
|
|
|
if (!outbuf || !filebuf)
|
|
{
|
|
ESP_LOGE(TAG, "Failed to allocate memory");
|
|
api_err = API_INTERNAL_ERR;
|
|
goto file_send_err;
|
|
}
|
|
|
|
if (espfs_fread(file, filebuf, fileLenth) == 0)
|
|
{
|
|
ESP_LOGE(TAG, "Read no bytes from file");
|
|
api_err = API_FILE_EMPTY_ERR;
|
|
goto file_send_err;
|
|
}
|
|
espfs_fclose(file);
|
|
|
|
jwOpen(outbuf, MAX_FILE_PUBLISH, JW_OBJECT, JW_PRETTY);
|
|
jwObj_string("messid", (char*) id);
|
|
jwObj_string("api", (char*) apiver);
|
|
jwObj_string("responce", (char*) mqtt_app_err_breef[API_OK]);
|
|
jwObj_string("descript", (char*) mqtt_app_err_descr[API_OK]);
|
|
jwObj_string("url", (char*) url);
|
|
jwObj_string("body", "bodydata");
|
|
jwEnd();
|
|
jwClose();
|
|
|
|
char *fdata = memmem(outbuf, MAX_FILE_PUBLISH, "\"bodydata\"", strlen("\"bodydata\""));
|
|
filePtr = 0;
|
|
readBytes = 0;
|
|
while (filePtr < fileLenth && readBytes < (MAX_FILE_PUBLISH - MAX_DYNVAR_LENTH))
|
|
{
|
|
if (filebuf[filePtr] == '~')
|
|
{
|
|
int k = 0;
|
|
char DynVarName[16];
|
|
while (filePtr < fileLenth && k < 16 && (filebuf[++filePtr] != '~'))
|
|
DynVarName[k++] = filebuf[filePtr];
|
|
if (filebuf[filePtr] == '~')
|
|
{ //got valid dynamic variable name
|
|
DynVarName[k] = 0x00;
|
|
readBytes += HTTPPrint(NULL, &fdata[readBytes], DynVarName);
|
|
filePtr++;
|
|
}
|
|
}
|
|
else
|
|
fdata[readBytes++] = filebuf[filePtr++];
|
|
}
|
|
const char tail[] = "}";
|
|
strcat((fdata + readBytes), tail);
|
|
free(filebuf);
|
|
DATA_SEND_STRUCT DSS;
|
|
DSS.dt = PUBLISH_CONTROL_DATA;
|
|
DSS.raw_data_ptr = outbuf;
|
|
DSS.data_lenth = (fdata - outbuf) + readBytes + strlen(tail);
|
|
if (xQueueSend(mqtt[idx].mqtt_queue, &DSS, pdMS_TO_TICKS(1000)) == pdPASS)
|
|
return API_OK;
|
|
else
|
|
{
|
|
ESP_LOGE(TAG, "Failed to write mqtt queue");
|
|
api_err = API_INTERNAL_ERR;
|
|
goto file_send_err;
|
|
}
|
|
file_send_err:
|
|
free(outbuf);
|
|
free(filebuf);
|
|
return api_err;
|
|
}
|
|
|
|
static void ControlDataHandler(char *data, uint32_t len, int idx)
|
|
{
|
|
struct jReadElement result;
|
|
char URL[MAX_FILENAME_LENTH + 1];
|
|
char ID[MAX_MESSAGE_ID + 1];
|
|
char POST_DATA[MAX_POST_DATA_LENTH + 1];
|
|
mqtt_api_request_t req = UNSUPPORTED;
|
|
mqtt_app_err_t api_err = API_UNKNOWN_ERR;
|
|
ID[0] = 0x00;
|
|
URL[0] = 0x00;
|
|
espfs_file_t *file = NULL;
|
|
|
|
jRead(data, "", &result);
|
|
if (result.dataType == JREAD_OBJECT)
|
|
{
|
|
/*Checking and get ID message*/
|
|
jRead(data, "{'messid'", &result);
|
|
if (result.elements == 1)
|
|
{
|
|
if (result.bytelen > MAX_MESSAGE_ID)
|
|
{
|
|
api_err = API_ID_OVERSIZE_ERR;
|
|
goto api_json_err;
|
|
}
|
|
memcpy(ID, result.pValue, result.bytelen);
|
|
ID[result.bytelen] = 0x00;
|
|
}
|
|
else
|
|
{
|
|
api_err = API_NO_ID_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
/*Checking and get API version*/
|
|
jRead(data, "{'api'", &result);
|
|
if (result.elements == 1)
|
|
{
|
|
if (memcmp(apiver, result.pValue, result.bytelen))
|
|
{
|
|
api_err = API_VERSION_ERR;
|
|
goto api_json_err;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
api_err = API_NO_API_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
/*Checking and get request type POST, GET or UNSUPPORTED*/
|
|
jRead(data, "{'request'", &result);
|
|
if (result.elements == 1)
|
|
{
|
|
if (!memcmp(tagGet, result.pValue, result.bytelen))
|
|
req = GET;
|
|
else if (!memcmp(tagPost, result.pValue, result.bytelen))
|
|
req = POST;
|
|
}
|
|
else
|
|
{
|
|
api_err = API_NO_REQUEST_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
/*Checking and get url*/
|
|
jRead(data, "{'url'", &result);
|
|
if (result.elements == 1)
|
|
{
|
|
if (result.bytelen > MAX_FILENAME_LENTH)
|
|
{
|
|
api_err = API_URL_OVERSIZE_ERR;
|
|
goto api_json_err;
|
|
}
|
|
memcpy(URL, result.pValue, result.bytelen);
|
|
URL[result.bytelen] = 0x00;
|
|
file = espfs_fopen(fs, URL);
|
|
if (!file)
|
|
{
|
|
api_err = API_URL_NOT_FOUND_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
}
|
|
else
|
|
{
|
|
api_err = API_NO_URL_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
if (req == POST)
|
|
{
|
|
/*Checking and get POST DATA*/
|
|
jRead(data, "{'postdata'", &result);
|
|
if (result.elements == 1)
|
|
{
|
|
if (result.bytelen > MAX_POST_DATA_LENTH)
|
|
{
|
|
api_err = API_POSTDATA_OVERSIZE_ERR;
|
|
goto api_json_err;
|
|
}
|
|
memcpy(POST_DATA, result.pValue, result.bytelen);
|
|
POST_DATA[result.bytelen] = 0x00;
|
|
}
|
|
else
|
|
{
|
|
api_err = API_NO_POSTDATA_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
HTTPPostApp(NULL, URL, POST_DATA);
|
|
|
|
jRead(data, "{'reload'", &result);
|
|
if (result.elements == 1 && !memcmp("true", result.pValue, result.bytelen))
|
|
{
|
|
if (ResponceWithFile(idx, file, ID, URL) == API_OK)
|
|
return;
|
|
else
|
|
goto api_json_err;
|
|
}
|
|
else
|
|
{
|
|
if (ResponceWithError(idx, file, ID, URL, API_OK) != API_OK)
|
|
ESP_LOGE(TAG, "Failed to allocate memory for file MQTT message");
|
|
}
|
|
return;
|
|
}
|
|
else if (req == GET)
|
|
{
|
|
//Here GET handler, send file wrapped into json
|
|
if (ResponceWithFile(idx, file, ID, URL) == API_OK)
|
|
return;
|
|
else
|
|
goto api_json_err;
|
|
}
|
|
else
|
|
{
|
|
api_err = API_UNSUPPORTED_METHOD_ERR;
|
|
goto api_json_err;
|
|
}
|
|
}
|
|
else
|
|
{ //ERR no json format
|
|
api_err = API_WRONG_JSON_ERR;
|
|
goto api_json_err;
|
|
}
|
|
|
|
api_json_err:
|
|
ESP_LOGE(TAG, "ERROR:%s:%s", mqtt_app_err_breef[api_err], mqtt_app_err_descr[api_err]);
|
|
if (ResponceWithError(idx, file, ID, URL, api_err) != API_OK)
|
|
ESP_LOGE(TAG, "Failed to allocate memory for file MQTT message");
|
|
}
|
|
|
|
void MQTTStart(void)
|
|
{
|
|
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i)
|
|
{
|
|
if (mqtt[i].mqtt)
|
|
esp_mqtt_client_reconnect(mqtt[i].mqtt);
|
|
}
|
|
}
|
|
|
|
void MQTTStop(void)
|
|
{
|
|
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i)
|
|
{
|
|
if (mqtt[i].mqtt)
|
|
esp_mqtt_client_disconnect(mqtt[i].mqtt);
|
|
}
|
|
}
|
|
|
|
void MQTTReconnect(void)
|
|
{
|
|
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i)
|
|
{
|
|
if (mqtt[i].mqtt)
|
|
{
|
|
esp_mqtt_client_disconnect(mqtt[i].mqtt);
|
|
esp_mqtt_client_reconnect(mqtt[i].mqtt);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void MQTTPublish(mqtt_client_t *mqtt, DATA_SEND_STRUCT *DSS)
|
|
{
|
|
char top[64];
|
|
switch (DSS->dt)
|
|
{
|
|
|
|
case PUBLISH_CONTROL_DATA:
|
|
ComposeTopicControl(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
|
|
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
|
|
1);
|
|
if (mqtt)
|
|
{
|
|
esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr,
|
|
DSS->data_lenth,
|
|
0,
|
|
0);
|
|
|
|
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top);
|
|
}
|
|
else
|
|
ESP_LOGE(TAG, "MQTT client not initialized");
|
|
|
|
break;
|
|
|
|
|
|
case PUBLISH_SCREEN_DATA:
|
|
ComposeTopicScreen(top, GetSysConf()->mqttStation[mqtt->mqtt_index].RootTopic,
|
|
GetSysConf()->mqttStation[mqtt->mqtt_index].ClientID,
|
|
1);
|
|
if (mqtt)
|
|
{
|
|
esp_mqtt_client_publish(mqtt->mqtt, (const char*) top, (const char*) DSS->raw_data_ptr,
|
|
DSS->data_lenth,
|
|
0,
|
|
0);
|
|
|
|
ESP_LOGI(TAG, "TOPIC=%.*s", strlen(top), top);
|
|
}
|
|
else
|
|
ESP_LOGE(TAG, "MQTT client not initialized");
|
|
|
|
break;
|
|
|
|
}
|
|
}
|
|
|
|
void MQTTTaskTransmit(void *pvParameter)
|
|
{
|
|
DATA_SEND_STRUCT DSS;
|
|
int idx = *(int*) pvParameter;
|
|
while (!mqtt[idx].mqtt_queue)
|
|
vTaskDelay(pdMS_TO_TICKS(300)); //wait for MQTT queue ready
|
|
while (1)
|
|
{
|
|
while (!mqtt[idx].is_connected)
|
|
vTaskDelay(pdMS_TO_TICKS(1000));
|
|
xQueuePeek(mqtt[idx].mqtt_queue, &DSS, portMAX_DELAY);
|
|
MQTTPublish(&mqtt[idx], &DSS);
|
|
switch (DSS.dt)
|
|
{
|
|
case PUBLISH_SCREEN_DATA:
|
|
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
|
|
free(DSS.raw_data_ptr);
|
|
break;
|
|
|
|
case PUBLISH_CONTROL_DATA:
|
|
xQueueReceive(mqtt[idx].mqtt_queue, &DSS, 0);
|
|
free(DSS.raw_data_ptr);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void start_mqtt()
|
|
{
|
|
esp_mqtt_client_config_t mqtt_cfg = { 0 };
|
|
|
|
char url[40];
|
|
char tmp[40];
|
|
|
|
for (int i = 0; i < CONFIG_MQTT_CLIENTS_NUM; ++i)
|
|
{
|
|
if (GetSysConf()->mqttStation[i].Flags1.bIsGlobalEnabled)
|
|
{
|
|
esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, &reconnect_MQTT_handler, &mqtt[i].mqtt);
|
|
esp_event_handler_register(IP_EVENT, IP_EVENT_ETH_GOT_IP, &reconnect_MQTT_handler, &mqtt[i].mqtt);
|
|
esp_event_handler_register(IP_EVENT, IP_EVENT_PPP_GOT_IP, &reconnect_MQTT_handler, &mqtt[i].mqtt);
|
|
|
|
strcpy(url, "mqtt://");
|
|
strcat(url, GetSysConf()->mqttStation[i].ServerAddr);
|
|
itoa(GetSysConf()->mqttStation[i].ServerPort, tmp, 10);
|
|
strcat(url, ":");
|
|
strcat(url, tmp);
|
|
mqtt_cfg.uri = url;
|
|
mqtt_cfg.username = GetSysConf()->mqttStation[i].UserName;
|
|
mqtt_cfg.password = GetSysConf()->mqttStation[i].UserPass;
|
|
strcpy(tmp, GetSysConf()->mqttStation[i].ClientID);
|
|
strcat(tmp, "_");
|
|
int len = strlen(tmp);
|
|
BytesToStr((unsigned char*) &GetSysConf()->imei, (unsigned char*) &tmp[len], 4);
|
|
mqtt_cfg.client_id = tmp;
|
|
mqtt[i].is_connected = false;
|
|
mqtt[i].mqtt_index = i;
|
|
mqtt_cfg.user_context = (void*) &mqtt[i];
|
|
mqtt[i].mqtt = esp_mqtt_client_init(&mqtt_cfg);
|
|
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
|
|
esp_mqtt_client_register_event(mqtt[i].mqtt, ESP_EVENT_ANY_ID, mqtt_event_handler, &mqtt[i].mqtt);
|
|
esp_mqtt_client_start(mqtt[i].mqtt);
|
|
xTaskCreate(MQTTTaskTransmit, "MQTTTaskTransmit", 1024 * 4, (void*) &mqtt[i].mqtt_index, 3, NULL);
|
|
}
|
|
}
|
|
}
|
|
|
|
void MQTTRun(void)
|
|
{
|
|
xSemaphoreMQTTHandle = xSemaphoreCreateBinaryStatic(&xSemaphoreMQTTBuf);
|
|
xSemaphoreGive(xSemaphoreMQTTHandle);
|
|
|
|
MQTT1MessagesQueueHandle = NULL;
|
|
if (GetSysConf()->mqttStation[0].Flags1.bIsGlobalEnabled)
|
|
MQTT1MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH,
|
|
sizeof(DATA_SEND_STRUCT),
|
|
MQTT1MessagesQueueStorageArea,
|
|
&xStaticMQTT1MessagesQueue);
|
|
MQTT2MessagesQueueHandle = NULL;
|
|
if (GetSysConf()->mqttStation[1].Flags1.bIsGlobalEnabled)
|
|
MQTT2MessagesQueueHandle = xQueueCreateStatic(CH_MESSAGE_BUFER_LENTH,
|
|
sizeof(DATA_SEND_STRUCT),
|
|
MQTT2MessagesQueueStorageArea,
|
|
&xStaticMQTT2MessagesQueue);
|
|
|
|
mqtt[0].mqtt_queue = MQTT1MessagesQueueHandle;
|
|
mqtt[1].mqtt_queue = MQTT2MessagesQueueHandle;
|
|
|
|
|
|
|
|
start_mqtt();
|
|
}
|
|
|
|
#endif
|