diff --git a/midealocal/devices/ce/__init__.py b/midealocal/devices/ce/__init__.py index fd919b90..e8cc68fd 100644 --- a/midealocal/devices/ce/__init__.py +++ b/midealocal/devices/ce/__init__.py @@ -1,6 +1,7 @@ import json import logging import sys +from typing import Any from .message import MessageCEResponse, MessageQuery, MessageSet @@ -50,7 +51,7 @@ def __init__( model: str, subtype: int, customize: str, - ): + ) -> None: super().__init__( name=name, device_id=device_id, @@ -88,17 +89,17 @@ def __init__( self.set_customize(customize) @property - def speed_count(self): + def speed_count(self) -> int: return self._speed_count @property - def preset_modes(self): + def preset_modes(self) -> list[str]: return self._modes - def build_query(self): + def build_query(self) -> list[MessageQuery]: return [MessageQuery(self._protocol_version)] - def process_message(self, msg): + def process_message(self, msg: bytes) -> dict[str, Any]: message = MessageCEResponse(msg) _LOGGER.debug(f"[{self.device_id}] Received: {message}") new_status = {} @@ -118,7 +119,7 @@ def process_message(self, msg): ] return new_status - def make_message_set(self): + def make_message_set(self) -> MessageSet: message = MessageSet(self._protocol_version) message.power = self._attributes[DeviceAttributes.power] message.fan_speed = self._attributes[DeviceAttributes.fan_speed] @@ -131,7 +132,7 @@ def make_message_set(self): message.child_lock = self._attributes[DeviceAttributes.child_lock] return message - def set_attribute(self, attr, value): + def set_attribute(self, attr: str, value: Any) -> None: message = self.make_message_set() if attr == DeviceAttributes.mode: message.sleep_mode = False @@ -144,7 +145,7 @@ def set_attribute(self, attr, value): setattr(message, str(attr), value) self.build_send(message) - def set_customize(self, customize): + def set_customize(self, customize: str) -> None: self._speed_count = self._default_speed_count if customize and len(customize) > 0: try: diff --git a/midealocal/devices/ce/message.py b/midealocal/devices/ce/message.py index f4714dc9..ef0bc500 100644 --- a/midealocal/devices/ce/message.py +++ b/midealocal/devices/ce/message.py @@ -7,7 +7,9 @@ class MessageFABase(MessageRequest): - def __init__(self, protocol_version, message_type, body_type): + def __init__( + self, protocol_version: int, message_type: int, body_type: int + ) -> None: super().__init__( device_type=0xCE, protocol_version=protocol_version, @@ -16,12 +18,12 @@ def __init__(self, protocol_version, message_type, body_type): ) @property - def _body(self): + def _body(self) -> bytearray: raise NotImplementedError class MessageQuery(MessageFABase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -29,12 +31,12 @@ def __init__(self, protocol_version): ) @property - def _body(self): + def _body(self) -> bytearray: return bytearray([]) class MessageSet(MessageFABase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, @@ -52,7 +54,7 @@ def __init__(self, protocol_version): self.child_lock = False @property - def _body(self): + def _body(self) -> bytearray: power = 0x80 if self.power else 0x00 link_to_ac = 0x01 if self.link_to_ac else 0x00 sleep_mode = 0x02 if self.sleep_mode else 0x00 @@ -74,7 +76,7 @@ def _body(self): class CEGeneralMessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) self.power = (body[1] & 0x80) > 0 self.child_lock = (body[1] & 0x20) > 0 @@ -82,25 +84,22 @@ def __init__(self, body): self.fan_speed = body[2] self.pm25 = (body[3] << 8) + body[4] self.co2 = (body[5] << 8) + body[6] + self.current_humidity: float | None = None + self.current_temperature: float | None = None + self.hcho: float | None = None + self.aux_heating: bool | None = None + if body[7] != 0xFF: self.current_humidity = (body[7] << 8) + body[8] / 10 - else: - self.current_humidity = None if body[9] != 0xFF: self.current_temperature = (body[9] << 8) + (body[10] - 60) / 2 - else: - self.current_temperature = None if body[11] != 0xFF: self.hcho = (body[11] << 8) + body[12] / 1000 - else: - self.hcho = None self.link_to_ac = (body[17] & 0x01) > 0 self.sleep_mode = (body[17] & 0x02) > 0 self.eco_mode = (body[17] & 0x04) > 0 if (body[19] & 0x02) > 0: self.aux_heating = (body[17] & 0x08) > 0 - else: - self.aux_heating = None self.powerful_purify = (body[17] & 0x10) > 0 self.filter_cleaning_reminder = (body[18] & 0x01) > 0 self.filter_change_reminder = (body[18] & 0x02) > 0 @@ -108,28 +107,26 @@ def __init__(self, body): class CENotifyMessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) + self.current_humidity: float | None = None + self.current_temperature: float | None = None + self.hcho: float | None = None + self.pm25 = (body[1] << 8) + body[2] self.co2 = (body[3] << 8) + body[4] if body[5] != 0xFF: self.current_humidity = (body[5] << 8) + body[6] / 10 - else: - self.current_humidity = None if body[7] != 0xFF: self.current_temperature = (body[7] << 8) + (body[8] - 60) / 2 - else: - self.current_temperature = None if body[9] != 0xFF: self.hcho = (body[9] << 8) + body[10] / 1000 - else: - self.hcho = None self.error_code = body[12] class MessageCEResponse(MessageResponse): - def __init__(self, message): - super().__init__(message) + def __init__(self, message: bytes) -> None: + super().__init__(bytearray(message)) if ( self.message_type in [MessageType.query, MessageType.set] and self.body_type == 0x01