Skip to content

Commit

Permalink
WIP recover from interrupted workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandip117 committed Feb 1, 2024
1 parent b6e77ce commit de6b5f2
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion app/controllers/subprocesses/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __progress_percent(curr_state: int, total_states: int, state_progress: int)


def is_status_subprocess_running(workflow: WorkflowDBSchema):
proc_count = get_process_count("status", args.data)
proc_count = get_process_count("app/controllers/subprocesses/status.py", args.data)

if not workflow.stale:
return True
Expand Down
9 changes: 6 additions & 3 deletions app/controllers/subprocesses/wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def manage_workflow(db_key: str, test: bool):

workflow = analysis_retry(workflow)

if workflow.started or not workflow.response.status or test:
if not workflow.response.status or test:
# Do nothing and return
reason = f"Workflow request failed. {workflow.response.error}." if not workflow.response.status \
else f"Workflow already started."
Expand Down Expand Up @@ -118,6 +118,9 @@ def manage_workflow(db_key: str, test: bool):
workflow.response.error = Error.analysis.value + str(ex)
workflow.response.status = False
update_workflow(key, workflow)
case State.COMPLETED:
logger.info(f"Request is now complete. Exiting while loop. ",extra=d)
return

logger.info(f"Calling status update subprocess.", extra=d)
update_status(request)
Expand Down Expand Up @@ -166,7 +169,7 @@ def update_status(request: WorkflowRequestSchema):
"""
d_data = request_to_dict(request)
str_data = json.dumps(d_data)
proc_count = get_process_count("status", str_data)
proc_count = get_process_count("app/controllers/subprocesses/status.py", str_data)
logger.debug(f"{proc_count} subprocess of status manager running on the system.", extra=d)
if proc_count > 0:
logger.info(f"No new status subprocess started.", extra=d)
Expand Down Expand Up @@ -283,7 +286,7 @@ def do_cube_create_feed(request: WorkflowRequestSchema, cube_url: str, retries:
data_path = client.getSwiftPath(pacs_details)
logger.info(f"Received data path: {data_path}", extra=d)
if retries < 5:
feed_name = feed_name + f"retry#{retries}"
feed_name = feed_name + f"-retry#{retries}"

# Get plugin Id
plugin_search_params = {"name": "pl-dircopy"}
Expand Down
4 changes: 2 additions & 2 deletions app/controllers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def manage_workflow(str_data: str, mode: str):
"""
Manage a workflow request in a separate subprocess
"""
proc_count = get_process_count("wf_manager", str_data)
proc_count = get_process_count("app/controllers/subprocesses/wf_manager.py", str_data)
logger.debug(f"{proc_count} subprocess of workflow manager running on the system.", extra=d)
if proc_count > 0:
logger.info(f"No new manager subprocess started.", extra=d)
Expand All @@ -196,7 +196,7 @@ def update_workflow_status(str_data: str, mode: str):
"""
Update the current status of a workflow request in a separate process
"""
proc_count = get_process_count("status", str_data)
proc_count = get_process_count("app/controllers/subprocesses/status.py", str_data)
logger.debug(f"{proc_count} subprocess of status manager running on the system.", extra=d)
if proc_count > 0:
logger.info(f"No new status subprocess started.", extra=d)
Expand Down

0 comments on commit de6b5f2

Please sign in to comment.