From 958da7606fbd25d85c46dd5a8f744e97659d7daf Mon Sep 17 00:00:00 2001 From: Craig Jones Date: Sat, 10 Jun 2023 22:42:16 -0700 Subject: [PATCH] First of many steps towards refactoring the anytone.py driver to extract the core commonality among the variant models (e.g. Anytone 5888UV vs. the Powerwerx DB-750X). This step encapsulates the communication protocol (via the "pipe.") --- chirp/drivers/anytone_clone.py | 269 +++++++++++++++++++++++++++++++ tests/unit/test_anytone_clone.py | 127 +++++++++++++++ 2 files changed, 396 insertions(+) create mode 100644 chirp/drivers/anytone_clone.py create mode 100644 tests/unit/test_anytone_clone.py diff --git a/chirp/drivers/anytone_clone.py b/chirp/drivers/anytone_clone.py new file mode 100644 index 000000000..f0c51028e --- /dev/null +++ b/chirp/drivers/anytone_clone.py @@ -0,0 +1,269 @@ +# Copyright 2023 Dan Smith +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from builtins import bytes +import struct +import logging +from abc import ABC, abstractmethod + +from chirp import util, errors + +LOG = logging.getLogger(__name__) + + +# ############################################################################ +# Radio Communication Protocol +# ############################################################################ + +# TODO Consider lifting this to common for all drivers to utilize +class RadioCommunicationProtocol(ABC): + """Abstract base class for encapsulating the logic that communicates with + the radio -- while remaining agnositic as to what content is being + communicated. + + Usage example (within the download code, similar for upload): + with MyProtocol(pipe, block_size, file_ident) as protocol: + # `protocol.start_session()` is automatically called. + # `protocol.verify_radio_ident()` is automatically called. + + for ... + for ... + block = protocol.read_block(addr) + data += block + + # `protocol.end_session()` is automatically called. + + """ + + def __init__( + self, pipe, block_size: int, file_ident, echos_write=True + ) -> None: + """@file_ident can be a single ID or a list of multiple possible + IDs.""" + assert block_size > 0 and block_size % 2 == 0 + self.pipe = pipe + self.block_size = block_size + self.file_idents = ( + [file_ident] + if isinstance(file_ident, (bytes, str)) + else file_ident + ) + self.echos_write = echos_write + + def __enter__(self): + self.start_session() + self._verify_radio_ident() + return self # what gets assigned to the `as` variable + + def __exit__(self, exc_type, exc_val, exc_tb): + # This is called automatically when the `with` statement ends. + # @exc_type,@exc_val, and @exc_tb are the type, value, and traceback + # info, respectively, if an exception was raised within the body of + # the with statement. Otherwise, all three are None. + if exc_type is None: + self.end_session() + return False # do not supress the exception, if any + + def read_bytes(self, length): + """Reads the @length number of bytes from the pipe and returns the + result. Raises errors.RadioError if the read is unsuccessful, or if + the length of the result falls short of @length.""" + try: + data = self.pipe.read(length) + except Exception as e: + LOG.error(f"Error reading from radio: {e}") + raise errors.RadioError("Unable to read from radio") from e + + if len(data) != length: + LOG.error( + f"Short read from radio ({len(data)}, expected {length})" + ) + LOG.debug(util.hexprint(data)) + raise errors.RadioError("Short read from radio") + assert type(data) == bytes + return data + + def write_bytes(self, data): + """Writes @data to the pipe, then advances the pipe cursor to the end + of what was just written. Raises errors.RadioError if the write is + unsuccessful.""" + try: + self.pipe.write(data) + if self.echos_write: + echoed = self.pipe.read(len(data)) + assert echoed == data + except Exception as e: + LOG.error(f"Error writing to radio: {e}") + raise errors.RadioError("Unable to write to radio") from e + + def _verify_radio_ident(self): + file_ident = self.inquire_model_number() + if file_ident not in self.file_idents: + LOG.debug( + f"Model inquiry response was: {util.hexprint(file_ident)}" + ) + raise errors.RadioError( + f"Unsupported model for this driver: {str(file_ident)}" + ) + + @abstractmethod + def start_session(self): + """Implement this method with whatever handshake is required to begin + a programming session with the radio. This method will be automatically + called by the `with` statement before executing the code in the body. + """ + pass + + @abstractmethod + def end_session(self): + """Implement this method with whatever command is required to end + a programming session with the radio. This method will be automatically + called by the `with` statement after executing the code in the body + (assuming no uncaught exception is raised). + """ + pass + + @abstractmethod + def inquire_model_number(self) -> bytes: + """Implement this method with whatever code is needed to ask the radio + for its model number.""" + pass + + +class AnytoneProtocol(RadioCommunicationProtocol): + """This class encapsulates the logic for communicating with any radio that + uses the Anytone protocol, while remaining agnositic as to what content + is actually being communicated (a data block is just a data block).""" + + ACK = b"\x06" + CMD_BEGIN_PROGRAMMING_SESSION = b"PROGRAM" + CMD_INQUIRE_MODEL = b"\x02" + CMD_END_SESSION = b"\x45\x4E\x44" # aka end frame + CMD_READ = b"R" + CMD_WRITE = b"W" + ACK_CLEAN_START = b"QX\x06" + MODEL_NUMBER_FIELD_LEN = 16 # Including the version info + MODEL_NUMBER_LEN = 7 # The model number itself is 7 + # > = big-endian, c = char, H = unsigned short, b = signed char + FRAME_HEADER_FORMAT = b">cHb" + FRAME_FOOTER_FORMAT = b"BB" + + @classmethod + def checksum(cls, data): + """Anytone's checksum algorithm.""" + return sum(data) % 256 + + def start_session(self): + self.pipe.timeout = 1 + response = self._send_simple_command( + self.CMD_BEGIN_PROGRAMMING_SESSION, len(self.ACK_CLEAN_START) + ) + if response != self.ACK_CLEAN_START: + LOG.debug( + "Start of programming session response was: " + f"{util.hexprint(response)}, expected: {self.ACK_CLEAN_START}" + ) + raise errors.RadioError("Unsupported model or bad connection") + + def inquire_model_number(self) -> bytes: + response = self._send_simple_command( + self.CMD_INQUIRE_MODEL, self.MODEL_NUMBER_FIELD_LEN + ) + file_ident: bytes = response[1 : self.MODEL_NUMBER_LEN + 1] + file_ident.strip(b"\x00") + return file_ident + + def end_session(self): + result = self._send_simple_command(self.CMD_END_SESSION, 1) + # FIXME I'm baffled as to why the radio sometimes returns \x06 as + # it's supposed to but usually returns \x00 + if result not in [self.ACK, b"\x00"]: + LOG.debug(f"End session response:\n{util.hexprint(result)}") + raise errors.RadioError("Radio did not finish cleanly.") + + def _send_simple_command(self, cmd, response_length) -> bytes: + self.write_bytes(cmd) + response = self.read_bytes(response_length) + LOG.debug( + f"Cmd: {util.hexprint(cmd)}, " + f"Response:\n{util.hexprint(response)}" + ) + return response + + def _send_frame_command(self, cmd, addr, length, data=None) -> bytes: + """Reads or writes a frame of data to the radio and then returns the + response -- either the data that's read, or a simple acknowledgment in + the case of a write.""" + frame = struct.pack(self.FRAME_HEADER_FORMAT, cmd, addr, length) + if cmd == self.CMD_WRITE: + frame += data + frame += struct.pack( + self.FRAME_FOOTER_FORMAT, self.checksum(frame[1:]), self.ACK + ) + self.write_bytes(frame) + LOG.debug(f"Sent Frame:\n{util.hexprint(frame)}") + return ( + self.read_bytes(1) + if cmd == self.CMD_WRITE + else self.read_bytes(length + 6) + ) + + def read_block(self, addr, out_of=None) -> bytes: + """Asks the radio to return one block's worth of data found at + @addr. @out_of is the number of blocks total (optional; only used in + debug massages)""" + result = self._send_frame_command(self.CMD_READ, addr, self.block_size) + out_of_part = f" of {out_of:4x}" if out_of else "" + LOG.debug( + f"Frame @{addr:4x} {out_of_part}...\n{util.hexprint(result)}" + ) + header = result[:4] + data = result[4:-2] + # The following colon insures that a bytes type is returned (via an + # iterable) rather than an int + ack = result[-1:] + + if ack != self.ACK: + LOG.debug(f"Expected ACK, got: {repr(ack)}") + raise errors.RadioError("Radio NAK'd block at %04x" % addr) + _cmd, _addr, _length = struct.unpack(self.FRAME_HEADER_FORMAT, header) + if _addr != addr or _length != self.block_size: + LOG.debug( + "Block read error, Expected length %02x, but received %02x" + % (self.block_size, _length) + ) + LOG.debug( + "Block read error, Expected addr %04x, but received %04x" + % (addr, _addr) + ) + raise errors.RadioError("Radio sent an unexpected data block.") + cs = self.checksum(header[1:] + data) + if cs != result[-2]: + LOG.debug("Calculated checksum: %02x" % cs) + LOG.debug("Actual checksum: %02x" % result[-2]) + raise errors.RadioError("Block at 0x%04x failed checksum" % addr) + return data + + def write_block(self, addr, data): + """Sends @data to the radio with the instruction to write it to @addr. + @data is expected to be exactly one block's worth.""" + result = self._send_frame_command( + self.CMD_WRITE, addr, self.block_size, data + ) + if result != self.ACK: + LOG.debug(f"write_block() expected ACK, got: {repr(result)}") + raise errors.RadioError( + "Radio did not accept block at %04x" % addr + ) diff --git a/tests/unit/test_anytone_clone.py b/tests/unit/test_anytone_clone.py new file mode 100644 index 000000000..0fa008cfa --- /dev/null +++ b/tests/unit/test_anytone_clone.py @@ -0,0 +1,127 @@ +from builtins import bytes +import unittest +import io + +from chirp import errors +import chirp.drivers.anytone_clone as ac + +MODEL_VERSION = b"Ianytone" + b"\x00" * (16 - 8) +SIXTEEN_BYTES = ( + b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15" +) +THIRTYTWO_BYTES = ( + b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15" + b"\x16\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31" +) +CHECKSUM_16_BYTES = b"\x9C" +CHECKSUM_32_BYTES = b"\xE8" + + +class FakeProtocol(ac.RadioCommunicationProtocol): + def __init__( + self, pipe=None, block_size: int = 16, file_ident=b"faker" + ) -> None: + super().__init__(pipe, block_size, file_ident, echos_write=False) + if not pipe: + pipe = io.BytesIO(b"fake data") + + def start_session(self): + pass + + def end_session(self): + pass + + def inquire_model_number(self) -> bytes: + return b"faker" + + +class TestRadioCommunicationProtocol(unittest.TestCase): + """Tests the communications protocol base class (radio-agnostic).""" + + def test_verify_radio_ident_single(self): + proto = FakeProtocol() + proto._verify_radio_ident() + + def test_verify_radio_ident_double(self): + proto = FakeProtocol(file_ident=[b"faker", b"faker1"]) + proto._verify_radio_ident() + + def test_verify_radio_ident_no_such(self): + proto = FakeProtocol(file_ident=[b"faker1", b"faker2"]) + with self.assertRaises( + errors.RadioError, msg="Unsupported model for this driver: faker2" + ): + proto._verify_radio_ident() + + def test_read_bytes(self): + with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto: + self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4)) + self.assertEquals(b"\x04\x05", proto.read_bytes(2)) + + def test_read_bytes_short(self): + with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto: + with self.assertRaises( + errors.RadioError, msg="Short read from radio" + ): + proto.read_bytes(33) + + def test_write_bytes(self): + proto = FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) + self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4)) + + # overwrites the 5th and 6th bytes and advances the cursor + proto.write_bytes(b"\xFF\xFF") + + self.assertEquals(b"\x06", proto.read_bytes(1)) + + +class TestAnytoneProtocol(unittest.TestCase): + """Tests the non-radio-specific aspects of the communications protocol + base class.""" + + def test_checksum(self): + self.assertEquals(156, ac.AnytoneProtocol.checksum(SIXTEEN_BYTES)) + self.assertEquals(200, ac.AnytoneProtocol.checksum(THIRTYTWO_BYTES)) + + def test_read_one_frame(self): + CHECKSUM_BLOCK_1 = bytes( + (ac.AnytoneProtocol.checksum(b"\x00\x00\x10" + SIXTEEN_BYTES),) + ) + + SESSION = ( + # space to write the start command + ac.AnytoneProtocol.CMD_BEGIN_PROGRAMMING_SESSION + + + # read the ack + ac.AnytoneProtocol.ACK_CLEAN_START + + + # space to write the inquire command + ac.AnytoneProtocol.CMD_INQUIRE_MODEL + + + # read the model number response + MODEL_VERSION + + + # space to write the read-frame command + b"R\x00\x00\x10" + + + # read the frame: header, data, checksum, ack + b"R\x00\x00\x10" + + SIXTEEN_BYTES + + CHECKSUM_BLOCK_1 + + ac.AnytoneProtocol.ACK + + + # space to write the end command + ac.AnytoneProtocol.CMD_END_SESSION + + + # read the ack + ac.AnytoneProtocol.ACK + ) + + with ac.AnytoneProtocol( + pipe=io.BytesIO(SESSION), + block_size=16, + file_ident=b"anytone", + echos_write=False, + ) as proto: + result = proto.read_block(0) + self.assertEquals(SIXTEEN_BYTES, result)