From 2f760c8b3defd1b44cea23ac01f98b52dd5ab47d Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Wed, 1 May 2024 22:28:15 -0700 Subject: [PATCH] rt95: Detect VOX variant from RT95 --- chirp/drivers/anytone778uv.py | 62 ++++++++++++++++++++++++----------- chirp/wxui/clone.py | 1 + chirp/wxui/developer.py | 41 +++++++++++++++++++++-- 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/chirp/drivers/anytone778uv.py b/chirp/drivers/anytone778uv.py index fb8476c06..79d23c991 100644 --- a/chirp/drivers/anytone778uv.py +++ b/chirp/drivers/anytone778uv.py @@ -368,15 +368,13 @@ def cstring_to_py_string(cstring): # Check the radio version reported to see if it's one we support, -# returns bool version supported, and the band index +# returns bool version supported, the band index, and has_vox def check_ver(ver_response, allowed_types): ''' Check the returned radio version is one we approve of ''' LOG.debug('ver_response = ') LOG.debug(util.hexprint(ver_response)) - global HAS_VOX - resp = bitwise.parse(VER_FORMAT, ver_response) verok = False @@ -387,7 +385,7 @@ def check_ver(ver_response, allowed_types): (model, version)) LOG.debug('allowed_types = %s' % allowed_types) - HAS_VOX = "P" == model[-1:] + has_vox = "P" == model[-1:] if model in allowed_types: LOG.debug('model in allowed_types') @@ -398,14 +396,13 @@ def check_ver(ver_response, allowed_types): else: raise errors.RadioError('Failed to parse version response') - return verok, int(resp.bandlimit) + return verok, int(resp.bandlimit), has_vox # Put the radio in programming mode, sending the initial command and checking # the response. raise RadioError if there is no response (500ms timeout), and # if the returned version isn't matched by check_ver -def enter_program_mode(radio): - serial = radio.pipe +def enter_program_mode(serial): # place the radio in program mode, and confirm program_response = send_serial_command(serial, b'PROGRAM') @@ -414,13 +411,15 @@ def enter_program_mode(radio): LOG.debug('entered program mode') # read the radio ID string, make sure it matches one we know about - ver_response = send_serial_command(serial, b'\x02') + return send_serial_command(serial, b'\x02') + - verok, bandlimit = check_ver(ver_response, radio.ALLOWED_RADIO_TYPES) +def get_bandlimit_from_ver(radio, ver_response): + verok, bandlimit, has_vox = check_ver(ver_response, + radio.ALLOWED_RADIO_TYPES) if not verok: - exit_program_mode(radio) ver = "V2" if radio.VENDOR == "CRT" else "VOX" - if HAS_VOX and ("V2" and "VOX") not in radio.MODEL: + if has_vox and ("V2" and "VOX") not in radio.MODEL: raise errors.RadioError( 'Radio identified as model with VOX\n' 'Try %s-%s %s' % @@ -429,14 +428,18 @@ def enter_program_mode(radio): raise errors.RadioError( 'Radio version not in allowed list for %s-%s:\n' '%s' % - (radio.VENDOR, radio.MODEL, util.hexprint(ver_response))) + (radio.VENDOR, radio.MODEL, + util.hexprint(ver_response))) return bandlimit # Exit programming mode -def exit_program_mode(radio): - send_serial_command(radio.pipe, b'END') +def exit_program_mode(serial): + try: + send_serial_command(serial, b'END') + except Exception as e: + LOG.error('Failed to exit programming mode: %s', e) # Parse a packet from the radio returning the header (R/W, address, data, and @@ -456,12 +459,13 @@ def parse_read_response(resp): def do_download(radio): '''Download memories from the radio''' + # NOTE: The radio is already in programming mode because of + # detect_from_serial() + # Get the serial port connection serial = radio.pipe try: - enter_program_mode(radio) - memory_data = bytes() # status info for the UI @@ -482,6 +486,8 @@ def do_download(radio): # LOG.debug('read response:\n%s' % util.hexprint(read_response)) address, data, valid = parse_read_response(read_response) + if not valid: + raise errors.RadioError('Invalid response received from radio') memory_data += data # update UI @@ -489,11 +495,12 @@ def do_download(radio): // MEMORY_RW_BLOCK_SIZE radio.status_fn(status) - exit_program_mode(radio) except errors.RadioError as e: raise e except Exception as e: raise errors.RadioError('Failed to download from radio: %s' % e) + finally: + exit_program_mode(radio.pipe) return memmap.MemoryMapBytes(memory_data) @@ -510,7 +517,8 @@ def make_write_data_cmd(addr, data, datalen): # Upload a memory map to the radio def do_upload(radio): try: - bandlimit = enter_program_mode(radio) + ver_response = enter_program_mode(radio.pipe) + bandlimit = get_bandlimit_from_ver(radio, ver_response) if bandlimit != radio._memobj.radio_settings.bandlimit: LOG.warning('radio and image bandlimits differ' @@ -561,17 +569,18 @@ def do_upload(radio): LOG.debug(' * write cmd:\n%s' % util.hexprint(write_command)) LOG.debug(' * write response:\n%s' % util.hexprint(write_response)) - exit_program_mode(radio) + exit_program_mode(radio.pipe) raise errors.RadioError('Radio NACK\'d write command') # update UI status.cur = idx radio.status_fn(status) - exit_program_mode(radio) except errors.RadioError: raise except Exception as e: raise errors.RadioError('Failed to download from radio: %s' % e) + finally: + exit_program_mode(radio.pipe) # Get the value of @bitfield @number of bits in from 0 @@ -635,6 +644,18 @@ class AnyTone778UVBase(chirp_common.CloneModeRadio, NAME_LENGTH = 5 HAS_VOX = False + @classmethod + def detect_from_serial(cls, pipe): + ver_response = enter_program_mode(pipe) + for radio_cls in cls.detected_models(): + ver_ok, _, has_vox = check_ver(ver_response, + radio_cls.ALLOWED_RADIO_TYPES) + if ver_ok: + return radio_cls + LOG.warning('No match for ver_response: %s', + util.hexprint(ver_response)) + raise errors.RadioError('Incorrect radio model') + @classmethod def get_prompts(cls): rp = chirp_common.RadioPrompts() @@ -1780,6 +1801,7 @@ class AnyTone778UVvox(AnyTone778UVvoxBase): @directory.register +@directory.detected_by(RetevisRT95) class RetevisRT95vox(AnyTone778UVvoxBase): VENDOR = "Retevis" MODEL = "RT95 VOX" diff --git a/chirp/wxui/clone.py b/chirp/wxui/clone.py index e6f26b854..30fafd86d 100644 --- a/chirp/wxui/clone.py +++ b/chirp/wxui/clone.py @@ -47,6 +47,7 @@ def get_fakes(): 'Fake F7E': fake.FakeKenwoodSerial(), 'Fake UV17': fake.FakeUV17Serial(), 'Fake UV17Pro': fake.FakeUV17ProSerial(), + 'Fake AT778': developer.FakeAT778(), } diff --git a/chirp/wxui/developer.py b/chirp/wxui/developer.py index 68d0a5746..04b7a2534 100644 --- a/chirp/wxui/developer.py +++ b/chirp/wxui/developer.py @@ -448,12 +448,47 @@ def _page_changed(self, event): class FakeSerial(serial.SerialBase): + def __init__(self, *a, **k): + super().__init__(*a, **k) + self._fake_buf = bytearray() + + @property + def in_waiting(self): + return len(self._fake_buf) + def write(self, buf): LOG.debug('Fake serial write:\n%s' % util.hexprint(buf)) - def read(self, count): - LOG.debug('Fake serial read %i' % count) - return b'' + def read(self, count=None): + if count is None: + count = len(self._fake_buf) + data = self._fake_buf[:count] + self._fake_buf = self._fake_buf[count:] + LOG.debug('Fake serial read %i: %s', count, util.hexprint(data)) + return data + + def flush(self): + LOG.debug('Fake serial flushed') + + +class FakeAT778(FakeSerial): + def __init__(self, *a, **k): + super().__init__(*a, **k) + from chirp.drivers import anytone778uv + self._emulated = anytone778uv.RetevisRT95vox + + def write(self, buf): + if buf == b'PROGRAM': + self._fake_buf.extend(buf + b'QX\x06') + elif buf == b'\x02': + model = list(self._emulated.ALLOWED_RADIO_TYPES.keys())[0] + version = self._emulated.ALLOWED_RADIO_TYPES[model][0] + self._fake_buf.extend(buf + b'\x49%7.7s\x00%6.6s\x06' % ( + model.encode().ljust(7, b'\x00'), + version.encode().ljust(6, b'\x00'))) + else: + raise Exception('Full clone not implemented') + super().write(buf) class FakeEchoSerial(FakeSerial):