Merge pull request #34 from georgetown-cset/trigger-updates
Separate pre-linkage and post-linkage triggers
jamesdunham authored Oct 26, 2023
2 parents f886329 + b873d7c commit 9a5b5cb
Showing 4 changed files with 60 additions and 15 deletions.
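
At a high level, the pre-linkage trigger DAG (scholarly_lit_trigger.py) now stops after kicking off article linkage, and the post-linkage work moves into a new org_er_and_metadata_merge DAG that the linkage DAG fires once its production copy has finished. A minimal sketch of that hand-off pattern, using hypothetical DAG and task names rather than the real ones in this repo:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Upstream DAG finishes its own work, then hands off to a separate orchestration DAG.
with DAG("upstream_dag", start_date=datetime(2022, 3, 5), schedule_interval=None, catchup=False) as upstream:
    last_step = DummyOperator(task_id="last_step")
    hand_off = TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_orchestrator",  # fire-and-forget: no wait_for_completion
    )
    last_step >> hand_off

# The orchestration DAG chains further DAGs, waiting for each to finish before the next starts.
with DAG("downstream_orchestrator", start_date=datetime(2022, 3, 5), schedule_interval=None, catchup=False) as downstream:
    step_a = TriggerDagRunOperator(task_id="trigger_a", trigger_dag_id="some_dag_a", wait_for_completion=True)
    step_b = TriggerDagRunOperator(task_id="trigger_b", trigger_dag_id="some_dag_b", wait_for_completion=True)
    step_a >> step_b

The first trigger is fire-and-forget, while the orchestration DAG's triggers use wait_for_completion=True so each downstream DAG must complete before the next one is fired.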
9 changes: 8 additions & 1 deletion linkage_dag.py
@@ -9,6 +9,7 @@
 from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -472,6 +473,11 @@
     with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
         table_desc = json.loads(f.read())
 
+    trigger_org_er_and_metadata_merge = TriggerDagRunOperator(
+        task_id="trigger_org_er_and_metadata_merge",
+        trigger_dag_id="org_er_and_metadata_merge"
+    )
+
     for table in production_tables:
         push_to_production = BigQueryToBigQueryOperator(
             task_id="copy_"+table.lower(),
@@ -496,7 +502,8 @@
             },
             python_callable=update_table_descriptions
         )
-        start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert
+        (start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert >>
+         trigger_org_er_and_metadata_merge)
 
     # We don't show the "all metadata" table in the production dataset, but we do need to
     # be able to diff the current data from the data used in the last run in simhash_input
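
The new trigger task is defined once, before the per-table loop, and every iteration re-declares the same success_alert >> trigger_org_er_and_metadata_merge dependency, so the downstream DAG is fired only once, after the alert task (and with it each table's copy, snapshot, and description chain) has succeeded. A tiny sketch of that fan-in behavior, with hypothetical task names and DummyOperator standing in for the real operators:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("fan_in_sketch", start_date=datetime(2022, 3, 5), schedule_interval=None, catchup=False) as dag:
    success_alert = DummyOperator(task_id="success_alert")
    final_trigger = DummyOperator(task_id="final_trigger")
    for table in ["a", "b", "c"]:
        copy_table = DummyOperator(task_id=f"copy_{table}")
        # Re-declaring the same downstream edge on each pass is harmless;
        # final_trigger still runs exactly once, after success_alert succeeds.
        copy_table >> success_alert >> final_trigger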
51 changes: 51 additions & 0 deletions metadata_merge_trigger.py
@@ -0,0 +1,51 @@
+from airflow import DAG
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.operators.dummy import DummyOperator
+from datetime import timedelta, datetime
+
+from dataloader.airflow_utils.slack import task_fail_slack_alert
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2022, 3, 5),
+    "email": ["jennifer.melot@georgetown.edu"],
+    "email_on_failure": True,
+    "email_on_retry": True,
+    "retries": 0,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": task_fail_slack_alert
+}
+
+with DAG("org_er_and_metadata_merge",
+         default_args=default_args,
+         description="Triggers Org ER and metadata merge dags",
+         schedule_interval=None,
+         catchup=False
+         ) as dag:
+
+    trigger_orgfixes1 = TriggerDagRunOperator(
+        task_id="trigger_orgfixes1",
+        trigger_dag_id="org_fixes",
+        wait_for_completion=True
+    )
+    trigger_bulk_org_er_updater = TriggerDagRunOperator(
+        task_id="trigger_bulk_org_er_updater",
+        trigger_dag_id="bulk_org_er_updater",
+        wait_for_completion=True
+    )
+    trigger_orgfixes2 = TriggerDagRunOperator(
+        task_id="trigger_orgfixes2",
+        trigger_dag_id="org_fixes",
+        wait_for_completion=True
+    )
+    trigger_merge = TriggerDagRunOperator(
+        task_id="trigger_merged_article_metadata_updater",
+        trigger_dag_id="merged_article_metadata_updater",
+        wait_for_completion=True
+    )
+
+    trigger_orgfixes1 >> trigger_bulk_org_er_updater >> trigger_orgfixes2 >> trigger_merge
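
Because every trigger in this DAG sets wait_for_completion=True, each task keeps polling its child DAG run and only succeeds once that run finishes, so org_fixes, bulk_org_er_updater, org_fixes again, and merged_article_metadata_updater execute strictly in sequence. If the polling cadence or accepted end states ever need tuning, TriggerDagRunOperator exposes parameters for that; the values below are illustrative assumptions, not part of this commit:

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Hypothetical variant of one of the triggers above, with explicit polling settings.
with DAG("org_er_trigger_tuning_sketch", start_date=datetime(2022, 3, 5), schedule_interval=None, catchup=False) as dag:
    trigger_orgfixes1 = TriggerDagRunOperator(
        task_id="trigger_orgfixes1",
        trigger_dag_id="org_fixes",
        wait_for_completion=True,
        poke_interval=60,            # seconds between checks on the child DAG run
        allowed_states=["success"],  # child run states treated as success
        failed_states=["failed"],    # child run states that fail this trigger task
    )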


1 change: 1 addition & 0 deletions push_to_airflow.sh
@@ -1,5 +1,6 @@
 gsutil cp linkage_dag.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
 gsutil cp scholarly_lit_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
+gsutil cp metadata_merge_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/
 gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/*
 gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/
 gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/article_linkage/*
14 changes: 0 additions & 14 deletions scholarly_lit_trigger.py
@@ -40,17 +40,3 @@
         wait_for_completion=True
     )
     start >> trigger >> trigger_linkage
-
-    trigger_bulk_er = TriggerDagRunOperator(
-        task_id="trigger_bulk_org_er_updater",
-        trigger_dag_id="bulk_org_er_updater",
-        wait_for_completion=True
-    )
-
-    trigger_metadata_merge = TriggerDagRunOperator(
-        task_id="trigger_merged_article_metadata_updater",
-        trigger_dag_id="merged_article_metadata_updater",
-        wait_for_completion=True
-    )
-
-    trigger_linkage >> trigger_bulk_er >> trigger_metadata_merge
