2981 lines
102 KiB
Python
2981 lines
102 KiB
Python
#!/usr/bin/env python
|
|
# python script to convert & process VGM files for SN76489 PSG
|
|
# by scrubbly 2016
|
|
# Released under MIT license
|
|
#
|
|
# VGM files can be loaded, filtered, transposed for different clock speeds, and quantized to fixed playback rates (lossy)
|
|
#
|
|
# Created primarily to enable porting of NTSC or PAL versions of SN76489 chip tunes to the BBC Micro, but is generally useful for other platforms.
|
|
#
|
|
# Based on https://github.com/cdodd/vgmparse
|
|
#
|
|
# Useful VGM/SN76489 References & Resources:
|
|
# http://www.smspower.org/Development/SN76489
|
|
# http://vgmrips.net/wiki/VGM_Specification
|
|
# http://vgmrips.net/packs/pack/svc-motm
|
|
# http://www.wothke.ch/webvgm/
|
|
# http://www.stairwaytohell.com/music/index.html?page=vgmarchive
|
|
# http://www.zeridajh.org/articles/various/sn76489/index.htm
|
|
# http://www.smspower.org/Music/Homebrew
|
|
# http://www.tommowalker.co.uk/music.html
|
|
# http://battleofthebits.org/arena/Tag/SN76489/
|
|
# http://battleofthebits.org/browser/
|
|
|
|
|
|
import gzip
|
|
import struct
|
|
import sys
|
|
import binascii
|
|
import math
|
|
from os.path import basename
|
|
|
|
if (sys.version_info > (3, 0)):
|
|
from io import BytesIO as ByteBuffer
|
|
else:
|
|
from StringIO import StringIO as ByteBuffer
|
|
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
class FatalError(Exception):
|
|
pass
|
|
|
|
|
|
|
|
|
|
class VgmStream:
|
|
|
|
|
|
# VGM commands:
|
|
# 0x50 [dd] = PSG SN76489 write value dd
|
|
# 0x61 [nnnn] = WAIT n cycles (0-65535)
|
|
# 0x62 = WAIT 735 samples (1/60 sec)
|
|
# 0x63 = WAIT 882 samples (1/50 sec)
|
|
# 0x66 = END
|
|
# 0x7n = WAIT n+1 samples (0-15)
|
|
|
|
#--------------------------------------------------------------------------------------------------------------------------------
|
|
# SN76489 register writes
|
|
# If bit 7 is 1 then the byte is a LATCH/DATA byte.
|
|
# %1cctdddd
|
|
# cc - channel (0-3)
|
|
# t - type (1 to latch volume, 1 to latch tone/noise)
|
|
# dddd - placed into the low 4 bits of the relevant register. For the three-bit noise register, the highest bit is discarded.
|
|
#
|
|
# If bit 7 is 0 then the byte is a DATA byte.
|
|
# %0-DDDDDD
|
|
# If the currently latched register is a tone register then the low 6 bits of the byte (DDDDDD)
|
|
# are placed into the high 6 bits of the latched register. If the latched register is less than 6 bits wide
|
|
# (ie. not one of the tone registers), instead the low bits are placed into the corresponding bits of the
|
|
# register, and any extra high bits are discarded.
|
|
#
|
|
# Tone registers
|
|
# DDDDDDdddd = cccccccccc
|
|
# DDDDDDdddd gives the 10-bit half-wave counter reset value.
|
|
#
|
|
# Volume registers
|
|
# (DDDDDD)dddd = (--vvvv)vvvv
|
|
# dddd gives the 4-bit volume value.
|
|
# If a data byte is written, the low 4 bits of DDDDDD update the 4-bit volume value. However, this is unnecessary.
|
|
#
|
|
# Noise register
|
|
# (DDDDDD)dddd = (---trr)-trr
|
|
# The low 2 bits of dddd select the shift rate and the next highest bit (bit 2) selects the mode (white (1) or "periodic" (0)).
|
|
# If a data byte is written, its low 3 bits update the shift rate and mode in the same way.
|
|
#--------------------------------------------------------------------------------------------------------------------------------
|
|
|
|
# script vars / configs
|
|
|
|
VGM_FREQUENCY = 44100
|
|
|
|
|
|
# script options
|
|
RETUNE_PERIODIC = True # [TO BE REMOVED] if true will attempt to retune any use of the periodic noise effect
|
|
VERBOSE = False
|
|
STRIP_GD3 = False
|
|
LENGTH = 0 # required output length (in seconds)
|
|
|
|
# VGM file identifier
|
|
vgm_magic_number = b'Vgm '
|
|
|
|
disable_dual_chip = True # [TODO] handle dual PSG a bit better
|
|
|
|
vgm_source_clock = 0
|
|
vgm_target_clock = 0
|
|
vgm_filename = ''
|
|
vgm_loop_offset = 0
|
|
vgm_loop_length = 0
|
|
|
|
# Supported VGM versions
|
|
supported_ver_list = [
|
|
0x00000101,
|
|
0x00000110,
|
|
0x00000150,
|
|
0x00000151,
|
|
0x00000160,
|
|
0x00000161,
|
|
]
|
|
|
|
# VGM metadata offsets
|
|
metadata_offsets = {
|
|
# SDM Hacked version number 101 too
|
|
0x00000101: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
},
|
|
|
|
# Version 1.10`
|
|
0x00000110: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
},
|
|
# Version 1.50`
|
|
0x00000150: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
},
|
|
# SDM Hacked version number, we are happy enough to parse v1.51 as if it were 1.50 since the 1.51 updates dont apply to us anyway
|
|
0x00000151: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
},
|
|
# SDM Hacked version number, we are happy enough to parse v1.60 as if it were 1.50 since the 1.51 updates dont apply to us anyway
|
|
0x00000160: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
|
|
},
|
|
# SDM Hacked version number, we are happy enough to parse v1.61 as if it were 1.50 since the 1.51 updates dont apply to us anyway
|
|
0x00000161: {
|
|
'vgm_ident': {'offset': 0x00, 'size': 4, 'type_format': None},
|
|
'eof_offset': {'offset': 0x04, 'size': 4, 'type_format': '<I'},
|
|
'version': {'offset': 0x08, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_clock': {'offset': 0x0c, 'size': 4, 'type_format': '<I'},
|
|
'ym2413_clock': {'offset': 0x10, 'size': 4, 'type_format': '<I'},
|
|
'gd3_offset': {'offset': 0x14, 'size': 4, 'type_format': '<I'},
|
|
'total_samples': {'offset': 0x18, 'size': 4, 'type_format': '<I'},
|
|
'loop_offset': {'offset': 0x1c, 'size': 4, 'type_format': '<I'},
|
|
'loop_samples': {'offset': 0x20, 'size': 4, 'type_format': '<I'},
|
|
'rate': {'offset': 0x24, 'size': 4, 'type_format': '<I'},
|
|
'sn76489_feedback': {
|
|
'offset': 0x28,
|
|
'size': 2,
|
|
'type_format': '<H',
|
|
},
|
|
'sn76489_shift_register_width': {
|
|
'offset': 0x2a,
|
|
'size': 1,
|
|
'type_format': 'B',
|
|
},
|
|
'ym2612_clock': {'offset': 0x2c, 'size': 4, 'type_format': '<I'},
|
|
'ym2151_clock': {'offset': 0x30, 'size': 4, 'type_format': '<I'},
|
|
'vgm_data_offset': {
|
|
'offset': 0x34,
|
|
'size': 4,
|
|
'type_format': '<I',
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
# constructor - pass in the filename of the VGM
|
|
def __init__(self, vgm_filename):
|
|
|
|
self.vgm_filename = vgm_filename
|
|
print " VGM file loaded : '" + vgm_filename + "'"
|
|
|
|
# open the vgm file and parse it
|
|
vgm_file = open(vgm_filename, 'rb')
|
|
vgm_data = vgm_file.read()
|
|
|
|
# Store the VGM data and validate it
|
|
self.data = ByteBuffer(vgm_data)
|
|
|
|
vgm_file.close()
|
|
|
|
# parse
|
|
self.validate_vgm_data()
|
|
|
|
# Set up the variables that will be populated
|
|
self.command_list = []
|
|
self.data_block = None
|
|
self.gd3_data = {}
|
|
self.metadata = {}
|
|
|
|
# Parse the VGM metadata and validate the VGM version
|
|
self.parse_metadata()
|
|
|
|
# Display info about the file
|
|
self.vgm_loop_offset = self.metadata['loop_offset']
|
|
self.vgm_loop_length = self.metadata['loop_samples']
|
|
|
|
print " VGM Version : " + "%x" % int(self.metadata['version'])
|
|
print "VGM SN76489 clock : " + str(float(self.metadata['sn76489_clock'])/1000000) + " MHz"
|
|
print " VGM Rate : " + str(float(self.metadata['rate'])) + " Hz"
|
|
print " VGM Samples : " + str(int(self.metadata['total_samples'])) + " (" + str(int(self.metadata['total_samples'])/self.VGM_FREQUENCY) + " seconds)"
|
|
print " VGM Loop Offset : " + str(self.vgm_loop_offset)
|
|
print " VGM Loop Length : " + str(self.vgm_loop_length)
|
|
|
|
|
|
|
|
|
|
# Validation to check we can parse it
|
|
self.validate_vgm_version()
|
|
|
|
# Sanity check this VGM is suitable for this script - must be SN76489 only
|
|
if self.metadata['sn76489_clock'] == 0 or self.metadata['ym2413_clock'] !=0 or self.metadata['ym2413_clock'] !=0 or self.metadata['ym2413_clock'] !=0:
|
|
raise FatalError("This script only supports VGM's for SN76489 PSG")
|
|
|
|
# see if this VGM uses Dual Chip mode
|
|
if (self.metadata['sn76489_clock'] & 0x40000000) == 0x40000000:
|
|
self.dual_chip_mode_enabled = True
|
|
else:
|
|
self.dual_chip_mode_enabled = False
|
|
|
|
print " VGM Dual Chip : " + str(self.dual_chip_mode_enabled)
|
|
|
|
|
|
# override/disable dual chip commands in the output stream if required
|
|
if (self.disable_dual_chip == True) and (self.dual_chip_mode_enabled == True) :
|
|
# remove the clock flag that enables dual chip mode
|
|
self.metadata['sn76489_clock'] = self.metadata['sn76489_clock'] & 0xbfffffff
|
|
self.dual_chip_mode_enabled = False
|
|
print "Dual Chip Mode Disabled - DC Commands will be removed"
|
|
|
|
# take a copy of the clock speed for the VGM processor functions
|
|
self.vgm_source_clock = self.metadata['sn76489_clock']
|
|
self.vgm_target_clock = self.vgm_source_clock
|
|
|
|
# Parse GD3 data and the VGM commands
|
|
self.parse_gd3()
|
|
self.parse_commands()
|
|
|
|
print " VGM Commands # : " + str(len(self.command_list))
|
|
print ""
|
|
|
|
|
|
def validate_vgm_data(self):
|
|
# Save the current position of the VGM data
|
|
original_pos = self.data.tell()
|
|
|
|
# Seek to the start of the file
|
|
self.data.seek(0)
|
|
|
|
# Perform basic validation on the given file by checking for the VGM
|
|
# magic number ('Vgm ')
|
|
if self.data.read(4) != self.vgm_magic_number:
|
|
# Could not find the magic number. The file could be gzipped (e.g.
|
|
# a vgz file). Try un-gzipping the file and trying again.
|
|
self.data.seek(0)
|
|
self.data = gzip.GzipFile(fileobj=self.data, mode='rb')
|
|
|
|
try:
|
|
if self.data.read(4) != self.vgm_magic_number:
|
|
print "Error: Data does not appear to be a valid VGM file"
|
|
raise ValueError('Data does not appear to be a valid VGM file')
|
|
except IOError:
|
|
print "Error: Data does not appear to be a valid VGM file"
|
|
# IOError will be raised if the file is not a valid gzip file
|
|
raise ValueError('Data does not appear to be a valid VGM file')
|
|
|
|
# Seek back to the original position in the VGM data
|
|
self.data.seek(original_pos)
|
|
|
|
def parse_metadata(self):
|
|
# Save the current position of the VGM data
|
|
original_pos = self.data.tell()
|
|
|
|
# Create the list to store the VGM metadata
|
|
self.metadata = {}
|
|
|
|
# Iterate over the offsets and parse the metadata
|
|
for version, offsets in self.metadata_offsets.items():
|
|
for value, offset_data in offsets.items():
|
|
|
|
# Seek to the data location and read the data
|
|
self.data.seek(offset_data['offset'])
|
|
data = self.data.read(offset_data['size'])
|
|
|
|
# Unpack the data if required
|
|
if offset_data['type_format'] is not None:
|
|
self.metadata[value] = struct.unpack(
|
|
offset_data['type_format'],
|
|
data,
|
|
)[0]
|
|
else:
|
|
self.metadata[value] = data
|
|
|
|
# Seek back to the original position in the VGM data
|
|
self.data.seek(original_pos)
|
|
|
|
def validate_vgm_version(self):
|
|
if self.metadata['version'] not in self.supported_ver_list:
|
|
print "VGM version is not supported"
|
|
raise FatalError('VGM version is not supported')
|
|
|
|
def parse_gd3(self):
|
|
# Save the current position of the VGM data
|
|
original_pos = self.data.tell()
|
|
|
|
# Seek to the start of the GD3 data
|
|
self.data.seek(
|
|
self.metadata['gd3_offset'] +
|
|
self.metadata_offsets[self.metadata['version']]['gd3_offset']['offset']
|
|
)
|
|
|
|
# Skip 8 bytes ('Gd3 ' string and 4 byte version identifier)
|
|
self.data.seek(8, 1)
|
|
|
|
# Get the length of the GD3 data, then read it
|
|
gd3_length = struct.unpack('<I', self.data.read(4))[0]
|
|
gd3_data = ByteBuffer(self.data.read(gd3_length))
|
|
|
|
# Parse the GD3 data
|
|
gd3_fields = []
|
|
current_field = b''
|
|
while True:
|
|
# Read two bytes. All characters (English and Japanese) in the GD3
|
|
# data use two byte encoding
|
|
char = gd3_data.read(2)
|
|
|
|
# Break if we are at the end of the GD3 data
|
|
if char == b'':
|
|
break
|
|
|
|
# Check if we are at the end of a field, if not then continue to
|
|
# append to "current_field"
|
|
if char == b'\x00\x00':
|
|
gd3_fields.append(current_field)
|
|
current_field = b''
|
|
else:
|
|
current_field += char
|
|
|
|
# Once all the fields have been parsed, create a dict with the data
|
|
# some Gd3 tags dont have notes section
|
|
gd3_notes = ''
|
|
gd3_title_eng = basename(self.vgm_filename).encode("utf_16")
|
|
if len(gd3_fields) > 10:
|
|
gd3_notes = gd3_fields[10]
|
|
|
|
if len(gd3_fields) > 8:
|
|
|
|
if len(gd3_fields[0]) > 0:
|
|
gd3_title_eng = gd3_fields[0]
|
|
|
|
|
|
self.gd3_data = {
|
|
'title_eng': gd3_title_eng,
|
|
'title_jap': gd3_fields[1],
|
|
'game_eng': gd3_fields[2],
|
|
'game_jap': gd3_fields[3],
|
|
'console_eng': gd3_fields[4],
|
|
'console_jap': gd3_fields[5],
|
|
'artist_eng': gd3_fields[6],
|
|
'artist_jap': gd3_fields[7],
|
|
'date': gd3_fields[8],
|
|
'vgm_creator': gd3_fields[9],
|
|
'notes': gd3_notes
|
|
}
|
|
else:
|
|
print "WARNING: Malformed/missing GD3 tag"
|
|
self.gd3_data = {
|
|
'title_eng': gd3_title_eng,
|
|
'title_jap': '',
|
|
'game_eng': '',
|
|
'game_jap': '',
|
|
'console_eng': '',
|
|
'console_jap': '',
|
|
'artist_eng': 'Unknown'.encode("utf_16"),
|
|
'artist_jap': '',
|
|
'date': '',
|
|
'vgm_creator': '',
|
|
'notes': ''
|
|
}
|
|
|
|
|
|
# Seek back to the original position in the VGM data
|
|
self.data.seek(original_pos)
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
def parse_commands(self):
|
|
# Save the current position of the VGM data
|
|
original_pos = self.data.tell()
|
|
|
|
# Seek to the start of the VGM data
|
|
self.data.seek(
|
|
self.metadata['vgm_data_offset'] +
|
|
self.metadata_offsets[self.metadata['version']]['vgm_data_offset']['offset']
|
|
)
|
|
|
|
while True:
|
|
# Read a byte, this will be a VGM command, we will then make
|
|
# decisions based on the given command
|
|
command = self.data.read(1)
|
|
|
|
# Break if we are at the end of the file
|
|
if command == '':
|
|
break
|
|
|
|
# 0x4f dd - Game Gear PSG stereo, write dd to port 0x06
|
|
# 0x50 dd - PSG (SN76489/SN76496) write value dd
|
|
if command in [b'\x4f', b'\x50']:
|
|
self.command_list.append({
|
|
'command': command,
|
|
'data': self.data.read(1),
|
|
})
|
|
|
|
# 0x51 aa dd - YM2413, write value dd to register aa
|
|
# 0x52 aa dd - YM2612 port 0, write value dd to register aa
|
|
# 0x53 aa dd - YM2612 port 1, write value dd to register aa
|
|
# 0x54 aa dd - YM2151, write value dd to register aa
|
|
elif command in [b'\x51', b'\x52', b'\x53', b'\x54']:
|
|
self.command_list.append({
|
|
'command': command,
|
|
'data': self.data.read(2),
|
|
})
|
|
|
|
# 0x61 nn nn - Wait n samples, n can range from 0 to 65535
|
|
elif command == b'\x61':
|
|
self.command_list.append({
|
|
'command': command,
|
|
'data': self.data.read(2),
|
|
})
|
|
|
|
# 0x62 - Wait 735 samples (60th of a second)
|
|
# 0x63 - Wait 882 samples (50th of a second)
|
|
# 0x66 - End of sound data
|
|
elif command in [b'\x62', b'\x63', b'\x66']:
|
|
self.command_list.append({'command': command, 'data': None})
|
|
|
|
# Stop processing commands if we are at the end of the music
|
|
# data
|
|
if command == b'\x66':
|
|
break
|
|
|
|
# 0x67 0x66 tt ss ss ss ss - Data block
|
|
elif command == b'\x67':
|
|
# Skip the compatibility and type bytes (0x66 tt)
|
|
self.data.seek(2, 1)
|
|
|
|
# Read the size of the data block
|
|
data_block_size = struct.unpack('<I', self.data.read(4))[0]
|
|
|
|
# Store the data block for later use
|
|
self.data_block = ByteBuffer(self.data.read(data_block_size))
|
|
|
|
# 0x7n - Wait n+1 samples, n can range from 0 to 15
|
|
# 0x8n - YM2612 port 0 address 2A write from the data bank, then
|
|
# wait n samples; n can range from 0 to 15
|
|
elif b'\x70' <= command <= b'\x8f':
|
|
self.command_list.append({'command': command, 'data': None})
|
|
|
|
# 0xe0 dddddddd - Seek to offset dddddddd (Intel byte order) in PCM
|
|
# data bank
|
|
elif command == b'\xe0':
|
|
self.command_list.append({
|
|
'command': command,
|
|
'data': self.data.read(4),
|
|
})
|
|
|
|
# 0x30 dd - dual chip command
|
|
elif command == b'\x30':
|
|
if self.dual_chip_mode_enabled:
|
|
self.command_list.append({
|
|
'command': command,
|
|
'data': self.data.read(1),
|
|
})
|
|
|
|
|
|
# Seek back to the original position in the VGM data
|
|
self.data.seek(original_pos)
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
def write_vgm(self, filename):
|
|
|
|
print " VGM Processing : Writing output VGM file '" + filename + "'"
|
|
vgm_stream = bytearray()
|
|
vgm_time = 0
|
|
|
|
# convert the VGM command list to a byte array
|
|
for elem in self.command_list:
|
|
command = elem['command']
|
|
data = elem['data']
|
|
|
|
|
|
# track time offset for debug purposes
|
|
if b'\x70' <= command <= b'\x7f':
|
|
pdata = binascii.hexlify(command)
|
|
t = int(pdata, 16)
|
|
t &= 15
|
|
t += 1
|
|
vgm_time += t
|
|
scommand = "WAITn"
|
|
#if self.VERBOSE: print " WAITN=" + str(t)
|
|
else:
|
|
pcommand = binascii.hexlify(command)
|
|
|
|
if pcommand == "61":
|
|
scommand = "WAIT"
|
|
pdata = binascii.hexlify(data)
|
|
t = int(pdata, 16)
|
|
# sdm: swap bytes to LSB
|
|
lsb = t & 255
|
|
msb = (t / 256)
|
|
t = (lsb * 256) + msb
|
|
vgm_time += t
|
|
#if self.VERBOSE: print " WAIT=" + str(t)
|
|
else:
|
|
if pcommand == "62": #wait60
|
|
vgm_time += 735
|
|
else:
|
|
if pcommand == "63": #wait50
|
|
vgm_time += 882
|
|
|
|
|
|
|
|
if (data != None):
|
|
if self.VERBOSE: print "command=" + str(binascii.hexlify(command)) + ", data=" + str(binascii.hexlify(data)) + ", time=" + str(float(vgm_time)/44100.0) + " secs"
|
|
|
|
# filter dual chip
|
|
if b'\x30' == command:
|
|
if self.VERBOSE: print "DUAL CHIP COMMAND"
|
|
#continue
|
|
#command = b'\x50'
|
|
|
|
|
|
vgm_stream.extend(command)
|
|
if (data != None):
|
|
vgm_stream.extend(data)
|
|
|
|
vgm_stream_length = len(vgm_stream)
|
|
|
|
# build the GD3 data block
|
|
gd3_data = bytearray()
|
|
gd3_stream = bytearray()
|
|
gd3_stream_length = 0
|
|
|
|
gd3_offset = 0
|
|
if self.STRIP_GD3 == False:
|
|
gd3_data.extend(self.gd3_data['title_eng'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['title_jap'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['game_eng'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['game_jap'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['console_eng'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['console_jap'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['artist_eng'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['artist_jap'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['date'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['vgm_creator'] + b'\x00\x00')
|
|
gd3_data.extend(self.gd3_data['notes'] + b'\x00\x00')
|
|
|
|
gd3_stream.extend('Gd3 ')
|
|
gd3_stream.extend(struct.pack('I', 0x100)) # GD3 version
|
|
gd3_stream.extend(struct.pack('I', len(gd3_data))) # GD3 length
|
|
gd3_stream.extend(gd3_data)
|
|
|
|
gd3_offset = (64-20) + vgm_stream_length
|
|
gd3_stream_length = len(gd3_stream)
|
|
else:
|
|
print " VGM Processing : GD3 tag was stripped"
|
|
|
|
# build the full VGM output stream
|
|
vgm_data = bytearray()
|
|
vgm_data.extend(self.vgm_magic_number)
|
|
vgm_data.extend(struct.pack('I', 64 + vgm_stream_length + gd3_stream_length - 4)) # EoF offset
|
|
vgm_data.extend(struct.pack('I', 0x00000151)) # Version
|
|
vgm_data.extend(struct.pack('I', self.metadata['sn76489_clock']))
|
|
vgm_data.extend(struct.pack('I', self.metadata['ym2413_clock']))
|
|
vgm_data.extend(struct.pack('I', gd3_offset)) # GD3 offset
|
|
vgm_data.extend(struct.pack('I', self.metadata['total_samples'])) # total samples
|
|
vgm_data.extend(struct.pack('I', 0)) #self.metadata['loop_offset'])) # loop offset
|
|
vgm_data.extend(struct.pack('I', 0)) #self.metadata['loop_samples'])) # loop # samples
|
|
vgm_data.extend(struct.pack('I', self.metadata['rate'])) # rate
|
|
vgm_data.extend(struct.pack('H', self.metadata['sn76489_feedback'])) # sn fb
|
|
vgm_data.extend(struct.pack('B', self.metadata['sn76489_shift_register_width'])) # SNW
|
|
vgm_data.extend(struct.pack('B', 0)) # SN Flags
|
|
vgm_data.extend(struct.pack('I', self.metadata['ym2612_clock']))
|
|
vgm_data.extend(struct.pack('I', self.metadata['ym2151_clock']))
|
|
vgm_data.extend(struct.pack('I', 12)) # VGM data offset
|
|
vgm_data.extend(struct.pack('I', 0)) # SEGA PCM clock
|
|
vgm_data.extend(struct.pack('I', 0)) # SPCM interface
|
|
|
|
# attach the vgm data
|
|
vgm_data.extend(vgm_stream)
|
|
|
|
# attach the vgm gd3 tag if required
|
|
if self.STRIP_GD3 == False:
|
|
vgm_data.extend(gd3_stream)
|
|
|
|
# write to output file
|
|
vgm_file = open(filename, 'wb')
|
|
vgm_file.write(vgm_data)
|
|
vgm_file.close()
|
|
|
|
print " VGM Processing : Written " + str(int(len(vgm_data))) + " bytes, GD3 tag used " + str(gd3_stream_length) + " bytes"
|
|
|
|
print "All done."
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
# clock_type can be NTSC, PAL or BBC (case insensitive)
|
|
def set_target_clock(self, clock_type):
|
|
if clock_type.lower() == 'ntsc':
|
|
self.metadata['sn76489_feedback'] = 0x0006 # 0x0006 for SN76494, SN76496
|
|
self.metadata['sn76489_clock'] = 3579545 # usually 3.579545 MHz (NTSC) for Sega-based PSG tunes
|
|
self.metadata['sn76489_shift_register_width'] = 16 #
|
|
self.vgm_target_clock = self.metadata['sn76489_clock']
|
|
else:
|
|
if clock_type.lower() == 'pal':
|
|
self.metadata['sn76489_feedback'] = 0x0006 # 0x0006 for SN76494, SN76496
|
|
self.metadata['sn76489_clock'] = 4433619 # 4.43361875 Mz for PAL
|
|
self.metadata['sn76489_shift_register_width'] = 16 #
|
|
self.vgm_target_clock = self.metadata['sn76489_clock']
|
|
else:
|
|
if clock_type.lower() == 'bbc':
|
|
self.metadata['sn76489_feedback'] = 0x0003 # 0x0003 for BBC configuration of SN76489
|
|
self.metadata['sn76489_clock'] = 4000000 # 4.0 Mhz on Beeb,
|
|
self.metadata['sn76489_shift_register_width'] = 15 # BBC taps bit 15 on the SR
|
|
self.vgm_target_clock = self.metadata['sn76489_clock']
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
def set_verbose(self, verbose):
|
|
self.VERBOSE = verbose
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
def set_length(self, length):
|
|
self.LENGTH = int(length)
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
# helper function
|
|
# given a start offset (default 0) into the command list, find the next index where
|
|
# the command matches search_command or return -1 if no more of these commands can be found.
|
|
def find_next_command(self, search_command, offset = 0):
|
|
for j in range(offset, len(self.command_list)):
|
|
c = self.command_list[j]["command"]
|
|
|
|
# only process write data commands
|
|
if c == search_command:
|
|
return j
|
|
else:
|
|
return -1
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
# iterate through the command list, removing any write commands that are destined for filter_channel_id
|
|
def filter_channel(self, filter_channel_id):
|
|
print " VGM Processing : Filtering channel " + str(filter_channel_id)
|
|
|
|
filtered_command_list = []
|
|
j = 0
|
|
latched_channel = 0
|
|
for q in self.command_list:
|
|
|
|
# only process write data commands
|
|
if q["command"] != struct.pack('B', 0x50):
|
|
filtered_command_list.append(q)
|
|
else:
|
|
# Check if LATCH/DATA write
|
|
qdata = q["data"]
|
|
qw = int(binascii.hexlify(qdata), 16)
|
|
if qw & 128:
|
|
# Get channel id and latch it
|
|
latched_channel = (qw>>5)&3
|
|
|
|
if latched_channel != filter_channel_id:
|
|
filtered_command_list.append(q)
|
|
|
|
self.command_list = filtered_command_list
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
# iterate through the command list, unpacking any single tone writes on a channel
|
|
def unpack_tones(self):
|
|
print " VGM Processing : Unpacking tones "
|
|
|
|
filtered_command_list = []
|
|
j = 0
|
|
latched_channel = 0
|
|
latched_tone_frequencies = [0, 0, 0, 0]
|
|
|
|
for n in range(len(self.command_list)):
|
|
q = self.command_list[n]
|
|
|
|
# we always output at least the same data stream, but we might inject a new tone write if needed
|
|
filtered_command_list.append(q)
|
|
# only process write data commands
|
|
if q["command"] == struct.pack('B', 0x50):
|
|
|
|
# Check if LATCH/DATA write
|
|
qdata = q["data"]
|
|
qw = int(binascii.hexlify(qdata), 16)
|
|
if qw & 128:
|
|
# Get channel id and latch it
|
|
latched_channel = (qw>>5)&3
|
|
|
|
|
|
# Check if TONE update
|
|
if (qw & 16) == 0:
|
|
|
|
# get low 4 bits and merge with latched channel's frequency register
|
|
qfreq = (qw & 0b00001111)
|
|
latched_tone_frequencies[latched_channel] = (latched_tone_frequencies[latched_channel] & 0b1111110000) | qfreq
|
|
|
|
|
|
# look ahead, and see if the next command is a DATA write as if so, this will be part of the same tone commmand
|
|
# so load this into our register as well so that we have the correct tone frequency to work with
|
|
|
|
multi_write = False
|
|
nindex = n
|
|
while (nindex < (len(self.command_list)-1)):# check we dont overflow the array, bail if we do, since it means we didn't find any further DATA writes.
|
|
nindex += 1
|
|
|
|
ncommand = self.command_list[nindex]["command"]
|
|
# skip any non-VGM-write commands
|
|
if ncommand != struct.pack('B', 0x50):
|
|
continue
|
|
else:
|
|
# found the next VGM write command
|
|
ndata = self.command_list[nindex]["data"]
|
|
|
|
# Check if next this is a DATA write, and capture frequency if so
|
|
# otherwise, its a LATCH/DATA write, so no additional frequency to process
|
|
nw = int(binascii.hexlify(ndata), 16)
|
|
if (nw & 128) == 0:
|
|
multi_write = True
|
|
nfreq = (nw & 0b00111111)
|
|
latched_tone_frequencies[latched_channel] = (latched_tone_frequencies[latched_channel] & 0b0000001111) | (nfreq << 4)
|
|
|
|
break
|
|
|
|
# if we detected a single register tone write, we need as unpack it, to make sure transposing works correctly
|
|
if multi_write == False and latched_channel != 3:
|
|
if self.VERBOSE: print " UNPACKING SINGLE REGISTER TONE WRITE on CHANNEL " + str(latched_channel)
|
|
# inject additional tone write to prevent any more single register tone writes
|
|
# re-program q and re-cycle it
|
|
|
|
hi_data = (latched_tone_frequencies[latched_channel]>>4) & 0b00111111
|
|
new_q = { "command" : q["command"], "data" : struct.pack('B', hi_data) }
|
|
filtered_command_list.append(new_q)
|
|
|
|
|
|
self.command_list = filtered_command_list
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
# Process the tone frequencies in the VGM for the given clock_type ('ntsc', 'pal' or 'bbc')
|
|
# such that the output VGM plays at the same pitch as the original, but using the target clock speeds.
|
|
# Tuned periodic and white noise are also transposed.
|
|
def transpose(self, clock_type):
|
|
|
|
self.unpack_tones()
|
|
|
|
|
|
# setup the correct target chip parameters
|
|
self.set_target_clock(clock_type)
|
|
|
|
# total number of commands in the vgm stream
|
|
num_commands = len(self.command_list)
|
|
|
|
# re-tune any tone commands if target clock is different to source clock
|
|
# i think it's safe to do this in the quantized packets we've created, as they tend to be completed within a single time slot
|
|
# (eg. little or no chance of a multi-tone LATCH+DATA write being split by a wait command)
|
|
|
|
if (self.vgm_source_clock != self.vgm_target_clock):
|
|
|
|
print " VGM Processing : Re-tuning VGM to new clock speed"
|
|
print " VGM Processing : Original clock " + str(float(self.vgm_source_clock)/1000000.0) + " MHz, Target Clock " + str(float(self.vgm_target_clock)/1000000.0) + " MHz"
|
|
|
|
# used by the clock retuning code, initialized once at the start of the song, so that latched register states are preserved across the song
|
|
latched_tone_frequencies = [0, 0, 0, 0]
|
|
latched_volumes = [0, 0, 0, 0]
|
|
tone2_offsets = [-1, -1]
|
|
latched_channel = 0
|
|
vgm_time = 0
|
|
|
|
# helper function
|
|
# calculates a retuned tone frequency based on given frequency & periodic noise indication
|
|
# returns retuned frequency.
|
|
# does not change any external state
|
|
def recalc_frequency(tone_frequency, is_periodic_noise_tone = False):
|
|
|
|
if self.VERBOSE: print " recalc_frequency(), vgm_time=" + str(vgm_time) + " clock time=" + str(float(vgm_time)/44100.0) + " secs"
|
|
|
|
|
|
# compute the correct frequency
|
|
# first check it is not 0 (illegal value)
|
|
output_freq = 0
|
|
if tone_frequency == 0:
|
|
if self.VERBOSE: print "Zero frequency tone detected on channel "# + str(latched_channel)
|
|
else:
|
|
|
|
# compute correct hz frequency of current tone from formula:
|
|
#
|
|
# hz = Clock Or for periodic noise: hz = Clock where SR is 15 or 16 depending on chip
|
|
# ------------- ------------------
|
|
# ( 2 x N x 16) ( 2 x N x 16 x SR)
|
|
|
|
if is_periodic_noise_tone:
|
|
if self.VERBOSE: print "Periodic noise tone"
|
|
noise_ratio = (15.0 / 16.0) * (float(self.vgm_source_clock) / float(self.vgm_target_clock))
|
|
v = float(tone_frequency) / noise_ratio
|
|
if self.VERBOSE: print "noise_ratio=" + str(noise_ratio)
|
|
if self.VERBOSE: print "original freq=" + str(tone_frequency) + ", new freq=" + str(v)
|
|
if self.VERBOSE: print "retuned periodic noise effect on channel 2"
|
|
|
|
else:
|
|
if self.VERBOSE: print "Normal tone"
|
|
# compute corrected tone register value for generating the same frequency using the target chip's clock rate
|
|
hz = float(self.vgm_source_clock) / ( 2.0 * float(tone_frequency) * 16.0)
|
|
if self.VERBOSE: print "hz=" + str(hz)
|
|
v = float(self.vgm_target_clock) / (2.0 * hz * 16.0 )
|
|
if self.VERBOSE: print "v=" + str(v)
|
|
|
|
# due to the integer maths, some precision is lost at the lower end
|
|
output_freq = int(round(v)) # using round minimizes error margin at lower precision
|
|
# clamp range to 10 bits
|
|
if output_freq > 1023:
|
|
output_freq = 1023
|
|
if output_freq < 1:
|
|
output_freq = 1
|
|
|
|
if is_periodic_noise_tone:
|
|
hz1 = float(self.vgm_source_clock) / (2.0 * float(tone_frequency) * 16.0 * 15.0) # target frequency
|
|
hz2 = float(self.vgm_target_clock) / (2.0 * float(output_freq) * 16.0 * 15.0)
|
|
else:
|
|
hz1 = float(self.vgm_source_clock) / (2.0 * float(tone_frequency) * 16.0) # target frequency
|
|
hz2 = float(self.vgm_target_clock) / (2.0 * float(output_freq) * 16.0)
|
|
|
|
hz_err = hz2-hz1
|
|
if self.VERBOSE: print "channel=" + str(latched_channel) + ", old frequency=" + str(tone_frequency) + ", new frequency=" + str(output_freq) + ", source_clock=" + str(self.vgm_source_clock) + ", target_clock=" + str(self.vgm_target_clock) + ", src_hz=" + str(hz1) + ", tgt_hz=" + str(hz2) + ", hz_err =" + str(hz_err)
|
|
if hz_err > 2.0 or hz_err < -2.0:
|
|
print " WARNING: Large error transposing tone! [" + str(hz_err) + " Hz ] (channel="+str(latched_channel)+", PN="+str(is_periodic_noise_tone)+")"
|
|
#if self.VERBOSE: print ""
|
|
|
|
return output_freq
|
|
|
|
|
|
TEST_OUTPUT = False # show additional test output in this section
|
|
|
|
# iterate through write commands looking for tone writes and recalculate their frequencies
|
|
## first create a reference copy of the command list (just for a tuning hack below)
|
|
#command_list_copy = list(self.command_list)
|
|
|
|
for n in range(len(self.command_list)):
|
|
command = self.command_list[n]["command"]
|
|
|
|
# track time offset for debug purposes
|
|
data = self.command_list[n]["data"]
|
|
if b'\x70' <= command <= b'\x7f':
|
|
pdata = binascii.hexlify(command)
|
|
t = int(pdata, 16)
|
|
t &= 15
|
|
t += 1
|
|
vgm_time += t
|
|
scommand = "WAITn"
|
|
#if self.VERBOSE: print " WAITN=" + str(t)
|
|
else:
|
|
pcommand = binascii.hexlify(command)
|
|
|
|
if pcommand == "61":
|
|
scommand = "WAIT"
|
|
pdata = binascii.hexlify(data)
|
|
t = int(pdata, 16)
|
|
# sdm: swap bytes to LSB
|
|
lsb = t & 255
|
|
msb = (t / 256)
|
|
t = (lsb * 256) + msb
|
|
vgm_time += t
|
|
#if self.VERBOSE: print " WAIT=" + str(t)
|
|
else:
|
|
if pcommand == "62": #wait60
|
|
vgm_time += 735
|
|
else:
|
|
if pcommand == "63": #wait50
|
|
vgm_time += 882
|
|
|
|
if TEST_OUTPUT:
|
|
print "TEST: vgm frame (1/60ths) = " + str( (vgm_time/735) )
|
|
|
|
# only process write data commands
|
|
if command == struct.pack('B', 0x50):
|
|
qdata = self.command_list[n]["data"]
|
|
|
|
# Check if LATCH/DATA write
|
|
qw = int(binascii.hexlify(qdata), 16)
|
|
if qw & 128:
|
|
|
|
# low tone values (min 0x001) generate high frequency
|
|
# high tone values (max 0x3ff) generate low frequency
|
|
|
|
# Get channel id and latch it
|
|
latched_channel = (qw>>5)&3
|
|
|
|
# Check if TONE or VOLUME update
|
|
if (qw & 16) != 0:
|
|
# track volumes so we can apply the periodic noise retune if necessary
|
|
|
|
# hack to force channel 2 volume high (so we can test periodic noise channel tuning)
|
|
#if latched_channel == 2:
|
|
# qw = qw & 0xf0
|
|
# quantized_command_list[n]["data"] = struct.pack('B', qw)
|
|
|
|
#------------------------------------------------------------------------------------------------------------
|
|
# extra test for scenarios where channel 2 and channel 3 volumes are audible together during tuned PN
|
|
# AND we are targetting an SN76489 chip with 15 bit duty cycle (SR) because these are not harmonious
|
|
# Explanation:
|
|
# On some SN76489 chips the periodic noise cycle is 16 bits and on some it is 15 bits.
|
|
# SN tone generators output continuous squarewaves at the given frequency, whereas the periodic noise generator
|
|
# emits a squarewave at the given frequency on channel 2 *every duty cycle* -
|
|
# which is either every 1/16 clocks or every 1/15 clocks (depending on chip config)
|
|
# At 1/16, the output frequency is in harmony with channel 2. So 440Hz on tone2
|
|
# will be 27.5Hz on PN channel3 (exactly 4 octaves lower).
|
|
# With the BBC, 440Hz on tone2 will deliver 29.33Hz on PN channel3 (*almost* 4 octaves lower - but out of tune).
|
|
#
|
|
# So some tunes are able to play all 4 channels with channel 3 in tuned PN mode, in harmony.
|
|
# Such tunes are not compatible with SN chip variants with 15-bit LFSR registers.
|
|
#
|
|
# Our coping strategy here is to detect and prioritize any currently playing tuned periodic noise, and ignore
|
|
# any volume changes on channel 2.
|
|
# We could attempt to stop the PN from playing if we detect volume changes on channel 2, but that would lead
|
|
# to much more complexity in how the registers are updated.
|
|
#
|
|
# remember that on SN chip, volume 15 is silent, and 0 is full
|
|
|
|
# UPDATE: there is an issue with this approach. If channel 2 keys on with a volume setting BEFORE channel 3
|
|
# has keyed off a previously playing tuned periodic noise, it will mistakenly think this is a quad tone.
|
|
|
|
|
|
|
|
new_volume = qw & 15
|
|
if TEST_OUTPUT:
|
|
print "TEST: channel " + str(latched_channel) + " volume set to " + str(new_volume)
|
|
|
|
# True/False to enable detection & correction of "quad tones"
|
|
if True:
|
|
if latched_channel == 2:
|
|
if new_volume != 15:
|
|
if latched_volumes[3] != 15:
|
|
if (latched_tone_frequencies[3]) & 3 == 3:
|
|
print "WARNING: Volume non zero on channel 2 when channel 3 is playing periodic noise, analysing..."
|
|
|
|
# ok, to make doubly sure we arent muting channel 2 unnecessarily, look ahead
|
|
# to see if any other volume writes (to set volume 0) on channel 3 are incoming for this time slot
|
|
volume_channel3_will_be_zeroed = False
|
|
nindex = n
|
|
while (nindex < (len(self.command_list)-1)):# check we dont overflow the array, bail if we do, since it means we didn't find any further DATA writes.
|
|
nindex += 1
|
|
|
|
ncommand = self.command_list[nindex]["command"]
|
|
# interpret any non-VGM-write commands to mean end of time slot
|
|
if ncommand != struct.pack('B', 0x50):
|
|
if TEST_OUTPUT:
|
|
print " INFO: end of time slot (command " + str(binascii.hexlify(ncommand)) + ")"
|
|
break
|
|
else:
|
|
if TEST_OUTPUT:
|
|
print " INFO: found command in same time slot"
|
|
# found the next VGM write command
|
|
ndata = self.command_list[nindex]["data"]
|
|
|
|
# Check if next this is a DATA write, and check if a channel 3 volume
|
|
nw = int(binascii.hexlify(ndata), 16)
|
|
if (nw & 128): # check for data
|
|
|
|
# Check incoming channel id and if volume command
|
|
incoming_channel_id = (nw>>5)&3
|
|
if TEST_OUTPUT:
|
|
print " INFO: is a data command on channel " + str(incoming_channel_id)
|
|
if (incoming_channel_id == 3) and (nw & 16) != 0:
|
|
if TEST_OUTPUT:
|
|
print " INFO: is a volume setting of " + str(nw & 15)
|
|
|
|
# Yes, it's a volume command on channel 3
|
|
if (nw & 15) == 15: # silent
|
|
print " INFO: detected incoming volume off on channel 3, overrides correction"
|
|
volume_channel3_will_be_zeroed = True
|
|
break
|
|
|
|
# so only mute channel 2 if we know channel 3 isnt going to be set to volume 15 this frame
|
|
if not volume_channel3_will_be_zeroed:
|
|
print " INFO: corrected volume, channel 2 was auto-muted due to active channel 3 periodic noise"
|
|
new_volume = 15
|
|
|
|
lo_data = (qw & 0b11110000) | (new_volume & 0b00001111)
|
|
self.command_list[n]["data"] = struct.pack('B', lo_data)
|
|
|
|
latched_volumes[latched_channel] = new_volume
|
|
else:
|
|
|
|
# save the index of this tone write if it's channel 2 (used below)
|
|
# since that might be influencing the frequency on channel 3
|
|
if latched_channel == 2:
|
|
tone2_offsets[0] = n
|
|
tone2_offsets[1] = -1
|
|
|
|
# get low 4 bits and merge with latched channel's frequency register
|
|
qfreq = (qw & 0b00001111)
|
|
latched_tone_frequencies[latched_channel] = (latched_tone_frequencies[latched_channel] & 0b1111110000) | qfreq
|
|
|
|
if TEST_OUTPUT:
|
|
print "TEST: channel " + str(latched_channel) + " pitch set to " + str(qfreq)
|
|
|
|
# check for non-tuned periodic noise (might sound out of tune when converted, because clock speed drives this)
|
|
# bit 2 of frequency on noise channel =0 for PN, or =1 for white noise
|
|
if latched_channel == 3 and (latched_tone_frequencies[3] < 3):
|
|
print "WARNING: Non-tuned periodic noise detected, may not sound in tune due to different clock speed."
|
|
|
|
# sanity check - detect if ratio of DATA writes is 1:1 with LATCH writes
|
|
if False:
|
|
nindex = n
|
|
dcount = 0
|
|
while (nindex < (len(self.command_list)-1)):# check we dont overflow the array, bail if we do, since it means we didn't find any further DATA writes.
|
|
nindex += 1
|
|
|
|
ncommand = self.command_list[nindex]["command"]
|
|
# skip any non-VGM-write commands
|
|
if ncommand != struct.pack('B', 0x50):
|
|
continue
|
|
else:
|
|
# found the next VGM write command
|
|
ndata = self.command_list[nindex]["data"]
|
|
|
|
# Check if next this is a DATA write, and capture frequency if so
|
|
# otherwise, its a LATCH/DATA write, so no additional frequency to process
|
|
nw = int(binascii.hexlify(ndata), 16)
|
|
if (nw & 128) == 0:
|
|
dcount += 1
|
|
else:
|
|
#if dcount > 1:
|
|
print "WARNING: DCOUNT=" + str(dcount) #DANGER WILL ROBINSON"
|
|
break
|
|
|
|
# look ahead, and see if the next command is a DATA write as if so, this will be part of the same tone commmand
|
|
# so load this into our register as well so that we have the correct tone frequency to work with
|
|
|
|
multi_write = False
|
|
nindex = n
|
|
while (nindex < (len(self.command_list)-1)):# check we dont overflow the array, bail if we do, since it means we didn't find any further DATA writes.
|
|
nindex += 1
|
|
|
|
ncommand = self.command_list[nindex]["command"]
|
|
# skip any non-VGM-write commands
|
|
if ncommand != struct.pack('B', 0x50):
|
|
continue
|
|
else:
|
|
# found the next VGM write command
|
|
ndata = self.command_list[nindex]["data"]
|
|
|
|
# Check if next this is a DATA write, and capture frequency if so
|
|
# otherwise, its a LATCH/DATA write, so no additional frequency to process
|
|
nw = int(binascii.hexlify(ndata), 16)
|
|
if (nw & 128) == 0:
|
|
multi_write = True
|
|
nfreq = (nw & 0b00111111)
|
|
latched_tone_frequencies[latched_channel] = (latched_tone_frequencies[latched_channel] & 0b0000001111) | (nfreq << 4)
|
|
|
|
# cache offset of the last tone2 channel write
|
|
if latched_channel == 2:
|
|
tone2_offsets[1] = nindex
|
|
break
|
|
|
|
|
|
|
|
|
|
# calculate the correct retuned frequncy for this channel
|
|
|
|
# leave channel 3 (noise channel) alone mostly.. it's not a frequency, unless its a tuned white/periodic noise
|
|
if latched_channel == 3:
|
|
new_freq = latched_tone_frequencies[latched_channel]
|
|
|
|
# if we're starting a tuned periodic or white noise, we may need to do further adjustments
|
|
# We check if volume on channel 2 is 15 (zero volume) because that indicates
|
|
# a tuned noise effect
|
|
# this is detected by bit0 and bit1 being set (bit2 being white(1) or periodic noise(0))
|
|
if True:
|
|
if (new_freq & 3 == 3) and latched_volumes[2] == 15:
|
|
|
|
if tone2_offsets[0] < 0:
|
|
# Likely cause of this is that the tuned PN is started, and the pitch is set on channel2 afterwards
|
|
print "WARNING: Unexpected scenario - tone2 offset is not set"
|
|
else:
|
|
|
|
#print "POTENTIAL RETUNE REQUIRED"
|
|
# ok we've detected a tuned noise on ch3, which is slightly more involved to correct.
|
|
# some tunes setup ch2 tone THEN ch2 vol THEN start the periodic noise, so we have to detect this case.
|
|
# we record the index in the command stream of when tone on ch2 was last set
|
|
# then we refer backwards to find the last ch2 tone write & correct it
|
|
# the current latched_tone_frequency is captured though, so transpose that as usual
|
|
f = recalc_frequency(latched_tone_frequencies[2], True)
|
|
|
|
# now write back to the previous channel 2 tone command(s) with the newly corrected frequency
|
|
zdata = self.command_list[tone2_offsets[0]]["data"]
|
|
zw = int(binascii.hexlify(zdata), 16)
|
|
lo_data = (zw & 0b11110000) | (f & 0b00001111)
|
|
|
|
|
|
self.command_list[tone2_offsets[0]]["data"] = struct.pack('B', lo_data)
|
|
|
|
# if this was part of a multi-write command (eg. one LATCH/DATA followed by one DATA write)
|
|
# update the second command too, with the correct frequency
|
|
if tone2_offsets[1] >= 0:
|
|
hi_data = (f>>4) & 0b00111111
|
|
self.command_list[tone2_offsets[1]]["data"] = struct.pack('B', hi_data)
|
|
tone2_offsets[1] = -1 # reset offset
|
|
|
|
|
|
|
|
else:
|
|
# to use the periodic noise effect as a bass line, it uses the tone on channel 2 to drive PN frequency on channel 3
|
|
# when the clock is different, the PN is different, so we have to apply a further correction
|
|
# typically tracks that use this effect will disable the volume of channel 2
|
|
# we detect this case and detune channel 2 tone by a further amount to correct for this
|
|
is_periodic_noise_tone = self.RETUNE_PERIODIC == True and latched_channel == 2 and latched_volumes[2] == 15 and (latched_tone_frequencies[3] & 3 == 3)
|
|
|
|
#if latched_channel == 2 and latched_volumes[2] != 15 and (latched_tone_frequencies[3] & 3 == 3):
|
|
# print "Found non-muted channel 2 with tuned channel 3 periodic noise "
|
|
|
|
new_freq = recalc_frequency(latched_tone_frequencies[latched_channel], is_periodic_noise_tone)
|
|
|
|
# write back the command(s) with the correct frequency
|
|
lo_data = (qw & 0b11110000) | (new_freq & 0b00001111)
|
|
self.command_list[n]["data"] = struct.pack('B', lo_data)
|
|
|
|
# if this was part of a multi-write command (eg. one LATCH/DATA followed by one DATA write)
|
|
# update the second command too, with the correct frequency
|
|
hi_data = -1
|
|
if multi_write == True:
|
|
hi_data = (new_freq>>4) & 0b00111111
|
|
self.command_list[nindex]["data"] = struct.pack('B', hi_data)
|
|
else:
|
|
if self.VERBOSE: print "SINGLE REGISTER TONE WRITE on CHANNEL " + str(latched_channel)
|
|
|
|
if self.VERBOSE: print "new_freq=0x" + format(new_freq, 'x') + ", lo_data=0x" + format(lo_data, '02x') + ", hi_data=0x" + format(hi_data, '02x')
|
|
if self.VERBOSE: print ""
|
|
else:
|
|
print "transpose() - No transposing necessary as target clock matches source clock"
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
# iterate through the command list, removing any duplicate volume or tone writes
|
|
def optimize(self):
|
|
print " VGM Processing : Optimizing VGM Stream "
|
|
|
|
# total number of commands in the vgm stream
|
|
num_commands = len(self.command_list)
|
|
|
|
latched_tone_frequencies = [-1, -1, -1, -1]
|
|
latched_volumes = [-1, -1, -1, -1]
|
|
latched_channel = 0
|
|
|
|
optimized_command_list = []
|
|
|
|
removed_volume_count = 0
|
|
removed_tone_count = 0
|
|
|
|
skip_next_data_write = False
|
|
|
|
first_command = True
|
|
|
|
for i in range(num_commands):
|
|
|
|
# check if previous command has detected a redundant tone data write
|
|
# and move along if so.
|
|
if skip_next_data_write:
|
|
skip_next_data_write = False
|
|
continue
|
|
|
|
# fetch next command & associated data
|
|
command = self.command_list[i]["command"]
|
|
data = self.command_list[i]["data"]
|
|
|
|
# process the command
|
|
|
|
pcommand = binascii.hexlify(command)
|
|
# write command - add to optimized command list
|
|
if pcommand == "50":
|
|
|
|
pdata = binascii.hexlify(data)
|
|
w = int(pdata, 16)
|
|
|
|
# capture current channel
|
|
last_latched_channel = latched_channel
|
|
if w & 128:
|
|
latched_channel = (w>>5)&3
|
|
|
|
|
|
# latch volumes so that we can strip duplicate volume writes
|
|
if True:
|
|
if (w & 128+16) == (128+16):
|
|
vol = w & 15
|
|
# check if volume is the same and discard if so
|
|
if latched_volumes[latched_channel] != -1 and vol == latched_volumes[latched_channel]:
|
|
#print "Removed duplicate volume write"
|
|
removed_volume_count += 1
|
|
latched_channel = last_latched_channel
|
|
continue
|
|
else:
|
|
latched_volumes[latched_channel] = vol
|
|
|
|
# strip duplicate tone writes
|
|
if True:
|
|
if (w & 128+16) == 128:
|
|
|
|
# get low 4 bits and merge with latched channel's frequency register
|
|
tone_lo = (w & 0b00001111)
|
|
tone_hi = latched_tone_frequencies[latched_channel] & 0b1111110000
|
|
|
|
# look ahead to see if next command is a tone data write
|
|
if i < num_commands-1:
|
|
|
|
# fetch next command & associated data
|
|
c = self.command_list[i+1]
|
|
|
|
ncommand = c["command"]
|
|
ndata = c["data"]
|
|
|
|
# write command - add to optimized command list
|
|
if ncommand == struct.pack('B', 0x50):
|
|
|
|
nw = int(binascii.hexlify(ndata), 16)
|
|
if nw & 128 == 0:
|
|
tone_hi = (nw & 0b0000111111) << 4
|
|
skip_next_data_write = True
|
|
|
|
tone = tone_lo | tone_hi
|
|
|
|
# EXPERIMENTAL - remove non-white noise tones on channel 3
|
|
# since they cannot be translated to different clocks.
|
|
# doesnt seem to work as expected. :/
|
|
if False:
|
|
if self.vgm_target_clock != self.vgm_source_clock:
|
|
if latched_channel == 3 and (tone & 4) == 0:
|
|
if tone & 3 != 3:
|
|
removed_tone_count += 1
|
|
continue
|
|
|
|
if latched_tone_frequencies[latched_channel] != -1 and (latched_tone_frequencies[latched_channel] == tone):
|
|
#print "Removed duplicate tone write"
|
|
removed_tone_count += 1
|
|
continue
|
|
else:
|
|
latched_tone_frequencies[latched_channel] = tone
|
|
skip_next_data_write = False
|
|
|
|
|
|
# add the latest command to the list
|
|
optimized_command_list.append( { 'command' : command, 'data' : data } )
|
|
else:
|
|
# for all other commands, add to optimized_command_list
|
|
optimized_command_list.append( { 'command' : command, 'data' : data } )
|
|
|
|
|
|
print "- Removed " + str(removed_volume_count) + " duplicate volume commands"
|
|
print "- Removed " + str(removed_tone_count) + " duplicate tone commands"
|
|
print "- originally contained " + str(num_commands) + " commands, now contains " + str(len(optimized_command_list)) + " commands"
|
|
|
|
# replace internal command list with optimized command list
|
|
self.command_list = optimized_command_list
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
# given a subset command list, sort the commands so that volumes come before tones
|
|
# returns a new list object containing the sorted command list
|
|
def sort_command_list(self, input_commands):
|
|
#return input_commands
|
|
|
|
# sorted by type
|
|
volume_list = []
|
|
tone_list = []
|
|
|
|
|
|
|
|
for c in input_commands:
|
|
|
|
# fetch next command & associated data
|
|
command = c["command"]
|
|
data = c["data"]
|
|
|
|
pcommand = binascii.hexlify(command)
|
|
# write command - add to optimized command list, removing any it replaces
|
|
if pcommand == "50":
|
|
|
|
pdata = binascii.hexlify(data)
|
|
w = int(pdata, 16)
|
|
# Check if LATCH/DATA write enabled - since this is the start of a write command
|
|
if (w & (128+16)) == (128+16):
|
|
volume_list.append( c )
|
|
else:
|
|
tone_list.append( c )
|
|
|
|
else:
|
|
print "ERROR - WAS NOT EXPECTING non register data in command list"
|
|
|
|
|
|
##### EXPERIMENTAL CODE TO SORT COMMANDS INTO CHANNEL ORDER ####
|
|
# Part of some tests to see if compression can be improved.
|
|
# Disabled for the moment
|
|
if False:
|
|
# sorted by type & channel
|
|
volume_channel_list = []
|
|
tone_channel_list = []
|
|
|
|
# sort volumes into channel order
|
|
for channel in range(0,4):
|
|
for c in volume_list:
|
|
# fetch next command & associated data
|
|
command = c["command"]
|
|
data = c["data"]
|
|
|
|
if binascii.hexlify(command) == "50":
|
|
w = int(binascii.hexlify(data), 16)
|
|
# already know its a volume command, so just check channel
|
|
if ((w >> 5) & 3) == channel:
|
|
volume_channel_list.append( c )
|
|
|
|
# sort tones into channel order
|
|
for channel in range(0,4):
|
|
next_tone_write = False
|
|
for c in tone_list:
|
|
# fetch next command & associated data
|
|
command = c["command"]
|
|
data = c["data"]
|
|
|
|
if binascii.hexlify(command) == "50":
|
|
w = int(binascii.hexlify(data), 16)
|
|
# already know its a tone command, so just check channel
|
|
if (w & 128):
|
|
if ((w >> 5) & 3) == channel:
|
|
tone_channel_list.append( c )
|
|
next_tone_write = True
|
|
else:
|
|
if next_tone_write:
|
|
tone_channel_list.append( c )
|
|
next_tone_write = False
|
|
|
|
# replace original lists with sorted lists
|
|
volume_list = volume_channel_list
|
|
tone_list = tone_channel_list
|
|
|
|
|
|
# return the commands sorted into volumes first followed by tones
|
|
output_list = []
|
|
output_list += volume_list
|
|
output_list += tone_list
|
|
return output_list
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
# Slightly different 'lossy' optimization, mainly of use with quantization
|
|
# iterate through the command list, and for each update interval,
|
|
# remove any register writes we consider to be "redundant" (ie. multiple writes to the same register within a single wait period)
|
|
# we also sort the register updates so that volumes are set before tones
|
|
# this allows for better frequency correction - some tunes set tones before volumes which makes it tricky
|
|
# to detect tuned noise effects and compensate accordingly. Sorting register updates makes this more accurate.
|
|
def optimize2(self):
|
|
print " VGM Processing : Optimizing VGM Packets "
|
|
|
|
# total number of commands in the vgm stream
|
|
num_commands = len(self.command_list)
|
|
|
|
optimized_command_list = []
|
|
output_command_list = []
|
|
|
|
redundant_count = 0
|
|
|
|
for i in range(num_commands):
|
|
|
|
# fetch next command & associated data
|
|
command = self.command_list[i]["command"]
|
|
data = self.command_list[i]["data"]
|
|
|
|
# process the command
|
|
# writes get accumulated into time slots
|
|
|
|
|
|
pcommand = binascii.hexlify(command)
|
|
# write command - add to optimized command list, removing any it replaces
|
|
if pcommand == "50":
|
|
|
|
pdata = binascii.hexlify(data)
|
|
w = int(pdata, 16)
|
|
|
|
|
|
if (len(optimized_command_list) > 0):
|
|
|
|
# Check if LATCH/DATA write enabled - since this is the start of a write command
|
|
if w & 128:
|
|
# Get channel id
|
|
channel = (w>>5)&3
|
|
|
|
|
|
# first check for volume writes as these are easier
|
|
# Check if VOLUME flag set
|
|
if (w & 16):
|
|
# scan previous commands to see if same channel volume has been set
|
|
# if so, remove the previous one
|
|
temp_command_list = []
|
|
for c in optimized_command_list:
|
|
qdata = c["data"]
|
|
qw = int(binascii.hexlify(qdata), 16)
|
|
redundant = False
|
|
|
|
# Check if LATCH/DATA write enabled
|
|
if qw & 128:
|
|
|
|
# Check if VOLUME flag set
|
|
if (qw & 16):
|
|
# Get channel id
|
|
qchannel = (qw>>5)&3
|
|
if (qchannel == channel):
|
|
redundant = True
|
|
|
|
# we cant remove the item directly from optimized_command_list since we are iterating through it
|
|
# so we build a second optimized list
|
|
if (not redundant):
|
|
temp_command_list.append(c)
|
|
else:
|
|
if self.VERBOSE: print "Command#" + str(i) + " Removed redundant volume write"
|
|
|
|
# replace command list with optimized command list
|
|
optimized_command_list = temp_command_list
|
|
|
|
else:
|
|
# process tones, these are a bit more complex, since they might comprise two commands
|
|
|
|
# scan previous commands to see if a tone has been previously set on the same channel
|
|
# if so, remove the previous one
|
|
temp_command_list = []
|
|
redundant_tone_data = False # set to true if
|
|
for c in optimized_command_list:
|
|
qdata = c["data"]
|
|
qw = int(binascii.hexlify(qdata), 16)
|
|
|
|
redundant = False
|
|
|
|
# if a previous tone command was removed as redundant, any subsequent non-latch tone writes are also redundant
|
|
if (redundant_tone_data == True):
|
|
redundant_tone_data = False
|
|
if (qw & 128) == 0: # detect non latched data write
|
|
redundant = True
|
|
else:
|
|
# Check if LATCH/DATA write enabled
|
|
if qw & 128:
|
|
|
|
# Check if VOLUME flag NOT set (ie. TONE)
|
|
if (qw & 16) == 0:
|
|
# Get channel id
|
|
qchannel = (qw>>5)&3
|
|
if (qchannel == channel):
|
|
redundant = True
|
|
redundant_tone_data = True # indicate that if next command is a non-latched tone data write, it too is redundant
|
|
|
|
|
|
# we cant remove the item directly from quantized_command_list since we are iterating through it
|
|
# so we build a second optimized list
|
|
if (not redundant):
|
|
temp_command_list.append(c)
|
|
else:
|
|
redundant_count += 1
|
|
if self.VERBOSE: print "Command#" + str(i) + " Removed redundant tone write"
|
|
|
|
# replace command list with optimized command list
|
|
optimized_command_list = temp_command_list
|
|
|
|
# add the latest command to the list
|
|
optimized_command_list.append( { 'command' : command, 'data' : data } )
|
|
else:
|
|
# for all other commands, output any pending optimized_command_list
|
|
|
|
# first, sort the optimized command list so that volumes are set before tones
|
|
optimized_command_list = self.sort_command_list(optimized_command_list)
|
|
|
|
|
|
# now output the optmized command list
|
|
output_command_list += optimized_command_list
|
|
optimized_command_list = []
|
|
output_command_list.append( { 'command' : command, 'data' : data } )
|
|
|
|
print "- Removed " + str(redundant_count) + " redundant commands"
|
|
print "- originally contained " + str(num_commands) + " commands, now contains " + str(len(output_command_list)) + " commands"
|
|
|
|
# replace internal command list with optimized command list
|
|
self.command_list = output_command_list
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
def quantize(self, play_rate):
|
|
|
|
print " VGM Processing : Quantizing VGM to " + str(play_rate) + " Hz"
|
|
|
|
if self.VGM_FREQUENCY % play_rate != 0:
|
|
print " ERROR - Cannot quantize to a fractional interval, must be an integer factor of 44100"
|
|
return
|
|
|
|
# total number of commands in the vgm stream
|
|
num_commands = len(self.command_list)
|
|
|
|
# total number of samples in the vgm stream
|
|
total_samples = int(self.metadata['total_samples'])
|
|
|
|
vgm_time = 0
|
|
playback_time = 0
|
|
|
|
interval_time = self.VGM_FREQUENCY/play_rate
|
|
|
|
vgm_command_index = 0
|
|
|
|
unhandled_commands = 0
|
|
|
|
# first step is to quantize the command stream to the playback rate rather than the sample rate
|
|
|
|
output_command_list = []
|
|
|
|
# clip the output to the desired length if specified as non zero number of 'play_rate' frames
|
|
total_frames = self.LENGTH
|
|
if total_frames > 0:
|
|
print "Limiting total frames to " + str(total_frames)
|
|
print "original total_samples " + str(total_samples)
|
|
total_samples = (self.VGM_FREQUENCY * total_frames)
|
|
print "new total_samples " + str(total_samples)
|
|
self.metadata['total_samples'] = total_samples
|
|
|
|
|
|
accumulated_time = 0
|
|
# process the entire vgm
|
|
while playback_time < total_samples:
|
|
|
|
quantized_command_list = []
|
|
playback_time += interval_time
|
|
|
|
# if playback time has caught up with vgm_time, process the commands
|
|
while vgm_time <= playback_time and vgm_command_index < len(self.command_list):
|
|
|
|
# fetch next command & associated data
|
|
command = self.command_list[vgm_command_index]["command"]
|
|
data = self.command_list[vgm_command_index]["data"]
|
|
|
|
# process the command
|
|
# writes get accumulated in this time slot
|
|
# waits get accumulated to vgm_time
|
|
|
|
if b'\x70' <= command <= b'\x7f':
|
|
pdata = binascii.hexlify(command)
|
|
t = int(pdata, 16)
|
|
t &= 15
|
|
t += 1
|
|
vgm_time += t
|
|
scommand = "WAITn"
|
|
if self.VERBOSE: print "WAITN=" + str(t)
|
|
else:
|
|
pcommand = binascii.hexlify(command)
|
|
|
|
if pcommand == "50":
|
|
# add the latest command to the list
|
|
quantized_command_list.append( { 'command' : command, 'data' : data } )
|
|
else:
|
|
if pcommand == "61":
|
|
scommand = "WAIT"
|
|
pdata = binascii.hexlify(data)
|
|
t = int(pdata, 16)
|
|
# sdm: swap bytes to LSB
|
|
lsb = t & 255
|
|
msb = (t / 256)
|
|
t = (lsb * 256) + msb
|
|
vgm_time += t
|
|
if self.VERBOSE: print "WAIT=" + str(t)
|
|
else:
|
|
if pcommand == "66": #end
|
|
# send the end command
|
|
output_command_list.append( { 'command' : command, 'data' : data } )
|
|
# end
|
|
else:
|
|
if pcommand == "62": #wait60
|
|
vgm_time += 735
|
|
else:
|
|
if pcommand == "63": #wait50
|
|
vgm_time += 882
|
|
else:
|
|
unhandled_commands += 1
|
|
|
|
if self.VERBOSE: print "vgm_time=" + str(vgm_time) + ", playback_time=" + str(playback_time) + ", vgm_command_index=" + str(vgm_command_index) + ", output_command_list=" + str(len(output_command_list)) + ", command=" + pcommand
|
|
vgm_command_index += 1
|
|
|
|
if self.VERBOSE: print "vgm_time has caught up with playback_time"
|
|
|
|
|
|
|
|
# we've caught up with playback time, so append the quantized command list to the output command list
|
|
if (len(quantized_command_list) > 0) :
|
|
|
|
|
|
|
|
# flush any pending wait commands before data writes, to optimize redundant wait commands
|
|
|
|
if self.VERBOSE: print "Flushing " + str(len(quantized_command_list)) + " commands, accumulated_time=" + str(accumulated_time)
|
|
|
|
# make sure we limit the max time delay to be the nearest value under 65535
|
|
# that is wholly divisible by the quantization interval
|
|
max_accumulated_time = 65535 / (self.VGM_FREQUENCY/play_rate)
|
|
max_accumulated_time = max_accumulated_time * (self.VGM_FREQUENCY/play_rate)
|
|
while (accumulated_time > 0):
|
|
|
|
# ensure no wait commands exceed the 16-bit limit
|
|
t = accumulated_time
|
|
if (t > max_accumulated_time):
|
|
t = max_accumulated_time
|
|
|
|
# optimization: if quantization time step is 1/50 or 1/60 of a second use the single byte wait
|
|
if t == 882: # 50Hz
|
|
if self.VERBOSE: print "Outputting WAIT50"
|
|
output_command_list.append( { 'command' : b'\x63', 'data' : None } )
|
|
else:
|
|
if t == 882*2: # 25Hz
|
|
if self.VERBOSE: print "Outputting 2x WAIT50 "
|
|
output_command_list.append( { 'command' : b'\x63', 'data' : None } )
|
|
output_command_list.append( { 'command' : b'\x63', 'data' : None } )
|
|
else:
|
|
if t == 735: # 60Hz
|
|
if self.VERBOSE: print "Outputting WAIT60"
|
|
output_command_list.append( { 'command' : b'\x62', 'data' : None } )
|
|
else:
|
|
if t == 735*2: # 30Hz
|
|
if self.VERBOSE: print "Outputting WAIT60 x 2"
|
|
output_command_list.append( { 'command' : b'\x62', 'data' : None } )
|
|
output_command_list.append( { 'command' : b'\x62', 'data' : None } )
|
|
else:
|
|
if self.VERBOSE: print "Outputting WAIT " + str(t) + " (" + str(float(t)/float(interval_time)) + " intervals)"
|
|
# else emit the full 16-bit wait command (3 bytes)
|
|
output_command_list.append( { 'command' : b'\x61', 'data' : struct.pack('H', t) } )
|
|
|
|
accumulated_time -= t
|
|
|
|
# output pending commands
|
|
output_command_list += quantized_command_list
|
|
|
|
|
|
# accumulate time to next quantized time period
|
|
next_w = (self.VGM_FREQUENCY/play_rate)
|
|
accumulated_time += next_w
|
|
if self.VERBOSE: print "next_w=" + str(next_w)
|
|
|
|
|
|
# report
|
|
print "Processed VGM stream, quantized to " + str(play_rate) + "Hz playback intervals"
|
|
print "- originally contained " + str(num_commands) + " commands, now contains " + str(len(output_command_list)) + " commands"
|
|
|
|
self.command_list = output_command_list
|
|
num_commands = len(output_command_list)
|
|
self.metadata['rate'] = play_rate
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
def analyse(self):
|
|
|
|
|
|
# now we've quantized we can eliminate redundant register writes
|
|
# for each tone channel
|
|
# only store the last write
|
|
# for each volume channel
|
|
# only store the last write
|
|
# maybe incorporate this into the quantization
|
|
|
|
|
|
|
|
|
|
# total number of commands in the vgm stream
|
|
num_commands = len(self.command_list)
|
|
|
|
# total number of samples in the vgm stream
|
|
total_samples = int(self.metadata['total_samples'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# analysis / output
|
|
|
|
minwait = 99999
|
|
minwaitn = 99999
|
|
writecount = 0
|
|
totalwritecount = 0
|
|
maxwritecount = 0
|
|
writedictionary = []
|
|
waitdictionary = []
|
|
tonedictionary = []
|
|
maxtonedata = 0
|
|
numtonedatawrites = 0
|
|
unhandledcommands = 0
|
|
totaltonewrites = 0
|
|
totalvolwrites = 0
|
|
latchtone = 0
|
|
|
|
# convert to event sequence, one event per channel, with tones & volumes changed
|
|
|
|
#event = { "wait" : 0, "t0" : -1, "v0" : -1, "t1" : -1, "v1" : -1, "t2" : -1, "v2" : -1, "t3" : -1, "v3" : - 1 }
|
|
event = None
|
|
|
|
#nnnnnn tttttt vv ttttt vv tttt vv ttttt vvv
|
|
|
|
eventlist = []
|
|
|
|
waittime = 0
|
|
tonechannel = 0
|
|
|
|
for n in range(num_commands):
|
|
command = self.command_list[n]["command"]
|
|
data = self.command_list[n]["data"]
|
|
pdata = "NONE"
|
|
|
|
# process command
|
|
if b'\x70' <= command <= b'\x7f':
|
|
pcommand = "WAITn"
|
|
else:
|
|
pcommand = binascii.hexlify(command)
|
|
|
|
|
|
if pcommand == "50":
|
|
pcommand = "WRITE"
|
|
# count number of serial writes
|
|
writecount += 1
|
|
totalwritecount += 1
|
|
if data not in writedictionary:
|
|
writedictionary.append(data)
|
|
else:
|
|
if writecount > maxwritecount:
|
|
maxwritecount = writecount
|
|
writecount = 0
|
|
if pcommand == "61":
|
|
pcommand = "WAIT "
|
|
else:
|
|
if pcommand == "66":
|
|
pcommand = "END"
|
|
else:
|
|
if pcommand == "62":
|
|
pcommand = "WAIT60"
|
|
else:
|
|
if pcommand == "63":
|
|
pcommand = "WAIT50"
|
|
else:
|
|
unhandledcommands += 1
|
|
pdata = "UNKNOWN COMMAND"
|
|
|
|
|
|
|
|
|
|
|
|
# process data
|
|
# handle data writes first
|
|
if pcommand == "WRITE":
|
|
|
|
# flush any pending wait events
|
|
if waittime > 0:
|
|
# create a new event object, serial writes will be added to this single object
|
|
event = { "wait" : waittime, "t0" : -1, "v0" : -1, "t1" : -1, "v1" : -1, "t2" : -1, "v2" : -1, "t3" : -1, "v3" : - 1 }
|
|
eventlist.append(event)
|
|
waittime = 0
|
|
event = None
|
|
|
|
if event == None:
|
|
event = { "wait" : 0, "t0" : -1, "v0" : -1, "t1" : -1, "v1" : -1, "t2" : -1, "v2" : -1, "t3" : -1, "v3" : - 1 }
|
|
|
|
# process the write data
|
|
pdata = binascii.hexlify(data)
|
|
w = int(pdata, 16)
|
|
s = pdata
|
|
pdata = s + " (" + str(w) + ")"
|
|
if w & 128:
|
|
tonechannel = (w&96)>>5
|
|
pdata += " LATCH"
|
|
pdata += " CH" + str(tonechannel)
|
|
|
|
if (w & 16):
|
|
pdata += " VOL"
|
|
totalvolwrites += 1
|
|
vol = w & 15
|
|
if tonechannel == 0:
|
|
event["v0"] = vol
|
|
if tonechannel == 1:
|
|
event["v1"] = vol
|
|
if tonechannel == 2:
|
|
event["v2"] = vol
|
|
if tonechannel == 3:
|
|
event["v3"] = vol
|
|
|
|
|
|
else:
|
|
pdata += " TONE"
|
|
totaltonewrites += 1
|
|
latchtone = w & 15
|
|
pdata += " " + str(w & 15)
|
|
else:
|
|
pdata += " DATA"
|
|
numtonedatawrites += 1
|
|
if w > maxtonedata:
|
|
maxtonedata = w
|
|
tone = latchtone + (w << 4)
|
|
pdata += " " + str(w) + " (tone=" + str(tone) + ")"
|
|
|
|
latchtone = 0
|
|
if tone not in tonedictionary:
|
|
tonedictionary.append(tone)
|
|
|
|
if tonechannel == 0:
|
|
event["t0"] = tone
|
|
if tonechannel == 1:
|
|
event["t1"] = tone
|
|
if tonechannel == 2:
|
|
event["t2"] = tone
|
|
if tonechannel == 3:
|
|
event["t3"] = tone
|
|
else:
|
|
# process wait or end commands
|
|
|
|
# flush any previously gathered write event
|
|
if event != None:
|
|
eventlist.append(event)
|
|
event = None
|
|
|
|
if pcommand == "WAIT60":
|
|
t = 735
|
|
waittime += t
|
|
if t not in waitdictionary:
|
|
waitdictionary.append(t)
|
|
|
|
if pcommand == "WAIT50":
|
|
t = 882
|
|
waittime += t
|
|
if t not in waitdictionary:
|
|
waitdictionary.append(t)
|
|
|
|
if pcommand == "WAIT ":
|
|
pdata = binascii.hexlify(data)
|
|
t = int(pdata, 16)
|
|
# sdm: swap bytes to LSB
|
|
lsb = t & 255
|
|
msb = (t / 256)
|
|
t = (lsb * 256) + msb
|
|
waittime += t
|
|
if t < minwait:
|
|
minwait = t
|
|
ms = t * 1000 / self.VGM_FREQUENCY
|
|
pdata = str(ms) +"ms, " + str(t) + " samples (" + pdata +")"
|
|
if t not in waitdictionary:
|
|
waitdictionary.append(t)
|
|
|
|
|
|
if pcommand == "WAITn":
|
|
# data will be "None" for this but thats ok.
|
|
pdata = binascii.hexlify(command)
|
|
t = int(pdata, 16)
|
|
t &= 15
|
|
waittime += t
|
|
if t < minwaitn:
|
|
minwaitn = t
|
|
ms = t * 1000 / self.VGM_FREQUENCY
|
|
pdata = str(ms) +"ms, " + str(t) + " samples (" + pdata +")"
|
|
if t not in waitdictionary:
|
|
waitdictionary.append(t)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print "#" + str(n) + " Command:" + pcommand + " Data:" + pdata # '{:02x}'.format(data)
|
|
|
|
# NOTE: multiple register writes happen instantaneously
|
|
# ideas:
|
|
# quantize tone from 10-bit to 8-bit? Doubt it would sound the same.
|
|
# doesn't seem to be many tone changes, and tones are few in range (i bet vibrato and arpeggios change this though)
|
|
# volume is the main variable - possibly separate the volume stream and resample it?
|
|
# volume can be changed using one byte
|
|
# tone requires two bytes and could be quantized to larger time steps?
|
|
|
|
|
|
totalwaitcommands = num_commands - totalwritecount
|
|
clockspeed = 2000000
|
|
samplerate = self.VGM_FREQUENCY
|
|
cyclespersample = clockspeed/samplerate
|
|
|
|
|
|
#--------------------------------
|
|
print "--------------------------------------------------------------------------"
|
|
print "Number of sampled events: " + str(len(eventlist))
|
|
|
|
for n in range(len(eventlist)):
|
|
event = eventlist[n]
|
|
print "%6d" % n + " " + str(event)
|
|
|
|
|
|
print "--------------------------------------------------------------------------"
|
|
|
|
# compile volume channel 0 stream
|
|
|
|
eventlist_v0 = []
|
|
eventlist_v1 = []
|
|
eventlist_v2 = []
|
|
eventlist_v3 = []
|
|
|
|
eventlist_t0 = []
|
|
eventlist_t1 = []
|
|
eventlist_t2 = []
|
|
eventlist_t3 = []
|
|
|
|
def printEvents(eventlistarray, arrayname):
|
|
print ""
|
|
print "Total " + arrayname + " events: " + str(len(eventlistarray))
|
|
for n in range(len(eventlistarray)):
|
|
event = eventlistarray[n]
|
|
print "%6d" % n + " " + str(event)
|
|
|
|
def processEvents(eventsarray_in, eventsarray_out, tag_in, tag_out):
|
|
waittime = 0
|
|
for n in range(len(eventsarray_in)):
|
|
event = eventsarray_in[n]
|
|
t = event["wait"]
|
|
if t > 0:
|
|
waittime += t
|
|
else:
|
|
v = event[tag_in]
|
|
if v > -1:
|
|
eventsarray_out.append({ "wait" : waittime, tag_out : v })
|
|
waittime = 0
|
|
|
|
printEvents(eventsarray_out, tag_in)
|
|
|
|
processEvents(eventlist, eventlist_v0, "v0", "v")
|
|
processEvents(eventlist, eventlist_v1, "v1", "v")
|
|
processEvents(eventlist, eventlist_v2, "v2", "v")
|
|
processEvents(eventlist, eventlist_v3, "v3", "v")
|
|
|
|
processEvents(eventlist, eventlist_t0, "t0", "t")
|
|
processEvents(eventlist, eventlist_t1, "t1", "t")
|
|
processEvents(eventlist, eventlist_t2, "t2", "t")
|
|
processEvents(eventlist, eventlist_t3, "t3", "t")
|
|
|
|
|
|
# ----------------------- analysis
|
|
|
|
|
|
print "Number of commands in data file: " + str(num_commands)
|
|
print "Total samples in data file: " + str(total_samples) + " (" + str(total_samples*1000/self.VGM_FREQUENCY) + " ms)"
|
|
print "Smallest wait time was: " + str(minwait) + " samples"
|
|
print "Smallest waitN time was: " + str(minwaitn) + " samples"
|
|
print "ClockSpeed:" + str(clockspeed) + " SampleRate:" + str(samplerate) + " CyclesPerSample:" + str(cyclespersample) + " CyclesPerWrite:" + str(cyclespersample*minwait)
|
|
print "Updates Per Second:" + str(clockspeed/(cyclespersample*minwait))
|
|
print "Total register writes:" + str(totalwritecount) + " Max Sequential Writes:" + str(maxwritecount) # sequential writes happen at same time, in series
|
|
print "Total tone writes:" + str(totaltonewrites)
|
|
print "Total vol writes:" + str(totalvolwrites)
|
|
print "Total wait commands:" + str(totalwaitcommands)
|
|
print "Write dictionary contains " + str(len(writedictionary)) + " unique entries"
|
|
print "Wait dictionary contains " + str(len(waitdictionary)) + " unique entries"
|
|
print "Tone dictionary contains " + str(len(tonedictionary)) + " unique entries"
|
|
print "Largest Tone Data Write value was " + str(maxtonedata)
|
|
print "Number of Tone Data writes was " + str(numtonedatawrites)
|
|
print "Number of unhandled commands was " + str(unhandledcommands)
|
|
|
|
|
|
estimatedfilesize = totalwritecount + totalwaitcommands
|
|
|
|
print "Estimated file size is " + str(estimatedfilesize) + " bytes, assuming 1 byte per command can be achieved"
|
|
|
|
|
|
print ""
|
|
|
|
print "num t0 events: " + str(len(eventlist_t0)) + " (" + str(len(eventlist_t0)*3) + " bytes)"
|
|
print "num t1 events: " + str(len(eventlist_t1)) + " (" + str(len(eventlist_t1)*3) + " bytes)"
|
|
print "num t2 events: " + str(len(eventlist_t2)) + " (" + str(len(eventlist_t2)*3) + " bytes)"
|
|
print "num t3 events: " + str(len(eventlist_t3)) + " (" + str(len(eventlist_t3)*3) + " bytes)"
|
|
print "num v0 events: " + str(len(eventlist_v0)) + " (" + str(len(eventlist_v0)*3) + " bytes)"
|
|
print "num v1 events: " + str(len(eventlist_v1)) + " (" + str(len(eventlist_v1)*3) + " bytes)"
|
|
print "num v2 events: " + str(len(eventlist_v2)) + " (" + str(len(eventlist_v2)*3) + " bytes)"
|
|
print "num v3 events: " + str(len(eventlist_v3)) + " (" + str(len(eventlist_v3)*3) + " bytes)"
|
|
|
|
total_volume_events = len(eventlist_v0) + len(eventlist_v1) + len(eventlist_v2) + len(eventlist_v3)
|
|
total_tone_events = len(eventlist_t0) + len(eventlist_t1) + len(eventlist_t2) + len(eventlist_t3)
|
|
size_volume_events = (total_volume_events * 4 / 8) + total_volume_events*2 / 4
|
|
size_tone_events = (total_tone_events * 10 / 8) + total_tone_events*2
|
|
|
|
print "total_volume_events = " + str(total_volume_events) + " (" + str(size_volume_events) + " bytes)"
|
|
print "total_tone_events = " + str(total_tone_events) + " (" + str(size_tone_events) + " bytes)"
|
|
|
|
|
|
# seems you can playback at any frequency, by simply processing the VGM data stream to catchup with the simulated/real time
|
|
# this implies a bunch of registers will be written in one go. So for any tones or volumes that duplicate within the time slot, we can eliminate those
|
|
# therefore, you could in principle 'resample' a VGM at a given update frequency (eg. 50Hz) which would eliminate any redundant data sampled at 44100 hz
|
|
|
|
# basically, we'd play the song at a given playback rate, capture the output, and rewrite the VGM with these new values.
|
|
# we can test the process in the web player to see if any fidelity would be lost.
|
|
# at the very least, the wait time numbers would be smaller and therefore easier to pack
|
|
#
|
|
# another solution is to splice a tune into repeated patterns
|
|
#
|
|
# Alternatively, analyse the tune - assuming it was originally sequenced at some BPM, there would have to be a pattern
|
|
# Also, assume that instruments were used where tone/volume envelopes were used
|
|
# Capture when tone changes happen, then look for the volume patterns to create instruments
|
|
# then re-sequence as an instrument/pattern based format
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------------------------------------
|
|
|
|
# iterate through the command list, seeing how we might be able to reduce filesize
|
|
# binary format schema is:
|
|
# We assume the VGM has been quantized to fixed intervals. Therefore we do not need to emit wait commands, just packets of data writes.
|
|
|
|
# <header>
|
|
# [byte] - header size - indicates number of bytes in header section
|
|
# [byte] - indicates the required playback rate in Hz
|
|
# [byte] - packet count lsb
|
|
# [byte] - packet count msb
|
|
# [byte] - duration minutes
|
|
# [byte] - duration seconds
|
|
# todo: add looping offsets
|
|
# <title>
|
|
# [byte] - title string size
|
|
# [dd] ... - ZT title string
|
|
# <author>
|
|
# [byte] - author string size
|
|
# [dd] ... - ZT author string
|
|
# <packets>
|
|
# [byte] - indicating number of data writes within the current packet (max 11)
|
|
# [dd] ... - data
|
|
# [byte] - number of data writes within the next packet
|
|
# [dd] ... - data
|
|
# ...
|
|
# <eof>
|
|
# [0xff] - eof
|
|
# Max packet length will be 11 bytes as that is all that is needed to update all SN tone + volume registers for all 4 channels in one interval.
|
|
|
|
def insights(self):
|
|
|
|
print "--------------------------------------"
|
|
print "insights"
|
|
print "--------------------------------------"
|
|
|
|
packet_dict = []
|
|
volume_packet_dict = []
|
|
tone_packet_dict = []
|
|
|
|
volume_dict = []
|
|
volume_write_count = 0
|
|
|
|
tone_dict = []
|
|
tone_latch_write_count = 0
|
|
tone_data_write_count = 0
|
|
tone_single_write_count = 0
|
|
tone_count_7bit = 0
|
|
|
|
|
|
packet_size_counts = [0,0,0,0,0,0,0,0,0,0,0,0,0]
|
|
packet_dict_counts = [0,0,0,0,0,0,0,0,0,0,0,0,0]
|
|
|
|
common_packets = 0
|
|
packet_count = 0
|
|
|
|
packet_block = bytearray()
|
|
volume_packet_block = bytearray()
|
|
tone_packet_block = bytearray()
|
|
|
|
tone_value = 0
|
|
|
|
tone_latch_write = False
|
|
for q in self.command_list:
|
|
|
|
command = q["command"]
|
|
if command == struct.pack('B', 0x50):
|
|
|
|
data = q['data']
|
|
packet_block.extend(data)
|
|
|
|
w = int(binascii.hexlify(data), 16)
|
|
|
|
# gather volume data
|
|
if w & (128+16) == (128+16):
|
|
|
|
# handle tones where only one write occurred
|
|
if tone_latch_write == True:
|
|
tone_single_write_count += 1
|
|
if tone_value not in tone_dict:
|
|
tone_dict.append(tone_value)
|
|
|
|
volume_packet_block.extend(data)
|
|
volume_write_count += 1
|
|
if w not in volume_dict:
|
|
volume_dict.append(w)
|
|
tone_latch_write = False
|
|
|
|
|
|
# gather tone latch data
|
|
if w & (128+16) == 128:
|
|
tone_packet_block.extend(data)
|
|
tone_latch_write_count += 1
|
|
|
|
# handle tones where only one write occurred
|
|
if tone_latch_write == True:
|
|
tone_single_write_count += 1
|
|
if tone_value not in tone_dict:
|
|
tone_dict.append(tone_value)
|
|
|
|
|
|
tone_value = w & 15
|
|
tone_latch_write = True
|
|
|
|
|
|
# gather tone data
|
|
if (w & 128) == 0:
|
|
if tone_latch_write == False:
|
|
print "ERROR: UNEXPECTED tone data write with no previous latch write"
|
|
tone_packet_block.extend(data)
|
|
tone_data_write_count += 1
|
|
tone_latch_write = False
|
|
tone_value |= (w & 63) << 4
|
|
if tone_value not in tone_dict:
|
|
tone_dict.append(tone_value)
|
|
|
|
|
|
else:
|
|
packet_count += 1
|
|
|
|
packet_size_counts[len(packet_block)] += 1
|
|
|
|
# function to compare packets with dictionary library
|
|
# returns true if unique or false if in dictionary
|
|
def process_packet(dict, block):
|
|
|
|
# build up a dictionary of packets - curious to see how much repetition exists
|
|
new_packet = True
|
|
|
|
for i in range(len(dict)):
|
|
pd = dict[i]
|
|
if len(pd) != len(block):
|
|
#print "Different size - so doesnt match"
|
|
continue
|
|
else:
|
|
#print "Found packet with matching size"
|
|
# same size so compare
|
|
match = True
|
|
for j in range(len(pd)):
|
|
if pd[j] != block[j]:
|
|
match = False
|
|
# we found a match, so it wont be added to the list
|
|
if (match == True):
|
|
new_packet = False
|
|
break
|
|
|
|
if new_packet == True:
|
|
#print "Non matching - Adding packet"
|
|
dict.append(block)
|
|
|
|
return new_packet
|
|
|
|
# add the various packets to dictionaries so we can determine level of repetition
|
|
process_packet(volume_packet_dict, volume_packet_block)
|
|
process_packet(tone_packet_dict, tone_packet_block)
|
|
|
|
new_packet = process_packet(packet_dict, packet_block)
|
|
if new_packet == True:
|
|
packet_dict_counts[len(packet_block)] += 1
|
|
|
|
if False:
|
|
# build up a dictionary of packets - curious to see how much repetition exists
|
|
new_packet = True
|
|
|
|
for i in range(len(packet_dict)):
|
|
pd = packet_dict[i]
|
|
if len(pd) != len(packet_block):
|
|
#print "Different size - Adding packet"
|
|
packet_dict.append(packet_block)
|
|
break
|
|
else:
|
|
#print "Found packet with matching size"
|
|
# same size so compare
|
|
mp = True
|
|
for j in range(len(pd)):
|
|
if pd[j] != packet_block[j]:
|
|
mp = False
|
|
if (mp == False):
|
|
new_packet = False
|
|
break
|
|
|
|
if new_packet == True:
|
|
#print "Non matching - Adding packet"
|
|
packet_dict.append(packet_block)
|
|
else:
|
|
common_packets += 1
|
|
#print "Found matching packet " + str(len(packet_block)) + " bytes"
|
|
|
|
# start new packet
|
|
packet_block = bytearray()
|
|
volume_packet_block = bytearray()
|
|
tone_packet_block = bytearray()
|
|
|
|
|
|
# print " Found " + str(common_packets) + " common packets out of total " + str(packet_count) + " packets"
|
|
|
|
print " There were " + str(len(packet_dict)) + " unique packets out of total "+ str(packet_count) + " packets"
|
|
print " There were " + str(len(volume_packet_dict)) + " unique volume packets out of total "+ str(packet_count) + " packets"
|
|
print " There were " + str(len(tone_packet_dict)) + " unique tone packets out of total "+ str(packet_count) + " packets"
|
|
print ""
|
|
|
|
def get_packet_dict_size(dict):
|
|
sz = 0
|
|
for p in dict:
|
|
sz += len(p)
|
|
return sz
|
|
|
|
print " Packet dictionary size " + str(get_packet_dict_size(packet_dict)) + " bytes"
|
|
print " Volume dictionary size " + str(get_packet_dict_size(volume_packet_dict)) + " bytes"
|
|
print " Tone dictionary size " + str(get_packet_dict_size(tone_packet_dict)) + " bytes"
|
|
print ""
|
|
|
|
print " Number of unique volumes " + str(len(volume_dict)) + " (max 64)" # should max out at 64 (4x16)
|
|
print " Number of volume writes " + str(volume_write_count)
|
|
print ""
|
|
print " Number of unique tones " + str(len(tone_dict))
|
|
print " Number of tone latch writes " + str(tone_latch_write_count)
|
|
print " Number of tone data writes " + str(tone_data_write_count)
|
|
print " Total 16-bit tone data writes " + str(tone_latch_write_count+tone_data_write_count)
|
|
print " Number of single tone latch writes " + str(tone_single_write_count)
|
|
print ""
|
|
print " Packet size distributions (0-11 bytes):"
|
|
|
|
t = 0
|
|
for i in range(0,12):
|
|
t += packet_size_counts[i]
|
|
print packet_size_counts, t
|
|
|
|
|
|
print ""
|
|
print " Unique Packet dict distributions (0-11 bytes):"
|
|
t = 0
|
|
for i in range(0,12):
|
|
t += packet_dict_counts[i]
|
|
print packet_dict_counts, t
|
|
|
|
print ""
|
|
print " Byte cost distributions (0-11 bytes):"
|
|
o = "[ "
|
|
t = 0
|
|
for i in range(0,12):
|
|
n = (packet_dict_counts[i]) * (i)
|
|
t += n
|
|
o += str(n) + ", "
|
|
print o + "]", t
|
|
|
|
|
|
print ""
|
|
print " Byte saving distributions (0-11 bytes):"
|
|
t = 0
|
|
o = "[ "
|
|
for i in range(0,12):
|
|
n = (packet_size_counts[i] - packet_dict_counts[i]) * (i)
|
|
t += n
|
|
o += str(n) + ", "
|
|
print o + "]", t
|
|
|
|
|
|
|
|
print ""
|
|
tp = 0
|
|
bs = 0
|
|
size = 1
|
|
for n in packet_size_counts:
|
|
tp += n
|
|
bs += n * size
|
|
size += 1
|
|
|
|
print " (total packets " + str(tp) + ")"
|
|
print " (total stream bytesize " + str(bs) + ")"
|
|
print " (write count byte size " + str(volume_write_count+tone_latch_write_count+tone_data_write_count+packet_count) + ")"
|
|
|
|
print " Volume writes represent " + str( volume_write_count * 100 / (bs-packet_count) ) + " % of filesize"
|
|
print " Tone writes represent " + str( (tone_latch_write_count+tone_data_write_count) * 100 / (bs-packet_count) ) + " % of filesize"
|
|
|
|
print " Filesize using packet LUT " + str( packet_count*2 + get_packet_dict_size(packet_dict))
|
|
print " Filesize using vol/tone packet LUT " + str( packet_count*4 + get_packet_dict_size(volume_packet_dict) + get_packet_dict_size(tone_packet_dict) )
|
|
print "--------------------------------------"
|
|
|
|
#--------------------------------------------------------------------------------------------------------------
|
|
|
|
# Apply a sliding window dictionary compression to the packet data
|
|
def compress_packets(self):
|
|
|
|
print "--------------------------------------"
|
|
print "packet compression"
|
|
print "--------------------------------------"
|
|
|
|
packet_list = []
|
|
packet_dict = []
|
|
|
|
output_stream = bytearray()
|
|
dict_stream = bytearray()
|
|
|
|
packet_block = bytearray()
|
|
|
|
for q in self.command_list:
|
|
|
|
command = q["command"]
|
|
if command == struct.pack('B', 0x50):
|
|
|
|
data = q['data']
|
|
packet_block.extend(data)
|
|
|
|
else:
|
|
packet_list.append(packet_block)
|
|
|
|
# start new packet
|
|
packet_block = bytearray()
|
|
|
|
|
|
print "Found " + str(len(packet_list)) + " packets"
|
|
|
|
|
|
# approach:
|
|
# as we process each new packet, scan a dictionary memory window to see if already exists
|
|
# if it does, emit an index into the dictionary, otherwise emit the new packet (and add it to the dictionary)
|
|
|
|
if True:
|
|
|
|
|
|
|
|
window_size = 2048 # must be power of 2, 2Kb seems to be the sweet spot
|
|
window_size_mask = window_size-1
|
|
window_ptr = 0
|
|
window_data = bytearray()
|
|
for i in range(0,window_size):
|
|
window_data.append(0)
|
|
|
|
# process all packets
|
|
# we wont support packets that 'wrap' the window
|
|
for packet in packet_list:
|
|
|
|
|
|
# see if new packet already exists in dictionary
|
|
packet_index = -1
|
|
packet_size = len(packet)
|
|
|
|
# only compress packets of a certain size
|
|
if packet_size > 2:
|
|
for i in range(0, window_size-packet_size):
|
|
|
|
# compare window at current index for a packet match
|
|
packet_found = True
|
|
for j in range(len(packet)):
|
|
index = (i+j) # & window_size_mask
|
|
if window_data[index] != packet[j]:
|
|
packet_found = False
|
|
break
|
|
|
|
if packet_found:
|
|
packet_index = i
|
|
|
|
if packet_index < 0:
|
|
# new packet, so add to dictionary
|
|
if window_ptr+packet_size > window_size:
|
|
window_ptr = 0
|
|
|
|
print "New packet added to window index " + str(window_ptr)
|
|
for j in range(packet_size):
|
|
window_data[window_ptr+j] = packet[j]
|
|
|
|
window_ptr += packet_size
|
|
|
|
# output data
|
|
if packet_index < 0:
|
|
# not found, emit the packet
|
|
output_stream.append(packet_size)
|
|
output_stream.extend(packet)
|
|
else:
|
|
print "Found packet at index " + str(packet_index)
|
|
output_stream.extend(struct.pack('h', packet_index))
|
|
|
|
|
|
print "Output stream size " + str(len(output_stream))
|
|
bin_file = open("xxx.bin", 'wb')
|
|
bin_file.write(output_stream)
|
|
bin_file.close()
|
|
else:
|
|
|
|
# build up a dictionary of packets - curious to see how much repetition exists
|
|
|
|
|
|
total_new_packets = 0
|
|
for packet in packet_list:
|
|
|
|
packet_index = -1
|
|
# see if new packet already exists in dictionary
|
|
for i in range(len(packet_dict)):
|
|
pd = packet_dict[i]
|
|
if len(pd) != len(packet):
|
|
#print "Different size - so doesnt match"
|
|
continue
|
|
else:
|
|
#print "Found packet with matching size"
|
|
# same size so compare
|
|
match = True
|
|
for j in range(len(pd)):
|
|
if pd[j] != packet[j]:
|
|
match = False
|
|
# we found a match, so set the dictionary index
|
|
if (match == True):
|
|
packet_index = i
|
|
break
|
|
|
|
if packet_index < 0:
|
|
#print "Non matching - Adding packet"
|
|
packet_index = len(packet_dict)
|
|
packet_dict.append(packet)
|
|
dict_stream.extend(packet)
|
|
total_new_packets += 1
|
|
|
|
output_stream.extend(struct.pack('h', packet_index))
|
|
|
|
|
|
print "Unique packets " + str(total_new_packets)
|
|
print "Dict stream size " + str(len(dict_stream))
|
|
print "Output stream size " + str(len(output_stream))
|
|
|
|
|
|
# write to output file
|
|
bin_file = open("xxx.bin", 'wb')
|
|
bin_file.write(output_stream)
|
|
bin_file.close()
|
|
bin_file = open("xxx2.bin", 'wb')
|
|
bin_file.write(dict_stream)
|
|
bin_file.close()
|
|
|
|
print "--------------------------------------"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#--------------------------------------------------------------------------------------------------------------
|
|
|
|
def write_binary(self, filename, rawheader = True):
|
|
print " VGM Processing : Output binary file "
|
|
|
|
# debug data to dump out information about the packet stream
|
|
#self.insights()
|
|
#self.compress_packets()
|
|
|
|
byte_size = 1
|
|
packet_size = 0
|
|
play_rate = self.metadata['rate']
|
|
play_interval = self.VGM_FREQUENCY / play_rate
|
|
data_block = bytearray()
|
|
packet_block = bytearray()
|
|
|
|
packet_count = 0
|
|
|
|
# emit the packet data
|
|
for q in self.command_list:
|
|
|
|
command = q["command"]
|
|
if command != struct.pack('B', 0x50):
|
|
|
|
# non-write command, so flush any pending packet data
|
|
if self.VERBOSE: print "Packet length " + str(len(packet_block))
|
|
|
|
data_block.append(struct.pack('B', len(packet_block)))
|
|
data_block.extend(packet_block)
|
|
packet_count += 1
|
|
|
|
#if packet_count > 30*play_rate:
|
|
# break
|
|
|
|
# start new packet
|
|
packet_block = bytearray()
|
|
|
|
if self.VERBOSE: print "Command " + str(binascii.hexlify(command))
|
|
|
|
|
|
|
|
# see if command is a wait longer than one interval and emit empty packets to compensate
|
|
wait = 0
|
|
if command == struct.pack('B', 0x61):
|
|
t = int(binascii.hexlify(q["data"]), 16)
|
|
wait = ((t & 255) * 256) + (t>>8)
|
|
else:
|
|
if command == struct.pack('B', 0x62):
|
|
wait = 735
|
|
else:
|
|
if command == struct.pack('B', 0x63):
|
|
wait = 882
|
|
|
|
if wait != 0:
|
|
intervals = wait / (self.VGM_FREQUENCY / play_rate)
|
|
if intervals == 0:
|
|
print "ERROR in data stream, wait value (" + str(wait) + ") was not divisible by play_rate (" + str((self.VGM_FREQUENCY / play_rate)) + "), bailing"
|
|
return
|
|
else:
|
|
if self.VERBOSE: print "WAIT " + str(intervals) + " intervals"
|
|
|
|
# emit empty packet headers to simulate wait commands
|
|
intervals -= 1
|
|
while intervals > 0:
|
|
data_block.append(0)
|
|
if self.VERBOSE: print "Packet length 0"
|
|
intervals -= 1
|
|
packet_count += 1
|
|
|
|
|
|
|
|
else:
|
|
if self.VERBOSE: print "Data " + str(binascii.hexlify(command))
|
|
packet_block.extend(q['data'])
|
|
|
|
# eof
|
|
data_block.append(0x00) # append one last wait
|
|
data_block.append(0xFF) # signal EOF
|
|
|
|
|
|
header_block = bytearray()
|
|
# emit the play rate
|
|
print "play rate is " + str(play_rate)
|
|
header_block.append(struct.pack('B', play_rate & 0xff))
|
|
header_block.append(struct.pack('B', packet_count & 0xff))
|
|
header_block.append(struct.pack('B', (packet_count >> 8) & 0xff))
|
|
|
|
print " Num packets " + str(packet_count)
|
|
duration = packet_count / play_rate
|
|
duration_mm = int(duration / 60.0)
|
|
duration_ss = int(duration % 60.0)
|
|
print " Song duration " + str(duration) + " seconds, " + str(duration_mm) + "m" + str(duration_ss) + "s"
|
|
header_block.append(struct.pack('B', duration_mm)) # minutes
|
|
header_block.append(struct.pack('B', duration_ss)) # seconds
|
|
|
|
# output the final byte stream
|
|
output_block = bytearray()
|
|
|
|
# send header
|
|
output_block.append(struct.pack('B', len(header_block)))
|
|
output_block.extend(header_block)
|
|
|
|
# send title
|
|
title = self.gd3_data['title_eng'].decode("utf_16")
|
|
title = title.encode('ascii', 'ignore')
|
|
|
|
if len(title) > 254:
|
|
title = title[:254]
|
|
output_block.append(struct.pack('B', len(title) + 1)) # title string length
|
|
output_block.extend(title)
|
|
output_block.append(struct.pack('B', 0)) # zero terminator
|
|
|
|
# send author
|
|
author = self.gd3_data['artist_eng'].decode("utf_16")
|
|
author = author.encode('ascii', 'ignore')
|
|
|
|
# use filename if no author listed
|
|
if len(author) == 0:
|
|
author = basename(self.vgm_filename)
|
|
|
|
if len(author) > 254:
|
|
author = author[:254]
|
|
output_block.append(struct.pack('B', len(author) + 1)) # author string length
|
|
output_block.extend(author)
|
|
output_block.append(struct.pack('B', 0)) # zero terminator
|
|
|
|
# send data with or without header
|
|
if rawheader:
|
|
output_block.extend(data_block)
|
|
else:
|
|
output_block = data_block
|
|
|
|
# write file
|
|
print "Compressed VGM is " + str(len(output_block)) + " bytes long"
|
|
|
|
# write to output file
|
|
bin_file = open(filename, 'wb')
|
|
bin_file.write(output_block)
|
|
bin_file.close()
|
|
|
|
#------------------------------------------------------------------------------------------
|
|
# Main
|
|
#------------------------------------------------------------------------------------------
|
|
|
|
# for testing
|
|
my_command_line = None
|
|
if False:
|
|
filename = "vgms/sms/10 Page 4.vgm"
|
|
filename = "vgms/sms/18 - 14 Dan's Theme.vgm"
|
|
filename = "vgms/bbc/Galaforce2-title.vgm"
|
|
filename = "vgms/bbc/Firetrack-ingame.vgm"
|
|
filename = "vgms/bbc/CodenameDroid-title.vgm"
|
|
filename = "vgms/sms/07 - 07 COOL JAM.vgm"
|
|
filename = "vgms/sms/09 - 13 Ken's Theme.vgm"
|
|
filename = "vgms/ntsc/15 Diamond Maze.vgm"
|
|
filename = "vgms/ntsc/01 Game Start.vgm"
|
|
|
|
|
|
#filename = "vgms/ntsc/ne7-magic_beansmaster_system_psg.vgm"
|
|
filename = "vgms/ntsc/Chris Kelly - SMS Power 15th Anniversary Competitions - Collision Chaos.vgz"
|
|
#filename = "vgms/ntsc/BotB 16439 Chip Champion - frozen dancehall of the pharaoh.vgm" # pathological fail, uses the built-in periodic noises which are tuned differently
|
|
|
|
#filename = "pn.vgm"
|
|
#filename = "vgms/ntsc/en vard fyra javel.vgm"
|
|
#filename = "chris.vgm"
|
|
filename = "vgms/ntsc/MISSION76496.vgm"
|
|
#filename = "vgms/ntsc/fluid.vgm"
|
|
#filename = "ng.vgm"
|
|
|
|
# for testing...
|
|
my_command_line = 'vgmconverter "' + filename + '" -t bbc -q 50 -o "test.vgm"'
|
|
|
|
|
|
|
|
|
|
#------------------------------------------------------------------------------------------
|
|
|
|
if my_command_line != None:
|
|
argv = my_command_line.split()
|
|
else:
|
|
argv = sys.argv
|
|
|
|
argc = len(argv)
|
|
|
|
if argc < 2:
|
|
print "VGM Conversion Utility for VGM files based on TI SN76849 sound chips"
|
|
print " Supports gzipped VGM or .vgz files."
|
|
print ""
|
|
print " Usage:"
|
|
print " vgmconverter <vgmfile> [-transpose <n>] [-quantize <n>] [-filter <n>] [-rawfile <filename>] [-output <filename>] [-length] [-norawheader] [-dump] [-verbose]"
|
|
print ""
|
|
print " where:"
|
|
print " <vgmfile> is the source VGM file to be processed. Wildcards are not yet supported."
|
|
print ""
|
|
print " options:"
|
|
print " [-transpose <n>, -t <n>] transpose the source VGM to a new frequency. For <n> Specify 'ntsc' (3.57MHz), 'pal' (4.2MHz) or 'bbc' (4.0MHz)"
|
|
print " [-quantize <n>, -q <n>] quantize the VGM to a specific playback update interval. For <n> specify an integer Hz value"
|
|
print " [-filter <n>, -n <n>] strip one or more output channels from the VGM. For <n> specify a string of channels to filter eg. '0123' or '13' etc."
|
|
print " [-rawfile <filename>, -r <filename>] output a raw binary file version of the chip data within the source VGM. A default quantization of 60Hz will be applied if not specified with -q"
|
|
print " [-output <filename>, -o <filename>] specifies the filename to output a processed VGM. Optional."
|
|
print " [-length <secs>, -l <secs>] limits output to <secs> seconds. Optional."
|
|
print " [-norawheader, -n] removes header from raw file output. Optional."
|
|
|
|
print " [-dump] output human readable version of the VGM"
|
|
print " [-verbose] enable debug information"
|
|
exit()
|
|
|
|
# pre-process argv to merge quoted arguments
|
|
argi = 0
|
|
inquotes = False
|
|
outargv = []
|
|
quotedarg = []
|
|
#print argv
|
|
for s in argv:
|
|
#print "s=" + s
|
|
#print "quotedarg=" + str(quotedarg)
|
|
|
|
if s.startswith('"') and s.endswith('"'):
|
|
outargv.append(s[1:-1])
|
|
continue
|
|
|
|
if not inquotes and s.startswith('"'):
|
|
inquotes = True
|
|
quotedarg.append(s[1:] + ' ')
|
|
continue
|
|
|
|
if inquotes and s.endswith('"'):
|
|
inquotes = False
|
|
quotedarg.append(s[:-1])
|
|
outargv.append("".join(quotedarg))
|
|
quotedarg = []
|
|
continue
|
|
|
|
if inquotes:
|
|
quotedarg.append(s + ' ')
|
|
continue
|
|
|
|
outargv.append(s)
|
|
|
|
if inquotes:
|
|
print "Error parsing command line " + str(" ".join(argv))
|
|
exit()
|
|
|
|
argv = outargv
|
|
|
|
# validate source file
|
|
source_filename = None
|
|
if argv[1][0] != '-':
|
|
source_filename = argv[1]
|
|
|
|
# setup option defaults
|
|
option_verbose = None
|
|
option_outputfile = None
|
|
option_transpose = None
|
|
option_quantize = None
|
|
option_filter = None
|
|
option_rawfile = None
|
|
option_dump = None
|
|
option_length = None
|
|
option_rawheader = True # determines if header added to raw output
|
|
|
|
# process command line
|
|
for i in range(2, len(argv)):
|
|
arg = argv[i]
|
|
if arg[0] == '-':
|
|
option = arg[1:].lower()
|
|
if option == 'o' or option == 'output':
|
|
option_outputfile = argv[i+1]
|
|
else:
|
|
if option == 't' or option == 'transpose':
|
|
option_transpose = argv[i+1]
|
|
else:
|
|
if option == 'q' or option == 'quantize':
|
|
option_quantize = argv[i+1]
|
|
else:
|
|
if option == 'f' or option == 'filter':
|
|
option_filter = argv[i+1]
|
|
else:
|
|
if option == 'r' or option == 'rawfile':
|
|
option_rawfile = argv[i+1]
|
|
else:
|
|
if option == 'd' or option == 'dump':
|
|
option_dump = True
|
|
else:
|
|
if option == 'v' or option == 'verbose':
|
|
option_verbose = True
|
|
else:
|
|
if option == 'l' or option == 'length':
|
|
option_length = argv[i+1]
|
|
else:
|
|
if option == 'n' or option == 'norawheader':
|
|
option_rawheader = False
|
|
else:
|
|
print "ERROR: Unrecognised option '" + arg + "'"
|
|
|
|
# load the VGM
|
|
if source_filename == None:
|
|
print "ERROR: No source <filename> provided."
|
|
exit()
|
|
|
|
# if rawfile output is specified, but no quantization option given, force a default quantization of 60Hz (NTSC)
|
|
if option_rawfile != None:
|
|
if option_quantize == None:
|
|
option_quantize = 60
|
|
|
|
# debug code
|
|
if False:
|
|
print "source " + str(source_filename)
|
|
print "verbose " + str(option_verbose)
|
|
print "output " + str(option_outputfile)
|
|
print "transpose " + str(option_transpose)
|
|
print "quantize " + str(option_quantize)
|
|
print "filter " + str(option_filter)
|
|
print "rawfile " + str(option_rawfile)
|
|
print "dump " + str(option_dump)
|
|
print ""
|
|
|
|
|
|
|
|
vgm_stream = VgmStream(source_filename)
|
|
|
|
# turn on verbose mode if required
|
|
if option_verbose == True:
|
|
vgm_stream.set_verbose(True)
|
|
|
|
# Apply output length if provided
|
|
if option_length != None:
|
|
vgm_stream.set_length(option_length)
|
|
|
|
|
|
# apply channel filters
|
|
if option_filter != None:
|
|
if option_filter.find('0') != -1:
|
|
vgm_stream.filter_channel(0)
|
|
if option_filter.find('1') != -1:
|
|
vgm_stream.filter_channel(1)
|
|
if option_filter.find('2') != -1:
|
|
vgm_stream.filter_channel(2)
|
|
if option_filter.find('3') != -1:
|
|
vgm_stream.filter_channel(3)
|
|
|
|
# Fixed optimization - non-lossy. Only removes duplicate register writes that are wholly unnecessary
|
|
vgm_stream.optimize()
|
|
|
|
# Second optimization - for each update interval, eliminate redundant register writes
|
|
# and sort the writes for each interval so that volumes are set before tones.
|
|
# This is in principle 'lossy' since the output VGM will be different to the source, but
|
|
# technically it will not influence the output audio stream.
|
|
vgm_stream.optimize2()
|
|
|
|
# Run first optimization again to take advantage of any redundancy from last optimization
|
|
vgm_stream.optimize()
|
|
|
|
# apply transpose
|
|
if option_transpose != None:
|
|
vgm_stream.transpose(option_transpose)
|
|
|
|
# quantize the VGM if required
|
|
if option_quantize != None:
|
|
hz = int(option_quantize)
|
|
vgm_stream.quantize(hz)
|
|
|
|
# optimize the stream
|
|
vgm_stream.optimize()
|
|
# optimize the packets
|
|
vgm_stream.optimize2()
|
|
# optimize the stream again, since packet optimization may have reduced data set further
|
|
vgm_stream.optimize()
|
|
|
|
|
|
# emit a raw binary file if required, rawheader is on by default
|
|
if option_rawfile != None:
|
|
vgm_stream.write_binary(option_rawfile, option_rawheader)
|
|
|
|
# write out the processed VGM if required
|
|
if option_outputfile != None:
|
|
vgm_stream.write_vgm(option_outputfile)
|
|
|
|
# dump the processed VGM
|
|
if option_dump != None:
|
|
vgm_stream.analyse()
|
|
|
|
# all done
|
|
print ""
|
|
print "Processing complete."
|
|
|
|
|