Skip to content
This repository has been archived by the owner on May 2, 2024. It is now read-only.

Commit

Permalink
Add mars request dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
cfkanesan committed Mar 28, 2024
1 parent 8496832 commit ac68370
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
14 changes: 5 additions & 9 deletions src/idpi/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
# Local
from . import config, mars

GRIB_DEF = {
mars.Model.COSMO_1E: "cosmo",
mars.Model.COSMO_2E: "cosmo",
mars.Model.ICON_CH1_EPS: "cosmo",
mars.Model.ICON_CH2_EPS: "cosmo",
}


@contextmanager
def cosmo_grib_defs():
Expand Down Expand Up @@ -98,7 +91,7 @@ def _(self, request: dict) -> Iterator:
# validate the request
req = mars.Request(**req_kwargs)

grib_def = config.get("data_scope", GRIB_DEF[req.model])
grib_def = config.get("data_scope", "cosmo")
with grib_def_ctx(grib_def):
if self.datafiles:
fs = ekd.from_source("file", self.datafiles)
Expand All @@ -115,12 +108,15 @@ def _(self, request: dict) -> Iterator:
asynchronous=False,
)
urls = [p["location"] for p in pointers]
print(urls)
source = ekd.from_source("url", urls)
else:
source = ekd.from_source("fdb", req.to_fdb())
yield from source # type: ignore

@retrieve.register
def _(self, request: mars.Request) -> Iterator:
yield from self.retrieve(request.dump())

@retrieve.register
def _(self, request: str) -> Iterator:
yield from self.retrieve({"param": request})
Expand Down
4 changes: 2 additions & 2 deletions src/idpi/grib_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import xarray as xr

# Local
from . import data_source, metadata, tasking
from . import data_source, mars, metadata, tasking

logger = logging.getLogger(__name__)

Expand All @@ -28,7 +28,7 @@
}
INV_DIM_MAP = {v: k for k, v in DIM_MAP.items()}

Request = str | tuple | dict
Request = str | tuple | dict | mars.Request


def _is_ensemble(field) -> bool:
Expand Down
19 changes: 17 additions & 2 deletions tests/test_idpi/test_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,10 @@ def test_retrieve_files_ifs(mock_from_source, mock_grib_def_ctx):


def test_retrieve_fdb(mock_from_source, mock_grib_def_ctx):
datafiles = []
param = "U"
template = {"date": "20200101", "time": "0000"}

source = data_source.DataSource(datafiles, request_template=template)
source = data_source.DataSource(request_template=template)
for _ in source.retrieve(param):
pass

Expand All @@ -85,3 +84,19 @@ def test_retrieve_fdb(mock_from_source, mock_grib_def_ctx):
call("fdb", mars.Request(param, **template).to_fdb()),
call().__iter__(),
]


def test_retrieve_fdb_mars(mock_from_source, mock_grib_def_ctx):
param = "U"
request = mars.Request(param=param)
template = {"date": "20200101", "time": "0000"}

source = data_source.DataSource(request_template=template)
for _ in source.retrieve(request):
pass

assert mock_grib_def_ctx.mock_calls == [call("cosmo")]
assert mock_from_source.mock_calls == [
call("fdb", mars.Request(param, **template).to_fdb()),
call().__iter__(),
]

0 comments on commit ac68370

Please sign in to comment.