Commit 00fe023

Merge branch 'main' into feat-code-of-conduct
2 parents 95ae5f1 + 47fb71f

15 files changed: +166 -104 lines changed

config/datasets/gcp.yaml

Lines changed: 7 additions & 12 deletions
@@ -32,23 +32,18 @@ variant_annotation: ${datasets.outputs}/variant_annotation
 variant_index: ${datasets.outputs}/variant_index
 study_locus: ${datasets.outputs}/study_locus
 credible_set: ${datasets.outputs}/credible_set
+study_index: ${datasets.outputs}/study_index
+summary_statistics: ${datasets.outputs}/summary_statistics
 study_locus_overlap: ${datasets.outputs}/study_locus_overlap
 colocalisation: ${datasets.outputs}/colocalisation
 v2g: ${datasets.outputs}/v2g
 ld_index: ${datasets.outputs}/ld_index
-catalog_study_index: ${datasets.outputs}/catalog_study_index
-catalog_study_locus: ${datasets.study_locus}/catalog_curated
-finngen_study_index: ${datasets.outputs}/finngen_study_index
-finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats
+catalog_study_index: ${datasets.study_index}/catalog_curated
+catalog_study_locus: ${datasets.credible_set}/catalog_curated
+finngen_study_index: ${datasets.study_index}/finngen
+finngen_summary_stats: ${datasets.summary_statistics}/finngen
 from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats
 from_sumstats_pics: ${datasets.credible_set}/from_sumstats
-ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index
+ukbiobank_study_index: ${datasets.study_index}/ukbiobank
 l2g_model: ${datasets.outputs}/l2g_model
 l2g_predictions: ${datasets.outputs}/l2g_predictions
-eqtl_catalogue_study_index_out: ${datasets.outputs}/preprocess/eqtl_catalogue/study_index
-eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats
-
-# Constants
-finngen_release_prefix: FINNGEN_R9
-finngen_sumstat_url_prefix: gs://finngen-public-data-r9/summary_stats/finngen_R9_
-finngen_sumstat_url_suffix: .gz

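The ${...} values above are OmegaConf-style interpolations (the _target_ keys in the step configs suggest Hydra-driven instantiation). A minimal sketch of how such references resolve, using hypothetical values rather than the real config:

from omegaconf import OmegaConf

# Hypothetical subset of the datasets config, for illustration only.
cfg = OmegaConf.create(
    {
        "datasets": {
            "outputs": "gs://bucket/outputs",
            "study_index": "${datasets.outputs}/study_index",
            "finngen_study_index": "${datasets.study_index}/finngen",
        }
    }
)

# Interpolations resolve on access, chaining through study_index:
assert cfg.datasets.finngen_study_index == "gs://bucket/outputs/study_index/finngen"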
config/step/finngen.yaml

Lines changed: 0 additions & 4 deletions
@@ -1,7 +1,3 @@
 _target_: otg.finngen.FinnGenStep
-finngen_phenotype_table_url: ${datasets.finngen_phenotype_table_url}
-finngen_release_prefix: ${datasets.finngen_release_prefix}
-finngen_sumstat_url_prefix: ${datasets.finngen_sumstat_url_prefix}
-finngen_sumstat_url_suffix: ${datasets.finngen_sumstat_url_suffix}
 finngen_study_index_out: ${datasets.finngen_study_index}
 finngen_summary_stats_out: ${datasets.finngen_summary_stats}

src/airflow/dags/common_airflow.py

Lines changed: 4 additions & 3 deletions
@@ -50,7 +50,7 @@
 # Shared DAG construction parameters.
 shared_dag_args = dict(
     owner="Open Targets Data Team",
-    retries=1,
+    retries=0,
 )
 shared_dag_kwargs = dict(
     tags=["genetics_etl", "experimental"],
@@ -68,6 +68,7 @@ def create_cluster(
     num_preemptible_workers: int = 0,
     num_local_ssds: int = 1,
     autoscaling_policy: str = GCP_AUTOSCALING_POLICY,
+    master_disk_size: int = 500,
 ) -> DataprocCreateClusterOperator:
     """Generate an Airflow task to create a Dataproc cluster. Common parameters are reused, and varying parameters can be specified as needed.

@@ -79,6 +80,7 @@
         num_preemptible_workers (int): Number of preemptible worker nodes. Defaults to 0.
         num_local_ssds (int): How many local SSDs to attach to each worker node, both primary and secondary. Defaults to 1.
         autoscaling_policy (str): Name of the autoscaling policy to use. Defaults to GCP_AUTOSCALING_POLICY.
+        master_disk_size (int): Size of the master node's boot disk in GB. Defaults to 500.

     Returns:
         DataprocCreateClusterOperator: Airflow task to create a Dataproc cluster.
@@ -89,7 +91,7 @@
         zone=GCP_ZONE,
         master_machine_type=master_machine_type,
         worker_machine_type=worker_machine_type,
-        master_disk_size=500,
+        master_disk_size=master_disk_size,
         worker_disk_size=500,
         num_preemptible_workers=num_preemptible_workers,
         num_workers=num_workers,
@@ -273,7 +275,6 @@ def delete_cluster(cluster_name: str) -> DataprocDeleteClusterOperator:
         cluster_name=cluster_name,
         region=GCP_REGION,
         trigger_rule=TriggerRule.ALL_DONE,
-        deferrable=True,
     )


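The new master_disk_size parameter replaces the previously hard-coded 500 GB master boot disk. The FinnGen preprocess DAG added in this commit overrides it; a standalone sketch of the call:

import common_airflow as common

# Larger master boot disk for a heavier workload; all other
# cluster parameters keep the defaults from create_cluster.
cluster_task = common.create_cluster(
    "otg-preprocess-finngen",
    autoscaling_policy="finngen-preprocess",
    master_disk_size=2000,
)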
src/airflow/dags/dag_genetics_etl.py

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@
         this_task = common.submit_step(
             cluster_name=CLUSTER_NAME,
             step_id=step_id,
+            task_id=step_id,
         )
         # Chain prerequisites.
         tasks[step_id] = this_task

src/airflow/dags/dag_preprocess.py

Lines changed: 2 additions & 2 deletions
@@ -8,7 +8,7 @@

 CLUSTER_NAME = "otg-preprocess"

-ALL_STEPS = ["finngen", "eqtl_catalogue", "ld_index", "variant_annotation"]
+ALL_STEPS = ["eqtl_catalogue", "ld_index", "variant_annotation"]


 with DAG(
@@ -18,7 +18,7 @@
     **common.shared_dag_kwargs,
 ):
     all_tasks = [
-        common.submit_step(cluster_name=CLUSTER_NAME, step_id=step)
+        common.submit_step(cluster_name=CLUSTER_NAME, step_id=step, task_id=step)
         for step in ALL_STEPS
     ]
     dag = common.generate_dag(cluster_name=CLUSTER_NAME, tasks=all_tasks)
src/airflow/dags/dag_preprocess_finngen.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+"""Airflow DAG for the Preprocess part of the pipeline."""
+from __future__ import annotations
+
+from pathlib import Path
+
+import common_airflow as common
+from airflow.models.dag import DAG
+from airflow.utils.trigger_rule import TriggerRule
+
+CLUSTER_NAME = "otg-preprocess-finngen"
+AUTOSCALING = "finngen-preprocess"
+
+RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"
+SUMSTATS = f"{RELEASEBUCKET}/summary_statistics/finngen"
+WINDOWBASED_CLUMPED = (
+    f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_window_clumped/finngen"
+)
+LD_CLUMPED = f"{RELEASEBUCKET}/study_locus/from_sumstats_study_locus_ld_clumped/finngen"
+PICSED = f"{RELEASEBUCKET}/credible_set/from_sumstats_study_locus/finngen"
+
+with DAG(
+    dag_id=Path(__file__).stem,
+    description="Open Targets Genetics — Finngen preprocess",
+    default_args=common.shared_dag_args,
+    **common.shared_dag_kwargs,
+):
+    study_and_sumstats = common.submit_step(
+        cluster_name=CLUSTER_NAME,
+        step_id="finngen",
+        task_id="finngen_sumstats_and_study_index",
+    )
+
+    window_based_clumping = common.submit_step(
+        cluster_name=CLUSTER_NAME,
+        step_id="clump",
+        task_id="finngen_window_based_clumping",
+        other_args=[
+            f"step.input_path={SUMSTATS}",
+            f"step.clumped_study_locus_path={WINDOWBASED_CLUMPED}",
+        ],
+    )
+    ld_clumping = common.submit_step(
+        cluster_name=CLUSTER_NAME,
+        step_id="clump",
+        task_id="finngen_ld_clumping",
+        other_args=[
+            f"step.input_path={WINDOWBASED_CLUMPED}",
+            f"step.clumped_study_locus_path={LD_CLUMPED}",
+        ],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    pics = common.submit_step(
+        cluster_name=CLUSTER_NAME,
+        step_id="pics",
+        task_id="finngen_pics",
+        other_args=[
+            f"step.study_locus_ld_annotated_in={LD_CLUMPED}",
+            f"step.picsed_study_locus_out={PICSED}",
+        ],
+        # Attempt this task even when the step above fails due to failifexists.
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        common.create_cluster(
+            CLUSTER_NAME, autoscaling_policy=AUTOSCALING, master_disk_size=2000
+        )
+        >> common.install_dependencies(CLUSTER_NAME)
+        >> study_and_sumstats
+        >> window_based_clumping
+        >> ld_clumping
+        >> pics
+        >> common.delete_cluster(CLUSTER_NAME)
+    )

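The two TriggerRule.ALL_DONE tasks above run once their upstream tasks finish, whether those succeeded or failed. A generic sketch of the same mechanism with a placeholder operator (DAG and task ids hypothetical; EmptyOperator assumes Airflow 2.3+):

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="trigger_rule_sketch"):
    # Runs even if an upstream task failed, as long as all upstream tasks finished.
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule=TriggerRule.ALL_DONE)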
src/otg/dataset/study_locus.py

Lines changed: 1 addition & 1 deletion
@@ -13,14 +13,14 @@
     order_array_of_structs_by_field,
 )
 from otg.dataset.dataset import Dataset
-from otg.dataset.ld_index import LDIndex
 from otg.dataset.study_locus_overlap import StudyLocusOverlap
 from otg.method.clump import LDclumping

 if TYPE_CHECKING:
     from pyspark.sql import Column, DataFrame
     from pyspark.sql.types import StructType

+    from otg.dataset.ld_index import LDIndex
     from otg.dataset.study_index import StudyIndex


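Moving the LDIndex import under the existing TYPE_CHECKING block keeps the annotation available to type checkers without importing the module at runtime, a pattern commonly used to break import cycles. A generic sketch of the pattern (function name hypothetical):

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime.
    from otg.dataset.ld_index import LDIndex


def annotate_ld(ld_index: LDIndex) -> None:
    # With postponed evaluation of annotations (the __future__ import
    # above), this hint stays a string and needs no runtime import.
    ...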
src/otg/dataset/summary_statistics.py

Lines changed: 2 additions & 2 deletions
@@ -65,10 +65,10 @@ def window_based_clumping(
         """Generate study-locus from summary statistics by distance based clumping + collect locus.

         Args:
-            distance (int): Distance in base pairs to be used for clumping.
+            distance (int): Distance in base pairs to be used for clumping. Defaults to 500_000.
             gwas_significance (float, optional): GWAS significance threshold. Defaults to 5e-8.
             baseline_significance (float, optional): Baseline significance threshold for inclusion in the locus. Defaults to 0.05.
-            locus_collect_distance (int | None): The distance to collect locus around semi-indices.
+            locus_collect_distance (int | None): The distance to collect locus around semi-indices. If not provided, locus is not collected.

         Returns:
             StudyLocus: Clumped study-locus containing variants based on window.

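A hedged sketch of a call to this method, assuming summary_stats is an existing SummaryStatistics instance; the keyword values mirror the documented defaults:

# Window-based clumping without locus collection
# (locus_collect_distance is left as None).
study_locus = summary_stats.window_based_clumping(
    distance=500_000,
    gwas_significance=5e-8,
)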
src/otg/datasource/finngen/study_index.py

Lines changed: 21 additions & 19 deletions
@@ -1,15 +1,13 @@
 """Study Index for Finngen data source."""
 from __future__ import annotations

-from typing import TYPE_CHECKING
+from urllib.request import urlopen

 import pyspark.sql.functions as f
+from pyspark.sql import SparkSession

 from otg.dataset.study_index import StudyIndex

-if TYPE_CHECKING:
-    from pyspark.sql import DataFrame
-

 class FinnGenStudyIndex:
     """Study index dataset from FinnGen.
@@ -24,35 +22,39 @@ class FinnGenStudyIndex:
     Some fields are also populated as constants, such as study type and the initial sample size.
     """

+    finngen_phenotype_table_url: str = "https://r9.finngen.fi/api/phenos"
+    finngen_release_prefix: str = "FINNGEN_R9"
+    finngen_summary_stats_url_prefix: str = (
+        "gs://finngen-public-data-r9/summary_stats/finngen_R9_"
+    )
+    finngen_summary_stats_url_suffix: str = ".gz"
+
     @classmethod
     def from_source(
         cls: type[FinnGenStudyIndex],
-        finngen_studies: DataFrame,
-        finngen_release_prefix: str,
-        finngen_summary_stats_url_prefix: str,
-        finngen_summary_stats_url_suffix: str,
+        spark: SparkSession,
     ) -> StudyIndex:
         """This function ingests study level metadata from FinnGen.

         Args:
-            finngen_studies (DataFrame): FinnGen raw study table
-            finngen_release_prefix (str): Release prefix pattern.
-            finngen_summary_stats_url_prefix (str): URL prefix for summary statistics location.
-            finngen_summary_stats_url_suffix (str): URL prefix suffix for summary statistics location.
+            spark (SparkSession): Spark session object.

         Returns:
             StudyIndex: Parsed and annotated FinnGen study table.
         """
+        json_data = urlopen(cls.finngen_phenotype_table_url).read().decode("utf-8")
+        rdd = spark.sparkContext.parallelize([json_data])
+        raw_df = spark.read.json(rdd)
         return StudyIndex(
-            _df=finngen_studies.select(
-                f.concat(f.lit(f"{finngen_release_prefix}_"), f.col("phenocode")).alias(
-                    "studyId"
-                ),
+            _df=raw_df.select(
+                f.concat(
+                    f.lit(f"{cls.finngen_release_prefix}_"), f.col("phenocode")
+                ).alias("studyId"),
                 f.col("phenostring").alias("traitFromSource"),
                 f.col("num_cases").alias("nCases"),
                 f.col("num_controls").alias("nControls"),
                 (f.col("num_cases") + f.col("num_controls")).alias("nSamples"),
-                f.lit(finngen_release_prefix).alias("projectId"),
+                f.lit(cls.finngen_release_prefix).alias("projectId"),
                 f.lit("gwas").alias("studyType"),
                 f.lit(True).alias("hasSumstats"),
                 f.lit("377,277 (210,870 females and 166,407 males)").alias(
@@ -67,9 +69,9 @@ def from_source(
                 # Cohort label is consistent with GWAS Catalog curation.
                 f.array(f.lit("FinnGen")).alias("cohorts"),
                 f.concat(
-                    f.lit(finngen_summary_stats_url_prefix),
+                    f.lit(cls.finngen_summary_stats_url_prefix),
                     f.col("phenocode"),
-                    f.lit(finngen_summary_stats_url_suffix),
+                    f.lit(cls.finngen_summary_stats_url_suffix),
                 ).alias("summarystatsLocation"),
             ).withColumn(
                 "ldPopulationStructure",

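With the FinnGen constants now defined on the class, ingestion needs only a Spark session. A minimal usage sketch (session setup illustrative):

from pyspark.sql import SparkSession

from otg.datasource.finngen.study_index import FinnGenStudyIndex

spark = SparkSession.builder.appName("finngen_study_index").getOrCreate()

# Fetches https://r9.finngen.fi/api/phenos and parses it into a StudyIndex.
study_index = FinnGenStudyIndex.from_source(spark)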
src/otg/datasource/finngen/summary_stats.py

Lines changed: 28 additions & 7 deletions
@@ -3,38 +3,59 @@
 from __future__ import annotations

 from dataclasses import dataclass
-from typing import TYPE_CHECKING

 import pyspark.sql.functions as f
 import pyspark.sql.types as t
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StringType, StructField, StructType

 from otg.common.utils import parse_pvalue
 from otg.dataset.summary_statistics import SummaryStatistics

-if TYPE_CHECKING:
-    from pyspark.sql import DataFrame
-

 @dataclass
 class FinnGenSummaryStats:
     """Summary statistics dataset for FinnGen."""

+    raw_schema: t.StructType = StructType(
+        [
+            StructField("#chrom", StringType(), True),
+            StructField("pos", StringType(), True),
+            StructField("ref", StringType(), True),
+            StructField("alt", StringType(), True),
+            StructField("rsids", StringType(), True),
+            StructField("nearest_genes", StringType(), True),
+            StructField("pval", StringType(), True),
+            StructField("mlogp", StringType(), True),
+            StructField("beta", StringType(), True),
+            StructField("sebeta", StringType(), True),
+            StructField("af_alt", StringType(), True),
+            StructField("af_alt_cases", StringType(), True),
+            StructField("af_alt_controls", StringType(), True),
+        ]
+    )
+
     @classmethod
     def from_source(
         cls: type[FinnGenSummaryStats],
-        summary_stats_df: DataFrame,
+        spark: SparkSession,
+        raw_files: list[str],
     ) -> SummaryStatistics:
         """Ingests all summary stats for all FinnGen studies.

         Args:
-            summary_stats_df (DataFrame): Raw summary statistics dataframe
+            spark (SparkSession): Spark session object.
+            raw_files (list[str]): Paths to raw summary statistics .gz files.

         Returns:
             SummaryStatistics: Processed summary statistics dataset
         """
         processed_summary_stats_df = (
-            summary_stats_df
+            spark.read.schema(cls.raw_schema)
+            .option("delimiter", "\t")
+            .csv(raw_files, header=True)
             # Drop rows which don't have proper position.
+            .filter(f.col("pos").cast(t.IntegerType()).isNotNull())
             .select(
                 # From the full path, extracts just the filename, and converts to upper case to get the study ID.
                 f.upper(f.regexp_extract(f.input_file_name(), r"([^/]+)\.gz", 1)).alias(

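Likewise, summary statistics ingestion now reads the raw .gz files directly from a list of paths. A usage sketch (the file path is hypothetical):

from pyspark.sql import SparkSession

from otg.datasource.finngen.summary_stats import FinnGenSummaryStats

spark = SparkSession.builder.appName("finngen_sumstats").getOrCreate()

# Hypothetical raw FinnGen file; the study ID is derived from the filename.
raw_files = ["gs://finngen-public-data-r9/summary_stats/finngen_R9_EXAMPLE.gz"]

summary_stats = FinnGenSummaryStats.from_source(spark=spark, raw_files=raw_files)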