diff --git a/.circleci/config.yml b/.circleci/config.yml index ff30bc73..f079542b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -88,7 +88,7 @@ jobs: source activate ./atlasenv atlas init --db-dir $DATABASE_DIR --threads 1 --working-dir test/Getenvs test/reads/empty - run: - name: install environements + name: install environments command: | source activate ./atlasenv atlas run all --working-dir test/Getenvs --conda-create-envs-only --cores all diff --git a/.github/workflows/codespell.yml b/.github/workflows/codespell.yml index 7b8da68b..dd0eb8e5 100644 --- a/.github/workflows/codespell.yml +++ b/.github/workflows/codespell.yml @@ -21,7 +21,3 @@ jobs: uses: actions/checkout@v4 - name: Codespell uses: codespell-project/actions-codespell@v2 - with: - check_filenames: true - skip: ".git,*.pdf,*.svg,versioneer.py,*.css,*.html" - check_hidden: true diff --git a/.github/workflows/python-package-conda.yml b/.github/workflows/python-package-conda.yml index ee3faedb..8a8d10d0 100644 --- a/.github/workflows/python-package-conda.yml +++ b/.github/workflows/python-package-conda.yml @@ -106,7 +106,7 @@ jobs: path: databases key: conda-envs-assembly - # - name: upack conda envs + # - name: unpack conda envs # if: steps.get-envs.outputs.cache-hit != 'true' # run: tar -xzf assembly_conda_envs.tar.gz @@ -198,7 +198,7 @@ jobs: path: wd key: assembly-working-dir - - name: dryrun assembly shold need nothing to be done + - name: dryrun assembly should need nothing to be done run: | ls -l wd ls -l databases/conda_envs @@ -264,7 +264,7 @@ jobs: fail-on-cache-miss: true key: assembly-working-dir - - name: dryrun assembly shold need nothing to be done + - name: dryrun assembly should need nothing to be done run: | ls -l wd ls -l databases diff --git a/CHANGELOG.md b/CHANGELOG.md index 89e1b99e..a6faadce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,12 +20,12 @@ Fix error with downloading DRAM. Update to DRAM v1.5 - Qc reads, assembly are now written in the sample.tsv from the start. This should fix errors of partial writing to the sample.tsv https://github.com/metagenome-atlas/atlas/issues/695 - It also allows you to add external assemblies. -- singletons reads are no longer used trough the pipeline. +- singletons reads are no longer used through the pipeline. - This changes the default paths for raw reads and assemblies. assembly are now in `Assembly/fasta/{sample}.fasta` reads: `QC/reads/{sample}_{fraction}.fastq.gz` -**Seemless update**: If you update atlas and continue on an old project. Your old files will be copied. +**Seamless update**: If you update atlas and continue on an old project. Your old files will be copied. Or the path defined in the sample.tsv will be used. diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5e41cfbf..d940038c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -24,7 +24,7 @@ I hope we can help you... You can ask the maintainers to be added to the repository and work from a *branch* of the main atlas repository or you can work from a fork of the atlas repository. -Follow the [steps](https://github.com/metagenome-atlas/atlas#install-the-development-version-from-github) to set up the developpment version of atlas. This allows you to work with the code you have in the git repository. +Follow the [steps](https://github.com/metagenome-atlas/atlas#install-the-development-version-from-github) to set up the development version of atlas. This allows you to work with the code you have in the git repository. 
 ## Test the code

 ### Locally
@@ -36,8 +36,8 @@ When you created a new rule and you want to test the output of this rule `my_tar


-### Continous integration
-When you make a pull request to the master branch. Each change in your code get's checked by continous integration (CI). The tests should make shure that your modification don't break any other use of atlas. However due to the requeirements needed during the execution of atlas, it is not possible to test all functionalities via CI. If you add functionalities to atlas, they should also be tested. Have a look at the scripts in `.test`.
+### Continuous integration
+When you make a pull request to the master branch, each change in your code gets checked by continuous integration (CI). The tests should make sure that your modifications don't break any other use of atlas. However, due to the requirements needed during the execution of atlas, it is not possible to test all functionalities via CI. If you add functionalities to atlas, they should also be tested. Have a look at the scripts in `.test`.
diff --git a/README.md b/README.md
index 445f8757..31be0d09 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ https://metagenome-atlas.readthedocs.io/
 > doi: [10.1186/s12859-020-03585-4](https://doi.org/10.1186/s12859-020-03585-4)

-# Developpment/Extensions
+# Development/Extensions

 Here are some ideas I work or want to work on when I have time. If you want to contribute or have some ideas let me know via a feature request issue.

diff --git a/atlas/atlas.py b/atlas/atlas.py
index 8afc7342..b708fddb 100644
--- a/atlas/atlas.py
+++ b/atlas/atlas.py
@@ -9,7 +9,7 @@ from snakemake.io import load_configfile

 from .make_config import validate_config

-from .init.atlas_init import run_init # , run_init_sra
+from .init.atlas_init import run_init, run_init_sra

 from .__init__ import __version__

@@ -32,7 +32,7 @@ def handle_max_mem(max_mem, profile):
     import psutil
     from math import floor

-    # calulate max system meory in GB (float!)
+    # calculate max system memory in GB (float!)
     max_system_memory = psutil.virtual_memory().total / (1024**3)

     if max_mem is None:
@@ -66,7 +66,7 @@ def cli(obj):


 cli.add_command(run_init)
-# cli.add_command(run_init_sra)
+cli.add_command(run_init_sra)


 def get_snakefile(file="workflow/Snakefile"):
@@ -146,7 +146,7 @@ def get_snakefile(file="workflow/Snakefile"):
 def run_workflow(
     workflow, working_dir, config_file, jobs, max_mem, profile, dryrun, snakemake_args
 ):
-    """Runs the ATLAS pipline
+    """Runs the ATLAS pipeline

     By default all steps are executed but a sub-workflow can be specified.
     Needs a config-file and expects to find a sample table in the working-directory.
Both can be generated with 'atlas init' diff --git a/atlas/init/atlas_init.py b/atlas/init/atlas_init.py index 9acf5c4c..855b60dc 100644 --- a/atlas/init/atlas_init.py +++ b/atlas/init/atlas_init.py @@ -4,8 +4,9 @@ import numpy as np import click from pathlib import Path -from ..make_config import make_config, validate_config +from ..make_config import make_config from .create_sample_table import get_samples_from_fastq, simplify_sample_names + from ..sample_table import ( validate_sample_table, validate_bingroup_size_cobinning, @@ -19,8 +20,8 @@ PHIX = "phiX174_virus.fa" -def prepare_sample_table_for_atlas( - sample_table, reads_are_QC=False, outfile="samples.tsv" +def write_sample_table_for_atlas( + sample_table, reads_are_QC=False, sample_table_file="samples.tsv", overwrite=False ): """ Write the file `samples.tsv` and complete the sample names and paths for all @@ -29,30 +30,38 @@ def prepare_sample_table_for_atlas( path_to_fastq (str): fastq/fasta data directory """ - if os.path.exists(outfile): - logger.error( - f"Output file {outfile} already exists I don't dare to overwrite it." - ) - exit(1) - + sample_table_file = Path(sample_table_file) + # delete samples.tsv if it exists and overwrite is set + if sample_table_file.exists(): + if overwrite: + sample_table_file.unlink() + else: + logger.info( + f"{sample_table_file} already exists, I dare not to overwrite it. " + f"Use --overwrite to overwrite this file" + ) + exit(1) simplify_sample_names(sample_table) - columns = sample_table.columns # R1 and R2 or only R1 , who knows - - # Test if paired end - if "R2" in columns: - fractions = ["R1", "R2"] - else: - sample_table.rename(columns={"R1": "se"}, inplace=True) - fractions = ["se"] - # Add prefix to fractions depending if qc or not if reads_are_QC: - prefix = "Reads_QC_" + prefix = "Reads_QC" else: - prefix = "Reads_raw_" + prefix = "Reads_raw" - sample_table.rename(columns={f: f"{prefix}{f}" for f in fractions}, inplace=True) + if is_paired(sample_table): + fractions = ["R1", "R2"] + + sample_table.rename( + columns={f: f"{prefix}_{f}" for f in fractions}, + inplace=True, + errors="raise", + ) + else: + fractions = ["se"] + sample_table.rename( + columns={"R1": f"{prefix}_se"}, inplace=True, errors="raise" + ) sample_table["BinGroup"] = "All" @@ -66,7 +75,98 @@ def prepare_sample_table_for_atlas( validate_sample_table(sample_table) - sample_table.to_csv(outfile, sep="\t") + logger.info(f"Prepared sample table with {sample_table.shape[0]} samples") + + sample_table.to_csv(sample_table_file, sep="\t") + + +def get_default_binner(sample_table): + + n_samples = sample_table.shape[0] + if n_samples <= 7: + logger.info( + "You don't have many samples in your dataset. 
" "I set 'metabat' as binner" + ) + binner = "metabat" + + try: + validate_bingroup_size_metabat(sample_table, logger) + except BinGroupSizeError: + pass + + else: + binner = "vamb" + try: + validate_bingroup_size_cobinning(sample_table, logger) + + except BinGroupSizeError: + pass + + return binner + + +def is_paired(sample_table): + # Test if paired end + + if not ( + sample_table.columns.str.endswith("R1").any() + or sample_table.columns.str.endswith("se").any() + ): + logger.error("I didn't find any R1 reads in your sample table") + exit(1) + + return sample_table.columns.str.endswith("R2").any() + + +def get_default_assembler(sample_table): + + if not is_paired(sample_table): + logger.info("Set assembler to megahit as spades cannot handle single-end reads") + return "megahit" + else: + return "spades" + + +def init_project( + sample_table_simple, + db_dir, + working_dir, + data_type="metagenomics", + interleaved_fastq=False, + threads=8, + skip_qc=False, + assembler=None, + binner=None, + overwrite=False, +): + # create working dir and db_dir + os.makedirs(working_dir, exist_ok=True) + os.makedirs(db_dir, exist_ok=True) + + sample_table_file = os.path.join(working_dir, "samples.tsv") + + write_sample_table_for_atlas( + sample_table_simple, + reads_are_QC=skip_qc, + sample_table_file=sample_table_file, + overwrite=overwrite, + ) + + if binner is None: + binner = get_default_binner(sample_table_simple) + + if assembler is None: + assembler = get_default_assembler(sample_table_simple) + + make_config( + db_dir, + threads, + assembler, + data_type, + interleaved_fastq, + os.path.join(working_dir, "config.yaml"), + binner=binner, + ) ###### Atlas init command ###### @@ -118,6 +218,11 @@ def prepare_sample_table_for_atlas( type=int, help="number of threads to use per multi-threaded job", ) +@click.option( + "--overwrite", + is_flag=True, + help="Overwrite previously downloaded Runinfo", +) @click.option( "--skip-qc", is_flag=True, @@ -131,6 +236,7 @@ def run_init( data_type, interleaved_fastq, threads, + overwrite=False, skip_qc=False, ): """Write the file CONFIG and complete the sample names and paths for all @@ -140,51 +246,44 @@ def run_init( the file name with the file name minus extension as the sample ID. """ - # create working dir and db_dir - os.makedirs(working_dir, exist_ok=True) - os.makedirs(db_dir, exist_ok=True) - sample_table = get_samples_from_fastq(path_to_fastq) - prepare_sample_table_for_atlas( - sample_table, - reads_are_QC=skip_qc, - outfile=os.path.join(working_dir, "samples.tsv"), + init_project( + sample_table_simple, + db_dir, + working_dir, + data_type="metagenomics", + interleaved_fastq=False, + threads=8, + overwrite=overwrite, + skip_qc=False, + assembler=None, + binner=None, ) - # Set default binner depending on number of samples - n_samples = sample_table.shape[0] - if n_samples <= 7: - logger.info( - "You don't have many samples in your dataset. 
" "I set 'metabat' as binner" - ) - binner = "metabat" - try: - validate_bingroup_size_metabat(sample_table, logger) - except BinGroupSizeError: - pass +########### Public init download data from SRA ############## - else: - binner = "vamb" - try: - validate_bingroup_size_cobinning(sample_table, logger) - except BinGroupSizeError: - pass +def create_sample_table_from_sample_names(Samples, paired, fastq_folder): - make_config( - db_dir, - threads, - assembler, - data_type, - interleaved_fastq, - os.path.join(working_dir, "config.yaml"), - binner=binner, - ) + sample_table = pd.DataFrame(index=Samples) + fastq_folder = Path(fastq_folder) -########### Public init download data from SRA ############## + if not paired: + sample_table["R1"] = sample_table.index.map( + lambda s: str(fastq_folder / f"{s}/{s}.fastq.gz") + ) + else: + sample_table["R1"] = sample_table.index.map( + lambda s: str(fastq_folder / f"{s}/{s}_1.fastq.gz") + ) + sample_table["R2"] = sample_table.index.map( + lambda s: str(fastq_folder / f"{s}/{s}_2.fastq.gz") + ) + + return sample_table @click.command( @@ -192,8 +291,8 @@ def run_init( short_help="Prepare atlas run from public data from SRA", help="prepare configuration file and sample table for atlas run" "based on public data from SRA\n" - "Supply a set of SRA run ids to the command, e.g.:" - "ERR1190946 PRJEB20796\n\n" + "Supply a set of SRA run ids to the command, e.g." + "PRJEB20796 ERR1190946 \n\n" "Reads are automatically downloaded and only temporarily stored on your machine.", ) @click.argument("identifiers", nargs=-1, type=str) @@ -243,11 +342,12 @@ def run_init_sra( ) sys.exit(1) - from .get_SRA_runinfo import get_runtable_from_ids + from .get_SRA_runinfo import get_table_from_accessions from .parse_sra import ( filter_runinfo, load_and_validate_runinfo_table, validate_merging_runinfo, + get_all_sample_names, ) # create working dir and db_dir @@ -259,7 +359,7 @@ def run_init_sra( SRA_subfolder = working_dir / "SRA" SRA_subfolder.mkdir(exist_ok=True) - runinfo_file = working_dir / "RunInfo.tsv" + runinfo_file = working_dir / "RunInfo.csv" if os.path.exists(runinfo_file) & (not overwrite): if not ((len(identifiers) == 1) & (identifiers[0].lower() == "continue")): @@ -274,9 +374,9 @@ def run_init_sra( logger.info(f"Downloading runinfo from SRA") # Create runinfo table in folder for SRA reads - runinfo_file_original = SRA_subfolder / "RunInfo_original.tsv" + runinfo_file_original = SRA_subfolder / "RunInfo_original.csv" - get_runtable_from_ids(identifiers, runinfo_file_original) + get_table_from_accessions(identifiers, runinfo_file_original) # Parse runtable RunTable = load_and_validate_runinfo_table(runinfo_file_original) @@ -286,17 +386,17 @@ def run_init_sra( # save filtered runtable logger.info(f"Write filtered runinfo to {runinfo_file}") - RunTable_filtered.to_csv(runinfo_file, sep="\t") + RunTable_filtered.to_csv(runinfo_file) # validate if can be merged RunTable = validate_merging_runinfo(runinfo_file) # create sample table - Samples = RunTable.BioSample.unique() + Samples = get_all_sample_names(RunTable) - if (RunTable.LibraryLayout == "PAIRED").all(): + if (RunTable.library_layout == "PAIRED").all(): paired = True - elif (RunTable.LibraryLayout == "SINGLE").all(): + elif (RunTable.library_layout == "SINGLE").all(): paired = False else: logger.error( @@ -304,46 +404,10 @@ def run_init_sra( ) exit(1) - sample_table_file = working_dir / "samples.tsv" - # delete samples.tsv if it exists and overwrite is set - if sample_table_file.exists(): - if overwrite: - 
sample_table_file.unlink() - else: - logger.info( - f"{sample_table_file} already exists, I dare not to overwrite it. " - f"Use --overwrite to overwrite this file" - ) - exit(1) - - # create sample table - - sample_table = pd.DataFrame(index=Samples) - - SRA_READ_PATH = SRA_subfolder.relative_to(working_dir) / "Samples" - - if not paired: - sample_table["R1"] = sample_table.index.map( - lambda s: str(SRA_READ_PATH / f"{s}/{s}.fastq.gz") - ) - else: - sample_table["R1"] = sample_table.index.map( - lambda s: str(SRA_READ_PATH / f"{s}/{s}_1.fastq.gz") - ) - sample_table["R2"] = sample_table.index.map( - lambda s: str(SRA_READ_PATH / f"{s}/{s}_2.fastq.gz") - ) - - prepare_sample_table_for_atlas( - sample_table, reads_are_QC=skip_qc, outfile=str(sample_table_file) + sample_table = create_sample_table_from_sample_names( + Samples, paired, fastq_folder=SRA_subfolder.relative_to(working_dir) / "Samples" ) - logger.info(f"Prepared sample table with {sample_table.shape[0]} samples") - - if not paired: - logger.info("Set assembler to megahit as spades cannot handle single-end reads") - assembler = "megahit" - else: - assembler = "spades" - # create config file - make_config(db_dir, config=str(working_dir / "config.yaml"), assembler=assembler) + init_project( + sample_table, db_dir, working_dir, skip_qc=skip_qc, overwrite=overwrite + ) diff --git a/atlas/init/create_sample_table.py b/atlas/init/create_sample_table.py index 734c6cab..8214e0fe 100644 --- a/atlas/init/create_sample_table.py +++ b/atlas/init/create_sample_table.py @@ -45,7 +45,7 @@ def add_sample_to_table(sample_dict, sample_id, header, fastq): def infer_split_character(base_name): - "Infer if fastq filename uses '_R1' '_1' to seperate filenames" + "Infer if fastq filename uses '_R1' '_1' to separate filenames" global split_character, is_paired @@ -59,7 +59,7 @@ def infer_split_character(base_name): is_paired = True else: logger.warning( - f"Could't find '_R1'/'_R2' or '_1'/'_2' in your filename {base_name}. Assume you have single-end reads." + f"Couldn't find '_R1'/'_R2' or '_1'/'_2' in your filename {base_name}. Assume you have single-end reads." ) split_character = None is_paired = False @@ -145,7 +145,7 @@ def get_samples_from_fastq(path, fraction_split_character=split_character): try: _, subfolders, files = next(os.walk(path)) except StopIteration: - logger.error(f"Folder {path} seems to conain no files or subfolders.") + logger.error(f"Folder {path} seems to contain no files or subfolders.") exit(1) abs_path = os.path.abspath(path) @@ -188,6 +188,8 @@ def simplify_sample_names(sample_df): assert sample_df.index.is_unique + sample_df.index = sample_df.index.astype(str) + sample_name_df = ( sample_df.index.str.split("[_, ,-]", expand=True) .to_frame() @@ -213,7 +215,7 @@ def simplify_sample_names(sample_df): lambda row: "{0}-{1}".format(*row), axis=1 ) - # cannt find unique sample ids + # cannot find unique sample ids else: logger.warning( "Didn't found a way to simplify sample names. 
" diff --git a/atlas/init/get_SRA_runinfo.py b/atlas/init/get_SRA_runinfo.py index 4c02c77a..cc7374cc 100644 --- a/atlas/init/get_SRA_runinfo.py +++ b/atlas/init/get_SRA_runinfo.py @@ -5,326 +5,96 @@ import traceback import xmltodict -try: - from tqdm import tqdm -except ImportError: - tqdm = list - - -class SRAUtils: - """ - Prefixes can be found here http://www.ddbj.nig.ac.jp/sub/prefix.html for DDBJ, ENA/EBI (ERA), and NCBI(SRA) - - Some interesting information here http://trace.ddbj.nig.ac.jp/dra/submission_e.html#Organization_of_metadata_objects - """ - - @staticmethod - def getInfoTableFromSearchTerm(search): - # return types and databases listed here - # https://www.ncbi.nlm.nih.gov/books/NBK25499/table/chapter4.T._valid_values_of__retmode_and/?report=objectonly - # - payload = {"save": "efetch", "db": "sra", "rettype": "runinfo", "term": search} - r = requests.get( - "http://trace.ncbi.nlm.nih.gov/Traces/sra/sra.cgi", params=payload - ) - if 200 == r.status_code: - if r.text.isspace(): - raise Exception("Got blank string from " + str(r.url)) - else: - reader_list = csv.DictReader(io.StringIO(r.text)) - infoRows = [] - for row in reader_list: - infoRows.append(row) - if 0 == len(infoRows): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(infoRows), search) - ) - else: - return infoRows - return infoRows - else: - raise Exception( - "Error in downloading from " - + str(r.url) - + " got response code " - + str(r.status_code) - ) +import pandas as pd + +import logging + + +logger = logging.Logger("public-init") + + +from .kingfisher import SraMetadata + +SRA = SraMetadata() + + +def get_table_from_accessions(accessions, output_file="SRA_run_table.csv") -> None: - @staticmethod - def getDictTableFromSearchTerm(search): - # return types and databases listed here - # https://www.ncbi.nlm.nih.gov/books/NBK25499/table/chapter4.T._valid_values_of__retmode_and/?report=objectonly - # @todo need to add way to select only what is needed - payload = {"save": "efetch", "db": "sra", "rettype": "full", "term": search} - r = requests.get( - "http://trace.ncbi.nlm.nih.gov/Traces/sra/sra.cgi", params=payload + accessions = set(accessions) + + bioproject_accessions = [ + identifier + for identifier in accessions + if identifier.startswith("PRJ") or identifier[1:3] == "RP" + ] + + run_accessions = [ + identifier for identifier in accessions if identifier[1:3] == "RR" + ] + + not_searchable = list(accessions - set(run_accessions) - set(bioproject_accessions)) + n_searchable = len(bioproject_accessions) + len(run_accessions) + + if n_searchable < len(accessions): + + error_text = "I can only search for run-accessions, e.g. ERR1739691 " + " or projects e.g. 
PRJNA621514 or SRP260223\n" + + if n_searchable > 0: + error_text += f"I can search for {n_searchable} accessions,\n but " + + error_text += ( + f"I cannot search for {len(not_searchable)} accessions: " + + ",".join(not_searchable[0 : min(len(not_searchable), 4)]) ) - if 200 == r.status_code: - if r.text.isspace(): - raise Exception("Got blank string from " + str(r.url)) - else: - return xmltodict.parse(r.text) - """ - reader_list = csv.DictReader(io.StringIO()) - infoRows = [] - for row in reader_list: - infoRows.append(row) - if 0 == len(infoRows): - raise Exception('Found %d entries in SRA for "%s" when expecting at least 1' % (len(infoRows), search)) - else: - return infoRows - return infoRows - """ - else: - raise Exception( - "Error in downloading from " - + str(r.url) - + " got response code " - + str(r.status_code) - ) - @staticmethod - def getRunAccsFromInfoTable(infoTab): - runInfo = [] - for row in infoTab: - runInfo.append(str(row.get("Run"))) - return runInfo - - @staticmethod - def getSraUrlFromRunAccession(accesion): - if not type(accesion) is str: - raise Exception( - "Error in getSraUrlFromRunAccession: accesion should be str, not " - + str(type(accesion)) - ) - if len(accesion) < 7: - raise Exception( - "Error in getSraUrlFromRunAccession: accession should be least 7 character long, not " - + str(len(accesion)) - + ", for " - + str(accesion) - ) + raise Exception(error_text) - if not accesion.startswith("ERR") and not accesion.startswith("SRR"): - raise Exception( - "Error in getSraUrlFromRunAccession: accession should start with either ERR or SRR, not " - + str(accesion[0:3]) - ) + # get sra for project + + if len(bioproject_accessions) > 0: - template = "ftp://ftp-trace.ncbi.nlm.nih.gov/sra/sra-instant/reads/ByRun/sra/{PREFIX}/{PREFIX}{PREFIXNUMS}/{ACCESION}/{ACCESION}.sra" - return template.format( - PREFIX=accesion[0:3], PREFIXNUMS=accesion[3:6], ACCESION=accesion + run_accessions_for_bioprojects = SRA.fetch_runs_from_bioprojects( + bioproject_accessions ) - @staticmethod - def getInfoFromRunAcc(run): - if not run.startswith("ERR") and not run.startswith("SRR"): - raise Exception("run should start with ERR or SRR, not: " + run) - infoTab = SRAUtils.getInfoTableFromSearchTerm(run) - runInfo = [] - for row in infoTab: - if run == str(row.get("Run")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), run) - ) - else: - return runInfo - - @staticmethod - def getInfoFromSubmissionAcc(submission): - if not submission.startswith("ERA") and not submission.startswith("SRA"): - raise Exception( - "submission should start with ERR or SRR, not: " + submission - ) - infoTab = SRAUtils.getInfoTableFromSearchTerm(submission) - runInfo = [] - for row in infoTab: - if submission == str(row.get("Submission")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), submission) - ) - else: - return runInfo - - @staticmethod - def getInfoFromSampleAcc(sample): - if not sample.startswith("ERS") and not sample.startswith("SRS"): - raise Exception("sample should start with ERS or SRS, not: " + sample) - infoTab = SRAUtils.getInfoTableFromSearchTerm(sample) - runInfo = [] - for row in infoTab: - if sample == str(row.get("Sample")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), sample) - ) - else: - return 
runInfo - - @staticmethod - def getInfoFromProjectAcc(project): - if not project.startswith("ERP") and not project.startswith("SRP"): - raise Exception("project should start with ERP or SRP, not: " + project) - infoTab = SRAUtils.getInfoTableFromSearchTerm(project) - runInfo = [] - for row in infoTab: - if project == str(row.get("SRAStudy")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), project) - ) - else: - return runInfo - - @staticmethod - def getInfoFromExperimentAcc(experiment): - if not experiment.startswith("ERX") and not experiment.startswith("SRX"): - raise Exception( - "experiment should start with ERX or SRX, not: " + experiment - ) - infoTab = SRAUtils.getInfoTableFromSearchTerm(experiment) - runInfo = [] - for row in infoTab: - if experiment == str(row.get("Experiment")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), experiment) - ) - else: - return runInfo - - @staticmethod - def getInfoFromBioProjectAcc(bioProject): - # or - if ( - not bioProject.startswith("PRJNA") - and not bioProject.startswith("PRJEA") - and not bioProject.startswith("PRJEB") - and not bioProject.startswith("PRJDA") - ): - raise Exception( - "bioProject should start with PRJNA, PRJEA, PRJEB, and PRJDA, not: " - + bioProject - ) - infoTab = SRAUtils.getInfoTableFromSearchTerm(bioProject) - runInfo = [] - for row in infoTab: - if bioProject == str(row.get("BioProject")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), bioProject) - ) - else: - return runInfo - - @staticmethod - def getInfoFromBioSampleAcc(bioSample): - """ - SAMD DDBJ - SAME ENA/EBI - SAMN NCBI - """ - if ( - not bioSample.startswith("SAME") - and not bioSample.startswith("SAMD") - and not bioSample.startswith("SAMN") - ): - raise Exception( - "bioProject should start with SAME, SAMD, and SAMN, not: " + bioSample - ) - infoTab = SRAUtils.getInfoTableFromSearchTerm(bioSample) - runInfo = [] - for row in infoTab: - if bioSample == str(row.get("BioSample")): - runInfo.append(row) - if 0 == len(runInfo): - raise Exception( - 'Found %d entries in SRA for "%s" when expecting at least 1' - % (len(runInfo), bioSample) - ) - else: - return runInfo - - @staticmethod - def getInfoFromSRAIdentifier(identifier): - if identifier[1:3] == "RX": - return SRAUtils.getInfoFromExperimentAcc(identifier) - elif identifier[1:3] == "RP": - return SRAUtils.getInfoFromProjectAcc(identifier) - elif identifier[1:3] == "RS": - return SRAUtils.getInfoFromSampleAcc(identifier) - elif identifier[1:3] == "RR": - return SRAUtils.getInfoFromRunAcc(identifier) - elif identifier[1:3] == "RA": - return SRAUtils.getInfoFromSubmissionAcc(identifier) - elif identifier.startswith("SAM"): - # SAME, SAMD, and SAMN - return SRAUtils.getInfoFromBioSampleAcc(identifier) - elif identifier.startswith("PRJ"): - # DDBJ archvie bioproject prefix PRJNA SAMEA2796165 - return SRAUtils.getInfoFromBioProjectAcc(identifier) - else: - raise Exception( - "Error, unrecognized prefix for sra Identifier " + str(identifier) - ) + logger.info( + f"Found {len(run_accessions_for_bioprojects)} runs for {len(bioproject_accessions)} bioprojects" + ) + overlap = set(run_accessions) & set(run_accessions_for_bioprojects) -# main function + if len(overlap) > 0: + logger.warning( + f"Found {len(overlap)} runs that 
are in both the bioprojects and the run accessions, downloading only once" + ) + run_accessions_for_bioprojects = list( + set(run_accessions_for_bioprojects) - overlap + ) + run_accessions = run_accessions + run_accessions_for_bioprojects -def get_runtable_from_ids(identifiers, output_file="SRA_runtable.tsv", overwrite=False): - if type(identifiers) == str: - identifiers = [identifiers] + # Fetch runs + logger.info(f"Fetch info for {len(run_accessions)} runs") - with open(output_file, "w") as outInfoFile: - identifierCount = 0 + tab = SRA.efetch_sra_from_accessions(run_accessions) - # don't show progress bar if only one elelment - if len(identifiers) > 1: - identifier_with_progressbar = tqdm(identifiers) - else: - identifier_with_progressbar = identifiers + tab.set_index("run", inplace=True) - for identifier in identifier_with_progressbar: - try: - tab = SRAUtils.getInfoFromSRAIdentifier(identifier) - # write header - if 0 == identifierCount: - outInfoFile.write("\t".join(tab[0].keys()) + "\n") - for row in tab: - outInfoFile.write("\t".join(row.values()) + "\n") - identifierCount = identifierCount + 1 + tab.to_csv(output_file, index=True) - except Exception as err: - raise Exception( - "Failed to get info for " + str(identifier) + ", mess: " + str(err) - ) from err + logger.info(f"Table saved to {output_file}") def parse_arguments_from_terminal(): - ## Comand line interface + ## Command line interface import argparse parser = argparse.ArgumentParser() parser.add_argument( - "identifiers", + "accessions", type=str, - help="A list of space separated SRA identifiers e.g. SRP046206 SRX188939 SRS807544 SRR1759594 PRJNA63661", + help="A list of space separated SRA identifiers runs or bioprojects e.g. PRJNA63661 SRP046206 SRR1759594", nargs="+", ) parser.add_argument( @@ -350,15 +120,21 @@ def parse_arguments_from_terminal(): outprefix += "SRA" # Define output_file - output_file = outprefix + "_runtable.tsv" + output_file = outprefix + "_runtable.csv" if os.path.exists(output_file) and not args.overwrite: - raise Exception( - "File " + output_file + " already exists, use --overwrite to over write it" + logger.error( + f"File `{output_file}` already exists, use --overwrite to over write it or choose another prefix" ) + exit(1) + + accessions = args.accessions + # If one string transform + if type(accessions) == str: + accessions = [accessions] - return args.identifiers, output_file + return accessions, output_file if __name__ == "__main__": - get_runtable_from_ids(*parse_arguments_from_terminal()) + get_table_from_accessions(*parse_arguments_from_terminal()) diff --git a/atlas/init/kingfisher.py b/atlas/init/kingfisher.py new file mode 100644 index 00000000..b0db5d81 --- /dev/null +++ b/atlas/init/kingfisher.py @@ -0,0 +1,435 @@ +import os +import time +import requests +import xml.etree.ElementTree as ET +import logging +import collections +import json +import itertools + + +try: + from tqdm import tqdm +except ImportError: + tqdm = list + +import pandas as pd + + +# Define these constants so that they can be referred to in other classes +# without index errors. +STUDY_ACCESSION_KEY = "study_accession" +RUN_ACCESSION_KEY = "run" +BASES_KEY = "bases" +SAMPLE_NAME_KEY = "sample_name" +NCBI_API_KEY_ENV = "NCBI_API_KEY" +BIOPROJECT_ACCESSION_KEY = "bioproject" + + +# from bird_tool_utils import iterable_chunks +def iterable_chunks(iterable, n): + """Given an iterable, return it in chunks of size n. In the last chunk, the + remaining space is replaced by None entries. 
+ """ + args = [iter(iterable)] * n + return itertools.zip_longest(*args, fillvalue=None) + + +class SraMetadata: + def add_api_key(self, other_params): + if NCBI_API_KEY_ENV in os.environ: + other_params["api_key"] = os.environ[NCBI_API_KEY_ENV] + return other_params + + def _retry_request(self, description, func): + """Retry a retests.post or requests.get 3 times, returning the request + when OK, otherwise raising an Exception""" + + num_retries = 3 + sleep_time = 60 + + def retrying(i, num_retries=3): + if i < num_retries - 1: + logging.warning( + "Retrying request (retry {} of {})".format(i + 1, num_retries - 1) + ) + + for i in range(num_retries): + try: + this_res = func() + if not this_res.ok: + logging.warning( + "Request not OK when {}: {}: {}".format( + description, this_res, this_res.text + ) + ) + logging.warning( + "Sleeping for {} seconds before retrying".format(sleep_time) + ) + time.sleep(60) + retrying(i) + else: + return this_res + except Exception as e: + logging.warning("Exception raised when {}: {}".format(description, e)) + logging.warning( + "Sleeping for {} seconds before retrying".format(sleep_time) + ) + time.sleep(60) + retrying(i) + raise Exception( + "Failed to {} after {} attempts".format(description, num_retries) + ) + + def fetch_runs_from_bioprojects(self, bioproject_accessions): + retmax = 10000 + query_string = " OR ".join( + [ + "{}[BioProject]".format(bioproject_accession) + for bioproject_accession in bioproject_accessions + ] + ) + logging.debug("Querying with string: {}".format(query_string)) + res = requests.get( + url="https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", + params=self.add_api_key( + { + "db": "sra", + "term": query_string, + "tool": "kingfisher", + "email": "kingfisher@github.com", + "retmax": retmax, + "usehistory": "y", + } + ), + ) + if not res.ok: + raise Exception( + "HTTP Failure when requesting search from bioproject: {}: {}".format( + res, res.text + ) + ) + root = ET.fromstring(res.text) + sra_ids = list([c.text for c in root.find("IdList")]) + if len(sra_ids) == retmax: + logging.warning( + "Unexpectedly found the maximum number of results for this query, possibly some results will be missing" + ) + webenv = root.find("WebEnv").text + + # Now convert the IDs into runs + metadata = self.efetch_metadata_from_ids(webenv, None, len(sra_ids)) + return metadata[RUN_ACCESSION_KEY].to_list() + + def efetch_metadata_from_ids(self, webenv, accessions, num_ids): + data_frames = [] + + retmax = num_ids + 10 + logging.debug("Running efetch ..") + res = self._retry_request( + "efetch_from_ids", + lambda: requests.get( + url="https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi", + params=self.add_api_key( + { + "db": "sra", + "tool": "kingfisher", + "email": "kingfisher@github.com", + "webenv": webenv, + "query_key": 1, + } + ), + ), + ) + if not res.ok: + raise Exception( + "HTTP Failure when requesting efetch from IDs: {}: {}".format( + res, res.text + ) + ) + + root = ET.fromstring(res.text) + + def try_get(func): + try: + return func() + except AttributeError: + return "" + except KeyError: + return None + + if root.find("ERROR") is not None: + logging.error( + "Error when fetching metadata: {}".format(root.find("ERROR").text) + ) + + # Some samples such as SAMN13241871 are linked to multiple runs e.g. 
SRR10489833 + accessions_set = None if accessions is None else set(accessions) + + for pkg in root.findall("EXPERIMENT_PACKAGE"): + d = collections.OrderedDict() + d["experiment_accession"] = try_get( + lambda: pkg.find("./EXPERIMENT").attrib["accession"] + ) + d["experiment_title"] = try_get(lambda: pkg.find("./EXPERIMENT/TITLE").text) + l = pkg.find("./EXPERIMENT/DESIGN/LIBRARY_DESCRIPTOR") + d["library_name"] = try_get(lambda: l.find("LIBRARY_NAME").text) + d["library_strategy"] = try_get(lambda: l.find("LIBRARY_STRATEGY").text) + d["library_source"] = try_get(lambda: l.find("LIBRARY_SOURCE").text) + d["library_selection"] = try_get(lambda: l.find("LIBRARY_SELECTION").text) + d["library_layout"] = try_get(lambda: l.find("LIBRARY_LAYOUT")[0].tag) + d["platform"] = try_get(lambda: pkg.find("./EXPERIMENT/PLATFORM")[0].tag) + d["model"] = try_get(lambda: pkg.find("./EXPERIMENT/PLATFORM/")[0].text) + d["submitter"] = "" + for k, v in pkg.find("./SUBMISSION").attrib.items(): + if k not in ("accession", "alias"): + if d["submitter"] == "": + d["submitter"] = v + else: + d["submitter"] = "{}, {}".format(d["submitter"], v) + d[STUDY_ACCESSION_KEY] = try_get( + lambda: pkg.find("./STUDY").attrib["accession"] + ) + d[BIOPROJECT_ACCESSION_KEY] = try_get( + lambda: pkg.find( + './STUDY/IDENTIFIERS/EXTERNAL_ID[@namespace="BioProject"]' + ).text + ) + d["study_alias"] = try_get(lambda: pkg.find("./STUDY").attrib["alias"]) + d["study_centre_project_name"] = try_get( + lambda: pkg.find("./STUDY/DESCRIPTOR/CENTER_PROJECT_NAME").text + ) + d["organisation"] = try_get(lambda: pkg.find("./Organization/Name").text) + d["organisation_department"] = try_get( + lambda: pkg.find("./Organization/Address/Department").text + ) + d["organisation_institution"] = try_get( + lambda: pkg.find("./Organization/Address/Institution").text + ) + d["organisation_street"] = try_get( + lambda: pkg.find("./Organization/Address/Street").text + ) + d["organisation_city"] = try_get( + lambda: pkg.find("./Organization/Address/City").text + ) + d["organisation_country"] = try_get( + lambda: pkg.find("./Organization/Address/Country").text + ) + first_name = try_get( + lambda: pkg.find("./Organization/Contact/Name/First").text + ) + last_name = try_get( + lambda: pkg.find("./Organization/Contact/Name/Last").text + ) + d["organisation_contact_name"] = "{} {}".format(first_name, last_name) + d["organisation_contact_email"] = try_get( + lambda: pkg.find("./Organization/Contact").attrib["email"] + ) + # Remove carriage return from sample description e.g. for SRR1263047 + d["sample_description"] = try_get( + lambda: pkg.find("./SAMPLE/DESCRIPTION").text.replace("\r", "") + ) + d["sample_alias"] = try_get(lambda: pkg.find("./SAMPLE").attrib["alias"]) + d["sample_accession"] = try_get( + lambda: pkg.find("./SAMPLE").attrib["accession"] + ) + d["biosample"] = try_get( + lambda: pkg.find( + './SAMPLE/IDENTIFIERS/EXTERNAL_ID[@namespace="biosample"]' + ).text + ) + d["sample_title"] = try_get(lambda: pkg.find("./SAMPLE/TITLE").text) + d["taxon_name"] = try_get( + lambda: pkg.find("./SAMPLE/SAMPLE_NAME/SCIENTIFIC_NAME").text + ) + sample_sample_name = None + sample_title = None + if pkg.find("./SAMPLE/SAMPLE_ATTRIBUTES"): + for attr in pkg.find("./SAMPLE/SAMPLE_ATTRIBUTES"): + tag_el = attr.find("TAG") + value_el = attr.find("VALUE") + + # Some samples like + # https://www.ncbi.nlm.nih.gov/sra?term=ERS1240061&report=FullXml + # have entries with a tag, but no value. Ignore these. 
+ if tag_el is not None and value_el is not None: + tag = tag_el.text + value = value_el.text + if tag == "Title": + sample_title = value + elif tag == "sample name": + sample_sample_name = value + d[tag] = value + if sample_sample_name is not None: + d[SAMPLE_NAME_KEY] = sample_sample_name + elif sample_title is not None: + d[SAMPLE_NAME_KEY] = sample_title + else: + d[SAMPLE_NAME_KEY] = d[ + "library_name" + ] # default, maybe there's always a title though? + d["study_title"] = try_get( + lambda: pkg.find("./STUDY/DESCRIPTOR/STUDY_TITLE").text + ) + d["design_description"] = try_get( + lambda: pkg.find("./EXPERIMENT/DESIGN/DESIGN_DESCRIPTION").text + ) + d["study_abstract"] = try_get( + lambda: pkg.find("./STUDY/DESCRIPTOR/STUDY_ABSTRACT").text + ) + study_links_xrefs = try_get(lambda: pkg.find("./STUDY/STUDY_LINKS")) + if study_links_xrefs is not None: + # Convert db to lower case because otherwise have PUBMED and pubmed e.g. ERR1914274 and SRR9113719 + study_links = list( + [ + {"db": x.find("DB").text.lower(), "id": x.find("ID").text} + for x in study_links_xrefs.findall("STUDY_LINK/XREF_LINK") + ] + ) + # Record URL links like SRR7051324 + for x in study_links_xrefs.findall("STUDY_LINK/URL_LINK"): + study_links.append( + {"label": x.find("LABEL").text, "url": x.find("URL").text} + ) + d["study_links"] = json.dumps(study_links) + else: + d["study_links"] = json.dumps([]) + + # Account for the fact that multiple runs may be associated with + # this sample + d["number_of_runs_for_sample"] = len(pkg.findall("./RUN_SET/RUN")) + + for run in pkg.findall("./RUN_SET/RUN"): + accession_here = run.attrib["accession"] + if accessions_set is None or accession_here in accessions_set: + d2 = d.copy() + d2["spots"] = try_get(lambda: int(run.attrib["total_spots"])) + d2[BASES_KEY] = try_get(lambda: int(run.attrib["total_bases"])) + d2["run_size"] = try_get(lambda: int(run.attrib["size"])) + d2[RUN_ACCESSION_KEY] = try_get(lambda: run.attrib["accession"]) + d2["published"] = try_get(lambda: run.attrib["published"]) + stats = run.find("Statistics") + if stats is not None: + for i, r in enumerate(stats): + d2["read{}_length_average".format(i + 1)] = r.attrib[ + "average" + ] + d2["read{}_length_stdev".format(i + 1)] = r.attrib["stdev"] + data_frames.append(d2) + + return pd.DataFrame(data_frames) + + def _print_xml(self, element, prefix): + if prefix is None or prefix == "": + p2 = "" + else: + p2 = "{}_".format(prefix) + for k, v in element.attrib.items(): + print("\t".join(["{}{}".format(p2, k), v])) + if element.text: + print("\t".join(["{}{}".format(p2, element.tag), element.text])) + for e in element: + self.print_xml(e, "{}{}".format(p2, e.tag)) + + def efetch_sra_from_accessions(self, accessions): + all_accessions = list(set(accessions)) + if len(all_accessions) == 0: + return [] + + metadata_chunks = [] + chunk_size = 500 + + # Complicated calculation here. + num_chunks = len(all_accessions) / chunk_size + if num_chunks > int(num_chunks): + num_chunks = int(num_chunks) + 1 + num_chunks = int(num_chunks) + + if len(all_accessions) > chunk_size: + logging.info( + "Querying for {} accessions in {} chunks".format( + len(all_accessions), num_chunks + ) + ) + accession_iter = tqdm( + iterable_chunks(all_accessions, chunk_size), total=num_chunks + ) + else: + accession_iter = [all_accessions] + + for accessions_plus in accession_iter: + accessions = [a for a in accessions_plus if a is not None] + + if num_chunks == 1: + logging.info( + "Querying NCBI esearch for {} distinct accessions e.g. 
{}".format( + len(accessions), accessions[0] + ) + ) + sra_ids = [] + + webenv = None + request_term = " OR ".join(["{}[accn]".format(acc) for acc in accessions]) + + retmax = len(accessions) + 10 + params = self.add_api_key( + { + "db": "sra", + "term": request_term, + "tool": "kingfisher", + "email": "kingfisher@github.com", + "retmax": retmax, + "usehistory": "y", + } + ) + if webenv is None: + params["WebEnv"] = webenv + + res = self._retry_request( + "esearch from accessions", + lambda: requests.post( + url="https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", + data=params, + ), + ) + + root = ET.fromstring(res.text) + if webenv is None: + webenv = root.find("WebEnv").text + id_list_node = root.find("IdList") + sra_ids = list(set([c.text for c in id_list_node])) + + if len(sra_ids) == 0: + logging.warning( + "Unable to find any accessions, from the list: {}".format( + accessions + ) + ) + return None + + if num_chunks == 1: + logging.info( + "Querying NCBI efetch for {} distinct IDs e.g. {}".format( + len(sra_ids), sra_ids[0] + ) + ) + metadata = self.efetch_metadata_from_ids(webenv, accessions, len(sra_ids)) + + # Ensure all hits are found, and trim results to just those that are real hits + if RUN_ACCESSION_KEY not in metadata.columns: + raise Exception("No metadata could be retrieved") + + if len(metadata) != len(accessions): + found_runs = set(metadata[RUN_ACCESSION_KEY].to_list()) + not_found = list([a for a in accessions if a not in found_runs]) + logging.warning( + "Unable to find all accessions. The {} missing ones were: {}".format( + len(not_found), not_found + ) + ) + metadata_chunks.append(metadata) + + metadata = pd.concat(metadata_chunks) + metadata.sort_values([STUDY_ACCESSION_KEY, RUN_ACCESSION_KEY], inplace=True) + + return metadata diff --git a/atlas/init/parse_sra.py b/atlas/init/parse_sra.py index 0c2c41ff..bddaad2e 100644 --- a/atlas/init/parse_sra.py +++ b/atlas/init/parse_sra.py @@ -6,42 +6,54 @@ Expected_library_values = { - "LibrarySelection": "RANDOM", - "LibraryStrategy": "WGS", - "LibrarySource": "METAGENOMIC", - "Platform": "ILLUMINA", + "library_selection": "RANDOM", + "library_strategy": "WGS", + "library_source": "METAGENOMIC", + "platform": "ILLUMINA", } +# Values that should be the same for all runs from the same sample +Expected_same_values = ["experiment_accession", "model", "library_name"] -def load_and_validate_runinfo_table(path): - RunTable = pd.read_csv(path, sep="\t", index_col=0) + +SAMPLE_NAME_COLUMN = "sample_accession" + + +def load_and_validate_runinfo_table(path="RunInfo.csv"): + RunTable = pd.read_csv(path, index_col=0) # validate sra table format_error = False # check if all headers are present Expected_headers = [ - "LibraryLayout", - "LibrarySource", - "LibrarySelection", - "LibraryStrategy", - "BioSample", + "library_layout", + "library_source", + "library_selection", + "library_strategy", + "sample_accession", ] for header in Expected_headers: if not header in RunTable.columns: logger.error(f"Didn't found expected header {header}") format_error = True - if not all(RunTable.index.str[1:2] == "R"): + # set to string all expected headers + RunTable[Expected_headers] = RunTable[Expected_headers].astype(str) + + if not all(RunTable.index.str[1:3] == "RR"): logger.error("Expect runs as index, e.g. 
[E,S,D]RR000") format_error = True - if not RunTable.BioSample.str.startswith("SAM").all(): - logger.error("BioSample should start with 'SAM'") + assert ( + SAMPLE_NAME_COLUMN == "sample_accession" + ), "This code is not tested for other sample names" + if not (RunTable.sample_accession.str[1:3] == "RS").all(): + logger.error("sample_accession should start with [E,S,D]RS") format_error = True - if not RunTable.LibraryLayout.isin(["PAIRED", "SINGLE"]).all(): - logger.error("LibraryLayout should be 'PAIRED' or 'SINGLE'") + if not RunTable.library_layout.isin(["PAIRED", "SINGLE"]).all(): + logger.error("library_layout should be 'PAIRED' or 'SINGLE'") format_error = True if format_error: @@ -53,12 +65,12 @@ def load_and_validate_runinfo_table(path): def filter_runinfo(RunTable, ignore_paired=False): logger.info( - f"Start with {RunTable.shape[0]} runs from {RunTable.BioSample.unique().shape[0]} samples" + f"Start with {RunTable.shape[0]} runs from {RunTable[SAMPLE_NAME_COLUMN].unique().shape[0]} samples" ) # Filter out reads that are not metagenomics - for key in ["LibrarySource"]: + for key in ["library_source"]: Nruns_before = RunTable.shape[0] All_values = RunTable[key].unique() RunTable = RunTable.loc[RunTable[key] == Expected_library_values[key]] @@ -72,7 +84,7 @@ def filter_runinfo(RunTable, ignore_paired=False): f"Filtered out {Difference} runs" ) - for key in ["LibrarySelection", "LibraryStrategy"]: + for key in ["library_selection", "library_strategy"]: Nruns_before = RunTable.shape[0] All_values = RunTable[key].unique() if any(RunTable[key] != Expected_library_values[key]): @@ -83,8 +95,8 @@ def filter_runinfo(RunTable, ignore_paired=False): # Handle single end reads if mixed - if ("PAIRED" in RunTable.LibraryLayout) and ("SINGLE" in RunTable.LibraryLayout): - N_library_layout = RunTable.LibraryLayout.value_counts() + if ("PAIRED" in RunTable.library_layout) and ("SINGLE" in RunTable.library_layout): + N_library_layout = RunTable.library_layout.value_counts() logger.info( f"Run table contains {N_library_layout['SINGLE']} single-end " @@ -93,20 +105,20 @@ def filter_runinfo(RunTable, ignore_paired=False): if ignore_paired: logger.info(f"I drop {N_library_layout['PAIRED']} paired end libraries") - RunTable = RunTable.query("LibraryLayout == 'SINGLE'") + RunTable = RunTable.query("library_layout == 'SINGLE'") else: logger.warning(f"I drop {N_library_layout['SINGLE']} single end libraries") - RunTable = RunTable.query("LibraryLayout == 'PAIRED'") + RunTable = RunTable.query("library_layout == 'PAIRED'") # Illumina or not - if not RunTable.Platform.isin(["ILLUMINA"]).all(): - Platforms = ", ".join(RunTable.Platform.unique()) + if not RunTable.platform.isin(["ILLUMINA"]).all(): + platforms = ", ".join(RunTable.platform.unique()) logger.warning( - f"Your samples are sequenced on the folowing platform: {Platforms}\n" + f"Your samples are sequenced on the following platform: {platforms}\n" "I don't know how well Atlas handles non-illumina reads.\n" "If you have long-reads, specify them via a the longreads, column in the sample table." 
        )
@@ -114,7 +126,7 @@ def filter_runinfo(RunTable, ignore_paired=False):
     # Final
     if RunTable.shape[0] > 0:
         logger.info(
-            f"Selected {RunTable.shape[0]} runs from {RunTable.BioSample.unique().shape[0]} samples"
+            f"Selected {RunTable.shape[0]} runs from {RunTable[SAMPLE_NAME_COLUMN].unique().shape[0]} samples"
         )

     else:
@@ -127,32 +139,39 @@ def filter_runinfo(RunTable, ignore_paired=False):
 def validate_merging_runinfo(path):
     RunTable = load_and_validate_runinfo_table(path)

-    # If each run is from a different biosample, merging is not necessary
-    if RunTable.shape[0] == RunTable.BioSample.unique().shape[0]:
+    # If each run is from a different sample (SAMPLE_NAME_COLUMN), merging is not necessary
+    if RunTable.shape[0] == RunTable[SAMPLE_NAME_COLUMN].unique().shape[0]:
         return RunTable

     # Cannot merge if different platforms
     problematic_samples = []
-    for sample, df in RunTable.groupby("BioSample"):
-        if not all(df.Platform == df.Platform.iloc[0]):
+    for sample, df in RunTable.groupby(SAMPLE_NAME_COLUMN):
+        if not all(df.platform == df.platform.iloc[0]):
             problematic_samples.append(sample)

     if len(problematic_samples) > 0:
         logger.error(
-            f"You attemt to merge runs from the same sample. "
-            f"But for {len(problematic_samples)} samples the runs are sequenced with different platforms and should't be merged.\n"
-            f"Please resolve the abiguity in the table {path} and rerun the command.\n"
+            f"You attempt to merge runs from the same sample. "
+            f"But for {len(problematic_samples)} samples the runs are sequenced with different platforms and shouldn't be merged.\n"
+            f"Please resolve the ambiguity in the table {path} and rerun the command.\n"
         )
         exit(1)

-    # Warn if samples are not identical for the follwing columns
-    Expected_same_values = ["Experiment", "Model", "LibraryName"]
+    # Warn if values that are expected to be identical differ between runs of the same sample
+
     for key in Expected_same_values:
         problematic_samples = []
+
+        if key not in RunTable.columns:
+            logger.warning(
+                f"Didn't find column {key} in RunTable at {path}, skipping the check for identical values"
+            )
+        else:
+
+            for sample, df in RunTable.groupby(SAMPLE_NAME_COLUMN):
+                if not all(df[key] == df[key].iloc[0]):
+                    problematic_samples.append(sample)

     if len(problematic_samples) > 0:
         if len(problematic_samples) > 5:
@@ -161,11 +180,20 @@ def validate_merging_runinfo(path):
             problematic_samples_list = " ".join(problematic_samples)

         logger.warning(
-            "You attemt to merge runs from the same sample. "
+            "You attempt to merge runs from the same sample. 
" f"But for {len(problematic_samples)} samples the runs have different {key}: {problematic_samples_list}\n" f"You can modify the table {path} and rerun the command.\n" ) - logger.info("I will automatically merge runs from the same biosample.") + logger.info("I will automatically merge runs from the same sample.") return RunTable + + +def get_run_ids_for_sample(run_table, sample): + + return run_table.query(f"{SAMPLE_NAME_COLUMN} == '{sample}'").index.tolist() + + +def get_all_sample_names(run_table): + return run_table[SAMPLE_NAME_COLUMN].unique().tolist() diff --git a/atlas/sample_table.py b/atlas/sample_table.py index be7d5217..cb8b135f 100644 --- a/atlas/sample_table.py +++ b/atlas/sample_table.py @@ -60,6 +60,9 @@ def validate_sample_table(sampleTable): def load_sample_table(sample_table="samples.tsv"): sampleTable = pd.read_csv(sample_table, index_col=0, sep="\t") + + sampleTable.index = sampleTable.index.astype(str) + validate_sample_table(sampleTable) return sampleTable @@ -127,7 +130,7 @@ def validate_bingroup_size(sampleTable, config, logger): if config["final_binner"] == "DASTool": binners = config["binner"] - logger.info(f"DASTool uses the folowing binners: {binners}") + logger.info(f"DASTool uses the following binners: {binners}") if ("vamb" in binners) or ("SemiBin" in binners): validate_bingroup_size_cobinning(sampleTable, logger) diff --git a/atlasenv.yml b/atlasenv.yml index c1ec80ea..c631ffe5 100644 --- a/atlasenv.yml +++ b/atlasenv.yml @@ -16,3 +16,4 @@ dependencies: - ruamel.yaml >=0.17 - cookiecutter - wget + - xmltodict diff --git a/docs/usage/getting_started.rst b/docs/usage/getting_started.rst index 3c53f42b..f824aeea 100644 --- a/docs/usage/getting_started.rst +++ b/docs/usage/getting_started.rst @@ -15,7 +15,7 @@ Atlas is based on snakemake, which allows to run steps of the workflow in parall If you want to try atlas and have a linux computer (OSX may also work), you can use our `example data`_ for testing. -For real metagenomic data atlas should be run on a _linux_ sytem, with enough memory (min ~50GB but assembly usually requires 250GB). +For real metagenomic data atlas should be run on a _linux_ system, with enough memory (min ~50GB but assembly usually requires 250GB). @@ -183,12 +183,12 @@ You can run ``atlas init-public `` and specify any ids, like bioproject Atlas does the following steps: 1. Search SRA for the corresponding sequences (Runs) and save them in the file ``SRA/RunInfo_original.tsv``. For example, if you specify a Bioproject, it fetches the information for all runs of this project. - 2. Atlas filters the runs to contain only valid metagenome sequences. E.g. exclude singleton reads, 16S. The output will be saved in ``RunInfo.tsv`` - 3. Sometimes the same Sample is sequenced on different lanes, which will result into multiple runs from the same sample. Atlas will **merge** runs from the same biosample. + 2. Atlas filters the runs to contain only valid metagenome sequences. E.g. exclude singleton reads, 16S. The output will be saved in ``RunInfo.csv`` + 3. Sometimes the same Sample is sequenced on different lanes, which will result into multiple runs from the same sample. Atlas will **merge** runs from the same sample. 4. Prepare a sample table and a config.yaml similar to the ``atlas init`` command. -If you are not happy with the filtering atlas performs, you can go back to the ``SRA/RunInfo_original.tsv`` and create a new ``RunInfo.tsv``. 
+If you are not happy with the filtering atlas performs, you can go back to the ``SRA/RunInfo_original.tsv`` and create a new ``RunInfo.csv``. If you then rerun ``atlas init-public continue`` it will continue from your modified RunInfo and do step 3. & 4. above. @@ -203,8 +203,8 @@ The downloaded reads are directly processed. However, if you only want to downlo atlas run None download_sra -Example: Downloading reads from the human microbiome project2 -````````````````````````````````````````````````````````````` +Example: Downloading reads from the human microbiome project 2 +`````````````````````````````````````````````````````````````` :: atlas init-public --working-dir HMP2 PRJNA398089 @@ -213,12 +213,12 @@ Gives the output:: [Atlas] INFO: Downloading runinfo from SRA [Atlas] INFO: Start with 2979 runs from 2979 samples - [Atlas] INFO: Runs have the folowing values for LibrarySource: METAGENOMIC, METATRANSCRIPTOMIC - Select only runs LibrarySource == METAGENOMIC, Filtered out 762 runs - [Atlas] INFO: Runs have the folowing values for LibrarySelection: PCR, RT-PCR, RANDOM - Select only runs LibrarySelection == RANDOM, Filtered out 879 runs + [Atlas] INFO: Runs have the following values for library_source: METAGENOMIC, METATRANSCRIPTOMIC + Select only runs library_source == METAGENOMIC, Filtered out 762 runs + [Atlas] INFO: Runs have the following values for library_selection: PCR, RT-PCR, RANDOM + Select only runs library_selection == RANDOM, Filtered out 879 runs [Atlas] INFO: Selected 1338 runs from 1338 samples - [Atlas] INFO: Write filtered runinfo to HMP2/RunInfo.tsv + [Atlas] INFO: Write filtered runinfo to HMP2/RunInfo.csv [Atlas] INFO: Prepared sample table with 1338 samples [Atlas] INFO: Configuration file written to HMP2/config.yaml You may want to edit it using any text editor. @@ -269,7 +269,7 @@ We recommend to use atlas on a :ref:`cluster` system, which can be set up in a v -h, --help Show this message and exit. -Execue Atlas +Execute Atlas ************ diff --git a/docs/usage/output.rst b/docs/usage/output.rst index e0f2237a..76b65a2f 100644 --- a/docs/usage/output.rst +++ b/docs/usage/output.rst @@ -94,7 +94,7 @@ Genomes atlas run genomes -Binning can predict several times the same genome from different samples. To remove this reduncancy we use DeRep to filter and de-replicate the genomes. By default the threshold is set to **97.5%**, which corresponds somewhat to the *sub-species level*. The best quality genome for each cluster is choosen as the representative for each cluster. The represenative MAG are then renamed and used for annotation and quantification. +Binning can predict several times the same genome from different samples. To remove this reduncancy we use DeRep to filter and de-replicate the genomes. By default the threshold is set to **97.5%**, which corresponds somewhat to the *sub-species level*. The best quality genome for each cluster is chosen as the representative for each cluster. The representative MAG are then renamed and used for annotation and quantification. The fasta sequence of the dereplicated and renamed genomes can be found in ``genomes/genomes`` and their quality estimation are in ``genomes/checkm/completeness.tsv``. @@ -138,7 +138,7 @@ All trees are properly rooted using the midpoint. The files can be found in ``ge **Functional annotation** -Sicne version 2.8, We use `DRAM `_ to annotate the genomes with Functional annotations, e.g. KEGG and CAZy as well as to **infere pathways**, or more specifically Kegg modules. 
+Since version 2.8, We use `DRAM `_ to annotate the genomes with Functional annotations, e.g. KEGG and CAZy as well as to **infere pathways**, or more specifically Kegg modules. The Functional annotations for each genome can be found in ``genomes/annotations/dram/`` @@ -148,7 +148,7 @@ and are contain the following files: - ``annotations.tsv`` Table of all annotations - ``distil/metabolism_summary.xlsx`` Excel of the summary of all annotations - The tool alos produces a nice report in `distil/product.html`_. + The tool also produces a nice report in `distil/product.html`_. .. _distil/product.html: ../_static/dram_product.html @@ -290,7 +290,7 @@ Here is the R code to calculate the gene copies per million (analogous to transc Before version 2.15 the output of the counts were stored in a parquet file. -The parquet file can be opended easily with ``pandas.read_parquet`` or ``arrow::read_parquet```. +The parquet file can be opened easily with ``pandas.read_parquet`` or ``arrow::read_parquet```. However you need to load the full data into memory. .. code-block:: R diff --git a/setup.cfg b/setup.cfg index 0ef878c3..b0caaec3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,10 @@ style = pep440 versionfile_source = atlas/_version.py versionfile_build = atlas/_version.py tag_prefix = v + + +[codespell] +check-filenames = +skip = .git,*.pdf,*.svg,versioneer.py,*.css,*.html,databases,test +ignore-words-list = BRITE +check-hidden = \ No newline at end of file diff --git a/test/test_sra.sh b/test/test_sra.sh index 57dde706..0f454d80 100755 --- a/test/test_sra.sh +++ b/test/test_sra.sh @@ -15,16 +15,36 @@ echo "WD="$WD atlas init-public PRJEB20796 -w $WD -echo "Run Atlas" +echo "Dry run Atlas" atlas run qc -w $WD --dry-run $@ -echo "Download reads from HMP" -WD=$Test_dir/"HMP" + + +## single end + +echo "Now with a single end sample" + +WD=$Test_dir/"SingleEnd" echo "WD="$WD +atlas init-public ERR2213683 -w $WD + +echo "Dry run Atlas" + +atlas run qc -w $WD --dry-run $@ + + + + + # this fails as HMP have samples sequenced with different platforms +# HMP is also single end + +echo "Download reads from HMP" +WD=$Test_dir/"HMP" +echo "WD="$WD set +e @@ -35,39 +55,19 @@ echo "(expected errors)" echo "drop illumina samples" -sed -i.bak '/ILLUMINA/d' $WD/RunInfo.tsv - -# modify assembler as spades cannot handle single end reads +sed -i.bak '/ILLUMINA/d' $WD/RunInfo.csv -# python << END -# from ruamel.yaml import YAML -# yaml = YAML() -# config_file="$WD/config.yaml" -# config= yaml.load(open(config_file)) -# config['assembler'] = 'megahit' -# yaml.dump(config, open(config_file, 'w')) -# END -echo "create sample table" +echo "Continue public init" atlas init-public continue -w $WD -echo "Run Atlas" +echo "Dry run Atlas" atlas run qc -w $WD --dry-run $@ -## single end - -echo "Now with a single end sample" - -WD=$Test_dir/"SingleEnd" -echo "WD="$WD - -atlas init-public SAMEA104416160 -w $WD -atlas run None download_sra -w $WD $@ - -## smal data +## small data echo "Download reads from small dataset for real test" @@ -77,8 +77,18 @@ echo "WD="$WD echo "gives warning as library is selected with PCR" -atlas init-public SAMEA9831203 SAMEA9831204 -w $WD +atlas init-public ERR1739691 ERR1739692 -w $WD echo "Run Atlas" -atlas run None download_sra -w $WD $@ \ No newline at end of file +atlas run None download_sra -w $WD $@ + + +# Test single end + +echo "Run single end test" + +WD=$Test_dir/"SingleEnd" +echo "WD="$WD + +atlas run None download_sra -w $WD $@ diff --git a/workflow/Snakefile b/workflow/Snakefile index 
2c12c552..e0f53441 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -15,11 +15,11 @@ import utils # add default config -# comand line adds user config +# command line adds user config configfile: os.path.join(workflow_folder, "..", "config", "default_config.yaml") -# add defualt values from python (TODO: replace this) +# add default values from python (TODO: replace this) from atlas.make_config import update_config as atlas_update_config config = atlas_update_config(config) @@ -227,7 +227,7 @@ for r in workflow.rules: # default r.resources["mem_mb"] = config["mem"] * 1000 - # add time if ot present. Simple jobs use simple time + # add time if not present. Simple jobs use simple time if "time_min" not in r.resources: r.resources["time_min"] = config["runtime"]["default"] * 60 diff --git a/workflow/envs/kingfisher.yaml b/workflow/envs/kingfisher.yaml new file mode 100644 index 00000000..5ccef509 --- /dev/null +++ b/workflow/envs/kingfisher.yaml @@ -0,0 +1,6 @@ +channels: + - conda-forge + - bioconda + - defaults +dependencies: + - kingfisher=0.4 diff --git a/workflow/report/assembly_report.py b/workflow/report/assembly_report.py index 61ecf740..71c966dd 100644 --- a/workflow/report/assembly_report.py +++ b/workflow/report/assembly_report.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts from common_report import * diff --git a/workflow/report/bin_report.py b/workflow/report/bin_report.py index c2918139..693c4297 100644 --- a/workflow/report/bin_report.py +++ b/workflow/report/bin_report.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts from common_report import * diff --git a/workflow/report/qc_report.py b/workflow/report/qc_report.py index dbc132cf..b7fb190d 100644 --- a/workflow/report/qc_report.py +++ b/workflow/report/qc_report.py @@ -27,7 +27,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts from common_report import * @@ -158,7 +158,7 @@ def make_plots( Quality_QC_pe, Quality_QC_se = get_stats_from_zips(zipfiles_QC, samples) # Quality_raw_pe, Quality_raw_se = get_stats_from_zips(zipfiles_QC,samples) - # detrmine range of quality values and if paired + # determine range of quality values and if paired max_quality = 1 + np.nanmax((Quality_QC_pe.max().max(), Quality_QC_se.max().max())) quality_range = [min_quality, max_quality] diff --git a/workflow/rules/assemble.smk b/workflow/rules/assemble.smk index 7ccd8a93..27454243 100644 --- a/workflow/rules/assemble.smk +++ b/workflow/rules/assemble.smk @@ -90,7 +90,7 @@ else: # make symlink assert len(input) == len( output - ), "Input and ouput files have not same number, can not create symlinks for all." + ), "Input and output files have not same number, can not create symlinks for all." 
for i in range(len(input)): os.symlink(os.path.abspath(input[i]), output[i]) @@ -170,7 +170,7 @@ rule error_correction: params: inputs=lambda wc, input: io_params_for_tadpole(input), outputs=lambda wc, output: io_params_for_tadpole(output, key="out"), - prefilter=2, # Ignore kmers with less than 2 occurance + prefilter=2, # Ignore kmers with less than 2 occurrence minprob=config["error_correction_minprob"], tossdepth=config["error_correction_minimum_kmer_depth"], tossjunk="t" if config["error_correction_remove_lowdepth"] else "f", @@ -656,7 +656,7 @@ rule pileup_contigs_sample: benchmark: "logs/benchmarks/assembly/calculate_coverage/pileup/{sample}.txt" log: - "{sample}/logs/assembly/calculate_coverage/pilup_final_contigs.log", # This log file is uesd for report + "{sample}/logs/assembly/calculate_coverage/pilup_final_contigs.log", # This log file is used for report conda: "%s/required_packages.yaml" % CONDAENV threads: config.get("threads", 1) diff --git a/workflow/rules/bin_quality.smk b/workflow/rules/bin_quality.smk index 036ce115..89403ade 100644 --- a/workflow/rules/bin_quality.smk +++ b/workflow/rules/bin_quality.smk @@ -233,7 +233,7 @@ rule get_bin_filenames: def get_list_of_files(dirs, pattern): fasta_files = [] - # searh for fasta files (.f*) in all bin folders + # search for fasta files (.f*) in all bin folders for dir in dirs: dir = Path(dir) fasta_files += list(dir.glob(pattern)) diff --git a/workflow/rules/binning.smk b/workflow/rules/binning.smk index c6d44997..5a3ffd5b 100644 --- a/workflow/rules/binning.smk +++ b/workflow/rules/binning.smk @@ -46,12 +46,12 @@ rule get_contig_coverage_from_bb: output: temp("{sample}/binning/coverage/{sample_reads}_coverage.txt"), run: - with open(input[0]) as fi, open(output[0], "w") as fo: + with open(input[0]) as fi, open(output[0], "w") as fout: # header next(fi) for line in fi: toks = line.strip().split("\t") - print(toks[0], toks[1], sep="\t", file=fo) + print(toks[0], toks[1], sep="\t", file=fout) rule combine_coverages: @@ -92,7 +92,7 @@ rule run_concoct: "{sample}/binning/concoct/intermediate_files/log.txt", conda: "%s/concoct.yaml" % CONDAENV - threads: 10 # concoct uses 10 threads by default, wit for update: https://github.com/BinPro/CONCOCT/issues/177 + threads: 10 # concoct uses 10 threads by default, with for update: https://github.com/BinPro/CONCOCT/issues/177 resources: mem_mb=config["mem"] * 1000, shell: @@ -137,7 +137,7 @@ rule get_metabat_depth_file: "{sample}/binning/metabat/metabat.log", conda: "../envs/metabat.yaml" - threads: config["threads"] # multithreaded trough OMP_NUM_THREADS + threads: config["threads"] # multithreaded through OMP_NUM_THREADS resources: mem_mb=config["mem"] * 1000, params: diff --git a/workflow/rules/cobinning.smk b/workflow/rules/cobinning.smk index 31fdb753..57861f39 100644 --- a/workflow/rules/cobinning.smk +++ b/workflow/rules/cobinning.smk @@ -43,7 +43,7 @@ def get_filtered_contigs_of_bingroup(wildcards): if len(samples_of_group) < 5: raise ValueError( f"Bin group {wildcards.bingroup} has {len(samples_of_group)} less than 5 samples." - "For cobinning we reccomend at least 5 samples per bin group." + "For cobinning we recommend at least 5 samples per bin group." 
"Adapt the sample.tsv to set BinGroup of size [5- 1000]" ) @@ -68,7 +68,7 @@ rule combine_contigs: log: "logs/cobinning/{bingroup}/combine_contigs.log", params: - seperator=config["cobinning_separator"], + separator=config["cobinning_separator"], samples=get_samples_of_bingroup, threads: 1 run: @@ -80,7 +80,7 @@ rule combine_contigs: for line in fin: # if line is a header add sample name if line[0] == ord(">"): - line = f">{sample}{params.seperator}".encode() + line[1:] + line = f">{sample}{params.separator}".encode() + line[1:] # write each line to the combined file fout.write(line) @@ -176,7 +176,7 @@ rule summarize_bam_contig_depths: "logs/cobinning/{bingroup}/combine_coverage.log", conda: "../envs/metabat.yaml" - threads: config["threads"] # multithreaded trough OMP_NUM_THREADS + threads: config["threads"] # multithreaded through OMP_NUM_THREADS benchmark: "logs/benchmarks/cobinning/{bingroup}/summarize_bam_contig_depths.tsv" resources: diff --git a/workflow/rules/download.smk b/workflow/rules/download.smk index 8842041c..dfe7c1a5 100644 --- a/workflow/rules/download.smk +++ b/workflow/rules/download.smk @@ -2,7 +2,7 @@ import hashlib import os from pathlib import Path -# this values are incuded in the snakefile +# this values are included in the snakefile DBDIR = Path(config["database_dir"]).resolve() GUNCDIR = DBDIR / "gunc_database" diff --git a/workflow/rules/genecatalog.smk b/workflow/rules/genecatalog.smk index a0ee605c..13007049 100644 --- a/workflow/rules/genecatalog.smk +++ b/workflow/rules/genecatalog.smk @@ -376,7 +376,7 @@ rule combine_gene_coverages: # TODO: combine RPKM -# TODO: caluclate mapping rate from pileup mapping files +# TODO: calculate mapping rate from pileup mapping files # logs/Genecatalog/alignment/sample2_pileup.log # Reads: 1207217 # Mapped reads: 1071071 diff --git a/workflow/rules/patch.smk b/workflow/rules/patch.smk index 5698fe7c..ba7992da 100644 --- a/workflow/rules/patch.smk +++ b/workflow/rules/patch.smk @@ -2,7 +2,7 @@ localrules: copy_assembly, -# Rules that are usefull temporarily to update to new version of atlas +# Rules that are useful temporarily to update to new version of atlas ruleorder: copy_assembly > finalize_contigs diff --git a/workflow/rules/predict_genes_of_genomes.py b/workflow/rules/predict_genes_of_genomes.py index 3e91f0bd..dcdd2fe2 100644 --- a/workflow/rules/predict_genes_of_genomes.py +++ b/workflow/rules/predict_genes_of_genomes.py @@ -31,7 +31,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts # python 3.5 without f strings @@ -51,7 +51,7 @@ def predict_genes(genome, fasta, out_dir, log): shell('printf "{genome}:\n" > {log}'.format(genome=genome, log=log)) shell( - "prodigal -i {fasta} -o {gff} -d {fna} -a {faa} -p sinlge -c -m -f gff 2>> {log} ".format( + "prodigal -i {fasta} -o {gff} -d {fna} -a {faa} -p single -c -m -f gff 2>> {log} ".format( fasta=fasta, log=log, gff=gff, fna=fna, faa=faa ) ) diff --git a/workflow/rules/sample_table.smk b/workflow/rules/sample_table.smk index 6c6adb56..1c5f7f93 100644 --- a/workflow/rules/sample_table.smk +++ b/workflow/rules/sample_table.smk @@ -96,7 +96,7 @@ else: if (len(colum_headers_raw) == 0) and (len(colum_headers_QC) == 0): raise IOError( "Either raw reas or QC reads need to be in the sample table. 
" - "I din't find any columnns with 'Reads_raw_' or 'Reads_QC_' " + "I din't find any columns with 'Reads_raw_' or 'Reads_QC_' " ) diff --git a/workflow/rules/sra.smk b/workflow/rules/sra.smk index 72cc56e0..1a2373f1 100644 --- a/workflow/rules/sra.smk +++ b/workflow/rules/sra.smk @@ -3,97 +3,64 @@ wildcard_constraints: localrules: - prefetch, + kingfisher_get, + merge_runs_to_sample, SRA_read_fractions = ["_1", "_2"] if PAIRED_END else [""] -SRA_SUBDIR_RUN = "SRA/Runs" +SRA_SUBDIR_RUN = Path("SRA/Runs") -rule prefetch: - output: - sra=temp(touch(SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}_downloaded")), - # not givins sra file as output allows for continue from the same download - params: - outdir=SRA_SUBDIR_RUN, # prefetch creates file in subfolder with run name automatically - log: - "logs/SRAdownload/prefetch/{sra_run}.log", - benchmark: - "logs/benchmarks/SRAdownload/prefetch/{sra_run}.tsv" - threads: 1 - resources: - mem_mb=1000, - time_min=60 * int(config["runtime"]["simplejob"]), - internet_connection=1, - conda: - "%s/sra.yaml" % CONDAENV - shell: - " mkdir -p {params.outdir} 2> {log} " - " ; " - " prefetch " - " --output-directory {params.outdir} " - " -X 999999999 " - " --progress " - " --log-level info " - " {wildcards.sra_run} &>> {log} " - " ; " - " vdb-validate {params.outdir}/{wildcards.sra_run}/{wildcards.sra_run}.sra &>> {log} " +RunTable = None -rule extract_run: - input: - flag=rules.prefetch.output, +def load_runtable(): + global RunTable + if RunTable is None: + from atlas.init import parse_sra + + RunTable = parse_sra.load_and_validate_runinfo_table() + return RunTable + + +def get_run_ids_for_sample(wildcards): + RunTable = load_runtable() + from atlas.init import parse_sra + + return parse_sra.get_run_ids_for_sample(RunTable, wildcards.sample) + + +rule kingfisher_get: output: - temp( - expand( - SRA_SUBDIR_RUN + "/{{sra_run}}/{{sra_run}}{fraction}.fastq.gz", - fraction=SRA_read_fractions, - ) - ), + #dir = temp(directory("Reads/tmp/runs/{sample}")), + flag=temp(touch("Reads/tmp/flags/{sample}.downloaded")), params: - outdir=os.path.abspath(SRA_SUBDIR_RUN + "/{sra_run}"), - sra_file=SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}.sra", + run_ids=get_run_ids_for_sample, + download_methods="ena-ascp ena-ftp prefetch", + output_dir=lambda wc: SRA_SUBDIR_RUN / wc.sample, log: - "logs/SRAdownload/extract/{sra_run}.log", - benchmark: - "logs/benchmarks/SRAdownload/fasterqdump/{sra_run}.tsv" - threads: config["simplejob_threads"] + Path("log/download_reads/download/{sample}.log").resolve(), + threads: config["threads"] resources: - time_min=60 * int(config["runtime"]["simplejob"]), - mem_mb=1000, #default 100Mb + mem_mb=config["mem"] * 1000, + time_min=config["runtime"]["long"] * 60, + ncbi_connection=1, conda: - "%s/sra.yaml" % CONDAENV + "../envs/kingfisher.yaml" shell: - " vdb-validate {params.sra_file} &>> {log} " - " ; " - " parallel-fastq-dump " - " --threads {threads} " - " --gzip --split-files " - " --outdir {params.outdir} " - " --tmpdir {resources.tmpdir} " - " --skip-technical --split-3 " - " -s {params.sra_file} &>> {log} " + " mkdir -p {params.output_dir} ; " + " cd {params.output_dir} " " ; " - " rm -f {params.sra_file} 2>> {log} " - - -RunTable = None - - -def get_runids_for_biosample(wildcards): - global RunTable - if RunTable is None: - from atlas.init.parse_sra import load_and_validate_runinfo_table + "kingfisher get --run-identifiers {params.run_ids} " + " --download-threads 2 --extraction-threads {threads} " + " --hide-download-progress " + " 
--output-format-possibilities 'fastq.gz' " + " --force --check-md5sums " + " --download-methods {params.download_methods} " + " -f fastq.gz &> {log}" - RunTable = load_and_validate_runinfo_table("RunInfo.tsv") - run_ids = RunTable.query(f"BioSample == '{wildcards.sample}'").index.tolist() - - return run_ids - - -def get_runs_for_biosample(wildcards): - run_ids = get_runids_for_biosample(wildcards) +def get_run_fastq_for_sample(run_ids): ReadFiles = {} for fraction in SRA_read_fractions: @@ -103,7 +70,7 @@ def get_runs_for_biosample(wildcards): key = fraction ReadFiles[key] = expand( - SRA_SUBDIR_RUN + "/{sra_run}/{sra_run}{fraction}.fastq.gz", + str(SRA_SUBDIR_RUN / "{sample}/{sra_run}{fraction}.fastq.gz"), fraction=fraction, sra_run=run_ids, ) @@ -113,20 +80,32 @@ def get_runs_for_biosample(wildcards): rule merge_runs_to_sample: input: - unpack(get_runs_for_biosample), + flag="Reads/tmp/flags/{sample}.downloaded", output: expand( "SRA/Samples/{{sample}}/{{sample}}{fraction}.fastq.gz", fraction=SRA_read_fractions, ), + params: + run_ids=get_run_ids_for_sample, threads: 1 run: from utils import io for i, fraction in enumerate(SRA_read_fractions): - if fraction == "": - fraction = "se" - io.cat_files(input[fraction], output[i]) + + run_fastqs = expand( + SRA_SUBDIR_RUN / "{sample}/{sra_run}{fraction}.fastq.gz", + fraction=fraction, + sra_run=params.run_ids, + sample=wildcards.sample, + ) + + assert all([Path(f).exists() for f in run_fastqs]), ( + " Not all fastq files exist. Expected: %s" % run_fastqs + ) + + io.cat_files(run_fastqs, output[i]) rule download_sra: diff --git a/workflow/scripts/cluster_species.py b/workflow/scripts/cluster_species.py index e973d4cc..a1a3d2c2 100644 --- a/workflow/scripts/cluster_species.py +++ b/workflow/scripts/cluster_species.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts import pandas as pd @@ -93,7 +93,7 @@ def get_float(value): n_pre_clusters = nx.connected.number_connected_components(G) -logging.info(f"Found {n_pre_clusters} pre-clusters, itterate over them.") +logging.info(f"Found {n_pre_clusters} pre-clusters, iterate over them.") logging.debug(f"Cluster with threshold {threshold} and {linkage_method}-linkage method") for i, cc in enumerate(nx.connected_components(G)): logging.info(f"Precluster {i+1}/{n_pre_clusters} with {len(cc)} genomes") @@ -165,7 +165,7 @@ def get_float(value): # drop low quality genomes - logging.info("Drop low quality genomes acording to filtercriteria") + logging.info("Drop low quality genomes according to filter criteria") try: filter_criteria = snakemake.config["genome_filter_criteria"] @@ -229,7 +229,7 @@ def get_float(value): n_species = mag2Species.SpeciesNr.unique().shape[0] logging.info(f"Identified {n_species } species in total") -# create propper species names +# create proper species names n_leading_zeros = len(str(mag2Species.SpeciesNr.max())) format_int = "sp{:0" + str(n_leading_zeros) + "d}" mag2Species["Species"] = mag2Species.SpeciesNr.apply(format_int.format) @@ -239,13 +239,13 @@ def get_float(value): logging.info("Define Quality score defined as Completeness - 5x Contamination") -# recalulate quality score as some completeness might be recalibrated. +# recalculate quality score as some completeness might be recalibrated. 
Q.eval("Quality_score = Completeness - 5* Contamination", inplace=True) quality_score = Q.Quality_score assert ( not quality_score.isnull().any() -), "I have NA quality values for thq quality score, it seems not all of the values defined in the quality_score_formula are presentfor all entries in tables/Genome_quality.tsv " +), "I have NA quality values for the quality score, it seems not all of the values defined in the quality_score_formula are presentfor all entries in tables/Genome_quality.tsv " # select representative diff --git a/workflow/scripts/combine_busco.py b/workflow/scripts/combine_busco.py index eea83ad2..4e063884 100644 --- a/workflow/scripts/combine_busco.py +++ b/workflow/scripts/combine_busco.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts import pandas as pd from utils.parsers import read_busco_output diff --git a/workflow/scripts/combine_checkm.py b/workflow/scripts/combine_checkm.py index dc3d0a16..63ed27df 100644 --- a/workflow/scripts/combine_checkm.py +++ b/workflow/scripts/combine_checkm.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts import pandas as pd from utils.parsers import read_checkm_output diff --git a/workflow/scripts/combine_checkm2.py b/workflow/scripts/combine_checkm2.py index 69f85155..71813ad3 100644 --- a/workflow/scripts/combine_checkm2.py +++ b/workflow/scripts/combine_checkm2.py @@ -29,7 +29,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts import pandas as pd from utils.parsers import read_checkm2_output diff --git a/workflow/scripts/combine_gene_coverages.py b/workflow/scripts/combine_gene_coverages.py index 1c9bdf14..2705d876 100644 --- a/workflow/scripts/combine_gene_coverages.py +++ b/workflow/scripts/combine_gene_coverages.py @@ -28,7 +28,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of script +#### Beginning of script import numpy as np import pandas as pd import gc, os @@ -94,7 +94,7 @@ def measure_memory(write_log_entry=True): "data", shape=gene_matrix_shape, fillvalue=0, compression="gzip" ) - # add Smaple names attribute + # add Sample names attribute sample_names = np.array(list(snakemake.params.samples)).astype("S") combined_cov.attrs["sample_names"] = sample_names combined_counts.attrs["sample_names"] = sample_names diff --git a/workflow/scripts/combine_taxonomy.py b/workflow/scripts/combine_taxonomy.py index c873205c..7647ec78 100644 --- a/workflow/scripts/combine_taxonomy.py +++ b/workflow/scripts/combine_taxonomy.py @@ -28,7 +28,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of scripts +#### Beginning of scripts import pandas as pd import numpy as np diff --git a/workflow/scripts/filter_genomes.py b/workflow/scripts/filter_genomes.py index 855186b9..f1cb2dc7 100644 --- a/workflow/scripts/filter_genomes.py +++ b/workflow/scripts/filter_genomes.py @@ -74,7 +74,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): if Q.shape[0] == 0: logging.error( - f"No bins passed filtering criteria! Bad luck!. 
You might want to tweek the filtering criteria. Also check the {snakemake.input.quality}" + f"No bins passed filtering criteria! Bad luck!. You might want to tweak the filtering criteria. Also check the {snakemake.input.quality}" ) exit(1) diff --git a/workflow/scripts/gene2genome.py b/workflow/scripts/gene2genome.py index e65bb8f7..995f3279 100644 --- a/workflow/scripts/gene2genome.py +++ b/workflow/scripts/gene2genome.py @@ -28,7 +28,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Install exception handler sys.excepthook = handle_exception -#### Begining of script +#### Beginning of script import pandas as pd from utils import gene_scripts diff --git a/workflow/scripts/get_fasta_of_bins.py b/workflow/scripts/get_fasta_of_bins.py index ff8c65d7..1d99c478 100644 --- a/workflow/scripts/get_fasta_of_bins.py +++ b/workflow/scripts/get_fasta_of_bins.py @@ -46,7 +46,7 @@ def get_fasta_of_bins(cluster_attribution, contigs_file, out_folder): Creates individual fasta files for each bin using the contigs fasta and the cluster attribution. input: - - cluster attribution file: tab seperated file of "contig_fasta_header bin" + - cluster attribution file: tab separated file of "contig_fasta_header bin" - contigs: fasta file of contigs - out_prefix: output_prefix for bin fastas {out_folder}/{binid}.fasta """ diff --git a/workflow/scripts/get_read_stats.py b/workflow/scripts/get_read_stats.py index 5fc55a64..4678e9e5 100644 --- a/workflow/scripts/get_read_stats.py +++ b/workflow/scripts/get_read_stats.py @@ -30,7 +30,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): sys.excepthook = handle_exception -# begining of script +# beginning of script import datetime import shutil diff --git a/workflow/scripts/parse_vamb.py b/workflow/scripts/parse_vamb.py index 01f7cdb8..759b19fd 100644 --- a/workflow/scripts/parse_vamb.py +++ b/workflow/scripts/parse_vamb.py @@ -117,7 +117,7 @@ def handle_exception(exc_type, exc_value, exc_traceback): clusters.loc[~clusters.Large_enough, "SampleBin"] = "" -logging.info(f"Write reformated table to {output_culsters}") +logging.info(f"Write reformatted table to {output_culsters}") clusters.to_csv(output_culsters, sep="\t", index=False) # filter for following diff --git a/workflow/scripts/utils/genome_dist.py b/workflow/scripts/utils/genome_dist.py index 676872d9..212ba325 100644 --- a/workflow/scripts/utils/genome_dist.py +++ b/workflow/scripts/utils/genome_dist.py @@ -94,7 +94,7 @@ def load_bindash(dist_file, simplify_names=True): ['Genome1','Genome2','Distance','Pvalue','Fraction','Nmapped','Ntotal','ANI'] in header. - Bindash tables are not necessarily simetrical. + Bindash tables are not necessarily symmetrical. """ F = load_ani_table_( dist_file, ["Distance", "Pvalue", "Fraction"], simplify_names=simplify_names @@ -147,10 +147,10 @@ def pairewise2matrix(M, fillna=np.nan): """ This functions turns a pairewise genome ANI table [genome1, genome2, column...] In to a matrix [genome1 genome2] of the values of column. - When ANI values are symetrical (with minimal error), + When ANI values are symmetrical (with minimal error), usually only one halve of NxN possibilities values are calculated. 
- During the process missing values are inputed with 0 + During the process missing values are inputted with 0 Diagonal values are set to 1 @@ -195,7 +195,7 @@ def group_species_linkage(M, threshold=0.95, fillna=None, linkage_method="averag M is a series of ANI """ assert type(M) == pd.Series - verify_expected_range(threshold, 0.3, 1, "clustering thrshold") + verify_expected_range(threshold, 0.3, 1, "clustering threshold") verify_expected_range(M.max(), 0.05, 1, "ANI max") verify_expected_range(M.min(), 0.05, 1, "ANI min") diff --git a/workflow/scripts/utils/genome_stats.py b/workflow/scripts/utils/genome_stats.py index 0775df12..38f66157 100644 --- a/workflow/scripts/utils/genome_stats.py +++ b/workflow/scripts/utils/genome_stats.py @@ -60,14 +60,14 @@ def genome_stats(fasta_file, number_of_n_for_split=10): faiter = (x[1] for x in groupby(fasta, lambda line: line[0] == ">")) for record in faiter: - # reccord contains header + # record contains header ## join sequence lines sequence = "".join(s.strip() for s in faiter.__next__()) sequence = sequence.upper() verify_dna(sequence, is_upper=True) - # count ambigous bases + # count ambiguous bases ambigious_bases += sequence.count("N") # get set of scaffold lengths diff --git a/workflow/scripts/utils/io.py b/workflow/scripts/utils/io.py index 57640a99..73f2b852 100644 --- a/workflow/scripts/utils/io.py +++ b/workflow/scripts/utils/io.py @@ -60,7 +60,7 @@ def cat_files(files, outfilename, gzip=False): def convert_percentages(df): - """Convet all columns with strings and % at the end to percentages""" + """Convert all columns with strings and % at the end to percentages""" for col in df.columns: if df.dtypes[col] == "object": if df[col].iloc[0].endswith("%"): diff --git a/workflow/scripts/utils/parsers_bbmap.py b/workflow/scripts/utils/parsers_bbmap.py index 41338ae9..37c1af38 100644 --- a/workflow/scripts/utils/parsers_bbmap.py +++ b/workflow/scripts/utils/parsers_bbmap.py @@ -124,9 +124,9 @@ def read_bbsplit_bincov(bbsplit_bincov_file): # split first index `genome$contig` in two index = pd.Series(binCov.index.levels[0], index=binCov.index.levels[0]) - splitted = index.str.split("$", expand=True) - splitted.columns = ["Genome", "Contig"] - new_index = splitted.loc[binCov.index.get_level_values(0)] + Split = index.str.split("$", expand=True) + Split.columns = ["Genome", "Contig"] + new_index = Split.loc[binCov.index.get_level_values(0)] new_index["Position"] = binCov.index.get_level_values(1).values binCov.index = pd.MultiIndex.from_frame(new_index) diff --git a/workflow/scripts/utils/taxonomy.py b/workflow/scripts/utils/taxonomy.py index 9f661197..bfbd3ba1 100644 --- a/workflow/scripts/utils/taxonomy.py +++ b/workflow/scripts/utils/taxonomy.py @@ -16,7 +16,7 @@ def tax2table(Taxonomy_Series, split_character=";", remove_prefix=False): # drop missing values if Taxonomy_Series.isnull().any(): warnings.warn( - "Some samples have no taxonomy asigned. Samples:\n" + "Some samples have no taxonomy assigned. Samples:\n" + ", ".join(Taxonomy_Series.index[Taxonomy_Series.isnull()].astype(str)) )
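
For orientation, a few short Python sketches of the logic this patch introduces follow. Anything not shown in the diff above (column values, file layouts, helper signatures) is an assumption for illustration, not verified against the code base. First, the run-to-sample lookup added in atlas/init/parse_sra.py is a plain pandas query over the run table; here it is with a toy table and a literal column name standing in for the module's SAMPLE_NAME_COLUMN constant::

    import pandas as pd

    # Assumed column name; the real module uses its SAMPLE_NAME_COLUMN constant.
    SAMPLE_NAME_COLUMN = "sample_name"

    # Toy RunInfo-style table: one row per run, indexed by run accession.
    run_table = pd.DataFrame(
        {SAMPLE_NAME_COLUMN: ["sample1", "sample1", "sample2"]},
        index=pd.Index(["ERR000001", "ERR000002", "ERR000003"], name="Run"),
    )

    def get_run_ids_for_sample(run_table, sample):
        # All runs of one sample are merged later into a single pair of fastq files.
        return run_table.query(f"{SAMPLE_NAME_COLUMN} == '{sample}'").index.tolist()

    print(get_run_ids_for_sample(run_table, "sample1"))  # ['ERR000001', 'ERR000002']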
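
The getting-started docs above point out that you can redo the run filtering yourself: start from ``SRA/RunInfo_original.tsv``, write a new ``RunInfo.csv`` and rerun ``atlas init-public continue``. A rough sketch, assuming the column names shown in the example log output (library_source, library_selection); the exact separator and column set the continue step expects are not verified here, so compare with a RunInfo.csv that Atlas itself writes::

    import pandas as pd

    # Hypothetical custom filter replacing the automatic one (assumptions noted above).
    runs = pd.read_csv("SRA/RunInfo_original.tsv", sep="\t")

    keep = (runs["library_source"] == "METAGENOMIC") & (
        runs["library_selection"] == "RANDOM"
    )
    runs.loc[keep].to_csv("RunInfo.csv", index=False)

    # afterwards: atlas init-public continue -w <working-dir>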
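
The rewritten merge_runs_to_sample rule in workflow/rules/sra.smk concatenates the run-level fastq.gz files of a sample into one file per read fraction via utils.io.cat_files. Since concatenated gzip members form a valid gzip stream, a simplified stand-in (paths follow the SRA/Runs and SRA/Samples layout used in the rule; the real helper lives in workflow/scripts/utils/io.py and takes an additional gzip flag) can look like this::

    import shutil
    from pathlib import Path

    def cat_files(files, outfilename):
        # Simplified stand-in for utils.io.cat_files used by the rule.
        with open(outfilename, "wb") as fout:
            for f in files:
                with open(f, "rb") as fin:
                    shutil.copyfileobj(fin, fout)

    # Merge all forward reads of one (hypothetical) sample.
    run_fastqs = sorted(Path("SRA/Runs/sample1").glob("*_1.fastq.gz"))
    outfile = Path("SRA/Samples/sample1/sample1_1.fastq.gz")
    outfile.parent.mkdir(parents=True, exist_ok=True)
    cat_files(run_fastqs, outfile)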
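
workflow/rules/cobinning.smk refuses bin groups with fewer than 5 samples and asks for BinGroup sizes between 5 and 1000. That check is essentially a groupby over the sample table; a sketch, assuming the BinGroup column of sample.tsv::

    import pandas as pd

    sample_table = pd.read_csv("samples.tsv", sep="\t", index_col=0)
    group_sizes = sample_table.groupby("BinGroup").size()

    bad = group_sizes[(group_sizes < 5) | (group_sizes > 1000)]
    if not bad.empty:
        raise ValueError(
            "BinGroups outside the recommended size of 5-1000 samples:\n"
            + bad.to_string()
        )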
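
workflow/scripts/cluster_species.py recomputes the quality score as Completeness - 5 x Contamination before picking species representatives; the same pandas eval call on a toy quality table::

    import pandas as pd

    Q = pd.DataFrame(
        {"Completeness": [98.2, 75.0], "Contamination": [1.1, 4.0]},
        index=["MAG001", "MAG002"],
    )
    Q.eval("Quality_score = Completeness - 5 * Contamination", inplace=True)
    print(Q.Quality_score)  # MAG001: 92.7, MAG002: 55.0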
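
Finally, pairewise2matrix in workflow/scripts/utils/genome_dist.py turns the long pairwise ANI table into a square genome-by-genome matrix: only one half of the pairs is usually computed, so the matrix is mirrored, remaining gaps are filled and the diagonal is set to 1. A sketch of the same idea (the real function is more general, e.g. the fill value is configurable)::

    import pandas as pd

    pairs = pd.DataFrame(
        {
            "Genome1": ["A", "A", "B"],
            "Genome2": ["B", "C", "C"],
            "ANI": [0.991, 0.872, 0.866],
        }
    )

    genomes = sorted(set(pairs.Genome1) | set(pairs.Genome2))
    M = pairs.pivot(index="Genome1", columns="Genome2", values="ANI").reindex(
        index=genomes, columns=genomes
    )
    M = M.combine_first(M.T)  # mirror the half that was computed
    M = M.fillna(0.0)         # missing pairs count as 0, i.e. below any ANI threshold
    for g in genomes:
        M.loc[g, g] = 1.0     # a genome is identical to itself
    print(M)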