diff --git a/ingestion_server/env.template b/ingestion_server/env.template
index faf9be597a6..6cee2aafabd 100644
--- a/ingestion_server/env.template
+++ b/ingestion_server/env.template
@@ -4,8 +4,10 @@ PYTHONUNBUFFERED="0"
 
 #ENVIRONMENT="local"
 
-#AWS_ACCESS_KEY_ID=""
-#AWS_SECRET_ACCESS_KEY=""
+#AWS_REGION="us-east-1"
+#AWS_ACCESS_KEY_ID="test_key"
+#AWS_SECRET_ACCESS_KEY="test_secret"
+#AWS_S3_ENDPOINT="http://s3:5000"
 
 #ELASTICSEARCH_URL="es"
 #ELASTICSEARCH_PORT="9200"
diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 0fe8141de9a..d167eeb743d 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -13,8 +13,10 @@
 import uuid
 from urllib.parse import urlparse
 
+import boto3
 import requests as re
 import tldextract
+from decouple import config
 from psycopg2.extras import DictCursor, Json
 
 from ingestion_server.db_helpers import database_connect
@@ -302,6 +304,46 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
     return cleanup_counts
 
 
+def _upload_to_s3(fields):
+    """
+    Upload cleaned data to S3. It assumes that the bucket already exists.
+
+    Locally, it connects to a MinIO instance through its endpoint and test credentials.
+    In live environments, access is granted via IAM roles.
+    """
+    bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
+    s3_path = "shared/data-refresh-cleaned-data"
+    try:
+        s3 = boto3.resource(
+            "s3",
+            endpoint_url=config("AWS_S3_ENDPOINT", default=None),
+            aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
+            aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
+            region_name=config("AWS_REGION", default=None),
+        )
+        s3.meta.client.head_bucket(Bucket=bucket_name)
+        bucket = s3.Bucket(bucket_name)
+        log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
+    except Exception as e:
+        log.error(f"Files upload failed. Error connecting to S3.\n{e}")
+        return
+
+    for field in fields:
+        file_path = TMP_DIR.joinpath(f"{field}.tsv")
+        if not file_path.exists():
+            # Once the data has been cleaned in `upstream`, the cleaning process
+            # will not generate these files. Tags never generate a file either
+            # (refer to the `_clean_data_worker` function).
+            continue
+
+        try:
+            bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
+            log.info(f"Uploaded '{field}.tsv' to S3.")
+            file_path.unlink()
+        except Exception as e:
+            log.error(f"Error uploading '{field}.tsv' to S3: {e}")
+
+
 def clean_image_data(table):
     """
     Clean up data loaded from upstream that is unsuitable for prod before going live.
@@ -397,6 +439,7 @@ def clean_image_data(table):
     conn.commit()
     iter_cur.close()
     conn.close()
+    _upload_to_s3(cleanable_fields_for_table)
     end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(
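
For local verification against the MinIO container, a sketch like the one below can confirm that the cleaned TSVs landed under the expected prefix. It reuses the same `decouple`-based configuration as `_upload_to_s3`; the default endpoint and credential values mirror `env.template` and are assumptions for a local setup, not part of this change.

```python
# Hedged verification sketch (not part of this diff): list the objects written
# by `_upload_to_s3`, using the same env-driven configuration. The defaults are
# the local MinIO values from env.template, not live credentials.
import boto3
from decouple import config

s3 = boto3.resource(
    "s3",
    endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
    aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
    aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default="test_secret"),
    region_name=config("AWS_REGION", default="us-east-1"),
)
bucket = s3.Bucket(config("OPENVERSE_BUCKET", default="openverse-catalog"))

# Objects are written under the prefix used by `_upload_to_s3`.
for obj in bucket.objects.filter(Prefix="shared/data-refresh-cleaned-data/"):
    print(obj.key, obj.size)
```

Note the design choice in the helper itself: each TSV is deleted locally after a successful upload, and failures (either connecting to S3 or uploading a single file) are logged and skipped rather than raised, so a missing bucket or credential problem does not block the rest of the data refresh.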