From 2042ccbe01163f4c82126243bb47d4c0ccbc4a24 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 08:59:44 -0600 Subject: [PATCH 01/11] add event name metadata update --- .../groups/processing_tasks.py | 2 +- dags/veda_data_pipeline/utils/s3_discovery.py | 69 +++++++++++++++++-- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py index ee666b09..587596d7 100644 --- a/dags/veda_data_pipeline/groups/processing_tasks.py +++ b/dags/veda_data_pipeline/groups/processing_tasks.py @@ -33,7 +33,7 @@ def remove_thumbnail_asset(ti): return payload # with exponential backoff enabled, retry delay is converted to seconds -@task(retries=2, retry_delay=60, retry_exponential_backoff=True, max_active_tis_per_dag=5) +@task(retries=1, retry_delay=60, retry_exponential_backoff=True, max_active_tis_per_dag=5) def submit_to_stac_ingestor_task(built_stac: dict): """Submit STAC items to the STAC ingestor API.""" event = built_stac.copy() diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 1bec94a2..8b5ac184 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -75,8 +75,43 @@ def discover_from_s3( yield s3_object -def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> dict: - """Group assets by matching regex patterns against discovered files.""" +def extract_event_name_from_filename(filename: str) -> dict: + """ + Extract event name from filename using pattern YYYYMM__. + + Args: + filename: The filename to extract event name from + + Returns: + Dict with event:name property or empty dict if no match + + Example: + For filename "202501_Fire_CA_aria_disturbance_track64_share_2025-01-09_day.tif": + Returns: {"event:name": "202501_Fire_CA"} + """ + # Pattern to match YYYYMM__ at the start of the filename + pattern = r"^(\d{6}_[^_]+_[^_]+)" + match = re.match(pattern, filename) + + if match: + event_name = match.group(1) + return {"event:name": event_name} + + return {} + + +def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False) -> dict: + """Group assets by matching regex patterns against discovered files. 
+ + Args: + discovered_files: List of S3 URIs to discovered files + id_regex: Regex pattern to extract item ID from filename + assets: Dict of asset definitions with regex patterns + extract_event_name: If True, extract event name from filename pattern YYYYMM__ + + Returns: + List of items with grouped assets and extracted metadata + """ grouped_files = [] for uri in discovered_files: # Each file gets its matched asset type and id @@ -92,12 +127,18 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d asset_type = asset_name break if asset_type: + # Extract event name from filename if flag is enabled + extracted_metadata = {} + if extract_event_name: + extracted_metadata = extract_event_name_from_filename(filename) + grouped_files.append( { "prefix": prefix, "filename": filename, "asset_type": asset_type, "item_id": item_id, + "extracted_metadata": extracted_metadata, } ) else: @@ -112,6 +153,8 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d # Produce a dictionary in which each record is keyed by an item ID and contains a list of associated asset hrefs for group in grouped_data: item = {"item_id": group["item_id"], "assets": {}} + # Merge all extracted metadata from files in this group (they should be the same for all files with same item_id) + merged_metadata = {} for file in group["data"]: asset_type = file["asset_type"] filename = file["filename"] @@ -119,6 +162,12 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d updated_asset = assets[file["asset_type"]].copy() updated_asset["href"] = f"{file['prefix']}/{file['filename']}" item["assets"][asset_type] = updated_asset + # Merge extracted metadata (prioritize first occurrence) + if file.get("extracted_metadata") and not merged_metadata: + merged_metadata = file["extracted_metadata"] + + if merged_metadata: + item["extracted_metadata"] = merged_metadata items_with_assets.append(item) return items_with_assets @@ -200,6 +249,7 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No id_template = event.get("id_template", "{}") date_fields = propagate_forward_datetime_args(event) dry_run = event.get("dry_run", False) + extract_event_name = event.get("disasters:extract_event_name", False) if process_from := event.get("process_from_yyyy_mm_dd"): process_from = datetime.strptime(process_from, "%Y-%m-%d").replace( tzinfo=tzlocal() @@ -235,7 +285,12 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No # group only if more than 1 assets if assets and len(assets.keys()) > 1: - items_with_assets = group_by_item(file_uris, id_regex, assets) + items_with_assets = group_by_item( + file_uris, + id_regex, + assets, + extract_event_name=extract_event_name + ) else: # out of convenience, we might not always want to explicitly define assets # or if only a single asset is defined, follow default flow @@ -261,11 +316,17 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No item_count >= slice[1] ): # Stop once we reach the end of the slice, while saving progress break + + # Merge extracted_metadata into properties for this item + item_properties = properties.copy() + if item.get("extracted_metadata"): + item_properties.update(item["extracted_metadata"]) + file_obj = { "collection": collection, "item_id": item["item_id"], "assets": item["assets"], - "properties": properties, + "properties": item_properties, **date_fields, } From 69eeb50f33ffac99fd0ee4eac83fb58b6f42698a Mon Sep 
17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:13:23 -0600 Subject: [PATCH 02/11] add event name metadata update --- dags/veda_data_pipeline/groups/processing_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py index 587596d7..ee666b09 100644 --- a/dags/veda_data_pipeline/groups/processing_tasks.py +++ b/dags/veda_data_pipeline/groups/processing_tasks.py @@ -33,7 +33,7 @@ def remove_thumbnail_asset(ti): return payload # with exponential backoff enabled, retry delay is converted to seconds -@task(retries=1, retry_delay=60, retry_exponential_backoff=True, max_active_tis_per_dag=5) +@task(retries=2, retry_delay=60, retry_exponential_backoff=True, max_active_tis_per_dag=5) def submit_to_stac_ingestor_task(built_stac: dict): """Submit STAC items to the STAC ingestor API.""" event = built_stac.copy() From 69b0761258fc5a9972a33c89dcdf43ccbdfd93d6 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:16:57 -0600 Subject: [PATCH 03/11] add event name metadata update --- dags/veda_data_pipeline/utils/s3_discovery.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 8b5ac184..15526783 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -101,17 +101,7 @@ def extract_event_name_from_filename(filename: str) -> dict: def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False) -> dict: - """Group assets by matching regex patterns against discovered files. - - Args: - discovered_files: List of S3 URIs to discovered files - id_regex: Regex pattern to extract item ID from filename - assets: Dict of asset definitions with regex patterns - extract_event_name: If True, extract event name from filename pattern YYYYMM__ - - Returns: - List of items with grouped assets and extracted metadata - """ + """Group assets by matching regex patterns against discovered files.""" grouped_files = [] for uri in discovered_files: # Each file gets its matched asset type and id From fe9c368af4449ceb31d6eda63f636e690d686713 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:29:20 -0600 Subject: [PATCH 04/11] simplify logic --- dags/veda_data_pipeline/utils/s3_discovery.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 15526783..c52340c0 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -118,9 +118,7 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr break if asset_type: # Extract event name from filename if flag is enabled - extracted_metadata = {} - if extract_event_name: - extracted_metadata = extract_event_name_from_filename(filename) + extracted_metadata = extract_event_name_from_filename(filename) if extract_event_name else {} grouped_files.append( { From 533b10aaadb66c2abcf0d17fa186b6e94dfa296f Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:39:07 -0600 Subject: [PATCH 05/11] simplify logic --- dags/veda_data_pipeline/utils/s3_discovery.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py 
b/dags/veda_data_pipeline/utils/s3_discovery.py index c52340c0..33fee1fa 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -101,7 +101,10 @@ def extract_event_name_from_filename(filename: str) -> dict: def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False) -> dict: - """Group assets by matching regex patterns against discovered files.""" + """Group assets by matching regex patterns against discovered files. + + If extract_event_name is True, extracts event name from filenames and adds to item metadata. + """ grouped_files = [] for uri in discovered_files: # Each file gets its matched asset type and id @@ -117,16 +120,12 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr asset_type = asset_name break if asset_type: - # Extract event name from filename if flag is enabled - extracted_metadata = extract_event_name_from_filename(filename) if extract_event_name else {} - grouped_files.append( { "prefix": prefix, "filename": filename, "asset_type": asset_type, "item_id": item_id, - "extracted_metadata": extracted_metadata, } ) else: @@ -141,8 +140,7 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr # Produce a dictionary in which each record is keyed by an item ID and contains a list of associated asset hrefs for group in grouped_data: item = {"item_id": group["item_id"], "assets": {}} - # Merge all extracted metadata from files in this group (they should be the same for all files with same item_id) - merged_metadata = {} + for file in group["data"]: asset_type = file["asset_type"] filename = file["filename"] @@ -150,12 +148,11 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr updated_asset = assets[file["asset_type"]].copy() updated_asset["href"] = f"{file['prefix']}/{file['filename']}" item["assets"][asset_type] = updated_asset - # Merge extracted metadata (prioritize first occurrence) - if file.get("extracted_metadata") and not merged_metadata: - merged_metadata = file["extracted_metadata"] - if merged_metadata: - item["extracted_metadata"] = merged_metadata + # Extract event name from first file if flag is enabled + if extract_event_name and "extracted_metadata" not in item: + item["extracted_metadata"] = extract_event_name_from_filename(filename) + items_with_assets.append(item) return items_with_assets From 93086352f20583e1370b1257b48e8cb72ae76a06 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:45:52 -0600 Subject: [PATCH 06/11] simplify logic --- dags/veda_data_pipeline/utils/s3_discovery.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 33fee1fa..14dedbf6 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -149,9 +149,10 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr updated_asset["href"] = f"{file['prefix']}/{file['filename']}" item["assets"][asset_type] = updated_asset - # Extract event name from first file if flag is enabled - if extract_event_name and "extracted_metadata" not in item: - item["extracted_metadata"] = extract_event_name_from_filename(filename) + # Extract event name from first file if flag is enabled + if extract_event_name and group["data"]: + first_filename = group["data"][0]["filename"] + 
item["extracted_metadata"] = extract_event_name_from_filename(first_filename) items_with_assets.append(item) return items_with_assets From eca5ef826939403dab7a659be113c18e7e2751c5 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Wed, 7 Jan 2026 09:53:46 -0600 Subject: [PATCH 07/11] simplify logic --- dags/veda_data_pipeline/utils/s3_discovery.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 14dedbf6..f40af673 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -152,7 +152,7 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr # Extract event name from first file if flag is enabled if extract_event_name and group["data"]: first_filename = group["data"][0]["filename"] - item["extracted_metadata"] = extract_event_name_from_filename(first_filename) + item["extracted_event_name"] = extract_event_name_from_filename(first_filename) items_with_assets.append(item) return items_with_assets @@ -303,16 +303,11 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No ): # Stop once we reach the end of the slice, while saving progress break - # Merge extracted_metadata into properties for this item - item_properties = properties.copy() - if item.get("extracted_metadata"): - item_properties.update(item["extracted_metadata"]) - file_obj = { "collection": collection, "item_id": item["item_id"], "assets": item["assets"], - "properties": item_properties, + "properties": {**properties, **item.get("extracted_event_name", {})}, **date_fields, } From cf3e09feb5f71f8690c20644a67c447b38323fa9 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Thu, 8 Jan 2026 09:38:23 -0600 Subject: [PATCH 08/11] add monty metadata --- .../utils/disasters_utils.py | 133 ++++++++++++++++++ dags/veda_data_pipeline/utils/s3_discovery.py | 46 +++--- 2 files changed, 149 insertions(+), 30 deletions(-) create mode 100644 dags/veda_data_pipeline/utils/disasters_utils.py diff --git a/dags/veda_data_pipeline/utils/disasters_utils.py b/dags/veda_data_pipeline/utils/disasters_utils.py new file mode 100644 index 00000000..311178b6 --- /dev/null +++ b/dags/veda_data_pipeline/utils/disasters_utils.py @@ -0,0 +1,133 @@ +import re +import json +import boto3 +from typing import Dict, List + + +def _load_from_s3(s3_key: str) -> dict: + """Load JSON data from S3 using Airflow variables.""" + from airflow.models.variable import Variable + bucket = Variable.get("aws_dags_variables", deserialize_json=True).get("EVENT_BUCKET") + client = boto3.client("s3") + result = client.get_object(Bucket=bucket, Key=s3_key) + return json.loads(result["Body"].read().decode()) + + +def _load_country_codes_mapping() -> Dict[str, List[str]]: + """Load the monty country codes mapping from S3.""" + data = _load_from_s3("disasters-monty/monty_country_codes.json") + return {entry["name"].lower(): entry["code"] for entry in data["country_codes"]} + + +def _load_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: + """Load the monty hazard codes mapping from S3.""" + data = _load_from_s3("disasters-monty/monty_hazard_codes.json") + codes = data["classification_systems"]["undrr_isc_2025"]["codes"] + return { + entry["event_name"].lower(): { + "glide_code": entry["glide_code"], + "classification_code": entry["classification_code"] + } + for entry in codes + } + + +# Load mappings once when module is imported +COUNTRY_CODES_MAPPING 
= _load_country_codes_mapping() +HAZARD_CODES_MAPPING = _load_hazard_codes_mapping() + + +def _parse_filename(filename: str) -> dict: + """Parse filename pattern YYYYMM__ and return components.""" + match = re.match(r"^(\d{6})_([^_]+)_([^_]+)", filename) + if not match: + return {} + return { + "year_month": match.group(1), + "hazard_type": match.group(2).lower(), + "location": match.group(3).lower(), + "full_event_name": f"{match.group(1)}_{match.group(2)}_{match.group(3)}" + } + + +def extract_event_name_from_filename(filename: str) -> dict: + """Extract event name in format YYYYMM__.""" + parsed = _parse_filename(filename) + return {"event:name": parsed["full_event_name"]} if parsed else {} + + +def extract_country_codes_from_filename(filename: str) -> dict: + """Extract ISO 3166-1 alpha-3 country codes from location.""" + parsed = _parse_filename(filename) + if not parsed: + return {} + codes = COUNTRY_CODES_MAPPING.get(parsed["location"]) + return {"monty:country_codes": codes} if codes else {} + + +def extract_hazard_codes_from_filename(filename: str) -> dict: + """Extract GLIDE and UNDRR-ISC hazard classification codes.""" + parsed = _parse_filename(filename) + if not parsed: + return {} + hazard = HAZARD_CODES_MAPPING.get(parsed["hazard_type"]) + return {"monty:hazard_codes": [hazard["glide_code"], hazard["classification_code"]]} if hazard else {} + + +def extract_datetime_from_filename(filename: str) -> str: + """Extract datetime in formats: YYYY-MM-DD, YYYYMMDD, YYYY-MM-DDTHH:MM:SSZ, YYYYMMDDTHHMMSSZ.""" + patterns = [ + (r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)", lambda m: m.replace("-", "").replace(":", "")), + (r"(\d{8}T\d{6}[Zz])", lambda m: m.upper()), + (r"(\d{4}-\d{2}-\d{2})", lambda m: m.replace("-", "")), + (r"(\d{8})", lambda m: m), + ] + for pattern, formatter in patterns: + if match := re.search(pattern, filename): + return formatter(match.group(1)) + return "" + + +def extract_corr_id_from_filename(filename: str) -> dict: + """Extract correlation ID in format: {datetime}-{ISO3}-{GLIDE_CODE}-1-GCDB.""" + parsed = _parse_filename(filename) + if not parsed: + return {} + datetime_str = extract_datetime_from_filename(filename) + if not datetime_str: + return {} + location = parsed["location"] + if location not in COUNTRY_CODES_MAPPING: + return {} + hazard_type = parsed["hazard_type"] + if hazard_type not in HAZARD_CODES_MAPPING: + return {} + country_code = COUNTRY_CODES_MAPPING[location][0] + glide_code = HAZARD_CODES_MAPPING[hazard_type]["glide_code"] + return {"monty:corr_id": f"{datetime_str}-{country_code}-{glide_code}-1-GCDB"} + + +def extract_all_metadata_from_filename(filename: str) -> dict: + """Extract all metadata fields: event:name, monty:country_codes, monty:hazard_codes, monty:corr_id.""" + parsed = _parse_filename(filename) + if not parsed: + return {} + datetime_str = extract_datetime_from_filename(filename) + if not datetime_str: + return {} + location = parsed["location"] + if location not in COUNTRY_CODES_MAPPING: + return {} + hazard_type = parsed["hazard_type"] + if hazard_type not in HAZARD_CODES_MAPPING: + return {} + country_codes = COUNTRY_CODES_MAPPING[location] + hazard_info = HAZARD_CODES_MAPPING[hazard_type] + hazard_codes = [hazard_info["glide_code"], hazard_info["classification_code"]] + corr_id = f"{datetime_str}-{country_codes[0]}-{hazard_info['glide_code']}-1-GCDB" + return { + "event:name": parsed["full_event_name"], + "monty:country_codes": country_codes, + "monty:hazard_codes": hazard_codes, + "monty:corr_id": corr_id + } diff --git 
a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index f40af673..12d000bc 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -11,6 +11,8 @@ import boto3 from smart_open import open as smrt_open +from .disasters_utils import extract_event_name_from_filename, extract_all_metadata_from_filename + # Adding a custom exception for empty list class EmptyFileListError(Exception): @@ -75,35 +77,11 @@ def discover_from_s3( yield s3_object -def extract_event_name_from_filename(filename: str) -> dict: - """ - Extract event name from filename using pattern YYYYMM__. - - Args: - filename: The filename to extract event name from - - Returns: - Dict with event:name property or empty dict if no match - - Example: - For filename "202501_Fire_CA_aria_disturbance_track64_share_2025-01-09_day.tif": - Returns: {"event:name": "202501_Fire_CA"} - """ - # Pattern to match YYYYMM__ at the start of the filename - pattern = r"^(\d{6}_[^_]+_[^_]+)" - match = re.match(pattern, filename) - - if match: - event_name = match.group(1) - return {"event:name": event_name} - - return {} - - -def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False) -> dict: +def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_country_codes: bool = False) -> dict: """Group assets by matching regex patterns against discovered files. If extract_event_name is True, extracts event name from filenames and adds to item metadata. + If extract_country_codes is True, extracts country codes from filenames and adds to item metadata. """ grouped_files = [] for uri in discovered_files: @@ -149,10 +127,16 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr updated_asset["href"] = f"{file['prefix']}/{file['filename']}" item["assets"][asset_type] = updated_asset - # Extract event name from first file if flag is enabled - if extract_event_name and group["data"]: + # Extract metadata from first file if flags are enabled + if group["data"]: first_filename = group["data"][0]["filename"] - item["extracted_event_name"] = extract_event_name_from_filename(first_filename) + + if extract_country_codes: + # Extract all metadata (country codes, hazard codes, corr_id, and event name) + item["extracted_event_name"] = extract_all_metadata_from_filename(first_filename) + elif extract_event_name: + # Extract only event name + item["extracted_event_name"] = extract_event_name_from_filename(first_filename) items_with_assets.append(item) return items_with_assets @@ -236,6 +220,7 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No date_fields = propagate_forward_datetime_args(event) dry_run = event.get("dry_run", False) extract_event_name = event.get("disasters:extract_event_name", False) + extract_monty = event.get("disasters:monty", False) if process_from := event.get("process_from_yyyy_mm_dd"): process_from = datetime.strptime(process_from, "%Y-%m-%d").replace( tzinfo=tzlocal() @@ -275,7 +260,8 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No file_uris, id_regex, assets, - extract_event_name=extract_event_name + extract_event_name=extract_event_name, + extract_country_codes=extract_monty ) else: # out of convenience, we might not always want to explicitly define assets From 681c5e2720bc415a2abf5dfc488ea4ec891f841d Mon Sep 17 00:00:00 2001 From: Kyle 
Lesinger Date: Thu, 8 Jan 2026 09:49:20 -0600 Subject: [PATCH 09/11] remove eager loading (s3 credential issue) --- .../utils/disasters_utils.py | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/dags/veda_data_pipeline/utils/disasters_utils.py b/dags/veda_data_pipeline/utils/disasters_utils.py index 311178b6..c0dd4b67 100644 --- a/dags/veda_data_pipeline/utils/disasters_utils.py +++ b/dags/veda_data_pipeline/utils/disasters_utils.py @@ -32,9 +32,25 @@ def _load_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: } -# Load mappings once when module is imported -COUNTRY_CODES_MAPPING = _load_country_codes_mapping() -HAZARD_CODES_MAPPING = _load_hazard_codes_mapping() +# Lazy-load mappings on first access +_COUNTRY_CODES_MAPPING = None +_HAZARD_CODES_MAPPING = None + + +def _get_country_codes_mapping() -> Dict[str, List[str]]: + """Get country codes mapping, loading from S3 on first access.""" + global _COUNTRY_CODES_MAPPING + if _COUNTRY_CODES_MAPPING is None: + _COUNTRY_CODES_MAPPING = _load_country_codes_mapping() + return _COUNTRY_CODES_MAPPING + + +def _get_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: + """Get hazard codes mapping, loading from S3 on first access.""" + global _HAZARD_CODES_MAPPING + if _HAZARD_CODES_MAPPING is None: + _HAZARD_CODES_MAPPING = _load_hazard_codes_mapping() + return _HAZARD_CODES_MAPPING def _parse_filename(filename: str) -> dict: @@ -61,7 +77,7 @@ def extract_country_codes_from_filename(filename: str) -> dict: parsed = _parse_filename(filename) if not parsed: return {} - codes = COUNTRY_CODES_MAPPING.get(parsed["location"]) + codes = _get_country_codes_mapping().get(parsed["location"]) return {"monty:country_codes": codes} if codes else {} @@ -70,7 +86,7 @@ def extract_hazard_codes_from_filename(filename: str) -> dict: parsed = _parse_filename(filename) if not parsed: return {} - hazard = HAZARD_CODES_MAPPING.get(parsed["hazard_type"]) + hazard = _get_hazard_codes_mapping().get(parsed["hazard_type"]) return {"monty:hazard_codes": [hazard["glide_code"], hazard["classification_code"]]} if hazard else {} @@ -97,13 +113,15 @@ def extract_corr_id_from_filename(filename: str) -> dict: if not datetime_str: return {} location = parsed["location"] - if location not in COUNTRY_CODES_MAPPING: + country_mapping = _get_country_codes_mapping() + if location not in country_mapping: return {} hazard_type = parsed["hazard_type"] - if hazard_type not in HAZARD_CODES_MAPPING: + hazard_mapping = _get_hazard_codes_mapping() + if hazard_type not in hazard_mapping: return {} - country_code = COUNTRY_CODES_MAPPING[location][0] - glide_code = HAZARD_CODES_MAPPING[hazard_type]["glide_code"] + country_code = country_mapping[location][0] + glide_code = hazard_mapping[hazard_type]["glide_code"] return {"monty:corr_id": f"{datetime_str}-{country_code}-{glide_code}-1-GCDB"} @@ -116,13 +134,15 @@ def extract_all_metadata_from_filename(filename: str) -> dict: if not datetime_str: return {} location = parsed["location"] - if location not in COUNTRY_CODES_MAPPING: + country_mapping = _get_country_codes_mapping() + if location not in country_mapping: return {} hazard_type = parsed["hazard_type"] - if hazard_type not in HAZARD_CODES_MAPPING: + hazard_mapping = _get_hazard_codes_mapping() + if hazard_type not in hazard_mapping: return {} - country_codes = COUNTRY_CODES_MAPPING[location] - hazard_info = HAZARD_CODES_MAPPING[hazard_type] + country_codes = country_mapping[location] + hazard_info = hazard_mapping[hazard_type] hazard_codes = 
[hazard_info["glide_code"], hazard_info["classification_code"]] corr_id = f"{datetime_str}-{country_codes[0]}-{hazard_info['glide_code']}-1-GCDB" return { From 360ff89114dee2e7cf5a54e569cd24faf151ee75 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Thu, 8 Jan 2026 10:07:54 -0600 Subject: [PATCH 10/11] add lru cache --- .../utils/disasters_utils.py | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/dags/veda_data_pipeline/utils/disasters_utils.py b/dags/veda_data_pipeline/utils/disasters_utils.py index c0dd4b67..93c5ed62 100644 --- a/dags/veda_data_pipeline/utils/disasters_utils.py +++ b/dags/veda_data_pipeline/utils/disasters_utils.py @@ -1,6 +1,7 @@ import re import json import boto3 +from functools import lru_cache from typing import Dict, List @@ -13,14 +14,16 @@ def _load_from_s3(s3_key: str) -> dict: return json.loads(result["Body"].read().decode()) -def _load_country_codes_mapping() -> Dict[str, List[str]]: - """Load the monty country codes mapping from S3.""" +@lru_cache(maxsize=1) +def _get_country_codes_mapping() -> Dict[str, List[str]]: + """Load and cache country codes mapping from S3.""" data = _load_from_s3("disasters-monty/monty_country_codes.json") return {entry["name"].lower(): entry["code"] for entry in data["country_codes"]} -def _load_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: - """Load the monty hazard codes mapping from S3.""" +@lru_cache(maxsize=1) +def _get_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: + """Load and cache hazard codes mapping from S3.""" data = _load_from_s3("disasters-monty/monty_hazard_codes.json") codes = data["classification_systems"]["undrr_isc_2025"]["codes"] return { @@ -32,27 +35,6 @@ def _load_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: } -# Lazy-load mappings on first access -_COUNTRY_CODES_MAPPING = None -_HAZARD_CODES_MAPPING = None - - -def _get_country_codes_mapping() -> Dict[str, List[str]]: - """Get country codes mapping, loading from S3 on first access.""" - global _COUNTRY_CODES_MAPPING - if _COUNTRY_CODES_MAPPING is None: - _COUNTRY_CODES_MAPPING = _load_country_codes_mapping() - return _COUNTRY_CODES_MAPPING - - -def _get_hazard_codes_mapping() -> Dict[str, Dict[str, str]]: - """Get hazard codes mapping, loading from S3 on first access.""" - global _HAZARD_CODES_MAPPING - if _HAZARD_CODES_MAPPING is None: - _HAZARD_CODES_MAPPING = _load_hazard_codes_mapping() - return _HAZARD_CODES_MAPPING - - def _parse_filename(filename: str) -> dict: """Parse filename pattern YYYYMM__ and return components.""" match = re.match(r"^(\d{6})_([^_]+)_([^_]+)", filename) From 78ae72df12530f7109592e35b4bcd8e7c62f8e29 Mon Sep 17 00:00:00 2001 From: Kyle Lesinger Date: Thu, 8 Jan 2026 10:13:30 -0600 Subject: [PATCH 11/11] update s3 discovery with better object names --- dags/veda_data_pipeline/utils/s3_discovery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py index 12d000bc..72349e8e 100644 --- a/dags/veda_data_pipeline/utils/s3_discovery.py +++ b/dags/veda_data_pipeline/utils/s3_discovery.py @@ -77,11 +77,11 @@ def discover_from_s3( yield s3_object -def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_country_codes: bool = False) -> dict: +def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extract_event_name: bool = False, extract_monty: bool = False) -> dict: """Group assets by 
matching regex patterns against discovered files. If extract_event_name is True, extracts event name from filenames and adds to item metadata. - If extract_country_codes is True, extracts country codes from filenames and adds to item metadata. + If extract_monty is True, extracts all monty metadata (country codes, hazard codes, corr_id) from filenames and adds to item metadata. """ grouped_files = [] for uri in discovered_files: @@ -131,7 +131,7 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict, extr if group["data"]: first_filename = group["data"][0]["filename"] - if extract_country_codes: + if extract_monty: # Extract all metadata (country codes, hazard codes, corr_id, and event name) item["extracted_event_name"] = extract_all_metadata_from_filename(first_filename) elif extract_event_name: @@ -261,7 +261,7 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No id_regex, assets, extract_event_name=extract_event_name, - extract_country_codes=extract_monty + extract_monty=extract_monty ) else: # out of convenience, we might not always want to explicitly define assets
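
Note (illustrative, not part of any patch above): a minimal, standalone sketch of how the YYYYMM_<hazard>_<location> filename convention introduced in this series is parsed, and which discovery-event keys switch the new behavior on. The sample filename is the one used in the PATCH 01 docstring; the placeholder collection value and the comments about the mapping lookups are assumptions, since the real country and hazard values come from the monty_country_codes.json and monty_hazard_codes.json files loaded from S3.

    # Illustration only -- re-creates the regex used by disasters_utils._parse_filename
    import re

    filename = "202501_Fire_CA_aria_disturbance_track64_share_2025-01-09_day.tif"

    match = re.match(r"^(\d{6})_([^_]+)_([^_]+)", filename)
    event_name = match.group(0)           # "202501_Fire_CA" -> {"event:name": ...}
    hazard_type = match.group(2).lower()  # "fire" -> key into the hazard codes mapping
    location = match.group(3).lower()     # "ca"   -> key into the country codes mapping

    # Hypothetical discovery event enabling the flags added in these patches
    # (all keys other than the two disasters:* flags are placeholders):
    discovery_event = {
        "collection": "example-collection",
        "disasters:extract_event_name": True,  # item properties gain {"event:name": "202501_Fire_CA"}
        "disasters:monty": True,               # item properties additionally gain monty:country_codes,
                                               # monty:hazard_codes and monty:corr_id
    }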