Skip to content

Commit

Permalink
Bug fixes and WIP improving MXLIMS handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rhfogh committed Nov 11, 2024
1 parent 4c35b38 commit e60a91e
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 132 deletions.
2 changes: 1 addition & 1 deletion mxcubecore/HardwareObjects/Beamline.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ def get_default_acquisition_parameters(self, acquisition_type="default"):
acq_parameters.detector_binning_mode = ""

try:
acq_parameters.detector_roi_mode = self.detector.get_roi_mode()
acq_parameters.detector_roi_mode = self.detector.get_roi_mode_name()
except Exception:
logging.getLogger("HWR").warning(
"get_default_acquisition_parameters: "
Expand Down
11 changes: 10 additions & 1 deletion mxcubecore/HardwareObjects/Gphl/GphlQueueEntry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@


import logging
from datetime import datetime

from mxcubecore import HardwareRepository as HWR
from mxcubecore.queue_entry.base_queue_entry import BaseQueueEntry

from mxlims.pydantic import crystallography as mxmodel
from mxcubecore.utils import mxlims as mxutils

__credits__ = ["MXCuBE collaboration"]
__license__ = "LGPLv3+"
__category__ = "queue"
Expand Down Expand Up @@ -69,4 +73,9 @@ def init_mxlims(self):

mxexperiment: mxmodel.MXExperiment = self.get_mxlims_record()
if mxexperiment is None:
self._mxlims_record = mxutils.create_mxexperiment(self.get_data_model())
data_model = self.get_data_model()
self._mxlims_record = mxutils.create_mxexperiment(
data_model,
start_time = datetime.now(),
measured_flux = HWR.beamline.flux.get_value()
)
110 changes: 63 additions & 47 deletions mxcubecore/HardwareObjects/Gphl/GphlWorkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import socket
import subprocess
import time
import uuid
from collections import OrderedDict

import f90nml
Expand All @@ -58,8 +57,6 @@
QueueAbortedException,
)

from mxlims import crystallography as mxmodel


@enum.unique
class GphlWorkflowStates(enum.Enum):
Expand All @@ -79,7 +76,6 @@ class GphlWorkflowStates(enum.Enum):
COMPLETED = 4
UNKNOWN = 5


__copyright__ = """ Copyright © 2016 - 2019 by Global Phasing Ltd. """
__license__ = "LGPLv3+"
__author__ = "Rasmus H Fogh"
Expand Down Expand Up @@ -231,6 +227,9 @@ def __init__(self, name):

self.recentring_file = None

# Scan number for MXLIMS Scan ordering
self.next_scan_number = 0

# # TEST mxcubeweb UI
# self.gevent_event = gevent.event.Event()
# self.params_dict = {}
Expand All @@ -253,6 +252,7 @@ def init(self):
"WorkflowAborted": self.workflow_aborted,
"WorkflowCompleted": self.workflow_completed,
"WorkflowFailed": self.workflow_failed,
"StartEnactment": self.start_enactment,
}

# Set standard configurable file paths
Expand Down Expand Up @@ -680,7 +680,6 @@ def query_pre_strategy_params(self, choose_lattice=None):
dispatcher.connect(
self.receive_pre_strategy_data,
self.PARAMETER_RETURN_SIGNAL,
dispatcher.Any,
)
responses = dispatcher.send(
self.PARAMETERS_NEEDED,
Expand All @@ -703,7 +702,6 @@ def query_pre_strategy_params(self, choose_lattice=None):
dispatcher.disconnect(
self.receive_pre_strategy_data,
self.PARAMETER_RETURN_SIGNAL,
dispatcher.Any,
)
self._return_parameters = None

Expand Down Expand Up @@ -802,10 +800,30 @@ def pre_execute(self, queue_entry):

self._workflow_queue = gevent.queue.Queue()

def start_enactment(self, enactment_id:str):
def start_enactment(self, enactment_id: str, correlation_id: str):
    """Set enactment_id and initialise the MXLIMS MXExperiment record.

    Copies workflow identification data from the model's workflow_parameters
    into its tracking_data, then triggers MXLIMS record creation on the
    queue entry.

    Args:
        enactment_id: Unique identifier of the workflow enactment; used as
            the tracking-data uuid and, by default, as the workflow uid.
        correlation_id: Message correlation id (unused here; part of the
            standard message-handler signature).
    """
    data_model = self._queue_entry.get_data_model()
    tracking_data = data_model.tracking_data
    workflow_parameters = data_model.workflow_parameters
    tracking_data.uuid = enactment_id
    tracking_data.workflow_uid = (
        workflow_parameters.get("workflow_uid") or enactment_id
    )
    # NB if not set here, the value will be overwritten later
    tracking_data.workflow_name = workflow_parameters.get("workflow_name")
    tracking_data.workflow_type = (
        workflow_parameters.get("workflow_type")
        or data_model.strategy_type
    )
    tracking_data.location_id = workflow_parameters.get("workflow_position_id")
    # NB first orientation only:
    tracking_data.orientation_id = workflow_parameters.get(
        "workflow_kappa_settings_id"
    )
    tracking_data.characterisation_id = workflow_parameters.get(
        "characterisation_id"
    )
    self._queue_entry.init_mxlims()

def execute(self):

Expand Down Expand Up @@ -857,8 +875,6 @@ def execute(self):
elif message_type == "String":
if not self.settings.get("suppress_external_log_output"):
func(payload, correlation_id)
elif message_type == "StartEnactment":
self.start_enactment(payload)
else:
logging.getLogger("HWR").info(
"GΦL queue processing %s", message_type
Expand Down Expand Up @@ -1222,10 +1238,9 @@ def query_collection_strategy(self, geometric_strategy):
raise ValueError(
"invalid default recentring mode '%s' " % default_recentring_mode
)
use_modes = ["sweep"]
use_modes = ["sweep", "none"]
if len(orientations) > 1:
use_modes.append("start")
use_modes.append("none")
if is_interleaved:
use_modes.append("scan")
for indx in range(len(modes) - 1, -1, -1):
Expand Down Expand Up @@ -1339,7 +1354,6 @@ def query_collection_strategy(self, geometric_strategy):
dispatcher.connect(
self.receive_pre_collection_data,
self.PARAMETER_RETURN_SIGNAL,
dispatcher.Any,
)
responses = dispatcher.send(
self.PARAMETERS_NEEDED,
Expand All @@ -1359,7 +1373,6 @@ def query_collection_strategy(self, geometric_strategy):
dispatcher.disconnect(
self.receive_pre_collection_data,
self.PARAMETER_RETURN_SIGNAL,
dispatcher.Any,
)
self._return_parameters = None

Expand Down Expand Up @@ -1918,7 +1931,6 @@ def collect_data(self, payload, correlation_id):
last_orientation = ()
maxdev = -1
snapshotted_rotation_ids = set()
scan_numbers = {}
for scan in scans:
sweep = scan.sweep
acq = queue_model_objects.Acquisition()
Expand Down Expand Up @@ -1992,11 +2004,6 @@ def collect_data(self, payload, correlation_id):
path_template.run_number = int(ss0) if ss0 else 1
path_template.start_num = acq_parameters.first_image
path_template.num_files = acq_parameters.num_images
if path_template.suffix.endswith("h5"):
# Add scan number to prefix for interleaved hdf5 files (only)
# NBNB Tempoary fix, pending solution to hdf5 interleaving problem
scan_numbers[prefix] = scan_no = scan_numbers.get(prefix, 0) + 1
prefix += "_s%s" % scan_no
path_template.base_prefix = prefix

key = (
Expand All @@ -2008,31 +2015,45 @@ def collect_data(self, payload, correlation_id):

# Handle orientations and (re) centring
goniostatRotation = sweep.goniostatSweepSetting
rotation_id = orientation_id = goniostatRotation.id_

model_workflow_parameters = gphl_workflow_model.workflow_parameters
if not model_workflow_parameters.get("workflow_name"):
model_workflow_parameters["workflow_name"] = gphl_workflow_model.wfname
if not model_workflow_parameters.get("workflow_type"):
model_workflow_parameters["workflow_type"] = gphl_workflow_model.wftype
if not model_workflow_parameters.get("workflow_uid"):
model_workflow_parameters["workflow_uid"] = str(
HWR.beamline.gphl_connection._enactment_id
)
if not model_workflow_parameters.get("workflow_position_id"):
# As of 20240911 all workflows use a single position,
model_workflow_parameters["workflow_position_id"] = str(uuid.uuid1())
rotation_id = goniostatRotation.id_


# handle mxlims
# handle workflow parameters
new_workflow_parameters = gphl_workflow_model.workflow_parameters.copy()
wf_tracking_data = gphl_workflow_model.tracking_data
data_collection = queue_model_objects.DataCollection([acq], crystal)
# Workflow parameters for ICAT / external workflow
# The 'if' statement is to allow this to work in multiple versions
data_collection.workflow_parameters = new_workflow_parameters
tracking_data = data_collection.tracking_data
tracking_data.uuid = scan.id_
tracking_data.workflow_name = wf_tracking_data.experiment_strategy
tracking_data.workflow_type = wf_tracking_data.workflow_type
tracking_data.workflow_uid = wf_tracking_data.uuid
tracking_data.location_id = wf_tracking_data.location_id
tracking_data.orientation_id = rotation_id
tracking_data.sweep_id = sweep.id_
if (
gphl_workflow_model.wftype == "acquisition"
and not gphl_workflow_model.characterisation_done
and not model_workflow_parameters.get("workflow_characterisation_id")
):
model_workflow_parameters["workflow_characterisation_id"] = str(
sweep.id_
)
model_workflow_parameters["workflow_kappa_settings_id"] = str(
orientation_id
)
characterisation_id = sweep.id_
tracking_data.characterisation_id = characterisation_id#
wf_tracking_data.characterisation_id = characterisation_id
tracking_data.role = "Characterisation"
else:
tracking_data.characterisation_id = wf_tracking_data.characterisation_id#
tracking_data.role = "Result"
tracking_data.scan_number = self.next_scan_number
self.next_scan_number += 1

new_workflow_parameters["workflow_name"] = tracking_data.workflow_name
new_workflow_parameters["workflow_type"] = tracking_data.workflow_type
new_workflow_parameters["workflow_uid"] = tracking_data.workflow_uid
new_workflow_parameters["workflow_position_id"] = tracking_data.location_id
new_workflow_parameters["characterisation_id"] = tracking_data.characterisation_id
new_workflow_parameters["workflow_kappa_settings_id"] = tracking_data.orientation_id

initial_settings = sweep.get_initial_settings()
orientation = (
Expand Down Expand Up @@ -2094,11 +2115,6 @@ def collect_data(self, payload, correlation_id):
acq_parameters.num_images_per_trigger * acq_parameters.osc_range
- sweep_offset
)
data_collection = queue_model_objects.DataCollection([acq], crystal)
# Workflow parameters for ICAT / external workflow
# The 'if' statement is to allow this to work in multiple versions
if hasattr(data_collection, "workflow_parameters"):
data_collection.workflow_parameters.update(model_workflow_parameters)
data_collections.append(data_collection)
data_collection.set_enabled(True)
data_collection.ispyb_group_data_collections = True
Expand Down
2 changes: 1 addition & 1 deletion mxcubecore/HardwareObjects/Native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def queue_update_result(server_hwobj, node_id, html_report):
return result


def queue_get_full_path(server_hwobj, suddir, tag):
def queue_get_full_path(server_hwobj, subdir, tag):
    """Return the full path for *subdir* / *tag* from the beamline session.

    Args:
        server_hwobj: Server hardware object (unused; kept to match the
            standard queue-function signature).
        subdir: Sub-directory name.
        tag: File-name tag.

    Returns:
        The full path as provided by the beamline session object.
    """
    session = HWR.beamline.session
    return session.get_full_path(subdir, tag)

Expand Down
71 changes: 71 additions & 0 deletions mxcubecore/model/queue_model_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import copy
import logging
import os
from typing import Optional

from pydantic import Field, BaseModel

from mxcubecore.model import queue_model_enumerables

Expand Down Expand Up @@ -58,6 +61,66 @@
__license__ = "LGPLv3+"


class TrackingData(BaseModel):
    """Tracking data that connects different tasks into workflows.

    Used for LIMS input, MXLIMS export, and for linking related
    acquisitions (orientations, sweeps, characterisations) together.
    NB Should be harmonised and merged with workflow_parameters.
    """

    # All identifiers are free-form strings (set by the workflow machinery);
    # every field is optional and defaults to None.
    uuid: Optional[str] = Field(
        default=None,
        description="Unique identifier string for this queue_model_object",
    )
    workflow_name: Optional[str] = Field(
        default=None,
        description="Name of workflow that this queue_model_object belongs to",
    )
    workflow_type: Optional[str] = Field(
        default=None,
        description="Type of workflow that this queue_model_object belongs to",
    )
    workflow_uid: Optional[str] = Field(
        default=None,
        description="Unique identifier string for the workflow this queue_model_object belongs to",
    )
    location_id: Optional[str] = Field(
        default=None,
        description="Unique identifier string for the location / LogisticalSample "
        "of this queue_model_object",
    )
    orientation_id: Optional[str] = Field(
        default=None,
        description="Unique identifier string for the orientation (kappa/phi/chi settings) "
        "for this queue_model_object",
    )
    characterisation_id: Optional[str] = Field(
        default=None,
        description="Unique identifier string for characterisation data acquisition "
        "that is relevant for this queue_model_object",
    )
    sweep_id: Optional[str] = Field(
        default=None,
        description="Unique identifier string for the sweep that this queue_model_object "
        "is part of. Used to combine multiple Acquisitions as scans of a single sweep."
    )
    scan_number: Optional[int] = Field(
        default=None,
        description="Ordinal number (starting at 0), for this queue_model_object "
        "in the experiment. Defines the time ordering of acquisitions and scans.",
    )
    role: Optional[str] = Field(
        default=None,
        description="Role of this Task result within the experiment.",
        json_schema_extra={
            "examples": [
                "Result",
                "Intermediate",
                "Characterisation",
                "Centring",
            ],
        },
    )


class TaskNode(object):
"""
Objects that inherit TaskNode can be added to and handled by
Expand All @@ -77,6 +140,8 @@ def __init__(self, task_data=None):
self._requires_centring = True
self._origin = None
self._task_data = task_data
# tracking data for connecting jobs into workflows, mxlims output, etc.
self.tracking_data: TrackingData = TrackingData()

@property
def task_data(self):
Expand Down Expand Up @@ -2339,6 +2404,7 @@ def init_from_task_data(self, sample_model, params):
raise ValueError(
"No GΦL workflow strategy named %s found" % params["strategy_name"]
)
self.tracking_data.workflow_type = self.strategy_type

self.shape = params.get("shape", "")
for tag in (
Expand Down Expand Up @@ -2471,6 +2537,11 @@ def strategy_name(self):
""" "Strategy full name, e.g. "Two-wavelength MAD" """
return self.strategy_settings["title"]

@property
def strategy_short_name(self):
    """Strategy short name, e.g. "MAD" (the ``short_name`` strategy setting)."""
    return self.strategy_settings["short_name"]

# Run name equal to base_prefix
def get_name(self):
# Required to conform to TaskNode
Expand Down
Loading

0 comments on commit e60a91e

Please sign in to comment.