From 3e0b91151e521b5de27f23f7d9fdd63f4532a756 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 16 Aug 2023 11:33:52 -0400 Subject: [PATCH] Remove downstream dag triggers, make trigger dag wait for completion on all tasks --- linkage_dag.py | 9 --------- scholarly_lit_trigger.py | 9 ++++++--- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/linkage_dag.py b/linkage_dag.py index 3d3bc72..2d05466 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -9,7 +9,6 @@ from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -490,12 +489,6 @@ ) start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert - downstream_tasks = [ - TriggerDagRunOperator(task_id="trigger_article_classification", trigger_dag_id="article_classification"), - TriggerDagRunOperator(task_id="trigger_fields_of_study", trigger_dag_id="fields_of_study"), - TriggerDagRunOperator(task_id="trigger_new_fields_of_study", trigger_dag_id="new_fields_of_study"), - ] - # task structure clear_tmp_dir >> metadata_sequences_start (metadata_sequences_end >> union_ids >> check_unique_input_ids >> union_metadata >> export_metadata >> @@ -505,5 +498,3 @@ gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries) last_transform_query >> check_queries >> start_production_cp - - success_alert >> downstream_tasks diff --git a/scholarly_lit_trigger.py b/scholarly_lit_trigger.py index 2677d9a..11bfcbc 100644 --- a/scholarly_lit_trigger.py +++ b/scholarly_lit_trigger.py @@ -29,7 +29,8 @@ trigger_linkage = TriggerDagRunOperator( task_id="trigger_article_linkage_updater", - trigger_dag_id="article_linkage_updater" + trigger_dag_id="article_linkage_updater", + wait_for_completion=True ) for prerequisite_dag in ["clarivate_tables_updater"]: @@ -42,12 +43,14 @@ trigger_bulk_er = TriggerDagRunOperator( task_id="trigger_bulk_org_er_updater", - trigger_dag_id="bulk_org_er_updater" + trigger_dag_id="bulk_org_er_updater", + wait_for_completion=True ) trigger_metadata_merge = TriggerDagRunOperator( task_id="trigger_merged_article_metadata_updater", - trigger_dag_id="merged_article_metadata_updater" + trigger_dag_id="merged_article_metadata_updater", + wait_for_completion=True ) trigger_linkage >> trigger_bulk_er >> trigger_metadata_merge