Skip to content

Commit

Permalink
fix(da): avoid indexes out of bounds (#199)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Bug Fixes**
- Updated logic for assigning device attributes to ensure correct
handling based on attribute values and array lengths.

- **Tests**
- Added comprehensive test cases for initial attribute settings, message
processing, query building, and attribute setting functionalities of the
`MideaDADevice` class.
- Added test cases for various message types related to the DA system,
including query, power, start, and response messages.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
rokam authored Jul 5, 2024
1 parent 330b80f commit 246f9c7
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 16 deletions.
37 changes: 21 additions & 16 deletions midealocal/devices/da/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,30 +137,35 @@ def process_message(self, msg: bytes) -> dict[str, Any]:
]
for status in self._attributes:
if hasattr(message, str(status)):
value = getattr(message, str(status))
if status == DeviceAttributes.progress:
self._attributes[status] = progress[getattr(message, str(status))]
self._attributes[status] = (
None if value >= len(progress) else progress[value]
)
elif status == DeviceAttributes.program:
self._attributes[status] = program[getattr(message, str(status))]
self._attributes[status] = (
None if value >= len(program) else program[value]
)
elif status == DeviceAttributes.rinse_level:
temp_rinse_level = getattr(message, str(status))
if temp_rinse_level == MIN_TEMP:
self._attributes[status] = "-"
else:
self._attributes[status] = temp_rinse_level
self._attributes[status] = "-" if value == MIN_TEMP else value
elif status == DeviceAttributes.dehydration_speed:
temp_speed = getattr(message, str(status))
if temp_speed == MIN_TEMP:
self._attributes[status] = "-"
else:
self._attributes[status] = speed[temp_speed]
self._attributes[status] = (
None if value >= len(speed) else speed[value]
)
elif status == DeviceAttributes.detergent:
self._attributes[status] = detergent[getattr(message, str(status))]
self._attributes[status] = (
None if value >= len(detergent) else detergent[value]
)
elif status == DeviceAttributes.softener:
self._attributes[status] = softener[getattr(message, str(status))]
self._attributes[status] = (
None if value >= len(softener) else softener[value]
)
elif status == DeviceAttributes.wash_strength:
self._attributes[status] = strength[getattr(message, str(status))]
self._attributes[status] = (
None if value >= len(strength) else strength[value]
)
else:
self._attributes[status] = getattr(message, str(status))
self._attributes[status] = value
new_status[str(status)] = self._attributes[status]
return new_status

Expand Down
1 change: 1 addition & 0 deletions tests/devices/da/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Midea local DA device tests."""
124 changes: 124 additions & 0 deletions tests/devices/da/device_da_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Test da Device."""

from unittest.mock import patch

import pytest

from midealocal.devices.da import DeviceAttributes, MideaDADevice
from midealocal.devices.da.message import MessageQuery
from midealocal.exceptions import ValueWrongType


class TestMideaDADevice:
"""Test Midea DA Device."""

device: MideaDADevice

@pytest.fixture(autouse=True)
def _setup_device(self) -> None:
"""Midea DA Device setup."""
self.device = MideaDADevice(
name="Test Device",
device_id=1,
ip_address="192.168.1.100",
port=6444,
token="AA",
key="BB",
protocol=3,
model="test_model",
subtype=1,
customize="test_customize",
)

def test_initial_attributes(self) -> None:
"""Test initial attributes."""
assert not self.device.attributes[DeviceAttributes.power]
assert not self.device.attributes[DeviceAttributes.start]
assert self.device.attributes[DeviceAttributes.error_code] is None
assert self.device.attributes[DeviceAttributes.washing_data] == bytearray([])
assert self.device.attributes[DeviceAttributes.program] is None
assert self.device.attributes[DeviceAttributes.progress] == "Unknown"
assert self.device.attributes[DeviceAttributes.time_remaining] is None
assert self.device.attributes[DeviceAttributes.wash_time] is None
assert self.device.attributes[DeviceAttributes.soak_time] is None
assert self.device.attributes[DeviceAttributes.dehydration_time] is None
assert self.device.attributes[DeviceAttributes.dehydration_speed] is None
assert self.device.attributes[DeviceAttributes.rinse_count] is None
assert self.device.attributes[DeviceAttributes.rinse_level] is None
assert self.device.attributes[DeviceAttributes.wash_level] is None
assert self.device.attributes[DeviceAttributes.wash_strength] is None
assert self.device.attributes[DeviceAttributes.softener] is None
assert self.device.attributes[DeviceAttributes.detergent] is None

def test_process_message(self) -> None:
"""Test process message."""
with patch("midealocal.devices.da.MessageDAResponse") as mock_message_response:
mock_message = mock_message_response.return_value
mock_message.protocol_version = 3
mock_message.power = True
mock_message.start = True
mock_message.error_code = 10
mock_message.program = 5
mock_message.wash_time = 30
mock_message.soak_time = 10
mock_message.dehydration_time = 2
mock_message.dehydration_speed = 3
mock_message.rinse_count = 3
mock_message.rinse_level = 4
mock_message.wash_level = 1
mock_message.wash_strength = 2
mock_message.softener = 5
mock_message.detergent = 4
mock_message.progress = 2
mock_message.time_remaining = 15 + 60
new_status = self.device.process_message(b"")
assert new_status[DeviceAttributes.power.value]
assert new_status[DeviceAttributes.start.value]
assert new_status[DeviceAttributes.error_code.value] == 10
assert new_status[DeviceAttributes.program.value] == "Memory"
assert new_status[DeviceAttributes.progress.value] == "Rinse"
assert new_status[DeviceAttributes.time_remaining.value] == 75
assert new_status[DeviceAttributes.wash_time.value] == 30
assert new_status[DeviceAttributes.soak_time.value] == 10
assert new_status[DeviceAttributes.dehydration_time.value] == 2
assert new_status[DeviceAttributes.dehydration_speed.value] == "High"
assert new_status[DeviceAttributes.rinse_count.value] == 3
assert new_status[DeviceAttributes.rinse_level.value] == 4
assert new_status[DeviceAttributes.wash_level.value] == 1
assert new_status[DeviceAttributes.wash_strength.value] == "Medium"
assert new_status[DeviceAttributes.softener.value] == "5"
assert new_status[DeviceAttributes.detergent.value] == "4"

mock_message.progress = 15
mock_message.program = 15
mock_message.rinse_level = 15
mock_message.dehydration_speed = 15
mock_message.detergent = 15
mock_message.softener = 15
mock_message.wash_strength = 15
new_status = self.device.process_message(b"")
assert new_status[DeviceAttributes.program.value] is None
assert new_status[DeviceAttributes.progress.value] is None
assert new_status[DeviceAttributes.rinse_level.value] == "-"
assert new_status[DeviceAttributes.dehydration_speed.value] is None
assert new_status[DeviceAttributes.softener.value] is None
assert new_status[DeviceAttributes.detergent.value] is None
assert new_status[DeviceAttributes.wash_strength.value] is None

def test_build_query(self) -> None:
"""Test build query."""
queries = self.device.build_query()
assert len(queries) == 1
assert isinstance(queries[0], MessageQuery)

def test_set_attribute(self) -> None:
"""Test set attribute."""
with patch.object(self.device, "build_send") as mock_build_send:
self.device.set_attribute(DeviceAttributes.power, True)
mock_build_send.assert_called_once()

self.device.set_attribute(DeviceAttributes.start, True)
mock_build_send.assert_called()

with pytest.raises(ValueWrongType):
self.device.set_attribute(DeviceAttributes.start, "On")
128 changes: 128 additions & 0 deletions tests/devices/da/message_da_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Test DA message."""

import pytest

from midealocal.devices.da.message import (
MessageDABase,
MessageDAResponse,
MessagePower,
MessageQuery,
MessageStart,
)


class TestMessageDABase:
"""Test DA Message Base."""

def test_body_not_implemented(self) -> None:
"""Test body not implemented."""
msg = MessageDABase(protocol_version=1, message_type=1, body_type=1)
with pytest.raises(NotImplementedError):
_ = msg.body


class TestMessageQuery:
"""Test Message Query."""

def test_query_body(self) -> None:
"""Test query body."""
query = MessageQuery(protocol_version=1)
expected_body = bytearray([0x03])
assert query.body == expected_body


class TestMessagePower:
"""Test Message Power."""

def test_power_body(self) -> None:
"""Test power body."""
power = MessagePower(protocol_version=1)
expected_body = bytearray([0x02, 0x00, 0xFF])
assert power.body == expected_body
power.power = True
expected_body = bytearray([0x02, 0x01, 0xFF])
assert power.body == expected_body


class TestMessageStart:
"""Test Message Start."""

def test_start_body(self) -> None:
"""Test start body."""
start = MessageStart(protocol_version=1)
expected_body = bytearray([0x02, 0xFF, 0x00])
assert start.body == expected_body
start.start = True
expected_body = bytearray([0x02, 0xFF, 0x01])
assert start.body == expected_body
start.washing_data = bytearray([0x01, 0x02, 0x03])
assert start.body == expected_body + start.washing_data


class TestMessageDAResponse:
"""Test Message DA Response."""

def test_da_general_response(self) -> None:
"""Test general response."""
header = bytearray(
[
0xAA,
0x00,
0xDA,
0x00,
0x00,
0x00,
0x00,
0x00,
0x01,
0x03,
],
)
body = bytearray(26)
body[0] = 0x04 # Body Type
body[1] = 1 # Set power to True
body[2] = 2 # Set start condition to True
body[24] = 10 # Mock error_code
body[4] = 5 # Mock program
body[9] = 30 # Mock wash_time
body[12] = 10 # Mock soak_time
body[10] = (2 << 4) | 3 # Mock dehydration_time and rinse_count
body[6] = (3 << 4) | 2 # Mock dehydration_speed and wash_strength
body[5] = (4 << 4) | 1 # Mock rinse_level and wash_level
body[8] = (5 << 4) | 4 # Mock softener and detergent
body[16] = 2 << 1 # Mock progress
body[17] = 15 # Mock time_remaining lower byte
body[18] = 1 # Mock time_remaining upper byte
response = MessageDAResponse(header + body)
assert hasattr(response, "power")
assert response.power
assert hasattr(response, "start")
assert response.start
assert hasattr(response, "error_code")
assert response.error_code == 10
assert hasattr(response, "program")
assert response.program == 5
assert hasattr(response, "wash_time")
assert response.wash_time == 30
assert hasattr(response, "soak_time")
assert response.soak_time == 10
assert hasattr(response, "dehydration_time")
assert response.dehydration_time == 2
assert hasattr(response, "dehydration_speed")
assert response.dehydration_speed == 3
assert hasattr(response, "rinse_count")
assert response.rinse_count == 3
assert hasattr(response, "rinse_level")
assert response.rinse_level == 4
assert hasattr(response, "wash_level")
assert response.wash_level == 1
assert hasattr(response, "wash_strength")
assert response.wash_strength == 2
assert hasattr(response, "softener")
assert response.softener == 5
assert hasattr(response, "detergent")
assert response.detergent == 4
assert hasattr(response, "progress")
assert response.progress == 2
assert hasattr(response, "time_remaining")
assert response.time_remaining == 15 + 60

0 comments on commit 246f9c7

Please sign in to comment.