Toggle CloudWatch alarms actions during Data Refresh (#3652)
krysal authored Jan 18, 2024
1 parent 8b0ea73 commit ce5424f
Showing 5 changed files with 87 additions and 13 deletions.
58 changes: 58 additions & 0 deletions catalog/dags/common/cloudwatch.py
@@ -0,0 +1,58 @@
"""
CloudwatchWrapper extracted partially from
https://github.com/awsdocs/aws-doc-sdk-examples/blob/54c3b82d8f9a12a862f9fcec44909829bda849af/python/example_code/cloudwatch/cloudwatch_basics.py
"""

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CloudWatchWrapper:
    """Encapsulates Amazon CloudWatch functions."""

    def __init__(self, cloudwatch_resource):
        """:param cloudwatch_resource: A Boto3 CloudWatch resource."""
        self.cloudwatch_resource = cloudwatch_resource

    def enable_alarm_actions(self, alarm_name, enable):
        """
        Enable or disable actions on the specified alarm. Alarm actions can be
        used to send notifications or automate responses when an alarm enters a
        particular state.

        :param alarm_name: The name of the alarm.
        :param enable: When True, actions are enabled for the alarm. Otherwise,
            they are disabled.
        """
        try:
            alarm = self.cloudwatch_resource.Alarm(alarm_name)
            if enable:
                alarm.enable_actions()
            else:
                alarm.disable_actions()
            logger.info(
                "%s actions for alarm %s.",
                "Enabled" if enable else "Disabled",
                alarm_name,
            )
        except ClientError:
            logger.exception(
                "Couldn't %s actions for alarm %s.",
                "enable" if enable else "disable",
                alarm_name,
            )
            raise


def enable_or_disable_alarms(enable):
    cw_wrapper = CloudWatchWrapper(boto3.resource("cloudwatch"))

    sensitive_alarms_list = [
        "ES Production CPU utilization above 50%",
    ]

    for alarm in sensitive_alarms_list:
        cw_wrapper.enable_alarm_actions(alarm, enable)
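
The wrapper above only needs a Boto3 CloudWatch resource, so it can be exercised locally without touching AWS. A minimal sketch (not part of this commit) using a mocked resource via unittest.mock, assuming the catalog DAGs folder is on the import path; the alarm name is the one hard-coded above:

from unittest import mock

from common.cloudwatch import CloudWatchWrapper

mock_resource = mock.MagicMock()
wrapper = CloudWatchWrapper(mock_resource)

# Disabling actions should resolve the alarm by name and call disable_actions() on it.
wrapper.enable_alarm_actions("ES Production CPU utilization above 50%", enable=False)
mock_resource.Alarm.assert_called_once_with("ES Production CPU utilization above 50%")
mock_resource.Alarm.return_value.disable_actions.assert_called_once()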
23 changes: 19 additions & 4 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -49,10 +49,11 @@
from collections.abc import Sequence

from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import ingestion_server
from common import ingestion_server, cloudwatch
from common.constants import XCOM_PULL_TEMPLATE
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import wait_for_external_dag
@@ -131,7 +132,13 @@ def create_data_refresh_task_group(
    generate_index_suffix = ingestion_server.generate_index_suffix.override(
        trigger_rule=TriggerRule.NONE_FAILED,
    )()
    tasks.append(generate_index_suffix)

    disable_alarms = PythonOperator(
        task_id="disable_sensitive_cloudwatch_alarms",
        python_callable=cloudwatch.enable_or_disable_alarms,
        op_args=[False],
    )
    tasks.append([generate_index_suffix, disable_alarms])

    # Trigger the 'ingest_upstream' task on the ingestion server and await its
    # completion. This task copies the media table for the given model from the
@@ -175,6 +182,13 @@ def create_data_refresh_task_group(
    # running against an index that is already promoted in production.
    tasks.append(create_filtered_index)

    enable_alarms = PythonOperator(
        task_id="enable_sensitive_cloudwatch_alarms",
        python_callable=cloudwatch.enable_or_disable_alarms,
        op_args=[True],
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Trigger the `promote` task on the ingestion server and await its completion.
    # This task promotes the newly created API DB table and elasticsearch index. It
    # does not include promotion of the filtered index, which must be promoted
@@ -189,7 +203,7 @@
        },
        timeout=data_refresh.data_refresh_timeout,
    )
    tasks.append(promote_tasks)
    tasks.append([enable_alarms, promote_tasks])

    # Delete the alias' previous target index, now unused.
    delete_old_index = ingestion_server.trigger_task(
Expand All @@ -213,7 +227,8 @@ def create_data_refresh_task_group(
    # └─ create_filtered_index
    # └─ promote (trigger_promote + wait_for_promote)
    # └─ delete_old_index
    # └─ promote_filtered_index (including delete filtered index)
    # └─ promote_filtered_index (including delete filtered index) +
    #    enable_alarms
    chain(*tasks)

    return data_refresh_group
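
The `tasks` list now mixes single tasks with two-element lists, and `chain(*tasks)` links consecutive elements, fanning dependencies out of and into the list elements. A minimal, illustrative sketch of that behaviour with placeholder operators (assumes Airflow 2.x; this is not the real data refresh DAG):

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator

with DAG("alarm_toggle_sketch", start_date=datetime(2024, 1, 1), schedule=None):
    generate_index_suffix = EmptyOperator(task_id="generate_index_suffix")
    disable_alarms = EmptyOperator(task_id="disable_sensitive_cloudwatch_alarms")
    ingest_upstream = EmptyOperator(task_id="ingest_upstream")
    enable_alarms = EmptyOperator(task_id="enable_sensitive_cloudwatch_alarms")
    promote = EmptyOperator(task_id="promote")

    # Each element depends on every task in the previous element: ingest_upstream
    # waits for both generate_index_suffix and disable_alarms, and enable_alarms
    # and promote both wait for ingest_upstream.
    chain([generate_index_suffix, disable_alarms], ingest_upstream, [enable_alarms, promote])

In the real task group, enable_alarms also sets trigger_rule=TriggerRule.ALL_DONE, so the alarms are re-enabled once all upstream tasks have finished, whether or not they succeeded.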
7 changes: 4 additions & 3 deletions catalog/env.template
@@ -88,9 +88,10 @@ AIRFLOW_PORT=9090
LOADER_FILE_AGE=1
# Contact email for any APIs
CONTACT_EMAIL=openverse@wordpress.org
# AWS/S3 configuration - does not need to be changed for development
AWS_ACCESS_KEY=test_key
AWS_SECRET_KEY=test_secret
# AWS configuration - does not need to be changed for development
AWS_ACCESS_KEY_ID=test_key
AWS_SECRET_ACCESS_KEY=test_secret
AWS_DEFAULT_REGION=us-east-1
# General bucket used for TSV->DB ingestion and logging
OPENVERSE_BUCKET=openverse-storage
# Seconds to wait before poking for availability of the data refresh pool when running a data_refresh
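The renamed variables match the names boto3 reads from the environment by default, which is what lets the new `boto3.resource("cloudwatch")` call in `common/cloudwatch.py` work without passing credentials explicitly. A small, illustrative sketch of that resolution (assumes the variables above are exported):

import boto3

# With AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_DEFAULT_REGION set,
# boto3 resolves credentials and region from the environment automatically.
cloudwatch = boto3.resource("cloudwatch")
print(cloudwatch.meta.client.meta.region_name)  # e.g. "us-east-1" in development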
8 changes: 4 additions & 4 deletions catalog/tests/dags/common/conftest.py
@@ -5,9 +5,9 @@
import pytest

from catalog.tests.dags.common.loader.test_s3 import (
    ACCESS_KEY,
    AWS_ACCESS_KEY_ID,
    S3_LOCAL_ENDPOINT,
    SECRET_KEY,
    AWS_SECRET_ACCESS_KEY,
)


@@ -40,8 +40,8 @@ def empty_s3_bucket(request):
    print(f"{bucket_name=}")
    bucket = boto3.resource(
        "s3",
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        endpoint_url=S3_LOCAL_ENDPOINT,
    ).Bucket(bucket_name)

4 changes: 2 additions & 2 deletions catalog/tests/dags/common/loader/test_s3.py
@@ -14,8 +14,8 @@
TEST_STAGING_PREFIX = "test_staging"
S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")


@pytest.fixture
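These constants now read the standard variable names set in `catalog/env.template`. A sketch of how the same variables point boto3 at the local S3-compatible endpoint used by the tests (illustrative only; assumes the catalog stack is running locally and `S3_LOCAL_ENDPOINT` is set):

import os

import boto3

s3 = boto3.resource(
    "s3",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    endpoint_url=os.getenv("S3_LOCAL_ENDPOINT"),
)
print([bucket.name for bucket in s3.buckets.all()])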
