diff --git a/.github/workflows/ci_cd.yml b/.github/workflows/ci_cd.yml
index c24fb642eff..4a01e3d5637 100644
--- a/.github/workflows/ci_cd.yml
+++ b/.github/workflows/ci_cd.yml
@@ -457,7 +457,7 @@ jobs:
         uses: ./.github/actions/load-img
         with:
           run_id: ${{ github.run_id }}
-          setup_images: upstream_db ingestion_server
+          setup_images: upstream_db ingestion_server catalog

      # Sets build args specifying versions needed to build Docker image.
      - name: Prepare build args
@@ -486,6 +486,9 @@ jobs:
            API_PY_VERSION=${{ steps.prepare-build-args.outputs.api_py_version }}
            PDM_INSTALL_ARGS=--dev

+      - name: Start Catalog
+        run: just catalog/up
+
      - name: Start API, ingest and index test data
        run: just api/init

diff --git a/catalog/dags/common/sensors/single_run_external_dags_sensor.py b/catalog/dags/common/sensors/single_run_external_dags_sensor.py
index 81afd0a7114..a6279b6dce3 100644
--- a/catalog/dags/common/sensors/single_run_external_dags_sensor.py
+++ b/catalog/dags/common/sensors/single_run_external_dags_sensor.py
@@ -21,6 +21,8 @@ class SingleRunExternalDAGsSensor(BaseSensorOperator):
     :param external_dag_ids: A list of dag_ids that you want to wait for
     :param check_existence: Set to `True` to check if the external DAGs exist,
         and immediately cease waiting if not (default value: False).
+    :param allow_concurrent_runs: Used to force the Sensor to pass, even
+        if there are concurrent runs.
     """

     def __init__(
@@ -28,12 +30,16 @@ def __init__(
         *,
         external_dag_ids: Iterable[str],
         check_existence: bool = False,
+        allow_concurrent_runs: bool = False,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.external_dag_ids = external_dag_ids
         self.check_existence = check_existence
-        self._has_checked_existence = False
+        self.allow_concurrent_runs = allow_concurrent_runs
+
+        # Used to ensure some checks are only evaluated on the first poke
+        self._has_checked_params = False

     @provide_session
     def poke(self, context, session=None):
@@ -42,8 +48,20 @@ def poke(self, context, session=None):
             self.external_dag_ids,
         )

-        if self.check_existence:
-            self._check_for_existence(session=session)
+        if not self._has_checked_params:
+            if self.allow_concurrent_runs:
+                self.log.info(
+                    "`allow_concurrent_runs` is enabled. Returning without"
+                    " checking for running DAGs."
+                )
+                return True
+
+            if self.check_existence:
+                self._check_for_existence(session=session)
+
+            # Only check DAG existence and `allow_concurrent_runs`
+            # on the first execution.
+            self._has_checked_params = True

         count_running = self.get_count(session)

@@ -51,10 +69,6 @@ def poke(self, context, session=None):
         return count_running == 0

     def _check_for_existence(self, session) -> None:
-        # Check DAG existence only once, on the first execution.
-        if self._has_checked_existence:
-            return
-
         for dag_id in self.external_dag_ids:
             dag_to_wait = (
                 session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
             )
@@ -72,7 +86,6 @@ def _check_for_existence(self, session) -> None:
                     f"The external DAG {dag_id} does not have a task "
                     f"with id {self.task_id}."
                 )
-        self._has_checked_existence = True

     def get_count(self, session) -> int:
         # Get the count of running DAGs. A DAG is considered 'running' if

diff --git a/catalog/dags/data_refresh/copy_data.py b/catalog/dags/data_refresh/copy_data.py
index 76f357244c3..aa2213f4993 100644
--- a/catalog/dags/data_refresh/copy_data.py
+++ b/catalog/dags/data_refresh/copy_data.py
@@ -40,15 +40,18 @@
 def initialize_fdw(
     upstream_conn_id: str,
     downstream_conn_id: str,
+    media_type: str,
     task: AbstractOperator = None,
 ):
     """Create the FDW and prepare it for copying."""
     upstream_connection = Connection.get_connection_from_secrets(upstream_conn_id)
+    fdw_name = f"upstream_{media_type}"

     run_sql.function(
         postgres_conn_id=downstream_conn_id,
         sql_template=queries.CREATE_FDW_QUERY,
         task=task,
+        fdw_name=fdw_name,
         host=upstream_connection.host,
         port=upstream_connection.port,
         dbname=upstream_connection.schema,
@@ -56,12 +59,16 @@ def initialize_fdw(
         password=upstream_connection.password,
     )

+    return fdw_name
+

 @task(
     max_active_tis_per_dagrun=1,
     map_index_template="{{ task.op_kwargs['upstream_table_name'] }}",
 )
-def create_schema(downstream_conn_id: str, upstream_table_name: str) -> str:
+def create_schema(
+    downstream_conn_id: str, upstream_table_name: str, fdw_name: str
+) -> str:
     """
     Create a new schema in the downstream DB through which the upstream table
     can be accessed. Returns the schema name.
@@ -73,7 +80,9 @@ def create_schema(downstream_conn_id: str, upstream_table_name: str) -> str:
     schema_name = f"upstream_{upstream_table_name}_schema"
     downstream_pg.run(
         queries.CREATE_SCHEMA_QUERY.format(
-            schema_name=schema_name, upstream_table_name=upstream_table_name
+            fdw_name=fdw_name,
+            schema_name=schema_name,
+            upstream_table_name=upstream_table_name,
         )
     )
     return schema_name
@@ -183,6 +192,7 @@ def copy_data(
 def copy_upstream_table(
     upstream_conn_id: str,
     downstream_conn_id: str,
+    fdw_name: str,
     timeout: timedelta,
     limit: int,
     upstream_table_name: str,
@@ -206,6 +216,7 @@ def copy_upstream_table(
     schema = create_schema(
         downstream_conn_id=downstream_conn_id,
         upstream_table_name=upstream_table_name,
+        fdw_name=fdw_name,
     )

     create_temp_table = run_sql.override(
@@ -286,6 +297,7 @@ def copy_upstream_tables(
     init_fdw = initialize_fdw(
         upstream_conn_id=upstream_conn_id,
         downstream_conn_id=downstream_conn_id,
+        media_type=data_refresh_config.media_type,
     )

     limit = get_record_limit()
@@ -294,6 +306,7 @@ def copy_upstream_tables(
     copy_tables = copy_upstream_table.partial(
         upstream_conn_id=upstream_conn_id,
         downstream_conn_id=downstream_conn_id,
+        fdw_name=init_fdw,
         timeout=data_refresh_config.copy_data_timeout,
         limit=limit,
     ).expand_kwargs([asdict(tm) for tm in data_refresh_config.table_mappings])
@@ -301,6 +314,7 @@ def copy_upstream_tables(
     drop_fdw = run_sql.override(task_id="drop_fdw")(
         postgres_conn_id=downstream_conn_id,
         sql_template=queries.DROP_SERVER_QUERY,
+        fdw_name=init_fdw,
     )

     # Set up dependencies

diff --git a/catalog/dags/data_refresh/create_and_populate_filtered_index.py b/catalog/dags/data_refresh/create_and_populate_filtered_index.py
index acb47f00ec8..1a636e08ade 100644
--- a/catalog/dags/data_refresh/create_and_populate_filtered_index.py
+++ b/catalog/dags/data_refresh/create_and_populate_filtered_index.py
@@ -49,17 +49,13 @@ def create_and_populate_filtered_index(
     es_host: str,
     media_type: MediaType,
     origin_index_name: str,
+    filtered_index_name: str,
     timeout: timedelta,
-    destination_index_name: str | None = None,
 ):
     """
     Create and populate a filtered index based on the given origin index,
     excluding documents with sensitive terms.
     """
-    filtered_index_name = get_filtered_index_name(
-        media_type=media_type, destination_index_name=destination_index_name
-    )
-
     create_filtered_index = es.create_index.override(
         trigger_rule=TriggerRule.NONE_FAILED,
     )(
@@ -76,7 +72,6 @@ def create_and_populate_filtered_index(
         method="GET",
         response_check=lambda response: response.status_code == 200,
         response_filter=response_filter_sensitive_terms_endpoint,
-        trigger_rule=TriggerRule.NONE_FAILED,
     )

     populate_filtered_index = es.trigger_and_wait_for_reindex(
@@ -99,7 +94,5 @@ def create_and_populate_filtered_index(

     refresh_index = es.refresh_index(es_host=es_host, index_name=filtered_index_name)

-    sensitive_terms >> populate_filtered_index
-    create_filtered_index >> populate_filtered_index >> refresh_index
-
-    return filtered_index_name
+    # sensitive_terms >> populate_filtered_index
+    create_filtered_index >> sensitive_terms >> populate_filtered_index >> refresh_index

diff --git a/catalog/dags/data_refresh/create_index.py b/catalog/dags/data_refresh/create_index.py
deleted file mode 100644
index 78a4066c26b..00000000000
--- a/catalog/dags/data_refresh/create_index.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""
-# Create Index
-
-This file contains TaskGroups related to creating Elasticsearch indices
-as part of the Data Refresh.
-"""
-
-import logging
-import uuid
-
-from airflow.decorators import task, task_group
-
-from common import elasticsearch as es
-from data_refresh.data_refresh_types import DataRefreshConfig
-from data_refresh.es_mapping import index_settings
-
-
-logger = logging.getLogger(__name__)
-
-
-@task
-def generate_index_name(media_type: str) -> str:
-    return f"{media_type}-{uuid.uuid4().hex}"
-
-
-@task_group(group_id="create_temp_index")
-def create_index(
-    data_refresh_config: DataRefreshConfig,
-    es_host: str,
-):
-    # Generate a UUID suffix that will be used by the newly created index.
-    temp_index_name = generate_index_name(media_type=data_refresh_config.media_type)
-
-    # Create a new index
-    es.create_index(
-        index_config={
-            "index": temp_index_name,
-            "body": index_settings(data_refresh_config.media_type),
-        },
-        es_host=es_host,
-    )
-
-    # Return the name of the created index
-    return temp_index_name

diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py
index 7c1e975a58f..cb66c92c406 100644
--- a/catalog/dags/data_refresh/dag_factory.py
+++ b/catalog/dags/data_refresh/dag_factory.py
@@ -27,11 +27,13 @@
 """

 import logging
+import uuid
 from collections.abc import Sequence
 from itertools import product

 from airflow import DAG
-from airflow.decorators import task_group
+from airflow.decorators import task, task_group
+from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 from airflow.utils.trigger_rule import TriggerRule

@@ -41,6 +43,7 @@
     DAG_DEFAULT_ARGS,
     DATA_REFRESH_POOL,
     ENVIRONMENTS,
+    PRODUCTION,
     Environment,
 )
 from common.sensors.constants import ES_CONCURRENCY_TAGS
@@ -50,10 +53,11 @@
 from data_refresh.copy_data import copy_upstream_tables
 from data_refresh.create_and_populate_filtered_index import (
     create_and_populate_filtered_index,
+    get_filtered_index_name,
 )
-from data_refresh.create_index import create_index
 from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefreshConfig
 from data_refresh.distributed_reindex import perform_distributed_reindex
+from data_refresh.es_mapping import index_settings
 from data_refresh.promote_table import promote_tables
 from data_refresh.reporting import report_record_difference

@@ -66,12 +70,14 @@ def wait_for_conflicting_dags(
     data_refresh_config: DataRefreshConfig,
     external_dag_ids: list[str],
     concurrency_tag: str,
+    allow_concurrent_data_refreshes: bool,
 ):
     # Wait to ensure that no other Data Refresh DAGs are running.
     SingleRunExternalDAGsSensor(
         task_id="wait_for_data_refresh",
         external_dag_ids=external_dag_ids,
         check_existence=True,
+        allow_concurrent_runs=allow_concurrent_data_refreshes,
         poke_interval=data_refresh_config.concurrency_check_poke_interval,
         mode="reschedule",
         pool=DATA_REFRESH_POOL,
@@ -91,6 +97,12 @@ def wait_for_conflicting_dags(
     )


+@task
+def generate_index_name(media_type: str, index_suffix: str) -> str:
+    suffix = index_suffix or uuid.uuid4().hex
+    return f"{media_type}-{suffix}"
+
+
 def create_data_refresh_dag(
     data_refresh_config: DataRefreshConfig,
     target_environment: Environment,
@@ -122,7 +134,9 @@ def create_data_refresh_dag(
         dagrun_timeout=data_refresh_config.dag_timeout,
         default_args=default_args,
         start_date=data_refresh_config.start_date,
-        schedule=data_refresh_config.schedule,
+        schedule=(
+            data_refresh_config.schedule if target_environment == PRODUCTION else None
+        ),
         max_active_runs=1,
         catchup=False,
         doc_md=__doc__,
@@ -132,6 +146,26 @@ def create_data_refresh_dag(
             concurrency_tag,
         ],
         render_template_as_native_obj=True,
+        params={
+            "index_suffix": Param(
+                default=None,
+                type=["null", "string"],
+                description=(
+                    "Optional suffix appended to the `media_type` in the Elasticsearch index"
+                    " name. If not supplied, a uuid is used."
+                ),
+            ),
+            "allow_concurrent_data_refreshes": Param(
+                default=False,
+                type="boolean",
+                description=(
+                    "Whether to allow multiple data refresh DAGs for the given environment"
+                    " to run concurrently. This setting should be enabled with extreme"
+                    " caution, as reindexing multiple large Elasticsearch indices"
+                    " simultaneously should be avoided."
+                ),
+            ),
+        },
     )

     with dag:
@@ -147,7 +181,10 @@ def create_data_refresh_dag(
         )

         wait_for_dags = wait_for_conflicting_dags(
-            data_refresh_config, external_dag_ids, concurrency_tag
+            data_refresh_config,
+            external_dag_ids,
+            concurrency_tag,
+            "{{ params.allow_concurrent_data_refreshes }}",
         )

         copy_data = copy_upstream_tables(
@@ -160,10 +197,19 @@ def create_data_refresh_dag(
             data_refresh_config=data_refresh_config,
         )

+        target_index_name = generate_index_name(
+            media_type=data_refresh_config.media_type,
+            index_suffix="{{ params.index_suffix }}",
+        )
+
         # Create a new temporary index based off the configuration of the existing media index.
         # This will later replace the live index.
-        target_index = create_index(
-            data_refresh_config=data_refresh_config, es_host=es_host
+        target_index = es.create_index(
+            index_config={
+                "index": target_index_name,
+                "body": index_settings(data_refresh_config.media_type),
+            },
+            es_host=es_host,
         )

         # Disable Cloudwatch alarms that are noisy during the reindexing steps of a
@@ -181,16 +227,21 @@ def create_data_refresh_dag(
             es_host=es_host,
             environment="{{ var.value.ENVIRONMENT }}",
             target_environment=target_environment,
-            target_index=target_index,
+            target_index=target_index_name,
             data_refresh_config=data_refresh_config,
         )

+        filtered_index_name = get_filtered_index_name(
+            media_type=data_refresh_config.media_type,
+            destination_index_name=f"{target_index_name}-filtered",
+        )
+
         # Create and populate the filtered index
         filtered_index = create_and_populate_filtered_index(
             es_host=es_host,
             media_type=data_refresh_config.media_type,
-            origin_index_name=target_index,
-            destination_index_name=f"{target_index}-filtered",
+            origin_index_name=target_index_name,
+            filtered_index_name=filtered_index_name,
             timeout=data_refresh_config.create_filtered_index_timeout,
         )

@@ -213,7 +264,7 @@ def create_data_refresh_dag(

         promote_index = es.point_alias.override(group_id="promote_index")(
             es_host=es_host,
-            target_index=target_index,
+            target_index=target_index_name,
             target_alias=data_refresh_config.media_type,
             should_delete_old_index=True,
         )
@@ -222,7 +273,7 @@ def create_data_refresh_dag(
             group_id="promote_filtered_index"
         )(
             es_host=es_host,
-            target_index=filtered_index,
+            target_index=filtered_index_name,
             target_alias=f"{data_refresh_config.media_type}-filtered",
             should_delete_old_index=True,
         )
@@ -253,9 +304,11 @@ def create_data_refresh_dag(
             >> wait_for_dags
             >> copy_data
             >> alter_data
+            >> target_index_name
             >> target_index
             >> disable_alarms
             >> reindex
+            >> filtered_index_name
             >> filtered_index
         )
         # Note filtered_index must be directly upstream of promote_table to

diff --git a/catalog/dags/data_refresh/data_refresh_types.py b/catalog/dags/data_refresh/data_refresh_types.py
index 5faf4c3eab0..87418bd2a87 100644
--- a/catalog/dags/data_refresh/data_refresh_types.py
+++ b/catalog/dags/data_refresh/data_refresh_types.py
@@ -63,7 +63,7 @@ class DataRefreshConfig:
                     airflow.dag.DAG __init__ method.
     start_date:     datetime.datetime giving the first valid logical date
                     of the DAG.
-    schedule:       string giving the schedule on which the DAG
+    schedule:       string giving the schedule on which the production DAG
                     should be run. Passed to the airflow.dag.DAG __init__
                     method.
     dag_timeout:    timedelta expressing the amount of time the entire

diff --git a/catalog/dags/data_refresh/distributed_reindex.py b/catalog/dags/data_refresh/distributed_reindex.py
index e4a7116a6b3..c6dac465d1d 100644
--- a/catalog/dags/data_refresh/distributed_reindex.py
+++ b/catalog/dags/data_refresh/distributed_reindex.py
@@ -237,13 +237,14 @@ def get_instance_ip_address(
 )
 def create_connection(
     instance_id: str,
+    media_type: str,
     server: str,
 ):
     """
     Create an Airflow Connection for the given indexer worker and persist it.
     It will later be dropped in a cleanup task.
     """
-    worker_conn_id = f"indexer_worker_{instance_id or 'localhost'}"
+    worker_conn_id = f"indexer_worker_{instance_id or media_type}"

     # Create the Connection
     logger.info(f"Creating connection with id {worker_conn_id}")
@@ -333,6 +334,7 @@ def reindex(

     worker_conn = create_connection(
         instance_id=instance_id,
+        media_type=data_refresh_config.media_type,
         server=instance_ip_address,
     )

diff --git a/catalog/dags/data_refresh/queries.py b/catalog/dags/data_refresh/queries.py
index d3f6c0dcfc9..8a0c27b7a0d 100644
--- a/catalog/dags/data_refresh/queries.py
+++ b/catalog/dags/data_refresh/queries.py
@@ -5,11 +5,11 @@

 CREATE_FDW_QUERY = dedent(
     """
-    DROP SERVER IF EXISTS upstream CASCADE;
-    CREATE SERVER upstream FOREIGN DATA WRAPPER postgres_fdw
+    DROP SERVER IF EXISTS {fdw_name} CASCADE;
+    CREATE SERVER {fdw_name} FOREIGN DATA WRAPPER postgres_fdw
     OPTIONS (host '{host}', dbname '{dbname}', port '{port}');

-    CREATE USER MAPPING IF NOT EXISTS FOR deploy SERVER upstream
+    CREATE USER MAPPING IF NOT EXISTS FOR deploy SERVER {fdw_name}
     OPTIONS (user '{user}', password '{password}');
     """
 )
@@ -20,7 +20,7 @@
     CREATE SCHEMA {schema_name} AUTHORIZATION deploy;

     IMPORT FOREIGN SCHEMA public LIMIT TO ({upstream_table_name})
-    FROM SERVER upstream INTO {schema_name};
+    FROM SERVER {fdw_name} INTO {schema_name};
     """
 )

@@ -79,7 +79,7 @@
 ADD_PRIMARY_KEY_QUERY = "ALTER TABLE {temp_table_name} ADD PRIMARY KEY (id);"

-DROP_SERVER_QUERY = "DROP SERVER upstream CASCADE;"
+DROP_SERVER_QUERY = "DROP SERVER {fdw_name} CASCADE;"

 SELECT_TABLE_INDICES_QUERY = (
     "SELECT indexdef FROM pg_indexes WHERE tablename='{table_name}';"

diff --git a/catalog/dags/data_refresh/remap_table_indices.py b/catalog/dags/data_refresh/remap_table_indices.py
index 7b7efd1bf5b..d93bca6a82b 100644
--- a/catalog/dags/data_refresh/remap_table_indices.py
+++ b/catalog/dags/data_refresh/remap_table_indices.py
@@ -8,7 +8,6 @@
 from typing import NamedTuple

 from airflow.decorators import task, task_group
-from airflow.utils.trigger_rule import TriggerRule

 from common.sql import fetch_all, run_sql
 from data_refresh import queries
@@ -103,9 +102,7 @@ def remap_table_indices_to_table(
     later when the table is promoted.
     """
     # Get the CREATE statements for the indices applied to the live (old) table
-    existing_index_defs = run_sql.override(
-        task_id="get_existing_index_defs", trigger_rule=TriggerRule.NONE_FAILED
-    )(
+    existing_index_defs = run_sql.override(task_id="get_existing_index_defs")(
         postgres_conn_id=postgres_conn_id,
         sql_template=queries.SELECT_TABLE_INDICES_QUERY,
         handler=fetch_all,

diff --git a/catalog/justfile b/catalog/justfile
index 135a8f5e601..0859b4ecd8e 100644
--- a/catalog/justfile
+++ b/catalog/justfile
@@ -83,6 +83,11 @@ recreate:
 shell:
     env DC_USER="airflow" just ../exec {{ SERVICE }} /bin/bash

+# Run an Airflow CLI command and then exit
+[positional-arguments]
+cli *args:
+    env DC_USER="airflow" just ../exec {{ SERVICE }} "$@"
+
 # Launch an IPython shell in a new container under `SERVICE`
 [positional-arguments]
 ipython *args: up-deps

diff --git a/catalog/tests/dags/common/sensors/test_single_run_external_dags_sensor.py b/catalog/tests/dags/common/sensors/test_single_run_external_dags_sensor.py
index 6b11acd9890..a544f92ad85 100644
--- a/catalog/tests/dags/common/sensors/test_single_run_external_dags_sensor.py
+++ b/catalog/tests/dags/common/sensors/test_single_run_external_dags_sensor.py
@@ -212,8 +212,25 @@ def test_succeeds_if_no_running_dags(
     "ignore:This class is deprecated. Please use "
     "`airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning"
 )
+@pytest.mark.parametrize(
+    "allow_concurrent_runs, expected_message",
+    [
+        (False, "1 DAGs are in the running state"),
+        (
+            True,
+            "`allow_concurrent_runs` is enabled. Returning without checking for"
+            " running DAGs.",
+        ),
+    ],
+)
 def test_retries_if_running_dags_with_completed_sensor_task(
-    caplog, sample_dag_id_fixture, sample_pool_fixture, clean_db, setup_pool
+    allow_concurrent_runs,
+    expected_message,
+    caplog,
+    sample_dag_id_fixture,
+    sample_pool_fixture,
+    clean_db,
+    setup_pool,
 ):
     # Create a DAG in the 'running' state
     running_dag = create_dag("running_dag", sample_dag_id_fixture, sample_pool_fixture)
@@ -236,7 +253,7 @@ def test_retries_if_running_dags_with_completed_sensor_task(

     # Create the Test DAG and sensor and set up dependent dag Ids
     dag = DAG(
-        "test_dag_failure",
+        f"test_dag_failure_with_allow_concurrent_runs_{allow_concurrent_runs}",
         schedule=None,
         default_args={
             "owner": "airflow",
@@ -249,6 +266,7 @@ def test_retries_if_running_dags_with_completed_sensor_task(
             f"{sample_dag_id_fixture}_success_dag",
             f"{sample_dag_id_fixture}_running_dag",
         ],
+        allow_concurrent_runs=allow_concurrent_runs,
         poke_interval=5,
         mode="reschedule",
         dag=dag,
@@ -263,4 +281,4 @@ def test_retries_if_running_dags_with_completed_sensor_task(
         f"{sample_dag_id_fixture}_success_dag', '{sample_dag_id_fixture}_running_dag'] ..."
         in caplog.text
     )
-    assert "1 DAGs are in the running state" in caplog.text
+    assert expected_message in caplog.text

diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md
index a61b2ab44ef..8826835e325 100644
--- a/documentation/catalog/reference/DAGs.md
+++ b/documentation/catalog/reference/DAGs.md
@@ -45,8 +45,8 @@ The following are DAGs grouped by their primary tag:
 | [`create_filtered_image_index`](#create_filtered_media_type_index)      | `None`      |
 | [`production_audio_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
 | [`production_image_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
-| [`staging_audio_data_refresh`](#environment_media_type_data_refresh)    | `0 0 * * 1` |
-| [`staging_image_data_refresh`](#environment_media_type_data_refresh)    | `0 0 * * 1` |
+| [`staging_audio_data_refresh`](#environment_media_type_data_refresh)    | `None`      |
+| [`staging_image_data_refresh`](#environment_media_type_data_refresh)    | `None`      |
 | [`audio_data_refresh`](#media_type_data_refresh)                        | `0 0 * * 1` |
 | [`image_data_refresh`](#media_type_data_refresh)                        | `0 0 * * 1` |

diff --git a/load_sample_data.sh b/load_sample_data.sh
index 24135066302..e6fe593b8da 100755
--- a/load_sample_data.sh
+++ b/load_sample_data.sh
@@ -6,6 +6,25 @@ CACHE_SERVICE_NAME="${CACHE_SERVICE_NAME:-cache}"
 UPSTREAM_DB_SERVICE_NAME="${UPSTREAM_DB_SERVICE_NAME:-upstream_db}"
 DB_SERVICE_NAME="${DB_SERVICE_NAME:-db}"

+# Detect whether the `AIRFLOW_CONN_SENSITIVE_TERMS` is set
+# `true` resolves to an empty string, and prevents the script from failing
+# due to `set -e` and grep's non-zero status code if the pattern isn't found
+has_sensitive_terms_airflow_conn=$(grep "AIRFLOW_CONN_SENSITIVE_TERMS" catalog/.env || true)
+
+# Temporary measure to prevent errors in the data refresh portion of the script
+# when the AIRFLOW_CONN_SENSITIVE_TERMS is not defined in the .env file, by
+# detecting when the variable is missing and populating it with the default from
+# the template. This is only necessary temporarily, pending the work to undo the
+# split indices for sensitive text detection
+# (https://github.com/WordPress/openverse/pull/4904/files)
+if [[ ! $has_sensitive_terms_airflow_conn ]]; then
+  echo "Adding new Airflow connection environment variable required for sample data loading"
+  grep "AIRFLOW_CONN_SENSITIVE_TERMS" catalog/env.template >>catalog/.env
+
+  echo "Restarting Airflow to populate the new connection variable"
+  just dc restart webserver scheduler triggerer
+fi
+
 while getopts 'c' OPTION; do
   case "$OPTION" in
   c)
@@ -135,35 +154,21 @@ just docker/es/delete-index image-init
 just docker/es/delete-index image-init-filtered

 # Ingest and index the data
-just ingestion_server/ingest-upstream "audio" "init"
-just docker/es/wait-for-index "audio-init"
-just docker/es/wait-for-count "audio-init"
-just ingestion_server/promote "audio" "init" "audio"
+# Enable the staging data refresh dags, if they are not already.
+# These DAGs are on a None schedule so no scheduled runs will be
+# triggered.
+just catalog/cli airflow dags unpause staging_audio_data_refresh
+just catalog/cli airflow dags unpause staging_image_data_refresh
+# Trigger the data refresh dags at the same time. The DAGs will manage
+# concurrency issues.
+just catalog/cli airflow dags trigger staging_audio_data_refresh --conf '{"index_suffix": "init", "allow_concurrent_data_refreshes": true}'
+just catalog/cli airflow dags trigger staging_image_data_refresh --conf '{"index_suffix": "init", "allow_concurrent_data_refreshes": true}'
+# Wait for all relevant indices to be created and promoted
 just docker/es/wait-for-index "audio"
 just docker/es/wait-for-count "audio"
-just ingestion_server/create-and-populate-filtered-index "audio" "init"
-just docker/es/wait-for-index "audio-init-filtered"
-just ingestion_server/point-alias "audio" "init-filtered" "audio-filtered"
 just docker/es/wait-for-index "audio-filtered" "audio-init-filtered"
-
-# Image ingestion is flaky; but usually works on the next attempt
-set +e
-while true; do
-  just ingestion_server/ingest-upstream "image" "init"
-  if just docker/es/wait-for-index "image-init"; then
-    break
-  fi
-  ((c++)) && ((c == 3)) && break
-done
-set -e
-
-just docker/es/wait-for-count "image-init"
-just ingestion_server/promote "image" "init" "image"
 just docker/es/wait-for-index "image"
 just docker/es/wait-for-count "image"
-just ingestion_server/create-and-populate-filtered-index "image" "init"
-just docker/es/wait-for-index "image-init-filtered"
-just ingestion_server/point-alias "image" "init-filtered" "image-filtered"
 just docker/es/wait-for-index "image-filtered" "image-init-filtered"

 #########