Merge branch 'release_24.1' into dev
# Conflicts:
#	lib/galaxy/version.py
#	packages/app/HISTORY.rst
#	packages/app/setup.cfg
#	packages/auth/HISTORY.rst
#	packages/auth/setup.cfg
#	packages/config/HISTORY.rst
#	packages/config/setup.cfg
#	packages/data/HISTORY.rst
#	packages/data/setup.cfg
#	packages/files/HISTORY.rst
#	packages/files/setup.cfg
#	packages/job_execution/HISTORY.rst
#	packages/job_execution/setup.cfg
#	packages/job_metrics/HISTORY.rst
#	packages/job_metrics/setup.cfg
#	packages/navigation/HISTORY.rst
#	packages/navigation/setup.cfg
#	packages/objectstore/HISTORY.rst
#	packages/objectstore/setup.cfg
#	packages/schema/HISTORY.rst
#	packages/schema/setup.cfg
#	packages/selenium/HISTORY.rst
#	packages/selenium/setup.cfg
#	packages/test_api/HISTORY.rst
#	packages/test_api/setup.cfg
#	packages/test_base/HISTORY.rst
#	packages/test_base/setup.cfg
#	packages/test_driver/HISTORY.rst
#	packages/test_driver/setup.cfg
#	packages/tool_shed/HISTORY.rst
#	packages/tool_shed/setup.cfg
#	packages/tool_util/HISTORY.rst
#	packages/tool_util/setup.cfg
#	packages/tours/HISTORY.rst
#	packages/tours/setup.cfg
#	packages/util/HISTORY.rst
#	packages/util/setup.cfg
#	packages/web_apps/HISTORY.rst
#	packages/web_apps/setup.cfg
#	packages/web_framework/HISTORY.rst
#	packages/web_framework/setup.cfg
#	packages/web_stack/HISTORY.rst
#	packages/web_stack/setup.cfg
bernt-matthias committed Sep 25, 2024
2 parents c9d6ca6 + a788fbd commit 7dd606a
Showing 30 changed files with 519 additions and 48 deletions.
4 changes: 2 additions & 2 deletions lib/galaxy/managers/workflows.py
@@ -1660,7 +1660,7 @@ def _workflow_to_dict_instance(self, trans, stored, workflow, legacy=True):
inputs = {}
for step in workflow.input_steps:
step_type = step.type
step_label = step.label or step.tool_inputs.get("name")
step_label = step.label or step.tool_inputs and step.tool_inputs.get("name")
if step_label:
label = step_label
elif step_type == "data_input":
@@ -1954,7 +1954,7 @@ def __set_default_label(self, step, module, state):
to the actual `label` attribute which is available for all module types, unique, and mapped to its own database column.
"""
if not module.label and module.type in ["data_input", "data_collection_input"]:
new_state = safe_loads(state)
new_state = safe_loads(state) or {}
default_label = new_state.get("name")
if default_label and util.unicodify(default_label).lower() not in [
"input dataset",
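
Both hunks above harden against None: tool_inputs can be absent on workflow input steps, and safe_loads can return something falsy, so each access is guarded before .get() is called. A minimal standalone sketch of the pattern (function and argument names are illustrative, not Galaxy's API):

import json
from typing import Optional

def safe_loads(blob: Optional[str]) -> Optional[dict]:
    # Stand-in for a permissive loader: may return None rather than a dict, never raises.
    try:
        return json.loads(blob) if blob else None
    except ValueError:
        return None

def resolve_label(label: Optional[str], tool_inputs: Optional[dict], state: Optional[str]) -> Optional[str]:
    # Short-circuit before .get() so a missing tool_inputs dict cannot raise AttributeError.
    resolved = label or (tool_inputs and tool_inputs.get("name"))
    if resolved:
        return resolved
    # The loader may return None; "or {}" keeps the .get() safe.
    return (safe_loads(state) or {}).get("name")

print(resolve_label(None, None, '{"name": "Input dataset"}'))  # -> Input dataset
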
2 changes: 2 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -4544,7 +4544,9 @@ class DatasetInstance(RepresentById, UsesCreateAndUpdateTime, _HasTable):
creating_job_associations: List[Union[JobToOutputDatasetCollectionAssociation, JobToOutputDatasetAssociation]]
copied_from_history_dataset_association: Optional["HistoryDatasetAssociation"]
copied_from_library_dataset_dataset_association: Optional["LibraryDatasetDatasetAssociation"]
dependent_jobs: List[JobToInputLibraryDatasetAssociation]
implicitly_converted_datasets: List["ImplicitlyConvertedDatasetAssociation"]
implicitly_converted_parent_datasets: List["ImplicitlyConvertedDatasetAssociation"]

validated_states = DatasetValidatedState

30 changes: 22 additions & 8 deletions lib/galaxy/model/store/__init__.py
@@ -1023,7 +1023,7 @@ def _reassign_hids(self, object_import_tracker: "ObjectImportTracker", history:

if object_import_tracker.copy_hid_for:
# in an if to avoid flush if unneeded
for from_dataset, to_dataset in object_import_tracker.copy_hid_for.items():
for from_dataset, to_dataset in object_import_tracker.copy_hid_for:
to_dataset.hid = from_dataset.hid
self._session_add(to_dataset)
self._flush()
@@ -1276,18 +1276,24 @@ def _import_implicit_dataset_conversions(self, object_import_tracker: "ObjectImp
metadata_safe = False
idc = model.ImplicitlyConvertedDatasetAssociation(metadata_safe=metadata_safe, for_import=True)
idc.type = idc_attrs["file_type"]
if idc_attrs.get("parent_hda"):
idc.parent_hda = object_import_tracker.hdas_by_key[idc_attrs["parent_hda"]]
# We may not have exported the parent, so only set the parent_hda attribute if we did.
if (parent_hda_id := idc_attrs.get("parent_hda")) and (
parent_hda := object_import_tracker.hdas_by_key.get(parent_hda_id)
):
# exports created prior to 24.2 may not have a parent if the parent had been purged
idc.parent_hda = parent_hda
if idc_attrs.get("hda"):
idc.dataset = object_import_tracker.hdas_by_key[idc_attrs["hda"]]

# we have a the dataset and the parent, lets ensure they land up with the same HID
if idc.dataset and idc.parent_hda and idc.parent_hda in object_import_tracker.requires_hid:
# we have the dataset and the parent, lets ensure they land up with the same HID
if idc.dataset and idc.parent_hda:
try:
object_import_tracker.requires_hid.remove(idc.dataset)
except ValueError:
pass # we wanted to remove it anyway.
object_import_tracker.copy_hid_for[idc.parent_hda] = idc.dataset
# An HDA can be the parent of multiple implicitly converted datasets,
# that's why we use [(source, target)] here
object_import_tracker.copy_hid_for.append((idc.parent_hda, idc.dataset))

self._session_add(idc)

@@ -1370,7 +1376,7 @@ class ObjectImportTracker:
hdca_copied_from_sinks: Dict[ObjectKeyType, ObjectKeyType]
jobs_by_key: Dict[ObjectKeyType, model.Job]
requires_hid: List["HistoryItem"]
copy_hid_for: Dict["HistoryItem", "HistoryItem"]
copy_hid_for: List[Tuple["HistoryItem", "HistoryItem"]]

def __init__(self) -> None:
self.libraries_by_key = {}
@@ -1388,7 +1394,7 @@ def __init__(self) -> None:
self.implicit_collection_jobs_by_key: Dict[str, ImplicitCollectionJobs] = {}
self.workflows_by_key: Dict[str, model.Workflow] = {}
self.requires_hid = []
self.copy_hid_for = {}
self.copy_hid_for = []

self.new_history: Optional[model.History] = None

@@ -2301,6 +2307,14 @@ def add_implicit_conversion_dataset(
include_files: bool,
conversion: model.ImplicitlyConvertedDatasetAssociation,
) -> None:
parent_hda = conversion.parent_hda
if parent_hda and parent_hda not in self.included_datasets:
# We should always include the parent of an implicit conversion
# to avoid holes in the provenance.
self.included_datasets[parent_hda] = (parent_hda, include_files)
grand_parent_association = parent_hda.implicitly_converted_parent_datasets
if grand_parent_association and (grand_parent_hda := grand_parent_association[0].parent_hda):
self.add_implicit_conversion_dataset(grand_parent_hda, include_files, grand_parent_association[0])
self.included_datasets[dataset] = (dataset, include_files)
self.dataset_implicit_conversions[dataset] = conversion

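
The switch of copy_hid_for from a dict to a list of (source, target) tuples matters when one parent HDA has several implicitly converted children: a dict keyed on the parent keeps only the last mapping. A toy illustration, with placeholder strings standing in for real HDA objects:

parent, converted_a, converted_b = "parent_hda", "converted_a", "converted_b"

as_dict = {}
as_dict[parent] = converted_a
as_dict[parent] = converted_b            # first mapping silently overwritten
assert list(as_dict.items()) == [(parent, converted_b)]

as_list = []
as_list.append((parent, converted_a))
as_list.append((parent, converted_b))    # both conversions keep the parent's hid
assert as_list == [(parent, converted_a), (parent, converted_b)]
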
67 changes: 42 additions & 25 deletions lib/galaxy/tools/actions/__init__.py
@@ -20,6 +20,7 @@

from galaxy import model
from galaxy.exceptions import (
AuthenticationRequired,
ItemAccessibilityException,
RequestParameterInvalidException,
)
@@ -726,14 +727,6 @@ def handle_output(name, output, hidden=None):
# Remap any outputs if this is a rerun and the user chose to continue dependent jobs
# This functionality requires tracking jobs in the database.
if app.config.track_jobs_in_database and rerun_remap_job_id is not None:
# Need to flush here so that referencing outputs by id works
session = trans.sa_session()
try:
session.expire_on_commit = False
with transaction(session):
session.commit()
finally:
session.expire_on_commit = True
self._remap_job_on_rerun(
trans=trans,
galaxy_session=galaxy_session,
@@ -774,30 +767,54 @@ def handle_output(name, output, hidden=None):

return job, out_data, history

def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data):
def _remap_job_on_rerun(
self,
trans: ProvidesHistoryContext,
galaxy_session: Optional[model.GalaxySession],
rerun_remap_job_id: int,
current_job: Job,
out_data,
):
"""
Re-connect dependent datasets for a job that is being rerun (because it failed initially).
If a job fails, the user has the option to try the job again with changed parameters.
To be able to resume jobs that depend on this jobs output datasets we change the dependent's job
input datasets to be those of the job that is being rerun.
"""
old_job = trans.sa_session.get(Job, rerun_remap_job_id)
if not old_job:
# I don't think that can really happen
raise RequestParameterInvalidException("rerun_remap_job_id parameter is invalid")
old_tool = trans.app.toolbox.get_tool(old_job.tool_id, exact=False)
new_tool = trans.app.toolbox.get_tool(current_job.tool_id, exact=False)
if old_tool and new_tool and old_tool.old_id != new_tool.old_id:
# If we currently only have the old or new tool installed we'll find the other tool anyway with `exact=False`.
# If we don't have the tool at all we'll fail anyway, no need to worry here.
raise RequestParameterInvalidException(
f"Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})"
)
if trans.user is not None:
if old_job.user_id != trans.user.id:
raise RequestParameterInvalidException(
"Cannot remap job dependencies for job not created by current user."
)
elif trans.user is None and galaxy_session:
if old_job.session_id != galaxy_session.id:
raise RequestParameterInvalidException(
"Cannot remap job dependencies for job not created by current user."
)
else:
raise AuthenticationRequired("Authentication required to remap job dependencies")
# Need to flush here so that referencing outputs by id works
session = trans.sa_session()
try:
session.expire_on_commit = False
with transaction(session):
session.commit()
finally:
session.expire_on_commit = True
try:
old_job = trans.sa_session.get(Job, rerun_remap_job_id)
assert old_job is not None, f"({rerun_remap_job_id}/{current_job.id}): Old job id is invalid"
assert (
old_job.tool_id == current_job.tool_id
), f"({old_job.id}/{current_job.id}): Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})"
if trans.user is not None:
assert (
old_job.user_id == trans.user.id
), f"({old_job.id}/{current_job.id}): Old user id ({old_job.user_id}) does not match rerun user id ({trans.user.id})"
elif trans.user is None and isinstance(galaxy_session, trans.model.GalaxySession):
assert (
old_job.session_id == galaxy_session.id
), f"({old_job.id}/{current_job.id}): Old session id ({old_job.session_id}) does not match rerun session id ({galaxy_session.id})"
else:
raise Exception(f"({old_job.id}/{current_job.id}): Remapping via the API is not (yet) supported")
# Start by hiding current job outputs before taking over the old job's (implicit) outputs.
current_job.hide_outputs(flush=False)
# Duplicate PJAs before remap.
@@ -819,7 +836,7 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
for jtod in old_job.output_datasets:
for job_to_remap, jtid in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
trans.user is None and job_to_remap.session_id == galaxy_session.id
trans.user is None and galaxy_session and job_to_remap.session_id == galaxy_session.id
):
self.__remap_parameters(job_to_remap, jtid, jtod, out_data)
trans.sa_session.add(job_to_remap)
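
The rerun-remap rewrite above replaces assertions with typed exceptions and performs all validation before the session flush, so a bad rerun_remap_job_id is reported as a request error rather than an assertion failure, and the flush is only paid for once the request looks valid. A condensed sketch of that ordering (exception classes and object shapes are simplified stand-ins; the real code resolves both tools through the toolbox with exact=False before comparing ids):

class RequestParameterInvalidException(Exception):
    pass

class AuthenticationRequired(Exception):
    pass

def validate_remap(old_job, current_job, user, galaxy_session):
    # 1. The referenced job must exist.
    if old_job is None:
        raise RequestParameterInvalidException("rerun_remap_job_id parameter is invalid")
    # 2. Old and new job must come from the same tool.
    if old_job.tool_id != current_job.tool_id:
        raise RequestParameterInvalidException(
            f"Old tool id ({old_job.tool_id}) does not match rerun tool id ({current_job.tool_id})"
        )
    # 3. Only the owner (by user, or by anonymous session) may remap dependencies.
    if user is not None:
        if old_job.user_id != user.id:
            raise RequestParameterInvalidException("Cannot remap job dependencies for job not created by current user.")
    elif galaxy_session is not None:
        if old_job.session_id != galaxy_session.id:
            raise RequestParameterInvalidException("Cannot remap job dependencies for job not created by current user.")
    else:
        raise AuthenticationRequired("Authentication required to remap job dependencies")
    # Only after these checks would the session be flushed and the outputs remapped.
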
12 changes: 8 additions & 4 deletions lib/galaxy/tools/parameters/basic.py
@@ -1468,8 +1468,8 @@ def get_column_list(self, trans, other_values):
# Use representative dataset if a dataset collection is parsed
if isinstance(dataset, HistoryDatasetCollectionAssociation):
dataset = dataset.to_hda_representative()
if isinstance(dataset, DatasetCollectionElement) and dataset.hda:
dataset = dataset.hda
if isinstance(dataset, DatasetCollectionElement):
dataset = dataset.first_dataset_instance()
if isinstance(dataset, HistoryDatasetAssociation) and self.ref_input and self.ref_input.formats:
direct_match, target_ext, converted_dataset = dataset.find_conversion_destination(
self.ref_input.formats
@@ -1561,9 +1561,13 @@ def is_file_empty(self, trans, other_values):
for dataset in util.listify(other_values.get(self.data_ref)):
# Use representative dataset if a dataset collection is parsed
if isinstance(dataset, HistoryDatasetCollectionAssociation):
dataset = dataset.to_hda_representative()
if dataset.populated:
dataset = dataset.to_hda_representative()
else:
# That's fine, we'll check again on execution
return True
if isinstance(dataset, DatasetCollectionElement):
dataset = dataset.hda
dataset = dataset.first_dataset_instance()
if isinstance(dataset, DatasetInstance):
return not dataset.has_data()
if is_runtime_value(dataset):
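
The column parameter changes above switch from element.hda to element.first_dataset_instance() and short-circuit when the collection is not yet populated. The .hda attribute is None whenever a DatasetCollectionElement wraps a child collection (e.g. an element of a list:list), which is exactly the nested case the new tests exercise. A toy model of the difference (classes are illustrative stand-ins, not Galaxy's ORM):

class FakeElement:
    def __init__(self, hda=None, child_collection=None):
        self.hda = hda
        self.child_collection = child_collection

    def first_dataset_instance(self):
        # Descend through nested collections until a leaf dataset is found.
        if self.child_collection is not None:
            return self.child_collection[0].first_dataset_instance()
        return self.hda

leaf = FakeElement(hda="tabular dataset")
outer = FakeElement(child_collection=[leaf])

assert outer.hda is None                                     # old code lost the dataset here
assert outer.first_dataset_instance() == "tabular dataset"   # new code reaches the leaf
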
26 changes: 26 additions & 0 deletions lib/galaxy_test/api/test_jobs.py
@@ -464,6 +464,32 @@ def test_no_hide_on_rerun(self):
assert hdca["visible"]
assert isoparse(hdca["update_time"]) > (isoparse(first_update_time))

def test_rerun_exception_handling(self):
with self.dataset_populator.test_history() as history_id:
other_run_response = self.dataset_populator.run_tool(
tool_id="job_properties",
inputs={},
history_id=history_id,
)
unrelated_job_id = other_run_response["jobs"][0]["id"]
run_response = self._run_map_over_error(history_id)
job_id = run_response["jobs"][0]["id"]
self.dataset_populator.wait_for_job(job_id)
failed_hdca = self.dataset_populator.get_history_collection_details(
history_id=history_id,
content_id=run_response["implicit_collections"][0]["id"],
assert_ok=False,
)
assert failed_hdca["visible"]
rerun_params = self._get(f"jobs/{job_id}/build_for_rerun").json()
inputs = rerun_params["state_inputs"]
inputs["rerun_remap_job_id"] = unrelated_job_id
before_rerun_items = self.dataset_populator.get_history_contents(history_id)
rerun_response = self._run_detect_errors(history_id=history_id, inputs=inputs)
assert "does not match rerun tool id" in rerun_response["err_msg"]
after_rerun_items = self.dataset_populator.get_history_contents(history_id)
assert len(before_rerun_items) == len(after_rerun_items)

@skip_without_tool("empty_output")
def test_common_problems(self):
with self.dataset_populator.test_history() as history_id:
30 changes: 30 additions & 0 deletions lib/galaxy_test/api/test_tools.py
@@ -2578,6 +2578,36 @@ def test_implicit_reduce_with_mapping(self):
)
assert output_hdca["collection_type"] == "list"

@skip_without_tool("column_multi_param")
def test_multi_param_column_nested_list(self):
with self.dataset_populator.test_history() as history_id:
hdca = self.dataset_collection_populator.create_list_of_list_in_history(
history_id, ext="tabular", wait=True
).json()
inputs = {
"input1": {"src": "hdca", "id": hdca["id"]},
# FIXME: integers don't work here
"col": "1",
}
response = self._run("column_multi_param", history_id, inputs, assert_ok=True)
self.dataset_populator.wait_for_job(job_id=response["jobs"][0]["id"], assert_ok=True)

@skip_without_tool("column_multi_param")
def test_multi_param_column_nested_list_fails_on_invalid_column(self):
with self.dataset_populator.test_history() as history_id:
hdca = self.dataset_collection_populator.create_list_of_list_in_history(
history_id, ext="tabular", wait=True
).json()
inputs = {
"input1": {"src": "hdca", "id": hdca["id"]},
"col": "10",
}
try:
self._run("column_multi_param", history_id, inputs, assert_ok=True)
except AssertionError as e:
exception_raised = e
assert exception_raised, "Expected invalid column selection to fail job"

@skip_without_tool("column_multi_param")
def test_implicit_conversion_and_reduce(self):
with self.dataset_populator.test_history() as history_id:
4 changes: 2 additions & 2 deletions lib/galaxy_test/base/populators.py
@@ -2906,7 +2906,7 @@ def __create_payload(self, history_id: str, *args, **kwds):
else:
return self.__create_payload_collection(history_id, *args, **kwds)

def __create_payload_fetch(self, history_id: str, collection_type, **kwds):
def __create_payload_fetch(self, history_id: str, collection_type, ext="txt", **kwds):
contents = None
if "contents" in kwds:
contents = kwds["contents"]
@@ -2928,7 +2928,7 @@ def __create_payload_fetch(self, history_id: str, collection_type, **kwds):
elements.append(contents_level)
continue

element = {"src": "pasted", "ext": "txt"}
element = {"src": "pasted", "ext": ext}
# Else older style list of contents or element ID and contents,
# convert to fetch API.
if isinstance(contents_level, tuple):
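
The populator change above threads a new ext keyword down to the pasted fetch elements instead of hard-coding "txt", which is what lets the nested-list tests request tabular contents. A hedged sketch of the effect (payload reduced to the relevant fields; helper name is hypothetical):

def build_pasted_elements(contents, ext="txt"):
    # Every pasted element now carries the caller's extension rather than a fixed "txt".
    return [{"src": "pasted", "ext": ext, "paste_content": content} for content in contents]

print(build_pasted_elements(["1\t2\t3", "4\t5\t6"], ext="tabular"))
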