diff --git a/CHANGELOG.md b/CHANGELOG.md index 92ac3b5a..0eaa9b02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Report and genes overview endpoints accept only POST requests with form data now (application/x-www-form-urlencoded) - no json - Sort alphabetically the list genes that are incompletely covered on report page - `d4_genes_condensed_summary` coverage endpoint will not convert `nan` or `inf` coverage values to None, but to str(value) +- Use new percent coverage option from d4tools to compute coverage completeness ### Fixed - Updated dependencies including `certifi` to address dependabot alert - Update pytest to v.7.4.4 to address a `ReDoS` vulnerability diff --git a/src/chanjo2/meta/handle_d4.py b/src/chanjo2/meta/handle_d4.py index c5622dbf..d0ae0258 100644 --- a/src/chanjo2/meta/handle_d4.py +++ b/src/chanjo2/meta/handle_d4.py @@ -1,3 +1,4 @@ +import logging import subprocess import tempfile from statistics import mean @@ -21,6 +22,8 @@ STOP_INDEX = 2 STATS_MEAN_COVERAGE_INDEX = 3 +LOG = logging.getLogger(__name__) + def get_d4tools_chromosome_mean_coverage( d4_file_path: str, chromosomes=List[str] @@ -64,107 +67,138 @@ def get_d4tools_intervals_coverage( ["d4tools", "stat", "--region", bed_file_path, d4_file_path, "--stat", "mean"], text=True, ) + return [ float(line.rstrip().split("\t")[3]) for line in d4tools_stats_mean_cmd.splitlines() ] +def get_d4tools_intervals_completeness( + d4_file_path: str, bed_file_path: str, completeness_thresholds: List[int] +) -> List[Dict]: + """Return coverage completeness over all intervals of a bed file.""" + covered_threshold_stats = [] + d4tools_stats_perc_cov: str = subprocess.check_output( + [ + "d4tools", + "stat", + "-s", + f"perc_cov={','.join(str(threshold) for threshold in completeness_thresholds)}", + "--region", + bed_file_path, + d4_file_path, + ], + text=True, + ) + for line in d4tools_stats_perc_cov.splitlines(): + stats_dict: Dict = dict( + ( + zip( + completeness_thresholds, + [float(stat) for stat in line.rstrip().split("\t")[3:]], + ) + ) + ) + covered_threshold_stats.append(stats_dict) + + return covered_threshold_stats + + def get_report_sample_interval_coverage( d4_file_path: str, sample_name: str, gene_ids_mapping: Dict[str, dict], - sql_intervals: List[Union[SQLGene, SQLTranscript, SQLExon]], - intervals_coords: List[str], + bed_lines: List[str], completeness_thresholds: List[Optional[int]], default_threshold: int, report_data: dict, ) -> None: """Compute stats to populate a coverage report and coverage overview for one sample.""" - if not intervals_coords: - return + # Write bed lines to a temporary file + with tempfile.NamedTemporaryFile(mode="w") as intervals_bed: + intervals_bed.write("\n".join(bed_lines)) + intervals_bed.flush() - # Compute intervals coverage - intervals_coverage: List[float] = get_d4tools_intervals_mean_coverage( - d4_file_path=d4_file_path, intervals=intervals_coords - ) - completeness_row_dict: dict = {"mean_coverage": mean(intervals_coverage)} + # Compute intervals coverage + intervals_coverage = get_d4tools_intervals_coverage( + d4_file_path=d4_file_path, bed_file_path=intervals_bed.name + ) + completeness_row_dict = {"mean_coverage": mean(intervals_coverage)} - # Compute intervals coverage completeness - interval_ids_coords: List[Tuple[str, Tuple[str, int, int]]] = [ - (interval.ensembl_id, (interval.chromosome, interval.start, interval.stop)) - for interval in sql_intervals - ] - intervals_coverage_completeness: Dict[str, dict] = ( - coverage_completeness_multitasker( + # Compute intervals completeness + intervals_completeness = get_d4tools_intervals_completeness( d4_file_path=d4_file_path, - thresholds=completeness_thresholds, - interval_ids_coords=interval_ids_coords, + bed_file_path=intervals_bed.name, + completeness_thresholds=completeness_thresholds, ) - ) - interval_ids = set() + if len(bed_lines) != len(intervals_coverage) or len(bed_lines) != len( + intervals_completeness + ): + LOG.error("Mismatch in the number of intervals for coverage and completeness") + return + + # Initialize mappings for completeness and intervals coverage + for gene_mapping in gene_ids_mapping.values(): + gene_mapping["completeness"] = { + threshold: [] for threshold in completeness_thresholds + } + gene_mapping["intervals_covered_below_custom_threshold"] = [] + thresholds_dict = {threshold: [] for threshold in completeness_thresholds} - incomplete_coverages_rows: List[Tuple[int, str, str, str, float]] = [] - nr_intervals_covered_under_custom_threshold: int = 0 - genes_covered_under_custom_threshold = set() - for interval_nr, interval in enumerate(sql_intervals): - if interval.ensembl_id in interval_ids: - continue + # Populate completeness data for each gene + for line_nr, line in enumerate(bed_lines): + ensembl_gene = line.rstrip().split("\t")[3] for threshold in completeness_thresholds: - interval_coverage_at_threshold: float = intervals_coverage_completeness[ - interval.ensembl_id - ][threshold] - thresholds_dict[threshold].append(interval_coverage_at_threshold) - - # Collect intervals which are not completely covered at the custom threshold - if threshold == default_threshold and interval_coverage_at_threshold < 1: - nr_intervals_covered_under_custom_threshold += 1 - interval_ensembl_gene: str = ( - interval.ensembl_id - if interval.ensembl_id.startswith("ENSG") - else interval.ensembl_gene_id - ) - interval_hgnc_id: int = gene_ids_mapping[interval_ensembl_gene][ - "hgnc_id" - ] - interval_hgnc_symbol: str = gene_ids_mapping[interval_ensembl_gene][ - "hgnc_symbol" - ] - genes_covered_under_custom_threshold.add(interval_hgnc_symbol) - incomplete_coverages_rows.append( - ( - interval_hgnc_symbol, - interval_hgnc_id, - interval.ensembl_id, - sample_name, - round(interval_coverage_at_threshold * 100, 2), - ) - ) - - interval_ids.add(interval.ensembl_id) + interval_coverage = intervals_completeness[line_nr][threshold] + thresholds_dict[threshold].append(interval_coverage) + gene_ids_mapping[ensembl_gene]["completeness"][threshold].append( + interval_coverage + ) - for threshold in completeness_thresholds: + # Calculate and store overall completeness data + for threshold, values in thresholds_dict.items(): completeness_row_dict[f"completeness_{threshold}"] = round( - mean(thresholds_dict[threshold]) * 100, 2 + mean(values) * 100, 2 ) - report_data["completeness_rows"].append((sample_name, completeness_row_dict)) - report_data["incomplete_coverage_rows"] += incomplete_coverages_rows - fully_covered_intervals_percent = round( - 100 - * (len(interval_ids) - nr_intervals_covered_under_custom_threshold) - / len(interval_ids), - 2, + + # Collect genes not completely covered at the default threshold + incomplete_coverages_rows = [] + nr_intervals_under_threshold = 0 + genes_under_threshold = [] + + for ensembl_gene, gene_mapping in gene_ids_mapping.items(): + mean_coverage = mean(gene_mapping["completeness"][default_threshold]) + if mean_coverage < 1: + nr_intervals_under_threshold += 1 + incomplete_coverages_rows.append( + ( + gene_mapping["hgnc_symbol"], + gene_mapping["hgnc_id"], + ensembl_gene, + sample_name, + round(mean_coverage * 100, 2), + ) + ) + genes_under_threshold.append(gene_mapping["hgnc_symbol"]) + + report_data["incomplete_coverage_rows"] = incomplete_coverages_rows + + # Calculate percentage of fully covered intervals + fully_covered_percent = round( + 100 * (len(bed_lines) - nr_intervals_under_threshold) / len(bed_lines), 2 ) + report_data["default_level_completeness_rows"].append( ( sample_name, - fully_covered_intervals_percent, - f"{nr_intervals_covered_under_custom_threshold}/{len(interval_ids)}", - genes_covered_under_custom_threshold, + fully_covered_percent, + f"{nr_intervals_under_threshold}/{len(bed_lines)}", + genes_under_threshold, ) ) diff --git a/src/chanjo2/meta/handle_report_contents.py b/src/chanjo2/meta/handle_report_contents.py index 78303c9f..07909ca6 100644 --- a/src/chanjo2/meta/handle_report_contents.py +++ b/src/chanjo2/meta/handle_report_contents.py @@ -132,8 +132,8 @@ def get_report_data( transcript_tags=[TranscriptTag.REFSEQ_MRNA], ) - intervals_coords: List[str] = [ - f"{interval.chromosome}\t{interval.start}\t{interval.stop}" + bed_lines: List[str] = [ + f"{interval.chromosome}\t{interval.start}\t{interval.stop}\t{interval.ensembl_gene_id or interval.ensembl_id}" for interval in sql_intervals ] @@ -142,8 +142,7 @@ def get_report_data( d4_file_path=sample.coverage_file_path, sample_name=sample.name, gene_ids_mapping=gene_ids_mapping, - sql_intervals=sql_intervals, - intervals_coords=intervals_coords, + bed_lines=bed_lines, completeness_thresholds=( [query.default_level] if is_overview else query.completeness_thresholds ),