Refactor server_{ws,http}.hpp into separate interface and implementation. (#2548)

Refactor server_{ws,http}.hpp into separate interface and implementation headers.
When shutting down the HTTP server, also explicitly stop the asio::io_context.
This commit is contained in:
Christian Brunschen 2017-08-06 13:25:55 +02:00 committed by Vas Crabb
parent 9968fc71b0
commit 336a636aed
8 changed files with 1621 additions and 1533 deletions

View File

@ -9,8 +9,8 @@ HTTP server handling
***************************************************************************/
#include "emu.h"
#include "server_ws.hpp"
#include "server_http.hpp"
#include "server_ws_impl.hpp"
#include "server_http_impl.hpp"
#include <fstream>
#include <inttypes.h>
@ -115,13 +115,13 @@ static std::string extension_to_type(const std::string& extension)
struct http_request_impl : public http_manager::http_request
{
public:
std::shared_ptr<webpp::http_server::Request> m_request;
std::shared_ptr<webpp::Request> m_request;
std::size_t m_query;
std::size_t m_fragment;
std::size_t m_path_end;
std::size_t m_query_end;
http_request_impl(std::shared_ptr<webpp::http_server::Request> request) : m_request(request) {
http_request_impl(std::shared_ptr<webpp::Request> request) : m_request(request) {
std::size_t len = m_request->path.length();
m_fragment = m_request->path.find('#');
@ -181,13 +181,13 @@ public:
/** An HTTP response. */
struct http_response_impl : public http_manager::http_response {
std::shared_ptr<webpp::http_server::Response> m_response;
std::shared_ptr<webpp::Response> m_response;
int m_status;
std::string m_content_type;
std::stringstream m_headers;
std::stringstream m_body;
http_response_impl(std::shared_ptr<webpp::http_server::Response> response) : m_response(response) { }
http_response_impl(std::shared_ptr<webpp::Response> response) : m_response(response) { }
/** Sets the HTTP status to be returned to the client. */
virtual void set_status(int status) {
@ -220,9 +220,9 @@ struct http_response_impl : public http_manager::http_response {
struct websocket_endpoint_impl : public http_manager::websocket_endpoint {
/** The underlying edpoint. */
std::shared_ptr<webpp::ws_server::Endpoint> m_endpoint;
webpp::ws_server::Endpoint *m_endpoint;
websocket_endpoint_impl(std::shared_ptr<webpp::ws_server::Endpoint> endpoint,
websocket_endpoint_impl(webpp::ws_server::Endpoint *endpoint,
http_manager::websocket_open_handler on_open,
http_manager::websocket_message_handler on_message,
http_manager::websocket_close_handler on_close,
@ -239,20 +239,24 @@ struct websocket_connection_impl : public http_manager::websocket_connection {
/** The server */
webpp::ws_server *m_wsserver;
/* The underlying Commection. */
std::shared_ptr<webpp::ws_server::Connection> m_connection;
websocket_connection_impl(webpp::ws_server *server, std::shared_ptr<webpp::ws_server::Connection> connection)
std::weak_ptr<webpp::Connection> m_connection;
websocket_connection_impl(webpp::ws_server *server, std::shared_ptr<webpp::Connection> connection)
: m_wsserver(server), m_connection(connection) { }
/** Sends a message to the client that is connected on the other end of this Websocket connection. */
virtual void send_message(const std::string &payload, int opcode) {
std::shared_ptr<webpp::ws_server::SendStream> message_stream = std::make_shared<webpp::ws_server::SendStream>();
(*message_stream) << payload;
m_wsserver->send(m_connection, message_stream, nullptr, opcode | 0x80);
if (auto connection = m_connection.lock()) {
std::shared_ptr<webpp::ws_server::SendStream> message_stream = std::make_shared<webpp::ws_server::SendStream>();
(*message_stream) << payload;
m_wsserver->send(connection, message_stream, nullptr, opcode | 0x80);
}
}
/** Closes this open Websocket connection. */
virtual void close() {
m_wsserver->send_close(m_connection, 1000 /* normal close */);
if (auto connection = m_connection.lock()) {
m_wsserver->send_close(connection, 1000 /* normal close */);
}
}
};
@ -338,11 +342,12 @@ http_manager::~http_manager()
if (!m_server) return;
m_server->stop();
m_io_context->stop();
if (m_server_thread.joinable())
m_server_thread.join();
}
static void on_get(http_manager::http_handler handler, std::shared_ptr<webpp::http_server::Response> response, std::shared_ptr<webpp::http_server::Request> request) {
static void on_get(http_manager::http_handler handler, std::shared_ptr<webpp::Response> response, std::shared_ptr<webpp::Request> request) {
auto request_impl = std::make_shared<http_request_impl>(request);
auto response_impl = std::make_shared<http_response_impl>(response);
@ -351,23 +356,22 @@ static void on_get(http_manager::http_handler handler, std::shared_ptr<webpp::ht
response_impl->send();
}
void http_manager::on_open(http_manager::websocket_endpoint_ptr endpoint, void *connection) {
void http_manager::on_open(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection) {
std::lock_guard<std::mutex> lock(m_connections_mutex);
webpp::ws_server *ws_server = m_wsserver.get();
// Keep an oening shared_ptr to the Connection, so it won't go away while we are using it.
std::shared_ptr<webpp::ws_server::Connection> conn = (static_cast<webpp::ws_server::Connection *>(connection))->ptr();
http_manager::websocket_connection_ptr connection_impl = std::make_shared<websocket_connection_impl>(ws_server, conn);
m_connections[connection] = connection_impl;
http_manager::websocket_connection_ptr connection_impl = std::make_shared<websocket_connection_impl>(ws_server, connection);
m_connections[connection.get()] = connection_impl;
if (endpoint->on_open) {
endpoint->on_open(connection_impl);
}
}
void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::string &payload, int opcode) {
void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection, const std::string &payload, int opcode) {
if (endpoint->on_message) {
std::lock_guard<std::mutex> lock(m_connections_mutex);
auto i = m_connections.find(connection);
auto i = m_connections.find(connection.get());
if (i != m_connections.end()) {
http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second;
endpoint->on_message(websocket_connection_impl, payload, opcode);
@ -375,31 +379,31 @@ void http_manager::on_message(http_manager::websocket_endpoint_ptr endpoint, voi
}
}
void http_manager::on_close(http_manager::websocket_endpoint_ptr endpoint, void *connection,
void http_manager::on_close(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection,
int status, const std::string& reason) {
std::lock_guard<std::mutex> lock(m_connections_mutex);
auto i = m_connections.find(connection);
auto i = m_connections.find(connection.get());
if (i != m_connections.end()) {
if (endpoint->on_close) {
http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second;
endpoint->on_close(websocket_connection_impl, status, reason);
}
m_connections.erase(connection);
m_connections.erase(connection.get());
}
}
void http_manager::on_error(http_manager::websocket_endpoint_ptr endpoint, void *connection,
void http_manager::on_error(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection,
const std::error_code& error_code) {
std::lock_guard<std::mutex> lock(m_connections_mutex);
auto i = m_connections.find(connection);
auto i = m_connections.find(connection.get());
if (i != m_connections.end()) {
if (endpoint->on_error) {
http_manager::websocket_connection_ptr websocket_connection_impl = (*i).second;
endpoint->on_error(websocket_connection_impl, error_code);
}
m_connections.erase(connection);
m_connections.erase(connection.get());
}
}
@ -527,25 +531,25 @@ http_manager::websocket_endpoint_ptr http_manager::add_endpoint(const std::strin
using namespace std::placeholders;
auto &endpoint = m_wsserver->m_endpoint[path];
std::shared_ptr<webpp::ws_server::Endpoint> endpoint_ptr(&endpoint);
webpp::ws_server::Endpoint *endpoint_ptr(&endpoint);
auto endpoint_impl = std::make_shared<websocket_endpoint_impl>(endpoint_ptr, on_open, on_message, on_close, on_error);
endpoint.on_open = [&, this, endpoint_impl](std::shared_ptr<webpp::ws_server::Connection> conn) {
this->on_open(endpoint_impl, conn.get());
endpoint.on_open = [&, this, endpoint_impl](std::shared_ptr<webpp::Connection> connection) {
this->on_open(endpoint_impl, connection);
};
endpoint.on_message = [&, this, endpoint_impl](std::shared_ptr<webpp::ws_server::Connection> conn, std::shared_ptr<webpp::ws_server::Message> message) {
endpoint.on_message = [&, this, endpoint_impl](std::shared_ptr<webpp::Connection> connection, std::shared_ptr<webpp::ws_server::Message> message) {
std::string payload = message->string();
int opcode = message->fin_rsv_opcode & 0x0f;
this->on_message(endpoint_impl, conn.get(), payload, opcode);
this->on_message(endpoint_impl, connection, payload, opcode);
};
endpoint.on_close = [&, this, endpoint_impl](std::shared_ptr<webpp::ws_server::Connection> conn, int status, const std::string& reason) {
this->on_close(endpoint_impl, conn.get(), status, reason);
endpoint.on_close = [&, this, endpoint_impl](std::shared_ptr<webpp::Connection> connection, int status, const std::string& reason) {
this->on_close(endpoint_impl, connection, status, reason);
};
endpoint.on_error = [&, this, endpoint_impl](std::shared_ptr<webpp::ws_server::Connection> conn, const std::error_code& error_code) {
this->on_error(endpoint_impl, conn.get(), error_code);
endpoint.on_error = [&, this, endpoint_impl](std::shared_ptr<webpp::Connection> connection, const std::error_code& error_code) {
this->on_error(endpoint_impl, connection, error_code);
};
m_endpoints[path] = endpoint_impl;

View File

@ -19,6 +19,8 @@ HTTP server handling
#include <thread>
#include <time.h>
#include "server_http.hpp"
#include "server_ws.hpp"
//**************************************************************************
// TYPE DEFINITIONS
@ -29,11 +31,6 @@ namespace asio
{
class io_context;
}
namespace webpp
{
class http_server;
class ws_server;
}
class http_manager
{
@ -41,7 +38,7 @@ class http_manager
public:
/** An HTTP Request. */
struct http_request : public std::enable_shared_from_this<http_request>
struct http_request
{
/** Retrieves the requested resource. */
virtual const std::string get_resource() = 0; // The entire resource: path, query and fragment.
@ -67,7 +64,7 @@ public:
typedef std::shared_ptr<http_request> http_request_ptr;
/** An HTTP response. */
struct http_response : public std::enable_shared_from_this<http_response>
struct http_response
{
/** Sets the HTTP status to be returned to the client. */
virtual void set_status(int status) = 0;
@ -84,7 +81,7 @@ public:
typedef std::shared_ptr<http_response> http_response_ptr;
/** Identifies a Websocket connection. */
struct websocket_connection : public std::enable_shared_from_this<websocket_connection>
struct websocket_connection
{
/** Sends a message to the client that is connected on the other end of this Websocket connection. */
virtual void send_message(const std::string &payload, int opcode) = 0;
@ -162,13 +159,13 @@ public:
}
private:
void on_open(http_manager::websocket_endpoint_ptr endpoint, void *onnection);
void on_open(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection);
void on_message(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::string& payload, int opcode);
void on_message(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection, const std::string& payload, int opcode);
void on_close(http_manager::websocket_endpoint_ptr endpoint, void *connection, int status, const std::string& reason);
void on_close(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection, int status, const std::string& reason);
void on_error(http_manager::websocket_endpoint_ptr endpoint, void *connection, const std::error_code& error_code);
void on_error(http_manager::websocket_endpoint_ptr endpoint, std::shared_ptr<webpp::Connection> connection, const std::error_code& error_code);
bool read_file(std::ostream &os, const std::string &path);

View File

@ -1,14 +1,8 @@
// license:MIT
// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic
#ifndef SERVER_HTTP_HPP
#define SERVER_HTTP_HPP
#ifndef MAME_LIB_UTIL_SERVER_HTTP_HPP
#define MAME_LIB_UTIL_SERVER_HTTP_HPP
#if defined(_MSC_VER)
#pragma warning(disable:4503)
#endif
#include "asio.h"
#include "asio/system_timer.hpp"
#include "path_to_regex.hpp"
#include <map>
@ -41,471 +35,32 @@ public:
return seed;
}
};
#endif
#endif /* CASE_INSENSITIVE_EQUALS_AND_HASH */
namespace webpp {
template <class socket_type>
class Server;
struct Request {
std::string method, path, http_version;
template <class socket_type>
class ServerBase {
public:
virtual ~ServerBase() {}
std::unordered_multimap<std::string, std::string, case_insensitive_hash, case_insensitive_equals> header;
class Response {
friend class ServerBase<socket_type>;
path2regex::Keys keys;
std::map<std::string, std::string> params;
asio::streambuf m_streambuf;
std::shared_ptr<socket_type> m_socket;
std::ostream m_ostream;
std::stringstream m_header;
explicit Response(const std::shared_ptr<socket_type> &socket) : m_socket(socket), m_ostream(&m_streambuf) {}
static std::string statusToString(int status)
{
switch (status) {
default:
case 200: return "HTTP/1.0 200 OK\r\n";
case 201: return "HTTP/1.0 201 Created\r\n";
case 202: return "HTTP/1.0 202 Accepted\r\n";
case 204: return "HTTP/1.0 204 No Content\r\n";
case 300: return "HTTP/1.0 300 Multiple Choices\r\n";
case 301: return "HTTP/1.0 301 Moved Permanently\r\n";
case 302: return "HTTP/1.0 302 Moved Temporarily\r\n";
case 304: return "HTTP/1.0 304 Not Modified\r\n";
case 400: return "HTTP/1.0 400 Bad Request\r\n";
case 401: return "HTTP/1.0 401 Unauthorized\r\n";
case 403: return "HTTP/1.0 403 Forbidden\r\n";
case 404: return "HTTP/1.0 404 Not Found\r\n";
case 500: return "HTTP/1.0 500 Internal Server Error\r\n";
case 501: return "HTTP/1.0 501 Not Implemented\r\n";
case 502: return "HTTP/1.0 502 Bad Gateway\r\n";
case 504: return "HTTP/1.0 503 Service Unavailable\r\n";
}
}
public:
Response& status(int number) { m_ostream << statusToString(number); return *this; }
void type(std::string str) { m_header << "Content-Type: "<< str << "\r\n"; }
void send(std::string str) { m_ostream << m_header.str() << "Content-Length: " << str.length() << "\r\n\r\n" << str; }
size_t size() const { return m_streambuf.size(); }
std::shared_ptr<socket_type> socket() { return m_socket; }
/// If true, force server to close the connection after the response have been sent.
///
/// This is useful when implementing a HTTP/1.0-server sending content
/// without specifying the content length.
bool close_connection_after_response = false;
};
class Content : public std::istream {
friend class ServerBase<socket_type>;
public:
size_t size() const {
return streambuf.size();
}
std::string string() const {
std::stringstream ss;
ss << rdbuf();
return ss.str();
}
private:
asio::streambuf &streambuf;
explicit Content(asio::streambuf &streambuf): std::istream(&streambuf), streambuf(streambuf) {}
};
class Request {
friend class ServerBase<socket_type>;
friend class Server<socket_type>;
public:
std::string method, path, http_version;
Content content;
std::unordered_multimap<std::string, std::string, case_insensitive_hash, case_insensitive_equals> header;
path2regex::Keys keys;
std::map<std::string, std::string> params;
std::string remote_endpoint_address;
unsigned short remote_endpoint_port;
private:
Request(const socket_type &socket): content(streambuf) {
try {
remote_endpoint_address=socket.lowest_layer().remote_endpoint().address().to_string();
remote_endpoint_port=socket.lowest_layer().remote_endpoint().port();
}
catch(...) {}
}
asio::streambuf streambuf;
};
class Config {
friend class ServerBase<socket_type>;
Config(unsigned short port) : port(port) {}
public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
unsigned short port;
/// Number of threads that the server will use when start() is called. Defaults to 1 thread.
size_t thread_pool_size=1;
/// Timeout on request handling. Defaults to 5 seconds.
size_t timeout_request=5;
/// Timeout on content handling. Defaults to 300 seconds.
size_t timeout_content=300;
/// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation.
/// If empty, the address will be any address.
std::string address;
/// Set to false to avoid binding the socket to an address that is already in use. Defaults to true.
bool reuse_address=true;
};
///Set before calling start().
Config m_config;
private:
class regex_orderable : public std::regex {
std::string str;
public:
regex_orderable(std::regex reg, const std::string &regex_str) : std::regex(reg), str(regex_str) {}
bool operator<(const regex_orderable &rhs) const {
return str<rhs.str;
}
std::string getstr() const { return str; }
};
using http_handler = std::function<void(std::shared_ptr<Response>, std::shared_ptr<Request>)>;
public:
template<class T> void on_get(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg,regex)]["GET"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_get(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["GET"] = func; }
template<class T> void on_post(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["POST"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_post(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["POST"] = func; }
template<class T> void on_put(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PUT"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_put(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["PUT"] = func; }
template<class T> void on_patch(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PATCH"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_patch(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["PATCH"] = func; }
template<class T> void on_delete(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["DELETE"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_delete(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["DELETE"] = func; }
void remove_handler(std::string regex)
{
std::lock_guard<std::mutex> lock(m_resource_mutex);
for (auto it = m_resource.begin(); it != m_resource.end(); ++it)
{
if (it->first.getstr() == regex)
{
m_resource.erase(it);
break;
}
}
}
void clear()
{
std::lock_guard<std::mutex> lock(m_resource_mutex);
m_resource.clear();
}
std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const std::error_code&)> on_error;
std::function<void(std::shared_ptr<socket_type> socket, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
private:
/// Warning: do not access (red or write) m_resources without holding m_resource_mutex
std::map<regex_orderable, std::map<std::string, std::tuple<path2regex::Keys, http_handler>>> m_resource;
std::mutex m_resource_mutex;
std::map<std::string, http_handler> m_default_resource;
public:
virtual void start() {
if(!m_io_context)
m_io_context=std::make_shared<asio::io_context>();
if(m_io_context->stopped())
m_io_context.reset();
asio::ip::tcp::endpoint endpoint;
if(m_config.address.size()>0)
endpoint=asio::ip::tcp::endpoint(asio::ip::make_address(m_config.address), m_config.port);
else
endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), m_config.port);
if(!acceptor)
acceptor= std::make_unique<asio::ip::tcp::acceptor>(*m_io_context);
acceptor->open(endpoint.protocol());
acceptor->set_option(asio::socket_base::reuse_address(m_config.reuse_address));
acceptor->bind(endpoint);
acceptor->listen();
accept();
if (!m_external_context)
m_io_context->run();
}
void stop() const
{
acceptor->close();
if (!m_external_context)
m_io_context->stop();
}
///Use this function if you need to recursively send parts of a longer message
void send(const std::shared_ptr<Response> &response, const std::function<void(const std::error_code&)>& callback=nullptr) const {
asio::async_write(*response->socket(), response->m_streambuf, [this, response, callback](const std::error_code& ec, size_t /*bytes_transferred*/) {
if(callback)
callback(ec);
});
}
void set_io_context(std::shared_ptr<asio::io_context> new_io_context)
{
m_io_context = new_io_context;
m_external_context = true;
}
protected:
std::shared_ptr<asio::io_context> m_io_context;
bool m_external_context;
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
std::vector<std::thread> threads;
ServerBase(unsigned short port) : m_config(port), m_external_context(false) {}
virtual void accept()=0;
std::shared_ptr<asio::system_timer> get_timeout_timer(const std::shared_ptr<socket_type> &socket, long seconds) {
if(seconds==0)
return nullptr;
auto timer = std::make_shared<asio::system_timer>(*m_io_context);
timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(seconds));
timer->async_wait([socket](const std::error_code& ec){
if(!ec) {
std::error_code newec = ec;
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, newec);
socket->lowest_layer().close();
}
});
return timer;
}
void read_request_and_content(const std::shared_ptr<socket_type> &socket) {
//Create new streambuf (Request::streambuf) for async_read_until()
//shared_ptr is used to pass temporary objects to the asynchronous functions
std::shared_ptr<Request> request(new Request(*socket));
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(socket, m_config.timeout_request);
asio::async_read_until(*socket, request->streambuf, "\r\n\r\n",
[this, socket, request, timer](const std::error_code& ec, size_t bytes_transferred) {
if(timer)
timer->cancel();
if(!ec) {
//request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs:
//"After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter"
//The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the
//streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content).
size_t num_additional_bytes=request->streambuf.size()-bytes_transferred;
if (!parse_request(request))
return;
//If content, read that as well
auto it = request->header.find("Content-Length");
if (it != request->header.end()) {
unsigned long long content_length;
try {
content_length = stoull(it->second);
}
catch (const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
return;
}
if (content_length > num_additional_bytes) {
//Set timeout on the following asio::async-read or write function
auto timer2 = get_timeout_timer(socket, m_config.timeout_content);
asio::async_read(*socket, request->streambuf,
asio::transfer_exactly(size_t(content_length) - num_additional_bytes),
[this, socket, request, timer2]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if (timer2)
timer2->cancel();
if (!ec)
find_resource(socket, request);
else if (on_error)
on_error(request, ec);
});
}
else {
find_resource(socket, request);
}
}
else {
find_resource(socket, request);
}
}
else if (on_error)
on_error(request, ec);
});
}
bool parse_request(const std::shared_ptr<Request> &request) const {
std::string line;
getline(request->content, line);
size_t method_end;
if((method_end=line.find(' '))!=std::string::npos) {
size_t path_end;
if((path_end=line.find(' ', method_end+1))!=std::string::npos) {
request->method=line.substr(0, method_end);
request->path=line.substr(method_end+1, path_end-method_end-1);
size_t protocol_end;
if((protocol_end=line.find('/', path_end+1))!=std::string::npos) {
if(line.compare(path_end+1, protocol_end-path_end-1, "HTTP")!=0)
return false;
request->http_version=line.substr(protocol_end+1, line.size()-protocol_end-2);
}
else
return false;
getline(request->content, line);
size_t param_end;
while((param_end=line.find(':'))!=std::string::npos) {
size_t value_start=param_end+1;
if((value_start)<line.size()) {
if(line[value_start]==' ')
value_start++;
if(value_start<line.size())
request->header.emplace(line.substr(0, param_end), line.substr(value_start, line.size() - value_start - 1));
}
getline(request->content, line);
}
}
else
return false;
}
else
return false;
return true;
}
void find_resource(const std::shared_ptr<socket_type> &socket, const std::shared_ptr<Request> &request) {
std::lock_guard<std::mutex> lock(m_resource_mutex);
//Upgrade connection
if(on_upgrade) {
auto it=request->header.find("Upgrade");
if(it!=request->header.end()) {
on_upgrade(socket, request);
return;
}
}
//Find path- and method-match, and call write_response
for(auto& regex_method : m_resource) {
auto it = regex_method.second.find(request->method);
if (it != regex_method.second.end()) {
std::smatch sm_res;
if (std::regex_match(request->path, sm_res, regex_method.first)) {
request->keys = std::get<0>(it->second);
for (size_t i = 0; i < request->keys.size(); i++) {
request->params.insert(std::pair<std::string, std::string>(request->keys[i].name, sm_res[i + 1]));
}
write_response(socket, request, std::get<1>(it->second));
return;
}
}
}
auto it=m_default_resource.find(request->method);
if(it!=m_default_resource.end()) {
write_response(socket, request, it->second);
}
}
void write_response(const std::shared_ptr<socket_type> &socket, const std::shared_ptr<Request> &request, http_handler& resource_function) {
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(socket, m_config.timeout_content);
auto response=std::shared_ptr<Response>(new Response(socket), [this, request, timer](Response *response_ptr) {
auto response=std::shared_ptr<Response>(response_ptr);
send(response, [this, response, request, timer](const std::error_code& ec) {
if (timer)
timer->cancel();
if (!ec) {
float http_version;
try {
http_version = stof(request->http_version);
}
catch (const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
return;
}
if (response->close_connection_after_response)
return;
auto range = request->header.equal_range("Connection");
case_insensitive_equals check;
for (auto it = range.first; it != range.second; ++it) {
if (check(it->second, "close"))
return;
}
if (http_version > 1.05)
read_request_and_content(response->socket());
}
else if (on_error)
on_error(request, ec);
});
});
try {
resource_function(response, request);
}
catch(const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
}
}
std::string remote_endpoint_address;
unsigned short remote_endpoint_port;
virtual ~Request() {}
};
struct Response {
virtual Response& status(int number) = 0;
virtual void type(std::string str) = 0;
virtual void send(std::string str) = 0;
virtual size_t size() const = 0;
template<class socket_type>
class Server : public ServerBase<socket_type> {
public:
Server(unsigned short port, size_t num_threads, long timeout_request, long timeout_send_or_receive)
: ServerBase<socket_type>(port, num_threads, timeout_request, timeout_send_or_receive)
{
}
};
using HTTP = asio::ip::tcp::socket;
template<>
class Server<HTTP> : public ServerBase<HTTP> {
public:
Server() : ServerBase<HTTP>::ServerBase(80) {}
protected:
void accept() override {
//Create new socket for this connection
//Shared_ptr is used to pass temporary objects to the asynchronous functions
auto socket = std::make_shared<HTTP>(*m_io_context);
acceptor->async_accept(*socket, [this, socket](const std::error_code& ec){
//Immediately start accepting a new connection (if io_context hasn't been stopped)
if (ec != asio::error::operation_aborted)
accept();
if(!ec) {
asio::ip::tcp::no_delay option(true);
socket->set_option(option);
read_request_and_content(socket);
} else if (on_error)
on_error(std::shared_ptr<Request>(new Request(*socket)), ec);
});
}
};
class http_server : public Server<HTTP> {
public:
http_server() : Server<HTTP>::Server() {}
virtual ~Response() {}
};
class http_server;
}
#endif /* SERVER_HTTP_HPP */
#endif /* MAME_LIB_UTIL_SERVER_HTTP_HPP */

View File

@ -0,0 +1,482 @@
// license:MIT
// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic
#ifndef MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP
#define MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP
#if defined(_MSC_VER)
#pragma warning(disable:4503)
#endif
#include "server_http.hpp"
#include "asio.h"
#include "asio/system_timer.hpp"
#include "path_to_regex.hpp"
#include <map>
#include <unordered_map>
#include <thread>
#include <functional>
#include <iostream>
#include <sstream>
#include <regex>
namespace webpp {
template <class socket_type>
class Server;
template <class socket_type>
class ServerBase {
public:
virtual ~ServerBase() {}
class Response : public webpp::Response {
friend class ServerBase<socket_type>;
asio::streambuf m_streambuf;
std::shared_ptr<socket_type> m_socket;
std::ostream m_ostream;
std::stringstream m_header;
explicit Response(const std::shared_ptr<socket_type> &socket) : m_socket(socket), m_ostream(&m_streambuf) {}
static std::string statusToString(int status)
{
switch (status) {
default:
case 200: return "HTTP/1.0 200 OK\r\n";
case 201: return "HTTP/1.0 201 Created\r\n";
case 202: return "HTTP/1.0 202 Accepted\r\n";
case 204: return "HTTP/1.0 204 No Content\r\n";
case 300: return "HTTP/1.0 300 Multiple Choices\r\n";
case 301: return "HTTP/1.0 301 Moved Permanently\r\n";
case 302: return "HTTP/1.0 302 Moved Temporarily\r\n";
case 304: return "HTTP/1.0 304 Not Modified\r\n";
case 400: return "HTTP/1.0 400 Bad Request\r\n";
case 401: return "HTTP/1.0 401 Unauthorized\r\n";
case 403: return "HTTP/1.0 403 Forbidden\r\n";
case 404: return "HTTP/1.0 404 Not Found\r\n";
case 500: return "HTTP/1.0 500 Internal Server Error\r\n";
case 501: return "HTTP/1.0 501 Not Implemented\r\n";
case 502: return "HTTP/1.0 502 Bad Gateway\r\n";
case 504: return "HTTP/1.0 503 Service Unavailable\r\n";
}
}
public:
virtual Response& status(int number) { m_ostream << statusToString(number); return *this; }
virtual void type(std::string str) { m_header << "Content-Type: "<< str << "\r\n"; }
virtual void send(std::string str) { m_ostream << m_header.str() << "Content-Length: " << str.length() << "\r\n\r\n" << str; }
virtual size_t size() const { return m_streambuf.size(); }
std::shared_ptr<socket_type> socket() { return m_socket; }
/// If true, force server to close the connection after the response have been sent.
///
/// This is useful when implementing a HTTP/1.0-server sending content
/// without specifying the content length.
bool close_connection_after_response = false;
virtual ~Response() {}
};
class Content : public std::istream {
friend class ServerBase<socket_type>;
public:
size_t size() const {
return streambuf.size();
}
std::string string() const {
std::stringstream ss;
ss << rdbuf();
return ss.str();
}
private:
asio::streambuf &streambuf;
explicit Content(asio::streambuf &streambuf): std::istream(&streambuf), streambuf(streambuf) {}
};
class Request : public webpp::Request {
friend class ServerBase<socket_type>;
friend class Server<socket_type>;
public:
Content content;
virtual ~Request() {}
private:
Request(const socket_type &socket): content(streambuf) {
try {
remote_endpoint_address=socket.lowest_layer().remote_endpoint().address().to_string();
remote_endpoint_port=socket.lowest_layer().remote_endpoint().port();
}
catch(...) {}
}
asio::streambuf streambuf;
};
class Config {
friend class ServerBase<socket_type>;
Config(unsigned short port) : port(port) {}
public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
unsigned short port;
/// Number of threads that the server will use when start() is called. Defaults to 1 thread.
size_t thread_pool_size=1;
/// Timeout on request handling. Defaults to 5 seconds.
size_t timeout_request=5;
/// Timeout on content handling. Defaults to 300 seconds.
size_t timeout_content=300;
/// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation.
/// If empty, the address will be any address.
std::string address;
/// Set to false to avoid binding the socket to an address that is already in use. Defaults to true.
bool reuse_address=true;
};
///Set before calling start().
Config m_config;
private:
class regex_orderable : public std::regex {
std::string str;
public:
regex_orderable(std::regex reg, const std::string &regex_str) : std::regex(reg), str(regex_str) {}
bool operator<(const regex_orderable &rhs) const {
return str<rhs.str;
}
std::string getstr() const { return str; }
};
using http_handler = std::function<void(std::shared_ptr<Response>, std::shared_ptr<Request>)>;
public:
template<class T> void on_get(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg,regex)]["GET"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_get(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["GET"] = func; }
template<class T> void on_post(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["POST"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_post(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["POST"] = func; }
template<class T> void on_put(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PUT"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_put(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["PUT"] = func; }
template<class T> void on_patch(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["PATCH"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_patch(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["PATCH"] = func; }
template<class T> void on_delete(std::string regex, T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); path2regex::Keys keys; auto reg = path2regex::path_to_regex(regex, keys); m_resource[regex_orderable(reg, regex)]["DELETE"] = std::make_tuple(std::move(keys), func); }
template<class T> void on_delete(T&& func) { std::lock_guard<std::mutex> lock(m_resource_mutex); m_default_resource["DELETE"] = func; }
void remove_handler(std::string regex)
{
std::lock_guard<std::mutex> lock(m_resource_mutex);
for (auto it = m_resource.begin(); it != m_resource.end(); ++it)
{
if (it->first.getstr() == regex)
{
m_resource.erase(it);
break;
}
}
}
void clear()
{
std::lock_guard<std::mutex> lock(m_resource_mutex);
m_resource.clear();
}
std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const std::error_code&)> on_error;
std::function<void(std::shared_ptr<socket_type> socket, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
private:
/// Warning: do not access (red or write) m_resources without holding m_resource_mutex
std::map<regex_orderable, std::map<std::string, std::tuple<path2regex::Keys, http_handler>>> m_resource;
std::mutex m_resource_mutex;
std::map<std::string, http_handler> m_default_resource;
public:
virtual void start() {
if(!m_io_context)
m_io_context=std::make_shared<asio::io_context>();
if(m_io_context->stopped())
m_io_context.reset();
asio::ip::tcp::endpoint endpoint;
if(m_config.address.size()>0)
endpoint=asio::ip::tcp::endpoint(asio::ip::make_address(m_config.address), m_config.port);
else
endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), m_config.port);
if(!acceptor)
acceptor= std::make_unique<asio::ip::tcp::acceptor>(*m_io_context);
acceptor->open(endpoint.protocol());
acceptor->set_option(asio::socket_base::reuse_address(m_config.reuse_address));
acceptor->bind(endpoint);
acceptor->listen();
accept();
if (!m_external_context)
m_io_context->run();
}
void stop() const
{
acceptor->close();
if (!m_external_context)
m_io_context->stop();
}
///Use this function if you need to recursively send parts of a longer message
void send(const std::shared_ptr<Response> &response, const std::function<void(const std::error_code&)>& callback=nullptr) const {
asio::async_write(*response->socket(), response->m_streambuf, [this, response, callback](const std::error_code& ec, size_t /*bytes_transferred*/) {
if(callback)
callback(ec);
});
}
void set_io_context(std::shared_ptr<asio::io_context> new_io_context)
{
m_io_context = new_io_context;
m_external_context = true;
}
protected:
std::shared_ptr<asio::io_context> m_io_context;
bool m_external_context;
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
std::vector<std::thread> threads;
ServerBase(unsigned short port) : m_config(port), m_external_context(false) {}
virtual void accept()=0;
std::shared_ptr<asio::system_timer> get_timeout_timer(const std::shared_ptr<socket_type> &socket, long seconds) {
if(seconds==0)
return nullptr;
auto timer = std::make_shared<asio::system_timer>(*m_io_context);
timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(seconds));
timer->async_wait([socket](const std::error_code& ec){
if(!ec) {
std::error_code newec = ec;
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, newec);
socket->lowest_layer().close();
}
});
return timer;
}
void read_request_and_content(const std::shared_ptr<socket_type> &socket) {
//Create new streambuf (Request::streambuf) for async_read_until()
//shared_ptr is used to pass temporary objects to the asynchronous functions
std::shared_ptr<Request> request(new Request(*socket));
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(socket, m_config.timeout_request);
asio::async_read_until(*socket, request->streambuf, "\r\n\r\n",
[this, socket, request, timer](const std::error_code& ec, size_t bytes_transferred) {
if(timer)
timer->cancel();
if(!ec) {
//request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs:
//"After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter"
//The chosen solution is to extract lines from the stream directly when parsing the header. What is left of the
//streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content).
size_t num_additional_bytes=request->streambuf.size()-bytes_transferred;
if (!parse_request(request))
return;
//If content, read that as well
auto it = request->header.find("Content-Length");
if (it != request->header.end()) {
unsigned long long content_length;
try {
content_length = stoull(it->second);
}
catch (const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
return;
}
if (content_length > num_additional_bytes) {
//Set timeout on the following asio::async-read or write function
auto timer2 = get_timeout_timer(socket, m_config.timeout_content);
asio::async_read(*socket, request->streambuf,
asio::transfer_exactly(size_t(content_length) - num_additional_bytes),
[this, socket, request, timer2]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if (timer2)
timer2->cancel();
if (!ec)
find_resource(socket, request);
else if (on_error)
on_error(request, ec);
});
}
else {
find_resource(socket, request);
}
}
else {
find_resource(socket, request);
}
}
else if (on_error)
on_error(request, ec);
});
}
bool parse_request(const std::shared_ptr<Request> &request) const {
std::string line;
getline(request->content, line);
size_t method_end;
if((method_end=line.find(' '))!=std::string::npos) {
size_t path_end;
if((path_end=line.find(' ', method_end+1))!=std::string::npos) {
request->method=line.substr(0, method_end);
request->path=line.substr(method_end+1, path_end-method_end-1);
size_t protocol_end;
if((protocol_end=line.find('/', path_end+1))!=std::string::npos) {
if(line.compare(path_end+1, protocol_end-path_end-1, "HTTP")!=0)
return false;
request->http_version=line.substr(protocol_end+1, line.size()-protocol_end-2);
}
else
return false;
getline(request->content, line);
size_t param_end;
while((param_end=line.find(':'))!=std::string::npos) {
size_t value_start=param_end+1;
if((value_start)<line.size()) {
if(line[value_start]==' ')
value_start++;
if(value_start<line.size())
request->header.emplace(line.substr(0, param_end), line.substr(value_start, line.size() - value_start - 1));
}
getline(request->content, line);
}
}
else
return false;
}
else
return false;
return true;
}
void find_resource(const std::shared_ptr<socket_type> &socket, const std::shared_ptr<Request> &request) {
std::lock_guard<std::mutex> lock(m_resource_mutex);
//Upgrade connection
if(on_upgrade) {
auto it=request->header.find("Upgrade");
if(it!=request->header.end()) {
on_upgrade(socket, request);
return;
}
}
//Find path- and method-match, and call write_response
for(auto& regex_method : m_resource) {
auto it = regex_method.second.find(request->method);
if (it != regex_method.second.end()) {
std::smatch sm_res;
if (std::regex_match(request->path, sm_res, regex_method.first)) {
request->keys = std::get<0>(it->second);
for (size_t i = 0; i < request->keys.size(); i++) {
request->params.insert(std::pair<std::string, std::string>(request->keys[i].name, sm_res[i + 1]));
}
write_response(socket, request, std::get<1>(it->second));
return;
}
}
}
auto it=m_default_resource.find(request->method);
if(it!=m_default_resource.end()) {
write_response(socket, request, it->second);
}
}
void write_response(const std::shared_ptr<socket_type> &socket, const std::shared_ptr<Request> &request, http_handler& resource_function) {
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(socket, m_config.timeout_content);
auto response=std::shared_ptr<Response>(new Response(socket), [this, request, timer](Response *response_ptr) {
auto response=std::shared_ptr<Response>(response_ptr);
send(response, [this, response, request, timer](const std::error_code& ec) {
if (timer)
timer->cancel();
if (!ec) {
float http_version;
try {
http_version = stof(request->http_version);
}
catch (const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
return;
}
if (response->close_connection_after_response)
return;
auto range = request->header.equal_range("Connection");
case_insensitive_equals check;
for (auto it = range.first; it != range.second; ++it) {
if (check(it->second, "close"))
return;
}
if (http_version > 1.05)
read_request_and_content(response->socket());
}
else if (on_error)
on_error(request, ec);
});
});
try {
resource_function(response, request);
}
catch(const std::exception &) {
if (on_error)
on_error(request, std::error_code(EPROTO, std::generic_category()));
}
}
};
template<class socket_type>
class Server : public ServerBase<socket_type> {
public:
Server(unsigned short port, size_t num_threads, long timeout_request, long timeout_send_or_receive)
: ServerBase<socket_type>(port, num_threads, timeout_request, timeout_send_or_receive)
{
}
};
using HTTP = asio::ip::tcp::socket;
template<>
class Server<HTTP> : public ServerBase<HTTP> {
public:
Server() : ServerBase<HTTP>::ServerBase(80) {}
protected:
void accept() override {
//Create new socket for this connection
//Shared_ptr is used to pass temporary objects to the asynchronous functions
auto socket = std::make_shared<HTTP>(*m_io_context);
acceptor->async_accept(*socket, [this, socket](const std::error_code& ec){
//Immediately start accepting a new connection (if io_context hasn't been stopped)
if (ec != asio::error::operation_aborted)
accept();
if(!ec) {
asio::ip::tcp::no_delay option(true);
socket->set_option(option);
read_request_and_content(socket);
} else if (on_error)
on_error(std::shared_ptr<Request>(new Request(*socket)), ec);
});
}
};
class http_server : public Server<HTTP> {
public:
http_server() : Server<HTTP>::Server() {}
};
}
#endif /* MAME_LIB_UTIL_SERVER_HTTP_IMPL_HPP */

View File

@ -1,24 +1,9 @@
// license:MIT
// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic
#ifndef SERVER_WS_HPP
#define SERVER_WS_HPP
#include "path_to_regex.hpp"
#include "crypto.hpp"
#include "asio.h"
#include "asio/system_timer.hpp"
#include <unordered_map>
#include <thread>
#include <mutex>
#include <unordered_set>
#include <list>
#ifndef MAME_LIB_UTIL_SERVER_WS_HPP
#define MAME_LIB_UTIL_SERVER_WS_HPP
#include <memory>
#include <atomic>
#include <iostream>
#include <map>
#include <chrono>
#include <regex>
#ifndef CASE_INSENSITIVE_EQUALS_AND_HASH
#define CASE_INSENSITIVE_EQUALS_AND_HASH
@ -42,681 +27,23 @@ public:
return seed;
}
};
#endif
#endif /* CASE_INSENSITIVE_EQUALS_AND_HASH */
namespace webpp {
template <class socket_type>
class SocketServer;
struct Connection {
std::string method, path, http_version;
template <class socket_type>
class SocketServerBase {
public:
virtual ~SocketServerBase() {}
std::unordered_multimap<std::string, std::string, case_insensitive_hash, case_insensitive_equals> header;
class SendStream : public std::ostream, public std::enable_shared_from_this<SendStream> {
friend class SocketServerBase<socket_type>;
std::smatch path_match;
asio::streambuf streambuf;
public:
SendStream(): std::ostream(&streambuf) {}
size_t size() const {
return streambuf.size();
}
};
class Connection : public std::enable_shared_from_this<Connection> {
friend class SocketServerBase<socket_type>;
friend class SocketServer<socket_type>;
public:
explicit Connection(const std::shared_ptr<socket_type> &socket) : remote_endpoint_port(0), socket(socket), strand(socket->get_io_service()), closed(false) { }
std::string method, path, http_version;
std::unordered_multimap<std::string, std::string, case_insensitive_hash, case_insensitive_equals> header;
std::smatch path_match;
std::string remote_endpoint_address;
unsigned short remote_endpoint_port;
std::shared_ptr<Connection> ptr() {
return this->shared_from_this();
}
private:
explicit Connection(socket_type *socket): remote_endpoint_port(0), socket(socket), strand(socket->get_io_service()), closed(false) { }
class SendData {
public:
SendData(const std::shared_ptr<SendStream> &header_stream, const std::shared_ptr<SendStream> &message_stream,
const std::function<void(const std::error_code)> &callback) :
header_stream(header_stream), message_stream(message_stream), callback(callback) {}
std::shared_ptr<SendStream> header_stream;
std::shared_ptr<SendStream> message_stream;
std::function<void(const std::error_code)> callback;
};
std::shared_ptr<socket_type> socket;
asio::io_context::strand strand;
std::list<SendData> send_queue;
void send_from_queue(const std::shared_ptr<Connection> &connection) {
strand.post([this, connection]() {
asio::async_write(*socket, send_queue.begin()->header_stream->streambuf,
strand.wrap([this, connection](const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
asio::async_write(*socket, send_queue.begin()->message_stream->streambuf,
strand.wrap([this, connection]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
auto send_queued=send_queue.begin();
if(send_queued->callback)
send_queued->callback(ec);
if(!ec) {
send_queue.erase(send_queued);
if(send_queue.size()>0)
send_from_queue(connection);
}
else
send_queue.clear();
}));
}
else {
auto send_queued=send_queue.begin();
if(send_queued->callback)
send_queued->callback(ec);
send_queue.clear();
}
}));
});
}
std::atomic<bool> closed;
std::unique_ptr<asio::system_timer> timer_idle;
void read_remote_endpoint_data() {
try {
remote_endpoint_address=socket->lowest_layer().remote_endpoint().address().to_string();
remote_endpoint_port=socket->lowest_layer().remote_endpoint().port();
}
catch (...) {}
}
};
class Message : public std::istream, public std::enable_shared_from_this<Message> {
friend class SocketServerBase<socket_type>;
public:
unsigned char fin_rsv_opcode;
size_t size() const {
return length;
}
std::string string() const {
std::stringstream ss;
ss << rdbuf();
return ss.str();
}
private:
Message(): std::istream(&streambuf), fin_rsv_opcode(0), length(0) {}
size_t length;
asio::streambuf streambuf;
};
class Endpoint : public std::enable_shared_from_this<Endpoint> {
friend class SocketServerBase<socket_type>;
std::unordered_set<std::shared_ptr<Connection> > connections;
std::mutex connections_mutex;
public:
std::function<void(std::shared_ptr<Connection>)> on_open;
std::function<void(std::shared_ptr<Connection>, std::shared_ptr<Message>)> on_message;
std::function<void(std::shared_ptr<Connection>, int, const std::string&)> on_close;
std::function<void(std::shared_ptr<Connection>, const std::error_code&)> on_error;
std::unordered_set<std::shared_ptr<Connection> > get_connections() {
std::lock_guard<std::mutex> lock(connections_mutex);
auto copy=connections;
return copy;
}
};
class Config {
friend class SocketServerBase<socket_type>;
Config(unsigned short port) : port(port) {}
public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
unsigned short port;
/// Number of threads that the server will use when start() is called. Defaults to 1 thread.
size_t thread_pool_size=1;
/// Timeout on request handling. Defaults to 5 seconds.
size_t timeout_request=5;
/// Idle timeout. Defaults to no timeout.
size_t timeout_idle=0;
/// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation.
/// If empty, the address will be any address.
std::string address;
/// Set to false to avoid binding the socket to an address that is already in use. Defaults to true.
bool reuse_address=true;
};
///Set before calling start().
Config config;
private:
class regex_orderable : public std::regex {
std::string str;
path2regex::Keys keys;
public:
regex_orderable(const char *regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {}
regex_orderable(const std::string &regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {}
bool operator<(const regex_orderable &rhs) const {
return str<rhs.str;
}
};
public:
/// Warning: do not access (red or write) m_endpoint without holding m_endpoint_mutex
std::map<regex_orderable, Endpoint> m_endpoint;
std::mutex m_endpoint_mutex;
virtual void start() {
if(!io_context)
io_context=std::make_shared<asio::io_context>();
if(io_context->stopped())
io_context->reset();
asio::ip::tcp::endpoint endpoint;
if(config.address.size()>0)
endpoint=asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port);
else
endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port);
if(!acceptor)
acceptor= std::make_unique<asio::ip::tcp::acceptor>(*io_context);
acceptor->open(endpoint.protocol());
acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address));
acceptor->bind(endpoint);
acceptor->listen();
accept();
io_context->run();
}
void stop() {
acceptor->close();
io_context->stop();
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for(auto& p: m_endpoint)
p.second.connections.clear();
}
///fin_rsv_opcode: 129=one fragment, text, 130=one fragment, binary, 136=close connection.
///See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void send(const std::shared_ptr<Connection> &connection, const std::shared_ptr<SendStream> &message_stream,
const std::function<void(const std::error_code&)>& callback=nullptr,
unsigned char fin_rsv_opcode=129) const {
if(fin_rsv_opcode!=136)
timer_idle_reset(connection);
auto header_stream = std::make_shared<SendStream>();
size_t length=message_stream->size();
header_stream->put(fin_rsv_opcode);
//unmasked (first length byte<128)
if(length>=126) {
int num_bytes;
if(length>0xffff) {
num_bytes=8;
header_stream->put(127);
}
else {
num_bytes=2;
header_stream->put(126);
}
for(int c=num_bytes-1;c>=0;c--) {
header_stream->put((static_cast<unsigned long long>(length) >> (8 * c)) % 256);
}
}
else
header_stream->put(static_cast<unsigned char>(length));
connection->strand.post([this, connection, header_stream, message_stream, callback]() {
connection->send_queue.emplace_back(header_stream, message_stream, callback);
if(connection->send_queue.size()==1)
connection->send_from_queue(connection);
});
}
void send_close(const std::shared_ptr<Connection> &connection, int status, const std::string& reason="",
const std::function<void(const std::error_code&)>& callback=nullptr) const {
//Send close only once (in case close is initiated by server)
if(connection->closed)
return;
connection->closed=true;
auto send_stream=std::make_shared<SendStream>();
send_stream->put(status>>8);
send_stream->put(status%256);
*send_stream << reason;
//fin_rsv_opcode=136: message close
send(connection, send_stream, callback, 136);
}
std::unordered_set<std::shared_ptr<Connection> > get_connections() {
std::unordered_set<std::shared_ptr<Connection> > all_connections;
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for(auto& e: m_endpoint) {
std::lock_guard<std::mutex> lock(e.second.connections_mutex);
all_connections.insert(e.second.connections.begin(), e.second.connections.end());
}
return all_connections;
}
/**
* Upgrades a request, from for instance Simple-Web-Server, to a WebSocket connection.
* The parameters are moved to the Connection object.
* See also Server::on_upgrade in the Simple-Web-Server project.
* The socket's io_service is used, thus running start() is not needed.
*
* Example use:
* server.on_upgrade=[&socket_server] (auto socket, auto request) {
* auto connection=std::make_shared<SimpleWeb::SocketServer<SimpleWeb::WS>::Connection>(socket);
* connection->method=std::move(request->method);
* connection->path=std::move(request->path);
* connection->http_version=std::move(request->http_version);
* connection->header=std::move(request->header);
* connection->remote_endpoint_address=std::move(request->remote_endpoint_address);
* connection->remote_endpoint_port=request->remote_endpoint_port;
* socket_server.upgrade(connection);
* }
*/
void upgrade(const std::shared_ptr<Connection> &connection) {
auto read_buffer=std::make_shared<asio::streambuf>();
write_handshake(connection, read_buffer);
}
/// If you have your own asio::io_context, store its pointer here before running start().
/// You might also want to set config.num_threads to 0.
std::shared_ptr<asio::io_context> io_context;
protected:
const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
std::vector<std::thread> threads;
SocketServerBase(unsigned short port) :
config(port) {}
virtual void accept()=0;
std::shared_ptr<asio::system_timer> get_timeout_timer(const std::shared_ptr<Connection> &connection, size_t seconds) {
if (seconds == 0)
return nullptr;
auto timer = std::make_shared<asio::system_timer>(connection->socket->get_io_service());
timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(static_cast<long>(seconds)));
timer->async_wait([connection](const std::error_code& ec){
if(!ec) {
connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both);
connection->socket->lowest_layer().close();
}
});
return timer;
}
void read_handshake(const std::shared_ptr<Connection> &connection) {
connection->read_remote_endpoint_data();
//Create new read_buffer for async_read_until()
//Shared_ptr is used to pass temporary objects to the asynchronous functions
auto read_buffer = std::make_shared<asio::streambuf>();
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(connection, config.timeout_request);
asio::async_read_until(*connection->socket, *read_buffer, "\r\n\r\n",
[this, connection, read_buffer, timer]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(timer)
timer->cancel();
if(!ec) {
//Convert to istream to extract string-lines
std::istream stream(read_buffer.get());
parse_handshake(connection, stream);
write_handshake(connection, read_buffer);
}
});
}
void parse_handshake(const std::shared_ptr<Connection> &connection, std::istream& stream) const {
std::string line;
getline(stream, line);
size_t method_end;
if((method_end=line.find(' '))!=std::string::npos) {
size_t path_end;
if((path_end=line.find(' ', method_end+1))!=std::string::npos) {
connection->method=line.substr(0, method_end);
connection->path=line.substr(method_end+1, path_end-method_end-1);
if((path_end+6)<line.size())
connection->http_version=line.substr(path_end+6, line.size()-(path_end+6)-1);
else
connection->http_version="1.1";
getline(stream, line);
size_t param_end;
while((param_end=line.find(':'))!=std::string::npos) {
size_t value_start=param_end+1;
if((value_start)<line.size()) {
if(line[value_start]==' ')
value_start++;
if(value_start<line.size())
connection->header.emplace(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1));
}
getline(stream, line);
}
}
}
}
void write_handshake(const std::shared_ptr<Connection> &connection, const std::shared_ptr<asio::streambuf> &read_buffer) {
//Find path- and method-match, and generate response
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for (auto &regex_endpoint : m_endpoint) {
std::smatch path_match;
if(std::regex_match(connection->path, path_match, regex_endpoint.first)) {
auto write_buffer = std::make_shared<asio::streambuf>();
std::ostream handshake(write_buffer.get());
if(generate_handshake(connection, handshake)) {
connection->path_match=std::move(path_match);
//Capture write_buffer in lambda so it is not destroyed before async_write is finished
asio::async_write(*connection->socket, *write_buffer,
[this, connection, write_buffer, read_buffer, &regex_endpoint]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
connection_open(connection, regex_endpoint.second);
read_message(connection, read_buffer, regex_endpoint.second);
}
else
connection_error(connection, regex_endpoint.second, ec);
});
}
return;
}
}
}
bool generate_handshake(const std::shared_ptr<Connection> &connection, std::ostream& handshake) const {
auto header_it = connection->header.find("Sec-WebSocket-Key");
if (header_it == connection->header.end())
return false;
auto sha1=sha1_encode(header_it->second + ws_magic_string);
handshake << "HTTP/1.1 101 Web Socket Protocol Handshake\r\n";
handshake << "Upgrade: websocket\r\n";
handshake << "Connection: Upgrade\r\n";
handshake << "Sec-WebSocket-Accept: " << base64_encode(sha1) << "\r\n";
handshake << "\r\n";
return true;
}
void read_message(const std::shared_ptr<Connection> &connection,
const std::shared_ptr<asio::streambuf> &read_buffer, Endpoint& endpoint) const {
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2),
[this, connection, read_buffer, &endpoint]
(const std::error_code& ec, size_t bytes_transferred) {
if(!ec) {
if(bytes_transferred==0) { //TODO: why does this happen sometimes?
read_message(connection, read_buffer, endpoint);
return;
}
std::istream stream(read_buffer.get());
std::vector<unsigned char> first_bytes;
first_bytes.resize(2);
stream.read(reinterpret_cast<char*>(&first_bytes[0]), 2);
unsigned char fin_rsv_opcode=first_bytes[0];
//Close connection if unmasked message from client (protocol error)
if(first_bytes[1]<128) {
const std::string reason("message from client not masked");
send_close(connection, 1002, reason, [this, connection](const std::error_code& /*ec*/) {});
connection_close(connection, endpoint, 1002, reason);
return;
}
size_t length=(first_bytes[1]&127);
if(length==126) {
//2 next bytes is the size of content
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2),
[this, connection, read_buffer, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream stream(read_buffer.get());
std::vector<unsigned char> length_bytes;
length_bytes.resize(2);
stream.read(reinterpret_cast<char*>(&length_bytes[0]), 2);
size_t length=0;
int num_bytes=2;
for(int c=0;c<num_bytes;c++)
length+=length_bytes[c]<<(8*(num_bytes-1-c));
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
else if(length==127) {
//8 next bytes is the size of content
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(8),
[this, connection, read_buffer, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream stream(read_buffer.get());
std::vector<unsigned char> length_bytes;
length_bytes.resize(8);
stream.read(reinterpret_cast<char*>(&length_bytes[0]), 8);
size_t length=0;
int num_bytes=8;
for(int c=0;c<num_bytes;c++)
length+=length_bytes[c]<<(8*(num_bytes-1-c));
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
else
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
void read_message_content(const std::shared_ptr<Connection> &connection, const std::shared_ptr<asio::streambuf> &read_buffer,
size_t length, Endpoint& endpoint, unsigned char fin_rsv_opcode) const {
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(4+length),
[this, connection, read_buffer, length, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream raw_message_data(read_buffer.get());
//Read mask
std::vector<unsigned char> mask;
mask.resize(4);
raw_message_data.read(reinterpret_cast<char*>(&mask[0]), 4);
std::shared_ptr<Message> message(new Message());
message->length=length;
message->fin_rsv_opcode=fin_rsv_opcode;
std::ostream message_data_out_stream(&message->streambuf);
for(size_t c=0;c<length;c++) {
message_data_out_stream.put(raw_message_data.get()^mask[c%4]);
}
//If connection close
if((fin_rsv_opcode&0x0f)==8) {
int status=0;
if(length>=2) {
unsigned char byte1=message->get();
unsigned char byte2=message->get();
status=(byte1<<8)+byte2;
}
auto reason=message->string();
send_close(connection, status, reason, [this, connection](const std::error_code& /*ec*/) {});
connection_close(connection, endpoint, status, reason);
return;
}
else {
//If ping
if((fin_rsv_opcode&0x0f)==9) {
//send pong
auto empty_send_stream=std::make_shared<SendStream>();
send(connection, empty_send_stream, nullptr, fin_rsv_opcode+1);
}
else if(endpoint.on_message) {
timer_idle_reset(connection);
endpoint.on_message(connection, message);
}
//Next message
read_message(connection, read_buffer, endpoint);
}
}
else
connection_error(connection, endpoint, ec);
});
}
void connection_open(const std::shared_ptr<Connection> &connection, Endpoint& endpoint) {
timer_idle_init(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.insert(connection);
}
if(endpoint.on_open)
endpoint.on_open(connection);
}
void connection_close(const std::shared_ptr<Connection> &connection, Endpoint& endpoint, int status, const std::string& reason) const {
timer_idle_cancel(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.erase(connection);
}
if(endpoint.on_close)
endpoint.on_close(connection, status, reason);
}
void connection_error(const std::shared_ptr<Connection> &connection, Endpoint& endpoint, const std::error_code& ec) const {
timer_idle_cancel(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.erase(connection);
}
if(endpoint.on_error) {
std::error_code ec_tmp=ec;
endpoint.on_error(connection, ec_tmp);
}
}
void timer_idle_init(const std::shared_ptr<Connection> &connection) {
if(config.timeout_idle>0) {
connection->timer_idle= std::make_unique<asio::system_timer>(connection->socket->get_io_service());
connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast<unsigned long>(config.timeout_idle)));
timer_idle_expired_function(connection);
}
}
void timer_idle_reset(const std::shared_ptr<Connection> &connection) const {
if(config.timeout_idle>0 && connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast<unsigned long>(config.timeout_idle)))>0)
timer_idle_expired_function(connection);
}
void timer_idle_cancel(const std::shared_ptr<Connection> &connection) const {
if(config.timeout_idle>0)
connection->timer_idle->cancel();
}
void timer_idle_expired_function(const std::shared_ptr<Connection> &connection) const {
connection->timer_idle->async_wait([this, connection](const std::error_code& ec){
if(!ec)
send_close(connection, 1000, "idle timeout"); //1000=normal closure
});
}
};
template<class socket_type>
class SocketServer : public SocketServerBase<socket_type> {
public:
SocketServer(unsigned short port, size_t timeout_request, size_t timeout_idle)
: SocketServerBase<socket_type>(port, timeout_request, timeout_idle)
{
}
};
using WS = asio::ip::tcp::socket;
template<>
class SocketServer<WS> : public SocketServerBase<WS> {
public:
SocketServer() : SocketServerBase<WS>(80) {}
protected:
void accept() override {
//Create new socket for this connection (stored in Connection::socket)
//Shared_ptr is used to pass temporary objects to the asynchronous functions
std::shared_ptr<Connection> connection(new Connection(new WS(*io_context)));
acceptor->async_accept(*connection->socket, [this, connection](const std::error_code& ec) {
//Immediately start accepting a new connection (if io_context hasn't been stopped)
if (ec != asio::error::operation_aborted)
accept();
if(!ec) {
asio::ip::tcp::no_delay option(true);
connection->socket->set_option(option);
read_handshake(connection);
}
});
}
};
class ws_server : public SocketServer<WS> {
public:
ws_server() : SocketServer<WS>::SocketServer() {}
std::string remote_endpoint_address;
unsigned short remote_endpoint_port;
Connection(unsigned short remote_endpoint_port) : remote_endpoint_port(remote_endpoint_port) {}
virtual ~Connection() {}
};
class ws_server;
}
#endif /* SERVER_WS_HPP */
#endif /* MAME_LIB_UTIL_SERVER_WS_HPP */

View File

@ -0,0 +1,717 @@
// license:MIT
// copyright-holders:Ole Christian Eidheim, Miodrag Milanovic
#ifndef MAME_LIB_UTIL_SERVER_WS_IMPL_HPP
#define MAME_LIB_UTIL_SERVER_WS_IMPL_HPP
#include "path_to_regex.hpp"
#include "crypto.hpp"
#include "asio.h"
#include "asio/system_timer.hpp"
#include "server_ws.hpp"
#include <unordered_map>
#include <thread>
#include <mutex>
#include <unordered_set>
#include <list>
#include <memory>
#include <atomic>
#include <iostream>
#include <map>
#include <chrono>
#include <regex>
#ifndef CASE_INSENSITIVE_EQUALS_AND_HASH
#define CASE_INSENSITIVE_EQUALS_AND_HASH
class case_insensitive_equals {
public:
bool operator()(const std::string &key1, const std::string &key2) const {
return key1.size() == key2.size()
&& equal(key1.cbegin(), key1.cend(), key2.cbegin(),
[](std::string::value_type key1v, std::string::value_type key2v)
{ return tolower(key1v) == tolower(key2v); });
}
};
class case_insensitive_hash {
public:
size_t operator()(const std::string &key) const {
size_t seed = 0;
for (auto &c : key) {
std::hash<char> hasher;
seed ^= hasher(std::tolower(c)) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
return seed;
}
};
#endif
namespace webpp {
template <class socket_type>
class SocketServer;
template <class socket_type>
class SocketServerBase {
public:
virtual ~SocketServerBase() {}
class SendStream : public std::ostream, public std::enable_shared_from_this<SendStream> {
friend class SocketServerBase<socket_type>;
asio::streambuf streambuf;
public:
SendStream(): std::ostream(&streambuf) {}
size_t size() const {
return streambuf.size();
}
};
class Connection : public webpp::Connection {
friend class SocketServerBase<socket_type>;
friend class SocketServer<socket_type>;
using super = webpp::Connection;
public:
virtual ~Connection() {}
explicit Connection(const std::shared_ptr<socket_type> &socket) : super(0), socket(socket), strand(socket->get_io_service()), closed(false) { }
private:
explicit Connection(socket_type *socket): super(0), socket(socket), strand(socket->get_io_service()), closed(false) { }
class SendData {
public:
SendData(const std::shared_ptr<SendStream> &header_stream, const std::shared_ptr<SendStream> &message_stream,
const std::function<void(const std::error_code)> &callback) :
header_stream(header_stream), message_stream(message_stream), callback(callback) {}
std::shared_ptr<SendStream> header_stream;
std::shared_ptr<SendStream> message_stream;
std::function<void(const std::error_code)> callback;
};
std::shared_ptr<socket_type> socket;
asio::io_context::strand strand;
std::list<SendData> send_queue;
void send_from_queue(const std::shared_ptr<Connection> &connection) {
strand.post([this, connection]() {
asio::async_write(*socket, send_queue.begin()->header_stream->streambuf,
strand.wrap([this, connection](const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
asio::async_write(*socket, send_queue.begin()->message_stream->streambuf,
strand.wrap([this, connection]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
auto send_queued=send_queue.begin();
if(send_queued->callback)
send_queued->callback(ec);
if(!ec) {
send_queue.erase(send_queued);
if(send_queue.size()>0)
send_from_queue(connection);
}
else
send_queue.clear();
}));
}
else {
auto send_queued=send_queue.begin();
if(send_queued->callback)
send_queued->callback(ec);
send_queue.clear();
}
}));
});
}
std::atomic<bool> closed;
std::unique_ptr<asio::system_timer> timer_idle;
void read_remote_endpoint_data() {
try {
remote_endpoint_address=socket->lowest_layer().remote_endpoint().address().to_string();
remote_endpoint_port=socket->lowest_layer().remote_endpoint().port();
}
catch (...) {}
}
};
class Message : public std::istream, public std::enable_shared_from_this<Message> {
friend class SocketServerBase<socket_type>;
public:
unsigned char fin_rsv_opcode;
size_t size() const {
return length;
}
std::string string() const {
std::stringstream ss;
ss << rdbuf();
return ss.str();
}
private:
Message(): std::istream(&streambuf), fin_rsv_opcode(0), length(0) {}
size_t length;
asio::streambuf streambuf;
};
class Endpoint : public std::enable_shared_from_this<Endpoint> {
friend class SocketServerBase<socket_type>;
std::unordered_set<std::shared_ptr<Connection> > connections;
std::mutex connections_mutex;
public:
std::function<void(std::shared_ptr<Connection>)> on_open;
std::function<void(std::shared_ptr<Connection>, std::shared_ptr<Message>)> on_message;
std::function<void(std::shared_ptr<Connection>, int, const std::string&)> on_close;
std::function<void(std::shared_ptr<Connection>, const std::error_code&)> on_error;
std::unordered_set<std::shared_ptr<Connection> > get_connections() {
std::lock_guard<std::mutex> lock(connections_mutex);
auto copy=connections;
return copy;
}
};
class Config {
friend class SocketServerBase<socket_type>;
Config(unsigned short port) : port(port) {}
public:
/// Port number to use. Defaults to 80 for HTTP and 443 for HTTPS.
unsigned short port;
/// Number of threads that the server will use when start() is called. Defaults to 1 thread.
size_t thread_pool_size=1;
/// Timeout on request handling. Defaults to 5 seconds.
size_t timeout_request=5;
/// Idle timeout. Defaults to no timeout.
size_t timeout_idle=0;
/// IPv4 address in dotted decimal form or IPv6 address in hexadecimal notation.
/// If empty, the address will be any address.
std::string address;
/// Set to false to avoid binding the socket to an address that is already in use. Defaults to true.
bool reuse_address=true;
};
///Set before calling start().
Config config;
private:
class regex_orderable : public std::regex {
std::string str;
path2regex::Keys keys;
public:
regex_orderable(const char *regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {}
regex_orderable(const std::string &regex_cstr) : std::regex(path2regex::path_to_regex(regex_cstr, keys)), str(regex_cstr) {}
bool operator<(const regex_orderable &rhs) const {
return str<rhs.str;
}
};
public:
/// Warning: do not access (red or write) m_endpoint without holding m_endpoint_mutex
std::map<regex_orderable, Endpoint> m_endpoint;
std::mutex m_endpoint_mutex;
virtual void start() {
if(!io_context)
io_context=std::make_shared<asio::io_context>();
if(io_context->stopped())
io_context->reset();
asio::ip::tcp::endpoint endpoint;
if(config.address.size()>0)
endpoint=asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port);
else
endpoint=asio::ip::tcp::endpoint(asio::ip::tcp::v4(), config.port);
if(!acceptor)
acceptor= std::make_unique<asio::ip::tcp::acceptor>(*io_context);
acceptor->open(endpoint.protocol());
acceptor->set_option(asio::socket_base::reuse_address(config.reuse_address));
acceptor->bind(endpoint);
acceptor->listen();
accept();
io_context->run();
}
void stop() {
acceptor->close();
io_context->stop();
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for(auto& p: m_endpoint)
p.second.connections.clear();
}
///fin_rsv_opcode: 129=one fragment, text, 130=one fragment, binary, 136=close connection.
///See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void send(std::shared_ptr<webpp::Connection> conn, const std::shared_ptr<SendStream> &message_stream,
const std::function<void(const std::error_code&)>& callback=nullptr,
unsigned char fin_rsv_opcode=129) const {
std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection> (conn);
if (!connection) return;
if(fin_rsv_opcode!=136)
timer_idle_reset(connection);
auto header_stream = std::make_shared<SendStream>();
size_t length=message_stream->size();
header_stream->put(fin_rsv_opcode);
//unmasked (first length byte<128)
if(length>=126) {
int num_bytes;
if(length>0xffff) {
num_bytes=8;
header_stream->put(127);
}
else {
num_bytes=2;
header_stream->put(126);
}
for(int c=num_bytes-1;c>=0;c--) {
header_stream->put((static_cast<unsigned long long>(length) >> (8 * c)) % 256);
}
}
else
header_stream->put(static_cast<unsigned char>(length));
connection->strand.post([this, connection, header_stream, message_stream, callback]() {
connection->send_queue.emplace_back(header_stream, message_stream, callback);
if(connection->send_queue.size()==1)
connection->send_from_queue(connection);
});
}
void send_close(std::shared_ptr<webpp::Connection> conn, int status, const std::string& reason="",
const std::function<void(const std::error_code&)>& callback=nullptr) const {
std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection> (conn);
if (!connection) return;
//Send close only once (in case close is initiated by server)
if(connection->closed)
return;
connection->closed=true;
auto send_stream=std::make_shared<SendStream>();
send_stream->put(status>>8);
send_stream->put(status%256);
*send_stream << reason;
//fin_rsv_opcode=136: message close
send(connection, send_stream, callback, 136);
}
std::unordered_set<std::shared_ptr<Connection> > get_connections() {
std::unordered_set<std::shared_ptr<Connection> > all_connections;
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for(auto& e: m_endpoint) {
std::lock_guard<std::mutex> lock(e.second.connections_mutex);
all_connections.insert(e.second.connections.begin(), e.second.connections.end());
}
return all_connections;
}
/**
* Upgrades a request, from for instance Simple-Web-Server, to a WebSocket connection.
* The parameters are moved to the Connection object.
* See also Server::on_upgrade in the Simple-Web-Server project.
* The socket's io_service is used, thus running start() is not needed.
*
* Example use:
* server.on_upgrade=[&socket_server] (auto socket, auto request) {
* auto connection=std::make_shared<SimpleWeb::SocketServer<SimpleWeb::WS>::Connection>(socket);
* connection->method=std::move(request->method);
* connection->path=std::move(request->path);
* connection->http_version=std::move(request->http_version);
* connection->header=std::move(request->header);
* connection->remote_endpoint_address=std::move(request->remote_endpoint_address);
* connection->remote_endpoint_port=request->remote_endpoint_port;
* socket_server.upgrade(connection);
* }
*/
void upgrade(const std::shared_ptr<Connection> &connection) {
auto read_buffer=std::make_shared<asio::streambuf>();
write_handshake(connection, read_buffer);
}
/// If you have your own asio::io_context, store its pointer here before running start().
/// You might also want to set config.num_threads to 0.
std::shared_ptr<asio::io_context> io_context;
protected:
const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
std::vector<std::thread> threads;
SocketServerBase(unsigned short port) :
config(port) {}
virtual void accept()=0;
std::shared_ptr<asio::system_timer> get_timeout_timer(const std::shared_ptr<Connection> &connection, size_t seconds) {
if (seconds == 0)
return nullptr;
auto timer = std::make_shared<asio::system_timer>(connection->socket->get_io_service());
timer->expires_at(std::chrono::system_clock::now() + std::chrono::seconds(static_cast<long>(seconds)));
timer->async_wait([connection](const std::error_code& ec){
if(!ec) {
connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both);
connection->socket->lowest_layer().close();
}
});
return timer;
}
void read_handshake(const std::shared_ptr<Connection> &connection) {
connection->read_remote_endpoint_data();
//Create new read_buffer for async_read_until()
//Shared_ptr is used to pass temporary objects to the asynchronous functions
auto read_buffer = std::make_shared<asio::streambuf>();
//Set timeout on the following asio::async-read or write function
auto timer = get_timeout_timer(connection, config.timeout_request);
asio::async_read_until(*connection->socket, *read_buffer, "\r\n\r\n",
[this, connection, read_buffer, timer]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(timer)
timer->cancel();
if(!ec) {
//Convert to istream to extract string-lines
std::istream stream(read_buffer.get());
parse_handshake(connection, stream);
write_handshake(connection, read_buffer);
}
});
}
void parse_handshake(const std::shared_ptr<Connection> &connection, std::istream& stream) const {
std::string line;
getline(stream, line);
size_t method_end;
if((method_end=line.find(' '))!=std::string::npos) {
size_t path_end;
if((path_end=line.find(' ', method_end+1))!=std::string::npos) {
connection->method=line.substr(0, method_end);
connection->path=line.substr(method_end+1, path_end-method_end-1);
if((path_end+6)<line.size())
connection->http_version=line.substr(path_end+6, line.size()-(path_end+6)-1);
else
connection->http_version="1.1";
getline(stream, line);
size_t param_end;
while((param_end=line.find(':'))!=std::string::npos) {
size_t value_start=param_end+1;
if((value_start)<line.size()) {
if(line[value_start]==' ')
value_start++;
if(value_start<line.size())
connection->header.emplace(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1));
}
getline(stream, line);
}
}
}
}
void write_handshake(const std::shared_ptr<Connection> &connection, const std::shared_ptr<asio::streambuf> &read_buffer) {
//Find path- and method-match, and generate response
std::lock_guard<std::mutex> lock(m_endpoint_mutex);
for (auto &regex_endpoint : m_endpoint) {
std::smatch path_match;
if(std::regex_match(connection->path, path_match, regex_endpoint.first)) {
auto write_buffer = std::make_shared<asio::streambuf>();
std::ostream handshake(write_buffer.get());
if(generate_handshake(connection, handshake)) {
connection->path_match=std::move(path_match);
//Capture write_buffer in lambda so it is not destroyed before async_write is finished
asio::async_write(*connection->socket, *write_buffer,
[this, connection, write_buffer, read_buffer, &regex_endpoint]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
connection_open(connection, regex_endpoint.second);
read_message(connection, read_buffer, regex_endpoint.second);
}
else
connection_error(connection, regex_endpoint.second, ec);
});
}
return;
}
}
}
bool generate_handshake(const std::shared_ptr<Connection> &connection, std::ostream& handshake) const {
auto header_it = connection->header.find("Sec-WebSocket-Key");
if (header_it == connection->header.end())
return false;
auto sha1=sha1_encode(header_it->second + ws_magic_string);
handshake << "HTTP/1.1 101 Web Socket Protocol Handshake\r\n";
handshake << "Upgrade: websocket\r\n";
handshake << "Connection: Upgrade\r\n";
handshake << "Sec-WebSocket-Accept: " << base64_encode(sha1) << "\r\n";
handshake << "\r\n";
return true;
}
void read_message(const std::shared_ptr<Connection> &connection,
const std::shared_ptr<asio::streambuf> &read_buffer, Endpoint& endpoint) const {
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2),
[this, connection, read_buffer, &endpoint]
(const std::error_code& ec, size_t bytes_transferred) {
if(!ec) {
if(bytes_transferred==0) { //TODO: why does this happen sometimes?
read_message(connection, read_buffer, endpoint);
return;
}
std::istream stream(read_buffer.get());
std::vector<unsigned char> first_bytes;
first_bytes.resize(2);
stream.read(reinterpret_cast<char*>(&first_bytes[0]), 2);
unsigned char fin_rsv_opcode=first_bytes[0];
//Close connection if unmasked message from client (protocol error)
if(first_bytes[1]<128) {
const std::string reason("message from client not masked");
send_close(connection, 1002, reason, [this, connection](const std::error_code& /*ec*/) {});
connection_close(connection, endpoint, 1002, reason);
return;
}
size_t length=(first_bytes[1]&127);
if(length==126) {
//2 next bytes is the size of content
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(2),
[this, connection, read_buffer, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream stream(read_buffer.get());
std::vector<unsigned char> length_bytes;
length_bytes.resize(2);
stream.read(reinterpret_cast<char*>(&length_bytes[0]), 2);
size_t length=0;
int num_bytes=2;
for(int c=0;c<num_bytes;c++)
length+=length_bytes[c]<<(8*(num_bytes-1-c));
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
else if(length==127) {
//8 next bytes is the size of content
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(8),
[this, connection, read_buffer, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream stream(read_buffer.get());
std::vector<unsigned char> length_bytes;
length_bytes.resize(8);
stream.read(reinterpret_cast<char*>(&length_bytes[0]), 8);
size_t length=0;
int num_bytes=8;
for(int c=0;c<num_bytes;c++)
length+=length_bytes[c]<<(8*(num_bytes-1-c));
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
else
read_message_content(connection, read_buffer, length, endpoint, fin_rsv_opcode);
}
else
connection_error(connection, endpoint, ec);
});
}
void read_message_content(const std::shared_ptr<Connection> &connection, const std::shared_ptr<asio::streambuf> &read_buffer,
size_t length, Endpoint& endpoint, unsigned char fin_rsv_opcode) const {
asio::async_read(*connection->socket, *read_buffer, asio::transfer_exactly(4+length),
[this, connection, read_buffer, length, &endpoint, fin_rsv_opcode]
(const std::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
std::istream raw_message_data(read_buffer.get());
//Read mask
std::vector<unsigned char> mask;
mask.resize(4);
raw_message_data.read(reinterpret_cast<char*>(&mask[0]), 4);
std::shared_ptr<Message> message(new Message());
message->length=length;
message->fin_rsv_opcode=fin_rsv_opcode;
std::ostream message_data_out_stream(&message->streambuf);
for(size_t c=0;c<length;c++) {
message_data_out_stream.put(raw_message_data.get()^mask[c%4]);
}
//If connection close
if((fin_rsv_opcode&0x0f)==8) {
int status=0;
if(length>=2) {
unsigned char byte1=message->get();
unsigned char byte2=message->get();
status=(byte1<<8)+byte2;
}
auto reason=message->string();
send_close(connection, status, reason, [this, connection](const std::error_code& /*ec*/) {});
connection_close(connection, endpoint, status, reason);
return;
}
else {
//If ping
if((fin_rsv_opcode&0x0f)==9) {
//send pong
auto empty_send_stream=std::make_shared<SendStream>();
send(connection, empty_send_stream, nullptr, fin_rsv_opcode+1);
}
else if(endpoint.on_message) {
timer_idle_reset(connection);
endpoint.on_message(connection, message);
}
//Next message
read_message(connection, read_buffer, endpoint);
}
}
else
connection_error(connection, endpoint, ec);
});
}
void connection_open(const std::shared_ptr<Connection> &connection, Endpoint& endpoint) {
timer_idle_init(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.insert(connection);
}
if(endpoint.on_open)
endpoint.on_open(connection);
}
void connection_close(const std::shared_ptr<Connection> &connection, Endpoint& endpoint, int status, const std::string& reason) const {
timer_idle_cancel(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.erase(connection);
}
if(endpoint.on_close)
endpoint.on_close(connection, status, reason);
}
void connection_error(const std::shared_ptr<Connection> &connection, Endpoint& endpoint, const std::error_code& ec) const {
timer_idle_cancel(connection);
{
std::lock_guard<std::mutex> lock(endpoint.connections_mutex);
endpoint.connections.erase(connection);
}
if(endpoint.on_error) {
std::error_code ec_tmp=ec;
endpoint.on_error(connection, ec_tmp);
}
}
void timer_idle_init(const std::shared_ptr<Connection> &connection) {
if(config.timeout_idle>0) {
connection->timer_idle= std::make_unique<asio::system_timer>(connection->socket->get_io_service());
connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast<unsigned long>(config.timeout_idle)));
timer_idle_expired_function(connection);
}
}
void timer_idle_reset(const std::shared_ptr<Connection> &connection) const {
if(config.timeout_idle>0 && connection->timer_idle->expires_from_now(std::chrono::seconds(static_cast<unsigned long>(config.timeout_idle)))>0)
timer_idle_expired_function(connection);
}
void timer_idle_cancel(const std::shared_ptr<Connection> &connection) const {
if(config.timeout_idle>0)
connection->timer_idle->cancel();
}
void timer_idle_expired_function(const std::shared_ptr<Connection> &connection) const {
connection->timer_idle->async_wait([this, connection](const std::error_code& ec){
if(!ec)
send_close(connection, 1000, "idle timeout"); //1000=normal closure
});
}
};
template<class socket_type>
class SocketServer : public SocketServerBase<socket_type> {
public:
SocketServer(unsigned short port, size_t timeout_request, size_t timeout_idle)
: SocketServerBase<socket_type>(port, timeout_request, timeout_idle)
{
}
};
using WS = asio::ip::tcp::socket;
template<>
class SocketServer<WS> : public SocketServerBase<WS> {
public:
SocketServer() : SocketServerBase<WS>(80) {}
protected:
void accept() override {
//Create new socket for this connection (stored in Connection::socket)
//Shared_ptr is used to pass temporary objects to the asynchronous functions
std::shared_ptr<Connection> connection(new Connection(new WS(*io_context)));
acceptor->async_accept(*connection->socket, [this, connection](const std::error_code& ec) {
//Immediately start accepting a new connection (if io_context hasn't been stopped)
if (ec != asio::error::operation_aborted)
accept();
if(!ec) {
asio::ip::tcp::no_delay option(true);
connection->socket->set_option(option);
read_handshake(connection);
}
});
}
};
class ws_server : public SocketServer<WS> {
public:
ws_server() : SocketServer<WS>::SocketServer() {}
};
}
#endif /* MAME_LIB_UTIL_SERVER_WS_IMPL_HPP */

View File

@ -19,369 +19,373 @@
#include <thread>
#include <list>
class external_panel;
namespace esqpanel {
using external_panel_ptr = std::shared_ptr<external_panel>;
typedef std::map<http_manager::websocket_connection_ptr, external_panel_ptr, std::owner_less<http_manager::websocket_connection_ptr>> connection_to_panel_map;
class external_panel;
enum message_type {
UNKNOWN = 0,
ANALOG = 1 << 0,
BUTTON = 1 << 1,
CONTROL = 1 << 2,
DISPLAY = 1 << 3,
INFO = 1 << 4
};
using external_panel_ptr = std::shared_ptr<external_panel>;
typedef std::map<http_manager::websocket_connection_ptr, external_panel_ptr, std::owner_less<http_manager::websocket_connection_ptr>> connection_to_panel_map;
class external_panel : public std::enable_shared_from_this<external_panel>
{
public:
static int get_message_type(const char c)
{
switch(c)
{
case 'A':
return message_type::ANALOG;
case 'B':
return message_type::BUTTON;
case 'C':
return message_type::CONTROL;
case 'D':
return message_type::DISPLAY;
case 'I':
return message_type::INFO;
default:
return message_type::UNKNOWN;
}
}
external_panel() : m_send_message_types(0)
{
// printf("session: constructed\n");
}
int handle_control_message(const std::string &command)
{
int old = m_send_message_types;
std::istringstream is(command);
if (get_message_type(is.get()) != message_type::CONTROL)
{
return 0;
}
int n;
while (!is.eof()) {
char c = is.get();
int message_type = external_panel::get_message_type(c);
is >> n;
int send = (n != 0);
if (send)
{
m_send_message_types |= message_type;
}
else
{
m_send_message_types &= ~message_type;
}
}
return m_send_message_types ^ old;
}
int send_message_types()
{
return m_send_message_types;
}
bool send_display_data()
{
return m_send_message_types & message_type::DISPLAY;
}
bool send_analog_values()
{
return m_send_message_types & message_type::ANALOG;
}
bool send_buttons()
{
return m_send_message_types & message_type::BUTTON;
}
private:
int m_send_message_types;
};
class esqpanel_external_panel_server
{
public:
enum websocket_opcode {
text = 1,
binary = 2
enum message_type {
UNKNOWN = 0,
ANALOG = 1 << 0,
BUTTON = 1 << 1,
CONTROL = 1 << 2,
DISPLAY = 1 << 3,
INFO = 1 << 4
};
esqpanel_external_panel_server(http_manager *webserver) :
m_server(webserver),
m_keyboard("unknown"),
m_version("1")
{
using namespace std::placeholders;
if (m_server->is_active()) {
m_server->add_endpoint("/esqpanel/socket",
std::bind(&esqpanel_external_panel_server::on_open, this, _1),
std::bind(&esqpanel_external_panel_server::on_message, this, _1, _2, _3),
std::bind(&esqpanel_external_panel_server::on_close, this, _1, _2, _3),
std::bind(&esqpanel_external_panel_server::on_error, this, _1, _2)
);
}
}
virtual ~esqpanel_external_panel_server()
class external_panel
{
if (m_server->is_active()) {
m_server->remove_endpoint("/esqpanel/socket");
}
}
void send_to_all(char c)
{
// printf("server: send_to_all(%02x)\n", ((unsigned int) c) & 0xff);
std::lock_guard<std::recursive_mutex> lock(m_mutex);
// printf("server: sending '%02x' to all\n", ((unsigned int) c) & 0xff);
m_to_send.str("");
m_to_send.put('D');
m_to_send.put(c);
const std::string &s = m_to_send.str();
for (auto iter: m_panels)
public:
static int get_message_type(const char c)
{
external_panel_ptr panel = iter.second;
if (panel->send_display_data())
switch(c)
{
send(iter.first, s);
case 'A':
return message_type::ANALOG;
case 'B':
return message_type::BUTTON;
case 'C':
return message_type::CONTROL;
case 'D':
return message_type::DISPLAY;
case 'I':
return message_type::INFO;
default:
return message_type::UNKNOWN;
}
}
}
void on_open(http_manager::websocket_connection_ptr connection)
external_panel() : m_send_message_types(0)
{
// printf("session: constructed\n");
}
int handle_control_message(const std::string &command)
{
int old = m_send_message_types;
std::istringstream is(command);
if (get_message_type(is.get()) != message_type::CONTROL)
{
return 0;
}
int n;
while (!is.eof()) {
char c = is.get();
int message_type = external_panel::get_message_type(c);
is >> n;
int send = (n != 0);
if (send)
{
m_send_message_types |= message_type;
}
else
{
m_send_message_types &= ~message_type;
}
}
return m_send_message_types ^ old;
}
int send_message_types()
{
return m_send_message_types;
}
bool send_display_data()
{
return m_send_message_types & message_type::DISPLAY;
}
bool send_analog_values()
{
return m_send_message_types & message_type::ANALOG;
}
bool send_buttons()
{
return m_send_message_types & message_type::BUTTON;
}
private:
int m_send_message_types;
};
class external_panel_server
{
using namespace std::placeholders;
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels[connection] = std::make_shared<external_panel>();
}
void on_message(http_manager::websocket_connection_ptr connection, const std::string &payload, int opcode)
{
external_panel_ptr panel = external_panel_for_connection(connection);
const std::string &command = payload;
int t = external_panel::get_message_type(command.front());
if (t == message_type::CONTROL)
public:
enum websocket_opcode {
text = 1,
binary = 2
};
external_panel_server(http_manager *webserver) :
m_server(webserver),
m_keyboard("unknown"),
m_version("1")
{
int changed = panel->handle_control_message(command);
// printf("server: control message, changed = '%x'\n", changed);
if ((changed & message_type::DISPLAY) && panel->send_display_data())
{
// printf("server: control message, sending contents\n");
send_contents(connection);
}
if ((changed & message_type::ANALOG) && panel->send_analog_values())
{
// printf("server: control message, sending analog values\n");
send_analog_values(connection);
}
if ((changed & message_type::BUTTON) && panel->send_buttons())
{
// printf("server: control message, sending button states\n");
send_button_states(connection);
using namespace std::placeholders;
if (m_server->is_active()) {
m_server->add_endpoint("/esqpanel/socket",
std::bind(&external_panel_server::on_open, this, _1),
std::bind(&external_panel_server::on_message, this, _1, _2, _3),
std::bind(&external_panel_server::on_close, this, _1, _2, _3),
std::bind(&external_panel_server::on_error, this, _1, _2)
);
}
}
else if (t == message_type::INFO)
{
std::ostringstream o;
o << "I" << get_keyboard() << "," << get_version();
send(connection, o.str());
}
else
{
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_commands.emplace_back(command);
}
// Echo the non-command message to any other connected panels that want it
virtual ~external_panel_server()
{
if (m_server->is_active()) {
m_server->remove_endpoint("/esqpanel/socket");
}
}
void send_to_all(char c)
{
// printf("server: send_to_all(%02x)\n", ((unsigned int) c) & 0xff);
std::lock_guard<std::recursive_mutex> lock(m_mutex);
// printf("server: sending '%02x' to all\n", ((unsigned int) c) & 0xff);
m_to_send.str("");
m_to_send.put('D');
m_to_send.put(c);
const std::string &s = m_to_send.str();
for (auto iter: m_panels)
{
external_panel_ptr other_panel = iter.second;
if (other_panel != panel && (t & other_panel->send_message_types()) != 0)
external_panel_ptr panel = iter.second;
if (panel->send_display_data())
{
send(iter.first, command);
send(iter.first, s);
}
}
}
}
void on_close(http_manager::websocket_connection_ptr connection, int status, const std::string& reason)
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels.erase(connection);
}
void on_error(http_manager::websocket_connection_ptr connection, const std::error_code& error_code)
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels.erase(connection);
}
void on_document_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename)
{
m_server->serve_document(request, response, filename);
}
void on_template_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename)
{
using namespace std::placeholders;
m_server->serve_template(request, response, filename, std::bind(&esqpanel_external_panel_server::get_template_value, this, _1), '$', '$');
}
external_panel_ptr external_panel_for_connection(http_manager::websocket_connection_ptr connection)
{
auto it = m_panels.find(connection);
if (it == m_panels.end()) {
// this connection is not in the list. This really shouldn't happen
// and probably means something else is wrong.
throw std::invalid_argument("No panel avaliable for connection");
}
return it->second;
}
bool has_commands()
{
// printf("server: has_commands()\n");
std::lock_guard<std::recursive_mutex> lock(m_mutex);
return !m_commands.empty();
}
std::string get_next_command()
{
// printf("server: get_next_command()\n");
std::lock_guard<std::recursive_mutex> lock(m_mutex);
std::string command = std::move(m_commands.front());
m_commands.pop_front();
return command;
}
void set_index(const std::string &index)
{
m_index = index;
}
void add_http_document(const std::string &path, const std::string &filename)
{
m_server->remove_http_handler(path);
if (filename != "")
void on_open(http_manager::websocket_connection_ptr connection)
{
using namespace std::placeholders;
m_server->add_http_handler(path, std::bind(&esqpanel_external_panel_server::on_document_request, this, _1, _2, filename));
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels[connection] = std::make_shared<external_panel>();
}
}
void add_http_template(const std::string &path, const std::string &filename)
{
m_server->remove_http_handler(path);
if (filename != "")
void on_message(http_manager::websocket_connection_ptr connection, const std::string &payload, int opcode)
{
using namespace std::placeholders;
m_server->add_http_handler(path, std::bind(&esqpanel_external_panel_server::on_template_request, this, _1, _2, filename));
}
}
external_panel_ptr panel = external_panel_for_connection(connection);
const std::string &command = payload;
void set_content_provider(std::function<bool(std::ostream&)> provider)
{
m_content_provider = provider;
}
int t = external_panel::get_message_type(command.front());
void set_keyboard(const std::string &keyboard)
{
m_keyboard = keyboard;
}
const std::string &get_keyboard() const
{
return m_keyboard;
}
const std::string &get_version() const
{
return m_version;
}
bool get_template_value(std::string &s)
{
if (s == "keyboard")
{
s = m_keyboard;
return true;
}
else if (s == "version")
{
s = m_version;
return true;
}
else
{
return false;
}
}
private:
void send(http_manager::websocket_connection_ptr connection, const std::string &s)
{
connection->send_message(s, websocket_opcode::binary);
}
void send_contents(http_manager::websocket_connection_ptr connection)
{
if (m_content_provider)
{
m_to_send.str("");
m_to_send.put('D');
if (m_content_provider(m_to_send))
if (t == message_type::CONTROL)
{
send(connection, m_to_send.str());
int changed = panel->handle_control_message(command);
// printf("server: control message, changed = '%x'\n", changed);
if ((changed & message_type::DISPLAY) && panel->send_display_data())
{
// printf("server: control message, sending contents\n");
send_contents(connection);
}
if ((changed & message_type::ANALOG) && panel->send_analog_values())
{
// printf("server: control message, sending analog values\n");
send_analog_values(connection);
}
if ((changed & message_type::BUTTON) && panel->send_buttons())
{
// printf("server: control message, sending button states\n");
send_button_states(connection);
}
}
else if (t == message_type::INFO)
{
std::ostringstream o;
o << "I" << get_keyboard() << "," << get_version();
send(connection, o.str());
}
else
{
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_commands.emplace_back(command);
}
// Echo the non-command message to any other connected panels that want it
for (auto iter: m_panels)
{
external_panel_ptr other_panel = iter.second;
if (other_panel != panel && (t & other_panel->send_message_types()) != 0)
{
send(iter.first, command);
}
}
}
}
}
void send_analog_values(http_manager::websocket_connection_ptr connection)
{
// TODO(cbrunschen): get the current analog values and send them
}
void on_close(http_manager::websocket_connection_ptr connection, int status, const std::string& reason)
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels.erase(connection);
}
void send_button_states(http_manager::websocket_connection_ptr connection)
{
// TODO(cbrunschen): track current button states and send them
}
void on_error(http_manager::websocket_connection_ptr connection, const std::error_code& error_code)
{
std::lock_guard<std::recursive_mutex> lock(m_mutex);
m_panels.erase(connection);
}
http_manager *m_server;
std::recursive_mutex m_mutex;
void on_document_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename)
{
m_server->serve_document(request, response, filename);
}
connection_to_panel_map m_panels;
std::list<std::string> m_commands;
std::thread m_working_thread;
std::ostringstream m_to_send;
void on_template_request(http_manager::http_request_ptr request, http_manager::http_response_ptr response, const std::string &filename)
{
using namespace std::placeholders;
m_server->serve_template(request, response, filename, std::bind(&external_panel_server::get_template_value, this, _1), '$', '$');
}
std::string m_index;
std::string m_keyboard;
std::string m_version;
std::function<bool(std::ostream&)> m_content_provider;
std::map<const std::string, const std::string> m_template_values;
};
external_panel_ptr external_panel_for_connection(http_manager::websocket_connection_ptr connection)
{
auto it = m_panels.find(connection);
if (it == m_panels.end()) {
// this connection is not in the list. This really shouldn't happen
// and probably means something else is wrong.
throw std::invalid_argument("No panel avaliable for connection");
}
return it->second;
}
bool has_commands()
{
// printf("server: has_commands()\n");
std::lock_guard<std::recursive_mutex> lock(m_mutex);
return !m_commands.empty();
}
std::string get_next_command()
{
// printf("server: get_next_command()\n");
std::lock_guard<std::recursive_mutex> lock(m_mutex);
std::string command = std::move(m_commands.front());
m_commands.pop_front();
return command;
}
void set_index(const std::string &index)
{
m_index = index;
}
void add_http_document(const std::string &path, const std::string &filename)
{
m_server->remove_http_handler(path);
if (filename != "")
{
using namespace std::placeholders;
m_server->add_http_handler(path, std::bind(&external_panel_server::on_document_request, this, _1, _2, filename));
}
}
void add_http_template(const std::string &path, const std::string &filename)
{
m_server->remove_http_handler(path);
if (filename != "")
{
using namespace std::placeholders;
m_server->add_http_handler(path, std::bind(&external_panel_server::on_template_request, this, _1, _2, filename));
}
}
void set_content_provider(std::function<bool(std::ostream&)> provider)
{
m_content_provider = provider;
}
void set_keyboard(const std::string &keyboard)
{
m_keyboard = keyboard;
}
const std::string &get_keyboard() const
{
return m_keyboard;
}
const std::string &get_version() const
{
return m_version;
}
bool get_template_value(std::string &s)
{
if (s == "keyboard")
{
s = m_keyboard;
return true;
}
else if (s == "version")
{
s = m_version;
return true;
}
else
{
return false;
}
}
private:
void send(http_manager::websocket_connection_ptr connection, const std::string &s)
{
connection->send_message(s, websocket_opcode::binary);
}
void send_contents(http_manager::websocket_connection_ptr connection)
{
if (m_content_provider)
{
m_to_send.str("");
m_to_send.put('D');
if (m_content_provider(m_to_send))
{
send(connection, m_to_send.str());
}
}
}
void send_analog_values(http_manager::websocket_connection_ptr connection)
{
// TODO(cbrunschen): get the current analog values and send them
}
void send_button_states(http_manager::websocket_connection_ptr connection)
{
// TODO(cbrunschen): track current button states and send them
}
http_manager *m_server;
std::recursive_mutex m_mutex;
connection_to_panel_map m_panels;
std::list<std::string> m_commands;
std::thread m_working_thread;
std::ostringstream m_to_send;
std::string m_index;
std::string m_keyboard;
std::string m_version;
std::function<bool(std::ostream&)> m_content_provider;
std::map<const std::string, const std::string> m_template_values;
};
} // namespace esqpanel
//**************************************************************************
// MACROS / CONSTANTS
@ -423,7 +427,7 @@ void esqpanel_device::device_start()
m_write_tx.resolve_safe();
m_write_analog.resolve_safe();
m_external_panel_server = new esqpanel_external_panel_server(machine().manager().http());
m_external_panel_server = new esqpanel::external_panel_server(machine().manager().http());
if (machine().manager().http()->is_active()) {
m_external_panel_server->set_keyboard(owner()->shortname());
m_external_panel_server->set_index("/esqpanel/FrontPanel.html");

View File

@ -62,7 +62,9 @@
// ======================> esqpanel_device
class esqpanel_external_panel_server;
namespace esqpanel {
class external_panel_server;
}
class esqpanel_device : public device_t, public device_serial_interface
{
@ -107,7 +109,7 @@ protected:
bool m_eps_mode;
esqpanel_external_panel_server *m_external_panel_server;
esqpanel::external_panel_server *m_external_panel_server;
private:
static const int XMIT_RING_SIZE = 16;