|
| 1 | +"""Airflow DAG for the preprocessing of GWAS Catalog's harmonised summary statistics and curated associations.""" |
| 2 | +from __future__ import annotations |
| 3 | + |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import common_airflow as common |
| 7 | +from airflow.models.dag import DAG |
| 8 | +from airflow.utils.task_group import TaskGroup |
| 9 | +from airflow.utils.trigger_rule import TriggerRule |
| 10 | + |
# Name of the cluster every step below is submitted to; it is also passed to
# the create/install/delete cluster helpers at the end of the DAG.
CLUSTER_NAME = "otg-preprocess-gwascatalog"
# Value handed to `create_cluster` as its `autoscaling_policy` argument.
AUTOSCALING = "otg-preprocess-gwascatalog"

# Input location of the harmonised summary statistics (window clumping input).
SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised"
# Base path under which all intermediate and final datasets are read/written.
# NOTE(review): "XX.XX" looks like a release-version placeholder — confirm
# it is meant to be replaced before running.
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — GWAS Catalog preprocess",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    # Summary statistics branch: window clumping -> LD clumping -> PICS.
    with TaskGroup(group_id="summary_stats_preprocessing") as summary_stats_group:
        summary_stats_window_clumping = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="clump",
            task_id="catalog_sumstats_window_clumping",
            other_args=[
                f"step.input_path={SUMSTATS}",
                f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
            ],
        )
        # Every path argument must be an f-string: without the prefix the
        # literal text "{RELEASEBUCKET}" would be passed to the step.
        summary_stats_ld_clumping = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="clump",
            task_id="catalog_sumstats_ld_clumping",
            other_args=[
                f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
                f"step.ld_index_path={RELEASEBUCKET}/ld_index",
                f"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
                f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
            ],
            # ALL_DONE: the task is triggered once its upstream tasks have
            # finished, whatever their final state.
            trigger_rule=TriggerRule.ALL_DONE,
        )
        summary_stats_pics = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="pics",
            task_id="catalog_sumstats_pics",
            other_args=[
                f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
                f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog",
            ],
            trigger_rule=TriggerRule.ALL_DONE,
        )
        summary_stats_window_clumping >> summary_stats_ld_clumping >> summary_stats_pics

    # Curated associations branch: ingestion -> LD clumping -> PICS.
    with TaskGroup(group_id="curation_preprocessing") as curation_group:
        parse_study_and_curated_assocs = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="gwas_catalog_ingestion",
            task_id="catalog_ingestion",
        )

        curation_ld_clumping = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="clump",
            task_id="catalog_curation_ld_clumping",
            other_args=[
                f"step.input_path={RELEASEBUCKET}/study_locus/catalog_curated",
                f"step.ld_index_path={RELEASEBUCKET}/ld_index",
                f"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
                f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
            ],
            trigger_rule=TriggerRule.ALL_DONE,
        )

        curation_pics = common.submit_step(
            cluster_name=CLUSTER_NAME,
            step_id="pics",
            task_id="catalog_curation_pics",
            other_args=[
                f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
                f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated",
            ],
            trigger_rule=TriggerRule.ALL_DONE,
        )
        parse_study_and_curated_assocs >> curation_ld_clumping >> curation_pics

    # Overall ordering: create the cluster, install dependencies, run both
    # task groups (no ordering is declared between them), then delete the
    # cluster.
    (
        common.create_cluster(
            CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5
        )
        >> common.install_dependencies(CLUSTER_NAME)
        >> [summary_stats_group, curation_group]
        >> common.delete_cluster(CLUSTER_NAME)
    )
0 commit comments