diff --git a/dags/veda_data_pipeline/utils/build_stac/handler.py b/dags/veda_data_pipeline/utils/build_stac/handler.py index dd87bffa..984dfcaf 100644 --- a/dags/veda_data_pipeline/utils/build_stac/handler.py +++ b/dags/veda_data_pipeline/utils/build_stac/handler.py @@ -63,7 +63,24 @@ def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]: try: stac_item = stac.generate_stac(parsed_event).to_dict() except Exception as ex: - out_err: StacItemOutput = {"stac_item": {"error": f"{ex}", "event": event}} + # Extract filename from first asset for better error reporting + filename = None + if event.get("assets"): + first_asset = next(iter(event["assets"].values()), {}) + href = first_asset.get("href", "") + filename = href.split("/")[-1] if href else None + + item_id = event.get("item_id", None) + logging.error(f"Failed to generate STAC for file: {filename} (item_id: {item_id}) - Error: {ex}") + + out_err: StacItemOutput = { + "stac_item": { + "error": f"{ex}", + "filename": filename, + "item_id": item_id, + "event": event + } + } return out_err output: StacItemOutput = {"stac_item": stac_item} @@ -140,12 +157,34 @@ def stac_handler(payload_src: dict, bucket_output, ti=None): if payload_failures: logging.warning("\n=== Error Breakdown ===") error_breakdown = {} + failed_files_by_error = {} # Track files per error type + for failure in payload_failures: error_msg = failure.get('error', 'Unknown error') error_breakdown[error_msg] = error_breakdown.get(error_msg, 0) + 1 + # Extract filename + filename = failure.get('filename', 'unknown') + if filename == 'unknown' and 'event' in failure: + # Fallback: extract from event if not in failure directly + assets = failure['event'].get('assets', {}) + if assets: + first_asset = next(iter(assets.values()), {}) + href = first_asset.get('href', '') + filename = href.split("/")[-1] if href else 'unknown' + + # Group filenames by error + if error_msg not in failed_files_by_error: + failed_files_by_error[error_msg] = [] 
+ failed_files_by_error[error_msg].append(filename) + for error, count in error_breakdown.items(): logging.warning(f" - {error}: {count} occurrences") + # Show up to 5 example filenames per error + example_files = failed_files_by_error[error][:5] + logging.warning(f" Example files: {', '.join(example_files)}") + if len(failed_files_by_error[error]) > 5: + logging.warning(f" ... and {len(failed_files_by_error[error]) - 5} more") result = { "payload": { diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py index bc0b2d3b..cc904179 100644 --- a/dags/veda_data_pipeline/utils/submit_stac.py +++ b/dags/veda_data_pipeline/utils/submit_stac.py @@ -1,4 +1,6 @@ import json +import logging +import math import os import sys from dataclasses import dataclass @@ -40,6 +42,7 @@ class Creds(TypedDict): token_type: str scope: str + @dataclass class IngestionApi: base_url: str @@ -86,16 +89,32 @@ def submit(self, event: Dict[str, Any], endpoint: str) -> Dict[str, Any]: "Authorization": f"Bearer {self.token}", "Content-Type": "application/json", } - response = requests.post( - f"{self.base_url.rstrip('/')}{endpoint}", - json=event, - headers=headers, - ) + + # Extract filename/item_id from the event for error reporting + item_id = event.get("id", None) + filename = None + if "assets" in event: + assets = event.get("assets", {}) + if assets: + first_asset = next(iter(assets.values()), {}) + href = first_asset.get("href", "") + filename = href.split("/")[-1] if href else None + try: + response = requests.post( + f"{self.base_url.rstrip('/')}{endpoint}", + json=event, + headers=headers, + ) response.raise_for_status() except Exception as e: - print(response.text) - raise e + logging.error(f"Failed to submit STAC item. Item ID: {item_id}, Filename: {filename}, Error: {type(e).__name__}: {e}") + # Log response text if it's an HTTP error + if hasattr(e, 'response'): + resp = getattr(e, 'response') + if hasattr(resp, 'text'): + logging.error(f"Response: {resp.text}") + raise return response.json()