From ffbe0c66becfa5cf65aca3fef451cf162b251b4b Mon Sep 17 00:00:00 2001 From: Miodrag Milanovic Date: Sun, 10 Apr 2016 14:02:31 +0200 Subject: [PATCH] Basic TCP server based on libuv [Inaki Baz Castillo,Miodrag Milanovic] --- scripts/src/osd/modules.lua | 12 + src/osd/modules/ipc/raw_tcp_connection.cpp | 29 ++ src/osd/modules/ipc/raw_tcp_connection.h | 30 ++ src/osd/modules/ipc/raw_tcp_server.cpp | 42 +++ src/osd/modules/ipc/raw_tcp_server.h | 36 ++ src/osd/modules/ipc/rtc_tcp_connection.cpp | 150 ++++++++ src/osd/modules/ipc/rtc_tcp_connection.h | 33 ++ src/osd/modules/ipc/rtc_tcp_server.cpp | 42 +++ src/osd/modules/ipc/rtc_tcp_server.h | 36 ++ src/osd/modules/ipc/tcp_connection.cpp | 414 +++++++++++++++++++++ src/osd/modules/ipc/tcp_connection.h | 137 +++++++ src/osd/modules/ipc/tcp_server.cpp | 279 ++++++++++++++ src/osd/modules/ipc/tcp_server.h | 72 ++++ src/osd/modules/output/network.cpp | 37 +- 14 files changed, 1344 insertions(+), 5 deletions(-) create mode 100644 src/osd/modules/ipc/raw_tcp_connection.cpp create mode 100644 src/osd/modules/ipc/raw_tcp_connection.h create mode 100644 src/osd/modules/ipc/raw_tcp_server.cpp create mode 100644 src/osd/modules/ipc/raw_tcp_server.h create mode 100644 src/osd/modules/ipc/rtc_tcp_connection.cpp create mode 100644 src/osd/modules/ipc/rtc_tcp_connection.h create mode 100644 src/osd/modules/ipc/rtc_tcp_server.cpp create mode 100644 src/osd/modules/ipc/rtc_tcp_server.h create mode 100644 src/osd/modules/ipc/tcp_connection.cpp create mode 100644 src/osd/modules/ipc/tcp_connection.h create mode 100644 src/osd/modules/ipc/tcp_server.cpp create mode 100644 src/osd/modules/ipc/tcp_server.h diff --git a/scripts/src/osd/modules.lua b/scripts/src/osd/modules.lua index 9c85a9cc998..91eaa395ba1 100644 --- a/scripts/src/osd/modules.lua +++ b/scripts/src/osd/modules.lua @@ -91,6 +91,18 @@ function osdmodulesbuild() MAME_DIR .. "src/osd/modules/output/none.cpp", MAME_DIR .. "src/osd/modules/output/console.cpp", MAME_DIR .. "src/osd/modules/output/network.cpp", + MAME_DIR .. "src/osd/modules/ipc/tcp_connection.cpp", + MAME_DIR .. "src/osd/modules/ipc/tcp_connection.h", + MAME_DIR .. "src/osd/modules/ipc/tcp_server.cpp", + MAME_DIR .. "src/osd/modules/ipc/tcp_server.h", + MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.cpp", + MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.h", + MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.cpp", + MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.h", + MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.cpp", + MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.h", + MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.cpp", + MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.h", } includedirs { ext_includedir("uv"), diff --git a/src/osd/modules/ipc/raw_tcp_connection.cpp b/src/osd/modules/ipc/raw_tcp_connection.cpp new file mode 100644 index 00000000000..946aefcafa1 --- /dev/null +++ b/src/osd/modules/ipc/raw_tcp_connection.cpp @@ -0,0 +1,29 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#include "raw_tcp_connection.h" + +/* Instance methods. */ + +raw_tcp_connection::raw_tcp_connection(listener* listener, size_t bufferSize) : + tcp_connection(bufferSize), + m_listener(listener) +{ +} + +raw_tcp_connection::~raw_tcp_connection() +{ +} + +void raw_tcp_connection::user_on_tcp_connection_read() +{ + // We may receive multiple packets in the same TCP chunk. If one of them is + // a DTLS Close Alert this would be closed (close() called) so we cannot call + // our listeners anymore. + if (is_closing()) + return; + + m_listener->on_data_recv(this, m_buffer, m_buffer_data_len); + + m_buffer_data_len = 0; +} diff --git a/src/osd/modules/ipc/raw_tcp_connection.h b/src/osd/modules/ipc/raw_tcp_connection.h new file mode 100644 index 00000000000..7bcfe80bfc6 --- /dev/null +++ b/src/osd/modules/ipc/raw_tcp_connection.h @@ -0,0 +1,30 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef RAW_TCP_CONNECTION_H +#define RAW_TCP_CONNECTION_H + +#include "tcp_connection.h" + + +class raw_tcp_connection : public tcp_connection +{ +public: + class listener + { + public: + virtual ~listener(){ } + virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) = 0; + }; + + raw_tcp_connection(listener* listener, size_t bufferSize); + virtual ~raw_tcp_connection(); + + /* Pure virtual methods inherited from tcp_connection. */ + virtual void user_on_tcp_connection_read() override; + +private: + // Passed by argument. + listener* m_listener; +}; +#endif diff --git a/src/osd/modules/ipc/raw_tcp_server.cpp b/src/osd/modules/ipc/raw_tcp_server.cpp new file mode 100644 index 00000000000..deaee6c5694 --- /dev/null +++ b/src/osd/modules/ipc/raw_tcp_server.cpp @@ -0,0 +1,42 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#include "raw_tcp_server.h" +#include + +#define MAX_TCP_CONNECTIONS_PER_SERVER 10 + +/* Instance methods. */ + +raw_tcp_server::raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener) : + tcp_server(loop, ip, port, backlog), + m_listener(listener), + m_conn_listener(connListener) +{ +} + +void raw_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection) +{ + // Allocate a new raw_tcp_connection for the raw_tcp_server to handle it. + *connection = new raw_tcp_connection(m_conn_listener, 65536); +} + +void raw_tcp_server::user_on_new_tcp_connection(tcp_connection* connection) +{ + // Allow just MAX_TCP_CONNECTIONS_PER_SERVER. + if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER) + connection->close(); +} + +void raw_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) +{ + // Notify the listener. + // NOTE: Don't do it if closing (since at this point the listener is already freed). + // At the end, this is just called if the connection was remotely closed. + if (!is_closing()) + m_listener->on_raw_tcp_connection_closed(this, static_cast(connection), is_closed_by_peer); +} + +void raw_tcp_server::user_on_tcp_server_closed() +{ +} diff --git a/src/osd/modules/ipc/raw_tcp_server.h b/src/osd/modules/ipc/raw_tcp_server.h new file mode 100644 index 00000000000..576490094e3 --- /dev/null +++ b/src/osd/modules/ipc/raw_tcp_server.h @@ -0,0 +1,36 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef RAW_TCP_SERVER_H +#define RAW_TCP_SERVER_H + +#include "tcp_server.h" +#include "tcp_connection.h" +#include "raw_tcp_connection.h" +#include + +class raw_tcp_server : public tcp_server +{ +public: + class listener + { + public: + virtual ~listener() { } + virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) = 0; + }; + + raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener); + + /* Pure virtual methods inherited from TCPServer. */ + virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override; + virtual void user_on_new_tcp_connection(tcp_connection* connection) override; + virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override; + virtual void user_on_tcp_server_closed() override; + +private: + // Passed by argument. + listener* m_listener; + raw_tcp_connection::listener* m_conn_listener; +}; + +#endif diff --git a/src/osd/modules/ipc/rtc_tcp_connection.cpp b/src/osd/modules/ipc/rtc_tcp_connection.cpp new file mode 100644 index 00000000000..1cc800b7d77 --- /dev/null +++ b/src/osd/modules/ipc/rtc_tcp_connection.cpp @@ -0,0 +1,150 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic +#include "emu.h" +#include "rtc_tcp_connection.h" + +/* Instance methods. */ + +rtc_tcp_connection::rtc_tcp_connection(listener* listener, size_t bufferSize) : + tcp_connection(bufferSize), + m_listener(listener), + m_frame_start(0) +{ +} + +rtc_tcp_connection::~rtc_tcp_connection() +{ +} + +inline uint16_t get2bytes(const uint8_t* data, size_t i) +{ + return (uint16_t)(data[i + 1]) | ((uint16_t)(data[i])) << 8; +} + +inline void set2bytes(uint8_t* data, size_t i, uint16_t value) +{ + data[i + 1] = (uint8_t)(value); + data[i] = (uint8_t)(value >> 8); +} + + +void rtc_tcp_connection::user_on_tcp_connection_read() +{ + /* + * Framing RFC 4571 + * + * 0 1 2 3 + * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + * --------------------------------------------------------------- + * | LENGTH | STUN / DTLS / RTP / RTCP | + * --------------------------------------------------------------- + * + * A 16-bit unsigned integer LENGTH field, coded in network byte order + * (big-endian), begins the frame. If LENGTH is non-zero, an RTP or + * RTCP packet follows the LENGTH field. The value coded in the LENGTH + * field MUST equal the number of octets in the RTP or RTCP packet. + * Zero is a valid value for LENGTH, and it codes the null packet. + */ + + // Be ready to parse more than a single frame in a single TCP chunk. + while (true) + { + // We may receive multiple packets in the same TCP chunk. If one of them is + // a DTLS Close Alert this would be closed (Close() called) so we cannot call + // our listeners anymore. + if (is_closing()) + return; + + size_t data_len = m_buffer_data_len - m_frame_start; + size_t packet_len = 0; + + if (data_len >= 2) + packet_len = (size_t)get2bytes(m_buffer + m_frame_start, 0); + + // We have packet_len bytes. + if ((data_len >= 2) && data_len >= 2 + packet_len) + { + const uint8_t* packet = m_buffer + m_frame_start + 2; + + // Notify the listener. + if (packet_len != 0) + { + m_listener->on_packet_recv(this, packet, packet_len); + } + + // If there is no more space available in the buffer and that is because + // the latest parsed frame filled it, then empty the full buffer. + if ((m_frame_start + 2 + packet_len) == m_buffer_size) + { + osd_printf_error("no more space in the buffer, emptying the buffer data"); + + m_frame_start = 0; + m_buffer_data_len = 0; + } + // If there is still space in the buffer, set the beginning of the next + // frame to the next position after the parsed frame. + else + { + m_frame_start += 2 + packet_len; + } + + // If there is more data in the buffer after the parsed frame then + // parse again. Otherwise break here and wait for more data. + if (m_buffer_data_len > m_frame_start) + { + // osd_printf_error("there is more data after the parsed frame, continue parsing"); + + continue; + } + else + { + break; + } + } + else // Incomplete packet. + { + // Check if the buffer is full. + if (m_buffer_data_len == m_buffer_size) + { + // First case: the incomplete frame does not begin at position 0 of + // the buffer, so move the frame to the position 0. + if (m_frame_start != 0) + { + // osd_printf_error("no more space in the buffer, moving parsed bytes to the beginning of the buffer and wait for more data"); + + std::memmove(m_buffer, m_buffer + m_frame_start, m_buffer_size - m_frame_start); + m_buffer_data_len = m_buffer_size - m_frame_start; + m_frame_start = 0; + } + // Second case: the incomplete frame begins at position 0 of the buffer. + // The frame is too big, so close the connection. + else + { + osd_printf_error("no more space in the buffer for the unfinished frame being parsed, closing the connection"); + + // Close the socket. + close(); + } + } + // The buffer is not full. + else + { + osd_printf_verbose("frame not finished yet, waiting for more data"); + } + + // Exit the parsing loop. + break; + } + } +} + +void rtc_tcp_connection::send(const uint8_t* data, size_t len) +{ + // Write according to Framing RFC 4571. + + uint8_t frame_len[2]; + + set2bytes(frame_len, 0, len); + + write(frame_len, 2, data, len); +} diff --git a/src/osd/modules/ipc/rtc_tcp_connection.h b/src/osd/modules/ipc/rtc_tcp_connection.h new file mode 100644 index 00000000000..9ec74b90537 --- /dev/null +++ b/src/osd/modules/ipc/rtc_tcp_connection.h @@ -0,0 +1,33 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef RTC_TCP_CONNECTION_H +#define RTC_TCP_CONNECTION_H + +#include "tcp_connection.h" + + +class rtc_tcp_connection : public tcp_connection +{ +public: + class listener + { + public: + virtual ~listener(){ } + virtual void on_packet_recv(rtc_tcp_connection *connection, const uint8_t* data, size_t len) = 0; + }; + + rtc_tcp_connection(listener* listener, size_t bufferSize); + virtual ~rtc_tcp_connection(); + + /* Pure virtual methods inherited from tcp_connection. */ + virtual void user_on_tcp_connection_read() override; + + void send(const uint8_t* data, size_t len); + +private: + // Passed by argument. + listener* m_listener; + size_t m_frame_start; // Where the latest frame starts. +}; +#endif diff --git a/src/osd/modules/ipc/rtc_tcp_server.cpp b/src/osd/modules/ipc/rtc_tcp_server.cpp new file mode 100644 index 00000000000..52345454983 --- /dev/null +++ b/src/osd/modules/ipc/rtc_tcp_server.cpp @@ -0,0 +1,42 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#include "rtc_tcp_server.h" +#include + +#define MAX_TCP_CONNECTIONS_PER_SERVER 10 + +/* Instance methods. */ + +rtc_tcp_server::rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener) : + tcp_server(loop, ip, port, backlog), + m_listener(listener), + m_conn_listener(connListener) +{ +} + +void rtc_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection) +{ + // Allocate a new rtc_tcp_connection for the rtc_tcp_server to handle it. + *connection = new rtc_tcp_connection(m_conn_listener, 65536); +} + +void rtc_tcp_server::user_on_new_tcp_connection(tcp_connection* connection) +{ + // Allow just MAX_TCP_CONNECTIONS_PER_SERVER. + if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER) + connection->close(); +} + +void rtc_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) +{ + // Notify the listener. + // NOTE: Don't do it if closing (since at this point the listener is already freed). + // At the end, this is just called if the connection was remotely closed. + if (!is_closing()) + m_listener->on_rtc_tcp_connection_closed(this, static_cast(connection), is_closed_by_peer); +} + +void rtc_tcp_server::user_on_tcp_server_closed() +{ +} diff --git a/src/osd/modules/ipc/rtc_tcp_server.h b/src/osd/modules/ipc/rtc_tcp_server.h new file mode 100644 index 00000000000..f0733c95397 --- /dev/null +++ b/src/osd/modules/ipc/rtc_tcp_server.h @@ -0,0 +1,36 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef RTC_TCP_SERVER_H +#define RTC_TCP_SERVER_H + +#include "tcp_server.h" +#include "tcp_connection.h" +#include "rtc_tcp_connection.h" +#include + +class rtc_tcp_server : public tcp_server +{ +public: + class listener + { + public: + virtual ~listener() { } + virtual void on_rtc_tcp_connection_closed(rtc_tcp_server* tcpServer, rtc_tcp_connection* connection, bool is_closed_by_peer) = 0; + }; + + rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener); + + /* Pure virtual methods inherited from TCPServer. */ + virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override; + virtual void user_on_new_tcp_connection(tcp_connection* connection) override; + virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override; + virtual void user_on_tcp_server_closed() override; + +private: + // Passed by argument. + listener* m_listener; + rtc_tcp_connection::listener* m_conn_listener; +}; + +#endif diff --git a/src/osd/modules/ipc/tcp_connection.cpp b/src/osd/modules/ipc/tcp_connection.cpp new file mode 100644 index 00000000000..fb6af2f976a --- /dev/null +++ b/src/osd/modules/ipc/tcp_connection.cpp @@ -0,0 +1,414 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#include "emu.h" +#include "tcp_connection.h" +#include // uint8_t, etc +#include // std::malloc(), std::free() + +/* Static methods for UV callbacks. */ + +static inline void on_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) +{ + static_cast(handle->data)->on_uv_read_alloc(suggested_size, buf); +} + +static inline void on_read(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) +{ + static_cast(handle->data)->on_uv_read(nread, buf); +} + +static inline void on_write(uv_write_t* req, int status) +{ + tcp_connection::tcp_uv_write_data* write_data = static_cast(req->data); + tcp_connection* connection = write_data->connection; + + // Delete the UvWriteData struct (which includes the uv_req_t and the store char[]). + std::free(write_data); + + // Just notify the TCPConnection when error. + if (status) + connection->on_uv_write_error(status); +} + +static inline void on_shutdown(uv_shutdown_t* req, int status) +{ + static_cast(req->data)->on_uv_shutdown(req, status); +} + +static inline void on_close(uv_handle_t* handle) +{ + static_cast(handle->data)->on_uv_closed(); +} + +/* Instance methods. */ + +tcp_connection::tcp_connection(size_t bufferSize) : + m_listener(nullptr), + m_local_addr(nullptr), + m_is_closing(false), + m_is_closed_by_peer(false), + m_has_error(false), + m_buffer_size(bufferSize), + m_buffer(nullptr), + m_buffer_data_len(0), + m_local_port(0), + m_peer_port(0) +{ + m_uv_handle = new uv_tcp_t; + m_uv_handle->data = (void*)this; + + // NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb(). +} + +tcp_connection::~tcp_connection() +{ + if (m_uv_handle) + delete m_uv_handle; + if (m_buffer) + delete[] m_buffer; +} + +void tcp_connection::setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort) +{ + int err; + + // Set the UV handle. + err = uv_tcp_init(loop, m_uv_handle); + if (err) + { + delete m_uv_handle; + m_uv_handle = nullptr; + throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err)); + } + + // Set the listener. + m_listener = listener; + + // Set the local address. + m_local_addr = localAddr; + m_local_ip = localIP; + m_local_port = localPort; +} + +void tcp_connection::close() +{ + if (m_is_closing) + return; + + int err; + + m_is_closing = true; + + // Don't read more. + err = uv_read_stop((uv_stream_t*)m_uv_handle); + if (err) + throw emu_fatalerror("uv_read_stop() failed: %s", uv_strerror(err)); + + // If there is no error and the peer didn't close its connection side then close gracefully. + if (!m_has_error && !m_is_closed_by_peer) + { + // Use uv_shutdown() so pending data to be written will be sent to the peer + // before closing. + uv_shutdown_t* req = new uv_shutdown_t; + req->data = (void*)this; + err = uv_shutdown(req, (uv_stream_t*)m_uv_handle, (uv_shutdown_cb)on_shutdown); + if (err) + throw emu_fatalerror("uv_shutdown() failed: %s", uv_strerror(err)); + } + // Otherwise directly close the socket. + else + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); + } +} + +void tcp_connection::terminate() +{ + if (m_is_closing) + return; + m_is_closing = true; + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); +} + +void tcp_connection::dump() +{ + osd_printf_verbose("[TCP, local:%s :%d, remote:%s :%d, status:%s]\r\n", + m_local_ip.c_str(), (uint16_t)m_local_port, + m_peer_ip.c_str(), (uint16_t)m_peer_port, + (!m_is_closing) ? "open" : "closed"); +} + +void tcp_connection::start() +{ + if (m_is_closing) + return; + + int err; + + err = uv_read_start((uv_stream_t*)m_uv_handle, (uv_alloc_cb)on_alloc, (uv_read_cb)on_read); + if (err) + throw emu_fatalerror("uv_read_start() failed: %s", uv_strerror(err)); + + // Get the peer address. + if (!set_peer_address()) + throw emu_fatalerror("error setting peer IP and port"); +} + +void tcp_connection::write(const uint8_t* data, size_t len) +{ + if (m_is_closing) + return; + + if (len == 0) + return; + + uv_buf_t buffer; + int written; + int err; + + // First try uv_try_write(). In case it can not directly write all the given + // data then build a uv_req_t and use uv_write(). + + buffer = uv_buf_init((char*)data, len); + written = uv_try_write((uv_stream_t*)m_uv_handle, &buffer, 1); + + // All the data was written. Done. + if (written == (int)len) + { + return; + } + // Cannot write any data at first time. Use uv_write(). + else if (written == UV_EAGAIN || written == UV_ENOSYS) + { + // Set written to 0 so pending_len can be properly calculated. + written = 0; + } + // Error. Should not happen. + else if (written < 0) + { + osd_printf_warning("uv_try_write() failed, closing the connection: %s", uv_strerror(written)); + + close(); + return; + } + + // osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, len); + + size_t pending_len = len - written; + + // Allocate a special UvWriteData struct pointer. + tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len); + + write_data->connection = this; + std::memcpy(write_data->store, data + written, pending_len); + write_data->req.data = (void*)write_data; + + buffer = uv_buf_init((char*)write_data->store, pending_len); + + err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write); + if (err) + throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err)); +} + +void tcp_connection::write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2) +{ + if (m_is_closing) + return; + + if (len1 == 0 && len2 == 0) + return; + + size_t total_len = len1 + len2; + uv_buf_t buffers[2]; + int written; + int err; + + // First try uv_try_write(). In case it can not directly write all the given + // data then build a uv_req_t and use uv_write(). + + buffers[0] = uv_buf_init((char*)data1, len1); + buffers[1] = uv_buf_init((char*)data2, len2); + written = uv_try_write((uv_stream_t*)m_uv_handle, buffers, 2); + + // All the data was written. Done. + if (written == (int)total_len) + { + return; + } + // Cannot write any data at first time. Use uv_write(). + else if (written == UV_EAGAIN || written == UV_ENOSYS) + { + // Set written to 0 so pending_len can be properly calculated. + written = 0; + } + // Error. Should not happen. + else if (written < 0) + { + osd_printf_warning("uv_try_write() failed, closing the connection: %s", uv_strerror(written)); + + close(); + return; + } + + // osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, total_len); + + size_t pending_len = total_len - written; + + // Allocate a special UvWriteData struct pointer. + tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len); + + write_data->connection = this; + // If the first buffer was not entirely written then splice it. + if ((size_t)written < len1) + { + std::memcpy(write_data->store, data1 + (size_t)written, len1 - (size_t)written); + std::memcpy(write_data->store + (len1 - (size_t)written), data2, len2); + } + // Otherwise just take the pending data in the second buffer. + else + { + std::memcpy(write_data->store, data2 + ((size_t)written - len1), len2 - ((size_t)written - len1)); + } + write_data->req.data = (void*)write_data; + + uv_buf_t buffer = uv_buf_init((char*)write_data->store, pending_len); + + err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write); + if (err) + throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err)); +} + +void GetAddressInfo(const struct sockaddr* addr, int* family, std::string &ip, uint16_t* port) +{ + char _ip[INET6_ADDRSTRLEN + 1]; + int err; + + + err = uv_inet_ntop(AF_INET, &((struct sockaddr_in*)addr)->sin_addr, _ip, INET_ADDRSTRLEN); + if (err) + throw emu_fatalerror("uv_inet_ntop() failed: %s", uv_strerror(err)); + *port = (uint16_t)ntohs(((struct sockaddr_in*)addr)->sin_port); + + *family = addr->sa_family; + ip.assign(_ip); +} +bool tcp_connection::set_peer_address() +{ + int err; + int len = sizeof(m_peer_addr); + + err = uv_tcp_getpeername(m_uv_handle, (struct sockaddr*)&m_peer_addr, &len); + if (err) + { + osd_printf_error("uv_tcp_getpeername() failed: %s", uv_strerror(err)); + + return false; + } + + int family; + GetAddressInfo((const struct sockaddr*)&m_peer_addr, &family, m_peer_ip, &m_peer_port); + + return true; +} + +void tcp_connection::on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf) +{ + // If this is the first call to onUvReadAlloc() then allocate the receiving buffer now. + if (!m_buffer) + m_buffer = new uint8_t[m_buffer_size]; + + // Tell UV to write after the last data byte in the buffer. + buf->base = (char *)(m_buffer + m_buffer_data_len); + // Give UV all the remaining space in the buffer. + if (m_buffer_size > m_buffer_data_len) + { + buf->len = m_buffer_size - m_buffer_data_len; + } + else + { + buf->len = 0; + + osd_printf_warning("no available space in the buffer"); + } +} + +void tcp_connection::on_uv_read(::ssize_t nread, const uv_buf_t* buf) +{ + if (m_is_closing) + return; + + if (nread == 0) + return; + + // Data received. + if (nread > 0) + { + // Update the buffer data length. + m_buffer_data_len += (size_t)nread; + + // Notify the subclass. + user_on_tcp_connection_read(); + } + // Client disconneted. + else if (nread == UV_EOF || nread == UV_ECONNRESET) + { + osd_printf_info("connection closed by peer, closing server side"); + + m_is_closed_by_peer = true; + + // Close server side of the connection. + close(); + } + // Some error. + else + { + osd_printf_info("read error, closing the connection: %s", uv_strerror(nread)); + + m_has_error = true; + + // Close server side of the connection. + close(); + } +} + +void tcp_connection::on_uv_write_error(int error) +{ + if (m_is_closing) + return; + + if (error == UV_EPIPE || error == UV_ENOTCONN) + { + osd_printf_info("write error, closing the connection: %s", uv_strerror(error)); + } + else + { + osd_printf_info("write error, closing the connection: %s", uv_strerror(error)); + + m_has_error = true; + } + + close(); +} + +void tcp_connection::on_uv_shutdown(uv_shutdown_t* req, int status) +{ + delete req; + + if (status == UV_EPIPE || status == UV_ENOTCONN || status == UV_ECANCELED) + osd_printf_info("shutdown error: %s", uv_strerror(status)); + else if (status) + osd_printf_info("shutdown error: %s", uv_strerror(status)); + + // Now do close the handle. + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); +} + +void tcp_connection::on_uv_closed() +{ + // Notify the listener. + m_listener->on_tcp_connection_closed(this, m_is_closed_by_peer); + + // And delete this. + delete this; +} diff --git a/src/osd/modules/ipc/tcp_connection.h b/src/osd/modules/ipc/tcp_connection.h new file mode 100644 index 00000000000..55e70d39688 --- /dev/null +++ b/src/osd/modules/ipc/tcp_connection.h @@ -0,0 +1,137 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef TCP_CONNECTION_H +#define TCP_CONNECTION_H + +#include +#include + +extern void GetAddressInfo(const sockaddr * addr, int * family, std::string & ip, uint16_t * port); + +class tcp_connection +{ +public: + class listener + { + public: + virtual ~listener() { } + virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0; + }; + + /* Struct for the data field of uv_req_t when writing into the connection. */ + struct tcp_uv_write_data + { + tcp_connection* connection; + uv_write_t req; + uint8_t store[1]; + }; + + tcp_connection(size_t bufferSize); + virtual ~tcp_connection(); + + void close(); + void terminate(); + virtual void dump(); + void setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort); + bool is_closing(); + uv_tcp_t* get_uv_handle(); + void start(); + void write(const uint8_t* data, size_t len); + void write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2); + void write(const std::string &data); + const struct sockaddr* get_local_address(); + const std::string& get_local_ip(); + uint16_t get_local_port(); + const struct sockaddr* get_peer_address(); + const std::string& get_peer_ip(); + uint16_t get_peer_port(); + +private: + bool set_peer_address(); + + /* Callbacks fired by UV events. */ +public: + void on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf); + void on_uv_read(ssize_t nread, const uv_buf_t* buf); + void on_uv_write_error(int error); + void on_uv_shutdown(uv_shutdown_t* req, int status); + void on_uv_closed(); + + /* Pure virtual methods that must be implemented by the subclass. */ +protected: + virtual void user_on_tcp_connection_read() = 0; + +private: + // Passed by argument. + listener* m_listener; + // Allocated by this. + uv_tcp_t* m_uv_handle; + // Others. + struct sockaddr_storage* m_local_addr; + bool m_is_closing; + bool m_is_closed_by_peer; + bool m_has_error; + +protected: + // Passed by argument. + size_t m_buffer_size; + // Allocated by this. + uint8_t* m_buffer; + // Others. + size_t m_buffer_data_len; + std::string m_local_ip; + uint16_t m_local_port; + struct sockaddr_storage m_peer_addr; + std::string m_peer_ip; + uint16_t m_peer_port; +}; + +/* Inline methods. */ + +inline bool tcp_connection::is_closing() +{ + return m_is_closing; +} + +inline uv_tcp_t* tcp_connection::get_uv_handle() +{ + return m_uv_handle; +} + +inline void tcp_connection::write(const std::string &data) +{ + write((const uint8_t*)data.c_str(), data.size()); +} + +inline const sockaddr* tcp_connection::get_local_address() +{ + return (const struct sockaddr*)m_local_addr; +} + +inline const std::string& tcp_connection::get_local_ip() +{ + return m_local_ip; +} + +inline uint16_t tcp_connection::get_local_port() +{ + return m_local_port; +} + +inline const sockaddr* tcp_connection::get_peer_address() +{ + return (const struct sockaddr*)&m_peer_addr; +} + +inline const std::string& tcp_connection::get_peer_ip() +{ + return m_peer_ip; +} + +inline uint16_t tcp_connection::get_peer_port() +{ + return m_peer_port; +} + +#endif diff --git a/src/osd/modules/ipc/tcp_server.cpp b/src/osd/modules/ipc/tcp_server.cpp new file mode 100644 index 00000000000..a4de2df12d9 --- /dev/null +++ b/src/osd/modules/ipc/tcp_server.cpp @@ -0,0 +1,279 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#include "emu.h" +#include "tcp_server.h" + +/* Static methods for UV callbacks. */ + +static inline void on_connection(uv_stream_t* handle, int status) +{ + static_cast(handle->data)->on_uv_connection(status); +} + +static inline void on_close(uv_handle_t* handle) +{ + static_cast(handle->data)->on_uv_closed(); +} + +static inline void on_error_close(uv_handle_t* handle) +{ + delete handle; +} + +/* Instance methods. */ + +tcp_server::tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog) + : m_uv_handle(nullptr), + m_loop(nullptr), + m_is_closing(false), + m_local_port(0) +{ + int err; + int flags = 0; + + m_uv_handle = new uv_tcp_t; + m_uv_handle->data = (void*)this; + m_loop = loop; + + err = uv_tcp_init(loop, m_uv_handle); + if (err) + { + delete m_uv_handle; + m_uv_handle = nullptr; + throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err)); + } + + struct sockaddr_storage bind_addr; + + + err = uv_ip4_addr(ip.c_str(), (int)port, (struct sockaddr_in*)&bind_addr); + if (err) + throw emu_fatalerror("uv_ipv4_addr() failed: %s", uv_strerror(err)); + + err = uv_tcp_bind(m_uv_handle, (const struct sockaddr*)&bind_addr, flags); + if (err) + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close); + throw emu_fatalerror("uv_tcp_bind() failed: %s", uv_strerror(err)); + } + + err = uv_listen((uv_stream_t*)m_uv_handle, backlog, (uv_connection_cb)on_connection); + if (err) + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close); + throw emu_fatalerror("uv_listen() failed: %s", uv_strerror(err)); + } + + // Set local address. + if (!set_local_address()) + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close); + throw emu_fatalerror("error setting local IP and port"); + } +} + +tcp_server::~tcp_server() +{ + if (m_uv_handle) + delete m_uv_handle; +} + +void tcp_server::close() +{ + if (m_is_closing) + return; + + m_is_closing = true; + + // If there are no connections then close now. + if (m_connections.empty()) + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); + } + // Otherwise close all the connections (but not the TCP server). + else + { + osd_printf_info("closing %d active connections", (int)m_connections.size()); + + for (auto it = m_connections.begin(); it != m_connections.end(); ++it) + { + tcp_connection* connection = *it; + connection->close(); + } + } +} + +void tcp_server::terminate() +{ + if (m_is_closing) + return; + + m_is_closing = true; + + // If there are no connections then close now. + if (m_connections.empty()) + { + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); + } + // Otherwise close all the connections (but not the TCP server). + else + { + osd_printf_info("closing %d active connections", (int)m_connections.size()); + + for (auto it = m_connections.begin(); it != m_connections.end(); ++it) + { + tcp_connection* connection = *it; + connection->terminate(); + } + } +} + +void tcp_server::send_to_all(const uint8_t* data, size_t len) +{ + // If there are no connections then close now. + if (!m_connections.empty()) + { + for (auto it = m_connections.begin(); it != m_connections.end(); ++it) + { + tcp_connection* connection = *it; + connection->write(data, len); + } + } +} + +void tcp_server::dump() +{ + osd_printf_info("[TCP, local:%s :%d, status:%s, connections:%d]", + m_local_ip.c_str(), (uint16_t)m_local_port, + (!m_is_closing) ? "open" : "closed", + (int)m_connections.size()); +} + +const sockaddr* tcp_server::get_local_address() +{ + return (const struct sockaddr*)&m_local_addr; +} + +const std::string& tcp_server::get_local_ip() +{ + return m_local_ip; +} + +uint16_t tcp_server::get_local_port() +{ + return m_local_port; +} + +bool tcp_server::set_local_address() +{ + int err; + int len = sizeof(m_local_addr); + + err = uv_tcp_getsockname(m_uv_handle, (struct sockaddr*)&m_local_addr, &len); + if (err) + { + osd_printf_error("uv_tcp_getsockname() failed: %s", uv_strerror(err)); + + return false; + } + + int family; + GetAddressInfo((const struct sockaddr*)&m_local_addr, &family, m_local_ip, &m_local_port); + + return true; +} + +inline void tcp_server::on_uv_connection(int status) +{ + if (m_is_closing) + return; + + int err; + + if (status) + { + osd_printf_error("error while receiving a new TCP connection: %s", uv_strerror(status)); + + return; + } + + // Notify the subclass so it provides an allocated derived class of TCPConnection. + tcp_connection* connection = nullptr; + user_on_tcp_connection_alloc(&connection); + if (connection != nullptr) + osd_printf_error("tcp_server pointer was not allocated by the user"); + + try + { + connection->setup(m_loop, this, &(m_local_addr), m_local_ip, m_local_port); + } + catch (...) + { + delete connection; + return; + } + + // Accept the connection. + err = uv_accept((uv_stream_t*)m_uv_handle, (uv_stream_t*)connection->get_uv_handle()); + if (err) + throw emu_fatalerror("uv_accept() failed: %s", uv_strerror(err)); + + // Insert the TCPConnection in the set. + m_connections.insert(connection); + + // Start receiving data. + try + { + connection->start(); + } + catch (emu_exception &error) + { + osd_printf_error("cannot run the TCP connection, closing the connection: %s", error.what()); + connection->close(); + // NOTE: Don't return here so the user won't be notified about a "onclose" for a TCP connection + // for which there was not a previous "onnew" event. + } + + osd_printf_info("new TCP connection:"); + connection->dump(); + + // Notify the subclass. + user_on_new_tcp_connection(connection); +} + +void tcp_server::on_uv_closed() +{ + // Motify the subclass. + user_on_tcp_server_closed(); +} + +void tcp_server::on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) +{ + // NOTE: + // Worst scenario is that in which this is the latest connection, + // which is remotely closed (no tcp_server.Close() was called) and the user + // call tcp_server.Close() on userOnTCPConnectionClosed() callback, so Close() + // is called with zero connections and calls uv_close(), but then + // onTCPConnectionClosed() continues and finds that isClosing is true and + // there are zero connections, so calls uv_close() again and get a crash. + // + // SOLUTION: + // Check isClosing value *before* onTCPConnectionClosed() callback. + + bool wasClosing = m_is_closing; + + osd_printf_info("TCP connection closed:"); + connection->dump(); + + // Remove the TCPConnection from the set. + m_connections.erase(connection); + + // Notify the subclass. + user_on_tcp_connection_closed(connection, is_closed_by_peer); + + // Check if the server was closing connections, and if this is the last + // connection then close the server now. + if (wasClosing && m_connections.empty()) + uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close); +} diff --git a/src/osd/modules/ipc/tcp_server.h b/src/osd/modules/ipc/tcp_server.h new file mode 100644 index 00000000000..3a4c75de2d7 --- /dev/null +++ b/src/osd/modules/ipc/tcp_server.h @@ -0,0 +1,72 @@ +// license:BSD-3-Clause +// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic + +#ifndef TCP_SERVER_H +#define TCP_SERVER_H + +#include "tcp_connection.h" +#include +#include +#include + +class tcp_server : public tcp_connection::listener +{ +public: + tcp_server(uv_loop_t* m_loop, const std::string &ip, uint16_t port, int backlog); + virtual ~tcp_server(); + + void close(); + void terminate(); + virtual void dump(); + void send_to_all(const uint8_t* data, size_t len); + bool is_closing(); + const struct sockaddr* get_local_address(); + const std::string& get_local_ip(); + uint16_t get_local_port(); + size_t get_num_connections(); + +private: + bool set_local_address(); + + /* Pure virtual methods that must be implemented by the subclass. */ +protected: + virtual void user_on_tcp_connection_alloc(tcp_connection** connection) = 0; + virtual void user_on_new_tcp_connection(tcp_connection* connection) = 0; + virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0; + virtual void user_on_tcp_server_closed() = 0; + + /* Callbacks fired by UV events. */ +public: + void on_uv_connection(int status); + void on_uv_closed(); + + /* Methods inherited from tcp_connection::listener. */ + virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override; + +private: + // Allocated by this (may be passed by argument). + uv_tcp_t* m_uv_handle; + uv_loop_t* m_loop; + // Others. + std::unordered_set m_connections; + bool m_is_closing; + +protected: + struct sockaddr_storage m_local_addr; + std::string m_local_ip; + uint16_t m_local_port; +}; + +/* Inline methods. */ + +inline bool tcp_server::is_closing() +{ + return m_is_closing; +} + +inline size_t tcp_server::get_num_connections() +{ + return m_connections.size(); +} + +#endif diff --git a/src/osd/modules/output/network.cpp b/src/osd/modules/output/network.cpp index 13504054d85..3bca29f3dbb 100644 --- a/src/osd/modules/output/network.cpp +++ b/src/osd/modules/output/network.cpp @@ -10,17 +10,40 @@ #include "output_module.h" #include "modules/osdmodule.h" +#include "modules/ipc/raw_tcp_server.h" +#include "modules/ipc/raw_tcp_connection.h" #include #include #include +class output_network_server : + public raw_tcp_server::listener, + public raw_tcp_connection::listener +{ + +public: + output_network_server(uv_loop_t* loop) { m_tcp_server = new raw_tcp_server(loop, "0.0.0.0", 8000, 256, this, this); } + + virtual ~output_network_server() { delete m_tcp_server; } + + void terminate_all() { m_tcp_server->terminate(); } + void send_to_all(const uint8_t* data, size_t len) { m_tcp_server->send_to_all(data,len); } + + /* Pure virtual methods inherited from raw_tcp_server::listener. */ + virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) override { } + /* Pure virtual methods inherited from raw_tcp_connection::listener. */ + virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) override { } +private: + raw_tcp_server* m_tcp_server; +}; + class output_network : public osd_module, public output_module { public: output_network() - : osd_module(OSD_OUTPUT_PROVIDER, "network"), output_module(), m_loop(nullptr) - { + : osd_module(OSD_OUTPUT_PROVIDER, "network"), output_module(), m_loop(nullptr), m_server(nullptr) +{ } virtual ~output_network() { } @@ -31,29 +54,33 @@ public: if (err) { return 1; } - m_working_thread = std::thread([](output_network* self) { self->process_output(); }, this); return 0; } + virtual void exit() override { - m_working_thread.join(); + m_server->terminate_all(); + m_working_thread.join(); uv_loop_close(m_loop); delete m_loop; + delete m_server; } // output_module - virtual void notify(const char *outname, INT32 value) override { } + virtual void notify(const char *outname, INT32 value) override { m_server->send_to_all((const uint8_t*)outname, strlen(outname)); } // implementation void process_output() { + m_server = new output_network_server(m_loop); uv_run(m_loop, UV_RUN_DEFAULT); } private: std::thread m_working_thread; uv_loop_t* m_loop; + output_network_server *m_server; }; MODULE_DEFINITION(OUTPUT_NETWORK, output_network)