From 7879b6765460722ace505bcdea98e8f0dd5cbe9e Mon Sep 17 00:00:00 2001 From: Simone Chemelli Date: Tue, 28 May 2024 11:03:34 +0000 Subject: [PATCH 1/2] chore: typing AC --- midealocal/devices/ac/__init__.py | 56 ++++++++++++--------- midealocal/devices/ac/message.py | 84 +++++++++++++++++-------------- 2 files changed, 78 insertions(+), 62 deletions(-) diff --git a/midealocal/devices/ac/__init__.py b/midealocal/devices/ac/__init__.py index 4642422d..50620a6f 100644 --- a/midealocal/devices/ac/__init__.py +++ b/midealocal/devices/ac/__init__.py @@ -2,6 +2,8 @@ import logging import sys +from typing import Any + from .message import ( MessageACResponse, MessageGeneralSet, @@ -83,7 +85,7 @@ def __init__( model: str, subtype: int, customize: str, - ): + ) -> None: super().__init__( name=name, device_id=device_id, @@ -131,25 +133,32 @@ def __init__( DeviceAttributes.fresh_air_2: None, }, ) - self._fresh_air_version = None - self._default_temperature_step = 0.5 - self._temperature_step = None - self._used_subprotocol = False - self._bb_sn8_flag = False - self._bb_timer = False - self._power_analysis_method = None - self._default_power_analysis_method = 1 + self._fresh_air_version: DeviceAttributes | None = None + self._default_temperature_step: float = 0.5 + self._temperature_step: float = 0.5 + self._used_subprotocol: bool = False + self._bb_sn8_flag: bool = False + self._bb_timer: bool = False + self._power_analysis_method: int = 1 + self._default_power_analysis_method: int = 1 self.set_customize(customize) @property - def temperature_step(self): + def temperature_step(self) -> float | None: return self._temperature_step @property - def fresh_air_fan_speeds(self): + def fresh_air_fan_speeds(self) -> list[str]: return list(MideaACDevice._fresh_air_fan_speeds.values()) - def build_query(self): + def build_query( + self, + ) -> list[ + MessageSubProtocolQuery + | MessageQuery + | MessageNewProtocolQuery + | MessagePowerQuery + ]: if self._used_subprotocol: return [ MessageSubProtocolQuery(self._protocol_version, 0x10), @@ -162,7 +171,7 @@ def build_query(self): MessagePowerQuery(self._protocol_version), ] - def process_message(self, msg): + def process_byte_message(self, msg: bytearray) -> dict[str, Any]: message = MessageACResponse(msg, self._power_analysis_method) _LOGGER.debug(f"[{self.device_id}] Received: {message}") new_status = {} @@ -207,7 +216,7 @@ def process_message(self, msg): self._fresh_air_version = DeviceAttributes.fresh_air_2 return new_status - def make_message_set(self): + def make_message_set(self) -> MessageGeneralSet: message = MessageGeneralSet(self._protocol_version) message.power = self._attributes[DeviceAttributes.power] message.prompt_tone = self._attributes[DeviceAttributes.prompt_tone] @@ -230,7 +239,7 @@ def make_message_set(self): message.comfort_mode = self._attributes[DeviceAttributes.comfort_mode] return message - def make_subptotocol_message_set(self): + def make_subptotocol_message_set(self) -> MessageSubProtocolSet: message = MessageSubProtocolSet(self._protocol_version) message.power = self._attributes[DeviceAttributes.power] message.prompt_tone = self._attributes[DeviceAttributes.prompt_tone] @@ -248,16 +257,17 @@ def make_subptotocol_message_set(self): message.timer = self._bb_timer return message - def make_message_uniq_set(self): + def make_message_uniq_set(self) -> MessageSubProtocolSet | MessageGeneralSet: + message: MessageSubProtocolSet | MessageGeneralSet if self._used_subprotocol: message = self.make_subptotocol_message_set() else: message = self.make_message_set() return message - def set_attribute(self, attr, value): + def set_device_attribute(self, attr: DeviceAttributes, value: Any) -> None: # if nat a sensor - message = None + message: MessageToggleDisplay | MessageNewProtocolSet | None = None if attr not in [ DeviceAttributes.indoor_temperature, DeviceAttributes.outdoor_temperature, @@ -343,21 +353,21 @@ def set_attribute(self, attr, value): if message is not None: self.build_send(message) - def set_target_temperature(self, target_temperature, mode): - message = self.make_message_uniq_set() + def set_target_temperature(self, target_temperature: float, mode: int) -> None: + message: MessageGeneralSet = self.make_message_uniq_set() message.target_temperature = target_temperature if mode is not None: message.power = True message.mode = mode self.build_send(message) - def set_swing(self, swing_vertical, swing_horizontal): - message = self.make_message_uniq_set() + def set_swing(self, swing_vertical: bool, swing_horizontal: bool) -> None: + message: MessageGeneralSet = self.make_message_uniq_set() message.swing_vertical = swing_vertical message.swing_horizontal = swing_horizontal self.build_send(message) - def set_customize(self, customize): + def set_customize(self, customize: str) -> None: self._temperature_step = self._default_temperature_step self._power_analysis_method = self._default_power_analysis_method if customize and len(customize) > 0: diff --git a/midealocal/devices/ac/message.py b/midealocal/devices/ac/message.py index d289dc05..6c922555 100644 --- a/midealocal/devices/ac/message.py +++ b/midealocal/devices/ac/message.py @@ -25,7 +25,9 @@ class NewProtocolTags(IntEnum): class MessageACBase(MessageRequest): _message_serial = 0 - def __init__(self, protocol_version, message_type, body_type): + def __init__( + self, protocol_version: int, message_type: int, body_type: int + ) -> None: super().__init__( device_type=0xAC, protocol_version=protocol_version, @@ -38,18 +40,18 @@ def __init__(self, protocol_version, message_type, body_type): self._message_id = MessageACBase._message_serial @property - def _body(self): + def _body(self) -> bytearray: raise NotImplementedError @property - def body(self): + def body(self) -> bytearray: body = bytearray([self.body_type]) + self._body + bytearray([self._message_id]) body.append(calculate(body)) return body class MessageQuery(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -57,7 +59,7 @@ def __init__(self, protocol_version): ) @property - def _body(self): + def _body(self) -> bytearray: return bytearray( [ 0x81, @@ -84,7 +86,7 @@ def _body(self): class MessagePowerQuery(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -92,18 +94,18 @@ def __init__(self, protocol_version): ) @property - def _body(self): + def _body(self) -> bytearray: return bytearray([0x21, 0x01, 0x44, 0x00, 0x01]) @property - def body(self): + def body(self) -> bytearray: body = bytearray([self.body_type]) + self._body body.append(calculate(body)) return body class MessageToggleDisplay(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -112,7 +114,7 @@ def __init__(self, protocol_version): self.prompt_tone = False @property - def _body(self): + def _body(self) -> bytearray: prompt_tone = 0x40 if self.prompt_tone else 0 return bytearray( [ @@ -140,7 +142,7 @@ def _body(self): class MessageNewProtocolQuery(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -148,7 +150,7 @@ def __init__(self, protocol_version): ) @property - def _body(self): + def _body(self) -> bytearray: query_params = [ NewProtocolTags.indirect_wind, NewProtocolTags.breezeless, @@ -165,7 +167,9 @@ def _body(self): class MessageSubProtocol(MessageACBase): - def __init__(self, protocol_version, message_type, subprotocol_query_type): + def __init__( + self, protocol_version: int, message_type: int, subprotocol_query_type: int + ) -> None: super().__init__( protocol_version=protocol_version, message_type=message_type, @@ -174,18 +178,18 @@ def __init__(self, protocol_version, message_type, subprotocol_query_type): self._subprotocol_query_type = subprotocol_query_type @property - def _subprotocol_body(self): + def _subprotocol_body(self) -> bytes: return bytes([]) @property - def body(self): + def body(self) -> bytearray: body = bytearray([self.body_type]) + self._body body.append(calculate(body)) body.append(self.checksum(body)) return body @property - def _body(self): + def _body(self) -> bytearray: _subprotocol_body = self._subprotocol_body _body = bytearray( [ @@ -204,7 +208,7 @@ def _body(self): class MessageSubProtocolQuery(MessageSubProtocol): - def __init__(self, protocol_version, subprotocol_query_type): + def __init__(self, protocol_version: int, subprotocol_query_type: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.query, @@ -213,7 +217,7 @@ def __init__(self, protocol_version, subprotocol_query_type): class MessageSubProtocolSet(MessageSubProtocol): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, @@ -233,7 +237,7 @@ def __init__(self, protocol_version): self.prompt_tone = False @property - def _subprotocol_body(self): + def _subprotocol_body(self) -> bytes: power = 0x01 if self.power else 0 dry = 0x10 if self.power and self.dry else 0 boost_mode = 0x20 if self.boost_mode else 0 @@ -294,7 +298,7 @@ def _subprotocol_body(self): class MessageGeneralSet(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, @@ -319,7 +323,7 @@ def __init__(self, protocol_version): self.comfort_mode = False @property - def _body(self): + def _body(self) -> bytearray: # Byte1, Power, prompt_tone power = 0x01 if self.power else 0 prompt_tone = 0x40 if self.prompt_tone else 0 @@ -383,21 +387,21 @@ def _body(self): class MessageNewProtocolSet(MessageACBase): - def __init__(self, protocol_version): + def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, body_type=0xB0, ) - self.indirect_wind = None - self.prompt_tone = None - self.breezeless = None - self.screen_display_alternate = None - self.fresh_air_1 = None - self.fresh_air_2 = None + self.indirect_wind: bytes | None = None + self.prompt_tone: bytes | None = None + self.breezeless: bytes | None = None + self.screen_display_alternate: bytes | None = None + self.fresh_air_1: bytes | None = None + self.fresh_air_2: bytes | None = None @property - def _body(self): + def _body(self) -> bytearray: pack_count = 0 payload = bytearray([0x00]) if self.breezeless is not None: @@ -470,7 +474,7 @@ def _body(self): class XA0MessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) self.power = (body[1] & 0x1) > 0 self.target_temperature = ( @@ -492,7 +496,7 @@ def __init__(self, body): class XA1MessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) if body[13] != 0xFF: temp_integer = int((body[13] - 50) / 2) @@ -514,7 +518,7 @@ def __init__(self, body): class XBXMessageBody(NewProtocolMessageBody): - def __init__(self, body, bt): + def __init__(self, body: bytearray, bt: int) -> None: super().__init__(body, bt) params = self.parse() if NewProtocolTags.indirect_wind in params: @@ -541,7 +545,7 @@ def __init__(self, body, bt): class XC0MessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) self.power = (body[1] & 0x1) > 0 self.mode = (body[2] & 0xE0) >> 5 @@ -582,7 +586,7 @@ def __init__(self, body): class XC1MessageBody(MessageBody): - def __init__(self, body, analysis_method=3): + def __init__(self, body: bytearray, analysis_method: int = 3) -> None: super().__init__(body) if body[3] == 0x44: self.total_energy_consumption = XC1MessageBody.parse_consumption( @@ -609,11 +613,11 @@ def __init__(self, body, analysis_method=3): pass @staticmethod - def parse_value(byte): + def parse_value(byte: int) -> int: return (byte >> 4) * 10 + (byte & 0x0F) @staticmethod - def parse_power(analysis_method, byte1, byte2, byte3): + def parse_power(analysis_method: int, byte1: int, byte2: int, byte3: int) -> float: if analysis_method == 1: return ( float( @@ -629,7 +633,9 @@ def parse_power(analysis_method, byte1, byte2, byte3): return float(byte1 * 10000 + byte2 * 100 + byte3) / 10 @staticmethod - def parse_consumption(analysis_method, byte1, byte2, byte3, byte4): + def parse_consumption( + analysis_method: int, byte1: int, byte2: int, byte3: int, byte4: int + ) -> float: if analysis_method == 1: return ( float( @@ -647,7 +653,7 @@ def parse_consumption(analysis_method, byte1, byte2, byte3, byte4): class XBBMessageBody(MessageBody): - def __init__(self, body): + def __init__(self, body: bytearray) -> None: super().__init__(body) subprotocol_head = body[:6] subprotocol_body = body[6:] @@ -704,7 +710,7 @@ def __init__(self, body): class MessageACResponse(MessageResponse): - def __init__(self, message, power_analysis_method=3): + def __init__(self, message: bytearray, power_analysis_method: int = 3) -> None: super().__init__(message) if self.message_type == MessageType.notify2 and self.body_type == 0xA0: self.set_body(XA0MessageBody(super().body)) From 4b8274af33de00867d5a8f01b1061708dd485a19 Mon Sep 17 00:00:00 2001 From: Simone Chemelli Date: Tue, 4 Jun 2024 13:38:10 +0000 Subject: [PATCH 2/2] chore: revert method rename --- midealocal/devices/ac/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/midealocal/devices/ac/__init__.py b/midealocal/devices/ac/__init__.py index 50620a6f..5fb5ad7c 100644 --- a/midealocal/devices/ac/__init__.py +++ b/midealocal/devices/ac/__init__.py @@ -171,8 +171,8 @@ def build_query( MessagePowerQuery(self._protocol_version), ] - def process_byte_message(self, msg: bytearray) -> dict[str, Any]: - message = MessageACResponse(msg, self._power_analysis_method) + def process_message(self, msg: bytes) -> dict[str, Any]: + message = MessageACResponse(bytearray(msg), self._power_analysis_method) _LOGGER.debug(f"[{self.device_id}] Received: {message}") new_status = {} has_fresh_air = False @@ -265,7 +265,7 @@ def make_message_uniq_set(self) -> MessageSubProtocolSet | MessageGeneralSet: message = self.make_message_set() return message - def set_device_attribute(self, attr: DeviceAttributes, value: Any) -> None: + def set_attribute(self, attr: str, value: Any) -> None: # if nat a sensor message: MessageToggleDisplay | MessageNewProtocolSet | None = None if attr not in [