Skip to content

Commit

Permalink
Cleanup old SSE handling code in Api class
Browse files Browse the repository at this point in the history
  • Loading branch information
ekutner committed Dec 21, 2023
1 parent d4d4b34 commit 8e28ac6
Showing 1 changed file with 55 additions and 55 deletions.
110 changes: 55 additions & 55 deletions home_connect_async/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,58 +130,58 @@ async def async_get_event_stream(self, endpoint:str, timeout:int):
return await self._auth.stream(endpoint, self._lang, timeout)


async def async_stream(self, endpoint:str, timeout:int, event_handler:Callable[[str], None]):
""" Implements a SSE consumer which calls the defined event handler on every new event"""
backoff = 2
event_source = None
while True:
try:
event_source = await self._auth.stream(endpoint, self._lang, timeout)
await event_source.connect()
async for event in event_source:
backoff = 1
try:
await event_handler(event)
except Exception as ex:
_LOGGER.exception('Unhandled exception in stream event handler', exc_info=ex)
except asyncio.CancelledError:
break
except ConnectionRefusedError as ex:
_LOGGER.exception('ConnectionRefusedError in SSE connection refused. Will try again', exc_info=ex)
except ConnectionError as ex:
error_code = self.parse_sse_error(ex.args[0])
if error_code == 429:
backoff *= 2
if backoff > 3600: backoff = 3600
elif backoff < 60: backoff = 60
_LOGGER.info('Got error 429 when opening event stream connection, will sleep for %s seconds and retry', backoff)
else:
_LOGGER.exception('ConnectionError in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex)
backoff *= 2
if backoff > 120: backoff = 120

await asyncio.sleep(backoff)

except asyncio.TimeoutError:
# it is expected that the connection will time out every hour
_LOGGER.debug("The SSE connection timeout, will renew and retry")
except Exception as ex:
_LOGGER.exception('Exception in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex)
await asyncio.sleep(backoff)
backoff *= 2
if backoff > 120: backoff = 120

finally:
if event_source:
await event_source.close()
event_source = None


def parse_sse_error(self, error:str) -> int:
""" Helper function to parse the error code from a SSE exception """
try:
parts = error.split(': ')
error_code = int(parts[-1])
return error_code
except:
return 0
# async def async_stream(self, endpoint:str, timeout:int, event_handler:Callable[[str], None]):
# """ Implements a SSE consumer which calls the defined event handler on every new event"""
# backoff = 2
# event_source = None
# while True:
# try:
# event_source = await self._auth.stream(endpoint, self._lang, timeout)
# await event_source.connect()
# async for event in event_source:
# backoff = 1
# try:
# await event_handler(event)
# except Exception as ex:
# _LOGGER.exception('Unhandled exception in stream event handler', exc_info=ex)
# except asyncio.CancelledError:
# break
# except ConnectionRefusedError as ex:
# _LOGGER.exception('ConnectionRefusedError in SSE connection refused. Will try again', exc_info=ex)
# except ConnectionError as ex:
# error_code = self.parse_sse_error(ex.args[0])
# if error_code == 429:
# backoff *= 2
# if backoff > 3600: backoff = 3600
# elif backoff < 60: backoff = 60
# _LOGGER.info('Got error 429 when opening event stream connection, will sleep for %s seconds and retry', backoff)
# else:
# _LOGGER.exception('ConnectionError in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex)
# backoff *= 2
# if backoff > 120: backoff = 120

# await asyncio.sleep(backoff)

# except asyncio.TimeoutError:
# # it is expected that the connection will time out every hour
# _LOGGER.debug("The SSE connection timeout, will renew and retry")
# except Exception as ex:
# _LOGGER.exception('Exception in SSE event stream. Will wait for %d seconds and retry ', backoff, exc_info=ex)
# await asyncio.sleep(backoff)
# backoff *= 2
# if backoff > 120: backoff = 120

# finally:
# if event_source:
# await event_source.close()
# event_source = None


# def parse_sse_error(self, error:str) -> int:
# """ Helper function to parse the error code from a SSE exception """
# try:
# parts = error.split(': ')
# error_code = int(parts[-1])
# return error_code
# except:
# return 0

0 comments on commit 8e28ac6

Please sign in to comment.