From 7368d4815bd3f88825cbd7e85c35a40d6674c69d Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Thu, 9 Oct 2025 17:27:03 -0500 Subject: [PATCH 1/3] add error handling in stac creation and ingestion --- .../utils/build_stac/handler.py | 41 +++++++++++++- dags/veda_data_pipeline/utils/submit_stac.py | 55 ++++++++++++++++--- 2 files changed, 88 insertions(+), 8 deletions(-) diff --git a/dags/veda_data_pipeline/utils/build_stac/handler.py b/dags/veda_data_pipeline/utils/build_stac/handler.py index dd87bffa..8995ae6b 100644 --- a/dags/veda_data_pipeline/utils/build_stac/handler.py +++ b/dags/veda_data_pipeline/utils/build_stac/handler.py @@ -63,7 +63,24 @@ def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]: try: stac_item = stac.generate_stac(parsed_event).to_dict() except Exception as ex: - out_err: StacItemOutput = {"stac_item": {"error": f"{ex}", "event": event}} + # Extract filename from first asset for better error reporting + filename = "unknown" + if event.get("assets"): + first_asset = next(iter(event["assets"].values()), {}) + href = first_asset.get("href", "") + filename = href.split("/")[-1] if href else "unknown" + + item_id = event.get("item_id", "unknown") + logging.error(f"Failed to generate STAC for file: {filename} (item_id: {item_id}) - Error: {ex}") + + out_err: StacItemOutput = { + "stac_item": { + "error": f"{ex}", + "filename": filename, + "item_id": item_id, + "event": event + } + } return out_err output: StacItemOutput = {"stac_item": stac_item} @@ -140,12 +157,34 @@ def stac_handler(payload_src: dict, bucket_output, ti=None): if payload_failures: logging.warning("\n=== Error Breakdown ===") error_breakdown = {} + failed_files_by_error = {} # Track files per error type + for failure in payload_failures: error_msg = failure.get('error', 'Unknown error') error_breakdown[error_msg] = error_breakdown.get(error_msg, 0) + 1 + # Extract filename + filename = failure.get('filename', 'unknown') + if filename == 'unknown' and 'event' in failure: + # Fallback: extract from event if not in failure directly + assets = failure['event'].get('assets', {}) + if assets: + first_asset = next(iter(assets.values()), {}) + href = first_asset.get('href', '') + filename = href.split("/")[-1] if href else 'unknown' + + # Group filenames by error + if error_msg not in failed_files_by_error: + failed_files_by_error[error_msg] = [] + failed_files_by_error[error_msg].append(filename) + for error, count in error_breakdown.items(): logging.warning(f" - {error}: {count} occurrences") + # Show up to 5 example filenames per error + example_files = failed_files_by_error[error][:5] + logging.warning(f" Example files: {', '.join(example_files)}") + if len(failed_files_by_error[error]) > 5: + logging.warning(f" ... and {len(failed_files_by_error[error]) - 5} more") result = { "payload": { diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py index bc0b2d3b..99a9538e 100644 --- a/dags/veda_data_pipeline/utils/submit_stac.py +++ b/dags/veda_data_pipeline/utils/submit_stac.py @@ -1,4 +1,6 @@ import json +import logging +import math import os import sys from dataclasses import dataclass @@ -40,6 +42,29 @@ class Creds(TypedDict): token_type: str scope: str + +def sanitize_for_json(obj: Any) -> Any: + """ + Recursively sanitize an object by replacing inf and NaN float values with None. + This ensures the object can be JSON serialized without errors. + + Args: + obj: Any Python object (dict, list, float, etc.) + + Returns: + Sanitized object with inf/NaN replaced by None + """ + if isinstance(obj, dict): + return {key: sanitize_for_json(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [sanitize_for_json(item) for item in obj] + elif isinstance(obj, float): + if math.isnan(obj) or math.isinf(obj): + return None + return obj + else: + return obj + @dataclass class IngestionApi: base_url: str @@ -86,16 +111,32 @@ def submit(self, event: Dict[str, Any], endpoint: str) -> Dict[str, Any]: "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } - response = requests.post( - f"{self.base_url.rstrip('/')}{endpoint}", - json=event, - headers=headers, - ) + + # Extract filename/item_id from the event for error reporting + item_id = event.get("id", "unknown") + filename = "unknown" + if "assets" in event: + assets = event.get("assets", {}) + if assets: + first_asset = next(iter(assets.values()), {}) + href = first_asset.get("href", "") + filename = href.split("/")[-1] if href else "unknown" + try: + response = requests.post( + f"{self.base_url.rstrip('/')}{endpoint}", + json=event, + headers=headers, + ) response.raise_for_status() except Exception as e: - print(response.text) - raise e + logging.error(f"Failed to submit STAC item. Item ID: {item_id}, Filename: {filename}, Error: {type(e).__name__}: {e}") + # Log response text if it's an HTTP error + if hasattr(e, 'response'): + resp = getattr(e, 'response') + if hasattr(resp, 'text'): + logging.error(f"Response: {resp.text}") + raise return response.json() From 294676661aa9011a06e34cb12a681fcdd0f10ecf Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Thu, 9 Oct 2025 17:50:24 -0500 Subject: [PATCH 2/3] update submit_stac --- dags/veda_data_pipeline/utils/submit_stac.py | 22 -------------------- 1 file changed, 22 deletions(-) diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py index 99a9538e..bc729984 100644 --- a/dags/veda_data_pipeline/utils/submit_stac.py +++ b/dags/veda_data_pipeline/utils/submit_stac.py @@ -43,28 +43,6 @@ class Creds(TypedDict): scope: str -def sanitize_for_json(obj: Any) -> Any: - """ - Recursively sanitize an object by replacing inf and NaN float values with None. - This ensures the object can be JSON serialized without errors. - - Args: - obj: Any Python object (dict, list, float, etc.) - - Returns: - Sanitized object with inf/NaN replaced by None - """ - if isinstance(obj, dict): - return {key: sanitize_for_json(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [sanitize_for_json(item) for item in obj] - elif isinstance(obj, float): - if math.isnan(obj) or math.isinf(obj): - return None - return obj - else: - return obj - @dataclass class IngestionApi: base_url: str From 06c1af426fdf9a8f6185fbbfb122bbc36ceeb8fa Mon Sep 17 00:00:00 2001 From: ividito Date: Wed, 14 Jan 2026 11:56:16 -0800 Subject: [PATCH 3/3] fix: replace "unknown" with None --- dags/veda_data_pipeline/utils/build_stac/handler.py | 6 +++--- dags/veda_data_pipeline/utils/submit_stac.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dags/veda_data_pipeline/utils/build_stac/handler.py b/dags/veda_data_pipeline/utils/build_stac/handler.py index 8995ae6b..984dfcaf 100644 --- a/dags/veda_data_pipeline/utils/build_stac/handler.py +++ b/dags/veda_data_pipeline/utils/build_stac/handler.py @@ -64,13 +64,13 @@ def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]: stac_item = stac.generate_stac(parsed_event).to_dict() except Exception as ex: # Extract filename from first asset for better error reporting - filename = "unknown" + filename = None if event.get("assets"): first_asset = next(iter(event["assets"].values()), {}) href = first_asset.get("href", "") - filename = href.split("/")[-1] if href else "unknown" + filename = href.split("/")[-1] if href else None - item_id = event.get("item_id", "unknown") + item_id = event.get("item_id", None) logging.error(f"Failed to generate STAC for file: {filename} (item_id: {item_id}) - Error: {ex}") out_err: StacItemOutput = { diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py index bc729984..cc904179 100644 --- a/dags/veda_data_pipeline/utils/submit_stac.py +++ b/dags/veda_data_pipeline/utils/submit_stac.py @@ -91,14 +91,14 @@ def submit(self, event: Dict[str, Any], endpoint: str) -> Dict[str, Any]: } # Extract filename/item_id from the event for error reporting - item_id = event.get("id", "unknown") - filename = "unknown" + item_id = event.get("id", None) + filename = None if "assets" in event: assets = event.get("assets", {}) if assets: first_asset = next(iter(assets.values()), {}) href = first_asset.get("href", "") - filename = href.split("/")[-1] if href else "unknown" + filename = href.split("/")[-1] if href else None try: response = requests.post(