Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more structure to controllers #51

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 151 additions & 98 deletions src/fastcs_eiger/eiger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
from collections.abc import Coroutine
from dataclasses import dataclass
from io import BytesIO
from itertools import product
from typing import Any, Literal

import numpy as np
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.controller import Controller
from fastcs.controller import Controller, SubController
from fastcs.datatypes import Bool, Float, Int, String
from fastcs.wrappers import command, scan
from PIL import Image
Expand Down Expand Up @@ -62,27 +61,37 @@
Handler uses uri of detector to collect data for PVs
"""

name: str
uri: str
update_period: float = 0.2

async def put(self, controller: "EigerController", _: AttrW, value: Any) -> None:
parameters_to_update = await controller.connection.put(self.name, value)
async def put(
self, controller: "EigerSubsystemController", _: AttrW, value: Any
) -> None:
parameters_to_update = await controller.connection.put(self.uri, value)

Check warning on line 70 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L70

Added line #L70 was not covered by tests
if not parameters_to_update:
parameters_to_update = [self.name.split("/")[-1]]
parameters_to_update = [self.uri.split("/", 4)[-1]]

Check warning on line 72 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L72

Added line #L72 was not covered by tests
print(f"Manually fetching parameter {parameters_to_update}")
elif "difference_mode" in parameters_to_update:
parameters_to_update[parameters_to_update.index("difference_mode")] = (

Check warning on line 75 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L74-L75

Added lines #L74 - L75 were not covered by tests
"threshold/difference/mode"
)
print(

Check warning on line 78 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L78

Added line #L78 was not covered by tests
f"Fetching parameters after setting {self.uri}: {parameters_to_update},"
" replacing incorrect key 'difference_mode'"
)
else:
print(
f"Fetching parameters after setting {self.name}: {parameters_to_update}"
f"Fetching parameters after setting {self.uri}: {parameters_to_update}"
)

await controller.queue_update(parameters_to_update)

async def update(self, controller: "EigerController", attr: AttrR) -> None:
try:
response = await controller.connection.get(self.name)
response = await controller.connection.get(self.uri)

Check warning on line 91 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L91

Added line #L91 was not covered by tests
await attr.set(response["value"])
except Exception as e:
print(f"Failed to get {self.name}:\n{e.__class__.__name__} {e}")
print(f"Failed to get {self.uri}:\n{e.__class__.__name__} {e}")

Check warning on line 94 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L94

Added line #L94 was not covered by tests


class EigerConfigHandler(EigerHandler):
Expand Down Expand Up @@ -133,13 +142,10 @@
"""Mode of parameter within subsystem."""
response: dict[str, Any]
"""JSON response from GET of parameter."""
has_unique_key: bool = True
"""Whether this parameter has a unique key across all subsystems."""

@property
def name(self) -> str:
"""Unique name of parameter across all subsystems."""
return self.key if self.has_unique_key else f"{self.subsystem}_{self.key}"
def attribute_name(self):
return _key_to_attribute_name(self.key)

@property
def uri(self) -> str:
Expand All @@ -151,6 +157,10 @@
EIGER_PARAMETER_MODES = EigerParameter.__annotations__["mode"].__args__


def _key_to_attribute_name(key: str):
return key.replace("/", "_")


class EigerController(Controller):
"""
Controller Class for Eiger Detector
Expand All @@ -159,12 +169,8 @@
Sets up all connections with the Simplon API to send and receive information
"""

# Detector parameters to use in internal logic
trigger_mode = AttrRW(String()) # TODO: Include URI and validate type from API

# Internal Attributes
# Internal Attribute
stale_parameters = AttrR(Bool())
trigger_exposure = AttrRW(Float(), handler=LogicHandler())

def __init__(self, ip: str, port: int) -> None:
super().__init__()
Expand All @@ -173,7 +179,6 @@
self.connection = HTTPConnection(self._ip, self._port)

# Parameter update logic
self._parameter_updates: set[str] = set()
self._parameter_update_lock = asyncio.Lock()

async def initialise(self) -> None:
Expand All @@ -184,64 +189,118 @@
"""
self.connection.open()

# Check current state of detector_state to see if initializing is required.
state_val = await self.connection.get("detector/api/1.8.0/status/state")
if state_val["value"] == "na":
print("Initializing Detector")
await self.initialize()

try:
parameters = await self._introspect_detector()
for subsystem in EIGER_PARAMETER_SUBSYSTEMS:
match subsystem:
case "detector":
controller = EigerDetectorController(
self.connection, self._parameter_update_lock
)
# detector subsystem initialises first
# Check current state of detector_state to see
# if initializing is required.
state_val = await self.connection.get(
"detector/api/1.8.0/status/state"
)
if state_val["value"] == "na":
print("Initializing Detector")
# send initialize command to detector
await controller.initialize()
case "monitor":
controller = EigerMonitorController(
self.connection, self._parameter_update_lock
)
case "stream":
controller = EigerStreamController(
self.connection, self._parameter_update_lock
)
case _:
raise NotImplementedError(

Check warning on line 218 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L217-L218

Added lines #L217 - L218 were not covered by tests
f"No subcontroller implemented for subsystem {subsystem}"
)
self.register_sub_controller(subsystem.capitalize(), controller)
await controller.initialise()

except HTTPRequestError:
print("\nAn HTTP request failed while introspecting detector:\n")
raise

attributes = self._create_attributes(parameters)
@scan(0.1)
async def update(self):
"""Periodically check for parameters that need updating from the detector."""
await self.stale_parameters.set(

Check warning on line 231 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L231

Added line #L231 was not covered by tests
any(c.stale_parameters.get() for c in self.get_sub_controllers().values())
)
controller_updates = [c.update() for c in self.get_sub_controllers().values()]
await asyncio.gather(*controller_updates)

Check warning on line 235 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L234-L235

Added lines #L234 - L235 were not covered by tests
GDYendell marked this conversation as resolved.
Show resolved Hide resolved

for name, attribute in attributes.items():
setattr(self, name, attribute)

async def _introspect_detector(self) -> list[EigerParameter]:
class EigerSubsystemController(SubController):
_subsystem: Literal["detector", "stream", "monitor"]
stale_parameters = AttrR(Bool())

def __init__(
self,
connection: HTTPConnection,
lock: asyncio.Lock,
):
self.connection = connection
self._parameter_update_lock = lock
self._parameter_updates: set[str] = set()
super().__init__()

async def _introspect_detector_subsystem(self) -> list[EigerParameter]:
parameters = []
for subsystem, mode in product(
EIGER_PARAMETER_SUBSYSTEMS, EIGER_PARAMETER_MODES
):
for mode in EIGER_PARAMETER_MODES:
subsystem_keys = [
parameter
for parameter in await self.connection.get(
f"{subsystem}/api/1.8.0/{mode}/keys"
f"{self._subsystem}/api/1.8.0/{mode}/keys"
)
if parameter not in IGNORED_KEYS
] + MISSING_KEYS[subsystem][mode]
] + MISSING_KEYS[self._subsystem][mode]
requests = [
self.connection.get(f"{subsystem}/api/1.8.0/{mode}/{key}")
self.connection.get(f"{self._subsystem}/api/1.8.0/{mode}/{key}")
for key in subsystem_keys
]
responses = await asyncio.gather(*requests)

parameters.extend(
[
EigerParameter(
key=key, subsystem=subsystem, mode=mode, response=response
key=key, subsystem=self._subsystem, mode=mode, response=response
)
for key, response in zip(subsystem_keys, responses, strict=False)
]
)

return parameters

def _create_attributes(self, parameters: list[EigerParameter]):
async def initialise(self) -> None:
parameters = await self._introspect_detector_subsystem()
attributes = self._create_attributes(parameters)

for name, attribute in attributes.items():
setattr(self, name, attribute)

@classmethod
def _group(cls, parameter: EigerParameter):
if "/" in parameter.key:
group_parts = parameter.key.split("/")[:-1]
# e.g. "threshold/difference/mode" -> ThresholdDifference
return "".join(list(map(str.capitalize, group_parts)))
return f"{parameter.subsystem.capitalize()}{parameter.mode.capitalize()}"

@classmethod
def _create_attributes(cls, parameters: list[EigerParameter]):
"""Create ``Attribute``s from ``EigerParameter``s.

Args:
parameters: ``EigerParameter``s to create ``Attributes`` from

"""
self._tag_key_clashes(parameters)

attributes: dict[str, Attribute] = {}
for parameter in parameters:
group = f"{parameter.subsystem.capitalize()}{parameter.mode.capitalize()}"
match parameter.response["value_type"]:
case "float":
datatype = Float()
Expand All @@ -254,18 +313,16 @@
case _:
print(f"Failed to handle {parameter}")

# Flatten nested uri keys - e.g. threshold/1/mode -> threshold_1_mode
attribute_name = parameter.name.replace("/", "_")

group = cls._group(parameter)
match parameter.response["access_mode"]:
case "r":
attributes[attribute_name] = AttrR(
attributes[parameter.attribute_name] = AttrR(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
)
case "rw":
attributes[attribute_name] = AttrRW(
attributes[parameter.attribute_name] = AttrRW(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
Expand All @@ -274,22 +331,51 @@

return attributes

@staticmethod
def _tag_key_clashes(parameters: list[EigerParameter]):
"""Find key clashes between subsystems and tag parameters to use extended name.

Modifies list of parameters in place.
async def queue_update(self, parameters: list[str]):
"""Add the given parameters to the list of parameters to update.

Args:
parameters: Parameters to search
parameters: Parameters to be updated

"""
for idx, parameter in enumerate(parameters):
for other in parameters[idx + 1 :]:
if parameter.key == other.key:
parameter.has_unique_key = False
other.has_unique_key = False
break
async with self._parameter_update_lock:
for parameter in parameters:
self._parameter_updates.add(parameter)

await self.stale_parameters.set(True)

async def update(self):
if not self._parameter_updates:
if self.stale_parameters.get():
await self.stale_parameters.set(False)
return

Check warning on line 351 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L348-L351

Added lines #L348 - L351 were not covered by tests

async with self._parameter_update_lock:
keys_to_check = self._parameter_updates.copy()
self._parameter_updates.clear()

Check warning on line 355 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L353-L355

Added lines #L353 - L355 were not covered by tests

# Release lock while fetching parameters - this may be slow
parameter_updates: list[Coroutine] = []
for key in keys_to_check:
if key in IGNORED_KEYS:
continue
attr_name = _key_to_attribute_name(key)
match getattr(self, attr_name, None):

Check warning on line 363 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L358-L363

Added lines #L358 - L363 were not covered by tests
# TODO: mypy doesn't understand AttrR as a type for some reason:
# `error: Expected type in class pattern; found "Any" [misc]`
case AttrR(updater=EigerConfigHandler() as updater) as attr: # type: ignore [misc]
parameter_updates.append(updater.config_update(self, attr))
case _ as attr:
print(f"Failed to handle update for {key}: {attr}")
await asyncio.gather(*parameter_updates)

Check warning on line 370 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L366-L370

Added lines #L366 - L370 were not covered by tests


class EigerDetectorController(EigerSubsystemController):
_subsystem = "detector"

# Detector parameters to use in internal logic
trigger_mode = AttrRW(String()) # TODO: Include URI and validate type from API
trigger_exposure = AttrRW(Float(), handler=LogicHandler())

@detector_command
async def initialize(self):
Expand Down Expand Up @@ -321,46 +407,9 @@
async def cancel(self):
await self.connection.put(command_uri("cancel"))

async def queue_update(self, parameters: list[str]):
"""Add the given parameters to the list of parameters to update.

Args:
parameters: Parameters to be updated

"""
async with self._parameter_update_lock:
for parameter in parameters:
self._parameter_updates.add(parameter)

await self.stale_parameters.set(True)

@scan(0.1)
async def update(self):
"""Periodically check for parameters that need updating from the detector."""
if not self._parameter_updates:
if self.stale_parameters.get():
await self.stale_parameters.set(False)

return
GDYendell marked this conversation as resolved.
Show resolved Hide resolved

# Take a copy of the current parameters and clear. Parameters may be repopulated
# during this call and need to be updated again immediately.
async with self._parameter_update_lock:
parameters = self._parameter_updates.copy()
self._parameter_updates.clear()

# Release lock while fetching parameters - this may be slow
parameter_updates: list[Coroutine] = []
for parameter in parameters:
match getattr(self, parameter):
# TODO: mypy doesn't understand AttrR as a type for some reason:
# `error: Expected type in class pattern; found "Any" [misc]`
case AttrR(updater=EigerConfigHandler() as updater) as attr: # type: ignore [misc]
parameter_updates.append(updater.config_update(self, attr))
case _:
print(f"Failed to handle update for {parameter}")

await asyncio.gather(*parameter_updates)
class EigerMonitorController(EigerSubsystemController):
_subsystem = "monitor"

@scan(1)
async def handle_monitor(self):
Expand All @@ -375,3 +424,7 @@

# TODO: Populate waveform PV to display as image, once supported in PVI
print(np.array(image))


class EigerStreamController(EigerSubsystemController):
_subsystem = "stream"
Loading
Loading