diff --git a/poetry.lock b/poetry.lock index bddab73..6f06a8b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -239,24 +239,6 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] -[[package]] -name = "pytest-asyncio" -version = "0.24.0" -description = "Pytest support for asyncio" -optional = false -python-versions = ">=3.8" -files = [ - {file = "pytest_asyncio-0.24.0-py3-none-any.whl", hash = "sha256:a811296ed596b69bf0b6f3dc40f83bcaf341b155a269052d82efa2b25ac7037b"}, - {file = "pytest_asyncio-0.24.0.tar.gz", hash = "sha256:d081d828e576d85f875399194281e92bf8a68d60d72d1a2faf2feddb6c46b276"}, -] - -[package.dependencies] -pytest = ">=8.2,<9" - -[package.extras] -docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] -testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] - [[package]] name = "pytest-cov" version = "5.0.0" @@ -378,4 +360,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "2bc089e35fba887dfdffcd351f820832b67a6e5cba518b1fe8b542ce2b29d088" +content-hash = "2addb2207acb6c98823afe48d0100e3526f623594c3366c9e6991928cd26b3d0" diff --git a/pymbus/telegrams/base.py b/pymbus/telegrams/base.py index 30a6952..e8134b8 100644 --- a/pymbus/telegrams/base.py +++ b/pymbus/telegrams/base.py @@ -134,7 +134,3 @@ def fields(self) -> list[TelegramField]: def as_bytes(self) -> bytes: return bytes(field.byte for field in self.fields) - - -class TelegramBlock(TelegramContainer): - """Base Telegram Block class.""" diff --git a/pymbus/telegrams/blocks/__init__.py b/pymbus/telegrams/blocks/__init__.py index e69de29..5344afa 100644 --- a/pymbus/telegrams/blocks/__init__.py +++ b/pymbus/telegrams/blocks/__init__.py @@ -0,0 +1,2 @@ +from pymbus.telegrams.blocks.data_info import DataInformationBlock +from pymbus.telegrams.blocks.value_info import ValueInformationBlock diff --git a/pymbus/telegrams/blocks/data_info.py b/pymbus/telegrams/blocks/data_info.py index 5e242aa..36b94d7 100644 --- a/pymbus/telegrams/blocks/data_info.py +++ b/pymbus/telegrams/blocks/data_info.py @@ -2,9 +2,11 @@ from pymbus.exceptions import MBusError from pymbus.telegrams.base import ( - TelegramBlock, TelegramBytesType, ) +from pymbus.telegrams.base import ( + TelegramContainer as TelegramBlock, +) from pymbus.telegrams.fields.data_info import ( DataInformationField as DIF, ) diff --git a/pymbus/telegrams/blocks/value_info.py b/pymbus/telegrams/blocks/value_info.py index d6c6b8d..18025b1 100644 --- a/pymbus/telegrams/blocks/value_info.py +++ b/pymbus/telegrams/blocks/value_info.py @@ -2,9 +2,11 @@ from pymbus.exceptions import MBusError from pymbus.telegrams.base import ( - TelegramBlock, TelegramBytesType, ) +from pymbus.telegrams.base import ( + TelegramContainer as TelegramBlock, +) from pymbus.telegrams.fields.value_info import ( ValueInformationField as VIF, ) diff --git a/pymbus/telegrams/records.py b/pymbus/telegrams/records.py new file mode 100644 index 0000000..7cef5d4 --- /dev/null +++ b/pymbus/telegrams/records.py @@ -0,0 +1,46 @@ +from pymbus.exceptions import MBusError +from pymbus.telegrams.base import ( + TelegramBytesType, +) +from pymbus.telegrams.base import ( + TelegramContainer as TelegramRecord, +) +from pymbus.telegrams.blocks import ( + DataInformationBlock as DIB, +) +from pymbus.telegrams.blocks import ( + ValueInformationBlock as VIB, +) + + +class DataRecord(TelegramRecord): + """The "Data Record" (DR) class. + + Typically encountered as Data Record Header (DRH). + + The structure of the DR(H): + ----------------- + | DIB | VIB | + ----------------- + + DIB = Data Information Block. + VIB = Value Information Block. + """ + + def __init__(self, ibytes: TelegramBytesType): + try: + it = iter(ibytes) + except TypeError as e: + msg = f"{ibytes} is not iterable" + raise MBusError(msg) from e + + self._dib = DIB(it) + self._vib = VIB(it) + + @property + def dib(self) -> DIB: + return self._dib + + @property + def vib(self) -> VIB: + return self._vib diff --git a/pymbus/types.py b/pymbus/types.py index 3ecd3a6..cbc57d4 100644 --- a/pymbus/types.py +++ b/pymbus/types.py @@ -48,7 +48,6 @@ from pymbus.constants import BIG_ENDIAN, BYTE, NIBBLE from pymbus.exceptions import MBusError -# TODO: type A and E # TODO: a unified integer type: as_uint, as_bcd etc. BytesType = bytes | bytearray | Iterable[int] @@ -57,15 +56,59 @@ ## integer types section +def parse_bcd_uint(ibytes: BytesType) -> int: + """Returns the unsigned integer from `ibytes`. + + BCD = Binary-Coded Decimal. + The "Unsigned Integer BCD" type = "Type A". + The bytes are parsed along the Big endian order. + + The function is greedy. + + Parameters + ---------- + ibytes: BytesType + the sequence of bytes for "Type A" parsing + + Raises + ------ + MBusDecodeError + if an empty byte sequence is given + + Returns + ------- + int + """ + + _bytes = bytes(reversed(ibytes)) + if not _bytes: + msg = "cannot parse empty bytes" + raise MBusError(msg) + + msp, lsp = 0b1111_0000, 0b0000_1111 + masks = (lsp, msp) + + number, power = 0, 0 + for byte in _bytes: + for mask in masks: + digit = byte & mask + number += digit * 10**power + power += 1 + + return number + + def parse_int(ibytes: BytesType) -> int: - """Returns the signed binary integer from `bytez`. + """Returns the signed integer from `ibytes`. The "Binary Integer" type = "Type B". The bytes are parsed along the Big endian order. + The function is greedy. + Parameters ---------- - ibytes: bytes | bytearray + ibytes: BytesType the sequence of bytes for "Type B" parsing Raises @@ -102,14 +145,16 @@ def parse_int(ibytes: BytesType) -> int: def parse_uint(ibytes: BytesType) -> int: - """Returns the unsigned integer from `bytez`. + """Returns the unsigned integer from `ibytes`. The "Unsigned Integer" type = "Type C". The bytes are parsed along the Big endian order. + The function is greedy. + Parameters ---------- - ibytes: bytes | bytearray + ibytes: BytesType the sequence of bytes for "Type C" parsing Raises @@ -130,18 +175,20 @@ def parse_uint(ibytes: BytesType) -> int: return int.from_bytes(_bytes, byteorder=BIG_ENDIAN, signed=False) -## boolean sectio +## boolean section def parse_bool(ibytes: BytesType) -> bool: - """Returns the boolean value from `bytez`. + """Returns the boolean value from `ibytes`. The "Boolean" type = "Type D". The bytes are parsed along the Big endian order. + The function is greedy. + Parameters ---------- - ibytes: bytes | bytearray + ibytes: BytesType the sequence of bytes for "Type D" parsing Raises @@ -157,6 +204,70 @@ def parse_bool(ibytes: BytesType) -> bool: return bool(parse_uint(ibytes)) +## types and units information (Type E = Compound CP16) + + +class UnitType: + """Type E = Compound CP16: types and units information.""" + + @classmethod + def from_bytes(cls, frame: BytesType) -> Self: + """Return a `UnitType` from an array of bytes.""" + + return cls(frame) + + @classmethod + def from_hexstring(cls, hex: str) -> Self: + """Return a `UnitType` from a hexadecimal string.""" + + barr = bytearray.fromhex(hex) + return cls.from_bytes(barr) + + def __init__(self, ibytes: BytesType): + it = iter(ibytes) + try: + _bytes = bytes([next(it) for _ in range(2)]) + except StopIteration as e: + msg = f"invalid length for {ibytes}" + raise MBusError(msg) from e + + unit_mask, media_mask = 0b1100_0000, 0b0011_1111 + self._unit1 = _bytes[1] & unit_mask + self._unit2 = _bytes[0] & unit_mask + self._media = bytes( + [ + _bytes[0] & media_mask, + _bytes[1] & media_mask, + ] + ) + + +def parse_unit_type(ibytes: BytesType) -> UnitType: + """Returns the boolean value from `ibytes`. + + The "Boolean" type = "Type D". + The bytes are parsed along the Big endian order. + + The function is NOT greedy. + + Parameters + ---------- + ibytes: BytesType + the sequence of bytes for "Type E" parsing + + Raises + ------ + MBusDecodeError + if an empty byte sequence is given + + Returns + ------- + UnitType + """ + + return UnitType(ibytes) + + ## date, time and datetime types section @@ -215,22 +326,36 @@ def get_date(ibytes: BytesType) -> date: def parse_date(frame: BytesType) -> date: - """Return the Python date from a byte iterator. + """Return the Python date from `frame`. + + The "Date" type = Type G (Compound CP16). + + The function is NOT greedy. Parameters ---------- - frame: Iterator[int] + frame: BytesType a frame of bytes for date parsing + Raises + ------ + MBusError: + invalid frame length + Returns ------- date """ it = iter(frame) - lst = [next(it) for _ in range(2)] + msg = f"frame length mismatch for {frame}" - return get_date(bytes(lst)) + try: + lst = [next(it) for _ in range(2)] + except StopIteration as e: + raise MBusError(msg) from e + + return get_date(lst) class Date: @@ -243,7 +368,7 @@ def from_date(cls, date_: date) -> Self: return cls(year=date_.year, month=date_.month, day=date_.day) @classmethod - def from_binary(cls, frame: bytes | bytearray) -> Self: + def from_bytes(cls, frame: BytesType) -> Self: """Return a `Date` from an array of bytes.""" date_ = parse_date(frame) @@ -254,14 +379,7 @@ def from_hexstring(cls, hex: str) -> Self: """Return a `Date` from a hexadecimal string.""" barr = bytearray.fromhex(hex) - return cls.from_binary(barr) - - @classmethod - def from_integers(cls, ints: Iterable[int]) -> Self: - """Return a `Date` from a sequence of integers.""" - - barr = bytearray(iter(ints)) - return cls.from_binary(barr) + return cls.from_bytes(barr) def __init__(self, year: int, month: int, day: int): self._date = date(year=year, month=month, day=day) @@ -343,12 +461,16 @@ def get_time(ibytes: BytesType) -> time: def parse_time(frame: BytesType) -> time: - """Return the Python time from a byte iterator. + """Return the Python time from `frame`. + + Not a part of standard Meter-Bus types. + + The function is NOT greedy. Parameters ---------- - frame: Iterator[int] - a frame of bytes for time parsing + frame: BytesType + a frame of bytes for date parsing Returns ------- @@ -369,7 +491,7 @@ def parse_time(frame: BytesType) -> time: if sec_byte: lst += [sec_byte] - return get_time(bytes(lst)) + return get_time(lst) class Time: @@ -390,7 +512,7 @@ def from_time(cls, time_: time) -> Self: ) @classmethod - def from_binary(cls, frame: bytes | bytearray) -> Self: + def from_bytes(cls, frame: BytesType) -> Self: """Return a `Time` from an array of bytes.""" time_ = parse_time(frame) @@ -401,14 +523,7 @@ def from_hexstring(cls, hex: str) -> Self: """Return a `Time` from a hexadecimal string.""" barr = bytearray.fromhex(hex) - return cls.from_binary(barr) - - @classmethod - def from_integers(cls, it: Iterable[int]) -> Self: - """Return a `Time` from an iterable of integers.""" - - time_ = parse_time(iter(it)) - return cls.from_time(time_) + return cls.from_bytes(barr) def __init__(self, hour: int, minute: int, second: int = 0): self._time = time(hour=hour, minute=minute, second=second) @@ -484,12 +599,21 @@ def get_datetime(ibytes: BytesType) -> datetime: def parse_datetime(frame: BytesType) -> datetime: - """Return the Python datetime from a byte iterator. + """Return the Python date from `frame`. + + The "DateTime" type = Type F (Compound CP32). + + The function is NOT greedy. Parameters ---------- - frame: Iterator[int] - a frame of bytes for datetime parsing + frame: BytesType + a frame of bytes for date parsing + + Raises + ------ + MBusError: + invalid frame length Returns ------- @@ -497,12 +621,11 @@ def parse_datetime(frame: BytesType) -> datetime: """ it = iter(frame) + msg = f"frame length mismatch for {frame}" try: lst = [next(it) for _ in range(4)] except StopIteration as e: - fr = list(iter(frame)) - msg = f"frame length mismatch for {fr}" raise MBusError(msg) from e try: @@ -530,7 +653,7 @@ def from_datetime(cls, datetime_: datetime) -> Self: ) @classmethod - def from_binary(cls, frame: bytes | bytearray) -> Self: + def from_bytes(cls, frame: BytesType) -> Self: """Return a `DateTime` from an array of bytes.""" datetime_ = parse_datetime(frame) @@ -540,15 +663,8 @@ def from_binary(cls, frame: bytes | bytearray) -> Self: def from_hexstring(cls, hex: str) -> Self: """Return a `DateTime` from a hexadecimal string.""" - barr = bytearray.fromhex(hex) - return cls.from_binary(barr) - - @classmethod - def from_integers(cls, it: Iterable[int]) -> Self: - """Return a `DateTime` from an iterable of integers.""" - - datetime_ = parse_datetime(iter(it)) - return cls.from_datetime(datetime_) + barr = bytes.fromhex(hex) + return cls.from_bytes(barr) def __init__( self, diff --git a/pyproject.toml b/pyproject.toml index 48af128..a42bfd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "pymbus" -version = "0.1.0" -description = "Python Meter-Bus Package2" +version = "0.0.2" +description = "Python Meter-Bus Package" authors = ["Stanley Kudrow "] license = "MIT" readme = "README.md" @@ -17,7 +17,6 @@ ruff = "^0.6.4" [tool.poetry.group.test.dependencies] pytest = "^8.3.2" -pytest-asyncio = "^0.24.0" pytest-cov = "^5.0.0" pytest-mock = "^3.14.0" pytest-random-order = "^1.1.1" @@ -41,8 +40,17 @@ skip_empty = true title = "Python Meter-Bus Coverage Report" [tool.pytest.ini_options] -minversion = "8" -addopts = "-s -vvv --cov=pymbus --cov-report html --random-order" +minversion = "8.0" +addopts = [ + "-s", + "-vvv", + "--cov=pymbus", + "--maxfail=1", + "--random-order", + "--strict-config", + "--strict-markers", +] +xfail_strict = true testpaths = ["tests"] [tool.ruff] @@ -56,6 +64,9 @@ target-version = "py312" [tool.ruff.lint] extend-select = ["I", "F"] +[tool.ruff.lint.per-file-ignores] +"__init__.py" = ["F401"] + [tool.ruff.format] quote-style = "double" indent-style = "space" diff --git a/tests/mbus_types/test_datetimes.py b/tests/mbus_types/test_datetimes.py index e00a1d8..3a2a2e2 100644 --- a/tests/mbus_types/test_datetimes.py +++ b/tests/mbus_types/test_datetimes.py @@ -72,9 +72,8 @@ def test_parse_date(hexdata: str, dd: date): bindata = bytearray.fromhex(hexdata) integers = list(bindata) - assert Date.from_binary(bindata) == dd + assert Date.from_bytes(bindata) == dd assert Date.from_hexstring(hexdata) == dd - assert Date.from_integers(integers) == dd assert parse_date(integers) == dd @@ -158,9 +157,8 @@ def test_parse_time(hexdata: str, tt: time): bindata = bytearray.fromhex(hexdata) integers = list(bindata) - assert Time.from_binary(bindata) == tt + assert Time.from_bytes(bindata) == tt assert Time.from_hexstring(hexdata) == tt - assert Time.from_integers(integers) == tt assert parse_time(integers) == tt @@ -307,9 +305,8 @@ def test_parse_datetime(hexdata: str, dt: datetime): bindata = bytearray.fromhex(hexdata) integers = list(bindata) - assert DateTime.from_binary(bindata) == dt + assert DateTime.from_bytes(bindata) == dt assert DateTime.from_hexstring(hexdata) == dt - assert DateTime.from_integers(integers) == dt assert parse_datetime(integers) == dt