Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add scene history sensor #1361

Merged
merged 9 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 50 additions & 41 deletions custom_components/xiaomi_miot/core/xiaomi_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,53 +249,66 @@ def get_home_devices(self):
}
return rdt

async def async_get_devices(self, renew=False, return_all=False):
async def _async_all_devices(self, renew=False):
al-one marked this conversation as resolved.
Show resolved Hide resolved
if not self.user_id:
return None
fnm = f'xiaomi_miot/devices-{self.user_id}-{self.default_server}.json'
store = Store(self.hass, 1, fnm)

filename = f'xiaomi_miot/devices-{self.user_id}-{self.default_server}.json'
store = Store(self.hass, 1, filename)
now = time.time()
cds = []
dvs = []

cached_data = None
try:
dat = await store.async_load() or {}
cached_data = await store.async_load() or {}
if cached_data and isinstance(cached_data, dict):
if not renew and cached_data.get('update_time', 0) > (now - 86400):
return cached_data
except ValueError:
await store.async_remove()
dat = {}
if isinstance(dat, dict):
cds = dat.get('devices') or []
if not renew and dat.get('update_time', 0) > (now - 86400):
dvs = cds
if not dvs:
try:
dvs = await self.hass.async_add_executor_job(self.get_device_list)
if dvs:
hls = await self.hass.async_add_executor_job(self.get_home_devices)
if hls:
hds = hls.get('devices') or {}
dvs = [
{**d, **(hds.get(d.get('did')) or {})}
for d in dvs
]
dat = {
'update_time': now,
'devices': dvs,
'homes': hls.get('homelist', []),
}
await store.async_save(dat)
_LOGGER.info('Got %s devices from xiaomi cloud', len(dvs))
except requests.exceptions.ConnectionError as exc:
if not cds:
raise exc
dvs = cds
_LOGGER.warning('Get xiaomi devices filed: %s, use cached %s devices.', exc, len(cds))
if return_all:
return dat
return dvs

try:
devices = await self.hass.async_add_executor_job(self.get_device_list)
if devices is None:
# exec self.get_device_list failed
return cached_data or {}

homes = await self.hass.async_add_executor_job(self.get_home_devices)
if homes:
device2home = homes.get('devices') or {}
for device in devices:
home_info = device2home.get(device.get('did'))
if not home_info:
continue
device.update(home_info)

refreshed = {
'update_time': now,
'devices': devices,
'homes': homes.get('homelist', []),
}

await store.async_save(refreshed)
_LOGGER.info('Got %s devices from xiaomi cloud', len(devices))
return refreshed

except requests.exceptions.ConnectionError as exc:
if not cached_data:
raise exc

_LOGGER.warning('Get xiaomi devices filed: %s, use cached %s devices.', exc, len(cached_data.get('devices') or []))
return cached_data or {}

async def async_get_devices(self, renew=False):
result = await self._async_all_devices(renew=renew)
return result.get('devices', [])

async def async_renew_devices(self):
return await self.async_get_devices(renew=True)

async def async_get_homerooms(self, renew=False):
result = await self._async_all_devices(renew=renew)
return result.get('homes') or []

async def async_get_devices_by_key(self, key, renew=False, filters=None):
dat = {}
if filters is None:
Expand Down Expand Up @@ -327,10 +340,6 @@ async def async_get_devices_by_key(self, key, renew=False, filters=None):
dat[k] = d
return dat

async def async_get_homerooms(self, renew=False):
dat = await self.async_get_devices(renew=renew, return_all=True) or {}
return dat.get('homes') or []

async def async_get_beaconkey(self, did):
dat = {'did': did or self.miot_did, 'pdid': 1}
rdt = await self.async_request_api('v2/device/blt_get_beaconkey', dat) or {}
Expand Down
166 changes: 159 additions & 7 deletions custom_components/xiaomi_miot/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
from typing import cast
from datetime import datetime, timedelta
from functools import partial
from functools import partial, cmp_to_key

from homeassistant.const import * # noqa: F401
from homeassistant.helpers.entity import (
Expand Down Expand Up @@ -63,12 +63,27 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
cfg = hass.data[DOMAIN].get(config_entry.entry_id) or {}
mic = cfg.get(CONF_XIAOMI_CLOUD)
config_data = config_entry.data or {}
if isinstance(mic, MiotCloud) and mic.user_id and not config_data.get('disable_message'):
hass.data[DOMAIN]['accounts'].setdefault(mic.user_id, {})
if not hass.data[DOMAIN]['accounts'][mic.user_id].get('messenger'):
entity = MihomeMessageSensor(hass, mic)
hass.data[DOMAIN]['accounts'][mic.user_id]['messenger'] = entity
async_add_entities([entity], update_before_add=False)

if isinstance(mic, MiotCloud) and mic.user_id:
if not config_data.get('disable_message'):
hass.data[DOMAIN]['accounts'].setdefault(mic.user_id, {})

if not hass.data[DOMAIN]['accounts'][mic.user_id].get('messenger'):
entity = MihomeMessageSensor(hass, mic)
hass.data[DOMAIN]['accounts'][mic.user_id]['messenger'] = entity
async_add_entities([entity], update_before_add=False)

if not config_data.get('disable_scene_history'):
al-one marked this conversation as resolved.
Show resolved Hide resolved
homes = await mic.async_get_homerooms()
for home in homes:
home_id = home.get('id')
if hass.data[DOMAIN]['accounts'][mic.user_id].get(f'scene_history_{home_id}'):
continue

entity = MihomeSceneHistorySensor(hass, mic, home_id, home.get('uid'))
hass.data[DOMAIN]['accounts'][mic.user_id][f'scene_history_{home_id}'] = entity
async_add_entities([entity], update_before_add=False)

await async_setup_config_entry(hass, config_entry, async_setup_platform, async_add_entities, ENTITY_DOMAIN)


Expand Down Expand Up @@ -711,6 +726,143 @@ async def fetch_latest_message(self):
return msg


class MihomeSceneHistorySensor(MiCoordinatorEntity, SensorEntity, RestoreEntity):
MESSAGE_TIMEOUT = 60

_has_none_message = False

def __init__(self, hass, cloud: MiotCloud, home_id, owner_user_id):
self.hass = hass
self.cloud = cloud
self.home_id = int(home_id)
self.owner_user_id = int(owner_user_id)
self.entity_id = f'{ENTITY_DOMAIN}.mi_{cloud.user_id}_{home_id}_scene_history'
self._attr_unique_id = f'{DOMAIN}-mihome-scene-history-{cloud.user_id}_{home_id}'
self._attr_name = f'Xiaomi {cloud.user_id}_{home_id} Scene History'
self._attr_icon = 'mdi:message'
self._attr_should_poll = False
self._attr_native_value = None
self._attr_extra_state_attributes = {
'entity_class': self.__class__.__name__,
}
self.coordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name=self._attr_unique_id,
update_method=self.fetch_latest_message,
update_interval=timedelta(seconds=5),
al-one marked this conversation as resolved.
Show resolved Hide resolved
)
super().__init__(self.coordinator)

async def async_added_to_hass(self):
await super().async_added_to_hass()
self.hass.data[DOMAIN]['entities'][self.entity_id] = self
if sec := self.custom_config_integer('interval_seconds'):
self.coordinator.update_interval = timedelta(seconds=sec)

if restored := await self.async_get_last_extra_data():
restored_dict = restored.as_dict()

attrs = restored_dict.get('attrs', {})
if ts := attrs.get('ts'):
attrs['timestamp'] = datetime.fromtimestamp(ts, local_zone()) if ts else None

_LOGGER.debug(
'xiaomi scene history %s %d, async_added_to_hass restore state: state= %s attrs= %s',
self.cloud.user_id, self.home_id, restored_dict.get('state'), attrs,
)
self._attr_native_value = restored_dict.get('state')
self._attr_extra_state_attributes.update(attrs)

await self.coordinator.async_config_entry_first_refresh()

async def async_will_remove_from_hass(self):
"""Run when entity will be removed from hass.
To be extended by integrations.
"""
await super().async_will_remove_from_hass()
self.hass.data[DOMAIN]['accounts'].get(self.cloud.user_id, {}).pop(f'scene_history_{self.home_id}', None)

@property
def extra_restore_state_data(self):
"""Return entity specific state data to be restored."""
return RestoredExtraData({
'state': self.native_value,
'attrs': self._attr_extra_state_attributes,
})

def trim_message(self, msg):
ts = msg.get('time') or int(time.time())
return {
"from": msg.get('from'),
"name": msg.get('name'),
"ts": ts,
'timestamp': datetime.fromtimestamp(ts, local_zone()),
"scene_id": str(msg.get('userSceneId')),
"targets": msg.get('msg', []),
}

@staticmethod
def _cmp_message(a, b):
a_ts, b_ts = a.get('ts', 0), b.get('ts', 0)
if a_ts != b_ts:
return a_ts - b_ts

a_scene_id, b_scene_id = a.get('scene_id', 0), b.get('scene_id', 0)
if a_scene_id < b_scene_id:
return -1
if a_scene_id > b_scene_id:
return 1
return 0

async def async_set_message(self, msg):
self._attr_native_value = msg.get('name')
_LOGGER.debug('New xiaomi scene history for %s %d: %s', self.cloud.user_id, self.home_id, self._attr_native_value)

old = self._attr_extra_state_attributes or {}
self._attr_extra_state_attributes.update({**msg, 'prev_value': old.get('name'), 'prev_scene_id': old.get('scene_id')})

async def fetch_latest_message(self):
res = await self.cloud.async_request_api('scene/history', data={
"home_id": self.home_id,
"uid": int(self.cloud.user_id),
"owner_uid": self.owner_user_id,
"command": "history",
"limit": 15,
}) or {}

messages = [self.trim_message(msg) for msg in (res.get('result') or {}).get('history') or []]
if not messages:
if not self._has_none_message:
_LOGGER.warning('Get xiaomi scene history for %s %d failed: %s', self.cloud.user_id, self.home_id, res)

self._has_none_message = True
return {}

messages.sort(key=cmp_to_key(self._cmp_message), reverse=False)
_LOGGER.debug(
'Get xiaomi scene history for %s %d success: prev_timestamp= %d prev_scene_id= %s messages= %s,',
self.cloud.user_id, self.home_id,
self._attr_extra_state_attributes.get('ts') or 0,
self._attr_extra_state_attributes.get('scene_id') or '',
messages,
)

must_after = int(time.time()) - self.MESSAGE_TIMEOUT
for msg in messages:
if msg.get('ts') < must_after:
continue

if self._cmp_message(msg, self._attr_extra_state_attributes) <= 0:
continue

await self.async_set_message(msg)
self._has_none_message = False
return msg

return {}


class XiaoaiConversationSensor(MiCoordinatorEntity, BaseSensorSubEntity):
def __init__(self, parent, hass, option=None):
BaseSensorSubEntity.__init__(self, parent, 'conversation', option)
Expand Down