#include #include #include #include #include #include #include #include #include #include #include #include #include #include "settings.hpp" #include "mqtt_client.hpp" #include "t3hs_proto.hpp" #include "utils.hpp" #include "static_settings.hpp" // on SMP systems this should be under mutex locks cv::Mat bgr, gray; std::unique_ptr params; std::unique_ptr mqtt; int px_in_frame; std::string usage(const std::string&); std::string parse_cli(int, char**); void mjpeg_stream(float, int, FrameMode); void on_mqtt_message(mosquitto*, void*, const mosquitto_message*); int main(int argc, char* argv[]) { // read config file path from cli args std::string cfg_path; try { cfg_path = parse_cli(argc, argv); } catch (const std::runtime_error& e) { std::cerr << "ERROR: Cannot parse CLI args: " << e.what() << std::endl; std::cerr << usage(argv[0]) << std::endl; return 1; } // get settings from config file try { params = std::make_unique(cfg_path); } catch (const std::exception& e) { std::cerr << "ERROR: Cannot parse settings from file: " << e.what() << "\n"; return 1; } std::cout << "CONFIG:" << std::endl << params->to_string() << std::endl; // initialize mqtt client try { mqtt = std::make_unique( params->mqtt_client.host, params->mqtt_client.port, params->mqtt_client.user, params->mqtt_client.password, params->mqtt_client.sign_secret ); } catch (const std::exception& e) { std::cerr << "ERROR: Cannot create mqtt client: " << e.what() << "\n"; return 1; } mqtt->set_message_callback(on_mqtt_message); if (!mqtt->subscribe(params->mqtt_frame_exporter.listen_topic)) { std::cerr << "ERROR: Cannot subscribe on topic '" << params->mqtt_frame_exporter.listen_topic << "', exiting" << std::endl; return 1; } // Camera init cv::VideoCapture cap; cap.set(cv::CAP_PROP_FRAME_WIDTH, params->frame.width); cap.set(cv::CAP_PROP_FRAME_HEIGHT, params->frame.height); cap.open(V4L2_DEVICE_NUM); // "Warmup" camera while (bgr.total() * bgr.elemSize() == 0) { cap >> bgr; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // QR scanner init zbar::ImageScanner scanner; std::unordered_map qr_appears; if (params->qr_scanner.enabled) { scanner.set_config(zbar::ZBAR_NONE, zbar::ZBAR_CFG_ENABLE, 1); scanner.set_config(zbar::ZBAR_QRCODE, zbar::ZBAR_CFG_ENABLE, 1); } // MJPEG stream init std::thread mjpeg_thread; if (params->mjpeg.mode != FrameMode::DISABLE) { mjpeg_thread = std::thread(mjpeg_stream, params->mjpeg.fps, params->mjpeg.port, params->mjpeg.mode); } long long last_frame = millis_timestamp(); float frame_time = 1000.0 / params->frame.fps; px_in_frame = params->frame.width * params->frame.height; while (true) { // capture a frame cap >> bgr; if (bgr.empty()) { std::cerr << "ERROR: Captured frame is empty" << std::endl; break; } // convert to grayscale if (params->qr_scanner.enabled || params->mjpeg.mode == FrameMode::GRAYSCALE) { cv::cvtColor(bgr, gray, cv::COLOR_BGR2GRAY); } long long now = millis_timestamp(); if (params->qr_scanner.enabled) { zbar::Image zbar_image( params->frame.width, params->frame.height, "Y800", gray.data, px_in_frame ); int n = scanner.scan(zbar_image); if (n > 0) { std::cout << "INFO: Found " << n << " QRs:" << std::endl; std::vector qrs_to_send; for ( zbar::Image::SymbolIterator symbol = zbar_image.symbol_begin(); symbol != zbar_image.symbol_end(); ++symbol ) { std::string data = symbol->get_data(); auto appear = qr_appears.find(data); if (appear == qr_appears.end()) { qr_appears.insert({data, now}); std::cout << " [x] " << data << std::endl; qrs_to_send.emplace_back(data); } else { std::cout << " [ ] " << data << std::endl; } } if (!qrs_to_send.empty()) { t3hs::QrMessage msg = { .srcid = params->mqtt_client.id, .dstid = params->qr_scanner.dstid, .msgid = t3hs::gen_id(), .timestamp = micros_timestamp(), .qrs = std::move(qrs_to_send) }; nlohmann::json qr_json = std::move(msg); if (!mqtt->send_message(params->qr_scanner.send_topic, qr_json)) { std::cerr << "ERROR: Cannot send MQTT message" << std::endl; exit(1); } else { std::cout << "INFO: Sent QR data" << std::endl; } } } auto it = qr_appears.begin(); while (it != qr_appears.end()) { if (it->second + params->qr_scanner.rescan_timeout <= now) { it = qr_appears.erase(it); } else { ++it; } } } long long frame_duration = now - last_frame; long long remaining_frame_time = static_cast(std::min(frame_time - frame_duration, 0.0f)); std::this_thread::sleep_for(std::chrono::milliseconds(remaining_frame_time)); last_frame = now; } cap.release(); return 0; } std::string usage(const std::string& basename) { std::ostringstream oss; oss << "Usage: " << basename << " []"; return oss.str(); } std::string parse_cli(int argc, char* argv[]) { std::string cfg_path; if (argc == 1) { std::cout << "INFO: Using default config file: " << DEFAULT_CONFIG_PATH << std::endl; cfg_path = DEFAULT_CONFIG_PATH; } else if (argc == 2) { cfg_path = argv[1]; if (file_exists(cfg_path)) { std::cout << "INFO: Using config file '" << cfg_path << "'" << std::endl; } else { std::cerr << "ERROR: Config file '" << cfg_path << "' does not exist or is not accessible. Using default config in " << DEFAULT_CONFIG_PATH << std::endl; cfg_path = DEFAULT_CONFIG_PATH; } } else { throw std::runtime_error("Invalid CLI args"); } return cfg_path; } void mjpeg_stream(float fps, int port, FrameMode mode) { cv::VideoWriter http; http.open("httpjpg", port); std::cout << "INFO: MJPEG thread started" << std::endl; long long last_frame = millis_timestamp(); float frame_time = 1000.0f; while (true) { if (!http.isOpened()) { std::cerr << "ERROR: MJPEG writer failed" << std::endl; return; } if (mode == FrameMode::COLOR) { http << bgr; } else { http << gray; } long long now = millis_timestamp(); long long frame_duration = now - last_frame; long long remaining_frame_time = static_cast(std::min(frame_time - frame_duration, 0.0f)); std::this_thread::sleep_for(std::chrono::milliseconds(remaining_frame_time)); last_frame = now; } } void on_mqtt_message(mosquitto *, void *, const mosquitto_message *msg) { // validating incomming message if (!msg->payload || msg->payloadlen == 0) { std::cerr << "WARNING: Received MQTT empty message" << std::endl;; return; } nlohmann::json msg_json; try { const char* char_data = static_cast(msg->payload); std::string_view json_str(char_data, msg->payloadlen); msg_json = nlohmann::json::parse(json_str); } catch (const nlohmann::json::parse_error& e) { std::cerr << "WARNING: JSON parse error: " << e.what() << std::endl; return; } catch (const nlohmann::json::type_error& e) { std::cerr << "WARNING: JSON type error: " << e.what() << std::endl; return; } catch (const nlohmann::json::out_of_range& e) { std::cerr << "WARNING: JSON out of range: " << e.what() << std::endl; return; } catch (const std::exception& e) { std::cerr << "WARNING: JSON error: " << e.what() << std::endl; return; } t3hs::Request req; try { req = std::move(msg_json); } catch (const std::invalid_argument& e) { std::cerr << "WARNING: JSON format is incorrect: " << e.what() << std::endl; return; } // skip message if not for this device if (req.dstid != params->mqtt_client.id) { return; } const nlohmann::json& req_vars = req.variables; if (req_vars.find("quality") == req_vars.end() || !req_vars["quality"].is_number_integer()) { std::cerr << "WARNING: JSON format is incorrect: Field 'quality' in 'variables' is missing or not an integer" << std::endl; return; } else if (req_vars["quality"] < 1 || req_vars["quality"] > 100) { std::cerr << "WARNING: Field 'quality' in 'variables' is expected to be between 1 and 100%" << std::endl; return; } if (req_vars.find("mode") == req_vars.end() || !req_vars["mode"].is_string()) { std::cerr << "WARNING: JSON format is incorrect: Field 'mode' in 'variables' is missing or not a string" << std::endl; return; } else if (req_vars["mode"] != "color" && req_vars["mode"] != "grayscale") { std::cerr << "WARNING: Field 'mode' in 'variables' is expected to be equal 'color' or 'grayscale'" << std::endl; return; } unsigned int quality = req_vars["quality"].get(); FrameMode mode = FrameModeUtils::from_string(req_vars["mode"].get()); // obtain fresh grayscale frame if needed if (mode == FrameMode::GRAYSCALE && !(params->qr_scanner.enabled || params->mjpeg.mode == FrameMode::GRAYSCALE)) { cv::cvtColor(bgr, gray, cv::COLOR_BGR2GRAY); } // preapre jpeg data std::vector jpeg; if (mode == FrameMode::COLOR) { jpeg = frame_to_jpeg(bgr, quality); } else { jpeg = frame_to_jpeg(gray, quality); } // split frame into chunks and send answers std::vector blocks = t3hs::FileBlock::from_raw(jpeg, t3hs::FileBlockOpertype::WRITE, DEFAULT_MQTT_FRAME_EXPORTER_MAX_BLOCK_SIZE, "frame"); long long now = micros_timestamp(); for (const auto& block : blocks) { nlohmann::json block_json = std::move(block); nlohmann::json resp_vars = { {"quality", quality}, {"mode", FrameModeUtils::to_string(mode)}, {"file_block", std::move(block_json)} }; if (req_vars.find("exposure") != req_vars.end()) { std::string pname = "exposure"; auto [min, max, value] = get_camera_param(pname); resp_vars["exposure"] = { {"min", min}, {"max", max}, {"value", value}, }; } if (req_vars.find("analogue_gain") != req_vars.end()) { std::string pname = "analogue_gain"; auto [min, max, value] = get_camera_param(pname); resp_vars["analogue_gain"] = { {"min", min}, {"max", max}, {"value", value}, }; } t3hs::Response resp = { .srcid = params->mqtt_client.id, .dstid = params->mqtt_frame_exporter.dstid, .msgid = t3hs::gen_id(), .timestamp = now, .variables = std::move(resp_vars) }; nlohmann::json resp_json = std::move(resp); if (!mqtt->send_message(params->mqtt_frame_exporter.send_topic, resp_json)) { std::cerr << "ERROR: Cannot send MQTT message" << std::endl; exit(1); } else { std::cout << "INFO: Sent frame part" << std::endl; } } }