diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 7f19201..4bfc6c6 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -16,39 +16,39 @@ from openeo_gfmap.backend import cdse_connection from openeo_gfmap.manager.job_manager import GFMAPJobManager from openeo_gfmap.manager.job_splitters import load_s2_grid, split_job_s2grid -from patch_extractions.extract_meteo import ( - create_datacube_meteo, - create_job_dataframe_meteo, +from patch_extractions.extract_patch_meteo import ( + create_job_dataframe_patch_meteo, + create_job_patch_meteo, ) -from patch_extractions.extract_optical import ( - create_datacube_optical, - create_job_dataframe_s2, +from patch_extractions.extract_patch_s2 import ( + create_job_dataframe_patch_s2, + create_job_patch_s2, ) -from point_extractions.extract_point import ( - create_datacube_point, - create_job_dataframe_point, - generate_output_path_point, - post_job_action_point, +from point_extractions.extract_point_worldcereal import ( + create_job_dataframe_point_worldcereal, + create_job_point_worldcereal, + generate_output_path_point_worldcereal, + post_job_action_point_worldcereal, ) from worldcereal.openeo.extract import ( - generate_output_path, + generate_output_path_patch, pipeline_log, - post_job_action, + post_job_action_patch, ) from worldcereal.stac.constants import ExtractionCollection -from patch_extractions.extract_sar import ( # isort: skip - create_datacube_sar, - create_job_dataframe_s1, +from patch_extractions.extract_patch_s1 import ( # isort: skip + create_job_patch_s1, + create_job_dataframe_patch_s1, ) -from patch_extractions.extract_worldcereal import ( # isort: skip - create_datacube_worldcereal, - create_job_dataframe_worldcereal, - post_job_action_worldcereal, - generate_output_path_worldcereal, +from patch_extractions.extract_patch_worldcereal import ( # isort: skip + create_job_patch_worldcereal, + create_job_dataframe_patch_worldcereal, + post_job_action_patch_worldcereal, + generate_output_path_patch_worldcereal, ) @@ -121,11 +121,11 @@ def prepare_job_dataframe( pipeline_log.info("Dataframes split to jobs, creating the job dataframe...") collection_switch: dict[ExtractionCollection, typing.Callable] = { - ExtractionCollection.SENTINEL1: create_job_dataframe_s1, - ExtractionCollection.SENTINEL2: create_job_dataframe_s2, - ExtractionCollection.METEO: create_job_dataframe_meteo, - ExtractionCollection.WORLDCEREAL: create_job_dataframe_worldcereal, - ExtractionCollection.POINT: create_job_dataframe_point, + ExtractionCollection.PATCH_SENTINEL1: create_job_dataframe_patch_s1, + ExtractionCollection.PATCH_SENTINEL2: create_job_dataframe_patch_s2, + ExtractionCollection.PATCH_METEO: create_job_dataframe_patch_meteo, + ExtractionCollection.PATCH_WORLDCEREAL: create_job_dataframe_patch_worldcereal, + ExtractionCollection.POINT_WORLDCEREAL: create_job_dataframe_point_worldcereal, } create_job_dataframe_fn = collection_switch.get( @@ -156,32 +156,32 @@ def setup_extraction_functions( """ datacube_creation = { - ExtractionCollection.SENTINEL1: partial( - create_datacube_sar, + ExtractionCollection.PATCH_SENTINEL1: partial( + create_job_patch_s1, executor_memory=memory, python_memory=python_memory, max_executors=max_executors, ), - ExtractionCollection.SENTINEL2: partial( - create_datacube_optical, + ExtractionCollection.PATCH_SENTINEL2: partial( + create_job_patch_s2, executor_memory=memory, python_memory=python_memory, max_executors=max_executors, ), - ExtractionCollection.METEO: partial( - create_datacube_meteo, + ExtractionCollection.PATCH_METEO: partial( + create_job_patch_meteo, executor_memory=memory, python_memory=python_memory, max_executors=max_executors, ), - ExtractionCollection.WORLDCEREAL: partial( - create_datacube_worldcereal, + ExtractionCollection.PATCH_WORLDCEREAL: partial( + create_job_patch_worldcereal, executor_memory=memory, python_memory=python_memory, max_executors=max_executors, ), - ExtractionCollection.POINT: partial( - create_datacube_point, + ExtractionCollection.POINT_WORLDCEREAL: partial( + create_job_point_worldcereal, executor_memory=memory, python_memory=python_memory, max_executors=max_executors, @@ -196,19 +196,21 @@ def setup_extraction_functions( ) path_fns = { - ExtractionCollection.SENTINEL1: partial( - generate_output_path, s2_grid=load_s2_grid() + ExtractionCollection.PATCH_SENTINEL1: partial( + generate_output_path_patch, s2_grid=load_s2_grid() ), - ExtractionCollection.SENTINEL2: partial( - generate_output_path, s2_grid=load_s2_grid() + ExtractionCollection.PATCH_SENTINEL2: partial( + generate_output_path_patch, s2_grid=load_s2_grid() ), - ExtractionCollection.METEO: partial( - generate_output_path, s2_grid=load_s2_grid() + ExtractionCollection.PATCH_METEO: partial( + generate_output_path_patch, s2_grid=load_s2_grid() ), - ExtractionCollection.WORLDCEREAL: partial( - generate_output_path_worldcereal, s2_grid=load_s2_grid() + ExtractionCollection.PATCH_WORLDCEREAL: partial( + generate_output_path_patch_worldcereal, s2_grid=load_s2_grid() + ), + ExtractionCollection.POINT_WORLDCEREAL: partial( + generate_output_path_point_worldcereal ), - ExtractionCollection.POINT: partial(generate_output_path_point), } path_fn = path_fns.get( @@ -219,37 +221,37 @@ def setup_extraction_functions( ) post_job_actions = { - ExtractionCollection.SENTINEL1: partial( - post_job_action, + ExtractionCollection.PATCH_SENTINEL1: partial( + post_job_action_patch, extract_value=extract_value, description="Sentinel1 GRD raw observations, unprocessed.", title="Sentinel-1 GRD", spatial_resolution="20m", s1_orbit_fix=True, ), - ExtractionCollection.SENTINEL2: partial( - post_job_action, + ExtractionCollection.PATCH_SENTINEL2: partial( + post_job_action_patch, extract_value=extract_value, description="Sentinel2 L2A observations, processed.", title="Sentinel-2 L2A", spatial_resolution="10m", ), - ExtractionCollection.METEO: partial( - post_job_action, + ExtractionCollection.PATCH_METEO: partial( + post_job_action_patch, extract_value=extract_value, description="Meteo observations", title="Meteo observations", spatial_resolution="1deg", ), - ExtractionCollection.WORLDCEREAL: partial( - post_job_action_worldcereal, + ExtractionCollection.PATCH_WORLDCEREAL: partial( + post_job_action_patch_worldcereal, extract_value=extract_value, description="WorldCereal preprocessed inputs", title="WorldCereal inputs", spatial_resolution="10m", ), - ExtractionCollection.POINT: partial( - post_job_action_point, + ExtractionCollection.POINT_WORLDCEREAL: partial( + post_job_action_point_worldcereal, ), } diff --git a/scripts/extractions/patch_extractions/extract_meteo.py b/scripts/extractions/patch_extractions/extract_patch_meteo.py similarity index 97% rename from scripts/extractions/patch_extractions/extract_meteo.py rename to scripts/extractions/patch_extractions/extract_patch_meteo.py index eec3d84..ae8662c 100644 --- a/scripts/extractions/patch_extractions/extract_meteo.py +++ b/scripts/extractions/patch_extractions/extract_patch_meteo.py @@ -15,13 +15,13 @@ ) # isort: skip -def create_job_dataframe_meteo( +def create_job_dataframe_patch_meteo( backend: Backend, split_jobs: List[gpd.GeoDataFrame] ) -> pd.DataFrame: raise NotImplementedError("This function is not implemented yet.") -def create_datacube_meteo( +def create_job_patch_meteo( row: pd.Series, connection: openeo.DataCube, provider=None, diff --git a/scripts/extractions/patch_extractions/extract_sar.py b/scripts/extractions/patch_extractions/extract_patch_s1.py similarity index 99% rename from scripts/extractions/patch_extractions/extract_sar.py rename to scripts/extractions/patch_extractions/extract_patch_s1.py index 1643d7a..b23ea5a 100644 --- a/scripts/extractions/patch_extractions/extract_sar.py +++ b/scripts/extractions/patch_extractions/extract_patch_s1.py @@ -30,7 +30,7 @@ S1_GRD_CATALOGUE_BEGIN_DATE = datetime(2014, 10, 1) -def create_job_dataframe_s1( +def create_job_dataframe_patch_s1( backend: Backend, split_jobs: List[gpd.GeoDataFrame], ) -> pd.DataFrame: @@ -117,7 +117,7 @@ def create_job_dataframe_s1( return pd.DataFrame(rows) -def create_datacube_sar( +def create_job_patch_s1( row: pd.Series, connection: openeo.DataCube, provider, diff --git a/scripts/extractions/patch_extractions/extract_optical.py b/scripts/extractions/patch_extractions/extract_patch_s2.py similarity index 98% rename from scripts/extractions/patch_extractions/extract_optical.py rename to scripts/extractions/patch_extractions/extract_patch_s2.py index 0be4616..7af3f1f 100644 --- a/scripts/extractions/patch_extractions/extract_optical.py +++ b/scripts/extractions/patch_extractions/extract_patch_s2.py @@ -23,7 +23,7 @@ S2_L2A_CATALOGUE_BEGIN_DATE = datetime(2017, 1, 1) -def create_job_dataframe_s2( +def create_job_dataframe_patch_s2( backend: Backend, split_jobs: List[gpd.GeoDataFrame], ) -> pd.DataFrame: @@ -67,7 +67,7 @@ def create_job_dataframe_s2( return pd.DataFrame(rows) -def create_datacube_optical( +def create_job_patch_s2( row: pd.Series, connection: openeo.DataCube, provider=None, diff --git a/scripts/extractions/patch_extractions/extract_worldcereal.py b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py similarity index 98% rename from scripts/extractions/patch_extractions/extract_worldcereal.py rename to scripts/extractions/patch_extractions/extract_patch_worldcereal.py index e134159..f862d75 100644 --- a/scripts/extractions/patch_extractions/extract_worldcereal.py +++ b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py @@ -43,7 +43,7 @@ WORLDCEREAL_BEGIN_DATE = datetime(2017, 1, 1) -def create_job_dataframe_worldcereal( +def create_job_dataframe_patch_worldcereal( backend: Backend, split_jobs: List[gpd.GeoDataFrame], ) -> pd.DataFrame: @@ -119,7 +119,7 @@ def create_job_dataframe_worldcereal( return pd.DataFrame(rows) -def create_datacube_worldcereal( +def create_job_patch_worldcereal( row: pd.Series, connection: openeo.DataCube, provider, @@ -333,7 +333,7 @@ def postprocess_extracted_file( shutil.move(tempfile, item_asset_path) -def post_job_action_worldcereal( +def post_job_action_patch_worldcereal( job_items: List[pystac.Item], row: pd.Series, extract_value: int, @@ -397,7 +397,7 @@ def post_job_action_worldcereal( return job_items -def generate_output_path_worldcereal( +def generate_output_path_patch_worldcereal( root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame ): """Generate the output path for the extracted data, from a base path and diff --git a/scripts/extractions/point_extractions/extract_point.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py similarity index 96% rename from scripts/extractions/point_extractions/extract_point.py rename to scripts/extractions/point_extractions/extract_point_worldcereal.py index fa97bf6..4fdccfa 100644 --- a/scripts/extractions/point_extractions/extract_point.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -18,7 +18,9 @@ ) -def generate_output_path_point(root_folder: Path, geometry_index: int, row: pd.Series): +def generate_output_path_point_worldcereal( + root_folder: Path, geometry_index: int, row: pd.Series +): """ For point extractions, only one asset (a geoparquet file) is generated per job. Therefore geometry_index is always 0. @@ -45,7 +47,7 @@ def generate_output_path_point(root_folder: Path, geometry_index: int, row: pd.S return real_subfolder / f"point_extractions{row.out_extension}" -def create_job_dataframe_point( +def create_job_dataframe_point_worldcereal( backend: Backend, split_jobs: List[gpd.GeoDataFrame] ) -> pd.DataFrame: """Create a dataframe from the split jobs, containg all the necessary information to run the job.""" @@ -84,7 +86,7 @@ def create_job_dataframe_point( return pd.DataFrame(rows) -def create_datacube_point( +def create_job_point_worldcereal( row: pd.Series, connection: openeo.DataCube, provider, @@ -144,7 +146,7 @@ def create_datacube_point( ) -def post_job_action_point( +def post_job_action_point_worldcereal( job_items: List[pystac.Item], row: pd.Series, parameters: Optional[dict] = None ) -> list: for idx, item in enumerate(job_items): diff --git a/src/worldcereal/openeo/extract.py b/src/worldcereal/openeo/extract.py index 7afe599..29f593e 100644 --- a/src/worldcereal/openeo/extract.py +++ b/src/worldcereal/openeo/extract.py @@ -41,7 +41,7 @@ def filter(self, record): stream_handler.addFilter(ManagerLoggerFilter()) -def post_job_action( +def post_job_action_patch( job_items: List[pystac.Item], row: pd.Series, extract_value: int, @@ -131,7 +131,7 @@ def post_job_action( return job_items -def generate_output_path( +def generate_output_path_patch( root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame ): """Generate the output path for the extracted data, from a base path and diff --git a/src/worldcereal/stac/constants.py b/src/worldcereal/stac/constants.py index 9f09a34..593cd7d 100644 --- a/src/worldcereal/stac/constants.py +++ b/src/worldcereal/stac/constants.py @@ -8,11 +8,11 @@ class ExtractionCollection(Enum): """Collections that can be extracted in the extraction scripts.""" - SENTINEL1 = "SENTINEL1" - SENTINEL2 = "SENTINEL2" - METEO = "METEO" - WORLDCEREAL = "WORLDCEREAL" - POINT = "POINT" + PATCH_SENTINEL1 = "PATCH_SENTINEL1" + PATCH_SENTINEL2 = "PATCH_SENTINEL2" + PATCH_METEO = "PATCH_METEO" + PATCH_WORLDCEREAL = "PATCH_WORLDCEREAL" + POINT_WORLDCEREAL = "POINT_WORLDCEREAL" # Define the sentinel 1 asset @@ -182,34 +182,34 @@ class ExtractionCollection(Enum): COLLECTION_IDS = { - ExtractionCollection.SENTINEL1: "SENTINEL1-EXTRACTION", - ExtractionCollection.SENTINEL2: "sentinel2-EXTRACTION", - ExtractionCollection.METEO: "METEO-EXTRACTION", - ExtractionCollection.WORLDCEREAL: "WORLDCEREAL-INPUTS", + ExtractionCollection.PATCH_SENTINEL1: "SENTINEL1-EXTRACTION", + ExtractionCollection.PATCH_SENTINEL2: "sentinel2-EXTRACTION", + ExtractionCollection.PATCH_METEO: "METEO-EXTRACTION", + ExtractionCollection.PATCH_WORLDCEREAL: "WORLDCEREAL-INPUTS", } COLLECTION_DESCRIPTIONS = { - ExtractionCollection.SENTINEL1: "Sentinel1 GRD data extraction.", - ExtractionCollection.SENTINEL2: "Sentinel2 L2A data extraction.", - ExtractionCollection.METEO: "Meteo data extraction.", - ExtractionCollection.WORLDCEREAL: "WorldCereal preprocessed inputs extraction.", + ExtractionCollection.PATCH_SENTINEL1: "Sentinel1 GRD data extraction.", + ExtractionCollection.PATCH_SENTINEL2: "Sentinel2 L2A data extraction.", + ExtractionCollection.PATCH_METEO: "Meteo data extraction.", + ExtractionCollection.PATCH_WORLDCEREAL: "WorldCereal preprocessed inputs extraction.", } CONSTELLATION_NAMES = { - ExtractionCollection.SENTINEL1: "sentinel1", - ExtractionCollection.SENTINEL2: "sentinel2", - ExtractionCollection.METEO: "agera5", - ExtractionCollection.WORLDCEREAL: "worldcereal", + ExtractionCollection.PATCH_SENTINEL1: "sentinel1", + ExtractionCollection.PATCH_SENTINEL2: "sentinel2", + ExtractionCollection.PATCH_METEO: "agera5", + ExtractionCollection.PATCH_WORLDCEREAL: "worldcereal", } ITEM_ASSETS = { - ExtractionCollection.SENTINEL1: {"sentinel1": SENTINEL1_ASSET}, - ExtractionCollection.SENTINEL2: {"sentinel2": SENTINEL2_ASSET}, - ExtractionCollection.METEO: {"agera5": METEO_ASSET}, - ExtractionCollection.WORLDCEREAL: None, + ExtractionCollection.PATCH_SENTINEL1: {"sentinel1": SENTINEL1_ASSET}, + ExtractionCollection.PATCH_SENTINEL2: {"sentinel2": SENTINEL2_ASSET}, + ExtractionCollection.PATCH_METEO: {"agera5": METEO_ASSET}, + ExtractionCollection.PATCH_WORLDCEREAL: None, } COLLECTION_REGEXES = { - ExtractionCollection.SENTINEL1: r"^S1-SIGMA0-10m_(?:ASCENDING|DESCENDING)_(.*)_[0-9]{4,5}_([0-9]{4}-[0-9]{2}-[0-9]{2}(?:_)?){2}.nc$", - ExtractionCollection.SENTINEL2: r"^S2-L2A-10m_(.*)_[0-9]{4,5}_([0-9]{4}-[0-9]{2}-[0-9]{2}(?:_)?){2}.nc$", + ExtractionCollection.PATCH_SENTINEL1: r"^S1-SIGMA0-10m_(?:ASCENDING|DESCENDING)_(.*)_[0-9]{4,5}_([0-9]{4}-[0-9]{2}-[0-9]{2}(?:_)?){2}.nc$", + ExtractionCollection.PATCH_SENTINEL2: r"^S2-L2A-10m_(.*)_[0-9]{4,5}_([0-9]{4}-[0-9]{2}-[0-9]{2}(?:_)?){2}.nc$", }