Skip to content

Restructure and merge circuit into zone if possible #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
252 changes: 48 additions & 204 deletions custom_components/mypyllant/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from __future__ import annotations

import asyncio
import logging
from asyncio.exceptions import CancelledError
from datetime import datetime as dt, timedelta, timezone
from typing import TypedDict
from datetime import datetime as dt, timedelta
import voluptuous as vol
from aiohttp.client_exceptions import ClientResponseError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import (
Expand All @@ -17,30 +13,34 @@
)
from homeassistant.helpers import selector
from homeassistant.helpers.template import as_datetime
from homeassistant.helpers.entity_registry import async_migrate_entries, RegistryEntry
from homeassistant.core import callback

from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from myPyllant import export, report

from myPyllant.api import MyPyllantAPI
from myPyllant.const import DEFAULT_BRAND
from myPyllant.models import DeviceData, DeviceDataBucketResolution, System
from myPyllant.models import DeviceDataBucketResolution
from myPyllant.tests import generate_test_data

from custom_components.mypyllant.coordinator import (
DeviceDataCoordinator,
SystemCoordinator,
)

from custom_components.mypyllant.utils import async_remove_orphaned_devices

from .const import (
API_DOWN_PAUSE_INTERVAL,
DEFAULT_COUNTRY,
DEFAULT_REFRESH_DELAY,
DEFAULT_UPDATE_INTERVAL,
DOMAIN,
OPTION_BRAND,
OPTION_COUNTRY,
OPTION_REFRESH_DELAY,
OPTION_UPDATE_INTERVAL,
QUOTA_PAUSE_INTERVAL,
SERVICE_GENERATE_TEST_DATA,
SERVICE_EXPORT,
SERVICE_REPORT,
)
from .utils import is_quota_exceeded_exception

_LOGGER = logging.getLogger(__name__)

Expand All @@ -67,21 +67,38 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):

if config_entry.version == 1:
"""
from homeassistant.helpers.entity_registry import async_migrate_entries, RegistryEntry
from homeassistant.helpers.device_registry import async_entries_for_config_entry
from homeassistant.core import callback

devices = async_entries_for_config_entry(
hass.data["device_registry"], config_entry.entry_id
)
"""

@callback
def update_unique_id(entity_entry: RegistryEntry):
return {"new_unique_id": entity_entry.unique_id} # change entity_entry.unique_id
# All entities which are bound to system needs to be starting with {system_id}_home
if entity_entry.unique_id.count(
"_"
) == 4 and entity_entry.unique_id.endswith("_heating_energy_efficiency"):
return {
"new_unique_id": entity_entry.unique_id.replace(
"_heating_energy_efficiency", "_home_heating_energy_efficiency"
)
}
# old: {DOMAIN} {self.system.id} => {DOMAIN}_{self.system.id}
if entity_entry.unique_id.count(
"_"
) == 4 and entity_entry.unique_id.endswith("_cooling_allowed"):
return {"new_unique_id": entity_entry.unique_id.replace(" ", "_")}

# old: {DOMAIN}_{self.system.id}_{self.device.device_uuid}_{self.da_index} => {DOMAIN}_{self.system.id}_device_{self.device.device_uuid}_{self.da_index}
sep = entity_entry.unique_id.split("_")
if len(sep) == 4 and len(sep[3]) == 1 and sep[3].isnumeric():
return {"new_unique_id": f"{sep[0]}_{sep[1]}_device_{sep[2]}_{sep[3]}"}
return None

await async_migrate_entries(hass, config_entry.entry_id, update_unique_id)
config_entry.version = 2 # set to new version
"""
config_entry.version = 2 # set to new version

_LOGGER.debug("Migration to version %s successful", config_entry.version)
return True
Expand Down Expand Up @@ -127,13 +144,23 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN][entry.entry_id]["system_coordinator"] = system_coordinator

# Daily data coordinator is updated hourly, but requests data for the whole day
daily_data_coordinator = DailyDataCoordinator(hass, api, entry, timedelta(hours=1))
device_data_coordinator = DeviceDataCoordinator(
hass, api, entry, timedelta(hours=1)
)
_LOGGER.debug("Refreshing DailyDataCoordinator")
await daily_data_coordinator.async_refresh()
hass.data[DOMAIN][entry.entry_id]["daily_data_coordinator"] = daily_data_coordinator
await device_data_coordinator.async_refresh()
hass.data[DOMAIN][entry.entry_id][
"device_data_coordinator"
] = device_data_coordinator

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

# Cleanup orphaned devices
try:
await async_remove_orphaned_devices(hass, entry)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in async_remove_orphaned_devices")

async def handle_export(call: ServiceCall) -> ServiceResponse:
return {
"export": await export.main(
Expand Down Expand Up @@ -217,191 +244,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"system_coordinator"
].api.aiohttp_session.close()
await hass.data[DOMAIN][entry.entry_id][
"daily_data_coordinator"
"device_data_coordinator"
].api.aiohttp_session.close()
hass.data[DOMAIN].pop(entry.entry_id)

return unload_ok


class MyPyllantCoordinator(DataUpdateCoordinator):
api: MyPyllantAPI

def __init__(
self,
hass: HomeAssistant,
api: MyPyllantAPI,
entry: ConfigEntry,
update_interval: timedelta | None,
) -> None:
self.api = api
self.hass = hass
self.entry = entry

super().__init__(
hass,
_LOGGER,
name="myVAILLANT",
update_interval=update_interval,
)

@property
def hass_data(self):
return self.hass.data[DOMAIN][self.entry.entry_id]

async def _refresh_session(self):
if (
self.api.oauth_session_expires is None
or self.api.oauth_session_expires
< dt.now(timezone.utc) + timedelta(seconds=180)
):
_LOGGER.debug("Refreshing token for %s", self.api.username)
await self.api.refresh_token()
else:
delta = self.api.oauth_session_expires - (
dt.now(timezone.utc) + timedelta(seconds=180)
)
_LOGGER.debug(
"Waiting %ss until token refresh for %s",
delta.seconds,
self.api.username,
)

async def async_request_refresh_delayed(self, delay=None):
"""
The API takes a long time to return updated values (i.e. after setting a new heating mode)
This function waits for a few second and then refreshes
"""
if not delay:
delay = self.entry.options.get(OPTION_REFRESH_DELAY, DEFAULT_REFRESH_DELAY)
if delay:
await asyncio.sleep(delay)
await self.async_request_refresh()

def _raise_api_down(self, exc_info: CancelledError | TimeoutError) -> None:
"""
Raises UpdateFailed if a TimeoutError or CancelledError occurred during updating

Sets a quota time, so the API isn't queried as often while it is down
"""
self.hass_data["quota_time"] = dt.now(timezone.utc)
self.hass_data["quota_exc_info"] = exc_info
raise UpdateFailed(
f"myVAILLANT API is down, skipping update of myVAILLANT data for another {QUOTA_PAUSE_INTERVAL}s"
) from exc_info

def _set_quota_and_raise(self, exc_info: ClientResponseError) -> None:
"""
Check if the API raises a ClientResponseError with "Quota Exceeded" in the message
Raises UpdateFailed if a quota error is detected
"""
if is_quota_exceeded_exception(exc_info):
self.hass_data["quota_time"] = dt.now(timezone.utc)
self.hass_data["quota_exc_info"] = exc_info
self._raise_if_quota_hit()

def _raise_if_quota_hit(self) -> None:
"""
Check if we previously hit a quota, and if the quota was hit within a certain interval
If yes, we keep raising UpdateFailed() until after the interval to avoid spamming the API
"""
quota_time: dt = self.hass_data["quota_time"]
if not quota_time:
return

time_elapsed = (dt.now(timezone.utc) - quota_time).seconds
exc_info: Exception = self.hass_data["quota_exc_info"]

if is_quota_exceeded_exception(exc_info):
_LOGGER.debug(
"Quota was hit %ss ago on %s",
time_elapsed,
quota_time,
exc_info=exc_info,
)
if time_elapsed < QUOTA_PAUSE_INTERVAL:
raise UpdateFailed(
f"{exc_info.message} on {exc_info.request_info.real_url}, " # type: ignore
f"skipping update of myVAILLANT data for another {QUOTA_PAUSE_INTERVAL - time_elapsed}s"
) from exc_info
else:
_LOGGER.debug(
"myVAILLANT API is down since %ss (%s)",
time_elapsed,
quota_time,
exc_info=exc_info,
)
if time_elapsed < API_DOWN_PAUSE_INTERVAL:
raise UpdateFailed(
f"myVAILLANT API is down, skipping update of myVAILLANT data for another"
f" {API_DOWN_PAUSE_INTERVAL - time_elapsed}s"
) from exc_info


class SystemCoordinator(MyPyllantCoordinator):
data: list[System]

async def _async_update_data(self) -> list[System]:
self._raise_if_quota_hit()
_LOGGER.debug("Starting async update data for SystemCoordinator")
try:
await self._refresh_session()
data = [
s
async for s in await self.hass.async_add_executor_job(
self.api.get_systems, True, True
)
]
return data
except ClientResponseError as e:
self._set_quota_and_raise(e)
raise UpdateFailed() from e
except (CancelledError, TimeoutError) as e:
self._raise_api_down(e)
return [] # mypy


class SystemWithDeviceData(TypedDict):
home_name: str
devices_data: list[list[DeviceData]]


class DailyDataCoordinator(MyPyllantCoordinator):
data: dict[str, SystemWithDeviceData]

async def _async_update_data(self) -> dict[str, SystemWithDeviceData]:
self._raise_if_quota_hit()
_LOGGER.debug("Starting async update data for DailyDataCoordinator")
try:
await self._refresh_session()
data: dict[str, SystemWithDeviceData] = {}
async for system in await self.hass.async_add_executor_job(
self.api.get_systems
):
start = dt.now(system.timezone).replace(
microsecond=0, second=0, minute=0, hour=0
)
end = start + timedelta(days=1)
_LOGGER.debug(
"Getting daily data for %s from %s to %s", system, start, end
)
if len(system.devices) == 0:
continue
data[system.id] = {
"home_name": system.home.home_name or system.home.nomenclature,
"devices_data": [],
}
for device in system.devices:
device_data = self.api.get_data_by_device(
device, DeviceDataBucketResolution.DAY, start, end
)
data[system.id]["devices_data"].append(
[da async for da in device_data]
)
return data
except ClientResponseError as e:
self._set_quota_and_raise(e)
raise UpdateFailed() from e
except (CancelledError, TimeoutError) as e:
self._raise_api_down(e)
return {} # mypy
Loading