Skip to content

Commit

Permalink
rollback add_default_workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
antgonza committed Feb 20, 2024
1 parent 2ef9604 commit f063ca0
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions qiita_db/metadata_template/prep_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,22 +790,21 @@ def _get_node_info(workflow, node):
ccmd.name, ccmd.merging_scheme, parent_cmd_name,
parent_merging_scheme, cparams, [], pparams)

def _get_previous_predecessors(workflow, node):
parents, cxns = [], []
for pnode in workflow.graph.predecessors(node):
parents.append(pnode)
cxns.append(
{x[0]: x[1] for x in workflow.graph.get_edge_data(
pnode, node)['connections'].connections})
return [parents, node, cxns]

def _get_predecessors(workflow, node):
data = [_get_previous_predecessors(workflow, node)]
for pnode in data[0][0]:
d = _get_previous_predecessors(workflow, pnode)
if d[0]:
data.append(d)
return data
# recursive method to get predecessors of a given node
pred = []

for pnode in workflow.graph.predecessors(node):
pred = _get_predecessors(workflow, pnode)
cxns = {x[0]: x[2]
for x in workflow.graph.get_edge_data(
pnode, node)['connections'].connections}
data = [pnode, node, cxns]
if pred is None:
pred = [data]
else:
pred.append(data)
return pred

# Note: we are going to use the final BIOMs to figure out which
# processing is missing from the back/end to the front, as this
Expand Down Expand Up @@ -915,32 +914,24 @@ def _get_predecessors(workflow, node):
previous_jobs = dict()
for ma, node in wk_data.items():
predecessors = _get_predecessors(wk, node)
predecessors.reverse()
cmds_to_create = []
init_artifacts = None
# we only need to "loop" over the first element
for pnodes, cnode, cxns in predecessors[:1]:
for i, (pnode, cnode, cxns) in enumerate(predecessors):
cdp = cnode.default_parameter
cdp_cmd = cdp.command
params = cdp.values.copy()

# note that this assume that commands cannot have
# multiple inputs with the same name, which should
# be a safe assumption
for pnode, cxn in zip(pnodes, cxns):
info = _get_node_info(wk, pnode)
if info in merging_schemes:
if init_artifacts is None:
init_artifacts = dict()
for k, v in merging_schemes[info].items():
if k in cxn:
k = cxn[k]
init_artifacts[k] = v

rp = cdp_cmd.required_parameters
reqp = {x: y[1][0] for x, y in rp.items()}

icxns = {y: x for x, y in cxns.items()}
reqp = {x: icxns[y[1][0]]
for x, y in cdp_cmd.required_parameters.items()}
cmds_to_create.append([cdp_cmd, params, reqp])

info = _get_node_info(wk, pnode)
if info in merging_schemes:
if set(merging_schemes[info]) >= set(cxns):
init_artifacts = merging_schemes[info]
break
if init_artifacts is None:
pdp = pnode.default_parameter
pdp_cmd = pdp.command
Expand Down Expand Up @@ -975,17 +966,15 @@ def _get_predecessors(workflow, node):
previous_job = current_job
if previous_job is None:
req_params = dict()
for aname, aid in rp.items():
if aname not in init_artifacts:
msg = (f'Missing Artifact type: "{aname}" in '
for iname, dname in rp.items():
if dname not in init_artifacts:
msg = (f'Missing Artifact type: "{dname}" in '
'this preparation; this might be due '
'to missing steps or not having the '
'correct raw data.')
init_artifacts[aname] = init_artifacts[aid]
if aname not in init_artifacts:
# raises option c.
raise ValueError(msg)
req_params[aname] = init_artifacts[aname]
# raises option c.
raise ValueError(msg)
req_params[iname] = init_artifacts[dname]
else:
req_params = dict()
connections = dict()
Expand Down

0 comments on commit f063ca0

Please sign in to comment.