Basic TCP server based on libuv [Inaki Baz Castillo,Miodrag Milanovic]

This commit is contained in:
Miodrag Milanovic 2016-04-10 14:02:31 +02:00
parent dd89b30c63
commit ffbe0c66be
14 changed files with 1344 additions and 5 deletions

View File

@ -91,6 +91,18 @@ function osdmodulesbuild()
MAME_DIR .. "src/osd/modules/output/none.cpp",
MAME_DIR .. "src/osd/modules/output/console.cpp",
MAME_DIR .. "src/osd/modules/output/network.cpp",
MAME_DIR .. "src/osd/modules/ipc/tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/tcp_server.h",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.h",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.h",
}
includedirs {
ext_includedir("uv"),

View File

@ -0,0 +1,29 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "raw_tcp_connection.h"
/* Instance methods. */
raw_tcp_connection::raw_tcp_connection(listener* listener, size_t bufferSize) :
tcp_connection(bufferSize),
m_listener(listener)
{
}
raw_tcp_connection::~raw_tcp_connection()
{
}
void raw_tcp_connection::user_on_tcp_connection_read()
{
// We may receive multiple packets in the same TCP chunk. If one of them is
// a DTLS Close Alert this would be closed (close() called) so we cannot call
// our listeners anymore.
if (is_closing())
return;
m_listener->on_data_recv(this, m_buffer, m_buffer_data_len);
m_buffer_data_len = 0;
}

View File

@ -0,0 +1,30 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RAW_TCP_CONNECTION_H
#define RAW_TCP_CONNECTION_H
#include "tcp_connection.h"
class raw_tcp_connection : public tcp_connection
{
public:
class listener
{
public:
virtual ~listener(){ }
virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) = 0;
};
raw_tcp_connection(listener* listener, size_t bufferSize);
virtual ~raw_tcp_connection();
/* Pure virtual methods inherited from tcp_connection. */
virtual void user_on_tcp_connection_read() override;
private:
// Passed by argument.
listener* m_listener;
};
#endif

View File

@ -0,0 +1,42 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "raw_tcp_server.h"
#include <string>
#define MAX_TCP_CONNECTIONS_PER_SERVER 10
/* Instance methods. */
raw_tcp_server::raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener) :
tcp_server(loop, ip, port, backlog),
m_listener(listener),
m_conn_listener(connListener)
{
}
void raw_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection)
{
// Allocate a new raw_tcp_connection for the raw_tcp_server to handle it.
*connection = new raw_tcp_connection(m_conn_listener, 65536);
}
void raw_tcp_server::user_on_new_tcp_connection(tcp_connection* connection)
{
// Allow just MAX_TCP_CONNECTIONS_PER_SERVER.
if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER)
connection->close();
}
void raw_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// Notify the listener.
// NOTE: Don't do it if closing (since at this point the listener is already freed).
// At the end, this is just called if the connection was remotely closed.
if (!is_closing())
m_listener->on_raw_tcp_connection_closed(this, static_cast<raw_tcp_connection*>(connection), is_closed_by_peer);
}
void raw_tcp_server::user_on_tcp_server_closed()
{
}

View File

@ -0,0 +1,36 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RAW_TCP_SERVER_H
#define RAW_TCP_SERVER_H
#include "tcp_server.h"
#include "tcp_connection.h"
#include "raw_tcp_connection.h"
#include <uv.h>
class raw_tcp_server : public tcp_server
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) = 0;
};
raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener);
/* Pure virtual methods inherited from TCPServer. */
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override;
virtual void user_on_new_tcp_connection(tcp_connection* connection) override;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
virtual void user_on_tcp_server_closed() override;
private:
// Passed by argument.
listener* m_listener;
raw_tcp_connection::listener* m_conn_listener;
};
#endif

View File

@ -0,0 +1,150 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "rtc_tcp_connection.h"
/* Instance methods. */
rtc_tcp_connection::rtc_tcp_connection(listener* listener, size_t bufferSize) :
tcp_connection(bufferSize),
m_listener(listener),
m_frame_start(0)
{
}
rtc_tcp_connection::~rtc_tcp_connection()
{
}
inline uint16_t get2bytes(const uint8_t* data, size_t i)
{
return (uint16_t)(data[i + 1]) | ((uint16_t)(data[i])) << 8;
}
inline void set2bytes(uint8_t* data, size_t i, uint16_t value)
{
data[i + 1] = (uint8_t)(value);
data[i] = (uint8_t)(value >> 8);
}
void rtc_tcp_connection::user_on_tcp_connection_read()
{
/*
* Framing RFC 4571
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* ---------------------------------------------------------------
* | LENGTH | STUN / DTLS / RTP / RTCP |
* ---------------------------------------------------------------
*
* A 16-bit unsigned integer LENGTH field, coded in network byte order
* (big-endian), begins the frame. If LENGTH is non-zero, an RTP or
* RTCP packet follows the LENGTH field. The value coded in the LENGTH
* field MUST equal the number of octets in the RTP or RTCP packet.
* Zero is a valid value for LENGTH, and it codes the null packet.
*/
// Be ready to parse more than a single frame in a single TCP chunk.
while (true)
{
// We may receive multiple packets in the same TCP chunk. If one of them is
// a DTLS Close Alert this would be closed (Close() called) so we cannot call
// our listeners anymore.
if (is_closing())
return;
size_t data_len = m_buffer_data_len - m_frame_start;
size_t packet_len = 0;
if (data_len >= 2)
packet_len = (size_t)get2bytes(m_buffer + m_frame_start, 0);
// We have packet_len bytes.
if ((data_len >= 2) && data_len >= 2 + packet_len)
{
const uint8_t* packet = m_buffer + m_frame_start + 2;
// Notify the listener.
if (packet_len != 0)
{
m_listener->on_packet_recv(this, packet, packet_len);
}
// If there is no more space available in the buffer and that is because
// the latest parsed frame filled it, then empty the full buffer.
if ((m_frame_start + 2 + packet_len) == m_buffer_size)
{
osd_printf_error("no more space in the buffer, emptying the buffer data");
m_frame_start = 0;
m_buffer_data_len = 0;
}
// If there is still space in the buffer, set the beginning of the next
// frame to the next position after the parsed frame.
else
{
m_frame_start += 2 + packet_len;
}
// If there is more data in the buffer after the parsed frame then
// parse again. Otherwise break here and wait for more data.
if (m_buffer_data_len > m_frame_start)
{
// osd_printf_error("there is more data after the parsed frame, continue parsing");
continue;
}
else
{
break;
}
}
else // Incomplete packet.
{
// Check if the buffer is full.
if (m_buffer_data_len == m_buffer_size)
{
// First case: the incomplete frame does not begin at position 0 of
// the buffer, so move the frame to the position 0.
if (m_frame_start != 0)
{
// osd_printf_error("no more space in the buffer, moving parsed bytes to the beginning of the buffer and wait for more data");
std::memmove(m_buffer, m_buffer + m_frame_start, m_buffer_size - m_frame_start);
m_buffer_data_len = m_buffer_size - m_frame_start;
m_frame_start = 0;
}
// Second case: the incomplete frame begins at position 0 of the buffer.
// The frame is too big, so close the connection.
else
{
osd_printf_error("no more space in the buffer for the unfinished frame being parsed, closing the connection");
// Close the socket.
close();
}
}
// The buffer is not full.
else
{
osd_printf_verbose("frame not finished yet, waiting for more data");
}
// Exit the parsing loop.
break;
}
}
}
void rtc_tcp_connection::send(const uint8_t* data, size_t len)
{
// Write according to Framing RFC 4571.
uint8_t frame_len[2];
set2bytes(frame_len, 0, len);
write(frame_len, 2, data, len);
}

View File

@ -0,0 +1,33 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RTC_TCP_CONNECTION_H
#define RTC_TCP_CONNECTION_H
#include "tcp_connection.h"
class rtc_tcp_connection : public tcp_connection
{
public:
class listener
{
public:
virtual ~listener(){ }
virtual void on_packet_recv(rtc_tcp_connection *connection, const uint8_t* data, size_t len) = 0;
};
rtc_tcp_connection(listener* listener, size_t bufferSize);
virtual ~rtc_tcp_connection();
/* Pure virtual methods inherited from tcp_connection. */
virtual void user_on_tcp_connection_read() override;
void send(const uint8_t* data, size_t len);
private:
// Passed by argument.
listener* m_listener;
size_t m_frame_start; // Where the latest frame starts.
};
#endif

View File

@ -0,0 +1,42 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "rtc_tcp_server.h"
#include <string>
#define MAX_TCP_CONNECTIONS_PER_SERVER 10
/* Instance methods. */
rtc_tcp_server::rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener) :
tcp_server(loop, ip, port, backlog),
m_listener(listener),
m_conn_listener(connListener)
{
}
void rtc_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection)
{
// Allocate a new rtc_tcp_connection for the rtc_tcp_server to handle it.
*connection = new rtc_tcp_connection(m_conn_listener, 65536);
}
void rtc_tcp_server::user_on_new_tcp_connection(tcp_connection* connection)
{
// Allow just MAX_TCP_CONNECTIONS_PER_SERVER.
if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER)
connection->close();
}
void rtc_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// Notify the listener.
// NOTE: Don't do it if closing (since at this point the listener is already freed).
// At the end, this is just called if the connection was remotely closed.
if (!is_closing())
m_listener->on_rtc_tcp_connection_closed(this, static_cast<rtc_tcp_connection*>(connection), is_closed_by_peer);
}
void rtc_tcp_server::user_on_tcp_server_closed()
{
}

View File

@ -0,0 +1,36 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RTC_TCP_SERVER_H
#define RTC_TCP_SERVER_H
#include "tcp_server.h"
#include "tcp_connection.h"
#include "rtc_tcp_connection.h"
#include <uv.h>
class rtc_tcp_server : public tcp_server
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_rtc_tcp_connection_closed(rtc_tcp_server* tcpServer, rtc_tcp_connection* connection, bool is_closed_by_peer) = 0;
};
rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener);
/* Pure virtual methods inherited from TCPServer. */
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override;
virtual void user_on_new_tcp_connection(tcp_connection* connection) override;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
virtual void user_on_tcp_server_closed() override;
private:
// Passed by argument.
listener* m_listener;
rtc_tcp_connection::listener* m_conn_listener;
};
#endif

View File

@ -0,0 +1,414 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "tcp_connection.h"
#include <cstdint> // uint8_t, etc
#include <cstdlib> // std::malloc(), std::free()
/* Static methods for UV callbacks. */
static inline void on_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
static_cast<tcp_connection*>(handle->data)->on_uv_read_alloc(suggested_size, buf);
}
static inline void on_read(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
static_cast<tcp_connection*>(handle->data)->on_uv_read(nread, buf);
}
static inline void on_write(uv_write_t* req, int status)
{
tcp_connection::tcp_uv_write_data* write_data = static_cast<tcp_connection::tcp_uv_write_data*>(req->data);
tcp_connection* connection = write_data->connection;
// Delete the UvWriteData struct (which includes the uv_req_t and the store char[]).
std::free(write_data);
// Just notify the TCPConnection when error.
if (status)
connection->on_uv_write_error(status);
}
static inline void on_shutdown(uv_shutdown_t* req, int status)
{
static_cast<tcp_connection*>(req->data)->on_uv_shutdown(req, status);
}
static inline void on_close(uv_handle_t* handle)
{
static_cast<tcp_connection*>(handle->data)->on_uv_closed();
}
/* Instance methods. */
tcp_connection::tcp_connection(size_t bufferSize) :
m_listener(nullptr),
m_local_addr(nullptr),
m_is_closing(false),
m_is_closed_by_peer(false),
m_has_error(false),
m_buffer_size(bufferSize),
m_buffer(nullptr),
m_buffer_data_len(0),
m_local_port(0),
m_peer_port(0)
{
m_uv_handle = new uv_tcp_t;
m_uv_handle->data = (void*)this;
// NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
}
tcp_connection::~tcp_connection()
{
if (m_uv_handle)
delete m_uv_handle;
if (m_buffer)
delete[] m_buffer;
}
void tcp_connection::setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort)
{
int err;
// Set the UV handle.
err = uv_tcp_init(loop, m_uv_handle);
if (err)
{
delete m_uv_handle;
m_uv_handle = nullptr;
throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err));
}
// Set the listener.
m_listener = listener;
// Set the local address.
m_local_addr = localAddr;
m_local_ip = localIP;
m_local_port = localPort;
}
void tcp_connection::close()
{
if (m_is_closing)
return;
int err;
m_is_closing = true;
// Don't read more.
err = uv_read_stop((uv_stream_t*)m_uv_handle);
if (err)
throw emu_fatalerror("uv_read_stop() failed: %s", uv_strerror(err));
// If there is no error and the peer didn't close its connection side then close gracefully.
if (!m_has_error && !m_is_closed_by_peer)
{
// Use uv_shutdown() so pending data to be written will be sent to the peer
// before closing.
uv_shutdown_t* req = new uv_shutdown_t;
req->data = (void*)this;
err = uv_shutdown(req, (uv_stream_t*)m_uv_handle, (uv_shutdown_cb)on_shutdown);
if (err)
throw emu_fatalerror("uv_shutdown() failed: %s", uv_strerror(err));
}
// Otherwise directly close the socket.
else
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
}
void tcp_connection::terminate()
{
if (m_is_closing)
return;
m_is_closing = true;
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
void tcp_connection::dump()
{
osd_printf_verbose("[TCP, local:%s :%d, remote:%s :%d, status:%s]\r\n",
m_local_ip.c_str(), (uint16_t)m_local_port,
m_peer_ip.c_str(), (uint16_t)m_peer_port,
(!m_is_closing) ? "open" : "closed");
}
void tcp_connection::start()
{
if (m_is_closing)
return;
int err;
err = uv_read_start((uv_stream_t*)m_uv_handle, (uv_alloc_cb)on_alloc, (uv_read_cb)on_read);
if (err)
throw emu_fatalerror("uv_read_start() failed: %s", uv_strerror(err));
// Get the peer address.
if (!set_peer_address())
throw emu_fatalerror("error setting peer IP and port");
}
void tcp_connection::write(const uint8_t* data, size_t len)
{
if (m_is_closing)
return;
if (len == 0)
return;
uv_buf_t buffer;
int written;
int err;
// First try uv_try_write(). In case it can not directly write all the given
// data then build a uv_req_t and use uv_write().
buffer = uv_buf_init((char*)data, len);
written = uv_try_write((uv_stream_t*)m_uv_handle, &buffer, 1);
// All the data was written. Done.
if (written == (int)len)
{
return;
}
// Cannot write any data at first time. Use uv_write().
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
// Set written to 0 so pending_len can be properly calculated.
written = 0;
}
// Error. Should not happen.
else if (written < 0)
{
osd_printf_warning("uv_try_write() failed, closing the connection: %s", uv_strerror(written));
close();
return;
}
// osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, len);
size_t pending_len = len - written;
// Allocate a special UvWriteData struct pointer.
tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len);
write_data->connection = this;
std::memcpy(write_data->store, data + written, pending_len);
write_data->req.data = (void*)write_data;
buffer = uv_buf_init((char*)write_data->store, pending_len);
err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write);
if (err)
throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err));
}
void tcp_connection::write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2)
{
if (m_is_closing)
return;
if (len1 == 0 && len2 == 0)
return;
size_t total_len = len1 + len2;
uv_buf_t buffers[2];
int written;
int err;
// First try uv_try_write(). In case it can not directly write all the given
// data then build a uv_req_t and use uv_write().
buffers[0] = uv_buf_init((char*)data1, len1);
buffers[1] = uv_buf_init((char*)data2, len2);
written = uv_try_write((uv_stream_t*)m_uv_handle, buffers, 2);
// All the data was written. Done.
if (written == (int)total_len)
{
return;
}
// Cannot write any data at first time. Use uv_write().
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
// Set written to 0 so pending_len can be properly calculated.
written = 0;
}
// Error. Should not happen.
else if (written < 0)
{
osd_printf_warning("uv_try_write() failed, closing the connection: %s", uv_strerror(written));
close();
return;
}
// osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, total_len);
size_t pending_len = total_len - written;
// Allocate a special UvWriteData struct pointer.
tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len);
write_data->connection = this;
// If the first buffer was not entirely written then splice it.
if ((size_t)written < len1)
{
std::memcpy(write_data->store, data1 + (size_t)written, len1 - (size_t)written);
std::memcpy(write_data->store + (len1 - (size_t)written), data2, len2);
}
// Otherwise just take the pending data in the second buffer.
else
{
std::memcpy(write_data->store, data2 + ((size_t)written - len1), len2 - ((size_t)written - len1));
}
write_data->req.data = (void*)write_data;
uv_buf_t buffer = uv_buf_init((char*)write_data->store, pending_len);
err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write);
if (err)
throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err));
}
void GetAddressInfo(const struct sockaddr* addr, int* family, std::string &ip, uint16_t* port)
{
char _ip[INET6_ADDRSTRLEN + 1];
int err;
err = uv_inet_ntop(AF_INET, &((struct sockaddr_in*)addr)->sin_addr, _ip, INET_ADDRSTRLEN);
if (err)
throw emu_fatalerror("uv_inet_ntop() failed: %s", uv_strerror(err));
*port = (uint16_t)ntohs(((struct sockaddr_in*)addr)->sin_port);
*family = addr->sa_family;
ip.assign(_ip);
}
bool tcp_connection::set_peer_address()
{
int err;
int len = sizeof(m_peer_addr);
err = uv_tcp_getpeername(m_uv_handle, (struct sockaddr*)&m_peer_addr, &len);
if (err)
{
osd_printf_error("uv_tcp_getpeername() failed: %s", uv_strerror(err));
return false;
}
int family;
GetAddressInfo((const struct sockaddr*)&m_peer_addr, &family, m_peer_ip, &m_peer_port);
return true;
}
void tcp_connection::on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf)
{
// If this is the first call to onUvReadAlloc() then allocate the receiving buffer now.
if (!m_buffer)
m_buffer = new uint8_t[m_buffer_size];
// Tell UV to write after the last data byte in the buffer.
buf->base = (char *)(m_buffer + m_buffer_data_len);
// Give UV all the remaining space in the buffer.
if (m_buffer_size > m_buffer_data_len)
{
buf->len = m_buffer_size - m_buffer_data_len;
}
else
{
buf->len = 0;
osd_printf_warning("no available space in the buffer");
}
}
void tcp_connection::on_uv_read(::ssize_t nread, const uv_buf_t* buf)
{
if (m_is_closing)
return;
if (nread == 0)
return;
// Data received.
if (nread > 0)
{
// Update the buffer data length.
m_buffer_data_len += (size_t)nread;
// Notify the subclass.
user_on_tcp_connection_read();
}
// Client disconneted.
else if (nread == UV_EOF || nread == UV_ECONNRESET)
{
osd_printf_info("connection closed by peer, closing server side");
m_is_closed_by_peer = true;
// Close server side of the connection.
close();
}
// Some error.
else
{
osd_printf_info("read error, closing the connection: %s", uv_strerror(nread));
m_has_error = true;
// Close server side of the connection.
close();
}
}
void tcp_connection::on_uv_write_error(int error)
{
if (m_is_closing)
return;
if (error == UV_EPIPE || error == UV_ENOTCONN)
{
osd_printf_info("write error, closing the connection: %s", uv_strerror(error));
}
else
{
osd_printf_info("write error, closing the connection: %s", uv_strerror(error));
m_has_error = true;
}
close();
}
void tcp_connection::on_uv_shutdown(uv_shutdown_t* req, int status)
{
delete req;
if (status == UV_EPIPE || status == UV_ENOTCONN || status == UV_ECANCELED)
osd_printf_info("shutdown error: %s", uv_strerror(status));
else if (status)
osd_printf_info("shutdown error: %s", uv_strerror(status));
// Now do close the handle.
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
void tcp_connection::on_uv_closed()
{
// Notify the listener.
m_listener->on_tcp_connection_closed(this, m_is_closed_by_peer);
// And delete this.
delete this;
}

View File

@ -0,0 +1,137 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef TCP_CONNECTION_H
#define TCP_CONNECTION_H
#include <string>
#include <uv.h>
extern void GetAddressInfo(const sockaddr * addr, int * family, std::string & ip, uint16_t * port);
class tcp_connection
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0;
};
/* Struct for the data field of uv_req_t when writing into the connection. */
struct tcp_uv_write_data
{
tcp_connection* connection;
uv_write_t req;
uint8_t store[1];
};
tcp_connection(size_t bufferSize);
virtual ~tcp_connection();
void close();
void terminate();
virtual void dump();
void setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort);
bool is_closing();
uv_tcp_t* get_uv_handle();
void start();
void write(const uint8_t* data, size_t len);
void write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2);
void write(const std::string &data);
const struct sockaddr* get_local_address();
const std::string& get_local_ip();
uint16_t get_local_port();
const struct sockaddr* get_peer_address();
const std::string& get_peer_ip();
uint16_t get_peer_port();
private:
bool set_peer_address();
/* Callbacks fired by UV events. */
public:
void on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf);
void on_uv_read(ssize_t nread, const uv_buf_t* buf);
void on_uv_write_error(int error);
void on_uv_shutdown(uv_shutdown_t* req, int status);
void on_uv_closed();
/* Pure virtual methods that must be implemented by the subclass. */
protected:
virtual void user_on_tcp_connection_read() = 0;
private:
// Passed by argument.
listener* m_listener;
// Allocated by this.
uv_tcp_t* m_uv_handle;
// Others.
struct sockaddr_storage* m_local_addr;
bool m_is_closing;
bool m_is_closed_by_peer;
bool m_has_error;
protected:
// Passed by argument.
size_t m_buffer_size;
// Allocated by this.
uint8_t* m_buffer;
// Others.
size_t m_buffer_data_len;
std::string m_local_ip;
uint16_t m_local_port;
struct sockaddr_storage m_peer_addr;
std::string m_peer_ip;
uint16_t m_peer_port;
};
/* Inline methods. */
inline bool tcp_connection::is_closing()
{
return m_is_closing;
}
inline uv_tcp_t* tcp_connection::get_uv_handle()
{
return m_uv_handle;
}
inline void tcp_connection::write(const std::string &data)
{
write((const uint8_t*)data.c_str(), data.size());
}
inline const sockaddr* tcp_connection::get_local_address()
{
return (const struct sockaddr*)m_local_addr;
}
inline const std::string& tcp_connection::get_local_ip()
{
return m_local_ip;
}
inline uint16_t tcp_connection::get_local_port()
{
return m_local_port;
}
inline const sockaddr* tcp_connection::get_peer_address()
{
return (const struct sockaddr*)&m_peer_addr;
}
inline const std::string& tcp_connection::get_peer_ip()
{
return m_peer_ip;
}
inline uint16_t tcp_connection::get_peer_port()
{
return m_peer_port;
}
#endif

View File

@ -0,0 +1,279 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "tcp_server.h"
/* Static methods for UV callbacks. */
static inline void on_connection(uv_stream_t* handle, int status)
{
static_cast<tcp_server*>(handle->data)->on_uv_connection(status);
}
static inline void on_close(uv_handle_t* handle)
{
static_cast<tcp_server*>(handle->data)->on_uv_closed();
}
static inline void on_error_close(uv_handle_t* handle)
{
delete handle;
}
/* Instance methods. */
tcp_server::tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog)
: m_uv_handle(nullptr),
m_loop(nullptr),
m_is_closing(false),
m_local_port(0)
{
int err;
int flags = 0;
m_uv_handle = new uv_tcp_t;
m_uv_handle->data = (void*)this;
m_loop = loop;
err = uv_tcp_init(loop, m_uv_handle);
if (err)
{
delete m_uv_handle;
m_uv_handle = nullptr;
throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err));
}
struct sockaddr_storage bind_addr;
err = uv_ip4_addr(ip.c_str(), (int)port, (struct sockaddr_in*)&bind_addr);
if (err)
throw emu_fatalerror("uv_ipv4_addr() failed: %s", uv_strerror(err));
err = uv_tcp_bind(m_uv_handle, (const struct sockaddr*)&bind_addr, flags);
if (err)
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("uv_tcp_bind() failed: %s", uv_strerror(err));
}
err = uv_listen((uv_stream_t*)m_uv_handle, backlog, (uv_connection_cb)on_connection);
if (err)
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("uv_listen() failed: %s", uv_strerror(err));
}
// Set local address.
if (!set_local_address())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("error setting local IP and port");
}
}
tcp_server::~tcp_server()
{
if (m_uv_handle)
delete m_uv_handle;
}
void tcp_server::close()
{
if (m_is_closing)
return;
m_is_closing = true;
// If there are no connections then close now.
if (m_connections.empty())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
// Otherwise close all the connections (but not the TCP server).
else
{
osd_printf_info("closing %d active connections", (int)m_connections.size());
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->close();
}
}
}
void tcp_server::terminate()
{
if (m_is_closing)
return;
m_is_closing = true;
// If there are no connections then close now.
if (m_connections.empty())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
// Otherwise close all the connections (but not the TCP server).
else
{
osd_printf_info("closing %d active connections", (int)m_connections.size());
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->terminate();
}
}
}
void tcp_server::send_to_all(const uint8_t* data, size_t len)
{
// If there are no connections then close now.
if (!m_connections.empty())
{
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->write(data, len);
}
}
}
void tcp_server::dump()
{
osd_printf_info("[TCP, local:%s :%d, status:%s, connections:%d]",
m_local_ip.c_str(), (uint16_t)m_local_port,
(!m_is_closing) ? "open" : "closed",
(int)m_connections.size());
}
const sockaddr* tcp_server::get_local_address()
{
return (const struct sockaddr*)&m_local_addr;
}
const std::string& tcp_server::get_local_ip()
{
return m_local_ip;
}
uint16_t tcp_server::get_local_port()
{
return m_local_port;
}
bool tcp_server::set_local_address()
{
int err;
int len = sizeof(m_local_addr);
err = uv_tcp_getsockname(m_uv_handle, (struct sockaddr*)&m_local_addr, &len);
if (err)
{
osd_printf_error("uv_tcp_getsockname() failed: %s", uv_strerror(err));
return false;
}
int family;
GetAddressInfo((const struct sockaddr*)&m_local_addr, &family, m_local_ip, &m_local_port);
return true;
}
inline void tcp_server::on_uv_connection(int status)
{
if (m_is_closing)
return;
int err;
if (status)
{
osd_printf_error("error while receiving a new TCP connection: %s", uv_strerror(status));
return;
}
// Notify the subclass so it provides an allocated derived class of TCPConnection.
tcp_connection* connection = nullptr;
user_on_tcp_connection_alloc(&connection);
if (connection != nullptr)
osd_printf_error("tcp_server pointer was not allocated by the user");
try
{
connection->setup(m_loop, this, &(m_local_addr), m_local_ip, m_local_port);
}
catch (...)
{
delete connection;
return;
}
// Accept the connection.
err = uv_accept((uv_stream_t*)m_uv_handle, (uv_stream_t*)connection->get_uv_handle());
if (err)
throw emu_fatalerror("uv_accept() failed: %s", uv_strerror(err));
// Insert the TCPConnection in the set.
m_connections.insert(connection);
// Start receiving data.
try
{
connection->start();
}
catch (emu_exception &error)
{
osd_printf_error("cannot run the TCP connection, closing the connection: %s", error.what());
connection->close();
// NOTE: Don't return here so the user won't be notified about a "onclose" for a TCP connection
// for which there was not a previous "onnew" event.
}
osd_printf_info("new TCP connection:");
connection->dump();
// Notify the subclass.
user_on_new_tcp_connection(connection);
}
void tcp_server::on_uv_closed()
{
// Motify the subclass.
user_on_tcp_server_closed();
}
void tcp_server::on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// NOTE:
// Worst scenario is that in which this is the latest connection,
// which is remotely closed (no tcp_server.Close() was called) and the user
// call tcp_server.Close() on userOnTCPConnectionClosed() callback, so Close()
// is called with zero connections and calls uv_close(), but then
// onTCPConnectionClosed() continues and finds that isClosing is true and
// there are zero connections, so calls uv_close() again and get a crash.
//
// SOLUTION:
// Check isClosing value *before* onTCPConnectionClosed() callback.
bool wasClosing = m_is_closing;
osd_printf_info("TCP connection closed:");
connection->dump();
// Remove the TCPConnection from the set.
m_connections.erase(connection);
// Notify the subclass.
user_on_tcp_connection_closed(connection, is_closed_by_peer);
// Check if the server was closing connections, and if this is the last
// connection then close the server now.
if (wasClosing && m_connections.empty())
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}

View File

@ -0,0 +1,72 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include "tcp_connection.h"
#include <string>
#include <unordered_set>
#include <uv.h>
class tcp_server : public tcp_connection::listener
{
public:
tcp_server(uv_loop_t* m_loop, const std::string &ip, uint16_t port, int backlog);
virtual ~tcp_server();
void close();
void terminate();
virtual void dump();
void send_to_all(const uint8_t* data, size_t len);
bool is_closing();
const struct sockaddr* get_local_address();
const std::string& get_local_ip();
uint16_t get_local_port();
size_t get_num_connections();
private:
bool set_local_address();
/* Pure virtual methods that must be implemented by the subclass. */
protected:
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) = 0;
virtual void user_on_new_tcp_connection(tcp_connection* connection) = 0;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0;
virtual void user_on_tcp_server_closed() = 0;
/* Callbacks fired by UV events. */
public:
void on_uv_connection(int status);
void on_uv_closed();
/* Methods inherited from tcp_connection::listener. */
virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
private:
// Allocated by this (may be passed by argument).
uv_tcp_t* m_uv_handle;
uv_loop_t* m_loop;
// Others.
std::unordered_set<tcp_connection*> m_connections;
bool m_is_closing;
protected:
struct sockaddr_storage m_local_addr;
std::string m_local_ip;
uint16_t m_local_port;
};
/* Inline methods. */
inline bool tcp_server::is_closing()
{
return m_is_closing;
}
inline size_t tcp_server::get_num_connections()
{
return m_connections.size();
}
#endif

View File

@ -10,17 +10,40 @@
#include "output_module.h"
#include "modules/osdmodule.h"
#include "modules/ipc/raw_tcp_server.h"
#include "modules/ipc/raw_tcp_connection.h"
#include <uv.h>
#include <mutex>
#include <thread>
class output_network_server :
public raw_tcp_server::listener,
public raw_tcp_connection::listener
{
public:
output_network_server(uv_loop_t* loop) { m_tcp_server = new raw_tcp_server(loop, "0.0.0.0", 8000, 256, this, this); }
virtual ~output_network_server() { delete m_tcp_server; }
void terminate_all() { m_tcp_server->terminate(); }
void send_to_all(const uint8_t* data, size_t len) { m_tcp_server->send_to_all(data,len); }
/* Pure virtual methods inherited from raw_tcp_server::listener. */
virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) override { }
/* Pure virtual methods inherited from raw_tcp_connection::listener. */
virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) override { }
private:
raw_tcp_server* m_tcp_server;
};
class output_network : public osd_module, public output_module
{
public:
output_network()
: osd_module(OSD_OUTPUT_PROVIDER, "network"), output_module(), m_loop(nullptr)
{
: osd_module(OSD_OUTPUT_PROVIDER, "network"), output_module(), m_loop(nullptr), m_server(nullptr)
{
}
virtual ~output_network() {
}
@ -31,29 +54,33 @@ public:
if (err) {
return 1;
}
m_working_thread = std::thread([](output_network* self) { self->process_output(); }, this);
return 0;
}
virtual void exit() override {
m_working_thread.join();
m_server->terminate_all();
m_working_thread.join();
uv_loop_close(m_loop);
delete m_loop;
delete m_server;
}
// output_module
virtual void notify(const char *outname, INT32 value) override { }
virtual void notify(const char *outname, INT32 value) override { m_server->send_to_all((const uint8_t*)outname, strlen(outname)); }
// implementation
void process_output()
{
m_server = new output_network_server(m_loop);
uv_run(m_loop, UV_RUN_DEFAULT);
}
private:
std::thread m_working_thread;
uv_loop_t* m_loop;
output_network_server *m_server;
};
MODULE_DEFINITION(OUTPUT_NETWORK, output_network)