From 8f9eacb8c2425a68a1ab8510d3a48e032187b31b Mon Sep 17 00:00:00 2001 From: "Querejeta Lomas, Leire" <leire.querejeta@tecnalia.com> Date: Tue, 30 Aug 2022 09:41:02 +0200 Subject: [PATCH] Subir nuevo archivo --- src/class_server/src/class_server.cpp | 540 ++++++++++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 src/class_server/src/class_server.cpp diff --git a/src/class_server/src/class_server.cpp b/src/class_server/src/class_server.cpp new file mode 100644 index 0000000..85011a9 --- /dev/null +++ b/src/class_server/src/class_server.cpp @@ -0,0 +1,540 @@ +/** + * @file class_server.cpp + * @author Alfonso Dominguez <alfonso.dominguez@tecnalia.com> + * @date 2020 + * + * Copyright 2020 Tecnalia Research & Innovation. + * Distributed under the GNU GPL v3. + * For full terms see https://www.gnu.org/licenses/gpl.txt + * + * @brief Class for server which connects to CLASS device and waits for client connections + */ + +#include <class_server/class_server.hpp> +#include <iostream> +#include <regex> +#include <typeinfo> +#include <stdio.h> +#include <string> +#include "../../commands.h" + +using namespace std; + +ClassServer::ClassServer() +{ + log_("Constructor"); + turn_off_ = false; + continue_serial_ = true; + continue_udp_ = true; +} + +ClassServer::~ClassServer() +{ + log_("Destructor"); + if (udp_server_ != nullptr) + { + delete udp_server_; + } + + if (udp_send_thread_ != nullptr) + { + delete udp_send_thread_; + } + + if (serial_ != nullptr) + { + delete serial_; + } + + if (serial_send_thread_ != nullptr) + { + delete serial_send_thread_; + } + + if (wait_thread_ != nullptr) + { + delete wait_thread_; + } + + for (unsigned int i = 0; i < udp_clients_.size(); i++) + { + if (udp_clients_[i] != nullptr) + { + delete udp_clients_[i]; + } + } + + // myfile_.close(); +} + +int ClassServer::start(const std::string &port, const int &listening_port, boost::asio::io_service *io_service) +{ + io_service_ = io_service; + if (connectToSerial(port) != 0) + { + log_error_stream_("[start] Error connecting to CLASS"); + return -1; + } + else + { + log_stream_("[start] Connected to CLASS"); + } + + if (startUDPServer(listening_port) != 0) + { + log_error_stream_("[start] Error starting UDP server"); + return -1; + } + else + { + log_stream_("[start] UDP server is listening"); + } + + wait_thread_ = new std::thread(&ClassServer::waitThreadFunction, this); + wait_thread_->detach(); + + // myfile_.open ("msgs.txt"); + + return 0; +} + +int ClassServer::stop() +{ + disconnectFromSerial(); + stopUDPServer(); + io_service_->stop(); + return 0; +} + +int ClassServer::connectToSerial(const std::string &port) +{ + serial_ = new AsyncSerial(port, 115200); + // starts a thread for sending msgs to CLASS each 100ms + serial_->addListener((SerialListener *)this); + + serial_send_thread_ = new std::thread(&ClassServer::serialSendThreadFunction, this); + serial_send_thread_->detach(); + + return 0; +} + +int ClassServer::disconnectFromSerial() +{ + if (serial_ != nullptr) + { + log_("[disconnectFromSerial] close serial"); + serial_->close(); + delete serial_; + serial_ = nullptr; + log_("[disconnectFromSerial] serial closed"); + } + + if (serial_send_thread_ != nullptr) + { + delete serial_send_thread_; + serial_send_thread_ = nullptr; + } + + turn_off_mutex_.lock(); + continue_serial_ = false; + turn_off_mutex_.unlock(); + + return 0; +} + +void ClassServer::serialMsgReceived(std::string msg) +{ + //log_stream_("[serialMsgReceived] Msg received from CLASS: " << msg) + // myfile_ << msg; + + if (msg.find("turnning off") != std::string::npos) + { + log_("[serialMsgReceived] Device turned OFF"); + turn_off_mutex_.lock(); + turn_off_ = true; + turn_off_mutex_.unlock(); + } + msgs_from_class_mutex_.lock(); + msgs_from_class_.push_back(msg); + msgs_from_class_mutex_.unlock(); + + if(msg.find("tic") != string::npos) + { + log_stream_("[serialMsgReceived] Msg received from CLASS: " << msg) + } +} + +void ClassServer::waitThreadFunction() +{ + bool continueLoop = true; + while (continueLoop) + { + turn_off_mutex_.lock(); + continueLoop = continue_udp_ || continue_serial_; + turn_off_mutex_.unlock(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + log_("[waitThreadFunction] Thread stopped. Stop external service"); + io_service_->stop(); +} + +void ClassServer::serialSendThreadFunction() +{ + bool continueLoop = true; + while (continueLoop) + { + turn_off_mutex_.lock(); + continueLoop = !turn_off_; + turn_off_mutex_.unlock(); + + bool is_empty; + msgs_to_class_mutex_.lock(); + is_empty = msgs_to_class_.empty(); + msgs_to_class_mutex_.unlock(); + + if (!is_empty) + { + // send the first msg in queue + std::string msg; + msgs_to_class_mutex_.lock(); + msg = msgs_to_class_[0]; + commands::ClassCommands d; + log_stream_("[serialSendThreadFunction] Msg sent to CLASS device : " << d.Log(msg)); + msgs_to_class_.erase(msgs_to_class_.begin()); + msgs_to_class_mutex_.unlock(); + serial_->writeString(msg); + std::string msg_without_ending = msg; + replace(msg_without_ending, "\r\n", "\\r\\n"); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + log_("[serialSendThreadFunction] Thread stopped. Disconnect from serial"); + disconnectFromSerial(); +} + +int ClassServer::startUDPServer(const int &listening_port) +{ + if (udp_server_ != nullptr) + { + log_("[startUDPServer] There is already a UDP server listening"); + } + + log_stream_("[startUDPServer] Starting UDP server on port " << listening_port); + udp_server_ = new UDPServer(listening_port); + udp_server_->startListening(); + udp_server_->addListener((UdpServerListener *)this); + + // starts a thread for sending msgs to clients + udp_send_thread_ = new std::thread(&ClassServer::udpSendThreadFunction, this); + udp_send_thread_->detach(); + + return 0; +} + +int ClassServer::stopUDPServer() +{ + if (udp_server_ != nullptr) + { + log_("[stopUDPServer] Stopping UDP server"); + udp_server_->removeListener((UdpServerListener *)this); + udp_server_->stopListening(); + delete udp_server_; + udp_server_ = nullptr; + turn_off_mutex_.lock(); + continue_udp_ = false; + turn_off_mutex_.unlock(); + log_("[stopUDPServer] UDP server stopped"); + } + + return 0; +} + +void ClassServer::udpMsgReceived(std::string msg, std::string ip) +{ + // parse message from client + if (msg.find("flush") != std::string::npos) + { + log_stream_("[udpMsgReceived] Message received from client (IP: " << ip << "): Flush"); + // flush async_serial buffer + if (serial_ != nullptr) + { + serial_->flush(); + } + // flush internal buffer + msgs_from_class_mutex_.lock(); + msgs_from_class_.clear(); + msgs_from_class_mutex_.unlock(); + } + else if (msg.find("disconnect") != std::string::npos) + { + // extract port where the client was listenening + std::regex regex("\\ "); + std::vector<std::string> out( + std::sregex_token_iterator(msg.begin(), msg.end(), regex, -1), + std::sregex_token_iterator()); + if (out.size() != 2) + { + return; + } + int client_port = 0; + try + { + client_port = std::stoi(out[1]); + } + catch (const std::exception &e) + { + log_error_stream_("[udpMsgReceived] disconnect command must be followed by an integer (port in which the client listens) but (" << out[1] << ") has been provided. Exc:" << e.what()); + return; + } + + if (udp_mutex_.try_lock()) + { + for (unsigned int i = 0; i < udp_clients_.size(); i++) + { + if (udp_clients_[i]->getIP() == ip && udp_clients_[i]->getPort() == client_port) + { + log_stream_("[udpMsgReceived] Client (IP: " << ip << ", port: " << client_port << ") is not listening anymore"); + delete udp_clients_[i]; + udp_clients_.erase(udp_clients_.begin() + i); + break; + } + } + udp_mutex_.unlock(); + } + } + else if (msg.find("connect") != std::string::npos) + { + // extract port where the client will listen + // extract port + std::regex regex("\\ "); + std::vector<std::string> out( + std::sregex_token_iterator(msg.begin(), msg.end(), regex, -1), + std::sregex_token_iterator()); + if (out.size() != 2) + { + return; + } + int client_port = 0; + try + { + client_port = std::stoi(out[1]); + } + catch (const std::exception &e) + { + log_error_stream_("[udpMsgReceived] connect command must be followed by an iteger (port in which the client listens) but (" << out[1] << ") has been provided. Exc:" << e.what()); + return; + } + + if (udp_mutex_.try_lock()) + { + bool found = false; + for (unsigned int i = 0; i < udp_clients_.size(); i++) + { + if (udp_clients_[i]->getIP() == ip && udp_clients_[i]->getPort() == client_port) + { + log_stream_("[udpMsgReceived] There is a client already listening (IP: " << ip << ", port: " << client_port << ")"); + found = true; + break; + } + } + + if (!found) + { + log_stream_("[udpMsgReceived] New client is listening (IP: " << ip << ", port: " << client_port << ")"); + udp_client_ips_.push_back(ip); + udp_clients_.push_back(new UDPClient(ip, out[1], -1)); + } + udp_mutex_.unlock(); + } + } + else + { + // send msg to CLASS_DEVICE + std::vector<string>::iterator it = std::find(udp_client_ips_.begin(), udp_client_ips_.end(), ip); + if (it == udp_client_ips_.end()) + { + log_warn_stream_("[udpMsgReceived] Client (IP: " << ip << ") is not connected"); + } + else + { + // std::string msg_without_ending = msg; + // replace(msg_without_ending, "\r\n", "\\r\\n"); + // log_stream_("[udpMsgReceived] Message received from client (IP: " << ip << "): " << msg_without_ending); + msgs_to_class_mutex_.lock(); + msgs_to_class_.push_back(msg); + msgs_to_class_mutex_.unlock(); + } + } +} + +bool ClassServer::replace(std::string &str, const std::string &from, const std::string &to) +{ + size_t start_pos = str.find(from); + while (start_pos != std::string::npos) + { + str.replace(start_pos, from.length(), to); + start_pos = str.find(from); + } + return true; +} + +std::string ClassServer::trim(string str) +{ + size_t first = str.find_first_not_of(' '); + if (string::npos == first) + { + return ""; + } + size_t last = str.find_last_not_of(' '); + return str.substr(first, (last - first + 1)); +} + +void ClassServer::udpSendThreadFunction() +{ + bool continueLoop = true; + while (continueLoop) + { + turn_off_mutex_.lock(); + continueLoop = !turn_off_; + turn_off_mutex_.unlock(); + + bool is_empty; + msgs_from_class_mutex_.lock(); + is_empty = msgs_from_class_.empty(); + msgs_from_class_mutex_.unlock(); + + if (!is_empty) + { + if (!udp_clients_.empty()) + // send the first msg in queue + { + std::vector<std::string> msgs; + msgs_from_class_mutex_.lock(); + msgs = msgs_from_class_; + msgs_from_class_.clear(); + msgs_from_class_mutex_.unlock(); + + if (!msgs.empty()) + { + if (udp_mutex_.try_lock()) + { + for (unsigned int i = 0; i < msgs.size(); i++) + { + for (unsigned int j = 0; j < udp_clients_.size(); j++) + { + msgs[i].erase(std::remove(msgs[i].begin(), msgs[i].end(), '\n'), msgs[i].end()); + + msgs[i].erase(std::remove(msgs[i].begin(), msgs[i].end(), '\r'), msgs[i].end()); + trim(msgs[i]); + + if (msgs[i].find("mac ") != std::string::npos){ + //log_stream_("mac text found"); + continue; + } + + if (!msgs[i].empty()) + { + //log_stream_("[SENDING ---->>>>] " << msgs[i]); + udp_clients_[j]->send(msgs[i]); + } + + // if (!msgs[i].empty()) + // { + // udp_clients_[j]->send(msgs[i]); + // } + } + } + udp_mutex_.unlock(); + } + // else + // { + // log_warn_stream_("[udpSendThreadFunction] udp_mutex_ not get amd msgs pending: " << msgs[0]); + // } + } + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + log_("[udpSendThreadFunction] Thread stopped. UDP server stopped"); + stopUDPServer(); +} + +/*logging*/ + +void ClassServer::log_(const std::string &text) +{ + LOG_INFO << "[ClassServer]: " << text; +} + +void ClassServer::log_(std::ostream &text) +{ + LOG_INFO << "[ClassServer]: " << text.rdbuf(); +} +void ClassServer::log_(const std::stringstream &text) +{ + LOG_INFO << "[ClassServer]: " << text.str(); +} +// see http://www.cplusplus.com/forum/unices/36461/ +// see https://docs.microsoft.com/en-us/windows/console/setconsoletextattribute + +#ifdef __unix__ +const std::string bold_red("\033[1;31m"); +const std::string bold_yellow("\033[1;33m"); +const std::string reset("\033[0m"); +#endif + +void ClassServer::log_err_(const std::string &text) +{ +#ifdef __unix__ + LOG_ERROR << bold_red << "[ClassServer]: " << text << reset; +#elif defined(_WIN32) || defined(WIN32) + HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); + SetConsoleTextAttribute(hConsole, 12); + LOG_ERROR << "[ClassServer]: " << text; + SetConsoleTextAttribute(hConsole, 7); +#endif +} + +void ClassServer::log_err_(std::ostream &text) +{ +#ifdef __unix__ + LOG_ERROR << bold_red << "[ClassServer]: " << text.rdbuf() << reset; +#elif defined(_WIN32) || defined(WIN32) + HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); + SetConsoleTextAttribute(hConsole, 12); + LOG_ERROR << "[ClassServer]: " << text.rdbuf(); + SetConsoleTextAttribute(hConsole, 7); +#endif +} + +void ClassServer::log_warn_(const std::string &text) +{ +#ifdef __unix__ + LOG_WARNING << bold_yellow << "[ClassServer]: " << text << reset; +#elif defined(_WIN32) || defined(WIN32) + HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); + SetConsoleTextAttribute(hConsole, 14); + LOG_WARNING << "[ClassServer]: " << text; + SetConsoleTextAttribute(hConsole, 7); +#endif +} + +void ClassServer::log_warn_(std::ostream &text) +{ +#ifdef __unix__ + LOG_WARNING << bold_yellow << "[ClassServer]: " << text.rdbuf() << reset; +#elif defined(_WIN32) || defined(WIN32) + HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE); + SetConsoleTextAttribute(hConsole, 14); + LOG_WARNING << "[ClassServer]: " << text.rdbuf(); + SetConsoleTextAttribute(hConsole, 7); +#endif +} \ No newline at end of file -- GitLab