From b873d7c03b5c21aa14174d71c99b70a7e35a6fc2 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Fri, 20 Oct 2023 17:21:56 -0400
Subject: [PATCH] Separate pre-linkage and post-linkage triggers

---
 linkage_dag.py            |  9 ++++++-
 metadata_merge_trigger.py | 51 +++++++++++++++++++++++++++++++++++++++
 push_to_airflow.sh        |  1 +
 scholarly_lit_trigger.py  | 14 ----------
 4 files changed, 60 insertions(+), 15 deletions(-)
 create mode 100644 metadata_merge_trigger.py

diff --git a/linkage_dag.py b/linkage_dag.py
index 1679db9..41700ef 100644
--- a/linkage_dag.py
+++ b/linkage_dag.py
@@ -9,6 +9,7 @@
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -472,6 +473,11 @@
     with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
         table_desc = json.loads(f.read())

+    trigger_org_er_and_metadata_merge = TriggerDagRunOperator(
+        task_id="trigger_org_er_and_metadata_merge",
+        trigger_dag_id="org_er_and_metadata_merge"
+    )
+
     for table in production_tables:
         push_to_production = BigQueryToBigQueryOperator(
             task_id="copy_"+table.lower(),
@@ -496,7 +502,8 @@
             },
             python_callable=update_table_descriptions
         )
-        start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert
+        (start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert >>
+         trigger_org_er_and_metadata_merge)

     # We don't show the "all metadata" table in the production dataset, but we do need to
     # be able to diff the current data from the data used in the last run in simhash_input
diff --git a/metadata_merge_trigger.py b/metadata_merge_trigger.py
new file mode 100644
index 0000000..4eb2d52
--- /dev/null
+++ b/metadata_merge_trigger.py
@@ -0,0 +1,51 @@
+from airflow import DAG
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.operators.dummy import DummyOperator
+from datetime import timedelta, datetime
+
+from dataloader.airflow_utils.slack import task_fail_slack_alert
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2022, 3, 5),
+    "email": ["jennifer.melot@georgetown.edu"],
+    "email_on_failure": True,
+    "email_on_retry": True,
+    "retries": 0,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": task_fail_slack_alert
+}
+
+with DAG("org_er_and_metadata_merge",
+         default_args=default_args,
+         description="Triggers Org ER and metadata merge dags",
+         schedule_interval=None,
+         catchup=False
+         ) as dag:
+
+    trigger_orgfixes1 = TriggerDagRunOperator(
+        task_id="trigger_orgfixes1",
+        trigger_dag_id="org_fixes",
+        wait_for_completion=True
+    )
+    trigger_bulk_org_er_updater = TriggerDagRunOperator(
+        task_id="trigger_bulk_org_er_updater",
+        trigger_dag_id="bulk_org_er_updater",
+        wait_for_completion=True
+    )
+    trigger_orgfixes2 = TriggerDagRunOperator(
+        task_id="trigger_orgfixes2",
+        trigger_dag_id="org_fixes",
+        wait_for_completion=True
+    )
+    trigger_merge = TriggerDagRunOperator(
+        task_id="trigger_merged_article_metadata_updater",
+        trigger_dag_id="merged_article_metadata_updater",
+        wait_for_completion=True
+    )
+
+    (trigger_orgfixes1 >>
+     trigger_bulk_org_er_updater >> trigger_orgfixes2 >> trigger_merge)
+
diff --git a/push_to_airflow.sh b/push_to_airflow.sh
index 5f99c37..6cbb788 100755
--- a/push_to_airflow.sh
+++ b/push_to_airflow.sh
@@ -1,5 +1,6 @@
 gsutil cp linkage_dag.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
 gsutil cp scholarly_lit_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
+gsutil cp metadata_merge_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
 gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/*
 gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/
 gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/article_linkage/*
diff --git a/scholarly_lit_trigger.py b/scholarly_lit_trigger.py
index fe3ffda..5f9a805 100644
--- a/scholarly_lit_trigger.py
+++ b/scholarly_lit_trigger.py
@@ -40,17 +40,3 @@
         wait_for_completion=True
     )
     start >> trigger >> trigger_linkage
-
-    trigger_bulk_er = TriggerDagRunOperator(
-        task_id="trigger_bulk_org_er_updater",
-        trigger_dag_id="bulk_org_er_updater",
-        wait_for_completion=True
-    )
-
-    trigger_metadata_merge = TriggerDagRunOperator(
-        task_id="trigger_merged_article_metadata_updater",
-        trigger_dag_id="merged_article_metadata_updater",
-        wait_for_completion=True
-    )
-
-    trigger_linkage >> trigger_bulk_er >> trigger_metadata_merge
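
Testing note for reviewers: after push_to_airflow.sh syncs these files, the new
post-linkage chain can be exercised without waiting for a full linkage run. A
minimal sketch using the standard Airflow 2.x CLI; "linkage_dag" as the DAG id
of linkage_dag.py is an assumption based on its filename, only
org_er_and_metadata_merge is defined in this patch:

    # The new DAG has schedule_interval=None, so it only runs when something
    # triggers it: either linkage_dag's final task or a manual run.
    airflow dags trigger org_er_and_metadata_merge

    # Exercise the linkage-side trigger task in isolation, without
    # recording state in the metadata database (DAG id assumed):
    airflow tasks test linkage_dag trigger_org_er_and_metadata_merge 2023-10-20

Because every TriggerDagRunOperator in metadata_merge_trigger.py sets
wait_for_completion=True, the four downstream DAG runs execute strictly in
sequence, and a failed downstream run fails the corresponding trigger task,
surfacing through task_fail_slack_alert.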