ti99: Prevent race conditions in TIPI by syncing the webservice client via mutex.

This commit is contained in:
Michael Zapf 2024-08-04 21:11:01 +02:00
parent 323aa3f0d8
commit b885d1226f
2 changed files with 191 additions and 141 deletions

View File

@ -18,16 +18,66 @@
The general concept is that there is a set of memory-mapped ports between
the TI and the Raspberry Pi.
0x5ff9 (RCIN): PI Control Signal (input from RPi)
0x5ffb (RDIN): PI Data (input)
0x5ffd (TCOUT): TI Control Signal (output to RPi)
0x5fff (TDOUT): TI Data (output)
0x5ff9 (RC): PI Control Signal (input from RPi)
0x5ffb (RD): PI Data (input)
0x5ffd (TC): TI Control Signal (output to RPi)
0x5fff (TD): TI Data (output)
CRU map
-------
1x00: Activate the card and its ROM (x=0...f)
1x02: Send RESET to Raspberry Pi
TI-TIPI protocol [2]
--------------------
Byte layer protocol
----------------
Reset-sync:
TI: rs -> TC rs = f1
PI: (TC==rs) rs -> RC
PIemu: (TC==rs) rs -> RC; (mode==sync) send("SYNC")
Write-byte(byte):
TI: byte -> TD
wb -> TC wb = 02 / 03 (alternating)
PI: (TC==wb) send (TD); (TC) -> RC
PIemu: (TC==wb) q.append(TD); (TC) -> RC
byte=Read-byte:
TI: rb -> TC rb = 06 / 07 (alternating)
(RC==rb) return (RD)
PI: (TC==rb) byte -> RD; (TC) -> RC
PIemu: (TC==rb) (q.length > 0) q.pop -> RD; (TC) -> RC
Message layer protocol
----------------------
Send-message(c1,c2,...,cn), n = (n1<<8)+n2:
Write-byte(n1)
Write-byte(n2)
for i=1..n: Write-byte(c[i])
(c1,...cn) = Receive-message:
n1=Read-byte
n2=Read-byte
for i=1..(n1<<8)+n2: c[i] = Read-byte
Communication layer protocol
----------------------------
Communication:
Reset-sync
Send-message(ms)
rm = Receive-message
Emulation adaptation
--------------------
Send-messageEmu(c1,c2,...,cn), n = (n1<<8)+n2:
Write-byte(n1)
Write-byte(n2)
for i=1..n: Write-byte(c[i])
send(q.content)
Raspberry connection
--------------------
@ -61,7 +111,8 @@
Logging
-------
Since we use a background thread for the websocket client, and logging
is not thread-safe, log output may look broken at some times.
is not thread-safe, log output may look broken at some times (in particular
when setting the LOG_PORTS flag).
Michael Zapf
@ -70,6 +121,7 @@
References
[1] TIPI description on Github: https://github.com/jedimatt42/tipi
[2] TIPI Protocol: https://github.com/jedimatt42/tipi/wiki/TIPI-Protocol
*****************************************************************************/
@ -81,13 +133,15 @@
#define LOG_CRU (1U << 3)
#define LOG_PORTS (1U << 4)
#define LOG_RPI (1U << 5)
#define LOG_QUEUE (1U << 6)
#define LOG_PROT (1U << 7)
#define LOG_DETAIL (1U << 8)
#define LOG_RECV (1U << 6)
#define LOG_SEND (1U << 7)
#define LOG_PROT (1U << 8)
#define VERBOSE (LOG_GENERAL | LOG_WARN)
#define RASPI "rpi"
#define PAUSEBASE 1000
#include "logmacro.h"
DEFINE_DEVICE_TYPE(TI99_TIPI, bus::ti99::peb::tipi_card_device, "ti99_tipi", "TIPI card")
@ -104,10 +158,12 @@ tipi_card_device::tipi_card_device(const machine_config &mconfig, const char *ta
m_portaccess(false),
m_waitinit(false),
m_syncmode(true),
m_connected(false),
m_rpiconn(false),
m_pausetime(PAUSEBASE),
m_pending_read(false),
m_tc(0),
m_rd(0),
m_lasttc(0)
m_rd(0)
{
}
@ -157,13 +213,13 @@ void tipi_card_device::readz(offs_t offset, uint8_t *value)
int val = 0;
if (m_address & 2)
{
LOGMASKED(LOG_PORTS, "RDIN -> %02x\n", m_rd);
val = m_rd;
val = get_rd();
LOGMASKED(LOG_PORTS, "RD -> %02x\n", m_rd);
}
else
{
val = m_rc;
LOGMASKED(LOG_PORTS, "RCIN -> %02x\n", val);
LOGMASKED(LOG_PORTS, "RC -> %02x\n", val);
}
*value = val;
}
@ -189,33 +245,17 @@ void tipi_card_device::write(offs_t offset, uint8_t data)
{
if (m_address & 2)
{
LOGMASKED(LOG_PORTS, "TDOUT <- %02x\n", data);
set_td(data);
LOGMASKED(LOG_PORTS, "TD <- %02x\n", data);
m_td = data;
}
else
{
LOGMASKED(LOG_PORTS, "TCOUT <- %02x\n", data);
set_tc(data);
}
}
}
}
void tipi_card_device::set_td(u8 data)
{
m_td = data;
}
void tipi_card_device::set_tc(u8 data)
{
if (m_tc != data)
{
m_tc = data;
m_lasttc = 0;
process_message();
}
}
/*
Debugger access.
*/
@ -277,6 +317,14 @@ void tipi_card_device::cruwrite(offs_t offset, uint8_t data)
}
}
u8 tipi_card_device::get_rd()
{
// Prevent a read access to RD while RD and RC are being
// changed by the websocket client. It should suffice to guard RD.
const std::lock_guard<std::mutex> lock(m_mutex);
return m_rd;
}
void tipi_card_device::send(const char* message)
{
if (m_rpiconn && m_connected)
@ -292,7 +340,7 @@ void tipi_card_device::send(u8* message, int len)
{
for (int i=0; i < len; i++)
{
LOGMASKED(LOG_DETAIL, "Sending byte %02x\n", message[i]);
LOGMASKED(LOG_SEND, "Sending byte %02x\n", message[i]);
*m_send_stream << message[i];
}
m_wsclient->send(m_send_stream, nullptr, 130); // binary (10000010)
@ -307,104 +355,90 @@ enum
};
/*
Messages from server:
"RD=xxx"
"RC=xxx"
Process the command submitted to the TC register.
The TSRSET only sends a sync to the Pi in sync mode; the current TIPI
implementation normally uses the async mode.
processMessage occurs for each incoming message
and each time that TC has been changed
Write a message to RPi:
for i=-2..(n-1)
Write byte to TD
Write (02 | (i & 1)) to TC
Byte sequence: len1 len2 m0 m1 m2 m3 ... m(n-1), where n = len1<<8 | len2;
Message will be written to RPi when complete
Read a message from RPi:
n = message.length
n = n1<<8 | n2
Write 06 to TC
n1 = Read byte from TD
Write 07 to TC
n2 = Read byte from TD
for i=0..(n-1)
Write (06 | (i & 1)) to TC
Read byte from TD
Apart from that, TSRSET resets the pointers in preparation of getting a
received message or composing a message to be sent.
TSWB: Write a byte. Instead of sending single bytes, the message is composed
in an array and sent when it is complete.
TSRB: Read a byte. This requires a message to be available in the input
queue. If the queue is empty, the read operation is not acknowledged.
This is done in websocket_incoming.
*/
void tipi_card_device::process_message()
void tipi_card_device::set_tc(u8 command)
{
if (m_tc == TSRSET) // Reset command
// Synchronize between websocket client and main thread
const std::lock_guard<std::mutex> lock(m_mutex);
LOGMASKED(LOG_PORTS, "TC <- %02x\n", command);
// Only call once for the same value
if (m_tc == command) return;
m_tc = command;
switch (m_tc)
{
LOGMASKED(LOG_PROT, "Reset\n");
case TSRSET:
LOGMASKED(LOG_PROT, "TSRSET\n");
m_rpimessage = nullptr;
m_msgindex = -2;
m_rc = m_tc;
m_lasttc = 0;
m_tc = 0;
if (m_syncmode)
{
LOGMASKED(LOG_PROT, "Sending SYNC\n");
send("SYNC"); // only for SYNC mode
}
}
else
{
if (m_lasttc != m_tc)
break;
case TSWB:
case TSWB+1:
if (m_msgindex == -2)
m_msglength = m_td << 8;
else
{
if ((m_tc & 0xfe) == TSWB) // 02 and 03: write byte (LSB as strobe)
if (m_msgindex == -1)
{
switch (m_msgindex)
{
case -2:
m_msglength = m_td << 8;
break;
case -1:
m_msglength |= m_td;
m_rpimessage = std::make_unique<u8[]>(m_msglength);
break;
default:
if (m_rpimessage != nullptr)
m_rpimessage[m_msgindex] = m_td;
}
m_msgindex++;
if (m_msgindex == m_msglength)
{
// Message is complete - transmit it
LOGMASKED(LOG_RPI, "Sending message, length %d\n", m_msglength);
send(m_rpimessage.get(), m_msglength);
m_rpimessage = nullptr;
}
else
m_lasttc = m_tc;
m_rc = m_tc; // Auto-acknowledge
m_msglength |= m_td;
m_rpimessage = std::make_unique<u8[]>(m_msglength);
}
else
{
if ((m_tc & 0xfe) == TSRB) // 06 and 07: read byte (LSB as strobe)
{
if (!m_indqueue.empty())
{
m_rc = m_tc; // Auto-acknowledge
m_lasttc = m_tc;
m_rd = m_indqueue.front();
m_indqueue.pop();
}
}
}
if (m_rpimessage != nullptr)
m_rpimessage[m_msgindex] = m_td;
}
m_msgindex++;
if (m_msgindex == m_msglength)
{
// Message is complete - transmit it
LOGMASKED(LOG_RPI, "Sending message, length %d\n", m_msglength);
send(m_rpimessage.get(), m_msglength);
}
m_rc = m_tc; // Auto-acknowledge
break;
case TSRB:
case TSRB+1:
if (!m_indqueue.empty())
{
m_rc = m_tc; // Auto-acknowledge
m_rd = m_indqueue.front();
m_indqueue.pop();
}
else
{
// We have to wait for the TIPI to send more data
LOGMASKED(LOG_PROT, "Pending read\n");
m_pending_read = true;
}
break;
}
}
@ -467,11 +501,15 @@ void tipi_card_device::websocket_opened()
LOG("Connection established\n");
m_connected = true;
m_rc = 0;
m_pausetime = PAUSEBASE;
if (m_waitinit) m_slot->set_ready(ASSERT_LINE);
}
void tipi_card_device::websocket_incoming(std::shared_ptr<webpp::ws_client::Message> message)
{
// Synchronize between websocket client and main thread
const std::lock_guard<std::mutex> lock(m_mutex);
// Caution: Message is an istream, so the string method consumes the Message
// Size is not changed, nor is fin_rsv_opcode
std::string msg;
@ -494,7 +532,7 @@ void tipi_card_device::websocket_incoming(std::shared_ptr<webpp::ws_client::Mess
}
if (msg.find("ASYNC")==0)
{
LOG("TIPI server offers ASYNC communication\n");
LOGMASKED(LOG_PROT, "TIPI server offers ASYNC communication\n");
m_syncmode = false;
}
break;
@ -508,19 +546,29 @@ void tipi_card_device::websocket_incoming(std::shared_ptr<webpp::ws_client::Mess
u8 len1 = (message->size()>>8) & 0xff;
u8 len2 = message->size() & 0xff;
m_indqueue.push(len1);
LOGMASKED(LOG_QUEUE, "<- %02x (len1) [%d]\n", len1, m_indqueue.size());
LOGMASKED(LOG_RECV, "<- %02x (len1) [%d]\n", len1, m_indqueue.size());
m_indqueue.push(len2);
LOGMASKED(LOG_QUEUE, "<- %02x (len2) [%d]\n", len2, m_indqueue.size());
LOGMASKED(LOG_RECV, "<- %02x (len2) [%d]\n", len2, m_indqueue.size());
}
for (int i=0; i < message->size(); i++)
{
u8 msgbyte = (u8)message->get();
m_indqueue.push(msgbyte);
LOGMASKED(LOG_QUEUE, "<- %02x [%d]\n", msgbyte, m_indqueue.size());
LOGMASKED(LOG_RECV, "<- %02x [%d]\n", msgbyte, m_indqueue.size());
}
LOGMASKED(LOG_PROT, "Queue length: %d\n", m_indqueue.size());
if (m_pending_read)
{
LOGMASKED(LOG_PROT, "Serving pending read\n");
m_rc = m_tc; // Auto-acknowledge
m_rd = m_indqueue.front();
m_indqueue.pop();
m_pending_read = false;
}
process_message();
break;
default:
@ -531,41 +579,39 @@ void tipi_card_device::websocket_incoming(std::shared_ptr<webpp::ws_client::Mess
void tipi_card_device::websocket_error(const std::error_code& code)
{
if (code.value() == std::make_error_code(std::errc::connection_reset).value())
m_pausetime = m_pausetime*2;
m_attempts--;
if (m_attempts > 0)
{
m_wsclient->stop();
if (m_attempts > 0)
if (code.value() == std::make_error_code(std::errc::connection_reset).value())
{
m_attempts--;
LOGMASKED(LOG_WARN, "Error, connection reset. Retrying.\n");
m_restart_timer->adjust(attotime::from_msec(2500));
LOGMASKED(LOG_WARN, "Error, connection reset. Retrying after %d secs.\n", m_pausetime/1000);
}
else
{
LOGMASKED(LOG_WARN, "Connection has been reset several times; giving up\n");
m_connected = false;
// End-of-file (2) occurs when the Pi is rebooted
LOGMASKED(LOG_WARN, "Got error: %s (%d); retrying after %d secs\n", code.message().c_str(), code.value(), m_pausetime/1000);
// Reconnection is not implemented on the Pi side for emulations,
// so this may cause the config program to wait forever
// Needs to be documented
}
m_restart_timer->adjust(attotime::from_msec(m_pausetime));
}
else
{
// End-of-file (2) occurs when the Pi is rebooted
LOGMASKED(LOG_WARN, "Got error: %s (%d)\n", code.message().c_str(), code.value());
// Reconnection is not implemented on the Pi side for emulations,
// so this may cause the config program to wait forever
// Needs to be documented
m_restart_timer->adjust(attotime::from_msec(20000));
m_attempts = 20;
LOGMASKED(LOG_WARN, "Unable to re-establish connection to TIPI; giving up\n");
m_connected = false;
}
}
void tipi_card_device::websocket_closed(int i, const std::string& msg)
{
LOG("Closing connection\n");
LOG("Closing connection; retrying after %d secs\n", m_pausetime/1000);
if (m_waitinit) m_slot->set_ready(CLEAR_LINE);
// Open it again after 3 secs
m_wsclient->stop();
m_restart_timer->adjust(attotime::from_msec(3000));
m_attempts = 5;
// Open it again
m_restart_timer->adjust(attotime::from_msec(m_pausetime));
m_attempts = 8;
}
void tipi_card_device::device_start()
@ -585,6 +631,7 @@ void tipi_card_device::device_start()
m_restart_timer = timer_alloc(FUNC(tipi_card_device::open_websocket), this);
m_rpiconn = (m_rpi->exists());
m_connected = false;
}
void tipi_card_device::device_reset()
@ -595,13 +642,13 @@ void tipi_card_device::device_reset()
m_address = 0;
m_dsr = false;
m_portaccess = false;
m_pausetime = PAUSEBASE;
if (!m_rpiconn) LOGMASKED(LOG_WARN, "No Raspberry Pi connected\n");
// READY=0 is ignored at this point because the CPU is being reset at this time
open_websocket(0);
m_connected = false;
m_lasttc = 0;
m_attempts = 5;
m_attempts = 8;
if (!m_connected) m_restart_timer->adjust(attotime::from_msec(m_pausetime));
}
void tipi_card_device::device_stop()

View File

@ -19,6 +19,7 @@
#include "client_ws.hpp"
#include <queue>
#include <mutex>
namespace bus::ti99::peb {
@ -57,8 +58,7 @@ private:
void send(const char* message);
void send(u8* message, int len);
void process_message();
void set_td(u8 data);
u8 get_rd();
void set_tc(u8 data);
required_device<tipi_attached_device> m_rpi;
@ -82,9 +82,12 @@ private:
int m_attempts;
bool m_connected;
bool m_rpiconn;
int m_pausetime;
// Incoming queue
std::queue<u8> m_indqueue;
std::mutex m_mutex;
bool m_pending_read;
// Computer interface
u8 m_tc;