gigatron/rom/Contrib/psr/py-gtemu/gtemu.py
2025-01-28 19:17:01 +03:00

710 lines
23 KiB
Python

"""Python wrapper for the gtemu emulator
"""
import itertools
from contextlib import contextmanager
import asm
import _gtemu
__all__ = ["Emulator", "RAM", "ROM"]
_BLANK_RAM = bytearray([0 for _ in range(1 << 15)])
def _make_state_field_accessor(name):
"""Return a descriptor that accesses the fields of the state"""
def _getter(self):
state = self._state
return getattr(state, name)
def _setter(self, value):
state = self._state
setattr(state, name, value)
return property(
_getter,
_setter,
doc=f"Get or set the current state of the {name} register",
)
def _make_zero_page_accessor(
name, address, *, width=2, byteorder="little", signed=False
):
"""Return a descriptor that accesses zero-page memory"""
def _getter(self):
return int.from_bytes(
RAM[address : address + width], byteorder=byteorder, signed=signed
)
def _setter(self, value):
RAM[address : address + width] = value.to_bytes(
width, byteorder=byteorder, signed=signed
)
return property(
_getter, _setter, doc=f"Get or set the current state of the {name} register"
)
def _to_address(address_or_symbol):
"""Convert a value that may be an address or a symbol, to an address
Labels are the most likely symbols we encounter"""
from_symbol = asm.symbol(address_or_symbol)
if from_symbol is not None:
address = from_symbol
else:
try:
address = int(address_or_symbol)
except (TypeError, ValueError):
raise ValueError(
f"{address_or_symbol:r} is not a valid address or symbol"
)
if not (0 <= address < (1 << 16)):
raise ValueError(f"{address:x} is out of range for an address")
return address
# When stepping the vCPU where must we stop.
# TODO: We should probably be loading these from asm.py
_STEP_VCPU_BREAKPOINTS = frozenset(
[
0x02FF, # ENTER
0x0301, # NEXT
]
)
# Maximum latency between vCPU instructions if nothing goes wrong.
# TODO: Find the right value, needs to work if we're in slow (rendering all four lines) mode.
_STEP_VCPU_MAX_CYCLES = 1000 # It should be possible to calculate a more precise value
class _Emulator:
"""Provides programatic control over the Gigatron Emulator"""
def __init__(self):
self.reset()
def load_rom_file(self, path):
with open(path, "rb") as fp:
fp.readinto(_gtemu.ffi.buffer(ROM))
def load_rom_from_asm_module(self):
"""Populates the ROM from the contents of the asm module
This requires that an assembly script has already been executed.
"""
def gen_rom_data():
for opcode, operand in zip(asm._rom0, asm._rom1):
yield opcode
yield operand
rom_data = bytearray(gen_rom_data())
_gtemu.ffi.buffer(ROM)[0 : len(rom_data)] = rom_data
def reset(self):
self._state = _gtemu.ffi.new("CpuState *")[0]
self._last_pc = None
self._print = False
self._vga_x = 0
self._vga_y = 0
self._xout = 0
self.breakpoints = set()
_gtemu.ffi.buffer(RAM)[:] = _BLANK_RAM
# Needed for bit shuffling
_gtemu.ffi.buffer(RAM)[0b1000_0000] = b"\x01"
PC = _make_state_field_accessor("PC")
IR = _make_state_field_accessor("IR")
D = _make_state_field_accessor("D")
AC = _make_state_field_accessor("AC")
X = _make_state_field_accessor("X")
Y = _make_state_field_accessor("Y")
OUT = _make_state_field_accessor("OUT")
@property
def XOUT(self):
"""State of the eXtended OUTput register
This is set from AC when the hsync signal is low
"""
return self._xout
@property
def hsync(self):
"""Return the state of the HSYNC signal.
True is high, and False is low, but remember this is an active low signal"""
return bool(self.OUT & _HSYNC)
@property
def vsync(self):
"""Return the state of the VSYNC value.
True is high, and False is low, but remember this is an active low signal"""
return bool(self.OUT & _VSYNC)
@property
def next_instruction(self):
"""Get the address of the next instruction to execute"""
return self._last_pc
@next_instruction.setter
def next_instruction(self, address):
"""Set program execution to proceed from `address`
This sets the PC to address + 1, having loaded the instruction at address,
as if we had just executed address - 1.
"""
# To start from an address, we need to fill the pipeline with the instruction at address
# and set PC to address + 1.
address = _to_address(address)
self.PC = address + 1
self.IR = _gtemu.lib.ROM[address][0]
self.D = _gtemu.lib.ROM[address][1]
self._last_pc = address
@property
def state(self):
"""Return a string representation of the current state"""
registers = [
("PC", 2),
("IR", 1),
("D", 1),
("AC", 1),
("X", 1),
("Y", 1),
("OUT", 1),
("XOUT", 1),
]
heading = " ".join(
[r.rjust(w * 2 + 1) for r, w in registers] + ["Loaded instruction"]
)
separator = " ".join(
["-" * (2 * w + 1) for _, w in registers] + ["------------------"]
)
values = " ".join(
[("${:0%dx}" % (w * 2,)).format(getattr(self, r)) for r, w in registers]
+ [asm.disassemble(self.IR, self.D)]
)
return "\n".join([heading, separator, values])
def _step(self):
"""Run a single step of the interpreter"""
# Store the current PC, so that we can return it as next_instruction
# This is needed because of the pipeline
self._last_pc = self.PC
old_state = self._state
self._state = new_state = _gtemu.lib.cpuCycle(self._state)
# Detect changes in the vertical and horizontal sync signals
hsync_edge = (new_state.OUT & _HSYNC) - (old_state.OUT & _HSYNC)
vsync_edge = (new_state.OUT & _VSYNC) - (old_state.OUT & _VSYNC)
# TODO: It might be nice to detect timing errors - but it's tricky because the ROM
# doesn't go straight into standard timings.
# We keep these values in order to support run_to_vsync and run_to_hsync
self._vga_x += 1
if vsync_edge < 0:
self._vga_y = -36
if hsync_edge > 0:
self._vga_x = 0
self._vga_y += 1
self._xout = self.AC
def step(self):
self._step()
if self._print:
print(self.state)
def _run_for(self, instructions):
for i in range(instructions):
self._step()
if self._last_pc in self.breakpoints:
return i + 1
return instructions
def run_for(self, instructions):
"""Run the emulator for a fixed number of cycles
Will stop at breakpoints if they are hit,
but always executes at least one cycle
Returns the number of cycles executed.
"""
try:
self._run_for(instructions)
finally:
if self._print:
print(self.state)
def _run_to_breakpoint(self, max_instructions):
cycles = self._run_for(max_instructions)
if self._last_pc not in self.breakpoints:
raise ValueError("timeout")
return cycles
def run_to(self, address, max_instructions=1000):
"""Run the emulator until it is about to execute the instruction at `address`
Due to the pipeline, this means that for the previous instruction PC was `address`,
and therefore we have loaded the instruction.
Will stop at breakpoints if they are hit,
but always executes at least one cycle
"""
address = _to_address(address)
iterator = (
range(max_instructions)
if max_instructions is not None
else itertools.count()
)
try:
for i, _ in enumerate(iterator):
self._step()
if self._last_pc == address or self._last_pc in self.breakpoints:
return i + 1
raise ValueError(
"Did not hit address in %d instructions" % (max_instructions,)
)
finally:
if self._print:
print(self.state)
def run_to_hblank(self):
"""Run the emulator until we get to the next horizontal blank period
This is the "front-porch" period which follows the visible pixels
and preceeds the hsync signal going low
If we are already in the hblank period, we advance to the following one
"""
# Counting from the rising edge of HSYNC, there are
# (48 / 4) back porch pixels
# + (640 / 4) visible pixels
# Before we blank
hblank_start = 48 / 4 + 640 / 4 # = 172
# Skip through current blank
while self._vga_x > hblank_start:
self._step()
while self._vga_x < hblank_start:
self._step()
if self._print:
print(self.state)
def run_to_vblank(self):
"""Run the emulator until we get to the next vertical blank period
This is the "front-porch" period which follows the visible lines
and preceeds the vsync signal going low
If we are already in the vblank period, we advance to the following one
"""
# Our counter starts from -36, following the code in gtemu.c, and so
# we can just need to wait until we hit line 480
# If we are currently in a vertical blank interval, advance through it.
# the caller wants the next one
while self._vga_y > 480:
self._step()
while self._vga_y < 480:
self._step()
if self._print:
print(self.state)
@contextmanager
def additional_breakpoints(self, *additional_breakpoints):
"""Returns a context manager which will add breakpoints on entry and restore them on completion
Additional_breakpoints can be a single collection or multiple arguments.
Note, this swaps out and restores the breakpoint set on exit.
To prevent breakpoints being added and lost when the context expires, a frozenset is used.
"""
# This approach is basically meant to optimise for the vCPU step usecase
existing_breakpoints = self.breakpoints
try:
additional_breakpoints = frozenset(*additional_breakpoints)
except TypeError:
additional_breakpoints = frozenset(additional_breakpoints)
self.breakpoints = (
additional_breakpoints | existing_breakpoints
if existing_breakpoints
else additional_breakpoints
)
try:
yield
finally:
self.breakpoints = existing_breakpoints
# vCPU related methods
vPC = _make_zero_page_accessor("vPC", 0x16)
vAC = _make_zero_page_accessor("vAC", 0x18)
vLR = _make_zero_page_accessor("vLR", 0x1A)
vSP = _make_zero_page_accessor("vSP", 0x1C, width=1)
@property
def vcpu_state(self):
registers = [
("vPC", 2),
("vAC", 2),
("vLR", 2),
("vSP", 1),
]
heading = " ".join(
[r.rjust(w * 2 + 1) for r, w in registers] + ["Next instruction"]
)
separator = " ".join(
["-" * (2 * w + 1) for _, w in registers] + ["----------------"]
)
values = " ".join(
[("${:0%dx}" % (w * 2,)).format(getattr(self, r)) for r, w in registers]
+ [_disassemble_vcpu_next()]
)
return "\n".join([heading, separator, values])
def step_vcpu(self):
"""Run the emulator up to NEXT or ENTER.
Returns the number of machine instructions exectuted.
Does stop at breakpoints if they are set.
The method name is slightly misleading.
It will always complete the currently executing instruction if there is one,
but it doesn't follow that it leaves us in a state where we're about to execute the next one.
We might instead be about to return to the display loop,
or the next instruction might be a SYS function which doesn't have time to execute.
It might therefore take several call to see a change in vPC.
"""
try:
with self.additional_breakpoints(_STEP_VCPU_BREAKPOINTS):
return self._run_to_breakpoint(_STEP_VCPU_MAX_CYCLES)
finally:
if self._print:
print(self.vcpu_state)
def run_vcpu_to(self, address, *, always_run_some_code=True):
"""Run the emulator until we are at NEXT or ENTER, before vPC is advanced to address.
Returns the number of machine instructions exectuted.
Does stop at breakpoints if they are set.
This will always run some code if always_run_some_code is True (the default),
even if the vPC is already in the desired state,
to allow repeated calls in a loop.
See the docstring for step_vcpu for further caveats.
"""
# Before we run the instruction at 0x200, vPC should be 0x2fe,
# Because only the low byte is incremented
target_vpc = address & 0xFF00 | (address - 2) & 0xFF
cycles = 0
try:
with self.additional_breakpoints(_STEP_VCPU_BREAKPOINTS):
while always_run_some_code and self.vPC == target_vpc:
cycles += self._run_to_breakpoint(_STEP_VCPU_MAX_CYCLES)
if self._last_pc not in _STEP_VCPU_BREAKPOINTS:
return cycles
while self.vPC != target_vpc:
cycles += self._run_to_breakpoint(_STEP_VCPU_MAX_CYCLES)
if self._last_pc not in _STEP_VCPU_BREAKPOINTS:
return cycles
return cycles
finally:
if self._print:
print(self.vcpu_state)
def send_byte(self, value):
"""Send a byte through the input port"""
# While communication to the Gigatron is serial,
# it's captured by a shift register, and is presented as a byte
# so for our purposes we can just write the byte.
#
# The shift register is read once per vertical refresh
# so wait until vsync goes low, write the value and hold it
# through the rising edge round to the next falling edge
while self.OUT & _VSYNC:
self._step()
_IN[0] = value
while not (self.OUT & _VSYNC):
self._step()
while self.OUT & _VSYNC:
self._step()
_IN[0] = 0xFF # Restore to the initial value
def read_serial(self, *, bits):
"""Read a single serial value through the output port
Returns the value read and the number of bits that were read
"""
# Conversely, from an architectural point of view, the Gigatron has a single 8-bit
# output port, which is written as a byte. However this drives multiple output devices.
# Primarily the display, of course, but the /HSYNC signal also clocks the XOUT register
# (which copies the value of the accumulator) for the LEDS and sound
# and also the game controller port.
# The way in which serial output works is actually modulating the duration of the
# /VSYNC pulse, which can be counted in terms of /HSYNC pulses.
# 8 is the default length and so can be interpretted as no data transmission,
# but it is shortened to 7 for a zero bit, and stretched to 9 for a 1.
# This code is kinda complicated
def generate_outputs():
"""Step the emulator yielding OUT"""
while True:
yield self.OUT
self._step()
# create two generators yielding just the vsync and hsync bits.
# The place value doesn't matter because we're going to edge-detect
# and only look at sign.
_outs1, _outs2 = itertools.tee(generate_outputs())
vsyncs, hsyncs = (o & _VSYNC for o in _outs1), (o & _HSYNC for o in _outs2)
def detect_edges(gen):
"""Yield positive numbers for rising edges, and negative for falling"""
# Pairwise iteration
a, b = itertools.tee(gen)
next(b, None)
for prev, curr in zip(a, b):
yield curr - prev
def debug_detect_edges_short(edges):
"""Useful routine to use when debugging the code below"""
for value in edges:
if value < 0:
print("v", end="")
elif value > 0:
print("^", end="")
yield value
def debug_detect_edges(edges):
"""Useful routine to use when debugging the code below"""
state = "X"
for i, value in enumerate(edges):
if i % 6400 == 0:
print("\n", self.state)
print("\n", self.vcpu_state)
elif i % 80 == 0:
print()
if value == 0:
print(state, end="")
elif value < 0:
state = "_"
print("\\", end="")
else:
state = ""
print("/", end="")
yield value
# vsync_edges, hsync_edges = (
# debug_detect_edges_short(detect_edges(vsyncs)),
# debug_detect_edges(detect_edges(hsyncs)),
# )
vsync_edges, hsync_edges = detect_edges(vsyncs), detect_edges(hsyncs)
def do_line_counting(vsync_edges, hsync_edges):
"""Yield count of lines in each negative vsync pulse"""
pairs = zip(vsync_edges, hsync_edges)
while True:
# Advance to falling edge
i = itertools.dropwhile(lambda pair: pair[0] >= 0, pairs)
# .. and continue until we see a rising edge
i = itertools.takewhile(lambda pair: pair[0] <= 0, i)
# count falling HSYNC edges within this window
yield sum(1 for _, h_edge in i if h_edge < 0)
lines_per_vsync = do_line_counting(vsync_edges, hsync_edges)
def debug_counting(lines_per_vsync):
"""useful routine when debugging the code below"""
for frame, hsync_edges in enumerate(lines_per_vsync):
print(f"Frame {frame} has {hsync_edges} line vsync pulse")
yield hsync_edges
# lines_per_vsync = debug_counting(lines_per_vsync)
# Skip frames with 8 line VSYNC signals
lines_per_vsync = itertools.dropwhile(lambda count: count == 8, lines_per_vsync)
# and continue until we return to 8
lines_per_vsync = itertools.takewhile(lambda count: count != 8, lines_per_vsync)
# Data is sent in little-endian order.
# So a 1 in the first frame represents 1
# and a 1 in the second frame represents 2
result = 0
for bit_number, lines in enumerate(lines_per_vsync):
assert lines in (7, 9), f"Unexpected VSYNC length {lines}"
if lines == 7:
value = 0
if lines == 9:
value = 1 << bit_number
result |= value
if bit_number == bits - 1:
break
else:
# We did not see the expected number of bits in the transmission
raise AssertionError(f"Only saw {bit_number} bits in the transmission")
return result
_VSYNC = 0b1000_0000
_HSYNC = 0b0100_0000
# This is effectively a pointer to the IN global variable, which can be
# Updated through list assignment.
_IN = _gtemu.ffi.addressof(_gtemu.lib, "IN")
RAM = _gtemu.lib.RAM
ROM = _gtemu.lib.ROM
Emulator = _Emulator()
# Little vCPU disassembler copied from gt1dump.py
def _disassemble_vcpu_next():
"""Return the string disassembly of the next instruction"""
# The next vPC is not Emulator.vPC + 2 - it doesn't carry into the high byte
next_vpc = Emulator.vPC & 0xFF00 | ((Emulator.vPC + 2 & 0xFF))
opcode = RAM[next_vpc]
# If we don't know a mnemonic, just use hex - no operands
mnemonic, number_of_operands = _OPCODES.get(opcode, (f"${opcode:02x}", 0))
asm_parts = [mnemonic]
operands = bytearray(RAM[next_vpc + 1 : next_vpc + 1 + number_of_operands])
while operands:
# Poor man's switch statement
if mnemonic in _ZP_MODE_OPCODES:
operand = operands.pop(0)
decoding = _ZERO_PAGE_SYMBOLS.get(operand, f"${operand:02x}")
asm_parts.append(decoding)
continue
if mnemonic == "Bcc":
operand = operands.pop(0)
if operand in _BCC_CODES:
asm_parts = [f"B{_BCC_CODES[operand]}"]
# Fall through
if mnemonic in {"Bcc", "BRA", "DEF"}:
# Next operand is a target within a page
operand = operands.pop(0)
target = next_vpc & 0xFF00 | (operand + 2) & 0xFF
asm_parts.append(f"${target:04x}")
continue
if mnemonic == "SYS":
operand = operands.pop(0)
if operand != 128:
max_cycles = 28 - 2 * ((operand ^ 128) - 128)
asm_parts.append(str(max_cycles))
else:
asm_parts = ["HALT"]
continue
# Else...
# Treat any remaining bytes as a single operand
operand = int.from_bytes(operands, "little", signed=False)
asm_parts.append(f"${operand:02x}" if len(operands) == 1 else f"${operand:04x}")
break
return " ".join(asm_parts)
_OPCODES = {
0x11: ("LDWI", 2),
0x1A: ("LD", 1),
0x1F: ("CMPHS", 1),
0x21: ("LDW", 1),
0x2B: ("STW", 1),
0x35: ("Bcc", 2),
0x59: ("LDI", 1),
0x5E: ("ST", 1),
0x63: ("POP", 0),
0x75: ("PUSH", 0),
0x7F: ("LUP", 1),
0x82: ("ANDI", 1),
0x85: ("CALLI", 2),
0x88: ("ORI", 1),
0x8C: ("XORI", 1),
0x90: ("BRA", 1),
0x93: ("INC", 1),
0x97: ("CMPHU", 1),
0x99: ("ADDW", 1),
0xAD: ("PEEK", 0),
0xB4: ("SYS", 1),
0xB8: ("SUBW", 1),
0xCD: ("DEF", 1),
0xCF: ("CALL", 1),
0xDF: ("ALLOC", 1),
0xE3: ("ADDI", 1),
0xE6: ("SUBI", 1),
0xE9: ("LSLW", 0),
0xEC: ("STLW", 1),
0xEE: ("LDLW", 1),
0xF0: ("POKE", 1),
0xF3: ("DOKE", 1),
0xF6: ("DEEK", 0),
0xF8: ("ANDW", 1),
0xFA: ("ORW", 1),
0xFC: ("XORW", 1),
0xFF: ("RET", 0),
}
_BCC_CODES = {
0x3F: "EQ",
0x4D: "GT",
0x50: "LT",
0x53: "GE",
0x56: "LE",
0x72: "NE",
}
_ZERO_PAGE_SYMBOLS = {
0x00: "zeroConst",
0x01: "memSize",
0x06: "entropy",
0x09: "videoY",
0x0E: "frameCount",
0x0F: "serialRaw",
0x11: "buttonState",
0x14: "xoutMask",
0x16: "vPC",
0x17: "vPC+1",
0x18: "vAC",
0x19: "vAC+1",
0x1A: "vLR",
0x1B: "vLR+1",
0x1C: "vSP",
0x21: "romType",
0x22: "sysFn",
0x23: "sysFn+1",
0x24: "sysArgs+0",
0x25: "sysArgs+1",
0x26: "sysArgs+2",
0x27: "sysArgs+3",
0x28: "sysArgs+4",
0x29: "sysArgs+5",
0x2A: "sysArgs+6",
0x2B: "sysArgs+7",
0x2C: "soundTimer",
0x2E: "ledState_v2",
0x2F: "ledTempo",
0x80: "oneConst",
}
# Opcodes that address page zero
_ZP_MODE_OPCODES = {
"LD",
"LDW",
"STW",
"ST",
"INC",
"ADDW",
"SUBW",
"CALL",
"POKE",
"DOKE",
"ANDW",
"ORW",
"XORW",
"CMPHS",
"CMPHU",
}