Skip to content

Commit

Permalink
refactor mqtt subscription task
Browse files Browse the repository at this point in the history
  • Loading branch information
krahabb committed Oct 13, 2024
1 parent 4153377 commit 3c4d3fa
Showing 1 changed file with 52 additions and 38 deletions.
90 changes: 52 additions & 38 deletions custom_components/meross_lan/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

if typing.TYPE_CHECKING:

import asyncio

from homeassistant.components.mqtt import async_publish as mqtt_async_publish
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import ServiceCall, ServiceResponse
Expand All @@ -63,7 +65,7 @@ class HAMQTTConnection(MQTTConnection):
"_unsub_mqtt_subscribe",
"_unsub_mqtt_disconnected",
"_unsub_mqtt_connected",
"_mqtt_subscribing",
"_mqtt_subscribe_future",
"_unsub_random_disconnect",
)

Expand All @@ -76,14 +78,14 @@ def __init__(self, api: "MerossApi"):
self._unsub_mqtt_subscribe: typing.Callable | None = None
self._unsub_mqtt_disconnected: typing.Callable | None = None
self._unsub_mqtt_connected: typing.Callable | None = None
self._mqtt_subscribing = False # guard for asynchronous mqtt sub registration
self._mqtt_subscribe_future: "asyncio.Future[bool] | None" = None
if MEROSSDEBUG:
# TODO : check bug in hass shutdown
async def _async_random_disconnect():
self._unsub_random_disconnect = api.schedule_async_callback(
60, _async_random_disconnect
)
if self._mqtt_subscribing:
if self._mqtt_subscribe_future:
return
elif self._unsub_mqtt_subscribe is None:
if MEROSSDEBUG.mqtt_random_connect():
Expand Down Expand Up @@ -130,46 +132,58 @@ async def _async_mqtt_publish(
def mqtt_is_subscribed(self):
return self._unsub_mqtt_subscribe is not None

async def async_mqtt_subscribe(self):
if not (self._mqtt_subscribing or self._unsub_mqtt_subscribe):
# dumb re-entrant code protection
self._mqtt_subscribing = True
with self.exception_warning("async_mqtt_subscribe"):
from homeassistant.components import mqtt

global mqtt_async_publish
mqtt_async_publish = mqtt.async_publish
hass = MerossApi.hass
self._unsub_mqtt_subscribe = await mqtt.async_subscribe(
hass, mc.TOPIC_DISCOVERY, self.async_mqtt_message
)
async def async_mqtt_subscribe(self) -> bool:
if self._unsub_mqtt_subscribe:
return True

@callback
def _connection_status_callback(connected: bool):
if connected:
self._mqtt_connected()
else:
self._mqtt_disconnected()

try:
# HA core 2024.6
self._unsub_mqtt_connected = mqtt.async_subscribe_connection_status(
hass, _connection_status_callback
)
except:
self._unsub_mqtt_disconnected = mqtt.async_dispatcher_connect(
hass, mqtt.MQTT_DISCONNECTED, self._mqtt_disconnected # type: ignore (removed in HA core 2024.6)
)
self._unsub_mqtt_connected = mqtt.async_dispatcher_connect(
hass, mqtt.MQTT_CONNECTED, self._mqtt_connected # type: ignore (removed in HA core 2024.6)
)
if mqtt.is_connected(hass):
if self._mqtt_subscribe_future:
return await self._mqtt_subscribe_future

hass = MerossApi.hass
self._mqtt_subscribe_future = hass.loop.create_future()
try:
from homeassistant.components import mqtt

global mqtt_async_publish
mqtt_async_publish = mqtt.async_publish

self._unsub_mqtt_subscribe = await mqtt.async_subscribe(
hass, mc.TOPIC_DISCOVERY, self.async_mqtt_message
)

@callback
def _connection_status_callback(connected: bool):
if connected:
self._mqtt_connected()
self._mqtt_subscribing = False
else:
self._mqtt_disconnected()

return self._unsub_mqtt_subscribe is not None
try:
# HA core 2024.6
self._unsub_mqtt_connected = mqtt.async_subscribe_connection_status(
hass, _connection_status_callback
)
except:
self._unsub_mqtt_disconnected = mqtt.async_dispatcher_connect(
hass, mqtt.MQTT_DISCONNECTED, self._mqtt_disconnected # type: ignore (removed in HA core 2024.6)
)
self._unsub_mqtt_connected = mqtt.async_dispatcher_connect(
hass, mqtt.MQTT_CONNECTED, self._mqtt_connected # type: ignore (removed in HA core 2024.6)
)
if mqtt.is_connected(hass):
self._mqtt_connected()
result = True
except Exception as exception:
self.log_exception(self.WARNING, exception, "async_mqtt_subscribe")
result = False

self._mqtt_subscribe_future.set_result(result)
self._mqtt_subscribe_future = None
return result

async def async_mqtt_unsubscribe(self):
if self._mqtt_subscribe_future:
await self._mqtt_subscribe_future
if self._unsub_mqtt_connected:
self._unsub_mqtt_connected()
self._unsub_mqtt_connected = None
Expand Down

0 comments on commit 3c4d3fa

Please sign in to comment.