Skip to content

Commit

Permalink
chore: typing ce (#59)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Refactor**
- Introduced type annotations to enhance code readability and
maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
chemelli74 authored Jun 4, 2024
1 parent a87b071 commit 2bc283d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
17 changes: 9 additions & 8 deletions midealocal/devices/ce/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import sys
from typing import Any

from .message import MessageCEResponse, MessageQuery, MessageSet

Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(
model: str,
subtype: int,
customize: str,
):
) -> None:
super().__init__(
name=name,
device_id=device_id,
Expand Down Expand Up @@ -88,17 +89,17 @@ def __init__(
self.set_customize(customize)

@property
def speed_count(self):
def speed_count(self) -> int:
return self._speed_count

@property
def preset_modes(self):
def preset_modes(self) -> list[str]:
return self._modes

def build_query(self):
def build_query(self) -> list[MessageQuery]:
return [MessageQuery(self._protocol_version)]

def process_message(self, msg):
def process_message(self, msg: bytes) -> dict[str, Any]:
message = MessageCEResponse(msg)
_LOGGER.debug(f"[{self.device_id}] Received: {message}")
new_status = {}
Expand All @@ -118,7 +119,7 @@ def process_message(self, msg):
]
return new_status

def make_message_set(self):
def make_message_set(self) -> MessageSet:
message = MessageSet(self._protocol_version)
message.power = self._attributes[DeviceAttributes.power]
message.fan_speed = self._attributes[DeviceAttributes.fan_speed]
Expand All @@ -131,7 +132,7 @@ def make_message_set(self):
message.child_lock = self._attributes[DeviceAttributes.child_lock]
return message

def set_attribute(self, attr, value):
def set_attribute(self, attr: str, value: Any) -> None:
message = self.make_message_set()
if attr == DeviceAttributes.mode:
message.sleep_mode = False
Expand All @@ -144,7 +145,7 @@ def set_attribute(self, attr, value):
setattr(message, str(attr), value)
self.build_send(message)

def set_customize(self, customize):
def set_customize(self, customize: str) -> None:
self._speed_count = self._default_speed_count
if customize and len(customize) > 0:
try:
Expand Down
45 changes: 21 additions & 24 deletions midealocal/devices/ce/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@


class MessageFABase(MessageRequest):
def __init__(self, protocol_version, message_type, body_type):
def __init__(
self, protocol_version: int, message_type: int, body_type: int
) -> None:
super().__init__(
device_type=0xCE,
protocol_version=protocol_version,
Expand All @@ -16,25 +18,25 @@ def __init__(self, protocol_version, message_type, body_type):
)

@property
def _body(self):
def _body(self) -> bytearray:
raise NotImplementedError


class MessageQuery(MessageFABase):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.query,
body_type=0x01,
)

@property
def _body(self):
def _body(self) -> bytearray:
return bytearray([])


class MessageSet(MessageFABase):
def __init__(self, protocol_version):
def __init__(self, protocol_version: int) -> None:
super().__init__(
protocol_version=protocol_version,
message_type=MessageType.set,
Expand All @@ -52,7 +54,7 @@ def __init__(self, protocol_version):
self.child_lock = False

@property
def _body(self):
def _body(self) -> bytearray:
power = 0x80 if self.power else 0x00
link_to_ac = 0x01 if self.link_to_ac else 0x00
sleep_mode = 0x02 if self.sleep_mode else 0x00
Expand All @@ -74,62 +76,57 @@ def _body(self):


class CEGeneralMessageBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
self.power = (body[1] & 0x80) > 0
self.child_lock = (body[1] & 0x20) > 0
self.scheduled = (body[1] & 0x40) > 0
self.fan_speed = body[2]
self.pm25 = (body[3] << 8) + body[4]
self.co2 = (body[5] << 8) + body[6]
self.current_humidity: float | None = None
self.current_temperature: float | None = None
self.hcho: float | None = None
self.aux_heating: bool | None = None

if body[7] != 0xFF:
self.current_humidity = (body[7] << 8) + body[8] / 10
else:
self.current_humidity = None
if body[9] != 0xFF:
self.current_temperature = (body[9] << 8) + (body[10] - 60) / 2
else:
self.current_temperature = None
if body[11] != 0xFF:
self.hcho = (body[11] << 8) + body[12] / 1000
else:
self.hcho = None
self.link_to_ac = (body[17] & 0x01) > 0
self.sleep_mode = (body[17] & 0x02) > 0
self.eco_mode = (body[17] & 0x04) > 0
if (body[19] & 0x02) > 0:
self.aux_heating = (body[17] & 0x08) > 0
else:
self.aux_heating = None
self.powerful_purify = (body[17] & 0x10) > 0
self.filter_cleaning_reminder = (body[18] & 0x01) > 0
self.filter_change_reminder = (body[18] & 0x02) > 0
self.error_code = body[24]


class CENotifyMessageBody(MessageBody):
def __init__(self, body):
def __init__(self, body: bytearray) -> None:
super().__init__(body)
self.current_humidity: float | None = None
self.current_temperature: float | None = None
self.hcho: float | None = None

self.pm25 = (body[1] << 8) + body[2]
self.co2 = (body[3] << 8) + body[4]
if body[5] != 0xFF:
self.current_humidity = (body[5] << 8) + body[6] / 10
else:
self.current_humidity = None
if body[7] != 0xFF:
self.current_temperature = (body[7] << 8) + (body[8] - 60) / 2
else:
self.current_temperature = None
if body[9] != 0xFF:
self.hcho = (body[9] << 8) + body[10] / 1000
else:
self.hcho = None
self.error_code = body[12]


class MessageCEResponse(MessageResponse):
def __init__(self, message):
super().__init__(message)
def __init__(self, message: bytes) -> None:
super().__init__(bytearray(message))
if (
self.message_type in [MessageType.query, MessageType.set]
and self.body_type == 0x01
Expand Down

0 comments on commit 2bc283d

Please sign in to comment.