diff --git a/dags/veda_data_pipeline/utils/disasters_utils.py b/dags/veda_data_pipeline/utils/disasters_utils.py
new file mode 100644
index 00000000..93c5ed62
--- /dev/null
+++ b/dags/veda_data_pipeline/utils/disasters_utils.py
@@ -0,0 +1,135 @@
+import re
+import json
+import boto3
+from functools import lru_cache
+from typing import Dict, List
+
+
+def _load_from_s3(s3_key: str) -> dict:
+    """Load JSON data from S3 using Airflow variables."""
+    from airflow.models.variable import Variable
+    bucket = Variable.get("aws_dags_variables", deserialize_json=True).get("EVENT_BUCKET")
+    client = boto3.client("s3")
+    result = client.get_object(Bucket=bucket, Key=s3_key)
+    return json.loads(result["Body"].read().decode())
+
+
+@lru_cache(maxsize=1)
+def _get_country_codes_mapping() -> Dict[str, List[str]]:
+    """Load and cache the country codes mapping from S3."""
+    data = _load_from_s3("disasters-monty/monty_country_codes.json")
+    return {entry["name"].lower(): entry["code"] for entry in data["country_codes"]}
+
+
+@lru_cache(maxsize=1)
+def _get_hazard_codes_mapping() -> Dict[str, Dict[str, str]]:
+    """Load and cache the hazard codes mapping from S3."""
+    data = _load_from_s3("disasters-monty/monty_hazard_codes.json")
+    codes = data["classification_systems"]["undrr_isc_2025"]["codes"]
+    return {
+        entry["event_name"].lower(): {
+            "glide_code": entry["glide_code"],
+            "classification_code": entry["classification_code"]
+        }
+        for entry in codes
+    }
+
+
+def _parse_filename(filename: str) -> dict:
+    """Parse a filename matching the pattern YYYYMM_<hazard>_<location> and return its components."""
+    match = re.match(r"^(\d{6})_([^_]+)_([^_]+)", filename)
+    if not match:
+        return {}
+    return {
+        "year_month": match.group(1),
+        "hazard_type": match.group(2).lower(),
+        "location": match.group(3).lower(),
+        "full_event_name": f"{match.group(1)}_{match.group(2)}_{match.group(3)}"
+    }
+
+
+def extract_event_name_from_filename(filename: str) -> dict:
+    """Extract the event name in the format YYYYMM_<hazard>_<location>."""
+    parsed = _parse_filename(filename)
+    return {"event:name": parsed["full_event_name"]} if parsed else {}
+
+
+def extract_country_codes_from_filename(filename: str) -> dict:
+    """Extract ISO 3166-1 alpha-3 country codes from the location component."""
+    parsed = _parse_filename(filename)
+    if not parsed:
+        return {}
+    codes = _get_country_codes_mapping().get(parsed["location"])
+    return {"monty:country_codes": codes} if codes else {}
+
+
+def extract_hazard_codes_from_filename(filename: str) -> dict:
+    """Extract GLIDE and UNDRR-ISC hazard classification codes."""
+    parsed = _parse_filename(filename)
+    if not parsed:
+        return {}
+    hazard = _get_hazard_codes_mapping().get(parsed["hazard_type"])
+    return {"monty:hazard_codes": [hazard["glide_code"], hazard["classification_code"]]} if hazard else {}
+
+
+def extract_datetime_from_filename(filename: str) -> str:
+    """Extract a datetime token (YYYY-MM-DD, YYYYMMDD, YYYY-MM-DDTHH:MM:SSZ, or YYYYMMDDTHHMMSSZ) and normalize it to compact form."""
+    # Patterns are ordered from most to least specific so a full timestamp wins over a bare date
+    patterns = [
+        (r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)", lambda m: m.replace("-", "").replace(":", "")),
+        (r"(\d{8}T\d{6}[Zz])", lambda m: m.upper()),
+        (r"(\d{4}-\d{2}-\d{2})", lambda m: m.replace("-", "")),
+        (r"(\d{8})", lambda m: m),
+    ]
+    for pattern, formatter in patterns:
+        if match := re.search(pattern, filename):
+            return formatter(match.group(1))
+    return ""
+
+
+def extract_corr_id_from_filename(filename: str) -> dict:
+    """Extract the correlation ID in the format {datetime}-{ISO3}-{GLIDE_CODE}-1-GCDB."""
+    parsed = _parse_filename(filename)
+    if not parsed:
+        return {}
+    datetime_str = extract_datetime_from_filename(filename)
+    if not datetime_str:
+        return {}
+    location = parsed["location"]
+    country_mapping = _get_country_codes_mapping()
+    if location not in country_mapping:
+        return {}
+    hazard_type = parsed["hazard_type"]
+    hazard_mapping = _get_hazard_codes_mapping()
+    if hazard_type not in hazard_mapping:
+        return {}
+    country_code = country_mapping[location][0]
+    glide_code = hazard_mapping[hazard_type]["glide_code"]
+    return {"monty:corr_id": f"{datetime_str}-{country_code}-{glide_code}-1-GCDB"}
+
+
+def extract_all_metadata_from_filename(filename: str) -> dict:
+    """Extract all metadata fields: event:name, monty:country_codes, monty:hazard_codes, monty:corr_id."""
+    parsed = _parse_filename(filename)
+    if not parsed:
+        return {}
+    datetime_str = extract_datetime_from_filename(filename)
+    if not datetime_str:
+        return {}
+    location = parsed["location"]
+    country_mapping = _get_country_codes_mapping()
+    if location not in country_mapping:
+        return {}
+    hazard_type = parsed["hazard_type"]
+    hazard_mapping = _get_hazard_codes_mapping()
+    if hazard_type not in hazard_mapping:
+        return {}
+    country_codes = country_mapping[location]
+    hazard_info = hazard_mapping[hazard_type]
+    hazard_codes = [hazard_info["glide_code"], hazard_info["classification_code"]]
+    corr_id = f"{datetime_str}-{country_codes[0]}-{hazard_info['glide_code']}-1-GCDB"
+    return {
+        "event:name": parsed["full_event_name"],
+        "monty:country_codes": country_codes,
+        "monty:hazard_codes": hazard_codes,
+        "monty:corr_id": corr_id
+    }
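+
+
+# Illustrative behaviour (a sketch: the filename below is hypothetical, and the
+# KEN / FL / MH0006 values depend entirely on the mapping files in S3):
+#
+#   extract_all_metadata_from_filename("202401_flood_kenya_2024-01-15_cog.tif")
+#   would return, assuming "flood" and "kenya" resolve in the mappings:
+#   {
+#       "event:name": "202401_flood_kenya",
+#       "monty:country_codes": ["KEN"],
+#       "monty:hazard_codes": ["FL", "MH0006"],
+#       "monty:corr_id": "20240115-KEN-FL-1-GCDB"
+#   }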
diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py
index 1bec94a2..72349e8e 100644
--- a/dags/veda_data_pipeline/utils/s3_discovery.py
+++ b/dags/veda_data_pipeline/utils/s3_discovery.py
@@ -11,6 +11,8 @@ import boto3
 from smart_open import open as smrt_open
 
+from .disasters_utils import extract_event_name_from_filename, extract_all_metadata_from_filename
+
 
 # Adding a custom exception for empty list
 class EmptyFileListError(Exception):
@@ -75,8 +77,12 @@ def discover_from_s3(
         yield s3_object
 
 
-def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> dict:
-    """Group assets by matching regex patterns against discovered files."""
+def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_monty: bool = False) -> dict:
+    """Group assets by matching regex patterns against discovered files.
+
+    If extract_event_name is True, the event name is extracted from each item's filename and added to the item metadata.
+    If extract_monty is True, all monty metadata (country codes, hazard codes, corr_id, event name) is extracted instead; extract_monty takes precedence when both flags are set.
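+
+    Illustrative call (a sketch; the uris, regex, and assets arguments are
+    hypothetical, and only the two keyword flags are introduced here):
+        group_by_item(uris, r"^(.*)_", assets, extract_monty=True)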
+ """ grouped_files = [] for uri in discovered_files: # Each file gets its matched asset type and id @@ -112,6 +118,7 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d # Produce a dictionary in which each record is keyed by an item ID and contains a list of associated asset hrefs for group in grouped_data: item = {"item_id": group["item_id"], "assets": {}} + for file in group["data"]: asset_type = file["asset_type"] filename = file["filename"] @@ -119,6 +126,18 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d updated_asset = assets[file["asset_type"]].copy() updated_asset["href"] = f"{file['prefix']}/{file['filename']}" item["assets"][asset_type] = updated_asset + + # Extract metadata from first file if flags are enabled + if group["data"]: + first_filename = group["data"][0]["filename"] + + if extract_monty: + # Extract all metadata (country codes, hazard codes, corr_id, and event name) + item["extracted_event_name"] = extract_all_metadata_from_filename(first_filename) + elif extract_event_name: + # Extract only event name + item["extracted_event_name"] = extract_event_name_from_filename(first_filename) + items_with_assets.append(item) return items_with_assets @@ -200,6 +219,8 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No id_template = event.get("id_template", "{}") date_fields = propagate_forward_datetime_args(event) dry_run = event.get("dry_run", False) + extract_event_name = event.get("disasters:extract_event_name", False) + extract_monty = event.get("disasters:monty", False) if process_from := event.get("process_from_yyyy_mm_dd"): process_from = datetime.strptime(process_from, "%Y-%m-%d").replace( tzinfo=tzlocal() @@ -235,7 +256,13 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No # group only if more than 1 assets if assets and len(assets.keys()) > 1: - items_with_assets = group_by_item(file_uris, id_regex, assets) + items_with_assets = group_by_item( + file_uris, + id_regex, + assets, + extract_event_name=extract_event_name, + extract_monty=extract_monty + ) else: # out of convenience, we might not always want to explicitly define assets # or if only a single asset is defined, follow default flow @@ -261,11 +288,12 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No item_count >= slice[1] ): # Stop once we reach the end of the slice, while saving progress break + file_obj = { "collection": collection, "item_id": item["item_id"], "assets": item["assets"], - "properties": properties, + "properties": {**properties, **item.get("extracted_event_name", {})}, **date_fields, }