diff --git a/mutant/cli.py b/mutant/cli.py index bf1c101..3320f01 100644 --- a/mutant/cli.py +++ b/mutant/cli.py @@ -10,7 +10,6 @@ from mutant import version, log, WD, TIMESTAMP from mutant.modules.artic_illumina.start import RunSC2 -from mutant.modules.artic_nanopore.parser import ParserNanopore from mutant.modules.artic_nanopore.reformat import reformat_fastq_folder from mutant.modules.artic_nanopore.report import ReportPrinterNanopore from mutant.modules.generic_parser import get_json @@ -100,20 +99,12 @@ def sarscov2( if nanopore: if config_case != "": - parser = ParserNanopore( - caseinfo=config_case, - ) - result: dict = parser.collect_results( - resdir=os.path.abspath(resdir), barcode_to_sampleid=barcode_to_sampleid - ) - variants: list = parser.collect_variants( - resdir=os.path.abspath(resdir), barcode_to_sampleid=barcode_to_sampleid - ) - report_printer = ReportPrinterNanopore( + nanopore_report = ReportPrinterNanopore( caseinfo=config_case, indir=os.path.abspath(resdir), + barcode_to_sampleid=barcode_to_sampleid, ) - report_printer.create_all_nanopore_files(result=result, variants=variants) + nanopore_report.create_all_nanopore_files() else: # Report diff --git a/mutant/modules/artic_illumina/report.py b/mutant/modules/artic_illumina/report.py index b1bbefb..5d4fc9f 100644 --- a/mutant/modules/artic_illumina/report.py +++ b/mutant/modules/artic_illumina/report.py @@ -11,7 +11,6 @@ import yaml import json from datetime import date -from pathlib import Path from mutant.modules.generic_parser import ( get_sarscov2_config, read_filelines, @@ -21,6 +20,7 @@ get_artic_results, get_results_paths, ) +from mutant.modules.generic_reporter import GenericReporter class ReportSC2: @@ -41,7 +41,11 @@ def __init__(self, caseinfo, indir, config_artic, fastq_dir, timestamp): self.articdata = dict() def create_all_files(self): - self.create_trailblazer_config() + generic_reporter = GenericReporter( + indir=self.indir, + nanopore=False, + ) + generic_reporter.create_trailblazer_config() self.load_lookup_dict() self.create_concat_pangolin() self.create_concat_pangolin_fohm() @@ -55,29 +59,6 @@ def create_all_files(self): self.create_jsonfile() self.create_instrument_properties() - def get_finished_slurm_ids(self) -> list: - """Get slurm IDs""" - - trace_file_path = Path(self.indir, "pipeline_info", "execution_trace.txt") - slurm_id_list = [] - with open(trace_file_path, "r") as trace_file_contents: - for line in trace_file_contents: - slurm_id = line.split()[2] - try: - slurm_id_list.append(int(slurm_id)) - except Exception: - continue - return slurm_id_list - - def create_trailblazer_config(self) -> None: - """Create Trailblazer config file""" - - trailblazer_config_path = Path(self.indir, "trailblazer_config.yaml") - finished_slurm_ids = self.get_finished_slurm_ids() - if not finished_slurm_ids: - return - with open(trailblazer_config_path, "w") as trailblazer_config_file: - yaml.dump(data={"jobs": finished_slurm_ids}, stream=trailblazer_config_file) def load_lookup_dict(self): """Loads articdata with data from various sources. Atm, artic output and the case config input file""" diff --git a/mutant/modules/artic_nanopore/parser.py b/mutant/modules/artic_nanopore/parser.py index e03b5cd..7f98a70 100644 --- a/mutant/modules/artic_nanopore/parser.py +++ b/mutant/modules/artic_nanopore/parser.py @@ -8,274 +8,275 @@ QC_PASS_THRESHOLD_COVERAGE_10X_OR_HIGHER = 90 -class ParserNanopore: - def __init__(self, caseinfo: str): - self.caseinfo = caseinfo - - def get_line(self, filename: str, line_index_of_interest: int) -> str: - """Return a certain line of a given file""" - with open(filename) as opened_file: - for i, line in enumerate(opened_file): - if i == line_index_of_interest: - return line - - def get_cust_sample_id(self, line_to_parse: str, barcode_to_sampleid: dict) -> str: - """Return the customer ID of a sample""" - split_on_slash = line_to_parse.split("/") - sample_folder = split_on_slash[0] - split_on_underscore = sample_folder.split("_") - barcode = split_on_underscore[-1] - cust_sample_id = barcode_to_sampleid[barcode] - return cust_sample_id - - def get_data_from_config(self, parsed_config: dict) -> dict: - """Collect data for selection criteria and region""" - data_to_report = {} - for sample in parsed_config: - cust_sample_id = sample["Customer_ID_sample"] - data_to_report[cust_sample_id] = {} - data_to_report[cust_sample_id]["selection_criteria"] = sample[ - "selection_criteria" - ] - data_to_report[cust_sample_id]["region_code"] = sample["region_code"] - return data_to_report - - def get_fraction_n(self, input_file: str) -> float: - """Calculates fraction of N bases in a fasta file""" - with open(input_file, "r") as fasta: - total_bases = 0 - total_N = 0 - for line in fasta: - stripped_line = line.strip() - total_bases += len(stripped_line) - total_N += stripped_line.count("N") - percentage_N_two_decimals = round((total_N / total_bases) * 100, 2) - return percentage_N_two_decimals - - def parse_assembly( - self, results: dict, resdir: str, barcode_to_sampleid: dict - ) -> dict: - """Collects data by parsing the assembly""" - base_path = "/".join( - [resdir, "articNcovNanopore_sequenceAnalysisMedaka_articMinIONMedaka"] - ) - for filename in os.listdir(base_path): - if filename.endswith(".consensus.fasta"): - abs_path = os.path.join(base_path, filename) - first_line: str = self.get_line( - filename=abs_path, line_index_of_interest=0 - ) - cust_sample_id: str = self.get_cust_sample_id( - line_to_parse=first_line, barcode_to_sampleid=barcode_to_sampleid - ) - fraction_N: float = self.get_fraction_n(input_file=abs_path) - results[cust_sample_id]["fraction_n_bases"] = fraction_N - return results - - def get_depth_files_paths(self, barcode: str, base_path: str) -> list: - """Returns the paths to coverage statistics for a certain barcode""" - wild_card_path = "*".join([base_path, barcode, "depths"]) - depth_files = glob.glob(wild_card_path) - return depth_files - - def count_bases_w_10x_cov_or_more(self, coverage_stats: list) -> int: - """Counts how many times a number is higher than 10 in a list""" - numbers_higher_than_10 = 0 - for number in coverage_stats: - if number >= 10: - numbers_higher_than_10 += 1 - return numbers_higher_than_10 - - def initiate_coverage_stats_list(self, file_path: str) -> list: - """Collect coverage statistics""" - coverage_stats = [] - with open(file_path, "r") as file1: - for line in file1: - stripped_line = line.strip() - columns: list = stripped_line.split("\t") - coverage_stats.append(int(columns[3])) - return coverage_stats - - def calculate_coverage( - self, results: dict, resdir: str, barcode_to_sampleid: dict - ) -> dict: - """Collects data for the fraction of each assembly that got 10x coverage or more""" - base_path = "/".join( - [resdir, "articNcovNanopore_sequenceAnalysisMedaka_articMinIONMedaka/"] - ) - for barcode in barcode_to_sampleid: - coverage_files_paths: list = self.get_depth_files_paths( - barcode=barcode, base_path=base_path - ) - coverage_stats: list = self.initiate_coverage_stats_list( - file_path=coverage_files_paths[0] - ) - with open(coverage_files_paths[1], "r") as file2: - for line in file2: - stripped_line = line.strip() - columns: list = stripped_line.split("\t") - coverage_stats[int(columns[2])] += int(columns[3]) - bases_w_10x_cov_or_more: int = self.count_bases_w_10x_cov_or_more( - coverage_stats=coverage_stats - ) - percentage_equal_or_greater_than_10 = round( - (bases_w_10x_cov_or_more / len(coverage_stats)) * 100, 2 - ) - results[barcode_to_sampleid[barcode]][ - "pct_10x_coverage" - ] = percentage_equal_or_greater_than_10 - if ( - percentage_equal_or_greater_than_10 - >= QC_PASS_THRESHOLD_COVERAGE_10X_OR_HIGHER - ): - results[barcode_to_sampleid[barcode]]["qc_pass"] = "TRUE" - else: - results[barcode_to_sampleid[barcode]]["qc_pass"] = "FALSE" - return results - - def get_pangolin_type(self, raw_pangolin_result: str) -> str: - """Return the pangolin type of a sample""" - split_on_comma = raw_pangolin_result.split(",") - lineage = split_on_comma[1] - return lineage - - def get_pangoLEARN_version(self, raw_pangolin_result: str) -> str: - """Return the pangoLEARN_version of a sample""" - split_on_comma = raw_pangolin_result.split(",") - pango_learn_version = split_on_comma[9] - return pango_learn_version - - def identify_classifications(self) -> dict: - """Parse which lineages are classified as VOC/VOI etc""" - classifications_path = "{0}/standalone/classifications.csv".format(WD) - voc_strains: dict = parse_classifications(csv_path=classifications_path) - return voc_strains - - def parse_pangolin( - self, results: dict, barcode_to_sampleid: dict, resdir: str - ) -> dict: - """Collect data for pangolin types""" - base_path = "/".join( - [resdir, "articNcovNanopore_sequenceAnalysisMedaka_pangolinTyping"] - ) - for filename in os.listdir(base_path): +def get_line(filename: str, line_index_of_interest: int) -> str: + """Return a certain line of a given file""" + with open(filename) as opened_file: + for i, line in enumerate(opened_file): + if i == line_index_of_interest: + return line + + +def get_cust_sample_id(line_to_parse: str, barcode_to_sampleid: dict) -> str: + """Return the customer ID of a sample""" + split_on_slash = line_to_parse.split("/") + sample_folder = split_on_slash[0] + split_on_underscore = sample_folder.split("_") + barcode = split_on_underscore[-1] + cust_sample_id = barcode_to_sampleid[barcode] + return cust_sample_id + + +def get_data_from_config(parsed_config: dict) -> dict: + """Collect data for selection criteria and region""" + data_to_report = {} + for sample in parsed_config: + cust_sample_id = sample["Customer_ID_sample"] + data_to_report[cust_sample_id] = {} + data_to_report[cust_sample_id]["selection_criteria"] = sample[ + "selection_criteria" + ] + data_to_report[cust_sample_id]["region_code"] = sample["region_code"] + return data_to_report + + +def get_fraction_n(input_file: str) -> float: + """Calculates fraction of N bases in a fasta file""" + with open(input_file, "r") as fasta: + total_bases = 0 + total_N = 0 + for line in fasta: + stripped_line = line.strip() + total_bases += len(stripped_line) + total_N += stripped_line.count("N") + percentage_N_two_decimals = round((total_N / total_bases) * 100, 2) + return percentage_N_two_decimals + + +def parse_assembly(results: dict, resdir: str, barcode_to_sampleid: dict) -> dict: + """Collects data by parsing the assembly""" + base_path = "/".join( + [resdir, "articNcovNanopore_sequenceAnalysisMedaka_articMinIONMedaka"] + ) + for filename in os.listdir(base_path): + if filename.endswith(".consensus.fasta"): abs_path = os.path.join(base_path, filename) - second_line: str = self.get_line( - filename=abs_path, line_index_of_interest=1 - ) - cust_sample_id: str = self.get_cust_sample_id( - line_to_parse=second_line, barcode_to_sampleid=barcode_to_sampleid - ) - pangolin_type: str = self.get_pangolin_type(raw_pangolin_result=second_line) - results[cust_sample_id]["pangolin_type"] = pangolin_type - pangoLEARN_version: str = self.get_pangoLEARN_version( - raw_pangolin_result=second_line + first_line: str = get_line(filename=abs_path, line_index_of_interest=0) + cust_sample_id: str = get_cust_sample_id( + line_to_parse=first_line, barcode_to_sampleid=barcode_to_sampleid ) - results[cust_sample_id]["pangolearn_version"] = pangoLEARN_version - voc_strains: dict = self.identify_classifications() - if pangolin_type in voc_strains["lineage"]: - index = voc_strains["lineage"].index(pangolin_type) - results[cust_sample_id]["voc"] = voc_strains["class"][index] - else: - results[cust_sample_id]["voc"] = "No" - return results - - def get_mutations_of_interest(self) -> list: - """Collects data for mutations of interest into a list""" - mutations_of_interest = "{0}/standalone/spike_mutations.csv".format(WD) - mutations_list = [] - with open(mutations_of_interest, "r") as csv: - next(csv) - for line in csv: - stripped_line = line.strip() - mutations_list.append(stripped_line) - return mutations_list - - def get_sample_id_from_filename( - self, filename: str, barcode_to_sampleid: dict - ) -> str: - """Returns sample ID that correspond to a specific file""" - split_on_dot = filename.split(".") - prefix = split_on_dot[0] - split_on_underscore = prefix.split("_") - barcode = split_on_underscore[-1] - cust_sample_id = barcode_to_sampleid[barcode] - return cust_sample_id - - def initiate_mutations_dict(self, results: dict) -> dict: - """There wont be variant files for all samples, this give all keys in the dict a value""" - samples = results.keys() - for sample in samples: - results[sample]["mutations"] = "-" - return results - - def parse_mutations( - self, results: dict, resdir: str, barcode_to_sampleid: dict - ) -> dict: - """If a mutation of interest is present in a sample it will be added to a dict""" - mutations_of_interest: list = self.get_mutations_of_interest() - base_path = "/".join( - [resdir, "articNcovNanopore_Genotyping_typeVariants", "variants"] + fraction_N: float = get_fraction_n(input_file=abs_path) + results[cust_sample_id]["fraction_n_bases"] = fraction_N + return results + + +def get_depth_files_paths(barcode: str, base_path: str) -> list: + """Returns the paths to coverage statistics for a certain barcode""" + wild_card_path = "*".join([base_path, barcode, "depths"]) + depth_files = glob.glob(wild_card_path) + return depth_files + + +def count_bases_w_10x_cov_or_more(coverage_stats: list) -> int: + """Counts how many times a number is higher than 10 in a list""" + numbers_higher_than_10 = 0 + for number in coverage_stats: + if number >= 10: + numbers_higher_than_10 += 1 + return numbers_higher_than_10 + + +def initiate_coverage_stats_list(file_path: str) -> list: + """Collect coverage statistics""" + coverage_stats = [] + with open(file_path, "r") as file1: + for line in file1: + stripped_line = line.strip() + columns: list = stripped_line.split("\t") + coverage_stats.append(int(columns[3])) + return coverage_stats + + +def calculate_coverage(results: dict, resdir: str, barcode_to_sampleid: dict) -> dict: + """Collects data for the fraction of each assembly that got 10x coverage or more""" + base_path = "/".join( + [resdir, "articNcovNanopore_sequenceAnalysisMedaka_articMinIONMedaka/"] + ) + for barcode in barcode_to_sampleid: + coverage_files_paths: list = get_depth_files_paths( + barcode=barcode, base_path=base_path ) - results: dict = self.initiate_mutations_dict(results=results) - for filename in os.listdir(base_path): - abs_path = os.path.join(base_path, filename) - cust_sample_id: str = self.get_sample_id_from_filename( - filename=filename, barcode_to_sampleid=barcode_to_sampleid - ) - with open(abs_path, "r") as variant_file: - next(variant_file) - for line in variant_file: - split_on_comma = line.split(",") - if split_on_comma[2] in mutations_of_interest: - if results[cust_sample_id]["mutations"] == "-": - results[cust_sample_id]["mutations"] = split_on_comma[2] - else: - results[cust_sample_id]["mutations"] = ";".join( - [ - results[cust_sample_id]["mutations"], - split_on_comma[2], - ] - ) - return results - - def collect_results(self, resdir: str, barcode_to_sampleid: dict) -> dict: - """Build a dictionary with data for the report""" - parsed_config: dict = get_sarscov2_config(config=self.caseinfo) - results: dict = self.get_data_from_config(parsed_config=parsed_config) - results: dict = self.parse_assembly( - results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + coverage_stats: list = initiate_coverage_stats_list( + file_path=coverage_files_paths[0] ) - results: dict = self.calculate_coverage( - results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + with open(coverage_files_paths[1], "r") as file2: + for line in file2: + stripped_line = line.strip() + columns: list = stripped_line.split("\t") + coverage_stats[int(columns[2])] += int(columns[3]) + bases_w_10x_cov_or_more: int = count_bases_w_10x_cov_or_more( + coverage_stats=coverage_stats ) - results: dict = self.parse_pangolin( - results=results, barcode_to_sampleid=barcode_to_sampleid, resdir=resdir + percentage_equal_or_greater_than_10 = round( + (bases_w_10x_cov_or_more / len(coverage_stats)) * 100, 2 + ) + results[barcode_to_sampleid[barcode]][ + "pct_10x_coverage" + ] = percentage_equal_or_greater_than_10 + if ( + percentage_equal_or_greater_than_10 + >= QC_PASS_THRESHOLD_COVERAGE_10X_OR_HIGHER + ): + results[barcode_to_sampleid[barcode]]["qc_pass"] = "TRUE" + else: + results[barcode_to_sampleid[barcode]]["qc_pass"] = "FALSE" + return results + + +def get_pangolin_type(raw_pangolin_result: str) -> str: + """Return the pangolin type of a sample""" + split_on_comma = raw_pangolin_result.split(",") + lineage = split_on_comma[1] + return lineage + + +def get_pangoLEARN_version(raw_pangolin_result: str) -> str: + """Return the pangoLEARN_version of a sample""" + split_on_comma = raw_pangolin_result.split(",") + pango_learn_version = split_on_comma[9] + return pango_learn_version + + +def identify_classifications() -> dict: + """Parse which lineages are classified as VOC/VOI etc""" + classifications_path = "{0}/standalone/classifications.csv".format(WD) + voc_strains: dict = parse_classifications(csv_path=classifications_path) + return voc_strains + + +def parse_pangolin(results: dict, barcode_to_sampleid: dict, resdir: str) -> dict: + """Collect data for pangolin types""" + base_path = "/".join( + [resdir, "articNcovNanopore_sequenceAnalysisMedaka_pangolinTyping"] + ) + for filename in os.listdir(base_path): + abs_path = os.path.join(base_path, filename) + second_line: str = get_line(filename=abs_path, line_index_of_interest=1) + cust_sample_id: str = get_cust_sample_id( + line_to_parse=second_line, barcode_to_sampleid=barcode_to_sampleid ) - results: dict = self.parse_mutations( - results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + pangolin_type: str = get_pangolin_type(raw_pangolin_result=second_line) + results[cust_sample_id]["pangolin_type"] = pangolin_type + pangoLEARN_version: str = get_pangoLEARN_version( + raw_pangolin_result=second_line ) - return results + results[cust_sample_id]["pangolearn_version"] = pangoLEARN_version + voc_strains: dict = identify_classifications() + if pangolin_type in voc_strains["lineage"]: + index = voc_strains["lineage"].index(pangolin_type) + results[cust_sample_id]["voc"] = voc_strains["class"][index] + else: + results[cust_sample_id]["voc"] = "No" + return results - def extract_barcode_from_variant_file(self, filename: str) -> str: - prefix = filename.split("_")[2] - barcode = prefix.split(".")[0] - return barcode - def collect_variants(self, resdir: str, barcode_to_sampleid: dict) -> list: - variants_list = [] - base_path = "/".join( - [resdir, "articNcovNanopore_Genotyping_typeVariants", "variants"] +def get_mutations_of_interest() -> list: + """Collects data for mutations of interest into a list""" + mutations_of_interest = "{0}/standalone/spike_mutations.csv".format(WD) + mutations_list = [] + with open(mutations_of_interest, "r") as csv: + next(csv) + for line in csv: + stripped_line = line.strip() + mutations_list.append(stripped_line) + return mutations_list + + +def get_sample_id_from_filename(filename: str, barcode_to_sampleid: dict) -> str: + """Returns sample ID that correspond to a specific file""" + split_on_dot = filename.split(".") + prefix = split_on_dot[0] + split_on_underscore = prefix.split("_") + barcode = split_on_underscore[-1] + cust_sample_id = barcode_to_sampleid[barcode] + return cust_sample_id + + +def initiate_mutations_dict(results: dict) -> dict: + """There wont be variant files for all samples, this give all keys in the dict a value""" + samples = results.keys() + for sample in samples: + results[sample]["mutations"] = "-" + return results + + +def parse_mutations(results: dict, resdir: str, barcode_to_sampleid: dict) -> dict: + """If a mutation of interest is present in a sample it will be added to a dict""" + mutations_of_interest: list = get_mutations_of_interest() + base_path = "/".join( + [resdir, "articNcovNanopore_Genotyping_typeVariants", "variants"] + ) + results: dict = initiate_mutations_dict(results=results) + for filename in os.listdir(base_path): + abs_path = os.path.join(base_path, filename) + cust_sample_id: str = get_sample_id_from_filename( + filename=filename, barcode_to_sampleid=barcode_to_sampleid ) - for filename in os.listdir(base_path): - abs_path = os.path.join(base_path, filename) - with open(abs_path, "r") as variant_file: - next(variant_file) - for line in variant_file: - barcode = self.extract_barcode_from_variant_file(filename=filename) - line_except_sampleid = line.split(",")[1] + "," + line.split(",")[2] - modified_line = ( - barcode_to_sampleid[barcode] + "," + line_except_sampleid - ) - variants_list.append(modified_line) - return variants_list + with open(abs_path, "r") as variant_file: + next(variant_file) + for line in variant_file: + split_on_comma = line.split(",") + if split_on_comma[2] in mutations_of_interest: + if results[cust_sample_id]["mutations"] == "-": + results[cust_sample_id]["mutations"] = split_on_comma[2] + else: + results[cust_sample_id]["mutations"] = ";".join( + [ + results[cust_sample_id]["mutations"], + split_on_comma[2], + ] + ) + return results + + +def collect_results(resdir: str, barcode_to_sampleid: dict, caseinfo: str) -> dict: + """Build a dictionary with data for the report""" + parsed_config: dict = get_sarscov2_config(config=caseinfo) + results: dict = get_data_from_config(parsed_config=parsed_config) + results: dict = parse_assembly( + results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + ) + results: dict = calculate_coverage( + results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + ) + results: dict = parse_pangolin( + results=results, barcode_to_sampleid=barcode_to_sampleid, resdir=resdir + ) + results: dict = parse_mutations( + results=results, resdir=resdir, barcode_to_sampleid=barcode_to_sampleid + ) + return results + + +def extract_barcode_from_variant_file(filename: str) -> str: + prefix = filename.split("_")[2] + barcode = prefix.split(".")[0] + return barcode + + +def collect_variants(resdir: str, barcode_to_sampleid: dict) -> list: + variants_list = [] + base_path = "/".join( + [resdir, "articNcovNanopore_Genotyping_typeVariants", "variants"] + ) + for filename in os.listdir(base_path): + abs_path = os.path.join(base_path, filename) + with open(abs_path, "r") as variant_file: + next(variant_file) + for line in variant_file: + barcode = extract_barcode_from_variant_file(filename=filename) + line_except_sampleid = line.split(",")[1] + "," + line.split(",")[2] + modified_line = ( + barcode_to_sampleid[barcode] + "," + line_except_sampleid + ) + variants_list.append(modified_line) + return variants_list diff --git a/mutant/modules/artic_nanopore/report.py b/mutant/modules/artic_nanopore/report.py index c166eca..14be699 100644 --- a/mutant/modules/artic_nanopore/report.py +++ b/mutant/modules/artic_nanopore/report.py @@ -1,21 +1,39 @@ """ Using a dict as input, this class will print a report covering the information requested by the sarscov2-customers at Clinical Genomics """ +import glob -from mutant.modules.generic_parser import get_sarscov2_config +from mutant.modules.artic_nanopore.parser import collect_results, collect_variants +from mutant.modules.generic_parser import get_sarscov2_config, read_filelines +from mutant.modules.generic_reporter import GenericReporter class ReportPrinterNanopore: - def __init__(self, caseinfo: str, indir: str): + def __init__(self, caseinfo: str, indir: str, barcode_to_sampleid: dict): self.casefile = caseinfo self.caseinfo = get_sarscov2_config(caseinfo) self.case = self.caseinfo[0]["case_ID"] self.ticket = self.caseinfo[0]["Customer_ID_project"] self.indir = indir + self.barcode_to_sampleid = barcode_to_sampleid - def create_all_nanopore_files(self, result: dict, variants: list): + def create_all_nanopore_files(self): + result: dict = collect_results( + resdir=self.indir, + barcode_to_sampleid=self.barcode_to_sampleid, + caseinfo=self.casefile, + ) + variants: list = collect_variants( + resdir=self.indir, barcode_to_sampleid=self.barcode_to_sampleid + ) self.print_report(result=result) self.print_variants(variants=variants) + generic_reporter = GenericReporter( + indir=self.indir, + nanopore=True, + ) + generic_reporter.create_trailblazer_config() + self.create_concat_pangolin() def print_variants(self, variants: list) -> None: """Append data to the variant report""" @@ -76,3 +94,32 @@ def print_report(self, result: dict) -> None: ) file_to_append.write(line_to_append) file_to_append.close() + + def extract_barcode_from_pangolin_csv(self, line: str) -> str: + """ line in format noblecat_220721-191417_barcode01/ARTIC/medaka_MN908947.3 """ + split_on_slash = line.split("/") + splid_on_underscore = split_on_slash[0].split("_") + return splid_on_underscore[2] + + def create_concat_pangolin(self): + """Concatenate nanopore pangolin results""" + + indir = "{0}/articNcovNanopore_sequenceAnalysisMedaka_pangolinTyping".format(self.indir) + concatfile = "{0}/{1}.pangolin.csv".format(self.indir, self.ticket) + pango_csvs = glob.glob("{0}/*.pangolin.csv".format(indir)) + + header = read_filelines(pango_csvs[0])[0] + with open(concatfile, "w") as concat: + concat.write(header) + # Parse sample pangolin data + for csv in pango_csvs: + data: list = read_filelines(csv)[1:] + for line in data: + split_on_comma = line.split(",") + barcode = self.extract_barcode_from_pangolin_csv(line=split_on_comma[0]) + split_on_comma[0] = self.barcode_to_sampleid[barcode] + concatenated_line = "" + for section in split_on_comma: + concatenated_line = ",".join([concatenated_line, section]) + formatted_line = concatenated_line[1:] + concat.write(formatted_line) diff --git a/mutant/modules/generic_reporter.py b/mutant/modules/generic_reporter.py new file mode 100644 index 0000000..4c1d30d --- /dev/null +++ b/mutant/modules/generic_reporter.py @@ -0,0 +1,35 @@ +from pathlib import Path + +import yaml + + +class GenericReporter: + def __init__(self, indir: str, nanopore: bool): + self.indir = indir + self.nanopore = nanopore + + + def get_finished_slurm_ids(self) -> list: + """Get slurm IDs""" + + trace_file_path = Path(self.indir, "pipeline_info", "execution_trace.txt") + slurm_id_list = [] + with open(trace_file_path, "r") as trace_file_contents: + for line in trace_file_contents: + slurm_id = line.split()[2] + try: + slurm_id_list.append(int(slurm_id)) + except Exception: + continue + return slurm_id_list + + def create_trailblazer_config(self) -> None: + """Create Trailblazer config file""" + + trailblazer_config_path = Path(self.indir, "trailblazer_config.yaml") + finished_slurm_ids = self.get_finished_slurm_ids() + if not finished_slurm_ids: + return + with open(trailblazer_config_path, "w") as trailblazer_config_file: + yaml.dump(data={"jobs": finished_slurm_ids}, stream=trailblazer_config_file) +