From 336a636aed43cb12fe0a2b6e6d364f1fe3f7e007 Mon Sep 17 00:00:00 2001 From: Christian Brunschen Date: Sun, 6 Aug 2017 13:25:55 +0200 Subject: [PATCH] Refactor server_{ws,http}.hpp into separate interface and implementation. (#2548) Refactor server_{ws,http}.hpp into separate interface and implementation headers. When shutting down the HTTP server, also explicitly stop the asio::io_context. --- src/emu/http.cpp | 78 ++-- src/emu/http.h | 21 +- src/lib/util/server_http.hpp | 491 +------------------- src/lib/util/server_http_impl.hpp | 482 ++++++++++++++++++++ src/lib/util/server_ws.hpp | 703 +---------------------------- src/lib/util/server_ws_impl.hpp | 717 ++++++++++++++++++++++++++++++ src/mame/machine/esqpanel.cpp | 656 +++++++++++++-------------- src/mame/machine/esqpanel.h | 6 +- 8 files changed, 1621 insertions(+), 1533 deletions(-) create mode 100644 src/lib/util/server_http_impl.hpp create mode 100644 src/lib/util/server_ws_impl.hpp diff --git a/src/emu/http.cpp b/src/emu/http.cpp index e483a32a099..d7f77474bd1 100644 --- a/src/emu/http.cpp +++ b/src/emu/http.cpp @@ -9,8 +9,8 @@ HTTP server handling ***************************************************************************/ #include "emu.h" -#include "server_ws.hpp" -#include "server_http.hpp" +#include "server_ws_impl.hpp" +#include "server_http_impl.hpp" #include #include @@ -115,13 +115,13 @@ static std::string extension_to_type(const std::string& extension) struct http_request_impl : public http_manager::http_request { public: - std::shared_ptr m_request; + std::shared_ptr m_request; std::size_t m_query; std::size_t m_fragment; std::size_t m_path_end; std::size_t m_query_end; - http_request_impl(std::shared_ptr request) : m_request(request) { + http_request_impl(std::shared_ptr request) : m_request(request) { std::size_t len = m_request->path.length(); m_fragment = m_request->path.find('#'); @@ -181,13 +181,13 @@ public: /** An HTTP response. */ struct http_response_impl : public http_manager::http_response { - std::shared_ptr m_response; + std::shared_ptr m_response; int m_status; std::string m_content_type; std::stringstream m_headers; std::stringstream m_body; - http_response_impl(std::shared_ptr response) : m_response(response) { } + http_response_impl(std::shared_ptr response) : m_response(response) { } /** Sets the HTTP status to be returned to the client. */ virtual void set_status(int status) { @@ -220,9 +220,9 @@ struct http_response_impl : public http_manager::http_response { struct websocket_endpoint_impl : public http_manager::websocket_endpoint { /** The underlying edpoint. */ - std::shared_ptr m_endpoint; + webpp::ws_server::Endpoint *m_endpoint; - websocket_endpoint_impl(std::shared_ptr endpoint, + websocket_endpoint_impl(webpp::ws_server::Endpoint *endpoint, http_manager::websocket_open_handler on_open, http_manager::websocket_message_handler on_message, http_manager::websocket_close_handler on_close, @@ -239,20 +239,24 @@ struct websocket_connection_impl : public http_manager::websocket_connection { /** The server */ webpp::ws_server *m_wsserver; /* The underlying Commection. */ - std::shared_ptr m_connection; - websocket_connection_impl(webpp::ws_server *server, std::shared_ptr connection) + std::weak_ptr m_connection; + websocket_connection_impl(webpp::ws_server *server, std::shared_ptr connection) : m_wsserver(server), m_connection(connection) { } /** Sends a message to the client that is connected on the other end of this Websocket connection. */ virtual void send_message(const std::string &payload, int opcode) { - std::shared_ptr message_stream = std::make_shared(); - (*message_stream) << payload; - m_wsserver->send(m_connection, message_stream, nullptr, opcode | 0x80); + if (auto connection = m_connection.lock()) { + std::shared_ptr message_stream = std::make_shared(); + (*message_stream) << payload; + m_wsserver->send(connection, message_stream, nullptr, opcode | 0x80); + } } /** Closes this open Websocket connection. */ virtual void close() { - m_wsserver->send_close(m_connection, 1000 /* normal close */); + if (auto connection = m_connection.lock()) { + m_wsserver->send_close(connection, 1000 /* normal close */); + } } }; @@ -338,11 +342,12 @@ http_manager::~http_manager() if (!m_server) return; m_server->stop(); + m_io_context->stop(); if (m_server_thread.joinable()) m_server_thread.join(); } -static void on_get(http_manager::http_handler handler, std::shared_ptr response, std::shared_ptr request) { +static void on_get(http_manager::http_handler handler, std::shared_ptr response, std::shared_ptr request) { auto request_impl = std::make_shared(request); auto response_impl = std::make_shared(response); @@ -351,23 +356,22 @@ static void on_get(http_manager::http_handler handler, std::shared_ptrsend(); } -void http_manager::on_open(http_manager::websocket_endpoint_ptr endpoint, void *connection) { +void http_manager::on_open(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection) { std::lock_guard lock(m_connections_mutex); webpp::ws_server *ws_server = m_wsserver.get(); - // Keep an oening shared_ptr to the Connection, so it won't go away while we are using it. - std::shared_ptr conn = (static_cast(connection))->ptr(); - http_manager::websocket_connection_ptr connection_impl = std::make_shared(ws_server, conn); - m_connections[connection] = connection_impl; + + http_manager::websocket_connection_ptr connection_impl = std::make_shared(ws_server, connection); + m_connections[connection.get()] = connection_impl; if (endpoint->on_open) { endpoint->on_open(connection_impl); } } -void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::string &payload, int opcode) { +void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, const std::string &payload, int opcode) { if (endpoint->on_message) { std::lock_guard lock(m_connections_mutex); - auto i = m_connections.find(connection); + auto i = m_connections.find(connection.get()); if (i != m_connections.end()) { http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second; endpoint->on_message(websocket_connection_impl, payload, opcode); @@ -375,31 +379,31 @@ void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, voi } } -void http_manager::on_close(http_manager::websocket_endpoint_ptr endpoint, void *connection, +void http_manager::on_close(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, int status, const std::string& reason) { std::lock_guard lock(m_connections_mutex); - auto i = m_connections.find(connection); + auto i = m_connections.find(connection.get()); if (i != m_connections.end()) { if (endpoint->on_close) { http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second; endpoint->on_close(websocket_connection_impl, status, reason); } - m_connections.erase(connection); + m_connections.erase(connection.get()); } } -void http_manager::on_error(http_manager::websocket_endpoint_ptr endpoint, void *connection, +void http_manager::on_error(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, const std::error_code& error_code) { std::lock_guard lock(m_connections_mutex); - auto i = m_connections.find(connection); + auto i = m_connections.find(connection.get()); if (i != m_connections.end()) { if (endpoint->on_error) { http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second; endpoint->on_error(websocket_connection_impl, error_code); } - m_connections.erase(connection); + m_connections.erase(connection.get()); } } @@ -527,25 +531,25 @@ http_manager::websocket_endpoint_ptr http_manager::add_endpoint(const std::strin using namespace std::placeholders; auto &endpoint = m_wsserver->m_endpoint[path]; - std::shared_ptr endpoint_ptr(&endpoint); + webpp::ws_server::Endpoint *endpoint_ptr(&endpoint); auto endpoint_impl = std::make_shared(endpoint_ptr, on_open, on_message, on_close, on_error); - endpoint.on_open = [&, this, endpoint_impl](std::shared_ptr conn) { - this->on_open(endpoint_impl, conn.get()); + endpoint.on_open = [&, this, endpoint_impl](std::shared_ptr connection) { + this->on_open(endpoint_impl, connection); }; - endpoint.on_message = [&, this, endpoint_impl](std::shared_ptr conn, std::shared_ptr message) { + endpoint.on_message = [&, this, endpoint_impl](std::shared_ptr connection, std::shared_ptr message) { std::string payload = message->string(); int opcode = message->fin_rsv_opcode & 0x0f; - this->on_message(endpoint_impl, conn.get(), payload, opcode); + this->on_message(endpoint_impl, connection, payload, opcode); }; - endpoint.on_close = [&, this, endpoint_impl](std::shared_ptr conn, int status, const std::string& reason) { - this->on_close(endpoint_impl, conn.get(), status, reason); + endpoint.on_close = [&, this, endpoint_impl](std::shared_ptr connection, int status, const std::string& reason) { + this->on_close(endpoint_impl, connection, status, reason); }; - endpoint.on_error = [&, this, endpoint_impl](std::shared_ptr conn, const std::error_code& error_code) { - this->on_error(endpoint_impl, conn.get(), error_code); + endpoint.on_error = [&, this, endpoint_impl](std::shared_ptr connection, const std::error_code& error_code) { + this->on_error(endpoint_impl, connection, error_code); }; m_endpoints[path] = endpoint_impl; diff --git a/src/emu/http.h b/src/emu/http.h index 783720bd2d1..6c845757757 100644 --- a/src/emu/http.h +++ b/src/emu/http.h @@ -19,6 +19,8 @@ HTTP server handling #include #include +#include "server_http.hpp" +#include "server_ws.hpp" //************************************************************************** // TYPE DEFINITIONS @@ -29,11 +31,6 @@ namespace asio { class io_context; } -namespace webpp -{ - class http_server; - class ws_server; -} class http_manager { @@ -41,7 +38,7 @@ class http_manager public: /** An HTTP Request. */ - struct http_request : public std::enable_shared_from_this + struct http_request { /** Retrieves the requested resource. */ virtual const std::string get_resource() = 0; // The entire resource: path, query and fragment. @@ -67,7 +64,7 @@ public: typedef std::shared_ptr http_request_ptr; /** An HTTP response. */ - struct http_response : public std::enable_shared_from_this + struct http_response { /** Sets the HTTP status to be returned to the client. */ virtual void set_status(int status) = 0; @@ -84,7 +81,7 @@ public: typedef std::shared_ptr http_response_ptr; /** Identifies a Websocket connection. */ - struct websocket_connection : public std::enable_shared_from_this + struct websocket_connection { /** Sends a message to the client that is connected on the other end of this Websocket connection. */ virtual void send_message(const std::string &payload, int opcode) = 0; @@ -162,13 +159,13 @@ public: } private: - void on_open(http_manager::websocket_endpoint_ptr endpoint, void *onnection); + void on_open(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection); - void on_message(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::string& payload, int opcode); + void on_message(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, const std::string& payload, int opcode); - void on_close(http_manager::websocket_endpoint_ptr endpoint, void *connection, int status, const std::string& reason); + void on_close(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, int status, const std::string& reason); - void on_error(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::error_code& error_code); + void on_error(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr connection, const std::error_code& error_code); bool read_file(std::ostream &os, const std::string &path); diff --git a/src/lib/util/server_http.hpp b/src/lib/util/server_http.hpp index b50161b1e3b..b822daecfc8 100644 --- a/src/lib/util/server_http.hpp +++ b/src/lib/util/server_http.hpp @@ -1,14 +1,8 @@ // license:MIT // copyright-holders:Ole Christian Eidheim, Miodrag Milanovic -#ifndef SERVER_HTTP_HPP -#define SERVER_HTTP_HPP +#ifndef MAME_LIB_UTIL_SERVER_HTTP_HPP +#define MAME_LIB_UTIL_SERVER_HTTP_HPP -#if defined(_MSC_VER) -#pragma warning(disable:4503) -#endif - -#include "asio.h" -#include "asio/system_timer.hpp" #include "path_to_regex.hpp" #include @@ -41,471 +35,32 @@ public: return seed; } }; -#endif +#endif /* CASE_INSENSITIVE_EQUALS_AND_HASH */ + namespace webpp { - template - class Server; + struct Request { + std::string method, path, http_version; - template - class ServerBase { - public: - virtual ~ServerBase() {} + std::unordered_multimap header; - class Response { - friend class ServerBase; + path2regex::Keys keys; + std::map params; - asio::streambuf m_streambuf; - - std::shared_ptr m_socket; - std::ostream m_ostream; - std::stringstream m_header; - explicit Response(const std::shared_ptr &socket) : m_socket(socket), m_ostream(&m_streambuf) {} - - static std::string statusToString(int status) - { - switch (status) { - default: - case 200: return "HTTP/1.0 200 OK\r\n"; - case 201: return "HTTP/1.0 201 Created\r\n"; - case 202: return "HTTP/1.0 202 Accepted\r\n"; - case 204: return "HTTP/1.0 204 No Content\r\n"; - case 300: return "HTTP/1.0 300 Multiple Choices\r\n"; - case 301: return "HTTP/1.0 301 Moved Permanently\r\n"; - case 302: return "HTTP/1.0 302 Moved Temporarily\r\n"; - case 304: return "HTTP/1.0 304 Not Modified\r\n"; - case 400: return "HTTP/1.0 400 Bad Request\r\n"; - case 401: return "HTTP/1.0 401 Unauthorized\r\n"; - case 403: return "HTTP/1.0 403 Forbidden\r\n"; - case 404: return "HTTP/1.0 404 Not Found\r\n"; - case 500: return "HTTP/1.0 500 Internal Server Error\r\n"; - case 501: return "HTTP/1.0 501 Not Implemented\r\n"; - case 502: return "HTTP/1.0 502 Bad Gateway\r\n"; - case 504: return "HTTP/1.0 503 Service Unavailable\r\n"; - } - } - public: - Response& status(int number) { m_ostream << statusToString(number); return *this; } - void type(std::string str) { m_header << "Content-Type: "<< str << "\r\n"; } - void send(std::string str) { m_ostream << m_header.str() << "Content-Length: " << str.length() << "\r\n\r\n" << str; } - size_t size() const { return m_streambuf.size(); } - std::shared_ptr socket() { return m_socket; } - - /// If true, force server to close the connection after the response have been sent. - /// - /// This is useful when implementing a HTTP/1.0-server sending content - /// without specifying the content length. - bool close_connection_after_response = false; - }; - - class Content : public std::istream { - friend class ServerBase; - public: - size_t size() const { - return streambuf.size(); - } - std::string string() const { - std::stringstream ss; - ss << rdbuf(); - return ss.str(); - } - private: - asio::streambuf &streambuf; - explicit Content(asio::streambuf &streambuf): std::istream(&streambuf), streambuf(streambuf) {} - }; - - class Request { - friend class ServerBase; - friend class Server; - public: - std::string method, path, http_version; - - Content content; - - std::unordered_multimap header; - - path2regex::Keys keys; - std::map params; - - std::string remote_endpoint_address; - unsigned short remote_endpoint_port; - - private: - Request(const socket_type &socket): content(streambuf) { - try { - remote_endpoint_address=socket.lowest_layer().remote_endpoint().address().to_string(); - remote_endpoint_port=socket.lowest_layer().remote_endpoint().port(); - } - catch(...) {} - } - asio::streambuf streambuf; - }; - - class Config { - friend class ServerBase; - - Config(unsigned short port) : port(port) {} - public: - /// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. - unsigned short port; - /// Number of threads that the server will use when start() is called. Defaults to 1 thread. - size_t thread_pool_size=1; - /// Timeout on request handling. Defaults to 5 seconds. - size_t timeout_request=5; - /// Timeout on content handling. Defaults to 300 seconds. - size_t timeout_content=300; - /// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation. - /// If empty, the address will be any address. - std::string address; - /// Set to false to avoid binding the socket to an address that is already in use. Defaults to true. - bool reuse_address=true; - }; - ///Set before calling start(). - Config m_config; - private: - class regex_orderable : public std::regex { - std::string str; - public: - regex_orderable(std::regex reg, const std::string ®ex_str) : std::regex(reg), str(regex_str) {} - bool operator<(const regex_orderable &rhs) const { - return str, std::shared_ptr)>; - - public: - template void on_get(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg,regex)]["GET"] = std::make_tuple(std::move(keys), func); } - template void on_get(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["GET"] = func; } - template void on_post(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["POST"] = std::make_tuple(std::move(keys), func); } - template void on_post(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["POST"] = func; } - template void on_put(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PUT"] = std::make_tuple(std::move(keys), func); } - template void on_put(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["PUT"] = func; } - template void on_patch(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PATCH"] = std::make_tuple(std::move(keys), func); } - template void on_patch(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["PATCH"] = func; } - template void on_delete(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["DELETE"] = std::make_tuple(std::move(keys), func); } - template void on_delete(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["DELETE"] = func; } - - void remove_handler(std::string regex) - { - std::lock_guard lock(m_resource_mutex); - for (auto it = m_resource.begin(); it != m_resource.end(); ++it) - { - if (it->first.getstr() == regex) - { - m_resource.erase(it); - break; - } - } - } - void clear() - { - std::lock_guard lock(m_resource_mutex); - m_resource.clear(); - } - - std::function::Request>, const std::error_code&)> on_error; - - std::function socket, std::shared_ptr::Request>)> on_upgrade; - private: - /// Warning: do not access (red or write) m_resources without holding m_resource_mutex - std::map>> m_resource; - std::mutex m_resource_mutex; - - std::map m_default_resource; - - public: - virtual void start() { - if(!m_io_context) - m_io_context=std::make_shared(); - - if(m_io_context->stopped()) - m_io_context.reset(); - - asio::ip::tcp::endpoint endpoint; - if(m_config.address.size()>0) - endpoint=asio::ip::tcp::endpoint(asio::ip::make_address(m_config.address), m_config.port); - else - endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), m_config.port); - - if(!acceptor) - acceptor= std::make_unique(*m_io_context); - acceptor->open(endpoint.protocol()); - acceptor->set_option(asio::socket_base::reuse_address(m_config.reuse_address)); - acceptor->bind(endpoint); - acceptor->listen(); - - accept(); - - if (!m_external_context) - m_io_context->run(); - } - - void stop() const - { - acceptor->close(); - if (!m_external_context) - m_io_context->stop(); - } - - ///Use this function if you need to recursively send parts of a longer message - void send(const std::shared_ptr &response, const std::function& callback=nullptr) const { - asio::async_write(*response->socket(), response->m_streambuf, [this, response, callback](const std::error_code& ec, size_t /*bytes_transferred*/) { - if(callback) - callback(ec); - }); - } - - void set_io_context(std::shared_ptr new_io_context) - { - m_io_context = new_io_context; - m_external_context = true; - } - protected: - std::shared_ptr m_io_context; - bool m_external_context; - std::unique_ptr acceptor; - std::vector threads; - - ServerBase(unsigned short port) : m_config(port), m_external_context(false) {} - - virtual void accept()=0; - - std::shared_ptr get_timeout_timer(const std::shared_ptr &socket, long seconds) { - if(seconds==0) - return nullptr; - auto timer = std::make_shared(*m_io_context); - timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(seconds)); - timer->async_wait([socket](const std::error_code& ec){ - if(!ec) { - std::error_code newec = ec; - socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, newec); - socket->lowest_layer().close(); - } - }); - return timer; - } - - void read_request_and_content(const std::shared_ptr &socket) { - //Create new streambuf (Request::streambuf) for async_read_until() - //shared_ptr is used to pass temporary objects to the asynchronous functions - std::shared_ptr request(new Request(*socket)); - - //Set timeout on the following asio::async-read or write function - auto timer = get_timeout_timer(socket, m_config.timeout_request); - - asio::async_read_until(*socket, request->streambuf, "\r\n\r\n", - [this, socket, request, timer](const std::error_code& ec, size_t bytes_transferred) { - if(timer) - timer->cancel(); - if(!ec) { - //request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs: - //"After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter" - //The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the - //streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content). - size_t num_additional_bytes=request->streambuf.size()-bytes_transferred; - - if (!parse_request(request)) - return; - - //If content, read that as well - auto it = request->header.find("Content-Length"); - if (it != request->header.end()) { - unsigned long long content_length; - try { - content_length = stoull(it->second); - } - catch (const std::exception &) { - if (on_error) - on_error(request, std::error_code(EPROTO, std::generic_category())); - return; - } - if (content_length > num_additional_bytes) { - //Set timeout on the following asio::async-read or write function - auto timer2 = get_timeout_timer(socket, m_config.timeout_content); - asio::async_read(*socket, request->streambuf, - asio::transfer_exactly(size_t(content_length) - num_additional_bytes), - [this, socket, request, timer2] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if (timer2) - timer2->cancel(); - if (!ec) - find_resource(socket, request); - else if (on_error) - on_error(request, ec); - }); - } - else { - find_resource(socket, request); - } - } - else { - find_resource(socket, request); - } - } - else if (on_error) - on_error(request, ec); - }); - } - - bool parse_request(const std::shared_ptr &request) const { - std::string line; - getline(request->content, line); - size_t method_end; - if((method_end=line.find(' '))!=std::string::npos) { - size_t path_end; - if((path_end=line.find(' ', method_end+1))!=std::string::npos) { - request->method=line.substr(0, method_end); - request->path=line.substr(method_end+1, path_end-method_end-1); - - size_t protocol_end; - if((protocol_end=line.find('/', path_end+1))!=std::string::npos) { - if(line.compare(path_end+1, protocol_end-path_end-1, "HTTP")!=0) - return false; - request->http_version=line.substr(protocol_end+1, line.size()-protocol_end-2); - } - else - return false; - - getline(request->content, line); - size_t param_end; - while((param_end=line.find(':'))!=std::string::npos) { - size_t value_start=param_end+1; - if((value_start)header.emplace(line.substr(0, param_end), line.substr(value_start, line.size() - value_start - 1)); - } - - getline(request->content, line); - } - } - else - return false; - } - else - return false; - return true; - } - - void find_resource(const std::shared_ptr &socket, const std::shared_ptr &request) { - std::lock_guard lock(m_resource_mutex); - //Upgrade connection - if(on_upgrade) { - auto it=request->header.find("Upgrade"); - if(it!=request->header.end()) { - on_upgrade(socket, request); - return; - } - } - //Find path- and method-match, and call write_response - for(auto& regex_method : m_resource) { - auto it = regex_method.second.find(request->method); - if (it != regex_method.second.end()) { - std::smatch sm_res; - if (std::regex_match(request->path, sm_res, regex_method.first)) { - request->keys = std::get<0>(it->second); - for (size_t i = 0; i < request->keys.size(); i++) { - request->params.insert(std::pair(request->keys[i].name, sm_res[i + 1])); - } - write_response(socket, request, std::get<1>(it->second)); - return; - } - } - } - auto it=m_default_resource.find(request->method); - if(it!=m_default_resource.end()) { - write_response(socket, request, it->second); - } - } - - void write_response(const std::shared_ptr &socket, const std::shared_ptr &request, http_handler& resource_function) { - //Set timeout on the following asio::async-read or write function - auto timer = get_timeout_timer(socket, m_config.timeout_content); - - auto response=std::shared_ptr(new Response(socket), [this, request, timer](Response *response_ptr) { - auto response=std::shared_ptr(response_ptr); - send(response, [this, response, request, timer](const std::error_code& ec) { - if (timer) - timer->cancel(); - if (!ec) { - float http_version; - try { - http_version = stof(request->http_version); - } - catch (const std::exception &) { - if (on_error) - on_error(request, std::error_code(EPROTO, std::generic_category())); - return; - } - - if (response->close_connection_after_response) - return; - - auto range = request->header.equal_range("Connection"); - case_insensitive_equals check; - for (auto it = range.first; it != range.second; ++it) { - if (check(it->second, "close")) - return; - } - if (http_version > 1.05) - read_request_and_content(response->socket()); - } - else if (on_error) - on_error(request, ec); - }); - }); - - try { - resource_function(response, request); - } - catch(const std::exception &) { - if (on_error) - on_error(request, std::error_code(EPROTO, std::generic_category())); - } - } + std::string remote_endpoint_address; + unsigned short remote_endpoint_port; + + virtual ~Request() {} }; + + struct Response { + virtual Response& status(int number) = 0; + virtual void type(std::string str) = 0; + virtual void send(std::string str) = 0; + virtual size_t size() const = 0; - template - class Server : public ServerBase { - public: - Server(unsigned short port, size_t num_threads, long timeout_request, long timeout_send_or_receive) - : ServerBase(port, num_threads, timeout_request, timeout_send_or_receive) - { - } - }; - - using HTTP = asio::ip::tcp::socket; - - template<> - class Server : public ServerBase { - public: - Server() : ServerBase::ServerBase(80) {} - protected: - void accept() override { - //Create new socket for this connection - //Shared_ptr is used to pass temporary objects to the asynchronous functions - auto socket = std::make_shared(*m_io_context); - - acceptor->async_accept(*socket, [this, socket](const std::error_code& ec){ - //Immediately start accepting a new connection (if io_context hasn't been stopped) - if (ec != asio::error::operation_aborted) - accept(); - - if(!ec) { - asio::ip::tcp::no_delay option(true); - socket->set_option(option); - - read_request_and_content(socket); - } else if (on_error) - on_error(std::shared_ptr(new Request(*socket)), ec); - }); - } - }; - - class http_server : public Server { - public: - http_server() : Server::Server() {} + virtual ~Response() {} }; + + class http_server; } -#endif /* SERVER_HTTP_HPP */ +#endif /* MAME_LIB_UTIL_SERVER_HTTP_HPP */ diff --git a/src/lib/util/server_http_impl.hpp b/src/lib/util/server_http_impl.hpp new file mode 100644 index 00000000000..e8b76ad2f74 --- /dev/null +++ b/src/lib/util/server_http_impl.hpp @@ -0,0 +1,482 @@ +// license:MIT +// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic +#ifndef MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP +#define MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP + +#if defined(_MSC_VER) +#pragma warning(disable:4503) +#endif + +#include "server_http.hpp" +#include "asio.h" +#include "asio/system_timer.hpp" +#include "path_to_regex.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace webpp { + template + class Server; + + template + class ServerBase { + public: + virtual ~ServerBase() {} + + class Response : public webpp::Response { + friend class ServerBase; + + asio::streambuf m_streambuf; + std::shared_ptr m_socket; + + std::ostream m_ostream; + std::stringstream m_header; + + explicit Response(const std::shared_ptr &socket) : m_socket(socket), m_ostream(&m_streambuf) {} + + static std::string statusToString(int status) + { + switch (status) { + default: + case 200: return "HTTP/1.0 200 OK\r\n"; + case 201: return "HTTP/1.0 201 Created\r\n"; + case 202: return "HTTP/1.0 202 Accepted\r\n"; + case 204: return "HTTP/1.0 204 No Content\r\n"; + case 300: return "HTTP/1.0 300 Multiple Choices\r\n"; + case 301: return "HTTP/1.0 301 Moved Permanently\r\n"; + case 302: return "HTTP/1.0 302 Moved Temporarily\r\n"; + case 304: return "HTTP/1.0 304 Not Modified\r\n"; + case 400: return "HTTP/1.0 400 Bad Request\r\n"; + case 401: return "HTTP/1.0 401 Unauthorized\r\n"; + case 403: return "HTTP/1.0 403 Forbidden\r\n"; + case 404: return "HTTP/1.0 404 Not Found\r\n"; + case 500: return "HTTP/1.0 500 Internal Server Error\r\n"; + case 501: return "HTTP/1.0 501 Not Implemented\r\n"; + case 502: return "HTTP/1.0 502 Bad Gateway\r\n"; + case 504: return "HTTP/1.0 503 Service Unavailable\r\n"; + } + } + public: + virtual Response& status(int number) { m_ostream << statusToString(number); return *this; } + virtual void type(std::string str) { m_header << "Content-Type: "<< str << "\r\n"; } + virtual void send(std::string str) { m_ostream << m_header.str() << "Content-Length: " << str.length() << "\r\n\r\n" << str; } + virtual size_t size() const { return m_streambuf.size(); } + std::shared_ptr socket() { return m_socket; } + + /// If true, force server to close the connection after the response have been sent. + /// + /// This is useful when implementing a HTTP/1.0-server sending content + /// without specifying the content length. + bool close_connection_after_response = false; + virtual ~Response() {} + }; + + class Content : public std::istream { + friend class ServerBase; + public: + size_t size() const { + return streambuf.size(); + } + std::string string() const { + std::stringstream ss; + ss << rdbuf(); + return ss.str(); + } + private: + asio::streambuf &streambuf; + explicit Content(asio::streambuf &streambuf): std::istream(&streambuf), streambuf(streambuf) {} + }; + + class Request : public webpp::Request { + friend class ServerBase; + friend class Server; + public: + Content content; + + virtual ~Request() {} + private: + Request(const socket_type &socket): content(streambuf) { + try { + remote_endpoint_address=socket.lowest_layer().remote_endpoint().address().to_string(); + remote_endpoint_port=socket.lowest_layer().remote_endpoint().port(); + } + catch(...) {} + } + asio::streambuf streambuf; + }; + + class Config { + friend class ServerBase; + + Config(unsigned short port) : port(port) {} + public: + /// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. + unsigned short port; + /// Number of threads that the server will use when start() is called. Defaults to 1 thread. + size_t thread_pool_size=1; + /// Timeout on request handling. Defaults to 5 seconds. + size_t timeout_request=5; + /// Timeout on content handling. Defaults to 300 seconds. + size_t timeout_content=300; + /// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation. + /// If empty, the address will be any address. + std::string address; + /// Set to false to avoid binding the socket to an address that is already in use. Defaults to true. + bool reuse_address=true; + }; + ///Set before calling start(). + Config m_config; + private: + class regex_orderable : public std::regex { + std::string str; + public: + regex_orderable(std::regex reg, const std::string ®ex_str) : std::regex(reg), str(regex_str) {} + bool operator<(const regex_orderable &rhs) const { + return str, std::shared_ptr)>; + + public: + template void on_get(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg,regex)]["GET"] = std::make_tuple(std::move(keys), func); } + template void on_get(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["GET"] = func; } + template void on_post(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["POST"] = std::make_tuple(std::move(keys), func); } + template void on_post(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["POST"] = func; } + template void on_put(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PUT"] = std::make_tuple(std::move(keys), func); } + template void on_put(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["PUT"] = func; } + template void on_patch(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PATCH"] = std::make_tuple(std::move(keys), func); } + template void on_patch(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["PATCH"] = func; } + template void on_delete(std::string regex, T&& func) { std::lock_guard lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["DELETE"] = std::make_tuple(std::move(keys), func); } + template void on_delete(T&& func) { std::lock_guard lock(m_resource_mutex); m_default_resource["DELETE"] = func; } + + void remove_handler(std::string regex) + { + std::lock_guard lock(m_resource_mutex); + for (auto it = m_resource.begin(); it != m_resource.end(); ++it) + { + if (it->first.getstr() == regex) + { + m_resource.erase(it); + break; + } + } + } + void clear() + { + std::lock_guard lock(m_resource_mutex); + m_resource.clear(); + } + + std::function::Request>, const std::error_code&)> on_error; + + std::function socket, std::shared_ptr::Request>)> on_upgrade; + private: + /// Warning: do not access (red or write) m_resources without holding m_resource_mutex + std::map>> m_resource; + std::mutex m_resource_mutex; + + std::map m_default_resource; + + public: + virtual void start() { + if(!m_io_context) + m_io_context=std::make_shared(); + + if(m_io_context->stopped()) + m_io_context.reset(); + + asio::ip::tcp::endpoint endpoint; + if(m_config.address.size()>0) + endpoint=asio::ip::tcp::endpoint(asio::ip::make_address(m_config.address), m_config.port); + else + endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), m_config.port); + + if(!acceptor) + acceptor= std::make_unique(*m_io_context); + acceptor->open(endpoint.protocol()); + acceptor->set_option(asio::socket_base::reuse_address(m_config.reuse_address)); + acceptor->bind(endpoint); + acceptor->listen(); + + accept(); + + if (!m_external_context) + m_io_context->run(); + } + + void stop() const + { + acceptor->close(); + if (!m_external_context) + m_io_context->stop(); + } + + ///Use this function if you need to recursively send parts of a longer message + void send(const std::shared_ptr &response, const std::function& callback=nullptr) const { + asio::async_write(*response->socket(), response->m_streambuf, [this, response, callback](const std::error_code& ec, size_t /*bytes_transferred*/) { + if(callback) + callback(ec); + }); + } + + void set_io_context(std::shared_ptr new_io_context) + { + m_io_context = new_io_context; + m_external_context = true; + } + protected: + std::shared_ptr m_io_context; + bool m_external_context; + std::unique_ptr acceptor; + std::vector threads; + + ServerBase(unsigned short port) : m_config(port), m_external_context(false) {} + + virtual void accept()=0; + + std::shared_ptr get_timeout_timer(const std::shared_ptr &socket, long seconds) { + if(seconds==0) + return nullptr; + auto timer = std::make_shared(*m_io_context); + timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(seconds)); + timer->async_wait([socket](const std::error_code& ec){ + if(!ec) { + std::error_code newec = ec; + socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, newec); + socket->lowest_layer().close(); + } + }); + return timer; + } + + void read_request_and_content(const std::shared_ptr &socket) { + //Create new streambuf (Request::streambuf) for async_read_until() + //shared_ptr is used to pass temporary objects to the asynchronous functions + std::shared_ptr request(new Request(*socket)); + + //Set timeout on the following asio::async-read or write function + auto timer = get_timeout_timer(socket, m_config.timeout_request); + + asio::async_read_until(*socket, request->streambuf, "\r\n\r\n", + [this, socket, request, timer](const std::error_code& ec, size_t bytes_transferred) { + if(timer) + timer->cancel(); + if(!ec) { + //request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs: + //"After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter" + //The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the + //streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content). + size_t num_additional_bytes=request->streambuf.size()-bytes_transferred; + + if (!parse_request(request)) + return; + + //If content, read that as well + auto it = request->header.find("Content-Length"); + if (it != request->header.end()) { + unsigned long long content_length; + try { + content_length = stoull(it->second); + } + catch (const std::exception &) { + if (on_error) + on_error(request, std::error_code(EPROTO, std::generic_category())); + return; + } + if (content_length > num_additional_bytes) { + //Set timeout on the following asio::async-read or write function + auto timer2 = get_timeout_timer(socket, m_config.timeout_content); + asio::async_read(*socket, request->streambuf, + asio::transfer_exactly(size_t(content_length) - num_additional_bytes), + [this, socket, request, timer2] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if (timer2) + timer2->cancel(); + if (!ec) + find_resource(socket, request); + else if (on_error) + on_error(request, ec); + }); + } + else { + find_resource(socket, request); + } + } + else { + find_resource(socket, request); + } + } + else if (on_error) + on_error(request, ec); + }); + } + + bool parse_request(const std::shared_ptr &request) const { + std::string line; + getline(request->content, line); + size_t method_end; + if((method_end=line.find(' '))!=std::string::npos) { + size_t path_end; + if((path_end=line.find(' ', method_end+1))!=std::string::npos) { + request->method=line.substr(0, method_end); + request->path=line.substr(method_end+1, path_end-method_end-1); + + size_t protocol_end; + if((protocol_end=line.find('/', path_end+1))!=std::string::npos) { + if(line.compare(path_end+1, protocol_end-path_end-1, "HTTP")!=0) + return false; + request->http_version=line.substr(protocol_end+1, line.size()-protocol_end-2); + } + else + return false; + + getline(request->content, line); + size_t param_end; + while((param_end=line.find(':'))!=std::string::npos) { + size_t value_start=param_end+1; + if((value_start)header.emplace(line.substr(0, param_end), line.substr(value_start, line.size() - value_start - 1)); + } + + getline(request->content, line); + } + } + else + return false; + } + else + return false; + return true; + } + + void find_resource(const std::shared_ptr &socket, const std::shared_ptr &request) { + std::lock_guard lock(m_resource_mutex); + //Upgrade connection + if(on_upgrade) { + auto it=request->header.find("Upgrade"); + if(it!=request->header.end()) { + on_upgrade(socket, request); + return; + } + } + //Find path- and method-match, and call write_response + for(auto& regex_method : m_resource) { + auto it = regex_method.second.find(request->method); + if (it != regex_method.second.end()) { + std::smatch sm_res; + if (std::regex_match(request->path, sm_res, regex_method.first)) { + request->keys = std::get<0>(it->second); + for (size_t i = 0; i < request->keys.size(); i++) { + request->params.insert(std::pair(request->keys[i].name, sm_res[i + 1])); + } + write_response(socket, request, std::get<1>(it->second)); + return; + } + } + } + auto it=m_default_resource.find(request->method); + if(it!=m_default_resource.end()) { + write_response(socket, request, it->second); + } + } + + void write_response(const std::shared_ptr &socket, const std::shared_ptr &request, http_handler& resource_function) { + //Set timeout on the following asio::async-read or write function + auto timer = get_timeout_timer(socket, m_config.timeout_content); + + auto response=std::shared_ptr(new Response(socket), [this, request, timer](Response *response_ptr) { + auto response=std::shared_ptr(response_ptr); + send(response, [this, response, request, timer](const std::error_code& ec) { + if (timer) + timer->cancel(); + if (!ec) { + float http_version; + try { + http_version = stof(request->http_version); + } + catch (const std::exception &) { + if (on_error) + on_error(request, std::error_code(EPROTO, std::generic_category())); + return; + } + + if (response->close_connection_after_response) + return; + + auto range = request->header.equal_range("Connection"); + case_insensitive_equals check; + for (auto it = range.first; it != range.second; ++it) { + if (check(it->second, "close")) + return; + } + if (http_version > 1.05) + read_request_and_content(response->socket()); + } + else if (on_error) + on_error(request, ec); + }); + }); + + try { + resource_function(response, request); + } + catch(const std::exception &) { + if (on_error) + on_error(request, std::error_code(EPROTO, std::generic_category())); + } + } + }; + + template + class Server : public ServerBase { + public: + Server(unsigned short port, size_t num_threads, long timeout_request, long timeout_send_or_receive) + : ServerBase(port, num_threads, timeout_request, timeout_send_or_receive) + { + } + }; + + using HTTP = asio::ip::tcp::socket; + + template<> + class Server : public ServerBase { + public: + Server() : ServerBase::ServerBase(80) {} + protected: + void accept() override { + //Create new socket for this connection + //Shared_ptr is used to pass temporary objects to the asynchronous functions + auto socket = std::make_shared(*m_io_context); + + acceptor->async_accept(*socket, [this, socket](const std::error_code& ec){ + //Immediately start accepting a new connection (if io_context hasn't been stopped) + if (ec != asio::error::operation_aborted) + accept(); + + if(!ec) { + asio::ip::tcp::no_delay option(true); + socket->set_option(option); + + read_request_and_content(socket); + } else if (on_error) + on_error(std::shared_ptr(new Request(*socket)), ec); + }); + } + }; + + class http_server : public Server { + public: + http_server() : Server::Server() {} + }; +} +#endif /* MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP */ diff --git a/src/lib/util/server_ws.hpp b/src/lib/util/server_ws.hpp index ad9887c38dd..563bd473b6d 100644 --- a/src/lib/util/server_ws.hpp +++ b/src/lib/util/server_ws.hpp @@ -1,24 +1,9 @@ // license:MIT // copyright-holders:Ole Christian Eidheim, Miodrag Milanovic -#ifndef SERVER_WS_HPP -#define SERVER_WS_HPP -#include "path_to_regex.hpp" -#include "crypto.hpp" - -#include "asio.h" -#include "asio/system_timer.hpp" - -#include -#include -#include -#include -#include +#ifndef MAME_LIB_UTIL_SERVER_WS_HPP +#define MAME_LIB_UTIL_SERVER_WS_HPP #include #include -#include -#include -#include -#include #ifndef CASE_INSENSITIVE_EQUALS_AND_HASH #define CASE_INSENSITIVE_EQUALS_AND_HASH @@ -42,681 +27,23 @@ public: return seed; } }; -#endif +#endif /* CASE_INSENSITIVE_EQUALS_AND_HASH */ namespace webpp { - template - class SocketServer; + struct Connection { + std::string method, path, http_version; - template - class SocketServerBase { - public: - virtual ~SocketServerBase() {} + std::unordered_multimap header; - class SendStream : public std::ostream, public std::enable_shared_from_this { - friend class SocketServerBase; + std::smatch path_match; - asio::streambuf streambuf; - public: - SendStream(): std::ostream(&streambuf) {} - size_t size() const { - return streambuf.size(); - } - }; - - - class Connection : public std::enable_shared_from_this { - friend class SocketServerBase; - friend class SocketServer; - - public: - explicit Connection(const std::shared_ptr &socket) : remote_endpoint_port(0), socket(socket), strand(socket->get_io_service()), closed(false) { } - - std::string method, path, http_version; - - std::unordered_multimap header; - - std::smatch path_match; - - std::string remote_endpoint_address; - unsigned short remote_endpoint_port; - - std::shared_ptr ptr() { - return this->shared_from_this(); - } - - private: - explicit Connection(socket_type *socket): remote_endpoint_port(0), socket(socket), strand(socket->get_io_service()), closed(false) { } - - class SendData { - public: - SendData(const std::shared_ptr &header_stream, const std::shared_ptr &message_stream, - const std::function &callback) : - header_stream(header_stream), message_stream(message_stream), callback(callback) {} - std::shared_ptr header_stream; - std::shared_ptr message_stream; - std::function callback; - }; - - std::shared_ptr socket; - - asio::io_context::strand strand; - - std::list send_queue; - - void send_from_queue(const std::shared_ptr &connection) { - strand.post([this, connection]() { - asio::async_write(*socket, send_queue.begin()->header_stream->streambuf, - strand.wrap([this, connection](const std::error_code& ec, size_t /*bytes_transferred*/) { - if(!ec) { - asio::async_write(*socket, send_queue.begin()->message_stream->streambuf, - strand.wrap([this, connection] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - auto send_queued=send_queue.begin(); - if(send_queued->callback) - send_queued->callback(ec); - if(!ec) { - send_queue.erase(send_queued); - if(send_queue.size()>0) - send_from_queue(connection); - } - else - send_queue.clear(); - })); - } - else { - auto send_queued=send_queue.begin(); - if(send_queued->callback) - send_queued->callback(ec); - send_queue.clear(); - } - })); - }); - } - - std::atomic closed; - - std::unique_ptr timer_idle; - - void read_remote_endpoint_data() { - try { - remote_endpoint_address=socket->lowest_layer().remote_endpoint().address().to_string(); - remote_endpoint_port=socket->lowest_layer().remote_endpoint().port(); - } - catch (...) {} - } - }; - - class Message : public std::istream, public std::enable_shared_from_this { - friend class SocketServerBase; - - public: - unsigned char fin_rsv_opcode; - size_t size() const { - return length; - } - std::string string() const { - std::stringstream ss; - ss << rdbuf(); - return ss.str(); - } - private: - Message(): std::istream(&streambuf), fin_rsv_opcode(0), length(0) {} - - size_t length; - asio::streambuf streambuf; - }; - - class Endpoint : public std::enable_shared_from_this { - friend class SocketServerBase; - std::unordered_set > connections; - std::mutex connections_mutex; - - public: - std::function)> on_open; - std::function, std::shared_ptr)> on_message; - std::function, int, const std::string&)> on_close; - std::function, const std::error_code&)> on_error; - - std::unordered_set > get_connections() { - std::lock_guard lock(connections_mutex); - auto copy=connections; - return copy; - } - }; - - class Config { - friend class SocketServerBase; - - Config(unsigned short port) : port(port) {} - public: - /// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. - unsigned short port; - /// Number of threads that the server will use when start() is called. Defaults to 1 thread. - size_t thread_pool_size=1; - /// Timeout on request handling. Defaults to 5 seconds. - size_t timeout_request=5; - /// Idle timeout. Defaults to no timeout. - size_t timeout_idle=0; - /// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation. - /// If empty, the address will be any address. - std::string address; - /// Set to false to avoid binding the socket to an address that is already in use. Defaults to true. - bool reuse_address=true; - }; - ///Set before calling start(). - Config config; - - private: - class regex_orderable : public std::regex { - std::string str; - path2regex::Keys keys; - public: - regex_orderable(const char *regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {} - regex_orderable(const std::string ®ex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {} - bool operator<(const regex_orderable &rhs) const { - return str m_endpoint; - std::mutex m_endpoint_mutex; - - virtual void start() { - if(!io_context) - io_context=std::make_shared(); - - if(io_context->stopped()) - io_context->reset(); - - asio::ip::tcp::endpoint endpoint; - if(config.address.size()>0) - endpoint=asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port); - else - endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port); - - if(!acceptor) - acceptor= std::make_unique(*io_context); - acceptor->open(endpoint.protocol()); - acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address)); - acceptor->bind(endpoint); - acceptor->listen(); - - accept(); - - io_context->run(); - } - - void stop() { - acceptor->close(); - io_context->stop(); - - std::lock_guard lock(m_endpoint_mutex); - for(auto& p: m_endpoint) - p.second.connections.clear(); - } - - ///fin_rsv_opcode: 129=one fragment, text, 130=one fragment, binary, 136=close connection. - ///See http://tools.ietf.org/html/rfc6455#section-5.2 for more information - void send(const std::shared_ptr &connection, const std::shared_ptr &message_stream, - const std::function& callback=nullptr, - unsigned char fin_rsv_opcode=129) const { - if(fin_rsv_opcode!=136) - timer_idle_reset(connection); - - auto header_stream = std::make_shared(); - - size_t length=message_stream->size(); - - header_stream->put(fin_rsv_opcode); - //unmasked (first length byte<128) - if(length>=126) { - int num_bytes; - if(length>0xffff) { - num_bytes=8; - header_stream->put(127); - } - else { - num_bytes=2; - header_stream->put(126); - } - - for(int c=num_bytes-1;c>=0;c--) { - header_stream->put((static_cast(length) >> (8 * c)) % 256); - } - } - else - header_stream->put(static_cast(length)); - - connection->strand.post([this, connection, header_stream, message_stream, callback]() { - connection->send_queue.emplace_back(header_stream, message_stream, callback); - if(connection->send_queue.size()==1) - connection->send_from_queue(connection); - }); - } - - void send_close(const std::shared_ptr &connection, int status, const std::string& reason="", - const std::function& callback=nullptr) const { - //Send close only once (in case close is initiated by server) - if(connection->closed) - return; - connection->closed=true; - - auto send_stream=std::make_shared(); - - send_stream->put(status>>8); - send_stream->put(status%256); - - *send_stream << reason; - - //fin_rsv_opcode=136: message close - send(connection, send_stream, callback, 136); - } - - std::unordered_set > get_connections() { - std::unordered_set > all_connections; - std::lock_guard lock(m_endpoint_mutex); - for(auto& e: m_endpoint) { - std::lock_guard lock(e.second.connections_mutex); - all_connections.insert(e.second.connections.begin(), e.second.connections.end()); - } - return all_connections; - } - - /** - * Upgrades a request, from for instance Simple-Web-Server, to a WebSocket connection. - * The parameters are moved to the Connection object. - * See also Server::on_upgrade in the Simple-Web-Server project. - * The socket's io_service is used, thus running start() is not needed. - * - * Example use: - * server.on_upgrade=[&socket_server] (auto socket, auto request) { - * auto connection=std::make_shared::Connection>(socket); - * connection->method=std::move(request->method); - * connection->path=std::move(request->path); - * connection->http_version=std::move(request->http_version); - * connection->header=std::move(request->header); - * connection->remote_endpoint_address=std::move(request->remote_endpoint_address); - * connection->remote_endpoint_port=request->remote_endpoint_port; - * socket_server.upgrade(connection); - * } - */ - void upgrade(const std::shared_ptr &connection) { - auto read_buffer=std::make_shared(); - write_handshake(connection, read_buffer); - } - - /// If you have your own asio::io_context, store its pointer here before running start(). - /// You might also want to set config.num_threads to 0. - std::shared_ptr io_context; - protected: - const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - - std::unique_ptr acceptor; - - std::vector threads; - - SocketServerBase(unsigned short port) : - config(port) {} - - virtual void accept()=0; - - std::shared_ptr get_timeout_timer(const std::shared_ptr &connection, size_t seconds) { - if (seconds == 0) - return nullptr; - auto timer = std::make_shared(connection->socket->get_io_service()); - timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(static_cast(seconds))); - timer->async_wait([connection](const std::error_code& ec){ - if(!ec) { - connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both); - connection->socket->lowest_layer().close(); - } - }); - return timer; - } - - void read_handshake(const std::shared_ptr &connection) { - connection->read_remote_endpoint_data(); - - //Create new read_buffer for async_read_until() - //Shared_ptr is used to pass temporary objects to the asynchronous functions - auto read_buffer = std::make_shared(); - - //Set timeout on the following asio::async-read or write function - auto timer = get_timeout_timer(connection, config.timeout_request); - - asio::async_read_until(*connection->socket, *read_buffer, "\r\n\r\n", - [this, connection, read_buffer, timer] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if(timer) - timer->cancel(); - if(!ec) { - //Convert to istream to extract string-lines - std::istream stream(read_buffer.get()); - - parse_handshake(connection, stream); - - write_handshake(connection, read_buffer); - } - }); - } - - void parse_handshake(const std::shared_ptr &connection, std::istream& stream) const { - std::string line; - getline(stream, line); - size_t method_end; - if((method_end=line.find(' '))!=std::string::npos) { - size_t path_end; - if((path_end=line.find(' ', method_end+1))!=std::string::npos) { - connection->method=line.substr(0, method_end); - connection->path=line.substr(method_end+1, path_end-method_end-1); - if((path_end+6)http_version=line.substr(path_end+6, line.size()-(path_end+6)-1); - else - connection->http_version="1.1"; - - getline(stream, line); - size_t param_end; - while((param_end=line.find(':'))!=std::string::npos) { - size_t value_start=param_end+1; - if((value_start)header.emplace(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1)); - } - - getline(stream, line); - } - } - } - } - - void write_handshake(const std::shared_ptr &connection, const std::shared_ptr &read_buffer) { - //Find path- and method-match, and generate response - std::lock_guard lock(m_endpoint_mutex); - for (auto ®ex_endpoint : m_endpoint) { - std::smatch path_match; - if(std::regex_match(connection->path, path_match, regex_endpoint.first)) { - auto write_buffer = std::make_shared(); - std::ostream handshake(write_buffer.get()); - - if(generate_handshake(connection, handshake)) { - connection->path_match=std::move(path_match); - //Capture write_buffer in lambda so it is not destroyed before async_write is finished - asio::async_write(*connection->socket, *write_buffer, - [this, connection, write_buffer, read_buffer, ®ex_endpoint] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if(!ec) { - connection_open(connection, regex_endpoint.second); - read_message(connection, read_buffer, regex_endpoint.second); - } - else - connection_error(connection, regex_endpoint.second, ec); - }); - } - return; - } - } - } - - bool generate_handshake(const std::shared_ptr &connection, std::ostream& handshake) const { - auto header_it = connection->header.find("Sec-WebSocket-Key"); - if (header_it == connection->header.end()) - return false; - - auto sha1=sha1_encode(header_it->second + ws_magic_string); - - handshake << "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"; - handshake << "Upgrade: websocket\r\n"; - handshake << "Connection: Upgrade\r\n"; - handshake << "Sec-WebSocket-Accept: " << base64_encode(sha1) << "\r\n"; - handshake << "\r\n"; - - return true; - } - - void read_message(const std::shared_ptr &connection, - const std::shared_ptr &read_buffer, Endpoint& endpoint) const { - asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2), - [this, connection, read_buffer, &endpoint] - (const std::error_code& ec, size_t bytes_transferred) { - if(!ec) { - if(bytes_transferred==0) { //TODO: why does this happen sometimes? - read_message(connection, read_buffer, endpoint); - return; - } - std::istream stream(read_buffer.get()); - - std::vector first_bytes; - first_bytes.resize(2); - stream.read(reinterpret_cast(&first_bytes[0]), 2); - - unsigned char fin_rsv_opcode=first_bytes[0]; - - //Close connection if unmasked message from client (protocol error) - if(first_bytes[1]<128) { - const std::string reason("message from client not masked"); - send_close(connection, 1002, reason, [this, connection](const std::error_code& /*ec*/) {}); - connection_close(connection, endpoint, 1002, reason); - return; - } - - size_t length=(first_bytes[1]&127); - - if(length==126) { - //2 next bytes is the size of content - asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2), - [this, connection, read_buffer, &endpoint, fin_rsv_opcode] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if(!ec) { - std::istream stream(read_buffer.get()); - - std::vector length_bytes; - length_bytes.resize(2); - stream.read(reinterpret_cast(&length_bytes[0]), 2); - - size_t length=0; - int num_bytes=2; - for(int c=0;csocket, *read_buffer, asio::transfer_exactly(8), - [this, connection, read_buffer, &endpoint, fin_rsv_opcode] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if(!ec) { - std::istream stream(read_buffer.get()); - - std::vector length_bytes; - length_bytes.resize(8); - stream.read(reinterpret_cast(&length_bytes[0]), 8); - - size_t length=0; - int num_bytes=8; - for(int c=0;c &connection, const std::shared_ptr &read_buffer, - size_t length, Endpoint& endpoint, unsigned char fin_rsv_opcode) const { - asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(4+length), - [this, connection, read_buffer, length, &endpoint, fin_rsv_opcode] - (const std::error_code& ec, size_t /*bytes_transferred*/) { - if(!ec) { - std::istream raw_message_data(read_buffer.get()); - - //Read mask - std::vector mask; - mask.resize(4); - raw_message_data.read(reinterpret_cast(&mask[0]), 4); - - std::shared_ptr message(new Message()); - message->length=length; - message->fin_rsv_opcode=fin_rsv_opcode; - - std::ostream message_data_out_stream(&message->streambuf); - for(size_t c=0;c=2) { - unsigned char byte1=message->get(); - unsigned char byte2=message->get(); - status=(byte1<<8)+byte2; - } - - auto reason=message->string(); - send_close(connection, status, reason, [this, connection](const std::error_code& /*ec*/) {}); - connection_close(connection, endpoint, status, reason); - return; - } - else { - //If ping - if((fin_rsv_opcode&0x0f)==9) { - //send pong - auto empty_send_stream=std::make_shared(); - send(connection, empty_send_stream, nullptr, fin_rsv_opcode+1); - } - else if(endpoint.on_message) { - timer_idle_reset(connection); - endpoint.on_message(connection, message); - } - - //Next message - read_message(connection, read_buffer, endpoint); - } - } - else - connection_error(connection, endpoint, ec); - }); - } - - void connection_open(const std::shared_ptr &connection, Endpoint& endpoint) { - timer_idle_init(connection); - - { - std::lock_guard lock(endpoint.connections_mutex); - endpoint.connections.insert(connection); - } - - if(endpoint.on_open) - endpoint.on_open(connection); - } - - void connection_close(const std::shared_ptr &connection, Endpoint& endpoint, int status, const std::string& reason) const { - timer_idle_cancel(connection); - - { - std::lock_guard lock(endpoint.connections_mutex); - endpoint.connections.erase(connection); - } - - if(endpoint.on_close) - endpoint.on_close(connection, status, reason); - } - - void connection_error(const std::shared_ptr &connection, Endpoint& endpoint, const std::error_code& ec) const { - timer_idle_cancel(connection); - - { - std::lock_guard lock(endpoint.connections_mutex); - endpoint.connections.erase(connection); - } - - if(endpoint.on_error) { - std::error_code ec_tmp=ec; - endpoint.on_error(connection, ec_tmp); - } - } - - void timer_idle_init(const std::shared_ptr &connection) { - if(config.timeout_idle>0) { - connection->timer_idle= std::make_unique(connection->socket->get_io_service()); - connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast(config.timeout_idle))); - timer_idle_expired_function(connection); - } - } - void timer_idle_reset(const std::shared_ptr &connection) const { - if(config.timeout_idle>0 && connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast(config.timeout_idle)))>0) - timer_idle_expired_function(connection); - } - void timer_idle_cancel(const std::shared_ptr &connection) const { - if(config.timeout_idle>0) - connection->timer_idle->cancel(); - } - - void timer_idle_expired_function(const std::shared_ptr &connection) const { - connection->timer_idle->async_wait([this, connection](const std::error_code& ec){ - if(!ec) - send_close(connection, 1000, "idle timeout"); //1000=normal closure - }); - } - }; - - template - class SocketServer : public SocketServerBase { - public: - SocketServer(unsigned short port, size_t timeout_request, size_t timeout_idle) - : SocketServerBase(port, timeout_request, timeout_idle) - { - } - }; - - using WS = asio::ip::tcp::socket; - - template<> - class SocketServer : public SocketServerBase { - public: - SocketServer() : SocketServerBase(80) {} - protected: - void accept() override { - //Create new socket for this connection (stored in Connection::socket) - //Shared_ptr is used to pass temporary objects to the asynchronous functions - std::shared_ptr connection(new Connection(new WS(*io_context))); - - acceptor->async_accept(*connection->socket, [this, connection](const std::error_code& ec) { - //Immediately start accepting a new connection (if io_context hasn't been stopped) - if (ec != asio::error::operation_aborted) - accept(); - - if(!ec) { - asio::ip::tcp::no_delay option(true); - connection->socket->set_option(option); - - read_handshake(connection); - } - }); - } - }; - - class ws_server : public SocketServer { - public: - ws_server() : SocketServer::SocketServer() {} + std::string remote_endpoint_address; + unsigned short remote_endpoint_port; + + Connection(unsigned short remote_endpoint_port) : remote_endpoint_port(remote_endpoint_port) {} + virtual ~Connection() {} }; + + class ws_server; } -#endif /* SERVER_WS_HPP */ +#endif /* MAME_LIB_UTIL_SERVER_WS_HPP */ diff --git a/src/lib/util/server_ws_impl.hpp b/src/lib/util/server_ws_impl.hpp new file mode 100644 index 00000000000..90f1a4a1385 --- /dev/null +++ b/src/lib/util/server_ws_impl.hpp @@ -0,0 +1,717 @@ +// license:MIT +// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic +#ifndef MAME_LIB_UTIL_SERVER_WS_IMPL_HPP +#define MAME_LIB_UTIL_SERVER_WS_IMPL_HPP +#include "path_to_regex.hpp" +#include "crypto.hpp" + +#include "asio.h" +#include "asio/system_timer.hpp" + +#include "server_ws.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef CASE_INSENSITIVE_EQUALS_AND_HASH +#define CASE_INSENSITIVE_EQUALS_AND_HASH +class case_insensitive_equals { +public: + bool operator()(const std::string &key1, const std::string &key2) const { + return key1.size() == key2.size() + && equal(key1.cbegin(), key1.cend(), key2.cbegin(), + [](std::string::value_type key1v, std::string::value_type key2v) + { return tolower(key1v) == tolower(key2v); }); + } +}; +class case_insensitive_hash { +public: + size_t operator()(const std::string &key) const { + size_t seed = 0; + for (auto &c : key) { + std::hash hasher; + seed ^= hasher(std::tolower(c)) + 0x9e3779b9 + (seed << 6) + (seed >> 2); + } + return seed; + } +}; +#endif + +namespace webpp { + template + class SocketServer; + + template + class SocketServerBase { + public: + virtual ~SocketServerBase() {} + + class SendStream : public std::ostream, public std::enable_shared_from_this { + friend class SocketServerBase; + + asio::streambuf streambuf; + public: + SendStream(): std::ostream(&streambuf) {} + size_t size() const { + return streambuf.size(); + } + }; + + + class Connection : public webpp::Connection { + friend class SocketServerBase; + friend class SocketServer; + using super = webpp::Connection; + + public: + virtual ~Connection() {} + explicit Connection(const std::shared_ptr &socket) : super(0), socket(socket), strand(socket->get_io_service()), closed(false) { } + + private: + explicit Connection(socket_type *socket): super(0), socket(socket), strand(socket->get_io_service()), closed(false) { } + + class SendData { + public: + SendData(const std::shared_ptr &header_stream, const std::shared_ptr &message_stream, + const std::function &callback) : + header_stream(header_stream), message_stream(message_stream), callback(callback) {} + std::shared_ptr header_stream; + std::shared_ptr message_stream; + std::function callback; + }; + + std::shared_ptr socket; + + asio::io_context::strand strand; + + std::list send_queue; + + void send_from_queue(const std::shared_ptr &connection) { + strand.post([this, connection]() { + asio::async_write(*socket, send_queue.begin()->header_stream->streambuf, + strand.wrap([this, connection](const std::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + asio::async_write(*socket, send_queue.begin()->message_stream->streambuf, + strand.wrap([this, connection] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + auto send_queued=send_queue.begin(); + if(send_queued->callback) + send_queued->callback(ec); + if(!ec) { + send_queue.erase(send_queued); + if(send_queue.size()>0) + send_from_queue(connection); + } + else + send_queue.clear(); + })); + } + else { + auto send_queued=send_queue.begin(); + if(send_queued->callback) + send_queued->callback(ec); + send_queue.clear(); + } + })); + }); + } + + std::atomic closed; + + std::unique_ptr timer_idle; + + void read_remote_endpoint_data() { + try { + remote_endpoint_address=socket->lowest_layer().remote_endpoint().address().to_string(); + remote_endpoint_port=socket->lowest_layer().remote_endpoint().port(); + } + catch (...) {} + } + }; + + class Message : public std::istream, public std::enable_shared_from_this { + friend class SocketServerBase; + + public: + unsigned char fin_rsv_opcode; + size_t size() const { + return length; + } + std::string string() const { + std::stringstream ss; + ss << rdbuf(); + return ss.str(); + } + private: + Message(): std::istream(&streambuf), fin_rsv_opcode(0), length(0) {} + + size_t length; + asio::streambuf streambuf; + }; + + class Endpoint : public std::enable_shared_from_this { + friend class SocketServerBase; + std::unordered_set > connections; + std::mutex connections_mutex; + + public: + std::function)> on_open; + std::function, std::shared_ptr)> on_message; + std::function, int, const std::string&)> on_close; + std::function, const std::error_code&)> on_error; + + std::unordered_set > get_connections() { + std::lock_guard lock(connections_mutex); + auto copy=connections; + return copy; + } + }; + + class Config { + friend class SocketServerBase; + + Config(unsigned short port) : port(port) {} + public: + /// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS. + unsigned short port; + /// Number of threads that the server will use when start() is called. Defaults to 1 thread. + size_t thread_pool_size=1; + /// Timeout on request handling. Defaults to 5 seconds. + size_t timeout_request=5; + /// Idle timeout. Defaults to no timeout. + size_t timeout_idle=0; + /// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation. + /// If empty, the address will be any address. + std::string address; + /// Set to false to avoid binding the socket to an address that is already in use. Defaults to true. + bool reuse_address=true; + }; + ///Set before calling start(). + Config config; + + private: + class regex_orderable : public std::regex { + std::string str; + path2regex::Keys keys; + public: + regex_orderable(const char *regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {} + regex_orderable(const std::string ®ex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {} + bool operator<(const regex_orderable &rhs) const { + return str m_endpoint; + std::mutex m_endpoint_mutex; + + virtual void start() { + if(!io_context) + io_context=std::make_shared(); + + if(io_context->stopped()) + io_context->reset(); + + asio::ip::tcp::endpoint endpoint; + if(config.address.size()>0) + endpoint=asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port); + else + endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port); + + if(!acceptor) + acceptor= std::make_unique(*io_context); + acceptor->open(endpoint.protocol()); + acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address)); + acceptor->bind(endpoint); + acceptor->listen(); + + accept(); + + io_context->run(); + } + + void stop() { + acceptor->close(); + io_context->stop(); + + std::lock_guard lock(m_endpoint_mutex); + for(auto& p: m_endpoint) + p.second.connections.clear(); + } + + ///fin_rsv_opcode: 129=one fragment, text, 130=one fragment, binary, 136=close connection. + ///See http://tools.ietf.org/html/rfc6455#section-5.2 for more information + void send(std::shared_ptr conn, const std::shared_ptr &message_stream, + const std::function& callback=nullptr, + unsigned char fin_rsv_opcode=129) const { + std::shared_ptr connection = std::dynamic_pointer_cast (conn); + if (!connection) return; + + if(fin_rsv_opcode!=136) + timer_idle_reset(connection); + + auto header_stream = std::make_shared(); + + size_t length=message_stream->size(); + + header_stream->put(fin_rsv_opcode); + //unmasked (first length byte<128) + if(length>=126) { + int num_bytes; + if(length>0xffff) { + num_bytes=8; + header_stream->put(127); + } + else { + num_bytes=2; + header_stream->put(126); + } + + for(int c=num_bytes-1;c>=0;c--) { + header_stream->put((static_cast(length) >> (8 * c)) % 256); + } + } + else + header_stream->put(static_cast(length)); + + connection->strand.post([this, connection, header_stream, message_stream, callback]() { + connection->send_queue.emplace_back(header_stream, message_stream, callback); + if(connection->send_queue.size()==1) + connection->send_from_queue(connection); + }); + } + + void send_close(std::shared_ptr conn, int status, const std::string& reason="", + const std::function& callback=nullptr) const { + std::shared_ptr connection = std::dynamic_pointer_cast (conn); + if (!connection) return; + //Send close only once (in case close is initiated by server) + if(connection->closed) + return; + connection->closed=true; + + auto send_stream=std::make_shared(); + + send_stream->put(status>>8); + send_stream->put(status%256); + + *send_stream << reason; + + //fin_rsv_opcode=136: message close + send(connection, send_stream, callback, 136); + } + + std::unordered_set > get_connections() { + std::unordered_set > all_connections; + std::lock_guard lock(m_endpoint_mutex); + for(auto& e: m_endpoint) { + std::lock_guard lock(e.second.connections_mutex); + all_connections.insert(e.second.connections.begin(), e.second.connections.end()); + } + return all_connections; + } + + /** + * Upgrades a request, from for instance Simple-Web-Server, to a WebSocket connection. + * The parameters are moved to the Connection object. + * See also Server::on_upgrade in the Simple-Web-Server project. + * The socket's io_service is used, thus running start() is not needed. + * + * Example use: + * server.on_upgrade=[&socket_server] (auto socket, auto request) { + * auto connection=std::make_shared::Connection>(socket); + * connection->method=std::move(request->method); + * connection->path=std::move(request->path); + * connection->http_version=std::move(request->http_version); + * connection->header=std::move(request->header); + * connection->remote_endpoint_address=std::move(request->remote_endpoint_address); + * connection->remote_endpoint_port=request->remote_endpoint_port; + * socket_server.upgrade(connection); + * } + */ + void upgrade(const std::shared_ptr &connection) { + auto read_buffer=std::make_shared(); + write_handshake(connection, read_buffer); + } + + /// If you have your own asio::io_context, store its pointer here before running start(). + /// You might also want to set config.num_threads to 0. + std::shared_ptr io_context; + protected: + const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + std::unique_ptr acceptor; + + std::vector threads; + + SocketServerBase(unsigned short port) : + config(port) {} + + virtual void accept()=0; + + std::shared_ptr get_timeout_timer(const std::shared_ptr &connection, size_t seconds) { + if (seconds == 0) + return nullptr; + auto timer = std::make_shared(connection->socket->get_io_service()); + timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(static_cast(seconds))); + timer->async_wait([connection](const std::error_code& ec){ + if(!ec) { + connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both); + connection->socket->lowest_layer().close(); + } + }); + return timer; + } + + void read_handshake(const std::shared_ptr &connection) { + connection->read_remote_endpoint_data(); + + //Create new read_buffer for async_read_until() + //Shared_ptr is used to pass temporary objects to the asynchronous functions + auto read_buffer = std::make_shared(); + + //Set timeout on the following asio::async-read or write function + auto timer = get_timeout_timer(connection, config.timeout_request); + + asio::async_read_until(*connection->socket, *read_buffer, "\r\n\r\n", + [this, connection, read_buffer, timer] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if(timer) + timer->cancel(); + if(!ec) { + //Convert to istream to extract string-lines + std::istream stream(read_buffer.get()); + + parse_handshake(connection, stream); + + write_handshake(connection, read_buffer); + } + }); + } + + void parse_handshake(const std::shared_ptr &connection, std::istream& stream) const { + std::string line; + getline(stream, line); + size_t method_end; + if((method_end=line.find(' '))!=std::string::npos) { + size_t path_end; + if((path_end=line.find(' ', method_end+1))!=std::string::npos) { + connection->method=line.substr(0, method_end); + connection->path=line.substr(method_end+1, path_end-method_end-1); + if((path_end+6)http_version=line.substr(path_end+6, line.size()-(path_end+6)-1); + else + connection->http_version="1.1"; + + getline(stream, line); + size_t param_end; + while((param_end=line.find(':'))!=std::string::npos) { + size_t value_start=param_end+1; + if((value_start)header.emplace(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1)); + } + + getline(stream, line); + } + } + } + } + + void write_handshake(const std::shared_ptr &connection, const std::shared_ptr &read_buffer) { + //Find path- and method-match, and generate response + std::lock_guard lock(m_endpoint_mutex); + for (auto ®ex_endpoint : m_endpoint) { + std::smatch path_match; + if(std::regex_match(connection->path, path_match, regex_endpoint.first)) { + auto write_buffer = std::make_shared(); + std::ostream handshake(write_buffer.get()); + + if(generate_handshake(connection, handshake)) { + connection->path_match=std::move(path_match); + //Capture write_buffer in lambda so it is not destroyed before async_write is finished + asio::async_write(*connection->socket, *write_buffer, + [this, connection, write_buffer, read_buffer, ®ex_endpoint] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + connection_open(connection, regex_endpoint.second); + read_message(connection, read_buffer, regex_endpoint.second); + } + else + connection_error(connection, regex_endpoint.second, ec); + }); + } + return; + } + } + } + + bool generate_handshake(const std::shared_ptr &connection, std::ostream& handshake) const { + auto header_it = connection->header.find("Sec-WebSocket-Key"); + if (header_it == connection->header.end()) + return false; + + auto sha1=sha1_encode(header_it->second + ws_magic_string); + + handshake << "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"; + handshake << "Upgrade: websocket\r\n"; + handshake << "Connection: Upgrade\r\n"; + handshake << "Sec-WebSocket-Accept: " << base64_encode(sha1) << "\r\n"; + handshake << "\r\n"; + + return true; + } + + void read_message(const std::shared_ptr &connection, + const std::shared_ptr &read_buffer, Endpoint& endpoint) const { + asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2), + [this, connection, read_buffer, &endpoint] + (const std::error_code& ec, size_t bytes_transferred) { + if(!ec) { + if(bytes_transferred==0) { //TODO: why does this happen sometimes? + read_message(connection, read_buffer, endpoint); + return; + } + std::istream stream(read_buffer.get()); + + std::vector first_bytes; + first_bytes.resize(2); + stream.read(reinterpret_cast(&first_bytes[0]), 2); + + unsigned char fin_rsv_opcode=first_bytes[0]; + + //Close connection if unmasked message from client (protocol error) + if(first_bytes[1]<128) { + const std::string reason("message from client not masked"); + send_close(connection, 1002, reason, [this, connection](const std::error_code& /*ec*/) {}); + connection_close(connection, endpoint, 1002, reason); + return; + } + + size_t length=(first_bytes[1]&127); + + if(length==126) { + //2 next bytes is the size of content + asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2), + [this, connection, read_buffer, &endpoint, fin_rsv_opcode] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + std::istream stream(read_buffer.get()); + + std::vector length_bytes; + length_bytes.resize(2); + stream.read(reinterpret_cast(&length_bytes[0]), 2); + + size_t length=0; + int num_bytes=2; + for(int c=0;csocket, *read_buffer, asio::transfer_exactly(8), + [this, connection, read_buffer, &endpoint, fin_rsv_opcode] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + std::istream stream(read_buffer.get()); + + std::vector length_bytes; + length_bytes.resize(8); + stream.read(reinterpret_cast(&length_bytes[0]), 8); + + size_t length=0; + int num_bytes=8; + for(int c=0;c &connection, const std::shared_ptr &read_buffer, + size_t length, Endpoint& endpoint, unsigned char fin_rsv_opcode) const { + asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(4+length), + [this, connection, read_buffer, length, &endpoint, fin_rsv_opcode] + (const std::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + std::istream raw_message_data(read_buffer.get()); + + //Read mask + std::vector mask; + mask.resize(4); + raw_message_data.read(reinterpret_cast(&mask[0]), 4); + + std::shared_ptr message(new Message()); + message->length=length; + message->fin_rsv_opcode=fin_rsv_opcode; + + std::ostream message_data_out_stream(&message->streambuf); + for(size_t c=0;c=2) { + unsigned char byte1=message->get(); + unsigned char byte2=message->get(); + status=(byte1<<8)+byte2; + } + + auto reason=message->string(); + send_close(connection, status, reason, [this, connection](const std::error_code& /*ec*/) {}); + connection_close(connection, endpoint, status, reason); + return; + } + else { + //If ping + if((fin_rsv_opcode&0x0f)==9) { + //send pong + auto empty_send_stream=std::make_shared(); + send(connection, empty_send_stream, nullptr, fin_rsv_opcode+1); + } + else if(endpoint.on_message) { + timer_idle_reset(connection); + endpoint.on_message(connection, message); + } + + //Next message + read_message(connection, read_buffer, endpoint); + } + } + else + connection_error(connection, endpoint, ec); + }); + } + + void connection_open(const std::shared_ptr &connection, Endpoint& endpoint) { + timer_idle_init(connection); + + { + std::lock_guard lock(endpoint.connections_mutex); + endpoint.connections.insert(connection); + } + + if(endpoint.on_open) + endpoint.on_open(connection); + } + + void connection_close(const std::shared_ptr &connection, Endpoint& endpoint, int status, const std::string& reason) const { + timer_idle_cancel(connection); + + { + std::lock_guard lock(endpoint.connections_mutex); + endpoint.connections.erase(connection); + } + + if(endpoint.on_close) + endpoint.on_close(connection, status, reason); + } + + void connection_error(const std::shared_ptr &connection, Endpoint& endpoint, const std::error_code& ec) const { + timer_idle_cancel(connection); + + { + std::lock_guard lock(endpoint.connections_mutex); + endpoint.connections.erase(connection); + } + + if(endpoint.on_error) { + std::error_code ec_tmp=ec; + endpoint.on_error(connection, ec_tmp); + } + } + + void timer_idle_init(const std::shared_ptr &connection) { + if(config.timeout_idle>0) { + connection->timer_idle= std::make_unique(connection->socket->get_io_service()); + connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast(config.timeout_idle))); + timer_idle_expired_function(connection); + } + } + void timer_idle_reset(const std::shared_ptr &connection) const { + if(config.timeout_idle>0 && connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast(config.timeout_idle)))>0) + timer_idle_expired_function(connection); + } + void timer_idle_cancel(const std::shared_ptr &connection) const { + if(config.timeout_idle>0) + connection->timer_idle->cancel(); + } + + void timer_idle_expired_function(const std::shared_ptr &connection) const { + connection->timer_idle->async_wait([this, connection](const std::error_code& ec){ + if(!ec) + send_close(connection, 1000, "idle timeout"); //1000=normal closure + }); + } + }; + + template + class SocketServer : public SocketServerBase { + public: + SocketServer(unsigned short port, size_t timeout_request, size_t timeout_idle) + : SocketServerBase(port, timeout_request, timeout_idle) + { + } + }; + + using WS = asio::ip::tcp::socket; + + template<> + class SocketServer : public SocketServerBase { + public: + SocketServer() : SocketServerBase(80) {} + protected: + void accept() override { + //Create new socket for this connection (stored in Connection::socket) + //Shared_ptr is used to pass temporary objects to the asynchronous functions + std::shared_ptr connection(new Connection(new WS(*io_context))); + + acceptor->async_accept(*connection->socket, [this, connection](const std::error_code& ec) { + //Immediately start accepting a new connection (if io_context hasn't been stopped) + if (ec != asio::error::operation_aborted) + accept(); + + if(!ec) { + asio::ip::tcp::no_delay option(true); + connection->socket->set_option(option); + + read_handshake(connection); + } + }); + } + }; + + class ws_server : public SocketServer { + public: + ws_server() : SocketServer::SocketServer() {} + }; +} +#endif /* MAME_LIB_UTIL_SERVER_WS_IMPL_HPP */ diff --git a/src/mame/machine/esqpanel.cpp b/src/mame/machine/esqpanel.cpp index 2348cb29549..f86b0181b29 100644 --- a/src/mame/machine/esqpanel.cpp +++ b/src/mame/machine/esqpanel.cpp @@ -19,369 +19,373 @@ #include #include -class external_panel; +namespace esqpanel { -using external_panel_ptr = std::shared_ptr; -typedef std::map> connection_to_panel_map; + class external_panel; -enum message_type { - UNKNOWN = 0, - ANALOG = 1 << 0, - BUTTON = 1 << 1, - CONTROL = 1 << 2, - DISPLAY = 1 << 3, - INFO = 1 << 4 -}; + using external_panel_ptr = std::shared_ptr; + typedef std::map> connection_to_panel_map; -class external_panel : public std::enable_shared_from_this -{ -public: - static int get_message_type(const char c) - { - switch(c) - { - case 'A': - return message_type::ANALOG; - case 'B': - return message_type::BUTTON; - case 'C': - return message_type::CONTROL; - case 'D': - return message_type::DISPLAY; - case 'I': - return message_type::INFO; - default: - return message_type::UNKNOWN; - } - } - - external_panel() : m_send_message_types(0) - { - // printf("session: constructed\n"); - } - - int handle_control_message(const std::string &command) - { - int old = m_send_message_types; - std::istringstream is(command); - if (get_message_type(is.get()) != message_type::CONTROL) - { - return 0; - } - - int n; - while (!is.eof()) { - char c = is.get(); - int message_type = external_panel::get_message_type(c); - is >> n; - int send = (n != 0); - if (send) - { - m_send_message_types |= message_type; - } - else - { - m_send_message_types &= ~message_type; - } - } - - return m_send_message_types ^ old; - } - - int send_message_types() - { - return m_send_message_types; - } - - bool send_display_data() - { - return m_send_message_types & message_type::DISPLAY; - } - - bool send_analog_values() - { - return m_send_message_types & message_type::ANALOG; - } - - bool send_buttons() - { - return m_send_message_types & message_type::BUTTON; - } - -private: - int m_send_message_types; -}; - -class esqpanel_external_panel_server -{ -public: - enum websocket_opcode { - text = 1, - binary = 2 + enum message_type { + UNKNOWN = 0, + ANALOG = 1 << 0, + BUTTON = 1 << 1, + CONTROL = 1 << 2, + DISPLAY = 1 << 3, + INFO = 1 << 4 }; - esqpanel_external_panel_server(http_manager *webserver) : - m_server(webserver), - m_keyboard("unknown"), - m_version("1") - { - using namespace std::placeholders; - if (m_server->is_active()) { - m_server->add_endpoint("/esqpanel/socket", - std::bind(&esqpanel_external_panel_server::on_open, this, _1), - std::bind(&esqpanel_external_panel_server::on_message, this, _1, _2, _3), - std::bind(&esqpanel_external_panel_server::on_close, this, _1, _2, _3), - std::bind(&esqpanel_external_panel_server::on_error, this, _1, _2) - ); - } - } - virtual ~esqpanel_external_panel_server() + class external_panel { - if (m_server->is_active()) { - m_server->remove_endpoint("/esqpanel/socket"); - } - } - - void send_to_all(char c) - { - // printf("server: send_to_all(%02x)\n", ((unsigned int) c) & 0xff); - std::lock_guard lock(m_mutex); - // printf("server: sending '%02x' to all\n", ((unsigned int) c) & 0xff); - m_to_send.str(""); - m_to_send.put('D'); - m_to_send.put(c); - const std::string &s = m_to_send.str(); - - for (auto iter: m_panels) + public: + static int get_message_type(const char c) { - external_panel_ptr panel = iter.second; - if (panel->send_display_data()) + switch(c) { - send(iter.first, s); + case 'A': + return message_type::ANALOG; + case 'B': + return message_type::BUTTON; + case 'C': + return message_type::CONTROL; + case 'D': + return message_type::DISPLAY; + case 'I': + return message_type::INFO; + default: + return message_type::UNKNOWN; } } - } - void on_open(http_manager::websocket_connection_ptr connection) + external_panel() : m_send_message_types(0) + { + // printf("session: constructed\n"); + } + + int handle_control_message(const std::string &command) + { + int old = m_send_message_types; + std::istringstream is(command); + if (get_message_type(is.get()) != message_type::CONTROL) + { + return 0; + } + + int n; + while (!is.eof()) { + char c = is.get(); + int message_type = external_panel::get_message_type(c); + is >> n; + int send = (n != 0); + if (send) + { + m_send_message_types |= message_type; + } + else + { + m_send_message_types &= ~message_type; + } + } + + return m_send_message_types ^ old; + } + + int send_message_types() + { + return m_send_message_types; + } + + bool send_display_data() + { + return m_send_message_types & message_type::DISPLAY; + } + + bool send_analog_values() + { + return m_send_message_types & message_type::ANALOG; + } + + bool send_buttons() + { + return m_send_message_types & message_type::BUTTON; + } + + private: + int m_send_message_types; + }; + + class external_panel_server { - using namespace std::placeholders; - - std::lock_guard lock(m_mutex); - m_panels[connection] = std::make_shared(); - } - - void on_message(http_manager::websocket_connection_ptr connection, const std::string &payload, int opcode) - { - external_panel_ptr panel = external_panel_for_connection(connection); - const std::string &command = payload; - - int t = external_panel::get_message_type(command.front()); - - if (t == message_type::CONTROL) + public: + enum websocket_opcode { + text = 1, + binary = 2 + }; + external_panel_server(http_manager *webserver) : + m_server(webserver), + m_keyboard("unknown"), + m_version("1") { - int changed = panel->handle_control_message(command); - // printf("server: control message, changed = '%x'\n", changed); - if ((changed & message_type::DISPLAY) && panel->send_display_data()) - { - // printf("server: control message, sending contents\n"); - send_contents(connection); - } - - if ((changed & message_type::ANALOG) && panel->send_analog_values()) - { - // printf("server: control message, sending analog values\n"); - send_analog_values(connection); - } - - if ((changed & message_type::BUTTON) && panel->send_buttons()) - { - // printf("server: control message, sending button states\n"); - send_button_states(connection); + using namespace std::placeholders; + if (m_server->is_active()) { + m_server->add_endpoint("/esqpanel/socket", + std::bind(&external_panel_server::on_open, this, _1), + std::bind(&external_panel_server::on_message, this, _1, _2, _3), + std::bind(&external_panel_server::on_close, this, _1, _2, _3), + std::bind(&external_panel_server::on_error, this, _1, _2) + ); } } - else if (t == message_type::INFO) - { - std::ostringstream o; - o << "I" << get_keyboard() << "," << get_version(); - send(connection, o.str()); - } - else - { - { - std::lock_guard lock(m_mutex); - m_commands.emplace_back(command); - } - // Echo the non-command message to any other connected panels that want it + virtual ~external_panel_server() + { + if (m_server->is_active()) { + m_server->remove_endpoint("/esqpanel/socket"); + } + } + + void send_to_all(char c) + { + // printf("server: send_to_all(%02x)\n", ((unsigned int) c) & 0xff); + std::lock_guard lock(m_mutex); + // printf("server: sending '%02x' to all\n", ((unsigned int) c) & 0xff); + m_to_send.str(""); + m_to_send.put('D'); + m_to_send.put(c); + const std::string &s = m_to_send.str(); + for (auto iter: m_panels) { - external_panel_ptr other_panel = iter.second; - if (other_panel != panel && (t & other_panel->send_message_types()) != 0) + external_panel_ptr panel = iter.second; + if (panel->send_display_data()) { - send(iter.first, command); + send(iter.first, s); } } } - } - void on_close(http_manager::websocket_connection_ptr connection, int status, const std::string& reason) - { - std::lock_guard lock(m_mutex); - m_panels.erase(connection); - } - - void on_error(http_manager::websocket_connection_ptr connection, const std::error_code& error_code) - { - std::lock_guard lock(m_mutex); - m_panels.erase(connection); - } - - void on_document_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename) - { - m_server->serve_document(request, response, filename); - } - - void on_template_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename) - { - using namespace std::placeholders; - m_server->serve_template(request, response, filename, std::bind(&esqpanel_external_panel_server::get_template_value, this, _1), '$', '$'); - } - - external_panel_ptr external_panel_for_connection(http_manager::websocket_connection_ptr connection) - { - auto it = m_panels.find(connection); - - if (it == m_panels.end()) { - // this connection is not in the list. This really shouldn't happen - // and probably means something else is wrong. - throw std::invalid_argument("No panel avaliable for connection"); - } - - return it->second; - } - - bool has_commands() - { - // printf("server: has_commands()\n"); - std::lock_guard lock(m_mutex); - return !m_commands.empty(); - } - - std::string get_next_command() - { - // printf("server: get_next_command()\n"); - std::lock_guard lock(m_mutex); - std::string command = std::move(m_commands.front()); - m_commands.pop_front(); - return command; - } - - void set_index(const std::string &index) - { - m_index = index; - } - - void add_http_document(const std::string &path, const std::string &filename) - { - m_server->remove_http_handler(path); - if (filename != "") + void on_open(http_manager::websocket_connection_ptr connection) { using namespace std::placeholders; - m_server->add_http_handler(path, std::bind(&esqpanel_external_panel_server::on_document_request, this, _1, _2, filename)); + + std::lock_guard lock(m_mutex); + m_panels[connection] = std::make_shared(); } - } - void add_http_template(const std::string &path, const std::string &filename) - { - m_server->remove_http_handler(path); - if (filename != "") + void on_message(http_manager::websocket_connection_ptr connection, const std::string &payload, int opcode) { - using namespace std::placeholders; - m_server->add_http_handler(path, std::bind(&esqpanel_external_panel_server::on_template_request, this, _1, _2, filename)); - } - } + external_panel_ptr panel = external_panel_for_connection(connection); + const std::string &command = payload; - void set_content_provider(std::function provider) - { - m_content_provider = provider; - } + int t = external_panel::get_message_type(command.front()); - void set_keyboard(const std::string &keyboard) - { - m_keyboard = keyboard; - } - - const std::string &get_keyboard() const - { - return m_keyboard; - } - - const std::string &get_version() const - { - return m_version; - } - - bool get_template_value(std::string &s) - { - if (s == "keyboard") - { - s = m_keyboard; - return true; - } - else if (s == "version") - { - s = m_version; - return true; - } - else - { - return false; - } - } - -private: - void send(http_manager::websocket_connection_ptr connection, const std::string &s) - { - connection->send_message(s, websocket_opcode::binary); - } - - void send_contents(http_manager::websocket_connection_ptr connection) - { - if (m_content_provider) - { - m_to_send.str(""); - m_to_send.put('D'); - if (m_content_provider(m_to_send)) + if (t == message_type::CONTROL) { - send(connection, m_to_send.str()); + int changed = panel->handle_control_message(command); + // printf("server: control message, changed = '%x'\n", changed); + if ((changed & message_type::DISPLAY) && panel->send_display_data()) + { + // printf("server: control message, sending contents\n"); + send_contents(connection); + } + + if ((changed & message_type::ANALOG) && panel->send_analog_values()) + { + // printf("server: control message, sending analog values\n"); + send_analog_values(connection); + } + + if ((changed & message_type::BUTTON) && panel->send_buttons()) + { + // printf("server: control message, sending button states\n"); + send_button_states(connection); + } + } + else if (t == message_type::INFO) + { + std::ostringstream o; + o << "I" << get_keyboard() << "," << get_version(); + send(connection, o.str()); + } + else + { + { + std::lock_guard lock(m_mutex); + m_commands.emplace_back(command); + } + + // Echo the non-command message to any other connected panels that want it + for (auto iter: m_panels) + { + external_panel_ptr other_panel = iter.second; + if (other_panel != panel && (t & other_panel->send_message_types()) != 0) + { + send(iter.first, command); + } + } } } - } - void send_analog_values(http_manager::websocket_connection_ptr connection) - { - // TODO(cbrunschen): get the current analog values and send them - } + void on_close(http_manager::websocket_connection_ptr connection, int status, const std::string& reason) + { + std::lock_guard lock(m_mutex); + m_panels.erase(connection); + } - void send_button_states(http_manager::websocket_connection_ptr connection) - { - // TODO(cbrunschen): track current button states and send them - } + void on_error(http_manager::websocket_connection_ptr connection, const std::error_code& error_code) + { + std::lock_guard lock(m_mutex); + m_panels.erase(connection); + } - http_manager *m_server; - std::recursive_mutex m_mutex; + void on_document_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename) + { + m_server->serve_document(request, response, filename); + } - connection_to_panel_map m_panels; - std::list m_commands; - std::thread m_working_thread; - std::ostringstream m_to_send; + void on_template_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename) + { + using namespace std::placeholders; + m_server->serve_template(request, response, filename, std::bind(&external_panel_server::get_template_value, this, _1), '$', '$'); + } - std::string m_index; - std::string m_keyboard; - std::string m_version; - std::function m_content_provider; - std::map m_template_values; -}; + external_panel_ptr external_panel_for_connection(http_manager::websocket_connection_ptr connection) + { + auto it = m_panels.find(connection); + + if (it == m_panels.end()) { + // this connection is not in the list. This really shouldn't happen + // and probably means something else is wrong. + throw std::invalid_argument("No panel avaliable for connection"); + } + + return it->second; + } + + bool has_commands() + { + // printf("server: has_commands()\n"); + std::lock_guard lock(m_mutex); + return !m_commands.empty(); + } + + std::string get_next_command() + { + // printf("server: get_next_command()\n"); + std::lock_guard lock(m_mutex); + std::string command = std::move(m_commands.front()); + m_commands.pop_front(); + return command; + } + + void set_index(const std::string &index) + { + m_index = index; + } + + void add_http_document(const std::string &path, const std::string &filename) + { + m_server->remove_http_handler(path); + if (filename != "") + { + using namespace std::placeholders; + m_server->add_http_handler(path, std::bind(&external_panel_server::on_document_request, this, _1, _2, filename)); + } + } + + void add_http_template(const std::string &path, const std::string &filename) + { + m_server->remove_http_handler(path); + if (filename != "") + { + using namespace std::placeholders; + m_server->add_http_handler(path, std::bind(&external_panel_server::on_template_request, this, _1, _2, filename)); + } + } + + void set_content_provider(std::function provider) + { + m_content_provider = provider; + } + + void set_keyboard(const std::string &keyboard) + { + m_keyboard = keyboard; + } + + const std::string &get_keyboard() const + { + return m_keyboard; + } + + const std::string &get_version() const + { + return m_version; + } + + bool get_template_value(std::string &s) + { + if (s == "keyboard") + { + s = m_keyboard; + return true; + } + else if (s == "version") + { + s = m_version; + return true; + } + else + { + return false; + } + } + + private: + void send(http_manager::websocket_connection_ptr connection, const std::string &s) + { + connection->send_message(s, websocket_opcode::binary); + } + + void send_contents(http_manager::websocket_connection_ptr connection) + { + if (m_content_provider) + { + m_to_send.str(""); + m_to_send.put('D'); + if (m_content_provider(m_to_send)) + { + send(connection, m_to_send.str()); + } + } + } + + void send_analog_values(http_manager::websocket_connection_ptr connection) + { + // TODO(cbrunschen): get the current analog values and send them + } + + void send_button_states(http_manager::websocket_connection_ptr connection) + { + // TODO(cbrunschen): track current button states and send them + } + + http_manager *m_server; + std::recursive_mutex m_mutex; + + connection_to_panel_map m_panels; + std::list m_commands; + std::thread m_working_thread; + std::ostringstream m_to_send; + + std::string m_index; + std::string m_keyboard; + std::string m_version; + std::function m_content_provider; + std::map m_template_values; + }; + +} // namespace esqpanel //************************************************************************** // MACROS / CONSTANTS @@ -423,7 +427,7 @@ void esqpanel_device::device_start() m_write_tx.resolve_safe(); m_write_analog.resolve_safe(); - m_external_panel_server = new esqpanel_external_panel_server(machine().manager().http()); + m_external_panel_server = new esqpanel::external_panel_server(machine().manager().http()); if (machine().manager().http()->is_active()) { m_external_panel_server->set_keyboard(owner()->shortname()); m_external_panel_server->set_index("/esqpanel/FrontPanel.html"); diff --git a/src/mame/machine/esqpanel.h b/src/mame/machine/esqpanel.h index 7fd461a8da5..de2e9918f83 100644 --- a/src/mame/machine/esqpanel.h +++ b/src/mame/machine/esqpanel.h @@ -62,7 +62,9 @@ // ======================> esqpanel_device -class esqpanel_external_panel_server; +namespace esqpanel { + class external_panel_server; +} class esqpanel_device : public device_t, public device_serial_interface { @@ -107,7 +109,7 @@ protected: bool m_eps_mode; - esqpanel_external_panel_server *m_external_panel_server; + esqpanel::external_panel_server *m_external_panel_server; private: static const int XMIT_RING_SIZE = 16;