Skip to content

Commit

Permalink
add pymbus mvp (#2)
Browse files Browse the repository at this point in the history
Co-authored-by: Stanley Kudrow <stankudrow@reply.no>
  • Loading branch information
stankudrow and Stanley Kudrow authored Nov 5, 2024
1 parent 559a041 commit 0ee2da9
Show file tree
Hide file tree
Showing 39 changed files with 3,444 additions and 0 deletions.
Empty file added pymbus/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions pymbus/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BYTE = 8
NIBBLE = 4

BIG_ENDIAN = "big"
LITTLE_ENDIAN = "little"
2 changes: 2 additions & 0 deletions pymbus/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class MBusError(Exception):
"""Meter-Bus Base Error."""
Empty file added pymbus/structures/__init__.py
Empty file.
125 changes: 125 additions & 0 deletions pymbus/structures/fixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from enum import IntEnum


class MeasuredMedium(IntEnum):
"""The table (enum) for media.
1. Record Medium/Unit is always least significant byte first.
2. H.C.A. = Heat Cost Allocator.
3. Media from "Gas Mode2" to "H.C.A. Mode2" are defined in EN1434-3
for some existing meters with CI-Field 73h (intentionally mode1),
which transmit the multibyte records with high byte first
in contrast to the CI-Field. The master must know that
these media codes mean mode 2 or high byte first.
Further use of these codes for "pseudo media"
is NOT ALLOWED for new developments.
"""

other = 0x00 # 0000
oil = 0x01 # 0001
electricity = 0x02 # 0010
gas = 0x03 # 0011
heat = 0x04 # 0100
steam = 0x05 # 0101
hot_water = 0x06 # 0110
water = 0x07 # 0111
hca = 0x08 # 1000
reserved1 = 0x09 # 1001
gas_mode_2 = 0x0A # 1010
heat_mode_2 = 0x0B # 1011
hot_water_mode_2 = 0x0C # 1100
water_mode_2 = 0x0D # 1101
hca_mode_2 = 0x0E # 1110
reserved2 = 0x0F # 1111


class PhysicalUnits(IntEnum):
"""The table of physical units."""

hour_minute_second = 0x00
day_month_year = 0x01

watt_hour = 0x02
watt_hour_times_10 = 0x03
watt_hour_times_100 = 0x04

kilo_watt_hour = 0x05
kilo_watt_hour_times_10 = 0x06
kilo_watt_hour_times_100 = 0x07

mega_watt_hour = 0x08
mega_watt_hour_times_10 = 0x09
mega_watt_hour_times_100 = 0x0A

kilo_joule = 0x0B
kilo_joule_times_10 = 0x0C
kilo_joule_times_100 = 0x0D

mega_joule = 0x0E
mega_joule_times_10 = 0x0F
mega_joule_times_100 = 0x10

giga_joule = 0x11
giga_joule_times_10 = 0x12
giga_joule_times_100 = 0x13

watt = 0x14
watt_times_10 = 0x15
watt_times_100 = 0x16

kilo_watt = 0x17
kilo_watt_times_10 = 0x18
kilo_watt_times_100 = 0x19

mega_watt = 0x1A
mega_watt_times_10 = 0x1B
mega_watt_times_100 = 0x1C

kilo_joule_per_hour = 0x1D
kilo_joule_per_hour_times_10 = 0x1E
kilo_joule_per_hour_times_100 = 0x1F

mega_joule_per_hour = 0x20
mega_joule_per_hour_times_10 = 0x21
mega_joule_per_hour_times_100 = 0x22

giga_joule_per_hour = 0x23
giga_joule_per_hour_times_10 = 0x24
giga_joule_per_hour_times_100 = 0x25

milli_liter = 0x26
milli_liter_times_10 = 0x27
milli_liter_times_100 = 0x28

liter = 0x29
liter_times_10 = 0x2A
liter_times_100 = 0x2B

meter_cubic = 0x2C
meter_cubic_times_10 = 0x2D
meter_cubic_times_100 = 0x2E

milli_liter_per_hour = 0x2F
milli_liter_per_hour_times_10 = 0x30
milli_liter_per_hour_times_100 = 0x31

liter_per_hour = 0x32
liter_per_hour_times_10 = 0x33
liter_per_hour_times_100 = 0x34

meter_cubic_per_h = 0x35
meter_cubic_per_h_times_10 = 0x36
meter_cubic_per_h_times_100 = 0x37

celsius_times_10_to_minus_3 = 0x38 # Celsius * 10^(-3)

units_for_hca = 0x39 # H.C.A. = Heat Cost Allocator

reserved1 = 0x3A
reserved2 = 0x3B
reserved3 = 0x3C
reserved4 = 0x3D

same_but_historic = 0x3E

without_units = 0x3F # dimensionless
Empty file added pymbus/telegrams/__init__.py
Empty file.
140 changes: 140 additions & 0 deletions pymbus/telegrams/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from collections.abc import Generator, Iterable
from typing import Self

from pymbus.exceptions import MBusError


def validate_byte(nbr: int) -> int:
"""Validates an integer number to be a byte.
In Python, a byte must be in range(0, 256).
This is the range for an 8-bit unsigned integer.
Raises
------
MbusError: `nbr` is out of the [0, 255] segment.
Returns
-------
int - the validated byte
"""

try:
bytes([nbr])
except ValueError as e:
raise MBusError from e

return nbr


class TelegramField:
"""The base "Field" class.
It is a base wrapper for a byte value.
A Field is a part of blocks, frames and other Telegram containers.
"""

def __init__(self, byte: int):
self._byte = validate_byte(byte)

def __eq__(self, other) -> bool:
sbyte = self.byte
if isinstance(other, TelegramField):
return sbyte == other.byte
return sbyte == other

def __repr__(self) -> str:
cls_name = self.__class__.__name__
return f"{cls_name}(byte={self.byte})"

@property
def byte(self) -> int:
"""Return the byte value of the field."""

return self._byte


TelegramBytesType = bytes | bytearray | Iterable[int | TelegramField]


def parse_byte(byte_like: int | TelegramField) -> int:
"""Return the byte value from the `byte_like` argument.
If byte_like is a TelegramField (TF),
then its byte property get called.
Parameters
----------
byte_like: int | TelegramField (TF)
either a byte-like integer or a TF as a byte wrapper
Returns
-------
int
"""

if isinstance(byte_like, TelegramField):
return byte_like.byte
return byte_like


class TelegramContainer:
"""The base class for Telegram containers.
A telegram container consists of telegram fields
and it is an iterable object, which may also be an iterator.
"""

@classmethod
def from_hexstring(cls, hex: str) -> Self:
"""Return a class instance from a hexadecimal string."""

barr = bytearray.fromhex(hex)
return cls(barr)

@classmethod
def from_integers(cls, ints: Iterable[int]) -> Self:
"""Return a class instance from a sequence of integers."""

barr = bytearray(iter(ints))
return cls(barr)

def __init__(self, ibytes: TelegramBytesType) -> None:
self._fields = []
for ib in ibytes:
field = ib if isinstance(ib, TelegramField) else TelegramField(ib)
self._fields.append(field)

def __eq__(self, other) -> bool:
sfields = self.fields
if isinstance(other, TelegramContainer):
return sfields == other.fields
return sfields == other

def __getitem__(self, idx: int) -> TelegramField:
return self._fields[idx]

def __iter__(self) -> Generator[None, None, TelegramField]:
for field in self.fields:
yield field

def __len__(self) -> int:
return len(self.fields)

def __repr__(self) -> str:
cls_name = self.__class__.__name__
return f"{cls_name}(fields={self.fields})"

def __str__(self) -> str:
return str(list(self))

@property
def fields(self) -> list[TelegramField]:
return self._fields

def as_bytes(self) -> bytes:
return bytes(field.byte for field in self.fields)


class TelegramBlock(TelegramContainer):
"""Base Telegram Block class."""
Empty file.
77 changes: 77 additions & 0 deletions pymbus/telegrams/blocks/data_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from collections.abc import Iterator

from pymbus.exceptions import MBusError
from pymbus.telegrams.base import (
TelegramBlock,
TelegramBytesType,
)
from pymbus.telegrams.fields.data_info import (
DataInformationField as DIF,
)
from pymbus.telegrams.fields.data_info import (
DataInformationFieldExtension as DIFE,
)

DataFieldType = DIF | DIFE


class DataInformationBlock(TelegramBlock):
"""The "Data Information Block" (DIB) class.
The DIB describes the length, type and coding of the data.
The DIB contains at least one byte (DIF - Data Information Field).
The DIF of a DIB can be followed with DIF Extensions (DIFE):
from 0 to 10 DIFE frames 1 byte each (as the DIF).
The structure of the DIB:
-------------------------------
| DIF | DIFE |
+--------+--------------------+
| 1 byte | 0-10 (1 byte each) |
-------------------------------
"""

MAX_DIFE_FRAMES = 10

def __init__(self, ibytes: TelegramBytesType):
try:
blocks = self._parse_blocks(iter(ibytes))
except StopIteration as e:
cls_name = self.__class__.__name__
msg = f"failed to parse {ibytes} as {cls_name}"
raise MBusError(msg) from e

self._dif = blocks[0]
self._difes = blocks[1:]

self._fields = blocks

@property
def dif(self) -> DIF:
return self._dif

@property
def difes(self) -> list[DIFE]:
return self._difes

def _parse_blocks(self, ibytes: Iterator[int]) -> list[DataFieldType]:
dif = DIF(byte=next(ibytes))

blocks: list[DataFieldType] = [dif]
if not dif.extension:
return blocks

pos = 1
max_frame = self.MAX_DIFE_FRAMES + 1
while byte := next(ibytes):
dife = DIFE(byte=byte)
blocks.append(dife)
if not dife.extension:
break
pos += 1
if pos == max_frame:
if dife.extension:
msg = f"the last {dife} has the extension bit set"
raise MBusError(msg)
break
return blocks
Loading

0 comments on commit 0ee2da9

Please sign in to comment.