From d37cd3d99b03edcadc5ce3c247eb12d49a864a8e Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Tue, 11 Jun 2024 17:36:36 -0400
Subject: [PATCH 1/7] Save files of cleaned data to temporary directory and
 remove extra single quotes in values

---
 ingestion_server/ingestion_server/cleanup.py | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 14ba9063d3a..35cba3f4856 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -7,6 +7,7 @@
 import csv
 import logging as log
 import multiprocessing
+import pathlib
 import time
 import uuid
 from urllib.parse import urlparse
@@ -64,6 +65,8 @@
     "collections.musee-mccord.qc.ca": False,
 }
 
+TMP_DIR = pathlib.Path("/tmp/cleaned_data").resolve()
+
 
 def _tag_denylisted(tag):
     """Check if a tag is banned or contains a banned substring."""
@@ -106,9 +109,9 @@ def cleanup_url(url, tls_support):
             log.debug(f"Tested domain {_tld}")
 
         if tls_supported:
-            return f"'https://{url}'"
+            return f"https://{url}"
         else:
-            return f"'http://{url}'"
+            return f"http://{url}"
     else:
         return None
 
@@ -141,6 +144,7 @@ def cleanup_tags(tags):
 
         if update_required:
             fragment = Json(tag_output)
+            log.debug(f"Tags fragment: {fragment}")
             return fragment
         else:
             return None
@@ -200,7 +204,7 @@ def test_tls_supported(cls, url):
             https = url.replace("http://", "https://")
             try:
                 res = re.get(https, timeout=2)
-                log.info(f"{https}:{res.status_code}")
+                log.info(f"tls_test - {https}:{res.status_code}")
                 return 200 <= res.status_code < 400
             except re.RequestException:
                 return False
@@ -243,23 +247,25 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
             if clean:
                 cleaned_data[update_field] = clean
                 log.debug(
-                    f"Updated {update_field} for {identifier} "
-                    f"from '{dirty_value}' to '{clean}'"
+                    f"Updated {update_field} for {identifier}\n\t"
+                    f"from '{dirty_value}' \n\tto '{clean}'"
                 )
         # Generate SQL update for all the fields we just cleaned
         update_field_expressions = []
         for field, clean_value in cleaned_data.items():
-            update_field_expressions.append(f"{field} = {clean_value}")
-            # Save cleaned values for later
-            # (except for tags, which take up too much space)
             if field == "tags":
+                update_field_expressions.append(f"{field} = {clean_value}")
                 continue
+            update_field_expressions.append(f"{field} = '{clean_value}'")
+            # Save cleaned values for later
+            # (except for tags, which take up too much space)
             cleaned_values[field].append((identifier, clean_value))
 
         if len(update_field_expressions) > 0:
             update_query = f"""UPDATE {temp_table} SET
             {', '.join(update_field_expressions)} WHERE id = {_id}
             """
+            log.debug(f"Executing update query: \n\t{update_query}")
             write_cur.execute(update_query)
     log.info(f"TLS cache: {TLS_CACHE}")
     log.info("Worker committing changes...")
@@ -273,18 +279,17 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
 
 
 def save_cleaned_data(result: dict) -> dict[str, int]:
-    log.info("Saving cleaned data...")
     start_time = time.perf_counter()
 
     cleanup_counts = {field: len(items) for field, items in result.items()}
     for field, cleaned_items in result.items():
         # Skip the tag field because the file is too large and fills up the disk
-        if field == "tag":
+        if field == "tag" or not cleaned_items:
             continue
-        if cleaned_items:
-            with open(f"{field}.tsv", "a") as f:
-                csv_writer = csv.writer(f, delimiter="\t")
-                csv_writer.writerows(cleaned_items)
+
+        with open(TMP_DIR.joinpath(f"{field}.tsv"), "a", encoding="utf-8") as f:
+            csv_writer = csv.writer(f, delimiter="\t")
+            csv_writer.writerows(cleaned_items)
 
     end_time = time.perf_counter()
     total_time = end_time - start_time
@@ -300,6 +305,9 @@ def clean_image_data(table):
 
     :return: None
     """
+    # Create directory to store cleaned data temporarily
+    TMP_DIR.mkdir(parents=True, exist_ok=True)
+
     # Map each table to the fields that need to be cleaned up. Then, map each
     # field to its cleanup function.
     log.info("Cleaning up data...")

From ab4155148eb02b0313a56617cf1909a2059bb032 Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Tue, 11 Jun 2024 17:54:10 -0400
Subject: [PATCH 2/7] Recreate temporary directory before cleaning

---
 ingestion_server/ingestion_server/cleanup.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 35cba3f4856..dc183a3dda7 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -8,6 +8,7 @@
 import logging as log
 import multiprocessing
 import pathlib
+import shutil
 import time
 import uuid
 from urllib.parse import urlparse
@@ -305,8 +306,9 @@ def clean_image_data(table):
 
     :return: None
     """
-    # Create directory to store cleaned data temporarily
-    TMP_DIR.mkdir(parents=True, exist_ok=True)
+    # Recreate directory where cleaned data is stored
+    shutil.rmtree(TMP_DIR, ignore_errors=True)
+    TMP_DIR.mkdir(parents=True)
 
     # Map each table to the fields that need to be cleaned up. Then, map each
     # field to its cleanup function.

From 4dd8e2cb49cd4efb651b93402a5359d9e1ef6aa7 Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Tue, 11 Jun 2024 18:00:19 -0400
Subject: [PATCH 3/7] Add stocksnap to TLS_CACHE

---
 ingestion_server/ingestion_server/cleanup.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index dc183a3dda7..d27e023b8e9 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -64,6 +64,8 @@
     "www.eol.org": True,
     ".digitaltmuseum.org": True,
     "collections.musee-mccord.qc.ca": False,
+    ".stocksnap.io": True,
+    "cdn.stocksnap.io": True,
 }
 
 TMP_DIR = pathlib.Path("/tmp/cleaned_data").resolve()

From b4172e497e8dff141ef03c01957a0e269c8654de Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Tue, 11 Jun 2024 18:42:09 -0400
Subject: [PATCH 4/7] Fix test

---
 ingestion_server/test/unit_tests/test_cleanup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ingestion_server/test/unit_tests/test_cleanup.py b/ingestion_server/test/unit_tests/test_cleanup.py
index 696a873c065..02cc6d047e2 100644
--- a/ingestion_server/test/unit_tests/test_cleanup.py
+++ b/ingestion_server/test/unit_tests/test_cleanup.py
@@ -47,12 +47,12 @@ def test_url_protocol_fix():
     tls_support_cache = {}
     pook.get("https://flickr.com").reply(200)
     result = CleanupFunctions.cleanup_url(bad_url, tls_support_cache)
-    expected = "'https://flickr.com'"
+    expected = "https://flickr.com"
 
     bad_http = "neverssl.com"
     pook.get("https://neverssl.com").reply(500)
     result_http = CleanupFunctions.cleanup_url(bad_http, tls_support_cache)
-    expected_http = "'http://neverssl.com'"
+    expected_http = "http://neverssl.com"
     assert result == expected
     assert result_http == expected_http

From 97a721a87cc2c23f0b60bf9a98bd8163ed8842f0 Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Tue, 11 Jun 2024 17:36:36 -0400
Subject: [PATCH 5/7] Upload files to AWS S3 (without tags)

---
 ingestion_server/ingestion_server/cleanup.py | 40 ++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index d27e023b8e9..12f5f4ce84c 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -13,8 +13,10 @@
 import uuid
 from urllib.parse import urlparse
 
+import boto3
 import requests as re
 import tldextract
+from decouple import config
 from psycopg2.extras import DictCursor, Json
 
 from ingestion_server.db_helpers import database_connect
@@ -300,6 +302,43 @@
     return cleanup_counts
 
 
+def _upload_to_s3(fields):
+    """
+    Upload cleaned data to S3. It assumes that the bucket already exists.
+
+    Locally, it connects to a MinIO instance through its endpoint and test credentials.
+    In live environments, access is granted via IAM roles.
+    """
+    bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
+    s3_path = "shared/data-refresh-cleaned-data"
+    try:
+        s3 = boto3.resource(
+            "s3",
+            endpoint_url=config("AWS_S3_ENDPOINT", default=None),
+            aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
+            aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
+            region_name=config("AWS_REGION", default=None),
+        )
+        s3.meta.client.head_bucket(Bucket=bucket_name)
+        bucket = s3.Bucket(bucket_name)
+        log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
+    except Exception as e:
+        log.error(f"File upload failed. Error connecting to S3.\n{e}")
+        return
+
+    for field in fields:
+        file_path = TMP_DIR.joinpath(f"{field}.tsv")
+        if not file_path.exists():
+            continue
+
+        try:
+            bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
+            log.info(f"Uploaded '{field}.tsv' to S3.")
+            file_path.unlink()
+        except Exception as e:
+            log.error(f"Error uploading '{field}.tsv' to S3: {e}")
+
+
 def clean_image_data(table):
     """
     Clean up data loaded from upstream that is unsuitable for prod before going live.
@@ -395,6 +434,7 @@ def clean_image_data(table):
             conn.commit()
             iter_cur.close()
             conn.close()
+            _upload_to_s3(cleanable_fields_for_table)
     end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(

From 884f740293e580550040cba2e6d7ea5ee101f647 Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Thu, 20 Jun 2024 15:33:13 -0400
Subject: [PATCH 6/7] Add default values to AWS variables in env.template

---
 ingestion_server/env.template | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/ingestion_server/env.template b/ingestion_server/env.template
index faf9be597a6..6cee2aafabd 100644
--- a/ingestion_server/env.template
+++ b/ingestion_server/env.template
@@ -4,8 +4,10 @@ PYTHONUNBUFFERED="0"
 
 #ENVIRONMENT="local"
 
-#AWS_ACCESS_KEY_ID=""
-#AWS_SECRET_ACCESS_KEY=""
+#AWS_REGION="us-east-1"
+#AWS_ACCESS_KEY_ID="test_key"
+#AWS_SECRET_ACCESS_KEY="test_secret"
+#AWS_S3_ENDPOINT="http://s3:5000"
 
 #ELASTICSEARCH_URL="es"
 #ELASTICSEARCH_PORT="9200"

From eb790750bc6437e88c3381f096e247e1f488ae68 Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Thu, 27 Jun 2024 17:59:40 -0400
Subject: [PATCH 7/7] Add explanatory comment for skipped non-existing files

---
 ingestion_server/ingestion_server/cleanup.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index 29235be05a0..d167eeb743d 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -331,6 +331,9 @@ def _upload_to_s3(fields):
     for field in fields:
         file_path = TMP_DIR.joinpath(f"{field}.tsv")
         if not file_path.exists():
+            # Once the data has been cleaned in `upstream`, the cleaning process will
+            # not generate these files. Also, tags never generate any (refer to the
+            # `_clean_data_worker` function).
             continue
 
         try:
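
Note on local verification: `_upload_to_s3` assumes its bucket already exists; when the `head_bucket` check fails, it logs an error and skips the upload entirely. The following is a minimal sketch for pre-creating that bucket against the local MinIO instance, assuming the env.template defaults above for the endpoint, credentials, region, and bucket name:

    import boto3

    # Connect to the local MinIO instance with the test credentials from
    # env.template; live environments rely on IAM roles instead.
    s3 = boto3.resource(
        "s3",
        endpoint_url="http://s3:5000",        # AWS_S3_ENDPOINT
        aws_access_key_id="test_key",         # AWS_ACCESS_KEY_ID
        aws_secret_access_key="test_secret",  # AWS_SECRET_ACCESS_KEY
        region_name="us-east-1",              # AWS_REGION
    )
    # Create the bucket that _upload_to_s3's head_bucket check expects.
    s3.create_bucket(Bucket="openverse-catalog")  # OPENVERSE_BUCKET default

With the bucket in place, `clean_image_data` uploads one `{field}.tsv` per cleaned field to `shared/data-refresh-cleaned-data/` and deletes each local copy after a successful upload.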