From deb41b5bfd71f40f688cc111abf40b43e405661b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Sun, 14 Jul 2024 05:07:38 +0000 Subject: [PATCH 1/8] feat(b8): first implementation --- midealocal/devices/b8/__init__.py | 132 +++++++++++++ midealocal/devices/b8/message.py | 317 ++++++++++++++++++++++++++++++ 2 files changed, 449 insertions(+) create mode 100644 midealocal/devices/b8/__init__.py create mode 100644 midealocal/devices/b8/message.py diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py new file mode 100644 index 00000000..bfe4229f --- /dev/null +++ b/midealocal/devices/b8/__init__.py @@ -0,0 +1,132 @@ +"""Midea local B8 device.""" + +import logging +from enum import IntEnum, StrEnum +from typing import Any + +from midealocal.device import MideaDevice + +from .message import ( + CleanMode, + ControlType, + ErrorType, + MessageB8Response, + MessageQuery, + MopState, + Moviment, +) + +_LOGGER = logging.getLogger(__name__) + + +class DeviceAttributes(StrEnum): + """Midea B8 device attributes.""" + + WORK_STATUS = "work_status" + FUNCTION_TYPE = "function_type" + CONTROL_TYPE = "control_type" + MOVE_DIRECTION = "move_direction" + CLEAN_MODE = "clean_mode" + FAN_LEVEL = "fan_level" + AREA = "area" + WATER_LEVEL = "water_level" + VOICE_VOLUME = "voice_volume" + MOP = "mop" + CARPET_SWITCH = "carpet_switch" + SPEED = "speed" + HAVE_RESERVE_TASK = "have_reserve_task" + BATTERY_PERCENT = "battery_percent" + WORK_TIME = "work_time" + UV_SWITCH = "uv_switch" + WIFI_SWITCH = "wifi_switch" + VOICE_SWITCH = "voice_switch" + COMMAND_SOURCE = "command_source" + ERROR_TYPE = "error_type" + ERROR_DESC = "error_desc" + DEVICE_ERROR = "device_error" + BOARD_COMMUNICATION_ERROR = "board_communication_error" + LASER_SENSOR_SHELTER = "laser_sensor_shelter" + LASER_SENSOR_ERROR = "laser_sensor_error" + + +class MideaB8Device(MideaDevice): + """Midea B8 device.""" + + def __init__( + self, + name: str, + device_id: int, + ip_address: str, + port: int, + token: str, + key: str, + protocol: int, + model: str, + subtype: int, + customize: str, # noqa: ARG002 + ) -> None: + """Initialize Midea B8 device.""" + super().__init__( + name=name, + device_id=device_id, + device_type=0xB8, + ip_address=ip_address, + port=port, + token=token, + key=key, + protocol=protocol, + model=model, + subtype=subtype, + attributes={ + DeviceAttributes.WORK_STATUS: None, + DeviceAttributes.FUNCTION_TYPE: None, + DeviceAttributes.CONTROL_TYPE: ControlType.NONE, + DeviceAttributes.MOVE_DIRECTION: Moviment.NONE, + DeviceAttributes.CLEAN_MODE: CleanMode.NONE, + DeviceAttributes.FAN_LEVEL: None, + DeviceAttributes.AREA: None, + DeviceAttributes.WATER_LEVEL: None, + DeviceAttributes.VOICE_VOLUME: 0, + DeviceAttributes.MOP: MopState.OFF, + DeviceAttributes.CARPET_SWITCH: False, + DeviceAttributes.SPEED: None, + DeviceAttributes.HAVE_RESERVE_TASK: False, + DeviceAttributes.BATTERY_PERCENT: 0, + DeviceAttributes.WORK_TIME: 0, + DeviceAttributes.UV_SWITCH: False, + DeviceAttributes.WIFI_SWITCH: False, + DeviceAttributes.VOICE_SWITCH: False, + DeviceAttributes.COMMAND_SOURCE: False, + DeviceAttributes.ERROR_TYPE: ErrorType.NO, + DeviceAttributes.ERROR_DESC: None, + DeviceAttributes.DEVICE_ERROR: False, + DeviceAttributes.BOARD_COMMUNICATION_ERROR: False, + DeviceAttributes.LASER_SENSOR_SHELTER: False, + DeviceAttributes.LASER_SENSOR_ERROR: False, + }, + ) + + def build_query(self) -> list[MessageQuery]: + """Midea B8 device build query.""" + return [MessageQuery(self._protocol_version)] + + def process_message(self, msg: bytes) -> dict[str, Any]: + """Midea B8 device process message.""" + message = MessageB8Response(msg) + _LOGGER.debug("[%s] Received: %s", self.device_id, message) + new_status = {} + for status in self._attributes: + if hasattr(message, str(status)): + value = getattr(message, str(status)) + if isinstance(value, IntEnum): # lowercase name for IntEnums + value = value.name.lower() + self._attributes[status] = value + new_status[str(status)] = self._attributes[status] + return new_status + + def set_attribute(self, attr: str, value: bool | int | str) -> None: + """Midea B8 device set attribute.""" + + +class MideaAppliance(MideaB8Device): + """Midea B8 appliance.""" diff --git a/midealocal/devices/b8/message.py b/midealocal/devices/b8/message.py new file mode 100644 index 00000000..c65a3aa5 --- /dev/null +++ b/midealocal/devices/b8/message.py @@ -0,0 +1,317 @@ +"""Midea local B8 message.""" + +from enum import IntEnum, StrEnum + +from midealocal.message import ( + BodyType, + MessageBody, + MessageRequest, + MessageResponse, + MessageType, +) + + +class WorkStatus(IntEnum): + """Midea B8 work status.""" + + CHARGE = 0x01 + WORK = 0x02 + STOP = 0x03 + CHARGING_ON_DOCK = 0x04 + RESERVE_TASK_FINISHED = 0x05 + CHARGE_FINISH = 0x06 + CHARGING_WITH_WIRE = 0x07 + PAUSE = 0x08 + UPDATING = 0x09 + SAVING_MAP = 0x0A + ERROR = 0x0B + SLEEP = 0x0C + CHARGE_PAUSE = 0x0D + RELOCATE = 0x0E + ELECTROLYSED_WATER_MAKING = 0x0F + DUST_COLLECTING = 0x10 + BACK_DUST_COLLECTING = 0x11 + SLEEP_IN_STATION = 0x12 + + +class FunctionType(IntEnum): + """Midea B8 function type.""" + + DUST_BOX_CLEANING = 0x01 + WATER_TANK_CLEANING = 0x02 + + +class ControlType(IntEnum): + """Midea B8 control type.""" + + NONE = 0x0 + MANUAL = 0x1 + AUTO = 0x2 + + +class Moviment(IntEnum): + """Midea B8 movement.""" + + NONE = 0x0 + FORWARD = 0x1 + BACK = 0x2 + LEFT = 0x3 + RIGHT = 0x4 + + +class CleanMode(IntEnum): + """Midea B8 clean mode.""" + + NONE = 0x00 + RANDOM = 0x01 + ARC = 0x02 + EDGE = 0x03 + EMPHASES = 0x04 + SCREW = 0x05 + BED = 0x06 + WIDE_SCREW = 0x07 + AUTO = 0x08 + AREA = 0x09 + ZONE_INDEX = 0x0A + ZONE_RECT = 0x0B + PATH = 0x0C + + +class FanLevel(IntEnum): + """Midea B8 fan level.""" + + OFF = 0x0 + SOFT = 0x1 + NORMAL = 0x2 + HIGH = 0x3 + LOW = 0x4 + + +class WaterLevel(IntEnum): + """Midea B8 water level.""" + + OFF = 0x0 + LOW = 0x1 + NORMAL = 0x2 + HIGH = 0x3 + + +class MopState(StrEnum): + """Midea B8 mop state.""" + + OFF = "off" + ON = "on" + LACK_WATER = "lack_water" + + +class Speed(StrEnum): + """Midea B8 speed.""" + + LOW = "low" + HIGH = "high" + + +class ErrorType(IntEnum): + """Midea B8 error type.""" + + NO = 0x00 + CAN_FIX = 0x01 + REBOOT = 0x02 + WARNING = 0x03 + + +class ErrorCanFixDescription(IntEnum): + """Midea B8 error can fix description.""" + + NO = 0x0 + FIX_DUST = 0x01 + FIX_WHEEL_HANG = 0x02 + FIX_WHEEL_OVERLOAD = 0x03 + FIX_SIDE_BRUSH_OVERLOAD = 0x04 + FIX_ROLL_BRUSH_OVERLOAD = 0x05 + FIX_DUST_ENGINE = 0x06 + FIX_FRONT_PANEL = 0x07 + FIX_RADAR_MASK = 0x08 + FIX_DROP_SENSOR = 0x09 + FIX_LOW_BATTERY = 0x0A + FIX_ABNORMAL_POSTURE = 0x0B + FIX_LASER_SENSOR = 0x0C + FIX_EDGE_SENSOR = 0x0D + FIX_START_IN_FORBID_AREA = 0x0E + FIX_START_IN_STRONG_MAGNETIC = 0x0F + FIX_LASER_SENSOR_BLOCKED = 0x10 + + +class ErrorRebootDescription(IntEnum): + """Midea B8 error reboot description.""" + + NO = 0x00 + REBOOT_LASER_COMM_FAIL = 0x01 + REBOOT_ROBOT_COMM_FAIL = 0x02 + REBOOT_INNER_FAIL = 0x03 + + +class ErrorWarningDescription(IntEnum): + """Midea B8 error warning description.""" + + NO = 0x00 + WARN_LOCATION_FAIL = 0x01 + WARN_LOW_BATTERY = 0x02 + WARN_FULL_DUST = 0x03 + WARN_LOW_WATER = 0x04 + + +class StatusType(IntEnum): + """B8 Status Type.""" + + X01 = 0x01 + + +class MessageB8Base(MessageRequest): + """B8 message base.""" + + def __init__( + self, + protocol_version: int, + message_type: int, + body_type: int, + ) -> None: + """Initialize B8 message base.""" + super().__init__( + device_type=0xB8, + protocol_version=protocol_version, + message_type=message_type, + body_type=body_type, + ) + + @property + def _body(self) -> bytearray: + raise NotImplementedError + + +class MessageQuery(MessageB8Base): + """B8 message query.""" + + def __init__(self, protocol_version: int) -> None: + """Initialize B8 message query.""" + super().__init__( + protocol_version=protocol_version, + message_type=MessageType.query, + body_type=BodyType.X32, + ) + + @property + def _body(self) -> bytearray: + return bytearray([0x01]) + + +class MessageSet(MessageB8Base): + """B8 message set.""" + + def __init__(self, protocol_version: int) -> None: + """Initialize B8 message set.""" + super().__init__( + protocol_version=protocol_version, + message_type=MessageType.set, + body_type=0x22, + ) + self.clean_mode = CleanMode.AUTO + self.fan_level = FanLevel.NORMAL + self.water_level = WaterLevel.LOW + self.voice_volume = 0 + self.zone_id = 0 + + @property + def _body(self) -> bytearray: + return bytearray( + [ + 0x02, + 0x02, + self.clean_mode, + self.fan_level, + self.water_level, + self.voice_volume, + self.zone_id, + ] + + [0x00] * 7, + ) + + +class MessageB8WorkStatusBody(MessageBody): + """B8 message work status body.""" + + def __init__(self, body: bytearray) -> None: + """Initialize B8 message work status body.""" + super().__init__(body) + self.work_status = WorkStatus(body[2]) + self.function_type = FunctionType(body[3]) + self.control_type = ControlType(body[4]) + self.move_direction = Moviment(body[5]) + self.clean_mode = CleanMode(body[6]) + self.fan_level = FanLevel(body[7]) + self.area = body[8] + self.water_level = WaterLevel(body[9]) + self.voice_volume = body[10] + mop = body[17] + if mop == 0: + self.mop = MopState.OFF + elif mop == 1: + self.mop = MopState.ON + else: + self.mop = MopState.LACK_WATER + self.carpet_switch = body[18] == 1 + self.speed = Speed.LOW if body[20] == 1 else Speed.HIGH + self.have_reserve_task = body[11] != 0 + self.battery_percent = body[12] + self.work_time = body[13] + err_user_high = body[19] + status_summary = body[14] + self.error_type = ErrorType(body[15]) + self.uv_switch = status_summary & 0x01 > 0 + self.wifi_switch = status_summary & 0x02 > 0 + self.voice_switch = status_summary & 0x04 > 0 + self.command_source = status_summary & 0x40 > 0 + self.device_error = status_summary & 0x80 > 0 + self.board_communication_error = err_user_high & 0x4 > 0 + self.laser_sensor_shelter = err_user_high & 0x2 > 0 + self.laser_sensor_error = err_user_high & 0x1 > 0 + self.error_desc: ( + ErrorCanFixDescription + | ErrorRebootDescription + | ErrorWarningDescription + | None + ) = None + if self.error_type == ErrorType.CAN_FIX: + self.error_desc = ErrorCanFixDescription(body[16]) + elif self.error_type == ErrorType.REBOOT: + self.error_desc = ErrorRebootDescription(body[16]) + elif self.error_type == ErrorType.WARNING: + self.error_desc = ErrorWarningDescription(body[16]) + + +class MessageB8Response(MessageResponse): + """B8 message response.""" + + def __init__(self, message: bytes) -> None: + """Initialize B8 message response.""" + super().__init__(bytearray(message)) + body = MessageB8Response.parse_body( + MessageType(self.message_type), + super().body, + ) + if body is not None: + self.set_body(body) + self.set_attr() + + @staticmethod + def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None: + """Parse body.""" + body_type = body[0] + status_type = body[1] + if ( + message_type == MessageType.query + and body_type == BodyType.X32 + and status_type == StatusType.X01 + ): + return MessageB8WorkStatusBody(body) + return None From dd801ee2c21d63116f24416aadc0d28b978015d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Mon, 15 Jul 2024 14:45:13 +0000 Subject: [PATCH 2/8] refactor(b8): move enums to const --- midealocal/devices/b8/__init__.py | 98 +++++-------- midealocal/devices/b8/const.py | 189 +++++++++++++++++++++++++ midealocal/devices/b8/message.py | 224 ++++++------------------------ 3 files changed, 265 insertions(+), 246 deletions(-) create mode 100644 midealocal/devices/b8/const.py diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py index bfe4229f..c96f3d4a 100644 --- a/midealocal/devices/b8/__init__.py +++ b/midealocal/devices/b8/__init__.py @@ -1,54 +1,26 @@ """Midea local B8 device.""" import logging -from enum import IntEnum, StrEnum +from enum import IntEnum from typing import Any from midealocal.device import MideaDevice - -from .message import ( - CleanMode, - ControlType, - ErrorType, +from midealocal.devices.b8.const import ( + B8CleanMode, + B8ControlType, + B8DeviceAttributes, + B8ErrorType, + B8MopState, + B8Moviment, +) +from midealocal.devices.b8.message import ( MessageB8Response, MessageQuery, - MopState, - Moviment, ) _LOGGER = logging.getLogger(__name__) -class DeviceAttributes(StrEnum): - """Midea B8 device attributes.""" - - WORK_STATUS = "work_status" - FUNCTION_TYPE = "function_type" - CONTROL_TYPE = "control_type" - MOVE_DIRECTION = "move_direction" - CLEAN_MODE = "clean_mode" - FAN_LEVEL = "fan_level" - AREA = "area" - WATER_LEVEL = "water_level" - VOICE_VOLUME = "voice_volume" - MOP = "mop" - CARPET_SWITCH = "carpet_switch" - SPEED = "speed" - HAVE_RESERVE_TASK = "have_reserve_task" - BATTERY_PERCENT = "battery_percent" - WORK_TIME = "work_time" - UV_SWITCH = "uv_switch" - WIFI_SWITCH = "wifi_switch" - VOICE_SWITCH = "voice_switch" - COMMAND_SOURCE = "command_source" - ERROR_TYPE = "error_type" - ERROR_DESC = "error_desc" - DEVICE_ERROR = "device_error" - BOARD_COMMUNICATION_ERROR = "board_communication_error" - LASER_SENSOR_SHELTER = "laser_sensor_shelter" - LASER_SENSOR_ERROR = "laser_sensor_error" - - class MideaB8Device(MideaDevice): """Midea B8 device.""" @@ -78,31 +50,31 @@ def __init__( model=model, subtype=subtype, attributes={ - DeviceAttributes.WORK_STATUS: None, - DeviceAttributes.FUNCTION_TYPE: None, - DeviceAttributes.CONTROL_TYPE: ControlType.NONE, - DeviceAttributes.MOVE_DIRECTION: Moviment.NONE, - DeviceAttributes.CLEAN_MODE: CleanMode.NONE, - DeviceAttributes.FAN_LEVEL: None, - DeviceAttributes.AREA: None, - DeviceAttributes.WATER_LEVEL: None, - DeviceAttributes.VOICE_VOLUME: 0, - DeviceAttributes.MOP: MopState.OFF, - DeviceAttributes.CARPET_SWITCH: False, - DeviceAttributes.SPEED: None, - DeviceAttributes.HAVE_RESERVE_TASK: False, - DeviceAttributes.BATTERY_PERCENT: 0, - DeviceAttributes.WORK_TIME: 0, - DeviceAttributes.UV_SWITCH: False, - DeviceAttributes.WIFI_SWITCH: False, - DeviceAttributes.VOICE_SWITCH: False, - DeviceAttributes.COMMAND_SOURCE: False, - DeviceAttributes.ERROR_TYPE: ErrorType.NO, - DeviceAttributes.ERROR_DESC: None, - DeviceAttributes.DEVICE_ERROR: False, - DeviceAttributes.BOARD_COMMUNICATION_ERROR: False, - DeviceAttributes.LASER_SENSOR_SHELTER: False, - DeviceAttributes.LASER_SENSOR_ERROR: False, + B8DeviceAttributes.WORK_STATUS: None, + B8DeviceAttributes.FUNCTION_TYPE: None, + B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE, + B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE, + B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE, + B8DeviceAttributes.FAN_LEVEL: None, + B8DeviceAttributes.AREA: None, + B8DeviceAttributes.WATER_LEVEL: None, + B8DeviceAttributes.VOICE_VOLUME: 0, + B8DeviceAttributes.MOP: B8MopState.OFF, + B8DeviceAttributes.CARPET_SWITCH: False, + B8DeviceAttributes.SPEED: None, + B8DeviceAttributes.HAVE_RESERVE_TASK: False, + B8DeviceAttributes.BATTERY_PERCENT: 0, + B8DeviceAttributes.WORK_TIME: 0, + B8DeviceAttributes.UV_SWITCH: False, + B8DeviceAttributes.WIFI_SWITCH: False, + B8DeviceAttributes.VOICE_SWITCH: False, + B8DeviceAttributes.COMMAND_SOURCE: False, + B8DeviceAttributes.ERROR_TYPE: B8ErrorType.NO, + B8DeviceAttributes.ERROR_DESC: None, + B8DeviceAttributes.DEVICE_ERROR: False, + B8DeviceAttributes.BOARD_COMMUNICATION_ERROR: False, + B8DeviceAttributes.LASER_SENSOR_SHELTER: False, + B8DeviceAttributes.LASER_SENSOR_ERROR: False, }, ) diff --git a/midealocal/devices/b8/const.py b/midealocal/devices/b8/const.py new file mode 100644 index 00000000..9665b9aa --- /dev/null +++ b/midealocal/devices/b8/const.py @@ -0,0 +1,189 @@ +"""Midea local B8 device const.""" + +from enum import IntEnum, StrEnum + + +class B8DeviceAttributes(StrEnum): + """Midea B8 device attributes.""" + + WORK_STATUS = "work_status" + FUNCTION_TYPE = "function_type" + CONTROL_TYPE = "control_type" + MOVE_DIRECTION = "move_direction" + CLEAN_MODE = "clean_mode" + FAN_LEVEL = "fan_level" + AREA = "area" + WATER_LEVEL = "water_level" + VOICE_VOLUME = "voice_volume" + MOP = "mop" + CARPET_SWITCH = "carpet_switch" + SPEED = "speed" + HAVE_RESERVE_TASK = "have_reserve_task" + BATTERY_PERCENT = "battery_percent" + WORK_TIME = "work_time" + UV_SWITCH = "uv_switch" + WIFI_SWITCH = "wifi_switch" + VOICE_SWITCH = "voice_switch" + COMMAND_SOURCE = "command_source" + ERROR_TYPE = "error_type" + ERROR_DESC = "error_desc" + DEVICE_ERROR = "device_error" + BOARD_COMMUNICATION_ERROR = "board_communication_error" + LASER_SENSOR_SHELTER = "laser_sensor_shelter" + LASER_SENSOR_ERROR = "laser_sensor_error" + + +class B8WorkStatus(IntEnum): + """Midea B8 work status.""" + + CHARGE = 0x01 + WORK = 0x02 + STOP = 0x03 + CHARGING_ON_DOCK = 0x04 + RESERVE_TASK_FINISHED = 0x05 + CHARGE_FINISH = 0x06 + CHARGING_WITH_WIRE = 0x07 + PAUSE = 0x08 + UPDATING = 0x09 + SAVING_MAP = 0x0A + ERROR = 0x0B + SLEEP = 0x0C + CHARGE_PAUSE = 0x0D + RELOCATE = 0x0E + ELECTROLYSED_WATER_MAKING = 0x0F + DUST_COLLECTING = 0x10 + BACK_DUST_COLLECTING = 0x11 + SLEEP_IN_STATION = 0x12 + + +class B8FunctionType(IntEnum): + """Midea B8 function type.""" + + DUST_BOX_CLEANING = 0x01 + WATER_TANK_CLEANING = 0x02 + + +class B8ControlType(IntEnum): + """Midea B8 control type.""" + + NONE = 0x0 + MANUAL = 0x1 + AUTO = 0x2 + + +class B8Moviment(IntEnum): + """Midea B8 movement.""" + + NONE = 0x0 + FORWARD = 0x1 + BACK = 0x2 + LEFT = 0x3 + RIGHT = 0x4 + + +class B8CleanMode(IntEnum): + """Midea B8 clean mode.""" + + NONE = 0x00 + RANDOM = 0x01 + ARC = 0x02 + EDGE = 0x03 + EMPHASES = 0x04 + SCREW = 0x05 + BED = 0x06 + WIDE_SCREW = 0x07 + AUTO = 0x08 + AREA = 0x09 + ZONE_INDEX = 0x0A + ZONE_RECT = 0x0B + PATH = 0x0C + + +class B8FanLevel(IntEnum): + """Midea B8 fan level.""" + + OFF = 0x0 + SOFT = 0x1 + NORMAL = 0x2 + HIGH = 0x3 + LOW = 0x4 + + +class B8WaterLevel(IntEnum): + """Midea B8 water level.""" + + OFF = 0x0 + LOW = 0x1 + NORMAL = 0x2 + HIGH = 0x3 + + +class B8MopState(StrEnum): + """Midea B8 mop state.""" + + OFF = "off" + ON = "on" + LACK_WATER = "lack_water" + + +class B8Speed(StrEnum): + """Midea B8 speed.""" + + LOW = "low" + HIGH = "high" + + +class B8ErrorType(IntEnum): + """Midea B8 error type.""" + + NO = 0x00 + CAN_FIX = 0x01 + REBOOT = 0x02 + WARNING = 0x03 + + +class B8ErrorCanFixDescription(IntEnum): + """Midea B8 error can fix description.""" + + NO = 0x0 + FIX_DUST = 0x01 + FIX_WHEEL_HANG = 0x02 + FIX_WHEEL_OVERLOAD = 0x03 + FIX_SIDE_BRUSH_OVERLOAD = 0x04 + FIX_ROLL_BRUSH_OVERLOAD = 0x05 + FIX_DUST_ENGINE = 0x06 + FIX_FRONT_PANEL = 0x07 + FIX_RADAR_MASK = 0x08 + FIX_DROP_SENSOR = 0x09 + FIX_LOW_BATTERY = 0x0A + FIX_ABNORMAL_POSTURE = 0x0B + FIX_LASER_SENSOR = 0x0C + FIX_EDGE_SENSOR = 0x0D + FIX_START_IN_FORBID_AREA = 0x0E + FIX_START_IN_STRONG_MAGNETIC = 0x0F + FIX_LASER_SENSOR_BLOCKED = 0x10 + + +class B8ErrorRebootDescription(IntEnum): + """Midea B8 error reboot description.""" + + NO = 0x00 + REBOOT_LASER_COMM_FAIL = 0x01 + REBOOT_ROBOT_COMM_FAIL = 0x02 + REBOOT_INNER_FAIL = 0x03 + + +class B8ErrorWarningDescription(IntEnum): + """Midea B8 error warning description.""" + + NO = 0x00 + WARN_LOCATION_FAIL = 0x01 + WARN_LOW_BATTERY = 0x02 + WARN_FULL_DUST = 0x03 + WARN_LOW_WATER = 0x04 + + +class B8StatusType(IntEnum): + """B8 Status Type.""" + + X01 = 0x01 diff --git a/midealocal/devices/b8/message.py b/midealocal/devices/b8/message.py index c65a3aa5..94048643 100644 --- a/midealocal/devices/b8/message.py +++ b/midealocal/devices/b8/message.py @@ -1,7 +1,21 @@ """Midea local B8 message.""" -from enum import IntEnum, StrEnum - +from midealocal.devices.b8.const import ( + B8CleanMode, + B8ControlType, + B8ErrorCanFixDescription, + B8ErrorRebootDescription, + B8ErrorType, + B8ErrorWarningDescription, + B8FanLevel, + B8FunctionType, + B8MopState, + B8Moviment, + B8Speed, + B8StatusType, + B8WaterLevel, + B8WorkStatus, +) from midealocal.message import ( BodyType, MessageBody, @@ -11,162 +25,6 @@ ) -class WorkStatus(IntEnum): - """Midea B8 work status.""" - - CHARGE = 0x01 - WORK = 0x02 - STOP = 0x03 - CHARGING_ON_DOCK = 0x04 - RESERVE_TASK_FINISHED = 0x05 - CHARGE_FINISH = 0x06 - CHARGING_WITH_WIRE = 0x07 - PAUSE = 0x08 - UPDATING = 0x09 - SAVING_MAP = 0x0A - ERROR = 0x0B - SLEEP = 0x0C - CHARGE_PAUSE = 0x0D - RELOCATE = 0x0E - ELECTROLYSED_WATER_MAKING = 0x0F - DUST_COLLECTING = 0x10 - BACK_DUST_COLLECTING = 0x11 - SLEEP_IN_STATION = 0x12 - - -class FunctionType(IntEnum): - """Midea B8 function type.""" - - DUST_BOX_CLEANING = 0x01 - WATER_TANK_CLEANING = 0x02 - - -class ControlType(IntEnum): - """Midea B8 control type.""" - - NONE = 0x0 - MANUAL = 0x1 - AUTO = 0x2 - - -class Moviment(IntEnum): - """Midea B8 movement.""" - - NONE = 0x0 - FORWARD = 0x1 - BACK = 0x2 - LEFT = 0x3 - RIGHT = 0x4 - - -class CleanMode(IntEnum): - """Midea B8 clean mode.""" - - NONE = 0x00 - RANDOM = 0x01 - ARC = 0x02 - EDGE = 0x03 - EMPHASES = 0x04 - SCREW = 0x05 - BED = 0x06 - WIDE_SCREW = 0x07 - AUTO = 0x08 - AREA = 0x09 - ZONE_INDEX = 0x0A - ZONE_RECT = 0x0B - PATH = 0x0C - - -class FanLevel(IntEnum): - """Midea B8 fan level.""" - - OFF = 0x0 - SOFT = 0x1 - NORMAL = 0x2 - HIGH = 0x3 - LOW = 0x4 - - -class WaterLevel(IntEnum): - """Midea B8 water level.""" - - OFF = 0x0 - LOW = 0x1 - NORMAL = 0x2 - HIGH = 0x3 - - -class MopState(StrEnum): - """Midea B8 mop state.""" - - OFF = "off" - ON = "on" - LACK_WATER = "lack_water" - - -class Speed(StrEnum): - """Midea B8 speed.""" - - LOW = "low" - HIGH = "high" - - -class ErrorType(IntEnum): - """Midea B8 error type.""" - - NO = 0x00 - CAN_FIX = 0x01 - REBOOT = 0x02 - WARNING = 0x03 - - -class ErrorCanFixDescription(IntEnum): - """Midea B8 error can fix description.""" - - NO = 0x0 - FIX_DUST = 0x01 - FIX_WHEEL_HANG = 0x02 - FIX_WHEEL_OVERLOAD = 0x03 - FIX_SIDE_BRUSH_OVERLOAD = 0x04 - FIX_ROLL_BRUSH_OVERLOAD = 0x05 - FIX_DUST_ENGINE = 0x06 - FIX_FRONT_PANEL = 0x07 - FIX_RADAR_MASK = 0x08 - FIX_DROP_SENSOR = 0x09 - FIX_LOW_BATTERY = 0x0A - FIX_ABNORMAL_POSTURE = 0x0B - FIX_LASER_SENSOR = 0x0C - FIX_EDGE_SENSOR = 0x0D - FIX_START_IN_FORBID_AREA = 0x0E - FIX_START_IN_STRONG_MAGNETIC = 0x0F - FIX_LASER_SENSOR_BLOCKED = 0x10 - - -class ErrorRebootDescription(IntEnum): - """Midea B8 error reboot description.""" - - NO = 0x00 - REBOOT_LASER_COMM_FAIL = 0x01 - REBOOT_ROBOT_COMM_FAIL = 0x02 - REBOOT_INNER_FAIL = 0x03 - - -class ErrorWarningDescription(IntEnum): - """Midea B8 error warning description.""" - - NO = 0x00 - WARN_LOCATION_FAIL = 0x01 - WARN_LOW_BATTERY = 0x02 - WARN_FULL_DUST = 0x03 - WARN_LOW_WATER = 0x04 - - -class StatusType(IntEnum): - """B8 Status Type.""" - - X01 = 0x01 - - class MessageB8Base(MessageRequest): """B8 message base.""" @@ -215,9 +73,9 @@ def __init__(self, protocol_version: int) -> None: message_type=MessageType.set, body_type=0x22, ) - self.clean_mode = CleanMode.AUTO - self.fan_level = FanLevel.NORMAL - self.water_level = WaterLevel.LOW + self.clean_mode = B8CleanMode.AUTO + self.fan_level = B8FanLevel.NORMAL + self.water_level = B8WaterLevel.LOW self.voice_volume = 0 self.zone_id = 0 @@ -243,30 +101,30 @@ class MessageB8WorkStatusBody(MessageBody): def __init__(self, body: bytearray) -> None: """Initialize B8 message work status body.""" super().__init__(body) - self.work_status = WorkStatus(body[2]) - self.function_type = FunctionType(body[3]) - self.control_type = ControlType(body[4]) - self.move_direction = Moviment(body[5]) - self.clean_mode = CleanMode(body[6]) - self.fan_level = FanLevel(body[7]) + self.work_status = B8WorkStatus(body[2]) + self.function_type = B8FunctionType(body[3]) + self.control_type = B8ControlType(body[4]) + self.move_direction = B8Moviment(body[5]) + self.clean_mode = B8CleanMode(body[6]) + self.fan_level = B8FanLevel(body[7]) self.area = body[8] - self.water_level = WaterLevel(body[9]) + self.water_level = B8WaterLevel(body[9]) self.voice_volume = body[10] mop = body[17] if mop == 0: - self.mop = MopState.OFF + self.mop = B8MopState.OFF elif mop == 1: - self.mop = MopState.ON + self.mop = B8MopState.ON else: - self.mop = MopState.LACK_WATER + self.mop = B8MopState.LACK_WATER self.carpet_switch = body[18] == 1 - self.speed = Speed.LOW if body[20] == 1 else Speed.HIGH + self.speed = B8Speed.LOW if body[20] == 1 else B8Speed.HIGH self.have_reserve_task = body[11] != 0 self.battery_percent = body[12] self.work_time = body[13] err_user_high = body[19] status_summary = body[14] - self.error_type = ErrorType(body[15]) + self.error_type = B8ErrorType(body[15]) self.uv_switch = status_summary & 0x01 > 0 self.wifi_switch = status_summary & 0x02 > 0 self.voice_switch = status_summary & 0x04 > 0 @@ -276,17 +134,17 @@ def __init__(self, body: bytearray) -> None: self.laser_sensor_shelter = err_user_high & 0x2 > 0 self.laser_sensor_error = err_user_high & 0x1 > 0 self.error_desc: ( - ErrorCanFixDescription - | ErrorRebootDescription - | ErrorWarningDescription + B8ErrorCanFixDescription + | B8ErrorRebootDescription + | B8ErrorWarningDescription | None ) = None - if self.error_type == ErrorType.CAN_FIX: - self.error_desc = ErrorCanFixDescription(body[16]) - elif self.error_type == ErrorType.REBOOT: - self.error_desc = ErrorRebootDescription(body[16]) - elif self.error_type == ErrorType.WARNING: - self.error_desc = ErrorWarningDescription(body[16]) + if self.error_type == B8ErrorType.CAN_FIX: + self.error_desc = B8ErrorCanFixDescription(body[16]) + elif self.error_type == B8ErrorType.REBOOT: + self.error_desc = B8ErrorRebootDescription(body[16]) + elif self.error_type == B8ErrorType.WARNING: + self.error_desc = B8ErrorWarningDescription(body[16]) class MessageB8Response(MessageResponse): @@ -311,7 +169,7 @@ def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None if ( message_type == MessageType.query and body_type == BodyType.X32 - and status_type == StatusType.X01 + and status_type == B8StatusType.X01 ): return MessageB8WorkStatusBody(body) return None From 8371ea4b85ed8184cad520a2bddee0b29462a832 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Mon, 15 Jul 2024 16:34:56 +0000 Subject: [PATCH 3/8] feat(b8): set attributes --- midealocal/devices/b8/__init__.py | 27 +++++++- tests/devices/b8/__init__.py | 1 + tests/devices/b8/device_b8_test.py | 101 +++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 tests/devices/b8/__init__.py create mode 100644 tests/devices/b8/device_b8_test.py diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py index c96f3d4a..74e80d5a 100644 --- a/midealocal/devices/b8/__init__.py +++ b/midealocal/devices/b8/__init__.py @@ -10,12 +10,15 @@ B8ControlType, B8DeviceAttributes, B8ErrorType, + B8FanLevel, B8MopState, B8Moviment, + B8WaterLevel, ) from midealocal.devices.b8.message import ( MessageB8Response, MessageQuery, + MessageSet, ) _LOGGER = logging.getLogger(__name__) @@ -55,9 +58,9 @@ def __init__( B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE, B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE, B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE, - B8DeviceAttributes.FAN_LEVEL: None, + B8DeviceAttributes.FAN_LEVEL: B8FanLevel.OFF, B8DeviceAttributes.AREA: None, - B8DeviceAttributes.WATER_LEVEL: None, + B8DeviceAttributes.WATER_LEVEL: B8WaterLevel.OFF, B8DeviceAttributes.VOICE_VOLUME: 0, B8DeviceAttributes.MOP: B8MopState.OFF, B8DeviceAttributes.CARPET_SWITCH: False, @@ -96,8 +99,28 @@ def process_message(self, msg: bytes) -> dict[str, Any]: new_status[str(status)] = self._attributes[status] return new_status + def _gen_set_msg_default_values(self) -> MessageSet: + msg = MessageSet(self._protocol_version) + msg.clean_mode = self.attributes[B8DeviceAttributes.CLEAN_MODE] + msg.fan_level = self.attributes[B8DeviceAttributes.FAN_LEVEL] + msg.water_level = self.attributes[B8DeviceAttributes.WATER_LEVEL] + msg.voice_volume = self.attributes[B8DeviceAttributes.VOICE_VOLUME] + return msg + def set_attribute(self, attr: str, value: bool | int | str) -> None: """Midea B8 device set attribute.""" + msg = self._gen_set_msg_default_values() + if attr == B8DeviceAttributes.CLEAN_MODE: + msg.clean_mode = B8CleanMode[str(value).upper()] + elif attr == B8DeviceAttributes.FAN_LEVEL: + msg.fan_level = B8FanLevel[str(value).upper()] + elif attr == B8DeviceAttributes.WATER_LEVEL: + msg.water_level = B8WaterLevel[str(value).upper()] + elif attr == B8DeviceAttributes.VOICE_VOLUME: + msg.voice_volume = int(value) + + if msg is not None: + self.build_send(msg) class MideaAppliance(MideaB8Device): diff --git a/tests/devices/b8/__init__.py b/tests/devices/b8/__init__.py new file mode 100644 index 00000000..11853d73 --- /dev/null +++ b/tests/devices/b8/__init__.py @@ -0,0 +1 @@ +"""Midea local B8 device tests.""" diff --git a/tests/devices/b8/device_b8_test.py b/tests/devices/b8/device_b8_test.py new file mode 100644 index 00000000..89da8500 --- /dev/null +++ b/tests/devices/b8/device_b8_test.py @@ -0,0 +1,101 @@ +"""Test B8 Device.""" + +from unittest.mock import patch + +import pytest + +from midealocal.devices.b8 import MideaB8Device +from midealocal.devices.b8.const import ( + B8CleanMode, + B8ControlType, + B8DeviceAttributes, + B8ErrorType, + B8FanLevel, + B8MopState, + B8Moviment, + B8WaterLevel, +) +from midealocal.devices.b8.message import ( + MessageQuery, +) + + +class TestMideaB8Device: + """Test Midea B8 Device.""" + + device: MideaB8Device + + @pytest.fixture(autouse=True) + def _setup_device(self) -> None: + """Midea B8 Device setup.""" + self.device = MideaB8Device( + name="Test Device", + device_id=1, + ip_address="192.168.1.1", + port=12345, + token="AA", + key="BB", + protocol=1, + model="test_model", + subtype=1, + customize="", + ) + + def test_initial_attributes(self) -> None: + """Test initial attributes.""" + assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] is None + assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] is None + assert ( + self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] + == B8ControlType.NONE + ) + assert ( + self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == B8Moviment.NONE + ) + assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == B8CleanMode.NONE + assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == B8FanLevel.OFF + assert self.device.attributes[B8DeviceAttributes.AREA] is None + assert ( + self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == B8WaterLevel.OFF + ) + assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 0 + assert self.device.attributes[B8DeviceAttributes.MOP] is B8MopState.OFF + assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.SPEED] is None + assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False + assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 0 + assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 0 + assert self.device.attributes[B8DeviceAttributes.UV_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is False + assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == B8ErrorType.NO + assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] is None + assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False + assert ( + self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] + is False + ) + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_SHELTER] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_ERROR] is False + + def test_set_attribute(self) -> None: + """Test set attribute.""" + with patch.object(self.device, "send_message_v2") as mock_build_send: + self.device.set_attribute(B8DeviceAttributes.CLEAN_MODE.value, "area") + mock_build_send.assert_called() + + self.device.set_attribute(B8DeviceAttributes.FAN_LEVEL.value, "normal") + mock_build_send.assert_called() + + self.device.set_attribute(B8DeviceAttributes.WATER_LEVEL.value, "normal") + mock_build_send.assert_called() + + self.device.set_attribute(B8DeviceAttributes.VOICE_VOLUME.value, 10) + mock_build_send.assert_called() + + def test_build_query(self) -> None: + """Test build query.""" + queries = self.device.build_query() + assert len(queries) == 1 + assert isinstance(queries[0], MessageQuery) From 954366778a03d1ea440e7d7ed6827f29d378ecbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Mon, 15 Jul 2024 18:28:20 +0000 Subject: [PATCH 4/8] feat(b8): treat attribute types --- midealocal/devices/b8/__init__.py | 49 ++++++++++++++++++------------ tests/devices/b8/device_b8_test.py | 43 +++++++++++++++++++------- 2 files changed, 61 insertions(+), 31 deletions(-) diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py index 74e80d5a..16225418 100644 --- a/midealocal/devices/b8/__init__.py +++ b/midealocal/devices/b8/__init__.py @@ -55,12 +55,12 @@ def __init__( attributes={ B8DeviceAttributes.WORK_STATUS: None, B8DeviceAttributes.FUNCTION_TYPE: None, - B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE, - B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE, - B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE, - B8DeviceAttributes.FAN_LEVEL: B8FanLevel.OFF, + B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE.name.lower(), + B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE.name.lower(), + B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE.name.lower(), + B8DeviceAttributes.FAN_LEVEL: B8FanLevel.OFF.name.lower(), B8DeviceAttributes.AREA: None, - B8DeviceAttributes.WATER_LEVEL: B8WaterLevel.OFF, + B8DeviceAttributes.WATER_LEVEL: B8WaterLevel.OFF.name.lower(), B8DeviceAttributes.VOICE_VOLUME: 0, B8DeviceAttributes.MOP: B8MopState.OFF, B8DeviceAttributes.CARPET_SWITCH: False, @@ -72,7 +72,7 @@ def __init__( B8DeviceAttributes.WIFI_SWITCH: False, B8DeviceAttributes.VOICE_SWITCH: False, B8DeviceAttributes.COMMAND_SOURCE: False, - B8DeviceAttributes.ERROR_TYPE: B8ErrorType.NO, + B8DeviceAttributes.ERROR_TYPE: B8ErrorType.NO.name.lower(), B8DeviceAttributes.ERROR_DESC: None, B8DeviceAttributes.DEVICE_ERROR: False, B8DeviceAttributes.BOARD_COMMUNICATION_ERROR: False, @@ -101,26 +101,35 @@ def process_message(self, msg: bytes) -> dict[str, Any]: def _gen_set_msg_default_values(self) -> MessageSet: msg = MessageSet(self._protocol_version) - msg.clean_mode = self.attributes[B8DeviceAttributes.CLEAN_MODE] - msg.fan_level = self.attributes[B8DeviceAttributes.FAN_LEVEL] - msg.water_level = self.attributes[B8DeviceAttributes.WATER_LEVEL] + msg.clean_mode = B8CleanMode[ + self.attributes[B8DeviceAttributes.CLEAN_MODE].upper() + ] + msg.fan_level = B8FanLevel[ + self.attributes[B8DeviceAttributes.FAN_LEVEL].upper() + ] + msg.water_level = B8WaterLevel[ + self.attributes[B8DeviceAttributes.WATER_LEVEL].upper() + ] msg.voice_volume = self.attributes[B8DeviceAttributes.VOICE_VOLUME] return msg def set_attribute(self, attr: str, value: bool | int | str) -> None: """Midea B8 device set attribute.""" - msg = self._gen_set_msg_default_values() - if attr == B8DeviceAttributes.CLEAN_MODE: - msg.clean_mode = B8CleanMode[str(value).upper()] - elif attr == B8DeviceAttributes.FAN_LEVEL: - msg.fan_level = B8FanLevel[str(value).upper()] - elif attr == B8DeviceAttributes.WATER_LEVEL: - msg.water_level = B8WaterLevel[str(value).upper()] - elif attr == B8DeviceAttributes.VOICE_VOLUME: - msg.voice_volume = int(value) + try: + msg = self._gen_set_msg_default_values() + if attr == B8DeviceAttributes.CLEAN_MODE: + msg.clean_mode = B8CleanMode[str(value).upper()] + elif attr == B8DeviceAttributes.FAN_LEVEL: + msg.fan_level = B8FanLevel[str(value).upper()] + elif attr == B8DeviceAttributes.WATER_LEVEL: + msg.water_level = B8WaterLevel[str(value).upper()] + elif attr == B8DeviceAttributes.VOICE_VOLUME: + msg.voice_volume = int(value) - if msg is not None: - self.build_send(msg) + if msg is not None: + self.build_send(msg) + except KeyError: + _LOGGER.exception("Wrong value for attribute %s: %s", attr, value) class MideaAppliance(MideaB8Device): diff --git a/tests/devices/b8/device_b8_test.py b/tests/devices/b8/device_b8_test.py index 89da8500..efbf1f2f 100644 --- a/tests/devices/b8/device_b8_test.py +++ b/tests/devices/b8/device_b8_test.py @@ -47,19 +47,30 @@ def test_initial_attributes(self) -> None: assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] is None assert ( self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] - == B8ControlType.NONE + == B8ControlType.NONE.name.lower() ) assert ( - self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == B8Moviment.NONE + self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] + == B8Moviment.NONE.name.lower() + ) + assert ( + self.device.attributes[B8DeviceAttributes.CLEAN_MODE] + == B8CleanMode.NONE.name.lower() + ) + assert ( + self.device.attributes[B8DeviceAttributes.FAN_LEVEL] + == B8FanLevel.OFF.name.lower() ) - assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == B8CleanMode.NONE - assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == B8FanLevel.OFF assert self.device.attributes[B8DeviceAttributes.AREA] is None assert ( - self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == B8WaterLevel.OFF + self.device.attributes[B8DeviceAttributes.WATER_LEVEL] + == B8WaterLevel.OFF.name.lower() ) assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 0 - assert self.device.attributes[B8DeviceAttributes.MOP] is B8MopState.OFF + assert ( + self.device.attributes[B8DeviceAttributes.MOP] + == B8MopState.OFF.name.lower() + ) assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False assert self.device.attributes[B8DeviceAttributes.SPEED] is None assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False @@ -69,7 +80,10 @@ def test_initial_attributes(self) -> None: assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is False assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is False assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is False - assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == B8ErrorType.NO + assert ( + self.device.attributes[B8DeviceAttributes.ERROR_TYPE] + == B8ErrorType.NO.name.lower() + ) assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] is None assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False assert ( @@ -83,16 +97,23 @@ def test_set_attribute(self) -> None: """Test set attribute.""" with patch.object(self.device, "send_message_v2") as mock_build_send: self.device.set_attribute(B8DeviceAttributes.CLEAN_MODE.value, "area") - mock_build_send.assert_called() + mock_build_send.assert_called_once() + mock_build_send.reset_mock() self.device.set_attribute(B8DeviceAttributes.FAN_LEVEL.value, "normal") - mock_build_send.assert_called() + mock_build_send.assert_called_once() + mock_build_send.reset_mock() self.device.set_attribute(B8DeviceAttributes.WATER_LEVEL.value, "normal") - mock_build_send.assert_called() + mock_build_send.assert_called_once() + mock_build_send.reset_mock() self.device.set_attribute(B8DeviceAttributes.VOICE_VOLUME.value, 10) - mock_build_send.assert_called() + mock_build_send.assert_called_once() + mock_build_send.reset_mock() + + self.device.set_attribute(B8DeviceAttributes.WATER_LEVEL.value, "invalid") + mock_build_send.assert_not_called() def test_build_query(self) -> None: """Test build query.""" From 3bb29572f702b28ca5180965866df118ea00642e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Tue, 16 Jul 2024 18:42:13 +0000 Subject: [PATCH 5/8] feat(b8): set work mode --- midealocal/devices/b8/__init__.py | 14 ++++++++++++++ midealocal/devices/b8/const.py | 9 +++++++++ midealocal/devices/b8/message.py | 24 ++++++++++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py index 16225418..3370400e 100644 --- a/midealocal/devices/b8/__init__.py +++ b/midealocal/devices/b8/__init__.py @@ -14,11 +14,13 @@ B8MopState, B8Moviment, B8WaterLevel, + B8WorkMode, ) from midealocal.devices.b8.message import ( MessageB8Response, MessageQuery, MessageSet, + MessageSetCommand, ) _LOGGER = logging.getLogger(__name__) @@ -113,6 +115,18 @@ def _gen_set_msg_default_values(self) -> MessageSet: msg.voice_volume = self.attributes[B8DeviceAttributes.VOICE_VOLUME] return msg + def set_work_mode(self, work_mode: B8WorkMode) -> None: + """Midea B8 device set work mode.""" + if work_mode == B8WorkMode.WORK: + self.set_attribute( + B8DeviceAttributes.CLEAN_MODE, + self.attributes[B8DeviceAttributes.CLEAN_MODE], + ) + return + + msg = MessageSetCommand(self._protocol_version, work_mode=work_mode) + self.build_send(msg) + def set_attribute(self, attr: str, value: bool | int | str) -> None: """Midea B8 device set attribute.""" try: diff --git a/midealocal/devices/b8/const.py b/midealocal/devices/b8/const.py index 9665b9aa..d75a6c8d 100644 --- a/midealocal/devices/b8/const.py +++ b/midealocal/devices/b8/const.py @@ -33,6 +33,15 @@ class B8DeviceAttributes(StrEnum): LASER_SENSOR_ERROR = "laser_sensor_error" +class B8WorkMode(IntEnum): + """Midea B8 work mode.""" + + CHARGE = 0x01 + WORK = 0x02 + STOP = 0x03 + PAUSE = 0x1B + + class B8WorkStatus(IntEnum): """Midea B8 work status.""" diff --git a/midealocal/devices/b8/message.py b/midealocal/devices/b8/message.py index 94048643..f9c69bfd 100644 --- a/midealocal/devices/b8/message.py +++ b/midealocal/devices/b8/message.py @@ -14,6 +14,7 @@ B8Speed, B8StatusType, B8WaterLevel, + B8WorkMode, B8WorkStatus, ) from midealocal.message import ( @@ -95,6 +96,29 @@ def _body(self) -> bytearray: ) +class MessageSetCommand(MessageB8Base): + """B8 message set command.""" + + def __init__(self, protocol_version: int, work_mode: B8WorkMode) -> None: + """Initialize B8 message set command.""" + super().__init__( + protocol_version=protocol_version, + message_type=MessageType.set, + body_type=0x22, + ) + self.work_mode = work_mode + + @property + def _body(self) -> bytearray: + return bytearray( + [ + self.work_mode, + 0x00, + 0x00, + ], + ) + + class MessageB8WorkStatusBody(MessageBody): """B8 message work status body.""" From da41b0e93173f7447aebfe07c3da743e87b6da4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Tue, 23 Jul 2024 17:37:40 +0000 Subject: [PATCH 6/8] feat(b8): notify message --- midealocal/devices/b8/__init__.py | 16 ++- midealocal/devices/b8/const.py | 16 +-- midealocal/devices/b8/message.py | 173 +++++++++++++++++++++-------- midealocal/message.py | 1 + tests/devices/b8/device_b8_test.py | 22 +++- 5 files changed, 165 insertions(+), 63 deletions(-) diff --git a/midealocal/devices/b8/__init__.py b/midealocal/devices/b8/__init__.py index 3370400e..7d63c09e 100644 --- a/midealocal/devices/b8/__init__.py +++ b/midealocal/devices/b8/__init__.py @@ -9,12 +9,16 @@ B8CleanMode, B8ControlType, B8DeviceAttributes, + B8ErrorCanFixDescription, B8ErrorType, B8FanLevel, + B8FunctionType, B8MopState, B8Moviment, + B8Speed, B8WaterLevel, B8WorkMode, + B8WorkStatus, ) from midealocal.devices.b8.message import ( MessageB8Response, @@ -55,18 +59,18 @@ def __init__( model=model, subtype=subtype, attributes={ - B8DeviceAttributes.WORK_STATUS: None, - B8DeviceAttributes.FUNCTION_TYPE: None, + B8DeviceAttributes.WORK_STATUS: B8WorkStatus.NONE.name.lower(), + B8DeviceAttributes.FUNCTION_TYPE: B8FunctionType.NONE.name.lower(), B8DeviceAttributes.CONTROL_TYPE: B8ControlType.NONE.name.lower(), B8DeviceAttributes.MOVE_DIRECTION: B8Moviment.NONE.name.lower(), B8DeviceAttributes.CLEAN_MODE: B8CleanMode.NONE.name.lower(), B8DeviceAttributes.FAN_LEVEL: B8FanLevel.OFF.name.lower(), - B8DeviceAttributes.AREA: None, + B8DeviceAttributes.AREA: 0, B8DeviceAttributes.WATER_LEVEL: B8WaterLevel.OFF.name.lower(), B8DeviceAttributes.VOICE_VOLUME: 0, - B8DeviceAttributes.MOP: B8MopState.OFF, + B8DeviceAttributes.MOP: B8MopState.OFF.name.lower(), B8DeviceAttributes.CARPET_SWITCH: False, - B8DeviceAttributes.SPEED: None, + B8DeviceAttributes.SPEED: B8Speed.HIGH.name.lower(), B8DeviceAttributes.HAVE_RESERVE_TASK: False, B8DeviceAttributes.BATTERY_PERCENT: 0, B8DeviceAttributes.WORK_TIME: 0, @@ -75,7 +79,7 @@ def __init__( B8DeviceAttributes.VOICE_SWITCH: False, B8DeviceAttributes.COMMAND_SOURCE: False, B8DeviceAttributes.ERROR_TYPE: B8ErrorType.NO.name.lower(), - B8DeviceAttributes.ERROR_DESC: None, + B8DeviceAttributes.ERROR_DESC: B8ErrorCanFixDescription.NO.name.lower(), B8DeviceAttributes.DEVICE_ERROR: False, B8DeviceAttributes.BOARD_COMMUNICATION_ERROR: False, B8DeviceAttributes.LASER_SENSOR_SHELTER: False, diff --git a/midealocal/devices/b8/const.py b/midealocal/devices/b8/const.py index d75a6c8d..eb3d43e8 100644 --- a/midealocal/devices/b8/const.py +++ b/midealocal/devices/b8/const.py @@ -45,6 +45,7 @@ class B8WorkMode(IntEnum): class B8WorkStatus(IntEnum): """Midea B8 work status.""" + NONE = 0x00 CHARGE = 0x01 WORK = 0x02 STOP = 0x03 @@ -68,6 +69,7 @@ class B8WorkStatus(IntEnum): class B8FunctionType(IntEnum): """Midea B8 function type.""" + NONE = 0x00 DUST_BOX_CLEANING = 0x01 WATER_TANK_CLEANING = 0x02 @@ -127,19 +129,19 @@ class B8WaterLevel(IntEnum): HIGH = 0x3 -class B8MopState(StrEnum): +class B8MopState(IntEnum): """Midea B8 mop state.""" - OFF = "off" - ON = "on" - LACK_WATER = "lack_water" + OFF = 0x0 + ON = 0x1 + LACK_WATER = 0x2 -class B8Speed(StrEnum): +class B8Speed(IntEnum): """Midea B8 speed.""" - LOW = "low" - HIGH = "high" + LOW = 0x1 + HIGH = 0x0 class B8ErrorType(IntEnum): diff --git a/midealocal/devices/b8/message.py b/midealocal/devices/b8/message.py index f9c69bfd..038e6e35 100644 --- a/midealocal/devices/b8/message.py +++ b/midealocal/devices/b8/message.py @@ -3,6 +3,7 @@ from midealocal.devices.b8.const import ( B8CleanMode, B8ControlType, + B8DeviceAttributes, B8ErrorCanFixDescription, B8ErrorRebootDescription, B8ErrorType, @@ -19,6 +20,9 @@ ) from midealocal.message import ( BodyType, + BoolParser, + IntEnumParser, + IntParser, MessageBody, MessageRequest, MessageResponse, @@ -119,56 +123,133 @@ def _body(self) -> bytearray: ) -class MessageB8WorkStatusBody(MessageBody): - """B8 message work status body.""" +class MessageB8GenericBody(MessageBody): + """B8 message generic body.""" - def __init__(self, body: bytearray) -> None: - """Initialize B8 message work status body.""" + def __init__(self, body: bytearray, offset: int) -> None: + """Initialize B8 message generic body.""" super().__init__(body) - self.work_status = B8WorkStatus(body[2]) - self.function_type = B8FunctionType(body[3]) - self.control_type = B8ControlType(body[4]) - self.move_direction = B8Moviment(body[5]) - self.clean_mode = B8CleanMode(body[6]) - self.fan_level = B8FanLevel(body[7]) - self.area = body[8] - self.water_level = B8WaterLevel(body[9]) - self.voice_volume = body[10] - mop = body[17] - if mop == 0: - self.mop = B8MopState.OFF - elif mop == 1: - self.mop = B8MopState.ON - else: - self.mop = B8MopState.LACK_WATER - self.carpet_switch = body[18] == 1 - self.speed = B8Speed.LOW if body[20] == 1 else B8Speed.HIGH - self.have_reserve_task = body[11] != 0 - self.battery_percent = body[12] - self.work_time = body[13] - err_user_high = body[19] - status_summary = body[14] - self.error_type = B8ErrorType(body[15]) - self.uv_switch = status_summary & 0x01 > 0 - self.wifi_switch = status_summary & 0x02 > 0 - self.voice_switch = status_summary & 0x04 > 0 - self.command_source = status_summary & 0x40 > 0 - self.device_error = status_summary & 0x80 > 0 - self.board_communication_error = err_user_high & 0x4 > 0 - self.laser_sensor_shelter = err_user_high & 0x2 > 0 - self.laser_sensor_error = err_user_high & 0x1 > 0 + self.parser_list.extend( + [ + IntEnumParser[B8WorkStatus]( + B8DeviceAttributes.WORK_STATUS, + 1 + offset, + B8WorkStatus, + ), + IntEnumParser[B8FunctionType]( + B8DeviceAttributes.FUNCTION_TYPE, + 2 + offset, + B8FunctionType, + ), + IntEnumParser[B8ControlType]( + B8DeviceAttributes.CONTROL_TYPE, + 3 + offset, + B8ControlType, + ), + IntEnumParser[B8Moviment]( + B8DeviceAttributes.MOVE_DIRECTION, + 4 + offset, + B8Moviment, + ), + IntEnumParser[B8CleanMode]( + B8DeviceAttributes.CLEAN_MODE, + 5 + offset, + B8CleanMode, + ), + IntEnumParser[B8FanLevel]( + B8DeviceAttributes.FAN_LEVEL, + 6 + offset, + B8FanLevel, + ), + IntParser(B8DeviceAttributes.AREA, 7 + offset), + IntEnumParser[B8WaterLevel]( + B8DeviceAttributes.WATER_LEVEL, + 8 + offset, + B8WaterLevel, + ), + IntParser(B8DeviceAttributes.VOICE_VOLUME, 9 + offset, max_value=100), + BoolParser( + B8DeviceAttributes.HAVE_RESERVE_TASK, + 10 + offset, + ), + IntParser( + B8DeviceAttributes.BATTERY_PERCENT, + 11 + offset, + max_value=100, + ), + IntParser(B8DeviceAttributes.WORK_TIME, 12 + offset), + BoolParser(B8DeviceAttributes.UV_SWITCH, 13 + offset, bit=0), + BoolParser(B8DeviceAttributes.WIFI_SWITCH, 13 + offset, bit=1), + BoolParser(B8DeviceAttributes.VOICE_SWITCH, 13 + offset, bit=2), + BoolParser(B8DeviceAttributes.COMMAND_SOURCE, 13 + offset, bit=6), + BoolParser(B8DeviceAttributes.DEVICE_ERROR, 13 + offset, bit=7), + IntEnumParser[B8ErrorType]( + B8DeviceAttributes.ERROR_TYPE, + 14 + offset, + B8ErrorType, + ), + IntEnumParser[B8MopState]( + B8DeviceAttributes.MOP, + 16 + offset, + B8MopState, + default_value=B8MopState.LACK_WATER, + ), + BoolParser(B8DeviceAttributes.CARPET_SWITCH, 17 + offset), + BoolParser( + B8DeviceAttributes.LASER_SENSOR_ERROR, + 18 + offset, + bit=0, + ), + BoolParser( + B8DeviceAttributes.LASER_SENSOR_SHELTER, + 18 + offset, + bit=1, + ), + BoolParser( + B8DeviceAttributes.BOARD_COMMUNICATION_ERROR, + 18 + offset, + bit=2, + ), + IntEnumParser[B8Speed](B8DeviceAttributes.SPEED, 19 + offset, B8Speed), + ], + ) + self.parse_all() + + # Error description without parser self.error_desc: ( B8ErrorCanFixDescription | B8ErrorRebootDescription | B8ErrorWarningDescription - | None - ) = None - if self.error_type == B8ErrorType.CAN_FIX: - self.error_desc = B8ErrorCanFixDescription(body[16]) - elif self.error_type == B8ErrorType.REBOOT: - self.error_desc = B8ErrorRebootDescription(body[16]) - elif self.error_type == B8ErrorType.WARNING: - self.error_desc = B8ErrorWarningDescription(body[16]) + ) = B8ErrorCanFixDescription.NO + error_type = getattr(self, B8DeviceAttributes.ERROR_TYPE, B8ErrorType.NO) + if error_type == B8ErrorType.CAN_FIX: + self.error_desc = B8ErrorCanFixDescription( + self.read_byte(body, 15 + offset), + ) + elif error_type == B8ErrorType.REBOOT: + self.error_desc = B8ErrorRebootDescription( + self.read_byte(body, 15 + offset), + ) + elif error_type == B8ErrorType.WARNING: + self.error_desc = B8ErrorWarningDescription( + self.read_byte(body, 15 + offset), + ) + + +class MessageB8WorkStatusBody(MessageB8GenericBody): + """B8 message work status body.""" + + def __init__(self, body: bytearray) -> None: + """Initialize B8 message work status body.""" + super().__init__(body, 1) + + +class MessageB8NotifyBody(MessageB8GenericBody): + """B8 message notify body.""" + + def __init__(self, body: bytearray) -> None: + """Initialize B8 message notify body.""" + super().__init__(body, 0) class MessageB8Response(MessageResponse): @@ -183,7 +264,7 @@ def __init__(self, message: bytes) -> None: ) if body is not None: self.set_body(body) - self.set_attr() + self.set_attr() @staticmethod def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None: @@ -196,4 +277,6 @@ def parse_body(message_type: MessageType, body: bytearray) -> MessageBody | None and status_type == B8StatusType.X01 ): return MessageB8WorkStatusBody(body) + if message_type == MessageType.notify1 and body_type == BodyType.X42: + return MessageB8NotifyBody(body) return None diff --git a/midealocal/message.py b/midealocal/message.py index 070d05ad..1dd6dcaa 100644 --- a/midealocal/message.py +++ b/midealocal/message.py @@ -38,6 +38,7 @@ class BodyType(IntEnum): X31 = 0x31 X32 = 0x32 X41 = 0x41 + X42 = 0x42 X80 = 0x80 diff --git a/tests/devices/b8/device_b8_test.py b/tests/devices/b8/device_b8_test.py index efbf1f2f..a8259ca5 100644 --- a/tests/devices/b8/device_b8_test.py +++ b/tests/devices/b8/device_b8_test.py @@ -11,9 +11,12 @@ B8DeviceAttributes, B8ErrorType, B8FanLevel, + B8FunctionType, B8MopState, B8Moviment, + B8Speed, B8WaterLevel, + B8WorkStatus, ) from midealocal.devices.b8.message import ( MessageQuery, @@ -43,8 +46,14 @@ def _setup_device(self) -> None: def test_initial_attributes(self) -> None: """Test initial attributes.""" - assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] is None - assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] is None + assert ( + self.device.attributes[B8DeviceAttributes.WORK_STATUS] + == B8WorkStatus.NONE.name.lower() + ) + assert ( + self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] + == B8FunctionType.NONE.name.lower() + ) assert ( self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] == B8ControlType.NONE.name.lower() @@ -61,7 +70,7 @@ def test_initial_attributes(self) -> None: self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == B8FanLevel.OFF.name.lower() ) - assert self.device.attributes[B8DeviceAttributes.AREA] is None + assert self.device.attributes[B8DeviceAttributes.AREA] == 0 assert ( self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == B8WaterLevel.OFF.name.lower() @@ -72,7 +81,10 @@ def test_initial_attributes(self) -> None: == B8MopState.OFF.name.lower() ) assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False - assert self.device.attributes[B8DeviceAttributes.SPEED] is None + assert ( + self.device.attributes[B8DeviceAttributes.SPEED] + == B8Speed.HIGH.name.lower() + ) assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 0 assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 0 @@ -84,7 +96,7 @@ def test_initial_attributes(self) -> None: self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == B8ErrorType.NO.name.lower() ) - assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] is None + assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] == "no" assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False assert ( self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] From 51bb85591040c94bdeaf3c27dfdcf5226b7f40f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Sun, 28 Jul 2024 23:56:55 +0000 Subject: [PATCH 7/8] test(b8): additional testing --- tests/devices/b8/device_b8_test.py | 289 +++++++++++++++++++++++++++++ 1 file changed, 289 insertions(+) diff --git a/tests/devices/b8/device_b8_test.py b/tests/devices/b8/device_b8_test.py index a8259ca5..b2afdcd4 100644 --- a/tests/devices/b8/device_b8_test.py +++ b/tests/devices/b8/device_b8_test.py @@ -9,18 +9,23 @@ B8CleanMode, B8ControlType, B8DeviceAttributes, + B8ErrorCanFixDescription, + B8ErrorRebootDescription, B8ErrorType, + B8ErrorWarningDescription, B8FanLevel, B8FunctionType, B8MopState, B8Moviment, B8Speed, B8WaterLevel, + B8WorkMode, B8WorkStatus, ) from midealocal.devices.b8.message import ( MessageQuery, ) +from midealocal.message import MessageType class TestMideaB8Device: @@ -127,8 +132,292 @@ def test_set_attribute(self) -> None: self.device.set_attribute(B8DeviceAttributes.WATER_LEVEL.value, "invalid") mock_build_send.assert_not_called() + def test_set_work_mode(self) -> None: + """Test set work mode.""" + with patch.object(self.device, "send_message_v2") as mock_build_send: + self.device.set_work_mode(B8WorkMode.CHARGE) + mock_build_send.assert_called_once() + mock_build_send.reset_mock() + + self.device.set_work_mode(B8WorkMode.WORK) + mock_build_send.assert_called_once() + def test_build_query(self) -> None: """Test build query.""" queries = self.device.build_query() assert len(queries) == 1 assert isinstance(queries[0], MessageQuery) + + def test_query_response(self) -> None: + """Test query response.""" + header = bytearray([0xAA] + ([0x0] * 8) + [MessageType.query]) + body = bytearray( + [ + 0x32, + 0x1, + B8WorkStatus.CHARGING_WITH_WIRE, + B8FunctionType.NONE, + B8ControlType.AUTO, + B8Moviment.NONE, + B8CleanMode.AUTO, + B8FanLevel.NORMAL, + 0, + B8WaterLevel.NORMAL, + 40, + 0, + 80, + 20, + 0xC7, + B8ErrorType.CAN_FIX, + B8ErrorCanFixDescription.FIX_DUST, + B8MopState.ON, + 0x01, + 0x07, + B8Speed.HIGH, + 0x0, # CRC + ], + ) + self.device.process_message(bytearray(header + body)) + assert ( + self.device.attributes[B8DeviceAttributes.WORK_STATUS] + == "charging_with_wire" + ) + assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] == "none" + assert self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] == "auto" + assert self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == "none" + assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == "auto" + assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == "normal" + assert self.device.attributes[B8DeviceAttributes.AREA] == 0 + assert self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == "normal" + assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 40 + assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False + assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 80 + assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 20 + assert self.device.attributes[B8DeviceAttributes.UV_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is True + assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is True + assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == "can_fix" + assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] == "fix_dust" + assert self.device.attributes[B8DeviceAttributes.MOP] == "on" + assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_ERROR] is True + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_SHELTER] is True + assert ( + self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] is True + ) + assert self.device.attributes[B8DeviceAttributes.SPEED] == "high" + + def test_notify_response(self) -> None: + """Test notify response.""" + header = bytearray([0xAA] + ([0x0] * 8) + [MessageType.notify1]) + body = bytearray( + [ + 0x42, + B8WorkStatus.WORK, + B8FunctionType.DUST_BOX_CLEANING, + B8ControlType.MANUAL, + B8Moviment.LEFT, + B8CleanMode.PATH, + B8FanLevel.HIGH, + 1, + B8WaterLevel.LOW, + 90, + 1, + 40, + 15, + 0x86, + B8ErrorType.WARNING, + B8ErrorWarningDescription.WARN_FULL_DUST, + B8MopState.LACK_WATER, + 0x00, + 0x06, + B8Speed.LOW, + 0x0, # CRC + ], + ) + self.device.process_message(bytearray(header + body)) + assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] == "work" + assert ( + self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] + == "dust_box_cleaning" + ) + assert self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] == "manual" + assert self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == "left" + assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == "path" + assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == "high" + assert self.device.attributes[B8DeviceAttributes.AREA] == 1 + assert self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == "low" + assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 90 + assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is True + assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 40 + assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 15 + assert self.device.attributes[B8DeviceAttributes.UV_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is True + assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is False + assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is True + assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == "warning" + assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] == "warn_full_dust" + assert self.device.attributes[B8DeviceAttributes.MOP] == "lack_water" + assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_ERROR] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_SHELTER] is True + assert ( + self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] is True + ) + assert self.device.attributes[B8DeviceAttributes.SPEED] == "low" + + def test_query_response_reboot_error(self) -> None: + """Test query response.""" + header = bytearray([0xAA] + ([0x0] * 8) + [MessageType.query]) + body = bytearray( + [ + 0x32, + 0x1, + B8WorkStatus.UPDATING, + B8FunctionType.WATER_TANK_CLEANING, + B8ControlType.NONE, + B8Moviment.NONE, + B8CleanMode.NONE, + B8FanLevel.OFF, + 0, + B8WaterLevel.OFF, + 0, + 0, + 0, + 0, + 0, + B8ErrorType.REBOOT, + B8ErrorRebootDescription.REBOOT_LASER_COMM_FAIL, + B8MopState.OFF, + 0x0, + 0x0, + B8Speed.LOW, + 0x0, # CRC + ], + ) + self.device.process_message(bytearray(header + body)) + assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] == "updating" + assert ( + self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] + == "water_tank_cleaning" + ) + assert self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] == "none" + assert self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == "none" + assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == "none" + assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == "off" + assert self.device.attributes[B8DeviceAttributes.AREA] == 0 + assert self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == "off" + assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 0 + assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False + assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 0 + assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 0 + assert self.device.attributes[B8DeviceAttributes.UV_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is False + assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False + assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == "reboot" + assert ( + self.device.attributes[B8DeviceAttributes.ERROR_DESC] + == "reboot_laser_comm_fail" + ) + assert self.device.attributes[B8DeviceAttributes.MOP] == "off" + assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_ERROR] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_SHELTER] is False + assert ( + self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] + is False + ) + assert self.device.attributes[B8DeviceAttributes.SPEED] == "low" + + def test_query_response_no_error(self) -> None: + """Test query response.""" + header = bytearray([0xAA] + ([0x0] * 8) + [MessageType.query]) + body = bytearray( + [ + 0x32, + 0x1, + B8WorkStatus.NONE, + B8FunctionType.NONE, + B8ControlType.NONE, + B8Moviment.NONE, + B8CleanMode.NONE, + B8FanLevel.OFF, + 0, + B8WaterLevel.OFF, + 0, + 0, + 0, + 0, + 0, + B8ErrorType.NO, + B8ErrorRebootDescription.REBOOT_LASER_COMM_FAIL, + B8MopState.OFF, + 0x0, + 0x0, + B8Speed.LOW, + 0x0, # CRC + ], + ) + self.device.process_message(bytearray(header + body)) + assert self.device.attributes[B8DeviceAttributes.WORK_STATUS] == "none" + assert self.device.attributes[B8DeviceAttributes.FUNCTION_TYPE] == "none" + assert self.device.attributes[B8DeviceAttributes.CONTROL_TYPE] == "none" + assert self.device.attributes[B8DeviceAttributes.MOVE_DIRECTION] == "none" + assert self.device.attributes[B8DeviceAttributes.CLEAN_MODE] == "none" + assert self.device.attributes[B8DeviceAttributes.FAN_LEVEL] == "off" + assert self.device.attributes[B8DeviceAttributes.AREA] == 0 + assert self.device.attributes[B8DeviceAttributes.WATER_LEVEL] == "off" + assert self.device.attributes[B8DeviceAttributes.VOICE_VOLUME] == 0 + assert self.device.attributes[B8DeviceAttributes.HAVE_RESERVE_TASK] is False + assert self.device.attributes[B8DeviceAttributes.BATTERY_PERCENT] == 0 + assert self.device.attributes[B8DeviceAttributes.WORK_TIME] == 0 + assert self.device.attributes[B8DeviceAttributes.UV_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.WIFI_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.VOICE_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.COMMAND_SOURCE] is False + assert self.device.attributes[B8DeviceAttributes.DEVICE_ERROR] is False + assert self.device.attributes[B8DeviceAttributes.ERROR_TYPE] == "no" + assert self.device.attributes[B8DeviceAttributes.ERROR_DESC] == "no" + assert self.device.attributes[B8DeviceAttributes.MOP] == "off" + assert self.device.attributes[B8DeviceAttributes.CARPET_SWITCH] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_ERROR] is False + assert self.device.attributes[B8DeviceAttributes.LASER_SENSOR_SHELTER] is False + assert ( + self.device.attributes[B8DeviceAttributes.BOARD_COMMUNICATION_ERROR] + is False + ) + assert self.device.attributes[B8DeviceAttributes.SPEED] == "low" + + def test_unexpected_response(self) -> None: + """Test unexpected response.""" + header = bytearray([0xAA] + ([0x0] * 8) + [MessageType.query]) + body = bytearray( + [ + 0x32, + 0x2, + ] + + [0x0] * 20, + ) + + with patch("midealocal.message.MessageResponse.set_attr") as mock_set_attr: + self.device.process_message(bytearray(header + body)) + + body = bytearray( + [ + 0x42, + 0x1, + ] + + [0x0] * 20, + ) + self.device.process_message(bytearray(header + body)) + header[-1] = MessageType.notify1 + body = bytearray([0x32] + [0x0] * 20) + self.device.process_message(bytearray(header + body)) + header[-1] = MessageType.set + self.device.process_message(bytearray(header + body)) + mock_set_attr.assert_not_called() From bec3bdbb8435d42441c51df8bada1d930bdcbe7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Mind=C3=AAllo=20de=20Andrade?= Date: Tue, 30 Jul 2024 12:57:39 +0000 Subject: [PATCH 8/8] chore(b8): use bodytype const --- midealocal/devices/b8/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/midealocal/devices/b8/message.py b/midealocal/devices/b8/message.py index 038e6e35..b0b0355c 100644 --- a/midealocal/devices/b8/message.py +++ b/midealocal/devices/b8/message.py @@ -76,7 +76,7 @@ def __init__(self, protocol_version: int) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, - body_type=0x22, + body_type=BodyType.X22, ) self.clean_mode = B8CleanMode.AUTO self.fan_level = B8FanLevel.NORMAL @@ -108,7 +108,7 @@ def __init__(self, protocol_version: int, work_mode: B8WorkMode) -> None: super().__init__( protocol_version=protocol_version, message_type=MessageType.set, - body_type=0x22, + body_type=BodyType.X22, ) self.work_mode = work_mode