diff --git a/src/genophenocorr/analysis/_api.py b/src/genophenocorr/analysis/_api.py index f07f0a329..cc4bab92d 100644 --- a/src/genophenocorr/analysis/_api.py +++ b/src/genophenocorr/analysis/_api.py @@ -6,7 +6,8 @@ import pandas as pd from genophenocorr.model import VariantEffect, Patient, FeatureType -from .predicate import PatientCategory +from .predicate import PolyPredicate, PatientCategory +from .predicate.phenotype import P PatientsByHPO = namedtuple('PatientsByHPO', field_names=['all_with_hpo', 'all_without_hpo']) @@ -16,18 +17,20 @@ class GenotypePhenotypeAnalysisResult: `GenotypePhenotypeAnalysisResult` summarizes results of genotype-phenotype correlation analysis of a cohort. """ - def __init__(self, n_usable: pd.Series, - all_counts: pd.DataFrame, - pvals: pd.Series, - corrected_pvals: typing.Optional[pd.Series], - phenotype_categories: typing.Iterable[PatientCategory], - question: str): + def __init__( + self, n_usable: pd.Series, + all_counts: typing.Mapping[P, pd.DataFrame], + pvals: pd.Series, + corrected_pvals: typing.Optional[pd.Series], + phenotype_categories: typing.Iterable[PatientCategory], + geno_predicate: PolyPredicate, + ): self._n_usable = n_usable self._all_counts = all_counts self._pvals = pvals self._corrected_pvals = corrected_pvals self._phenotype_categories = tuple(phenotype_categories) - self._question = question + self._geno_predicate = geno_predicate @property def n_usable(self) -> pd.Series: @@ -38,23 +41,22 @@ def n_usable(self) -> pd.Series: return self._n_usable @property - def all_counts(self) -> pd.DataFrame: + def all_counts(self) -> typing.Mapping[P, pd.DataFrame]: """ - Get a :class:`pandas.DataFrame` with counts of patients in genotype and phenotype groups. + Get a mapping from the phenotype item to :class:`pandas.DataFrame` with counts of patients + in genotype and phenotype groups. An example for a genotype predicate that bins into two categories (`Yes` and `No`) based on presence - of a missense variant in transcript, and phenotype predicate that checks presence/absence of a phenotype term:: - - Has MISSENSE_VARIANT in NM_123456.7 - No Yes - HPO Present - HP:0001166 Yes 1 13 - HP:0001166 No 7 5 - HP:0003011 Yes 3 11 - HP:0003011 No 7 16 - HP:0012638 Yes 9 15 - HP:0012638 No 3 4 + of a missense variant in transcript `NM_123456.7`, and phenotype predicate that checks + presence/absence of `HP:0001166` (a phenotype term):: + Has MISSENSE_VARIANT in NM_123456.7 + No Yes + Present + Yes 1 13 + No 7 5 + + The rows correspond to the phenotype categories, and the columns represent the genotype categories. """ return self._all_counts @@ -79,70 +81,50 @@ def phenotype_categories(self) -> typing.Sequence[PatientCategory]: """ return self._phenotype_categories - def summarize(self, hpo: hpotk.MinimalOntology, - phenotype_category: PatientCategory) -> pd.DataFrame: + def summarize( + self, hpo: hpotk.MinimalOntology, + category: PatientCategory, + ) -> pd.DataFrame: """ Create a data frame with summary of the genotype phenotype analysis. The *rows* of the frame correspond to the analyzed HPO terms. The columns of the data frame have `Count` and `Percentage` per used genotype predicate. - For instance, if we used :class:`genophenocorr.analysis.predicate.genotype.VariantEffectPredicate` - which compares phenotype with and without variants with certain :class:`genophenocorr.model.VariantEffect`, - we will have the following columns: - ===== ======= ===== ======= - Yes No - -------------- -------------- - Count Percent Count Percent - ===== ======= ===== ======= + **Example** - The final data frame may look something like this:: + If we use :class:`genophenocorr.analysis.predicate.genotype.VariantEffectPredicate` + which can compare phenotype with and without a missense variant, we will have a data frame + that looks like this:: MISSENSE_VARIANT on `NM_1234.5` No Yes Count Percent Count Percent p value Corrected p value - Arachnodactyly [HP:0001166] 1/14 7.14% 13/14 92.86% 0.000781 0.020299 - Abnormality of the musculature [HP:0003011] 6/17 35.29% 11/17 64.71% 1.000000 1.000000 - Abnormal nervous system physiology [HP:0012638] 9/24 37.50% 15/24 62.50% 1.000000 1.000000 + Arachnodactyly [HP:0001166] 1/10 10% 13/16 81% 0.000781 0.020299 + Abnormality of the musculature [HP:0003011] 6/6 100% 11/11 100% 1.000000 1.000000 + Abnormal nervous system physiology [HP:0012638] 9/9 100% 15/15 100% 1.000000 1.000000 ... ... ... ... ... ... ... """ - if phenotype_category not in self._phenotype_categories: - raise ValueError(f'Unknown phenotype category: {phenotype_category}. Use one of {self._phenotype_categories}') + if category not in self._phenotype_categories: + raise ValueError(f'Unknown phenotype category: {category}. Use one of {self._phenotype_categories}') # Row index: a list of tested HPO terms pheno_idx = pd.Index(self._n_usable.index) # Column index: multiindex of counts and percentages for all genotype predicate groups geno_idx = pd.MultiIndex.from_product( - iterables=(self._all_counts.columns, ('Count', 'Percent')), - names=(self._question, None)) + iterables=(self._geno_predicate.get_categories(), ('Count', 'Percent')), + names=(self._geno_predicate.get_question(), None), + ) # We'll fill this frame with data df = pd.DataFrame(index=pheno_idx, columns=geno_idx) - # We must choose the counts with the `phenotype_category` of interest. - # This depends on the used phenotype predicate. - # Usually, we are interested patients who HAVE a feature, - # but we can also investigate those who do NOT have the feature. - # Hence, we need `phenotype_category`. - counts = self._all_counts.loc[(slice(None), phenotype_category), :] - counts.set_index(pheno_idx, inplace=True) - - # We must find the total count of all samples in each genotype category - # stack turns all indexes into columns. swaplevel switches the pheno_category with geno_category. - # unstack puts pheno_category as the column and geno_category as the multi-index. - # sum(axis=1) adds across the geno_category, giving us the total patients in that category, removing pheno_category. - # unstack puts the geno_category back as the column name. reindex_like(counts) formats it like counts, making the - # following for loop work. - totals = self._all_counts.stack().swaplevel().unstack().sum(axis=1).unstack().reindex_like(counts) - - # Fill the frame cells - for col in geno_idx.levels[0]: - cnt = counts[col] - tot = totals[col] - # Format `Count` as `N/M` string, where `N` is the sample count for the genotype category - # within that phenotype category and `M` is the total count of all samples in that genotype - df[col, 'Count'] = cnt.map(str) + '/' + tot.map(str) - df[col, 'Percent'] = (cnt * 100 / tot).round(decimals=2).map(str) + '%' + for pf, count in self._all_counts.items(): + total = count.sum() # Sum across the phenotype categories (collapse the rows). + for gt_cat in count.columns: + cnt = count.loc[category, gt_cat] + df.loc[pf, (gt_cat, 'Count')] = f'{cnt}/{total[gt_cat]}' + df.loc[pf, (gt_cat, 'Percent')] = f'{round(cnt * 100 / total[gt_cat])}%' # Add columns with p values and corrected p values (if present) df.insert(df.shape[1], ('', self._pvals.name), self._pvals) diff --git a/src/genophenocorr/analysis/_config.py b/src/genophenocorr/analysis/_config.py index 63b96a066..1714c4470 100644 --- a/src/genophenocorr/analysis/_config.py +++ b/src/genophenocorr/analysis/_config.py @@ -11,7 +11,6 @@ from ._filter import SimplePhenotypeFilter from ._gp_analysis import FisherExactAnalyzer from ._gp_impl import GpCohortAnalysis -from .predicate.phenotype import PropagatingPhenotypeBooleanPredicateFactory P_VAL_OPTIONS = ( 'bonferroni', 'b', @@ -196,14 +195,8 @@ def configure_cohort_analysis(cohort: Cohort, config.min_perc_patients_w_hpo, ) - # Configure the nuts and bolts of the G/P analysis. - phenotype_predicate_factory = PropagatingPhenotypeBooleanPredicateFactory( - hpo=hpo, - missing_implies_excluded=config.missing_implies_excluded, - ) # Choosing a simple Fisher's exact test for now. gp_analyzer = FisherExactAnalyzer( - pheno_predicate_factory=phenotype_predicate_factory, p_val_correction=config.pval_correction, mtc_alpha=config.mtc_alpha, ) @@ -214,6 +207,7 @@ def configure_cohort_analysis(cohort: Cohort, protein_service=protein_metadata_service, phenotype_filter=phenotype_filter, gp_analyzer=gp_analyzer, + missing_implies_excluded=config.missing_implies_excluded, include_sv=config.include_sv, ) diff --git a/src/genophenocorr/analysis/_gp_analysis.py b/src/genophenocorr/analysis/_gp_analysis.py index 38c7dad49..96384d3b7 100644 --- a/src/genophenocorr/analysis/_gp_analysis.py +++ b/src/genophenocorr/analysis/_gp_analysis.py @@ -1,38 +1,31 @@ import abc import typing -import hpotk - import pandas as pd from statsmodels.stats import multitest from genophenocorr.model import Patient -from .predicate import PolyPredicate -from .predicate.phenotype import PhenotypePredicateFactory +from .predicate import PolyPredicate, PatientCategory +from .predicate.phenotype import PhenotypePolyPredicate, P from ._api import GenotypePhenotypeAnalysisResult from ._stats import run_fisher_exact, run_recessive_fisher_exact -class GPAnalyzer(metaclass=abc.ABCMeta): +class GPAnalyzer(typing.Generic[P], metaclass=abc.ABCMeta): """ `GPAnalyzer` calculates p values for genotype-phenotype correlation of phenotypic features of interest. """ - def __init__(self, - pheno_predicate_factory: PhenotypePredicateFactory, - ): - self._pheno_predicate_factory = hpotk.util.validate_instance( - pheno_predicate_factory, PhenotypePredicateFactory, 'pheno_predicate_factory') - @abc.abstractmethod - def analyze(self, - patients: typing.Iterable[Patient], - phenotypic_features: typing.Iterable[hpotk.TermId], - predicate: PolyPredicate, - ) -> GenotypePhenotypeAnalysisResult: + def analyze( + self, + patients: typing.Iterable[Patient], + pheno_predicates: typing.Iterable[PhenotypePolyPredicate[P]], + gt_predicate: PolyPredicate, + ) -> GenotypePhenotypeAnalysisResult: """ Test for association between `phenotypic_features` of interest groups or `patients` determined by the `predicate`. @@ -42,75 +35,91 @@ def analyze(self, pass @staticmethod - def _count_patients(patients: typing.Iterable[Patient], - phenotypes_of_interest: typing.Iterable[hpotk.TermId], - pheno_predicate_factory: PhenotypePredicateFactory, - geno_predicate: PolyPredicate) -> typing.Tuple[pd.Series, pd.DataFrame]: - row_idx = pd.MultiIndex.from_product( - iterables=(phenotypes_of_interest, pheno_predicate_factory.get_categories()), - names=('phenotypic_feature', 'category') - ) - col_idx = pd.Index(geno_predicate.get_categories(), name=geno_predicate.get_question()) - counts = pd.DataFrame(data=0, index=row_idx, columns=col_idx) - n_usable_patients = pd.Series(data=0, index=pd.Index(phenotypes_of_interest)) + def _count_patients( + patients: typing.Iterable[Patient], + pheno_predicates: typing.Iterable[PhenotypePolyPredicate[P]], + gt_predicate: PolyPredicate, + ) -> typing.Tuple[typing.Iterable[PatientCategory], pd.Series, typing.Mapping[P, pd.DataFrame]]: + phenotypes = set() + categories = set() + for predicate in pheno_predicates: + categories.update(predicate.get_categories()) + phenotypes.update(predicate.phenotypes) + + n_usable_patients = pd.Series(data=0, index=pd.Index(phenotypes)) # Apply genotype and phenotype predicates - for pf in phenotypes_of_interest: - pheno_predicate = pheno_predicate_factory.get_predicate(pf) + counts = {} + for ph_predicate in pheno_predicates: + phenotypes = ph_predicate.phenotypes + + for phenotype in phenotypes: + if phenotype not in counts: + # Make an empty frame for keeping track of the counts. + counts[phenotype] = pd.DataFrame( + data=0, + index=pd.Index( + data=ph_predicate.get_categories(), + name=ph_predicate.get_question(), + ), + columns=pd.Index( + data=gt_predicate.get_categories(), + name=gt_predicate.get_question(), + ), + ) for patient in patients: - pheno_cat = pheno_predicate.test(patient) - geno_cat = geno_predicate.test(patient) + pheno_cat = ph_predicate.test(patient) + geno_cat = gt_predicate.test(patient) if pheno_cat is not None and geno_cat is not None: - counts.loc[(pf, pheno_cat), geno_cat] += 1 - n_usable_patients[pf] += 1 + counts[pheno_cat.phenotype].loc[pheno_cat.category, geno_cat.category] += 1 + n_usable_patients[pheno_cat.phenotype] += 1 - return n_usable_patients, counts + return categories, n_usable_patients, counts -class FisherExactAnalyzer(GPAnalyzer): +class FisherExactAnalyzer(typing.Generic[P], GPAnalyzer[P]): """ `FisherExactAnalyzer` uses Fisher's exact test to calculate p value for phenotypic features of interest. Following the test, the code applies one of the multiple testing corrections provided by :func:`statsmodels.stats.multitest.multipletests` function at given `mtc_alpha`. - :param p_val_correction: a `str` with name of the multiple testing correction method or `None` if no correction should be applied. + :param p_val_correction: a `str` with name of the multiple testing correction method or `None` if no correction + should be applied. :param mtc_alpha: a `float` in range :math:`(0, 1]` with the multiple testing correction alpha value. """ - def __init__(self, - pheno_predicate_factory: PhenotypePredicateFactory, - p_val_correction: typing.Optional[str] = None, - mtc_alpha: float = .05, - ): - super().__init__(pheno_predicate_factory) + def __init__( + self, + p_val_correction: typing.Optional[str] = None, + mtc_alpha: float = .05, + ): self._correction = p_val_correction self._mtc_alpha = mtc_alpha def analyze( self, patients: typing.Iterable[Patient], - phenotypic_features: typing.Iterable[hpotk.TermId], - predicate: PolyPredicate, + pheno_predicates: typing.Iterable[PhenotypePolyPredicate[P]], + gt_predicate: PolyPredicate, ) -> GenotypePhenotypeAnalysisResult: # 1) Count the patients - n_usable, all_counts = GPAnalyzer._count_patients( - patients, phenotypic_features, - self._pheno_predicate_factory, predicate, + categories, n_usable, all_counts = GPAnalyzer._count_patients( + patients, pheno_predicates, gt_predicate, ) # 2) Statistical tests - pvals_idx = pd.Index(phenotypic_features, name='p_val') - pvals = pd.Series(float('nan'), index=pvals_idx, name='p value') - for pf in phenotypic_features: - counts = all_counts.loc[pf] + pheno_idx = pd.Index(n_usable.index, name='p_val') + pvals = pd.Series(float('nan'), index=pheno_idx, name='p value') + for phenotype in pheno_idx: + counts = all_counts[phenotype] # TODO - this is where we must fail unless we have the contingency table of the right size! if counts.shape == (2, 2): - pvals[pf] = run_fisher_exact(counts) + pvals[phenotype] = run_fisher_exact(counts) elif counts.shape == (3, 2): - pvals[pf] = run_recessive_fisher_exact(counts) + pvals[phenotype] = run_recessive_fisher_exact(counts) else: raise ValueError( "Invalid number of categories. " @@ -120,7 +129,7 @@ def analyze( # 3) Multiple correction if self._correction is not None: _, pvals_corrected, _, _ = multitest.multipletests(pvals, alpha=self._mtc_alpha, method=self._correction) - corrected_idx = pd.Index(phenotypic_features, name='p_val_corrected') + corrected_idx = pd.Index(n_usable.index, name='p_val_corrected') corrected_pvals_series = pd.Series(data=pvals_corrected, index=corrected_idx, name='Corrected p value') else: @@ -132,6 +141,6 @@ def analyze( all_counts=all_counts, pvals=pvals, corrected_pvals=corrected_pvals_series, - phenotype_categories=self._pheno_predicate_factory.get_categories(), - question=predicate.get_question(), + phenotype_categories=categories, + geno_predicate=gt_predicate, ) diff --git a/src/genophenocorr/analysis/_gp_impl.py b/src/genophenocorr/analysis/_gp_impl.py index ce90551ca..d3cd08209 100644 --- a/src/genophenocorr/analysis/_gp_impl.py +++ b/src/genophenocorr/analysis/_gp_impl.py @@ -1,12 +1,14 @@ import logging +import typing import hpotk from genophenocorr.model import Cohort, VariantEffect, FeatureType from genophenocorr.preprocessing import ProteinMetadataService -from .predicate import BooleanPredicate, PolyPredicate +from .predicate import GenotypePolyPredicate, GenotypeBooleanPredicate from .predicate.genotype import VariantEffectPredicate, VariantPredicate, ExonPredicate, ProtFeatureTypePredicate, ProtFeaturePredicate from .predicate.genotype import VariantEffectsPredicate, VariantsPredicate, ExonsPredicate, ProtFeaturesPredicate, ProtFeatureTypesPredicate +from .predicate.phenotype import PhenotypePolyPredicate, P, PropagatingPhenotypePredicate from ._api import CohortAnalysis, GenotypePhenotypeAnalysisResult from ._filter import PhenotypeFilter @@ -20,6 +22,7 @@ def __init__(self, cohort: Cohort, protein_service: ProteinMetadataService, phenotype_filter: PhenotypeFilter, gp_analyzer: GPAnalyzer, + missing_implies_excluded: bool, include_sv: bool = False, ): if not isinstance(cohort, Cohort): @@ -38,57 +41,98 @@ def __init__(self, cohort: Cohort, raise ValueError('No patients left for analysis!') self._hpo_terms_of_interest = self._phenotype_filter.filter_features(self._patient_list) + self._missing_implies_excluded = missing_implies_excluded def compare_by_variant_effect(self, effect: VariantEffect, tx_id: str) -> GenotypePhenotypeAnalysisResult: predicate = VariantEffectPredicate(tx_id, effect) - return self._apply_boolean_predicate(predicate) + return self._apply_boolean_predicate_on_hpo_terms(predicate) def compare_by_variant_key(self, variant_key: str) -> GenotypePhenotypeAnalysisResult: predicate = VariantPredicate(variant_key) - return self._apply_boolean_predicate(predicate) + return self._apply_boolean_predicate_on_hpo_terms(predicate) def compare_by_exon(self, exon_number: int, tx_id: str) -> GenotypePhenotypeAnalysisResult: predicate = ExonPredicate(tx_id, exon_number) - return self._apply_boolean_predicate(predicate) + return self._apply_boolean_predicate_on_hpo_terms(predicate) def compare_by_protein_feature_type(self, feature_type: FeatureType, tx_id: str) -> GenotypePhenotypeAnalysisResult: predicate = ProtFeatureTypePredicate(tx_id, feature_type, self._protein_service) - return self._apply_boolean_predicate(predicate) + return self._apply_boolean_predicate_on_hpo_terms(predicate) def compare_by_protein_feature(self, feature: str, tx_id: str) -> GenotypePhenotypeAnalysisResult: predicate = ProtFeaturePredicate(tx_id, feature, self._protein_service) - return self._apply_boolean_predicate(predicate) - - def compare_by_variant_effects(self, effect1: VariantEffect, effect2: VariantEffect, tx_id: str) -> GenotypePhenotypeAnalysisResult: + return self._apply_boolean_predicate_on_hpo_terms(predicate) + + def compare_by_variant_effects( + self, + effect1: VariantEffect, + effect2: VariantEffect, + tx_id: str, + ) -> GenotypePhenotypeAnalysisResult: predicate = VariantEffectsPredicate(tx_id, effect1, effect2) - return self._apply_poly_predicate(predicate) + return self._apply_poly_predicate_on_hpo_terms(predicate) def compare_by_variant_keys(self, variant_key1: str, variant_key2: str) -> GenotypePhenotypeAnalysisResult: predicate = VariantsPredicate(variant_key1, variant_key2) - return self._apply_poly_predicate(predicate) + return self._apply_poly_predicate_on_hpo_terms(predicate) def compare_by_exons(self, exon1_number: int, exon2_number: int, tx_id: str) -> GenotypePhenotypeAnalysisResult: predicate = ExonsPredicate(tx_id, exon1_number, exon2_number) - return self._apply_poly_predicate(predicate) - - def compare_by_protein_feature_types(self, feature_type1: FeatureType, feature_type2: FeatureType, tx_id: str) -> GenotypePhenotypeAnalysisResult: + return self._apply_poly_predicate_on_hpo_terms(predicate) + + def compare_by_protein_feature_types( + self, + feature_type1: FeatureType, + feature_type2: FeatureType, + tx_id: str, + ) -> GenotypePhenotypeAnalysisResult: predicate = ProtFeatureTypesPredicate(tx_id, feature_type1, feature_type2, self._protein_service) - return self._apply_poly_predicate(predicate) - - def compare_by_protein_features(self, feature1: str, feature2: str, tx_id: str) -> GenotypePhenotypeAnalysisResult: + return self._apply_poly_predicate_on_hpo_terms(predicate) + + def compare_by_protein_features( + self, + feature1: str, + feature2: str, + tx_id: str, + ) -> GenotypePhenotypeAnalysisResult: predicate = ProtFeaturesPredicate(tx_id, feature1, feature2, self._protein_service) - return self._apply_poly_predicate(predicate) - - def _apply_boolean_predicate(self, predicate: BooleanPredicate) -> GenotypePhenotypeAnalysisResult: - assert isinstance(predicate, BooleanPredicate), f'{type(predicate)} is not an instance of `BooleanPredicate`' - - return self._apply_poly_predicate(predicate) - - def _apply_poly_predicate(self, predicate: PolyPredicate) -> GenotypePhenotypeAnalysisResult: - assert isinstance(predicate, PolyPredicate), f'{type(predicate)} is not an instance of `PolyPredicate`' - + return self._apply_poly_predicate_on_hpo_terms(predicate) + + def _apply_boolean_predicate_on_hpo_terms( + self, + predicate: GenotypeBooleanPredicate, + ) -> GenotypePhenotypeAnalysisResult: + assert isinstance(predicate, GenotypeBooleanPredicate), \ + f'{type(predicate)} is not an instance of `GenotypeBooleanPredicate`' + + return self._apply_poly_predicate_on_hpo_terms(predicate) + + def _apply_poly_predicate_on_hpo_terms( + self, + gt_predicate: GenotypePolyPredicate, + ) -> GenotypePhenotypeAnalysisResult: + assert isinstance(gt_predicate, GenotypePolyPredicate), \ + f'{type(gt_predicate)} is not an instance of `GenotypePolyPredicate`' + + pheno_predicates = self._prepare_phenotype_predicates() + return self._apply_poly_predicate(pheno_predicates, gt_predicate) + + def _apply_poly_predicate( + self, + pheno_predicates: typing.Iterable[PhenotypePolyPredicate[P]], + gt_predicate: GenotypePolyPredicate, + ): return self._gp_analyzer.analyze( patients=self._patient_list, - phenotypic_features=self._hpo_terms_of_interest, - predicate=predicate, + pheno_predicates=pheno_predicates, + gt_predicate=gt_predicate, ) + + def _prepare_phenotype_predicates(self) -> typing.Sequence[PhenotypePolyPredicate[P]]: + return tuple( + PropagatingPhenotypePredicate( + hpo=self._hpo, + query=query, + missing_implies_excluded=self._missing_implies_excluded, + ) + for query in self._hpo_terms_of_interest) diff --git a/src/genophenocorr/analysis/predicate/__init__.py b/src/genophenocorr/analysis/predicate/__init__.py index 80af3d2a7..ef5c7fec1 100644 --- a/src/genophenocorr/analysis/predicate/__init__.py +++ b/src/genophenocorr/analysis/predicate/__init__.py @@ -1,8 +1,12 @@ from . import genotype from . import phenotype -from ._api import BooleanPredicate, PatientCategory, PolyPredicate, GroupingPredicate +from ._api import PatientCategory, PatientCategories, Categorization, C +from ._api import PolyPredicate, GroupingPredicate +from ._api import GenotypePolyPredicate, GenotypeBooleanPredicate __all__ = [ - 'PolyPredicate', 'PatientCategory', 'BooleanPredicate', 'GroupingPredicate' + 'PatientCategory', 'PatientCategories', 'Categorization', 'C', + 'PolyPredicate', 'GroupingPredicate', + 'GenotypePolyPredicate', 'GenotypeBooleanPredicate', ] diff --git a/src/genophenocorr/analysis/predicate/_api.py b/src/genophenocorr/analysis/predicate/_api.py index 64b503edc..866447ff9 100644 --- a/src/genophenocorr/analysis/predicate/_api.py +++ b/src/genophenocorr/analysis/predicate/_api.py @@ -48,7 +48,7 @@ def __repr__(self) -> str: f"description={self.description})" def __str__(self) -> str: - return self.name + return self._name def __eq__(self, other) -> bool: return isinstance(other, PatientCategory) \ @@ -60,10 +60,60 @@ def __hash__(self) -> int: return hash((self.cat_id, self.name, self.description)) -class PolyPredicate(metaclass=abc.ABCMeta): +class PatientCategories(metaclass=abc.ABCMeta): + """ + A static utility class to serve common patient categories. + """ + + YES = PatientCategory(1, 'Yes', 'The patient belongs to the group.') + """ + Category for a patient who *belongs* to the tested group. + """ + + NO = PatientCategory(0, 'No', 'The patient does not belong to the group.') + """ + Category for a patient who does *not* belong to the tested group. + """ + + +class Categorization: + """ + `Categorization` represents one of discrete group a :class:`genophenocorr.model.Patient` can be assigned into. + """ + + def __init__( + self, + category: PatientCategory, + ): + self._category = hpotk.util.validate_instance(category, PatientCategory, 'category') + + @property + def category(self) -> PatientCategory: + return self._category + + def __eq__(self, other): + return isinstance(other, Categorization) and self._category == other._category + + def __hash__(self): + return hash((self._category,)) + + def __repr__(self): + return f'Categorization(category={self._category})' + + def __str__(self): + return repr(self) + + +C = typing.TypeVar('C', bound=Categorization) +""" +A generic bound for types that extend :class:`Categorization`. +""" + + +class PolyPredicate(typing.Generic[C], metaclass=abc.ABCMeta): """ `PolyPredicate` bins a :class:`genophenocorr.model.Patient` into one of several discrete groups represented - by :class:`PatientCategory`. + by :class:`Categorization`. The groups must be *exclusive* - the patient can be binned into one and only one group, and *exhaustive* - the groups must cover all possible scenarios. @@ -74,14 +124,19 @@ class PolyPredicate(metaclass=abc.ABCMeta): Predicate will *not* check if, for instance, the patient variants are compatible with a certain mode of inheritance. """ - @staticmethod @abc.abstractmethod - def get_categories() -> typing.Sequence[PatientCategory]: + def get_categorizations(self) -> typing.Sequence[C]: """ Get a sequence of all categories which the `PolyPredicate` can produce. """ pass + def get_categories(self) -> typing.Sequence[PatientCategory]: + """ + Get a sequence with :class:`PatientCategory` instances that the predicate can produce. + """ + return tuple(c.category for c in self.get_categorizations()) + @abc.abstractmethod def get_question(self) -> str: """ @@ -90,20 +145,19 @@ def get_question(self) -> str: pass @abc.abstractmethod - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[C]: """ - Assign a `patient` into a category. + Assign a `patient` into a categorization. Return `None` if the patient cannot be assigned into any meaningful category. """ pass - @classmethod - def n_categories(cls) -> int: + def n_categorizations(self) -> int: """ - Get the number of categories the predicate can produce. + Get the number of categorizations the predicate can produce. """ - return len(cls.get_categories()) + return len(self.get_categorizations()) @staticmethod def _check_patient(patient: Patient): @@ -114,46 +168,46 @@ def _check_patient(patient: Patient): raise ValueError(f"patient must be type Patient but was type {type(patient)}") -class BooleanPredicate(PolyPredicate, metaclass=abc.ABCMeta): +class GenotypePolyPredicate(PolyPredicate[Categorization], metaclass=abc.ABCMeta): """ - `BooleanPredicate` tests if a :class:`genophenocorr.model.Patient` belongs to a group and returns a boolean binning. + `GenotypePolyPredicate` constrains `PolyPredicate` to investigate the genotype aspects + of patients. """ + pass - NO = PatientCategory(0, 'No', 'The patient does not belong to the group.') - """ - Category for a patient who does *not* belong to the tested group. - """ - YES = PatientCategory(1, 'Yes', 'The patient belongs to the group.') +class GenotypeBooleanPredicate(GenotypePolyPredicate, metaclass=abc.ABCMeta): """ - Category for a patient who *belongs* to the tested group. + `GenotypeBooleanPredicate` tests if a :class:`genophenocorr.model.Patient` belongs to a genotype group + and returns a boolean binning. """ + YES = Categorization(PatientCategories.YES) + NO = Categorization(PatientCategories.NO) - @staticmethod - def get_categories() -> typing.Sequence[PatientCategory]: + def get_categorizations(self) -> typing.Sequence[Categorization]: """ The predicate bins a patient into :class:`BooleanPredicate.NO` or :class:`BooleanPredicate.YES` category. """ - return BooleanPredicate.NO, BooleanPredicate.YES - -class GroupingPredicate(PolyPredicate, metaclass=abc.ABCMeta): + return GenotypeBooleanPredicate.YES, GenotypeBooleanPredicate.NO + + +class GroupingPredicate(GenotypePolyPredicate, metaclass=abc.ABCMeta): """ `GroupingPredicate` tests if a :class:`genophenocorr.model.Patient` belongs to one of two groups and returns FIRST or SECOND based on which group Patient belongs in. """ - FIRST = PatientCategory(0, 'First', 'The patient belongs in the first group.') + FIRST = Categorization(PatientCategory(0, 'First', 'The patient belongs in the first group.')) """ Category for a patient who belongs in the first given tested group. """ - SECOND = PatientCategory(1, 'Second', 'The patient belongs in the second group.') + SECOND = Categorization(PatientCategory(1, 'Second', 'The patient belongs in the second group.')) """ Category for a patient who belongs to the second given tested group. """ - - @staticmethod - def get_categories() -> typing.Sequence[PatientCategory]: + + def get_categorizations(self) -> typing.Sequence[Categorization]: """ The predicate bins a patient into :class:`GroupingPredicate.FIRST` or :class:`GroupingPredicate.SECOND` category. """ - return GroupingPredicate.FIRST, GroupingPredicate.SECOND \ No newline at end of file + return GroupingPredicate.FIRST, GroupingPredicate.SECOND diff --git a/src/genophenocorr/analysis/predicate/genotype/__init__.py b/src/genophenocorr/analysis/predicate/genotype/__init__.py index 6fc43b9c2..231a56f70 100644 --- a/src/genophenocorr/analysis/predicate/genotype/__init__.py +++ b/src/genophenocorr/analysis/predicate/genotype/__init__.py @@ -1,14 +1,12 @@ from ._geno_bool import ProtFeaturePredicate, ProtFeatureTypePredicate from ._geno_bool import VariantEffectPredicate, VariantPredicate, ExonPredicate -from ._geno_bool import HOMOZYGOUS, HETEROZYGOUS, NO_VARIANT from ._geno_group import VariantEffectsPredicate, VariantsPredicate, ExonsPredicate from ._geno_group import ProtFeaturesPredicate, ProtFeatureTypesPredicate __all__ = [ - 'VariantEffectPredicate', 'VariantEffectsPredicate', - 'VariantPredicate', 'VariantsPredicate', + 'VariantEffectPredicate', 'VariantEffectsPredicate', + 'VariantPredicate', 'VariantsPredicate', 'ExonPredicate', 'ExonsPredicate', 'ProtFeaturePredicate', 'ProtFeaturesPredicate', 'ProtFeatureTypePredicate', 'ProtFeatureTypesPredicate', - 'HOMOZYGOUS', 'HETEROZYGOUS', 'NO_VARIANT', -] \ No newline at end of file +] diff --git a/src/genophenocorr/analysis/predicate/genotype/_geno_bool.py b/src/genophenocorr/analysis/predicate/genotype/_geno_bool.py index ea503b92d..7882b1ac2 100644 --- a/src/genophenocorr/analysis/predicate/genotype/_geno_bool.py +++ b/src/genophenocorr/analysis/predicate/genotype/_geno_bool.py @@ -5,31 +5,10 @@ from genophenocorr.model import Patient, FeatureType, VariantEffect from genophenocorr.preprocessing import ProteinMetadataService -from .._api import PatientCategory, BooleanPredicate +from .._api import Categorization, GenotypeBooleanPredicate -# TODO - should we remove these three? -HETEROZYGOUS = PatientCategory(cat_id=0, - name='Heterozygous', - description=""" - This sample has the tested attribute on one allele. - """) #: :meta hide-value: - -HOMOZYGOUS = PatientCategory(cat_id=1, - name='Homozygous', - description=""" - This sample has the tested attribute on both alleles. - """) #: :meta hide-value: - -NO_VARIANT = PatientCategory(cat_id=2, - name='No Variant', - description=""" - The sample does not have the tested attribute. - """) #: :meta hide-value: - - - -class VariantEffectPredicate(BooleanPredicate): +class VariantEffectPredicate(GenotypeBooleanPredicate): """ `VariantEffectPredicate` tests if the `patient` has at least one variant that is predicted to have the functional `effect` on the transcript of interest. @@ -46,7 +25,7 @@ def __init__(self, transcript_id: str, def get_question(self) -> str: return f'{self._effect.name} on {self._tx_id}' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[Categorization]: self._check_patient(patient) if len(patient.variants) == 0: @@ -57,9 +36,9 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: if ann.transcript_id == self._tx_id: for var_eff in ann.variant_effects: if var_eff == self._effect: - return BooleanPredicate.YES + return GenotypeBooleanPredicate.YES - return BooleanPredicate.NO + return GenotypeBooleanPredicate.NO def __str__(self): return repr(self) @@ -68,7 +47,7 @@ def __repr__(self): return f'VariantEffectPredicate(transcript_id={self._tx_id}, effect={self._effect})' -class VariantPredicate(BooleanPredicate): +class VariantPredicate(GenotypeBooleanPredicate): """ `VariantPredicate` tests if the `patient` has ar least one allele of the variant described by the `variant_key`. @@ -85,7 +64,7 @@ def __init__(self, variant_key: str) -> None: def get_question(self) -> str: return f'>=1 allele of the variant {self._variant_key}' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[Categorization]: self._check_patient(patient) if len(patient.variants) == 0: @@ -93,9 +72,9 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: for variant in patient.variants: if variant.variant_coordinates.variant_key == self._variant_key: - return BooleanPredicate.YES + return GenotypeBooleanPredicate.YES - return BooleanPredicate.NO + return GenotypeBooleanPredicate.NO def __str__(self): return repr(self) @@ -104,7 +83,7 @@ def __repr__(self): return f'VariantPredicate(variant_key={self._variant_key})' -class ExonPredicate(BooleanPredicate): +class ExonPredicate(GenotypeBooleanPredicate): """ `ExonPredicate` tests if the `patient` has a variant that affects *n*-th exon of the transcript of interest. @@ -116,7 +95,8 @@ class ExonPredicate(BooleanPredicate): .. warning:: We do not check if the `exon_number` spans beyond the number of exons of the given `transcript_id`! - Therefore, ``exon_number==10,000`` will effectively return :attr:`BooleanPredicate.FALSE` for *all* patients!!! 😱 + Therefore, ``exon_number==10,000`` will effectively return :attr:`GenotypeBooleanPredicate.FALSE` + for *all* patients!!! 😱 Well, at least the patients of the *Homo sapiens sapiens* taxon... :param transcript_id: the accession of the transcript of interest. @@ -133,7 +113,7 @@ def __init__(self, transcript_id: str, def get_question(self) -> str: return f'Variant in exon {self._exon_number} on {self._tx_id}' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[Categorization]: self._check_patient(patient) if len(patient.variants) == 0: @@ -144,9 +124,9 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: if ann.transcript_id == self._tx_id: if ann.overlapping_exons is not None: if self._exon_number in ann.overlapping_exons: - return BooleanPredicate.YES + return GenotypeBooleanPredicate.YES - return BooleanPredicate.NO + return GenotypeBooleanPredicate.NO def __str__(self): return repr(self) @@ -155,7 +135,7 @@ def __repr__(self): return f'ExonPredicate(tx_id={self._tx_id}, exon_number={self._exon_number})' -class ProtFeatureTypePredicate(BooleanPredicate): +class ProtFeatureTypePredicate(GenotypeBooleanPredicate): """ `ProtFeatureTypePredicate` tests if the `patient` has a variant that affects a :class:`FeatureType` in the protein encoded by the transcript of interest. @@ -173,7 +153,7 @@ def __init__(self, transcript_id: str, feature_type: FeatureType, protein_servic def get_question(self) -> str: return f'Variant that affects {self._feature_type.name} feature type on protein encoded by transcript {self._tx_id}' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[Categorization]: self._check_patient(patient) if len(patient.variants) == 0: @@ -191,11 +171,11 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: for feat in prot.protein_features: if feat.feature_type == self._feature_type: if prot_loc.overlaps_with(feat.info.region): - return BooleanPredicate.YES + return GenotypeBooleanPredicate.YES - return BooleanPredicate.NO + return GenotypeBooleanPredicate.NO #TODO: Add a logger field, add a branch that handles the state where prot_id is set but prot_loc is not - gives warning - + def __str__(self): return repr(self) @@ -203,7 +183,7 @@ def __repr__(self): return f'ProtFeatureTypePredicate(tx_id={self._tx_id}, feature_type={self._feature_type})' -class ProtFeaturePredicate(BooleanPredicate): +class ProtFeaturePredicate(GenotypeBooleanPredicate): """ `ProtFeaturePredicate` tests if the `patient` has a variant that overlaps with a protein feature. @@ -222,7 +202,7 @@ def __init__(self, transcript_id: str, protein_feature_name: str, protein_servic def get_question(self) -> str: return f'Variant that affects {self._pf_name} feature on protein encoded by transcript {self._tx_id}' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + def test(self, patient: Patient) -> typing.Optional[Categorization]: self._check_patient(patient) if len(patient.variants) == 0: @@ -240,10 +220,9 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: for feat in prot.protein_features: if feat.info.name == self._pf_name: if prot_loc.overlaps_with(feat.info.region): - return BooleanPredicate.YES - - return BooleanPredicate.NO + return GenotypeBooleanPredicate.YES + return GenotypeBooleanPredicate.NO def __str__(self): return repr(self) diff --git a/src/genophenocorr/analysis/predicate/genotype/_geno_group.py b/src/genophenocorr/analysis/predicate/genotype/_geno_group.py index d9ad72152..a56f95a3d 100644 --- a/src/genophenocorr/analysis/predicate/genotype/_geno_group.py +++ b/src/genophenocorr/analysis/predicate/genotype/_geno_group.py @@ -53,7 +53,7 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: if len(patient.variants) == 0: return None - + results = [False, False] for variant in patient.variants: for ann in variant.tx_annotations: @@ -79,7 +79,7 @@ def __repr__(self): class VariantsPredicate(GroupingPredicate): """ - `VariantsPredicate` tests if the `patient` has at least one allele of one of the variants described by + `VariantsPredicate` tests if the `patient` has at least one allele of one of the variants described by the `variant_key1` and `variant_key2`. **If patient has both variant_keys, it is not included** @@ -103,7 +103,7 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: if len(patient.variants) == 0: return None - + result = [False, False] for variant in patient.variants: if variant.variant_coordinates.variant_key == self._variant_key1: @@ -126,7 +126,7 @@ def __repr__(self): class ExonsPredicate(GroupingPredicate): """ - `ExonsPredicate` tests if the `patient` has a variant that affects one of the + `ExonsPredicate` tests if the `patient` has a variant that affects one of the two given *n*-th exons of the transcript of interest. .. warning:: @@ -188,7 +188,7 @@ def __repr__(self): class ProtFeatureTypesPredicate(GroupingPredicate): """ - `ProtFeatureTypesPredicate` tests if the `patient` has a variant that affects one of the two given + `ProtFeatureTypesPredicate` tests if the `patient` has a variant that affects one of the two given :class:`FeatureType`s in the transcript of interest. :param transcript_id: the accession of the transcript of interest. @@ -297,7 +297,7 @@ def test(self, patient: Patient) -> typing.Optional[PatientCategory]: elif results == [False, True]: return GroupingPredicate.SECOND else: - return None + return None def __str__(self): return repr(self) diff --git a/src/genophenocorr/analysis/predicate/phenotype/__init__.py b/src/genophenocorr/analysis/predicate/phenotype/__init__.py index c1d44e150..c73be8d53 100644 --- a/src/genophenocorr/analysis/predicate/phenotype/__init__.py +++ b/src/genophenocorr/analysis/predicate/phenotype/__init__.py @@ -1,7 +1,9 @@ -from ._pheno import PhenotypePredicateFactory, PropagatingPhenotypePredicate, PropagatingPhenotypeBooleanPredicate -from ._pheno import PropagatingPhenotypeBooleanPredicateFactory +from ._pheno import PhenotypePolyPredicate, PropagatingPhenotypePredicate +from ._pheno import DiseasePresencePredicate +from ._pheno import PhenotypeCategorization, Comparable, P __all__ = [ - 'PropagatingPhenotypePredicate', 'PropagatingPhenotypeBooleanPredicate', - 'PhenotypePredicateFactory', 'PropagatingPhenotypeBooleanPredicateFactory' + 'PhenotypePolyPredicate', 'PropagatingPhenotypePredicate', + 'DiseasePresencePredicate', + 'PhenotypeCategorization', 'Comparable', 'P', ] diff --git a/src/genophenocorr/analysis/predicate/phenotype/_pheno.py b/src/genophenocorr/analysis/predicate/phenotype/_pheno.py index 94d8a437e..efc83e1ec 100644 --- a/src/genophenocorr/analysis/predicate/phenotype/_pheno.py +++ b/src/genophenocorr/analysis/predicate/phenotype/_pheno.py @@ -5,162 +5,156 @@ from genophenocorr.model import Patient -from .._api import PolyPredicate, BooleanPredicate, PatientCategory +from .._api import PolyPredicate, PatientCategory, PatientCategories, C, Categorization -class PhenotypePolyPredicate(PolyPredicate, metaclass=abc.ABCMeta): - pass +class Comparable(metaclass=abc.ABCMeta): + """ + A protocol for annotating comparable types. + """ + @abc.abstractmethod + def __eq__(self, other: typing.Any) -> bool: + pass -class BasePhenotypeBooleanPredicate(PhenotypePolyPredicate, BooleanPredicate, metaclass=abc.ABCMeta): - pass + @abc.abstractmethod + def __lt__(self, other: typing.Any) -> bool: + pass -class PropagatingPhenotypeBooleanPredicate(BasePhenotypeBooleanPredicate): +P = typing.TypeVar('P', typing.Hashable, Comparable) +""" +Phenotype entity of interest, such as :class:`hpotk.model.TermId`, representing an HPO term or an OMIM/MONDO term. - def __init__(self, hpo: hpotk.MinimalOntology, - phenotypic_feature: hpotk.TermId, - missing_implies_excluded: bool = False) -> None: - self._hpo = hpotk.util.validate_instance(hpo, hpotk.MinimalOntology, 'hpo') - self._query = hpotk.util.validate_instance(phenotypic_feature, hpotk.TermId, 'phenotypic_feature') - self._missing_implies_excluded = hpotk.util.validate_instance(missing_implies_excluded, bool, - 'missing_implies_excluded') +However, phenotype entity can be anything as long as it is hashable and comparable. +""" - def get_question(self) -> str: - query_label = self._hpo.get_term(self._query).name - return f'Is \'{query_label}\' present in the patient?' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: - self._check_patient(patient) +class PhenotypeCategorization(typing.Generic[P], Categorization): + """" + On top of the attributes of the `Categorization`, `PhenotypeCategorization` keeps track of the target phenotype `P`. + """ - if len(patient.phenotypes) == 0: - return None + def __init__( + self, + category: PatientCategory, + phenotype: P, + ): + super().__init__(category) + self._phenotype = phenotype - for phenotype in patient.phenotypes: - if phenotype.is_observed: - if any(self._query == anc for anc in self._hpo.graph.get_ancestors(phenotype, include_source=True)): - return BooleanPredicate.YES - else: - if self._missing_implies_excluded: - return BooleanPredicate.NO - else: - if any(self._query == desc for desc in - self._hpo.graph.get_descendants(phenotype, include_source=True)): - return BooleanPredicate.NO + @property + def phenotype(self) -> P: + return self._phenotype - return None + def __repr__(self) -> str: + return f"PhenotypeCategorization(" \ + f"category={self._category}, " \ + f"phenotype={self._phenotype})" + def __str__(self) -> str: + return repr(self) -class PropagatingPhenotypePredicate(PhenotypePolyPredicate): - """ - `PropagatingPhenotypePredicate` tests if the `patient` is annotated with a `query` HPO term. + def __eq__(self, other) -> bool: + return isinstance(other, PhenotypeCategorization) \ + and self._category == other._category \ + and self._phenotype == other._phenotype - The predicate returns the following results: + def __hash__(self) -> int: + return hash((self._category, self._phenotype)) - * :attr:`PropagatingPhenotypePredicate.PRESENT` if the patient is annotated with the `query` term or its descendant - * :attr:`PropagatingPhenotypePredicate.EXCLUDED` presence of the `query` term or its ancestor was specifically - excluded in the patient - * :attr:`PropagatingPhenotypePredicate.NOT_MEASURED` if the patient is not annotated with the `query` and presence of `query` - was *not* excluded + +class PhenotypePolyPredicate(typing.Generic[P], PolyPredicate[PhenotypeCategorization[P]], metaclass=abc.ABCMeta): """ + `PhenotypePolyPredicate` investigates a patient in context of one or more phenotype categories `P`. + + Each phenotype category `P` can be a :class:`hpotk.model.TermId` representing an HPO term or an OMIM/MONDO term. - PRESENT = PatientCategory(cat_id=0, - name='Present', - description=""" - The sample *is* annotated with the tested phenotype feature `q`. + Most of the time, only one category is investigated, and :attr:`phenotype` will return a sequence with + one element only (e.g. *Arachnodactyly* `HP:0001166`). However, multiple categories can be sought as well, + such as when the predicate bins the patient into one of discrete diseases + (e.g. Geleophysic dysplasia, Marfan syndrome, ...). Then the predicate should return a sequence of disease + identifiers. + """ - This is either because the sample is annotated with `q` (exact match), - or because one of sample's annotations is a descendant `q` (annotation propagation). - For instance, we tested for a Seizure and the sample *had* a Clonic seizure - (a descendant of Seizure). - """) #: :meta hide-value: + @property + @abc.abstractmethod + def phenotypes(self) -> typing.Sequence[P]: + """ + Get the phenotype entities of interest. + """ + pass - EXCLUDED = PatientCategory(cat_id=1, - name='Excluded', - description=""" - We are particular about the sample *not* having the tested feature `q`. - In other words, `q` was *excluded* in the sample or the sample is annotated with an excluded ancestor of `q`. +class PropagatingPhenotypePredicate(PhenotypePolyPredicate[PhenotypeCategorization[hpotk.TermId]]): + """ + `PropagatingPhenotypePredicate` tests if a patient is annotated with an HPO term. - For instance, we tested for a Clonic seizure and the sample did *not* have any Seizure, which implies - *not* Clonic seizure. - """) #: :meta hide-value: + Note, `query` must be a term of the provided `hpo`! - NOT_MEASURED = PatientCategory(cat_id=2, - name='Not measured', - description=""" - We do not know if the sample has or has not the tested feature. - """) #: :meta hide-value: + :param hpo: HPO object + :param query: the HPO term to test + :param missing_implies_excluded: `True` if lack of an explicit annotation implies term's absence`. + """ def __init__(self, hpo: hpotk.MinimalOntology, - phenotypic_feature: hpotk.TermId) -> None: + query: hpotk.TermId, + missing_implies_excluded: bool = False): self._hpo = hpotk.util.validate_instance(hpo, hpotk.MinimalOntology, 'hpo') - self._query = hpotk.util.validate_instance(phenotypic_feature, hpotk.TermId, 'phenotypic_feature') - - @staticmethod - def get_categories() -> typing.Sequence[PatientCategory]: - return PropagatingPhenotypePredicate.PRESENT, PropagatingPhenotypePredicate.EXCLUDED, PropagatingPhenotypePredicate.NOT_MEASURED + self._query = hpotk.util.validate_instance(query, hpotk.TermId, 'phenotypic_feature') + self._query_label = self._hpo.get_term_name(query) + assert self._query_label is not None, f'Query {query} is in HPO' + self._missing_implies_excluded = hpotk.util.validate_instance(missing_implies_excluded, bool, + 'missing_implies_excluded') + self._present = PhenotypeCategorization( + category=PatientCategories.YES, + phenotype=query, + ) + self._excluded = PhenotypeCategorization( + category=PatientCategories.NO, + phenotype=query, + ) def get_question(self) -> str: - query_label = self._hpo.get_term(self._query).name - return f'Is \'{query_label}\' present in the patient?' + return f'Is {self._query_label} present in the patient?' - def test(self, patient: Patient) -> typing.Optional[PatientCategory]: + @property + def phenotypes(self) -> typing.Sequence[hpotk.TermId]: + # We usually test just a single HPO term, so we return a tuple with a single member. + return self._query, + + def get_categorizations(self) -> typing.Sequence[C]: + return self._present, self._excluded + + def test(self, patient: Patient) -> typing.Optional[PhenotypeCategorization[P]]: self._check_patient(patient) if len(patient.phenotypes) == 0: return None for phenotype in patient.phenotypes: - if phenotype.is_observed: + if phenotype.is_present: if any(self._query == anc for anc in self._hpo.graph.get_ancestors(phenotype, include_source=True)): - return PropagatingPhenotypePredicate.PRESENT + return self._present else: - if any(self._query == desc for desc in self._hpo.graph.get_descendants(phenotype, include_source=True)): - return self.EXCLUDED - - return self.NOT_MEASURED + if self._missing_implies_excluded: + return self._excluded + else: + if any(self._query == desc for desc in + self._hpo.graph.get_descendants(phenotype, include_source=True)): + return self._excluded - def __str__(self): - return repr(self) + return None def __repr__(self): - return f'PropagatingPhenotypePredicate(query={self._query})' + return f'PropagatingPhenotypeBooleanPredicate(query={self._query})' -class PhenotypePredicateFactory(metaclass=abc.ABCMeta): - """ - `PhenotypePredicateFactory` creates a :class:`PhenotypePolyPredicate` to assess a phenotype query. +class DiseasePresencePredicate(PhenotypePolyPredicate[PhenotypeCategorization[hpotk.TermId]]): """ + `DiseasePresencePredicate` tests if the patient was diagnosed with a disease. - @staticmethod - @abc.abstractmethod - def get_categories() -> typing.Sequence[PatientCategory]: - """ - Get a sequence of all categories which the `PhenotypePolyPredicate` provided by this factory can produce. - """ - pass - - @abc.abstractmethod - def get_predicate(self, query: hpotk.TermId) -> PhenotypePolyPredicate: - """ - Get an instance of `PhenotypePolyPredicate` for the phenotype `query`. - - :param query: a term ID of the query phenotype. - """ - pass - - -class PropagatingPhenotypeBooleanPredicateFactory(PhenotypePredicateFactory): - - def __init__(self, hpo: hpotk.MinimalOntology, - missing_implies_excluded: bool = False): - self._hpo = hpotk.util.validate_instance(hpo, hpotk.MinimalOntology, 'hpo') - self._missing_implies_excluded = missing_implies_excluded - - @staticmethod - def get_categories() -> typing.Sequence[PatientCategory]: - return PropagatingPhenotypeBooleanPredicate.get_categories() - - def get_predicate(self, query: hpotk.TermId) -> PhenotypePolyPredicate: - return PropagatingPhenotypeBooleanPredicate(self._hpo, query, self._missing_implies_excluded) + The predicate tests if the patient's diseases include a disease ID formatted as a :class:`hpotk.model.TermId`. + """ + # TODO: Lauren please implement and test. + pass diff --git a/tests/test_analysis.py b/tests/test_analysis.py index b8a589820..91b397721 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -1,10 +1,12 @@ +import typing + import hpotk import pytest import pandas as pd -from genophenocorr.analysis import configure_cohort_analysis -from genophenocorr.analysis.predicate import BooleanPredicate +from genophenocorr.analysis import configure_cohort_analysis, GenotypePhenotypeAnalysisResult +from genophenocorr.analysis.predicate import PatientCategories from genophenocorr.data import get_toy_cohort from genophenocorr.model import Cohort, VariantEffect @@ -21,7 +23,7 @@ def test_compare_by_variant_effect(self, toy_cohort: Cohort, toy_hpo: hpotk.Mini cohort_analysis = configure_cohort_analysis(toy_cohort, toy_hpo) results = cohort_analysis.compare_by_variant_effect(VariantEffect.MISSENSE_VARIANT, 'NM_1234.5') print(results) - summary = results.summarize(toy_hpo, BooleanPredicate.YES) + summary = results.summarize(toy_hpo, PatientCategories.YES) print(summary) def test_get_count(self, toy_cohort: Cohort, toy_hpo: hpotk.MinimalOntology): @@ -46,40 +48,34 @@ def test_get_count(self, toy_cohort: Cohort, toy_hpo: hpotk.MinimalOntology): pd.set_option('expand_frame_repr', False) cohort_analysis = configure_cohort_analysis(toy_cohort, toy_hpo) results = cohort_analysis.compare_by_variant_effect(VariantEffect.MISSENSE_VARIANT, 'NM_1234.5') - string_type = str(type(results)) + # Let's make sure we know what class we have - assert "" == string_type - ## We expect that we have YES and NO as phenotype categories + assert isinstance(results, GenotypePhenotypeAnalysisResult) + + # We expect that we have YES and NO as phenotype categories phenotype_categories_tuple = results.phenotype_categories assert len(phenotype_categories_tuple) == 2 - assert BooleanPredicate.YES in phenotype_categories_tuple - assert BooleanPredicate.NO in phenotype_categories_tuple - ## The all_counts field has the counts of the phenotypes - all_counts = results.all_counts - assert isinstance(all_counts, pd.DataFrame) - # TODO -- all_counts has 52 entries. Is this what we expect? - # The index of all_counts is a Tuple with (HPO TermId, BooleanPredicate - # Let's test Arachnodactyly - we should have one row for each Patient Predicate - yes_tuple = (hpotk.TermId.from_curie("HP:0001166"), BooleanPredicate.YES) - no_tuple = (hpotk.TermId.from_curie("HP:0001166"), BooleanPredicate.NO) - assert yes_tuple in all_counts.index - assert no_tuple in all_counts.index - # The YES row is Arachnodactyly YES -- according to the above, we have 1 (MISSENSE NO) and 13 (MISSENSE YES) - yes_row = all_counts.loc[yes_tuple] - assert yes_row is not None - assert yes_row[BooleanPredicate.NO] == 1 - assert yes_row[BooleanPredicate.YES] == 13 - # The NO row is Arachnodactyly NO -- according to the above, we have 9 (MISSENSE NO) and 3 (MISSENSE YES) - no_row = all_counts.loc[no_tuple] - assert no_row is not None - assert no_row[BooleanPredicate.NO] == 9 - assert no_row[BooleanPredicate.YES] == 3 - total_count = yes_row.sum() + no_row.sum() - assert 26 == total_count - + assert PatientCategories.YES in phenotype_categories_tuple + assert PatientCategories.NO in phenotype_categories_tuple + # The all_counts field has the counts of the phenotypes + all_counts = results.all_counts + assert isinstance(all_counts, typing.Mapping) + # We tested 26 HPO terms + assert len(all_counts) == 26 + # The index of all_counts is a Tuple with (HPO TermId, BooleanPredicate + # Let's test Arachnodactyly - we should have one row for each Patient Predicate + counts = all_counts[hpotk.TermId.from_curie("HP:0001166")] + # The YES row is Arachnodactyly YES -- according to the above, we have 1 (MISSENSE NO) and 13 (MISSENSE YES) + assert counts.loc[PatientCategories.YES, PatientCategories.NO] == 1 + assert counts.loc[PatientCategories.YES, PatientCategories.YES] == 13 + # The NO row is Arachnodactyly NO -- according to the above, we have 9 (MISSENSE NO) and 3 (MISSENSE YES) + assert counts.loc[PatientCategories.NO, PatientCategories.NO] == 9 + assert counts.loc[PatientCategories.NO, PatientCategories.YES] == 3 + # In total, 26 patients were categorized + assert counts.sum().sum() == 26 diff --git a/tests/test_predicates.py b/tests/test_predicates.py index 2f4393d6b..4e0803cc7 100644 --- a/tests/test_predicates.py +++ b/tests/test_predicates.py @@ -3,9 +3,9 @@ import hpotk import pytest -from genophenocorr.analysis.predicate import BooleanPredicate, PatientCategory -from genophenocorr.analysis.predicate.genotype import * +from genophenocorr.analysis.predicate import PatientCategory, PatientCategories from genophenocorr.analysis.predicate.phenotype import PropagatingPhenotypePredicate +from genophenocorr.analysis.predicate.genotype import * from genophenocorr.model import Cohort, Patient, FeatureType, VariantEffect from .conftest import toy_cohort, protein_test_service @@ -17,46 +17,63 @@ def find_patient(pat_id: str, cohort: Cohort) -> typing.Optional[Patient]: return pat -@pytest.mark.parametrize('query, patient_id, expected', - # Patient "HetSingleVar" has Phenotypes: - # Measured and Observed - 'HP:0001166;HP:0002266', # Arachnodactyly;Focal clonic seizure - # Measured but not Observed - 'HP:0001257', # Spasticity - # Not Measured and not Observed - 'HP:0006280', # Chronic pancreatitis - [ - # Test exact match - ('HP:0001166', # Arachnodactyly - 'HetSingleVar', - PropagatingPhenotypePredicate.PRESENT), - # Test inferred annotations - ('HP:0001250', # Seizure - 'HetSingleVar', - PropagatingPhenotypePredicate.PRESENT), - # Test excluded feature - ('HP:0001257', # Spasticity - 'HetSingleVar', - PropagatingPhenotypePredicate.EXCLUDED), - ('HP:0006280', # Chronic pancreatitis - 'HetSingleVar', - PropagatingPhenotypePredicate.NOT_MEASURED), - ]) -def test_HPOPresentPredicate(toy_cohort: Cohort, - toy_hpo: hpotk.Ontology, - query: str, - patient_id: str, - expected: PatientCategory): - patient = find_patient(patient_id, toy_cohort) - predicate = PropagatingPhenotypePredicate(hpo=toy_hpo, phenotypic_feature=hpotk.TermId.from_curie(query)) - actual = predicate.test(patient) - assert actual == expected +class TestPropagatingPhenotypeBooleanPredicate: + + @pytest.mark.parametrize('curie, patient_id, expected', + # Patient "HetSingleVar" has Phenotypes: + # Measured and present - 'HP:0001166;HP:0002266', # Arachnodactyly;Focal clonic seizure + # Measured but excluded - 'HP:0001257', # Spasticity + [ + # Test exact match + ('HP:0001166', # Arachnodactyly + 'HetSingleVar', + PatientCategories.YES), + # Test inferred annotations + ('HP:0001250', # Seizure + 'HetSingleVar', + PatientCategories.YES), + # Test excluded feature + ('HP:0001257', # Spasticity + 'HetSingleVar', + PatientCategories.NO), + ]) + def test_phenotype_predicate__present_or_excluded( + self, + toy_cohort: Cohort, + toy_hpo: hpotk.Ontology, + curie: str, + patient_id: str, + expected: PatientCategory, + ): + patient = find_patient(patient_id, toy_cohort) + term_id = hpotk.TermId.from_curie(curie) + predicate = PropagatingPhenotypePredicate(hpo=toy_hpo, query=term_id) + actual = predicate.test(patient) + + assert actual.phenotype == term_id + assert actual.category == expected + + def test_phenotype_predicate__unknown( + self, + toy_cohort: Cohort, + toy_hpo: hpotk.Ontology, + ): + # Not Measured and not Observed - 'HP:0006280', # Chronic pancreatitis + patient = find_patient('HetSingleVar', toy_cohort) + term_id = hpotk.TermId.from_curie('HP:0006280') + predicate = PropagatingPhenotypePredicate(hpo=toy_hpo, query=term_id) + actual = predicate.test(patient) + + assert actual is None @pytest.mark.parametrize('patient_id, variant_effect, expected_result', - (['HetSingleVar', VariantEffect.FRAMESHIFT_VARIANT, BooleanPredicate.YES], - ['HetSingleVar', VariantEffect.MISSENSE_VARIANT, BooleanPredicate.NO], - ['HetDoubleVar1', VariantEffect.STOP_GAINED, BooleanPredicate.YES], - ['HomoVar', VariantEffect.FRAMESHIFT_VARIANT, BooleanPredicate.YES], - ['LargeCNV', VariantEffect.STOP_LOST, BooleanPredicate.YES], - ['LargeCNV', VariantEffect.FEATURE_TRUNCATION, BooleanPredicate.YES])) + (['HetSingleVar', VariantEffect.FRAMESHIFT_VARIANT, PatientCategories.YES], + ['HetSingleVar', VariantEffect.MISSENSE_VARIANT, PatientCategories.NO], + ['HetDoubleVar1', VariantEffect.STOP_GAINED, PatientCategories.YES], + ['HomoVar', VariantEffect.FRAMESHIFT_VARIANT, PatientCategories.YES], + ['LargeCNV', VariantEffect.STOP_LOST, PatientCategories.YES], + ['LargeCNV', VariantEffect.FEATURE_TRUNCATION, PatientCategories.YES])) def test_VariantEffectPredicate(patient_id: str, variant_effect: VariantEffect, expected_result: PatientCategory, @@ -64,75 +81,68 @@ def test_VariantEffectPredicate(patient_id: str, patient = find_patient(patient_id, toy_cohort) predicate = VariantEffectPredicate('NM_013275.6', effect=variant_effect) result = predicate.test(patient) - assert result == expected_result + assert result.category == expected_result @pytest.mark.parametrize('patient_id, variant, hasVarResult', - (['HetSingleVar', '16_89279850_89279850_G_GC', BooleanPredicate.YES], - # the `HetSingleVar` patient does NOT have the variant. - ['HetSingleVar', '16_89279708_89279725_AGTGTTCGGGGCGGGGCC_A', BooleanPredicate.NO], - # but `HetDoubleVar2` below DOES have the variant. - ['HetDoubleVar2', '16_89279708_89279725_AGTGTTCGGGGCGGGGCC_A', BooleanPredicate.YES], - ['HetDoubleVar1', '16_89284601_89284602_GG_A', BooleanPredicate.YES], - ['HetDoubleVar1', '16_89280752_89280752_G_T', BooleanPredicate.YES], - # the `HomoVar` patient does NOT have the variant - ['HomoVar', '16_89280752_89280752_G_T', BooleanPredicate.NO], - ['HomoVar', '16_89279458_89279459_TG_T', BooleanPredicate.YES], - ['LargeCNV', '16_89190071_89439815_DEL', BooleanPredicate.YES])) + (['HetSingleVar', '16_89279850_89279850_G_GC', PatientCategories.YES], + # the `HetSingleVar` patient does NOT have the variant. + ['HetSingleVar', '16_89279708_89279725_AGTGTTCGGGGCGGGGCC_A', PatientCategories.NO], + # but `HetDoubleVar2` below DOES have the variant. + ['HetDoubleVar2', '16_89279708_89279725_AGTGTTCGGGGCGGGGCC_A', PatientCategories.YES], + ['HetDoubleVar1', '16_89284601_89284602_GG_A', PatientCategories.YES], + ['HetDoubleVar1', '16_89280752_89280752_G_T', PatientCategories.YES], + # the `HomoVar` patient does NOT have the variant + ['HomoVar', '16_89280752_89280752_G_T', PatientCategories.NO], + ['HomoVar', '16_89279458_89279459_TG_T', PatientCategories.YES], + ['LargeCNV', '16_89190071_89439815_DEL', PatientCategories.YES])) def test_VariantPredicate(patient_id, variant, hasVarResult, toy_cohort): predicate = VariantPredicate(variant_key=variant) patient = find_patient(patient_id, toy_cohort) result = predicate.test(patient) - assert result == hasVarResult + assert result.category == hasVarResult @pytest.mark.parametrize('patient_id, exon, hasVarResult', - (['HetSingleVar', 9, BooleanPredicate.YES], - ['HetSingleVar', 13, BooleanPredicate.NO], - ['HetDoubleVar1', 9, BooleanPredicate.YES], - ['HetDoubleVar2', 10, BooleanPredicate.YES], - ['HetDoubleVar2', 9, BooleanPredicate.YES], - ['HomoVar', 10, BooleanPredicate.NO], - ['HomoVar', 9, BooleanPredicate.YES], - ['LargeCNV', 1, BooleanPredicate.NO], - ['LargeCNV', 13, BooleanPredicate.YES])) + (['HetSingleVar', 9, PatientCategories.YES], + ['HetSingleVar', 13, PatientCategories.NO], + ['HetDoubleVar1', 9, PatientCategories.YES], + ['HetDoubleVar2', 10, PatientCategories.YES], + ['HetDoubleVar2', 9, PatientCategories.YES], + ['HomoVar', 10, PatientCategories.NO], + ['HomoVar', 9, PatientCategories.YES], + ['LargeCNV', 1, PatientCategories.NO], + ['LargeCNV', 13, PatientCategories.YES])) def test_ExonPredicate(patient_id, exon, hasVarResult, toy_cohort): patient = find_patient(patient_id, toy_cohort) predicate = ExonPredicate('NM_013275.6', exon_number=exon) result = predicate.test(patient) - assert result == hasVarResult + assert result.category == hasVarResult @pytest.mark.parametrize('patient_id, feature_type, hasVarResult', - (['HetDoubleVar2', FeatureType.REGION, BooleanPredicate.YES], - ['HetDoubleVar2', FeatureType.REPEAT, BooleanPredicate.NO], - ['HetSingleVar', FeatureType.REGION, BooleanPredicate.YES], - ['HomoVar', FeatureType.REGION, BooleanPredicate.YES], - ['HetDoubleVar1', FeatureType.REPEAT, BooleanPredicate.NO])) - ## TODO Why do CNV not show as affecting a feature? - ##['LargeCNV', FeatureType.REGION , HETEROZYGOUS])) + (['HetDoubleVar2', FeatureType.REGION, PatientCategories.YES], + ['HetDoubleVar2', FeatureType.REPEAT, PatientCategories.NO], + ['HetSingleVar', FeatureType.REGION, PatientCategories.YES], + ['HomoVar', FeatureType.REGION, PatientCategories.YES], + ['HetDoubleVar1', FeatureType.REPEAT, PatientCategories.NO])) +## TODO Why do CNV not show as affecting a feature? +##['LargeCNV', FeatureType.REGION , HETEROZYGOUS])) def test_ProteinFeatureTypePredicate(patient_id, feature_type, hasVarResult, toy_cohort, protein_test_service): patient = find_patient(patient_id, toy_cohort) predicate = ProtFeatureTypePredicate('NM_013275.6', feature_type=feature_type, protein_service=protein_test_service) result = predicate.test(patient) - assert result == hasVarResult + assert result.category == hasVarResult @pytest.mark.parametrize('patient_id, feature, hasVarResult', - (['HetDoubleVar2', 'Disordered', BooleanPredicate.YES], - ['HetDoubleVar2', 'BadFeature', BooleanPredicate.NO], - ['HetSingleVar', 'Disordered', BooleanPredicate.YES], - ['HomoVar', 'Disordered', BooleanPredicate.YES], - ['HetDoubleVar1', 'Disordered', BooleanPredicate.YES])) + (['HetDoubleVar2', 'Disordered', PatientCategories.YES], + ['HetDoubleVar2', 'BadFeature', PatientCategories.NO], + ['HetSingleVar', 'Disordered', PatientCategories.YES], + ['HomoVar', 'Disordered', PatientCategories.YES], + ['HetDoubleVar1', 'Disordered', PatientCategories.YES])) def test_ProteinFeaturePredicate(patient_id, feature, hasVarResult, toy_cohort, protein_test_service): predicate = ProtFeaturePredicate('NM_013275.6', protein_feature_name=feature, protein_service=protein_test_service) patient = find_patient(patient_id, toy_cohort) result = predicate.test(patient) - assert result == hasVarResult - - -class TestGenericPredicateMethods: - - def test_get_categories(self): - assert len(PropagatingPhenotypePredicate.get_categories()) == PropagatingPhenotypePredicate.n_categories() - assert len(BooleanPredicate.get_categories()) == BooleanPredicate.n_categories() + assert result.category == hasVarResult