diff --git a/docs/python_api/steps/gwas_catalog_inclusion.md b/docs/python_api/steps/gwas_catalog_inclusion.md deleted file mode 100644 index e9ede6dd6..000000000 --- a/docs/python_api/steps/gwas_catalog_inclusion.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: gwas_catalog_study_inclusion ---- - -::: gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator diff --git a/docs/python_api/steps/gwas_catalog_ingestion.md b/docs/python_api/steps/gwas_catalog_ingestion.md deleted file mode 100644 index 69ea92479..000000000 --- a/docs/python_api/steps/gwas_catalog_ingestion.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: gwas_catalog_ingestion ---- - -::: gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep diff --git a/docs/python_api/steps/gwas_catalog_study_index.md b/docs/python_api/steps/gwas_catalog_study_index.md new file mode 100644 index 000000000..4984de2a0 --- /dev/null +++ b/docs/python_api/steps/gwas_catalog_study_index.md @@ -0,0 +1,5 @@ +--- +title: gwas_catalog_study_inclusion +--- + +::: gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep diff --git a/docs/python_api/steps/gwas_catalog_top_hits.md b/docs/python_api/steps/gwas_catalog_top_hits.md new file mode 100644 index 000000000..03d81eafb --- /dev/null +++ b/docs/python_api/steps/gwas_catalog_top_hits.md @@ -0,0 +1,5 @@ +--- +title: GWAS Catalog Top Hits Ingestion Step +--- + +::: gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep diff --git a/src/gentropy/assets/schemas/study_index.json b/src/gentropy/assets/schemas/study_index.json index a2dac1bca..9c50d4a19 100644 --- a/src/gentropy/assets/schemas/study_index.json +++ b/src/gentropy/assets/schemas/study_index.json @@ -264,33 +264,12 @@ "metadata": {} }, { - "name": "sumStatQCPerformed", - "type": "boolean", - "nullable": true, - "metadata": {} - }, - { - "name": "sumStatQCValues", + "name": "sumstatQCValues", "type": { - "type": "array", - "elementType": { - "type": "struct", - "fields": [ - { - "name": "QCCheckName", - "type": "string", - "nullable": true, - "metadata": {} - }, - { - "name": "QCCheckValue", - "type": "float", - "nullable": true, - "metadata": {} - } - ] - }, - "containsNull": true + "type": "map", + "keyType": "string", + "valueType": "float", + "valueContainsNull": true }, "nullable": true, "metadata": {} diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 26c676e9b..d47c1b8ab 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -68,44 +68,36 @@ class GWASCatalogStudyCurationConfig(StepConfig): catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING - catalog_sumstats_lut: str = MISSING gwas_catalog_study_curation_out: str = MISSING gwas_catalog_study_curation_file: str = MISSING _target_: str = "gentropy.gwas_catalog_study_curation.GWASCatalogStudyCurationStep" @dataclass -class GWASCatalogStudyInclusionConfig(StepConfig): - """GWAS Catalog study inclusion step configuration.""" +class GWASCatalogStudyIndexGenerationStep(StepConfig): + """GWAS Catalog study index generation.""" catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING - catalog_associations_file: str = MISSING - gwas_catalog_study_curation_file: str = MISSING - variant_annotation_path: str = MISSING - harmonised_study_file: str = MISSING - criteria: str = MISSING - inclusion_list_path: str = MISSING - exclusion_list_path: str = MISSING + study_index_path: str = MISSING + gwas_catalog_study_curation_file: str | None = None + sumstats_qc_path: str | None = 
None _target_: str = ( - "gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator" + "gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep" ) @dataclass -class GWASCatalogIngestionConfig(StepConfig): +class GWASCatalogTopHitIngestionConfig(StepConfig): """GWAS Catalog ingestion step configuration.""" catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING - catalog_sumstats_lut: str = MISSING catalog_associations_file: str = MISSING variant_annotation_path: str = MISSING catalog_studies_out: str = MISSING catalog_associations_out: str = MISSING - gwas_catalog_study_curation_file: str | None = None - inclusion_list_path: str | None = None - _target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep" + _target_: str = "gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep" @dataclass @@ -658,17 +650,19 @@ def register_config() -> None: ) cs.store( group="step", - name="gwas_catalog_study_inclusion", - node=GWASCatalogStudyInclusionConfig, - ) - cs.store( - group="step", name="gwas_catalog_ingestion", node=GWASCatalogIngestionConfig + name="gwas_catalog_study_index", + node=GWASCatalogStudyIndexGenerationStep, ) cs.store( group="step", name="gwas_catalog_sumstat_preprocess", node=GWASCatalogSumstatsPreprocessConfig, ) + cs.store( + group="step", + name="gwas_catalog_top_hit_ingestion", + node=GWASCatalogTopHitIngestionConfig, + ) cs.store(group="step", name="ld_based_clumping", node=LDBasedClumpingConfig) cs.store(group="step", name="ld_index", node=LDIndexConfig) cs.store(group="step", name="locus_to_gene", node=LocusToGeneConfig) diff --git a/src/gentropy/dataset/study_index.py b/src/gentropy/dataset/study_index.py index e7023ee9b..92bdc61a4 100644 --- a/src/gentropy/dataset/study_index.py +++ b/src/gentropy/dataset/study_index.py @@ -13,6 +13,7 @@ from gentropy.assets import data from gentropy.common.schemas import parse_spark_schema +from gentropy.common.spark_helpers import convert_from_wide_to_long from gentropy.dataset.dataset import Dataset if TYPE_CHECKING: @@ -32,13 +33,29 @@ class StudyQualityCheck(Enum): UNKNOWN_STUDY_TYPE (str): Indicating the provided type of study is not supported. UNKNOWN_BIOSAMPLE (str): Flagging if a biosample identifier is not found in the reference. DUPLICATED_STUDY (str): Flagging if a study identifier is not unique. + SUMSTATS_NOT_AVAILABLE (str): Flagging if harmonized summary statistics are not available or empty. + NO_OT_CURATION (str): Flagging if a study has not been curated by Open Targets. + FAILED_MEAN_BETA_CHECK (str): Flagging if the mean beta QC check value is not within the expected range. + FAILED_PZ_CHECK (str): Flagging if the PZ QC check values are not within the expected range. + FAILED_GC_LAMBDA_CHECK (str): Flagging if the GC lambda value is not within the expected range. + SMALL_NUMBER_OF_SNPS (str): Flagging if the number of SNPs in the study is below the expected threshold. """ - UNRESOLVED_TARGET = "Target/gene identifier could not match to reference." - UNRESOLVED_DISEASE = "No valid disease identifier found." - UNKNOWN_STUDY_TYPE = "This type of study is not supported." - UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference." - DUPLICATED_STUDY = "The identifier of this study is not unique." 
+ UNRESOLVED_TARGET = "Target/gene identifier could not match to reference" + UNRESOLVED_DISEASE = "No valid disease identifier found" + UNKNOWN_STUDY_TYPE = "This type of study is not supported" + UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference" + DUPLICATED_STUDY = "The identifier of this study is not unique" + SUMSTATS_NOT_AVAILABLE = "Harmonized summary statistics are not available or empty" + NO_OT_CURATION = "GWAS Catalog study has not been curated by Open Targets" + FAILED_MEAN_BETA_CHECK = ( + "The mean beta QC check value is not within the expected range" + ) + FAILED_PZ_CHECK = "The PZ QC check values are not within the expected range" + FAILED_GC_LAMBDA_CHECK = "The GC lambda value is not within the expected range" + SMALL_NUMBER_OF_SNPS = ( + "The number of SNPs in the study is below the expected threshold" + ) @dataclass @@ -410,7 +427,9 @@ def validate_target(self: StudyIndex, target_index: GeneIndex) -> StudyIndex: return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema()) - def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> StudyIndex: + def validate_biosample( + self: StudyIndex, biosample_index: BiosampleIndex + ) -> StudyIndex: """Validating biosample identifiers in the study index against the provided biosample index. Args: @@ -419,7 +438,9 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu Returns: StudyIndex: where non-gwas studies are flagged if biosampleIndex could not be validated. """ - biosample_set = biosample_index.df.select("biosampleId", f.lit(True).alias("isIdFound")) + biosample_set = biosample_index.df.select( + "biosampleId", f.lit(True).alias("isIdFound") + ) # If biosampleId in df, we need to drop it: if "biosampleId" in self.df.columns: @@ -430,7 +451,11 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu return self validated_df = ( - self.df.join(biosample_set, self.df.biosampleFromSourceId == biosample_set.biosampleId, how="left") + self.df.join( + biosample_set, + self.df.biosampleFromSourceId == biosample_set.biosampleId, + how="left", + ) .withColumn( "isIdFound", f.when( @@ -450,3 +475,118 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu ) return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema()) + + def annotate_sumstats_qc( + self: StudyIndex, + sumstats_qc: DataFrame, + threshold_mean_beta: float = 0.05, + threshold_mean_diff_pz: float = 0.05, + threshold_se_diff_pz: float = 0.05, + threshold_min_gc_lambda: float = 0.7, + threshold_max_gc_lambda: float = 2.5, + threshold_min_n_variants: int = 2_000_000, + ) -> StudyIndex: + """Annotate summary stats QC information. + + Args: + sumstats_qc (DataFrame): containing summary statistics-based quality controls. + threshold_mean_beta (float): Threshold for mean beta check. Defaults to 0.05. + threshold_mean_diff_pz (float): Threshold for mean diff PZ check. Defaults to 0.05. + threshold_se_diff_pz (float): Threshold for SE diff PZ check. Defaults to 0.05. + threshold_min_gc_lambda (float): Minimum threshold for GC lambda check. Defaults to 0.7. + threshold_max_gc_lambda (float): Maximum threshold for GC lambda check. Defaults to 2.5. + threshold_min_n_variants (int): Minimum number of variants for SuSiE check. Defaults to 2_000_000. 
+ + Returns: + StudyIndex: Updated study index with QC information + """ + # convert all columns in sumstats_qc dataframe in array of structs grouped by studyId + cols = [c for c in sumstats_qc.columns if c != "studyId"] + + studies = self.df + + melted_df = convert_from_wide_to_long( + sumstats_qc, + id_vars=["studyId"], + value_vars=cols, + var_name="QCCheckName", + value_name="QCCheckValue", + ) + + qc_df = ( + melted_df.groupBy("studyId") + .agg( + f.map_from_entries( + f.collect_list( + f.struct(f.col("QCCheckName"), f.col("QCCheckValue")) + ) + ).alias("sumStatQCValues") + ) + .select("studyId", "sumstatQCValues") + ) + + df = ( + studies.drop("sumStatQCValues", "hasSumstats") + .join( + qc_df.withColumn("hasSumstats", f.lit(True)), how="left", on="studyId" + ) + .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~f.col("hasSumstats"), + StudyQualityCheck.SUMSTATS_NOT_AVAILABLE, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~(f.abs(f.col("sumstatQCValues.mean_beta")) <= threshold_mean_beta), + StudyQualityCheck.FAILED_MEAN_BETA_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~( + ( + f.abs(f.col("sumstatQCValues.mean_diff_pz")) + <= threshold_mean_diff_pz + ) + & (f.col("sumstatQCValues.se_diff_pz") <= threshold_se_diff_pz) + ), + StudyQualityCheck.FAILED_PZ_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~( + (f.col("sumstatQCValues.gc_lambda") <= threshold_max_gc_lambda) + & ( + f.col("sumstatQCValues.gc_lambda") + >= threshold_min_gc_lambda + ) + ), + StudyQualityCheck.FAILED_GC_LAMBDA_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + (f.col("sumstatQCValues.n_variants") < threshold_min_n_variants), + StudyQualityCheck.SMALL_NUMBER_OF_SNPS, + ), + ) + ) + + # Annotate study index with QC information: + return StudyIndex( + _df=df, + _schema=StudyIndex.get_schema(), + ) diff --git a/src/gentropy/datasource/gwas_catalog/associations.py b/src/gentropy/datasource/gwas_catalog/associations.py index b34944b11..da2bcc6df 100644 --- a/src/gentropy/datasource/gwas_catalog/associations.py +++ b/src/gentropy/datasource/gwas_catalog/associations.py @@ -230,7 +230,9 @@ def _map_variants_to_gnomad_variants( "chromosome", # Calculate the position in Ensembl coordinates for indels: GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( - f.col("position"), f.col("referenceAllele"), f.col("alternateAllele") + f.col("position"), + f.col("referenceAllele"), + f.col("alternateAllele"), ).alias("ensemblPosition"), # Keeping GnomAD position: "position", @@ -240,11 +242,7 @@ def _map_variants_to_gnomad_variants( "alleleFrequencies", variant_index.max_maf().alias("maxMaf"), ).join( - f.broadcast( - gwas_associations_subset.select( - "chromosome", "ensemblPosition" - ).distinct() - ), + gwas_associations_subset.select("chromosome", "ensemblPosition").distinct(), on=["chromosome", "ensemblPosition"], how="inner", ) @@ -253,7 +251,7 @@ def _map_variants_to_gnomad_variants( # based on rsIds or allele concordance) filtered_associations = ( gwas_associations_subset.join( - f.broadcast(va_subset), + va_subset, on=["chromosome", "ensemblPosition"], how="left", ) @@ -1108,7 +1106,10 @@ def from_source( 
pvalue_threshold is keeped in sync with the WindowBasedClumpingStep gwas_significance. """ return StudyLocusGWASCatalog( - _df=gwas_associations.withColumn( + _df=gwas_associations + # drop duplicate rows + .distinct() + .withColumn( "studyLocusId", f.monotonically_increasing_id().cast(StringType()) ) .transform( diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index d0d841105..421f53d0f 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -7,61 +7,15 @@ import pyspark.sql.functions as f import pyspark.sql.types as t -from pyspark import SparkFiles -from gentropy.common.session import Session from gentropy.common.spark_helpers import column2camel_case from gentropy.common.utils import parse_efos -from gentropy.dataset.study_index import StudyIndex +from gentropy.dataset.study_index import StudyIndex, StudyQualityCheck if TYPE_CHECKING: from pyspark.sql import Column, DataFrame -def read_curation_table( - curation_path: str | None, session: Session -) -> DataFrame | None: - """Read curation table if path or URL is given. - - Curation itself is fully optional everything should work without it. - - Args: - curation_path (str | None): Optionally given path the curation tsv. - session (Session): session object - - Returns: - DataFrame | None: if curation was provided, - """ - # If no curation path provided, we are returning none: - if curation_path is None: - return None - # Read curation from the web: - elif curation_path.startswith("http"): - # Registering file: - session.spark.sparkContext.addFile(curation_path) - - # Reading file: - curation_df = session.spark.read.csv( - SparkFiles.get(curation_path.split("/")[-1]), sep="\t", header=True - ) - # Read curation from file: - else: - curation_df = session.spark.read.csv(curation_path, sep="\t", header=True) - return curation_df.select( - "studyId", - "studyType", - f.when(f.col("analysisFlag").isNotNull(), f.split(f.col("analysisFlag"), r"\|")) - .otherwise(f.array()) - .alias("analysisFlags"), - f.when( - f.col("qualityControl").isNotNull(), f.split(f.col("qualityControl"), r"\|") - ) - .otherwise(f.array()) - .alias("qualityControls"), - f.col("isCurated").cast(t.BooleanType()), - ) - - @dataclass class StudyIndexGWASCatalogParser: """GWAS Catalog study index parser. @@ -316,14 +270,12 @@ def from_source( cls: type[StudyIndexGWASCatalogParser], catalog_studies: DataFrame, ancestry_file: DataFrame, - sumstats_lut: DataFrame, ) -> StudyIndexGWASCatalog: """Ingests study level metadata from the GWAS Catalog. Args: catalog_studies (DataFrame): GWAS Catalog raw study table ancestry_file (DataFrame): GWAS Catalog ancestry table. - sumstats_lut (DataFrame): GWAS Catalog summary statistics list. Returns: StudyIndexGWASCatalog: Parsed and annotated GWAS Catalog study table. 
@@ -332,7 +284,6 @@ def from_source( return ( cls._parse_study_table(catalog_studies) .annotate_ancestries(ancestry_file) - .annotate_sumstats_info(sumstats_lut) .annotate_discovery_sample_sizes() ) @@ -403,7 +354,13 @@ def annotate_from_study_curation( if curation_table is None: return self - columns = self.df.columns + studies = self.df + + if "qualityControls" not in studies.columns: + studies = studies.withColumn("qualityControls", f.array()) + + if "analysisFlags" not in studies.columns: + studies = studies.withColumn("analysisFlags", f.array()) # Adding prefix to columns in the curation table: curation_table = curation_table.select( @@ -415,46 +372,34 @@ def annotate_from_study_curation( ] ) - # Create expression how to update/create quality controls dataset: - qualityControls_expression = ( - f.col("curation_qualityControls") - if "qualityControls" not in columns - else f.when( - f.col("curation_qualityControls").isNotNull(), - f.array_union( - f.col("qualityControls"), f.array(f.col("curation_qualityControls")) - ), - ).otherwise(f.col("qualityControls")) - ) - - # Create expression how to update/create analysis flag: - analysis_expression = ( - f.col("curation_analysisFlags") - if "analysisFlags" not in columns - else f.when( - f.col("curation_analysisFlags").isNotNull(), - f.array_union( - f.col("analysisFlags"), f.array(f.col("curation_analysisFlags")) - ), - ).otherwise(f.col("analysisFlags")) - ) - - # Updating columns list. We might or might not list columns twice, but that doesn't matter, unique set will generated: - columns = list(set(columns + ["qualityControls", "analysisFlags"])) - # Based on the curation table, columns needs to be updated: curated_df = ( - self.df.join(curation_table, on="studyId", how="left") + studies.join( + curation_table.withColumn("isCurated", f.lit(True)), + on="studyId", + how="left", + ) + .withColumn("isCurated", f.coalesce(f.col("isCurated"), f.lit(False))) # Updating study type: .withColumn( "studyType", f.coalesce(f.col("curation_studyType"), f.col("studyType")) ) - # Updating quality controls: - .withColumn("qualityControls", qualityControls_expression) # Updating study annotation flags: - .withColumn("analysisFlags", analysis_expression) + .withColumn( + "analysisFlags", + f.array_union(f.col("analysisFlags"), f.col("curation_analysisFlags")), + ) + .withColumn("analysisFlags", f.coalesce(f.col("analysisFlags"), f.array())) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~f.col("isCurated"), + StudyQualityCheck.NO_OT_CURATION, + ), + ) # Dropping columns coming from the curation table: - .select(*columns) + .select(*studies.columns) ) return StudyIndexGWASCatalog( _df=curated_df, _schema=StudyIndexGWASCatalog.get_schema() @@ -641,47 +586,6 @@ def annotate_ancestries( self.df = self.df.join(parsed_ancestry_lut, on="studyId", how="left") return self - def annotate_sumstats_info( - self: StudyIndexGWASCatalog, sumstats_lut: DataFrame - ) -> StudyIndexGWASCatalog: - """Annotate summary stat locations. - - Args: - sumstats_lut (DataFrame): listing GWAS Catalog summary stats paths - - Returns: - StudyIndexGWASCatalog: including `summarystatsLocation` and `hasSumstats` columns - - Raises: - ValueError: if the sumstats_lut table doesn't have the right columns - """ - gwas_sumstats_base_uri = ( - "ftp://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/" - ) - - if "_c0" not in sumstats_lut.columns: - raise ValueError( - f'Sumstats look-up table needs to have `_c0` column. 
However it has: {",".join(sumstats_lut.columns)}' - ) - - parsed_sumstats_lut = sumstats_lut.withColumn( - "summarystatsLocation", - f.concat( - f.lit(gwas_sumstats_base_uri), - f.regexp_replace(f.col("_c0"), r"^\.\/", ""), - ), - ).select( - self._parse_gwas_catalog_study_id("summarystatsLocation").alias("studyId"), - "summarystatsLocation", - f.lit(True).alias("hasSumstats"), - ) - self.df = ( - self.df.drop("hasSumstats") - .join(parsed_sumstats_lut, on="studyId", how="left") - .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))) - ) - return self - def annotate_discovery_sample_sizes( self: StudyIndexGWASCatalog, ) -> StudyIndexGWASCatalog: diff --git a/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py new file mode 100644 index 000000000..8d75e4824 --- /dev/null +++ b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py @@ -0,0 +1,90 @@ +"""Study Index for GWAS Catalog data source.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyspark.sql.functions as f +import pyspark.sql.types as t +from pyspark import SparkFiles + +from gentropy.common.session import Session + +if TYPE_CHECKING: + from pyspark.sql import DataFrame + + +@dataclass +class StudyIndexGWASCatalogOTCuration: + """Study Index Curation for GWAS Catalog data source. + + This class is responsible for parsing additional curation for the GWAS Catalog studies. + """ + + @staticmethod + def _parser(df: DataFrame) -> DataFrame: + """Parse the curation table. + + Args: + df (DataFrame): DataFrame with the curation table. + + Returns: + DataFrame: DataFrame with the parsed curation table. + """ + if "qualityControl" not in df.columns: + # Add the 'qualityControl' column with null values + df = df.withColumn("qualityControl", f.lit(None).cast("string")) + return df.select( + "studyId", + "studyType", + f.when( + f.col("analysisFlag").isNotNull(), f.split(f.col("analysisFlag"), r"\|") + ) + .otherwise(f.array()) + .alias("analysisFlags"), + f.when( + f.col("qualityControl").isNotNull(), + f.split(f.col("qualityControl"), r"\|"), + ) + .otherwise(f.array()) + .alias("qualityControls"), + f.col("isCurated").cast(t.BooleanType()), + ) + + @classmethod + def from_csv( + cls: type[StudyIndexGWASCatalogOTCuration], session: Session, curation_path: str + ) -> DataFrame: + """Read curation table from csv. + + Args: + session (Session): Session object. + curation_path (str): Path to the curation table. + + Returns: + DataFrame: DataFrame with the curation table. + """ + return cls._parser(session.spark.read.csv(curation_path, sep="\t", header=True)) + + @classmethod + def from_url( + cls: type[StudyIndexGWASCatalogOTCuration], session: Session, curation_url: str + ) -> DataFrame: + """Read curation table from URL. + + Args: + session (Session): Session object. + curation_url (str): URL to the curation table. + + Returns: + DataFrame: DataFrame with the curation table. 
+ """ + # Registering file: + session.spark.sparkContext.addFile(curation_url) + + return cls._parser( + session.spark.read.csv( + SparkFiles.get(curation_url.split("/")[-1]), sep="\t", header=True + ) + ) diff --git a/src/gentropy/gwas_catalog_study_curation.py b/src/gentropy/gwas_catalog_study_curation.py index 7329a1679..530e03ea6 100644 --- a/src/gentropy/gwas_catalog_study_curation.py +++ b/src/gentropy/gwas_catalog_study_curation.py @@ -1,10 +1,13 @@ """Step to update GWAS Catalog study curation file based on newly released GWAS Catalog dataset.""" + from __future__ import annotations from gentropy.common.session import Session from gentropy.datasource.gwas_catalog.study_index import ( StudyIndexGWASCatalogParser, - read_curation_table, +) +from gentropy.datasource.gwas_catalog.study_index_ot_curation import ( + StudyIndexGWASCatalogOTCuration, ) @@ -16,7 +19,6 @@ def __init__( session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], - catalog_sumstats_lut: str, gwas_catalog_study_curation_out: str, gwas_catalog_study_curation_file: str | None, ) -> None: @@ -26,9 +28,11 @@ def __init__( session (Session): Session object. catalog_study_files (list[str]): List of raw GWAS catalog studies file. catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. gwas_catalog_study_curation_out (str): Path for the updated curation table. gwas_catalog_study_curation_file (str | None): Path to the original curation table. Optinal + + Raises: + ValueError: If the curation file is provided but not a CSV file or URL. """ catalog_studies = session.spark.read.csv( list(catalog_study_files), sep="\t", header=True @@ -36,18 +40,24 @@ def __init__( ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - sumstats_lut = session.spark.read.csv( - catalog_sumstats_lut, sep="\t", header=False - ) - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) + + if gwas_catalog_study_curation_file: + if gwas_catalog_study_curation_file.endswith(".csv"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_csv( + session, gwas_catalog_study_curation_file + ) + elif gwas_catalog_study_curation_file.startswith("http"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_url( + session, gwas_catalog_study_curation_file + ) + else: + raise ValueError( + "Only CSV files or URLs are accepted as curation file." 
+ ) # Process GWAS Catalog studies and get list of studies for curation: ( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, ancestry_lut, sumstats_lut - ) + StudyIndexGWASCatalogParser.from_source(catalog_studies, ancestry_lut) # Adding existing curation: .annotate_from_study_curation(gwas_catalog_study_curation) # Extract new studies for curation: diff --git a/src/gentropy/gwas_catalog_study_inclusion.py b/src/gentropy/gwas_catalog_study_inclusion.py deleted file mode 100644 index f07f851a7..000000000 --- a/src/gentropy/gwas_catalog_study_inclusion.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Step to generate an GWAS Catalog study identifier inclusion and exclusion list.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from pyspark.sql import functions as f - -from gentropy.common.session import Session -from gentropy.dataset.variant_index import VariantIndex -from gentropy.datasource.gwas_catalog.associations import ( - GWASCatalogCuratedAssociationsParser, -) -from gentropy.datasource.gwas_catalog.study_index import ( - StudyIndexGWASCatalog, - StudyIndexGWASCatalogParser, - read_curation_table, -) -from gentropy.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter - -if TYPE_CHECKING: - from pyspark.sql import Column, DataFrame - - -class GWASCatalogStudyInclusionGenerator: - """GWAS Catalog study eligibility for ingestion based on curation and the provided criteria.""" - - @staticmethod - def flag_eligible_studies( - study_index: StudyIndexGWASCatalog, criteria: str - ) -> DataFrame: - """Apply filter on GWAS Catalog studies based on the provided criteria. - - Args: - study_index (StudyIndexGWASCatalog): complete study index to be filtered based on the provided filter set - criteria (str): name of the filter set to be applied. - - Raises: - ValueError: if the provided filter set is not in the accepted values. - - Returns: - DataFrame: filtered dataframe containing only eligible studies. - """ - filters: dict[str, Column] = { - # Filters applied on studies for ingesting curated associations: - "curation": (study_index.is_gwas() & study_index.has_mapped_trait()), - # Filters applied on studies for ingesting summary statistics: - "summary_stats": ( - study_index.is_gwas() - & study_index.has_mapped_trait() - & (~study_index.is_quality_flagged()) - & study_index.has_summarystats() - ), - } - - if criteria not in filters: - raise ValueError( - f'Wrong value as filter set ({criteria}). Accepted: {",".join(filters.keys())}' - ) - - # Applying the relevant filter to the study: - return study_index.df.select( - "studyId", - "studyType", - "traitFromSource", - "traitFromSourceMappedIds", - "qualityControls", - "hasSumstats", - filters[criteria].alias("isEligible"), - ) - - @staticmethod - def process_harmonised_list(studies: list[str], session: Session) -> DataFrame: - """Generate spark dataframe from the provided list. - - Args: - studies (list[str]): list of path pointing to harmonised summary statistics. - session (Session): session - - Returns: - DataFrame: column name is consistent with original implementatin - """ - return session.spark.createDataFrame([{"_c0": path} for path in studies]) - - @staticmethod - def get_gwas_catalog_study_index( - session: Session, - gnomad_variant_path: str, - catalog_study_files: list[str], - catalog_ancestry_files: list[str], - harmonised_study_file: str, - catalog_associations_file: str, - gwas_catalog_study_curation_file: str, - ) -> StudyIndexGWASCatalog: - """Return GWAS Catalog study index. 
- - Args: - session (Session): Session object. - gnomad_variant_path (str): Path to GnomAD variant list. - catalog_study_files (list[str]): List of raw GWAS catalog studies file. - catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - harmonised_study_file (str): GWAS Catalog summary statistics lookup table. - catalog_associations_file (str): Raw GWAS catalog associations file. - gwas_catalog_study_curation_file (str): file of the curation table. Optional. - - Returns: - StudyIndexGWASCatalog: Completely processed and fully annotated study index. - """ - # Extract - gnomad_variants = VariantIndex.from_parquet(session, gnomad_variant_path) - catalog_studies = session.spark.read.csv( - list(catalog_study_files), sep="\t", header=True - ) - ancestry_lut = session.spark.read.csv( - list(catalog_ancestry_files), sep="\t", header=True - ) - sumstats_lut = session.spark.read.csv( - harmonised_study_file, sep="\t", header=False - ) - catalog_associations = session.spark.read.csv( - catalog_associations_file, sep="\t", header=True - ).persist() - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) - - # Transform - study_index, _ = GWASCatalogStudySplitter.split( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, - ancestry_lut, - sumstats_lut, - ).annotate_from_study_curation(gwas_catalog_study_curation), - GWASCatalogCuratedAssociationsParser.from_source( - catalog_associations, gnomad_variants - ), - ) - - return study_index - - def __init__( - self, - session: Session, - catalog_study_files: list[str], - catalog_ancestry_files: list[str], - catalog_associations_file: str, - gwas_catalog_study_curation_file: str, - gnomad_variant_path: str, - harmonised_study_file: str, - criteria: str, - inclusion_list_path: str, - exclusion_list_path: str, - ) -> None: - """Run step. - - Args: - session (Session): Session objecct. - catalog_study_files (list[str]): List of raw GWAS catalog studies file. - catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_associations_file (str): Raw GWAS catalog associations file. - gwas_catalog_study_curation_file (str): file of the curation table. Optional. - gnomad_variant_path (str): Path to GnomAD variant list. - harmonised_study_file (str): GWAS Catalog summary statistics lookup table. - criteria (str): name of the filter set to be applied. - inclusion_list_path (str): Output path for the inclusion list. - exclusion_list_path (str): Output path for the exclusion list. 
- """ - # Create study index: - study_index = self.get_gwas_catalog_study_index( - session, - gnomad_variant_path, - catalog_study_files, - catalog_ancestry_files, - harmonised_study_file, - catalog_associations_file, - gwas_catalog_study_curation_file, - ) - - # Get study indices for inclusion: - flagged_studies = self.flag_eligible_studies(study_index, criteria) - - # Output inclusion list: - eligible = ( - flagged_studies.filter(f.col("isEligible")).select("studyId").persist() - ) - eligible.write.mode(session.write_mode).parquet(inclusion_list_path) - - # Output exclusion list: - excluded = flagged_studies.filter(~f.col("isEligible")).persist() - excluded.write.mode(session.write_mode).parquet(exclusion_list_path) diff --git a/src/gentropy/gwas_catalog_study_index.py b/src/gentropy/gwas_catalog_study_index.py new file mode 100644 index 000000000..6c19b6909 --- /dev/null +++ b/src/gentropy/gwas_catalog_study_index.py @@ -0,0 +1,99 @@ +"""Step to generate an GWAS Catalog study identifier inclusion and exclusion list.""" + +from __future__ import annotations + +from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType + +from gentropy.common.session import Session +from gentropy.datasource.gwas_catalog.study_index import StudyIndexGWASCatalogParser +from gentropy.datasource.gwas_catalog.study_index_ot_curation import ( + StudyIndexGWASCatalogOTCuration, +) + + +class GWASCatalogStudyIndexGenerationStep: + """GWAS Catalog study index generation. + + This step generates a study index from the GWAS Catalog studies and ancestry files. It can also add additional curation information and summary statistics QC information when available. + + ''' warning + This step does not generate study index for gwas catalog top hits. + + This step provides several optional arguments to add additional information to the study index: + + - gwas_catalog_study_curation_file: csv file or URL containing the curation table. If provided it annotates the study index with the additional curation information performed by the Open Targets team. + - sumstats_qc_path: Path to the summary statistics QC table. If provided it annotates the study index with the summary statistics QC information in the `sumStatQCValues` columns (e.g. `n_variants`, `n_variants_sig` etc.). + """ + + def __init__( + self, + session: Session, + catalog_study_files: list[str], + catalog_ancestry_files: list[str], + study_index_path: str, + gwas_catalog_study_curation_file: str | None = None, + sumstats_qc_path: str | None = None, + ) -> None: + """Run step. + + Args: + session (Session): Session objecct. + catalog_study_files (list[str]): List of raw GWAS catalog studies file. + catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. + study_index_path (str): Output GWAS catalog studies path. + gwas_catalog_study_curation_file (str | None): csv file or URL containing the curation table. Optional. + sumstats_qc_path (str | None): Path to the summary statistics QC table. Optional. + + Raises: + ValueError: If the curation file is provided but not a CSV file or URL. 
+ """ + # Core Study Index Generation: + study_index = StudyIndexGWASCatalogParser.from_source( + session.spark.read.csv(list(catalog_study_files), sep="\t", header=True), + session.spark.read.csv(list(catalog_ancestry_files), sep="\t", header=True), + ) + + # Annotate with curation if provided: + if gwas_catalog_study_curation_file: + if gwas_catalog_study_curation_file.endswith( + ".tsv" + ) | gwas_catalog_study_curation_file.endswith(".tsv"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_csv( + session, gwas_catalog_study_curation_file + ) + elif gwas_catalog_study_curation_file.startswith("http"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_url( + session, gwas_catalog_study_curation_file + ) + else: + raise ValueError( + "Only CSV/TSV files or URLs are accepted as curation file." + ) + study_index = study_index.annotate_from_study_curation( + gwas_catalog_study_curation + ) + + # Annotate with sumstats QC if provided: + if sumstats_qc_path: + schema = StructType( + [ + StructField("studyId", StringType(), True), + StructField("mean_beta", DoubleType(), True), + StructField("mean_diff_pz", DoubleType(), True), + StructField("se_diff_pz", DoubleType(), True), + StructField("gc_lambda", DoubleType(), True), + StructField("n_variants", LongType(), True), + StructField("n_variants_sig", LongType(), True), + ] + ) + sumstats_qc = session.spark.read.schema(schema).parquet( + sumstats_qc_path, recursiveFileLookup=True + ) + study_index_with_qc = study_index.annotate_sumstats_qc(sumstats_qc) + + # Write the study + study_index_with_qc.df.write.mode(session.write_mode).parquet( + study_index_path + ) + else: + study_index.df.write.mode(session.write_mode).parquet(study_index_path) diff --git a/src/gentropy/gwas_catalog_ingestion.py b/src/gentropy/gwas_catalog_top_hits.py similarity index 59% rename from src/gentropy/gwas_catalog_ingestion.py rename to src/gentropy/gwas_catalog_top_hits.py index 5dab5bf16..95722c768 100644 --- a/src/gentropy/gwas_catalog_ingestion.py +++ b/src/gentropy/gwas_catalog_top_hits.py @@ -10,30 +10,23 @@ ) from gentropy.datasource.gwas_catalog.study_index import ( StudyIndexGWASCatalogParser, - read_curation_table, ) from gentropy.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter -class GWASCatalogIngestionStep: - """GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables. - - !!! note This step currently only processes the GWAS Catalog curated list of top hits. - """ +class GWASCatalogTopHitIngestionStep: + """GWAS Catalog ingestion step to extract GWASCatalog top hits.""" def __init__( self, session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], - catalog_sumstats_lut: str, catalog_associations_file: str, - gnomad_variant_path: str, + variant_annotation_path: str, catalog_studies_out: str, catalog_associations_out: str, distance: int = WindowBasedClumpingStepConfig().distance, - gwas_catalog_study_curation_file: str | None = None, - inclusion_list_path: str | None = None, ) -> None: """Run step. @@ -41,52 +34,31 @@ def __init__( session (Session): Session object. catalog_study_files (list[str]): List of raw GWAS catalog studies file. catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. catalog_associations_file (str): Raw GWAS catalog associations file. - gnomad_variant_path (str): Path to GnomAD variants. 
+ variant_annotation_path (str): Path to GnomAD variants. catalog_studies_out (str): Output GWAS catalog studies path. catalog_associations_out (str): Output GWAS catalog associations path. distance (int): Distance, within which tagging variants are collected around the semi-index. - gwas_catalog_study_curation_file (str | None): file of the curation table. Optional. - inclusion_list_path (str | None): optional inclusion list (parquet) """ # Extract - gnomad_variants = VariantIndex.from_parquet(session, gnomad_variant_path) + gnomad_variants = VariantIndex.from_parquet(session, variant_annotation_path) catalog_studies = session.spark.read.csv( list(catalog_study_files), sep="\t", header=True ) ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - sumstats_lut = session.spark.read.csv( - catalog_sumstats_lut, sep="\t", header=False - ) catalog_associations = session.spark.read.csv( catalog_associations_file, sep="\t", header=True ).persist() - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) # Transform study_index, study_locus = GWASCatalogStudySplitter.split( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, ancestry_lut, sumstats_lut - ).annotate_from_study_curation(gwas_catalog_study_curation), + StudyIndexGWASCatalogParser.from_source(catalog_studies, ancestry_lut), GWASCatalogCuratedAssociationsParser.from_source( catalog_associations, gnomad_variants ), ) - - # if inclusion list is provided apply filter: - if inclusion_list_path: - inclusion_list = session.spark.read.parquet( - inclusion_list_path, sep="\t", header=True - ) - - study_index = study_index.apply_inclusion_list(inclusion_list) - study_locus = study_locus.apply_inclusion_list(inclusion_list) - # Load study_index.df.write.mode(session.write_mode).parquet(catalog_studies_out) diff --git a/tests/gentropy/dataset/test_dataset_exclusion.py b/tests/gentropy/dataset/test_dataset_exclusion.py index 329a0a1d5..1b6fce967 100644 --- a/tests/gentropy/dataset/test_dataset_exclusion.py +++ b/tests/gentropy/dataset/test_dataset_exclusion.py @@ -24,11 +24,11 @@ class TestDataExclusion: # Good study no flag: ("S1", None), # Good study permissive flag: - ("S2", "This type of study is not supported."), - ("S2", "No valid disease identifier found."), + ("S2", "This type of study is not supported"), + ("S2", "No valid disease identifier found"), # Bad study: - ("S3", "The identifier of this study is not unique."), - ("S3", "This type of study is not supported."), + ("S3", "The identifier of this study is not unique"), + ("S3", "This type of study is not supported"), ] @pytest.fixture(autouse=True) diff --git a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py index 4163531cb..0dd1a7363 100644 --- a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py +++ b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py @@ -143,39 +143,6 @@ def test_curation__study_type_update( assert expected == observed - # Test update qc flag - @staticmethod - def test_curation__quality_controls( - mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame - ) -> None: - """Test for making sure the study type got updated.""" - curated = mock_gwas_study_index.annotate_from_study_curation( - mock_study_curation - ) - - # Expected studyIds: - expected = [ - row["studyId"] - for row in ( - 
mock_study_curation.filter(f.col("qualityControls").isNotNull()) - .select("studyId") - .distinct() - .collect() - ) - ] - - observed = [ - row["studyId"] - for row in ( - curated.df.filter(f.size(f.col("qualityControls")) > 0) - .select("studyId") - .distinct() - .collect() - ) - ] - - assert expected == observed - # Test updated method flag @staticmethod def test_curation__analysis_flags( diff --git a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py index b91529b3d..b3ff4e486 100644 --- a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py +++ b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py @@ -31,25 +31,8 @@ def test_parse_study_table(sample_gwas_catalog_studies: DataFrame) -> None: ) -def test_annotate_sumstats( - mock_study_index_gwas_catalog: StudyIndexGWASCatalog, - sample_gwas_catalog_harmonised_sumstats_list: DataFrame, -) -> None: - """Test annotate sumstats of GWASCatalogStudyIndex.""" - mock_study_index_gwas_catalog.df = mock_study_index_gwas_catalog.df.drop( - "summarystatsLocation" - ) - assert isinstance( - mock_study_index_gwas_catalog.annotate_sumstats_info( - sample_gwas_catalog_harmonised_sumstats_list - ), - StudyIndexGWASCatalog, - ) - - def test_study_index_from_source( sample_gwas_catalog_studies: DataFrame, - sample_gwas_catalog_harmonised_sumstats_list: DataFrame, sample_gwas_catalog_ancestries_lut: DataFrame, ) -> None: """Test study index from source.""" @@ -57,7 +40,6 @@ def test_study_index_from_source( StudyIndexGWASCatalogParser.from_source( sample_gwas_catalog_studies, sample_gwas_catalog_ancestries_lut, - sample_gwas_catalog_harmonised_sumstats_list, ), StudyIndexGWASCatalog, )
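
For context on the map-typed `sumstatQCValues` field introduced above (the `study_index.json` schema change and `StudyIndex.annotate_sumstats_qc`), the sketch below shows the wide-to-long-to-map conversion in isolation. It is a minimal, self-contained PySpark example and not part of this PR: the toy QC values are invented, a plain `stack` expression stands in for gentropy's `convert_from_wide_to_long` helper, and a local SparkSession is assumed; only the metric column names (`studyId`, `mean_beta`, ..., `n_variants_sig`) and the `sumstatQCValues` name come from the diff.

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy summary-statistics QC table with the same columns as the schema read by
# GWASCatalogStudyIndexGenerationStep (the values here are invented).
qc = spark.createDataFrame(
    [("GCST000001", 0.01, 0.02, 0.03, 1.1, 9_000_000, 1_200)],
    ["studyId", "mean_beta", "mean_diff_pz", "se_diff_pz", "gc_lambda", "n_variants", "n_variants_sig"],
)

metric_cols = [c for c in qc.columns if c != "studyId"]

# Wide -> long: one (QCCheckName, QCCheckValue) row per metric. The real code
# uses gentropy's convert_from_wide_to_long helper; stack() is used here to
# keep the example dependency-free.
stack_expr = "stack({n}, {pairs}) as (QCCheckName, QCCheckValue)".format(
    n=len(metric_cols),
    pairs=", ".join(f"'{c}', cast({c} as double)" for c in metric_cols),
)
long_df = qc.select("studyId", f.expr(stack_expr))

# Long -> map: collapse the metrics into a single map<string,double> column,
# mirroring the new `sumstatQCValues` field in the study index schema.
qc_map = long_df.groupBy("studyId").agg(
    f.map_from_entries(
        f.collect_list(f.struct("QCCheckName", "QCCheckValue"))
    ).alias("sumstatQCValues")
)

qc_map.show(truncate=False)
# Individual checks can then be read with dot notation, e.g.
# f.col("sumstatQCValues.gc_lambda") >= 0.7, which is how annotate_sumstats_qc
# evaluates the threshold behind the FAILED_GC_LAMBDA_CHECK flag.
```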