Upload Ingestion Server's TSV files to AWS S3 (skip tags) #4529

Merged: 8 commits, Jun 27, 2024

Changes from 6 commits
6 changes: 4 additions & 2 deletions ingestion_server/env.template
@@ -4,8 +4,10 @@ PYTHONUNBUFFERED="0"

#ENVIRONMENT="local"

#AWS_ACCESS_KEY_ID=""
#AWS_SECRET_ACCESS_KEY=""
#AWS_REGION="us-east-1"
#AWS_ACCESS_KEY_ID="test_key"
#AWS_SECRET_ACCESS_KEY="test_secret"
#AWS_S3_ENDPOINT="http://s3:5000"

#ELASTICSEARCH_URL="es"
#ELASTICSEARCH_PORT="9200"
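For local development, these commented-out values point boto3 at a MinIO instance rather than real AWS; the `_upload_to_s3` function below reads them via `decouple.config`. The following is a minimal sketch (assuming a MinIO container is actually reachable at `http://s3:5000` with those test credentials) of pre-creating the bucket that the upload code expects to already exist:

```python
# Sketch only: pre-create the bucket that _upload_to_s3 assumes exists.
# Endpoint, credentials, and bucket name mirror the defaults in this PR,
# but whether a MinIO container is running at this address is an assumption
# about the local environment.
import boto3

s3 = boto3.resource(
    "s3",
    endpoint_url="http://s3:5000",
    aws_access_key_id="test_key",
    aws_secret_access_key="test_secret",
    region_name="us-east-1",
)
s3.create_bucket(Bucket="openverse-catalog")
```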
80 changes: 66 additions & 14 deletions ingestion_server/ingestion_server/cleanup.py
@@ -7,12 +7,16 @@
import csv
import logging as log
import multiprocessing
import pathlib
import shutil
import time
import uuid
from urllib.parse import urlparse

import boto3
import requests as re
import tldextract
from decouple import config
from psycopg2.extras import DictCursor, Json

from ingestion_server.db_helpers import database_connect
@@ -62,8 +66,12 @@
"www.eol.org": True,
".digitaltmuseum.org": True,
"collections.musee-mccord.qc.ca": False,
".stocksnap.io": True,
"cdn.stocksnap.io": True,
}

TMP_DIR = pathlib.Path("/tmp/cleaned_data").resolve()


def _tag_denylisted(tag):
"""Check if a tag is banned or contains a banned substring."""
@@ -106,9 +114,9 @@ def cleanup_url(url, tls_support):
log.debug(f"Tested domain {_tld}")

if tls_supported:
return f"'https://{url}'"
return f"https://{url}"
else:
return f"'http://{url}'"
return f"http://{url}"
else:
return None

@@ -141,6 +149,7 @@ def cleanup_tags(tags):

if update_required:
fragment = Json(tag_output)
log.debug(f"Tags fragment: {fragment}")
return fragment
else:
return None
@@ -200,7 +209,7 @@ def test_tls_supported(cls, url):
https = url.replace("http://", "https://")
try:
res = re.get(https, timeout=2)
log.info(f"{https}:{res.status_code}")
log.info(f"tls_test - {https}:{res.status_code}")
return 200 <= res.status_code < 400
except re.RequestException:
return False
@@ -243,23 +252,25 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
if clean:
cleaned_data[update_field] = clean
log.debug(
f"Updated {update_field} for {identifier} "
f"from '{dirty_value}' to '{clean}'"
f"Updated {update_field} for {identifier}\n\t"
f"from '{dirty_value}' \n\tto '{clean}'"
)
# Generate SQL update for all the fields we just cleaned
update_field_expressions = []
for field, clean_value in cleaned_data.items():
update_field_expressions.append(f"{field} = {clean_value}")
# Save cleaned values for later
# (except for tags, which take up too much space)
if field == "tags":
update_field_expressions.append(f"{field} = {clean_value}")
continue
update_field_expressions.append(f"{field} = '{clean_value}'")
# Save cleaned values for later
# (except for tags, which take up too much space)
cleaned_values[field].append((identifier, clean_value))

if len(update_field_expressions) > 0:
update_query = f"""UPDATE {temp_table} SET
{', '.join(update_field_expressions)} WHERE id = {_id}
"""
log.debug(f"Executing update query: \n\t{update_query}")
write_cur.execute(update_query)
log.info(f"TLS cache: {TLS_CACHE}")
log.info("Worker committing changes...")
@@ -273,25 +284,61 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):


def save_cleaned_data(result: dict) -> dict[str, int]:
log.info("Saving cleaned data...")
start_time = time.perf_counter()

cleanup_counts = {field: len(items) for field, items in result.items()}
for field, cleaned_items in result.items():
# Skip the tag field because the file is too large and fills up the disk
if field == "tag":
if field == "tag" or not cleaned_items:
continue
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

with open(TMP_DIR.joinpath(f"{field}.tsv"), "a", encoding="utf-8") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

end_time = time.perf_counter()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time:.3f},\n{cleanup_counts}")
return cleanup_counts


def _upload_to_s3(fields):
"""
Upload cleaned data to S3. It assumes that the bucket already exists.

Locally, it connects to a MinIO instance through its endpoint and test credentials.
On live environments, the connection is allowed via IAM roles.
"""
bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
s3_path = "shared/data-refresh-cleaned-data"
try:
s3 = boto3.resource(
"s3",
endpoint_url=config("AWS_S3_ENDPOINT", default=None),
aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
region_name=config("AWS_REGION", default=None),
)
s3.meta.client.head_bucket(Bucket=bucket_name)
bucket = s3.Bucket(bucket_name)
log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
except Exception as e:
log.error(f"Files upload failed. Error connecting to S3.\n{e}")
return

for field in fields:
file_path = TMP_DIR.joinpath(f"{field}.tsv")
if not file_path.exists():
continue

@sarayourfriend (Collaborator) commented on Jun 25, 2024:
Can you add a comment with the explanation you shared in the other PR for why this is an expected and good condition (not an error)? Otherwise, it's definitely not clear.

Although, I'm actually still not clear.

I understand tags won't have a file. That's fine, but then can we explicitly skip it in the list of files we are looking at? That would be much clearer for that case and implicitly document that we know the tags file does not exist.

However: I don't understand the bit about how these files will over time no longer exist. Why would that happen? Once we've applied the fix upstream, is there a point where the data refresh would have "half fixed" data and some of the cleaning would stop? That's fine, but I wanted to clarify, because it isn't clear from this code alone: the condition is a significant one with no explanation, yet it apparently matters quite a bit, with many implementation details behind the reason for it.

The PR author (Member) replied:
> I don't understand the bit about how these files will over time no longer exist. Why would that happen? Once we've applied the fix upstream, is there a point where the data refresh would have "half fixed" data, and some of the cleaning would stop?

The cleaning steps will remain for a time, say one or two data refresh runs, until we're confident that nothing is left pending in that ETL. The cleaning won't stop midway by itself, so we'd need to catch these cases ourselves once the files stop being produced.

Thanks for raising the flag here. It was clear to me, but now I see I was assuming many things. I added a comment that I hope adds more context.
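
For illustration, one way to make that skip explicit at the call site, roughly as suggested above (a sketch only, not part of this PR; `cleanable_fields_for_table` is the list the PR already passes to `_upload_to_s3`):

```python
# Hypothetical variant of the call site in clean_image_data: exclude "tags"
# up front instead of relying on the missing-file check inside _upload_to_s3.
fields_to_upload = [field for field in cleanable_fields_for_table if field != "tags"]
_upload_to_s3(fields_to_upload)
```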


try:
bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
log.info(f"Uploaded '{field}.tsv' to S3.")
file_path.unlink()
except Exception as e:
log.error(f"Error uploading '{field}.tsv' to S3: {e}")


def clean_image_data(table):
"""
Clean up data loaded from upstream that is unsuitable for prod before going live.
Expand All @@ -300,6 +347,10 @@ def clean_image_data(table):
:return: None
"""

# Recreate directory where cleaned data is stored
shutil.rmtree(TMP_DIR, ignore_errors=True)
TMP_DIR.mkdir(parents=True)

# Map each table to the fields that need to be cleaned up. Then, map each
# field to its cleanup function.
log.info("Cleaning up data...")
@@ -383,6 +434,7 @@ def clean_image_data(table):
conn.commit()
iter_cur.close()
conn.close()
_upload_to_s3(cleanable_fields_for_table)
end_time = time.perf_counter()
cleanup_time = end_time - start_time
log.info(
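Taken together, `clean_image_data` now writes one TSV per cleaned field under `/tmp/cleaned_data`, and `_upload_to_s3` pushes each file to `shared/data-refresh-cleaned-data/<field>.tsv` in the bucket before deleting it locally. Below is a small sketch, under the same local-MinIO assumptions as above, for listing what landed in the bucket after a run:

```python
# Sketch only: list whatever the cleanup step uploaded to the cleaned-data
# prefix. The endpoint/credential variables are the ones from env.template;
# their presence in the environment is an assumption.
import boto3
from decouple import config

s3 = boto3.resource(
    "s3",
    endpoint_url=config("AWS_S3_ENDPOINT", default=None),
    aws_access_key_id=config("AWS_ACCESS_KEY_ID", default=None),
    aws_secret_access_key=config("AWS_SECRET_ACCESS_KEY", default=None),
    region_name=config("AWS_REGION", default=None),
)
bucket = s3.Bucket(config("OPENVERSE_BUCKET", default="openverse-catalog"))
for obj in bucket.objects.filter(Prefix="shared/data-refresh-cleaned-data/"):
    print(obj.key, obj.size)
```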
4 changes: 2 additions & 2 deletions ingestion_server/test/unit_tests/test_cleanup.py
@@ -47,12 +47,12 @@ def test_url_protocol_fix():
tls_support_cache = {}
pook.get("https://flickr.com").reply(200)
result = CleanupFunctions.cleanup_url(bad_url, tls_support_cache)
expected = "'https://flickr.com'"
expected = "https://flickr.com"

bad_http = "neverssl.com"
pook.get("https://neverssl.com").reply(500)
result_http = CleanupFunctions.cleanup_url(bad_http, tls_support_cache)
expected_http = "'http://neverssl.com'"
expected_http = "http://neverssl.com"
assert result == expected
assert result_http == expected_http
