Commit

merged and fixed conflicts

Acribbs committed Nov 3, 2024
2 parents c436807 + e9d153a commit c1e8b3a
Showing 5 changed files with 397 additions and 219 deletions.
1 change: 1 addition & 0 deletions all-tests.sh
@@ -16,4 +16,5 @@ pytest -v tests/test_pipeline_control.py
pytest -v tests/test_pipeline_execution.py
pytest -v tests/test_pipeline_cli.py
pytest -v tests/test_pipeline_actions.py
pytest -v tests/test_execution_cleanup.py
pytest -v tests/test_s3_decorators.py
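The new entry runs tests/test_execution_cleanup.py, which is not shown in this commit view. Below is a minimal sketch (hypothetical, not the actual file) of the kind of test such a file might contain, assuming the Executor from cgatcore.pipeline.execution can be instantiated standalone with the keywords used in run_workflow further down (job_threads, work_dir) and exposes cleanup_all_jobs():

# Hypothetical sketch only; the real tests/test_execution_cleanup.py is not
# shown in this commit view.
from unittest import mock

import pytest

from cgatcore.pipeline.execution import Executor


def run_and_cleanup(executor, cleanup_on_fail=True):
    """Minimal stand-in for the failure path in run_workflow."""
    try:
        raise RuntimeError("simulated pipeline failure")
    except Exception:
        if cleanup_on_fail:
            executor.cleanup_all_jobs()
        raise


def test_cleanup_called_on_failure(tmp_path):
    # Constructor keywords mirror the call in run_workflow:
    # Executor(job_threads=..., work_dir=...)
    executor = Executor(job_threads=1, work_dir=str(tmp_path))
    with mock.patch.object(executor, "cleanup_all_jobs") as cleanup:
        with pytest.raises(RuntimeError):
            run_and_cleanup(executor, cleanup_on_fail=True)
    cleanup.assert_called_once()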
351 changes: 133 additions & 218 deletions cgatcore/pipeline/control.py
@@ -50,7 +50,7 @@
from cgatcore.experiment import get_header, MultiLineFormatter
from cgatcore.pipeline.utils import get_caller, get_caller_locals, is_test
from cgatcore.pipeline.execution import execute, start_session, \
close_session
close_session, Executor


# redirect os.stat and other OS utilities to cached versions to speed
@@ -869,6 +869,9 @@ def parse_commandline(argv=None, optparse=True, **kwargs):
type=str,
help="working directory. Will be created if it does not exist")

parser.add_argument("--cleanup-on-fail", action="store_true", default=True,
help="Enable cleanup of jobs on pipeline failure.")

group = parser.add_argument_group("pipeline logging configuration")

group.add_argument("--pipeline-logfile", dest="pipeline_logfile",
@@ -1256,239 +1259,151 @@ def initialize(argv=None, caller=None, defaults=None, optparse=True, **kwargs):


def run_workflow(args, argv=None, pipeline=None):
"""run workflow given options in args.
argv is kept for backwards compatibility.
"""

logger = logging.getLogger("cgatcore.pipeline")
logger.debug(f"Starting run_workflow with action {args.pipeline_action}")

# Instantiate Executor to manage job tracking and cleanup
executor = Executor(job_threads=args.multiprocess, work_dir=get_params()["work_dir"])
executor.setup_signal_handlers() # Set up signal handlers for cleanup on interruption

# Determine tasks to force-run if specified
forcedtorun_tasks = (
ruffus.pipeline_get_task_names() if args.force_run == "all" else args.pipeline_targets
) if args.force_run else []

# Start workflow execution based on the specified action
try:
# Ensure the temporary directory exists
if not os.path.exists(get_params()["tmpdir"]):
logger.warn(f"Local temporary directory {get_params()['tmpdir']} did not exist - created")
try:
os.makedirs(get_params()["tmpdir"])
except OSError:
pass

logger.debug("starting run_workflow with action {}".format(
args.pipeline_action))
logger.info(f"Temporary directory is {get_params()['tmpdir']}")

if args.force_run:
if args.force_run == "all":
forcedtorun_tasks = ruffus.pipeline_get_task_names()
else:
forcedtorun_tasks = args.pipeline_targets
else:
forcedtorun_tasks = []

# create local scratch if it does not already exists. Note that
# directory itself will be not deleted while its contents should
# be cleaned up.
if not os.path.exists(get_params()["tmpdir"]):
logger.warn("local temporary directory {} did not exist - created".format(
get_params()["tmpdir"]))
try:
os.makedirs(get_params()["tmpdir"])
except OSError:
# file exists
pass

logger.info("temporary directory is {}".format(get_params()["tmpdir"]))

# set multiprocess to a sensible setting if there is no cluster
run_on_cluster = HAS_DRMAA is True and not args.without_cluster
if args.multiprocess is None:
if not run_on_cluster:
args.multiprocess = int(math.ceil(
multiprocessing.cpu_count() / 2.0))
else:
args.multiprocess = 40

# see inputValidation function in Parameters.py
if args.input_validation:
input_validation(get_params(), sys.argv[0])

elif args.pipeline_action == "debug":
# create the session proxy
start_session()

method_name = args.pipeline_targets[0]
caller = get_caller()
method = getattr(caller, method_name)
method(*args.pipeline_targets[1:])

elif args.pipeline_action in ("make",
"show",
"state",
"svg",
"plot",
"dot",
"touch",
"regenerate"):

messenger = None
try:
# Configure multiprocessing settings
run_on_cluster = HAS_DRMAA and not args.without_cluster
args.multiprocess = args.multiprocess or (
int(math.ceil(multiprocessing.cpu_count() / 2.0)) if not run_on_cluster else 40
)

# Start pipeline session for 'make' action
if args.pipeline_action == "make":
start_session()
try:
# Run pipeline and catch any errors
ruffus.pipeline_run(
args.pipeline_targets, forcedtorun_tasks=forcedtorun_tasks,
logger=logger, verbose=args.loglevel, log_exceptions=args.log_exceptions,
exceptions_terminate_immediately=args.exceptions_terminate_immediately,
checksum_level=args.ruffus_checksums_level, pipeline=pipeline,
one_second_per_job=False
)

except ruffus.ruffus_exceptions.RethrownJobError as ex:
if not args.debug:
# Summarise errors if debug mode is off
error_summary = f"{len(ex.args)} tasks encountered errors. Summary:"
error_messages = []
for idx, e in enumerate(ex.args):
task, job, error, msg, traceback = e
task = re.sub("__main__.", "", task or 'Unknown task')
job = re.sub(r"\s", "", job or 'Unknown job')
msg = msg if isinstance(msg, str) else str(msg) if msg else "No specific message"
if len(msg.splitlines()) > 1:
msg = "" # Show only single-line messages
error_messages.append(
f"{idx + 1}: Task={task}, Error={error}, Job={job}, Message={msg}"
)

E.error(error_summary)
for message in error_messages:
E.error(message)
E.error(f"Full traceback can be found in {args.pipeline_logfile}")
logger.error("Start of all error messages")
logger.error(ex)
logger.error("End of all error messages")

# Execute cleanup if configured
if getattr(args, "cleanup_on_fail", True): # Check if cleanup is enabled on fail
logger.info("Cleaning up all jobs due to pipeline failure.")
executor.cleanup_all_jobs()

raise ValueError("Pipeline failed with errors") from ex
finally:
# Close pipeline session
close_session()

elif args.pipeline_action in (
"show", "touch", "regenerate", "svg", "state"
):
with cache_os_functions():
if args.pipeline_action == "make":

if not args.without_cluster and not HAS_DRMAA and not get_params()['testing']:
E.critical(
"DRMAA API not found so cannot talk to a cluster.")
E.critical("Please use --local to run the pipeline"
" on this host: {}".format(os.uname()[1]))
sys.exit(-1)

# get tasks to be done. This essentially replicates
# the state information within ruffus.
stream = StringIO()
if args.pipeline_action == "show":
ruffus.pipeline_printout(
stream,
args.pipeline_targets,
verbose=5,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)

messenger = LoggingFilterProgress(stream.getvalue())
logger.addFilter(messenger)

global task
if args.without_cluster:
# use ThreadPool to avoid taking multiple CPU for pipeline
# controller.
opts = {"multithread": args.multiprocess}
else:
# use cooperative multitasking instead of multiprocessing.
opts = {"multiprocess": args.multiprocess,
"pool_manager": "gevent"}
# create the session proxy
start_session()

logger.info("current directory is {}".format(os.getcwd()))

ruffus.pipeline_run(
args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
logger=logger,
verbose=args.loglevel,
log_exceptions=args.log_exceptions,
exceptions_terminate_immediately=args.exceptions_terminate_immediately,
checksum_level=args.ruffus_checksums_level,
pipeline=pipeline,
one_second_per_job=False,
**opts
args.stdout, args.pipeline_targets, forcedtorun_tasks=forcedtorun_tasks,
verbose=args.loglevel, pipeline=pipeline,
checksum_level=args.ruffus_checksums_level
)

close_session()

elif args.pipeline_action == "show":
ruffus.pipeline_printout(
args.stdout,
args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
verbose=args.loglevel,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)

elif args.pipeline_action == "touch":
ruffus.pipeline_run(
args.pipeline_targets,
touch_files_only=True,
verbose=args.loglevel,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)

args.pipeline_targets, touch_files_only=True,
verbose=args.loglevel, pipeline=pipeline,
checksum_level=args.ruffus_checksums_level
)
elif args.pipeline_action == "regenerate":
ruffus.pipeline_run(
args.pipeline_targets,
touch_files_only=args.ruffus_checksums_level,
pipeline=pipeline,
verbose=args.loglevel)

args.pipeline_targets, touch_files_only=args.ruffus_checksums_level,
pipeline=pipeline, verbose=args.loglevel
)
elif args.pipeline_action == "svg":
ruffus.pipeline_printout_graph(
args.stdout.buffer,
args.pipeline_format,
args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)

args.stdout.buffer, args.pipeline_format, args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks, pipeline=pipeline,
checksum_level=args.ruffus_checksums_level
)
elif args.pipeline_action == "state":
ruffus_return_dag(
args.stdout,
target_tasks=args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks,
verbose=args.loglevel,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)

elif args.pipeline_action == "plot":
outf, filename = tempfile.mkstemp()
ruffus.pipeline_printout_graph(
os.fdopen(outf, "wb"),
args.pipeline_format,
args.pipeline_targets,
pipeline=pipeline,
checksum_level=args.ruffus_checksums_level)
execute("inkscape %s" % filename)
os.unlink(filename)

except ruffus.ruffus_exceptions.RethrownJobError as ex:

if not args.debug:
E.error("%i tasks with errors, please see summary below:" %
len(ex.args))
for idx, e in enumerate(ex.args):
task, job, error, msg, traceback = e

if task is None:
# this seems to be errors originating within ruffus
# such as a missing dependency
# msg then contains a RethrownJobJerror
msg = str(msg)
else:
task = re.sub("__main__.", "", task)
job = re.sub(r"\s", "", job)

# display only single line messages
if len([x for x in msg.split("\n") if x != ""]) > 1:
msg = ""

E.error("%i: Task=%s Error=%s %s: %s" %
(idx, task, error, job, msg))

E.error("full traceback is in %s" % args.pipeline_logfile)

logger.error("start of all error messages")
logger.error(ex)
logger.error("end of all error messages")
raise ValueError("pipeline failed with errors") from ex
else:
raise

elif args.pipeline_action == "dump":
args.stdout.write((json.dumps(get_params())) + "\n")

elif args.pipeline_action == "printconfig":
E.info("printing out pipeline parameters: ")
p = get_params()
for k in sorted(get_params()):
print(k, "=", p[k])
print_config_files()

elif args.pipeline_action == "config":
# Level needs to be 2:
# 0th level -> cgatflow.py
# 1st level -> Control.py
# 2nd level -> pipeline_xyz.py
f = sys._getframe(2)
caller = f.f_globals["__file__"]
pipeline_path = os.path.splitext(caller)[0]
general_path = os.path.join(os.path.dirname(pipeline_path),
"configuration")
write_config_files(pipeline_path, general_path)

elif args.pipeline_action == "clone":
clone_pipeline(args.pipeline_targets[0])
args.stdout, target_tasks=args.pipeline_targets,
forcedtorun_tasks=forcedtorun_tasks, verbose=args.loglevel,
pipeline=pipeline, checksum_level=args.ruffus_checksums_level
)

else:
raise ValueError("unknown pipeline action %s" %
args.pipeline_action)
# Dump pipeline parameters
elif args.pipeline_action == "dump":
args.stdout.write((json.dumps(get_params())) + "\n")

E.stop(logger=get_logger())
# Print configuration settings
elif args.pipeline_action == "printconfig":
E.info("Printing out pipeline parameters:")
for k, v in sorted(get_params().items()):
print(k, "=", v)
print_config_files()

# Generate default config files
elif args.pipeline_action == "config":
pipeline_path = os.path.splitext(get_caller().__file__)[0]
general_path = os.path.join(os.path.dirname(pipeline_path), "configuration")
write_config_files(pipeline_path, general_path)

# Clone pipeline structure
elif args.pipeline_action == "clone":
clone_pipeline(args.pipeline_targets[0])

else:
raise ValueError(f"Unknown pipeline action {args.pipeline_action}")

except Exception as e:
logger.exception("An error occurred during pipeline execution.")
if getattr(args, "cleanup_on_fail", True):
logger.info("Cleaning up all jobs due to pipeline failure.")
executor.cleanup_all_jobs()
raise e

finally:
# End of pipeline run, stop logging
E.stop(logger=get_logger())


def main(argv=None):
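The reworked run_workflow wires failure handling through the Executor imported from cgatcore.pipeline.execution: it is instantiated once at the top with the configured thread count and working directory, setup_signal_handlers() is called so that interruptions also trigger cleanup, and cleanup_all_jobs() is invoked from both the RethrownJobError branch and the outer exception handler whenever cleanup_on_fail is enabled. The Executor implementation itself is not part of this diff; the sketch below shows only the interface that run_workflow relies on, and everything beyond the names used in the diff (the job registry, the choice of signals, stop_job) is an illustrative assumption rather than the cgatcore implementation.

# Minimal sketch of the Executor interface assumed by the new run_workflow.
# Only the constructor keywords, setup_signal_handlers() and cleanup_all_jobs()
# appear in this diff; the rest is illustrative.
import logging
import signal


class Executor:

    def __init__(self, job_threads=None, work_dir=None):
        self.job_threads = job_threads
        self.work_dir = work_dir
        self.active_jobs = []  # assumed: jobs registered here as they start
        self.logger = logging.getLogger("cgatcore.pipeline")

    def setup_signal_handlers(self):
        # Clean up tracked jobs if the controller is interrupted or terminated.
        def handler(signum, frame):
            self.logger.warning("received signal %s, cleaning up jobs", signum)
            self.cleanup_all_jobs()
            raise SystemExit(1)

        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

    def cleanup_all_jobs(self):
        # Stop every job still registered, e.g. after a pipeline failure.
        for job in list(self.active_jobs):
            self.stop_job(job)
            self.active_jobs.remove(job)

    def stop_job(self, job):
        # Placeholder for backend-specific termination (local kill, DRMAA, ...).
        self.logger.info("terminating job %s", job)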