Skip to content

Commit

Permalink
Remove downstream dag triggers, make trigger dag wait for completion …
Browse files Browse the repository at this point in the history
…on all tasks
  • Loading branch information
jmelot committed Aug 16, 2023
1 parent 9c15df0 commit 3e0b911
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
9 changes: 0 additions & 9 deletions linkage_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
Expand Down Expand Up @@ -490,12 +489,6 @@
)
start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert

downstream_tasks = [
TriggerDagRunOperator(task_id="trigger_article_classification", trigger_dag_id="article_classification"),
TriggerDagRunOperator(task_id="trigger_fields_of_study", trigger_dag_id="fields_of_study"),
TriggerDagRunOperator(task_id="trigger_new_fields_of_study", trigger_dag_id="new_fields_of_study"),
]

# task structure
clear_tmp_dir >> metadata_sequences_start
(metadata_sequences_end >> union_ids >> check_unique_input_ids >> union_metadata >> export_metadata >>
Expand All @@ -505,5 +498,3 @@
gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries)

last_transform_query >> check_queries >> start_production_cp

success_alert >> downstream_tasks
9 changes: 6 additions & 3 deletions scholarly_lit_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

trigger_linkage = TriggerDagRunOperator(
task_id="trigger_article_linkage_updater",
trigger_dag_id="article_linkage_updater"
trigger_dag_id="article_linkage_updater",
wait_for_completion=True
)

for prerequisite_dag in ["clarivate_tables_updater"]:
Expand All @@ -42,12 +43,14 @@

trigger_bulk_er = TriggerDagRunOperator(
task_id="trigger_bulk_org_er_updater",
trigger_dag_id="bulk_org_er_updater"
trigger_dag_id="bulk_org_er_updater",
wait_for_completion=True
)

trigger_metadata_merge = TriggerDagRunOperator(
task_id="trigger_merged_article_metadata_updater",
trigger_dag_id="merged_article_metadata_updater"
trigger_dag_id="merged_article_metadata_updater",
wait_for_completion=True
)

trigger_linkage >> trigger_bulk_er >> trigger_metadata_merge

0 comments on commit 3e0b911

Please sign in to comment.