Skip to content

Commit

Permalink
Refactor health monitoring to allow multiple instances
Browse files Browse the repository at this point in the history
  • Loading branch information
ekutner committed Feb 8, 2024
1 parent 5d79821 commit ebd7e47
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 58 deletions.
2 changes: 1 addition & 1 deletion home_connect_async/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .homeconnect import HomeConnect
from .appliance import Appliance
from .auth import AuthManager, AbstractAuth
from .common import GlobalStatus, HomeConnectError, ConditionalLogger
from .common import HealthStatus, HomeConnectError, ConditionalLogger
from .const import Events

9 changes: 5 additions & 4 deletions home_connect_async/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aiohttp import ClientResponse

from .auth import AbstractAuth
from .common import ConditionalLogger, HomeConnectError, GlobalStatus
from .common import ConditionalLogger, HomeConnectError, HealthStatus

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,9 +43,10 @@ def error_description(self) -> str | None:
return None


def __init__(self, auth:AbstractAuth, lang:str=None):
def __init__(self, auth:AbstractAuth, lang:str, health:HealthStatus):
self._auth = auth
self._lang = lang
self._health = health
self._call_counter = 0

async def _async_request(self, method:str, endpoint:str, data=None) -> ApiResponse:
Expand Down Expand Up @@ -84,9 +85,9 @@ async def _async_request(self, method:str, endpoint:str, data=None) -> ApiRespon
if response.status == 429: # Too Many Requests
wait_time = response.headers.get('Retry-After')
_LOGGER.debug('HTTP Error 429 - Too Many Requests. Sleeping for %s seconds and will retry', wait_time)
GlobalStatus.set_status(GlobalStatus.Status.BLOCKED, int(wait_time))
self._health.set_status(self._health.Status.BLOCKED, int(wait_time))
await asyncio.sleep(int(wait_time)+1)
GlobalStatus.unset_status(GlobalStatus.Status.BLOCKED)
self._health.unset_status(self._health.Status.BLOCKED)
elif method in ["PUT", "DELETE"] and response.status == 204:
result = self.ApiResponse(response, None)
return result
Expand Down
77 changes: 36 additions & 41 deletions home_connect_async/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ class LogMode(IntFlag):
_log_flags:LogMode = None

@classmethod
def mode(cls, log_flags:LogMode=None) -> LogMode:
def mode(self, log_flags:LogMode=None) -> LogMode:
""" Gets or Sets the log flags for conditional logging """
if log_flags:
cls._log_flags = log_flags
return cls._log_flags
self._log_flags = log_flags
return self._log_flags


@classmethod
def ismode(cls, logmode:LogMode) -> bool:
def ismode(self, logmode:LogMode) -> bool:
""" Check if the specified logmode is enabled """
return cls._log_flags & logmode
return self._log_flags & logmode

@classmethod
def debug(cls, logger:Logger, logmode:LogMode, *args, **kwargs ) -> None:
def debug(self, logger:Logger, logmode:LogMode, *args, **kwargs ) -> None:
""" Conditional debug log """
if cls._log_flags & logmode:
if self._log_flags & logmode:
logger.debug(*args, **kwargs)


Expand All @@ -59,8 +59,8 @@ class Synchronization():
selected_program_lock = asyncio.Lock()


class GlobalStatus:
""" Store a global status for the library """
class HealthStatus:
""" Store the Home Connect connection health status """
class Status(IntFlag):
""" Enum for the current status of the Home Connect data loading process """
INIT = 0
Expand All @@ -72,50 +72,45 @@ class Status(IntFlag):
LOADING_FAILED = 8
BLOCKED = 16

_status:Status = Status.INIT
_blocked_until:datetime = None
def __init__(self) -> None:
self._status:self.Status = self.Status.INIT
self._blocked_until:datetime = None

@classmethod
def set_status(cls, status:Status, delay:int=None) -> None:
def set_status(self, status:Status, delay:int=None) -> None:
""" Set the status """
cls._status |= status
self._status |= status
if delay:
cls._blocked_until = datetime.now() + timedelta(seconds=delay)
self._blocked_until = datetime.now() + timedelta(seconds=delay)

@classmethod
def unset_status(cls, status:Status) -> None:
def unset_status(self, status:Status) -> None:
""" Set the status """
cls._status &= ~status
if status == cls.Status.BLOCKED:
cls._blocked_until = None
self._status &= ~status
if status == self.Status.BLOCKED:
self._blocked_until = None

@classmethod
def get_status(cls) -> Status:
def get_status(self) -> Status:
""" Get the status """
if cls._status & cls.Status.BLOCKED:
return cls.Status.BLOCKED
elif cls._status & cls.Status.LOADING_FAILED:
return cls.Status.LOADING_FAILED
return cls._status
if self._status & self.Status.BLOCKED:
return self.Status.BLOCKED
elif self._status & self.Status.LOADING_FAILED:
return self.Status.LOADING_FAILED
return self._status

@classmethod
def get_status_str(cls) -> str:
def get_status_str(self) -> str:
""" Return the status as a formatted string"""
if cls._blocked_until:
return f"Blocked for {cls.get_block_time_str()}"
elif cls._status & cls.Status.LOADING_FAILED:
return cls.Status.LOADING_FAILED.name
if self._blocked_until:
return f"Blocked for {self.get_block_time_str()}"
elif self._status & self.Status.LOADING_FAILED:
return self.Status.LOADING_FAILED.name
else:
return cls._status.name
return self._status.name

@classmethod
def get_blocked_until(cls):
return cls._blocked_until
def get_blocked_until(self):
return self._blocked_until

@classmethod
def get_block_time_str(cls):
if cls._blocked_until:
delta = (cls._blocked_until - datetime.now()).seconds
def get_block_time_str(self):
if self._blocked_until:
delta = (self._blocked_until - datetime.now()).seconds
if delta < 60:
return f"{delta}s"
else:
Expand Down
29 changes: 18 additions & 11 deletions home_connect_async/homeconnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aiohttp_sse_client.client import MessageEvent

from .const import Events
from .common import ConditionalLogger, HomeConnectError, GlobalStatus
from .common import ConditionalLogger, HomeConnectError, HealthStatus
from .callback_registery import CallbackRegistry
from .appliance import Appliance
from .auth import AuthManager
Expand Down Expand Up @@ -55,6 +55,7 @@ class RefreshMode(Enum):
_api:Optional[HomeConnectApi] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True))
_updates_task:Optional[Task] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True))
_load_task:Optional[Task] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True))
_health:Optional[HealthStatus] = field(default=None, metadata=config(encoder=lambda val: None, exclude=lambda val: True))
_callbacks:Optional[CallbackRegistry] = field(default_factory=lambda: CallbackRegistry(), metadata=config(encoder=lambda val: None, exclude=lambda val: True))
_sse_timeout:Optional[int] = field(default=None)

Expand Down Expand Up @@ -82,7 +83,8 @@ async def async_create(cls,
If auto_update is set to False then subscribe_for_updates() should be called to receive real-time updates to the data
"""
api = HomeConnectApi(am, lang)
health = HealthStatus()
api = HomeConnectApi(am, lang, health)
hc:HomeConnect = None
if json_data:
try:
Expand All @@ -99,6 +101,7 @@ async def async_create(cls,
hc = HomeConnect()

hc._api = api
hc._health = health
hc._refresh_mode = refresh
hc._disabled_appliances = disabled_appliances
hc._sse_timeout = sse_timeout
Expand Down Expand Up @@ -135,8 +138,8 @@ async def async_load_data(self,
) -> None:
""" Loads or just refreshes the data model from the cloud service """
#self.status |= self.HomeConnectStatus.LOADING
GlobalStatus.set_status(GlobalStatus.Status.RUNNING)
GlobalStatus.unset_status(GlobalStatus.Status.LOADING_FAILED)
self._health.set_status(self._health.Status.RUNNING)
self._health.unset_status(self._health.Status.LOADING_FAILED)

try:
if refresh == self.RefreshMode.NOTHING:
Expand Down Expand Up @@ -182,11 +185,11 @@ async def async_load_data(self,
del self.appliances[haId]

#self.status |= self.HomeConnectStatus.LOADED
GlobalStatus.set_status(GlobalStatus.Status.LOADED)
self._health.set_status(self._health.Status.LOADED)
except Exception as ex:
_LOGGER.warning("Failed to load data from Home Connect (%s)", str(ex), exc_info=ex)
#self.status = self.HomeConnectStatus.LOADING_FAILED
GlobalStatus.set_status(GlobalStatus.Status.LOADING_FAILED)
self._health.set_status(self._health.Status.LOADING_FAILED)
if on_error:
if inspect.iscoroutinefunction(on_error):
await on_error(self, ex)
Expand Down Expand Up @@ -235,6 +238,10 @@ def __getitem__(self, haId) -> Appliance:
""" Supports simple access to an appliance based on its haId """
return self.appliances.get(haId)

@property
def health(self):
return self._health


#region - Event stream and updates

Expand All @@ -258,7 +265,7 @@ def parse_sse_error(error:str) -> int:
event_source = await self._api.async_get_event_stream('/api/homeappliances/events', self._sse_timeout)
await event_source.connect()
#self.status |= self.HomeConnectStatus.UPDATES
GlobalStatus.set_status(GlobalStatus.Status.UPDATES)
self._health.set_status(self._health.Status.UPDATES)

async for event in event_source:
_LOGGER.debug("Received event from SSE stream: %s", str(event))
Expand All @@ -271,11 +278,11 @@ def parse_sse_error(error:str) -> int:
break
except ConnectionRefusedError as ex:
#self.status &= self.HomeConnectStatus.NOUPDATES
GlobalStatus.unset_status(GlobalStatus.Status.UPDATES)
self._health.unset_status(self._health.Status.UPDATES)
_LOGGER.debug('ConnectionRefusedError in SSE connection refused. Will try again', exc_info=ex)
except ConnectionError as ex:
#self.status &= self.HomeConnectStatus.NOUPDATES
GlobalStatus.unset_status(GlobalStatus.Status.UPDATES)
self._health.unset_status(self._health.Status.UPDATES)
error_code = parse_sse_error(ex.args[0])
if error_code == 429:
backoff *= 2
Expand All @@ -294,7 +301,7 @@ def parse_sse_error(error:str) -> int:
_LOGGER.debug("The SSE connection timeed-out, will renew and retry")
except Exception as ex:
#self.status &= self.HomeConnectStatus.NOUPDATES
GlobalStatus.unset_status(GlobalStatus.Status.UPDATES)
self._health.unset_status(self._health.Status.UPDATES)
_LOGGER.debug('Exception in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex)
await asyncio.sleep(backoff)
backoff *= 2
Expand All @@ -306,7 +313,7 @@ def parse_sse_error(error:str) -> int:
event_source = None

#self.status &= self.HomeConnectStatus.NOUPDATES
GlobalStatus.unset_status(GlobalStatus.Status.UPDATES)
self._health.unset_status(self._health.Status.UPDATES)
_LOGGER.debug("Exiting SSE event stream")


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name = 'home-connect-async',
packages = ['home_connect_async'],
version = '0.7.16',
version = '0.8.0',
license='MIT',
description = 'Async SDK for BSH Home Connect API',
author = 'Eran Kutner',
Expand Down

0 comments on commit ebd7e47

Please sign in to comment.