From 452558a234f500a79b6658c6f6b4a1a14984474a Mon Sep 17 00:00:00 2001 From: Staci Mullins <63313398+stacimc@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:05:02 -0700 Subject: [PATCH] Configure poke interval for the filtered index creation by environment (#5114) * Configure poke interval for the filtered index creation by environment * Set kwarg explicitly in mapped task * Adjust poke intervals --- catalog/dags/common/elasticsearch.py | 3 ++- .../dags/data_refresh/create_and_populate_filtered_index.py | 2 ++ catalog/dags/data_refresh/dag_factory.py | 1 + catalog/dags/data_refresh/data_refresh_types.py | 6 +++--- .../create_proportional_by_source_staging_index_dag.py | 2 ++ 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 1c2a0493c6d..814797c29ce 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -143,6 +143,7 @@ def trigger_and_wait_for_reindex( max_docs: int | None = None, refresh: bool = True, slices: Union[int, Literal["auto"]] = "auto", + poke_interval: int = REFRESH_POKE_INTERVAL, ): @task def trigger_reindex( @@ -208,7 +209,7 @@ def wait_for_reindex( slices, ) - wait_for_reindex_task = wait_for_reindex( + wait_for_reindex_task = wait_for_reindex.override(poke_interval=poke_interval)( task_id=trigger_reindex_task, expected_docs=max_docs, es_host=es_host ) diff --git a/catalog/dags/data_refresh/create_and_populate_filtered_index.py b/catalog/dags/data_refresh/create_and_populate_filtered_index.py index 1a636e08ade..b263183effd 100644 --- a/catalog/dags/data_refresh/create_and_populate_filtered_index.py +++ b/catalog/dags/data_refresh/create_and_populate_filtered_index.py @@ -51,6 +51,7 @@ def create_and_populate_filtered_index( origin_index_name: str, filtered_index_name: str, timeout: timedelta, + poke_interval: int, ): """ Create and populate a filtered index based on the given origin index, excluding @@ -90,6 +91,7 @@ def create_and_populate_filtered_index( } }, refresh=False, + poke_interval=poke_interval, ) refresh_index = es.refresh_index(es_host=es_host, index_name=filtered_index_name) diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py index b4511808d02..a580a1eb3fb 100644 --- a/catalog/dags/data_refresh/dag_factory.py +++ b/catalog/dags/data_refresh/dag_factory.py @@ -243,6 +243,7 @@ def create_data_refresh_dag( origin_index_name=target_index_name, filtered_index_name=filtered_index_name, timeout=data_refresh_config.create_filtered_index_timeout, + poke_interval=data_refresh_config.reindex_poke_interval, ) # Re-enable Cloudwatch alarms once reindexing is complete, even if it diff --git a/catalog/dags/data_refresh/data_refresh_types.py b/catalog/dags/data_refresh/data_refresh_types.py index 3ff42d0325e..0041443047d 100644 --- a/catalog/dags/data_refresh/data_refresh_types.py +++ b/catalog/dags/data_refresh/data_refresh_types.py @@ -128,9 +128,9 @@ def table_mappings(self) -> list[TableMapping]: add_primary_key_timeout=timedelta(hours=12), indexer_worker_timeout=timedelta(days=1), concurrency_check_poke_interval=int( - os.getenv("DATA_REFRESH_POKE_INTERVAL", 60) + os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5) ), - reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)), + reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 10)), ), AUDIO: DataRefreshConfig( media_type=AUDIO, @@ -152,6 +152,6 @@ def table_mappings(self) -> list[TableMapping]: concurrency_check_poke_interval=int( os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30) ), - reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)), + reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)), ), } diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index aeecd9381ab..243903b6eca 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -46,6 +46,7 @@ AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES, + REFRESH_POKE_INTERVAL, STAGING, ) from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG @@ -158,6 +159,7 @@ def create_proportional_by_source_staging_index(): # Do not refresh the index after each partial reindex refresh=False, es_host=es_host, + poke_interval=REFRESH_POKE_INTERVAL, ).expand_kwargs(desired_source_counts) refresh_destination_index = es.refresh_index(