Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Toggle CloudWatch alarms actions during Data Refresh (#3652)" #3689

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 0 additions & 58 deletions catalog/dags/common/cloudwatch.py

This file was deleted.

23 changes: 4 additions & 19 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@
from collections.abc import Sequence

from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import ingestion_server, cloudwatch
from common import ingestion_server
from common.constants import XCOM_PULL_TEMPLATE
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import wait_for_external_dag
Expand Down Expand Up @@ -132,13 +131,7 @@ def create_data_refresh_task_group(
generate_index_suffix = ingestion_server.generate_index_suffix.override(
trigger_rule=TriggerRule.NONE_FAILED,
)()

disable_alarms = PythonOperator(
task_id="disable_sensitive_cloudwatch_alarms",
python_callable=cloudwatch.enable_or_disable_alarms,
op_args=[False],
)
tasks.append([generate_index_suffix, disable_alarms])
tasks.append(generate_index_suffix)

# Trigger the 'ingest_upstream' task on the ingestion server and await its
# completion. This task copies the media table for the given model from the
Expand Down Expand Up @@ -182,13 +175,6 @@ def create_data_refresh_task_group(
# running against an index that is already promoted in production.
tasks.append(create_filtered_index)

enable_alarms = PythonOperator(
task_id="enable_sensitive_cloudwatch_alarms",
python_callable=cloudwatch.enable_or_disable_alarms,
op_args=[True],
trigger_rule=TriggerRule.ALL_DONE,
)

# Trigger the `promote` task on the ingestion server and await its completion.
# This task promotes the newly created API DB table and elasticsearch index. It
# does not include promotion of the filtered index, which must be promoted
Expand All @@ -203,7 +189,7 @@ def create_data_refresh_task_group(
},
timeout=data_refresh.data_refresh_timeout,
)
tasks.append([enable_alarms, promote_tasks])
tasks.append(promote_tasks)

# Delete the alias' previous target index, now unused.
delete_old_index = ingestion_server.trigger_task(
Expand All @@ -227,8 +213,7 @@ def create_data_refresh_task_group(
# └─ create_filtered_index
# └─ promote (trigger_promote + wait_for_promote)
# └─ delete_old_index
# └─ promote_filtered_index (including delete filtered index) +
# enable_alarms
# └─ promote_filtered_index (including delete filtered index)
chain(*tasks)

return data_refresh_group
7 changes: 3 additions & 4 deletions catalog/env.template
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ AIRFLOW_PORT=9090
LOADER_FILE_AGE=1
# Contact email for any APIs
CONTACT_EMAIL=openverse@wordpress.org
# AWS configuration - does not need to be changed for development
AWS_ACCESS_KEY_ID=test_key
AWS_SECRET_ACCESS_KEY=test_secret
AWS_DEFAULT_REGION=us-east-1
# AWS/S3 configuration - does not need to be changed for development
AWS_ACCESS_KEY=test_key
AWS_SECRET_KEY=test_secret
# General bucket used for TSV->DB ingestion and logging
OPENVERSE_BUCKET=openverse-storage
# Seconds to wait before poking for availability of the data refresh pool when running a data_refresh
Expand Down
8 changes: 4 additions & 4 deletions catalog/tests/dags/common/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import pytest

from catalog.tests.dags.common.loader.test_s3 import (
AWS_ACCESS_KEY_ID,
ACCESS_KEY,
S3_LOCAL_ENDPOINT,
AWS_SECRET_ACCESS_KEY,
SECRET_KEY,
)


Expand Down Expand Up @@ -40,8 +40,8 @@ def empty_s3_bucket(request):
print(f"{bucket_name=}")
bucket = boto3.resource(
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
endpoint_url=S3_LOCAL_ENDPOINT,
).Bucket(bucket_name)

Expand Down
4 changes: 2 additions & 2 deletions catalog/tests/dags/common/loader/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
TEST_STAGING_PREFIX = "test_staging"
S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
SECRET_KEY = os.getenv("AWS_SECRET_KEY")


@pytest.fixture
Expand Down
Loading