From 10171b8b248a19aa81c9fc1744d49d55d9abe220 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 30 Oct 2024 13:36:14 -0700 Subject: [PATCH 1/3] Configure poke interval for the filtered index creation by environment --- catalog/dags/common/elasticsearch.py | 3 ++- .../dags/data_refresh/create_and_populate_filtered_index.py | 2 ++ catalog/dags/data_refresh/dag_factory.py | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 1c2a0493c6d..814797c29ce 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -143,6 +143,7 @@ def trigger_and_wait_for_reindex( max_docs: int | None = None, refresh: bool = True, slices: Union[int, Literal["auto"]] = "auto", + poke_interval: int = REFRESH_POKE_INTERVAL, ): @task def trigger_reindex( @@ -208,7 +209,7 @@ def wait_for_reindex( slices, ) - wait_for_reindex_task = wait_for_reindex( + wait_for_reindex_task = wait_for_reindex.override(poke_interval=poke_interval)( task_id=trigger_reindex_task, expected_docs=max_docs, es_host=es_host ) diff --git a/catalog/dags/data_refresh/create_and_populate_filtered_index.py b/catalog/dags/data_refresh/create_and_populate_filtered_index.py index 1a636e08ade..b263183effd 100644 --- a/catalog/dags/data_refresh/create_and_populate_filtered_index.py +++ b/catalog/dags/data_refresh/create_and_populate_filtered_index.py @@ -51,6 +51,7 @@ def create_and_populate_filtered_index( origin_index_name: str, filtered_index_name: str, timeout: timedelta, + poke_interval: int, ): """ Create and populate a filtered index based on the given origin index, excluding @@ -90,6 +91,7 @@ def create_and_populate_filtered_index( } }, refresh=False, + poke_interval=poke_interval, ) refresh_index = es.refresh_index(es_host=es_host, index_name=filtered_index_name) diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py index cb66c92c406..63717bd683d 100644 --- a/catalog/dags/data_refresh/dag_factory.py +++ b/catalog/dags/data_refresh/dag_factory.py @@ -243,6 +243,7 @@ def create_data_refresh_dag( origin_index_name=target_index_name, filtered_index_name=filtered_index_name, timeout=data_refresh_config.create_filtered_index_timeout, + poke_interval=data_refresh_config.reindex_poke_interval, ) # Re-enable Cloudwatch alarms once reindexing is complete, even if it From 6f5b2ebb42fd4aad71a258a7bb5904a4714df4cc Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Wed, 30 Oct 2024 14:05:50 -0700 Subject: [PATCH 2/3] Set kwarg explicitly in mapped task --- .../create_proportional_by_source_staging_index_dag.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index aeecd9381ab..243903b6eca 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -46,6 +46,7 @@ AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES, + REFRESH_POKE_INTERVAL, STAGING, ) from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG @@ -158,6 +159,7 @@ def create_proportional_by_source_staging_index(): # Do not refresh the index after each partial reindex refresh=False, es_host=es_host, + poke_interval=REFRESH_POKE_INTERVAL, ).expand_kwargs(desired_source_counts) refresh_destination_index = es.refresh_index( From 63a25caff46d29d55e1ac1455befcd9d421c96af Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Thu, 31 Oct 2024 09:55:41 -0700 Subject: [PATCH 3/3] Adjust poke intervals --- catalog/dags/data_refresh/data_refresh_types.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/catalog/dags/data_refresh/data_refresh_types.py b/catalog/dags/data_refresh/data_refresh_types.py index 87418bd2a87..abdff25d451 100644 --- a/catalog/dags/data_refresh/data_refresh_types.py +++ b/catalog/dags/data_refresh/data_refresh_types.py @@ -125,9 +125,9 @@ def table_mappings(self) -> list[TableMapping]: dag_timeout=timedelta(days=4), copy_data_timeout=timedelta(hours=12), concurrency_check_poke_interval=int( - os.getenv("DATA_REFRESH_POKE_INTERVAL", 60) + os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5) ), - reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)), + reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 10)), ), AUDIO: DataRefreshConfig( media_type=AUDIO, @@ -149,6 +149,6 @@ def table_mappings(self) -> list[TableMapping]: concurrency_check_poke_interval=int( os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30) ), - reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)), + reindex_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 5)), ), }