|
| 1 | +"""DAG for updating GWAS Catalog curation table.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from datetime import datetime |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +from ot_orchestration.utils import common |
| 9 | +from ot_orchestration.utils.dataproc import ( |
| 10 | + create_cluster, |
| 11 | + install_dependencies, |
| 12 | + submit_step, |
| 13 | +) |
| 14 | +from airflow.models.dag import DAG |
| 15 | +from airflow.models.baseoperator import chain |
| 16 | + |
| 17 | +CLUSTER_NAME = "otg-gwascatalog-curation" |
| 18 | +RUN_DATE = datetime.today().strftime("%Y-%m-%d") |
| 19 | + |
| 20 | +with DAG( |
| 21 | + dag_id=Path(__file__).stem, |
| 22 | + description="Open Targets Genetics — GWAS Catalog curation update", |
| 23 | + default_args=common.shared_dag_args, |
| 24 | + **common.shared_dag_kwargs, |
| 25 | +): |
| 26 | + update_gwas_curation = submit_step( |
| 27 | + cluster_name=CLUSTER_NAME, |
| 28 | + step_id="ot_gwas_catalog_study_curation", |
| 29 | + task_id="gwas_catalog_curation_update", |
| 30 | + other_args=[ |
| 31 | + f"step.gwas_catalog_study_curation_out=gs://genetics_etl_python_playground/input/v2d/GWAS_Catalog_study_curation_{RUN_DATE}.tsv", |
| 32 | + ], |
| 33 | + ) |
| 34 | + |
| 35 | + # DAG description: |
| 36 | + chain( |
| 37 | + create_cluster(CLUSTER_NAME, num_workers=2), |
| 38 | + install_dependencies(CLUSTER_NAME), |
| 39 | + update_gwas_curation, |
| 40 | + ) |
0 commit comments