From dd86d9e7ec3d910c090c899cf869152a0d75fe19 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 5 Apr 2024 16:11:04 -0700 Subject: [PATCH 1/6] Add is_reingestion context to Ingester class --- catalog/dags/providers/factory_utils.py | 6 ++---- .../provider_data_ingester.py | 6 ++++++ catalog/dags/providers/provider_dag_factory.py | 15 +++++++++------ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/catalog/dags/providers/factory_utils.py b/catalog/dags/providers/factory_utils.py index eccc73ad891..31ed4887295 100644 --- a/catalog/dags/providers/factory_utils.py +++ b/catalog/dags/providers/factory_utils.py @@ -1,6 +1,5 @@ import logging import time -from collections.abc import Sequence from datetime import datetime from airflow.models import DagRun, TaskInstance @@ -19,17 +18,16 @@ def pull_media_wrapper( media_types: list[MediaType], ti: TaskInstance, dag_run: DagRun, - args: Sequence = None, + ingestion_kwargs: dict, ): """ Run the provided callable after pushing the output directories for each media store, which are generated when initializing the ingester class. """ - args = args or [] # Initialize the ProviderDataIngester class, which will initialize the # media stores and DelayedRequester. logger.info(f"Initializing ProviderIngester {ingester_class.__name__}") - ingester = ingester_class(dag_run.conf, dag_run.dag_id, *args) + ingester = ingester_class(dag_run.conf, dag_run.dag_id, **ingestion_kwargs) stores: dict[MediaType, MediaStore] = ingester.media_stores # Check that the ProviderDataIngester class has a store configuration for each diff --git a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 982dc4bc91a..8ec5191d79b 100644 --- a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -120,6 +120,7 @@ def __init__( dag_id: str = None, date: str = None, day_shift: int = None, + is_reingestion: bool = False, ): """ Initialize the provider configuration. @@ -129,6 +130,8 @@ def __init__( dag_id: The id of the running provider DAG date: Date String in the form YYYY-MM-DD. This is the date for which running the script will pull data + is_reingestion: Whether the ingester is being used to run reingestion, + ie running ingestion for several days. """ # An airflow variable used to cap the amount of records to be ingested. # This can be used for testing purposes to ensure a provider script @@ -138,6 +141,8 @@ def __init__( "INGESTION_LIMIT", deserialize_json=True, default_var=0 ) + self.is_reingestion = is_reingestion + # If a test limit is imposed, ensure that the `batch_limit` does not # exceed this. if self.limit: @@ -226,6 +231,7 @@ def ingest_records(self, **kwargs) -> None: return logger.info(f"Begin ingestion for {self.__class__.__name__}") + logger.info(f"is reingestion? 
{self.is_reingestion}") while should_continue: query_params = self._get_query_params(query_params, **kwargs) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index a3515986638..fb27191080c 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -172,21 +172,24 @@ def append_day_shift(id_str): if is_reingestion: identifier = f"{day_shift}_{identifier}" - ingestion_kwargs = { + pull_data_kwargs = { "ingester_class": conf.ingester_class, "media_types": conf.media_types, + "ingestion_kwargs": {"is_reingestion": is_reingestion}, } if conf.dated: - ingestion_kwargs["args"] = [ - DATE_RANGE_ARG_TEMPLATE.format(day_shift), - day_shift, # Pass day_shift in as the tsv_suffix - ] + pull_data_kwargs["ingestion_kwargs"].update( + { + "date": DATE_RANGE_ARG_TEMPLATE.format(day_shift), + "day_shift": day_shift, # Pass day_shift in as the tsv_suffix + } + ) pull_data = PythonOperator( task_id=append_day_shift(f"pull_{media_type_name}_data"), python_callable=pull_media_wrapper, op_kwargs={ - **ingestion_kwargs, + **pull_data_kwargs, }, depends_on_past=False, execution_timeout=conf.pull_timeout, From f2a7b3f8ba7062e463c52af302af8e48e4ebb533 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Mon, 8 Apr 2024 16:04:55 -0700 Subject: [PATCH 2/6] Revert "Add is_reingestion context to Ingester class" This reverts commit dd86d9e7ec3d910c090c899cf869152a0d75fe19. --- catalog/dags/providers/factory_utils.py | 6 ++++-- .../provider_data_ingester.py | 6 ------ catalog/dags/providers/provider_dag_factory.py | 15 ++++++--------- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/catalog/dags/providers/factory_utils.py b/catalog/dags/providers/factory_utils.py index 31ed4887295..eccc73ad891 100644 --- a/catalog/dags/providers/factory_utils.py +++ b/catalog/dags/providers/factory_utils.py @@ -1,5 +1,6 @@ import logging import time +from collections.abc import Sequence from datetime import datetime from airflow.models import DagRun, TaskInstance @@ -18,16 +19,17 @@ def pull_media_wrapper( media_types: list[MediaType], ti: TaskInstance, dag_run: DagRun, - ingestion_kwargs: dict, + args: Sequence = None, ): """ Run the provided callable after pushing the output directories for each media store, which are generated when initializing the ingester class. """ + args = args or [] # Initialize the ProviderDataIngester class, which will initialize the # media stores and DelayedRequester. logger.info(f"Initializing ProviderIngester {ingester_class.__name__}") - ingester = ingester_class(dag_run.conf, dag_run.dag_id, **ingestion_kwargs) + ingester = ingester_class(dag_run.conf, dag_run.dag_id, *args) stores: dict[MediaType, MediaStore] = ingester.media_stores # Check that the ProviderDataIngester class has a store configuration for each diff --git a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py index 8ec5191d79b..982dc4bc91a 100644 --- a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py +++ b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py @@ -120,7 +120,6 @@ def __init__( dag_id: str = None, date: str = None, day_shift: int = None, - is_reingestion: bool = False, ): """ Initialize the provider configuration. @@ -130,8 +129,6 @@ def __init__( dag_id: The id of the running provider DAG date: Date String in the form YYYY-MM-DD. 
This is the date for which running the script will pull data - is_reingestion: Whether the ingester is being used to run reingestion, - ie running ingestion for several days. """ # An airflow variable used to cap the amount of records to be ingested. # This can be used for testing purposes to ensure a provider script @@ -141,8 +138,6 @@ def __init__( "INGESTION_LIMIT", deserialize_json=True, default_var=0 ) - self.is_reingestion = is_reingestion - # If a test limit is imposed, ensure that the `batch_limit` does not # exceed this. if self.limit: @@ -231,7 +226,6 @@ def ingest_records(self, **kwargs) -> None: return logger.info(f"Begin ingestion for {self.__class__.__name__}") - logger.info(f"is reingestion? {self.is_reingestion}") while should_continue: query_params = self._get_query_params(query_params, **kwargs) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index fb27191080c..a3515986638 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -172,24 +172,21 @@ def append_day_shift(id_str): if is_reingestion: identifier = f"{day_shift}_{identifier}" - pull_data_kwargs = { + ingestion_kwargs = { "ingester_class": conf.ingester_class, "media_types": conf.media_types, - "ingestion_kwargs": {"is_reingestion": is_reingestion}, } if conf.dated: - pull_data_kwargs["ingestion_kwargs"].update( - { - "date": DATE_RANGE_ARG_TEMPLATE.format(day_shift), - "day_shift": day_shift, # Pass day_shift in as the tsv_suffix - } - ) + ingestion_kwargs["args"] = [ + DATE_RANGE_ARG_TEMPLATE.format(day_shift), + day_shift, # Pass day_shift in as the tsv_suffix + ] pull_data = PythonOperator( task_id=append_day_shift(f"pull_{media_type_name}_data"), python_callable=pull_media_wrapper, op_kwargs={ - **pull_data_kwargs, + **ingestion_kwargs, }, depends_on_past=False, execution_timeout=conf.pull_timeout, From b08aaa22685b1ccec286c6c17125178bb037df5e Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Mon, 8 Apr 2024 16:29:24 -0700 Subject: [PATCH 3/6] Do not report to Slack during pull_data task for reingestion, but in aggregate task --- .../dags/providers/provider_dag_factory.py | 191 ++++++++++++------ 1 file changed, 130 insertions(+), 61 deletions(-) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index a3515986638..4250ff3536e 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -69,14 +69,18 @@ from string import Template from airflow import DAG +from airflow.decorators import task +from airflow.exceptions import AirflowSkipException from airflow.models import Variable from airflow.models.mappedoperator import MappedOperator from airflow.models.param import Param from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator +from airflow.utils.state import State from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule +from common import slack from common.constants import AWS_CONN_ID, DAG_DEFAULT_ARGS, XCOM_PULL_TEMPLATE from common.constants import POSTGRES_CONN_ID as DB_CONN_ID from common.loader import loader, reporting, s3, sql @@ -109,8 +113,10 @@ def _apply_configuration_overrides(dag: DAG, overrides: list[TaskOverride]): if not overrides: return - for task in dag.tasks: - if (task_overrides := _get_overrides_for_task(task.task_id, overrides)) is None: + for dag_task in dag.tasks: + if 
( + task_overrides := _get_overrides_for_task(dag_task.task_id, overrides) + ) is None: continue # Format timeout override and apply @@ -118,10 +124,10 @@ def _apply_configuration_overrides(dag: DAG, overrides: list[TaskOverride]): if timeout_override: # If the task is a MappedOperator, apply the timeout to partial # kwargs to ensure it is set on each mapped task. - if isinstance(task, MappedOperator): - task.partial_kwargs["execution_timeout"] = timeout_override + if isinstance(dag_task, MappedOperator): + dag_task.partial_kwargs["execution_timeout"] = timeout_override else: - task.execution_timeout = timeout_override + dag_task.execution_timeout = timeout_override def _get_overrides_for_task( @@ -139,8 +145,16 @@ def _get_overrides_for_task( return None +def _get_media_type_descriptor(media_types): + """ + Given the list of supported media types, get the short media type descriptor + used in task_ids. + """ + return "mixed" if len(media_types) > 1 else media_types[0] + + def create_ingestion_workflow( - conf: ProviderWorkflow, day_shift: int = 0, is_reingestion: bool = False + provider_conf: ProviderWorkflow, day_shift: int = 0, is_reingestion: bool = False ): """ Create a TaskGroup that performs the ingestion tasks. @@ -150,12 +164,12 @@ def create_ingestion_workflow( Required Arguments: - conf: ProviderWorkflow configuration object. + provider_conf: ProviderWorkflow configuration object. Optional Arguments: day_shift: integer giving the number of days before the current logical date - for which ingestion should run (if `conf.dated==True`). + for which ingestion should run (if `provider_conf.dated==True`). is_reingestion: is this workflow a reingestion workflow """ @@ -164,8 +178,8 @@ def append_day_shift(id_str): return f"{id_str}{f'_day_shift_{day_shift}' if day_shift else ''}" with TaskGroup(group_id=append_day_shift("ingest_data")) as ingest_data: - media_type_name = "mixed" if len(conf.media_types) > 1 else conf.media_types[0] - provider_name = conf.dag_id.replace("_workflow", "") + provider_name = provider_conf.dag_id.replace("_workflow", "") + media_type_name = _get_media_type_descriptor(provider_conf.media_types) # Unique identifier used to generate the load_table name identifier = f"{{{{ ts_nodash }}}}_{provider_name}" @@ -173,10 +187,10 @@ def append_day_shift(id_str): identifier = f"{day_shift}_{identifier}" ingestion_kwargs = { - "ingester_class": conf.ingester_class, - "media_types": conf.media_types, + "ingester_class": provider_conf.ingester_class, + "media_types": provider_conf.media_types, } - if conf.dated: + if provider_conf.dated: ingestion_kwargs["args"] = [ DATE_RANGE_ARG_TEMPLATE.format(day_shift), day_shift, # Pass day_shift in as the tsv_suffix @@ -189,15 +203,21 @@ def append_day_shift(id_str): **ingestion_kwargs, }, depends_on_past=False, - execution_timeout=conf.pull_timeout, + execution_timeout=provider_conf.pull_timeout, # If the data pull fails, we want to load all data that's been retrieved # thus far before we attempt again retries=0, + # Only report errors to Slack in non-reingestion workflows. For reingestion, + # ingestion errors will be reported in aggregate after all reingestion + # days have been attempted. 
+ on_failure_callback=( + slack.on_failure_callback if not is_reingestion else None + ), ) load_tasks = [] record_counts_by_media_type: reporting.MediaTypeRecordMetrics = {} - for media_type in conf.media_types: + for media_type in provider_conf.media_types: with TaskGroup( group_id=append_day_shift(f"load_{media_type}_data") ) as load_data: @@ -232,7 +252,7 @@ def append_day_shift(id_str): ), "aws_conn_id": AWS_CONN_ID, "extra_args": { - "StorageClass": conf.s3_tsv_storage_class, + "StorageClass": provider_conf.s3_tsv_storage_class, }, }, trigger_rule=TriggerRule.NONE_SKIPPED, @@ -263,7 +283,7 @@ def append_day_shift(id_str): ) upsert_data = PythonOperator( task_id=append_day_shift("upsert_data"), - execution_timeout=conf.upsert_timeout, + execution_timeout=provider_conf.upsert_timeout, retries=1, python_callable=loader.upsert_data, op_kwargs={ @@ -301,12 +321,12 @@ def append_day_shift(id_str): pull_data >> load_tasks - if conf.create_preingestion_tasks: - preingestion_tasks = conf.create_preingestion_tasks() + if provider_conf.create_preingestion_tasks: + preingestion_tasks = provider_conf.create_preingestion_tasks() preingestion_tasks >> pull_data - if conf.create_postingestion_tasks: - postingestion_tasks = conf.create_postingestion_tasks() + if provider_conf.create_postingestion_tasks: + postingestion_tasks = provider_conf.create_postingestion_tasks() load_tasks >> postingestion_tasks ingestion_metrics = { @@ -341,36 +361,36 @@ def create_report_load_completion( ) -def create_provider_api_workflow_dag(conf: ProviderWorkflow): +def create_provider_api_workflow_dag(provider_conf: ProviderWorkflow): """ Instantiate a DAG that will run the given `main_function`. Required Arguments: - conf: ProviderWorkflow configuration object. + provider_conf: ProviderWorkflow configuration object. """ - default_args = {**DAG_DEFAULT_ARGS, **(conf.default_args or {})} + default_args = {**DAG_DEFAULT_ARGS, **(provider_conf.default_args or {})} # catchup is turned on by default for dated DAGs to allow backfilling. # It can be overridden with the `CATCHUP_ENABLED` Airflow variable. 
- catchup_enabled = conf.dated and Variable.get( + catchup_enabled = provider_conf.dated and Variable.get( "CATCHUP_ENABLED", default_var=True, deserialize_json=True ) dag = DAG( - dag_id=conf.dag_id, - default_args={**default_args, "start_date": conf.start_date}, - max_active_tasks=conf.max_active_tasks, - max_active_runs=conf.max_active_runs, - start_date=conf.start_date, - schedule=conf.schedule_string, + dag_id=provider_conf.dag_id, + default_args={**default_args, "start_date": provider_conf.start_date}, + max_active_tasks=provider_conf.max_active_tasks, + max_active_runs=provider_conf.max_active_runs, + start_date=provider_conf.start_date, + schedule=provider_conf.schedule_string, catchup=catchup_enabled, - doc_md=conf.doc_md, + doc_md=provider_conf.doc_md, tags=[ "provider", - *[f"provider: {media_type}" for media_type in conf.media_types], - f"ingestion: {'dated' if conf.dated else 'full'}", - *conf.tags, + *[f"provider: {media_type}" for media_type in provider_conf.media_types], + f"ingestion: {'dated' if provider_conf.dated else 'full'}", + *provider_conf.tags, ], render_template_as_native_obj=True, user_defined_macros={"date_partition_for_prefix": date_partition_for_prefix}, @@ -433,38 +453,44 @@ def create_provider_api_workflow_dag(conf: ProviderWorkflow): ) with dag: - if callable(getattr(conf.ingester_class, "create_ingestion_workflow", None)): + if callable( + getattr(provider_conf.ingester_class, "create_ingestion_workflow", None) + ): ( ingest_data, ingestion_metrics, - ) = conf.ingester_class.create_ingestion_workflow() + ) = provider_conf.ingester_class.create_ingestion_workflow() else: - ingest_data, ingestion_metrics = create_ingestion_workflow(conf) + ingest_data, ingestion_metrics = create_ingestion_workflow(provider_conf) report_load_completion = create_report_load_completion( - conf.dag_id, conf.media_types, ingestion_metrics, conf.dated + provider_conf.dag_id, + provider_conf.media_types, + ingestion_metrics, + provider_conf.dated, ) ingest_data >> report_load_completion # Apply any overrides from the DAG configuration - _apply_configuration_overrides(dag, conf.overrides) + _apply_configuration_overrides(dag, provider_conf.overrides) return dag def _build_partitioned_ingest_workflows( - partitioned_reingestion_days: list[list[int]], conf: ProviderReingestionWorkflow + partitioned_reingestion_days: list[list[int]], + provider_conf: ProviderReingestionWorkflow, ): """ Build a list of lists of ingestion tasks. - These are parameterized by the given dag conf and a list of day shifts. + These are parameterized by the given dag provider_conf and a list of day shifts. Calculation is explained below. Required Arguments: - conf: ProviderReingestionWorkflow configuration + provider_conf: ProviderReingestionWorkflow configuration object used to configure the ingestion tasks. partitioned_reingestion_days: list of lists of integers. 
It gives the set of days before the current execution @@ -519,7 +545,7 @@ def _build_partitioned_ingest_workflows( workflow_list = [] for day_shift in partition: ingest_data, ingestion_metrics = create_ingestion_workflow( - conf, day_shift, is_reingestion=True + provider_conf, day_shift, is_reingestion=True ) workflow_list.append(ingest_data) duration_list.append(ingestion_metrics["duration"]) @@ -537,18 +563,47 @@ def _build_partitioned_ingest_workflows( return partitioned_workflows, total_ingestion_metrics +@task +def report_aggregate_reingestion_errors( + provider_conf: ProviderReingestionWorkflow, dag_run=None +): + """ + Report ingestion errors that occurred during a reingestion workflow in + a single, aggregate slack message. If no errors were encountered, skip. + """ + # Get the list of failed `pull_data` tasks + media_type_name = _get_media_type_descriptor(provider_conf.media_types) + failed_pull_data_tasks = [ + task + for task in dag_run.get_task_instances(state=State.FAILED) + if f"pull_{media_type_name}_data" in task.task_id + ] + + if not failed_pull_data_tasks: + raise AirflowSkipException + + message = ( + f"Ingestion errors were encountered in {len(failed_pull_data_tasks)}" + f" ingestion days while running the `{provider_conf.dag_id}`` DAG. See the following" + " logs for details:\n" + ) + "\n".join(f" - {task.log_url}" for task in failed_pull_data_tasks) + + slack.send_alert(message, provider_conf.dag_id, "Aggregate Reingestion Error") + + def create_day_partitioned_reingestion_dag( - conf: ProviderReingestionWorkflow, partitioned_reingestion_days: list[list[int]] + provider_conf: ProviderReingestionWorkflow, + partitioned_reingestion_days: list[list[int]], ): """ Instantiate a DAG that will run ingestion using the given configuration. - In addition to a `conf` object and `reingestion_day_list_list`, this is + In addition to a `provider_conf` object and `reingestion_day_list_list`, this is parameterized by a number of dates calculated using the reingestion day list. Required Arguments: - conf: ProviderReingestionWorkflow configuration + provider_conf: ProviderReingestionWorkflow configuration object used to configure the ingestion tasks. reingestion_day_list_list: list of lists of integers. It gives the set of days before the current execution @@ -557,19 +612,22 @@ def create_day_partitioned_reingestion_dag( describes how the calls to the function should be prioritized. 
""" - default_args = {**DAG_DEFAULT_ARGS, **(conf.default_args or {})} + default_args = {**DAG_DEFAULT_ARGS, **(provider_conf.default_args or {})} dag = DAG( - dag_id=conf.dag_id, - default_args={**default_args, "start_date": conf.start_date}, - max_active_tasks=conf.max_active_tasks, - max_active_runs=conf.max_active_runs, - dagrun_timeout=conf.dagrun_timeout, - schedule=conf.schedule_string, - start_date=conf.start_date, + dag_id=provider_conf.dag_id, + default_args={**default_args, "start_date": provider_conf.start_date}, + max_active_tasks=provider_conf.max_active_tasks, + max_active_runs=provider_conf.max_active_runs, + dagrun_timeout=provider_conf.dagrun_timeout, + schedule=provider_conf.schedule_string, + start_date=provider_conf.start_date, catchup=False, - doc_md=conf.doc_md, + doc_md=provider_conf.doc_md, tags=["provider-reingestion"] - + [f"provider-reingestion: {media_type}" for media_type in conf.media_types], + + [ + f"provider-reingestion: {media_type}" + for media_type in provider_conf.media_types + ], render_template_as_native_obj=True, user_defined_macros={"date_partition_for_prefix": date_partition_for_prefix}, ) @@ -578,7 +636,9 @@ def create_day_partitioned_reingestion_dag( ( partitioned_ingest_workflows, ingestion_metrics, - ) = _build_partitioned_ingest_workflows(partitioned_reingestion_days, conf) + ) = _build_partitioned_ingest_workflows( + partitioned_reingestion_days, provider_conf + ) # For each 'level', make a gather task that waits for all of the reingestion # tasks at that level to complete. @@ -594,17 +654,26 @@ def create_day_partitioned_reingestion_dag( # This gates the tasks at each level. gather_operator >> partitioned_ingest_workflows[i + 1] + report_aggregate_errors = report_aggregate_reingestion_errors(provider_conf) + # Create a single report_load_completion task, passing in the list of duration # and counts data for each completed task. report_load_completion = create_report_load_completion( - conf.dag_id, conf.media_types, ingestion_metrics, conf.dated + provider_conf.dag_id, + provider_conf.media_types, + ingestion_metrics, + provider_conf.dated, ) # report_load_completion is downstream of all the ingestion TaskGroups in the # final list. - partitioned_ingest_workflows[-1] >> report_load_completion + ( + partitioned_ingest_workflows[-1] + >> report_aggregate_errors + >> report_load_completion + ) # Apply any overrides from the DAG configuration - _apply_configuration_overrides(dag, conf.overrides) + _apply_configuration_overrides(dag, provider_conf.overrides) return dag From 128bb024d6cff433a05312b64010eedd23726990 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Mon, 8 Apr 2024 16:46:25 -0700 Subject: [PATCH 4/6] Fix formatting --- catalog/dags/providers/provider_dag_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 4250ff3536e..38cec7fd437 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -584,7 +584,7 @@ def report_aggregate_reingestion_errors( message = ( f"Ingestion errors were encountered in {len(failed_pull_data_tasks)}" - f" ingestion days while running the `{provider_conf.dag_id}`` DAG. See the following" + f" ingestion days while running the `{provider_conf.dag_id}` DAG. 
See the following" " logs for details:\n" ) + "\n".join(f" - {task.log_url}" for task in failed_pull_data_tasks) From d6e9b7da1bd02bc809198448a99a3ed1a8a79454 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 10 Apr 2024 11:17:32 -0700 Subject: [PATCH 5/6] Make log links more compact --- catalog/dags/providers/provider_dag_factory.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 38cec7fd437..752b42faea8 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -586,7 +586,9 @@ def report_aggregate_reingestion_errors( f"Ingestion errors were encountered in {len(failed_pull_data_tasks)}" f" ingestion days while running the `{provider_conf.dag_id}` DAG. See the following" " logs for details:\n" - ) + "\n".join(f" - {task.log_url}" for task in failed_pull_data_tasks) + ) + "\n".join( + f" - <{task.log_url}|{task.task_id}>" for task in failed_pull_data_tasks + ) slack.send_alert(message, provider_conf.dag_id, "Aggregate Reingestion Error") From 9d4736d07d76b1d7440e48ce1feb55aa88026473 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 12 Apr 2024 11:15:08 -0700 Subject: [PATCH 6/6] Report at most 5 failed task logs --- catalog/dags/providers/provider_dag_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalog/dags/providers/provider_dag_factory.py b/catalog/dags/providers/provider_dag_factory.py index 752b42faea8..ae44788c2db 100644 --- a/catalog/dags/providers/provider_dag_factory.py +++ b/catalog/dags/providers/provider_dag_factory.py @@ -584,10 +584,10 @@ def report_aggregate_reingestion_errors( message = ( f"Ingestion errors were encountered in {len(failed_pull_data_tasks)}" - f" ingestion days while running the `{provider_conf.dag_id}` DAG. See the following" + f" ingestion days while running the `{provider_conf.dag_id}` DAG. See the" " logs for details:\n" ) + "\n".join( - f" - <{task.log_url}|{task.task_id}>" for task in failed_pull_data_tasks + f" - <{task.log_url}|{task.task_id}>" for task in failed_pull_data_tasks[:5] ) slack.send_alert(message, provider_conf.dag_id, "Aggregate Reingestion Error")
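
For reference, a minimal, framework-free sketch of the aggregate-error message that the final two patches converge on (compact Slack `<url|text>` log links, capped at five failures). This is not part of the patch series: `FailedTask` is an illustrative stand-in for Airflow's `TaskInstance` (which exposes `task_id` and `log_url`), the DAG id, day shifts, and URLs are sample values, and the real `report_aggregate_reingestion_errors` task raises `AirflowSkipException` when nothing failed and hands the message to `slack.send_alert` rather than returning it.

from collections import namedtuple

# Stand-in for Airflow's TaskInstance, which exposes task_id and log_url.
FailedTask = namedtuple("FailedTask", ["task_id", "log_url"])


def build_aggregate_error_message(dag_id, failed_pull_data_tasks):
    """Build the aggregate Slack message, or return None when there is nothing to report."""
    if not failed_pull_data_tasks:
        # The real Airflow task raises AirflowSkipException here so the task shows as skipped.
        return None
    return (
        f"Ingestion errors were encountered in {len(failed_pull_data_tasks)}"
        f" ingestion days while running the `{dag_id}` DAG. See the"
        " logs for details:\n"
    ) + "\n".join(
        # Slack's <url|text> syntax renders each entry as a compact named link;
        # only the first five failures are listed to keep the message short.
        f"  - <{task.log_url}|{task.task_id}>"
        for task in failed_pull_data_tasks[:5]
    )


if __name__ == "__main__":
    # Sample data: three failed pull_data tasks from different reingestion day shifts.
    failures = [
        FailedTask(
            f"ingest_data_day_shift_{day}.pull_image_data_day_shift_{day}",
            f"https://airflow.example.com/log/{day}",
        )
        for day in (3, 17, 42)
    ]
    print(build_aggregate_error_message("sample_provider_reingestion_workflow", failures))

Because the per-task `on_failure_callback` is suppressed for reingestion `pull_data` tasks (patch 3), this single message is the only Slack notification a failed reingestion day produces, which is the point of the change: one alert per DAG run instead of one per failed day.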