Skip to content

Commit 9b3f966

Browse files
Merge pull request #168 from opentargets/main
2 parents 08d0c14 + f8ed420 commit 9b3f966

File tree

12 files changed

+173
-25
lines changed

12 files changed

+173
-25
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
repos:
22
- repo: https://github.com/pre-commit/pre-commit-hooks
3-
rev: v4.4.0
3+
rev: v4.5.0
44
hooks:
55
- id: trailing-whitespace
66
- id: end-of-file-fixer

config/datasets/gcp.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ ld_index: ${datasets.outputs}/ld_index
3838
catalog_study_index: ${datasets.outputs}/catalog_study_index
3939
catalog_study_locus: ${datasets.outputs}/catalog_study_locus
4040
finngen_study_index: ${datasets.outputs}/finngen_study_index
41+
finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats
4142
ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index
4243

4344
# Constants

config/step/my_finngen.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ finngen_release_prefix: ${datasets.finngen_release_prefix}
77
finngen_sumstat_url_prefix: ${datasets.finngen_sumstat_url_prefix}
88
finngen_sumstat_url_suffix: ${datasets.finngen_sumstat_url_suffix}
99
finngen_study_index_out: ${datasets.finngen_study_index}
10+
finngen_summary_stats_out: ${datasets.finngen_summary_stats}

src/otg/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ class FinnGenStepConfig:
273273
finngen_sumstat_url_prefix (str): URL prefix for summary statistics location.
274274
finngen_sumstat_url_suffix (str): URL prefix suffix for summary statistics location.
275275
finngen_study_index_out (str): Output path for the FinnGen study index dataset.
276+
finngen_summary_stats_out (str): Output path for the FinnGen summary statistics.
276277
"""
277278

278279
_target_: str = "otg.finngen.FinnGenStep"
@@ -281,6 +282,7 @@ class FinnGenStepConfig:
281282
finngen_sumstat_url_prefix: str = MISSING
282283
finngen_sumstat_url_suffix: str = MISSING
283284
finngen_study_index_out: str = MISSING
285+
finngen_summary_stats_out: str = MISSING
284286

285287

286288
@dataclass

src/otg/dataset/study_locus.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -315,19 +315,22 @@ def annotate_credible_sets(self: StudyLocus) -> StudyLocus:
315315
Returns:
316316
StudyLocus: including annotation on `is95CredibleSet` and `is99CredibleSet`.
317317
"""
318+
if "locus" not in self.df.columns:
319+
raise ValueError("Locus column not available.")
320+
318321
self.df = self.df.withColumn(
319322
# Sort credible set by posterior probability in descending order
320323
"locus",
321324
f.when(
322-
f.size(f.col("locus")) > 0,
325+
f.col("locus").isNotNull() & (f.size(f.col("locus")) > 0),
323326
order_array_of_structs_by_field("locus", "posteriorProbability"),
324-
).when(f.size(f.col("locus")) == 0, f.col("locus")),
327+
),
325328
).withColumn(
326329
# Calculate array of cumulative sums of posterior probabilities to determine which variants are in the 95% and 99% credible sets
327330
# and zip the cumulative sums array with the credible set array to add the flags
328331
"locus",
329332
f.when(
330-
f.size(f.col("locus")) > 0,
333+
f.col("locus").isNotNull() & (f.size(f.col("locus")) > 0),
331334
f.zip_with(
332335
f.col("locus"),
333336
f.transform(
@@ -345,10 +348,12 @@ def annotate_credible_sets(self: StudyLocus) -> StudyLocus:
345348
),
346349
),
347350
lambda struct_e, acc: struct_e.withField(
348-
CredibleInterval.IS95.value, acc < 0.95
349-
).withField(CredibleInterval.IS99.value, acc < 0.99),
351+
CredibleInterval.IS95.value, (acc < 0.95) & acc.isNotNull()
352+
).withField(
353+
CredibleInterval.IS99.value, (acc < 0.99) & acc.isNotNull()
354+
),
350355
),
351-
).when(f.size(f.col("locus")) == 0, f.col("locus")),
356+
),
352357
)
353358
return self
354359

src/otg/datasource/finngen/study_index.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def from_source(
2929
cls: type[FinnGenStudyIndex],
3030
finngen_studies: DataFrame,
3131
finngen_release_prefix: str,
32-
finngen_sumstat_url_prefix: str,
33-
finngen_sumstat_url_suffix: str,
32+
finngen_summary_stats_url_prefix: str,
33+
finngen_summary_stats_url_suffix: str,
3434
) -> FinnGenStudyIndex:
3535
"""This function ingests study level metadata from FinnGen.
3636
@@ -51,6 +51,7 @@ def from_source(
5151
f.col("phenostring").alias("traitFromSource"),
5252
f.col("num_cases").alias("nCases"),
5353
f.col("num_controls").alias("nControls"),
54+
(f.col("num_cases") + f.col("num_controls")).alias("nSamples"),
5455
f.lit(finngen_release_prefix).alias("projectId"),
5556
f.lit("gwas").alias("studyType"),
5657
f.lit(True).alias("hasSumstats"),
@@ -63,19 +64,14 @@ def from_source(
6364
f.lit("Finnish").alias("ancestry"),
6465
)
6566
).alias("discoverySamples"),
66-
)
67-
.withColumn("nSamples", f.col("nCases") + f.col("nControls"))
68-
.withColumn(
69-
"summarystatsLocation",
7067
f.concat(
71-
f.lit(finngen_sumstat_url_prefix),
72-
f.col("studyId"),
73-
f.lit(finngen_sumstat_url_suffix),
74-
),
75-
)
76-
.withColumn(
68+
f.lit(finngen_summary_stats_url_prefix),
69+
f.col("phenocode"),
70+
f.lit(finngen_summary_stats_url_suffix),
71+
).alias("summarystatsLocation"),
72+
).withColumn(
7773
"ldPopulationStructure",
7874
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
7975
),
80-
_schema=FinnGenStudyIndex.get_schema(),
76+
_schema=cls.get_schema(),
8177
)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Summary statistics ingestion for FinnGen."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
from typing import TYPE_CHECKING
7+
8+
import pyspark.sql.functions as f
9+
import pyspark.sql.types as t
10+
11+
from otg.common.utils import calculate_confidence_interval, parse_pvalue
12+
from otg.dataset.summary_statistics import SummaryStatistics
13+
14+
if TYPE_CHECKING:
15+
from pyspark.sql import DataFrame
16+
17+
18+
@dataclass
19+
class FinnGenSummaryStats(SummaryStatistics):
20+
"""Summary statistics dataset for FinnGen."""
21+
22+
@classmethod
23+
def from_finngen_harmonized_summary_stats(
24+
cls: type[FinnGenSummaryStats],
25+
summary_stats_df: DataFrame,
26+
) -> FinnGenSummaryStats:
27+
"""Ingests all summary statst for all FinnGen studies."""
28+
processed_summary_stats_df = (
29+
summary_stats_df
30+
# Drop rows which don't have proper position.
31+
.filter(f.col("pos").cast(t.IntegerType()).isNotNull()).select(
32+
# From the full path, extracts just the filename, and converts to upper case to get the study ID.
33+
f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias(
34+
"studyId"
35+
),
36+
# Add variant information.
37+
f.concat_ws(
38+
"_",
39+
f.col("#chrom"),
40+
f.col("pos"),
41+
f.col("ref"),
42+
f.col("alt"),
43+
).alias("variantId"),
44+
f.col("#chrom").alias("chromosome"),
45+
f.col("pos").cast(t.IntegerType()).alias("position"),
46+
# Parse p-value into mantissa and exponent.
47+
*parse_pvalue(f.col("pval")),
48+
# Add beta, standard error, and allele frequency information.
49+
f.col("beta").cast("double"),
50+
f.col("sebeta").cast("double").alias("standardError"),
51+
f.col("af_alt").cast("float").alias("effectAlleleFrequencyFromSource"),
52+
)
53+
# Calculating the confidence intervals.
54+
.select(
55+
"*",
56+
*calculate_confidence_interval(
57+
f.col("pValueMantissa"),
58+
f.col("pValueExponent"),
59+
f.col("beta"),
60+
f.col("standardError"),
61+
),
62+
)
63+
)
64+
65+
# Initializing summary statistics object:
66+
return cls(
67+
_df=processed_summary_stats_df,
68+
_schema=cls.get_schema(),
69+
)

src/otg/finngen.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@
88
from otg.common.session import Session
99
from otg.config import FinnGenStepConfig
1010
from otg.datasource.finngen.study_index import FinnGenStudyIndex
11+
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
1112

1213

1314
@dataclass
1415
class FinnGenStep(FinnGenStepConfig):
15-
"""FinnGen study table ingestion step."""
16+
"""FinnGen ingestion step."""
1617

1718
session: Session = Session()
1819

1920
def run(self: FinnGenStep) -> None:
20-
"""Run FinnGen study table ingestion step."""
21+
"""Run FinnGen ingestion step."""
2122
# Read the JSON data from the URL.
2223
json_data = urlopen(self.finngen_phenotype_table_url).read().decode("utf-8")
2324
rdd = self.session.spark.sparkContext.parallelize([json_data])
@@ -31,7 +32,25 @@ def run(self: FinnGenStep) -> None:
3132
self.finngen_sumstat_url_suffix,
3233
)
3334

34-
# Write the output.
35+
# Write the study index output.
3536
finngen_studies.df.write.mode(self.session.write_mode).parquet(
3637
self.finngen_study_index_out
3738
)
39+
40+
# Prepare list of files for ingestion.
41+
input_filenames = [
42+
row.summarystatsLocation for row in finngen_studies.collect()
43+
]
44+
summary_stats_df = self.session.spark.read.option("delimiter", "\t").csv(
45+
input_filenames, header=True
46+
)
47+
48+
# Specify data processing instructions.
49+
summary_stats_df = FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
50+
summary_stats_df
51+
).df
52+
53+
# Sort and partition for output.
54+
summary_stats_df.sortWithinPartitions("position").write.partitionBy(
55+
"studyId", "chromosome"
56+
).mode(self.session.write_mode).parquet(self.finngen_summary_stats_out)

tests/conftest.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from otg.dataset.variant_annotation import VariantAnnotation
1919
from otg.dataset.variant_index import VariantIndex
2020
from otg.datasource.finngen.study_index import FinnGenStudyIndex
21+
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
2122
from otg.datasource.gwas_catalog.associations import GWASCatalogAssociations
2223
from otg.datasource.gwas_catalog.study_index import GWASCatalogStudyIndex
2324
from otg.datasource.ukbiobank.study_index import UKBiobankStudyIndex
@@ -149,13 +150,22 @@ def mock_study_index_gwas_catalog(spark: SparkSession) -> GWASCatalogStudyIndex:
149150

150151
@pytest.fixture()
151152
def mock_study_index_finngen(spark: SparkSession) -> FinnGenStudyIndex:
152-
"""Mock StudyIndexFinnGen dataset."""
153+
"""Mock FinnGenStudyIndex dataset."""
153154
return FinnGenStudyIndex(
154155
_df=mock_study_index_data(spark),
155156
_schema=StudyIndex.get_schema(),
156157
)
157158

158159

160+
@pytest.fixture()
161+
def mock_summary_stats_finngen(spark: SparkSession) -> FinnGenSummaryStats:
162+
"""Mock FinnGenSummaryStats dataset."""
163+
return FinnGenSummaryStats(
164+
_df=mock_summary_statistics(spark),
165+
_schema=SummaryStatistics.get_schema(),
166+
)
167+
168+
159169
@pytest.fixture()
160170
def mock_study_index_ukbiobank(spark: SparkSession) -> UKBiobankStudyIndex:
161171
"""Mock StudyIndexUKBiobank dataset."""
@@ -485,6 +495,17 @@ def sample_finngen_studies(spark: SparkSession) -> DataFrame:
485495
return spark.read.json(rdd)
486496

487497

498+
@pytest.fixture()
499+
def sample_finngen_summary_stats(spark: SparkSession) -> DataFrame:
500+
"""Sample FinnGen summary stats."""
501+
# For reference, the sample file was generated with the following command:
502+
# gsutil cat gs://finngen-public-data-r9/summary_stats/finngen_R9_AB1_ACTINOMYCOSIS.gz | gzip -cd | head -n11 | gzip -c > tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz
503+
# It's important for the test file to be named in exactly this way, because FinnGen study ID is populated based on input file name.
504+
return spark.read.option("delimiter", "\t").csv(
505+
"tests/data_samples/finngen_R9_AB1_ACTINOMYCOSIS.gz", header=True
506+
)
507+
508+
488509
@pytest.fixture()
489510
def sample_ukbiobank_studies(spark: SparkSession) -> DataFrame:
490511
"""Sample UKBiobank manifest."""
572 Bytes
Binary file not shown.

tests/dataset/test_study_locus.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ def test_clump(mock_study_locus: StudyLocus) -> None:
9898
{"variantId": "tagVariantC", "posteriorProbability": 0.04},
9999
{"variantId": "tagVariantD", "posteriorProbability": 0.005},
100100
{"variantId": "tagVariantE", "posteriorProbability": 0.5},
101+
{"variantId": "tagVariantNull", "posteriorProbability": None},
102+
{"variantId": "tagVariantNull", "posteriorProbability": None},
101103
],
102104
)
103105
],
@@ -138,6 +140,18 @@ def test_clump(mock_study_locus: StudyLocus) -> None:
138140
"is95CredibleSet": False,
139141
"is99CredibleSet": False,
140142
},
143+
{
144+
"variantId": "tagVariantNull",
145+
"posteriorProbability": None,
146+
"is95CredibleSet": False,
147+
"is99CredibleSet": False,
148+
},
149+
{
150+
"variantId": "tagVariantNull",
151+
"posteriorProbability": None,
152+
"is95CredibleSet": False,
153+
"is99CredibleSet": False,
154+
},
141155
],
142156
)
143157
],
@@ -180,7 +194,7 @@ def test_clump(mock_study_locus: StudyLocus) -> None:
180194
1,
181195
"traitA",
182196
"leadB",
183-
[],
197+
None,
184198
)
185199
],
186200
),
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
"""Tests for study index dataset from FinnGen."""
2+
3+
from __future__ import annotations
4+
5+
from pyspark.sql import DataFrame
6+
7+
from otg.dataset.summary_statistics import SummaryStatistics
8+
from otg.datasource.finngen.summary_stats import FinnGenSummaryStats
9+
10+
11+
def test_finngen_summary_stats_from_source(
12+
sample_finngen_summary_stats: DataFrame,
13+
) -> None:
14+
"""Test summary statistics from source."""
15+
assert isinstance(
16+
FinnGenSummaryStats.from_finngen_harmonized_summary_stats(
17+
sample_finngen_summary_stats
18+
),
19+
SummaryStatistics,
20+
)

0 commit comments

Comments
 (0)