135 changes: 135 additions & 0 deletions dags/veda_data_pipeline/utils/disasters_utils.py
@@ -0,0 +1,135 @@
import re
import json
import boto3
from functools import lru_cache
from typing import Dict, List


def _load_from_s3(s3_key: str) -> dict:
"""Load JSON data from S3 using Airflow variables."""
from airflow.models.variable import Variable
bucket = Variable.get("aws_dags_variables", deserialize_json=True).get("EVENT_BUCKET")
client = boto3.client("s3")
result = client.get_object(Bucket=bucket, Key=s3_key)
return json.loads(result["Body"].read().decode())


@lru_cache(maxsize=1)
def _get_country_codes_mapping() -> Dict[str, List[str]]:
"""Load and cache country codes mapping from S3."""
data = _load_from_s3("disasters-monty/monty_country_codes.json")
return {entry["name"].lower(): entry["code"] for entry in data["country_codes"]}


@lru_cache(maxsize=1)
def _get_hazard_codes_mapping() -> Dict[str, Dict[str, str]]:
"""Load and cache hazard codes mapping from S3."""
data = _load_from_s3("disasters-monty/monty_hazard_codes.json")
codes = data["classification_systems"]["undrr_isc_2025"]["codes"]
return {
entry["event_name"].lower(): {
"glide_code": entry["glide_code"],
"classification_code": entry["classification_code"]
}
for entry in codes
}


def _parse_filename(filename: str) -> dict:
"""Parse filename pattern YYYYMM_<hazard>_<location> and return components."""
match = re.match(r"^(\d{6})_([^_]+)_([^_]+)", filename)
if not match:
return {}
return {
"year_month": match.group(1),
"hazard_type": match.group(2).lower(),
"location": match.group(3).lower(),
"full_event_name": f"{match.group(1)}_{match.group(2)}_{match.group(3)}"
}


def extract_event_name_from_filename(filename: str) -> dict:
"""Extract event name in format YYYYMM_<hazard>_<location>."""
parsed = _parse_filename(filename)
return {"event:name": parsed["full_event_name"]} if parsed else {}


def extract_country_codes_from_filename(filename: str) -> dict:
"""Extract ISO 3166-1 alpha-3 country codes from location."""
parsed = _parse_filename(filename)
if not parsed:
return {}
codes = _get_country_codes_mapping().get(parsed["location"])
return {"monty:country_codes": codes} if codes else {}


def extract_hazard_codes_from_filename(filename: str) -> dict:
"""Extract GLIDE and UNDRR-ISC hazard classification codes."""
parsed = _parse_filename(filename)
if not parsed:
return {}
hazard = _get_hazard_codes_mapping().get(parsed["hazard_type"])
return {"monty:hazard_codes": [hazard["glide_code"], hazard["classification_code"]]} if hazard else {}


def extract_datetime_from_filename(filename: str) -> str:
"""Extract datetime in formats: YYYY-MM-DD, YYYYMMDD, YYYY-MM-DDTHH:MM:SSZ, YYYYMMDDTHHMMSSZ."""
patterns = [
(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)", lambda m: m.replace("-", "").replace(":", "")),
(r"(\d{8}T\d{6}[Zz])", lambda m: m.upper()),
(r"(\d{4}-\d{2}-\d{2})", lambda m: m.replace("-", "")),
(r"(\d{8})", lambda m: m),
]
for pattern, formatter in patterns:
if match := re.search(pattern, filename):
return formatter(match.group(1))
return ""


def extract_corr_id_from_filename(filename: str) -> dict:
"""Extract correlation ID in format: {datetime}-{ISO3}-{GLIDE_CODE}-1-GCDB."""
parsed = _parse_filename(filename)
if not parsed:
return {}
datetime_str = extract_datetime_from_filename(filename)
if not datetime_str:
return {}
location = parsed["location"]
country_mapping = _get_country_codes_mapping()
if location not in country_mapping:
return {}
hazard_type = parsed["hazard_type"]
hazard_mapping = _get_hazard_codes_mapping()
if hazard_type not in hazard_mapping:
return {}
country_code = country_mapping[location][0]
glide_code = hazard_mapping[hazard_type]["glide_code"]
return {"monty:corr_id": f"{datetime_str}-{country_code}-{glide_code}-1-GCDB"}


def extract_all_metadata_from_filename(filename: str) -> dict:
"""Extract all metadata fields: event:name, monty:country_codes, monty:hazard_codes, monty:corr_id."""
parsed = _parse_filename(filename)
if not parsed:
return {}
datetime_str = extract_datetime_from_filename(filename)
if not datetime_str:
return {}
location = parsed["location"]
country_mapping = _get_country_codes_mapping()
if location not in country_mapping:
return {}
hazard_type = parsed["hazard_type"]
hazard_mapping = _get_hazard_codes_mapping()
if hazard_type not in hazard_mapping:
return {}
country_codes = country_mapping[location]
hazard_info = hazard_mapping[hazard_type]
hazard_codes = [hazard_info["glide_code"], hazard_info["classification_code"]]
corr_id = f"{datetime_str}-{country_codes[0]}-{hazard_info['glide_code']}-1-GCDB"
return {
"event:name": parsed["full_event_name"],
"monty:country_codes": country_codes,
"monty:hazard_codes": hazard_codes,
"monty:corr_id": corr_id
}
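
For illustration, a minimal usage sketch of the two pure helpers above (no S3 or Airflow access required); the filename and import path are assumptions, not taken from this PR:

# Minimal sketch, assuming the dags folder is on PYTHONPATH so the module
# resolves as below; the filename is a hypothetical example.
from veda_data_pipeline.utils.disasters_utils import (
    extract_datetime_from_filename,
    extract_event_name_from_filename,
)

filename = "202401_flood_kenya_2024-01-15.tif"
print(extract_event_name_from_filename(filename))  # {'event:name': '202401_flood_kenya'}
print(extract_datetime_from_filename(filename))    # '20240115'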
36 changes: 32 additions & 4 deletions dags/veda_data_pipeline/utils/s3_discovery.py
@@ -11,6 +11,8 @@
import boto3
from smart_open import open as smrt_open

from .disasters_utils import extract_event_name_from_filename, extract_all_metadata_from_filename


# Adding a custom exception for empty list
class EmptyFileListError(Exception):
@@ -75,8 +77,12 @@ def discover_from_s3(
yield s3_object


def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> dict:
"""Group assets by matching regex patterns against discovered files."""
def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_monty: bool = False) -> dict:
"""Group assets by matching regex patterns against discovered files.

If extract_event_name is True, extract the event name from each group's first filename and store it under the item's "extracted_event_name" key.
If extract_monty is True, extract all Monty metadata (event name, country codes, hazard codes, corr_id) instead; this flag takes precedence over extract_event_name.
"""
grouped_files = []
for uri in discovered_files:
# Each file gets its matched asset type and id
@@ -112,13 +118,26 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d
# Produce a dictionary in which each record is keyed by an item ID and contains a list of associated asset hrefs
for group in grouped_data:
item = {"item_id": group["item_id"], "assets": {}}

for file in group["data"]:
asset_type = file["asset_type"]
filename = file["filename"]
# Copy the asset definition and update the href
updated_asset = assets[file["asset_type"]].copy()
updated_asset["href"] = f"{file['prefix']}/{file['filename']}"
item["assets"][asset_type] = updated_asset

# Extract metadata from first file if flags are enabled
if group["data"]:
first_filename = group["data"][0]["filename"]

if extract_monty:
# Extract all metadata (country codes, hazard codes, corr_id, and event name)
item["extracted_event_name"] = extract_all_metadata_from_filename(first_filename)
elif extract_event_name:
# Extract only event name
item["extracted_event_name"] = extract_event_name_from_filename(first_filename)

items_with_assets.append(item)
return items_with_assets

@@ -200,6 +219,8 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
id_template = event.get("id_template", "{}")
date_fields = propagate_forward_datetime_args(event)
dry_run = event.get("dry_run", False)
extract_event_name = event.get("disasters:extract_event_name", False)
extract_monty = event.get("disasters:monty", False)
if process_from := event.get("process_from_yyyy_mm_dd"):
process_from = datetime.strptime(process_from, "%Y-%m-%d").replace(
tzinfo=tzlocal()
@@ -235,7 +256,13 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No

# group only if more than 1 asset
if assets and len(assets.keys()) > 1:
items_with_assets = group_by_item(file_uris, id_regex, assets)
items_with_assets = group_by_item(
file_uris,
id_regex,
assets,
extract_event_name=extract_event_name,
extract_monty=extract_monty
)
else:
# out of convenience, we might not always want to explicitly define assets
# or if only a single asset is defined, follow default flow
@@ -261,11 +288,12 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
item_count >= slice[1]
): # Stop once we reach the end of the slice, while saving progress
break

file_obj = {
"collection": collection,
"item_id": item["item_id"],
"assets": item["assets"],
"properties": properties,
"properties": {**properties, **item.get("extracted_event_name", {})},
**date_fields,
}

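
For reference, a hedged sketch of a discovery event that enables the new flags; only the "disasters:monty" and "disasters:extract_event_name" keys come from this change, and all other keys and values are illustrative placeholders (remaining discovery fields omitted). Note that extraction only runs when more than one asset is defined, since that is the path that calls group_by_item:

# Hypothetical discovery event; placeholder values throughout.
event = {
    "collection": "example-disaster-collection",
    "id_regex": r"^(\d{6}_[^_]+_[^_]+)",
    "assets": {
        "cog_default": {"title": "Default COG"},    # placeholder asset definitions
        "metadata": {"title": "Sidecar metadata"},
    },
    "disasters:monty": True,                 # country codes, hazard codes, corr_id, event name
    # "disasters:extract_event_name": True,  # alternatively: event name only
}

With either flag set, group_by_item stores the extracted fields under each item's "extracted_event_name" key, and s3_discovery_handler merges them into the item's properties when writing the output payloads.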