Skip to content

Commit

Permalink
Pull methods out, move comments for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Nov 10, 2023
1 parent 07c7a68 commit f961f74
Showing 1 changed file with 51 additions and 43 deletions.
94 changes: 51 additions & 43 deletions catalog/dags/data_refresh/create_filtered_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,44 @@
from data_refresh.data_refresh_types import DataRefresh


def create_and_populate_filtered_index(
    media_type: str,
    data_refresh: DataRefresh,
    origin_index_suffix: str | None,
    destination_index_suffix: str | None,
):
    """
    Trigger the ingestion server's ``CREATE_AND_POPULATE_FILTERED_INDEX`` action.

    Only suffixes that evaluate as truthy are forwarded in the request data.
    When neither suffix is set, ``None`` is sent as the data instead of an
    empty dict.

    :param media_type: passed to the ingestion server as ``model``
    :param data_refresh: supplies ``create_filtered_index_timeout`` for the call
    :param origin_index_suffix: optional value sent under the
        ``origin_index_suffix`` key
    :param destination_index_suffix: optional value sent under the
        ``destination_index_suffix`` key
    :return: the result of ``ingestion_server.trigger_and_wait_for_task``
        (the caller in this module unpacks it into two values)
    """
    optional_suffixes = {
        "origin_index_suffix": origin_index_suffix,
        "destination_index_suffix": destination_index_suffix,
    }
    # Keep only the suffixes that were actually provided.
    request_data = {
        key: suffix for key, suffix in optional_suffixes.items() if suffix
    }

    return ingestion_server.trigger_and_wait_for_task(
        action="CREATE_AND_POPULATE_FILTERED_INDEX",
        model=media_type,
        data=request_data if request_data else None,
        timeout=data_refresh.create_filtered_index_timeout,
    )


def point_alias(
    media_type: str, target_alias: str, destination_index_suffix: str
) -> TaskGroup:
    """
    Build the ``point_alias`` task group.

    Inside the group, the ingestion server's ``POINT_ALIAS`` action is
    triggered with ``target_alias`` as the alias and
    ``"{destination_index_suffix}-filtered"`` as the index suffix.

    :param media_type: passed to the ingestion server as ``model``
    :param target_alias: alias name sent in the request data
    :param destination_index_suffix: suffix to which ``-filtered`` is appended
    :return: the ``TaskGroup`` wrapping the ``POINT_ALIAS`` call
    """
    filtered_index_suffix = f"{destination_index_suffix}-filtered"

    with TaskGroup(group_id="point_alias") as alias_group:
        ingestion_server.trigger_and_wait_for_task(
            action="POINT_ALIAS",
            model=media_type,
            data={
                "alias": target_alias,
                "index_suffix": filtered_index_suffix,
            },
            # matches the ingestion server's wait time
            timeout=timedelta(hours=12),
        )

    return alias_group


def create_filtered_index_creation_task_groups(
data_refresh: DataRefresh,
origin_index_suffix: str | None,
Expand All @@ -38,38 +76,6 @@ def create_filtered_index_creation_task_groups(
media_type = data_refresh.media_type
target_alias = f"{media_type}-filtered"

# Nested form of the helper: it takes only the two index suffixes. The names
# `media_type` and `data_refresh` used in the body are not parameters; they are
# resolved from the enclosing create_filtered_index_creation_task_groups scope.
def create_and_populate_filtered_index(
origin_index_suffix: str | None,
destination_index_suffix: str | None,
):
# Only suffixes that evaluate as truthy are added to the payload.
create_payload = {}
if origin_index_suffix:
create_payload["origin_index_suffix"] = origin_index_suffix
if destination_index_suffix:
create_payload["destination_index_suffix"] = destination_index_suffix

return ingestion_server.trigger_and_wait_for_task(
action="CREATE_AND_POPULATE_FILTERED_INDEX",
model=media_type,
# An empty payload is passed as None rather than as {}.
data=create_payload or None,
timeout=data_refresh.create_filtered_index_timeout,
)

# Nested form of the helper: it takes only the destination index suffix. The
# names `target_alias` and `media_type` used in the body are resolved from the
# enclosing create_filtered_index_creation_task_groups scope.
def point_alias(destination_index_suffix: str) -> TaskGroup:
# The index suffix sent with the request is the destination suffix with
# "-filtered" appended.
point_alias_payload = {
"alias": target_alias,
"index_suffix": f"{destination_index_suffix}-filtered",
}

# The POINT_ALIAS call is made inside its own "point_alias" task group, and
# that group is what the helper returns.
with TaskGroup(group_id="point_alias") as point_alias_group:
ingestion_server.trigger_and_wait_for_task(
action="POINT_ALIAS",
model=media_type,
data=point_alias_payload,
timeout=timedelta(hours=12), # matches the ingestion server's wait time
)
return point_alias_group

with TaskGroup(group_id="create_filtered_index") as create_filtered_index_group:
# If a destination index suffix isn't provided, we need to generate
# one so that we know where to point the alias
Expand All @@ -79,18 +85,11 @@ def point_alias(destination_index_suffix: str) -> TaskGroup:
)(destination_index_suffix)
)

# Determine the current index. The current index retrieval has to happen prior
# to any of the index creation steps to ensure the appropriate index information
# is retrieved.
get_current_index_if_exists = ingestion_server.get_current_index(target_alias)

do_create, await_create = create_and_populate_filtered_index(
origin_index_suffix=origin_index_suffix,
destination_index_suffix=final_destination_index_suffix,
)

# Determine the destination index suffix and get the current index. The current
# index retrieval has to happen prior to any of the index creation steps to
# ensure the appropriate index information is retrieved.
[get_current_index_if_exists, destination_index_suffix]

# The current index retrieval step can be skipped if the index does not
# currently exist. The empty operator below works as a control flow management
# step to ensure the create step runs even if the current index retrieval step
Expand All @@ -101,6 +100,13 @@ def point_alias(destination_index_suffix: str) -> TaskGroup:
trigger_rule=TriggerRule.NONE_FAILED,
)

do_create, await_create = create_and_populate_filtered_index(
media_type=media_type,
data_refresh=data_refresh,
origin_index_suffix=origin_index_suffix,
destination_index_suffix=final_destination_index_suffix,
)

get_current_index_if_exists >> continue_if_no_current_index >> do_create
do_create >> await_create

Expand All @@ -113,7 +119,9 @@ def point_alias(destination_index_suffix: str) -> TaskGroup:
)

do_point_alias = point_alias(
destination_index_suffix=final_destination_index_suffix
media_type=media_type,
target_alias=target_alias,
destination_index_suffix=final_destination_index_suffix,
)

delete_old_index = ingestion_server.trigger_task(
Expand Down

0 comments on commit f961f74

Please sign in to comment.