From a479920d8c24aed216791827b78b0f1704be7dbb Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 14:40:56 +0100 Subject: [PATCH 01/26] fix: wrong step parameter --- src/gentropy/gwas_catalog_ingestion.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/gentropy/gwas_catalog_ingestion.py b/src/gentropy/gwas_catalog_ingestion.py index 5dab5bf16..5a5461523 100644 --- a/src/gentropy/gwas_catalog_ingestion.py +++ b/src/gentropy/gwas_catalog_ingestion.py @@ -28,7 +28,7 @@ def __init__( catalog_ancestry_files: list[str], catalog_sumstats_lut: str, catalog_associations_file: str, - gnomad_variant_path: str, + variant_annotation_path: str, catalog_studies_out: str, catalog_associations_out: str, distance: int = WindowBasedClumpingStepConfig().distance, @@ -43,7 +43,7 @@ def __init__( catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. catalog_associations_file (str): Raw GWAS catalog associations file. - gnomad_variant_path (str): Path to GnomAD variants. + variant_annotation_path (str): Path to GnomAD variants. catalog_studies_out (str): Output GWAS catalog studies path. catalog_associations_out (str): Output GWAS catalog associations path. distance (int): Distance, within which tagging variants are collected around the semi-index. @@ -51,7 +51,7 @@ def __init__( inclusion_list_path (str | None): optional inclusion list (parquet) """ # Extract - gnomad_variants = VariantIndex.from_parquet(session, gnomad_variant_path) + gnomad_variants = VariantIndex.from_parquet(session, variant_annotation_path) catalog_studies = session.spark.read.csv( list(catalog_study_files), sep="\t", header=True ) From 395060256745ecce2143464764f1409ab4351c12 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 15:17:24 +0100 Subject: [PATCH 02/26] fix: persist va_subset --- .../datasource/gwas_catalog/associations.py | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/src/gentropy/datasource/gwas_catalog/associations.py b/src/gentropy/datasource/gwas_catalog/associations.py index b34944b11..b22c7a3f5 100644 --- a/src/gentropy/datasource/gwas_catalog/associations.py +++ b/src/gentropy/datasource/gwas_catalog/associations.py @@ -225,28 +225,34 @@ def _map_variants_to_gnomad_variants( ) # Subset of variant annotation required for GWAS Catalog annotations: - va_subset = variant_index.df.select( - "variantId", - "chromosome", - # Calculate the position in Ensembl coordinates for indels: - GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( - f.col("position"), f.col("referenceAllele"), f.col("alternateAllele") - ).alias("ensemblPosition"), - # Keeping GnomAD position: - "position", - f.col("rsIds").alias("rsIdsGnomad"), - "referenceAllele", - "alternateAllele", - "alleleFrequencies", - variant_index.max_maf().alias("maxMaf"), - ).join( - f.broadcast( - gwas_associations_subset.select( - "chromosome", "ensemblPosition" - ).distinct() - ), - on=["chromosome", "ensemblPosition"], - how="inner", + va_subset = ( + variant_index.df.select( + "variantId", + "chromosome", + # Calculate the position in Ensembl coordinates for indels: + GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( + f.col("position"), + f.col("referenceAllele"), + f.col("alternateAllele"), + ).alias("ensemblPosition"), + # Keeping GnomAD position: + "position", + f.col("rsIds").alias("rsIdsGnomad"), + "referenceAllele", + "alternateAllele", + "alleleFrequencies", + variant_index.max_maf().alias("maxMaf"), + ) + .join( + f.broadcast( + gwas_associations_subset.select( + "chromosome", "ensemblPosition" + ).distinct() + ), + on=["chromosome", "ensemblPosition"], + how="inner", + ) + .persist() ) # Semi-resolved ids (still contains duplicates when conclusion was not possible to make From 951277175ac7d6bb91e666c717ea123ae38b3f04 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 15:31:33 +0100 Subject: [PATCH 03/26] fix: remove broadcasts --- .../datasource/gwas_catalog/associations.py | 50 ++++++++----------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/src/gentropy/datasource/gwas_catalog/associations.py b/src/gentropy/datasource/gwas_catalog/associations.py index b22c7a3f5..be948302e 100644 --- a/src/gentropy/datasource/gwas_catalog/associations.py +++ b/src/gentropy/datasource/gwas_catalog/associations.py @@ -225,41 +225,33 @@ def _map_variants_to_gnomad_variants( ) # Subset of variant annotation required for GWAS Catalog annotations: - va_subset = ( - variant_index.df.select( - "variantId", - "chromosome", - # Calculate the position in Ensembl coordinates for indels: - GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( - f.col("position"), - f.col("referenceAllele"), - f.col("alternateAllele"), - ).alias("ensemblPosition"), - # Keeping GnomAD position: - "position", - f.col("rsIds").alias("rsIdsGnomad"), - "referenceAllele", - "alternateAllele", - "alleleFrequencies", - variant_index.max_maf().alias("maxMaf"), - ) - .join( - f.broadcast( - gwas_associations_subset.select( - "chromosome", "ensemblPosition" - ).distinct() - ), - on=["chromosome", "ensemblPosition"], - how="inner", - ) - .persist() + va_subset = variant_index.df.select( + "variantId", + "chromosome", + # Calculate the position in Ensembl coordinates for indels: + GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl( + f.col("position"), + f.col("referenceAllele"), + f.col("alternateAllele"), + ).alias("ensemblPosition"), + # Keeping GnomAD position: + "position", + f.col("rsIds").alias("rsIdsGnomad"), + "referenceAllele", + "alternateAllele", + "alleleFrequencies", + variant_index.max_maf().alias("maxMaf"), + ).join( + gwas_associations_subset.select("chromosome", "ensemblPosition").distinct(), + on=["chromosome", "ensemblPosition"], + how="inner", ) # Semi-resolved ids (still contains duplicates when conclusion was not possible to make # based on rsIds or allele concordance) filtered_associations = ( gwas_associations_subset.join( - f.broadcast(va_subset), + va_subset, on=["chromosome", "ensemblPosition"], how="left", ) From 0a2f3092fdf108e09d5c989f94328dacffea5887 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 16:47:52 +0100 Subject: [PATCH 04/26] feat: new gwas_catalog_top_hits step --- src/gentropy/config.py | 18 +++++ .../datasource/gwas_catalog/study_index.py | 3 - src/gentropy/gwas_catalog_top_hits.py | 69 +++++++++++++++++++ 3 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 src/gentropy/gwas_catalog_top_hits.py diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 33865d6ea..adb35ef0a 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -106,6 +106,19 @@ class GWASCatalogIngestionConfig(StepConfig): _target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep" +@dataclass +class GWASCatalogTopHitIngestionConfig(StepConfig): + """GWAS Catalog ingestion step configuration.""" + + catalog_study_files: list[str] = MISSING + catalog_ancestry_files: list[str] = MISSING + catalog_associations_file: str = MISSING + variant_annotation_path: str = MISSING + catalog_studies_out: str = MISSING + catalog_associations_out: str = MISSING + _target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogTopHitIngestionStep" + + @dataclass class GWASCatalogSumstatsPreprocessConfig(StepConfig): """GWAS Catalog sumstat preprocess step configuration.""" @@ -526,6 +539,11 @@ def register_config() -> None: name="gwas_catalog_sumstat_preprocess", node=GWASCatalogSumstatsPreprocessConfig, ) + cs.store( + group="step", + name="gwas_catalog_top_hit_ingestion", + node=GWASCatalogTopHitIngestionConfig, + ) cs.store(group="step", name="ld_based_clumping", node=LDBasedClumpingConfig) cs.store(group="step", name="ld_index", node=LDIndexConfig) cs.store(group="step", name="locus_to_gene", node=LocusToGeneConfig) diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index d0d841105..dfc481f52 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -316,14 +316,12 @@ def from_source( cls: type[StudyIndexGWASCatalogParser], catalog_studies: DataFrame, ancestry_file: DataFrame, - sumstats_lut: DataFrame, ) -> StudyIndexGWASCatalog: """Ingests study level metadata from the GWAS Catalog. Args: catalog_studies (DataFrame): GWAS Catalog raw study table ancestry_file (DataFrame): GWAS Catalog ancestry table. - sumstats_lut (DataFrame): GWAS Catalog summary statistics list. Returns: StudyIndexGWASCatalog: Parsed and annotated GWAS Catalog study table. @@ -332,7 +330,6 @@ def from_source( return ( cls._parse_study_table(catalog_studies) .annotate_ancestries(ancestry_file) - .annotate_sumstats_info(sumstats_lut) .annotate_discovery_sample_sizes() ) diff --git a/src/gentropy/gwas_catalog_top_hits.py b/src/gentropy/gwas_catalog_top_hits.py new file mode 100644 index 000000000..95722c768 --- /dev/null +++ b/src/gentropy/gwas_catalog_top_hits.py @@ -0,0 +1,69 @@ +"""Step to process GWAS Catalog associations and study table.""" + +from __future__ import annotations + +from gentropy.common.session import Session +from gentropy.config import WindowBasedClumpingStepConfig +from gentropy.dataset.variant_index import VariantIndex +from gentropy.datasource.gwas_catalog.associations import ( + GWASCatalogCuratedAssociationsParser, +) +from gentropy.datasource.gwas_catalog.study_index import ( + StudyIndexGWASCatalogParser, +) +from gentropy.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter + + +class GWASCatalogTopHitIngestionStep: + """GWAS Catalog ingestion step to extract GWASCatalog top hits.""" + + def __init__( + self, + session: Session, + catalog_study_files: list[str], + catalog_ancestry_files: list[str], + catalog_associations_file: str, + variant_annotation_path: str, + catalog_studies_out: str, + catalog_associations_out: str, + distance: int = WindowBasedClumpingStepConfig().distance, + ) -> None: + """Run step. + + Args: + session (Session): Session object. + catalog_study_files (list[str]): List of raw GWAS catalog studies file. + catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. + catalog_associations_file (str): Raw GWAS catalog associations file. + variant_annotation_path (str): Path to GnomAD variants. + catalog_studies_out (str): Output GWAS catalog studies path. + catalog_associations_out (str): Output GWAS catalog associations path. + distance (int): Distance, within which tagging variants are collected around the semi-index. + """ + # Extract + gnomad_variants = VariantIndex.from_parquet(session, variant_annotation_path) + catalog_studies = session.spark.read.csv( + list(catalog_study_files), sep="\t", header=True + ) + ancestry_lut = session.spark.read.csv( + list(catalog_ancestry_files), sep="\t", header=True + ) + catalog_associations = session.spark.read.csv( + catalog_associations_file, sep="\t", header=True + ).persist() + + # Transform + study_index, study_locus = GWASCatalogStudySplitter.split( + StudyIndexGWASCatalogParser.from_source(catalog_studies, ancestry_lut), + GWASCatalogCuratedAssociationsParser.from_source( + catalog_associations, gnomad_variants + ), + ) + # Load + study_index.df.write.mode(session.write_mode).parquet(catalog_studies_out) + + ( + study_locus.window_based_clumping(distance) + .df.write.mode(session.write_mode) + .parquet(catalog_associations_out) + ) From eab6bc9dfdff28b5f71dc0d1bfdb7038d519eb27 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 16:48:35 +0100 Subject: [PATCH 05/26] docs: new step added to documentation --- docs/python_api/steps/gwas_catalog_top_hits.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/python_api/steps/gwas_catalog_top_hits.md diff --git a/docs/python_api/steps/gwas_catalog_top_hits.md b/docs/python_api/steps/gwas_catalog_top_hits.md new file mode 100644 index 000000000..03d81eafb --- /dev/null +++ b/docs/python_api/steps/gwas_catalog_top_hits.md @@ -0,0 +1,5 @@ +--- +title: GWAS Catalog Top Hits Ingestion Step +--- + +::: gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep From b09f6cd648989f7f79ba3b5a44b1267952ab1b5e Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Tue, 1 Oct 2024 17:14:23 +0100 Subject: [PATCH 06/26] fix: incorrect target --- src/gentropy/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentropy/config.py b/src/gentropy/config.py index adb35ef0a..e01ced43d 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -116,7 +116,7 @@ class GWASCatalogTopHitIngestionConfig(StepConfig): variant_annotation_path: str = MISSING catalog_studies_out: str = MISSING catalog_associations_out: str = MISSING - _target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogTopHitIngestionStep" + _target_: str = "gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep" @dataclass From 9308c4d168a23652479f627ea796c63dab3c7a51 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Wed, 2 Oct 2024 15:36:47 +0100 Subject: [PATCH 07/26] fix: failing tests --- src/gentropy/config.py | 2 -- src/gentropy/gwas_catalog_study_curation.py | 10 ++-------- src/gentropy/gwas_catalog_study_inclusion.py | 9 --------- .../gwas_catalog/test_gwas_catalog_study_index.py | 2 -- 4 files changed, 2 insertions(+), 21 deletions(-) diff --git a/src/gentropy/config.py b/src/gentropy/config.py index e01ced43d..fbd9a9398 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -66,7 +66,6 @@ class GWASCatalogStudyCurationConfig(StepConfig): catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING - catalog_sumstats_lut: str = MISSING gwas_catalog_study_curation_out: str = MISSING gwas_catalog_study_curation_file: str = MISSING _target_: str = "gentropy.gwas_catalog_study_curation.GWASCatalogStudyCurationStep" @@ -81,7 +80,6 @@ class GWASCatalogStudyInclusionConfig(StepConfig): catalog_associations_file: str = MISSING gwas_catalog_study_curation_file: str = MISSING variant_annotation_path: str = MISSING - harmonised_study_file: str = MISSING criteria: str = MISSING inclusion_list_path: str = MISSING exclusion_list_path: str = MISSING diff --git a/src/gentropy/gwas_catalog_study_curation.py b/src/gentropy/gwas_catalog_study_curation.py index 7329a1679..5026850f3 100644 --- a/src/gentropy/gwas_catalog_study_curation.py +++ b/src/gentropy/gwas_catalog_study_curation.py @@ -1,4 +1,5 @@ """Step to update GWAS Catalog study curation file based on newly released GWAS Catalog dataset.""" + from __future__ import annotations from gentropy.common.session import Session @@ -16,7 +17,6 @@ def __init__( session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], - catalog_sumstats_lut: str, gwas_catalog_study_curation_out: str, gwas_catalog_study_curation_file: str | None, ) -> None: @@ -26,7 +26,6 @@ def __init__( session (Session): Session object. catalog_study_files (list[str]): List of raw GWAS catalog studies file. catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. gwas_catalog_study_curation_out (str): Path for the updated curation table. gwas_catalog_study_curation_file (str | None): Path to the original curation table. Optinal """ @@ -36,18 +35,13 @@ def __init__( ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - sumstats_lut = session.spark.read.csv( - catalog_sumstats_lut, sep="\t", header=False - ) gwas_catalog_study_curation = read_curation_table( gwas_catalog_study_curation_file, session ) # Process GWAS Catalog studies and get list of studies for curation: ( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, ancestry_lut, sumstats_lut - ) + StudyIndexGWASCatalogParser.from_source(catalog_studies, ancestry_lut) # Adding existing curation: .annotate_from_study_curation(gwas_catalog_study_curation) # Extract new studies for curation: diff --git a/src/gentropy/gwas_catalog_study_inclusion.py b/src/gentropy/gwas_catalog_study_inclusion.py index f07f851a7..5bc4ca5c6 100644 --- a/src/gentropy/gwas_catalog_study_inclusion.py +++ b/src/gentropy/gwas_catalog_study_inclusion.py @@ -88,7 +88,6 @@ def get_gwas_catalog_study_index( gnomad_variant_path: str, catalog_study_files: list[str], catalog_ancestry_files: list[str], - harmonised_study_file: str, catalog_associations_file: str, gwas_catalog_study_curation_file: str, ) -> StudyIndexGWASCatalog: @@ -99,7 +98,6 @@ def get_gwas_catalog_study_index( gnomad_variant_path (str): Path to GnomAD variant list. catalog_study_files (list[str]): List of raw GWAS catalog studies file. catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - harmonised_study_file (str): GWAS Catalog summary statistics lookup table. catalog_associations_file (str): Raw GWAS catalog associations file. gwas_catalog_study_curation_file (str): file of the curation table. Optional. @@ -114,9 +112,6 @@ def get_gwas_catalog_study_index( ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - sumstats_lut = session.spark.read.csv( - harmonised_study_file, sep="\t", header=False - ) catalog_associations = session.spark.read.csv( catalog_associations_file, sep="\t", header=True ).persist() @@ -129,7 +124,6 @@ def get_gwas_catalog_study_index( StudyIndexGWASCatalogParser.from_source( catalog_studies, ancestry_lut, - sumstats_lut, ).annotate_from_study_curation(gwas_catalog_study_curation), GWASCatalogCuratedAssociationsParser.from_source( catalog_associations, gnomad_variants @@ -146,7 +140,6 @@ def __init__( catalog_associations_file: str, gwas_catalog_study_curation_file: str, gnomad_variant_path: str, - harmonised_study_file: str, criteria: str, inclusion_list_path: str, exclusion_list_path: str, @@ -160,7 +153,6 @@ def __init__( catalog_associations_file (str): Raw GWAS catalog associations file. gwas_catalog_study_curation_file (str): file of the curation table. Optional. gnomad_variant_path (str): Path to GnomAD variant list. - harmonised_study_file (str): GWAS Catalog summary statistics lookup table. criteria (str): name of the filter set to be applied. inclusion_list_path (str): Output path for the inclusion list. exclusion_list_path (str): Output path for the exclusion list. @@ -171,7 +163,6 @@ def __init__( gnomad_variant_path, catalog_study_files, catalog_ancestry_files, - harmonised_study_file, catalog_associations_file, gwas_catalog_study_curation_file, ) diff --git a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py index b91529b3d..62b750b9f 100644 --- a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py +++ b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py @@ -49,7 +49,6 @@ def test_annotate_sumstats( def test_study_index_from_source( sample_gwas_catalog_studies: DataFrame, - sample_gwas_catalog_harmonised_sumstats_list: DataFrame, sample_gwas_catalog_ancestries_lut: DataFrame, ) -> None: """Test study index from source.""" @@ -57,7 +56,6 @@ def test_study_index_from_source( StudyIndexGWASCatalogParser.from_source( sample_gwas_catalog_studies, sample_gwas_catalog_ancestries_lut, - sample_gwas_catalog_harmonised_sumstats_list, ), StudyIndexGWASCatalog, ) From 0de744e1e1678840a32b922bbdec27bece48e171 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Wed, 2 Oct 2024 15:40:46 +0100 Subject: [PATCH 08/26] fix: extra argument --- src/gentropy/gwas_catalog_ingestion.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/gentropy/gwas_catalog_ingestion.py b/src/gentropy/gwas_catalog_ingestion.py index 5a5461523..3f6877b6a 100644 --- a/src/gentropy/gwas_catalog_ingestion.py +++ b/src/gentropy/gwas_catalog_ingestion.py @@ -58,9 +58,6 @@ def __init__( ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - sumstats_lut = session.spark.read.csv( - catalog_sumstats_lut, sep="\t", header=False - ) catalog_associations = session.spark.read.csv( catalog_associations_file, sep="\t", header=True ).persist() @@ -71,7 +68,7 @@ def __init__( # Transform study_index, study_locus = GWASCatalogStudySplitter.split( StudyIndexGWASCatalogParser.from_source( - catalog_studies, ancestry_lut, sumstats_lut + catalog_studies, ancestry_lut ).annotate_from_study_curation(gwas_catalog_study_curation), GWASCatalogCuratedAssociationsParser.from_source( catalog_associations, gnomad_variants From 8c9c8915793d96f15b3b8be5c0c76345f5232f35 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Wed, 2 Oct 2024 17:49:04 +0100 Subject: [PATCH 09/26] fix: select does not require hasSumstats anymore --- src/gentropy/gwas_catalog_study_inclusion.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gentropy/gwas_catalog_study_inclusion.py b/src/gentropy/gwas_catalog_study_inclusion.py index 5bc4ca5c6..20dca188b 100644 --- a/src/gentropy/gwas_catalog_study_inclusion.py +++ b/src/gentropy/gwas_catalog_study_inclusion.py @@ -65,7 +65,6 @@ def flag_eligible_studies( "traitFromSource", "traitFromSourceMappedIds", "qualityControls", - "hasSumstats", filters[criteria].alias("isEligible"), ) From 8663f1d3501e916144ebeec101b25608d75d27d4 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 3 Oct 2024 16:48:07 +0100 Subject: [PATCH 10/26] feat: study inclusion step repurposed into study index step --- src/gentropy/config.py | 18 +- .../datasource/gwas_catalog/study_index.py | 124 ++++++------ .../gwas_catalog/study_index_ot_curation.py | 87 +++++++++ src/gentropy/gwas_catalog_study_inclusion.py | 180 ------------------ src/gentropy/gwas_catalog_study_index.py | 90 +++++++++ 5 files changed, 243 insertions(+), 256 deletions(-) create mode 100644 src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py delete mode 100644 src/gentropy/gwas_catalog_study_inclusion.py create mode 100644 src/gentropy/gwas_catalog_study_index.py diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 20e60ff71..898f40a18 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -72,19 +72,17 @@ class GWASCatalogStudyCurationConfig(StepConfig): @dataclass -class GWASCatalogStudyInclusionConfig(StepConfig): - """GWAS Catalog study inclusion step configuration.""" +class GWASCatalogStudyIndexGenerationStep(StepConfig): + """GWAS Catalog study index generation.""" catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING - catalog_associations_file: str = MISSING - gwas_catalog_study_curation_file: str = MISSING - variant_annotation_path: str = MISSING - criteria: str = MISSING - inclusion_list_path: str = MISSING - exclusion_list_path: str = MISSING + study_index_path: str = MISSING + harmonised_studies_index_path: str | None = None + gwas_catalog_study_curation_file: str | None = None + sumstats_qc_path: str | None = None _target_: str = ( - "gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator" + "gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep" ) @@ -527,7 +525,7 @@ def register_config() -> None: cs.store( group="step", name="gwas_catalog_study_inclusion", - node=GWASCatalogStudyInclusionConfig, + node=GWASCatalogStudyIndexGenerationStep, ) cs.store( group="step", name="gwas_catalog_ingestion", node=GWASCatalogIngestionConfig diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index dfc481f52..c96752b9a 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -7,9 +7,7 @@ import pyspark.sql.functions as f import pyspark.sql.types as t -from pyspark import SparkFiles -from gentropy.common.session import Session from gentropy.common.spark_helpers import column2camel_case from gentropy.common.utils import parse_efos from gentropy.dataset.study_index import StudyIndex @@ -18,50 +16,6 @@ from pyspark.sql import Column, DataFrame -def read_curation_table( - curation_path: str | None, session: Session -) -> DataFrame | None: - """Read curation table if path or URL is given. - - Curation itself is fully optional everything should work without it. - - Args: - curation_path (str | None): Optionally given path the curation tsv. - session (Session): session object - - Returns: - DataFrame | None: if curation was provided, - """ - # If no curation path provided, we are returning none: - if curation_path is None: - return None - # Read curation from the web: - elif curation_path.startswith("http"): - # Registering file: - session.spark.sparkContext.addFile(curation_path) - - # Reading file: - curation_df = session.spark.read.csv( - SparkFiles.get(curation_path.split("/")[-1]), sep="\t", header=True - ) - # Read curation from file: - else: - curation_df = session.spark.read.csv(curation_path, sep="\t", header=True) - return curation_df.select( - "studyId", - "studyType", - f.when(f.col("analysisFlag").isNotNull(), f.split(f.col("analysisFlag"), r"\|")) - .otherwise(f.array()) - .alias("analysisFlags"), - f.when( - f.col("qualityControl").isNotNull(), f.split(f.col("qualityControl"), r"\|") - ) - .otherwise(f.array()) - .alias("qualityControls"), - f.col("isCurated").cast(t.BooleanType()), - ) - - @dataclass class StudyIndexGWASCatalogParser: """GWAS Catalog study index parser. @@ -643,41 +597,79 @@ def annotate_sumstats_info( ) -> StudyIndexGWASCatalog: """Annotate summary stat locations. + This function reads a text file with the list of harmonised studies and annotates the study index with the `hasSumstats` column. + Args: sumstats_lut (DataFrame): listing GWAS Catalog summary stats paths Returns: - StudyIndexGWASCatalog: including `summarystatsLocation` and `hasSumstats` columns + StudyIndexGWASCatalog: including `hasSumstats` column Raises: ValueError: if the sumstats_lut table doesn't have the right columns """ - gwas_sumstats_base_uri = ( - "ftp://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/" - ) - if "_c0" not in sumstats_lut.columns: raise ValueError( f'Sumstats look-up table needs to have `_c0` column. However it has: {",".join(sumstats_lut.columns)}' ) - parsed_sumstats_lut = sumstats_lut.withColumn( - "summarystatsLocation", - f.concat( - f.lit(gwas_sumstats_base_uri), - f.regexp_replace(f.col("_c0"), r"^\.\/", ""), - ), - ).select( - self._parse_gwas_catalog_study_id("summarystatsLocation").alias("studyId"), - "summarystatsLocation", - f.lit(True).alias("hasSumstats"), + return StudyIndexGWASCatalog( + _df=self.df.drop("hasSumstats") + .join( + sumstats_lut.select( + f.col("_c0").alias("studyId"), f.lit(True).alias("hasSumstats") + ), + on="studyId", + how="left", + ) + .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))), + _schema=StudyIndexGWASCatalog.get_schema(), ) - self.df = ( - self.df.drop("hasSumstats") - .join(parsed_sumstats_lut, on="studyId", how="left") - .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))) + + def annotate_sumstats_qc( + self: StudyIndexGWASCatalog, sumstats_qc: DataFrame + ) -> StudyIndexGWASCatalog: + """Annotate summary stats QC information. + + Args: + sumstats_qc (DataFrame): containing summary statistics-based quality controls. + + Returns: + StudyIndexGWASCatalog: Updated study index with QC information + """ + # convert all columns in sumstats_qc dataframe in array of structs grouped by studyId + sumstats_qc_aggregated = ( + sumstats_qc.groupBy("studyId") + .agg( + f.collect_list( + f.struct( + *[ + f.struct( + f.lit(col).alias("QCCheckName"), + f.col(col).alias("QCCheckValue").cast(t.FloatType()), + ) + for col in sumstats_qc.columns + if col != "studyId" + ] + ) + ).alias("sumStatQCValues") + ) + .withColumn("sumStatQCPerformed", f.lit(True)) + ) + + # Annotate study index with QC information: + return StudyIndexGWASCatalog( + _df=self.df.drop("sumStatQCValues", "sumStatQCPerformed") + .join(sumstats_qc_aggregated, how="left", on="studyId") + .withColumn( + "sumStatQCPerformed", + f.coalesce(f.col("sumStatQCPerformed"), f.lit(False)), + ) + .withColumn( + "sumStatQCValues", f.coalesce(f.col("sumStatQCValues"), f.array()) + ), + _schema=StudyIndexGWASCatalog.get_schema(), ) - return self def annotate_discovery_sample_sizes( self: StudyIndexGWASCatalog, diff --git a/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py new file mode 100644 index 000000000..79500d35c --- /dev/null +++ b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py @@ -0,0 +1,87 @@ +"""Study Index for GWAS Catalog data source.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyspark.sql.functions as f +import pyspark.sql.types as t +from pyspark import SparkFiles + +from gentropy.common.session import Session + +if TYPE_CHECKING: + from pyspark.sql import DataFrame + + +@dataclass +class StudyIndexGWASCatalogOTCuration: + """Study Index Curation for GWAS Catalog data source. + + This class is responsible for parsing additional curation for the GWAS Catalog studies. + """ + + @staticmethod + def _parser(df: DataFrame) -> DataFrame: + """Parse the curation table. + + Args: + df (DataFrame): DataFrame with the curation table. + + Returns: + DataFrame: DataFrame with the parsed curation table. + """ + return df.select( + "studyId", + "studyType", + f.when( + f.col("analysisFlag").isNotNull(), f.split(f.col("analysisFlag"), r"\|") + ) + .otherwise(f.array()) + .alias("analysisFlags"), + f.when( + f.col("qualityControl").isNotNull(), + f.split(f.col("qualityControl"), r"\|"), + ) + .otherwise(f.array()) + .alias("qualityControls"), + f.col("isCurated").cast(t.BooleanType()), + ) + + @classmethod + def from_csv( + cls: type[StudyIndexGWASCatalogOTCuration], session: Session, curation_path: str + ) -> DataFrame: + """Read curation table from csv. + + Args: + session (Session): Session object. + curation_path (str): Path to the curation table. + + Returns: + DataFrame: DataFrame with the curation table. + """ + return cls._parser(session.spark.read.csv(curation_path, sep="\t", header=True)) + + @classmethod + def from_url( + cls: type[StudyIndexGWASCatalogOTCuration], session: Session, curation_url: str + ) -> DataFrame: + """Read curation table from URL. + + Args: + session (Session): Session object. + curation_url (str): URL to the curation table. + + Returns: + DataFrame: DataFrame with the curation table. + """ + # Registering file: + session.spark.sparkContext.addFile(curation_url) + + return cls._parser( + session.spark.read.csv( + SparkFiles.get(curation_url.split("/")[-1]), sep="\t", header=True + ) + ) diff --git a/src/gentropy/gwas_catalog_study_inclusion.py b/src/gentropy/gwas_catalog_study_inclusion.py deleted file mode 100644 index 20dca188b..000000000 --- a/src/gentropy/gwas_catalog_study_inclusion.py +++ /dev/null @@ -1,180 +0,0 @@ -"""Step to generate an GWAS Catalog study identifier inclusion and exclusion list.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from pyspark.sql import functions as f - -from gentropy.common.session import Session -from gentropy.dataset.variant_index import VariantIndex -from gentropy.datasource.gwas_catalog.associations import ( - GWASCatalogCuratedAssociationsParser, -) -from gentropy.datasource.gwas_catalog.study_index import ( - StudyIndexGWASCatalog, - StudyIndexGWASCatalogParser, - read_curation_table, -) -from gentropy.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter - -if TYPE_CHECKING: - from pyspark.sql import Column, DataFrame - - -class GWASCatalogStudyInclusionGenerator: - """GWAS Catalog study eligibility for ingestion based on curation and the provided criteria.""" - - @staticmethod - def flag_eligible_studies( - study_index: StudyIndexGWASCatalog, criteria: str - ) -> DataFrame: - """Apply filter on GWAS Catalog studies based on the provided criteria. - - Args: - study_index (StudyIndexGWASCatalog): complete study index to be filtered based on the provided filter set - criteria (str): name of the filter set to be applied. - - Raises: - ValueError: if the provided filter set is not in the accepted values. - - Returns: - DataFrame: filtered dataframe containing only eligible studies. - """ - filters: dict[str, Column] = { - # Filters applied on studies for ingesting curated associations: - "curation": (study_index.is_gwas() & study_index.has_mapped_trait()), - # Filters applied on studies for ingesting summary statistics: - "summary_stats": ( - study_index.is_gwas() - & study_index.has_mapped_trait() - & (~study_index.is_quality_flagged()) - & study_index.has_summarystats() - ), - } - - if criteria not in filters: - raise ValueError( - f'Wrong value as filter set ({criteria}). Accepted: {",".join(filters.keys())}' - ) - - # Applying the relevant filter to the study: - return study_index.df.select( - "studyId", - "studyType", - "traitFromSource", - "traitFromSourceMappedIds", - "qualityControls", - filters[criteria].alias("isEligible"), - ) - - @staticmethod - def process_harmonised_list(studies: list[str], session: Session) -> DataFrame: - """Generate spark dataframe from the provided list. - - Args: - studies (list[str]): list of path pointing to harmonised summary statistics. - session (Session): session - - Returns: - DataFrame: column name is consistent with original implementatin - """ - return session.spark.createDataFrame([{"_c0": path} for path in studies]) - - @staticmethod - def get_gwas_catalog_study_index( - session: Session, - gnomad_variant_path: str, - catalog_study_files: list[str], - catalog_ancestry_files: list[str], - catalog_associations_file: str, - gwas_catalog_study_curation_file: str, - ) -> StudyIndexGWASCatalog: - """Return GWAS Catalog study index. - - Args: - session (Session): Session object. - gnomad_variant_path (str): Path to GnomAD variant list. - catalog_study_files (list[str]): List of raw GWAS catalog studies file. - catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_associations_file (str): Raw GWAS catalog associations file. - gwas_catalog_study_curation_file (str): file of the curation table. Optional. - - Returns: - StudyIndexGWASCatalog: Completely processed and fully annotated study index. - """ - # Extract - gnomad_variants = VariantIndex.from_parquet(session, gnomad_variant_path) - catalog_studies = session.spark.read.csv( - list(catalog_study_files), sep="\t", header=True - ) - ancestry_lut = session.spark.read.csv( - list(catalog_ancestry_files), sep="\t", header=True - ) - catalog_associations = session.spark.read.csv( - catalog_associations_file, sep="\t", header=True - ).persist() - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) - - # Transform - study_index, _ = GWASCatalogStudySplitter.split( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, - ancestry_lut, - ).annotate_from_study_curation(gwas_catalog_study_curation), - GWASCatalogCuratedAssociationsParser.from_source( - catalog_associations, gnomad_variants - ), - ) - - return study_index - - def __init__( - self, - session: Session, - catalog_study_files: list[str], - catalog_ancestry_files: list[str], - catalog_associations_file: str, - gwas_catalog_study_curation_file: str, - gnomad_variant_path: str, - criteria: str, - inclusion_list_path: str, - exclusion_list_path: str, - ) -> None: - """Run step. - - Args: - session (Session): Session objecct. - catalog_study_files (list[str]): List of raw GWAS catalog studies file. - catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_associations_file (str): Raw GWAS catalog associations file. - gwas_catalog_study_curation_file (str): file of the curation table. Optional. - gnomad_variant_path (str): Path to GnomAD variant list. - criteria (str): name of the filter set to be applied. - inclusion_list_path (str): Output path for the inclusion list. - exclusion_list_path (str): Output path for the exclusion list. - """ - # Create study index: - study_index = self.get_gwas_catalog_study_index( - session, - gnomad_variant_path, - catalog_study_files, - catalog_ancestry_files, - catalog_associations_file, - gwas_catalog_study_curation_file, - ) - - # Get study indices for inclusion: - flagged_studies = self.flag_eligible_studies(study_index, criteria) - - # Output inclusion list: - eligible = ( - flagged_studies.filter(f.col("isEligible")).select("studyId").persist() - ) - eligible.write.mode(session.write_mode).parquet(inclusion_list_path) - - # Output exclusion list: - excluded = flagged_studies.filter(~f.col("isEligible")).persist() - excluded.write.mode(session.write_mode).parquet(exclusion_list_path) diff --git a/src/gentropy/gwas_catalog_study_index.py b/src/gentropy/gwas_catalog_study_index.py new file mode 100644 index 000000000..5a0f44b2e --- /dev/null +++ b/src/gentropy/gwas_catalog_study_index.py @@ -0,0 +1,90 @@ +"""Step to generate an GWAS Catalog study identifier inclusion and exclusion list.""" + +from __future__ import annotations + +from gentropy.common.session import Session +from gentropy.datasource.gwas_catalog.study_index import ( + StudyIndexGWASCatalogParser, +) +from gentropy.datasource.gwas_catalog.study_index_ot_curation import ( + StudyIndexGWASCatalogOTCuration, +) + + +class GWASCatalogStudyIndexGenerationStep: + """GWAS Catalog study index generation. + + This step generates a study index from the GWAS Catalog studies and ancestry files. It can also add additional curation information and summary statistics QC information when available. + + ''' warning + This step does not generate study index for gwas catalog top hits. + + This step provides several optional arguments to add additional information to the study index: + + - harmonised_studies_index_path: Path to text file containing the list of harmonised studies. If provided it populates the `hasSumstats` column in the study index. + - gwas_catalog_study_curation_file: csv file or URL containing the curation table. If provided it annotates the study index with the additional curation information performed by the Open Targets team. + - sumstats_qc_path: Path to the summary statistics QC table. If provided it annotates the study index with the summary statistics QC information in the `sumStatQCValues` and `sumStatQCPerformed` columns (e.g. `n_variants`, `n_variants_sig` etc.). + """ + + def __init__( + self, + session: Session, + catalog_study_files: list[str], + catalog_ancestry_files: list[str], + study_index_path: str, + harmonised_studies_index_path: str | None = None, + gwas_catalog_study_curation_file: str | None = None, + sumstats_qc_path: str | None = None, + ) -> None: + """Run step. + + Args: + session (Session): Session objecct. + catalog_study_files (list[str]): List of raw GWAS catalog studies file. + catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. + study_index_path (str): Output GWAS catalog studies path. + harmonised_studies_index_path (str | None): Path to text file containing the list of harmonised studies. Optional. + gwas_catalog_study_curation_file (str | None): csv file or URL containing the curation table. Optional. + sumstats_qc_path (str | None): Path to the summary statistics QC table. Optional. + + Raises: + ValueError: If the curation file is provided but not a CSV file or URL. + """ + # Core Study Index Generation: + study_index = StudyIndexGWASCatalogParser.from_source( + session.spark.read.csv(list(catalog_study_files), sep="\t", header=True), + session.spark.read.csv(list(catalog_ancestry_files), sep="\t", header=True), + ) + + # Annotate with curation if provided: + if gwas_catalog_study_curation_file: + if gwas_catalog_study_curation_file.endswith(".csv"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_csv( + session, gwas_catalog_study_curation_file + ) + elif gwas_catalog_study_curation_file.startswith("http"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_url( + session, gwas_catalog_study_curation_file + ) + else: + raise ValueError( + "Only CSV files or URLs are accepted as curation file." + ) + study_index = study_index.annotate_from_study_curation( + gwas_catalog_study_curation + ) + + # Annotate with has summary statistics if provided: + if harmonised_studies_index_path: + harmonised_studies = session.spark.read.csv( + harmonised_studies_index_path, header=False + ) + study_index = study_index.annotate_sumstats_info(harmonised_studies) + + # Annotate with sumstats QC if provided: + if sumstats_qc_path: + sumstats_qc = session.spark.read.parquet(sumstats_qc_path) + study_index = study_index.annotate_sumstats_qc(sumstats_qc) + + # Write the study + study_index.df.write.mode(session.write_mode).parquet(study_index_path) From fa062b27a7a4c248d6fb24be135c3927154f480d Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 3 Oct 2024 16:59:14 +0100 Subject: [PATCH 11/26] docs: fix path for documentation --- docs/python_api/steps/gwas_catalog_inclusion.md | 5 ----- docs/python_api/steps/gwas_catalog_study_index.md | 5 +++++ 2 files changed, 5 insertions(+), 5 deletions(-) delete mode 100644 docs/python_api/steps/gwas_catalog_inclusion.md create mode 100644 docs/python_api/steps/gwas_catalog_study_index.md diff --git a/docs/python_api/steps/gwas_catalog_inclusion.md b/docs/python_api/steps/gwas_catalog_inclusion.md deleted file mode 100644 index e9ede6dd6..000000000 --- a/docs/python_api/steps/gwas_catalog_inclusion.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: gwas_catalog_study_inclusion ---- - -::: gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator diff --git a/docs/python_api/steps/gwas_catalog_study_index.md b/docs/python_api/steps/gwas_catalog_study_index.md new file mode 100644 index 000000000..4984de2a0 --- /dev/null +++ b/docs/python_api/steps/gwas_catalog_study_index.md @@ -0,0 +1,5 @@ +--- +title: gwas_catalog_study_inclusion +--- + +::: gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep From 5023e3a1f5653e18dc7672e73cb5e612e391a4a5 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 3 Oct 2024 17:02:57 +0100 Subject: [PATCH 12/26] feat: remove GWASCatalogIngestionStep as it will no longer be necessary --- .../steps/gwas_catalog_ingestion.md | 5 - src/gentropy/config.py | 19 ---- src/gentropy/gwas_catalog_ingestion.py | 94 ------------------- 3 files changed, 118 deletions(-) delete mode 100644 docs/python_api/steps/gwas_catalog_ingestion.md delete mode 100644 src/gentropy/gwas_catalog_ingestion.py diff --git a/docs/python_api/steps/gwas_catalog_ingestion.md b/docs/python_api/steps/gwas_catalog_ingestion.md deleted file mode 100644 index 69ea92479..000000000 --- a/docs/python_api/steps/gwas_catalog_ingestion.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: gwas_catalog_ingestion ---- - -::: gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 898f40a18..5643636fd 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -86,22 +86,6 @@ class GWASCatalogStudyIndexGenerationStep(StepConfig): ) -@dataclass -class GWASCatalogIngestionConfig(StepConfig): - """GWAS Catalog ingestion step configuration.""" - - catalog_study_files: list[str] = MISSING - catalog_ancestry_files: list[str] = MISSING - catalog_sumstats_lut: str = MISSING - catalog_associations_file: str = MISSING - variant_annotation_path: str = MISSING - catalog_studies_out: str = MISSING - catalog_associations_out: str = MISSING - gwas_catalog_study_curation_file: str | None = None - inclusion_list_path: str | None = None - _target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep" - - @dataclass class GWASCatalogTopHitIngestionConfig(StepConfig): """GWAS Catalog ingestion step configuration.""" @@ -527,9 +511,6 @@ def register_config() -> None: name="gwas_catalog_study_inclusion", node=GWASCatalogStudyIndexGenerationStep, ) - cs.store( - group="step", name="gwas_catalog_ingestion", node=GWASCatalogIngestionConfig - ) cs.store( group="step", name="gwas_catalog_sumstat_preprocess", diff --git a/src/gentropy/gwas_catalog_ingestion.py b/src/gentropy/gwas_catalog_ingestion.py deleted file mode 100644 index 3f6877b6a..000000000 --- a/src/gentropy/gwas_catalog_ingestion.py +++ /dev/null @@ -1,94 +0,0 @@ -"""Step to process GWAS Catalog associations and study table.""" - -from __future__ import annotations - -from gentropy.common.session import Session -from gentropy.config import WindowBasedClumpingStepConfig -from gentropy.dataset.variant_index import VariantIndex -from gentropy.datasource.gwas_catalog.associations import ( - GWASCatalogCuratedAssociationsParser, -) -from gentropy.datasource.gwas_catalog.study_index import ( - StudyIndexGWASCatalogParser, - read_curation_table, -) -from gentropy.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter - - -class GWASCatalogIngestionStep: - """GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables. - - !!! note This step currently only processes the GWAS Catalog curated list of top hits. - """ - - def __init__( - self, - session: Session, - catalog_study_files: list[str], - catalog_ancestry_files: list[str], - catalog_sumstats_lut: str, - catalog_associations_file: str, - variant_annotation_path: str, - catalog_studies_out: str, - catalog_associations_out: str, - distance: int = WindowBasedClumpingStepConfig().distance, - gwas_catalog_study_curation_file: str | None = None, - inclusion_list_path: str | None = None, - ) -> None: - """Run step. - - Args: - session (Session): Session object. - catalog_study_files (list[str]): List of raw GWAS catalog studies file. - catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. - catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table. - catalog_associations_file (str): Raw GWAS catalog associations file. - variant_annotation_path (str): Path to GnomAD variants. - catalog_studies_out (str): Output GWAS catalog studies path. - catalog_associations_out (str): Output GWAS catalog associations path. - distance (int): Distance, within which tagging variants are collected around the semi-index. - gwas_catalog_study_curation_file (str | None): file of the curation table. Optional. - inclusion_list_path (str | None): optional inclusion list (parquet) - """ - # Extract - gnomad_variants = VariantIndex.from_parquet(session, variant_annotation_path) - catalog_studies = session.spark.read.csv( - list(catalog_study_files), sep="\t", header=True - ) - ancestry_lut = session.spark.read.csv( - list(catalog_ancestry_files), sep="\t", header=True - ) - catalog_associations = session.spark.read.csv( - catalog_associations_file, sep="\t", header=True - ).persist() - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) - - # Transform - study_index, study_locus = GWASCatalogStudySplitter.split( - StudyIndexGWASCatalogParser.from_source( - catalog_studies, ancestry_lut - ).annotate_from_study_curation(gwas_catalog_study_curation), - GWASCatalogCuratedAssociationsParser.from_source( - catalog_associations, gnomad_variants - ), - ) - - # if inclusion list is provided apply filter: - if inclusion_list_path: - inclusion_list = session.spark.read.parquet( - inclusion_list_path, sep="\t", header=True - ) - - study_index = study_index.apply_inclusion_list(inclusion_list) - study_locus = study_locus.apply_inclusion_list(inclusion_list) - - # Load - study_index.df.write.mode(session.write_mode).parquet(catalog_studies_out) - - ( - study_locus.window_based_clumping(distance) - .df.write.mode(session.write_mode) - .parquet(catalog_associations_out) - ) From 040ae97621f681e38b21d6ba28d20fd085568467 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 3 Oct 2024 17:11:36 +0100 Subject: [PATCH 13/26] fix: gwas catalog study curation step --- src/gentropy/gwas_catalog_study_curation.py | 24 +++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/gentropy/gwas_catalog_study_curation.py b/src/gentropy/gwas_catalog_study_curation.py index 5026850f3..530e03ea6 100644 --- a/src/gentropy/gwas_catalog_study_curation.py +++ b/src/gentropy/gwas_catalog_study_curation.py @@ -5,7 +5,9 @@ from gentropy.common.session import Session from gentropy.datasource.gwas_catalog.study_index import ( StudyIndexGWASCatalogParser, - read_curation_table, +) +from gentropy.datasource.gwas_catalog.study_index_ot_curation import ( + StudyIndexGWASCatalogOTCuration, ) @@ -28,6 +30,9 @@ def __init__( catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. gwas_catalog_study_curation_out (str): Path for the updated curation table. gwas_catalog_study_curation_file (str | None): Path to the original curation table. Optinal + + Raises: + ValueError: If the curation file is provided but not a CSV file or URL. """ catalog_studies = session.spark.read.csv( list(catalog_study_files), sep="\t", header=True @@ -35,9 +40,20 @@ def __init__( ancestry_lut = session.spark.read.csv( list(catalog_ancestry_files), sep="\t", header=True ) - gwas_catalog_study_curation = read_curation_table( - gwas_catalog_study_curation_file, session - ) + + if gwas_catalog_study_curation_file: + if gwas_catalog_study_curation_file.endswith(".csv"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_csv( + session, gwas_catalog_study_curation_file + ) + elif gwas_catalog_study_curation_file.startswith("http"): + gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_url( + session, gwas_catalog_study_curation_file + ) + else: + raise ValueError( + "Only CSV files or URLs are accepted as curation file." + ) # Process GWAS Catalog studies and get list of studies for curation: ( From b1a2f7024761da9a96ffd0a8e3f2c213a8338cbc Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 3 Oct 2024 17:40:30 +0100 Subject: [PATCH 14/26] refactor: rename step --- src/gentropy/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 5643636fd..a2f1974dd 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -508,7 +508,7 @@ def register_config() -> None: ) cs.store( group="step", - name="gwas_catalog_study_inclusion", + name="gwas_catalog_study_index", node=GWASCatalogStudyIndexGenerationStep, ) cs.store( From ca96aab17730af9d7cfb04ac936a3fe6b24f1355 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Fri, 4 Oct 2024 11:51:18 +0100 Subject: [PATCH 15/26] perf: repartition study locus before PICS to gain parallellisation --- src/gentropy/method/pics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentropy/method/pics.py b/src/gentropy/method/pics.py index 5fd084efd..fe0702ba5 100644 --- a/src/gentropy/method/pics.py +++ b/src/gentropy/method/pics.py @@ -227,7 +227,7 @@ def finemap( ) return StudyLocus( _df=( - associations.df + associations.df.repartition() # Old locus column will be dropped if available .select(*[col for col in associations.df.columns if col != "locus"]) # Estimate neglog_pvalue for the lead variant From 8c6190ddf453e9e0ef465bbf232d5162ee288bcd Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Fri, 4 Oct 2024 11:58:26 +0100 Subject: [PATCH 16/26] fix: incorrect partitioning --- src/gentropy/method/pics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentropy/method/pics.py b/src/gentropy/method/pics.py index fe0702ba5..f8786596a 100644 --- a/src/gentropy/method/pics.py +++ b/src/gentropy/method/pics.py @@ -227,7 +227,7 @@ def finemap( ) return StudyLocus( _df=( - associations.df.repartition() + associations.df.repartition("studyLocusId") # Old locus column will be dropped if available .select(*[col for col in associations.df.columns if col != "locus"]) # Estimate neglog_pvalue for the lead variant From e0966ea79c7b4e8cac63f895226a899f1a72733c Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Fri, 4 Oct 2024 12:36:48 +0100 Subject: [PATCH 17/26] revert: partitioning --- src/gentropy/method/pics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gentropy/method/pics.py b/src/gentropy/method/pics.py index f8786596a..5fd084efd 100644 --- a/src/gentropy/method/pics.py +++ b/src/gentropy/method/pics.py @@ -227,7 +227,7 @@ def finemap( ) return StudyLocus( _df=( - associations.df.repartition("studyLocusId") + associations.df # Old locus column will be dropped if available .select(*[col for col in associations.df.columns if col != "locus"]) # Estimate neglog_pvalue for the lead variant From 056376d8a21e78972f377b9e81840b015e1fa6ae Mon Sep 17 00:00:00 2001 From: vivienho <56025826+vivienho@users.noreply.github.com> Date: Tue, 15 Oct 2024 14:17:47 +0100 Subject: [PATCH 18/26] fix: drop duplicate rows after ingesting associations --- src/gentropy/datasource/gwas_catalog/associations.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/gentropy/datasource/gwas_catalog/associations.py b/src/gentropy/datasource/gwas_catalog/associations.py index be948302e..da2bcc6df 100644 --- a/src/gentropy/datasource/gwas_catalog/associations.py +++ b/src/gentropy/datasource/gwas_catalog/associations.py @@ -1106,7 +1106,10 @@ def from_source( pvalue_threshold is keeped in sync with the WindowBasedClumpingStep gwas_significance. """ return StudyLocusGWASCatalog( - _df=gwas_associations.withColumn( + _df=gwas_associations + # drop duplicate rows + .distinct() + .withColumn( "studyLocusId", f.monotonically_increasing_id().cast(StringType()) ) .transform( From afdc442a61dfeb72f41cb81c3037106b1c5caa7b Mon Sep 17 00:00:00 2001 From: Yakov Tsepilov Date: Wed, 16 Oct 2024 11:16:51 +0100 Subject: [PATCH 19/26] fix: fix in study index ingestion --- .../datasource/gwas_catalog/study_index.py | 41 +++++++++++-------- .../gwas_catalog/study_index_ot_curation.py | 3 ++ src/gentropy/gwas_catalog_study_index.py | 27 +++++++++--- 3 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index c96752b9a..716c27973 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -8,7 +8,7 @@ import pyspark.sql.functions as f import pyspark.sql.types as t -from gentropy.common.spark_helpers import column2camel_case +from gentropy.common.spark_helpers import column2camel_case, convert_from_wide_to_long from gentropy.common.utils import parse_efos from gentropy.dataset.study_index import StudyIndex @@ -638,36 +638,41 @@ def annotate_sumstats_qc( StudyIndexGWASCatalog: Updated study index with QC information """ # convert all columns in sumstats_qc dataframe in array of structs grouped by studyId - sumstats_qc_aggregated = ( - sumstats_qc.groupBy("studyId") + cols = [c for c in sumstats_qc.columns if c != "studyId"] + + melted_df = convert_from_wide_to_long( + sumstats_qc, + id_vars=["studyId"], + value_vars=cols, + var_name="QCCheckName", + value_name="QCCheckValue", + ) + qc_df = ( + melted_df.groupBy("studyId") .agg( f.collect_list( - f.struct( - *[ - f.struct( - f.lit(col).alias("QCCheckName"), - f.col(col).alias("QCCheckValue").cast(t.FloatType()), - ) - for col in sumstats_qc.columns - if col != "studyId" - ] - ) + f.struct(f.col("QCCheckName"), f.col("QCCheckValue")) ).alias("sumStatQCValues") ) .withColumn("sumStatQCPerformed", f.lit(True)) ) - # Annotate study index with QC information: - return StudyIndexGWASCatalog( - _df=self.df.drop("sumStatQCValues", "sumStatQCPerformed") - .join(sumstats_qc_aggregated, how="left", on="studyId") + df = ( + self.df.drop("sumStatQCValues", "sumStatQCPerformed") + .join(qc_df, how="left", on="studyId") .withColumn( "sumStatQCPerformed", f.coalesce(f.col("sumStatQCPerformed"), f.lit(False)), ) .withColumn( "sumStatQCValues", f.coalesce(f.col("sumStatQCValues"), f.array()) - ), + ) + ) + + df = df.filter(f.col("sumStatQCPerformed")) + # Annotate study index with QC information: + return StudyIndexGWASCatalog( + _df=df, _schema=StudyIndexGWASCatalog.get_schema(), ) diff --git a/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py index 79500d35c..8d75e4824 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py +++ b/src/gentropy/datasource/gwas_catalog/study_index_ot_curation.py @@ -32,6 +32,9 @@ def _parser(df: DataFrame) -> DataFrame: Returns: DataFrame: DataFrame with the parsed curation table. """ + if "qualityControl" not in df.columns: + # Add the 'qualityControl' column with null values + df = df.withColumn("qualityControl", f.lit(None).cast("string")) return df.select( "studyId", "studyType", diff --git a/src/gentropy/gwas_catalog_study_index.py b/src/gentropy/gwas_catalog_study_index.py index 5a0f44b2e..f4d3c406f 100644 --- a/src/gentropy/gwas_catalog_study_index.py +++ b/src/gentropy/gwas_catalog_study_index.py @@ -2,10 +2,10 @@ from __future__ import annotations +from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType + from gentropy.common.session import Session -from gentropy.datasource.gwas_catalog.study_index import ( - StudyIndexGWASCatalogParser, -) +from gentropy.datasource.gwas_catalog.study_index import StudyIndexGWASCatalogParser from gentropy.datasource.gwas_catalog.study_index_ot_curation import ( StudyIndexGWASCatalogOTCuration, ) @@ -58,7 +58,9 @@ def __init__( # Annotate with curation if provided: if gwas_catalog_study_curation_file: - if gwas_catalog_study_curation_file.endswith(".csv"): + if gwas_catalog_study_curation_file.endswith( + ".tsv" + ) | gwas_catalog_study_curation_file.endswith(".tsv"): gwas_catalog_study_curation = StudyIndexGWASCatalogOTCuration.from_csv( session, gwas_catalog_study_curation_file ) @@ -68,7 +70,7 @@ def __init__( ) else: raise ValueError( - "Only CSV files or URLs are accepted as curation file." + "Only CSV/TSV files or URLs are accepted as curation file." ) study_index = study_index.annotate_from_study_curation( gwas_catalog_study_curation @@ -83,7 +85,20 @@ def __init__( # Annotate with sumstats QC if provided: if sumstats_qc_path: - sumstats_qc = session.spark.read.parquet(sumstats_qc_path) + schema = StructType( + [ + StructField("studyId", StringType(), True), + StructField("mean_beta", DoubleType(), True), + StructField("mean_diff_pz", DoubleType(), True), + StructField("se_diff_pz", DoubleType(), True), + StructField("gc_lambda", DoubleType(), True), + StructField("n_variants", LongType(), True), + StructField("n_variants_sig", LongType(), True), + ] + ) + sumstats_qc = session.spark.read.schema(schema).parquet( + sumstats_qc_path, recursiveFileLookup=True + ) study_index = study_index.annotate_sumstats_qc(sumstats_qc) # Write the study From 0f8911d70af699e877523b6a0c2497b3c54a54fa Mon Sep 17 00:00:00 2001 From: Yakov Tsepilov Date: Wed, 16 Oct 2024 11:59:47 +0100 Subject: [PATCH 20/26] fix: v1 --- src/gentropy/datasource/gwas_catalog/study_index.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index 716c27973..ab99be6d1 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -655,10 +655,11 @@ def annotate_sumstats_qc( ).alias("sumStatQCValues") ) .withColumn("sumStatQCPerformed", f.lit(True)) + .withColumn("hasSumstats", f.lit(True)) ) df = ( - self.df.drop("sumStatQCValues", "sumStatQCPerformed") + self.df.drop("sumStatQCValues", "sumStatQCPerformed", "hasSumstats") .join(qc_df, how="left", on="studyId") .withColumn( "sumStatQCPerformed", From 97900f399532dea1dbbc72afc0cd3f7c5be14c1c Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Wed, 16 Oct 2024 19:58:54 +0100 Subject: [PATCH 21/26] feat: working study index with sumstats qc and curation --- src/gentropy/assets/schemas/study_index.json | 31 +--- src/gentropy/config.py | 1 - src/gentropy/dataset/study_index.py | 156 +++++++++++++++++- .../datasource/gwas_catalog/study_index.py | 149 ++++------------- src/gentropy/gwas_catalog_study_index.py | 22 +-- 5 files changed, 190 insertions(+), 169 deletions(-) diff --git a/src/gentropy/assets/schemas/study_index.json b/src/gentropy/assets/schemas/study_index.json index a2dac1bca..9c50d4a19 100644 --- a/src/gentropy/assets/schemas/study_index.json +++ b/src/gentropy/assets/schemas/study_index.json @@ -264,33 +264,12 @@ "metadata": {} }, { - "name": "sumStatQCPerformed", - "type": "boolean", - "nullable": true, - "metadata": {} - }, - { - "name": "sumStatQCValues", + "name": "sumstatQCValues", "type": { - "type": "array", - "elementType": { - "type": "struct", - "fields": [ - { - "name": "QCCheckName", - "type": "string", - "nullable": true, - "metadata": {} - }, - { - "name": "QCCheckValue", - "type": "float", - "nullable": true, - "metadata": {} - } - ] - }, - "containsNull": true + "type": "map", + "keyType": "string", + "valueType": "float", + "valueContainsNull": true }, "nullable": true, "metadata": {} diff --git a/src/gentropy/config.py b/src/gentropy/config.py index ed010134f..1ee42fdaf 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -81,7 +81,6 @@ class GWASCatalogStudyIndexGenerationStep(StepConfig): catalog_study_files: list[str] = MISSING catalog_ancestry_files: list[str] = MISSING study_index_path: str = MISSING - harmonised_studies_index_path: str | None = None gwas_catalog_study_curation_file: str | None = None sumstats_qc_path: str | None = None _target_: str = ( diff --git a/src/gentropy/dataset/study_index.py b/src/gentropy/dataset/study_index.py index e7023ee9b..92bdc61a4 100644 --- a/src/gentropy/dataset/study_index.py +++ b/src/gentropy/dataset/study_index.py @@ -13,6 +13,7 @@ from gentropy.assets import data from gentropy.common.schemas import parse_spark_schema +from gentropy.common.spark_helpers import convert_from_wide_to_long from gentropy.dataset.dataset import Dataset if TYPE_CHECKING: @@ -32,13 +33,29 @@ class StudyQualityCheck(Enum): UNKNOWN_STUDY_TYPE (str): Indicating the provided type of study is not supported. UNKNOWN_BIOSAMPLE (str): Flagging if a biosample identifier is not found in the reference. DUPLICATED_STUDY (str): Flagging if a study identifier is not unique. + SUMSTATS_NOT_AVAILABLE (str): Flagging if harmonized summary statistics are not available or empty. + NO_OT_CURATION (str): Flagging if a study has not been curated by Open Targets. + FAILED_MEAN_BETA_CHECK (str): Flagging if the mean beta QC check value is not within the expected range. + FAILED_PZ_CHECK (str): Flagging if the PZ QC check values are not within the expected range. + FAILED_GC_LAMBDA_CHECK (str): Flagging if the GC lambda value is not within the expected range. + SMALL_NUMBER_OF_SNPS (str): Flagging if the number of SNPs in the study is below the expected threshold. """ - UNRESOLVED_TARGET = "Target/gene identifier could not match to reference." - UNRESOLVED_DISEASE = "No valid disease identifier found." - UNKNOWN_STUDY_TYPE = "This type of study is not supported." - UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference." - DUPLICATED_STUDY = "The identifier of this study is not unique." + UNRESOLVED_TARGET = "Target/gene identifier could not match to reference" + UNRESOLVED_DISEASE = "No valid disease identifier found" + UNKNOWN_STUDY_TYPE = "This type of study is not supported" + UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference" + DUPLICATED_STUDY = "The identifier of this study is not unique" + SUMSTATS_NOT_AVAILABLE = "Harmonized summary statistics are not available or empty" + NO_OT_CURATION = "GWAS Catalog study has not been curated by Open Targets" + FAILED_MEAN_BETA_CHECK = ( + "The mean beta QC check value is not within the expected range" + ) + FAILED_PZ_CHECK = "The PZ QC check values are not within the expected range" + FAILED_GC_LAMBDA_CHECK = "The GC lambda value is not within the expected range" + SMALL_NUMBER_OF_SNPS = ( + "The number of SNPs in the study is below the expected threshold" + ) @dataclass @@ -410,7 +427,9 @@ def validate_target(self: StudyIndex, target_index: GeneIndex) -> StudyIndex: return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema()) - def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> StudyIndex: + def validate_biosample( + self: StudyIndex, biosample_index: BiosampleIndex + ) -> StudyIndex: """Validating biosample identifiers in the study index against the provided biosample index. Args: @@ -419,7 +438,9 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu Returns: StudyIndex: where non-gwas studies are flagged if biosampleIndex could not be validated. """ - biosample_set = biosample_index.df.select("biosampleId", f.lit(True).alias("isIdFound")) + biosample_set = biosample_index.df.select( + "biosampleId", f.lit(True).alias("isIdFound") + ) # If biosampleId in df, we need to drop it: if "biosampleId" in self.df.columns: @@ -430,7 +451,11 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu return self validated_df = ( - self.df.join(biosample_set, self.df.biosampleFromSourceId == biosample_set.biosampleId, how="left") + self.df.join( + biosample_set, + self.df.biosampleFromSourceId == biosample_set.biosampleId, + how="left", + ) .withColumn( "isIdFound", f.when( @@ -450,3 +475,118 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu ) return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema()) + + def annotate_sumstats_qc( + self: StudyIndex, + sumstats_qc: DataFrame, + threshold_mean_beta: float = 0.05, + threshold_mean_diff_pz: float = 0.05, + threshold_se_diff_pz: float = 0.05, + threshold_min_gc_lambda: float = 0.7, + threshold_max_gc_lambda: float = 2.5, + threshold_min_n_variants: int = 2_000_000, + ) -> StudyIndex: + """Annotate summary stats QC information. + + Args: + sumstats_qc (DataFrame): containing summary statistics-based quality controls. + threshold_mean_beta (float): Threshold for mean beta check. Defaults to 0.05. + threshold_mean_diff_pz (float): Threshold for mean diff PZ check. Defaults to 0.05. + threshold_se_diff_pz (float): Threshold for SE diff PZ check. Defaults to 0.05. + threshold_min_gc_lambda (float): Minimum threshold for GC lambda check. Defaults to 0.7. + threshold_max_gc_lambda (float): Maximum threshold for GC lambda check. Defaults to 2.5. + threshold_min_n_variants (int): Minimum number of variants for SuSiE check. Defaults to 2_000_000. + + Returns: + StudyIndex: Updated study index with QC information + """ + # convert all columns in sumstats_qc dataframe in array of structs grouped by studyId + cols = [c for c in sumstats_qc.columns if c != "studyId"] + + studies = self.df + + melted_df = convert_from_wide_to_long( + sumstats_qc, + id_vars=["studyId"], + value_vars=cols, + var_name="QCCheckName", + value_name="QCCheckValue", + ) + + qc_df = ( + melted_df.groupBy("studyId") + .agg( + f.map_from_entries( + f.collect_list( + f.struct(f.col("QCCheckName"), f.col("QCCheckValue")) + ) + ).alias("sumStatQCValues") + ) + .select("studyId", "sumstatQCValues") + ) + + df = ( + studies.drop("sumStatQCValues", "hasSumstats") + .join( + qc_df.withColumn("hasSumstats", f.lit(True)), how="left", on="studyId" + ) + .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~f.col("hasSumstats"), + StudyQualityCheck.SUMSTATS_NOT_AVAILABLE, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~(f.abs(f.col("sumstatQCValues.mean_beta")) <= threshold_mean_beta), + StudyQualityCheck.FAILED_MEAN_BETA_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~( + ( + f.abs(f.col("sumstatQCValues.mean_diff_pz")) + <= threshold_mean_diff_pz + ) + & (f.col("sumstatQCValues.se_diff_pz") <= threshold_se_diff_pz) + ), + StudyQualityCheck.FAILED_PZ_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~( + (f.col("sumstatQCValues.gc_lambda") <= threshold_max_gc_lambda) + & ( + f.col("sumstatQCValues.gc_lambda") + >= threshold_min_gc_lambda + ) + ), + StudyQualityCheck.FAILED_GC_LAMBDA_CHECK, + ), + ) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + (f.col("sumstatQCValues.n_variants") < threshold_min_n_variants), + StudyQualityCheck.SMALL_NUMBER_OF_SNPS, + ), + ) + ) + + # Annotate study index with QC information: + return StudyIndex( + _df=df, + _schema=StudyIndex.get_schema(), + ) diff --git a/src/gentropy/datasource/gwas_catalog/study_index.py b/src/gentropy/datasource/gwas_catalog/study_index.py index ab99be6d1..421f53d0f 100644 --- a/src/gentropy/datasource/gwas_catalog/study_index.py +++ b/src/gentropy/datasource/gwas_catalog/study_index.py @@ -8,9 +8,9 @@ import pyspark.sql.functions as f import pyspark.sql.types as t -from gentropy.common.spark_helpers import column2camel_case, convert_from_wide_to_long +from gentropy.common.spark_helpers import column2camel_case from gentropy.common.utils import parse_efos -from gentropy.dataset.study_index import StudyIndex +from gentropy.dataset.study_index import StudyIndex, StudyQualityCheck if TYPE_CHECKING: from pyspark.sql import Column, DataFrame @@ -354,7 +354,13 @@ def annotate_from_study_curation( if curation_table is None: return self - columns = self.df.columns + studies = self.df + + if "qualityControls" not in studies.columns: + studies = studies.withColumn("qualityControls", f.array()) + + if "analysisFlags" not in studies.columns: + studies = studies.withColumn("analysisFlags", f.array()) # Adding prefix to columns in the curation table: curation_table = curation_table.select( @@ -366,46 +372,34 @@ def annotate_from_study_curation( ] ) - # Create expression how to update/create quality controls dataset: - qualityControls_expression = ( - f.col("curation_qualityControls") - if "qualityControls" not in columns - else f.when( - f.col("curation_qualityControls").isNotNull(), - f.array_union( - f.col("qualityControls"), f.array(f.col("curation_qualityControls")) - ), - ).otherwise(f.col("qualityControls")) - ) - - # Create expression how to update/create analysis flag: - analysis_expression = ( - f.col("curation_analysisFlags") - if "analysisFlags" not in columns - else f.when( - f.col("curation_analysisFlags").isNotNull(), - f.array_union( - f.col("analysisFlags"), f.array(f.col("curation_analysisFlags")) - ), - ).otherwise(f.col("analysisFlags")) - ) - - # Updating columns list. We might or might not list columns twice, but that doesn't matter, unique set will generated: - columns = list(set(columns + ["qualityControls", "analysisFlags"])) - # Based on the curation table, columns needs to be updated: curated_df = ( - self.df.join(curation_table, on="studyId", how="left") + studies.join( + curation_table.withColumn("isCurated", f.lit(True)), + on="studyId", + how="left", + ) + .withColumn("isCurated", f.coalesce(f.col("isCurated"), f.lit(False))) # Updating study type: .withColumn( "studyType", f.coalesce(f.col("curation_studyType"), f.col("studyType")) ) - # Updating quality controls: - .withColumn("qualityControls", qualityControls_expression) # Updating study annotation flags: - .withColumn("analysisFlags", analysis_expression) + .withColumn( + "analysisFlags", + f.array_union(f.col("analysisFlags"), f.col("curation_analysisFlags")), + ) + .withColumn("analysisFlags", f.coalesce(f.col("analysisFlags"), f.array())) + .withColumn( + "qualityControls", + StudyIndex.update_quality_flag( + f.col("qualityControls"), + ~f.col("isCurated"), + StudyQualityCheck.NO_OT_CURATION, + ), + ) # Dropping columns coming from the curation table: - .select(*columns) + .select(*studies.columns) ) return StudyIndexGWASCatalog( _df=curated_df, _schema=StudyIndexGWASCatalog.get_schema() @@ -592,91 +586,6 @@ def annotate_ancestries( self.df = self.df.join(parsed_ancestry_lut, on="studyId", how="left") return self - def annotate_sumstats_info( - self: StudyIndexGWASCatalog, sumstats_lut: DataFrame - ) -> StudyIndexGWASCatalog: - """Annotate summary stat locations. - - This function reads a text file with the list of harmonised studies and annotates the study index with the `hasSumstats` column. - - Args: - sumstats_lut (DataFrame): listing GWAS Catalog summary stats paths - - Returns: - StudyIndexGWASCatalog: including `hasSumstats` column - - Raises: - ValueError: if the sumstats_lut table doesn't have the right columns - """ - if "_c0" not in sumstats_lut.columns: - raise ValueError( - f'Sumstats look-up table needs to have `_c0` column. However it has: {",".join(sumstats_lut.columns)}' - ) - - return StudyIndexGWASCatalog( - _df=self.df.drop("hasSumstats") - .join( - sumstats_lut.select( - f.col("_c0").alias("studyId"), f.lit(True).alias("hasSumstats") - ), - on="studyId", - how="left", - ) - .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False))), - _schema=StudyIndexGWASCatalog.get_schema(), - ) - - def annotate_sumstats_qc( - self: StudyIndexGWASCatalog, sumstats_qc: DataFrame - ) -> StudyIndexGWASCatalog: - """Annotate summary stats QC information. - - Args: - sumstats_qc (DataFrame): containing summary statistics-based quality controls. - - Returns: - StudyIndexGWASCatalog: Updated study index with QC information - """ - # convert all columns in sumstats_qc dataframe in array of structs grouped by studyId - cols = [c for c in sumstats_qc.columns if c != "studyId"] - - melted_df = convert_from_wide_to_long( - sumstats_qc, - id_vars=["studyId"], - value_vars=cols, - var_name="QCCheckName", - value_name="QCCheckValue", - ) - qc_df = ( - melted_df.groupBy("studyId") - .agg( - f.collect_list( - f.struct(f.col("QCCheckName"), f.col("QCCheckValue")) - ).alias("sumStatQCValues") - ) - .withColumn("sumStatQCPerformed", f.lit(True)) - .withColumn("hasSumstats", f.lit(True)) - ) - - df = ( - self.df.drop("sumStatQCValues", "sumStatQCPerformed", "hasSumstats") - .join(qc_df, how="left", on="studyId") - .withColumn( - "sumStatQCPerformed", - f.coalesce(f.col("sumStatQCPerformed"), f.lit(False)), - ) - .withColumn( - "sumStatQCValues", f.coalesce(f.col("sumStatQCValues"), f.array()) - ) - ) - - df = df.filter(f.col("sumStatQCPerformed")) - # Annotate study index with QC information: - return StudyIndexGWASCatalog( - _df=df, - _schema=StudyIndexGWASCatalog.get_schema(), - ) - def annotate_discovery_sample_sizes( self: StudyIndexGWASCatalog, ) -> StudyIndexGWASCatalog: diff --git a/src/gentropy/gwas_catalog_study_index.py b/src/gentropy/gwas_catalog_study_index.py index f4d3c406f..6c19b6909 100644 --- a/src/gentropy/gwas_catalog_study_index.py +++ b/src/gentropy/gwas_catalog_study_index.py @@ -21,9 +21,8 @@ class GWASCatalogStudyIndexGenerationStep: This step provides several optional arguments to add additional information to the study index: - - harmonised_studies_index_path: Path to text file containing the list of harmonised studies. If provided it populates the `hasSumstats` column in the study index. - gwas_catalog_study_curation_file: csv file or URL containing the curation table. If provided it annotates the study index with the additional curation information performed by the Open Targets team. - - sumstats_qc_path: Path to the summary statistics QC table. If provided it annotates the study index with the summary statistics QC information in the `sumStatQCValues` and `sumStatQCPerformed` columns (e.g. `n_variants`, `n_variants_sig` etc.). + - sumstats_qc_path: Path to the summary statistics QC table. If provided it annotates the study index with the summary statistics QC information in the `sumStatQCValues` columns (e.g. `n_variants`, `n_variants_sig` etc.). """ def __init__( @@ -32,7 +31,6 @@ def __init__( catalog_study_files: list[str], catalog_ancestry_files: list[str], study_index_path: str, - harmonised_studies_index_path: str | None = None, gwas_catalog_study_curation_file: str | None = None, sumstats_qc_path: str | None = None, ) -> None: @@ -43,7 +41,6 @@ def __init__( catalog_study_files (list[str]): List of raw GWAS catalog studies file. catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog. study_index_path (str): Output GWAS catalog studies path. - harmonised_studies_index_path (str | None): Path to text file containing the list of harmonised studies. Optional. gwas_catalog_study_curation_file (str | None): csv file or URL containing the curation table. Optional. sumstats_qc_path (str | None): Path to the summary statistics QC table. Optional. @@ -76,13 +73,6 @@ def __init__( gwas_catalog_study_curation ) - # Annotate with has summary statistics if provided: - if harmonised_studies_index_path: - harmonised_studies = session.spark.read.csv( - harmonised_studies_index_path, header=False - ) - study_index = study_index.annotate_sumstats_info(harmonised_studies) - # Annotate with sumstats QC if provided: if sumstats_qc_path: schema = StructType( @@ -99,7 +89,11 @@ def __init__( sumstats_qc = session.spark.read.schema(schema).parquet( sumstats_qc_path, recursiveFileLookup=True ) - study_index = study_index.annotate_sumstats_qc(sumstats_qc) + study_index_with_qc = study_index.annotate_sumstats_qc(sumstats_qc) - # Write the study - study_index.df.write.mode(session.write_mode).parquet(study_index_path) + # Write the study + study_index_with_qc.df.write.mode(session.write_mode).parquet( + study_index_path + ) + else: + study_index.df.write.mode(session.write_mode).parquet(study_index_path) From 5fda7ec4c655facba5589493bdd969c92d6130a2 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Wed, 16 Oct 2024 20:06:44 +0100 Subject: [PATCH 22/26] test: deprecate obsoleted testt --- .../test_gwas_catalog_study_index.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py index 62b750b9f..b3ff4e486 100644 --- a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py +++ b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_study_index.py @@ -31,22 +31,6 @@ def test_parse_study_table(sample_gwas_catalog_studies: DataFrame) -> None: ) -def test_annotate_sumstats( - mock_study_index_gwas_catalog: StudyIndexGWASCatalog, - sample_gwas_catalog_harmonised_sumstats_list: DataFrame, -) -> None: - """Test annotate sumstats of GWASCatalogStudyIndex.""" - mock_study_index_gwas_catalog.df = mock_study_index_gwas_catalog.df.drop( - "summarystatsLocation" - ) - assert isinstance( - mock_study_index_gwas_catalog.annotate_sumstats_info( - sample_gwas_catalog_harmonised_sumstats_list - ), - StudyIndexGWASCatalog, - ) - - def test_study_index_from_source( sample_gwas_catalog_studies: DataFrame, sample_gwas_catalog_ancestries_lut: DataFrame, From 8ba7f7d3159d33de515e144100595462d7a4027f Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 17 Oct 2024 09:51:06 +0100 Subject: [PATCH 23/26] test: remove colon causing tests to fail --- tests/gentropy/dataset/test_dataset_exclusion.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/gentropy/dataset/test_dataset_exclusion.py b/tests/gentropy/dataset/test_dataset_exclusion.py index 329a0a1d5..1b6fce967 100644 --- a/tests/gentropy/dataset/test_dataset_exclusion.py +++ b/tests/gentropy/dataset/test_dataset_exclusion.py @@ -24,11 +24,11 @@ class TestDataExclusion: # Good study no flag: ("S1", None), # Good study permissive flag: - ("S2", "This type of study is not supported."), - ("S2", "No valid disease identifier found."), + ("S2", "This type of study is not supported"), + ("S2", "No valid disease identifier found"), # Bad study: - ("S3", "The identifier of this study is not unique."), - ("S3", "This type of study is not supported."), + ("S3", "The identifier of this study is not unique"), + ("S3", "This type of study is not supported"), ] @pytest.fixture(autouse=True) From 50a07249da6b8cb47406e6af8a6ddf0106fef940 Mon Sep 17 00:00:00 2001 From: David Ochoa Date: Thu, 17 Oct 2024 10:14:49 +0100 Subject: [PATCH 24/26] test: curation quality controls no longer --- .../test_gwas_catalog_curation.py | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py index 4163531cb..0dd1a7363 100644 --- a/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py +++ b/tests/gentropy/datasource/gwas_catalog/test_gwas_catalog_curation.py @@ -143,39 +143,6 @@ def test_curation__study_type_update( assert expected == observed - # Test update qc flag - @staticmethod - def test_curation__quality_controls( - mock_gwas_study_index: StudyIndexGWASCatalog, mock_study_curation: DataFrame - ) -> None: - """Test for making sure the study type got updated.""" - curated = mock_gwas_study_index.annotate_from_study_curation( - mock_study_curation - ) - - # Expected studyIds: - expected = [ - row["studyId"] - for row in ( - mock_study_curation.filter(f.col("qualityControls").isNotNull()) - .select("studyId") - .distinct() - .collect() - ) - ] - - observed = [ - row["studyId"] - for row in ( - curated.df.filter(f.size(f.col("qualityControls")) > 0) - .select("studyId") - .distinct() - .collect() - ) - ] - - assert expected == observed - # Test updated method flag @staticmethod def test_curation__analysis_flags( From 1bbc5d9f70dcbd1383c0ae5da9f29d90ab605f0b Mon Sep 17 00:00:00 2001 From: Yakov Tsepilov Date: Thu, 17 Oct 2024 23:29:47 +0100 Subject: [PATCH 25/26] fix: changing mapping for ancestries adding CSA --- .../data/gwas_population_2_LD_panel_map.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json b/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json index fa00089c6..4cc7dcb3b 100644 --- a/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json +++ b/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json @@ -2,19 +2,19 @@ "European": "nfe", "African American or Afro-Caribbean": "afr", "Native American": "amr", - "Asian unspecified": "eas", + "Asian unspecified": "csa", "Hispanic or Latin American": "amr", "East Asian": "eas", - "Central Asian": "eas", - "Oceanian": "eas", - "South East Asian": "eas", + "Central Asian": "csa", + "Oceanian": "csa", + "South East Asian": "csa", "Other admixed ancestry": "nfe", "African unspecified": "afr", "Sub-Saharan African": "afr", - "Greater Middle Eastern (Middle Eastern, North African or Persian)": "eas", - "Aboriginal Australian": "eas", + "Greater Middle Eastern (Middle Eastern, North African or Persian)": "csa", + "Aboriginal Australian": "csa", "Other": "nfe", - "South Asian": "eas", + "South Asian": "csa", "NR": "nfe", "Finnish": "fin" } From b874c2e2da5b09975662a0fff8f4ab4df43be813 Mon Sep 17 00:00:00 2001 From: Yakov Tsepilov Date: Fri, 18 Oct 2024 12:12:55 +0100 Subject: [PATCH 26/26] fix: revert changes in mapping --- .../data/gwas_population_2_LD_panel_map.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json b/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json index 4cc7dcb3b..fa00089c6 100644 --- a/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json +++ b/src/gentropy/assets/data/gwas_population_2_LD_panel_map.json @@ -2,19 +2,19 @@ "European": "nfe", "African American or Afro-Caribbean": "afr", "Native American": "amr", - "Asian unspecified": "csa", + "Asian unspecified": "eas", "Hispanic or Latin American": "amr", "East Asian": "eas", - "Central Asian": "csa", - "Oceanian": "csa", - "South East Asian": "csa", + "Central Asian": "eas", + "Oceanian": "eas", + "South East Asian": "eas", "Other admixed ancestry": "nfe", "African unspecified": "afr", "Sub-Saharan African": "afr", - "Greater Middle Eastern (Middle Eastern, North African or Persian)": "csa", - "Aboriginal Australian": "csa", + "Greater Middle Eastern (Middle Eastern, North African or Persian)": "eas", + "Aboriginal Australian": "eas", "Other": "nfe", - "South Asian": "csa", + "South Asian": "eas", "NR": "nfe", "Finnish": "fin" }