diff --git a/catalog/dags/common/cloudwatch.py b/catalog/dags/common/cloudwatch.py deleted file mode 100644 index 658384f9c65..00000000000 --- a/catalog/dags/common/cloudwatch.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -CloudwatchWrapper extracted partially from -https://github.com/awsdocs/aws-doc-sdk-examples/blob/54c3b82d8f9a12a862f9fcec44909829bda849af/python/example_code/cloudwatch/cloudwatch_basics.py -""" - -import logging -import boto3 -from botocore.exceptions import ClientError - -logger = logging.getLogger(__name__) - - -class CloudWatchWrapper: - """Encapsulates Amazon CloudWatch functions""" - - def __init__(self, cloudwatch_resource): - """:param cloudwatch_resource: A Boto3 CloudWatch resource.""" - self.cloudwatch_resource = cloudwatch_resource - - def enable_alarm_actions(self, alarm_name, enable): - """ - Enable or disable actions on the specified alarm. Alarm actions can be - used to send notifications or automate responses when an alarm enters a - particular state. - - :param alarm_name: The name of the alarm. - :param enable: When True, actions are enabled for the alarm. Otherwise, they - disabled. - """ - try: - alarm = self.cloudwatch_resource.Alarm(alarm_name) - if enable: - alarm.enable_actions() - else: - alarm.disable_actions() - logger.info( - "%s actions for alarm %s.", - "Enabled" if enable else "Disabled", - alarm_name, - ) - except ClientError: - logger.exception( - "Couldn't %s actions alarm %s.", - "enable" if enable else "disable", - alarm_name, - ) - raise - - -def enable_or_disable_alarms(enable): - cw_wrapper = CloudWatchWrapper(boto3.resource("cloudwatch")) - - sensitive_alarms_list = [ - "ES Production CPU utilization above 50%", - ] - - for alarm in sensitive_alarms_list: - cw_wrapper.enable_alarm_actions(alarm, enable) diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py index e9a14bd981c..1328e82b31d 100644 --- a/catalog/dags/data_refresh/data_refresh_task_factory.py +++ b/catalog/dags/data_refresh/data_refresh_task_factory.py @@ -49,11 +49,10 @@ from collections.abc import Sequence from airflow.models.baseoperator import chain -from airflow.operators.python import PythonOperator from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule -from common import ingestion_server, cloudwatch +from common import ingestion_server from common.constants import XCOM_PULL_TEMPLATE from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor from common.sensors.utils import wait_for_external_dag @@ -132,13 +131,7 @@ def create_data_refresh_task_group( generate_index_suffix = ingestion_server.generate_index_suffix.override( trigger_rule=TriggerRule.NONE_FAILED, )() - - disable_alarms = PythonOperator( - task_id="disable_sensitive_cloudwatch_alarms", - python_callable=cloudwatch.enable_or_disable_alarms, - op_args=[False], - ) - tasks.append([generate_index_suffix, disable_alarms]) + tasks.append(generate_index_suffix) # Trigger the 'ingest_upstream' task on the ingestion server and await its # completion. This task copies the media table for the given model from the @@ -182,13 +175,6 @@ def create_data_refresh_task_group( # running against an index that is already promoted in production. tasks.append(create_filtered_index) - enable_alarms = PythonOperator( - task_id="enable_sensitive_cloudwatch_alarms", - python_callable=cloudwatch.enable_or_disable_alarms, - op_args=[True], - trigger_rule=TriggerRule.ALL_DONE, - ) - # Trigger the `promote` task on the ingestion server and await its completion. # This task promotes the newly created API DB table and elasticsearch index. It # does not include promotion of the filtered index, which must be promoted @@ -203,7 +189,7 @@ def create_data_refresh_task_group( }, timeout=data_refresh.data_refresh_timeout, ) - tasks.append([enable_alarms, promote_tasks]) + tasks.append(promote_tasks) # Delete the alias' previous target index, now unused. delete_old_index = ingestion_server.trigger_task( @@ -227,8 +213,7 @@ def create_data_refresh_task_group( # └─ create_filtered_index # └─ promote (trigger_promote + wait_for_promote) # └─ delete_old_index - # └─ promote_filtered_index (including delete filtered index) + - # enable_alarms + # └─ promote_filtered_index (including delete filtered index) chain(*tasks) return data_refresh_group diff --git a/catalog/env.template b/catalog/env.template index def19052cff..8e94e24716a 100644 --- a/catalog/env.template +++ b/catalog/env.template @@ -88,10 +88,9 @@ AIRFLOW_PORT=9090 LOADER_FILE_AGE=1 # Contact email for any APIs CONTACT_EMAIL=openverse@wordpress.org -# AWS configuration - does not need to be changed for development -AWS_ACCESS_KEY_ID=test_key -AWS_SECRET_ACCESS_KEY=test_secret -AWS_DEFAULT_REGION=us-east-1 +# AWS/S3 configuration - does not need to be changed for development +AWS_ACCESS_KEY=test_key +AWS_SECRET_KEY=test_secret # General bucket used for TSV->DB ingestion and logging OPENVERSE_BUCKET=openverse-storage # Seconds to wait before poking for availability of the data refresh pool when running a data_refresh diff --git a/catalog/tests/dags/common/conftest.py b/catalog/tests/dags/common/conftest.py index 240e2c46dfa..c36f42a7209 100644 --- a/catalog/tests/dags/common/conftest.py +++ b/catalog/tests/dags/common/conftest.py @@ -5,9 +5,9 @@ import pytest from catalog.tests.dags.common.loader.test_s3 import ( - AWS_ACCESS_KEY_ID, + ACCESS_KEY, S3_LOCAL_ENDPOINT, - AWS_SECRET_ACCESS_KEY, + SECRET_KEY, ) @@ -40,8 +40,8 @@ def empty_s3_bucket(request): print(f"{bucket_name=}") bucket = boto3.resource( "s3", - aws_access_key_id=AWS_ACCESS_KEY_ID, - aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + aws_access_key_id=ACCESS_KEY, + aws_secret_access_key=SECRET_KEY, endpoint_url=S3_LOCAL_ENDPOINT, ).Bucket(bucket_name) diff --git a/catalog/tests/dags/common/loader/test_s3.py b/catalog/tests/dags/common/loader/test_s3.py index 3581983a281..46786727657 100644 --- a/catalog/tests/dags/common/loader/test_s3.py +++ b/catalog/tests/dags/common/loader/test_s3.py @@ -14,8 +14,8 @@ TEST_STAGING_PREFIX = "test_staging" S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT") S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}" -AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") +ACCESS_KEY = os.getenv("AWS_ACCESS_KEY") +SECRET_KEY = os.getenv("AWS_SECRET_KEY") @pytest.fixture