Skip to content

Commit

Permalink
↕️ refactor for cover
Browse files Browse the repository at this point in the history
  • Loading branch information
al-one committed Dec 25, 2024
1 parent 9107ae0 commit 26b7556
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 219 deletions.
17 changes: 15 additions & 2 deletions custom_components/xiaomi_miot/core/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ def encode(self, device: 'Device', payload: dict, value: tuple):
num = rgb_to_int(rgb)
super().encode(device, payload, num)

@dataclass
class MiotCoverConv(MiotServiceConv):
domain: str = 'cover'

def __post_init__(self):
if not self.main_props:
self.main_props = ['motor_control']
super().__post_init__()

@dataclass
class PercentagePropConv(MiotPropConv):
ranged = None
Expand All @@ -265,12 +274,16 @@ def __post_init__(self):

def decode(self, device: 'Device', payload: dict, value: int):
if self.ranged:
value = int(percentage.ranged_value_to_percentage(self.ranged, value))
value = int(percentage.scale_ranged_value_to_int_range(self.ranged, (0, 100), value))
super().decode(device, payload, value)

def encode(self, device: 'Device', payload: dict, value: int):
if self.ranged:
value = int(percentage.percentage_to_ranged_value(self.ranged, value))
value = int(percentage.scale_to_ranged_value((0, 100), self.ranged, value))
if value < self.ranged[0]:
value = self.ranged[0]
if value > self.ranged[1]:
value = self.ranged[1]
super().encode(device, payload, value)

class MiotTargetPositionConv(PercentagePropConv):
Expand Down
8 changes: 8 additions & 0 deletions custom_components/xiaomi_miot/core/device_customizes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2561,6 +2561,14 @@
{'props': ['mode'], 'desc': True},
],
},
{
'class': MiotCoverConv,
'services': ['airer', 'curtain', 'window_opener', 'motor_controller'],
'converters' : [
{'props': ['status', 'motor_control', 'current_position']},
{'props': ['target_position'], 'class': MiotTargetPositionConv},
],
},
{
'services': ['physical_control_locked', 'physical_controls_locked'],
'converters' : [
Expand Down
272 changes: 55 additions & 217 deletions custom_components/xiaomi_miot/cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,17 @@
DOMAIN as ENTITY_DOMAIN,
CoverEntity as BaseEntity,
CoverEntityFeature, # v2022.5
CoverDeviceClass,
ATTR_POSITION,
)

from . import (
DOMAIN,
CONF_MODEL,
XIAOMI_CONFIG_SCHEMA as PLATFORM_SCHEMA, # noqa: F401
HassEntry,
XEntity,
MiotEntity,
async_setup_config_entry,
bind_services_to_entries,
)
from .core.miot_spec import (
MiotSpec,
MiotService,
MiotProperty,
)
from .core.miot_spec import MiotProperty
from .core.converters import MiotPropConv, MiotTargetPositionConv

_LOGGER = logging.getLogger(__name__)
Expand All @@ -43,17 +35,6 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
hass.data.setdefault(DATA_KEY, {})
hass.data[DOMAIN]['add_entities'][ENTITY_DOMAIN] = async_add_entities
config['hass'] = hass
model = str(config.get(CONF_MODEL) or '')
spec = hass.data[DOMAIN]['miot_specs'].get(model)
entities = []
if isinstance(spec, MiotSpec):
for srv in spec.get_services(ENTITY_DOMAIN, 'curtain', 'airer', 'window_opener', 'motor_controller'):
if not srv.get_property('motor_control'):
continue
entities.append(MiotCoverEntity(config, srv))
for entity in entities:
hass.data[DOMAIN]['entities'][entity.unique_id] = entity
async_add_entities(entities, update_before_add=True)
bind_services_to_entries(hass, SERVICE_TO_METHOD)


Expand All @@ -66,9 +47,24 @@ class CoverEntity(XEntity, BaseEntity):
_conv_current_position = None
_conv_target_position = None
_current_range = None
_target_range = None
_target_range = (0, 100)
_motor_reverse = None
_position_reverse = None
_open_texts = ['Open', 'Up']
_close_texts = ['Close', 'Down']
_closed_position = 0
_deviated_position = 0
_target2current_position = None

def on_init(self):
self._motor_reverse = self.custom_config_bool('motor_reverse', False)
self._position_reverse = self.custom_config_bool('position_reverse', self._motor_reverse)
self._open_texts = self.custom_config_list('open_texts', self._open_texts)
self._close_texts = self.custom_config_list('close_texts', self._close_texts)
if self._motor_reverse:
self._open_texts, self._close_texts = self._close_texts, self._open_texts
self._target2current_position = self.custom_config_bool('target2current_position')

for conv in self.device.converters:
prop = getattr(conv, 'prop', None)
if not isinstance(prop, MiotProperty):
Expand All @@ -77,6 +73,10 @@ def on_init(self):
self._conv_status = conv
elif prop.in_list(['motor_control']):
self._conv_motor = conv
self._attr_supported_features |= CoverEntityFeature.OPEN
self._attr_supported_features |= CoverEntityFeature.CLOSE
if prop.list_first('Stop', 'Pause') != None:
self._attr_supported_features |= CoverEntityFeature.STOP
elif prop.in_list(['current_position']) and prop.value_range:
self._conv_current_position = conv
self._current_range = (prop.range_min, prop.range_max)
Expand All @@ -89,7 +89,21 @@ def on_init(self):
self._target_range = (prop.range_min(), prop.range_max())
self._attr_supported_features |= CoverEntityFeature.SET_POSITION

self._deviated_position = self.custom_config_integer('deviated_position', 1)
if self._current_range:
if self._position_reverse:
pos = self._current_range[1] - self._deviated_position
else:
pos = self._current_range[0] + self._deviated_position
self._closed_position = self.custom_config_integer('closed_position', pos)

def set_state(self, data: dict):
prop_status = getattr(self._conv_status, 'prop', None) if self._conv_status else None
if prop_status:
val = self._conv_status.value_from_dict(data)
self._attr_is_opening = val in prop_status.list_search('Opening', 'Rising')
self._attr_is_closing = val in prop_status.list_search('Closing', 'Falling')
self._attr_is_closed = val in prop_status.list_search('Closed')
if self._conv_current_position:
val = self._conv_current_position.value_from_dict(data)
if val is not None:
Expand All @@ -100,218 +114,42 @@ def set_state(self, data: dict):
self._attr_target_cover_position = int(val)
if not self._conv_current_position:
self._attr_current_cover_position = self._attr_target_cover_position
if self._target2current_position:
self._attr_current_cover_position = self._attr_target_cover_position
if (val := self._attr_current_cover_position) != None:
if self._position_reverse:
self._attr_is_closed = val >= self._closed_position
else:
self._attr_is_closed = val <= self._closed_position

async def async_open_cover(self, **kwargs):
if self._conv_motor:
val = self._conv_motor.prop.list_first('Open', 'Up')
val = self._conv_motor.prop.list_first(self._open_texts)
if val is not None:
await self.device.async_write({self._conv_motor.full_name: val})
return
await self.async_set_cover_position(100)
await self.async_set_cover_position(0 if self._position_reverse else 100)

async def async_close_cover(self, **kwargs):
if self._conv_motor:
val = self._conv_motor.prop.list_first('Close', 'Down')
val = self._conv_motor.prop.list_first(self._close_texts)
if val is not None:
await self.device.async_write({self._conv_motor.full_name: val})
return
await self.async_set_cover_position(0)
await self.async_set_cover_position(100 if self._position_reverse else 0)

async def async_stop_cover(self, **kwargs):
if not self._conv_motor:
return
val = self._conv_motor.prop.list_first('Stop', 'Pause')
if val is not None:
await self.device.async_write({self._conv_motor.full_name: val})

async def async_set_cover_position(self, position, **kwargs):
if not self._conv_target_position:
return
if self._position_reverse:
position = self._target_range[1] - position
await self.device.async_write({self._conv_target_position.full_name: position})

XEntity.CLS[ENTITY_DOMAIN] = CoverEntity


class MiotCoverEntity(MiotEntity, BaseEntity):
def __init__(self, config: dict, miot_service: MiotService):
super().__init__(miot_service, config=config, logger=_LOGGER)

self._prop_status = miot_service.get_property('status')
self._prop_motor_control = miot_service.get_property('motor_control')
self._prop_current_position = None
for p in miot_service.get_properties('current_position'):
self._prop_current_position = p
if p.value_range:
# https://home.miot-spec.com/spec/hyd.airer.lyjpro
break
self._prop_target_position = miot_service.get_property('target_position')

self._motor_reverse = False
self._position_reverse = False
self._target2current = False
self._open_texts = []
self._close_texts = []
self._supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE

async def async_added_to_hass(self):
await super().async_added_to_hass()
if self._prop_target_position:
if not self.custom_config_bool('disable_target_position'):
self._supported_features |= CoverEntityFeature.SET_POSITION
else:
self._prop_target_position = None
if self._prop_motor_control.list_first('Pause', 'Stop') is not None:
self._supported_features |= CoverEntityFeature.STOP

self._target2current = self.custom_config_bool('target2current_position')
if self._target2current and self._prop_target_position:
self._prop_current_position = self._prop_target_position

self._motor_reverse = self.custom_config_bool('motor_reverse', False)
self._position_reverse = self.custom_config_bool('position_reverse', self._motor_reverse)
self._open_texts = self.custom_config_list('open_texts', ['Opening', 'Opened', 'Open', 'Up', 'Rising', 'Risen', 'Rise'])
self._close_texts = self.custom_config_list('close_texts', ['Closing', 'Closed', 'Close', 'Down', 'Falling', 'Descent'])
if self._motor_reverse:
self._open_texts, self._close_texts = self._close_texts, self._open_texts

@property
def device_class(self):
if cls := self.get_device_class(CoverDeviceClass):
return cls
typ = f'{self.model} {self._miot_service.spec.type}'
if 'curtain' in typ:
return CoverDeviceClass.CURTAIN
if 'window_opener' in typ:
return CoverDeviceClass.WINDOW
return None

async def async_update(self):
await super().async_update()
if not self._available:
return
if prop_reverse := self._miot_service.get_property('motor_reverse'):
if prop_reverse.from_device(self.device):
if self.custom_config_bool('auto_position_reverse'):
self._position_reverse = True

@property
def current_cover_position(self):
pos = -1
if self._prop_current_position:
try:
cur = round(self._prop_current_position.from_device(self.device), 2)
except (TypeError, ValueError):
cur = None
if cur is None:
return None
pos = cur
range_max = self._prop_current_position.range_max()
dic = self.custom_config_json('cover_position_mapping')
if dic:
if cur in dic:
pos = dic.get(cur, cur)
elif self._prop_current_position.value_list:
# mrbond.airer.m53c
for v in self._prop_current_position.value_list:
if cur != v.get('value'):
continue
des = str(v.get('description')).lower()
if 'top' in des:
pos = 100
elif 'middle' in des:
pos = 50
elif 'button' in des:
pos = 0
elif range_max != 100:
pos = cur / range_max * 100
if pos < 0:
# If the motor controller is stopped, generate fake middle position
if self._prop_status:
sta = int(self._prop_status.from_device(self.device) or -1)
if sta in self._prop_status.list_search('Stopped'):
return 50
return None
dev = int(self.custom_config_integer('deviated_position', 1) or 0)
if pos <= dev:
pos = 0
elif pos >= 100 - dev:
pos = 100
if self._position_reverse:
pos = 100 - pos
return pos

@property
def target_cover_position(self):
pos = None
if not self._prop_target_position:
return pos
pos = self._prop_target_position.from_device(self.device)
if pos is None:
return pos
pos = int(pos)
if self._position_reverse:
pos = 100 - pos
return pos

def set_cover_position(self, **kwargs):
pos = round(kwargs.get(ATTR_POSITION) or 0)
if self._position_reverse and self._target2current:
pos = 100 - pos
srv = self._miot_service
for p in srv.get_properties('target_position'):
if not p.value_range:
continue
if p.range_min() <= pos <= p.range_max():
return self.set_miot_property(srv.iid, p.iid, pos)
cur = self.current_cover_position or 50
if pos > cur:
return self.open_cover()
if pos < cur:
return self.close_cover()
return False

@property
def is_closed(self):
cur = self.current_cover_position
if cur is not None:
pos = self.custom_config_number('closed_position', 1)
return cur <= pos
if self._prop_status:
sta = int(self._prop_status.from_device(self.device) or -1)
cvs = self.custom_config_list('closed_status') or []
if cvs:
return sta in cvs or f'{sta}' in cvs
return None

@property
def is_closing(self):
if not self._prop_status:
return None
sta = int(self._prop_status.from_device(self.device) or -1)
return sta in self._prop_status.list_search(*self._close_texts)

@property
def is_opening(self):
if not self._prop_status:
return None
sta = int(self._prop_status.from_device(self.device) or -1)
return sta in self._prop_status.list_search(*self._open_texts)

def motor_control(self, open_cover=True, **kwargs):
tls = self._open_texts if open_cover else self._close_texts
val = self.custom_config_integer('open_cover_value' if open_cover else 'close_cover_value')
if val is None:
val = self._prop_motor_control.list_first(*tls)
if val is None:
_LOGGER.error('Motor control value is invalid for %s', self.name)
return False
ret = self.set_property(self._prop_motor_control, val)
if ret and self._prop_status:
self.update_attrs({
self._prop_status.full_name: self._prop_status.list_first(*tls)
})
return ret

def open_cover(self, **kwargs):
return self.motor_control(open_cover=True, **kwargs)

def close_cover(self, **kwargs):
return self.motor_control(open_cover=False, **kwargs)

def stop_cover(self, **kwargs):
val = self._prop_motor_control.list_first('Pause', 'Stop')
val = self.custom_config_integer('stop_cover_value', val)
return self.set_property(self._prop_motor_control, val)

1 comment on commit 26b7556

@al-one
Copy link
Owner Author

@al-one al-one commented on 26b7556 Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.