Skip to content

Commit

Permalink
load products to dicts
Browse files Browse the repository at this point in the history
  • Loading branch information
peshence committed Nov 14, 2024
1 parent 61bff0d commit 2abae04
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 60 deletions.
66 changes: 36 additions & 30 deletions polytope_server/common/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import logging
import os
import xml.etree.ElementTree as ET
from datetime import date, datetime, time, timedelta
from typing import List, Optional, Tuple
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

from polytope_feature.utility.exceptions import PolytopeError

Expand All @@ -16,8 +16,16 @@

class ScheduleReader:
def __init__(self, schedule_file: str) -> None:
self.tree = ET.parse(schedule_file)
self.products: List[ET.Element] = self.tree.findall("product") + self.tree.findall("dissemination_only/product")
self.products: List[Dict[str, Any]] = self.load_products(schedule_file)

def load_products(self, schedule_file: str) -> List[Dict[str, Any]]:
tree = ET.parse(schedule_file)
products = tree.findall("product")
product_dicts = []
for product in products:
product_dict = {child.tag: child.text for child in product}
product_dicts.append(product_dict)
return product_dicts

def check_released(
self, date_in: str, cclass: str, stream: str, domain: str, time_in: str, step: str, diss_type: str
Expand Down Expand Up @@ -53,9 +61,9 @@ def check_released(
If the data is not released yet.
"""
# Get only latest production date and time, last step
date_in = max(map(parse_mars_date, split_mars_param(date_in)))
time_in = max(map(lambda x: datetime.strptime(x, "%H:%M").time(), split_mars_param(time_in)))
step = max(map(int, split_mars_param(step)))
date_in = datetime.strftime(max(map(parse_mars_date, split_mars_param(date_in))), "%Y-%m-%d")
time_in = max(map(lambda x: datetime.strptime(x, "%H:%M").time(), split_mars_param(time_in))).strftime("%H:%M")
step = str(max(map(int, split_mars_param(step))))

cclass = split_mars_param(cclass)
stream = split_mars_param(stream)
Expand All @@ -71,7 +79,7 @@ def check_released(
)

release_time_dt = datetime.strptime(release_time, "%H:%M:%S")
release_date = date_in + timedelta(days=delta_day)
release_date = datetime.fromisoformat(date_in) + timedelta(days=delta_day)
release_date = release_date.replace(
hour=release_time_dt.hour, minute=release_time_dt.minute, second=release_time_dt.second
)
Expand Down Expand Up @@ -113,28 +121,28 @@ def get_release_time_and_delta_day(
number of days to add to the production date
"""

def matches_criteria(itree: ET.Element) -> bool:
if itree.findtext("class") != cclass:
def matches_criteria(product: Dict[str, Any]) -> bool:
if product.get("class") != cclass:
return False
if stream.lower() not in itree.findtext("stream"):
if stream.lower() not in product.get("stream", "").lower():
return False
if domain.lower() != find_tag(itree, "domain"):
if domain.lower() != find_tag(product, "domain"):
return False
if itree.findtext("time") != time_in:
if product.get("time") != time_in:
return False
if itree.findtext("diss_type") != diss_type.lower():
if find_tag(product, "diss_type") != diss_type.lower():
return False
if cclass != "ai":
tmp_step = find_tag(itree, "step")
tmp_step = find_tag(product, "step")
istep = int(tmp_step) if tmp_step is not None else tmp_step
if istep != int(step):
return False
return True

for itree in self.products:
if matches_criteria(itree):
release_time = itree.findtext("release_time")
delta_day = int(itree.findtext("release_delta_day"))
for product in self.products:
if matches_criteria(product):
release_time = product.get("release_time")
delta_day = int(product.get("release_delta_day", 0))
logging.info(
"release time: {} with delta_day: {} found for class: {}, stream: {}, type: {}, "
"domain: {}, time: {}, step: {}".format(
Expand All @@ -145,7 +153,7 @@ def matches_criteria(itree: ET.Element) -> bool:

logging.warning(
"No release time found for class{}, stream: {}, type: {}, domain: {}, time: {}, step: {}".format(
cclass, stream, diss_type, domain, time, step
cclass, stream, diss_type, domain, time_in, step
)
)
return None, None
Expand Down Expand Up @@ -179,14 +187,12 @@ def parse_mars_date(mars_date: str) -> datetime:
delta = int(mars_date)
if delta > 0:
raise PolytopeError(f"Invalid date format: {mars_date}")
return date.today() - timedelta(-int(mars_date))
return date.today() - timedelta(days=-delta)
except ValueError:
raise PolytopeError(f"Invalid date format: {mars_date}")


def split_mars_param(
param: str,
) -> List[str]:
def split_mars_param(param: str) -> List[str]:
"""
Parse a MARS parameter string into an array if it is
one or get the last element if it's a range.
Expand All @@ -209,15 +215,15 @@ def split_mars_param(
return parts


def find_tag(tree_elem: ET.Element, keyword: str) -> Optional[str]:
def find_tag(product: Dict[str, Any], keyword: str) -> Optional[str]:
"""
Utility function to find a tag in the tree element,
Utility function to find a tag in the product dictionary,
checking for both 'diss_{keyword}' and '{keyword}' tags. Used with "step" and "domain" tags.
Parameters
----------
tree_elem : ET.Element
The XML element to search within.
product : Dict[str, Any]
The product dictionary to search within.
keyword : str
The tag to search for.
Expand All @@ -226,9 +232,9 @@ def find_tag(tree_elem: ET.Element, keyword: str) -> Optional[str]:
Optional[str]
The text of the tag if found, otherwise None.
"""
tag = tree_elem.findtext(f"diss_{keyword}")
tag = product.get(f"diss_{keyword}")
if tag is None:
tag = tree_elem.findtext(keyword)
tag = product.get(keyword)
if tag is None:
raise IOError(f"Couldn't find forecast {keyword} as either 'diss_{keyword}' or '{keyword}'")
return tag
Expand Down
51 changes: 21 additions & 30 deletions tests/unit/test_schedule.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import xml.etree.ElementTree as ET
from datetime import date, timedelta

import pytest
Expand Down Expand Up @@ -47,7 +46,7 @@ def mock_schedule_file(tmp_path):
def test_get_release_time_and_delta_day_match(mock_schedule_file):
reader = ScheduleReader(str(mock_schedule_file))
release_time, delta_day = reader.get_release_time_and_delta_day(
cclass="od", stream="oper/wave", domain="g", time="00:00", step="0000", diss_type="an"
cclass="od", stream="oper/wave", domain="g", time_in="00:00", step="0000", diss_type="an"
)
assert release_time == "05:35:00"
assert delta_day == 1
Expand All @@ -56,7 +55,7 @@ def test_get_release_time_and_delta_day_match(mock_schedule_file):
def test_get_release_time_and_delta_day_no_match(mock_schedule_file):
reader = ScheduleReader(str(mock_schedule_file))
release_time, delta_day = reader.get_release_time_and_delta_day(
cclass="od", stream="nonexistent", domain="g", time="00:00", step="0000", diss_type="an"
cclass="od", stream="nonexistent", domain="g", time_in="00:00", step="0000", diss_type="an"
)
assert release_time is None
assert delta_day is None
Expand All @@ -78,37 +77,29 @@ def test_check_released(mock_schedule_file):


def test_find_tag():
xml_data = """
<root>
<product>
<diss_domain>g</diss_domain>
<domain>m</domain>
</product>
</root>
"""
tree = ET.ElementTree(ET.fromstring(xml_data))
product = tree.find("product")
# Test case 1: Tag exists as 'diss_{keyword}'
product = {"diss_domain": "g", "domain": "m"}
assert find_tag(product, "domain") == "g"

xml_data = """
<root>
<product>
<domain>m</domain>
</product>
</root>
"""
tree = ET.ElementTree(ET.fromstring(xml_data))
product = tree.find("product")
# Test case 2: Tag exists as '{keyword}'
product = {"domain": "m"}
assert find_tag(product, "domain") == "m"

xml_data = """
<root>
<product>
</product>
</root>
"""
tree = ET.ElementTree(ET.fromstring(xml_data))
product = tree.find("product")
# Test case 3: Tag does not exist
product = {}
with pytest.raises(IOError):
find_tag(product, "domain")

# Test case 4: Tag exists as both 'diss_{keyword}' and '{keyword}', should return 'diss_{keyword}'
product = {"diss_domain": "g", "domain": "m"}
assert find_tag(product, "domain") == "g"

# Test case 5: Tag exists as 'diss_{keyword}' but is None
product = {"diss_domain": None, "domain": "m"}
assert find_tag(product, "domain") == "m"

# Test case 6: Tag exists as '{keyword}' but is None
product = {"domain": None}
with pytest.raises(IOError):
find_tag(product, "domain")

Expand Down

0 comments on commit 2abae04

Please sign in to comment.