+ Blop is a Python library for performing optimization for beamline
+ experiments. It is designed to integrate nicely with the Bluesky
+ ecosystem and primarily targets rapid beamline data acquisition
+ and control.
+
+
+ Our goal is to provide a simple and practical data-driven
+ optimization interface for beamline experiments.
+
+
+
+
+
+ [Visualization Placeholder]
+
+
+
+
+
+
+
+
Installation
+
+
+
+
PyTorch Installation Options
+
+ By default, blop installs PyTorch with GPU support (~7GB).
+ For environments without GPU support, or to reduce installation size, you can install a CPU-only version (~900MB) using uv.
+ This is particularly useful for containerized environments without GPU access, CI/CD pipelines, development environments on laptops without NVIDIA GPUs, or edge computing scenarios.
+
+
+ Note: CPU-only installation requires uv, a fast Python package installer.
+
+ {# We define the list so additional items can be easily added #}
+ {% set nav_items = [
+ ("Tutorials", "tutorials.html", "Step-by-step guides to get started with Blop fundamentals and basic workflows"),
+ ("How-to", "howto.html", "Practical recipes and solutions for specific beamline optimization tasks"),
+ ("References", "references.html", "Complete API documentation, class references, and technical specifications"),
+ ("Release History", "release-history.html", "Version updates, new features, bug fixes, and changelog for v" + release)
+ ] %}
+
+ {% for title, link, description in nav_items %}
+
If you use this package in your work, please cite the following paper:
+
+
+ Morris, T. W., Rakitin, M., Du, Y., Fedurin, M., Giles, A. C., Leshchev, D., Li, W. H., Romasky, B., Stavitski, E., Walter, A. L., Moeller, P., Nash, B., & Islegen-Wojdyla, A. (2024). A general Bayesian algorithm for the autonomous alignment of beamlines. Journal of Synchrotron Radiation, 31(6), 1446–1456.
+
+ https://doi.org/10.1107/S1600577524008993
+
+
+
+
BibTeX:
+
+ @Article{Morris2024,
+ author = {Morris, Thomas W. and Rakitin, Max and Du, Yonghua and Fedurin, Mikhail and Giles, Abigail C. and Leshchev, Denis and Li, William H. and Romasky, Brianna and Stavitski, Eli and Walter, Andrew L. and Moeller, Paul and Nash, Boaz and Islegen-Wojdyla, Antoine},
+ journal = {Journal of Synchrotron Radiation},
+ title = {A general Bayesian algorithm for the autonomous alignment of beamlines},
+ year = {2024},
+ month = {Nov},
+ number = {6},
+ pages = {1446--1456},
+ volume = {31},
+ doi = {10.1107/S1600577524008993},
+ keywords = {Bayesian optimization, automated alignment, synchrotron radiation, digital twins, machine learning},
+ url = {https://doi.org/10.1107/S1600577524008993},
+ }
+
+
+
+{% endblock %}
+
+{% block stylesheets %}
+ {{ super() }}
+
+{% endblock %}
+
+{% block scripts %}
+ {{ super() }}
+
+{% endblock %}
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 1756eed1..7fd91048 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -144,7 +144,11 @@
# further. For a list of options available for each theme, see the
# documentation.
#
-# html_theme_options = {}
+html_theme_options = {
+ "navbar_start": ["navbar-logo"],
+ "navbar_center": ["navbar-nav"],
+ "navbar_end": [],
+}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
@@ -169,8 +173,13 @@
# Add custom CSS to fix .content height constraint for plotly plots
html_css_files = [
"fix-content-height.css",
+ "css/styles.css",
]
+html_additional_pages = {
+ "index": "index.html",
+}
+
# -- Options for LaTeX output ---------------------------------------------
diff --git a/docs/source/how-to-guides.rst b/docs/source/how-to-guides.rst
index dae99aa4..da738adb 100644
--- a/docs/source/how-to-guides.rst
+++ b/docs/source/how-to-guides.rst
@@ -7,6 +7,7 @@ How-to Guides
how-to-guides/use-ophyd-devices.rst
how-to-guides/attach-data-to-experiments.rst
how-to-guides/custom-generation-strategies.rst
+ how-to-guides/manual-suggestions.rst
how-to-guides/set-dof-constraints.rst
how-to-guides/set-outcome-constraints.rst
how-to-guides/acquire-baseline.rst
diff --git a/docs/source/how-to-guides/manual-suggestions.rst b/docs/source/how-to-guides/manual-suggestions.rst
new file mode 100644
index 00000000..33db0b86
--- /dev/null
+++ b/docs/source/how-to-guides/manual-suggestions.rst
@@ -0,0 +1,253 @@
+.. testsetup::
+
+ from unittest.mock import MagicMock
+ from typing import Any
+ import time
+
+ from bluesky.protocols import NamedMovable, Readable, Status, Hints, HasHints, HasParent
+ from bluesky.run_engine import RunEngine
+ from tiled.client.container import Container
+
+ class AlwaysSuccessfulStatus(Status):
+ def add_callback(self, callback) -> None:
+ callback(self)
+
+ def exception(self, timeout = 0.0):
+ return None
+
+ @property
+ def done(self) -> bool:
+ return True
+
+ @property
+ def success(self) -> bool:
+ return True
+
+ class ReadableSignal(Readable, HasHints, HasParent):
+ def __init__(self, name: str) -> None:
+ self._name = name
+ self._value = 0.0
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ @property
+ def hints(self) -> Hints:
+ return {
+ "fields": [self._name],
+ "dimensions": [],
+ "gridding": "rectilinear",
+ }
+
+ @property
+ def parent(self) -> Any | None:
+ return None
+
+ def read(self):
+ return {
+ self._name: { "value": self._value, "timestamp": time.time() }
+ }
+
+ def describe(self):
+ return {
+ self._name: { "source": self._name, "dtype": "number", "shape": [] }
+ }
+
+ class MovableSignal(ReadableSignal, NamedMovable):
+ def __init__(self, name: str, initial_value: float = 0.0) -> None:
+ super().__init__(name)
+ self._value: float = initial_value
+
+ def set(self, value: float) -> Status:
+ self._value = value
+ return AlwaysSuccessfulStatus()
+
+ db = MagicMock(spec=Container)
+ RE = RunEngine({})
+
+ sensor = ReadableSignal("signal")
+ motor_x = MovableSignal("motor_x")
+ motor_y = MovableSignal("motor_y")
+
+ # Mock evaluation function for examples
+ def evaluation_function(uid: str, suggestions: list[dict]) -> list[dict]:
+ """Mock evaluation function that returns constant outcomes."""
+ outcomes = []
+ for suggestion in suggestions:
+ outcome = {
+ "_id": suggestion["_id"],
+ "signal": 0.5,
+ }
+ outcomes.append(outcome)
+ return outcomes
+
+Manual Point Injection
+======================
+
+This guide shows how to inject custom parameter combinations based on domain knowledge or external sources, alongside optimizer-driven suggestions.
+
+Basic Usage
+-----------
+
+To evaluate manually-specified points, use the ``sample_suggestions`` method with parameter combinations (without ``"_id"`` keys). The optimizer will automatically register these trials and incorporate the results into the Bayesian model.
+
+.. testcode::
+
+ from blop.ax import Agent, RangeDOF, Objective
+
+ # Configure agent
+ agent = Agent(
+ sensors=[sensor],
+ dofs=[
+ RangeDOF(actuator=motor_x, bounds=(-10, 10), parameter_type="float"),
+ RangeDOF(actuator=motor_y, bounds=(-10, 10), parameter_type="float"),
+ ],
+ objectives=[Objective(name="signal", minimize=False)],
+ evaluation_function=evaluation_function,
+ )
+
+ # Define points of interest
+ manual_points = [
+ {'motor_x': 0.5, 'motor_y': 1.0}, # Center region
+ {'motor_x': 0.0, 'motor_y': 0.0}, # Origin
+ ]
+
+ # Evaluate them
+ RE(agent.sample_suggestions(manual_points))
+
+.. testoutput::
+ :hide:
+
+ ...
+
+The manual points will be treated just like optimizer suggestions — they'll be tracked, evaluated, and used to improve the model.
+
+Mixed Workflows
+---------------
+
+You can combine optimizer suggestions with manual points throughout your optimization:
+
+.. testcode::
+
+ from blop.ax import Agent, RangeDOF, Objective
+
+ agent = Agent(
+ sensors=[sensor],
+ dofs=[
+ RangeDOF(actuator=motor_x, bounds=(-10, 10), parameter_type="float"),
+ RangeDOF(actuator=motor_y, bounds=(-10, 10), parameter_type="float"),
+ ],
+ objectives=[Objective(name="signal", minimize=False)],
+ evaluation_function=evaluation_function,
+ )
+
+ # Run optimizer for initial exploration
+ RE(agent.optimize(iterations=3))
+
+ # Try a manual point based on domain insight
+ manual_point = [{'motor_x': 0.75, 'motor_y': 0.25}]
+ RE(agent.sample_suggestions(manual_point))
+
+ # Continue optimization
+ RE(agent.optimize(iterations=3))
+
+.. testoutput::
+ :hide:
+
+ ...
+
+The optimizer will incorporate your manual point into its model and use it to inform future suggestions.
+
+Manual Approval Workflow
+-------------------------
+
+You can review optimizer suggestions before running them by using ``suggest()`` to get suggestions without acquiring data:
+
+.. testcode::
+
+ from blop.ax import Agent, RangeDOF, Objective
+
+ agent = Agent(
+ sensors=[sensor],
+ dofs=[
+ RangeDOF(actuator=motor_x, bounds=(-10, 10), parameter_type="float"),
+ RangeDOF(actuator=motor_y, bounds=(-10, 10), parameter_type="float"),
+ ],
+ objectives=[Objective(name="signal", minimize=False)],
+ evaluation_function=evaluation_function,
+ )
+
+ # Get suggestions without running
+ suggestions = agent.suggest(num_points=5)
+
+ # Review and filter
+ print("Reviewing suggestions:")
+ for s in suggestions:
+ trial_id = s['_id']
+ x = s['motor_x']
+ y = s['motor_y']
+ print(f" Trial {trial_id}: x={x:.2f}, y={y:.2f}")
+
+ # Only run approved suggestions
+ approved = [s for s in suggestions if s['motor_x'] > -5.0]
+
+ if approved:
+ RE(agent.sample_suggestions(approved))
+ else:
+ print("No suggestions approved")
+
+.. testoutput::
+
+ Reviewing suggestions:
+ ...
+
+This workflow allows you to apply safety checks, domain constraints, or other validation before running trials.
+
+Iterative Refinement
+--------------------
+
+A common pattern is to alternate between automated optimization and targeted manual exploration:
+
+.. testcode::
+
+ from blop.ax import Agent, RangeDOF, Objective
+
+ agent = Agent(
+ sensors=[sensor],
+ dofs=[
+ RangeDOF(actuator=motor_x, bounds=(-10, 10), parameter_type="float"),
+ RangeDOF(actuator=motor_y, bounds=(-10, 10), parameter_type="float"),
+ ],
+ objectives=[Objective(name="signal", minimize=False)],
+ evaluation_function=evaluation_function,
+ )
+
+ for cycle in range(3):
+ # Automated exploration
+ RE(agent.optimize(iterations=2, n_points=2))
+
+ # Review results and manually probe interesting regions
+ # (Look at plots, current best, etc.)
+
+ # Try edge cases or special points
+ if cycle == 1:
+ # After first cycle, check boundaries
+ boundary_points = [
+ {'motor_x': -10.0, 'motor_y': 0.0},
+ {'motor_x': 10.0, 'motor_y': 0.0},
+ ]
+ RE(agent.sample_suggestions(boundary_points))
+
+.. testoutput::
+ :hide:
+
+ ...
+
+See Also
+--------
+
+- :meth:`blop.ax.Agent.suggest` - Get optimizer suggestions without running
+- :meth:`blop.ax.Agent.sample_suggestions` - Evaluate specific suggestions
+- :meth:`blop.ax.Agent.optimize` - Run full optimization loop
+- :class:`blop.protocols.CanRegisterSuggestions` - Protocol for manual trial support
diff --git a/pyproject.toml b/pyproject.toml
index fae747b5..ec1a9413 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ dynamic = ["version"]
[project.optional-dependencies]
dev = [
"pytest",
+ "pytest-cov",
"ipykernel",
"ruff",
"nbstripout",
diff --git a/src/blop/ax/agent.py b/src/blop/ax/agent.py
index 8e99e2b9..306fbbbe 100644
--- a/src/blop/ax/agent.py
+++ b/src/blop/ax/agent.py
@@ -15,7 +15,8 @@
# ===============================
from bluesky.utils import MsgGenerator
-from ..plans import acquire_baseline, optimize
+from ..plans import acquire_baseline, optimize, sample_suggestions
+from ..plans.utils import InferredReadable
from ..protocols import AcquisitionPlan, Actuator, EvaluationFunction, OptimizationProblem, Sensor
from .dof import DOF, DOFConstraint
from .objective import Objective, OutcomeConstraint, to_ax_objective_str
@@ -98,6 +99,7 @@ def __init__(
checkpoint_path=checkpoint_path,
**kwargs,
)
+ self._readable_cache: dict[str, InferredReadable] = {}
@classmethod
def from_checkpoint(
@@ -293,7 +295,37 @@ def optimize(self, iterations: int = 1, n_points: int = 1) -> MsgGenerator[None]
suggest : Get point suggestions without running acquisition.
ingest : Manually ingest evaluation results.
"""
- yield from optimize(self.to_optimization_problem(), iterations=iterations, n_points=n_points)
+ yield from optimize(
+ self.to_optimization_problem(), iterations=iterations, n_points=n_points, readable_cache=self._readable_cache
+ )
+
+ def sample_suggestions(self, suggestions: list[dict]) -> MsgGenerator[tuple[str, list[dict], list[dict]]]:
+ """
+ Evaluate specific parameter combinations.
+
+ Acquires data for given suggestions and ingests results. Supports both
+ optimizer suggestions and manual points.
+
+ Parameters
+ ----------
+ suggestions : list[dict]
+ Either optimizer suggestions (with "_id") or manual points (without "_id").
+
+ Returns
+ -------
+ tuple[str, list[dict], list[dict]]
+ Bluesky run UID, suggestions with "_id", and outcomes.
+
+ See Also
+ --------
+ suggest : Get optimizer suggestions.
+ optimize : Run full optimization loop.
+ """
+ return (
+ yield from sample_suggestions(
+ self.to_optimization_problem(), suggestions=suggestions, readable_cache=self._readable_cache
+ )
+ )
def plot_objective(
self, x_dof_name: str, y_dof_name: str, objective_name: str, *args: Any, **kwargs: Any
diff --git a/src/blop/ax/optimizer.py b/src/blop/ax/optimizer.py
index 9b473246..aafa93a8 100644
--- a/src/blop/ax/optimizer.py
+++ b/src/blop/ax/optimizer.py
@@ -3,10 +3,10 @@
from ax import ChoiceParameterConfig, Client, RangeParameterConfig
-from ..protocols import ID_KEY, Checkpointable, Optimizer
+from ..protocols import ID_KEY, CanRegisterSuggestions, Checkpointable, Optimizer
-class AxOptimizer(Optimizer, Checkpointable):
+class AxOptimizer(Optimizer, Checkpointable, CanRegisterSuggestions):
"""
An optimizer that uses Ax as the backend for optimization and experiment tracking.
@@ -158,6 +158,37 @@ def ingest(self, points: list[dict]) -> None:
trial_idx = self._client.attach_baseline(parameters=parameters)
self._client.complete_trial(trial_index=trial_idx, raw_data=outcomes)
+ def register_suggestions(self, suggestions: list[dict]) -> list[dict]:
+ """
+ Register manual suggestions with the Ax experiment.
+
+ Attaches trials to the experiment and returns the suggestions with "_id" keys
+ added for tracking. This enables manual point injection alongside optimizer-driven
+ suggestions.
+
+ Parameters
+ ----------
+ suggestions : list[dict]
+ Parameter combinations to register. The "_id" key will be overwritten if present.
+
+ Returns
+ -------
+ list[dict]
+ The same suggestions with "_id" keys added.
+ """
+ registered = []
+ for suggestion in suggestions:
+ # Extract parameters (ignore _id if present)
+ parameters = {k: v for k, v in suggestion.items() if k != ID_KEY}
+
+ # Attach trial to Ax experiment
+ trial_idx = self._client.attach_trial(parameters=parameters)
+
+ # Return with trial ID
+ registered.append({ID_KEY: trial_idx, **parameters})
+
+ return registered
+
def checkpoint(self) -> None:
"""
Save the optimizer's state to JSON file.
diff --git a/src/blop/plans/__init__.py b/src/blop/plans/__init__.py
index fc196814..efc9d6cd 100644
--- a/src/blop/plans/__init__.py
+++ b/src/blop/plans/__init__.py
@@ -6,6 +6,7 @@
optimize_step,
per_step_background_read,
read,
+ sample_suggestions,
take_reading_with_background,
)
from .utils import get_route_index, route_suggestions
@@ -18,6 +19,7 @@
"optimize",
"optimize_step",
"per_step_background_read",
+ "sample_suggestions",
"read",
"route_suggestions",
"take_reading_with_background",
diff --git a/src/blop/plans/plans.py b/src/blop/plans/plans.py
index 3918ecd2..5b5ba6a6 100644
--- a/src/blop/plans/plans.py
+++ b/src/blop/plans/plans.py
@@ -11,14 +11,15 @@
from bluesky.protocols import Readable, Reading
from bluesky.utils import MsgGenerator, plan
-from ..protocols import ID_KEY, Actuator, Checkpointable, OptimizationProblem, Optimizer, Sensor
-from .utils import InferredReadable, route_suggestions
+from ..protocols import ID_KEY, Actuator, CanRegisterSuggestions, Checkpointable, OptimizationProblem, Optimizer, Sensor
+from .utils import InferredReadable, collect_optimization_metadata, route_suggestions
logger = logging.getLogger(__name__)
_BLUESKY_UID_KEY: Literal["bluesky_uid"] = "bluesky_uid"
_SUGGESTION_IDS_KEY: Literal["suggestion_ids"] = "suggestion_ids"
_DEFAULT_ACQUIRE_RUN_KEY: Literal["default_acquire"] = "default_acquire"
+_SAMPLE_SUGGESTIONS_RUN_KEY: Literal["sample_suggestions"] = "sample_suggestions"
_OPTIMIZE_RUN_KEY: Literal["optimize"] = "optimize"
@@ -253,7 +254,7 @@ def optimize(
iterations: int = 1,
n_points: int = 1,
checkpoint_interval: int | None = None,
- *args: Any,
+ readable_cache: dict[str, InferredReadable] | None = None,
**kwargs: Any,
) -> MsgGenerator[None]:
"""
@@ -271,8 +272,9 @@ def optimize(
The number of iterations between optimizer checkpoints. If None, checkpoints
will not be saved. Optimizer must implement the
:class:`blop.protocols.Checkpointable` protocol.
- *args : Any
- Additional positional arguments to pass to the :func:`optimize_step` plan.
+    readable_cache : dict[str, InferredReadable] | None
+ Cache of readable objects to store the suggestions and outcomes as events.
+ If None, a new cache will be created.
**kwargs : Any
Additional keyword arguments to pass to the :func:`optimize_step` plan.
@@ -284,37 +286,26 @@ def optimize(
"""
# Cache to track readables created from suggestions and outcomes
- readable_cache: dict[str, InferredReadable] = {}
-
- # Collect metadata for this optimization run
- if hasattr(optimization_problem.evaluation_function, "__name__"):
- evaluation_function_name = optimization_problem.evaluation_function.__name__ # type: ignore[attr-defined]
- else:
- evaluation_function_name = optimization_problem.evaluation_function.__class__.__name__
- if hasattr(optimization_problem.acquisition_plan, "__name__"):
- acquisition_plan_name = optimization_problem.acquisition_plan.__name__ # type: ignore[attr-defined]
- else:
- acquisition_plan_name = optimization_problem.acquisition_plan.__class__.__name__
- _md = {
- "plan_name": "optimize",
- "sensors": [sensor.name for sensor in optimization_problem.sensors],
- "actuators": [actuator.name for actuator in optimization_problem.actuators],
- "evaluation_function": evaluation_function_name,
- "acquisition_plan": acquisition_plan_name,
- "optimizer": optimization_problem.optimizer.__class__.__name__,
- "iterations": iterations,
- "n_points": n_points,
- "checkpoint_interval": checkpoint_interval,
- "run_key": _OPTIMIZE_RUN_KEY,
- }
+ readable_cache = readable_cache or {}
+
+ _md = collect_optimization_metadata(optimization_problem)
+ _md.update(
+ {
+ "plan_name": "optimize",
+ "iterations": iterations,
+ "n_points": n_points,
+ "checkpoint_interval": checkpoint_interval,
+ "run_key": _OPTIMIZE_RUN_KEY,
+ }
+ )
# Encapsulate the optimization plan in a run decorator
@bpp.set_run_key_decorator(_OPTIMIZE_RUN_KEY)
@bpp.run_decorator(md=_md)
- def _optimize():
+ def _optimize() -> MsgGenerator[None]:
for i in range(iterations):
# Perform a single step of the optimization
- uid, suggestions, outcomes = yield from optimize_step(optimization_problem, n_points, *args, **kwargs)
+ uid, suggestions, outcomes = yield from optimize_step(optimization_problem, n_points, **kwargs)
# Read the optimization step into the Bluesky and emit events for each suggestion and outcome
yield from _read_step(uid, suggestions, outcomes, n_points, readable_cache)
@@ -326,6 +317,100 @@ def _optimize():
return (yield from _optimize())
+@plan
+def sample_suggestions(
+ optimization_problem: OptimizationProblem,
+ suggestions: list[dict],
+ readable_cache: dict[str, InferredReadable] | None = None,
+ **kwargs: Any,
+) -> MsgGenerator[tuple[str, list[dict], list[dict]]]:
+ """
+ Evaluate specific parameter combinations.
+
+ This plan acquires data for given suggestions and ingests results into the optimizer.
+ Supports both optimizer-generated suggestions (with "_id") and manual points
+ (without "_id", if optimizer implements CanRegisterSuggestions).
+
+ Parameters
+ ----------
+ optimization_problem : OptimizationProblem
+ The optimization problem.
+ suggestions : list[dict]
+ Parameter combinations to evaluate. Can be:
+
+ - Optimizer suggestions (with "_id" keys from suggest())
+ - Manual points (without "_id", requires CanRegisterSuggestions protocol)
+
+ readable_cache : dict[str, InferredReadable] | None
+ Cache for storing suggestions/outcomes as events.
+ **kwargs : Any
+ Additional arguments for acquisition plan.
+
+ Returns
+ -------
+ uid : str
+ Bluesky run UID.
+ suggestions : list[dict]
+ Suggestions with "_id" keys.
+ outcomes : list[dict]
+ Evaluated outcomes.
+
+ Raises
+ ------
+ ValueError
+ If suggestions lack "_id" and optimizer doesn't implement CanRegisterSuggestions.
+
+ See Also
+ --------
+ optimize_step : Standard optimizer-driven step.
+ blop.protocols.CanRegisterSuggestions : Protocol for manual suggestions.
+ """
+
+ # Ensure the suggestions have an ID_KEY or register them with the optimizer
+ if not isinstance(optimization_problem.optimizer, CanRegisterSuggestions) and any(
+ ID_KEY not in suggestion for suggestion in suggestions
+ ):
+ raise ValueError(
+ f"All suggestions must contain an '{ID_KEY}' key to later match with the outcomes or your optimizer must "
+ "implement the `blop.protocols.CanRegisterSuggestions` protocol. Please review your optimizer "
+ f"implementation. Got suggestions: {suggestions}"
+ )
+ elif isinstance(optimization_problem.optimizer, CanRegisterSuggestions):
+ suggestions = optimization_problem.optimizer.register_suggestions(suggestions)
+
+ # Collect the metadata for the run
+ _md = collect_optimization_metadata(optimization_problem)
+ _md.update(
+ {
+ "plan_name": "sample_suggestions",
+ "suggestions": suggestions,
+ "run_key": _SAMPLE_SUGGESTIONS_RUN_KEY,
+ }
+ )
+
+ @bpp.set_run_key_decorator(_SAMPLE_SUGGESTIONS_RUN_KEY)
+ @bpp.run_decorator(md=_md)
+ def _inner_sample_suggestions() -> MsgGenerator[tuple[str, list[dict], list[dict]]]:
+
+ # Acquire data, evaluate, and ingest outcomes
+ if optimization_problem.acquisition_plan is None:
+ acquisition_plan = default_acquire
+ else:
+ acquisition_plan = optimization_problem.acquisition_plan
+ uid = yield from acquisition_plan(
+ suggestions, optimization_problem.actuators, optimization_problem.sensors, **kwargs
+ )
+ outcomes = optimization_problem.evaluation_function(uid, suggestions)
+ optimization_problem.optimizer.ingest(outcomes)
+
+ # Emit a Bluesky event
+ yield from _read_step(uid, suggestions, outcomes, len(suggestions), readable_cache or {})
+
+ return uid, suggestions, outcomes
+
+ return (yield from _inner_sample_suggestions())
+
+
@plan
def read(readables: Sequence[Readable], **kwargs: Any) -> MsgGenerator[dict[str, Any]]:
"""
diff --git a/src/blop/plans/utils.py b/src/blop/plans/utils.py
index 05a95aef..815bf4fa 100644
--- a/src/blop/plans/utils.py
+++ b/src/blop/plans/utils.py
@@ -8,7 +8,7 @@
from event_model import DataKey
from numpy.typing import ArrayLike
-from ..protocols import ID_KEY
+from ..protocols import ID_KEY, OptimizationProblem
def _infer_data_key(value: ArrayLike) -> DataKey:
@@ -133,3 +133,24 @@ def route_suggestions(suggestions: list[dict], starting_position: dict | None =
starting_point = np.array([starting_position[dim] for dim in dims_to_route]) if starting_position else None
return [suggestions[i] for i in get_route_index(points=points, starting_point=starting_point)]
+
+
+def collect_optimization_metadata(optimization_problem: OptimizationProblem) -> dict[str, Any]:
+ """
+ Collect the metadata for the optimization problem.
+ """
+ if hasattr(optimization_problem.evaluation_function, "__name__"):
+ evaluation_function_name = optimization_problem.evaluation_function.__name__ # type: ignore[attr-defined]
+ else:
+ evaluation_function_name = optimization_problem.evaluation_function.__class__.__name__
+ if hasattr(optimization_problem.acquisition_plan, "__name__"):
+ acquisition_plan_name = optimization_problem.acquisition_plan.__name__ # type: ignore[attr-defined]
+ else:
+ acquisition_plan_name = optimization_problem.acquisition_plan.__class__.__name__
+ return {
+ "evaluation_function": evaluation_function_name,
+ "acquisition_plan": acquisition_plan_name,
+ "optimizer": optimization_problem.optimizer.__class__.__name__,
+ "sensors": [sensor.name for sensor in optimization_problem.sensors],
+ "actuators": [actuator.name for actuator in optimization_problem.actuators],
+ }
diff --git a/src/blop/protocols.py b/src/blop/protocols.py
index 66938492..dc31f0c6 100644
--- a/src/blop/protocols.py
+++ b/src/blop/protocols.py
@@ -10,6 +10,31 @@
Sensor = Readable | EventCollectable | EventPageCollectable
+@runtime_checkable
+class CanRegisterSuggestions(Protocol):
+ """
+ A protocol for optimizers that can register suggestions. This
+ allows them to add an "_id" key to the suggestions dynamically and ensure
+ that the suggestions are unique.
+ """
+
+ def register_suggestions(self, suggestions: list[dict]) -> list[dict]:
+ """
+ Register the suggestions with the optimizer.
+
+ Parameters
+ ----------
+ suggestions: list[dict]
+ The suggestions to register. The "_id" key is optional and will be overwritten if present.
+
+ Returns
+ -------
+ list[dict]
+ The original suggestions with an "_id" key added.
+ """
+ ...
+
+
@runtime_checkable
class Checkpointable(Protocol):
"""