Merge branch 'release_23.1' into dev
nsoranzo committed Sep 5, 2023
2 parents 9877fed + 5c38cae commit 82c30f6
Showing 7 changed files with 204 additions and 16 deletions.
4 changes: 2 additions & 2 deletions lib/galaxy/app.py
@@ -588,8 +588,6 @@ def __init__(configure_logging=True, use_converters=True, use_display_appl

self._configure_tool_shed_registry()
self._register_singleton(tool_shed_registry.Registry, self.tool_shed_registry)
# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)
self._register_celery_galaxy_task_components()

def _register_celery_galaxy_task_components(self):
@@ -665,6 +663,8 @@ def __init__(self, **kwargs) -> None:
)
self.api_keys_manager = self._register_singleton(ApiKeyManager)

# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)
# Load dbkey / genome build manager
self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)

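Note on this hunk: the eager _configure_tool_data_tables(from_shed_config=False) call moves out of the shared application base class (which Celery workers also instantiate) into the full application's setup, after the API key manager is registered; the tasks.py change below gives workers a lazy substitute. A minimal sketch of the resulting split, with hypothetical names standing in for Galaxy's real application classes:

def configure_tool_data_tables(from_shed_config: bool) -> dict:
    # Stand-in for constructing Galaxy's tool data table registry.
    return {"from_shed_config": from_shed_config}

class MinimalApp:
    def __init__(self) -> None:
        # Background (Celery) processes instantiate this class: no eager
        # tool data table setup at startup any more.
        self.tool_data_tables = None

class FullApp(MinimalApp):
    def __init__(self) -> None:
        super().__init__()
        # Only the full application pays the configuration cost up front.
        self.tool_data_tables = configure_tool_data_tables(from_shed_config=False)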
10 changes: 10 additions & 0 deletions lib/galaxy/celery/tasks.py
@@ -38,6 +38,7 @@
from galaxy.model.scoped_session import galaxy_scoped_session
from galaxy.objectstore import BaseObjectStore
from galaxy.objectstore.caching import check_caches
from galaxy.queue_worker import GalaxyQueueWorker
from galaxy.schema.tasks import (
ComputeDatasetHashTaskRequest,
GenerateHistoryContentDownload,
@@ -63,6 +64,11 @@
log = get_logger(__name__)


@lru_cache()
def setup_data_table_manager(app):
app._configure_tool_data_tables(from_shed_config=False)


@lru_cache()
def cached_create_tool_from_representation(app, raw_tool_source):
return create_tool_from_representation(
@@ -417,6 +423,7 @@ def compute_dataset_hash(

@galaxy_task(action="import a data bundle")
def import_data_bundle(
app: MinimalManagerApp,
hda_manager: HDAManager,
ldda_manager: LDDAManager,
tool_data_import_manager: ToolDataImportManager,
@@ -427,6 +434,7 @@ def import_data_bundle(
tool_data_file_path: Optional[str] = None,
task_user_id: Optional[int] = None,
):
setup_data_table_manager(app)
if src == "uri":
assert uri
tool_data_import_manager.import_data_bundle_by_uri(config, uri, tool_data_file_path=tool_data_file_path)
@@ -438,6 +446,8 @@ def import_data_bundle(
else:
dataset = ldda_manager.by_id(id)
tool_data_import_manager.import_data_bundle_by_dataset(config, dataset, tool_data_file_path=tool_data_file_path)
queue_worker = GalaxyQueueWorker(app)
queue_worker.send_control_task("reload_tool_data_tables")


@galaxy_task(action="pruning history audit table")
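The @lru_cache() on setup_data_table_manager acts as a once-per-process guard: the task always passes the same app singleton, so the first call configures the tool data tables and every later call is a cached no-op. A self-contained sketch of the pattern (App is a hypothetical stand-in):

from functools import lru_cache

class App:
    def __init__(self) -> None:
        self.configure_calls = 0

@lru_cache()
def setup_once(app: App) -> None:
    # lru_cache memoizes on its argument, so this body runs at most once
    # per distinct App object in this process; the default identity-based
    # hash keeps App usable as a cache key.
    app.configure_calls += 1

app = App()
setup_once(app)
setup_once(app)
assert app.configure_calls == 1  # the second call hit the cache

The new GalaxyQueueWorker(app).send_control_task("reload_tool_data_tables") at the end of import_data_bundle then broadcasts a control message so that other processes, not just this worker, reload their tables and see the freshly imported bundle.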
11 changes: 9 additions & 2 deletions lib/galaxy/model/dataset_collections/structure.py
@@ -94,12 +94,19 @@ def _walk_collections(self, collection_dict):
def get_element(collection):
return collection[index] # noqa: B023

when_value = None
if self.when_values:
if len(self.when_values) == 1:
when_value = self.when_values[0]
else:
when_value = self.when_values[index]

if substructure.is_leaf:
yield dict_map(get_element, collection_dict), self.when_values[index] if self.when_values else None
yield dict_map(get_element, collection_dict), when_value
else:
sub_collections = dict_map(lambda collection: get_element(collection).child_collection, collection_dict)
for element, _when_value in substructure._walk_collections(sub_collections):
yield element, self.when_values[index] if self.when_values else None
yield element, when_value

@property
def is_leaf(self):
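The rewrite replaces two copies of the inline "self.when_values[index] if self.when_values else None" expression with a single when_value computed up front, and adds a broadcast rule: a one-element when_values list applies to every element, while a longer list is matched by position. The rule in isolation (hypothetical helper, same logic as the hunk):

from typing import List, Optional

def resolve_when_value(when_values: Optional[List[bool]], index: int) -> Optional[bool]:
    if not when_values:
        return None  # unconditional: no when values at all
    if len(when_values) == 1:
        return when_values[0]  # a single value is broadcast to all elements
    return when_values[index]  # otherwise match each element by position

assert resolve_when_value(None, 2) is None
assert resolve_when_value([False], 2) is False        # broadcast
assert resolve_when_value([True, False], 1) is False  # positional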
18 changes: 16 additions & 2 deletions lib/galaxy/tools/actions/model_operations.py
@@ -39,6 +39,7 @@ def execute(
execution_cache=None,
collection_info=None,
job_callback=None,
skip=False,
**kwargs,
):
incoming = incoming or {}
@@ -93,12 +94,16 @@
history=history,
tags=preserved_tags,
hdca_tags=preserved_hdca_tags,
skip=skip,
)
self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections)
self._record_outputs(job, out_data, output_collections)
if job_callback:
job_callback(job)
job.state = job.states.OK
if skip:
job.state = job.states.SKIPPED
else:
job.state = job.states.OK
trans.sa_session.add(job)

# Queue the job for execution
@@ -108,7 +113,7 @@
return job, out_data, history

def _produce_outputs(
self, trans: "ProvidesUserContext", tool, out_data, output_collections, incoming, history, tags, hdca_tags
self, trans: "ProvidesUserContext", tool, out_data, output_collections, incoming, history, tags, hdca_tags, skip
):
tag_handler = trans.tag_handler
tool.produce_outputs(
@@ -128,4 +133,13 @@ def _produce_outputs(
value.visible = False
mapped_over_elements[name].hda = value

# We probably need to mark all outputs as skipped, not just the outputs of whatever the database op tools do?
# This is probably not exactly right, but it might also work in most cases
if skip:
for output_collection in output_collections.out_collections.values():
output_collection.mark_as_populated()
for hdca in output_collections.out_collection_instances.values():
hdca.visible = False
# Would we also need to replace the datasets with skipped datasets?

trans.sa_session.add_all(out_data.values())
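Taken together, the skip handling records the job as SKIPPED instead of OK, still marks the output collections populated (so downstream steps that depend on them are not left waiting), and hides the collection instances in the history. A condensed, self-contained sketch with stand-in model classes:

from dataclasses import dataclass

@dataclass
class OutputCollection:
    populated: bool = False

    def mark_as_populated(self) -> None:
        self.populated = True

@dataclass
class CollectionInstance:
    visible: bool = True

def finish_job(skip: bool, collections, instances) -> str:
    # Mirrors the hunks above: a skipped job still reaches a terminal
    # state, its collections count as populated, and its outputs are
    # hidden rather than deleted.
    if skip:
        for collection in collections:
            collection.mark_as_populated()
        for instance in instances:
            instance.visible = False
    return "skipped" if skip else "ok"

assert finish_job(True, [OutputCollection()], [CollectionInstance()]) == "skipped"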
37 changes: 27 additions & 10 deletions lib/galaxy/workflow/modules.py
@@ -512,8 +512,15 @@ def compute_collection_info(self, progress, step, all_inputs):
collections_to_match = self._find_collections_to_match(progress, step, all_inputs)
# Have implicit collections...
collection_info = self.trans.app.dataset_collection_manager.match_collections(collections_to_match)
if collection_info and progress.subworkflow_collection_info:
collection_info.when_values = progress.subworkflow_collection_info.when_values
if collection_info:
if progress.subworkflow_collection_info:
# We've mapped over a subworkflow. Slices of the invocation might be conditional
# and progress.subworkflow_collection_info.when_values holds the appropriate when_values
collection_info.when_values = progress.subworkflow_collection_info.when_values
else:
# The invocation is not mapped over, but it might still be conditional.
# Multiplication and linking should be handled by slice_collection()
collection_info.when_values = progress.when_values
return collection_info or progress.subworkflow_collection_info

def _find_collections_to_match(self, progress, step, all_inputs):
@@ -763,12 +770,15 @@ def execute(self, trans, progress, invocation_step, use_cached_job=False):
assert len(progress.when_values) == 1, "Got more than 1 when value, this shouldn't be possible"
iteration_elements_iter = [(None, progress.when_values[0] if progress.when_values else None)]

when_values = []
if step.when_expression:
for iteration_elements, when_value in iteration_elements_iter:
if when_value is False:
when_values.append(when_value)
continue
when_values: List[Union[bool, None]] = []
for iteration_elements, when_value in iteration_elements_iter:
if when_value is False or not step.when_expression:
# We're skipping this step (when==False) or we keep
# the current when_value if there is no explicit when_expression on this step.
when_values.append(when_value)
else:
# Got a conditional step and we could potentially run it,
# so we have to build the step state and evaluate the expression
extra_step_state = {}
for step_input in step.inputs:
step_input_name = step_input.name
@@ -783,8 +793,8 @@
progress, step, execution_state={}, extra_step_state=extra_step_state
)
)
if collection_info:
collection_info.when_values = when_values
if collection_info:
collection_info.when_values = when_values

subworkflow_invoker = progress.subworkflow_invoker(
trans,
@@ -2295,6 +2305,13 @@ def callback(input, prefixed_name, **kwargs):
self._handle_mapped_over_post_job_actions(
step, step_inputs, step_outputs, progress.effective_replacement_dict()
)
if progress.when_values == [False] and not progress.subworkflow_collection_info:
# Step skipped entirely. We hide the output to avoid confusion.
# Could be revisited if we have a nice visual way to say these are skipped?
for output in step_outputs.values():
if isinstance(output, (model.HistoryDatasetAssociation, model.HistoryDatasetCollectionAssociation)):
output.visible = False

if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
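The restructured loop in execute now yields one when value per slice: a slice keeps its inherited value when it is already False (the slice was skipped upstream) or when the step has no when: expression; only otherwise is the expression evaluated against the assembled step state. A condensed sketch of that decision, with evaluate_when_expression as a hypothetical stand-in for the real state building and evaluation:

from typing import Any, List, Union

def evaluate_when_expression(step_state: Any) -> bool:
    # Stand-in for building extra_step_state and evaluating the expression.
    return bool(step_state)

def collect_when_values(slices, has_when_expression: bool) -> List[Union[bool, None]]:
    when_values: List[Union[bool, None]] = []
    for step_state, inherited in slices:
        if inherited is False or not has_when_expression:
            # Skipped upstream, or nothing to evaluate on this step:
            # propagate the inherited value unchanged.
            when_values.append(inherited)
        else:
            when_values.append(evaluate_when_expression(step_state))
    return when_values

assert collect_when_values([(1, False), (1, None)], has_when_expression=True) == [False, True]

The last hunk then hides the outputs of a step whose entire slice was skipped (progress.when_values == [False]), matching the hiding done in model_operations.py above.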
138 changes: 138 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
@@ -2042,6 +2042,66 @@ def test_run_workflow_fails_when_expression_not_boolean(self):
assert message["details"] == "Type is: str"
assert message["workflow_step_id"] == 2

def test_run_workflow_subworkflow_conditional_with_simple_mapping_step(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
"""class: GalaxyWorkflow
inputs:
should_run:
type: boolean
some_collection:
type: data_collection
steps:
subworkflow:
run:
class: GalaxyWorkflow
inputs:
some_collection:
type: data_collection
should_run:
type: boolean
steps:
a_tool_step:
tool_id: cat1
in:
input1: some_collection
in:
some_collection: some_collection
should_run: should_run
outputs:
inner_out: a_tool_step/out_file1
when: $(inputs.should_run)
outputs:
outer_output:
outputSource: subworkflow/inner_out
""",
test_data="""
some_collection:
collection_type: list
elements:
- identifier: true
content: A
- identifier: false
content: B
type: File
should_run:
value: false
type: raw
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
subworkflow_invocation_id = invocation_details["steps"][-1]["subworkflow_invocation_id"]
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id
)
invocation_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True)
for step in invocation_details["steps"]:
if step["workflow_step_label"] == "a_tool_step":
assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 2

def test_run_workflow_subworkflow_conditional_step(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
@@ -2097,6 +2157,84 @@ def test_run_workflow_subworkflow_conditional_step(self):
if step["workflow_step_label"] == "a_tool_step":
assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1

def test_run_nested_conditional_workflow_steps(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
"""
class: GalaxyWorkflow
inputs:
dataset:
type: data
when:
type: boolean
outputs:
output:
outputSource: outer_subworkflow/output
steps:
- label: outer_subworkflow
when: $(inputs.when)
in:
dataset:
source: dataset
when:
source: when
run:
class: GalaxyWorkflow
label: subworkflow cat1
inputs:
dataset:
type: data
outputs:
output:
outputSource: cat1_workflow/output
steps:
- label: cat1_workflow
in:
dataset:
source: dataset
run:
class: GalaxyWorkflow
label: cat1
inputs:
dataset:
type: data
outputs:
output:
outputSource: cat1/out_file1
steps:
- tool_id: cat1
label: cat1
in:
input1:
source: dataset
""",
test_data="""
dataset:
value: 1.bed
type: File
when:
value: false
type: raw
""",
history_id=history_id,
wait=True,
assert_ok=True,
)
invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
subworkflow_invocation_id = invocation_details["steps"][-1]["subworkflow_invocation_id"]
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id
)
invocation_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True)
subworkflow_invocation_id = invocation_details["steps"][-1]["subworkflow_invocation_id"]
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id
)
invocation_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True)
for step in invocation_details["steps"]:
if step["workflow_step_label"] == "cat1":
assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1

def test_run_workflow_conditional_step_map_over_expression_tool(self):
with self.dataset_populator.test_history() as history_id:
summary = self._run_workflow(
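Both new tests descend into nested invocations the same way: fetch the invocation with step_details=True, take the last step's subworkflow_invocation_id, wait for that invocation's jobs, and repeat for each nesting level. A small helper expressing that traversal (hypothetical, not part of the populator API):

def innermost_subworkflow_invocation(populator, history_id: str, invocation_id: str, depth: int) -> str:
    # Follow the last step's subworkflow_invocation_id `depth` levels down,
    # waiting for each nested invocation's jobs along the way.
    for _ in range(depth):
        details = populator.get_invocation(invocation_id, step_details=True)
        invocation_id = details["steps"][-1]["subworkflow_invocation_id"]
        populator.wait_for_invocation_and_jobs(
            history_id=history_id, workflow_id="whatever", invocation_id=invocation_id
        )
    return invocation_id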
2 changes: 2 additions & 0 deletions test/integration/test_tool_data_delete.py
@@ -45,6 +45,8 @@ def _testbase_fields(self):
return show_response.json()["fields"]

def _testbeta_field_count(self) -> int:
# We need to wait for the reload message to reach the control message consumer
time.sleep(1)
return len(self._testbase_fields())


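The fixed one-second sleep gives the control-message consumer time to apply the reload_tool_data_tables broadcast before the field count is read. A deadline-based poll would be less timing-sensitive; a generic sketch (an alternative, not what the test does):

import time

def wait_until(predicate, timeout: float = 10.0, interval: float = 0.25) -> bool:
    # Poll predicate until it returns True or the deadline passes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

For example, wait_until(lambda: self._testbeta_field_count() == expected) in place of the fixed sleep, where expected is the field count the test anticipates.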
