diff --git a/pyproject.toml b/pyproject.toml
index b7637c7..fa25f73 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,6 +36,7 @@ classifiers = [
 requires-python = ">=3.11"
 dependencies = [
     "argparse",
+    "ccbr_tools@git+https://github.com/CCBR/Tools",
     "Click >= 8.1.3",
     "PySimpleGui < 5",
     "snakemake >= 7.32, < 8",
diff --git a/src/renee/__main__.py b/src/renee/__main__.py
index 2a46b9c..7d3c181 100755
--- a/src/renee/__main__.py
+++ b/src/renee/__main__.py
@@ -12,7 +12,6 @@
 """
 # Python standard library
-from __future__ import print_function
 from shutil import copy
 import json
 import os
@@ -22,22 +21,22 @@
 # 3rd party imports from pypi
 import argparse
-
-# local imports
-from .cache import get_sif_cache_dir
-from .run import run
-from .dryrun import dryrun
-from .gui import launch_gui
-from .conditions import fatal
-from .util import (
+from ccbr_tools.pipeline.util import (
     get_hpcname,
     get_tmp_dir,
     get_genomes_list,
-    get_version,
     check_python_version,
     _cp_r_safe_,
-    orchestrate,
 )
+from ccbr_tools.pipeline.cache import get_sif_cache_dir
+
+# local imports
+from .run import run
+from .dryrun import dryrun
+from .gui import launch_gui
+from .conditions import fatal
+from .util import renee_base, get_version
+from .orchestrate import orchestrate
 
 # Pipeline Metadata and globals
 RENEE_PATH = os.path.dirname(
@@ -398,9 +397,11 @@ def build(sub_args):
             )
         )
     elif sub_args.mode == "slurm":
-        jobid = (
-            open(os.path.join(sub_args.output, "logfiles", "bjobid.log")).read().strip()
-        )
+        with open(
+            os.path.join(sub_args.output, "logfiles", "bjobid.log"), "r"
+        ) as infile:
+            jobid = infile.read().strip()
+
     if int(masterjob.returncode) == 0:
         print("Successfully submitted master job: ", end="")
     else:
@@ -770,7 +771,12 @@ def parsed_arguments(name, description):
         {2}{3}Prebuilt genome+annotation combos:{4}
         {5}
         """.format(
-            "renee", __version__, c.bold, c.url, c.end, list(get_genomes_list())
+            "renee",
+            __version__,
+            c.bold,
+            c.url,
+            c.end,
+            list(get_genomes_list(repo_base=renee_base)),
         )
     )
@@ -817,7 +823,9 @@ def parsed_arguments(name, description):
         "--genome",
         required=True,
         type=lambda option: str(
-            genome_options(subparser_run, option, get_genomes_list())
+            genome_options(
+                subparser_run, option, get_genomes_list(repo_base=renee_base)
+            )
         ),
         help=argparse.SUPPRESS,
     )
@@ -1126,7 +1134,12 @@ def parsed_arguments(name, description):
         {2}{3}Prebuilt genome+annotation combos:{4}
         {5}
         """.format(
-            "renee", __version__, c.bold, c.url, c.end, list(get_genomes_list())
+            "renee",
+            __version__,
+            c.bold,
+            c.url,
+            c.end,
+            list(get_genomes_list(repo_base=renee_base)),
        )
    )
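The recurring change in __main__.py is that get_genomes_list() is now imported from ccbr_tools and receives the pipeline's path resolver through the new repo_base keyword, which the shared helper uses to locate RENEE's bundled config/genomes/<hpcname>/*.json files. A minimal sketch of the call pattern (import paths as used in the updated tests below):

    from ccbr_tools.pipeline.util import get_genomes_list
    from renee.src.renee.util import renee_base

    # Note: repo_base is the renee_base function itself, not a path string;
    # the helper calls it to resolve files inside the RENEE repo.
    genomes = get_genomes_list(repo_base=renee_base)
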
diff --git a/src/renee/cache.py b/src/renee/cache.py
deleted file mode 100644
index a908634..0000000
--- a/src/renee/cache.py
+++ /dev/null
@@ -1,63 +0,0 @@
-import json
-import os
-import sys
-
-
-def get_singularity_cachedir(output_dir, cache_dir=None):
-    """Returns the singularity cache directory.
-    If no user-provided cache directory is provided,
-    the default singularity cache is in the output directory.
-    """
-    if not cache_dir:
-        cache_dir = os.path.join(output_dir, ".singularity")
-    return cache_dir
-
-
-def get_sif_cache_dir(hpc=None):
-    sif_dir = None
-    if hpc == "biowulf":
-        sif_dir = "/data/CCBR_Pipeliner/SIFS"
-    elif hpc == "frce":
-        sif_dir = "/mnt/projects/CCBR-Pipelines/SIFs"
-    return sif_dir
-
-
-def image_cache(sub_args, config):
-    """Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided.
-    If singularity cache option is provided and a local SIF does not exist, a warning is
-    displayed and the image will be pulled from URI in 'config/containers/images.json'.
-    @param sub_args <parser.parse_args() object>:
-        Parsed arguments for run sub-command
-    @params config <dict>:
-        Docker Image config file
-    @return config <dict>:
-        Updated config dictionary containing user information (username and home directory)
-    """
-    images = os.path.join(sub_args.output, "config", "containers", "images.json")
-
-    # Read in config for docker image uris
-    with open(images, "r") as fh:
-        data = json.load(fh)
-    # Check if local sif exists
-    for image, uri in data["images"].items():
-        if sub_args.sif_cache:
-            sif = os.path.join(
-                sub_args.sif_cache,
-                "{}.sif".format(os.path.basename(uri).replace(":", "_")),
-            )
-            if not os.path.exists(sif):
-                # If local sif does not exist in cache, print warning
-                # and default to pulling from URI in config/containers/images.json
-                print(
-                    'Warning: Local image "{}" does not exist in singularity cache'.format(
-                        sif
-                    ),
-                    file=sys.stderr,
-                )
-            else:
-                # Change pointer to image from Registry URI to local SIF
-                data["images"][image] = sif
-
-    config.update(data)
-
-    return config
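For reference, image_cache (now imported from ccbr_tools.pipeline.cache) derives a SIF filename from a registry URI by replacing the tag separator, which is exactly the naming scheme the updated tests/test_cache.py asserts against:

    import os

    uri = "docker://nciccbr/ccbr_arriba_2.0.0:v0.0.1"
    # Same expression as in the deleted image_cache above.
    sif_name = "{}.sif".format(os.path.basename(uri).replace(":", "_"))
    assert sif_name == "ccbr_arriba_2.0.0_v0.0.1.sif"
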
diff --git a/src/renee/gui.py b/src/renee/gui.py
index 12aad81..f139f2e 100755
--- a/src/renee/gui.py
+++ b/src/renee/gui.py
@@ -7,17 +7,16 @@
 import sys
 from tkinter import Tk
 
-from .util import (
+from ccbr_tools.pipeline.util import (
     get_genomes_dict,
     get_tmp_dir,
-    get_shared_resources_dir,
-    renee_base,
-    get_version,
-    get_singularity_cachedir,
     get_hpcname,
 )
-from .cache import get_sif_cache_dir
+from ccbr_tools.pipeline.cache import get_sif_cache_dir, get_singularity_cachedir
+from ccbr_tools.shell import exec_in_context
+
+from .util import get_version, renee_base, get_shared_resources_dir
+from .run import run
 
 # TODO: get rid of all the global variables
 # TODO: let's use a tmp dir and put these files there instead. see for inspiration: https://github.com/CCBR/RENEE/blob/16d13dca1d5f0f43c7dfda379efb882a67635d17/tests/test_cache.py#L14-L28
@@ -27,7 +26,7 @@
 def launch_gui(sub_args, debug=True):
     # get drop down genome+annotation options
-    jsons = get_genomes_dict(error_on_warnings=True)
+    jsons = get_genomes_dict(repo_base=renee_base, error_on_warnings=True)
     genome_annotation_combinations = list(jsons.keys())
     genome_annotation_combinations.sort()
     if debug:
@@ -191,7 +190,7 @@ def launch_gui(sub_args, debug=True):
                 threads=2,
             )
             # execute dry run and capture stdout/stderr
-            allout = run_in_context(run_args)
+            allout = exec_in_context(run, run_args)
             sg.popup_scrolled(
                 allout,
                 title="Dryrun:STDOUT/STDERR",
@@ -211,7 +210,7 @@ def launch_gui(sub_args, debug=True):
             if ch == "Yes":
                 run_args.dry_run = False
                 # execute live run
-                allout = run_in_context(run_args)
+                allout = exec_in_context(run, run_args)
                 sg.popup_scrolled(
                     allout,
                     title="Dryrun:STDOUT/STDERR",
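The GUI now calls ccbr_tools.shell.exec_in_context instead of the removed run_in_context helper. Judging from the helper it replaces (deleted from run.py further down), it presumably generalizes the same idea: call a function while capturing stdout and stderr, then return the combined text. A minimal sketch of that assumed behavior and signature:

    import contextlib
    import io

    def exec_in_context(func, *args, **kwargs):
        """Sketch: run func(*args, **kwargs), return captured stdout + stderr."""
        with contextlib.redirect_stdout(io.StringIO()) as out_f, contextlib.redirect_stderr(
            io.StringIO()
        ) as err_f:
            func(*args, **kwargs)
        return out_f.getvalue() + "\n" + err_f.getvalue()
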
- """.format( - filename - ) - ) - - return filename diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py new file mode 100644 index 0000000..4792e17 --- /dev/null +++ b/src/renee/orchestrate.py @@ -0,0 +1,167 @@ +import os +import subprocess + +from ccbr_tools.pipeline.util import ( + get_hpcname, + get_tmp_dir, +) +from ccbr_tools.pipeline.cache import get_singularity_cachedir + + +def orchestrate( + mode, + outdir, + additional_bind_paths, + alt_cache, + threads=2, + submission_script="runner", + masterjob="pl:renee", + tmp_dir=None, + wait="", + hpcname=get_hpcname(), +): + """Runs RENEE pipeline via selected executor: local or slurm. + If 'local' is selected, the pipeline is executed locally on a compute node/instance. + If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler. + Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. + @param outdir : + Pipeline output PATH + @param mode : + Execution method or mode: + local runs serially a compute instance without submitting to the cluster. + slurm will submit jobs to the cluster using the SLURM job scheduler. + @param additional_bind_paths : + Additional paths to bind to container filesystem (i.e. input file paths) + @param alt_cache : + Alternative singularity cache location + @param threads : + Number of threads to use for local execution method + @param submission_script : + Path to master jobs submission script: + renee run = /path/to/output/resources/runner + renee build = /path/to/output/resources/builder + @param masterjob : + Name of the master job + @param tmp_dir : + Absolute Path to temp dir for compute node + @param wait : + "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API + @param hpcname : + "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. hpcname is determined in setup() function + @return masterjob : + """ + # Add additional singularity bind PATHs + # to mount the local filesystem to the + # containers filesystem, NOTE: these + # PATHs must be an absolute PATHs + outdir = os.path.abspath(outdir) + # Add any default PATHs to bind to + # the container's filesystem, like + # tmp directories, /lscratch + addpaths = [] + # set tmp_dir depending on hpc + tmp_dir = get_tmp_dir(tmp_dir, outdir) + temp = os.path.dirname(tmp_dir.rstrip("/")) + if temp == os.sep: + temp = tmp_dir.rstrip("/") + if outdir not in additional_bind_paths.split(","): + addpaths.append(outdir) + if temp not in additional_bind_paths.split(","): + addpaths.append(temp) + bindpaths = ",".join(addpaths) + + # Set ENV variable 'SINGULARITY_CACHEDIR' + # to output directory + my_env = {} + my_env.update(os.environ) + + cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache) + my_env["SINGULARITY_CACHEDIR"] = cache + + if additional_bind_paths: + # Add Bind PATHs for outdir and tmp dir + if bindpaths: + bindpaths = ",{}".format(bindpaths) + bindpaths = "{}{}".format(additional_bind_paths, bindpaths) + + if not os.path.exists(os.path.join(outdir, "logfiles")): + # Create directory for logfiles + os.makedirs(os.path.join(outdir, "logfiles")) + + if os.path.exists(os.path.join(outdir, "logfiles", "snakemake.log")): + mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log")) + newname = os.path.join(outdir, "logfiles", "snakemake." 
diff --git a/src/renee/orchestrate.py b/src/renee/orchestrate.py
new file mode 100644
index 0000000..4792e17
--- /dev/null
+++ b/src/renee/orchestrate.py
@@ -0,0 +1,167 @@
+import os
+import subprocess
+
+from ccbr_tools.pipeline.util import (
+    get_hpcname,
+    get_tmp_dir,
+    _get_file_mtime,
+)
+from ccbr_tools.pipeline.cache import get_singularity_cachedir
+
+
+def orchestrate(
+    mode,
+    outdir,
+    additional_bind_paths,
+    alt_cache,
+    threads=2,
+    submission_script="runner",
+    masterjob="pl:renee",
+    tmp_dir=None,
+    wait="",
+    hpcname=get_hpcname(),
+):
+    """Runs RENEE pipeline via selected executor: local or slurm.
+    If 'local' is selected, the pipeline is executed locally on a compute node/instance.
+    If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler.
+    Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future.
+    @param outdir <str>:
+        Pipeline output PATH
+    @param mode <str>:
+        Execution method or mode:
+            local runs serially a compute instance without submitting to the cluster.
+            slurm will submit jobs to the cluster using the SLURM job scheduler.
+    @param additional_bind_paths <str>:
+        Additional paths to bind to container filesystem (i.e. input file paths)
+    @param alt_cache <str>:
+        Alternative singularity cache location
+    @param threads <int>:
+        Number of threads to use for local execution method
+    @param submission_script <str>:
+        Path to master jobs submission script:
+            renee run   = /path/to/output/resources/runner
+            renee build = /path/to/output/resources/builder
+    @param masterjob <str>:
+        Name of the master job
+    @param tmp_dir <str>:
+        Absolute Path to temp dir for compute node
+    @param wait <str>:
+        "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API
+    @param hpcname <str>:
+        "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. hpcname is determined in setup() function
+    @return masterjob <subprocess.Popen() object>:
+    """
+    # Add additional singularity bind PATHs
+    # to mount the local filesystem to the
+    # containers filesystem, NOTE: these
+    # PATHs must be an absolute PATHs
+    outdir = os.path.abspath(outdir)
+    # Add any default PATHs to bind to
+    # the container's filesystem, like
+    # tmp directories, /lscratch
+    addpaths = []
+    # set tmp_dir depending on hpc
+    tmp_dir = get_tmp_dir(tmp_dir, outdir)
+    temp = os.path.dirname(tmp_dir.rstrip("/"))
+    if temp == os.sep:
+        temp = tmp_dir.rstrip("/")
+    if outdir not in additional_bind_paths.split(","):
+        addpaths.append(outdir)
+    if temp not in additional_bind_paths.split(","):
+        addpaths.append(temp)
+    bindpaths = ",".join(addpaths)
+
+    # Set ENV variable 'SINGULARITY_CACHEDIR'
+    # to output directory
+    my_env = {}
+    my_env.update(os.environ)
+
+    cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache)
+    my_env["SINGULARITY_CACHEDIR"] = cache
+
+    if additional_bind_paths:
+        # Add Bind PATHs for outdir and tmp dir
+        if bindpaths:
+            bindpaths = ",{}".format(bindpaths)
+        bindpaths = "{}{}".format(additional_bind_paths, bindpaths)
+
+    if not os.path.exists(os.path.join(outdir, "logfiles")):
+        # Create directory for logfiles
+        os.makedirs(os.path.join(outdir, "logfiles"))
+
+    if os.path.exists(os.path.join(outdir, "logfiles", "snakemake.log")):
+        mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log"))
+        newname = os.path.join(outdir, "logfiles", "snakemake." + str(mtime) + ".log")
+        os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname)
+
+    # Create .singularity directory for installations of snakemake
+    # without setuid which create a sandbox in the SINGULARITY_CACHEDIR
+    if not os.path.exists(cache):
+        # Create directory for sandbox and image layers
+        os.makedirs(cache)
+
+    # Run on compute node or instance without submitting jobs to a scheduler
+    if mode == "local":
+        # Run RENEE: instantiate main/master process
+        # Look into later: it maybe worth replacing Popen subprocess with a direct
+        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
+        # Create log file for pipeline
+        logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w")
+        masterjob = subprocess.Popen(
+            [
+                "snakemake",
+                "-pr",
+                "--use-singularity",
+                "--singularity-args",
+                "'-B {}'".format(bindpaths),
+                "--cores",
+                str(threads),
+                "--configfile=config.json",
+            ],
+            cwd=outdir,
+            env=my_env,
+        )
+
+    # Submitting jobs to cluster via SLURM's job scheduler
+    elif mode == "slurm":
+        # Run RENEE: instantiate main/master process
+        # Look into later: it maybe worth replacing Popen subprocess with a direct
+        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
+        # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds
+        # --cluster-config $R/cluster.json --keep-going --restart-times 3
+        # --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}"
+        # -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T
+        # 2>&1 | tee -a $R/Reports/snakemake.log
+
+        # Create log file for master job information
+        logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w")
+        # submission_script for renee run is /path/to/output/resources/runner
+        # submission_script for renee build is /path/to/output/resources/builder
+        cmdlist = [
+            str(os.path.join(outdir, "resources", str(submission_script))),
+            mode,
+            "-j",
+            str(masterjob),
+            "-b",
+            str(bindpaths),
+            "-o",
+            str(outdir),
+            "-c",
+            str(cache),
+            "-t",
+            str(tmp_dir),
+        ]
+        if str(wait) == "--wait":
+            cmdlist.append("-w")
+        if str(hpcname) != "":
+            cmdlist.append("-n")
+            cmdlist.append(hpcname)
+        else:
+            cmdlist.append("-n")
+            cmdlist.append("unknown")
+
+        print(" ".join(cmdlist))
+        masterjob = subprocess.Popen(
+            cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env
+        )
+
+    logfh.close()
+    return masterjob
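Note: orchestrate relies on _get_file_mtime to timestamp-rotate an existing snakemake.log; importing it from ccbr_tools.pipeline.util assumes that helper moved there along with the rest of these utilities (it was previously defined in src/renee/util.py, deleted below). A hedged sketch of how run()/build() drive this entry point; the paths are illustrative placeholders:

    from renee.src.renee.orchestrate import orchestrate

    masterjob = orchestrate(
        mode="slurm",                       # or "local"
        outdir="/path/to/output",
        additional_bind_paths="/data/raw",  # comma-separated container bind paths
        alt_cache=None,                     # falls back to <outdir>/.singularity
        submission_script="renee",          # "runner" for run, "builder" for build
    )
    masterjob.wait()  # Popen object; the caller checks masterjob.returncode
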
diff --git a/src/renee/run.py b/src/renee/run.py
index bc7cd0e..69ffbae 100644
--- a/src/renee/run.py
+++ b/src/renee/run.py
@@ -4,12 +4,14 @@
 import os
 import pathlib
 import sys
+from ccbr_tools.pipeline.util import get_hpcname, get_tmp_dir
 
-from .util import renee_base, get_hpcname, get_tmp_dir, orchestrate
+from .util import renee_base
 from .conditions import fatal
 from .initialize import initialize
 from .setup import setup
 from .dryrun import dryrun
+from .orchestrate import orchestrate
 
 
 def run(sub_args):
@@ -113,11 +115,11 @@ def run(sub_args):
             )
         )
     elif sub_args.mode == "slurm":
-        jobid = (
-            open(os.path.join(sub_args.output, "logfiles", "mjobid.log"))
-            .read()
-            .strip()
-        )
+        with open(
+            os.path.join(sub_args.output, "logfiles", "mjobid.log"), "r"
+        ) as file:
+            jobid = file.read().strip()
+
     if int(masterjob.returncode) == 0:
         print("Successfully submitted master job: ", end="")
     else:
@@ -204,13 +206,3 @@ def get_fastq_screen_paths(fastq_screen_confs, match="DATABASE", file_index=-1):
             db_path = line.strip().split()[file_index]
             databases.append(db_path)
     return databases
-
-
-def run_in_context(args):
-    """Execute the run function in a context manager to capture stdout/stderr"""
-    with contextlib.redirect_stdout(io.StringIO()) as out_f, contextlib.redirect_stderr(
-        io.StringIO()
-    ) as err_f:
-        run(args)
-    allout = out_f.getvalue() + "\n" + err_f.getvalue()
-    return allout
Allow user override.""" if not shared_dir: @@ -76,249 +29,3 @@ def get_shared_resources_dir(shared_dir, hpc=get_hpcname()): elif hpc == "frce": shared_dir = "/mnt/projects/CCBR-Pipelines/pipelines/RENEE/resources/shared_resources" return shared_dir - - -def get_genomes_list(hpcname=get_hpcname(), error_on_warnings=False): - """Get list of genome annotations available for the current platform - @return genomes_list - """ - return sorted( - list( - get_genomes_dict( - hpcname=hpcname, error_on_warnings=error_on_warnings - ).keys() - ) - ) - - -def get_genomes_dict(hpcname=get_hpcname(), error_on_warnings=False): - """Get dictionary of genome annotation versions and the paths to the corresponding JSON files - @return genomes_dict { genome_name: json_file_path } - """ - if error_on_warnings: - warnings.filterwarnings("error") - genomes_dir = renee_base(os.path.join("config", "genomes", hpcname)) - if not os.path.exists(genomes_dir): - warnings.warn(f"Folder does not exist: {genomes_dir}") - search_term = genomes_dir + "/*.json" - json_files = glob.glob(search_term) - if len(json_files) == 0: - warnings.warn( - f"No Genome+Annotation JSONs found in {genomes_dir}. Please specify a custom genome json file with `--genome`" - ) - genomes_dict = { - os.path.basename(json_file).replace(".json", ""): json_file - for json_file in json_files - } - warnings.resetwarnings() - return genomes_dict - - -def check_python_version(): - # version check - # glob.iglob requires 3.11 for using "include_hidden=True" - MIN_PYTHON = (3, 11) - try: - assert sys.version_info >= MIN_PYTHON - print( - "Python version: {0}.{1}.{2}".format( - sys.version_info.major, sys.version_info.minor, sys.version_info.micro - ) - ) - except AssertionError: - exit( - f"{sys.argv[0]} requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer" - ) - - -def _cp_r_safe_( - source, target, resources=["workflow", "resources", "config"], safe_mode=True -): - """Private function: Given a list paths it will recursively copy each to the - target location. If a target path already exists, it will not over-write the - existing paths data when `safe_mode` is on. - @param resources : - List of paths to copy over to target location. - Default: ["workflow", "resources", "config"] - @params source : - Add a prefix PATH to each resource - @param target : - Target path to copy templates and required resources (aka destination) - @param safe_mode : - Only copy the resources to the target path - if they do not exist in the target path (default: True) - """ - for resource in resources: - destination = os.path.join(target, resource) - if os.path.exists(destination) and safe_mode: - print(f"🚫 path exists and `safe_mode` is ON, not copying: {destination}") - else: - # Required resources do not exist, or safe mode is off - shutil.copytree( - os.path.join(source, resource), destination, dirs_exist_ok=not safe_mode - ) - - -def orchestrate( - mode, - outdir, - additional_bind_paths, - alt_cache, - threads=2, - submission_script="runner", - masterjob="pl:renee", - tmp_dir=None, - wait="", - hpcname="", -): - """Runs RENEE pipeline via selected executor: local or slurm. - If 'local' is selected, the pipeline is executed locally on a compute node/instance. - If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler. - Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. 
@@ -27,45 +19,6 @@ def get_version():
     return version
 
 
-def scontrol_show():
-    """Run scontrol show config and parse the output as a dictionary
-    @return scontrol_dict <dict>:
-    """
-    scontrol_dict = dict()
-    scontrol_out = subprocess.run(
-        "scontrol show config", shell=True, capture_output=True, text=True
-    ).stdout
-    if len(scontrol_out) > 0:
-        for line in scontrol_out.split("\n"):
-            line_split = line.split("=")
-            if len(line_split) > 1:
-                scontrol_dict[line_split[0].strip()] = line_split[1].strip()
-    return scontrol_dict
-
-
-def get_hpcname():
-    """Get the HPC name (biowulf, frce, or an empty string)
-    @return hpcname <str>
-    """
-    scontrol_out = scontrol_show()
-    hpc = scontrol_out["ClusterName"] if "ClusterName" in scontrol_out.keys() else ""
-    if hpc == "fnlcr":
-        hpc = "frce"
-    return hpc
-
-
-def get_tmp_dir(tmp_dir, outdir, hpc=get_hpcname()):
-    """Get default temporary directory for biowulf and frce. Allow user override."""
-    if not tmp_dir:
-        if hpc == "biowulf":
-            tmp_dir = "/lscratch/$SLURM_JOBID"
-        elif hpc == "frce":
-            tmp_dir = outdir
-        else:
-            tmp_dir = None
-    return tmp_dir
-
-
 def get_shared_resources_dir(shared_dir, hpc=get_hpcname()):
     """Get default shared resources directory for biowulf and frce.
     Allow user override."""
     if not shared_dir:
@@ -76,249 +29,3 @@ def get_shared_resources_dir(shared_dir, hpc=get_hpcname()):
         elif hpc == "frce":
             shared_dir = "/mnt/projects/CCBR-Pipelines/pipelines/RENEE/resources/shared_resources"
     return shared_dir
-
-
-def get_genomes_list(hpcname=get_hpcname(), error_on_warnings=False):
-    """Get list of genome annotations available for the current platform
-    @return genomes_list <list>
-    """
-    return sorted(
-        list(
-            get_genomes_dict(
-                hpcname=hpcname, error_on_warnings=error_on_warnings
-            ).keys()
-        )
-    )
-
-
-def get_genomes_dict(hpcname=get_hpcname(), error_on_warnings=False):
-    """Get dictionary of genome annotation versions and the paths to the corresponding JSON files
-    @return genomes_dict <dict> { genome_name: json_file_path }
-    """
-    if error_on_warnings:
-        warnings.filterwarnings("error")
-    genomes_dir = renee_base(os.path.join("config", "genomes", hpcname))
-    if not os.path.exists(genomes_dir):
-        warnings.warn(f"Folder does not exist: {genomes_dir}")
-    search_term = genomes_dir + "/*.json"
-    json_files = glob.glob(search_term)
-    if len(json_files) == 0:
-        warnings.warn(
-            f"No Genome+Annotation JSONs found in {genomes_dir}. Please specify a custom genome json file with `--genome`"
-        )
-    genomes_dict = {
-        os.path.basename(json_file).replace(".json", ""): json_file
-        for json_file in json_files
-    }
-    warnings.resetwarnings()
-    return genomes_dict
-
-
-def check_python_version():
-    # version check
-    # glob.iglob requires 3.11 for using "include_hidden=True"
-    MIN_PYTHON = (3, 11)
-    try:
-        assert sys.version_info >= MIN_PYTHON
-        print(
-            "Python version: {0}.{1}.{2}".format(
-                sys.version_info.major, sys.version_info.minor, sys.version_info.micro
-            )
-        )
-    except AssertionError:
-        exit(
-            f"{sys.argv[0]} requires Python {'.'.join([str(n) for n in MIN_PYTHON])} or newer"
-        )
-
-
-def _cp_r_safe_(
-    source, target, resources=["workflow", "resources", "config"], safe_mode=True
-):
-    """Private function: Given a list paths it will recursively copy each to the
-    target location. If a target path already exists, it will not over-write the
-    existing paths data when `safe_mode` is on.
-    @param resources <list>:
-        List of paths to copy over to target location.
-        Default: ["workflow", "resources", "config"]
-    @params source <str>:
-        Add a prefix PATH to each resource
-    @param target <str>:
-        Target path to copy templates and required resources (aka destination)
-    @param safe_mode <bool>:
-        Only copy the resources to the target path
-        if they do not exist in the target path (default: True)
-    """
-    for resource in resources:
-        destination = os.path.join(target, resource)
-        if os.path.exists(destination) and safe_mode:
-            print(f"🚫 path exists and `safe_mode` is ON, not copying: {destination}")
-        else:
-            # Required resources do not exist, or safe mode is off
-            shutil.copytree(
-                os.path.join(source, resource), destination, dirs_exist_ok=not safe_mode
-            )
-
-
-def orchestrate(
-    mode,
-    outdir,
-    additional_bind_paths,
-    alt_cache,
-    threads=2,
-    submission_script="runner",
-    masterjob="pl:renee",
-    tmp_dir=None,
-    wait="",
-    hpcname="",
-):
-    """Runs RENEE pipeline via selected executor: local or slurm.
-    If 'local' is selected, the pipeline is executed locally on a compute node/instance.
-    If 'slurm' is selected, jobs will be submitted to the cluster using SLURM job scheduler.
-    Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future.
-    @param outdir <str>:
-        Pipeline output PATH
-    @param mode <str>:
-        Execution method or mode:
-            local runs serially a compute instance without submitting to the cluster.
-            slurm will submit jobs to the cluster using the SLURM job scheduler.
-    @param additional_bind_paths <str>:
-        Additional paths to bind to container filesystem (i.e. input file paths)
-    @param alt_cache <str>:
-        Alternative singularity cache location
-    @param threads <int>:
-        Number of threads to use for local execution method
-    @param submission_script <str>:
-        Path to master jobs submission script:
-            renee run   = /path/to/output/resources/runner
-            renee build = /path/to/output/resources/builder
-    @param masterjob <str>:
-        Name of the master job
-    @param tmp_dir <str>:
-        Absolute Path to temp dir for compute node
-    @param wait <str>:
-        "--wait" to wait for master job to finish. This waits when pipeline is called via NIDAP API
-    @param hpcname <str>:
-        "biowulf" if run on biowulf, "frce" if run on frce, blank otherwise. hpcname is determined in setup() function
-    @return masterjob <subprocess.Popen() object>:
-    """
-    # Add additional singularity bind PATHs
-    # to mount the local filesystem to the
-    # containers filesystem, NOTE: these
-    # PATHs must be an absolute PATHs
-    outdir = os.path.abspath(outdir)
-    # Add any default PATHs to bind to
-    # the container's filesystem, like
-    # tmp directories, /lscratch
-    addpaths = []
-    # set tmp_dir depending on hpc
-    tmp_dir = get_tmp_dir(tmp_dir, outdir)
-    temp = os.path.dirname(tmp_dir.rstrip("/"))
-    if temp == os.sep:
-        temp = tmp_dir.rstrip("/")
-    if outdir not in additional_bind_paths.split(","):
-        addpaths.append(outdir)
-    if temp not in additional_bind_paths.split(","):
-        addpaths.append(temp)
-    bindpaths = ",".join(addpaths)
-
-    # Set ENV variable 'SINGULARITY_CACHEDIR'
-    # to output directory
-    my_env = {}
-    my_env.update(os.environ)
-
-    cache = get_singularity_cachedir(output_dir=outdir, cache_dir=alt_cache)
-    my_env["SINGULARITY_CACHEDIR"] = cache
-
-    if additional_bind_paths:
-        # Add Bind PATHs for outdir and tmp dir
-        if bindpaths:
-            bindpaths = ",{}".format(bindpaths)
-        bindpaths = "{}{}".format(additional_bind_paths, bindpaths)
-
-    if not os.path.exists(os.path.join(outdir, "logfiles")):
-        # Create directory for logfiles
-        os.makedirs(os.path.join(outdir, "logfiles"))
-
-    if os.path.exists(os.path.join(outdir, "logfiles", "snakemake.log")):
-        mtime = _get_file_mtime(os.path.join(outdir, "logfiles", "snakemake.log"))
-        newname = os.path.join(outdir, "logfiles", "snakemake." + str(mtime) + ".log")
-        os.rename(os.path.join(outdir, "logfiles", "snakemake.log"), newname)
-
-    # Create .singularity directory for installations of snakemake
-    # without setuid which create a sandbox in the SINGULARITY_CACHEDIR
-    if not os.path.exists(cache):
-        # Create directory for sandbox and image layers
-        os.makedirs(cache)
-
-    # Run on compute node or instance without submitting jobs to a scheduler
-    if mode == "local":
-        # Run RENEE: instantiate main/master process
-        # Look into later: it maybe worth replacing Popen subprocess with a direct
-        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
-        # Create log file for pipeline
-        logfh = open(os.path.join(outdir, "logfiles", "snakemake.log"), "w")
-        masterjob = subprocess.Popen(
-            [
-                "snakemake",
-                "-pr",
-                "--use-singularity",
-                "--singularity-args",
-                "'-B {}'".format(bindpaths),
-                "--cores",
-                str(threads),
-                "--configfile=config.json",
-            ],
-            cwd=outdir,
-            env=my_env,
-        )
-
-    # Submitting jobs to cluster via SLURM's job scheduler
-    elif mode == "slurm":
-        # Run RENEE: instantiate main/master process
-        # Look into later: it maybe worth replacing Popen subprocess with a direct
-        # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
-        # snakemake --latency-wait 120 -s $R/Snakefile -d $R --printshellcmds
-        # --cluster-config $R/cluster.json --keep-going --restart-times 3
-        # --cluster "sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname}"
-        # -j 500 --rerun-incomplete --stats $R/Reports/initialqc.stats -T
-        # 2>&1 | tee -a $R/Reports/snakemake.log
-
-        # Create log file for master job information
-        logfh = open(os.path.join(outdir, "logfiles", "master.log"), "w")
-        # submission_script for renee run is /path/to/output/resources/runner
-        # submission_script for renee build is /path/to/output/resources/builder
-        cmdlist = [
-            str(os.path.join(outdir, "resources", str(submission_script))),
-            mode,
-            "-j",
-            str(masterjob),
-            "-b",
-            str(bindpaths),
-            "-o",
-            str(outdir),
-            "-c",
-            str(cache),
-            "-t",
-            str(tmp_dir),
-        ]
-        if str(wait) == "--wait":
-            cmdlist.append("-w")
-        if str(hpcname) != "":
-            cmdlist.append("-n")
-            cmdlist.append(hpcname)
-        else:
-            cmdlist.append("-n")
-            cmdlist.append("unknown")
-
-        print(" ".join(cmdlist))
-        masterjob = subprocess.Popen(
-            cmdlist, cwd=outdir, stderr=subprocess.STDOUT, stdout=logfh, env=my_env
-        )
-
-    return masterjob
-
-
-def _get_file_mtime(f):
-    timestamp = datetime.fromtimestamp(os.path.getmtime(os.path.abspath(f)))
-    mtime = timestamp.strftime("%y%m%d%H%M%S")
-    return mtime
diff --git a/tests/test_cache.py b/tests/test_cache.py
index 846929f..76d87f8 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -3,7 +3,8 @@
 import os.path
 import subprocess
 
-from renee.src.renee.cache import get_sif_cache_dir, get_singularity_cachedir
+from ccbr_tools.pipeline.cache import get_sif_cache_dir, get_singularity_cachedir
+from ccbr_tools.shell import shell_run
 
 renee_run = (
     "./bin/renee run "
@@ -16,12 +17,7 @@
 def run_in_temp(command_str):
     with tempfile.TemporaryDirectory() as tmp_dir:
         outdir = os.path.join(tmp_dir, "testout")
-        output = subprocess.run(
-            f"{command_str} --output {outdir}",
-            capture_output=True,
-            shell=True,
-            text=True,
-        )
+        output = shell_run(f"{command_str} --output {outdir}")
         if os.path.exists(os.path.join(outdir, "config.json")):
             with open(os.path.join(outdir, "config.json"), "r") as infile:
                 config = json.load(infile)
@@ -36,13 +32,13 @@ def test_cache_sif():
         config["images"]["arriba"].endswith(
             "tests/data/sifs/ccbr_arriba_2.0.0_v0.0.1.sif"
         ),
-        "does not exist in singularity cache" in output.stderr,
+        "does not exist in singularity cache" in output,
     ]
     assert all(assertions)
 
 
 def test_cache_nosif():
-    output, config = run_in_temp(f"{renee_run}")
+    output, config = run_in_temp(f"{renee_run} --sif-cache not/a/path")
     assertions = [
         config["images"]["arriba"] == "docker://nciccbr/ccbr_arriba_2.0.0:v0.0.1"
    ]
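The test now checks the warning text in output directly rather than output.stderr, which implies that ccbr_tools.shell.shell_run returns stdout and stderr merged into one string instead of a CompletedProcess like the subprocess.run call it replaces. A rough equivalent of the assumed behavior:

    import subprocess

    def shell_run_sketch(command_str):
        """Sketch of the assumed shell_run behavior: run a shell command,
        return stdout and stderr combined as a single string."""
        result = subprocess.run(
            command_str, shell=True, capture_output=True, text=True
        )
        return result.stdout + result.stderr
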
diff --git a/tests/test_run.py b/tests/test_run.py
index 229d40a..a2b05be 100644
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -3,25 +3,24 @@
 import os
 import tempfile
 
-from renee.src.renee.util import (
+from ccbr_tools.pipeline.util import (
     get_tmp_dir,
-    get_shared_resources_dir,
-    renee_base,
+    get_hpcname,
 )
-from renee.src.renee.cache import get_sif_cache_dir
-from renee.src.renee.run import run, run_in_context
-from renee.src.renee.util import get_hpcname
+from ccbr_tools.pipeline.cache import get_sif_cache_dir
+from ccbr_tools.shell import exec_in_context
+
+from renee.src.renee.util import renee_base, get_shared_resources_dir
+from renee.src.renee.run import run
 
 
 def test_dryrun():
     if get_hpcname() == "biowulf":
         with tempfile.TemporaryDirectory() as tmp_dir:
             run_args = argparse.Namespace(
-                input=list(glob.glob(os.path.join(renee_base(".tests"), "*.fastq.gz"))),
+                input=list(glob.glob(f"{renee_base('.tests')}/*.fastq.gz")),
                 output=tmp_dir,
-                genome=os.path.join(
-                    renee_base("config"), "genomes", "biowulf", "hg38_36.json"
-                ),
+                genome=renee_base("config", "genomes", "biowulf", "hg38_36.json"),
                 mode="slurm",
                 runmode="run",
                 dry_run=True,
@@ -36,7 +35,7 @@ def test_dryrun():
                 threads=2,
             )
             # execute dry run and capture stdout/stderr
-            allout = run_in_context(run_args)
+            allout = exec_in_context(run, run_args)
             assert (
                 "This was a dry-run (flag -n). The order of jobs does not reflect the order of execution."
                 in allout
diff --git a/tests/test_util.py b/tests/test_util.py
index d344e21..17f8b5f 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -6,17 +6,18 @@
 import tempfile
 import warnings
 
-from renee.src.renee.util import (
-    renee_base,
+from ccbr_tools.pipeline.util import (
     _cp_r_safe_,
     get_genomes_dict,
     get_genomes_list,
 )
 
+from renee.src.renee.util import renee_base
+
 
 def test_renee_base():
     renee_bin = renee_base(os.path.join("bin", "renee"))
-    assert renee_bin.endswith("/bin/renee") and os.path.exists(renee_bin)
+    assert str(renee_bin).endswith("/bin/renee") and os.path.exists(renee_bin)
 
 
 def test_cp_safe():
@@ -52,7 +53,7 @@ def test_cp_unsafe():
 
 def test_get_genomes_warnings():
     with warnings.catch_warnings(record=True) as raised_warnings:
-        genomes = get_genomes_list(hpcname="notAnOption")
+        genomes = get_genomes_list(repo_base=renee_base, hpcname="notAnOption")
     assertions = [
         "len(genomes) == 0",
         "len(raised_warnings) == 2",
@@ -68,10 +69,12 @@ def test_get_genomes_warnings():
 
 def test_get_genomes_error():
     with pytest.raises(UserWarning) as exception_info:
-        get_genomes_list(hpcname="notAnOption", error_on_warnings=True)
+        get_genomes_list(
+            repo_base=renee_base, hpcname="notAnOption", error_on_warnings=True
+        )
     assert "Folder does not exist" in str(exception_info.value)
 
 
 def test_get_genomes_biowulf():
-    genomes_dict = get_genomes_dict(hpcname="biowulf")
+    genomes_dict = get_genomes_dict(repo_base=renee_base, hpcname="biowulf")
     assert len(genomes_dict) > 10