diff --git a/catalog/dags/common/cloudwatch.py b/catalog/dags/common/cloudwatch.py
new file mode 100644
index 00000000000..658384f9c65
--- /dev/null
+++ b/catalog/dags/common/cloudwatch.py
@@ -0,0 +1,58 @@
+"""
+CloudwatchWrapper extracted partially from
+https://github.com/awsdocs/aws-doc-sdk-examples/blob/54c3b82d8f9a12a862f9fcec44909829bda849af/python/example_code/cloudwatch/cloudwatch_basics.py
+"""
+
+import logging
+import boto3
+from botocore.exceptions import ClientError
+
+logger = logging.getLogger(__name__)
+
+
+class CloudWatchWrapper:
+    """Encapsulates Amazon CloudWatch functions."""
+
+    def __init__(self, cloudwatch_resource):
+        """:param cloudwatch_resource: A Boto3 CloudWatch resource."""
+        self.cloudwatch_resource = cloudwatch_resource
+
+    def enable_alarm_actions(self, alarm_name, enable):
+        """
+        Enable or disable actions on the specified alarm. Alarm actions can be
+        used to send notifications or automate responses when an alarm enters a
+        particular state.
+
+        :param alarm_name: The name of the alarm.
+        :param enable: When True, actions are enabled for the alarm. Otherwise, they
+        are disabled.
+        """
+        try:
+            alarm = self.cloudwatch_resource.Alarm(alarm_name)
+            if enable:
+                alarm.enable_actions()
+            else:
+                alarm.disable_actions()
+            logger.info(
+                "%s actions for alarm %s.",
+                "Enabled" if enable else "Disabled",
+                alarm_name,
+            )
+        except ClientError:
+            logger.exception(
+                "Couldn't %s actions for alarm %s.",
+                "enable" if enable else "disable",
+                alarm_name,
+            )
+            raise
+
+
+def enable_or_disable_alarms(enable):
+    cw_wrapper = CloudWatchWrapper(boto3.resource("cloudwatch"))
+
+    sensitive_alarms_list = [
+        "ES Production CPU utilization above 50%",
+    ]
+
+    for alarm in sensitive_alarms_list:
+        cw_wrapper.enable_alarm_actions(alarm, enable)
diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py
index 1328e82b31d..e9a14bd981c 100644
--- a/catalog/dags/data_refresh/data_refresh_task_factory.py
+++ b/catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -49,10 +49,11 @@
 from collections.abc import Sequence
 
 from airflow.models.baseoperator import chain
+from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule
 
-from common import ingestion_server
+from common import ingestion_server, cloudwatch
 from common.constants import XCOM_PULL_TEMPLATE
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
 from common.sensors.utils import wait_for_external_dag
@@ -131,7 +132,13 @@ def create_data_refresh_task_group(
         generate_index_suffix = ingestion_server.generate_index_suffix.override(
             trigger_rule=TriggerRule.NONE_FAILED,
         )()
-        tasks.append(generate_index_suffix)
+
+        disable_alarms = PythonOperator(
+            task_id="disable_sensitive_cloudwatch_alarms",
+            python_callable=cloudwatch.enable_or_disable_alarms,
+            op_args=[False],
+        )
+        tasks.append([generate_index_suffix, disable_alarms])
 
         # Trigger the 'ingest_upstream' task on the ingestion server and await its
         # completion. This task copies the media table for the given model from the
@@ -175,6 +182,13 @@
         # running against an index that is already promoted in production.
         tasks.append(create_filtered_index)
 
+        enable_alarms = PythonOperator(
+            task_id="enable_sensitive_cloudwatch_alarms",
+            python_callable=cloudwatch.enable_or_disable_alarms,
+            op_args=[True],
+            trigger_rule=TriggerRule.ALL_DONE,
+        )
+
         # Trigger the `promote` task on the ingestion server and await its completion.
         # This task promotes the newly created API DB table and elasticsearch index. It
         # does not include promotion of the filtered index, which must be promoted
@@ -189,7 +203,7 @@
             },
             timeout=data_refresh.data_refresh_timeout,
         )
-        tasks.append(promote_tasks)
+        tasks.append([enable_alarms, promote_tasks])
 
         # Delete the alias' previous target index, now unused.
         delete_old_index = ingestion_server.trigger_task(
@@ -213,7 +227,8 @@
         #               └─ create_filtered_index
         #                   └─ promote (trigger_promote + wait_for_promote)
         #                       └─ delete_old_index
-        #                           └─ promote_filtered_index (including delete filtered index)
+        #                           └─ promote_filtered_index (including delete filtered index) +
+        #                              enable_alarms
         chain(*tasks)
 
     return data_refresh_group
diff --git a/catalog/env.template b/catalog/env.template
index 8e94e24716a..def19052cff 100644
--- a/catalog/env.template
+++ b/catalog/env.template
@@ -88,9 +88,10 @@ AIRFLOW_PORT=9090
 LOADER_FILE_AGE=1
 # Contact email for any APIs
 CONTACT_EMAIL=openverse@wordpress.org
-# AWS/S3 configuration - does not need to be changed for development
-AWS_ACCESS_KEY=test_key
-AWS_SECRET_KEY=test_secret
+# AWS configuration - does not need to be changed for development
+AWS_ACCESS_KEY_ID=test_key
+AWS_SECRET_ACCESS_KEY=test_secret
+AWS_DEFAULT_REGION=us-east-1
 # General bucket used for TSV->DB ingestion and logging
 OPENVERSE_BUCKET=openverse-storage
 # Seconds to wait before poking for availability of the data refresh pool when running a data_refresh
diff --git a/catalog/tests/dags/common/conftest.py b/catalog/tests/dags/common/conftest.py
index c36f42a7209..240e2c46dfa 100644
--- a/catalog/tests/dags/common/conftest.py
+++ b/catalog/tests/dags/common/conftest.py
@@ -5,9 +5,9 @@
 import pytest
 
 from catalog.tests.dags.common.loader.test_s3 import (
-    ACCESS_KEY,
+    AWS_ACCESS_KEY_ID,
     S3_LOCAL_ENDPOINT,
-    SECRET_KEY,
+    AWS_SECRET_ACCESS_KEY,
 )
 
 
@@ -40,8 +40,8 @@ def empty_s3_bucket(request):
     print(f"{bucket_name=}")
     bucket = boto3.resource(
         "s3",
-        aws_access_key_id=ACCESS_KEY,
-        aws_secret_access_key=SECRET_KEY,
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
         endpoint_url=S3_LOCAL_ENDPOINT,
     ).Bucket(bucket_name)
 
diff --git a/catalog/tests/dags/common/loader/test_s3.py b/catalog/tests/dags/common/loader/test_s3.py
index 46786727657..3581983a281 100644
--- a/catalog/tests/dags/common/loader/test_s3.py
+++ b/catalog/tests/dags/common/loader/test_s3.py
@@ -14,8 +14,8 @@
 TEST_STAGING_PREFIX = "test_staging"
 S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
 S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
-ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
-SECRET_KEY = os.getenv("AWS_SECRET_KEY")
+AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
 
 
 @pytest.fixture
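
Reviewer note (not part of the patch): one possible way to sanity-check the new wrapper locally without AWS credentials is to hand it a stub resource; this is only a sketch, assuming the `common.cloudwatch` module added above is importable, and the stubbed boto3 resource and assertions are illustrative.

# Minimal sketch: exercise CloudWatchWrapper.enable_alarm_actions against a stub
# instead of a real boto3 CloudWatch resource (module path assumed from the diff above).
from unittest import mock

from common.cloudwatch import CloudWatchWrapper

cloudwatch_resource = mock.MagicMock()
wrapper = CloudWatchWrapper(cloudwatch_resource)

# Disabling actions should resolve the alarm by name and call disable_actions() once.
wrapper.enable_alarm_actions("ES Production CPU utilization above 50%", enable=False)
cloudwatch_resource.Alarm.assert_called_once_with("ES Production CPU utilization above 50%")
cloudwatch_resource.Alarm.return_value.disable_actions.assert_called_once()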