diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml index 50d8e3d90..9b7af54a0 100644 --- a/config/datasets/gcp.yaml +++ b/config/datasets/gcp.yaml @@ -38,6 +38,7 @@ ld_index: ${datasets.outputs}/ld_index catalog_study_index: ${datasets.outputs}/catalog_study_index catalog_study_locus: ${datasets.outputs}/catalog_study_locus finngen_study_index: ${datasets.outputs}/finngen_study_index +finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index # Constants diff --git a/config/step/my_finngen.yaml b/config/step/my_finngen.yaml index 19b07d366..93cb2b3ce 100644 --- a/config/step/my_finngen.yaml +++ b/config/step/my_finngen.yaml @@ -7,3 +7,4 @@ finngen_release_prefix: ${datasets.finngen_release_prefix} finngen_sumstat_url_prefix: ${datasets.finngen_sumstat_url_prefix} finngen_sumstat_url_suffix: ${datasets.finngen_sumstat_url_suffix} finngen_study_index_out: ${datasets.finngen_study_index} +finngen_summary_stats_out: ${datasets.finngen_summary_stats} diff --git a/src/otg/config.py b/src/otg/config.py index 5a7b52e56..163a6bd34 100644 --- a/src/otg/config.py +++ b/src/otg/config.py @@ -273,6 +273,7 @@ class FinnGenStepConfig: finngen_sumstat_url_prefix (str): URL prefix for summary statistics location. finngen_sumstat_url_suffix (str): URL prefix suffix for summary statistics location. finngen_study_index_out (str): Output path for the FinnGen study index dataset. + finngen_summary_stats_out (str): Output path for the FinnGen summary statistics. 
""" _target_: str = "otg.finngen.FinnGenStep" @@ -281,6 +282,7 @@ class FinnGenStepConfig: finngen_sumstat_url_prefix: str = MISSING finngen_sumstat_url_suffix: str = MISSING finngen_study_index_out: str = MISSING + finngen_summary_stats_out: str = MISSING @dataclass diff --git a/src/otg/datasource/finngen/study_index.py b/src/otg/datasource/finngen/study_index.py index d79a302aa..48829a267 100644 --- a/src/otg/datasource/finngen/study_index.py +++ b/src/otg/datasource/finngen/study_index.py @@ -29,8 +29,8 @@ def from_source( cls: type[FinnGenStudyIndex], finngen_studies: DataFrame, finngen_release_prefix: str, - finngen_sumstat_url_prefix: str, - finngen_sumstat_url_suffix: str, + finngen_summary_stats_url_prefix: str, + finngen_summary_stats_url_suffix: str, ) -> FinnGenStudyIndex: """This function ingests study level metadata from FinnGen. @@ -51,6 +51,7 @@ def from_source( f.col("phenostring").alias("traitFromSource"), f.col("num_cases").alias("nCases"), f.col("num_controls").alias("nControls"), + (f.col("num_cases") + f.col("num_controls")).alias("nSamples"), f.lit(finngen_release_prefix).alias("projectId"), f.lit("gwas").alias("studyType"), f.lit(True).alias("hasSumstats"), @@ -63,19 +64,14 @@ def from_source( f.lit("Finnish").alias("ancestry"), ) ).alias("discoverySamples"), - ) - .withColumn("nSamples", f.col("nCases") + f.col("nControls")) - .withColumn( - "summarystatsLocation", f.concat( - f.lit(finngen_sumstat_url_prefix), - f.col("studyId"), - f.lit(finngen_sumstat_url_suffix), - ), - ) - .withColumn( + f.lit(finngen_summary_stats_url_prefix), + f.col("phenocode"), + f.lit(finngen_summary_stats_url_suffix), + ).alias("summarystatsLocation"), + ).withColumn( "ldPopulationStructure", cls.aggregate_and_map_ancestries(f.col("discoverySamples")), ), - _schema=FinnGenStudyIndex.get_schema(), + _schema=cls.get_schema(), ) diff --git a/src/otg/datasource/finngen/summary_stats.py b/src/otg/datasource/finngen/summary_stats.py new file mode 100644 index 
000000000..e2b4807a9 --- /dev/null +++ b/src/otg/datasource/finngen/summary_stats.py @@ -0,0 +1,69 @@ +"""Summary statistics ingestion for FinnGen.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyspark.sql.functions as f +import pyspark.sql.types as t + +from otg.common.utils import calculate_confidence_interval, parse_pvalue +from otg.dataset.summary_statistics import SummaryStatistics + +if TYPE_CHECKING: + from pyspark.sql import DataFrame + + +@dataclass +class FinnGenSummaryStats(SummaryStatistics): + """Summary statistics dataset for FinnGen.""" + + @classmethod + def from_finngen_harmonized_summary_stats( + cls: type[FinnGenSummaryStats], + summary_stats_df: DataFrame, + ) -> FinnGenSummaryStats: + """Ingests all summary stats for all FinnGen studies.""" + processed_summary_stats_df = ( + summary_stats_df + # Drop rows which don't have proper position. + .filter(f.col("pos").cast(t.IntegerType()).isNotNull()).select( + # From the full path, extracts just the filename, and converts to upper case to get the study ID. + f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias( + "studyId" + ), + # Add variant information. + f.concat_ws( + "_", + f.col("#chrom"), + f.col("pos"), + f.col("ref"), + f.col("alt"), + ).alias("variantId"), + f.col("#chrom").alias("chromosome"), + f.col("pos").cast(t.IntegerType()).alias("position"), + # Parse p-value into mantissa and exponent. + *parse_pvalue(f.col("pval")), + # Add beta, standard error, and allele frequency information. + f.col("beta").cast("double"), + f.col("sebeta").cast("double").alias("standardError"), + f.col("af_alt").cast("float").alias("effectAlleleFrequencyFromSource"), + ) + # Calculating the confidence intervals. 
+ .select( + "*", + *calculate_confidence_interval( + f.col("pValueMantissa"), + f.col("pValueExponent"), + f.col("beta"), + f.col("standardError"), + ), + ) + ) + + # Initializing summary statistics object: + return cls( + _df=processed_summary_stats_df, + _schema=cls.get_schema(), + ) diff --git a/src/otg/finngen.py b/src/otg/finngen.py index a4d0fef63..b62762598 100644 --- a/src/otg/finngen.py +++ b/src/otg/finngen.py @@ -8,16 +8,17 @@ from otg.common.session import Session from otg.config import FinnGenStepConfig from otg.datasource.finngen.study_index import FinnGenStudyIndex +from otg.datasource.finngen.summary_stats import FinnGenSummaryStats @dataclass class FinnGenStep(FinnGenStepConfig): - """FinnGen study table ingestion step.""" + """FinnGen ingestion step.""" session: Session = Session() def run(self: FinnGenStep) -> None: - """Run FinnGen study table ingestion step.""" + """Run FinnGen ingestion step.""" # Read the JSON data from the URL. json_data = urlopen(self.finngen_phenotype_table_url).read().decode("utf-8") rdd = self.session.spark.sparkContext.parallelize([json_data]) @@ -31,7 +32,25 @@ def run(self: FinnGenStep) -> None: self.finngen_sumstat_url_suffix, ) - # Write the output. + # Write the study index output. finngen_studies.df.write.mode(self.session.write_mode).parquet( self.finngen_study_index_out ) + + # Prepare list of files for ingestion. + input_filenames = [ + row.summarystatsLocation for row in finngen_studies.collect() + ] + summary_stats_df = self.session.spark.read.option("delimiter", "\t").csv( + input_filenames, header=True + ) + + # Specify data processing instructions. + summary_stats_df = FinnGenSummaryStats.from_finngen_harmonized_summary_stats( + summary_stats_df + ).df + + # Sort and partition for output. 
+ summary_stats_df.sortWithinPartitions("position").write.partitionBy( + "studyId", "chromosome" + ).mode(self.session.write_mode).parquet(self.finngen_summary_stats_out) diff --git a/tests/conftest.py b/tests/conftest.py index d68dffe42..b8206996e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,6 +18,7 @@ from otg.dataset.variant_annotation import VariantAnnotation from otg.dataset.variant_index import VariantIndex from otg.datasource.finngen.study_index import FinnGenStudyIndex +from otg.datasource.finngen.summary_stats import FinnGenSummaryStats from otg.datasource.gwas_catalog.associations import GWASCatalogAssociations from otg.datasource.gwas_catalog.study_index import GWASCatalogStudyIndex from otg.datasource.ukbiobank.study_index import UKBiobankStudyIndex @@ -149,13 +150,22 @@ def mock_study_index_gwas_catalog(spark: SparkSession) -> GWASCatalogStudyIndex: @pytest.fixture() def mock_study_index_finngen(spark: SparkSession) -> FinnGenStudyIndex: - """Mock StudyIndexFinnGen dataset.""" + """Mock FinnGenStudyIndex dataset.""" return FinnGenStudyIndex( _df=mock_study_index_data(spark), _schema=StudyIndex.get_schema(), ) +@pytest.fixture() +def mock_summary_stats_finngen(spark: SparkSession) -> FinnGenSummaryStats: + """Mock FinnGenSummaryStats dataset.""" + return FinnGenSummaryStats( + _df=mock_summary_statistics(spark), + _schema=SummaryStatistics.get_schema(), + ) + + @pytest.fixture() def mock_study_index_ukbiobank(spark: SparkSession) -> UKBiobankStudyIndex: """Mock StudyIndexUKBiobank dataset.""" @@ -485,6 +495,17 @@ def sample_finngen_studies(spark: SparkSession) -> DataFrame: return spark.read.json(rdd) +@pytest.fixture() +def sample_finngen_summary_stats(spark: SparkSession) -> DataFrame: + """Sample FinnGen summary stats.""" + # For reference, the sample file was generated with the following command: + # gsutil cat gs://finngen-public-data-r9/summary_stats/finngen_R9_AB1_ACTINOMYCOSIS.gz | gzip -cd | head -n11 | gzip -c > 
tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz + # It's important for the test file to be named in exactly this way, because FinnGen study ID is populated based on input file name. + return spark.read.option("delimiter", "\t").csv( + "tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz", header=True + ) + + @pytest.fixture() def sample_ukbiobank_studies(spark: SparkSession) -> DataFrame: """Sample UKBiobank manifest.""" diff --git a/tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz b/tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz new file mode 100644 index 000000000..f7086b96c Binary files /dev/null and b/tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz differ diff --git a/tests/datasource/finngen/test_finngen_summary_stats.py b/tests/datasource/finngen/test_finngen_summary_stats.py new file mode 100644 index 000000000..d9cbbccf8 --- /dev/null +++ b/tests/datasource/finngen/test_finngen_summary_stats.py @@ -0,0 +1,20 @@ +"""Tests for summary statistics dataset from FinnGen.""" + +from __future__ import annotations + +from pyspark.sql import DataFrame + +from otg.dataset.summary_statistics import SummaryStatistics +from otg.datasource.finngen.summary_stats import FinnGenSummaryStats + + +def test_finngen_summary_stats_from_source( + sample_finngen_summary_stats: DataFrame, +) -> None: + """Test summary statistics from source.""" + assert isinstance( + FinnGenSummaryStats.from_finngen_harmonized_summary_stats( + sample_finngen_summary_stats + ), + SummaryStatistics, + )