Skip to content

Commit

Permalink
feat: gwas catalog top-hit + study step (#808)
Browse files Browse the repository at this point in the history
* fix: wrong step parameter

* fix: persist va_subset

* fix: remove broadcasts

* feat: new gwas_catalog_top_hits step

* docs: new step added to documentation

* fix: incorrect target

* fix: failing tests

* fix: extra argument

* fix: select does not require hasSumstats anymore

* feat: study inclusion step repurposed into study index step

* docs: fix path for documentation

* feat: remove GWASCatalogIngestionStep as it will no longer be necessary

* fix: gwas catalog study curation step

* refactor: rename step

* perf: repartition study locus before PICS to gain parallelisation

* fix: incorrect partitioning

* revert: partitioning

* fix: drop duplicate rows after ingesting associations

* fix: fix in study index ingestion

* fix: v1

* feat: working study index with sumstats qc and curation

* test: deprecate obsoleted test

* test: remove colon causing tests to fail

* test: curation quality controls no longer

* fix: changing mapping for ancestries adding CSA

* fix: revert changes in mapping

---------

Co-authored-by: Daniel Suveges <daniel.suveges@protonmail.com>
Co-authored-by: Vivien Ho <56025826+vivienho@users.noreply.github.com>
Co-authored-by: Yakov <yt4@sanger.ac.uk>
Co-authored-by: Szymon Szyszkowski <69353402+project-defiant@users.noreply.github.com>
  • Loading branch information
5 people authored Oct 22, 2024
1 parent 782a458 commit df220e9
Show file tree
Hide file tree
Showing 17 changed files with 436 additions and 488 deletions.
5 changes: 0 additions & 5 deletions docs/python_api/steps/gwas_catalog_inclusion.md

This file was deleted.

5 changes: 0 additions & 5 deletions docs/python_api/steps/gwas_catalog_ingestion.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/python_api/steps/gwas_catalog_study_index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: gwas_catalog_study_index
---

::: gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep
5 changes: 5 additions & 0 deletions docs/python_api/steps/gwas_catalog_top_hits.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: GWAS Catalog Top Hits Ingestion Step
---

::: gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep
31 changes: 5 additions & 26 deletions src/gentropy/assets/schemas/study_index.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,33 +264,12 @@
"metadata": {}
},
{
"name": "sumStatQCPerformed",
"type": "boolean",
"nullable": true,
"metadata": {}
},
{
"name": "sumStatQCValues",
"name": "sumstatQCValues",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "QCCheckName",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "QCCheckValue",
"type": "float",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
"type": "map",
"keyType": "string",
"valueType": "float",
"valueContainsNull": true
},
"nullable": true,
"metadata": {}
Expand Down
36 changes: 15 additions & 21 deletions src/gentropy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,44 +68,36 @@ class GWASCatalogStudyCurationConfig(StepConfig):

catalog_study_files: list[str] = MISSING
catalog_ancestry_files: list[str] = MISSING
catalog_sumstats_lut: str = MISSING
gwas_catalog_study_curation_out: str = MISSING
gwas_catalog_study_curation_file: str = MISSING
_target_: str = "gentropy.gwas_catalog_study_curation.GWASCatalogStudyCurationStep"


@dataclass
class GWASCatalogStudyInclusionConfig(StepConfig):
"""GWAS Catalog study inclusion step configuration."""
class GWASCatalogStudyIndexGenerationStep(StepConfig):
"""GWAS Catalog study index generation."""

catalog_study_files: list[str] = MISSING
catalog_ancestry_files: list[str] = MISSING
catalog_associations_file: str = MISSING
gwas_catalog_study_curation_file: str = MISSING
variant_annotation_path: str = MISSING
harmonised_study_file: str = MISSING
criteria: str = MISSING
inclusion_list_path: str = MISSING
exclusion_list_path: str = MISSING
study_index_path: str = MISSING
gwas_catalog_study_curation_file: str | None = None
sumstats_qc_path: str | None = None
_target_: str = (
"gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator"
"gentropy.gwas_catalog_study_index.GWASCatalogStudyIndexGenerationStep"
)


@dataclass
class GWASCatalogIngestionConfig(StepConfig):
class GWASCatalogTopHitIngestionConfig(StepConfig):
"""GWAS Catalog ingestion step configuration."""

catalog_study_files: list[str] = MISSING
catalog_ancestry_files: list[str] = MISSING
catalog_sumstats_lut: str = MISSING
catalog_associations_file: str = MISSING
variant_annotation_path: str = MISSING
catalog_studies_out: str = MISSING
catalog_associations_out: str = MISSING
gwas_catalog_study_curation_file: str | None = None
inclusion_list_path: str | None = None
_target_: str = "gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep"
_target_: str = "gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep"


@dataclass
Expand Down Expand Up @@ -658,17 +650,19 @@ def register_config() -> None:
)
cs.store(
group="step",
name="gwas_catalog_study_inclusion",
node=GWASCatalogStudyInclusionConfig,
)
cs.store(
group="step", name="gwas_catalog_ingestion", node=GWASCatalogIngestionConfig
name="gwas_catalog_study_index",
node=GWASCatalogStudyIndexGenerationStep,
)
cs.store(
group="step",
name="gwas_catalog_sumstat_preprocess",
node=GWASCatalogSumstatsPreprocessConfig,
)
cs.store(
group="step",
name="gwas_catalog_top_hit_ingestion",
node=GWASCatalogTopHitIngestionConfig,
)
cs.store(group="step", name="ld_based_clumping", node=LDBasedClumpingConfig)
cs.store(group="step", name="ld_index", node=LDIndexConfig)
cs.store(group="step", name="locus_to_gene", node=LocusToGeneConfig)
Expand Down
156 changes: 148 additions & 8 deletions src/gentropy/dataset/study_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from gentropy.assets import data
from gentropy.common.schemas import parse_spark_schema
from gentropy.common.spark_helpers import convert_from_wide_to_long
from gentropy.dataset.dataset import Dataset

if TYPE_CHECKING:
Expand All @@ -32,13 +33,29 @@ class StudyQualityCheck(Enum):
UNKNOWN_STUDY_TYPE (str): Indicating the provided type of study is not supported.
UNKNOWN_BIOSAMPLE (str): Flagging if a biosample identifier is not found in the reference.
DUPLICATED_STUDY (str): Flagging if a study identifier is not unique.
SUMSTATS_NOT_AVAILABLE (str): Flagging if harmonized summary statistics are not available or empty.
NO_OT_CURATION (str): Flagging if a study has not been curated by Open Targets.
FAILED_MEAN_BETA_CHECK (str): Flagging if the mean beta QC check value is not within the expected range.
FAILED_PZ_CHECK (str): Flagging if the PZ QC check values are not within the expected range.
FAILED_GC_LAMBDA_CHECK (str): Flagging if the GC lambda value is not within the expected range.
SMALL_NUMBER_OF_SNPS (str): Flagging if the number of SNPs in the study is below the expected threshold.
"""

UNRESOLVED_TARGET = "Target/gene identifier could not match to reference."
UNRESOLVED_DISEASE = "No valid disease identifier found."
UNKNOWN_STUDY_TYPE = "This type of study is not supported."
UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference."
DUPLICATED_STUDY = "The identifier of this study is not unique."
UNRESOLVED_TARGET = "Target/gene identifier could not match to reference"
UNRESOLVED_DISEASE = "No valid disease identifier found"
UNKNOWN_STUDY_TYPE = "This type of study is not supported"
UNKNOWN_BIOSAMPLE = "Biosample identifier was not found in the reference"
DUPLICATED_STUDY = "The identifier of this study is not unique"
SUMSTATS_NOT_AVAILABLE = "Harmonized summary statistics are not available or empty"
NO_OT_CURATION = "GWAS Catalog study has not been curated by Open Targets"
FAILED_MEAN_BETA_CHECK = (
"The mean beta QC check value is not within the expected range"
)
FAILED_PZ_CHECK = "The PZ QC check values are not within the expected range"
FAILED_GC_LAMBDA_CHECK = "The GC lambda value is not within the expected range"
SMALL_NUMBER_OF_SNPS = (
"The number of SNPs in the study is below the expected threshold"
)


@dataclass
Expand Down Expand Up @@ -410,7 +427,9 @@ def validate_target(self: StudyIndex, target_index: GeneIndex) -> StudyIndex:

return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema())

def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> StudyIndex:
def validate_biosample(
self: StudyIndex, biosample_index: BiosampleIndex
) -> StudyIndex:
"""Validating biosample identifiers in the study index against the provided biosample index.
Args:
Expand All @@ -419,7 +438,9 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu
Returns:
StudyIndex: where non-gwas studies are flagged if biosampleIndex could not be validated.
"""
biosample_set = biosample_index.df.select("biosampleId", f.lit(True).alias("isIdFound"))
biosample_set = biosample_index.df.select(
"biosampleId", f.lit(True).alias("isIdFound")
)

# If biosampleId in df, we need to drop it:
if "biosampleId" in self.df.columns:
Expand All @@ -430,7 +451,11 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu
return self

validated_df = (
self.df.join(biosample_set, self.df.biosampleFromSourceId == biosample_set.biosampleId, how="left")
self.df.join(
biosample_set,
self.df.biosampleFromSourceId == biosample_set.biosampleId,
how="left",
)
.withColumn(
"isIdFound",
f.when(
Expand All @@ -450,3 +475,118 @@ def validate_biosample(self: StudyIndex, biosample_index: BiosampleIndex) -> Stu
)

return StudyIndex(_df=validated_df, _schema=StudyIndex.get_schema())

def annotate_sumstats_qc(
    self: StudyIndex,
    sumstats_qc: DataFrame,
    threshold_mean_beta: float = 0.05,
    threshold_mean_diff_pz: float = 0.05,
    threshold_se_diff_pz: float = 0.05,
    threshold_min_gc_lambda: float = 0.7,
    threshold_max_gc_lambda: float = 2.5,
    threshold_min_n_variants: int = 2_000_000,
) -> StudyIndex:
    """Annotate summary stats QC information and flag studies failing the checks.

    The wide `sumstats_qc` table (one column per QC metric) is melted and
    collapsed into a single ``sumstatQCValues`` map column per study. Studies
    missing from the QC table are flagged as having no summary statistics;
    studies present are additionally flagged for each failed threshold check.

    Args:
        sumstats_qc (DataFrame): containing summary statistics-based quality controls.
        threshold_mean_beta (float): Threshold for mean beta check. Defaults to 0.05.
        threshold_mean_diff_pz (float): Threshold for mean diff PZ check. Defaults to 0.05.
        threshold_se_diff_pz (float): Threshold for SE diff PZ check. Defaults to 0.05.
        threshold_min_gc_lambda (float): Minimum threshold for GC lambda check. Defaults to 0.7.
        threshold_max_gc_lambda (float): Maximum threshold for GC lambda check. Defaults to 2.5.
        threshold_min_n_variants (int): Minimum number of variants for SuSiE check. Defaults to 2_000_000.

    Returns:
        StudyIndex: Updated study index with QC information
    """
    # Melt the wide QC table into (studyId, QCCheckName, QCCheckValue) rows:
    qc_metric_columns = [c for c in sumstats_qc.columns if c != "studyId"]

    melted_df = convert_from_wide_to_long(
        sumstats_qc,
        id_vars=["studyId"],
        value_vars=qc_metric_columns,
        var_name="QCCheckName",
        value_name="QCCheckValue",
    )

    # Collapse all QC checks of a study into one map<string, float> column.
    # NOTE: the alias must match the `sumstatQCValues` column declared in the
    # study index schema exactly (including case), so column resolution does not
    # rely on Spark's default case-insensitive behaviour.
    qc_df = melted_df.groupBy("studyId").agg(
        f.map_from_entries(
            f.collect_list(f.struct(f.col("QCCheckName"), f.col("QCCheckValue")))
        ).alias("sumstatQCValues")
    )

    df = (
        # Drop any stale QC annotation before joining the fresh one:
        self.df.drop("sumstatQCValues", "hasSumstats")
        .join(
            qc_df.withColumn("hasSumstats", f.lit(True)), how="left", on="studyId"
        )
        # Studies absent from the QC table have no (or empty) summary statistics:
        .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False)))
        .withColumn(
            "qualityControls",
            StudyIndex.update_quality_flag(
                f.col("qualityControls"),
                ~f.col("hasSumstats"),
                StudyQualityCheck.SUMSTATS_NOT_AVAILABLE,
            ),
        )
        # Flag studies whose mean beta deviates too far from zero.
        # The negated `~(value <= threshold)` form also flags null metrics,
        # since the inner comparison is null and `~null` is not true... but the
        # flagging condition evaluating to null leaves the flag unset — only
        # definite failures are flagged.
        .withColumn(
            "qualityControls",
            StudyIndex.update_quality_flag(
                f.col("qualityControls"),
                ~(f.abs(f.col("sumstatQCValues.mean_beta")) <= threshold_mean_beta),
                StudyQualityCheck.FAILED_MEAN_BETA_CHECK,
            ),
        )
        # Flag studies whose P-Z consistency checks fall outside tolerance:
        .withColumn(
            "qualityControls",
            StudyIndex.update_quality_flag(
                f.col("qualityControls"),
                ~(
                    (
                        f.abs(f.col("sumstatQCValues.mean_diff_pz"))
                        <= threshold_mean_diff_pz
                    )
                    & (f.col("sumstatQCValues.se_diff_pz") <= threshold_se_diff_pz)
                ),
                StudyQualityCheck.FAILED_PZ_CHECK,
            ),
        )
        # Flag studies with genomic-control lambda outside the accepted band:
        .withColumn(
            "qualityControls",
            StudyIndex.update_quality_flag(
                f.col("qualityControls"),
                ~(
                    (f.col("sumstatQCValues.gc_lambda") <= threshold_max_gc_lambda)
                    & (f.col("sumstatQCValues.gc_lambda") >= threshold_min_gc_lambda)
                ),
                StudyQualityCheck.FAILED_GC_LAMBDA_CHECK,
            ),
        )
        # Flag studies with too few variants for downstream fine-mapping:
        .withColumn(
            "qualityControls",
            StudyIndex.update_quality_flag(
                f.col("qualityControls"),
                (f.col("sumstatQCValues.n_variants") < threshold_min_n_variants),
                StudyQualityCheck.SMALL_NUMBER_OF_SNPS,
            ),
        )
    )

    # Annotate study index with QC information:
    return StudyIndex(
        _df=df,
        _schema=StudyIndex.get_schema(),
    )
17 changes: 9 additions & 8 deletions src/gentropy/datasource/gwas_catalog/associations.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ def _map_variants_to_gnomad_variants(
"chromosome",
# Calculate the position in Ensembl coordinates for indels:
GWASCatalogCuratedAssociationsParser.convert_gnomad_position_to_ensembl(
f.col("position"), f.col("referenceAllele"), f.col("alternateAllele")
f.col("position"),
f.col("referenceAllele"),
f.col("alternateAllele"),
).alias("ensemblPosition"),
# Keeping GnomAD position:
"position",
Expand All @@ -240,11 +242,7 @@ def _map_variants_to_gnomad_variants(
"alleleFrequencies",
variant_index.max_maf().alias("maxMaf"),
).join(
f.broadcast(
gwas_associations_subset.select(
"chromosome", "ensemblPosition"
).distinct()
),
gwas_associations_subset.select("chromosome", "ensemblPosition").distinct(),
on=["chromosome", "ensemblPosition"],
how="inner",
)
Expand All @@ -253,7 +251,7 @@ def _map_variants_to_gnomad_variants(
# based on rsIds or allele concordance)
filtered_associations = (
gwas_associations_subset.join(
f.broadcast(va_subset),
va_subset,
on=["chromosome", "ensemblPosition"],
how="left",
)
Expand Down Expand Up @@ -1108,7 +1106,10 @@ def from_source(
pvalue_threshold is kept in sync with the WindowBasedClumpingStep gwas_significance.
"""
return StudyLocusGWASCatalog(
_df=gwas_associations.withColumn(
_df=gwas_associations
# drop duplicate rows
.distinct()
.withColumn(
"studyLocusId", f.monotonically_increasing_id().cast(StringType())
)
.transform(
Expand Down
Loading

0 comments on commit df220e9

Please sign in to comment.