
Revert "Toggle CloudWatch alarms actions during Data Refresh (#3652)"
This reverts commit ce5424f.
AetherUnbound committed Jan 22, 2024
1 parent 4bb67cf commit ee53e52
Showing 5 changed files with 13 additions and 87 deletions.
58 changes: 0 additions & 58 deletions catalog/dags/common/cloudwatch.py

This file was deleted.
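
The deleted catalog/dags/common/cloudwatch.py provided the enable_or_disable_alarms helper referenced in the task factory diff below. A minimal sketch of what such a module plausibly contained, inferred only from its call sites (a single boolean argument toggling alarm actions); the two boto3 calls are real CloudWatch API methods, but the alarm name and everything else here are assumptions, not the actual deleted code:

    import boto3


    def enable_or_disable_alarms(enable: bool) -> None:
        """Toggle actions on the data-refresh-sensitive CloudWatch alarms.

        Disabling alarm *actions* (rather than deleting the alarms) keeps
        metric history intact while suppressing notifications.
        """
        cloudwatch = boto3.client("cloudwatch")
        alarm_names = ["example-sensitive-terms-alarm"]  # hypothetical name
        if enable:
            cloudwatch.enable_alarm_actions(AlarmNames=alarm_names)
        else:
            cloudwatch.disable_alarm_actions(AlarmNames=alarm_names)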

23 changes: 4 additions & 19 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -49,11 +49,10 @@
 from collections.abc import Sequence

 from airflow.models.baseoperator import chain
-from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule

-from common import ingestion_server, cloudwatch
+from common import ingestion_server
 from common.constants import XCOM_PULL_TEMPLATE
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
 from common.sensors.utils import wait_for_external_dag
@@ -132,13 +131,7 @@ def create_data_refresh_task_group(
     generate_index_suffix = ingestion_server.generate_index_suffix.override(
         trigger_rule=TriggerRule.NONE_FAILED,
     )()
-
-    disable_alarms = PythonOperator(
-        task_id="disable_sensitive_cloudwatch_alarms",
-        python_callable=cloudwatch.enable_or_disable_alarms,
-        op_args=[False],
-    )
-    tasks.append([generate_index_suffix, disable_alarms])
+    tasks.append(generate_index_suffix)

     # Trigger the 'ingest_upstream' task on the ingestion server and await its
     # completion. This task copies the media table for the given model from the
@@ -182,13 +175,6 @@ def create_data_refresh_task_group(
     # running against an index that is already promoted in production.
     tasks.append(create_filtered_index)

-    enable_alarms = PythonOperator(
-        task_id="enable_sensitive_cloudwatch_alarms",
-        python_callable=cloudwatch.enable_or_disable_alarms,
-        op_args=[True],
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     # Trigger the `promote` task on the ingestion server and await its completion.
     # This task promotes the newly created API DB table and elasticsearch index. It
     # does not include promotion of the filtered index, which must be promoted
@@ -203,7 +189,7 @@
         },
         timeout=data_refresh.data_refresh_timeout,
     )
-    tasks.append([enable_alarms, promote_tasks])
+    tasks.append(promote_tasks)

     # Delete the alias' previous target index, now unused.
     delete_old_index = ingestion_server.trigger_task(
@@ -227,8 +213,7 @@
     #    └─ create_filtered_index
     #        └─ promote (trigger_promote + wait_for_promote)
     #            └─ delete_old_index
-    #                └─ promote_filtered_index (including delete filtered index) +
-    #                   enable_alarms
+    #                └─ promote_filtered_index (including delete filtered index)
     chain(*tasks)

     return data_refresh_group
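
A note on the reverted wiring: Airflow's chain treats a list element as a parallel stage, which is how tasks.append([generate_index_suffix, disable_alarms]) ran both tasks side by side between their neighbors; after the revert every element of tasks is a single task, so chain(*tasks) builds a straight line. A minimal illustration with stand-in EmptyOperator tasks (the DAG id and dates are arbitrary):

    import pendulum
    from airflow import DAG
    from airflow.models.baseoperator import chain
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="chain_fanout_example",
        start_date=pendulum.datetime(2024, 1, 1),
        schedule=None,
    ):
        a, b, c, d = (EmptyOperator(task_id=i) for i in "abcd")
        # A scalar chained to a list fans out and back in:
        # a >> b >> d and a >> c >> d, so b and c run in parallel.
        chain(a, [b, c], d)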
7 changes: 3 additions & 4 deletions catalog/env.template
@@ -88,10 +88,9 @@ AIRFLOW_PORT=9090
 LOADER_FILE_AGE=1
 # Contact email for any APIs
 CONTACT_EMAIL=openverse@wordpress.org
-# AWS configuration - does not need to be changed for development
-AWS_ACCESS_KEY_ID=test_key
-AWS_SECRET_ACCESS_KEY=test_secret
-AWS_DEFAULT_REGION=us-east-1
+# AWS/S3 configuration - does not need to be changed for development
+AWS_ACCESS_KEY=test_key
+AWS_SECRET_KEY=test_secret
 # General bucket used for TSV->DB ingestion and logging
 OPENVERSE_BUCKET=openverse-storage
 # Seconds to wait before poking for availability of the data refresh pool when running a data_refresh
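
The variable names matter here: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION are the standard names that boto3's default credential chain reads automatically, while the restored AWS_ACCESS_KEY / AWS_SECRET_KEY names are project-specific and must be read and forwarded by hand. A short sketch of the difference (the values are the template's placeholders):

    import os

    import boto3

    # Project-specific names: boto3 does not discover these on its own,
    # so callers forward them explicitly, as the tests below do.
    s3_explicit = boto3.resource(
        "s3",
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
    )

    # Standard names: with AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and
    # AWS_DEFAULT_REGION set, the default credential chain finds them.
    s3_implicit = boto3.resource("s3")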
8 changes: 4 additions & 4 deletions catalog/tests/dags/common/conftest.py
@@ -5,9 +5,9 @@
 import pytest

 from catalog.tests.dags.common.loader.test_s3 import (
-    AWS_ACCESS_KEY_ID,
+    ACCESS_KEY,
     S3_LOCAL_ENDPOINT,
-    AWS_SECRET_ACCESS_KEY,
+    SECRET_KEY,
 )

@@ -40,8 +40,8 @@ def empty_s3_bucket(request):
     print(f"{bucket_name=}")
     bucket = boto3.resource(
         "s3",
-        aws_access_key_id=AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+        aws_access_key_id=ACCESS_KEY,
+        aws_secret_access_key=SECRET_KEY,
         endpoint_url=S3_LOCAL_ENDPOINT,
     ).Bucket(bucket_name)
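
The diff shows only the top of the empty_s3_bucket fixture. A hedged sketch of how such a fixture typically completes, assuming the imports from the hunk above; the bucket-name derivation and the create/empty/yield steps are assumptions, not the file's actual body:

    import boto3
    import pytest


    @pytest.fixture
    def empty_s3_bucket(request):
        # Hypothetical: the real fixture derives the name from `request`.
        bucket_name = "openverse-test-bucket"
        bucket = boto3.resource(
            "s3",
            aws_access_key_id=ACCESS_KEY,
            aws_secret_access_key=SECRET_KEY,
            endpoint_url=S3_LOCAL_ENDPOINT,
        ).Bucket(bucket_name)
        if bucket.creation_date is None:  # create the bucket on first use
            bucket.create()
        bucket.objects.all().delete()  # guarantee each test starts empty
        yield bucket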
4 changes: 2 additions & 2 deletions catalog/tests/dags/common/loader/test_s3.py
@@ -14,8 +14,8 @@
 TEST_STAGING_PREFIX = "test_staging"
 S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
 S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
-AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
-AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
+ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
+SECRET_KEY = os.getenv("AWS_SECRET_KEY")


 @pytest.fixture
