Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more structure to controllers #51

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 146 additions & 104 deletions src/fastcs_eiger/eiger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
from collections.abc import Coroutine
from dataclasses import dataclass
from io import BytesIO
from itertools import product
from typing import Any, Literal

import numpy as np
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
from fastcs.controller import Controller
from fastcs.controller import Controller, SubController
from fastcs.datatypes import Bool, Float, Int, String
from fastcs.wrappers import command, scan
from PIL import Image
Expand Down Expand Up @@ -62,27 +61,35 @@
Handler uses uri of detector to collect data for PVs
"""

name: str
uri: str
update_period: float = 0.2

async def put(self, controller: "EigerController", _: AttrW, value: Any) -> None:
GDYendell marked this conversation as resolved.
Show resolved Hide resolved
parameters_to_update = await controller.connection.put(self.name, value)
parameters_to_update = await controller.connection.put(self.uri, value)

Check warning on line 68 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L68

Added line #L68 was not covered by tests
if not parameters_to_update:
parameters_to_update = [self.name.split("/")[-1]]
parameters_to_update = [self.uri.split("/", 4)[-1]]

Check warning on line 70 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L70

Added line #L70 was not covered by tests
print(f"Manually fetching parameter {parameters_to_update}")
elif "difference_mode" in parameters_to_update:
parameters_to_update[parameters_to_update.index("difference_mode")] = (

Check warning on line 73 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L72-L73

Added lines #L72 - L73 were not covered by tests
"threshold/difference/mode"
)
print(

Check warning on line 76 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L76

Added line #L76 was not covered by tests
f"Fetching parameters after setting {self.uri}: {parameters_to_update},"
" replacing incorrect key 'difference_mode'"
)
else:
print(
f"Fetching parameters after setting {self.name}: {parameters_to_update}"
f"Fetching parameters after setting {self.uri}: {parameters_to_update}"
)

await controller.queue_update(parameters_to_update)

async def update(self, controller: "EigerController", attr: AttrR) -> None:
try:
response = await controller.connection.get(self.name)
response = await controller.connection.get(self.uri)

Check warning on line 89 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L89

Added line #L89 was not covered by tests
await attr.set(response["value"])
except Exception as e:
print(f"Failed to get {self.name}:\n{e.__class__.__name__} {e}")
print(f"Failed to get {self.uri}:\n{e.__class__.__name__} {e}")

Check warning on line 92 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L92

Added line #L92 was not covered by tests


class EigerConfigHandler(EigerHandler):
Expand Down Expand Up @@ -133,13 +140,6 @@
"""Mode of parameter within subsystem."""
response: dict[str, Any]
"""JSON response from GET of parameter."""
has_unique_key: bool = True
"""Whether this parameter has a unique key across all subsystems."""

@property
def name(self) -> str:
"""Unique name of parameter across all subsystems."""
return self.key if self.has_unique_key else f"{self.subsystem}_{self.key}"

@property
def uri(self) -> str:
Expand All @@ -151,6 +151,10 @@
EIGER_PARAMETER_MODES = EigerParameter.__annotations__["mode"].__args__


def _key_to_attribute_name(key: str):
return key.replace("/", "_")


class EigerController(Controller):
"""
Controller Class for Eiger Detector
Expand All @@ -173,7 +177,6 @@
self.connection = HTTPConnection(self._ip, self._port)

# Parameter update logic
self._parameter_updates: set[str] = set()
self._parameter_update_lock = asyncio.Lock()

async def initialise(self) -> None:
Expand All @@ -191,57 +194,147 @@
await self.initialize()

try:
parameters = await self._introspect_detector()
for subsystem in EIGER_PARAMETER_SUBSYSTEMS:
if subsystem == "detector":
controller = EigerDetectorController(
self.connection, self._parameter_update_lock
)
else:
controller = EigerSubsystemController(
subsystem, self.connection, self._parameter_update_lock
)
self.register_sub_controller(subsystem.upper(), controller)
jsouter marked this conversation as resolved.
Show resolved Hide resolved
await controller.initialise()
except HTTPRequestError:
print("\nAn HTTP request failed while introspecting detector:\n")
raise

attributes = self._create_attributes(parameters)
@detector_command
async def initialize(self):
await self.connection.put(command_uri("initialize"))

for name, attribute in attributes.items():
setattr(self, name, attribute)
@detector_command
async def arm(self):
await self.connection.put(command_uri("arm"))

Check warning on line 218 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L218

Added line #L218 was not covered by tests

@detector_command
async def trigger(self):
match self.trigger_mode.get(), self.trigger_exposure.get():
case ("inte", exposure) if exposure > 0.0:
await self.connection.put(command_uri("trigger"), exposure)
case ("ints" | "inte", _):
await self.connection.put(command_uri("trigger"))
case _:
raise RuntimeError("Can only do soft trigger in 'ints' or 'inte' mode")

Check warning on line 228 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L222-L228

Added lines #L222 - L228 were not covered by tests

@detector_command
async def disarm(self):
await self.connection.put(command_uri("disarm"))

Check warning on line 232 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L232

Added line #L232 was not covered by tests

@detector_command
async def abort(self):
await self.connection.put(command_uri("abort"))

Check warning on line 236 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L236

Added line #L236 was not covered by tests

@detector_command
async def cancel(self):
await self.connection.put(command_uri("cancel"))

Check warning on line 240 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L240

Added line #L240 was not covered by tests

@scan(0.1)
async def update(self):
"""Periodically check for parameters that need updating from the detector."""
await self.stale_parameters.set(

Check warning on line 245 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L245

Added line #L245 was not covered by tests
any(c.stale_parameters.get() for c in self.get_sub_controllers().values())
)
controller_updates = [c.update() for c in self.get_sub_controllers().values()]
await asyncio.gather(*controller_updates)

Check warning on line 249 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L248-L249

Added lines #L248 - L249 were not covered by tests
GDYendell marked this conversation as resolved.
Show resolved Hide resolved

@scan(1)
async def handle_monitor(self):
"""Poll monitor images to display."""
response, image_bytes = await self.connection.get_bytes(

Check warning on line 254 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L254

Added line #L254 was not covered by tests
"monitor/api/1.8.0/images/next"
)
if response.status != 200:
return

Check warning on line 258 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L257-L258

Added lines #L257 - L258 were not covered by tests
else:
image = Image.open(BytesIO(image_bytes))

Check warning on line 260 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L260

Added line #L260 was not covered by tests

# TODO: Populate waveform PV to display as image, once supported in PVI
print(np.array(image))

Check warning on line 263 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L263

Added line #L263 was not covered by tests
GDYendell marked this conversation as resolved.
Show resolved Hide resolved


class EigerSubsystemController(SubController):
stale_parameters = AttrR(Bool())

def __init__(
self,
subsystem: Literal["detector", "stream", "monitor"],
connection: HTTPConnection,
lock: asyncio.Lock,
):
self._subsystem = subsystem
self.connection = connection
self._parameter_update_lock = lock
self._parameter_updates: set[str] = set()
super().__init__()

async def _introspect_detector(self) -> list[EigerParameter]:
async def _introspect_detector_subsystem(self) -> list[EigerParameter]:
parameters = []
for subsystem, mode in product(
EIGER_PARAMETER_SUBSYSTEMS, EIGER_PARAMETER_MODES
):
for mode in EIGER_PARAMETER_MODES:
subsystem_keys = [
parameter
for parameter in await self.connection.get(
f"{subsystem}/api/1.8.0/{mode}/keys"
f"{self._subsystem}/api/1.8.0/{mode}/keys"
)
if parameter not in IGNORED_KEYS
] + MISSING_KEYS[subsystem][mode]
] + MISSING_KEYS[self._subsystem][mode]
requests = [
self.connection.get(f"{subsystem}/api/1.8.0/{mode}/{key}")
self.connection.get(f"{self._subsystem}/api/1.8.0/{mode}/{key}")
for key in subsystem_keys
]
responses = await asyncio.gather(*requests)

parameters.extend(
[
EigerParameter(
key=key, subsystem=subsystem, mode=mode, response=response
key=key, subsystem=self._subsystem, mode=mode, response=response
)
for key, response in zip(subsystem_keys, responses, strict=False)
]
)

return parameters

def _create_attributes(self, parameters: list[EigerParameter]):
async def initialise(self) -> None:
parameters = await self._introspect_detector_subsystem()
attributes = self._create_attributes(parameters)

for name, attribute in attributes.items():
setattr(self, name, attribute)

@classmethod
def _group(cls, parameter: EigerParameter):
return f"{parameter.subsystem.capitalize()}{parameter.mode.capitalize()}"

@classmethod
def _attribute_name(self, parameter: EigerParameter):
return _key_to_attribute_name(parameter.key)
GDYendell marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def _group_and_name(self, parameter: EigerParameter) -> tuple[str, str]:
return (self._group(parameter), self._attribute_name(parameter))

@classmethod
def _create_attributes(cls, parameters: list[EigerParameter]):
"""Create ``Attribute``s from ``EigerParameter``s.

Args:
parameters: ``EigerParameter``s to create ``Attributes`` from

"""
self._tag_key_clashes(parameters)

attributes: dict[str, Attribute] = {}
for parameter in parameters:
group = f"{parameter.subsystem.capitalize()}{parameter.mode.capitalize()}"
group, attribute_name = cls._group_and_name(parameter)
match parameter.response["value_type"]:
case "float":
datatype = Float()
Expand All @@ -254,9 +347,6 @@
case _:
print(f"Failed to handle {parameter}")

# Flatten nested uri keys - e.g. threshold/1/mode -> threshold_1_mode
attribute_name = parameter.name.replace("/", "_")

match parameter.response["access_mode"]:
case "r":
attributes[attribute_name] = AttrR(
Expand All @@ -274,53 +364,6 @@

return attributes

@staticmethod
def _tag_key_clashes(parameters: list[EigerParameter]):
"""Find key clashes between subsystems and tag parameters to use extended name.

Modifies list of parameters in place.

Args:
parameters: Parameters to search

"""
for idx, parameter in enumerate(parameters):
for other in parameters[idx + 1 :]:
if parameter.key == other.key:
parameter.has_unique_key = False
other.has_unique_key = False
break

@detector_command
async def initialize(self):
await self.connection.put(command_uri("initialize"))

@detector_command
async def arm(self):
await self.connection.put(command_uri("arm"))

@detector_command
async def trigger(self):
match self.trigger_mode.get(), self.trigger_exposure.get():
case ("inte", exposure) if exposure > 0.0:
await self.connection.put(command_uri("trigger"), exposure)
case ("ints" | "inte", _):
await self.connection.put(command_uri("trigger"))
case _:
raise RuntimeError("Can only do soft trigger in 'ints' or 'inte' mode")

@detector_command
async def disarm(self):
await self.connection.put(command_uri("disarm"))

@detector_command
async def abort(self):
await self.connection.put(command_uri("abort"))

@detector_command
async def cancel(self):
await self.connection.put(command_uri("cancel"))

async def queue_update(self, parameters: list[str]):
"""Add the given parameters to the list of parameters to update.

Expand All @@ -334,44 +377,43 @@

await self.stale_parameters.set(True)

@scan(0.1)
async def update(self):
"""Periodically check for parameters that need updating from the detector."""
if not self._parameter_updates:
if self.stale_parameters.get():
await self.stale_parameters.set(False)

return
GDYendell marked this conversation as resolved.
Show resolved Hide resolved

# Take a copy of the current parameters and clear. Parameters may be repopulated
# during this call and need to be updated again immediately.
async with self._parameter_update_lock:
parameters = self._parameter_updates.copy()
self._parameter_updates.clear()

# Release lock while fetching parameters - this may be slow
parameter_updates: list[Coroutine] = []
for parameter in parameters:
match getattr(self, parameter):
if parameter in IGNORED_KEYS:
continue
attr_name = _key_to_attribute_name(parameter)
GDYendell marked this conversation as resolved.
Show resolved Hide resolved
match getattr(self, attr_name, None):

Check warning on line 395 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L392-L395

Added lines #L392 - L395 were not covered by tests
# TODO: mypy doesn't understand AttrR as a type for some reason:
# `error: Expected type in class pattern; found "Any" [misc]`
case AttrR(updater=EigerConfigHandler() as updater) as attr: # type: ignore [misc]
parameter_updates.append(updater.config_update(self, attr))
case _:
print(f"Failed to handle update for {parameter}")

case _ as attr:
print(f"Failed to handle update for {parameter}: {attr}")

Check warning on line 401 in src/fastcs_eiger/eiger_controller.py

View check run for this annotation

Codecov / codecov/patch

src/fastcs_eiger/eiger_controller.py#L400-L401

Added lines #L400 - L401 were not covered by tests
await asyncio.gather(*parameter_updates)

@scan(1)
async def handle_monitor(self):
"""Poll monitor images to display."""
response, image_bytes = await self.connection.get_bytes(
"monitor/api/1.8.0/images/next"
)
if response.status != 200:
return
else:
image = Image.open(BytesIO(image_bytes))

# TODO: Populate waveform PV to display as image, once supported in PVI
print(np.array(image))
class EigerDetectorController(EigerSubsystemController):
def __init__(self, connection: HTTPConnection, lock: asyncio.Lock):
super().__init__("detector", connection, lock)

@classmethod
def _group_and_name(cls, parameter: EigerParameter) -> tuple[str, str]:
if "threshold" in parameter.key:
parts = parameter.key.split("/")
if len(parts) == 3 and parts[1].isnumeric():
group = f"Threshold{parts[1]}"
jsouter marked this conversation as resolved.
Show resolved Hide resolved
else:
group = "Threshold"
name = cls._attribute_name(parameter)
return (group, name)
return super()._group_and_name(parameter)
GDYendell marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading