Replaced code usage of libuv with asio library (nw)

This commit is contained in:
Miodrag Milanovic 2016-10-07 14:15:01 +02:00
parent 85f4741fcf
commit c0a5494de5
20 changed files with 174 additions and 1574 deletions

View File

@ -61,7 +61,6 @@
# USE_SYSTEM_LIB_LUA = 1
# USE_SYSTEM_LIB_PORTMIDI = 1
# USE_SYSTEM_LIB_PORTAUDIO = 1
# USE_SYSTEM_LIB_UV = 1
# USE_BUNDLED_LIB_SDL2 = 1
# MESA_INSTALL_ROOT = /opt/mesa
@ -96,7 +95,6 @@
# FORCE_VERSION_COMPILE = 1
# MSBUILD = 1
# USE_LIBUV = 1
# IGNORE_BAD_LOCALISATION=1
# PRECOMPILE = 0
@ -437,10 +435,6 @@ ifdef USE_BUNDLED_LIB_SDL2
PARAMS += --with-bundled-sdl2
endif
ifdef USE_SYSTEM_LIB_UV
PARAMS += --with-system-uv='$(USE_SYSTEM_LIB_UV)'
endif
#-------------------------------------------------
# distribution may change things
#-------------------------------------------------
@ -737,10 +731,6 @@ ifdef PLATFORM
TARGET_PARAMS += --PLATFORM='$(PLATFORM)'
endif
ifdef USE_LIBUV
PARAMS += --USE_LIBUV='$(USE_LIBUV)'
endif
ifdef PRECOMPILE
PARAMS += --precompile='$(PRECOMPILE)'
endif
@ -1025,7 +1015,7 @@ endif
.PHONY: vs2015_uwp
vs2015_uwp: generate
$(SILENT) $(GENIE) $(PARAMS) $(TARGET_PARAMS) --vs=winstore82 --osd=windows --NO_USE_MIDI=1 --USE_LIBUV=0 --NO_OPENGL=1 --USE_QTDEBUG=0 --MODERN_WIN_API=1 vs2015
$(SILENT) $(GENIE) $(PARAMS) $(TARGET_PARAMS) --vs=winstore82 --osd=windows --NO_USE_MIDI=1 --NO_OPENGL=1 --USE_QTDEBUG=0 --MODERN_WIN_API=1 vs2015
ifdef MSBUILD
$(SILENT) msbuild.exe $(PROJECTDIR_WIN)/vs2015-winstore82/$(PROJECT_NAME).sln $(MSBUILD_PARAMS)
endif

View File

@ -13,7 +13,6 @@ local extlibs = {
portmidi = { "portmidi", "3rdparty/portmidi/pm_common" },
portaudio = { "portaudio", "3rdparty/portaudio/include" },
lua = { "lua", "3rdparty/lua/src" },
uv = { "uv" , "3rdparty/libuv/include" },
}
-- system lib options
@ -52,11 +51,6 @@ newoption {
description = "Use system LUA library",
}
newoption {
trigger = 'with-system-uv',
description = 'Use system uv library',
}
-- build helpers
function ext_lib(lib)
local opt = _OPTIONS["with-system-" .. lib]

View File

@ -975,207 +975,6 @@ end
--}
--end
--------------------------------------------------
-- libuv library objects
--------------------------------------------------
if _OPTIONS["USE_LIBUV"]=="1" then
if not _OPTIONS["with-system-uv"] then
project "uv"
uuid "cd2afe7f-139d-49c3-9000-fc9119f3cea0"
kind "StaticLib"
includedirs {
MAME_DIR .. "3rdparty/libuv/include",
MAME_DIR .. "3rdparty/libuv/src",
MAME_DIR .. "3rdparty/libuv/src/win",
}
configuration { "gmake or ninja" }
buildoptions_c {
"-Wno-strict-prototypes",
"-Wno-bad-function-cast",
"-Wno-write-strings",
"-Wno-missing-braces",
"-Wno-undef",
"-Wno-unused-variable",
}
local version = str_to_version(_OPTIONS["gcc_version"])
if (_OPTIONS["gcc"]~=nil) then
if string.find(_OPTIONS["gcc"], "clang") or string.find(_OPTIONS["gcc"], "android") then
buildoptions_c {
"-Wno-unknown-warning-option",
"-Wno-unknown-attributes",
"-Wno-null-dereference",
"-Wno-unused-but-set-variable",
"-Wno-maybe-uninitialized",
}
else
buildoptions_c {
"-Wno-unused-but-set-variable",
"-Wno-maybe-uninitialized",
}
end
end
configuration { "vs*" }
buildoptions {
"/wd4054", -- warning C4054: 'type cast' : from function pointer 'xxx' to data pointer 'void *'
"/wd4204", -- warning C4204: nonstandard extension used : non-constant aggregate initializer
"/wd4210", -- warning C4210: nonstandard extension used : function given file scope
"/wd4701", -- warning C4701: potentially uninitialized local variable 'xxx' used
"/wd4703", -- warning C4703: potentially uninitialized local pointer variable 'xxx' used
"/wd4477", -- warning C4477: '<function>' : format string '<format-string>' requires an argument of type '<type>', but variadic argument <position> has type '<type>'
}
configuration { }
files {
MAME_DIR .. "3rdparty/libuv/src/fs-poll.c",
MAME_DIR .. "3rdparty/libuv/src/inet.c",
MAME_DIR .. "3rdparty/libuv/src/threadpool.c",
MAME_DIR .. "3rdparty/libuv/src/uv-common.c",
MAME_DIR .. "3rdparty/libuv/src/version.c",
}
if _OPTIONS["targetos"]=="windows" then
defines {
"WIN32_LEAN_AND_MEAN",
"_WIN32_WINNT=0x0502",
}
configuration { }
files {
MAME_DIR .. "3rdparty/libuv/src/win/async.c",
MAME_DIR .. "3rdparty/libuv/src/win/core.c",
MAME_DIR .. "3rdparty/libuv/src/win/dl.c",
MAME_DIR .. "3rdparty/libuv/src/win/error.c",
MAME_DIR .. "3rdparty/libuv/src/win/fs-event.c",
MAME_DIR .. "3rdparty/libuv/src/win/fs.c",
MAME_DIR .. "3rdparty/libuv/src/win/getaddrinfo.c",
MAME_DIR .. "3rdparty/libuv/src/win/getnameinfo.c",
MAME_DIR .. "3rdparty/libuv/src/win/handle.c",
MAME_DIR .. "3rdparty/libuv/src/win/loop-watcher.c",
MAME_DIR .. "3rdparty/libuv/src/win/pipe.c",
MAME_DIR .. "3rdparty/libuv/src/win/poll.c",
MAME_DIR .. "3rdparty/libuv/src/win/process-stdio.c",
MAME_DIR .. "3rdparty/libuv/src/win/process.c",
MAME_DIR .. "3rdparty/libuv/src/win/req.c",
MAME_DIR .. "3rdparty/libuv/src/win/signal.c",
MAME_DIR .. "3rdparty/libuv/src/win/stream.c",
MAME_DIR .. "3rdparty/libuv/src/win/tcp.c",
MAME_DIR .. "3rdparty/libuv/src/win/thread.c",
MAME_DIR .. "3rdparty/libuv/src/win/timer.c",
MAME_DIR .. "3rdparty/libuv/src/win/tty.c",
MAME_DIR .. "3rdparty/libuv/src/win/udp.c",
MAME_DIR .. "3rdparty/libuv/src/win/util.c",
MAME_DIR .. "3rdparty/libuv/src/win/winapi.c",
MAME_DIR .. "3rdparty/libuv/src/win/winsock.c",
}
end
if _OPTIONS["targetos"]~="windows" then
files {
MAME_DIR .. "3rdparty/libuv/src/unix/async.c",
MAME_DIR .. "3rdparty/libuv/src/unix/atomic-ops.h",
MAME_DIR .. "3rdparty/libuv/src/unix/core.c",
MAME_DIR .. "3rdparty/libuv/src/unix/dl.c",
MAME_DIR .. "3rdparty/libuv/src/unix/fs.c",
MAME_DIR .. "3rdparty/libuv/src/unix/getaddrinfo.c",
MAME_DIR .. "3rdparty/libuv/src/unix/getnameinfo.c",
MAME_DIR .. "3rdparty/libuv/src/unix/internal.h",
MAME_DIR .. "3rdparty/libuv/src/unix/loop-watcher.c",
MAME_DIR .. "3rdparty/libuv/src/unix/loop.c",
MAME_DIR .. "3rdparty/libuv/src/unix/pipe.c",
MAME_DIR .. "3rdparty/libuv/src/unix/poll.c",
MAME_DIR .. "3rdparty/libuv/src/unix/process.c",
MAME_DIR .. "3rdparty/libuv/src/unix/signal.c",
MAME_DIR .. "3rdparty/libuv/src/unix/spinlock.h",
MAME_DIR .. "3rdparty/libuv/src/unix/stream.c",
MAME_DIR .. "3rdparty/libuv/src/unix/tcp.c",
MAME_DIR .. "3rdparty/libuv/src/unix/thread.c",
MAME_DIR .. "3rdparty/libuv/src/unix/timer.c",
MAME_DIR .. "3rdparty/libuv/src/unix/tty.c",
MAME_DIR .. "3rdparty/libuv/src/unix/udp.c",
}
end
if _OPTIONS["targetos"]=="linux" then
defines {
"_GNU_SOURCE",
}
files {
MAME_DIR .. "3rdparty/libuv/src/unix/linux-core.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-inotify.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-syscalls.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-syscalls.h",
MAME_DIR .. "3rdparty/libuv/src/unix/proctitle.c",
}
end
if _OPTIONS["targetos"]=="macosx" then
defines {
"_DARWIN_USE_64_BIT_INODE=1",
"_DARWIN_UNLIMITED_SELECT=1",
}
files {
MAME_DIR .. "3rdparty/libuv/src/unix/darwin.c",
MAME_DIR .. "3rdparty/libuv/src/unix/darwin-proctitle.c",
MAME_DIR .. "3rdparty/libuv/src/unix/fsevents.c",
MAME_DIR .. "3rdparty/libuv/src/unix/kqueue.c",
MAME_DIR .. "3rdparty/libuv/src/unix/proctitle.c",
}
end
if _OPTIONS["targetos"]=="android" then
defines {
"_GNU_SOURCE",
}
buildoptions {
"-Wno-header-guard",
}
files {
MAME_DIR .. "3rdparty/libuv/src/unix/proctitle.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-core.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-inotify.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-syscalls.c",
MAME_DIR .. "3rdparty/libuv/src/unix/linux-syscalls.h",
MAME_DIR .. "3rdparty/libuv/src/unix/pthread-fixes.c",
MAME_DIR .. "3rdparty/libuv/src/unix/android-ifaddrs.c",
}
end
if _OPTIONS["targetos"]=="solaris" then
defines {
"__EXTENSIONS__",
"_XOPEN_SOURCE=500",
}
files {
MAME_DIR .. "3rdparty/libuv/src/unix/sunos.c",
}
end
if _OPTIONS["targetos"]=="freebsd" then
files {
MAME_DIR .. "3rdparty/libuv/src/unix/freebsd.c",
MAME_DIR .. "3rdparty/libuv/src/unix/kqueue.c",
}
end
if _OPTIONS["targetos"]=="netbsd" then
files {
MAME_DIR .. "3rdparty/libuv/src/unix/netbsd.c",
MAME_DIR .. "3rdparty/libuv/src/unix/kqueue.c",
}
links {
"kvm",
}
end
if (_OPTIONS["SHADOW_CHECK"]=="1") then
removebuildoptions {
"-Wshadow"
}
end
end
end
--------------------------------------------------
-- SDL2 library
--------------------------------------------------

View File

@ -229,11 +229,6 @@ if (STANDALONE~=true) then
"lualibs",
}
end
if _OPTIONS["USE_LIBUV"]=="1" then
links {
ext_lib("uv"),
}
end
links {
ext_lib("zlib"),
ext_lib("flac"),

View File

@ -50,6 +50,7 @@ function osdmodulesbuild()
}
files {
MAME_DIR .. "src/osd/asio.cpp",
MAME_DIR .. "src/osd/osdnet.cpp",
MAME_DIR .. "src/osd/osdnet.h",
MAME_DIR .. "src/osd/watchdog.cpp",
@ -107,18 +108,6 @@ function osdmodulesbuild()
MAME_DIR .. "src/osd/modules/output/network.cpp",
MAME_DIR .. "src/osd/modules/output/win32_output.cpp",
MAME_DIR .. "src/osd/modules/output/win32_output.h",
MAME_DIR .. "src/osd/modules/ipc/tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/tcp_server.h",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/raw_tcp_server.h",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.cpp",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_connection.h",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.cpp",
MAME_DIR .. "src/osd/modules/ipc/rtc_tcp_server.h",
MAME_DIR .. "src/osd/modules/monitor/monitor_common.h",
MAME_DIR .. "src/osd/modules/monitor/monitor_common.cpp",
MAME_DIR .. "src/osd/modules/monitor/monitor_win32.cpp",
@ -126,7 +115,7 @@ function osdmodulesbuild()
MAME_DIR .. "src/osd/modules/monitor/monitor_sdl.cpp",
}
includedirs {
ext_includedir("uv"),
MAME_DIR .. "3rdparty/asio/include",
}
if _OPTIONS["targetos"]=="windows" then

24
src/osd/asio.cpp Normal file
View File

@ -0,0 +1,24 @@
// license:BSD-3-Clause
// copyright-holders:Miodrag Milanovic
/***************************************************************************
asio.cpp
ASIO library implementation loader
***************************************************************************/
// Clang warnings with -Weverything
#ifdef __clang__
#pragma clang diagnostic ignored "-Winconsistent-missing-override"
#elif defined(__GNUC__)
#pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
#define ASIO_STANDALONE
#define ASIO_SEPARATE_COMPILATION
#define ASIO_NOEXCEPT noexcept(true)
#define ASIO_NOEXCEPT_OR_NOTHROW noexcept(true)
#define ASIO_ERROR_CATEGORY_NOEXCEPT noexcept(true)
#include "asio/impl/src.hpp"

24
src/osd/asio.h Normal file
View File

@ -0,0 +1,24 @@
// license:BSD-3-Clause
// copyright-holders:Miodrag Milanovic
/***************************************************************************
asio.hpp
ASIO library implementation loader
***************************************************************************/
#ifdef __clang__
#pragma clang diagnostic ignored "-Winconsistent-missing-override"
#elif defined(__GNUC__)
#pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
#define ASIO_HEADER_ONLY
#define ASIO_STANDALONE
#define ASIO_SEPARATE_COMPILATION
#define ASIO_NOEXCEPT noexcept(true)
#define ASIO_NOEXCEPT_OR_NOTHROW noexcept(true)
#define ASIO_ERROR_CATEGORY_NOEXCEPT noexcept(true)
#include "asio/include/asio.hpp"

View File

@ -1,29 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "raw_tcp_connection.h"
/* Instance methods. */
raw_tcp_connection::raw_tcp_connection(listener* listener, size_t bufferSize) :
tcp_connection(bufferSize),
m_listener(listener)
{
}
raw_tcp_connection::~raw_tcp_connection()
{
}
void raw_tcp_connection::user_on_tcp_connection_read()
{
// We may receive multiple packets in the same TCP chunk. If one of them is
// a DTLS Close Alert this would be closed (close() called) so we cannot call
// our listeners anymore.
if (is_closing())
return;
m_listener->on_data_recv(this, m_buffer, m_buffer_data_len);
m_buffer_data_len = 0;
}

View File

@ -1,30 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RAW_TCP_CONNECTION_H
#define RAW_TCP_CONNECTION_H
#include "tcp_connection.h"
class raw_tcp_connection : public tcp_connection
{
public:
class listener
{
public:
virtual ~listener(){ }
virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) = 0;
};
raw_tcp_connection(listener* listener, size_t bufferSize);
virtual ~raw_tcp_connection();
/* Pure virtual methods inherited from tcp_connection. */
virtual void user_on_tcp_connection_read() override;
private:
// Passed by argument.
listener* m_listener;
};
#endif

View File

@ -1,42 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "raw_tcp_server.h"
#include <string>
#define MAX_TCP_CONNECTIONS_PER_SERVER 10
/* Instance methods. */
raw_tcp_server::raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener) :
tcp_server(loop, ip, port, backlog),
m_listener(listener),
m_conn_listener(connListener)
{
}
void raw_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection)
{
// Allocate a new raw_tcp_connection for the raw_tcp_server to handle it.
*connection = new raw_tcp_connection(m_conn_listener, 65536);
}
void raw_tcp_server::user_on_new_tcp_connection(tcp_connection* connection)
{
// Allow just MAX_TCP_CONNECTIONS_PER_SERVER.
if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER)
connection->close();
}
void raw_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// Notify the listener.
// NOTE: Don't do it if closing (since at this point the listener is already freed).
// At the end, this is just called if the connection was remotely closed.
if (!is_closing())
m_listener->on_raw_tcp_connection_closed(this, static_cast<raw_tcp_connection*>(connection), is_closed_by_peer);
}
void raw_tcp_server::user_on_tcp_server_closed()
{
}

View File

@ -1,36 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RAW_TCP_SERVER_H
#define RAW_TCP_SERVER_H
#include "tcp_server.h"
#include "tcp_connection.h"
#include "raw_tcp_connection.h"
#include <uv.h>
class raw_tcp_server : public tcp_server
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) = 0;
};
raw_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, raw_tcp_connection::listener* connListener);
/* Pure virtual methods inherited from TCPServer. */
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override;
virtual void user_on_new_tcp_connection(tcp_connection* connection) override;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
virtual void user_on_tcp_server_closed() override;
private:
// Passed by argument.
listener* m_listener;
raw_tcp_connection::listener* m_conn_listener;
};
#endif

View File

@ -1,150 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "rtc_tcp_connection.h"
/* Instance methods. */
rtc_tcp_connection::rtc_tcp_connection(listener* listener, size_t bufferSize) :
tcp_connection(bufferSize),
m_listener(listener),
m_frame_start(0)
{
}
rtc_tcp_connection::~rtc_tcp_connection()
{
}
inline uint16_t get2bytes(const uint8_t* data, size_t i)
{
return (uint16_t)(data[i + 1]) | ((uint16_t)(data[i])) << 8;
}
inline void set2bytes(uint8_t* data, size_t i, uint16_t value)
{
data[i + 1] = (uint8_t)(value);
data[i] = (uint8_t)(value >> 8);
}
void rtc_tcp_connection::user_on_tcp_connection_read()
{
/*
* Framing RFC 4571
*
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* ---------------------------------------------------------------
* | LENGTH | STUN / DTLS / RTP / RTCP |
* ---------------------------------------------------------------
*
* A 16-bit unsigned integer LENGTH field, coded in network byte order
* (big-endian), begins the frame. If LENGTH is non-zero, an RTP or
* RTCP packet follows the LENGTH field. The value coded in the LENGTH
* field MUST equal the number of octets in the RTP or RTCP packet.
* Zero is a valid value for LENGTH, and it codes the null packet.
*/
// Be ready to parse more than a single frame in a single TCP chunk.
while (true)
{
// We may receive multiple packets in the same TCP chunk. If one of them is
// a DTLS Close Alert this would be closed (Close() called) so we cannot call
// our listeners anymore.
if (is_closing())
return;
size_t data_len = m_buffer_data_len - m_frame_start;
size_t packet_len = 0;
if (data_len >= 2)
packet_len = (size_t)get2bytes(m_buffer + m_frame_start, 0);
// We have packet_len bytes.
if ((data_len >= 2) && data_len >= 2 + packet_len)
{
const uint8_t* packet = m_buffer + m_frame_start + 2;
// Notify the listener.
if (packet_len != 0)
{
m_listener->on_packet_recv(this, packet, packet_len);
}
// If there is no more space available in the buffer and that is because
// the latest parsed frame filled it, then empty the full buffer.
if ((m_frame_start + 2 + packet_len) == m_buffer_size)
{
osd_printf_error("no more space in the buffer, emptying the buffer data\n");
m_frame_start = 0;
m_buffer_data_len = 0;
}
// If there is still space in the buffer, set the beginning of the next
// frame to the next position after the parsed frame.
else
{
m_frame_start += 2 + packet_len;
}
// If there is more data in the buffer after the parsed frame then
// parse again. Otherwise break here and wait for more data.
if (m_buffer_data_len > m_frame_start)
{
// osd_printf_error("there is more data after the parsed frame, continue parsing\n");
continue;
}
else
{
break;
}
}
else // Incomplete packet.
{
// Check if the buffer is full.
if (m_buffer_data_len == m_buffer_size)
{
// First case: the incomplete frame does not begin at position 0 of
// the buffer, so move the frame to the position 0.
if (m_frame_start != 0)
{
// osd_printf_error("no more space in the buffer, moving parsed bytes to the beginning of the buffer and wait for more data\n");
std::memmove(m_buffer, m_buffer + m_frame_start, m_buffer_size - m_frame_start);
m_buffer_data_len = m_buffer_size - m_frame_start;
m_frame_start = 0;
}
// Second case: the incomplete frame begins at position 0 of the buffer.
// The frame is too big, so close the connection.
else
{
osd_printf_error("no more space in the buffer for the unfinished frame being parsed, closing the connection\n");
// Close the socket.
close();
}
}
// The buffer is not full.
else
{
osd_printf_verbose("frame not finished yet, waiting for more data\n");
}
// Exit the parsing loop.
break;
}
}
}
void rtc_tcp_connection::send(const uint8_t* data, size_t len)
{
// Write according to Framing RFC 4571.
uint8_t frame_len[2];
set2bytes(frame_len, 0, len);
write(frame_len, 2, data, len);
}

View File

@ -1,33 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RTC_TCP_CONNECTION_H
#define RTC_TCP_CONNECTION_H
#include "tcp_connection.h"
class rtc_tcp_connection : public tcp_connection
{
public:
class listener
{
public:
virtual ~listener(){ }
virtual void on_packet_recv(rtc_tcp_connection *connection, const uint8_t* data, size_t len) = 0;
};
rtc_tcp_connection(listener* listener, size_t bufferSize);
virtual ~rtc_tcp_connection();
/* Pure virtual methods inherited from tcp_connection. */
virtual void user_on_tcp_connection_read() override;
void send(const uint8_t* data, size_t len);
private:
// Passed by argument.
listener* m_listener;
size_t m_frame_start; // Where the latest frame starts.
};
#endif

View File

@ -1,42 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "rtc_tcp_server.h"
#include <string>
#define MAX_TCP_CONNECTIONS_PER_SERVER 10
/* Instance methods. */
rtc_tcp_server::rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener) :
tcp_server(loop, ip, port, backlog),
m_listener(listener),
m_conn_listener(connListener)
{
}
void rtc_tcp_server::user_on_tcp_connection_alloc(tcp_connection** connection)
{
// Allocate a new rtc_tcp_connection for the rtc_tcp_server to handle it.
*connection = new rtc_tcp_connection(m_conn_listener, 65536);
}
void rtc_tcp_server::user_on_new_tcp_connection(tcp_connection* connection)
{
// Allow just MAX_TCP_CONNECTIONS_PER_SERVER.
if (get_num_connections() > MAX_TCP_CONNECTIONS_PER_SERVER)
connection->close();
}
void rtc_tcp_server::user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// Notify the listener.
// NOTE: Don't do it if closing (since at this point the listener is already freed).
// At the end, this is just called if the connection was remotely closed.
if (!is_closing())
m_listener->on_rtc_tcp_connection_closed(this, static_cast<rtc_tcp_connection*>(connection), is_closed_by_peer);
}
void rtc_tcp_server::user_on_tcp_server_closed()
{
}

View File

@ -1,36 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef RTC_TCP_SERVER_H
#define RTC_TCP_SERVER_H
#include "tcp_server.h"
#include "tcp_connection.h"
#include "rtc_tcp_connection.h"
#include <uv.h>
class rtc_tcp_server : public tcp_server
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_rtc_tcp_connection_closed(rtc_tcp_server* tcpServer, rtc_tcp_connection* connection, bool is_closed_by_peer) = 0;
};
rtc_tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog, listener* listener, rtc_tcp_connection::listener* connListener);
/* Pure virtual methods inherited from TCPServer. */
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) override;
virtual void user_on_new_tcp_connection(tcp_connection* connection) override;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
virtual void user_on_tcp_server_closed() override;
private:
// Passed by argument.
listener* m_listener;
rtc_tcp_connection::listener* m_conn_listener;
};
#endif

View File

@ -1,414 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "tcp_connection.h"
#include <cstdint> // uint8_t, etc
#include <cstdlib> // std::malloc(), std::free()
/* Static methods for UV callbacks. */
static inline void on_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
static_cast<tcp_connection*>(handle->data)->on_uv_read_alloc(suggested_size, buf);
}
static inline void on_read(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
static_cast<tcp_connection*>(handle->data)->on_uv_read(nread, buf);
}
static inline void on_write(uv_write_t* req, int status)
{
tcp_connection::tcp_uv_write_data* write_data = static_cast<tcp_connection::tcp_uv_write_data*>(req->data);
tcp_connection* connection = write_data->connection;
// Delete the UvWriteData struct (which includes the uv_req_t and the store char[]).
std::free(write_data);
// Just notify the TCPConnection when error.
if (status)
connection->on_uv_write_error(status);
}
static inline void on_shutdown(uv_shutdown_t* req, int status)
{
static_cast<tcp_connection*>(req->data)->on_uv_shutdown(req, status);
}
static inline void on_close(uv_handle_t* handle)
{
static_cast<tcp_connection*>(handle->data)->on_uv_closed();
}
/* Instance methods. */
tcp_connection::tcp_connection(size_t bufferSize) :
m_listener(nullptr),
m_local_addr(nullptr),
m_is_closing(false),
m_is_closed_by_peer(false),
m_has_error(false),
m_buffer_size(bufferSize),
m_buffer(nullptr),
m_buffer_data_len(0),
m_local_port(0),
m_peer_port(0)
{
m_uv_handle = new uv_tcp_t;
m_uv_handle->data = (void*)this;
// NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
}
tcp_connection::~tcp_connection()
{
if (m_uv_handle)
delete m_uv_handle;
if (m_buffer)
delete[] m_buffer;
}
void tcp_connection::setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort)
{
int err;
// Set the UV handle.
err = uv_tcp_init(loop, m_uv_handle);
if (err)
{
delete m_uv_handle;
m_uv_handle = nullptr;
throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err));
}
// Set the listener.
m_listener = listener;
// Set the local address.
m_local_addr = localAddr;
m_local_ip = localIP;
m_local_port = localPort;
}
void tcp_connection::close()
{
if (m_is_closing)
return;
int err;
m_is_closing = true;
// Don't read more.
err = uv_read_stop((uv_stream_t*)m_uv_handle);
if (err)
throw emu_fatalerror("uv_read_stop() failed: %s", uv_strerror(err));
// If there is no error and the peer didn't close its connection side then close gracefully.
if (!m_has_error && !m_is_closed_by_peer)
{
// Use uv_shutdown() so pending data to be written will be sent to the peer
// before closing.
uv_shutdown_t* req = new uv_shutdown_t;
req->data = (void*)this;
err = uv_shutdown(req, (uv_stream_t*)m_uv_handle, (uv_shutdown_cb)on_shutdown);
if (err)
throw emu_fatalerror("uv_shutdown() failed: %s", uv_strerror(err));
}
// Otherwise directly close the socket.
else
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
}
void tcp_connection::terminate()
{
if (m_is_closing)
return;
m_is_closing = true;
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
void tcp_connection::dump()
{
osd_printf_verbose("[TCP, local:%s :%d, remote:%s :%d, status:%s]\r\n",
m_local_ip.c_str(), (uint16_t)m_local_port,
m_peer_ip.c_str(), (uint16_t)m_peer_port,
(!m_is_closing) ? "open" : "closed");
}
void tcp_connection::start()
{
if (m_is_closing)
return;
int err;
err = uv_read_start((uv_stream_t*)m_uv_handle, (uv_alloc_cb)on_alloc, (uv_read_cb)on_read);
if (err)
throw emu_fatalerror("uv_read_start() failed: %s", uv_strerror(err));
// Get the peer address.
if (!set_peer_address())
throw emu_fatalerror("error setting peer IP and port");
}
void tcp_connection::write(const uint8_t* data, size_t len)
{
if (m_is_closing)
return;
if (len == 0)
return;
uv_buf_t buffer;
int written;
int err;
// First try uv_try_write(). In case it can not directly write all the given
// data then build a uv_req_t and use uv_write().
buffer = uv_buf_init((char*)data, len);
written = uv_try_write((uv_stream_t*)m_uv_handle, &buffer, 1);
// All the data was written. Done.
if (written == (int)len)
{
return;
}
// Cannot write any data at first time. Use uv_write().
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
// Set written to 0 so pending_len can be properly calculated.
written = 0;
}
// Error. Should not happen.
else if (written < 0)
{
osd_printf_warning("uv_try_write() failed, closing the connection: %s\n", uv_strerror(written));
close();
return;
}
// osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, len);
size_t pending_len = len - written;
// Allocate a special UvWriteData struct pointer.
tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len);
write_data->connection = this;
std::memcpy(write_data->store, data + written, pending_len);
write_data->req.data = (void*)write_data;
buffer = uv_buf_init((char*)write_data->store, pending_len);
err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write);
if (err)
throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err));
}
void tcp_connection::write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2)
{
if (m_is_closing)
return;
if (len1 == 0 && len2 == 0)
return;
size_t total_len = len1 + len2;
uv_buf_t buffers[2];
int written;
int err;
// First try uv_try_write(). In case it can not directly write all the given
// data then build a uv_req_t and use uv_write().
buffers[0] = uv_buf_init((char*)data1, len1);
buffers[1] = uv_buf_init((char*)data2, len2);
written = uv_try_write((uv_stream_t*)m_uv_handle, buffers, 2);
// All the data was written. Done.
if (written == (int)total_len)
{
return;
}
// Cannot write any data at first time. Use uv_write().
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
// Set written to 0 so pending_len can be properly calculated.
written = 0;
}
// Error. Should not happen.
else if (written < 0)
{
osd_printf_warning("uv_try_write() failed, closing the connection: %s\n", uv_strerror(written));
close();
return;
}
// osd_printf_info("could just write %zu bytes (%zu given) at first time, using uv_write() now", (size_t)written, total_len);
size_t pending_len = total_len - written;
// Allocate a special UvWriteData struct pointer.
tcp_uv_write_data* write_data = (tcp_uv_write_data*)std::malloc(sizeof(tcp_uv_write_data) + pending_len);
write_data->connection = this;
// If the first buffer was not entirely written then splice it.
if ((size_t)written < len1)
{
std::memcpy(write_data->store, data1 + (size_t)written, len1 - (size_t)written);
std::memcpy(write_data->store + (len1 - (size_t)written), data2, len2);
}
// Otherwise just take the pending data in the second buffer.
else
{
std::memcpy(write_data->store, data2 + ((size_t)written - len1), len2 - ((size_t)written - len1));
}
write_data->req.data = (void*)write_data;
uv_buf_t buffer = uv_buf_init((char*)write_data->store, pending_len);
err = uv_write(&write_data->req, (uv_stream_t*)m_uv_handle, &buffer, 1, (uv_write_cb)on_write);
if (err)
throw emu_fatalerror("uv_write() failed: %s", uv_strerror(err));
}
void GetAddressInfo(const struct sockaddr* addr, int* family, std::string &ip, uint16_t* port)
{
char _ip[INET6_ADDRSTRLEN + 1];
int err;
err = uv_inet_ntop(AF_INET, &((struct sockaddr_in*)addr)->sin_addr, _ip, INET_ADDRSTRLEN);
if (err)
throw emu_fatalerror("uv_inet_ntop() failed: %s", uv_strerror(err));
*port = (uint16_t)ntohs(((struct sockaddr_in*)addr)->sin_port);
*family = addr->sa_family;
ip.assign(_ip);
}
bool tcp_connection::set_peer_address()
{
int err;
int len = sizeof(m_peer_addr);
err = uv_tcp_getpeername(m_uv_handle, (struct sockaddr*)&m_peer_addr, &len);
if (err)
{
osd_printf_error("uv_tcp_getpeername() failed: %s\n", uv_strerror(err));
return false;
}
int family;
GetAddressInfo((const struct sockaddr*)&m_peer_addr, &family, m_peer_ip, &m_peer_port);
return true;
}
void tcp_connection::on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf)
{
// If this is the first call to onUvReadAlloc() then allocate the receiving buffer now.
if (!m_buffer)
m_buffer = new uint8_t[m_buffer_size];
// Tell UV to write after the last data byte in the buffer.
buf->base = (char *)(m_buffer + m_buffer_data_len);
// Give UV all the remaining space in the buffer.
if (m_buffer_size > m_buffer_data_len)
{
buf->len = m_buffer_size - m_buffer_data_len;
}
else
{
buf->len = 0;
osd_printf_warning("no available space in the buffer\n");
}
}
void tcp_connection::on_uv_read(::ssize_t nread, const uv_buf_t* buf)
{
if (m_is_closing)
return;
if (nread == 0)
return;
// Data received.
if (nread > 0)
{
// Update the buffer data length.
m_buffer_data_len += (size_t)nread;
// Notify the subclass.
user_on_tcp_connection_read();
}
// Client disconneted.
else if (nread == UV_EOF || nread == UV_ECONNRESET)
{
osd_printf_verbose("connection closed by peer, closing server side\n");
m_is_closed_by_peer = true;
// Close server side of the connection.
close();
}
// Some error.
else
{
osd_printf_verbose("read error, closing the connection: %s\n", uv_strerror(nread));
m_has_error = true;
// Close server side of the connection.
close();
}
}
void tcp_connection::on_uv_write_error(int error)
{
if (m_is_closing)
return;
if (error == UV_EPIPE || error == UV_ENOTCONN)
{
osd_printf_verbose("write error, closing the connection: %s\n", uv_strerror(error));
}
else
{
osd_printf_verbose("write error, closing the connection: %s\n", uv_strerror(error));
m_has_error = true;
}
close();
}
void tcp_connection::on_uv_shutdown(uv_shutdown_t* req, int status)
{
delete req;
if (status == UV_EPIPE || status == UV_ENOTCONN || status == UV_ECANCELED)
osd_printf_verbose("shutdown error: %s\n", uv_strerror(status));
else if (status)
osd_printf_verbose("shutdown error: %s\n", uv_strerror(status));
// Now do close the handle.
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
void tcp_connection::on_uv_closed()
{
// Notify the listener.
m_listener->on_tcp_connection_closed(this, m_is_closed_by_peer);
// And delete this.
delete this;
}

View File

@ -1,137 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef TCP_CONNECTION_H
#define TCP_CONNECTION_H
#include <string>
#include <uv.h>
extern void GetAddressInfo(const sockaddr * addr, int * family, std::string & ip, uint16_t * port);
class tcp_connection
{
public:
class listener
{
public:
virtual ~listener() { }
virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0;
};
/* Struct for the data field of uv_req_t when writing into the connection. */
struct tcp_uv_write_data
{
tcp_connection* connection;
uv_write_t req;
uint8_t store[1];
};
tcp_connection(size_t bufferSize);
virtual ~tcp_connection();
void close();
void terminate();
virtual void dump();
void setup(uv_loop_t* loop, listener* listener, struct sockaddr_storage* localAddr, const std::string &localIP, uint16_t localPort);
bool is_closing();
uv_tcp_t* get_uv_handle();
void start();
void write(const uint8_t* data, size_t len);
void write(const uint8_t* data1, size_t len1, const uint8_t* data2, size_t len2);
void write(const std::string &data);
const struct sockaddr* get_local_address();
const std::string& get_local_ip();
uint16_t get_local_port();
const struct sockaddr* get_peer_address();
const std::string& get_peer_ip();
uint16_t get_peer_port();
private:
bool set_peer_address();
/* Callbacks fired by UV events. */
public:
void on_uv_read_alloc(size_t suggested_size, uv_buf_t* buf);
void on_uv_read(ssize_t nread, const uv_buf_t* buf);
void on_uv_write_error(int error);
void on_uv_shutdown(uv_shutdown_t* req, int status);
void on_uv_closed();
/* Pure virtual methods that must be implemented by the subclass. */
protected:
virtual void user_on_tcp_connection_read() = 0;
private:
// Passed by argument.
listener* m_listener;
// Allocated by this.
uv_tcp_t* m_uv_handle;
// Others.
struct sockaddr_storage* m_local_addr;
bool m_is_closing;
bool m_is_closed_by_peer;
bool m_has_error;
protected:
// Passed by argument.
size_t m_buffer_size;
// Allocated by this.
uint8_t* m_buffer;
// Others.
size_t m_buffer_data_len;
std::string m_local_ip;
uint16_t m_local_port;
struct sockaddr_storage m_peer_addr;
std::string m_peer_ip;
uint16_t m_peer_port;
};
/* Inline methods. */
inline bool tcp_connection::is_closing()
{
return m_is_closing;
}
inline uv_tcp_t* tcp_connection::get_uv_handle()
{
return m_uv_handle;
}
inline void tcp_connection::write(const std::string &data)
{
write((const uint8_t*)data.c_str(), data.size());
}
inline const sockaddr* tcp_connection::get_local_address()
{
return (const struct sockaddr*)m_local_addr;
}
inline const std::string& tcp_connection::get_local_ip()
{
return m_local_ip;
}
inline uint16_t tcp_connection::get_local_port()
{
return m_local_port;
}
inline const sockaddr* tcp_connection::get_peer_address()
{
return (const struct sockaddr*)&m_peer_addr;
}
inline const std::string& tcp_connection::get_peer_ip()
{
return m_peer_ip;
}
inline uint16_t tcp_connection::get_peer_port()
{
return m_peer_port;
}
#endif

View File

@ -1,279 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#include "emu.h"
#include "tcp_server.h"
/* Static methods for UV callbacks. */
static inline void on_connection(uv_stream_t* handle, int status)
{
static_cast<tcp_server*>(handle->data)->on_uv_connection(status);
}
static inline void on_close(uv_handle_t* handle)
{
static_cast<tcp_server*>(handle->data)->on_uv_closed();
}
static inline void on_error_close(uv_handle_t* handle)
{
delete handle;
}
/* Instance methods. */
tcp_server::tcp_server(uv_loop_t* loop, const std::string &ip, uint16_t port, int backlog)
: m_uv_handle(nullptr),
m_loop(nullptr),
m_is_closing(false),
m_local_port(0)
{
int err;
int flags = 0;
m_uv_handle = new uv_tcp_t;
m_uv_handle->data = (void*)this;
m_loop = loop;
err = uv_tcp_init(loop, m_uv_handle);
if (err)
{
delete m_uv_handle;
m_uv_handle = nullptr;
throw emu_fatalerror("uv_tcp_init() failed: %s", uv_strerror(err));
}
struct sockaddr_storage bind_addr;
err = uv_ip4_addr(ip.c_str(), (int)port, (struct sockaddr_in*)&bind_addr);
if (err)
throw emu_fatalerror("uv_ipv4_addr() failed: %s", uv_strerror(err));
err = uv_tcp_bind(m_uv_handle, (const struct sockaddr*)&bind_addr, flags);
if (err)
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("uv_tcp_bind() failed: %s", uv_strerror(err));
}
err = uv_listen((uv_stream_t*)m_uv_handle, backlog, (uv_connection_cb)on_connection);
if (err)
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("uv_listen() failed: %s", uv_strerror(err));
}
// Set local address.
if (!set_local_address())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_error_close);
throw emu_fatalerror("error setting local IP and port");
}
}
tcp_server::~tcp_server()
{
if (m_uv_handle)
delete m_uv_handle;
}
void tcp_server::close()
{
if (m_is_closing)
return;
m_is_closing = true;
// If there are no connections then close now.
if (m_connections.empty())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
// Otherwise close all the connections (but not the TCP server).
else
{
osd_printf_verbose("closing %d active connections\n", (int)m_connections.size());
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->close();
}
}
}
void tcp_server::terminate()
{
if (m_is_closing)
return;
m_is_closing = true;
// If there are no connections then close now.
if (m_connections.empty())
{
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}
// Otherwise close all the connections (but not the TCP server).
else
{
osd_printf_verbose("closing %d active connections\n", (int)m_connections.size());
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->terminate();
}
}
}
void tcp_server::send_to_all(const uint8_t* data, size_t len)
{
// If there are no connections then close now.
if (!m_connections.empty())
{
for (auto it = m_connections.begin(); it != m_connections.end(); ++it)
{
tcp_connection* connection = *it;
connection->write(data, len);
}
}
}
void tcp_server::dump()
{
osd_printf_verbose("[TCP, local:%s :%d, status:%s, connections:%d]",
m_local_ip.c_str(), (uint16_t)m_local_port,
(!m_is_closing) ? "open" : "closed",
(int)m_connections.size());
}
const sockaddr* tcp_server::get_local_address()
{
return (const struct sockaddr*)&m_local_addr;
}
const std::string& tcp_server::get_local_ip()
{
return m_local_ip;
}
uint16_t tcp_server::get_local_port()
{
return m_local_port;
}
bool tcp_server::set_local_address()
{
int err;
int len = sizeof(m_local_addr);
err = uv_tcp_getsockname(m_uv_handle, (struct sockaddr*)&m_local_addr, &len);
if (err)
{
osd_printf_error("uv_tcp_getsockname() failed: %s\n", uv_strerror(err));
return false;
}
int family;
GetAddressInfo((const struct sockaddr*)&m_local_addr, &family, m_local_ip, &m_local_port);
return true;
}
inline void tcp_server::on_uv_connection(int status)
{
if (m_is_closing)
return;
int err;
if (status)
{
osd_printf_error("error while receiving a new TCP connection: %s\n", uv_strerror(status));
return;
}
// Notify the subclass so it provides an allocated derived class of TCPConnection.
tcp_connection* connection = nullptr;
user_on_tcp_connection_alloc(&connection);
if (connection == nullptr)
osd_printf_error("tcp_server pointer was not allocated by the user\n");
try
{
connection->setup(m_loop, this, &(m_local_addr), m_local_ip, m_local_port);
}
catch (...)
{
delete connection;
return;
}
// Accept the connection.
err = uv_accept((uv_stream_t*)m_uv_handle, (uv_stream_t*)connection->get_uv_handle());
if (err)
throw emu_fatalerror("uv_accept() failed: %s\n", uv_strerror(err));
// Insert the TCPConnection in the set.
m_connections.insert(connection);
// Start receiving data.
try
{
connection->start();
}
catch (emu_exception &error)
{
osd_printf_error("cannot run the TCP connection, closing the connection: %s\n", error.what());
connection->close();
// NOTE: Don't return here so the user won't be notified about a "onclose" for a TCP connection
// for which there was not a previous "onnew" event.
}
osd_printf_verbose("new TCP connection:\n");
connection->dump();
// Notify the subclass.
user_on_new_tcp_connection(connection);
}
void tcp_server::on_uv_closed()
{
// Motify the subclass.
user_on_tcp_server_closed();
}
void tcp_server::on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer)
{
// NOTE:
// Worst scenario is that in which this is the latest connection,
// which is remotely closed (no tcp_server.Close() was called) and the user
// call tcp_server.Close() on userOnTCPConnectionClosed() callback, so Close()
// is called with zero connections and calls uv_close(), but then
// onTCPConnectionClosed() continues and finds that isClosing is true and
// there are zero connections, so calls uv_close() again and get a crash.
//
// SOLUTION:
// Check isClosing value *before* onTCPConnectionClosed() callback.
bool wasClosing = m_is_closing;
osd_printf_verbose("TCP connection closed:\n");
connection->dump();
// Remove the TCPConnection from the set.
m_connections.erase(connection);
// Notify the subclass.
user_on_tcp_connection_closed(connection, is_closed_by_peer);
// Check if the server was closing connections, and if this is the last
// connection then close the server now.
if (wasClosing && m_connections.empty())
uv_close((uv_handle_t*)m_uv_handle, (uv_close_cb)on_close);
}

View File

@ -1,72 +0,0 @@
// license:BSD-3-Clause
// copyright-holders:Inaki Baz Castillo,Miodrag Milanovic
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include "tcp_connection.h"
#include <string>
#include <unordered_set>
#include <uv.h>
class tcp_server : public tcp_connection::listener
{
public:
tcp_server(uv_loop_t* m_loop, const std::string &ip, uint16_t port, int backlog);
virtual ~tcp_server();
void close();
void terminate();
virtual void dump();
void send_to_all(const uint8_t* data, size_t len);
bool is_closing();
const struct sockaddr* get_local_address();
const std::string& get_local_ip();
uint16_t get_local_port();
size_t get_num_connections();
private:
bool set_local_address();
/* Pure virtual methods that must be implemented by the subclass. */
protected:
virtual void user_on_tcp_connection_alloc(tcp_connection** connection) = 0;
virtual void user_on_new_tcp_connection(tcp_connection* connection) = 0;
virtual void user_on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) = 0;
virtual void user_on_tcp_server_closed() = 0;
/* Callbacks fired by UV events. */
public:
void on_uv_connection(int status);
void on_uv_closed();
/* Methods inherited from tcp_connection::listener. */
virtual void on_tcp_connection_closed(tcp_connection* connection, bool is_closed_by_peer) override;
private:
// Allocated by this (may be passed by argument).
uv_tcp_t* m_uv_handle;
uv_loop_t* m_loop;
// Others.
std::unordered_set<tcp_connection*> m_connections;
bool m_is_closing;
protected:
struct sockaddr_storage m_local_addr;
std::string m_local_ip;
uint16_t m_local_port;
};
/* Inline methods. */
inline bool tcp_server::is_closing()
{
return m_is_closing;
}
inline size_t tcp_server::get_num_connections()
{
return m_connections.size();
}
#endif

View File

@ -6,63 +6,147 @@
Network output interface.
*******************************************************************c********/
***************************************************************************/
#include "output_module.h"
#include "modules/osdmodule.h"
#include "modules/ipc/raw_tcp_server.h"
#include "modules/ipc/raw_tcp_connection.h"
#include <uv.h>
#include <mutex>
#include <thread>
#include <set>
#include "asio.h"
class output_network_server :
public raw_tcp_server::listener,
public raw_tcp_connection::listener
class output_client
{
public:
output_network_server(uv_loop_t* loop) { m_tcp_server = new raw_tcp_server(loop, "0.0.0.0", 8000, 256, this, this); }
virtual ~output_network_server() { delete m_tcp_server; }
void terminate_all() const { m_tcp_server->terminate(); }
void send_to_all(const uint8_t* data, size_t len) const { m_tcp_server->send_to_all(data,len); }
/* Pure virtual methods inherited from raw_tcp_server::listener. */
virtual void on_raw_tcp_connection_closed(raw_tcp_server* tcpServer, raw_tcp_connection* connection, bool is_closed_by_peer) override { }
/* Pure virtual methods inherited from raw_tcp_connection::listener. */
virtual void on_data_recv(raw_tcp_connection *connection, const uint8_t* data, size_t len) override { }
private:
raw_tcp_server* m_tcp_server;
virtual ~output_client() {}
virtual void deliver(std::string &msg) = 0;
};
using output_client_ptr = std::shared_ptr<output_client>;
using client_set = std::set<output_client_ptr>;
class output_session : public output_client,
public std::enable_shared_from_this<output_session>
{
public:
output_session(asio::ip::tcp::socket socket, client_set *clients) :
m_socket(std::move(socket)),
m_clients(clients)
{
}
void start()
{
m_clients->insert(shared_from_this());
do_read();
}
private:
void deliver(std::string &msg)
{
std::strncpy(m_data, msg.c_str(), max_length);
do_write(msg.size());
}
void do_read()
{
auto self(shared_from_this());
m_socket.async_read_some(asio::buffer(m_input_m_data, max_length),
[this, self](std::error_code ec, std::size_t length)
{
if (!ec)
{
do_read();
}
else
{
m_clients->erase(shared_from_this());
}
});
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
asio::async_write(m_socket, asio::buffer(m_data, length),
[this, self](std::error_code ec, std::size_t /*length*/)
{
if (ec)
{
m_clients->erase(shared_from_this());
}
});
}
asio::ip::tcp::socket m_socket;
enum { max_length = 1024 };
char m_data[max_length];
char m_input_m_data[max_length];
client_set *m_clients;
};
class output_network_server
{
public:
output_network_server(asio::io_context& io_context, short port) :
m_acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port))
{
do_accept();
}
void deliver_to_all(std::string msg)
{
for (auto client: m_clients)
client->deliver(msg);
}
private:
void do_accept()
{
m_acceptor.async_accept(
[this](std::error_code ec, asio::ip::tcp::socket socket)
{
if (!ec)
{
std::make_shared<output_session>(std::move(socket),&m_clients)->start();
}
do_accept();
});
}
asio::ip::tcp::acceptor m_acceptor;
client_set m_clients;
};
class output_network : public osd_module, public output_module
{
public:
output_network()
: osd_module(OSD_OUTPUT_PROVIDER, "network"), output_module(), m_loop(nullptr), m_server(nullptr)
{
: osd_module(OSD_OUTPUT_PROVIDER, "network"),
output_module(),
m_io_context(nullptr), m_server(nullptr)
{
}
virtual ~output_network() {
virtual ~output_network()
{
}
virtual int init(const osd_options &options) override {
m_loop = new uv_loop_t;
int err = uv_loop_init(m_loop);
if (err) {
return 1;
}
virtual int init(const osd_options &options) override
{
m_working_thread = std::thread([](output_network* self) { self->process_output(); }, this);
return 0;
}
virtual void exit() override {
m_server->terminate_all();
virtual void exit() override
{
m_io_context->stop();
m_working_thread.join();
uv_loop_close(m_loop);
delete m_loop;
delete m_server;
delete m_io_context;
}
// output_module
@ -71,19 +155,20 @@ public:
{
static char buf[256];
sprintf(buf, "%s = %d\n", ((outname==nullptr) ? "none" : outname), value);
m_server->send_to_all((const uint8_t*)buf, strlen(buf));
m_server->deliver_to_all(buf);
}
// implementation
void process_output()
{
m_server = new output_network_server(m_loop);
uv_run(m_loop, UV_RUN_DEFAULT);
m_io_context = new asio::io_context();
m_server = new output_network_server(*m_io_context, 8000);
m_io_context->run();
}
private:
std::thread m_working_thread;
uv_loop_t* m_loop;
asio::io_context *m_io_context;
output_network_server *m_server;
};