@@ -13,8 +13,10 @@
 import uuid
 from urllib.parse import urlparse
 
+import boto3
 import requests as re
 import tldextract
+from decouple import config
 from psycopg2.extras import DictCursor, Json
 
 from ingestion_server.db_helpers import database_connect
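
Two additions to the imports: `boto3`, for talking to S3, and `config` from python-decouple, which looks a setting up in the process environment (or a `.env`/`settings.ini` file) and falls back to the supplied default when it is unset. A minimal sketch of that lookup behavior, using the same `ENVIRONMENT` variable the new helper below checks:

    import os

    from decouple import config

    # Unset -> the default applies.
    os.environ.pop("ENVIRONMENT", None)
    print(config("ENVIRONMENT", default="local"))  # "local"

    # An exported value takes precedence over the default.
    os.environ["ENVIRONMENT"] = "production"
    print(config("ENVIRONMENT", default="local"))  # "production"
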
@@ -207,7 +209,7 @@ def test_tls_supported(cls, url):
         https = url.replace("http://", "https://")
         try:
             res = re.get(https, timeout=2)
-            log.debug(f"tls_test - {https}:{res.status_code}")
+            log.info(f"tls_test - {https}:{res.status_code}")
             return 200 <= res.status_code < 400
         except re.RequestException:
             return False
@@ -300,6 +302,45 @@ def save_cleaned_data(result: dict) -> dict[str, int]:
     return cleanup_counts
 
 
+def _get_s3_resource():
+    if config("ENVIRONMENT", default="local") == "local":
+        return boto3.resource(
+            "s3",
+            endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
+            aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
+            aws_secret_access_key=config(
+                "AWS_SECRET_ACCESS_KEY", default="test_secret"
+            ),
+        )
+
+    return boto3.resource("s3", region_name=config("AWS_REGION", default="us-east-1"))
+
+
+def _upload_to_s3(fields):
+    bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
+    s3_path = "shared/data-refresh-cleaned-data"
+    fields = [f for f in fields if f != "tags"]
+    try:
+        s3 = _get_s3_resource()
+        bucket = s3.Bucket(bucket_name)
+        bucket.load()
+        log.info(f"Connected to S3 and '{bucket_name}' bucket loaded.")
+    except Exception as e:
+        log.error(f"Upload failed. Error connecting to S3 or loading bucket: {e}")
+        return
+
+    for field in fields:
+        file_path = TMP_DIR.joinpath(f"{field}.tsv")
+        if not file_path.exists():
+            return
+
+        try:
+            bucket.upload_file(file_path, f"{s3_path}/{field}.tsv")
+            log.info(f"Uploaded '{field}.tsv' to S3.")
+        except Exception as e:
+            log.error(f"Error uploading '{field}.tsv' to S3: {e}")
+
+
 def clean_image_data(table):
     """
     Clean up data loaded from upstream that is unsuitable for prod before going live.
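
The helpers split configuration by environment: local development points at a fake S3 endpoint with dummy credentials, while production relies on the instance's own AWS credentials and only needs a region. Note also the early `return` inside the upload loop: if one field's TSV is missing, the remaining fields are skipped rather than attempted. To poke at the results locally, a hypothetical snippet (assuming the dev endpoint and bucket from the defaults above are running) that lists what a refresh uploaded:

    import boto3

    # Same defaults _get_s3_resource() falls back to outside AWS.
    s3 = boto3.resource(
        "s3",
        endpoint_url="http://s3:5000",
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
    )
    bucket = s3.Bucket("openverse-catalog")

    # Everything lands under the shared prefix, one TSV per cleaned field.
    for obj in bucket.objects.filter(Prefix="shared/data-refresh-cleaned-data/"):
        print(obj.key, obj.size)
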
@@ -395,6 +436,7 @@ def clean_image_data(table):
     conn.commit()
     iter_cur.close()
     conn.close()
+    _upload_to_s3(cleanable_fields_for_table)
     end_time = time.perf_counter()
     cleanup_time = end_time - start_time
     log.info(
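
This is the only call site: once the batched cleanup has committed, whatever per-field TSVs the cleanup step wrote into `TMP_DIR` are shipped to S3. A toy illustration of the handoff (the row format and field list here are hypothetical, not taken from this diff):

    import tempfile
    import uuid
    from pathlib import Path

    TMP_DIR = Path(tempfile.mkdtemp())

    # Cleanup appends one identifier<TAB>corrected-value row per fixed record.
    with open(TMP_DIR / "url.tsv", "a", encoding="utf-8") as f:
        f.write(f"{uuid.uuid4()}\thttps://example.com/image.jpg\n")

    # After the loop commits, the new hook uploads each TSV that exists, e.g.:
    # _upload_to_s3(["url", "creator_url", "foreign_landing_url"])
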