Skip to content

Commit

Permalink
Merge branch 'main' into chemelli74-duty-segregation
Browse files Browse the repository at this point in the history
  • Loading branch information
chemelli74 authored Jul 23, 2024
2 parents 62f849d + c636eee commit ca1d0ea
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 5 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ repos:
- id: no-commit-to-branch
args: ["--branch", "main"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.2
rev: v0.5.4
hooks:
- id: ruff
args:
- --fix
- id: ruff-format
- repo: https://github.com/commitizen-tools/commitizen
rev: v3.27.0
rev: v3.28.0
hooks:
- id: commitizen
stages: [commit-msg]
Expand Down
3 changes: 1 addition & 2 deletions midealocal/devices/e2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ def _normalize_old_protocol(self, value: str | bool | int) -> OldProtocol:
if return_value == OldProtocol.auto:
result = (
self.subtype <= E2SubType.T82
or self.subtype == E2SubType.T85
or self.subtype == E2SubType.T36353
or self.subtype in [E2SubType.T85, E2SubType.T36353],
)
return_value = OldProtocol.true if result else OldProtocol.false
if isinstance(value, bool | int):
Expand Down
151 changes: 150 additions & 1 deletion midealocal/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from enum import IntEnum
from typing import SupportsIndex, cast
from typing import Generic, SupportsIndex, TypeVar, cast

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -300,12 +300,156 @@ def body(self) -> bytearray:
return bytearray([0x00] * 19)


T = TypeVar("T")
E = TypeVar("E", bound="IntEnum")


class BodyParser(Generic[T]):
"""Body parser to decode message."""

def __init__(
self,
name: str,
byte: int,
bit: int | None = None,
length_in_bytes: int = 1,
first_upper: bool = True,
default_raw_value: int = 0,
) -> None:
"""Init body parser with attribute name."""
self.name = name
self._byte = byte
self._bit = bit
self._length_in_bytes = length_in_bytes
self._first_upper = first_upper
self._default_raw_value = default_raw_value
if length_in_bytes < 0:
raise ValueError("Length in bytes must be a positive value.")
if bit is not None and (bit < 0 or bit >= length_in_bytes * 8):
raise ValueError(
"Bit, if set, must be a valid value position for %d bytes.",
length_in_bytes,
)

def _get_raw_value(self, body: bytearray) -> int:
"""Get raw value from body."""
if len(body) < self._byte + self._length_in_bytes:
return self._default_raw_value
data = 0
for i in range(self._length_in_bytes):
byte = (
self._byte + self._length_in_bytes - 1 - i
if self._first_upper
else self._byte + i
)
data += body[byte] << (8 * i)
if self._bit is not None:
data = (data & (1 << self._bit)) >> self._bit
return data

def get_value(self, body: bytearray) -> T:
"""Get attribute value."""
return self._parse(self._get_raw_value(body))

def _parse(self, raw_value: int) -> T:
"""Convert raw value to attribute value."""
raise NotImplementedError


class BoolParser(BodyParser[bool]):
"""Bool message body parser."""

def __init__(
self,
name: str,
byte: int,
bit: int | None = None,
true_value: int = 1,
false_value: int = 0,
default_value: bool = True,
) -> None:
"""Init bool body parser."""
super().__init__(name, byte, bit)
self._true_value = true_value
self._default_value = default_value
self._false_value = false_value

def _parse(self, raw_value: int) -> bool:
if raw_value not in [self._true_value, self._false_value]:
return self._default_value
return raw_value == self._true_value


class IntEnumParser(BodyParser[E]):
"""IntEnum message body parser."""

def __init__(
self,
name: str,
byte: int,
enum_class: type[E],
length_in_bytes: int = 1,
first_upper: bool = False,
default_value: E | None = None,
) -> None:
"""Init IntEnum body parser."""
super().__init__(
name,
byte,
length_in_bytes=length_in_bytes,
first_upper=first_upper,
)
self._enum_class = enum_class
self._default_value = default_value

def _parse(self, raw_value: int) -> E:
try:
return self._enum_class(raw_value)
except ValueError:
return (
self._default_value
if self._default_value is not None
else self._enum_class(0)
)


class IntParser(BodyParser[int]):
"""IntEnum message body parser."""

def __init__(
self,
name: str,
byte: int,
max_value: int = 255,
min_value: int = 0,
length_in_bytes: int = 1,
first_upper: bool = False,
) -> None:
"""Init IntEnum body parser."""
super().__init__(
name,
byte,
length_in_bytes=length_in_bytes,
first_upper=first_upper,
)
self._max_value = max_value
self._min_value = min_value

def _parse(self, raw_value: int) -> int:
if raw_value > self._max_value:
return self._max_value
if raw_value < self._min_value:
return self._min_value
return raw_value


class MessageBody:
"""Message body."""

def __init__(self, body: bytearray) -> None:
"""Initialize message body."""
self._data = body
self.parser_list: list[BodyParser] = []

@property
def data(self) -> bytearray:
Expand All @@ -322,6 +466,11 @@ def read_byte(body: bytearray, byte: int, default_value: int = 0) -> int:
"""Read bytes for message body."""
return body[byte] if len(body) > byte else default_value

def parse_all(self) -> None:
"""Process parses and set body attrs."""
for parse in self.parser_list:
setattr(self, parse.name, parse.get_value(self._data))


class NewProtocolPackLength(IntEnum):
"""New Protocol Pack Length."""
Expand Down
189 changes: 189 additions & 0 deletions tests/message_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Midea local message test."""

import pytest

from midealocal.message import (
BodyParser,
BodyType,
BoolParser,
IntEnumParser,
IntParser,
MessageBody,
)


def test_init_validations() -> None:
"""Test body parser init validations."""
with pytest.raises(
ValueError,
match="Length in bytes must be a positive value.",
):
BodyParser[int]("name", byte=3, length_in_bytes=-1)

with pytest.raises(
ValueError,
match="('Bit, if set, must be a valid value position for %d bytes.', 2)",
):
BodyParser[int]("name", byte=3, length_in_bytes=2, bit=-1)

with pytest.raises(
ValueError,
match="('Bit, if set, must be a valid value position for %d bytes.', 3)",
):
BodyParser[int]("name", byte=3, length_in_bytes=3, bit=24)


class TestBodyParser:
"""Body parser test."""

@pytest.fixture(autouse=True)
def _setup_body(self) -> None:
"""Create body for test."""
self.body = bytearray(
[
0x00,
0x01,
0x02,
0x03,
0x04,
0x05,
],
)

def test_get_raw_value_1_byte(self) -> None:
"""Test get raw value with 1 byte."""
parser = BodyParser[int]("name", 2)
value = parser._get_raw_value(self.body)
assert value == 0x02

def test_get_raw_value_2_bytes(self) -> None:
"""Test get raw value with 2 bytes."""
parser = BodyParser[int]("name", 2, length_in_bytes=2)
value = parser._get_raw_value(self.body)
assert value == 0x0203

def test_get_raw_value_2_bytes_first_lower(self) -> None:
"""Test get raw value with 2 bytes first lower."""
parser = BodyParser[int]("name", 2, length_in_bytes=2, first_upper=False)
value = parser._get_raw_value(self.body)
assert value == 0x0302

def test_get_raw_out_of_bounds(self) -> None:
"""Test get raw value out of bounds."""
parser = BodyParser[int]("name", 6)
value = parser._get_raw_value(self.body)
assert value == 0

def test_get_raw_data_size_out_of_bounds(self) -> None:
"""Test get raw value out of bounds."""
parser = BodyParser[int]("name", 5, length_in_bytes=2)
value = parser._get_raw_value(self.body)
assert value == 0

def test_get_raw_data_bit(self) -> None:
"""Test get raw value out of bounds."""
for i in range(16):
parser = BodyParser[int]("name", 4, length_in_bytes=2, bit=i)
value = parser._get_raw_value(self.body)
assert value == (1 if i in [0, 2, 10] else 0)

def test_parse_unimplemented(self) -> None:
"""Test parse unimplemented."""
parser = BodyParser[int]("name", 4, length_in_bytes=2, bit=2)
with pytest.raises(NotImplementedError):
parser.get_value(self.body)


class TestBoolParser:
"""Test BoolParser."""

def test_bool_default(self) -> None:
"""Test default behaviour."""
parser = BoolParser("name", 0)
assert parser._parse(0) is False
assert parser._parse(1) is True
assert parser._parse(2) is True

def test_bool_default_false(self) -> None:
"""Test default behaviour with default value false."""
parser = BoolParser("name", 0, default_value=False)
assert parser._parse(0) is False
assert parser._parse(1) is True
assert parser._parse(2) is False

def test_bool_inverted(self) -> None:
"""Test True=0 and False=1."""
parser = BoolParser("name", 0, true_value=0, false_value=1)
assert parser._parse(0) is True
assert parser._parse(1) is False
assert parser._parse(2) is True


class TestIntEnumParser:
"""Test IntEnumParser."""

def test_intenum_default(self) -> None:
"""Test default behaviour."""
parser = IntEnumParser[BodyType]("name", 0, BodyType)
assert parser._parse(0x01) == BodyType.X01
assert parser._parse(0x00) == BodyType.X00
assert parser._parse(0x10) == BodyType.X00

parser = IntEnumParser[BodyType]("name", 0, BodyType, default_value=BodyType.A0)
assert parser._parse(0x01) == BodyType.X01
assert parser._parse(0x00) == BodyType.X00
assert parser._parse(0x10) == BodyType.A0


class TestIntParser:
"""Test IntParser."""

def test_int_default(self) -> None:
"""Test default behaviour."""
parser = IntParser("name", 0)
for i in range(-10, 260):
if i < 0:
assert parser._parse(i) == 0
elif i > 255:
assert parser._parse(i) == 255
else:
assert parser._parse(i) == i


class TestMessageBody:
"""Test message body."""

def test_parse_all(self) -> None:
"""Test parse all."""
data = bytearray(
[
0x00,
0x01,
0x02,
0x03,
0x04,
0x05,
],
)

body = MessageBody(data)
body.parser_list.extend(
[
IntEnumParser("bt", 0, BodyType),
BoolParser("power", 1),
BoolParser("feature_1", 2, 0),
BoolParser("feature_2", 2, 1),
IntParser("speed", 3),
],
)
body.parse_all()
assert hasattr(body, "bt") is True
assert getattr(body, "bt", None) == BodyType.X00
assert hasattr(body, "power") is True
assert getattr(body, "power", False) is True
assert hasattr(body, "feature_1") is True
assert getattr(body, "feature_1", True) is False
assert hasattr(body, "feature_2") is True
assert getattr(body, "feature_2", False) is True
assert hasattr(body, "speed") is True
assert getattr(body, "speed", 0) == 3

0 comments on commit ca1d0ea

Please sign in to comment.