Skip to content

Commit

Permalink
feat: service call adaptation (#628)
Browse files Browse the repository at this point in the history
* feat: service call adaptation

* feat: toggle-on service call adaptation

* feat: prefer service call transition
  • Loading branch information
protyposis authored Jul 19, 2023
1 parent d16a9a5 commit ed80bd7
Show file tree
Hide file tree
Showing 6 changed files with 610 additions and 67 deletions.
3 changes: 1 addition & 2 deletions custom_components/adaptive_lighting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ async def async_unload_entry(hass, config_entry: ConfigEntry) -> bool:
if len(data) == 1 and ATTR_TURN_ON_OFF_LISTENER in data:
# no more config_entries
turn_on_off_listener = data.pop(ATTR_TURN_ON_OFF_LISTENER)
turn_on_off_listener.remove_listener()
turn_on_off_listener.remove_listener2()
turn_on_off_listener.disable()

if not data:
hass.data.pop(DOMAIN)
Expand Down
2 changes: 2 additions & 0 deletions custom_components/adaptive_lighting/adaptation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class AdaptationData:
context: Context
sleep_time: float
service_call_datas: AsyncGenerator[ServiceData, None]
initial_sleep: bool = False

async def next_service_call_data(self) -> ServiceData | None:
"""Return data for the next service call, or none if no more data exists."""
Expand All @@ -149,6 +150,7 @@ def prepare_adaptation_data(
split: bool,
filter_by_state: bool,
) -> AdaptationData:
"Prepares a data object carrying all data required to execute an adaptation."
service_datas = (
[service_data] if not split else _split_service_call_data(service_data)
)
Expand Down
64 changes: 64 additions & 0 deletions custom_components/adaptive_lighting/hass_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Utility functions for HA core."""
from collections.abc import Awaitable
from typing import Callable

from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.util.read_only_dict import ReadOnlyDict

from .adaptation_utils import ServiceData


def setup_service_call_interceptor(
hass: HomeAssistant,
domain: str,
service: str,
intercept_func: Callable[[ServiceCall, ServiceData], Awaitable[None] | None],
) -> Callable[[], None]:
"""Inject a function into a registered service call to preprocess service data.
The injected interceptor function receives the service call and a writeable data dictionary
(the data of the service call is read-only) before the service call is executed."""
try:
# HACK: Access protected attribute of HA service registry.
# This is necessary to replace a registered service handler with our
# proxy handler to intercept calls.
registered_services = (
hass.services._services # pylint: disable=protected-access
)
except AttributeError as error:
raise RuntimeError(
"Intercept failed because registered services are no longer accessible "
"(internal API may have changed)"
) from error

if domain not in registered_services or service not in registered_services[domain]:
raise RuntimeError(
f"Intercept failed because service {domain}.{service} is not registered"
)

existing_service = registered_services[domain][service]

async def service_func_proxy(call: ServiceCall) -> None:
# Convert read-only data to writeable dictionary for modification by interceptor
data = dict(call.data)

# Call interceptor
await intercept_func(call, data)

# Convert data back to read-only
call.data = ReadOnlyDict(data)

# Call original service handler with processed data
await existing_service.job.target(call)

hass.services.async_register(
domain, service, service_func_proxy, existing_service.schema
)

def remove():
# Remove the interceptor by reinstalling the original service handler
hass.services.async_register(
domain, service, existing_service.job.target, existing_service.schema
)

return remove
Loading

0 comments on commit ed80bd7

Please sign in to comment.