From 12e7c87969077becfe997d493cf9065505441146 Mon Sep 17 00:00:00 2001 From: Princewill Onyenanu Date: Mon, 6 May 2024 19:24:15 +0100 Subject: [PATCH] Add variable to disable removing sql source files during ingestion (#4216) * add variable to disable removig sql source files for ingestion workflows. * Change default for removed parameter, allow either flag to remove * Whitespace change --------- Co-authored-by: Madison Swain-Bowden --- .../dags/providers/provider_api_scripts/inaturalist.py | 9 ++++++--- catalog/dags/providers/provider_dag_factory.py | 2 +- catalog/env.template | 4 ++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/catalog/dags/providers/provider_api_scripts/inaturalist.py b/catalog/dags/providers/provider_api_scripts/inaturalist.py index b454ff09396..8b0661a1f7a 100644 --- a/catalog/dags/providers/provider_api_scripts/inaturalist.py +++ b/catalog/dags/providers/provider_api_scripts/inaturalist.py @@ -328,7 +328,7 @@ def create_preingestion_tasks(): python_callable=INaturalistDataIngester.load_catalog_of_life_names, doc_md="Load vernacular taxon names from Catalog of Life", op_kwargs={ - "remove_api_files": "{{params.sql_rm_source_data_after_ingesting}}" + "remove_api_files": "{{ params.sql_rm_source_data_after_ingesting or var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION }}", }, execution_timeout=timedelta(minutes=15), ) @@ -347,8 +347,11 @@ def create_postingestion_tasks(): check_drop_parameter = ShortCircuitOperator( task_id="check_drop_parameter", doc_md="Skip post-ingestion if NOT sql_rm_source_data_after_ingesting.", - op_args=["{{ params.sql_rm_source_data_after_ingesting }}"], - python_callable=(lambda x: x), + op_args=[ + "{{ params.sql_rm_source_data_after_ingesting }}", + "{{ var.json.SQL_RM_SOURCE_DATA_AFTER_INGESTION }}", + ], + python_callable=(lambda *x: any(x)), trigger_rule=TriggerRule.NONE_SKIPPED, # just skip the drop steps, not the final reporting step in the dag ignore_downstream_trigger_rules=False, diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 3fb6de65e68..3c6bbaedb8d 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -449,7 +449,7 @@ def create_provider_api_workflow_dag(provider_conf: ProviderWorkflow): ), ), "sql_rm_source_data_after_ingesting": Param( - default=True, + default=False, type="boolean", description=( "Whether to delete source data from airflow and DB once ingestion" diff --git a/catalog/env.template b/catalog/env.template index 9d6b1bfdbbd..71c5e915a71 100644 --- a/catalog/env.template +++ b/catalog/env.template @@ -131,3 +131,7 @@ AIRFLOW_VAR_AIRFLOW_RDS_SNAPSHOTS_TO_RETAIN=7 # Whether to toggle production CloudWatch alarms when running a data refresh DAG. # Used to prevent requiring AWS credentials when running locally. AIRFLOW_VAR_TOGGLE_CLOUDWATCH_ALARMS=false + +# Whether to delete source data from airflow and DB once ingestion is complete. +# This is used to support data quality testing in SQL-only DAGs like iNaturalist +AIRFLOW_VAR_SQL_RM_SOURCE_DATA_AFTER_INGESTION=false