From fe6e6efb13151c8c92b0101764990ffd473fe64e Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Mon, 16 Dec 2024 13:29:02 +0100 Subject: [PATCH 01/22] updated generate_output_path_patch to include asset_id #231 --- src/worldcereal/openeo/extract.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/worldcereal/openeo/extract.py b/src/worldcereal/openeo/extract.py index c4d27ebb..3e07aa7c 100644 --- a/src/worldcereal/openeo/extract.py +++ b/src/worldcereal/openeo/extract.py @@ -3,6 +3,7 @@ import json import logging import os +import re import shutil from datetime import datetime from importlib.metadata import version @@ -153,15 +154,31 @@ def post_job_action_patch( def generate_output_path_patch( - root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame + root_folder: Path, + job_index: int, + row: pd.Series, + asset_id: str, + s2_grid: gpd.GeoDataFrame, ): """Generate the output path for the extracted data, from a base path and the row information. """ - features = geojson.loads(row.geometry) - sample_id = features[geometry_index].properties.get("sample_id", None) - if sample_id is None: - sample_id = features[geometry_index].properties["sampleID"] + # First extract the sample ID from the asset ID + match = re.fullmatch(r"openEO_(.+)\.nc", asset_id) + if match: + sample_id = match.group(1) + else: + pipeline_log.error("Asset ID does not match the expected pattern: %s", asset_id) + raise ValueError(f"Invalid Asset ID format: {asset_id}") + + # Find which index in the FeatureCollection corresponds to the sample_id + features = geojson.loads(row.geometry)["features"] + sample_id_to_index = { + feature.properties.get("sample_id", None): index + for index, feature in enumerate(features) + } + geometry_index = sample_id_to_index.get(sample_id, None) + ref_id = features[geometry_index].properties["ref_id"] if "orbit_state" in row: From 9dcddf97c51201af4a5f02cafcbb65e6f97a051a Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 17 Dec 2024 11:58:52 +0100 Subject: [PATCH 02/22] updated other generate path function as well #231 --- .../patch_extractions/extract_patch_worldcereal.py | 9 ++++++--- .../point_extractions/extract_point_worldcereal.py | 2 +- src/worldcereal/openeo/extract.py | 8 +------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py index f862d758..fe2b9292 100644 --- a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py +++ b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py @@ -398,13 +398,16 @@ def post_job_action_patch_worldcereal( def generate_output_path_patch_worldcereal( - root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame + root_folder: Path, + job_index: int, + row: pd.Series, + asset_id: str, + s2_grid: gpd.GeoDataFrame, ): """Generate the output path for the extracted data, from a base path and the row information. 
""" - features = geojson.loads(row.geometry) - sample_id = features[geometry_index].properties.get("sample_id", None) + sample_id = asset_id.replace(".nc", "").replace("openEO_", "") s2_tile_id = row.s2_tile epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index ad07ec1a..1adb6c1d 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -16,7 +16,7 @@ def generate_output_path_point_worldcereal( - root_folder: Path, geometry_index: int, row: pd.Series + root_folder: Path, geometry_index: int, row: pd.Series, asset_id: str ): """ For point extractions, only one asset (a geoparquet file) is generated per job. diff --git a/src/worldcereal/openeo/extract.py b/src/worldcereal/openeo/extract.py index 3e07aa7c..28b22ae2 100644 --- a/src/worldcereal/openeo/extract.py +++ b/src/worldcereal/openeo/extract.py @@ -3,7 +3,6 @@ import json import logging import os -import re import shutil from datetime import datetime from importlib.metadata import version @@ -164,12 +163,7 @@ def generate_output_path_patch( the row information. """ # First extract the sample ID from the asset ID - match = re.fullmatch(r"openEO_(.+)\.nc", asset_id) - if match: - sample_id = match.group(1) - else: - pipeline_log.error("Asset ID does not match the expected pattern: %s", asset_id) - raise ValueError(f"Invalid Asset ID format: {asset_id}") + sample_id = asset_id.replace(".nc", "").replace("openEO_", "") # Find which index in the FeatureCollection corresponds to the sample_id features = geojson.loads(row.geometry)["features"] From af1aab449af6447763c1013341b291ec21a9154c Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:18:05 +0100 Subject: [PATCH 03/22] Extract above or equal to target value of `extract` --- scripts/extractions/extract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 39a992bf..6e213646 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -103,7 +103,7 @@ def prepare_job_dataframe( pipeline_log.info("Preparing the job dataframe.") # Filter the input dataframe to only keep the locations to extract - input_df = input_df[input_df["extract"] == extract_value].copy() + input_df = input_df[input_df["extract"] >= extract_value].copy() # Split the locations into chunks of max_locations split_dfs = [] From 0d4cc8213d8dd39e15c64344f98898e87138f6f4 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:20:05 +0100 Subject: [PATCH 04/22] Default should be 2 --- scripts/extractions/extract.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 6e213646..e5f94bd9 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -372,8 +372,8 @@ def manager_main_loop( parser.add_argument( "--parallel_jobs", type=int, - default=10, - help="The maximum number of parrallel jobs to run at the same time.", + default=2, + help="The maximum number of parallel jobs to run at the same time.", ) parser.add_argument( "--restart_failed", From 4095b0b623f2bda2883cc887f4ffdf10a5d1d8f7 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:21:24 +0100 Subject: [PATCH 05/22] Add function --- 
 scripts/extractions/extract.py | 157 ++++++++++++++++++++++-----------
 1 file changed, 107 insertions(+), 50 deletions(-)

diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py
index e5f94bd9..427aa088 100644
--- a/scripts/extractions/extract.py
+++ b/scripts/extractions/extract.py
@@ -334,6 +334,102 @@ def manager_main_loop(
         raise e
 
 
+def run_extractions(
+    collection: ExtractionCollection,
+    output_folder: Path,
+    input_df: Path,
+    max_locations_per_job: int = 500,
+    memory: str = "1800m",
+    python_memory: str = "1900m",
+    max_executors: int = 22,
+    parallel_jobs: int = 10,
+    restart_failed: bool = False,
+    extract_value: int = 1,
+    backend=Backend.CDSE,
+) -> None:
+    """Main function responsible for launching point and patch extractions.
+
+    Parameters
+    ----------
+    collection : ExtractionCollection
+        The collection to extract. Most popular: PATCH_WORLDCEREAL, POINT_WORLDCEREAL
+    output_folder : Path
+        The folder where to store the extracted data
+    input_df : Path
+        Path to the input dataframe containing the geometries
+        for which extractions need to be done
+    max_locations_per_job : int, optional
+        The maximum number of locations to extract per job, by default 500
+    memory : str, optional
+        Memory to allocate for the executor, by default "1800m"
+    python_memory : str, optional
+        Memory to allocate for the python processes as well as OrfeoToolbox in the executors,
+        by default "1900m"
+    max_executors : int, optional
+        Number of executors to run, by default 22
+    parallel_jobs : int, optional
+        The maximum number of parallel jobs to run at the same time, by default 10
+    restart_failed : bool, optional
+        Restart the jobs that previously failed, by default False
+    extract_value : int, optional
+        All samples with an "extract" value equal to or larger than this one will be extracted, by default 1
+    backend : openeo_gfmap.Backend, optional
+        Cloud backend on which to run the extractions, by default Backend.CDSE
+
+    Raises
+    ------
+    ValueError
+        _description_
+    """
+
+    if not output_folder.is_dir():
+        output_folder.mkdir(parents=True)
+
+    tracking_df_path = output_folder / "job_tracking.csv"
+
+    # Load the input dataframe and build the job dataframe
+    input_df = load_dataframe(input_df)
+
+    job_df = None
+    if not tracking_df_path.exists():
+        job_df = prepare_job_dataframe(
+            input_df, collection, max_locations_per_job, extract_value, backend
+        )
+
+    # Setup the extraction functions
+    pipeline_log.info("Setting up the extraction functions.")
+    datacube_fn, path_fn, post_job_fn = setup_extraction_functions(
+        collection, extract_value, memory, python_memory, max_executors
+    )
+
+    # Initialize and set up the job manager
+    pipeline_log.info("Initializing the job manager.")
+
+    job_manager = GFMAPJobManager(
+        output_dir=output_folder,
+        output_path_generator=path_fn,
+        post_job_action=post_job_fn,
+        poll_sleep=60,
+        n_threads=4,
+        restart_failed=restart_failed,
+        stac_enabled=False,
+    )
+
+    job_manager.add_backend(
+        backend.value,
+        cdse_connection,
+        parallel_jobs=parallel_jobs,
+    )
+
+    manager_main_loop(job_manager, collection, job_df, datacube_fn, tracking_df_path)
+
+    pipeline_log.info("Extraction completed successfully.")
+    send_notification(
+        title=f"WorldCereal Extraction {collection.value} - Completed",
+        message="Extractions have been completed successfully.",
+    )
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Extract data from a collection")
     parser.add_argument(
@@ -389,55 +485,16 @@
 
     args = parser.parse_args()
 
-    # Fetches values and setups hardocded values
-    collection = args.collection
-    extract_value = args.extract_value
-    max_locations_per_job = args.max_locations
-    backend = Backend.CDSE
-
-    if not args.output_folder.is_dir():
-        raise ValueError(f"Output folder {args.output_folder} does not exist.")
-
-    tracking_df_path = Path(args.output_folder) / "job_tracking.csv"
-
-    # Load the input dataframe and build the job dataframe
-    input_df = load_dataframe(args.input_df)
-
-    job_df = None
-    if not tracking_df_path.exists():
-        job_df = prepare_job_dataframe(
-            input_df, collection, max_locations_per_job, extract_value, backend
-        )
-
-    # Setup the extraction functions
-    pipeline_log.info("Setting up the extraction functions.")
-    datacube_fn, path_fn, post_job_fn = setup_extraction_functions(
-        collection, extract_value, args.memory, args.python_memory, args.max_executors
-    )
-
-    # Initialize and setups the job manager
-    pipeline_log.info("Initializing the job manager.")
-
-    job_manager = GFMAPJobManager(
-        output_dir=args.output_folder,
-        output_path_generator=path_fn,
-        post_job_action=post_job_fn,
-        poll_sleep=60,
-        n_threads=4,
-        restart_failed=args.restart_failed,
-        stac_enabled=False,
-    )
-
-    job_manager.add_backend(
-        Backend.CDSE.value,
-        cdse_connection,
+    run_extractions(
+        collection=args.collection,
+        output_folder=args.output_folder,
+        input_df=args.input_df,
+        max_locations_per_job=args.max_locations,
+        memory=args.memory,
+        python_memory=args.python_memory,
+        max_executors=args.max_executors,
         parallel_jobs=args.parallel_jobs,
-    )
-
-    manager_main_loop(job_manager, collection, job_df, datacube_fn, tracking_df_path)
-
-    pipeline_log.info("Extraction completed successfully.")
-    send_notification(
-        title=f"WorldCereal Extraction {collection.value} - Completed",
-        message="Extractions have been completed successfully.",
+        restart_failed=args.restart_failed,
+        extract_value=args.extract_value,
+        backend=Backend.CDSE,
     )

From f079dd605fe622cbd5c95bdd94ad217253902424 Mon Sep 17 00:00:00 2001
From: Kristof Van Tricht
Date: Wed, 18 Dec 2024 15:34:40 +0100
Subject: [PATCH 06/22] Update output path generator for points

---
 .../extract_point_worldcereal.py | 50 +++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)

diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py
index ad07ec1a..c764275e 100644
--- a/scripts/extractions/point_extractions/extract_point_worldcereal.py
+++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py
@@ -16,32 +16,52 @@ def generate_output_path_point_worldcereal(
-    root_folder: Path, geometry_index: int, row: pd.Series
-):
-    """
-    For point extractions, only one asset (a geoparquet file) is generated per job.
-    Therefore geometry_index is always 0.
-    It has to be included in the function signature to be compatible with the GFMapJobManager.
+    root_folder: Path,
+    geometry_index: int,
+    row: pd.Series,
+    asset_id: Optional[str] = None,
+) -> Path:
+    """Method to generate the output path for the point extractions.
+
+    Parameters
+    ----------
+    root_folder : Path
+        root folder where the output parquet file will be saved
+    geometry_index : int
+        For point extractions, only one asset (a geoparquet file) is generated per job.
+        Therefore geometry_index is always 0.
It has to be included in the function signature + to be compatible with the GFMapJobManager + row : pd.Series + the current job row from the GFMapJobManager + asset_id : str, optional + Needed for compatibility with GFMapJobManager but not used. + + Returns + ------- + Path + output path for the point extractions parquet file """ - features = geojson.loads(row.geometry) - ref_id = features[geometry_index].properties["ref_id"] s2_tile_id = row.s2_tile + utm_zone = str(s2_tile_id[0:2]) - subfolder = root_folder / ref_id / s2_tile_id - + # Create the subfolder to store the output + subfolder = root_folder / utm_zone / s2_tile_id subfolder.mkdir(parents=True, exist_ok=True) - # Subfolder is not necessarily unique, so we create numbered folders. + # Subfolder is not necessarily unique, so we create subfolders for + # the different parts. if not any(subfolder.iterdir()): - real_subfolder = subfolder / "0" + real_subfolder = subfolder / "part_0" else: i = 0 - while (subfolder / str(i)).exists(): + while (subfolder / f"part_{i}").exists(): i += 1 - real_subfolder = subfolder / str(i) + real_subfolder = subfolder / f"part_{i}" + + output_file = f"WORLDCEREAL_{root_folder.name}_{row.start_date}_{row.end_date}_{s2_tile_id}_part_{i}{row.out_extension}" - return real_subfolder / f"point_extractions{row.out_extension}" + return real_subfolder / output_file def create_job_dataframe_point_worldcereal( From 8abf05f1686d810b467f8c90a08e23f1ae18b169 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:36:00 +0100 Subject: [PATCH 07/22] Preprocessed worldcereal should have 1st day till last day of months --- .../point_extractions/extract_point_worldcereal.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index c764275e..4ad9a606 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -72,10 +72,15 @@ def create_job_dataframe_point_worldcereal( for job in tqdm(split_jobs): min_time = job.valid_time.min() max_time = job.valid_time.max() + # 9 months before and after the valid time start_date = (min_time - pd.Timedelta(days=275)).to_pydatetime() end_date = (max_time + pd.Timedelta(days=275)).to_pydatetime() + # ensure start date is 1st day of month, end date is last day of month + start_date = start_date.replace(day=1) + end_date = end_date.replace(day=1) + pd.offsets.MonthEnd(0) + s2_tile = job.tile.iloc[0] h3_l3_cell = job.h3_l3_cell.iloc[0] From 4bc749cf1b2f634ea9d66bcc893315a7e337b69e Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:37:39 +0100 Subject: [PATCH 08/22] Add required attributes to dataframe --- .../extract_point_worldcereal.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 4ad9a606..007c7439 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -72,7 +72,7 @@ def create_job_dataframe_point_worldcereal( for job in tqdm(split_jobs): min_time = job.valid_time.min() max_time = job.valid_time.max() - + # 9 months before and after the valid time start_date = (min_time - pd.Timedelta(days=275)).to_pydatetime() end_date = (max_time + 
pd.Timedelta(days=275)).to_pydatetime() @@ -175,7 +175,24 @@ def post_job_action_point_worldcereal( gdf = gpd.read_parquet(item_asset_path) # Convert the dates to datetime format - gdf["date"] = pd.to_datetime(gdf["date"]) + gdf["timestamp"] = pd.to_datetime(gdf["date"]) + gdf.drop(columns=["date"], inplace=True) + + # Derive latitude and longitude from the geometry + gdf["lat"] = gdf.geometry.y + gdf["lon"] = gdf.geometry.x + + # For each sample, add start and end date to the dataframe + # is there a better way to do this, as this is already done in the job creation? + sample_ids = gdf["sample_id"].unique() + for sample_id in sample_ids: + sample = gdf[gdf["sample_id"] == sample_id] + start_date = sample["timestamp"].min() + end_date = sample["timestamp"].max() + gdf.loc[gdf["sample_id"] == sample_id, "start_date"] = pd.to_datetime( + start_date + ) + gdf.loc[gdf["sample_id"] == sample_id, "end_date"] = end_date # Convert band dtype to uint16 (temporary fix) # TODO: remove this step when the issue is fixed on the OpenEO backend From e1da04946ed309a1b922be7415d68b10426f3fb1 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:42:09 +0100 Subject: [PATCH 09/22] Updated output folder structure for patches --- src/worldcereal/openeo/extract.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/worldcereal/openeo/extract.py b/src/worldcereal/openeo/extract.py index 28b22ae2..7d34f64b 100644 --- a/src/worldcereal/openeo/extract.py +++ b/src/worldcereal/openeo/extract.py @@ -181,10 +181,11 @@ def generate_output_path_patch( orbit_state = "" s2_tile_id = row.s2_tile - h3_l3_cell = row.h3_l3_cell + utm_zone = str(s2_tile_id[0:2]) epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg - subfolder = root_folder / ref_id / h3_l3_cell / sample_id + subfolder = root_folder / ref_id / utm_zone / s2_tile_id / sample_id + return ( subfolder / f"{row.out_prefix}{orbit_state}_{sample_id}_{epsg}_{row.start_date}_{row.end_date}{row.out_extension}" From 2688409b613e432544d8a5277c583aaeaa3039e1 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 15:50:55 +0100 Subject: [PATCH 10/22] h3_l3_cell is not needed as attribute --- .../extractions/point_extractions/extract_point_worldcereal.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 007c7439..7005d423 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -82,7 +82,6 @@ def create_job_dataframe_point_worldcereal( end_date = end_date.replace(day=1) + pd.offsets.MonthEnd(0) s2_tile = job.tile.iloc[0] - h3_l3_cell = job.h3_l3_cell.iloc[0] # Convert dates to string format start_date, end_date = start_date.strftime("%Y-%m-%d"), end_date.strftime( @@ -99,7 +98,6 @@ def create_job_dataframe_point_worldcereal( "start_date": start_date, "end_date": end_date, "s2_tile": s2_tile, - "h3_l3_cell": h3_l3_cell, "geometry": job.to_json(), } From 3a905ef80899de96fe99540b9aade2148c98aba8 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 16:23:25 +0100 Subject: [PATCH 11/22] Bugfix part numbering --- .../point_extractions/extract_point_worldcereal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py 
b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 7005d423..621029f8 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -51,10 +51,10 @@ def generate_output_path_point_worldcereal( # Subfolder is not necessarily unique, so we create subfolders for # the different parts. + i = 0 if not any(subfolder.iterdir()): - real_subfolder = subfolder / "part_0" + real_subfolder = subfolder / f"part_{i}" else: - i = 0 while (subfolder / f"part_{i}").exists(): i += 1 real_subfolder = subfolder / f"part_{i}" From c11646180207ac139c855864c75d20513303d972 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 16:42:09 +0100 Subject: [PATCH 12/22] Allow to filter on S2 tile --- .../point_extractions/extract_point_worldcereal.py | 8 ++++++++ src/worldcereal/openeo/preprocessing.py | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 621029f8..e8f43010 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -129,6 +129,13 @@ def create_job_point_worldcereal( backend = Backend(row.backend_name) backend_context = BackendContext(backend) + # Try to get s2 tile ID to filter the collection + if "s2_tile" in row: + pipeline_log.debug(f"Extracting data for S2 tile {row.s2_tile}") + s2_tile = row.s2_tile + else: + s2_tile = None + inputs = worldcereal_preprocessed_inputs( connection=connection, backend_context=backend_context, @@ -136,6 +143,7 @@ def create_job_point_worldcereal( temporal_extent=temporal_extent, fetch_type=FetchType.POINT, validate_temporal_context=False, + s2_tile=s2_tile, ) # Finally, create a vector cube based on the Point geometries diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py index f4300da9..b22dea96 100644 --- a/src/worldcereal/openeo/preprocessing.py +++ b/src/worldcereal/openeo/preprocessing.py @@ -340,6 +340,7 @@ def worldcereal_preprocessed_inputs( validate_temporal_context: bool = True, s1_orbit_state: Optional[str] = None, tile_size: Optional[int] = None, + s2_tile: Optional[str] = None, ) -> DataCube: # First validate the temporal context @@ -364,7 +365,7 @@ def worldcereal_preprocessed_inputs( "S2-L2A-B12", ], fetch_type=fetch_type, - filter_tile=None, + filter_tile=s2_tile, distance_to_cloud_flag=False if fetch_type == FetchType.POINT else True, additional_masks_flag=False, apply_mask_flag=True, From 833e96e02064245cfd9726f549341b3f36439046 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 18:33:41 +0100 Subject: [PATCH 13/22] slope can now also be loaded for points --- src/worldcereal/openeo/preprocessing.py | 3 +- .../testresources/preprocess_graph.json | 118 +++++++++++++++--- 2 files changed, 105 insertions(+), 16 deletions(-) diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py index b22dea96..1b68f91b 100644 --- a/src/worldcereal/openeo/preprocessing.py +++ b/src/worldcereal/openeo/preprocessing.py @@ -253,9 +253,8 @@ def raw_datacube_DEM( cube = extractor.get_cube(connection, spatial_extent, None) cube = cube.rename_labels(dimension="bands", target=["elevation"]) - if backend_context.backend == Backend.CDSE and fetch_type == FetchType.TILE: + if 
backend_context.backend == Backend.CDSE: # On CDSE we can load the slope from a global slope collection - # but this currently only works for tile fetching. if isinstance(spatial_extent, BoundingBoxExtent): spatial_extent = dict(spatial_extent) diff --git a/tests/worldcerealtests/testresources/preprocess_graph.json b/tests/worldcerealtests/testresources/preprocess_graph.json index 22f739e0..3676600a 100644 --- a/tests/worldcerealtests/testresources/preprocess_graph.json +++ b/tests/worldcerealtests/testresources/preprocess_graph.json @@ -705,6 +705,85 @@ } } }, + "loadstac1": { + "process_id": "load_stac", + "arguments": { + "bands": [ + "Slope" + ], + "spatial_extent": { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + 44.433631, + 51.317362 + ], + [ + 44.432274, + 51.427238 + ], + [ + 44.69808, + 51.428224 + ], + [ + 44.698802, + 51.318344 + ], + [ + 44.433631, + 51.317362 + ] + ] + ] + }, + "properties": {} + } + ] + }, + "url": "https://stac.openeo.vito.be/collections/COPERNICUS30_DEM_SLOPE" + } + }, + "renamelabels4": { + "process_id": "rename_labels", + "arguments": { + "data": { + "from_node": "loadstac1" + }, + "dimension": "bands", + "target": [ + "slope" + ] + } + }, + "reducedimension1": { + "process_id": "reduce_dimension", + "arguments": { + "data": { + "from_node": "renamelabels4" + }, + "dimension": "t", + "reducer": { + "process_graph": { + "min1": { + "process_id": "min", + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "result": true + } + } + } + } + }, "loadcollection4": { "process_id": "load_collection", "arguments": { @@ -760,7 +839,7 @@ } } }, - "reducedimension1": { + "reducedimension2": { "process_id": "reduce_dimension", "arguments": { "data": { @@ -769,7 +848,7 @@ "dimension": "t", "reducer": { "process_graph": { - "min1": { + "min2": { "process_id": "min", "arguments": { "data": { @@ -782,11 +861,11 @@ } } }, - "renamelabels4": { + "renamelabels5": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "reducedimension1" + "from_node": "reducedimension2" }, "dimension": "bands", "source": [ @@ -797,11 +876,11 @@ ] } }, - "renamelabels5": { + "renamelabels6": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "renamelabels4" + "from_node": "renamelabels5" }, "dimension": "bands", "target": [ @@ -809,11 +888,22 @@ ] } }, + "mergecubes2": { + "process_id": "merge_cubes", + "arguments": { + "cube1": { + "from_node": "reducedimension1" + }, + "cube2": { + "from_node": "renamelabels6" + } + } + }, "resamplecubespatial1": { "process_id": "resample_cube_spatial", "arguments": { "data": { - "from_node": "renamelabels5" + "from_node": "mergecubes2" }, "method": "bilinear", "target": { @@ -846,7 +936,7 @@ } } }, - "mergecubes2": { + "mergecubes3": { "process_id": "merge_cubes", "arguments": { "cube1": { @@ -857,7 +947,7 @@ } } }, - "loadstac1": { + "loadstac2": { "process_id": "load_stac", "arguments": { "bands": [ @@ -907,11 +997,11 @@ "url": "https://s3.waw3-1.cloudferro.com/swift/v1/agera/stac/collection.json" } }, - "renamelabels6": { + "renamelabels7": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "loadstac1" + "from_node": "loadstac2" }, "dimension": "bands", "target": [ @@ -924,7 +1014,7 @@ "process_id": "resample_cube_spatial", "arguments": { "data": { - "from_node": "renamelabels6" + "from_node": "renamelabels7" }, "method": "bilinear", "target": { @@ -932,11 +1022,11 @@ } } }, - 
"mergecubes3": { + "mergecubes4": { "process_id": "merge_cubes", "arguments": { "cube1": { - "from_node": "mergecubes2" + "from_node": "mergecubes3" }, "cube2": { "from_node": "resamplecubespatial2" From d64fe03e0e201afbea8d7fae1c87e558c5e21fa1 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 18 Dec 2024 19:37:11 +0100 Subject: [PATCH 14/22] default parallel jobs set to 2 --- scripts/extractions/extract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 427aa088..4f4e2891 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -342,7 +342,7 @@ def run_extractions( memory: str = "1800m", python_memory: str = "1900m", max_executors: int = 22, - parallel_jobs: int = 10, + parallel_jobs: int = 2, restart_failed: bool = False, extract_value: int = 1, backend=Backend.CDSE, From 113c2c8cd34fb00d013e88554f5ea52526ce17e8 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Thu, 19 Dec 2024 22:02:09 +0100 Subject: [PATCH 15/22] Use job ID to label parquet files --- .../extract_point_worldcereal.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index e8f43010..56b454b1 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -49,19 +49,11 @@ def generate_output_path_point_worldcereal( subfolder = root_folder / utm_zone / s2_tile_id subfolder.mkdir(parents=True, exist_ok=True) - # Subfolder is not necessarily unique, so we create subfolders for - # the different parts. 
- i = 0 - if not any(subfolder.iterdir()): - real_subfolder = subfolder / f"part_{i}" - else: - while (subfolder / f"part_{i}").exists(): - i += 1 - real_subfolder = subfolder / f"part_{i}" - - output_file = f"WORLDCEREAL_{root_folder.name}_{row.start_date}_{row.end_date}_{s2_tile_id}_part_{i}{row.out_extension}" + # we may have multiple output files per s2_tile_id and need + # a unique name so we use the job ID + output_file = f"WORLDCEREAL_{root_folder.name}_{row.start_date}_{row.end_date}_{s2_tile_id}_{row.id}{row.out_extension}" - return real_subfolder / output_file + return subfolder / output_file def create_job_dataframe_point_worldcereal( From dbe8ffa5f4474e474dac0b724dcfe8f08b880e79 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Mon, 6 Jan 2025 13:04:49 +0100 Subject: [PATCH 16/22] First and last day of month as dates --- .../point_extractions/extract_point_worldcereal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 56b454b1..cd61830c 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -185,8 +185,8 @@ def post_job_action_point_worldcereal( sample_ids = gdf["sample_id"].unique() for sample_id in sample_ids: sample = gdf[gdf["sample_id"] == sample_id] - start_date = sample["timestamp"].min() - end_date = sample["timestamp"].max() + start_date = sample["timestamp"].min().replace(day=1) + end_date = sample["timestamp"].max().replace(day=1) + pd.offsets.MonthEnd(0) gdf.loc[gdf["sample_id"] == sample_id, "start_date"] = pd.to_datetime( start_date ) From 24808bd7c8f109619d8137a67b270863f41708bf Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Mon, 6 Jan 2025 13:07:09 +0100 Subject: [PATCH 17/22] Convert to datetimes first --- .../point_extractions/extract_point_worldcereal.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index cd61830c..7a30bafb 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -185,11 +185,11 @@ def post_job_action_point_worldcereal( sample_ids = gdf["sample_id"].unique() for sample_id in sample_ids: sample = gdf[gdf["sample_id"] == sample_id] - start_date = sample["timestamp"].min().replace(day=1) - end_date = sample["timestamp"].max().replace(day=1) + pd.offsets.MonthEnd(0) - gdf.loc[gdf["sample_id"] == sample_id, "start_date"] = pd.to_datetime( - start_date - ) + start_date = pd.to_datetime(sample["timestamp"].min()).replace(day=1) + end_date = pd.to_datetime(sample["timestamp"].max()).replace( + day=1 + ) + pd.offsets.MonthEnd(0) + gdf.loc[gdf["sample_id"] == sample_id, "start_date"] = start_date gdf.loc[gdf["sample_id"] == sample_id, "end_date"] = end_date # Convert band dtype to uint16 (temporary fix) From a5622ff036a70c886a9ef5d99e2575c7ed1df666 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Tue, 7 Jan 2025 16:22:30 +0100 Subject: [PATCH 18/22] Refactoring of default job settings parsing --- scripts/extractions/extract.py | 36 +++++++++---------- .../patch_extractions/extract_patch_meteo.py | 10 +++--- .../patch_extractions/extract_patch_s1.py | 6 ++-- .../patch_extractions/extract_patch_s2.py | 10 +++--- 
.../extract_patch_worldcereal.py | 6 ++-- .../extract_point_worldcereal.py | 6 ++-- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 4f4e2891..8d2859d0 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -144,9 +144,9 @@ def prepare_job_dataframe( def setup_extraction_functions( collection: ExtractionCollection, extract_value: int, - memory: str, - python_memory: str, - max_executors: int, + memory: typing.Union[str, None], + python_memory: typing.Union[str, None], + max_executors: typing.Union[int, None], ) -> tuple[typing.Callable, typing.Callable, typing.Callable]: """Setup the datacube creation, path generation and post-job action functions for the given collection. Returns a tuple of three functions: @@ -158,33 +158,33 @@ def setup_extraction_functions( datacube_creation = { ExtractionCollection.PATCH_SENTINEL1: partial( create_job_patch_s1, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1900m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_SENTINEL2: partial( create_job_patch_s2, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1900m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_METEO: partial( create_job_patch_meteo, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1000m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_WORLDCEREAL: partial( create_job_patch_worldcereal, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "3000m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.POINT_WORLDCEREAL: partial( create_job_point_worldcereal, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "3000m", + max_executors=max_executors if max_executors is not None else 22, ), } diff --git a/scripts/extractions/patch_extractions/extract_patch_meteo.py b/scripts/extractions/patch_extractions/extract_patch_meteo.py index ae8662c6..285fa57b 100644 --- a/scripts/extractions/patch_extractions/extract_patch_meteo.py +++ b/scripts/extractions/patch_extractions/extract_patch_meteo.py @@ -24,11 +24,11 @@ def create_job_dataframe_patch_meteo( def create_job_patch_meteo( row: pd.Series, connection: openeo.DataCube, - provider=None, - connection_provider=None, - executor_memory: str = "2G", - python_memory: str = "1G", - max_executors: int = 22, + provider, + connection_provider, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> gpd.GeoDataFrame: start_date = row.start_date end_date = row.end_date diff --git a/scripts/extractions/patch_extractions/extract_patch_s1.py 
b/scripts/extractions/patch_extractions/extract_patch_s1.py index b23ea5a8..69b58a97 100644 --- a/scripts/extractions/patch_extractions/extract_patch_s1.py +++ b/scripts/extractions/patch_extractions/extract_patch_s1.py @@ -122,9 +122,9 @@ def create_job_patch_s1( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> openeo.BatchJob: """Creates an OpenEO BatchJob from the given row information. This job is a S1 patch of 32x32 pixels at 20m spatial resolution.""" diff --git a/scripts/extractions/patch_extractions/extract_patch_s2.py b/scripts/extractions/patch_extractions/extract_patch_s2.py index 7af3f1f2..351737bd 100644 --- a/scripts/extractions/patch_extractions/extract_patch_s2.py +++ b/scripts/extractions/patch_extractions/extract_patch_s2.py @@ -70,11 +70,11 @@ def create_job_dataframe_patch_s2( def create_job_patch_s2( row: pd.Series, connection: openeo.DataCube, - provider=None, - connection_provider=None, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + provider, + connection_provider, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> gpd.GeoDataFrame: start_date = row.start_date end_date = row.end_date diff --git a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py index fe2b9292..da409bf5 100644 --- a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py +++ b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py @@ -124,9 +124,9 @@ def create_job_patch_worldcereal( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> openeo.BatchJob: """Creates an OpenEO BatchJob from the given row information.""" diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index 7a30bafb..df4eef36 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -103,9 +103,9 @@ def create_job_point_worldcereal( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ): """Creates an OpenEO BatchJob from the given row information.""" From bd1acdad1903c2bb4544729d3a3338119c9733b3 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 8 Jan 2025 11:38:08 +0100 Subject: [PATCH 19/22] Use openeo-gfmap==0.3.0 --- environment.yml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index c92922f4..276db692 100644 --- a/environment.yml +++ b/environment.yml @@ -35,7 +35,7 @@ dependencies: - pip: - duckdb==1.1.0 - h3==3.7.7 - - openeo-gfmap==0.2.0 + - openeo-gfmap==0.3.0 - git+https://github.com/worldcereal/worldcereal-classification - git+https://github.com/WorldCereal/presto-worldcereal.git@croptype diff --git a/pyproject.toml b/pyproject.toml index afd4d1bf..accc5055 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ dependencies = [ "netcdf4<=1.6.4", "numpy<2.0.0", 
"openeo==0.31.0", - "openeo-gfmap==0.2.0", + "openeo-gfmap==0.3.0", "pyarrow", "pydantic==2.8.0", "rioxarray>=0.13.0", From 105e3d775ef0878a92464eaf89326b10cc404ce2 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 8 Jan 2025 11:44:44 +0100 Subject: [PATCH 20/22] Need to bump to h3==4.1.0 as well --- environment.yml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 276db692..f7b646a6 100644 --- a/environment.yml +++ b/environment.yml @@ -34,7 +34,7 @@ dependencies: - tqdm - pip: - duckdb==1.1.0 - - h3==3.7.7 + - h3==4.1.0 - openeo-gfmap==0.3.0 - git+https://github.com/worldcereal/worldcereal-classification - git+https://github.com/WorldCereal/presto-worldcereal.git@croptype diff --git a/pyproject.toml b/pyproject.toml index accc5055..d7c0aabf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "cftime", "geojson", "geopandas", - "h3==3.7.7", + "h3==4.1.0", "h5netcdf>=1.1.0", "loguru>=0.7.2", "netcdf4<=1.6.4", From b99559c8dda578f426cfc023207dc0f1448ff7dc Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Wed, 8 Jan 2025 12:09:44 +0100 Subject: [PATCH 21/22] Updated reference graphs --- .../testresources/preprocess_graph.json | 64 +------------------ .../preprocess_graphwithslope.json | 64 +------------------ 2 files changed, 6 insertions(+), 122 deletions(-) diff --git a/tests/worldcerealtests/testresources/preprocess_graph.json b/tests/worldcerealtests/testresources/preprocess_graph.json index 3676600a..c227e237 100644 --- a/tests/worldcerealtests/testresources/preprocess_graph.json +++ b/tests/worldcerealtests/testresources/preprocess_graph.json @@ -540,35 +540,6 @@ } }, "arrayelement2": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 0 - } - }, - "isnodata1": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement2" - } - } - }, - "if1": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power1" - }, - "value": { - "from_node": "isnodata1" - } - } - }, - "arrayelement3": { "process_id": "array_element", "arguments": { "data": { @@ -582,7 +553,7 @@ "arguments": { "base": 10, "x": { - "from_node": "arrayelement3" + "from_node": "arrayelement2" } } }, @@ -622,44 +593,15 @@ } } }, - "arrayelement4": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 1 - } - }, - "isnodata2": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement4" - } - } - }, - "if2": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power2" - }, - "value": { - "from_node": "isnodata2" - } - } - }, "arraycreate1": { "process_id": "array_create", "arguments": { "data": [ { - "from_node": "if1" + "from_node": "power1" }, { - "from_node": "if2" + "from_node": "power2" } ] }, diff --git a/tests/worldcerealtests/testresources/preprocess_graphwithslope.json b/tests/worldcerealtests/testresources/preprocess_graphwithslope.json index a96e0077..a167a7de 100644 --- a/tests/worldcerealtests/testresources/preprocess_graphwithslope.json +++ b/tests/worldcerealtests/testresources/preprocess_graphwithslope.json @@ -429,35 +429,6 @@ } }, "arrayelement2": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 0 - } - }, - "isnodata1": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": 
"arrayelement2" - } - } - }, - "if1": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power1" - }, - "value": { - "from_node": "isnodata1" - } - } - }, - "arrayelement3": { "process_id": "array_element", "arguments": { "data": { @@ -471,7 +442,7 @@ "arguments": { "base": 10, "x": { - "from_node": "arrayelement3" + "from_node": "arrayelement2" } } }, @@ -511,44 +482,15 @@ } } }, - "arrayelement4": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 1 - } - }, - "isnodata2": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement4" - } - } - }, - "if2": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power2" - }, - "value": { - "from_node": "isnodata2" - } - } - }, "arraycreate1": { "process_id": "array_create", "arguments": { "data": [ { - "from_node": "if1" + "from_node": "power1" }, { - "from_node": "if2" + "from_node": "power2" } ] }, From f2b22cc7a4a26dc6d53d234d1cd150c788c794a5 Mon Sep 17 00:00:00 2001 From: Kristof Van Tricht Date: Thu, 9 Jan 2025 11:35:30 +0100 Subject: [PATCH 22/22] Set required attributes in the job --- .../extract_point_worldcereal.py | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index df4eef36..ab800871 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -83,6 +83,12 @@ def create_job_dataframe_point_worldcereal( # Set back the valid_time in the geometry as string job["valid_time"] = job.valid_time.dt.strftime("%Y-%m-%d") + # Add other attributes we want to keep in the result + job["start_date"] = start_date + job["end_date"] = end_date + job["lat"] = job.geometry.y + job["lon"] = job.geometry.x + variables = { "backend_name": backend.value, "out_prefix": "point-extraction", @@ -176,22 +182,6 @@ def post_job_action_point_worldcereal( gdf["timestamp"] = pd.to_datetime(gdf["date"]) gdf.drop(columns=["date"], inplace=True) - # Derive latitude and longitude from the geometry - gdf["lat"] = gdf.geometry.y - gdf["lon"] = gdf.geometry.x - - # For each sample, add start and end date to the dataframe - # is there a better way to do this, as this is already done in the job creation? - sample_ids = gdf["sample_id"].unique() - for sample_id in sample_ids: - sample = gdf[gdf["sample_id"] == sample_id] - start_date = pd.to_datetime(sample["timestamp"].min()).replace(day=1) - end_date = pd.to_datetime(sample["timestamp"].max()).replace( - day=1 - ) + pd.offsets.MonthEnd(0) - gdf.loc[gdf["sample_id"] == sample_id, "start_date"] = start_date - gdf.loc[gdf["sample_id"] == sample_id, "end_date"] = end_date - # Convert band dtype to uint16 (temporary fix) # TODO: remove this step when the issue is fixed on the OpenEO backend bands = [