Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First of many steps towards refactoring the anytone.py driver... #638

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions chirp/drivers/anytone_clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
# Copyright 2023 Dan Smith <dsmith@danplanet.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from builtins import bytes
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "no new uses of future" is because you're using this compatibility layer, but shouldn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't realize. That just carried over form the previous code and I didn't even notice it. Thanks.

import struct
import logging
from abc import ABC, abstractmethod

from chirp import util, errors

LOG = logging.getLogger(__name__)


# ############################################################################
# Radio Communication Protocol
# ############################################################################

# TODO Consider lifting this to common for all drivers to utilize
class RadioCommunicationProtocol(ABC):
"""Abstract base class for encapsulating the logic that communicates with
the radio -- while remaining agnositic as to what content is being
communicated.

Usage example (within the download code, similar for upload):
with MyProtocol(pipe, block_size, file_ident) as protocol:
# `protocol.start_session()` is automatically called.
# `protocol.verify_radio_ident()` is automatically called.

for ...
for ...
block = protocol.read_block(addr)
data += block

# `protocol.end_session()` is automatically called.

"""

def __init__(
self, pipe, block_size: int, file_ident, echos_write=True
) -> None:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've mentioned this on the list, but surely many times in other PRs. I absolutely detest black formatting style (as do many others). I'm sorry you've done this work in this way and will have to reformat it, but that's how it is. I have to deal with it at work, but I get to choose here. These parens on their own lines are one of the ugliest aspects of it, IMHO.

Most of the rest of the chirp code is fairly conventional python formatting. Please reformat this to look like the rest of the codebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not Black, then which tool should I use? Any special settings?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't offer you a tool to automatically format the code in more conventional python and chirp style, as you know. You can start by writing code that the current checker (flake8) will allow, and not adding arbitrary vertical stacking of things that will fit perfectly fine on fewer lines. Anywhere there's a closing paren on a line by itself is an easy regex for not-okay-ness :)

"""@file_ident can be a single ID or a list of multiple possible
IDs."""
assert block_size > 0 and block_size % 2 == 0
self.pipe = pipe
self.block_size = block_size
self.file_idents = (
[file_ident]
if isinstance(file_ident, (bytes, str))
else file_ident
)
self.echos_write = echos_write

def __enter__(self):
self.start_session()
self._verify_radio_ident()
return self # what gets assigned to the `as` variable

def __exit__(self, exc_type, exc_val, exc_tb):
# This is called automatically when the `with` statement ends.
# @exc_type,@exc_val, and @exc_tb are the type, value, and traceback
# info, respectively, if an exception was raised within the body of
# the with statement. Otherwise, all three are None.
if exc_type is None:
self.end_session()
return False # do not supress the exception, if any

def read_bytes(self, length):
"""Reads the @length number of bytes from the pipe and returns the
result. Raises errors.RadioError if the read is unsuccessful, or if
the length of the result falls short of @length."""
try:
data = self.pipe.read(length)
except Exception as e:
LOG.error(f"Error reading from radio: {e}")
raise errors.RadioError("Unable to read from radio") from e

if len(data) != length:
LOG.error(
f"Short read from radio ({len(data)}, expected {length})"
)
LOG.debug(util.hexprint(data))
raise errors.RadioError("Short read from radio")
assert type(data) == bytes
return data

def write_bytes(self, data):
"""Writes @data to the pipe, then advances the pipe cursor to the end
of what was just written. Raises errors.RadioError if the write is
unsuccessful."""
try:
self.pipe.write(data)
if self.echos_write:
echoed = self.pipe.read(len(data))
assert echoed == data
except Exception as e:
LOG.error(f"Error writing to radio: {e}")
raise errors.RadioError("Unable to write to radio") from e

def _verify_radio_ident(self):
file_ident = self.inquire_model_number()
if file_ident not in self.file_idents:
LOG.debug(
f"Model inquiry response was: {util.hexprint(file_ident)}"
)
raise errors.RadioError(
f"Unsupported model for this driver: {str(file_ident)}"
)

@abstractmethod
def start_session(self):
"""Implement this method with whatever handshake is required to begin
a programming session with the radio. This method will be automatically
called by the `with` statement before executing the code in the body.
"""
pass

@abstractmethod
def end_session(self):
"""Implement this method with whatever command is required to end
a programming session with the radio. This method will be automatically
called by the `with` statement after executing the code in the body
(assuming no uncaught exception is raised).
"""
pass

@abstractmethod
def inquire_model_number(self) -> bytes:
"""Implement this method with whatever code is needed to ask the radio
for its model number."""
pass


class AnytoneProtocol(RadioCommunicationProtocol):
"""This class encapsulates the logic for communicating with any radio that
uses the Anytone protocol, while remaining agnositic as to what content
is actually being communicated (a data block is just a data block)."""

ACK = b"\x06"
CMD_BEGIN_PROGRAMMING_SESSION = b"PROGRAM"
CMD_INQUIRE_MODEL = b"\x02"
CMD_END_SESSION = b"\x45\x4E\x44" # aka end frame
CMD_READ = b"R"
CMD_WRITE = b"W"
ACK_CLEAN_START = b"QX\x06"
MODEL_NUMBER_FIELD_LEN = 16 # Including the version info
MODEL_NUMBER_LEN = 7 # The model number itself is 7
# > = big-endian, c = char, H = unsigned short, b = signed char
FRAME_HEADER_FORMAT = b">cHb"
FRAME_FOOTER_FORMAT = b"BB"

@classmethod
def checksum(cls, data):
"""Anytone's checksum algorithm."""
return sum(data) % 256

def start_session(self):
self.pipe.timeout = 1
response = self._send_simple_command(
self.CMD_BEGIN_PROGRAMMING_SESSION, len(self.ACK_CLEAN_START)
)
if response != self.ACK_CLEAN_START:
LOG.debug(
"Start of programming session response was: "
f"{util.hexprint(response)}, expected: {self.ACK_CLEAN_START}"
)
raise errors.RadioError("Unsupported model or bad connection")

def inquire_model_number(self) -> bytes:
response = self._send_simple_command(
self.CMD_INQUIRE_MODEL, self.MODEL_NUMBER_FIELD_LEN
)
file_ident: bytes = response[1 : self.MODEL_NUMBER_LEN + 1]
file_ident.strip(b"\x00")
return file_ident

def end_session(self):
result = self._send_simple_command(self.CMD_END_SESSION, 1)
# FIXME I'm baffled as to why the radio sometimes returns \x06 as
# it's supposed to but usually returns \x00
if result not in [self.ACK, b"\x00"]:
LOG.debug(f"End session response:\n{util.hexprint(result)}")
raise errors.RadioError("Radio did not finish cleanly.")

def _send_simple_command(self, cmd, response_length) -> bytes:
self.write_bytes(cmd)
response = self.read_bytes(response_length)
LOG.debug(
f"Cmd: {util.hexprint(cmd)}, "
f"Response:\n{util.hexprint(response)}"
)
return response

def _send_frame_command(self, cmd, addr, length, data=None) -> bytes:
"""Reads or writes a frame of data to the radio and then returns the
response -- either the data that's read, or a simple acknowledgment in
the case of a write."""
frame = struct.pack(self.FRAME_HEADER_FORMAT, cmd, addr, length)
if cmd == self.CMD_WRITE:
frame += data
frame += struct.pack(
self.FRAME_FOOTER_FORMAT, self.checksum(frame[1:]), self.ACK
)
self.write_bytes(frame)
LOG.debug(f"Sent Frame:\n{util.hexprint(frame)}")
return (
self.read_bytes(1)
if cmd == self.CMD_WRITE
else self.read_bytes(length + 6)
)

def read_block(self, addr, out_of=None) -> bytes:
"""Asks the radio to return one block's worth of data found at
@addr. @out_of is the number of blocks total (optional; only used in
debug massages)"""
result = self._send_frame_command(self.CMD_READ, addr, self.block_size)
out_of_part = f" of {out_of:4x}" if out_of else ""
LOG.debug(
f"Frame @{addr:4x} {out_of_part}...\n{util.hexprint(result)}"
)
header = result[:4]
data = result[4:-2]
# The following colon insures that a bytes type is returned (via an
# iterable) rather than an int
ack = result[-1:]

if ack != self.ACK:
LOG.debug(f"Expected ACK, got: {repr(ack)}")
raise errors.RadioError("Radio NAK'd block at %04x" % addr)
_cmd, _addr, _length = struct.unpack(self.FRAME_HEADER_FORMAT, header)
if _addr != addr or _length != self.block_size:
LOG.debug(
"Block read error, Expected length %02x, but received %02x"
% (self.block_size, _length)
)
LOG.debug(
"Block read error, Expected addr %04x, but received %04x"
% (addr, _addr)
)
raise errors.RadioError("Radio sent an unexpected data block.")
cs = self.checksum(header[1:] + data)
if cs != result[-2]:
LOG.debug("Calculated checksum: %02x" % cs)
LOG.debug("Actual checksum: %02x" % result[-2])
raise errors.RadioError("Block at 0x%04x failed checksum" % addr)
return data

def write_block(self, addr, data):
"""Sends @data to the radio with the instruction to write it to @addr.
@data is expected to be exactly one block's worth."""
result = self._send_frame_command(
self.CMD_WRITE, addr, self.block_size, data
)
if result != self.ACK:
LOG.debug(f"write_block() expected ACK, got: {repr(result)}")
raise errors.RadioError(
"Radio did not accept block at %04x" % addr
)
127 changes: 127 additions & 0 deletions tests/unit/test_anytone_clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from builtins import bytes
import unittest
import io

from chirp import errors
import chirp.drivers.anytone_clone as ac

MODEL_VERSION = b"Ianytone" + b"\x00" * (16 - 8)
SIXTEEN_BYTES = (
b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15"
)
THIRTYTWO_BYTES = (
b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15"
b"\x16\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31"
)
CHECKSUM_16_BYTES = b"\x9C"
CHECKSUM_32_BYTES = b"\xE8"


class FakeProtocol(ac.RadioCommunicationProtocol):
def __init__(
self, pipe=None, block_size: int = 16, file_ident=b"faker"
) -> None:
super().__init__(pipe, block_size, file_ident, echos_write=False)
if not pipe:
pipe = io.BytesIO(b"fake data")

def start_session(self):
pass

def end_session(self):
pass

def inquire_model_number(self) -> bytes:
return b"faker"


class TestRadioCommunicationProtocol(unittest.TestCase):
"""Tests the communications protocol base class (radio-agnostic)."""

def test_verify_radio_ident_single(self):
proto = FakeProtocol()
proto._verify_radio_ident()

def test_verify_radio_ident_double(self):
proto = FakeProtocol(file_ident=[b"faker", b"faker1"])
proto._verify_radio_ident()

def test_verify_radio_ident_no_such(self):
proto = FakeProtocol(file_ident=[b"faker1", b"faker2"])
with self.assertRaises(
errors.RadioError, msg="Unsupported model for this driver: faker2"
):
proto._verify_radio_ident()

def test_read_bytes(self):
with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto:
self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4))
self.assertEquals(b"\x04\x05", proto.read_bytes(2))

def test_read_bytes_short(self):
with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto:
with self.assertRaises(
errors.RadioError, msg="Short read from radio"
):
proto.read_bytes(33)

def test_write_bytes(self):
proto = FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES))
self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4))

# overwrites the 5th and 6th bytes and advances the cursor
proto.write_bytes(b"\xFF\xFF")

self.assertEquals(b"\x06", proto.read_bytes(1))


class TestAnytoneProtocol(unittest.TestCase):
"""Tests the non-radio-specific aspects of the communications protocol
base class."""

def test_checksum(self):
self.assertEquals(156, ac.AnytoneProtocol.checksum(SIXTEEN_BYTES))
self.assertEquals(200, ac.AnytoneProtocol.checksum(THIRTYTWO_BYTES))

def test_read_one_frame(self):
CHECKSUM_BLOCK_1 = bytes(
(ac.AnytoneProtocol.checksum(b"\x00\x00\x10" + SIXTEEN_BYTES),)
)

SESSION = (
# space to write the start command
ac.AnytoneProtocol.CMD_BEGIN_PROGRAMMING_SESSION
+
# read the ack
ac.AnytoneProtocol.ACK_CLEAN_START
+
# space to write the inquire command
ac.AnytoneProtocol.CMD_INQUIRE_MODEL
+
# read the model number response
MODEL_VERSION
+
# space to write the read-frame command
b"R\x00\x00\x10"
+
# read the frame: header, data, checksum, ack
b"R\x00\x00\x10"
+ SIXTEEN_BYTES
+ CHECKSUM_BLOCK_1
+ ac.AnytoneProtocol.ACK
+
# space to write the end command
ac.AnytoneProtocol.CMD_END_SESSION
+
# read the ack
ac.AnytoneProtocol.ACK
)

with ac.AnytoneProtocol(
pipe=io.BytesIO(SESSION),
block_size=16,
file_ident=b"anytone",
echos_write=False,
) as proto:
result = proto.read_block(0)
self.assertEquals(SIXTEEN_BYTES, result)