Skip to content
Snippets Groups Projects
Commit 8f9eacb8 authored by Querejeta Lomas, Leire's avatar Querejeta Lomas, Leire
Browse files

Subir nuevo archivo

parent 3b8da250
No related branches found
No related tags found
No related merge requests found
/**
* @file class_server.cpp
* @author Alfonso Dominguez <alfonso.dominguez@tecnalia.com>
* @date 2020
*
* Copyright 2020 Tecnalia Research & Innovation.
* Distributed under the GNU GPL v3.
* For full terms see https://www.gnu.org/licenses/gpl.txt
*
* @brief Class for server which connects to CLASS device and waits for client connections
*/
#include <class_server/class_server.hpp>
#include <iostream>
#include <regex>
#include <typeinfo>
#include <stdio.h>
#include <string>
#include "../../commands.h"
using namespace std;
ClassServer::ClassServer()
{
log_("Constructor");
turn_off_ = false;
continue_serial_ = true;
continue_udp_ = true;
}
ClassServer::~ClassServer()
{
log_("Destructor");
if (udp_server_ != nullptr)
{
delete udp_server_;
}
if (udp_send_thread_ != nullptr)
{
delete udp_send_thread_;
}
if (serial_ != nullptr)
{
delete serial_;
}
if (serial_send_thread_ != nullptr)
{
delete serial_send_thread_;
}
if (wait_thread_ != nullptr)
{
delete wait_thread_;
}
for (unsigned int i = 0; i < udp_clients_.size(); i++)
{
if (udp_clients_[i] != nullptr)
{
delete udp_clients_[i];
}
}
// myfile_.close();
}
int ClassServer::start(const std::string &port, const int &listening_port, boost::asio::io_service *io_service)
{
io_service_ = io_service;
if (connectToSerial(port) != 0)
{
log_error_stream_("[start] Error connecting to CLASS");
return -1;
}
else
{
log_stream_("[start] Connected to CLASS");
}
if (startUDPServer(listening_port) != 0)
{
log_error_stream_("[start] Error starting UDP server");
return -1;
}
else
{
log_stream_("[start] UDP server is listening");
}
wait_thread_ = new std::thread(&ClassServer::waitThreadFunction, this);
wait_thread_->detach();
// myfile_.open ("msgs.txt");
return 0;
}
int ClassServer::stop()
{
disconnectFromSerial();
stopUDPServer();
io_service_->stop();
return 0;
}
int ClassServer::connectToSerial(const std::string &port)
{
serial_ = new AsyncSerial(port, 115200);
// starts a thread for sending msgs to CLASS each 100ms
serial_->addListener((SerialListener *)this);
serial_send_thread_ = new std::thread(&ClassServer::serialSendThreadFunction, this);
serial_send_thread_->detach();
return 0;
}
int ClassServer::disconnectFromSerial()
{
if (serial_ != nullptr)
{
log_("[disconnectFromSerial] close serial");
serial_->close();
delete serial_;
serial_ = nullptr;
log_("[disconnectFromSerial] serial closed");
}
if (serial_send_thread_ != nullptr)
{
delete serial_send_thread_;
serial_send_thread_ = nullptr;
}
turn_off_mutex_.lock();
continue_serial_ = false;
turn_off_mutex_.unlock();
return 0;
}
void ClassServer::serialMsgReceived(std::string msg)
{
//log_stream_("[serialMsgReceived] Msg received from CLASS: " << msg)
// myfile_ << msg;
if (msg.find("turnning off") != std::string::npos)
{
log_("[serialMsgReceived] Device turned OFF");
turn_off_mutex_.lock();
turn_off_ = true;
turn_off_mutex_.unlock();
}
msgs_from_class_mutex_.lock();
msgs_from_class_.push_back(msg);
msgs_from_class_mutex_.unlock();
if(msg.find("tic") != string::npos)
{
log_stream_("[serialMsgReceived] Msg received from CLASS: " << msg)
}
}
void ClassServer::waitThreadFunction()
{
bool continueLoop = true;
while (continueLoop)
{
turn_off_mutex_.lock();
continueLoop = continue_udp_ || continue_serial_;
turn_off_mutex_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
log_("[waitThreadFunction] Thread stopped. Stop external service");
io_service_->stop();
}
void ClassServer::serialSendThreadFunction()
{
bool continueLoop = true;
while (continueLoop)
{
turn_off_mutex_.lock();
continueLoop = !turn_off_;
turn_off_mutex_.unlock();
bool is_empty;
msgs_to_class_mutex_.lock();
is_empty = msgs_to_class_.empty();
msgs_to_class_mutex_.unlock();
if (!is_empty)
{
// send the first msg in queue
std::string msg;
msgs_to_class_mutex_.lock();
msg = msgs_to_class_[0];
commands::ClassCommands d;
log_stream_("[serialSendThreadFunction] Msg sent to CLASS device : " << d.Log(msg));
msgs_to_class_.erase(msgs_to_class_.begin());
msgs_to_class_mutex_.unlock();
serial_->writeString(msg);
std::string msg_without_ending = msg;
replace(msg_without_ending, "\r\n", "\\r\\n");
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
log_("[serialSendThreadFunction] Thread stopped. Disconnect from serial");
disconnectFromSerial();
}
int ClassServer::startUDPServer(const int &listening_port)
{
if (udp_server_ != nullptr)
{
log_("[startUDPServer] There is already a UDP server listening");
}
log_stream_("[startUDPServer] Starting UDP server on port " << listening_port);
udp_server_ = new UDPServer(listening_port);
udp_server_->startListening();
udp_server_->addListener((UdpServerListener *)this);
// starts a thread for sending msgs to clients
udp_send_thread_ = new std::thread(&ClassServer::udpSendThreadFunction, this);
udp_send_thread_->detach();
return 0;
}
int ClassServer::stopUDPServer()
{
if (udp_server_ != nullptr)
{
log_("[stopUDPServer] Stopping UDP server");
udp_server_->removeListener((UdpServerListener *)this);
udp_server_->stopListening();
delete udp_server_;
udp_server_ = nullptr;
turn_off_mutex_.lock();
continue_udp_ = false;
turn_off_mutex_.unlock();
log_("[stopUDPServer] UDP server stopped");
}
return 0;
}
void ClassServer::udpMsgReceived(std::string msg, std::string ip)
{
// parse message from client
if (msg.find("flush") != std::string::npos)
{
log_stream_("[udpMsgReceived] Message received from client (IP: " << ip << "): Flush");
// flush async_serial buffer
if (serial_ != nullptr)
{
serial_->flush();
}
// flush internal buffer
msgs_from_class_mutex_.lock();
msgs_from_class_.clear();
msgs_from_class_mutex_.unlock();
}
else if (msg.find("disconnect") != std::string::npos)
{
// extract port where the client was listenening
std::regex regex("\\ ");
std::vector<std::string> out(
std::sregex_token_iterator(msg.begin(), msg.end(), regex, -1),
std::sregex_token_iterator());
if (out.size() != 2)
{
return;
}
int client_port = 0;
try
{
client_port = std::stoi(out[1]);
}
catch (const std::exception &e)
{
log_error_stream_("[udpMsgReceived] disconnect command must be followed by an integer (port in which the client listens) but (" << out[1] << ") has been provided. Exc:" << e.what());
return;
}
if (udp_mutex_.try_lock())
{
for (unsigned int i = 0; i < udp_clients_.size(); i++)
{
if (udp_clients_[i]->getIP() == ip && udp_clients_[i]->getPort() == client_port)
{
log_stream_("[udpMsgReceived] Client (IP: " << ip << ", port: " << client_port << ") is not listening anymore");
delete udp_clients_[i];
udp_clients_.erase(udp_clients_.begin() + i);
break;
}
}
udp_mutex_.unlock();
}
}
else if (msg.find("connect") != std::string::npos)
{
// extract port where the client will listen
// extract port
std::regex regex("\\ ");
std::vector<std::string> out(
std::sregex_token_iterator(msg.begin(), msg.end(), regex, -1),
std::sregex_token_iterator());
if (out.size() != 2)
{
return;
}
int client_port = 0;
try
{
client_port = std::stoi(out[1]);
}
catch (const std::exception &e)
{
log_error_stream_("[udpMsgReceived] connect command must be followed by an iteger (port in which the client listens) but (" << out[1] << ") has been provided. Exc:" << e.what());
return;
}
if (udp_mutex_.try_lock())
{
bool found = false;
for (unsigned int i = 0; i < udp_clients_.size(); i++)
{
if (udp_clients_[i]->getIP() == ip && udp_clients_[i]->getPort() == client_port)
{
log_stream_("[udpMsgReceived] There is a client already listening (IP: " << ip << ", port: " << client_port << ")");
found = true;
break;
}
}
if (!found)
{
log_stream_("[udpMsgReceived] New client is listening (IP: " << ip << ", port: " << client_port << ")");
udp_client_ips_.push_back(ip);
udp_clients_.push_back(new UDPClient(ip, out[1], -1));
}
udp_mutex_.unlock();
}
}
else
{
// send msg to CLASS_DEVICE
std::vector<string>::iterator it = std::find(udp_client_ips_.begin(), udp_client_ips_.end(), ip);
if (it == udp_client_ips_.end())
{
log_warn_stream_("[udpMsgReceived] Client (IP: " << ip << ") is not connected");
}
else
{
// std::string msg_without_ending = msg;
// replace(msg_without_ending, "\r\n", "\\r\\n");
// log_stream_("[udpMsgReceived] Message received from client (IP: " << ip << "): " << msg_without_ending);
msgs_to_class_mutex_.lock();
msgs_to_class_.push_back(msg);
msgs_to_class_mutex_.unlock();
}
}
}
bool ClassServer::replace(std::string &str, const std::string &from, const std::string &to)
{
size_t start_pos = str.find(from);
while (start_pos != std::string::npos)
{
str.replace(start_pos, from.length(), to);
start_pos = str.find(from);
}
return true;
}
std::string ClassServer::trim(string str)
{
size_t first = str.find_first_not_of(' ');
if (string::npos == first)
{
return "";
}
size_t last = str.find_last_not_of(' ');
return str.substr(first, (last - first + 1));
}
void ClassServer::udpSendThreadFunction()
{
bool continueLoop = true;
while (continueLoop)
{
turn_off_mutex_.lock();
continueLoop = !turn_off_;
turn_off_mutex_.unlock();
bool is_empty;
msgs_from_class_mutex_.lock();
is_empty = msgs_from_class_.empty();
msgs_from_class_mutex_.unlock();
if (!is_empty)
{
if (!udp_clients_.empty())
// send the first msg in queue
{
std::vector<std::string> msgs;
msgs_from_class_mutex_.lock();
msgs = msgs_from_class_;
msgs_from_class_.clear();
msgs_from_class_mutex_.unlock();
if (!msgs.empty())
{
if (udp_mutex_.try_lock())
{
for (unsigned int i = 0; i < msgs.size(); i++)
{
for (unsigned int j = 0; j < udp_clients_.size(); j++)
{
msgs[i].erase(std::remove(msgs[i].begin(), msgs[i].end(), '\n'), msgs[i].end());
msgs[i].erase(std::remove(msgs[i].begin(), msgs[i].end(), '\r'), msgs[i].end());
trim(msgs[i]);
if (msgs[i].find("mac ") != std::string::npos){
//log_stream_("mac text found");
continue;
}
if (!msgs[i].empty())
{
//log_stream_("[SENDING ---->>>>] " << msgs[i]);
udp_clients_[j]->send(msgs[i]);
}
// if (!msgs[i].empty())
// {
// udp_clients_[j]->send(msgs[i]);
// }
}
}
udp_mutex_.unlock();
}
// else
// {
// log_warn_stream_("[udpSendThreadFunction] udp_mutex_ not get amd msgs pending: " << msgs[0]);
// }
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
log_("[udpSendThreadFunction] Thread stopped. UDP server stopped");
stopUDPServer();
}
/*logging*/
void ClassServer::log_(const std::string &text)
{
LOG_INFO << "[ClassServer]: " << text;
}
void ClassServer::log_(std::ostream &text)
{
LOG_INFO << "[ClassServer]: " << text.rdbuf();
}
void ClassServer::log_(const std::stringstream &text)
{
LOG_INFO << "[ClassServer]: " << text.str();
}
// see http://www.cplusplus.com/forum/unices/36461/
// see https://docs.microsoft.com/en-us/windows/console/setconsoletextattribute
#ifdef __unix__
const std::string bold_red("\033[1;31m");
const std::string bold_yellow("\033[1;33m");
const std::string reset("\033[0m");
#endif
void ClassServer::log_err_(const std::string &text)
{
#ifdef __unix__
LOG_ERROR << bold_red << "[ClassServer]: " << text << reset;
#elif defined(_WIN32) || defined(WIN32)
HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
SetConsoleTextAttribute(hConsole, 12);
LOG_ERROR << "[ClassServer]: " << text;
SetConsoleTextAttribute(hConsole, 7);
#endif
}
void ClassServer::log_err_(std::ostream &text)
{
#ifdef __unix__
LOG_ERROR << bold_red << "[ClassServer]: " << text.rdbuf() << reset;
#elif defined(_WIN32) || defined(WIN32)
HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
SetConsoleTextAttribute(hConsole, 12);
LOG_ERROR << "[ClassServer]: " << text.rdbuf();
SetConsoleTextAttribute(hConsole, 7);
#endif
}
void ClassServer::log_warn_(const std::string &text)
{
#ifdef __unix__
LOG_WARNING << bold_yellow << "[ClassServer]: " << text << reset;
#elif defined(_WIN32) || defined(WIN32)
HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
SetConsoleTextAttribute(hConsole, 14);
LOG_WARNING << "[ClassServer]: " << text;
SetConsoleTextAttribute(hConsole, 7);
#endif
}
void ClassServer::log_warn_(std::ostream &text)
{
#ifdef __unix__
LOG_WARNING << bold_yellow << "[ClassServer]: " << text.rdbuf() << reset;
#elif defined(_WIN32) || defined(WIN32)
HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
SetConsoleTextAttribute(hConsole, 14);
LOG_WARNING << "[ClassServer]: " << text.rdbuf();
SetConsoleTextAttribute(hConsole, 7);
#endif
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment