Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion dags/veda_data_pipeline/utils/build_stac/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,24 @@ def handler(event: Dict[str, Any]) -> Union[S3LinkOutput, StacItemOutput]:
try:
stac_item = stac.generate_stac(parsed_event).to_dict()
except Exception as ex:
out_err: StacItemOutput = {"stac_item": {"error": f"{ex}", "event": event}}
# Extract filename from first asset for better error reporting
filename = None
if event.get("assets"):
first_asset = next(iter(event["assets"].values()), {})
href = first_asset.get("href", "")
filename = href.split("/")[-1] if href else None

item_id = event.get("item_id", None)
logging.error(f"Failed to generate STAC for file: {filename} (item_id: {item_id}) - Error: {ex}")

out_err: StacItemOutput = {
"stac_item": {
"error": f"{ex}",
"filename": filename,
"item_id": item_id,
"event": event
}
}
return out_err

output: StacItemOutput = {"stac_item": stac_item}
Expand Down Expand Up @@ -140,12 +157,34 @@ def stac_handler(payload_src: dict, bucket_output, ti=None):
if payload_failures:
logging.warning("\n=== Error Breakdown ===")
error_breakdown = {}
failed_files_by_error = {} # Track files per error type

for failure in payload_failures:
error_msg = failure.get('error', 'Unknown error')
error_breakdown[error_msg] = error_breakdown.get(error_msg, 0) + 1

# Extract filename
filename = failure.get('filename', 'unknown')
if filename == 'unknown' and 'event' in failure:
# Fallback: extract from event if not in failure directly
assets = failure['event'].get('assets', {})
if assets:
first_asset = next(iter(assets.values()), {})
href = first_asset.get('href', '')
filename = href.split("/")[-1] if href else 'unknown'

# Group filenames by error
if error_msg not in failed_files_by_error:
failed_files_by_error[error_msg] = []
failed_files_by_error[error_msg].append(filename)

for error, count in error_breakdown.items():
logging.warning(f" - {error}: {count} occurrences")
# Show up to 5 example filenames per error
example_files = failed_files_by_error[error][:5]
logging.warning(f" Example files: {', '.join(example_files)}")
if len(failed_files_by_error[error]) > 5:
logging.warning(f" ... and {len(failed_files_by_error[error]) - 5} more")

result = {
"payload": {
Expand Down
33 changes: 26 additions & 7 deletions dags/veda_data_pipeline/utils/submit_stac.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import logging
import math
import os
import sys
from dataclasses import dataclass
Expand Down Expand Up @@ -40,6 +42,7 @@ class Creds(TypedDict):
token_type: str
scope: str


@dataclass
class IngestionApi:
base_url: str
Expand Down Expand Up @@ -86,16 +89,32 @@ def submit(self, event: Dict[str, Any], endpoint: str) -> Dict[str, Any]:
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
response = requests.post(
f"{self.base_url.rstrip('/')}{endpoint}",
json=event,
headers=headers,
)

# Extract filename/item_id from the event for error reporting
item_id = event.get("id", None)
filename = None
if "assets" in event:
assets = event.get("assets", {})
if assets:
first_asset = next(iter(assets.values()), {})
href = first_asset.get("href", "")
filename = href.split("/")[-1] if href else None

try:
response = requests.post(
f"{self.base_url.rstrip('/')}{endpoint}",
json=event,
headers=headers,
)
response.raise_for_status()
except Exception as e:
print(response.text)
raise e
logging.error(f"Failed to submit STAC item. Item ID: {item_id}, Filename: {filename}, Error: {type(e).__name__}: {e}")
# Log response text if it's an HTTP error
if hasattr(e, 'response'):
resp = getattr(e, 'response')
if hasattr(resp, 'text'):
logging.error(f"Response: {resp.text}")
raise
return response.json()


Expand Down