From 6d8e43b730dcb9a9cc6b17ae5584246fc9c3801b Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 10 Apr 2024 11:40:28 -0400 Subject: [PATCH] Trigger org-fixes directly, remove org er dag Part of https://github.com/georgetown-cset/cset_article_schema/issues/164 --- linkage_dag.py | 8 ++++---- metadata_merge_trigger.py | 42 --------------------------------------- 2 files changed, 4 insertions(+), 46 deletions(-) delete mode 100644 metadata_merge_trigger.py diff --git a/linkage_dag.py b/linkage_dag.py index 035c1d7..0414fe0 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -571,9 +571,9 @@ ) as f: table_desc = json.loads(f.read()) - trigger_org_er_and_metadata_merge = TriggerDagRunOperator( - task_id="trigger_org_er_and_metadata_merge", - trigger_dag_id="org_er_and_metadata_merge", + trigger_org_fixes = TriggerDagRunOperator( + task_id="trigger_org_fixes", + trigger_dag_id="org_fixes", ) for table in production_tables: @@ -606,7 +606,7 @@ >> snapshot >> pop_descriptions >> success_alert - >> trigger_org_er_and_metadata_merge + >> trigger_org_fixes ) # We don't show the "all metadata" table in the production dataset, but we do need to diff --git a/metadata_merge_trigger.py b/metadata_merge_trigger.py deleted file mode 100644 index feb12c2..0000000 --- a/metadata_merge_trigger.py +++ /dev/null @@ -1,42 +0,0 @@ -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.dummy import DummyOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from dataloader.airflow_utils.defaults import get_default_args - -with DAG( - "org_er_and_metadata_merge", - default_args=get_default_args(pocs=["Jennifer"]), - description="Triggers Org ER and metadata merge dags", - schedule_interval=None, - catchup=False, -) as dag: - - trigger_orgfixes1 = TriggerDagRunOperator( - task_id="trigger_orgfixes1", - trigger_dag_id="org_fixes", - wait_for_completion=True, - ) - trigger_bulk_org_er_updater = TriggerDagRunOperator( - task_id="trigger_bulk_org_er_updater", - trigger_dag_id="bulk_org_er_updater", - wait_for_completion=True, - ) - trigger_orgfixes2 = TriggerDagRunOperator( - task_id="trigger_orgfixes2", - trigger_dag_id="org_fixes", - wait_for_completion=True, - ) - trigger_merge = TriggerDagRunOperator( - task_id="trigger_merged_article_metadata_updater", - trigger_dag_id="merged_article_metadata_updater", - wait_for_completion=True, - ) - - ( - trigger_orgfixes1 - >> trigger_bulk_org_er_updater - >> trigger_orgfixes2 - >> trigger_merge - )