Skip to content

Commit

Permalink
feat: add Airflow DAG for Preprocess
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Oct 9, 2023
1 parent 678f340 commit f4864e4
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/airflow/dags/dag_preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Apache Airflow workflow to run the Preprocess part of the pipeline."""

from __future__ import annotations

from functools import partial

from airflow.models.dag import DAG
from common_airflow import (
create_cluster,
delete_cluster,
outputs,
shared_dag_kwargs,
submit_pyspark_job,
)

# Workflow specific configuration.
CLUSTER_NAME = "otg-preprocess"
SPARK_WRITE_MODE = "append"
submit_pyspark_job_partial = partial(submit_pyspark_job, CLUSTER_NAME)


with DAG(
dag_id="otg-preprocess",
description="Open Targets Genetics — Preprocess Workflow",
**shared_dag_kwargs,
):
# Ingest FinnGen.
ingest_finngen = submit_pyspark_job_partial(
task_id="ingest-finngen",
python_module_path="finngen.py",
args=dict(
finngen_phenotype_table_url="https://r9.finngen.fi/api/phenos",
finngen_release_prefix="FINNGEN_R9",
finngen_summary_stats_url_prefix="gs://finngen-public-data-r9/summary_stats/finngen_R9_",
finngen_summary_stats_url_suffix=".gz",
finngen_study_index_out=f"{outputs}/preprocess/finngen/study_index",
spark_write_mode=SPARK_WRITE_MODE,
),
)

# Assemble the ingestion actions into DAG.
(create_cluster(CLUSTER_NAME) >> [ingest_finngen] >> delete_cluster(CLUSTER_NAME))

0 comments on commit f4864e4

Please sign in to comment.