From a4931debb73baf2f74484337bf584d55291e9b00 Mon Sep 17 00:00:00 2001 From: Michael Zapf Date: Sat, 9 Jul 2022 22:20:20 +0200 Subject: [PATCH] Fixes communication with WebSocket servers that send data immediately after the handshake. --- src/lib/util/client_ws.hpp | 50 +++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/src/lib/util/client_ws.hpp b/src/lib/util/client_ws.hpp index 9488a1cf12b..48d4885369b 100644 --- a/src/lib/util/client_ws.hpp +++ b/src/lib/util/client_ws.hpp @@ -1,5 +1,21 @@ // license:MIT // copyright-holders:Ole Christian Eidheim, Miodrag Milanovic + +/* + Changes to the websocket client + + July 2022: (Michael Zapf) + Symptom: During the initial handshake, the ws client reads the HTTP header + from the server and then waits for more data to come. In cases where the + websocket server sends data immediately after the handshake to the client, + these data may already be available in the streambuf without triggering + a new reception event, so the client may wait forever for a new reception. + + Correction: When going into the async_read, the client only waits for so + many bytes as to satisfy the next read from the streambuf. For this end, + the Message::needed_for method has been added. Also, leftover bytes are + copied to the next message instance. +*/ #ifndef CLIENT_WS_HPP #define CLIENT_WS_HPP @@ -123,16 +139,28 @@ namespace webpp { size_t size() const { return length; } - std::string string() const { - std::stringstream ss; - ss << rdbuf(); - return ss.str(); + std::string string() { + if (s.size() != length) { + asio::streambuf::const_buffers_type bufs = streambuf.data(); + s.assign(asio::buffers_begin(bufs), asio::buffers_begin(bufs)+length); + streambuf.consume(length); + } + return s; + } + + // Return the number of bytes that are missing in the streambuf + // before n bytes can be read from it + size_t needed_for(size_t n) const { + if (streambuf.size() >= n) + return 0; + return n - streambuf.size(); } private: Message(): std::istream(&streambuf), fin_rsv_opcode(0), length(0) { } size_t length; asio::streambuf streambuf; + std::string s; }; std::function on_open; @@ -346,10 +374,10 @@ namespace webpp { } void read_message(const std::shared_ptr &message) { - asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(2), + asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(message->needed_for(2)), [this, message](const std::error_code& ec, size_t bytes_transferred) { if(!ec) { - if(bytes_transferred==0) { //TODO: This might happen on server at least, might also happen here + if(message->streambuf.size()+bytes_transferred==0) { //TODO: This might happen on server at least, might also happen here read_message(message); return; } @@ -373,7 +401,7 @@ namespace webpp { if(length==126) { //2 next bytes is the size of content - asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(2), + asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(message->needed_for(2)), [this, message] (const std::error_code& ec, size_t /*bytes_transferred*/) { if(!ec) { @@ -395,7 +423,7 @@ namespace webpp { } else if(length==127) { //8 next bytes is the size of content - asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(8), + asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(message->needed_for(8)), [this, message] (const std::error_code& ec, size_t /*bytes_transferred*/) { if(!ec) { @@ -426,7 +454,7 @@ namespace webpp { } void read_message_content(const std::shared_ptr &message) { - asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(message->length), + asio::async_read(*connection->socket, message->streambuf, asio::transfer_exactly(message->needed_for(message->length)), [this, message] (const std::error_code& ec, size_t /*bytes_transferred*/) { if(!ec) { @@ -458,6 +486,10 @@ namespace webpp { //Next message std::shared_ptr next_message(new Message()); + //Copy any leftover data in the streambuf into the next message + next_message->streambuf.commit(buffer_copy( + next_message->streambuf.prepare(message->streambuf.size()), + message->streambuf.data())); read_message(next_message); } else if(on_error)