Skip to content

Commit

Permalink
add /processes
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Schmidt committed Jan 3, 2024
1 parent 187f6d7 commit df90903
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 3 deletions.
17 changes: 17 additions & 0 deletions openeo_fastapi/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ def register_get_collection(self):
endpoint=self.client.get_collection,
)

def register_get_processes(self):
"""Register Endpoint for Processes (GET /processes).
Returns:
None
"""
self.router.add_api_route(
name="processes",
path="/processes",
response_model=None,
response_model_exclude_unset=False,
response_model_exclude_none=True,
methods=["GET"],
endpoint=self.client.get_processes,
)

def register_core(self):
"""Register core OpenEO endpoints.
Expand All @@ -88,6 +104,7 @@ def register_core(self):
self.register_get_capabilities()
self.register_get_collections()
self.register_get_collection()
self.register_get_processes()

def __attrs_post_init__(self):
"""Post-init hook.
Expand Down
3 changes: 2 additions & 1 deletion openeo_fastapi/client/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import aiohttp
from fastapi import APIRouter
from starlette.responses import JSONResponse
from yurl import URL

from openeo_fastapi.client.models import Collection, Collections

Expand All @@ -25,7 +26,7 @@ async def get_collections():

async def get_collection(collection_id):
"""
Metadata for specific datasets
Metadata for specific dataset
"""

async with aiohttp.ClientSession() as client:
Expand Down
6 changes: 6 additions & 0 deletions openeo_fastapi/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from openeo_fastapi.client import models
from openeo_fastapi.client.collections import get_collection, get_collections
from openeo_fastapi.client.processes import get_processes


@define
Expand Down Expand Up @@ -45,3 +46,8 @@ async def get_collection(self, collection_id) -> models.Collection:
async def get_collections(self) -> models.Collections:
collections = await get_collections()
return collections

@abc.abstractclassmethod
async def get_processes(self) -> dict:
processes = await get_processes()
return processes
98 changes: 98 additions & 0 deletions openeo_fastapi/client/processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import functools
import importlib
import inspect
import os

from fastapi import APIRouter
from odc.stac import stac_load
from openeo_pg_parser_networkx import ProcessRegistry
from openeo_pg_parser_networkx.process_registry import Process
from openeo_processes_dask.process_implementations.core import process
from openeo_processes_dask.specs import load_collection as load_collection_spec
from openeo_processes_dask.specs import save_result as save_result_spec
from pystac_client import Client
from starlette.responses import FileResponse

router_processes = APIRouter()


def load_collection(
max_items=None,
limit=None,
ids=None,
collections=None,
bbox=None,
intersects=None,
datetime=None,
query=None,
filter=None,
filter_lang=None,
sortby=None,
fields=None,
bands=None,
crs=None,
resolution=None,
):
catalog = Client.open(os.getenv("STAC_API_URL"))
query = catalog.search(
max_items=max_items,
limit=limit,
ids=ids,
collections=collections,
bbox=bbox,
intersects=intersects,
datetime=datetime,
query=query,
filter=filter,
filter_lang=filter_lang,
sortby=sortby,
fields=fields,
)
items = list(query.get_items())
ods = stac_load(items, bands=bands, crs=crs, resolution=resolution)
od = ods.to_array()
return od


def save_result(data, filename="output.nc"):
data.to_netcdf(filename)
return FileResponse(
filename, media_type="application/octet-stream", filename=filename
)


@functools.cache
async def get_processes():
"""
Basic metadata for all datasets
"""

process_registry = ProcessRegistry(wrap_funcs=[process])

processes_from_module = [
func
for _, func in inspect.getmembers(
importlib.import_module("openeo_processes_dask.process_implementations"),
inspect.isfunction,
)
]

specs_module = importlib.import_module("openeo_processes_dask.specs")
specs = {
func.__name__: getattr(specs_module, func.__name__)
for func in processes_from_module
}

for func in processes_from_module:
process_registry[func.__name__] = Process(
spec=specs[func.__name__], implementation=func
)

process_registry["load_collection"] = Process(
spec=load_collection_spec, implementation=load_collection
)
process_registry["save_result"] = Process(
spec=save_result_spec, implementation=save_result
)

return process_registry["predefined", None]
14 changes: 12 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@ pydantic = "<2"
attrs = "^23.1.0"
httpx = "^0.24.1"
aiohttp = ">3.9"


yurl = ">=1.0.0"
pystac-client = ">=0.7.5"
openeo-pg-parser-networkx = ">=2024.1.1"
openeo-processes-dask = ">=2023.11.6"
odc-stac = ">=0.3.8"
dask-geopandas = ">=0.3.1"
rioxarray = ">=0.15.0"
xvec = ">=0.2.0"
joblib = ">=1.3.2"
planetary_computer = ">=1.0.0"
stackstac = ">=0.5.0"
stac-validator = ">=3.3.2"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.0"
Expand Down
11 changes: 11 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,14 @@ def test_get_capabilities(core_api):

assert response.status_code == 200
assert response.json()["title"] == "Test Api"


def test_get_processes(core_api):
"""Test the OpenEOApi and OpenEOCore classes interact as intended."""

test_app = TestClient(core_api.app)

response = test_app.get("/processes")

assert response.status_code == 200
assert "save_result" in response.json().keys()

0 comments on commit df90903

Please sign in to comment.