Skip to content

Commit

Permalink
Merge pull request #150 from opentargets/tskir-3095-finngen-sumstat
Browse files Browse the repository at this point in the history
[Preprocess #1] Business logic for FinnGen summary stats ingestion
  • Loading branch information
DSuveges authored Oct 10, 2023
2 parents f02ae41 + 66fa6eb commit f8ed420
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 17 deletions.
1 change: 1 addition & 0 deletions config/datasets/gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ld_index: ${datasets.outputs}/ld_index
catalog_study_index: ${datasets.outputs}/catalog_study_index
catalog_study_locus: ${datasets.outputs}/catalog_study_locus
finngen_study_index: ${datasets.outputs}/finngen_study_index
finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats
ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index

# Constants
Expand Down
1 change: 1 addition & 0 deletions config/step/my_finngen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ finngen_release_prefix: ${datasets.finngen_release_prefix}
finngen_sumstat_url_prefix: ${datasets.finngen_sumstat_url_prefix}
finngen_sumstat_url_suffix: ${datasets.finngen_sumstat_url_suffix}
finngen_study_index_out: ${datasets.finngen_study_index}
finngen_summary_stats_out: ${datasets.finngen_summary_stats}
2 changes: 2 additions & 0 deletions src/otg/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class FinnGenStepConfig:
finngen_sumstat_url_prefix (str): URL prefix for summary statistics location.
finngen_sumstat_url_suffix (str): URL prefix suffix for summary statistics location.
finngen_study_index_out (str): Output path for the FinnGen study index dataset.
finngen_summary_stats_out (str): Output path for the FinnGen summary statistics.
"""

_target_: str = "otg.finngen.FinnGenStep"
Expand All @@ -281,6 +282,7 @@ class FinnGenStepConfig:
finngen_sumstat_url_prefix: str = MISSING
finngen_sumstat_url_suffix: str = MISSING
finngen_study_index_out: str = MISSING
finngen_summary_stats_out: str = MISSING


@dataclass
Expand Down
22 changes: 9 additions & 13 deletions src/otg/datasource/finngen/study_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def from_source(
cls: type[FinnGenStudyIndex],
finngen_studies: DataFrame,
finngen_release_prefix: str,
finngen_sumstat_url_prefix: str,
finngen_sumstat_url_suffix: str,
finngen_summary_stats_url_prefix: str,
finngen_summary_stats_url_suffix: str,
) -> FinnGenStudyIndex:
"""This function ingests study level metadata from FinnGen.
Expand All @@ -51,6 +51,7 @@ def from_source(
f.col("phenostring").alias("traitFromSource"),
f.col("num_cases").alias("nCases"),
f.col("num_controls").alias("nControls"),
(f.col("num_cases") + f.col("num_controls")).alias("nSamples"),
f.lit(finngen_release_prefix).alias("projectId"),
f.lit("gwas").alias("studyType"),
f.lit(True).alias("hasSumstats"),
Expand All @@ -63,19 +64,14 @@ def from_source(
f.lit("Finnish").alias("ancestry"),
)
).alias("discoverySamples"),
)
.withColumn("nSamples", f.col("nCases") + f.col("nControls"))
.withColumn(
"summarystatsLocation",
f.concat(
f.lit(finngen_sumstat_url_prefix),
f.col("studyId"),
f.lit(finngen_sumstat_url_suffix),
),
)
.withColumn(
f.lit(finngen_summary_stats_url_prefix),
f.col("phenocode"),
f.lit(finngen_summary_stats_url_suffix),
).alias("summarystatsLocation"),
).withColumn(
"ldPopulationStructure",
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
),
_schema=FinnGenStudyIndex.get_schema(),
_schema=cls.get_schema(),
)
69 changes: 69 additions & 0 deletions src/otg/datasource/finngen/summary_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Summary statistics ingestion for FinnGen."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

import pyspark.sql.functions as f
import pyspark.sql.types as t

from otg.common.utils import calculate_confidence_interval, parse_pvalue
from otg.dataset.summary_statistics import SummaryStatistics

if TYPE_CHECKING:
from pyspark.sql import DataFrame


@dataclass
class FinnGenSummaryStats(SummaryStatistics):
"""Summary statistics dataset for FinnGen."""

@classmethod
def from_finngen_harmonized_summary_stats(
cls: type[FinnGenSummaryStats],
summary_stats_df: DataFrame,
) -> FinnGenSummaryStats:
"""Ingests all summary statst for all FinnGen studies."""
processed_summary_stats_df = (
summary_stats_df
# Drop rows which don't have proper position.
.filter(f.col("pos").cast(t.IntegerType()).isNotNull()).select(
# From the full path, extracts just the filename, and converts to upper case to get the study ID.
f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias(
"studyId"
),
# Add variant information.
f.concat_ws(
"_",
f.col("#chrom"),
f.col("pos"),
f.col("ref"),
f.col("alt"),
).alias("variantId"),
f.col("#chrom").alias("chromosome"),
f.col("pos").cast(t.IntegerType()).alias("position"),
# Parse p-value into mantissa and exponent.
*parse_pvalue(f.col("pval")),
# Add beta, standard error, and allele frequency information.
f.col("beta").cast("double"),
f.col("sebeta").cast("double").alias("standardError"),
f.col("af_alt").cast("float").alias("effectAlleleFrequencyFromSource"),
)
# Calculating the confidence intervals.
.select(
"*",
*calculate_confidence_interval(
f.col("pValueMantissa"),
f.col("pValueExponent"),
f.col("beta"),
f.col("standardError"),
),
)
)

# Initializing summary statistics object:
return cls(
_df=processed_summary_stats_df,
_schema=cls.get_schema(),
)
25 changes: 22 additions & 3 deletions src/otg/finngen.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
from otg.common.session import Session
from otg.config import FinnGenStepConfig
from otg.datasource.finngen.study_index import FinnGenStudyIndex
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats


@dataclass
class FinnGenStep(FinnGenStepConfig):
"""FinnGen study table ingestion step."""
"""FinnGen ingestion step."""

session: Session = Session()

def run(self: FinnGenStep) -> None:
"""Run FinnGen study table ingestion step."""
"""Run FinnGen ingestion step."""
# Read the JSON data from the URL.
json_data = urlopen(self.finngen_phenotype_table_url).read().decode("utf-8")
rdd = self.session.spark.sparkContext.parallelize([json_data])
Expand All @@ -31,7 +32,25 @@ def run(self: FinnGenStep) -> None:
self.finngen_sumstat_url_suffix,
)

# Write the output.
# Write the study index output.
finngen_studies.df.write.mode(self.session.write_mode).parquet(
self.finngen_study_index_out
)

# Prepare list of files for ingestion.
input_filenames = [
row.summarystatsLocation for row in finngen_studies.collect()
]
summary_stats_df = self.session.spark.read.option("delimiter", "\t").csv(
input_filenames, header=True
)

# Specify data processing instructions.
summary_stats_df = FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
summary_stats_df
).df

# Sort and partition for output.
summary_stats_df.sortWithinPartitions("position").write.partitionBy(
"studyId", "chromosome"
).mode(self.session.write_mode).parquet(self.finngen_summary_stats_out)
23 changes: 22 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from otg.dataset.variant_annotation import VariantAnnotation
from otg.dataset.variant_index import VariantIndex
from otg.datasource.finngen.study_index import FinnGenStudyIndex
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
from otg.datasource.gwas_catalog.associations import GWASCatalogAssociations
from otg.datasource.gwas_catalog.study_index import GWASCatalogStudyIndex
from otg.datasource.ukbiobank.study_index import UKBiobankStudyIndex
Expand Down Expand Up @@ -149,13 +150,22 @@ def mock_study_index_gwas_catalog(spark: SparkSession) -> GWASCatalogStudyIndex:

@pytest.fixture()
def mock_study_index_finngen(spark: SparkSession) -> FinnGenStudyIndex:
"""Mock StudyIndexFinnGen dataset."""
"""Mock FinnGenStudyIndex dataset."""
return FinnGenStudyIndex(
_df=mock_study_index_data(spark),
_schema=StudyIndex.get_schema(),
)


@pytest.fixture()
def mock_summary_stats_finngen(spark: SparkSession) -> FinnGenSummaryStats:
"""Mock FinnGenSummaryStats dataset."""
return FinnGenSummaryStats(
_df=mock_summary_statistics(spark),
_schema=SummaryStatistics.get_schema(),
)


@pytest.fixture()
def mock_study_index_ukbiobank(spark: SparkSession) -> UKBiobankStudyIndex:
"""Mock StudyIndexUKBiobank dataset."""
Expand Down Expand Up @@ -485,6 +495,17 @@ def sample_finngen_studies(spark: SparkSession) -> DataFrame:
return spark.read.json(rdd)


@pytest.fixture()
def sample_finngen_summary_stats(spark: SparkSession) -> DataFrame:
"""Sample FinnGen summary stats."""
# For reference, the sample file was generated with the following command:
# gsutil cat gs://finngen-public-data-r9/summary_stats/finngen_R9_AB1_ACTINOMYCOSIS.gz | gzip -cd | head -n11 | gzip -c > tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz
# It's important for the test file to be named in exactly this way, because FinnGen study ID is populated based on input file name.
return spark.read.option("delimiter", "\t").csv(
"tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz", header=True
)


@pytest.fixture()
def sample_ukbiobank_studies(spark: SparkSession) -> DataFrame:
"""Sample UKBiobank manifest."""
Expand Down
Binary file added tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz
Binary file not shown.
20 changes: 20 additions & 0 deletions tests/datasource/finngen/test_finngen_summary_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Tests for study index dataset from FinnGen."""

from __future__ import annotations

from pyspark.sql import DataFrame

from otg.dataset.summary_statistics import SummaryStatistics
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats


def test_finngen_summary_stats_from_source(
sample_finngen_summary_stats: DataFrame,
) -> None:
"""Test summary statistics from source."""
assert isinstance(
FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
sample_finngen_summary_stats
),
SummaryStatistics,
)

0 comments on commit f8ed420

Please sign in to comment.