Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update_parameters_experimental w/ sync run_udf_iter #135

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions src/libertem_live/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import contextlib
from typing import TYPE_CHECKING, overload, Union, Optional
from typing_extensions import Literal

Expand Down Expand Up @@ -27,6 +26,32 @@
tracer = trace.get_tracer(__name__)


class CleanupResultGeneratorProxy:
def __init__(self, inner, callback):
self._inner = inner
self._inner_iter = iter(inner)
self._callback = callback

def __iter__(self):
return self

def __next__(self):
try:
return next(self._inner_iter)
except Exception:
self._callback()
raise

def close(self):
try:
self._inner.close()
finally:
self._callback()

def __getattr__(self, name):
return getattr(self._inner, name)


class LiveContext(LiberTEM_Context):
'''
:class:`LiveContext` handles the computational resources needed to run
Expand All @@ -51,14 +76,13 @@ def _create_local_executor(self):
'''
return PipelinedExecutor()

@contextlib.contextmanager
def _do_acquisition(self, acquisition, udf):
with tracer.start_as_current_span("LiveContext._do_acquisition"):
if hasattr(acquisition, 'acquire'):
with acquisition.acquire():
yield
else:
yield
def _start_acquisition(self, acquisition, udf):
if hasattr(acquisition, 'start_acquisition'):
acquisition.start_acquisition()

def _end_acquisition(self, acquisition, udf):
if hasattr(acquisition, 'end_acquisition'):
acquisition.end_acquisition()

@overload
def make_connection(
Expand Down Expand Up @@ -120,6 +144,8 @@ def make_connection(
... print("connected!")
connected!
"""
# FIXME implement similar to LiberTEM datasets once
# we have more detector types to support
if detector_type == 'dectris':
from libertem_live.detectors.dectris import DectrisConnectionBuilder
return DectrisConnectionBuilder()
Expand Down Expand Up @@ -229,8 +255,6 @@ def make_acquisition(
return instance.initialize(self.executor)

def prepare_acquisition(self, detector_type, *args, trigger=None, **kwargs):
# FIXME implement similar to LiberTEM datasets once
# we have more detector types to support
'''
This method has been removed, please use `make_connection` and `make_acquisition`.
'''
Expand All @@ -241,14 +265,22 @@ def prepare_acquisition(self, detector_type, *args, trigger=None, **kwargs):

def _run_sync(self, dataset, udf, iterate=False, *args, **kwargs):
def _run_sync_iterate():
with self._do_acquisition(dataset, udf):
res = super(LiveContext, self)._run_sync(
dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs
)
yield from res
self._start_acquisition(dataset, udf)
res = super(LiveContext, self)._run_sync(
dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs
)
return CleanupResultGeneratorProxy(
res, callback=lambda: self._end_acquisition(dataset, udf)
)

if iterate:
return _run_sync_iterate()
else:
with self._do_acquisition(dataset, udf):
return super()._run_sync(dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs)
self._start_acquisition(dataset, udf)
try:
result = super()._run_sync(
dataset=dataset, udf=udf, iterate=iterate, *args, **kwargs
)
return result
finally:
self._end_acquisition(dataset, udf)
9 changes: 5 additions & 4 deletions src/libertem_live/detectors/asi_tpx3/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,16 @@ def get_correction_data(self):
def _acquire_active(self):
raise NotImplementedError("")

@contextmanager
def acquire(self):
with tracer.start_as_current_span('acquire'):
def start_acquisition(self):
with tracer.start_as_current_span('start_acquisition'):
if self._pending_aq is None: # active mode:
with tracer.start_as_current_span("AsiAcquisition.on_ready_for_data"):
self._hooks.on_ready_for_data(ReadyForDataEnv(aq=self))
else:
pass # passive mode
yield

def end_acquisition(self):
return

def check_valid(self):
""
Expand Down
7 changes: 4 additions & 3 deletions src/libertem_live/detectors/base/acquisition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TYPE_CHECKING, Optional
from typing_extensions import Protocol
from contextlib import contextmanager
import logging
import math

Expand Down Expand Up @@ -166,8 +165,10 @@ def __init__(

super().__init__()

@contextmanager
def acquire(self):
def start_acquisition(self):
raise NotImplementedError()

def end_acquisition(self):
raise NotImplementedError()


Expand Down
9 changes: 4 additions & 5 deletions src/libertem_live/detectors/dectris/acquisition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from contextlib import contextmanager
import base64
import logging
import time
Expand Down Expand Up @@ -355,10 +354,9 @@ def get_correction_data(self):
excluded_pixels = mask_arr > 0
return CorrectionSet(excluded_pixels=excluded_pixels)

@contextmanager
def acquire(self):
def start_acquisition(self):
''
with tracer.start_as_current_span('acquire'):
with tracer.start_as_current_span('start_acquisition'):
if self._controller is not None:
self._conn.prepare_for_active()
self._controller.apply_file_writing()
Expand All @@ -381,7 +379,8 @@ def acquire(self):
nimages=nimages,
)

yield
def end_acquisition(self):
return

def check_valid(self):
''
Expand Down
8 changes: 4 additions & 4 deletions src/libertem_live/detectors/memory/acquisition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from contextlib import contextmanager
from typing import Optional
import logging

Expand Down Expand Up @@ -108,10 +107,11 @@ def __init__(
**conn._extra_kwargs,
)

@contextmanager
def acquire(self):
def start_acquisition(self):
if self._pending_aq is None:
self._hooks.on_ready_for_data(
ReadyForDataEnv(aq=self),
)
yield

def end_acquisition(self):
pass
36 changes: 16 additions & 20 deletions src/libertem_live/detectors/merlin/acquisition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from contextlib import contextmanager
import logging
from typing import Optional, NamedTuple
from collections.abc import Generator, Iterator
Expand Down Expand Up @@ -136,23 +135,18 @@ def shape(self):
def meta(self):
return self._meta

@contextmanager
def acquire(self):
def start_acquisition(self):
if self._pending_aq is None:
# active case, we manage the connection:
with self._conn:
self._conn.maybe_drain()
with tracer.start_as_current_span("MerlinAcquisition.trigger"):
self._hooks.on_ready_for_data(ReadyForDataEnv(aq=self))
acq_header, stream = self._conn.get_header_and_stream()
self._acq_state = AcqState(
acq_header=acq_header,
stream=stream,
)
try:
yield
finally:
self._acq_state = None
self._conn.connect()
self._conn.maybe_drain()
with tracer.start_as_current_span("MerlinAcquisition.trigger"):
self._hooks.on_ready_for_data(ReadyForDataEnv(aq=self))
acq_header, stream = self._conn.get_header_and_stream()
self._acq_state = AcqState(
acq_header=acq_header,
stream=stream,
)
else:
# passive case, we don't manage the connection and we definitely
# don't drain anything out of the socket!
Expand All @@ -165,10 +159,12 @@ def acquire(self):
acq_header=acq_header,
stream=stream,
)
try:
yield
finally:
self._acq_state = None

def end_acquisition(self):
# active case, we manage the connection:
if self._pending_aq is None:
self._conn.close()
self._acq_state = None

def check_valid(self):
''
Expand Down
11 changes: 7 additions & 4 deletions src/libertem_live/detectors/merlin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@
self._api_port = api_port
self._data_host = data_host
self._data_port = data_port
self._data_socket: Optional[MerlinRawSocket] = None
self._drain = drain
self._connect()
self.connect()

def _connect(self):
def connect(self):
if self._data_socket is not None:
return self._data_socket # already connected
self._data_socket = MerlinRawSocket(
host=self._data_host,
port=self._data_port,
Expand Down Expand Up @@ -133,7 +136,7 @@
{'intensity': ...}
"""
if self._data_socket is None:
self._connect()
self.connect()

Check warning on line 139 in src/libertem_live/detectors/merlin/connection.py

View check run for this annotation

Codecov / codecov/patch

src/libertem_live/detectors/merlin/connection.py#L139

Added line #L139 was not covered by tests
assert self._data_socket is not None
try:
header = self._data_socket.read_acquisition_header(cancel_timeout=timeout)
Expand All @@ -156,7 +159,7 @@

def __enter__(self):
if self._data_socket is None:
self._connect()
self.connect()
return self

def close(self):
Expand Down
16 changes: 16 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import contextlib

import numpy as np
import pytest

Expand Down Expand Up @@ -53,3 +55,17 @@ def test_dataset(ltl_ctx):
def test_removed_api(ltl_ctx):
with pytest.raises(RuntimeError):
ltl_ctx.prepare_acquisition('merlin')


def test_update_parameters_iter_sync(ltl_ctx):
data = np.random.random((13, 17, 19, 23))

with ltl_ctx.make_connection('memory').open(data=data) as conn:
aq = ltl_ctx.make_acquisition(conn=conn)

udf1 = SignalMonitorUDF()
udf2 = NoOpUDF()
res_iter = ltl_ctx.run_udf_iter(dataset=aq, udf=[udf1, udf2], sync=True)
with contextlib.closing(res_iter):
for item in res_iter:
res_iter.update_parameters_experimental([{}, {}])
Loading