From 097b1fd6f6aebe46416aa7c1e761f1d0797509d3 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 13 Mar 2024 15:40:38 +0100 Subject: [PATCH 01/11] Added sar and optical pipelines --- scripts/extractions/extract_optical.py | 199 +++++++++++ scripts/extractions/extract_sar.py | 460 +++++++++++++++++++++++++ 2 files changed, 659 insertions(+) create mode 100644 scripts/extractions/extract_optical.py create mode 100644 scripts/extractions/extract_sar.py diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py new file mode 100644 index 00000000..b007f4c7 --- /dev/null +++ b/scripts/extractions/extract_optical.py @@ -0,0 +1,199 @@ +import logging +from typing import List, Optional + +import geojson +import geopandas as gpd +import openeo +import pandas as pd +from extract_sar import ( + _buffer_geometry, + _filter_extract_true, + _upload_geoparquet_artifactory, +) +from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext +from openeo_gfmap.fetching.s2 import build_sentinel2_l2a_extractor +from openeo_gfmap.manager import _log + +# Logger for this current pipeline +_pipeline_log: Optional[logging.Logger] = None + + +def _setup_logger(level=logging.INFO) -> None: + global _pipeline_log + """Setup the logger from the openeo_gfmap package to the assigned level.""" + _pipeline_log = logging.getLogger("pipeline_sar") + + _pipeline_log.setLevel(level) + _log.setLevel(level) + + stream_handler = logging.StreamHandler() + _log.addHandler(stream_handler) + _pipeline_log.addHandler(stream_handler) + + formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s") + stream_handler.setFormatter(formatter) + + # Exclude the other loggers from other libraries + class ManagerLoggerFilter(logging.Filter): + """Filter to only accept the OpenEO-GFMAP manager logs.""" + + def filter(self, record): + return record.name in [_log.name, _pipeline_log.name] + + stream_handler.addFilter(ManagerLoggerFilter()) + + +def get_job_nb_polygons(row: pd.Series) -> int: + """Get the number of polygons in the geometry.""" + return len( + list( + filter( + lambda feat: feat.properties.get("extract"), + geojson.loads(row.geometry)["features"], + ) + ) + ) + + +def create_job_dataframe_s2( + backend: Backend, split_jobs: List[gpd.GeoDataFrame] +) -> pd.DataFrame: + """Create a dataframe from the split jobs, containg all the necessary information to run the job.""" + columns = [ + "backend_name", + "out_prefix", + "out_extension", + "start_date", + "end_date", + "s2_tile", + "h3index", + "geometry", + ] + rows = [] + for job in split_jobs: + # Compute the average in the valid date and make a buffer of 1.5 year around + median_time = pd.to_datetime(job.valid_date).mean() + start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months + end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months + s2_tile = job.tile.iloc[0] # Job dataframes are split depending on the + h3index = job.h3index.iloc[0] + + rows.append( + pd.Series( + dict( + zip( + columns, + [ + backend.value, + "S2-L2A-10m", + ".nc", + start_date.strftime("%Y-%m-%d"), + end_date.strftime("%Y-%m-%d"), + s2_tile, + h3index, + job.to_json(), + ], + ) + ) + ) + ) + + return pd.DataFrame(rows) + + +def create_datacube_optical( + row: pd.Series, + connection: openeo.DataCube, + provider=None, + connection_provider=None, + executor_memory: str = "5G", + executor_memory_overhead: str = "2G", +) -> gpd.GeoDataFrame: + start_date = row.start_date + end_date = row.end_date 
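+    # start_date/end_date arrive from the job dataframe as "%Y-%m-%d" strings
+    # (roughly nine months on either side of the mean valid_date, see
+    # create_job_dataframe_s2 above); TemporalContext simply bundles them into
+    # the temporal extent that the GFMAP fetcher expects.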
+ temporal_context = TemporalContext(start_date, end_date) + + # Get the feature collection containing the geometry to the job + geometry = geojson.loads(row.geometry) + assert isinstance(geometry, geojson.FeatureCollection) + + # Filter the geometry to the rows with the extract only flag + geometry = _filter_extract_true(geometry) + assert len(geometry.features) > 0, "No geometries with the extract flag found" + + # Performs a buffer of 64 px around the geometry + geometry_df = _buffer_geometry(geometry, 320) + spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name) + + # Backend name and fetching type + backend = Backend(row.backend_name) + backend_context = BackendContext(backend) + + fetch_type = FetchType.POLYGON + bands_to_download = [ + "S2-L2A-B01", + "S2-L2A-B02", + "S2-L2A-B03", + "S2-L2A-B04", + "S2-L2A-B05", + "S2-L2A-B06", + "S2-L2A-B07", + "S2-L2A-B08", + "S2-L2A-B8A", + "S2-L2A-B09", + "S2-L2A-B11", + "S2-L2A-B12", + "S2-L2A-SCL", + ] + + # Create the job to extract S2 + extraction_parameters = { + "target_resolution": 10, + "load_collection": { + "eo:cloud_cover": lambda val: val <= 95.0, + "tileId": lambda val: val == row.s2_tile, + }, + } + extractor = build_sentinel2_l2a_extractor( + backend_context, + bands=bands_to_download, + fetch_type=fetch_type.POLYGON, + **extraction_parameters, + ) + + cube = extractor.get_cube(connection, spatial_extent_url, temporal_context) + + # Compute the SCL dilation and add it to the cube + scl_dilated_mask = cube.process( + "to_scl_dilation_mask", + data=cube, + scl_band_name="S2-L2A-SCL", + kernel1_size=17, # 17px dilation on a 20m layer + kernel2_size=77, # 77px dilation on a 20m layer + mask1_values=[2, 4, 5, 6, 7], + mask2_values=[3, 8, 9, 10, 11], + erosion_kernel_size=3, + ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) + + cube = cube.merge_cubes(scl_dilated_mask) + cube = cube.linear_scale_range(0, 65534, 0, 65534) + + # Get the h3index to use in the tile + h3index = geometry.features[0].properties["h3index"] + valid_date = geometry.features[0].properties["valid_date"] + + # Increase the memory of the jobs depending on the number of polygons to extract + number_polygons = get_job_nb_polygons(row) + _log.debug(f"Number of polygons to extract: {number_polygons}") + + job_options = { + "executor-memory": "5G", + "executor-memoryOverhead": "2G", + } + + return cube.create_job( + out_format="NetCDF", + title=f"GFMAP_Extraction_S2_{h3index}_{valid_date}", + sample_by_feature=True, + job_options=job_options, + ) diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py new file mode 100644 index 00000000..2ce66cfe --- /dev/null +++ b/scripts/extractions/extract_sar.py @@ -0,0 +1,460 @@ +"""Extract S1 data using OpenEO-GFMAP package.""" +import argparse +import json +import logging +import os +from datetime import datetime +from functools import partial +from importlib.metadata import version +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import List, Optional + +import geojson +import geopandas as gpd +import openeo +import pandas as pd +import pystac +import requests +import xarray as xr +from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext +from openeo_gfmap.backend import cdse_connection +from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor +from openeo_gfmap.manager import _log +from openeo_gfmap.manager.job_manager import GFMAPJobManager +from openeo_gfmap.manager.job_splitters import split_job_hex +from 
openeo_gfmap.stac import AUXILIARY +from pyproj import CRS, Transformer +from rasterio.features import rasterize +from rasterio.transform import from_bounds +from shapely.geometry import Point, box + +# Logger for this current pipeline +_pipeline_log: Optional[logging.Logger] = None + + +def _setup_logger(level=logging.INFO) -> None: + global _pipeline_log + """Setup the logger from the openeo_gfmap package to the assigned level.""" + _pipeline_log = logging.getLogger("pipeline_sar") + + _pipeline_log.setLevel(level) + _log.setLevel(level) + + stream_handler = logging.StreamHandler() + _log.addHandler(stream_handler) + _pipeline_log.addHandler(stream_handler) + + formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s") + stream_handler.setFormatter(formatter) + + # Exclude the other loggers from other libraries + class ManagerLoggerFilter(logging.Filter): + """Filter to only accept the OpenEO-GFMAP manager logs.""" + + def filter(self, record): + return record.name in [_log.name, _pipeline_log.name] + + stream_handler.addFilter(ManagerLoggerFilter()) + + +def _buffer_geometry(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame: + """For each geometry of the colleciton, perform a square buffer of 320 + meters on the centroid and return the GeoDataFrame. Before buffering, + the centroid is clipped to the closest 20m multiplier in order to stay + aligned with the Sentinel-1 pixel grid. + """ + gdf = gpd.GeoDataFrame.from_features(geometries).set_crs(epsg=4326) + utm = gdf.estimate_utm_crs() + gdf = gdf.to_crs(utm) + + # Perform the buffering operation + gdf["geometry"] = gdf.centroid.apply( + lambda point: Point(round(point.x / 20.0) * 20.0, round(point.y / 20.0) * 20.0) + ).buffer( + distance=320, cap_style=3 + ) # Square buffer + + return gdf + + +def _filter_extract_true(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame: + """Remove all the geometries from the Feature Collection that have the property field `extract` set to `False`""" + return geojson.FeatureCollection( + [f for f in geometries.features if f.properties.get("extract", False)] + ) + + +def _upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, name: str) -> str: + """Upload the given GeoDataFrame to artifactory and return the URL of the + uploaded file. Necessary as a workaround for Polygon sampling in OpenEO + using custom CRS. + """ + # Save the dataframe as geoparquet to upload it to artifactory + temporary_file = NamedTemporaryFile() + gdf.to_parquet(temporary_file.name) + + artifactory_username = os.getenv("ARTIFACTORY_USERNAME") + artifactory_password = os.getenv("ARTIFACTORY_PASSWORD") + + headers = {"Content-Type": "application/octet-stream"} + + upload_url = f"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap-temp/openeogfmap_dataframe_{name}.parquet" + + with open(temporary_file.name, "rb") as f: + response = requests.put( + upload_url, + headers=headers, + data=f, + auth=(artifactory_username, artifactory_password), + timeout=180, + ) + + assert ( + response.status_code == 201 + ), f"Error uploading the dataframe to artifactory: {response.text}" + + return upload_url + + +def generate_job_df( + backend: Backend, split_dataframes: List[gpd.GeoDataFrame] +) -> pd.DataFrame: + """Generates a dataframe for each job that will be executed by the GFMAPJobManager. + This step also includes additional information that will be used later in other steps. 
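+
+    Each row keeps the backend name, the output prefix and extension, a start
+    and end date (about nine months on either side of the mean valid_date of
+    the job's samples) and the job geometries serialized as GeoJSON.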
+    """
+    columns = [
+        "backend_name",
+        "out_prefix",
+        "out_extension",
+        "start_date",
+        "end_date",
+        "geometry",
+    ]
+    rows = []
+    for job in split_dataframes:
+        # Compute the average in the valid date and make a buffer of 1.5 year around
+        median_time = pd.to_datetime(job.valid_date).mean()
+        start_date = median_time - pd.Timedelta(days=275)  # A bit more than 9 months
+        end_date = median_time + pd.Timedelta(days=275)  # A bit more than 9 months
+
+        rows.append(
+            pd.Series(
+                dict(
+                    zip(
+                        columns,
+                        [
+                            backend.value,
+                            "S1-SIGMA0-20m",
+                            ".nc",
+                            start_date.strftime("%Y-%m-%d"),
+                            end_date.strftime("%Y-%m-%d"),
+                            job.to_json(),
+                        ],
+                    )
+                )
+            )
+        )
+    return pd.DataFrame(rows)
+
+
+def create_datacube_sar(
+    row: pd.Series,
+    connection: openeo.DataCube,
+    provider,
+    connection_provider,
+    executor_memory: str = "5G",
+    executor_memory_overhead: str = "2G",
+) -> openeo.BatchJob:
+    """Creates an OpenEO BatchJob from the given row information. This job is an
+    S1 patch of 32x32 pixels at 20m spatial resolution."""
+
+    # Load the temporal extent
+    start_date = row.start_date
+    end_date = row.end_date
+    temporal_context = TemporalContext(start_date, end_date)
+
+    # Get the feature collection containing the geometry to the job
+    geometry = geojson.loads(row.geometry)
+    assert isinstance(geometry, geojson.FeatureCollection)
+
+    # Filter the geometry to the rows with the extract only flag
+    geometry = _filter_extract_true(geometry)
+    assert len(geometry.features) > 0, "No geometries with the extract flag found"
+
+    # Performs a square buffer around the geometry (a 32x32 px patch at 20m)
+    geometry_df = _buffer_geometry(geometry)
+    spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name)
+
+    # Backend name and fetching type
+    backend = Backend(row.backend_name)
+    backend_context = BackendContext(backend)
+
+    # Create the job to extract S1
+    extraction_parameters = {
+        "target_resolution": 20,
+    }
+
+    # Initialize the fetching utilities in GFMAP to perform S1 extraction and
+    # backscatter computation.
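+    # The extractor built below wraps the S1 GRD collection for the selected
+    # backend; its get_cube(connection, spatial_context, temporal_context) call
+    # further down returns an openeo.DataCube restricted to the uploaded
+    # geometries and the computed date window.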
+ extractor = build_sentinel1_grd_extractor( + backend_context=backend_context, + bands=["S1-VV", "S1-VH"], + fetch_type=FetchType.POLYGON, + **extraction_parameters, + ) + + cube = extractor.get_cube( + connection=connection, + spatial_context=spatial_extent_url, + temporal_context=temporal_context, + ) + + # Additional values to generate the BatcJob name + h3index = geometry.features[0].properties["h3index"] + valid_date = geometry.features[0].properties["valid_date"] + + job_options = { + "executor-memory": executor_memory, + "executor-memoryOverhead": executor_memory_overhead, + } + return cube.create_job( + out_format="NetCDF", + title=f"GFMAP_Extraction_S1_{h3index}_{valid_date}", + sample_by_feature=True, + job_options=job_options, + ) + + +def generate_output_path( + root_folder: Path, + tmp_path: Path, + geometry_index: int, + row: pd.Series, + s2_grid: gpd.GeoDataFrame, +) -> Path: + """Generates a path where to save the output result, using the root folder""" + features = geojson.loads(row.geometry) + sample_id = features[geometry_index].properties["sample_id"] + ref_id = features[geometry_index].properties["ref_id"] + try: + # Loads the centroid of the tile in latlon coordinates + inds = xr.open_dataset(tmp_path, chunks="auto") + + source_crs = CRS.from_wkt(inds.crs.attrs["crs_wkt"]) + dst_crs = CRS.from_epsg(4326) + transformer = Transformer.from_crs(source_crs, dst_crs, always_xy=True) + + # Get the center point of the tile + centroid_utm = box( + inds.x.min().item(), + inds.y.min().item(), + inds.x.max().item(), + inds.y.max().item(), + ).centroid + centroid_latlon = Point(*transformer.transform(centroid_utm.x, centroid_utm.y)) + + # Intersecting with the s2 grid, take the tile that is the closest from this centroid + intersecting = s2_grid.geometry.intersects(centroid_latlon) + + # Select the intersecting cell that has a centroid the closest from the point + intersecting_cells = s2_grid[intersecting] + intersecting_cells["distance"] = intersecting_cells.distance(centroid_latlon) + intersecting_cells.sort_values("distance", inplace=True) + s2_tile = intersecting_cells.iloc[0] + + s2_tile_id = s2_tile.tile + + subfolder = ( + root_folder / ref_id / str(source_crs.to_epsg()) / s2_tile_id / sample_id + ) + except: # NOQA + subfolder = root_folder / "unsortable" + + return ( + subfolder + / f"{row.out_prefix}_{sample_id}_{source_crs.to_epsg()}_{row.start_date}_{row.end_date}{row.out_extension}" + ) + + +def add_item_asset(related_item: pystac.Item, path: Path): + asset = AUXILIARY.create_asset(href=path.as_posix()) + related_item.add_asset("auxiliary", asset) + + +def post_job_action( + job_items: List[pystac.Item], row: pd.Series, parameters: dict = {} +) -> list: + base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs( + epsg=4326 + ) + assert ( + len(base_gpd[base_gpd.extract == True]) == len(job_items), + "The number of result paths should be the same as the number of geometries", + ) + extracted_gpd = base_gpd[base_gpd.extract == True].reset_index(drop=True) + # In this case we want to burn the metadata in a new file in the same folder as the S2 product + for idx, item in enumerate(job_items): + sample_id = extracted_gpd.iloc[idx].sample_id + ref_id = extracted_gpd.iloc[idx].ref_id + confidence = extracted_gpd.iloc[idx].confidence + valid_date = extracted_gpd.iloc[idx].valid_date + + item_asset_path = Path(list(item.assets.values())[0].href) + # Read information from the item file (could also read it from the item object metadata) + result_ds = 
xr.open_dataset(item_asset_path, chunks="auto") + + # Add some metadata to the result_df netcdf file + result_ds.attrs.update( + { + "start_date": row.start_date, + "end_date": row.end_date, + "valid_date": valid_date, + "GFMAP_version": version("openeo_gfmap"), + "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", + "title": f"Sentinel2 L2A - {sample_id}", + "sample_id": sample_id, + "spatial_resolution": "10m", + } + ) + result_ds.to_netcdf(item_asset_path, format="NETCDF4", engine="h5netcdf") + + target_crs = CRS.from_wkt(result_ds.crs.attrs["crs_wkt"]) + + # Get the surrounding polygons around our extracted center geometry to rastetize them + bounds = ( + result_ds.x.min().item(), + result_ds.y.min().item(), + result_ds.x.max().item(), + result_ds.y.max().item(), + ) + bbox = box(*bounds) + surround_gpd = base_gpd.to_crs(target_crs).clip(bbox) + + # Burn the polygon croptypes + transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size) + croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label)) + + fill_value = 0 + croptype = rasterize( + croptype_shapes, + out_shape=(result_ds.y.size, result_ds.x.size), + transform=transform, + all_touched=False, + fill=fill_value, + default_value=0, + dtype="int64", + ) + + # Create the attributes to add to the metadata + attributes = { + "ref_id": ref_id, + "sample_id": sample_id, + "confidence": str(confidence), + "valid_date": valid_date, + "_FillValue": fill_value, + "Conventions": "CF-1.9", + "GFMAP_version": version("openeo_gfmap"), + "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "description": f"Contains rasterized WorldCereal labels for sample: {sample_id}.", + "title": f"WORLDCEREAL Auxiliary file for sample: {sample_id}", + "spatial_resolution": "10m", + } + + aux_dataset = xr.Dataset( + {"LABEL": (("y", "x"), croptype), "crs": result_ds["crs"]}, + coords={"y": result_ds.y, "x": result_ds.x}, + attrs=attributes, + ) + # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer. + aux_dataset["LABEL"].attrs["grid_mapping"] = "crs" + + # Save the metadata in the same folder as the S2 product + metadata_path = ( + item_asset_path.parent + / f"WORLDCEREAL_10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc" + ) + aux_dataset.to_netcdf(metadata_path, format="NETCDF4", engine="h5netcdf") + aux_dataset.close() + + # Adds this metadata as a new asset + add_item_asset(item, metadata_path) + + return job_items + + +if __name__ == "__main__": + _setup_logger() + + parser = argparse.ArgumentParser( + description="S1 data sampling with OpenEO-GFMAP package." + ) + parser.add_argument( + "output_path", type=Path, help="Path where to save the extraction results." + ) + parser.add_argument( + "input_df", type=Path, help="Path to the input dataframe for the training data." + ) + parser.add_argument( + "--max_locations", + type=int, + default=50, + help="Maximum number of locations to extract per job.", + ) + parser.add_argument( + "--memory", type=str, default="5G", help="Memory to allocate for the executor." + ) + parser.add_argument( + "--memory-overhead", + type=str, + default="2G", + help="Memory overhead to allocate for the executor.", + ) + + args = parser.parse_args() + + tracking_df_path = Path(args.output_path) / "job_tracking.csv" + + # Load the input dataframe, and perform dataset splitting using the h3 tile + # to respect the area of interest. 
Also filters out the jobs that have + # no location with the extract=True flag. + _pipeline_log.info(f"Loading input dataframe from {args.input_df}.") + input_df = gpd.read_file(args.input_df) + + split_dfs = split_job_hex(input_df, max_points=args.max_locations) + split_dfs = [df for df in split_dfs if df.extract.any()] + + job_df = generate_job_df(Backend.CDSE, split_dfs) + + _pipeline_log.warning( + f"Sub-sampling the job dataframe for testing. Remove this for production." + ) + job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True) + + # Setup the memory parameters for the job creator. + create_datacube_sar = partial( + create_datacube_sar, + executor_memory=args.memory, + executor_memory_overhead=args.memory_overhead, + ) + + # Setup the s2 grid for the output path generation function + generate_output_path = partial( + generate_output_path, + s2_grid=gpd.read_file("/data/users/Public/couchard/s2grid_bounds.geojson"), + ) + + manager = GFMAPJobManager( + output_dir=args.output_path, + output_path_generator=generate_output_path, + post_job_action=None, # No post-job action required for S1 + poll_sleep=60, + n_threads=2, + post_job_params={}, + ) + + manager.add_backend(Backend.CDSE.value, cdse_connection, parallel_jobs=6) + + _pipeline_log.info("Launching the jobs from the manager.") + manager.run_jobs(job_df, create_datacube_sar, tracking_df_path) From c3ee0c534e8d572ace3b103f7a20b79e7a98f2ff Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 20 Mar 2024 14:06:47 +0100 Subject: [PATCH 02/11] Added pipelines for Optical, SAR and started METEO --- scripts/extractions/extract_meteo.py | 163 ++++++++ scripts/extractions/extract_optical.py | 377 +++++++++++++------ scripts/extractions/extract_sar.py | 205 ++++------ scripts/extractions/udf_distance_to_cloud.py | 65 ++++ 4 files changed, 564 insertions(+), 246 deletions(-) create mode 100644 scripts/extractions/extract_meteo.py create mode 100644 scripts/extractions/udf_distance_to_cloud.py diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py new file mode 100644 index 00000000..bd4e84ff --- /dev/null +++ b/scripts/extractions/extract_meteo.py @@ -0,0 +1,163 @@ +"""Extract AGERA5 (Meteo) data using OpenEO-GFMAP package.""" +import argparse +from functools import partial +from pathlib import Path +from typing import List + +import geojson +import geopandas as gpd +import openeo +import pandas as pd +from extract_sar import ( + _buffer_geometry, + _filter_extract_true, + _pipeline_log, + _setup_logger, + _upload_geoparquet_artifactory, + create_job_dataframe, + generate_output_path, +) +from openeo_gfmap import Backend, TemporalContext +from openeo_gfmap.backend import vito_connection +from openeo_gfmap.manager.job_manager import GFMAPJobManager +from openeo_gfmap.manager.job_splitters import ( + _append_h3_index, + _load_s2_grid, + split_job_s2grid, +) + + +def create_datacube_meteo( + row: pd.Series, + connection: openeo.DataCube, + provider=None, + connection_provider=None, + executor_memory: str = "2G", + executor_memory_overhead: str = "1G", +) -> gpd.GeoDataFrame: + start_date = row.start_date + end_date = row.end_date + temporal_context = TemporalContext(start_date, end_date) + + # Get the feature collection containing the geometry to the job + geometry = geojson.loads(row.geometry) + assert isinstance(geometry, geojson.FeatureCollection) + + # Filter the geometry to the rows with the extract only flag + geometry = _filter_extract_true(geometry) + assert len(geometry.features) > 0, "No 
geometries with the extract flag found" + + # Performs a buffer of 64 px around the geometry + geometry_df = _buffer_geometry(geometry, distance_m=5) + spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name) + + bands_to_download = ["temperature-mean"] + + cube = connection.load_collection( + "AGERA5", + temporal_extent=[temporal_context.start_date, temporal_context.end_date], + bands=bands_to_download, + ) + filter_geometry = connection.load_url(spatial_extent_url, format="parquet") + cube = cube.filter_spatial(filter_geometry) + cube.rename_labels( + dimension="bands", + target=["AGERA5-temperature-mean"], + source=["temperature-mean"], + ) + + h3index = geometry.features[0].properties["h3index"] + valid_date = geometry.features[0].properties["valid_date"] + + job_options = { + "executor-memory": executor_memory, + "executor-memoryOverhead": executor_memory_overhead, + } + return cube.create_job( + out_format="NetCDF", + title=f"GFMAP_Extraction_AGERA5_{h3index}_{valid_date}", + sample_by_feature=True, + job_options=job_options, + ) + + +if __name__ == "__main__": + _setup_logger() + from extract_sar import _pipeline_log + + parser = argparse.ArgumentParser( + description="AGERA5 samples extraction with OpenEO-GFMAP package." + ) + parser.add_argument( + "output_path", type=Path, help="Path where to save the extraction results." + ) + parser.add_argument( + "input_df", + type=str, + help="Path or URL to the input dataframe for the training data.", + ) + parser.add_argument( + "--max_locations", + type=int, + default=5, + help="Maximum number of locations to extract per job.", + ) + parser.add_argument( + "--memory", type=str, default="5G", help="Memory to allocate for the executor." + ) + parser.add_argument( + "--memory-overhead", + type=str, + default="3G", + help="Memory overhead to allocate for the executor.", + ) + + args = parser.parse_args() + + tracking_df_path = Path(args.output_path) / "job_tracking.csv" + + # Load the input dataframe + _pipeline_log.info("Loading input dataframe from %s.", args.input_df) + + input_df = gpd.read_file(args.input_df) + input_df = _append_h3_index(input_df, grid_resolution=3) + + split_dfs = split_job_s2grid(input_df, max_points=args.max_locations) + split_dfs = [df for df in split_dfs if df.extract.any()] + + job_df = create_job_dataframe(Backend.TERRASCOPE, split_dfs, prefix="AGERA5") + + _pipeline_log.warning( + "Sub-sampling the job dataframe for testing. Remove this for production." + ) + # job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True) + job_df = job_df.iloc[[0]].reset_index(drop=True) + + # Setup the memory parameters for the job creator. 
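+    # functools.partial pre-binds the executor memory keyword arguments, so the
+    # job manager can later call the datacube function with just the per-job
+    # arguments, e.g. partial(f, executor_memory="2G")(row, conn) behaves like
+    # f(row, conn, executor_memory="2G").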
+ create_datacube_meteo = partial( + create_datacube_meteo, + executor_memory=args.memory, + executor_memory_overhead=args.memory_overhead, + ) + + # Setup the s2 grid for the output path generation function + generate_output_path = partial( + generate_output_path, + s2_grid=_load_s2_grid(), + ) + + manager = GFMAPJobManager( + output_dir=args.output_path, + output_path_generator=generate_output_path, + post_job_action=None, + collection_id="AGERA5-EXTRACTION", + collection_description="AGERA5 data extraction example.", + poll_sleep=60, + n_threads=2, + post_job_params={}, + ) + + manager.add_backend(Backend.TERRASCOPE.value, vito_connection, parallel_jobs=6) + + _pipeline_log.info("Launching the jobs from the manager.") + manager.run_jobs(job_df, create_datacube_meteo, tracking_df_path) diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py index b007f4c7..08162810 100644 --- a/scripts/extractions/extract_optical.py +++ b/scripts/extractions/extract_optical.py @@ -1,104 +1,43 @@ -import logging -from typing import List, Optional +"""Extract S2 data using OpenEO-GFMAP package.""" +import argparse +import json +from datetime import datetime +from functools import partial +from importlib.metadata import version +from pathlib import Path +from typing import List import geojson import geopandas as gpd import openeo import pandas as pd +import pystac +import xarray as xr from extract_sar import ( _buffer_geometry, _filter_extract_true, + _get_job_nb_polygons, + _pipeline_log, + _setup_logger, _upload_geoparquet_artifactory, + create_job_dataframe, + generate_output_path, ) from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext +from openeo_gfmap.backend import cdse_staging_connection from openeo_gfmap.fetching.s2 import build_sentinel2_l2a_extractor from openeo_gfmap.manager import _log - -# Logger for this current pipeline -_pipeline_log: Optional[logging.Logger] = None - - -def _setup_logger(level=logging.INFO) -> None: - global _pipeline_log - """Setup the logger from the openeo_gfmap package to the assigned level.""" - _pipeline_log = logging.getLogger("pipeline_sar") - - _pipeline_log.setLevel(level) - _log.setLevel(level) - - stream_handler = logging.StreamHandler() - _log.addHandler(stream_handler) - _pipeline_log.addHandler(stream_handler) - - formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s") - stream_handler.setFormatter(formatter) - - # Exclude the other loggers from other libraries - class ManagerLoggerFilter(logging.Filter): - """Filter to only accept the OpenEO-GFMAP manager logs.""" - - def filter(self, record): - return record.name in [_log.name, _pipeline_log.name] - - stream_handler.addFilter(ManagerLoggerFilter()) - - -def get_job_nb_polygons(row: pd.Series) -> int: - """Get the number of polygons in the geometry.""" - return len( - list( - filter( - lambda feat: feat.properties.get("extract"), - geojson.loads(row.geometry)["features"], - ) - ) - ) - - -def create_job_dataframe_s2( - backend: Backend, split_jobs: List[gpd.GeoDataFrame] -) -> pd.DataFrame: - """Create a dataframe from the split jobs, containg all the necessary information to run the job.""" - columns = [ - "backend_name", - "out_prefix", - "out_extension", - "start_date", - "end_date", - "s2_tile", - "h3index", - "geometry", - ] - rows = [] - for job in split_jobs: - # Compute the average in the valid date and make a buffer of 1.5 year around - median_time = pd.to_datetime(job.valid_date).mean() - start_date = median_time 
- pd.Timedelta(days=275) # A bit more than 9 months - end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months - s2_tile = job.tile.iloc[0] # Job dataframes are split depending on the - h3index = job.h3index.iloc[0] - - rows.append( - pd.Series( - dict( - zip( - columns, - [ - backend.value, - "S2-L2A-10m", - ".nc", - start_date.strftime("%Y-%m-%d"), - end_date.strftime("%Y-%m-%d"), - s2_tile, - h3index, - job.to_json(), - ], - ) - ) - ) - ) - - return pd.DataFrame(rows) +from openeo_gfmap.manager.job_manager import GFMAPJobManager +from openeo_gfmap.manager.job_splitters import ( + _append_h3_index, + _load_s2_grid, + split_job_s2grid, +) +from openeo_gfmap.stac import AUXILIARY +from pyproj import CRS +from rasterio.features import rasterize +from rasterio.transform import from_bounds +from shapely.geometry import box def create_datacube_optical( @@ -122,7 +61,7 @@ def create_datacube_optical( assert len(geometry.features) > 0, "No geometries with the extract flag found" # Performs a buffer of 64 px around the geometry - geometry_df = _buffer_geometry(geometry, 320) + geometry_df = _buffer_geometry(geometry) spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name) # Backend name and fetching type @@ -146,6 +85,27 @@ def create_datacube_optical( "S2-L2A-SCL", ] + # Compute the SCL dilation and add it to the cube + sub_collection = connection.load_collection( + collection_id="SENTINEL2_L2A", + bands=["SCL"], + temporal_extent=[start_date, end_date], + properties={ + "eo:cloud_cover": lambda val: val <= 95.0, + "tileId": lambda val: val == row.s2_tile, + }, + ) + scl_dilated_mask = sub_collection.process( + "to_scl_dilation_mask", + data=sub_collection, + scl_band_name="SCL", + kernel1_size=17, # 17px dilation on a 20m layer + kernel2_size=77, # 77px dilation on a 20m layer + mask1_values=[2, 4, 5, 6, 7], + mask2_values=[3, 8, 9, 10, 11], + erosion_kernel_size=3, + ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) + # Create the job to extract S2 extraction_parameters = { "target_resolution": 10, @@ -153,6 +113,7 @@ def create_datacube_optical( "eo:cloud_cover": lambda val: val <= 95.0, "tileId": lambda val: val == row.s2_tile, }, + "additional_mask": scl_dilated_mask, # Add an additional mask computed from the SCL layer } extractor = build_sentinel2_l2a_extractor( backend_context, @@ -163,37 +124,239 @@ def create_datacube_optical( cube = extractor.get_cube(connection, spatial_extent_url, temporal_context) - # Compute the SCL dilation and add it to the cube - scl_dilated_mask = cube.process( - "to_scl_dilation_mask", - data=cube, - scl_band_name="S2-L2A-SCL", - kernel1_size=17, # 17px dilation on a 20m layer - kernel2_size=77, # 77px dilation on a 20m layer - mask1_values=[2, 4, 5, 6, 7], - mask2_values=[3, 8, 9, 10, 11], - erosion_kernel_size=3, - ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) + # Compute the distance to cloud and add it to the cube + scl = cube.filter_bands(["S2-L2A-SCL"]) + distance_to_cloud = scl.apply_neighborhood( + process=openeo.UDF.from_file( + Path(__file__).parent / "udf_distance_to_cloud.py" + ), + size=[ + {"dimension": "x", "unit": "px", "value": 256}, + {"dimension": "y", "unit": "px", "value": 256}, + ], + overlap=[ + {"dimension": "x", "unit": "px", "value": 16}, + {"dimension": "y", "unit": "px", "value": 16}, + ], + ).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"]) + cube = cube.merge_cubes(distance_to_cloud) - cube = cube.merge_cubes(scl_dilated_mask) cube = cube.linear_scale_range(0, 65534, 0, 65534) 
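+    # linear_scale_range with identical input and output bounds effectively
+    # clips every band into [0, 65534], i.e. the uint16 range (65535 presumably
+    # kept free as a no-data value).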
# Get the h3index to use in the tile - h3index = geometry.features[0].properties["h3index"] + s2_tile = row.s2_tile valid_date = geometry.features[0].properties["valid_date"] # Increase the memory of the jobs depending on the number of polygons to extract - number_polygons = get_job_nb_polygons(row) - _log.debug(f"Number of polygons to extract: {number_polygons}") + number_polygons = _get_job_nb_polygons(row) + _log.debug("Number of polygons to extract %s", number_polygons) job_options = { - "executor-memory": "5G", - "executor-memoryOverhead": "2G", + "executor-memory": executor_memory, + "executor-memoryOverhead": executor_memory_overhead, } return cube.create_job( out_format="NetCDF", - title=f"GFMAP_Extraction_S2_{h3index}_{valid_date}", + title=f"GFMAP_Extraction_S2_{s2_tile}_{valid_date}", sample_by_feature=True, job_options=job_options, ) + + +def add_item_asset(related_item: pystac.Item, path: Path): + asset = AUXILIARY.create_asset(href=path.as_posix()) + related_item.add_asset("auxiliary", asset) + + +def post_job_action( + job_items: List[pystac.Item], row: pd.Series, parameters: dict = {} +) -> list: + base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs( + epsg=4326 + ) + assert len(base_gpd[base_gpd.extract]) == len( + job_items + ), "The number of result paths should be the same as the number of geometries" + + extracted_gpd = base_gpd[base_gpd.extract].reset_index(drop=True) + # In this case we want to burn the metadata in a new file in the same folder as the S2 product + for idx, item in enumerate(job_items): + if "sample_id" in extracted_gpd.columns: + sample_id = extracted_gpd.iloc[idx].sample_id + else: + sample_id = extracted_gpd.iloc[idx].sampleID + + ref_id = extracted_gpd.iloc[idx].ref_id + confidence = extracted_gpd.iloc[idx].confidence + valid_date = extracted_gpd.iloc[idx].valid_date + h3index = extracted_gpd.iloc[idx].h3index + s2_tile = row.s2_tile + + item_asset_path = Path(list(item.assets.values())[0].href) + # Read information from the item file (could also read it from the item object metadata) + result_ds = xr.open_dataset(item_asset_path) + + # Add some metadata to the result_df netcdf file + result_ds.attrs.update( + { + "start_date": row.start_date, + "end_date": row.end_date, + "valid_date": valid_date, + "GFMAP_version": version("openeo_gfmap"), + "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", + "title": f"Sentinel2 L2A - {sample_id}", + "sample_id": sample_id, + "ref_id": ref_id, + "spatial_resolution": "10m", + "s2_tile": s2_tile, + "h3index": h3index, + } + ) + result_ds.to_netcdf(item_asset_path) + + target_crs = CRS.from_wkt(result_ds.crs.attrs["crs_wkt"]) + + # Get the surrounding polygons around our extracted center geometry to rastetize them + bounds = ( + result_ds.x.min().item(), + result_ds.y.min().item(), + result_ds.x.max().item(), + result_ds.y.max().item(), + ) + bbox = box(*bounds) + surround_gpd = base_gpd.to_crs(target_crs).clip(bbox) + + # Burn the polygon croptypes + transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size) + croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label)) + + fill_value = 0 + croptype = rasterize( + croptype_shapes, + out_shape=(result_ds.y.size, result_ds.x.size), + transform=transform, + all_touched=False, + fill=fill_value, + default_value=0, + dtype="int64", + ) + + # Create the attributes to add to the metadata + attributes = { + "ref_id": 
ref_id, + "sample_id": sample_id, + "confidence": str(confidence), + "valid_date": valid_date, + "_FillValue": fill_value, + "Conventions": "CF-1.9", + "GFMAP_version": version("openeo_gfmap"), + "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "description": f"Contains rasterized WorldCereal labels for sample: {sample_id}.", + "title": f"WORLDCEREAL Auxiliary file for sample: {sample_id}", + "spatial_resolution": "10m", + } + + aux_dataset = xr.Dataset( + {"LABEL": (("y", "x"), croptype), "crs": result_ds["crs"]}, + coords={"y": result_ds.y, "x": result_ds.x}, + attrs=attributes, + ) + # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer. + aux_dataset["LABEL"].attrs["grid_mapping"] = "crs" + + # Save the metadata in the same folder as the S2 product + metadata_path = ( + item_asset_path.parent + / f"WORLDCEREAL-10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc" + ) + aux_dataset.to_netcdf(metadata_path, format="NETCDF4", engine="h5netcdf") + aux_dataset.close() + + # Adds this metadata as a new asset + add_item_asset(item, metadata_path) + + return job_items + + +if __name__ == "__main__": + _setup_logger() + from extract_sar import _pipeline_log + + parser = argparse.ArgumentParser( + description="S2 samples extraction with OpenEO-GFMAP package." + ) + parser.add_argument( + "output_path", type=Path, help="Path where to save the extraction results." + ) + parser.add_argument( + "input_df", + type=str, + help="Path or URL to the input dataframe for the training data.", + ) + parser.add_argument( + "--max_locations", + type=int, + default=100, + help="Maximum number of locations to extract per job.", + ) + parser.add_argument( + "--memory", type=str, default="5G", help="Memory to allocate for the executor." + ) + parser.add_argument( + "--memory-overhead", + type=str, + default="3G", + help="Memory overhead to allocate for the executor.", + ) + + args = parser.parse_args() + + tracking_df_path = Path(args.output_path) / "job_tracking.csv" + + # Load the input dataframe + _pipeline_log.info("Loading input dataframe from %s.", args.input_df) + + input_df = gpd.read_file(args.input_df) + input_df = _append_h3_index(input_df, grid_resolution=3) + + split_dfs = split_job_s2grid(input_df, max_points=args.max_locations) + split_dfs = [df for df in split_dfs if df.extract.any()] + + job_df = create_job_dataframe(Backend.CDSE_STAGING, split_dfs, prefix="S2-L2A-10m") + + _pipeline_log.warning( + "Sub-sampling the job dataframe for testing. Remove this for production." + ) + + # Setup the memory parameters for the job creator. 
+ create_datacube_optical = partial( + create_datacube_optical, + executor_memory=args.memory, + executor_memory_overhead=args.memory_overhead, + ) + + # Setup the s2 grid for the output path generation function + generate_output_path = partial( + generate_output_path, + s2_grid=_load_s2_grid(), + ) + + manager = GFMAPJobManager( + output_dir=args.output_path, + output_path_generator=generate_output_path, + post_job_action=post_job_action, + collection_id="SENTINEL2-EXTRACTION", + collection_description="Sentinel-2 and Auxiliary data extraction example.", + poll_sleep=60, + n_threads=2, + post_job_params={}, + ) + + manager.add_backend( + Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6 + ) + + _pipeline_log.info("Launching the jobs from the manager.") + manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index 2ce66cfe..feb2049a 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -24,10 +24,7 @@ from openeo_gfmap.manager.job_manager import GFMAPJobManager from openeo_gfmap.manager.job_splitters import split_job_hex from openeo_gfmap.stac import AUXILIARY -from pyproj import CRS, Transformer -from rasterio.features import rasterize -from rasterio.transform import from_bounds -from shapely.geometry import Point, box +from shapely.geometry import Point # Logger for this current pipeline _pipeline_log: Optional[logging.Logger] = None @@ -58,7 +55,9 @@ def filter(self, record): stream_handler.addFilter(ManagerLoggerFilter()) -def _buffer_geometry(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame: +def _buffer_geometry( + geometries: geojson.FeatureCollection, distance_m: int = 320 +) -> gpd.GeoDataFrame: """For each geometry of the colleciton, perform a square buffer of 320 meters on the centroid and return the GeoDataFrame. 
Before buffering, the centroid is clipped to the closest 20m multiplier in order to stay @@ -72,7 +71,7 @@ def _buffer_geometry(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame: gdf["geometry"] = gdf.centroid.apply( lambda point: Point(round(point.x / 20.0) * 20.0, round(point.y / 20.0) * 20.0) ).buffer( - distance=320, cap_style=3 + distance=distance_m, cap_style=3 ) # Square buffer return gdf @@ -117,26 +116,60 @@ def _upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, name: str) -> str: return upload_url -def generate_job_df( - backend: Backend, split_dataframes: List[gpd.GeoDataFrame] +def _get_job_nb_polygons(row: pd.Series) -> int: + """Get the number of polygons in the geometry.""" + return len( + list( + filter( + lambda feat: feat.properties.get("extract"), + geojson.loads(row.geometry)["features"], + ) + ) + ) + + +def generate_output_path( + root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame +): + features = geojson.loads(row.geometry) + sample_id = features[geometry_index].properties.get("sample_id", None) + if sample_id is None: + sample_id = features[geometry_index].properties["sampleID"] + ref_id = features[geometry_index].properties["ref_id"] + + s2_tile_id = row.s2_tile + h3index = row.h3index + epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg + + subfolder = root_folder / ref_id / h3index / sample_id + return ( + subfolder + / f"{row.out_prefix}_{sample_id}_{epsg}_{row.start_date}_{row.end_date}{row.out_extension}" + ) + + +def create_job_dataframe( + backend: Backend, split_jobs: List[gpd.GeoDataFrame], prefix: str = "S1-SIGMA0-10m" ) -> pd.DataFrame: - """Generates a dataframe for each job that will be executed by the GFMAPJobManager. - This step also includes additional information that will be used later in other steps. 
- """ + """Create a dataframe from the split jobs, containg all the necessary information to run the job.""" columns = [ "backend_name", "out_prefix", "out_extension", "start_date", "end_date", + "s2_tile", + "h3index", "geometry", ] rows = [] - for job in split_dataframes: + for job in split_jobs: # Compute the average in the valid date and make a buffer of 1.5 year around median_time = pd.to_datetime(job.valid_date).mean() start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months + s2_tile = job.tile.iloc[0] # Job dataframes are split depending on the + h3index = job.h3index.iloc[0] rows.append( pd.Series( @@ -145,16 +178,19 @@ def generate_job_df( columns, [ backend.value, - "S1-SIGMA0-20m", + prefix, ".nc", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"), + s2_tile, + h3index, job.to_json(), ], ) ) ) ) + return pd.DataFrame(rows) @@ -226,57 +262,6 @@ def create_datacube_sar( ) -def generate_output_path( - root_folder: Path, - tmp_path: Path, - geometry_index: int, - row: pd.Series, - s2_grid: gpd.GeoDataFrame, -) -> Path: - """Generates a path where to save the output result, using the root folder""" - features = geojson.loads(row.geometry) - sample_id = features[geometry_index].properties["sample_id"] - ref_id = features[geometry_index].properties["ref_id"] - try: - # Loads the centroid of the tile in latlon coordinates - inds = xr.open_dataset(tmp_path, chunks="auto") - - source_crs = CRS.from_wkt(inds.crs.attrs["crs_wkt"]) - dst_crs = CRS.from_epsg(4326) - transformer = Transformer.from_crs(source_crs, dst_crs, always_xy=True) - - # Get the center point of the tile - centroid_utm = box( - inds.x.min().item(), - inds.y.min().item(), - inds.x.max().item(), - inds.y.max().item(), - ).centroid - centroid_latlon = Point(*transformer.transform(centroid_utm.x, centroid_utm.y)) - - # Intersecting with the s2 grid, take the tile that is the closest from this centroid - intersecting = s2_grid.geometry.intersects(centroid_latlon) - - # Select the intersecting cell that has a centroid the closest from the point - intersecting_cells = s2_grid[intersecting] - intersecting_cells["distance"] = intersecting_cells.distance(centroid_latlon) - intersecting_cells.sort_values("distance", inplace=True) - s2_tile = intersecting_cells.iloc[0] - - s2_tile_id = s2_tile.tile - - subfolder = ( - root_folder / ref_id / str(source_crs.to_epsg()) / s2_tile_id / sample_id - ) - except: # NOQA - subfolder = root_folder / "unsortable" - - return ( - subfolder - / f"{row.out_prefix}_{sample_id}_{source_crs.to_epsg()}_{row.start_date}_{row.end_date}{row.out_extension}" - ) - - def add_item_asset(related_item: pystac.Item, path: Path): asset = AUXILIARY.create_asset(href=path.as_posix()) related_item.add_asset("auxiliary", asset) @@ -288,17 +273,16 @@ def post_job_action( base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs( epsg=4326 ) - assert ( - len(base_gpd[base_gpd.extract == True]) == len(job_items), - "The number of result paths should be the same as the number of geometries", - ) - extracted_gpd = base_gpd[base_gpd.extract == True].reset_index(drop=True) + assert len(base_gpd[base_gpd.extract]) == len( + job_items + ), "The number of result paths should be the same as the number of geometries" + extracted_gpd = base_gpd[base_gpd.extract].reset_index(drop=True) # In this case we want to burn the metadata in a new file in the same folder as the S2 product for idx, item in 
enumerate(job_items): sample_id = extracted_gpd.iloc[idx].sample_id ref_id = extracted_gpd.iloc[idx].ref_id - confidence = extracted_gpd.iloc[idx].confidence valid_date = extracted_gpd.iloc[idx].valid_date + h3index = extracted_gpd.iloc[idx].h3index item_asset_path = Path(list(item.assets.values())[0].href) # Read information from the item file (could also read it from the item object metadata) @@ -312,74 +296,15 @@ def post_job_action( "valid_date": valid_date, "GFMAP_version": version("openeo_gfmap"), "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", - "title": f"Sentinel2 L2A - {sample_id}", + "description": f"Sentinel1 GRD observations for sample: {sample_id}, unprocessed.", + "title": f"Sentinel1 GRD - {sample_id}", "sample_id": sample_id, + "ref_id": ref_id, "spatial_resolution": "10m", + "h3index": h3index, } ) - result_ds.to_netcdf(item_asset_path, format="NETCDF4", engine="h5netcdf") - - target_crs = CRS.from_wkt(result_ds.crs.attrs["crs_wkt"]) - - # Get the surrounding polygons around our extracted center geometry to rastetize them - bounds = ( - result_ds.x.min().item(), - result_ds.y.min().item(), - result_ds.x.max().item(), - result_ds.y.max().item(), - ) - bbox = box(*bounds) - surround_gpd = base_gpd.to_crs(target_crs).clip(bbox) - - # Burn the polygon croptypes - transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size) - croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label)) - - fill_value = 0 - croptype = rasterize( - croptype_shapes, - out_shape=(result_ds.y.size, result_ds.x.size), - transform=transform, - all_touched=False, - fill=fill_value, - default_value=0, - dtype="int64", - ) - - # Create the attributes to add to the metadata - attributes = { - "ref_id": ref_id, - "sample_id": sample_id, - "confidence": str(confidence), - "valid_date": valid_date, - "_FillValue": fill_value, - "Conventions": "CF-1.9", - "GFMAP_version": version("openeo_gfmap"), - "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "description": f"Contains rasterized WorldCereal labels for sample: {sample_id}.", - "title": f"WORLDCEREAL Auxiliary file for sample: {sample_id}", - "spatial_resolution": "10m", - } - - aux_dataset = xr.Dataset( - {"LABEL": (("y", "x"), croptype), "crs": result_ds["crs"]}, - coords={"y": result_ds.y, "x": result_ds.x}, - attrs=attributes, - ) - # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer. - aux_dataset["LABEL"].attrs["grid_mapping"] = "crs" - - # Save the metadata in the same folder as the S2 product - metadata_path = ( - item_asset_path.parent - / f"WORLDCEREAL_10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc" - ) - aux_dataset.to_netcdf(metadata_path, format="NETCDF4", engine="h5netcdf") - aux_dataset.close() - - # Adds this metadata as a new asset - add_item_asset(item, metadata_path) + result_ds.to_netcdf(item_asset_path) return job_items @@ -388,7 +313,7 @@ def post_job_action( _setup_logger() parser = argparse.ArgumentParser( - description="S1 data sampling with OpenEO-GFMAP package." + description="S1 samples extraction with OpenEO-GFMAP package." ) parser.add_argument( "output_path", type=Path, help="Path where to save the extraction results." @@ -419,18 +344,18 @@ def post_job_action( # Load the input dataframe, and perform dataset splitting using the h3 tile # to respect the area of interest. Also filters out the jobs that have # no location with the extract=True flag. 
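+    # Passing the path as a lazy %-style argument defers string formatting
+    # until the log record is actually emitted.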
- _pipeline_log.info(f"Loading input dataframe from {args.input_df}.") + _pipeline_log.info("Loading input dataframe from %s.", args.input_df) + input_df = gpd.read_file(args.input_df) split_dfs = split_job_hex(input_df, max_points=args.max_locations) split_dfs = [df for df in split_dfs if df.extract.any()] - job_df = generate_job_df(Backend.CDSE, split_dfs) + job_df = create_job_dataframe(Backend.CDSE, split_dfs) _pipeline_log.warning( - f"Sub-sampling the job dataframe for testing. Remove this for production." + "Sub-sampling the job dataframe for testing. Remove this for production." ) - job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True) # Setup the memory parameters for the job creator. create_datacube_sar = partial( @@ -449,6 +374,8 @@ def post_job_action( output_dir=args.output_path, output_path_generator=generate_output_path, post_job_action=None, # No post-job action required for S1 + collection_id="SENTINEL2-EXTRACTION", + collection_description=("Sentinel-2 and Auxiliary data extraction example."), poll_sleep=60, n_threads=2, post_job_params={}, diff --git a/scripts/extractions/udf_distance_to_cloud.py b/scripts/extractions/udf_distance_to_cloud.py new file mode 100644 index 00000000..00d32da8 --- /dev/null +++ b/scripts/extractions/udf_distance_to_cloud.py @@ -0,0 +1,65 @@ +import numpy as np +import xarray as xr +from openeo.udf import XarrayDataCube +from scipy.ndimage import distance_transform_cdt +from skimage.morphology import binary_erosion, footprints + + +def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: + cube_array: xr.DataArray = cube.get_array() + cube_array = cube_array.transpose("t", "bands", "y", "x") + + clouds = np.logical_or( + np.logical_and(cube_array < 11, cube_array >= 8), cube_array == 3 + ).isel(bands=0) + + # Calculate the Distance To Cloud score + # Erode + er = footprints.disk(3) + + # Define a function to apply binary erosion + def erode(image, selem): + return ~binary_erosion(image, selem) + + # Use apply_ufunc to apply the erosion operation + eroded = xr.apply_ufunc( + erode, # function to apply + clouds, # input DataArray + input_core_dims=[["y", "x"]], # dimensions over which to apply function + output_core_dims=[["y", "x"]], # dimensions of the output + vectorize=True, # vectorize the function over non-core dimensions + dask="parallelized", # enable dask parallelization + output_dtypes=[np.int32], # data type of the output + kwargs={"selem": er}, # additional keyword arguments to pass to erode + ) + + # Distance to cloud in manhattan distance measure + distance = xr.apply_ufunc( + distance_transform_cdt, + eroded, + input_core_dims=[["y", "x"]], + output_core_dims=[["y", "x"]], + vectorize=True, + dask="parallelized", + output_dtypes=[np.int32], + ) + + distance_da = xr.DataArray( + distance, + coords={ + "t": cube_array.coords["t"], + "y": cube_array.coords["y"], + "x": cube_array.coords["x"], + }, + dims=["t", "y", "x"], + ) + + distance_da = distance_da.expand_dims( + dim={ + "bands": cube_array.coords["bands"], + }, + ) + + distance_da = distance_da.transpose("t", "bands", "y", "x") + + return XarrayDataCube(distance_da) From 9a14cee8cf0e5f414841fd94ce811e71acdb74d6 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 20 Mar 2024 14:15:38 +0100 Subject: [PATCH 03/11] Added subsampling --- scripts/extractions/extract_optical.py | 1 + scripts/extractions/extract_sar.py | 1 + 2 files changed, 2 insertions(+) diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py index 
08162810..4074b993 100644 --- a/scripts/extractions/extract_optical.py +++ b/scripts/extractions/extract_optical.py @@ -329,6 +329,7 @@ def post_job_action( _pipeline_log.warning( "Sub-sampling the job dataframe for testing. Remove this for production." ) + job_df = job_df.iloc[[0]] # Setup the memory parameters for the job creator. create_datacube_optical = partial( diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index feb2049a..3edac37c 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -356,6 +356,7 @@ def post_job_action( _pipeline_log.warning( "Sub-sampling the job dataframe for testing. Remove this for production." ) + job_df = job_df.iloc[[0]] # Setup the memory parameters for the job creator. create_datacube_sar = partial( From 9ea421e028b2e2af238e5b112cf85874eeebe17f Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Thu, 21 Mar 2024 14:31:16 +0100 Subject: [PATCH 04/11] Optimized extraction workflows --- scripts/extractions/extract_meteo.py | 1 - scripts/extractions/extract_optical.py | 63 ++++++++++++++++---------- scripts/extractions/extract_sar.py | 28 ++++++++---- 3 files changed, 60 insertions(+), 32 deletions(-) diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py index bd4e84ff..f5873670 100644 --- a/scripts/extractions/extract_meteo.py +++ b/scripts/extractions/extract_meteo.py @@ -2,7 +2,6 @@ import argparse from functools import partial from pathlib import Path -from typing import List import geojson import geopandas as gpd diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py index 4074b993..13acb540 100644 --- a/scripts/extractions/extract_optical.py +++ b/scripts/extractions/extract_optical.py @@ -33,12 +33,24 @@ _load_s2_grid, split_job_s2grid, ) -from openeo_gfmap.stac import AUXILIARY from pyproj import CRS from rasterio.features import rasterize from rasterio.transform import from_bounds from shapely.geometry import box +AUXILIARY = pystac.extensions.item_assets.AssetDefinition( + { + "title": "ground truth data", + "description": "This asset contains the crop type codes.", + "type": "application/x-netcdf", + "roles": ["data"], + "proj:shape": [64, 64], + "raster:bands": [ + {"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16} + ], + } +) + def create_datacube_optical( row: pd.Series, @@ -85,7 +97,7 @@ def create_datacube_optical( "S2-L2A-SCL", ] - # Compute the SCL dilation and add it to the cube + # Extract the SCL collection only sub_collection = connection.load_collection( collection_id="SENTINEL2_L2A", bands=["SCL"], @@ -95,6 +107,8 @@ def create_datacube_optical( "tileId": lambda val: val == row.s2_tile, }, ) + + # Compute the SCL dilation mask scl_dilated_mask = sub_collection.process( "to_scl_dilation_mask", data=sub_collection, @@ -106,14 +120,32 @@ def create_datacube_optical( erosion_kernel_size=3, ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) + # Compute the distance to cloud and add it to the cube + distance_to_cloud = sub_collection.apply_neighborhood( + process=openeo.UDF.from_file( + Path(__file__).parent / "udf_distance_to_cloud.py" + ), + size=[ + {"dimension": "x", "unit": "px", "value": 256}, + {"dimension": "y", "unit": "px", "value": 256}, + ], + overlap=[ + {"dimension": "x", "unit": "px", "value": 16}, + {"dimension": "y", "unit": "px", "value": 16}, + ], + ).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"]) + + additional_masks = 
scl_dilated_mask.merge_cubes(distance_to_cloud) + # Create the job to extract S2 extraction_parameters = { - "target_resolution": 10, + "target_resolution": None, # Disable target resolution "load_collection": { "eo:cloud_cover": lambda val: val <= 95.0, "tileId": lambda val: val == row.s2_tile, }, - "additional_mask": scl_dilated_mask, # Add an additional mask computed from the SCL layer + "pre_merge": additional_masks, # Add an additional mask computed from the SCL layer + "pre_mask": scl_dilated_mask, } extractor = build_sentinel2_l2a_extractor( backend_context, @@ -124,23 +156,6 @@ def create_datacube_optical( cube = extractor.get_cube(connection, spatial_extent_url, temporal_context) - # Compute the distance to cloud and add it to the cube - scl = cube.filter_bands(["S2-L2A-SCL"]) - distance_to_cloud = scl.apply_neighborhood( - process=openeo.UDF.from_file( - Path(__file__).parent / "udf_distance_to_cloud.py" - ), - size=[ - {"dimension": "x", "unit": "px", "value": 256}, - {"dimension": "y", "unit": "px", "value": 256}, - ], - overlap=[ - {"dimension": "x", "unit": "px", "value": 16}, - {"dimension": "y", "unit": "px", "value": 16}, - ], - ).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"]) - cube = cube.merge_cubes(distance_to_cloud) - cube = cube.linear_scale_range(0, 65534, 0, 65534) # Get the h3index to use in the tile @@ -298,11 +313,11 @@ def post_job_action( parser.add_argument( "--max_locations", type=int, - default=100, + default=500, help="Maximum number of locations to extract per job.", ) parser.add_argument( - "--memory", type=str, default="5G", help="Memory to allocate for the executor." + "--memory", type=str, default="3G", help="Memory to allocate for the executor." ) parser.add_argument( "--memory-overhead", @@ -361,3 +376,5 @@ def post_job_action( _pipeline_log.info("Launching the jobs from the manager.") manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) + + manager.create_stac(asset_definitions={"auxiliary": AUXILIARY}) diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index 3edac37c..ba0ddc4b 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -22,7 +22,11 @@ from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor from openeo_gfmap.manager import _log from openeo_gfmap.manager.job_manager import GFMAPJobManager -from openeo_gfmap.manager.job_splitters import split_job_hex +from openeo_gfmap.manager.job_splitters import ( + _append_h3_index, + _load_s2_grid, + split_job_s2grid, +) from openeo_gfmap.stac import AUXILIARY from shapely.geometry import Point @@ -235,7 +239,7 @@ def create_datacube_sar( # backscatter computation. 
extractor = build_sentinel1_grd_extractor( backend_context=backend_context, - bands=["S1-VV", "S1-VH"], + bands=["S1-SIGMA0-VV", "S1-SIGMA0-VH"], fetch_type=FetchType.POLYGON, **extraction_parameters, ) @@ -250,6 +254,10 @@ def create_datacube_sar( h3index = geometry.features[0].properties["h3index"] valid_date = geometry.features[0].properties["valid_date"] + # Increase the memory of the jobs depending on the number of polygons to extract + number_polygons = _get_job_nb_polygons(row) + _log.debug("Number of polygons to extract %s", number_polygons) + job_options = { "executor-memory": executor_memory, "executor-memoryOverhead": executor_memory_overhead, @@ -300,7 +308,7 @@ def post_job_action( "title": f"Sentinel1 GRD - {sample_id}", "sample_id": sample_id, "ref_id": ref_id, - "spatial_resolution": "10m", + "spatial_resolution": "20m", "h3index": h3index, } ) @@ -324,7 +332,7 @@ def post_job_action( parser.add_argument( "--max_locations", type=int, - default=50, + default=100, help="Maximum number of locations to extract per job.", ) parser.add_argument( @@ -347,8 +355,9 @@ def post_job_action( _pipeline_log.info("Loading input dataframe from %s.", args.input_df) input_df = gpd.read_file(args.input_df) + input_df = _append_h3_index(input_df) - split_dfs = split_job_hex(input_df, max_points=args.max_locations) + split_dfs = split_job_s2grid(input_df, max_points=args.max_locations) split_dfs = [df for df in split_dfs if df.extract.any()] job_df = create_job_dataframe(Backend.CDSE, split_dfs) @@ -368,15 +377,15 @@ def post_job_action( # Setup the s2 grid for the output path generation function generate_output_path = partial( generate_output_path, - s2_grid=gpd.read_file("/data/users/Public/couchard/s2grid_bounds.geojson"), + s2_grid=_load_s2_grid(), ) manager = GFMAPJobManager( output_dir=args.output_path, output_path_generator=generate_output_path, post_job_action=None, # No post-job action required for S1 - collection_id="SENTINEL2-EXTRACTION", - collection_description=("Sentinel-2 and Auxiliary data extraction example."), + collection_id="SENTINEL1-EXTRACTION", + collection_description="Sentinel-1 data extraction example.", poll_sleep=60, n_threads=2, post_job_params={}, @@ -386,3 +395,6 @@ def post_job_action( _pipeline_log.info("Launching the jobs from the manager.") manager.run_jobs(job_df, create_datacube_sar, tracking_df_path) + + _pipeline_log.info("Jobs are finished, creating the STAC catalogue...") + manager.create_stac() From d46c304cffcfdfb4b93283e225017b1f557d586b Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Tue, 30 Apr 2024 12:40:29 +0200 Subject: [PATCH 05/11] Added fetchers using GFMAP, updated extraction pipeline --- scripts/extractions/extract_optical.py | 214 ++----- scripts/extractions/extract_sar.py | 28 +- scripts/inference/cropland_mapping.py | 69 ++ src/worldcereal/openeo/compositing.py | 23 - src/worldcereal/openeo/preprocessing.py | 593 ++++++++---------- .../openeo}/udf_distance_to_cloud.py | 7 +- 6 files changed, 385 insertions(+), 549 deletions(-) create mode 100644 scripts/inference/cropland_mapping.py delete mode 100644 src/worldcereal/openeo/compositing.py rename {scripts/extractions => src/worldcereal/openeo}/udf_distance_to_cloud.py (87%) diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py index 13acb540..744019c1 100644 --- a/scripts/extractions/extract_optical.py +++ b/scripts/extractions/extract_optical.py @@ -12,7 +12,6 @@ import openeo import pandas as pd import pystac -import xarray as xr from 
extract_sar import ( _buffer_geometry, _filter_extract_true, @@ -25,7 +24,6 @@ ) from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext from openeo_gfmap.backend import cdse_staging_connection -from openeo_gfmap.fetching.s2 import build_sentinel2_l2a_extractor from openeo_gfmap.manager import _log from openeo_gfmap.manager.job_manager import GFMAPJobManager from openeo_gfmap.manager.job_splitters import ( @@ -33,10 +31,9 @@ _load_s2_grid, split_job_s2grid, ) -from pyproj import CRS -from rasterio.features import rasterize -from rasterio.transform import from_bounds -from shapely.geometry import box +from openeo_gfmap.utils.netcdf import update_nc_attributes + +from worldcereal.openeo.preprocessing import raw_datacube_S2 AUXILIARY = pystac.extensions.item_assets.AssetDefinition( { @@ -46,7 +43,7 @@ "roles": ["data"], "proj:shape": [64, 64], "raster:bands": [ - {"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16} + {"name": "ewoc_code", "data_type": "int64", "bits_per_sample": 64} ], } ) @@ -80,7 +77,10 @@ def create_datacube_optical( backend = Backend(row.backend_name) backend_context = BackendContext(backend) - fetch_type = FetchType.POLYGON + # Get the h3index to use in the tile + s2_tile = row.s2_tile + valid_date = geometry.features[0].properties["valid_date"] + bands_to_download = [ "S2-L2A-B01", "S2-L2A-B02", @@ -97,78 +97,34 @@ def create_datacube_optical( "S2-L2A-SCL", ] - # Extract the SCL collection only - sub_collection = connection.load_collection( - collection_id="SENTINEL2_L2A", - bands=["SCL"], - temporal_extent=[start_date, end_date], - properties={ - "eo:cloud_cover": lambda val: val <= 95.0, - "tileId": lambda val: val == row.s2_tile, - }, - ) - - # Compute the SCL dilation mask - scl_dilated_mask = sub_collection.process( - "to_scl_dilation_mask", - data=sub_collection, - scl_band_name="SCL", - kernel1_size=17, # 17px dilation on a 20m layer - kernel2_size=77, # 77px dilation on a 20m layer - mask1_values=[2, 4, 5, 6, 7], - mask2_values=[3, 8, 9, 10, 11], - erosion_kernel_size=3, - ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) - - # Compute the distance to cloud and add it to the cube - distance_to_cloud = sub_collection.apply_neighborhood( - process=openeo.UDF.from_file( - Path(__file__).parent / "udf_distance_to_cloud.py" - ), - size=[ - {"dimension": "x", "unit": "px", "value": 256}, - {"dimension": "y", "unit": "px", "value": 256}, - ], - overlap=[ - {"dimension": "x", "unit": "px", "value": 16}, - {"dimension": "y", "unit": "px", "value": 16}, - ], - ).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"]) - - additional_masks = scl_dilated_mask.merge_cubes(distance_to_cloud) - - # Create the job to extract S2 - extraction_parameters = { - "target_resolution": None, # Disable target resolution - "load_collection": { - "eo:cloud_cover": lambda val: val <= 95.0, - "tileId": lambda val: val == row.s2_tile, - }, - "pre_merge": additional_masks, # Add an additional mask computed from the SCL layer - "pre_mask": scl_dilated_mask, - } - extractor = build_sentinel2_l2a_extractor( + cube = raw_datacube_S2( + connection, backend_context, - bands=bands_to_download, - fetch_type=fetch_type.POLYGON, - **extraction_parameters, + spatial_extent_url, + temporal_context, + bands_to_download, + FetchType.POLYGON, + filter_tile=s2_tile, + apply_mask=False, + additional_masks=True, ) - cube = extractor.get_cube(connection, spatial_extent_url, temporal_context) - - cube = cube.linear_scale_range(0, 65534, 0, 65534) - - # Get the h3index to use in 
the tile - s2_tile = row.s2_tile - valid_date = geometry.features[0].properties["valid_date"] - # Increase the memory of the jobs depending on the number of polygons to extract number_polygons = _get_job_nb_polygons(row) _log.debug("Number of polygons to extract %s", number_polygons) job_options = { + "driver-memory": "2G", + "driver-memoryOverhead": "2G", + "driver-cores": "1", "executor-memory": executor_memory, "executor-memoryOverhead": executor_memory_overhead, + "executor-cores": "1", + "max-executors": "34", + "soft-errors": "true", + "gdal-dataset-cache-size": 2, + "gdal-cachemax": 120, + "executor-threads-jvm": 1, } return cube.create_job( @@ -179,11 +135,6 @@ def create_datacube_optical( ) -def add_item_asset(related_item: pystac.Item, path: Path): - asset = AUXILIARY.create_asset(href=path.as_posix()) - related_item.add_asset("auxiliary", asset) - - def post_job_action( job_items: List[pystac.Item], row: pd.Series, parameters: dict = {} ) -> list: @@ -203,94 +154,30 @@ def post_job_action( sample_id = extracted_gpd.iloc[idx].sampleID ref_id = extracted_gpd.iloc[idx].ref_id - confidence = extracted_gpd.iloc[idx].confidence valid_date = extracted_gpd.iloc[idx].valid_date h3index = extracted_gpd.iloc[idx].h3index s2_tile = row.s2_tile item_asset_path = Path(list(item.assets.values())[0].href) - # Read information from the item file (could also read it from the item object metadata) - result_ds = xr.open_dataset(item_asset_path) # Add some metadata to the result_df netcdf file - result_ds.attrs.update( - { - "start_date": row.start_date, - "end_date": row.end_date, - "valid_date": valid_date, - "GFMAP_version": version("openeo_gfmap"), - "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", - "title": f"Sentinel2 L2A - {sample_id}", - "sample_id": sample_id, - "ref_id": ref_id, - "spatial_resolution": "10m", - "s2_tile": s2_tile, - "h3index": h3index, - } - ) - result_ds.to_netcdf(item_asset_path) - - target_crs = CRS.from_wkt(result_ds.crs.attrs["crs_wkt"]) - - # Get the surrounding polygons around our extracted center geometry to rastetize them - bounds = ( - result_ds.x.min().item(), - result_ds.y.min().item(), - result_ds.x.max().item(), - result_ds.y.max().item(), - ) - bbox = box(*bounds) - surround_gpd = base_gpd.to_crs(target_crs).clip(bbox) - - # Burn the polygon croptypes - transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size) - croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label)) - - fill_value = 0 - croptype = rasterize( - croptype_shapes, - out_shape=(result_ds.y.size, result_ds.x.size), - transform=transform, - all_touched=False, - fill=fill_value, - default_value=0, - dtype="int64", - ) - - # Create the attributes to add to the metadata - attributes = { - "ref_id": ref_id, - "sample_id": sample_id, - "confidence": str(confidence), + new_attributes = { + "start_date": row.start_date, + "end_date": row.end_date, "valid_date": valid_date, - "_FillValue": fill_value, - "Conventions": "CF-1.9", "GFMAP_version": version("openeo_gfmap"), "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "description": f"Contains rasterized WorldCereal labels for sample: {sample_id}.", - "title": f"WORLDCEREAL Auxiliary file for sample: {sample_id}", + "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", + "title": f"Sentinel2 L2A - {sample_id}", + "sample_id": sample_id, + "ref_id": ref_id, "spatial_resolution": 
"10m", + "s2_tile": s2_tile, + "h3index": h3index, } - aux_dataset = xr.Dataset( - {"LABEL": (("y", "x"), croptype), "crs": result_ds["crs"]}, - coords={"y": result_ds.y, "x": result_ds.x}, - attrs=attributes, - ) - # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer. - aux_dataset["LABEL"].attrs["grid_mapping"] = "crs" - - # Save the metadata in the same folder as the S2 product - metadata_path = ( - item_asset_path.parent - / f"WORLDCEREAL-10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc" - ) - aux_dataset.to_netcdf(metadata_path, format="NETCDF4", engine="h5netcdf") - aux_dataset.close() - - # Adds this metadata as a new asset - add_item_asset(item, metadata_path) + # Saves the new attributes in the netcdf file + update_nc_attributes(item_asset_path, new_attributes) return job_items @@ -317,12 +204,15 @@ def post_job_action( help="Maximum number of locations to extract per job.", ) parser.add_argument( - "--memory", type=str, default="3G", help="Memory to allocate for the executor." + "--memory", + type=str, + default="600m", + help="Memory to allocate for the executor.", ) parser.add_argument( - "--memory-overhead", + "--memory_overhead", type=str, - default="3G", + default="1900m", help="Memory overhead to allocate for the executor.", ) @@ -341,11 +231,6 @@ def post_job_action( job_df = create_job_dataframe(Backend.CDSE_STAGING, split_dfs, prefix="S2-L2A-10m") - _pipeline_log.warning( - "Sub-sampling the job dataframe for testing. Remove this for production." - ) - job_df = job_df.iloc[[0]] - # Setup the memory parameters for the job creator. create_datacube_optical = partial( create_datacube_optical, @@ -375,6 +260,15 @@ def post_job_action( ) _pipeline_log.info("Launching the jobs from the manager.") - manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) - manager.create_stac(asset_definitions={"auxiliary": AUXILIARY}) + try: + manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) + manager.create_stac( + constellation="sentinel2", item_assets={"auxiliary": AUXILIARY} + ) + except Exception as e: + _pipeline_log.error("Error during the job execution: %s", e) + manager.create_stac( + constellation="sentinel2", item_assets={"auxiliary": AUXILIARY} + ) + raise e diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index ba0ddc4b..5f760d2b 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -18,7 +18,7 @@ import requests import xarray as xr from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext -from openeo_gfmap.backend import cdse_connection +from openeo_gfmap.backend import cdse_staging_connection from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor from openeo_gfmap.manager import _log from openeo_gfmap.manager.job_manager import GFMAPJobManager @@ -27,9 +27,10 @@ _load_s2_grid, split_job_s2grid, ) -from openeo_gfmap.stac import AUXILIARY from shapely.geometry import Point +from worldcereal.openeo.preprocessing import raw_datacube_S1 + # Logger for this current pipeline _pipeline_log: Optional[logging.Logger] = None @@ -223,7 +224,7 @@ def create_datacube_sar( assert len(geometry.features) > 0, "No geometries with the extract flag found" # Performs a buffer of 64 px around the geometry - geometry_df = _buffer_geometry(geometry) + geometry_df = _buffer_geometry(geometry, distance_m=310) spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name) # Backend name and fetching type @@ -251,7 +252,7 @@ def 
create_datacube_sar( ) # Additional values to generate the BatcJob name - h3index = geometry.features[0].properties["h3index"] + s2_tile = row.s2_tile valid_date = geometry.features[0].properties["valid_date"] # Increase the memory of the jobs depending on the number of polygons to extract @@ -264,17 +265,12 @@ def create_datacube_sar( } return cube.create_job( out_format="NetCDF", - title=f"GFMAP_Extraction_S1_{h3index}_{valid_date}", + title=f"GFMAP_Extraction_S1_{s2_tile}_{valid_date}", sample_by_feature=True, job_options=job_options, ) -def add_item_asset(related_item: pystac.Item, path: Path): - asset = AUXILIARY.create_asset(href=path.as_posix()) - related_item.add_asset("auxiliary", asset) - - def post_job_action( job_items: List[pystac.Item], row: pd.Series, parameters: dict = {} ) -> list: @@ -332,16 +328,16 @@ def post_job_action( parser.add_argument( "--max_locations", type=int, - default=100, + default=500, help="Maximum number of locations to extract per job.", ) parser.add_argument( - "--memory", type=str, default="5G", help="Memory to allocate for the executor." + "--memory", type=str, default="3G", help="Memory to allocate for the executor." ) parser.add_argument( "--memory-overhead", type=str, - default="2G", + default="1G", help="Memory overhead to allocate for the executor.", ) @@ -360,7 +356,7 @@ def post_job_action( split_dfs = split_job_s2grid(input_df, max_points=args.max_locations) split_dfs = [df for df in split_dfs if df.extract.any()] - job_df = create_job_dataframe(Backend.CDSE, split_dfs) + job_df = create_job_dataframe(Backend.CDSE_STAGING, split_dfs) _pipeline_log.warning( "Sub-sampling the job dataframe for testing. Remove this for production." @@ -391,7 +387,9 @@ def post_job_action( post_job_params={}, ) - manager.add_backend(Backend.CDSE.value, cdse_connection, parallel_jobs=6) + manager.add_backend( + Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6 + ) _pipeline_log.info("Launching the jobs from the manager.") manager.run_jobs(job_df, create_datacube_sar, tracking_df_path) diff --git a/scripts/inference/cropland_mapping.py b/scripts/inference/cropland_mapping.py new file mode 100644 index 00000000..6066b092 --- /dev/null +++ b/scripts/inference/cropland_mapping.py @@ -0,0 +1,69 @@ +"""Cropland mapping inference script, demonstrating the use of the GFMAP, Presto and WorldCereal classifiers in a first inference pipeline.""" + +import argparse + +from openeo_gfmap import BoundingBoxExtent, TemporalContext +from openeo_gfmap.backend import Backend, BackendContext, cdse_connection +from openeo_gfmap.features.feature_extractor import PatchFeatureExtractor + +from worldcereal.openeo.preprocessing import worldcereal_preprocessed_inputs_gfmap + + +class PrestoFeatureExtractor(PatchFeatureExtractor): + def __init__(self): + pass + + def extract(self, image): + pass + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="WC - Cropland Inference", + description="Cropland inference using GFMAP, Presto and WorldCereal classifiers", + ) + + parser.add_argument("minx", type=float, help="Minimum X coordinate (west)") + parser.add_argument("miny", type=float, help="Minimum Y coordinate (south)") + parser.add_argument("maxx", type=float, help="Maximum X coordinate (east)") + parser.add_argument("maxy", type=float, help="Maximum Y coordinate (north)") + parser.add_argument( + "start_date", type=str, help="Starting date for data extraction." 
+ ) + parser.add_argument("end_date", type=str, help="Ending date for data extraction.") + parser.add_argument( + "output_folder", type=str, help="Path to folder where to save results." + ) + + args = parser.parse_args() + + minx = args.minx + miny = args.miny + maxx = args.maxx + maxy = args.maxy + + start_date = args.start_date + end_date = args.end_date + + spatial_extent = BoundingBoxExtent(minx, miny, maxx, maxy) + temporal_extent = TemporalContext(start_date, end_date) + + backend = BackendContext(Backend.CDSE) + + # Preparing the input cube for the inference + input_cube = worldcereal_preprocessed_inputs_gfmap( + connection=cdse_connection(), + backend_context=backend, + spatial_extent=spatial_extent, + temporal_extent=temporal_extent, + ) + + # Start the job and download + job = input_cube.create_job( + title=f"Cropland inference BBOX: {minx} {miny} {maxx} {maxy}", + description="Cropland inference using WorldCereal, Presto and GFMAP classifiers", + out_format="NetCDF", + ) + + job.start_and_wait() + job.get_results().download_files(args.output_folder) diff --git a/src/worldcereal/openeo/compositing.py b/src/worldcereal/openeo/compositing.py deleted file mode 100644 index a7fec336..00000000 --- a/src/worldcereal/openeo/compositing.py +++ /dev/null @@ -1,23 +0,0 @@ -def max_ndvi_selection(ndvi): - max_ndvi = ndvi.max() - return ndvi.array_apply(lambda x: x != max_ndvi) - - -def max_ndvi_composite(s2_cube, composite_window="month"): - ndvi = s2_cube.ndvi(nir="B08", red="B04") - - rank_mask = ndvi.apply_neighborhood( - max_ndvi_selection, - size=[ - {"dimension": "x", "unit": "px", "value": 1}, - {"dimension": "y", "unit": "px", "value": 1}, - {"dimension": "t", "value": "month"}, - ], - overlap=[], - ) - - s2_cube = s2_cube.mask(rank_mask).aggregate_temporal_period( - composite_window, "first" - ) - - return s2_cube diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py index 1e15b660..7f41ffbe 100644 --- a/src/worldcereal/openeo/preprocessing.py +++ b/src/worldcereal/openeo/preprocessing.py @@ -1,375 +1,274 @@ -from openeo.processes import array_create, if_, is_nodata, power -from openeo.rest.datacube import DataCube - -from worldcereal.openeo.compositing import max_ndvi_composite -from worldcereal.openeo.masking import scl_mask_erode_dilate +from pathlib import Path +from typing import List, Optional + +from openeo import UDF, Connection, DataCube +from openeo_gfmap import ( + Backend, + BackendContext, + BoundingBoxExtent, + FetchType, + SpatialContext, + TemporalContext, +) +from openeo_gfmap.fetching.generic import build_generic_extractor +from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor +from openeo_gfmap.fetching.s2 import build_sentinel2_l2a_extractor +from openeo_gfmap.preprocessing.compositing import ( + max_ndvi_compositing, + mean_compositing, +) +from openeo_gfmap.preprocessing.interpolation import linear_interpolation +from openeo_gfmap.preprocessing.sar import compress_backscatter_uint16 COMPOSITE_WINDOW = "month" -def add_S1_bands( - connection, - S1_collection, - other_bands, - bbox, - start, - end, - preprocess=True, - **processing_options, -): - """Method to add S1 bands to datacube - - Args: - S1_collection (str): name of the S1 collection - other_bands (DataCube): OpenEO datacube to add bands to - - Available processing_options: - s1_orbitdirection - provider - target_crs +def raw_datacube_S2( + connection: Connection, + backend_context: BackendContext, + spatial_extent: SpatialContext, + temporal_extent: 
TemporalContext, + bands: List[str], + fetch_type: FetchType, + filter_tile: Optional[str] = None, + additional_masks: bool = True, + apply_mask: bool = False, +) -> DataCube: + """Extract Sentinel-2 datacube from OpenEO using GFMAP routines. + Raw data is extracted with no cloud masking applied by default (can be + enabled by setting `apply_mask=True`). In addition to the raw band values + a cloud-mask computed from the dilation of the SCL layer, as well as a + rank mask from the BAP compositing are added. + + Parameters + ---------- + connection : Connection + OpenEO connection instance. + backend_context : BackendContext + GFMAP Backend context to use for extraction. + spatial_extent : SpatialContext + Spatial context to extract data from, can be a GFMAP BoundingBoxExtent, + a GeoJSON dict or an URL to a publicly accessible GeoParquet file. + temporal_extent : TemporalContext + Temporal context to extract data from. + bands : List[str] + List of Sentinel-2 bands to extract. + fetch_type : FetchType + GFMAP Fetch type to use for extraction. + filter_tile : Optional[str], optional + Filter by tile ID, by default disabled. This forces the process to only + one tile ID from the Sentinel-2 collection. + apply_mask : bool, optional + Apply cloud masking, by default False. Can be enabled for high + optimization of memory usage. """ - isCreo = "creo" in processing_options.get("provider", "").lower() - orbit_direction = processing_options.get("s1_orbitdirection", None) - composite_window = processing_options.get("composite_window", COMPOSITE_WINDOW) - - # TODO: implement as needed - # if isCreo: - # orbit_direction = catalogue_check_S1(orbit_direction, start, end, bbox) - - if orbit_direction is not None: - properties = { - "sat:orbit_state": lambda orbdir: orbdir == orbit_direction - } # NOQA - else: - properties = {} - - # Load collection - S1bands = connection.load_collection( - S1_collection, - bands=["VH", "VV"], - spatial_extent=bbox, - temporal_extent=[start, end], - properties=properties, + # Extract the SCL collection only + scl_cube_properties = {"eo:cloud_cover": lambda val: val <= 95.0} + if filter_tile: + scl_cube_properties["tileId"] = lambda val: val == filter_tile + + scl_cube = connection.load_collection( + collection_id="SENTINEL2_L2A", + bands=["SCL"], + temporal_extent=[temporal_extent.start_date, temporal_extent.end_date], + spatial_extent=dict(spatial_extent) if fetch_type == FetchType.TILE else None, + properties=scl_cube_properties, ) - if S1_collection == "SENTINEL1_GRD": - # compute backscatter if starting from raw GRD, - # otherwise assume preprocessed backscatter - S1bands = S1bands.sar_backscatter( - coefficient="sigma0-ellipsoid", - local_incidence_angle=False, - # DO NOT USE MAPZEN - elevation_model="COPERNICUS_30" if isCreo else None, - options={ - "implementation_version": "2", - "tile_size": 256, - "otb_memory": 1024, - "debug": False, - "elev_geoid": "/opt/openeo-vito-aux-data/egm96.tif", - }, - ) - else: - pass - - # Resample to the S2 spatial resolution - target_crs = processing_options.get("target_crs", None) - if target_crs is not None: - S1bands = S1bands.resample_spatial(projection=target_crs, resolution=10.0) - - if preprocess: - # Composite to compositing window - S1bands = S1bands.aggregate_temporal_period( - period=composite_window, reducer="mean" - ) - - # Linearly interpolate missing values - S1bands = S1bands.apply_dimension( - dimension="t", process="array_interpolate_linear" - ) - - # Scale to int16 - if isCreo: - # for CREO, rescaling also
replaces nodata introduced by orfeo - # with a low value - # https://github.com/Open-EO/openeo-geopyspark-driver/issues/293 - # TODO: check if nodata is correctly handled in Orfeo - S1bands = S1bands.apply_dimension( - dimension="bands", - process=lambda x: array_create( - [ - if_( - is_nodata(x[0]), - 1, - power(base=10, p=(10.0 * x[0].log(base=10) + 83.0) / 20.0), - ), - if_( - is_nodata(x[1]), - 1, - power(base=10, p=(10.0 * x[1].log(base=10) + 83.0) / 20.0), - ), - ] - ), + # Resample to 10m resolution for the SCL layer + scl_cube = scl_cube.resample_spatial(10) + + # Compute the SCL dilation mask + scl_dilated_mask = scl_cube.process( + "to_scl_dilation_mask", + data=scl_cube, + scl_band_name="SCL", + kernel1_size=17, # 17px dilation on a 10m layer + kernel2_size=77, # 77px dilation on a 10m layer + mask1_values=[2, 4, 5, 6, 7], + mask2_values=[3, 8, 9, 10, 11], + erosion_kernel_size=3, + ).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"]) + + # Compute the distance to cloud and add it to the cube + distance_to_cloud = scl_cube.apply_neighborhood( + process=UDF.from_file(Path(__file__).parent / "udf_distance_to_cloud.py"), + size=[ + {"dimension": "x", "unit": "px", "value": 256}, + {"dimension": "y", "unit": "px", "value": 256}, + {"dimension": "t", "unit": "null", "value": "P1D"}, + ], + overlap=[ + {"dimension": "x", "unit": "px", "value": 16}, + {"dimension": "y", "unit": "px", "value": 16}, + ], + ).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"]) + + additional_masks = scl_dilated_mask.merge_cubes(distance_to_cloud) + + # Try filtering using the geometry + if fetch_type == FetchType.TILE: + additional_masks = additional_masks.filter_spatial(spatial_extent.to_geojson()) + + # Create the job to extract S2 + extraction_parameters = { + "target_resolution": None, # Disable target resolution + "load_collection": { + "eo:cloud_cover": lambda val: val <= 95.0, + }, + } + if (backend_context.backend is Backend.CDSE_STAGING) and ( + fetch_type == FetchType.POLYGON + ): # At the moment only available in CDSE staging + extraction_parameters["update_arguments"] = { + "featureflags": { + "tile_size": 128, + "load_per_product": True, + } + } + if additional_masks: + extraction_parameters["pre_merge"] = additional_masks + if filter_tile: + extraction_parameters["load_collection"]["tileId"] = ( + lambda val: val == filter_tile ) - else: - S1bands = S1bands.apply_dimension( - dimension="bands", - process=lambda x: array_create( - [ - power(base=10, p=(10.0 * x[0].log(base=10) + 83.0) / 20.0), - power(base=10, p=(10.0 * x[1].log(base=10) + 83.0) / 20.0), - ] - ), - ) - - S1bands = S1bands.linear_scale_range(1, 65534, 1, 65534) - - # -------------------------------------------------------------------- - # Merge cubes - # -------------------------------------------------------------------- - - merged_inputs = other_bands.resample_cube_spatial(S1bands).merge_cubes(S1bands) - - return merged_inputs - + if apply_mask: + extraction_parameters["pre_mask"] = scl_dilated_mask + + extractor = build_sentinel2_l2a_extractor( + backend_context, + bands=bands, + fetch_type=fetch_type, + **extraction_parameters, + ) -def add_DEM(connection, DEM_collection, other_bands, bbox, **processing_options): - """Method to add DEM to datacube + return extractor.get_cube(connection, spatial_extent, temporal_extent) - Args: - connection (_type_): _description_ - DEM_collection (str): Name of DEM collection - other_bands (DataCube): DataCube to merge DEM into - bbox (_type_): _description_ - Returns: - DataCube: merged 
datacube +def raw_datacube_S1( + connection: Connection, + backend_context: BackendContext, + spatial_extent: SpatialContext, + temporal_extent: TemporalContext, + bands: List[str], + fetch_type: FetchType, + target_resolution: float = 20.0, + orbit_direction: Optional[str] = None, +) -> DataCube: + """Extract Sentinel-1 datacube from OpenEO using GFMAP routines. + + Parameters + ---------- + connection : Connection + OpenEO connection instance. + backend_context : BackendContext + GFMAP Backend context to use for extraction. + spatial_extent : SpatialContext + Spatial context to extract data from, can be a GFMAP BoundingBoxExtent, + a GeoJSON dict or an URL to a publicly accessible GeoParquet file. + temporal_extent : TemporalContext + Temporal context to extract data from. + bands : List[str] + List of Sentinel-1 bands to extract. + fetch_type : FetchType + GFMAP Fetch type to use for extraction. """ + extractor_parameters = { + "target_resolution": target_resolution, + } + if orbit_direction is not None: + extractor_parameters["load_collection"] = { + "sat:orbit_state": lambda orbit: orbit == orbit_direction + } - dem = connection.load_collection( - DEM_collection, - spatial_extent=bbox, + extractor = build_sentinel1_grd_extractor( + backend_context, bands=bands, fetch_type=fetch_type, **extractor_parameters ) - # Resample to the S2 spatial resolution - target_crs = processing_options.get("target_crs", None) - if target_crs is not None: - dem = dem.resample_spatial( - projection=target_crs, resolution=10.0, method="cubic" - ) - - # collection has timestamps which we need to get rid of - dem = dem.max_time() - - # -------------------------------------------------------------------- - # Merge cubes - # -------------------------------------------------------------------- - - merged_inputs = other_bands.merge_cubes(dem) - - return merged_inputs + return extractor.get_cube(connection, spatial_extent, temporal_extent) -def add_meteo( - connection, - METEO_collection, - other_bands, - bbox, - start, - end, - target_crs=None, - **processing_options, -): - # AGERA5 - # TODO: add precipitation with sum compositor - - composite_window = processing_options.get("composite_window", COMPOSITE_WINDOW) - - meteo = connection.load_collection( - METEO_collection, - spatial_extent=bbox, - bands=["temperature-mean"], - temporal_extent=[start, end], +def raw_datacube_DEM( + connection: Connection, + backend_context: BackendContext, + spatial_extent: SpatialContext, + fetch_type: FetchType, +) -> DataCube: + extractor = build_generic_extractor( + backend_context=backend_context, + bands=["COP-DEM"], + fetch_type=fetch_type, + collection_name="COPERNICUS_30", ) + return extractor.get_cube(connection, spatial_extent, None) - if target_crs is not None: - meteo = meteo.resample_spatial(projection=target_crs, resolution=10.0) - - # Composite to compositing window - meteo = meteo.aggregate_temporal_period(period=composite_window, reducer="mean") - - # Linearly interpolate missing values. 
- # Shouldn't exist in this dataset but is good practice to do so - meteo = meteo.apply_dimension(dimension="t", process="array_interpolate_linear") - - # Rename band to match Radix model requirements - meteo = meteo.rename_labels("bands", ["temperature_mean"]) - - # -------------------------------------------------------------------- - # Merge cubes - # or return just meteo - # -------------------------------------------------------------------- - if other_bands is None: - return meteo - merged_inputs = other_bands.merge_cubes(meteo) - - return merged_inputs - - -def worldcereal_preprocessed_inputs( - connection, - bbox, - start: str, - end: str, - S2_collection="TERRASCOPE_S2_TOC_V2", - S1_collection="SENTINEL1_GRD_SIGMA0", - DEM_collection="COPERNICUS_30", - METEO_collection="AGERA5", - preprocess=True, - masking="mask_scl_dilation", - **processing_options, +def worldcereal_preprocessed_inputs_gfmap( + connection: Connection, + backend_context: BackendContext, + spatial_extent: BoundingBoxExtent, + temporal_extent: TemporalContext, ) -> DataCube: - """Main method to get preprocessed inputs from OpenEO for - downstream crop type mapping. - - Args: - connection: OpenEO connection instance - bbox (_type_): _description_ - start (str): Start date for requested input data (yyyy-mm-dd) - end (str): Start date for requested input data (yyyy-mm-dd) - S2_collection (str, optional): Collection name for S2 data. - Defaults to - 'TERRASCOPE_S2_TOC_V2'. - S1_collection (str, optional): Collection name for S1 data. - Defaults to - 'SENTINEL1_GRD'. - DEM_collection (str, optional): Collection name for DEM data. - Defaults to - 'COPERNICUS_30'. - METEO_collection (str, optional): Collection name for - meteo data. Defaults to 'AGERA5'. - preprocess (bool, optional): Apply compositing and interpolation. - Defaults to True. - masking (str, optional): Masking method to be applied. - One of ['satio', 'mask_scl_dilation', None] - Defaults to 'mask_scl_dilation'. 
- - Returns: - DataCube: OpenEO DataCube wich the requested inputs - """ - - composite_window = processing_options.get("composite_window", COMPOSITE_WINDOW) - - # -------------------------------------------------------------------- - # Optical data - # -------------------------------------------------------------------- - - S2_bands = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B11", "B12"] - if masking not in ["satio", "mask_scl_dilation", None]: - raise ValueError(f"Unknown masking option `{masking}`") - if masking in ["mask_scl_dilation"]: - # Need SCL band to mask - S2_bands.append("SCL") - bands = connection.load_collection( - S2_collection, - bands=S2_bands, - spatial_extent=bbox, - temporal_extent=[start, end], - max_cloud_cover=95, + # Extraction of S2 from GFMAP + s2_data = raw_datacube_S2( + connection=connection, + backend_context=backend_context, + spatial_extent=spatial_extent, + temporal_extent=temporal_extent, + bands=[ + "S2-L2A-B02", + "S2-L2A-B03", + "S2-L2A-B04", + "S2-L2A-B05", + "S2-L2A-B06", + "S2-L2A-B07", + "S2-L2A-B08", + "S2-L2A-B8A", + "S2-L2A-B11", + "S2-L2A-B12", + ], + fetch_type=FetchType.TILE, + filter_tile=False, + additional_masks=False, + apply_mask=True, ) - # TODO: implement as needed - # S2URL creo only accepts request in EPSG:4326 - # isCreo = "creo" in processing_options.get("provider", "").lower() - # if isCreo: - # catalogue_check_S2(start, end, bbox) - - target_crs = processing_options.get("target_crs", None) - if target_crs is not None: - bands = bands.resample_spatial(projection=target_crs, resolution=10.0) - - # NOTE: For now we mask again snow/ice because clouds - # are sometimes marked as SCL value 11! - if masking == "mask_scl_dilation": - # TODO: double check cloud masking parameters - # https://github.com/Open-EO/openeo-geotrellis-extensions/blob/develop/geotrellis-common/src/main/scala/org/openeo/geotrelliscommon/CloudFilterStrategy.scala#L54 # NOQA - bands = bands.process( - "mask_scl_dilation", - data=bands, - scl_band_name="SCL", - kernel1_size=17, - kernel2_size=77, - mask1_values=[2, 4, 5, 6, 7], - mask2_values=[3, 8, 9, 10, 11], - erosion_kernel_size=3, - ).filter_bands(bands.metadata.band_names[:-1]) - elif masking == "satio": - # Apply satio-based mask - mask = scl_mask_erode_dilate( - connection, - bbox, - scl_layer_band=S2_collection + ":SCL", - target_crs=target_crs, - ).resample_cube_spatial(bands) - bands = bands.mask(mask) - - if preprocess: - # Composite to compositing window - # bands = bands.aggregate_temporal_period(period=composite_window, - # reducer="median") - bands = max_ndvi_composite(bands, composite_window=composite_window) - - # TODO: if we would disable it here, nodata values - # will be 65535 and we need to cope with that later - # Linearly interpolate missing values - bands = bands.apply_dimension(dimension="t", process="array_interpolate_linear") - - # Force UINT16 to avoid overflow issue with S2 data - bands = bands.linear_scale_range(0, 65534, 0, 65534) - - # -------------------------------------------------------------------- - # AGERA5 Meteo data - # -------------------------------------------------------------------- - if METEO_collection is not None: - bands = add_meteo( - connection, - METEO_collection, - bands, - bbox, - start, - end, - target_crs=target_crs, - composite_window=composite_window, - ) - - # -------------------------------------------------------------------- - # SAR data - # -------------------------------------------------------------------- - if S1_collection is not None: - 
bands = add_S1_bands( - connection, - S1_collection, - bands, - bbox, - start, - end, - composite_window=composite_window, - **processing_options, - ) - - bands = bands.filter_temporal(start, end) + s2_data = max_ndvi_compositing(s2_data, period="month") + s2_data = linear_interpolation(s2_data) + + # Cast to uint16 + s2_data = s2_data.linear_scale_range(0, 65534, 0, 65534) + + # Extraction of the S1 data + s1_data = raw_datacube_S1( + connection=connection, + backend_context=backend_context, + spatial_extent=spatial_extent, + temporal_extent=temporal_extent, + bands=[ + "S1-SIGMA0-VV", + "S1-SIGMA0-VH", + ], + fetch_type=FetchType.TILE, + target_resolution=10.0, # Compute the backscatter at 20m resolution, then upsample nearest neighbor when merging cubes + orbit_direction="ASCENDING", + ) - # -------------------------------------------------------------------- - # DEM data - # -------------------------------------------------------------------- - if DEM_collection is not None: - bands = add_DEM(connection, DEM_collection, bands, bbox, **processing_options) + s1_data = mean_compositing(s1_data, period="month") + s1_data = linear_interpolation(s1_data) + s1_data = compress_backscatter_uint16(backend_context, s1_data) - # forcing 16bit - bands = bands.linear_scale_range(0, 65534, 0, 65534) + dem_data = raw_datacube_DEM( + connection=connection, + backend_context=backend_context, + spatial_extent=spatial_extent, + fetch_type=FetchType.TILE, + ) - return bands + dem_data = dem_data.resample_cube_spatial(s2_data, method="cubic") + dem_data = dem_data.linear_scale_range(0, 65534, 0, 65534) + data = s2_data.merge_cubes(s1_data) + data = data.merge_cubes(dem_data) -def worldcereal_raw_inputs(*args, **kwargs): - return worldcereal_preprocessed_inputs(*args, **kwargs, preprocess=False) + return data diff --git a/scripts/extractions/udf_distance_to_cloud.py b/src/worldcereal/openeo/udf_distance_to_cloud.py similarity index 87% rename from scripts/extractions/udf_distance_to_cloud.py rename to src/worldcereal/openeo/udf_distance_to_cloud.py index 00d32da8..b65af8a9 100644 --- a/scripts/extractions/udf_distance_to_cloud.py +++ b/src/worldcereal/openeo/udf_distance_to_cloud.py @@ -7,7 +7,7 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: cube_array: xr.DataArray = cube.get_array() - cube_array = cube_array.transpose("t", "bands", "y", "x") + cube_array = cube_array.transpose("bands", "y", "x") clouds = np.logical_or( np.logical_and(cube_array < 11, cube_array >= 8), cube_array == 3 @@ -47,11 +47,10 @@ def erode(image, selem): distance_da = xr.DataArray( distance, coords={ - "t": cube_array.coords["t"], "y": cube_array.coords["y"], "x": cube_array.coords["x"], }, - dims=["t", "y", "x"], + dims=["y", "x"], ) distance_da = distance_da.expand_dims( @@ -60,6 +59,6 @@ def erode(image, selem): }, ) - distance_da = distance_da.transpose("t", "bands", "y", "x") + distance_da = distance_da.transpose("bands", "y", "x") return XarrayDataCube(distance_da) From d943a8693216730d564db657ef04d06e811d410f Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Tue, 30 Apr 2024 12:46:54 +0200 Subject: [PATCH 06/11] Removed useless import --- scripts/extractions/extract_sar.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index 5f760d2b..3027ee2b 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -29,8 +29,6 @@ ) from shapely.geometry import Point -from 
worldcereal.openeo.preprocessing import raw_datacube_S1 - # Logger for this current pipeline _pipeline_log: Optional[logging.Logger] = None From 750484cafbd36f9c6c60c3489930fa3721dd8a99 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Tue, 30 Apr 2024 17:02:09 +0200 Subject: [PATCH 07/11] Updated pipeline for worldcereal harmonized datasets and changed backend to CDSE --- scripts/extractions/extract_meteo.py | 4 +- scripts/extractions/extract_optical.py | 173 +++++++++++++++++++----- scripts/extractions/extract_sar.py | 33 +++-- src/worldcereal/openeo/preprocessing.py | 18 +-- 4 files changed, 167 insertions(+), 61 deletions(-) diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py index f5873670..1af98b95 100644 --- a/scripts/extractions/extract_meteo.py +++ b/scripts/extractions/extract_meteo.py @@ -66,7 +66,7 @@ def create_datacube_meteo( ) h3index = geometry.features[0].properties["h3index"] - valid_date = geometry.features[0].properties["valid_date"] + valid_time = geometry.features[0].properties["valid_time"] job_options = { "executor-memory": executor_memory, @@ -74,7 +74,7 @@ def create_datacube_meteo( } return cube.create_job( out_format="NetCDF", - title=f"GFMAP_Extraction_AGERA5_{h3index}_{valid_date}", + title=f"GFMAP_Extraction_AGERA5_{h3index}_{valid_time}", sample_by_feature=True, job_options=job_options, ) diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py index 744019c1..a41e25f1 100644 --- a/scripts/extractions/extract_optical.py +++ b/scripts/extractions/extract_optical.py @@ -10,6 +10,7 @@ import geojson import geopandas as gpd import openeo +from openeo.extra.job_management import _log as _log_openeo import pandas as pd import pystac from extract_sar import ( @@ -23,11 +24,10 @@ generate_output_path, ) from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext -from openeo_gfmap.backend import cdse_staging_connection +from openeo_gfmap.backend import cdse_connection from openeo_gfmap.manager import _log from openeo_gfmap.manager.job_manager import GFMAPJobManager from openeo_gfmap.manager.job_splitters import ( - _append_h3_index, _load_s2_grid, split_job_s2grid, ) @@ -35,20 +35,128 @@ from worldcereal.openeo.preprocessing import raw_datacube_S2 -AUXILIARY = pystac.extensions.item_assets.AssetDefinition( +# Define the sentinel 2 asset +sentinel2_asset = pystac.extensions.item_assets.AssetDefinition( { - "title": "ground truth data", - "description": "This asset contains the crop type codes.", + "gsd": 10, + "title": "Sentinel2", + "description": "Sentinel-2 bands", "type": "application/x-netcdf", "roles": ["data"], "proj:shape": [64, 64], - "raster:bands": [ - {"name": "ewoc_code", "data_type": "int64", "bits_per_sample": 64} + "raster:bands": [{"name": "S2-L2A-B01"}, + {"name": "S2-L2A-B02"}, + {"name": "S2-L2A-B03"}, + {"name": "S2-L2A-B04"}, + {"name": "S2-L2A-B05"}, + {"name": "S2-L2A-B06"}, + {"name": "S2-L2A-B07"}, + {"name": "S2-L2A-B8A"}, + {"name": "S2-L2A-B08"}, + {"name": "S2-L2A-B11"}, + {"name": "S2-L2A-B12"}, + {"name": "S2-L2A-SCL"}, + {"name": "S2-L2A-SCL_DILATED_MASK"}, + {"name": "S2-L2A-DISTANCE_TO_CLOUD"}], + "cube:variables": { + "S2-L2A-B01": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B02": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B03": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B04": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B05": {"dimensions": ["time", "y", "x"], "type":
"data"}, + "S2-L2A-B06": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B07": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B8A": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B08": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B11": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-B12": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-SCL": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-SCL_DILATED_MASK": {"dimensions": ["time", "y", "x"], "type": "data"}, + "S2-L2A-DISTANCE_TO_CLOUD": {"dimensions": ["time", "y", "x"], "type": "data"}, + }, + "eo:bands": [ + { + "name": "S2-L2A-B01", + "common_name": "coastal", + "center_wavelength": 0.443, + "full_width_half_max": 0.027, + }, + { + "name": "S2-L2A-B02", + "common_name": "blue", + "center_wavelength": 0.49, + "full_width_half_max": 0.098, + }, + { + "name": "S2-L2A-B03", + "common_name": "green", + "center_wavelength": 0.56, + "full_width_half_max": 0.045, + }, + { + "name": "S2-L2A-B04", + "common_name": "red", + "center_wavelength": 0.665, + "full_width_half_max": 0.038, + }, + { + "name": "S2-L2A-B05", + "common_name": "rededge", + "center_wavelength": 0.704, + "full_width_half_max": 0.019, + }, + { + "name": "S2-L2A-B06", + "common_name": "rededge", + "center_wavelength": 0.74, + "full_width_half_max": 0.018, + }, + { + "name": "S2-L2A-B07", + "common_name": "rededge", + "center_wavelength": 0.783, + "full_width_half_max": 0.028, + }, + { + "name": "S2-L2A-B08", + "common_name": "nir", + "center_wavelength": 0.842, + "full_width_half_max": 0.145, + }, + { + "name": "S2-L2A-B8A", + "common_name": "nir08", + "center_wavelength": 0.865, + "full_width_half_max": 0.033, + }, + { + "name": "S2-L2A-B11", + "common_name": "swir16", + "center_wavelength": 1.61, + "full_width_half_max": 0.143, + }, + { + "name": "S2-L2A-B12", + "common_name": "swir16", + "center_wavelength": 1.61, + "full_width_half_max": 0.143, + }, + { + "name": "S2-L2A-SCL", + "common_name": "swir16", + "center_wavelength": 1.61, + "full_width_half_max": 0.143, + }, + { + "name": "S2-L2A-SCL_DILATED_MASK", + }, + { + "name": "S2-L2A-DISTANCE_TO_CLOUD", + }, ], } ) - def create_datacube_optical( row: pd.Series, connection: openeo.DataCube, @@ -79,7 +187,7 @@ def create_datacube_optical( # Get the h3index to use in the tile s2_tile = row.s2_tile - valid_date = geometry.features[0].properties["valid_date"] + valid_time = geometry.features[0].properties["valid_time"] bands_to_download = [ "S2-L2A-B01", @@ -129,7 +237,7 @@ def create_datacube_optical( return cube.create_job( out_format="NetCDF", - title=f"GFMAP_Extraction_S2_{s2_tile}_{valid_date}", + title=f"GFMAP_Extraction_S2_{s2_tile}_{valid_time}", sample_by_feature=True, job_options=job_options, ) @@ -141,11 +249,11 @@ def post_job_action( base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs( epsg=4326 ) - assert len(base_gpd[base_gpd.extract]) == len( + assert len(base_gpd[base_gpd.extract == 1]) == len( job_items ), "The number of result paths should be the same as the number of geometries" - extracted_gpd = base_gpd[base_gpd.extract].reset_index(drop=True) + extracted_gpd = base_gpd[base_gpd.extract == 1].reset_index(drop=True) # In this case we want to burn the metadata in a new file in the same folder as the S2 product for idx, item in enumerate(job_items): if "sample_id" in extracted_gpd.columns: @@ -154,8 +262,8 @@ def post_job_action( sample_id = extracted_gpd.iloc[idx].sampleID ref_id = 
extracted_gpd.iloc[idx].ref_id - valid_date = extracted_gpd.iloc[idx].valid_date - h3index = extracted_gpd.iloc[idx].h3index + valid_time = extracted_gpd.iloc[idx].valid_time + h3_l3_cell = extracted_gpd.iloc[idx].h3_l3_cell s2_tile = row.s2_tile item_asset_path = Path(list(item.assets.values())[0].href) @@ -164,7 +272,7 @@ def post_job_action( new_attributes = { "start_date": row.start_date, "end_date": row.end_date, - "valid_date": valid_date, + "valid_time": valid_time, "GFMAP_version": version("openeo_gfmap"), "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "description": f"Sentinel2 L2A observations for sample: {sample_id}, unprocessed.", @@ -173,7 +281,7 @@ def post_job_action( "ref_id": ref_id, "spatial_resolution": "10m", "s2_tile": s2_tile, - "h3index": h3index, + "h3_l3_cell": h3_l3_cell, } # Saves the new attributes in the netcdf file @@ -194,7 +302,7 @@ def post_job_action( ) parser.add_argument( "input_df", - type=str, + type=Path, help="Path or URL to the input dataframe for the training data.", ) parser.add_argument( @@ -206,7 +314,7 @@ def post_job_action( parser.add_argument( "--memory", type=str, - default="600m", + default="1800m", help="Memory to allocate for the executor.", ) parser.add_argument( @@ -223,13 +331,15 @@ def post_job_action( # Load the input dataframe _pipeline_log.info("Loading input dataframe from %s.", args.input_df) - input_df = gpd.read_file(args.input_df) - input_df = _append_h3_index(input_df, grid_resolution=3) + if args.input_df.name.endswith('.geoparquet'): + input_df = gpd.read_parquet(args.input_df) + else: + input_df = gpd.read_file(args.input_df) split_dfs = split_job_s2grid(input_df, max_points=args.max_locations) - split_dfs = [df for df in split_dfs if df.extract.any()] + split_dfs = [df for df in split_dfs if (df.extract == 1).any()] - job_df = create_job_dataframe(Backend.CDSE_STAGING, split_dfs, prefix="S2-L2A-10m") + job_df = create_job_dataframe(Backend.CDSE, split_dfs, prefix="S2-L2A-10m") # Setup the memory parameters for the job creator. 
create_datacube_optical = partial( @@ -256,19 +366,12 @@ def post_job_action( ) manager.add_backend( - Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6 + Backend.CDSE.value, cdse_connection, parallel_jobs=6 + ) + manager.setup_stac( + constellation="sentinel2", + item_assets={"sentinel2": sentinel2_asset}, ) _pipeline_log.info("Launching the jobs from the manager.") - - try: - manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) - manager.create_stac( - constellation="sentinel2", item_assets={"auxiliary": AUXILIARY} - ) - except Exception as e: - _pipeline_log.error("Error during the job execution: %s", e) - manager.create_stac( - constellation="sentinel2", item_assets={"auxiliary": AUXILIARY} - ) - raise e + manager.run_jobs(job_df, create_datacube_optical, tracking_df_path) diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py index 3027ee2b..c7f1ffff 100644 --- a/scripts/extractions/extract_sar.py +++ b/scripts/extractions/extract_sar.py @@ -13,6 +13,7 @@ import geojson import geopandas as gpd import openeo +from openeo.extra.job_management import _log as _log_openeo import pandas as pd import pystac import requests @@ -34,15 +35,17 @@ def _setup_logger(level=logging.INFO) -> None: - global _pipeline_log + global _pipeline_log, _log_openeo """Setup the logger from the openeo_gfmap package to the assigned level.""" _pipeline_log = logging.getLogger("pipeline_sar") _pipeline_log.setLevel(level) _log.setLevel(level) + _log_openeo.setLevel(level) stream_handler = logging.StreamHandler() _log.addHandler(stream_handler) + _log_openeo.addHandler(stream_handler) _pipeline_log.addHandler(stream_handler) formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s") @@ -53,7 +56,7 @@ class ManagerLoggerFilter(logging.Filter): """Filter to only accept the OpenEO-GFMAP manager logs.""" def filter(self, record): - return record.name in [_log.name, _pipeline_log.name] + return record.name in [_log.name, _pipeline_log.name, _log_openeo.name] stream_handler.addFilter(ManagerLoggerFilter()) @@ -83,7 +86,7 @@ def _buffer_geometry( def _filter_extract_true(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame: """Remove all the geometries from the Feature Collection that have the property field `extract` set to `False`""" return geojson.FeatureCollection( - [f for f in geometries.features if f.properties.get("extract", False)] + [f for f in geometries.features if f.properties.get("extract", 0) == 1] ) @@ -141,10 +144,10 @@ def generate_output_path( ref_id = features[geometry_index].properties["ref_id"] s2_tile_id = row.s2_tile - h3index = row.h3index + h3_l3_cell = row.h3_l3_cell epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg - subfolder = root_folder / ref_id / h3index / sample_id + subfolder = root_folder / ref_id / h3_l3_cell / sample_id return ( subfolder / f"{row.out_prefix}_{sample_id}_{epsg}_{row.start_date}_{row.end_date}{row.out_extension}" @@ -162,17 +165,17 @@ def create_job_dataframe( "start_date", "end_date", "s2_tile", - "h3index", + "h3_l3_cell", "geometry", ] rows = [] for job in split_jobs: # Compute the average in the valid date and make a buffer of 1.5 year around - median_time = pd.to_datetime(job.valid_date).mean() + median_time = pd.to_datetime(job.valid_time).mean() start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months s2_tile = job.tile.iloc[0] # Job dataframes are split depending on the - 
h3index = job.h3index.iloc[0] + h3_l3_cell = job.h3_l3_cell.iloc[0] rows.append( pd.Series( @@ -186,7 +189,7 @@ def create_job_dataframe( start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"), s2_tile, - h3index, + h3_l3_cell, job.to_json(), ], ) @@ -251,7 +254,7 @@ def create_datacube_sar( # Additional values to generate the BatcJob name s2_tile = row.s2_tile - valid_date = geometry.features[0].properties["valid_date"] + valid_time = geometry.features[0].properties["valid_time"] # Increase the memory of the jobs depending on the number of polygons to extract number_polygons = _get_job_nb_polygons(row) @@ -263,7 +266,7 @@ def create_datacube_sar( } return cube.create_job( out_format="NetCDF", - title=f"GFMAP_Extraction_S1_{s2_tile}_{valid_date}", + title=f"GFMAP_Extraction_S1_{s2_tile}_{valid_time}", sample_by_feature=True, job_options=job_options, ) @@ -283,8 +286,8 @@ def post_job_action( for idx, item in enumerate(job_items): sample_id = extracted_gpd.iloc[idx].sample_id ref_id = extracted_gpd.iloc[idx].ref_id - valid_date = extracted_gpd.iloc[idx].valid_date - h3index = extracted_gpd.iloc[idx].h3index + valid_time = extracted_gpd.iloc[idx].valid_time + h3_l3_cell = extracted_gpd.iloc[idx].h3_l3_cell item_asset_path = Path(list(item.assets.values())[0].href) # Read information from the item file (could also read it from the item object metadata) @@ -295,7 +298,7 @@ def post_job_action( { "start_date": row.start_date, "end_date": row.end_date, - "valid_date": valid_date, + "valid_time": valid_time, "GFMAP_version": version("openeo_gfmap"), "creation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "description": f"Sentinel1 GRD observations for sample: {sample_id}, unprocessed.", @@ -303,7 +306,7 @@ def post_job_action( "sample_id": sample_id, "ref_id": ref_id, "spatial_resolution": "20m", - "h3index": h3index, + "h3_l3_cell": h3_l3_cell, } ) result_ds.to_netcdf(item_asset_path) diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py index 7f41ffbe..67e2bda0 100644 --- a/src/worldcereal/openeo/preprocessing.py +++ b/src/worldcereal/openeo/preprocessing.py @@ -117,15 +117,15 @@ def raw_datacube_S2( "eo:cloud_cover": lambda val: val <= 95.0, }, } - if (backend_context.backend is Backend.CDSE_STAGING) and ( - fetch_type == FetchType.POLYGON - ): # At the moment only available in CDSE staging - extraction_parameters["update_arguments"] = { - "featureflags": { - "tile_size": 128, - "load_per_product": True, - } - } + # if (backend_context.backend is Backend.CDSE_STAGING) and ( + # fetch_type == FetchType.POLYGON + # ): # At the moment only available in CDSE staging + # extraction_parameters["update_arguments"] = { + # "featureflags": { + # "tile_size": 128, + # "load_per_product": True, + # } + # } if additional_masks: extraction_parameters["pre_merge"] = additional_masks if filter_tile: From 7ed193b009d8704b3fd82b21a426761f97e76be3 Mon Sep 17 00:00:00 2001 From: Darius Couchard Date: Wed, 22 May 2024 11:32:22 +0200 Subject: [PATCH 08/11] Updated private methods to public and moved gfmap logger setup to gfmap --- scripts/extractions/extract_meteo.py | 32 +++++----- scripts/extractions/extract_optical.py | 81 +++++++++++++------------- scripts/extractions/extract_sar.py | 46 +++++++-------- 3 files changed, 78 insertions(+), 81 deletions(-) diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py index 1af98b95..1cb0d865 100644 --- a/scripts/extractions/extract_meteo.py +++ 
@@ -8,13 +8,13 @@
 import openeo
 import pandas as pd
 from extract_sar import (
-    _buffer_geometry,
-    _filter_extract_true,
-    _pipeline_log,
-    _setup_logger,
-    _upload_geoparquet_artifactory,
+    buffer_geometry,
     create_job_dataframe,
+    filter_extract_true,
     generate_output_path,
+    pipeline_log,
+    setup_logger,
+    upload_geoparquet_artifactory,
 )
 from openeo_gfmap import Backend, TemporalContext
 from openeo_gfmap.backend import vito_connection
@@ -43,14 +43,14 @@ def create_datacube_meteo(
     assert isinstance(geometry, geojson.FeatureCollection)
 
     # Filter the geometry to the rows with the extract only flag
-    geometry = _filter_extract_true(geometry)
+    geometry = filter_extract_true(geometry)
     assert len(geometry.features) > 0, "No geometries with the extract flag found"
 
     # Performs a buffer of 64 px around the geometry
-    geometry_df = _buffer_geometry(geometry, distance_m=5)
-    spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name)
+    geometry_df = buffer_geometry(geometry, distance_m=5)
+    spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)
 
-    bands_to_download = ["temperature-mean"]
+    bands_to_download = ["temperature-mean", "precipitation-flux"]
 
     cube = connection.load_collection(
         "AGERA5",
@@ -61,8 +61,8 @@ def create_datacube_meteo(
     cube = cube.filter_spatial(filter_geometry)
     cube.rename_labels(
         dimension="bands",
-        target=["AGERA5-temperature-mean"],
-        source=["temperature-mean"],
+        target=["AGERA5-temperature-mean", "AGERA5-precipitation-flux"],
+        source=["temperature-mean", "precipitation-flux"],
     )
 
     h3index = geometry.features[0].properties["h3index"]
@@ -81,8 +81,8 @@ def create_datacube_meteo(
 
 
 if __name__ == "__main__":
-    _setup_logger()
-    from extract_sar import _pipeline_log
+    setup_logger()
+    from extract_sar import pipeline_log
 
     parser = argparse.ArgumentParser(
         description="AGERA5 samples extraction with OpenEO-GFMAP package."
@@ -116,7 +116,7 @@ def create_datacube_meteo(
     tracking_df_path = Path(args.output_path) / "job_tracking.csv"
 
     # Load the input dataframe
-    _pipeline_log.info("Loading input dataframe from %s.", args.input_df)
+    pipeline_log.info("Loading input dataframe from %s.", args.input_df)
     input_df = gpd.read_file(args.input_df)
 
     input_df = _append_h3_index(input_df, grid_resolution=3)
@@ -126,7 +126,7 @@ def create_datacube_meteo(
 
     job_df = create_job_dataframe(Backend.TERRASCOPE, split_dfs, prefix="AGERA5")
 
-    _pipeline_log.warning(
+    pipeline_log.warning(
         "Sub-sampling the job dataframe for testing. Remove this for production."
     )
     # job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True)
@@ -158,5 +158,5 @@ def create_datacube_meteo(
 
     manager.add_backend(Backend.TERRASCOPE.value, vito_connection, parallel_jobs=6)
 
-    _pipeline_log.info("Launching the jobs from the manager.")
+    pipeline_log.info("Launching the jobs from the manager.")
     manager.run_jobs(job_df, create_datacube_meteo, tracking_df_path)
diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py
index a41e25f1..99551f97 100644
--- a/scripts/extractions/extract_optical.py
+++ b/scripts/extractions/extract_optical.py
@@ -10,27 +10,23 @@
 import geojson
 import geopandas as gpd
 import openeo
-from openeo.extra.job_management import _log as _log_openeo
 import pandas as pd
 import pystac
 from extract_sar import (
-    _buffer_geometry,
-    _filter_extract_true,
-    _get_job_nb_polygons,
-    _pipeline_log,
-    _setup_logger,
-    _upload_geoparquet_artifactory,
+    buffer_geometry,
     create_job_dataframe,
+    filter_extract_true,
     generate_output_path,
+    get_job_nb_polygons,
+    pipeline_log,
+    setup_logger,
+    upload_geoparquet_artifactory,
 )
 from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext
 from openeo_gfmap.backend import cdse_connection
 from openeo_gfmap.manager import _log
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
-from openeo_gfmap.manager.job_splitters import (
-    _load_s2_grid,
-    split_job_s2grid,
-)
+from openeo_gfmap.manager.job_splitters import _load_s2_grid, split_job_s2grid
 from openeo_gfmap.utils.netcdf import update_nc_attributes
 
 from worldcereal.openeo.preprocessing import raw_datacube_S2
@@ -44,20 +40,22 @@
         "type": "application/x-netcdf",
         "roles": ["data"],
         "proj:shape": [64, 64],
-        "raster:bands": [{"name": "S2-L2A-B01"},
-                         {"name": "S2-L2A-B02"},
-                         {"name": "S2-L2A-B03"},
-                         {"name": "S2-L2A-B04"},
-                         {"name": "S2-L2A-B05"},
-                         {"name": "S2-L2A-B06"},
-                         {"name": "S2-L2A-B07"},
-                         {"name": "S2-L2A-B8A"},
-                         {"name": "S2-L2A-B08"},
-                         {"name": "S2-L2A-B11"},
-                         {"name": "S2-L2A-B12"},
-                         {"name": "S2-L2A-SCL"},
-                         {"name": "S2-L2A-SCL_DILATED_MASK"},
-                         {"name": "S2-L2A-DISTANCE_TO_CLOUD"}],
+        "raster:bands": [
+            {"name": "S2-L2A-B01"},
+            {"name": "S2-L2A-B02"},
+            {"name": "S2-L2A-B03"},
+            {"name": "S2-L2A-B04"},
+            {"name": "S2-L2A-B05"},
+            {"name": "S2-L2A-B06"},
+            {"name": "S2-L2A-B07"},
+            {"name": "S2-L2A-B8A"},
+            {"name": "S2-L2A-B08"},
+            {"name": "S2-L2A-B11"},
+            {"name": "S2-L2A-B12"},
+            {"name": "S2-L2A-SCL"},
+            {"name": "S2-L2A-SCL_DILATED_MASK"},
+            {"name": "S2-L2A-DISTANCE_TO_CLOUD"},
+        ],
         "cube:variables": {
             "S2-L2A-B01": {"dimensions": ["time", "y", "x"], "type": "data"},
             "S2-L2A-B02": {"dimensions": ["time", "y", "x"], "type": "data"},
@@ -71,8 +69,14 @@
             "S2-L2A-B11": {"dimensions": ["time", "y", "x"], "type": "data"},
             "S2-L2A-B12": {"dimensions": ["time", "y", "x"], "type": "data"},
             "S2-L2A-SCL": {"dimensions": ["time", "y", "x"], "type": "data"},
-            "S2-L2A-SCL_DILATED_MASK": {"dimensions": ["time", "y", "x"], "type": "data"},
-            "S2-L2A-DISTANCE_TO_CLOUD": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-SCL_DILATED_MASK": {
+                "dimensions": ["time", "y", "x"],
+                "type": "data",
+            },
+            "S2-L2A-DISTANCE_TO_CLOUD": {
+                "dimensions": ["time", "y", "x"],
+                "type": "data",
+            },
         },
         "eo:bands": [
             {
@@ -157,6 +161,7 @@
     }
 )
 
+
 def create_datacube_optical(
     row: pd.Series,
     connection: openeo.DataCube,
@@ -174,12 +179,12 @@ def create_datacube_optical(
     assert isinstance(geometry, geojson.FeatureCollection)
 
     # Filter the geometry to the rows with the extract only flag
-    geometry = _filter_extract_true(geometry)
+    geometry = filter_extract_true(geometry)
     assert len(geometry.features) > 0, "No geometries with the extract flag found"
 
     # Performs a buffer of 64 px around the geometry
-    geometry_df = _buffer_geometry(geometry)
-    spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name)
+    geometry_df = buffer_geometry(geometry)
+    spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)
 
     # Backend name and fetching type
     backend = Backend(row.backend_name)
@@ -218,7 +223,7 @@ def create_datacube_optical(
     )
 
     # Increase the memory of the jobs depending on the number of polygons to extract
-    number_polygons = _get_job_nb_polygons(row)
+    number_polygons = get_job_nb_polygons(row)
     _log.debug("Number of polygons to extract %s", number_polygons)
 
     job_options = {
@@ -291,8 +296,8 @@ def post_job_action(
 
 
 if __name__ == "__main__":
-    _setup_logger()
-    from extract_sar import _pipeline_log
+    setup_logger()
+    from extract_sar import pipeline_log
 
     parser = argparse.ArgumentParser(
         description="S2 samples extraction with OpenEO-GFMAP package."
@@ -329,9 +334,9 @@ def post_job_action(
     tracking_df_path = Path(args.output_path) / "job_tracking.csv"
 
     # Load the input dataframe
-    _pipeline_log.info("Loading input dataframe from %s.", args.input_df)
+    pipeline_log.info("Loading input dataframe from %s.", args.input_df)
 
-    if args.input_df.name.endswith('.geoparquet'):
+    if args.input_df.name.endswith(".geoparquet"):
         input_df = gpd.read_parquet(args.input_df)
     else:
         input_df = gpd.read_file(args.input_df)
@@ -365,13 +370,11 @@ def post_job_action(
         post_job_params={},
     )
 
-    manager.add_backend(
-        Backend.CDSE.value, cdse_connection, parallel_jobs=6
-    )
+    manager.add_backend(Backend.CDSE.value, cdse_connection, parallel_jobs=6)
     manager.setup_stac(
         constellation="sentinel2",
         item_assets={"sentinel2": sentinel2_asset},
     )
 
-    _pipeline_log.info("Launching the jobs from the manager.")
+    pipeline_log.info("Launching the jobs from the manager.")
     manager.run_jobs(job_df, create_datacube_optical, tracking_df_path)
diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py
index c7f1ffff..e85f4a2d 100644
--- a/scripts/extractions/extract_sar.py
+++ b/scripts/extractions/extract_sar.py
@@ -13,7 +13,6 @@
 import geojson
 import geopandas as gpd
 import openeo
-from openeo.extra.job_management import _log as _log_openeo
 import pandas as pd
 import pystac
 import requests
@@ -21,7 +20,6 @@
 from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext
 from openeo_gfmap.backend import cdse_staging_connection
 from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor
-from openeo_gfmap.manager import _log
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
 from openeo_gfmap.manager.job_splitters import (
     _append_h3_index,
@@ -31,22 +29,18 @@
 from shapely.geometry import Point
 
 # Logger for this current pipeline
-_pipeline_log: Optional[logging.Logger] = None
+pipeline_log: Optional[logging.Logger] = None
 
 
-def _setup_logger(level=logging.INFO) -> None:
-    global _pipeline_log
+def setup_logger(level=logging.INFO) -> None:
     """Setup the logger from the openeo_gfmap package to the assigned level."""
-    _pipeline_log = logging.getLogger("pipeline_sar")
+    global pipeline_log
+    pipeline_log = logging.getLogger("pipeline_sar")
 
-    _pipeline_log.setLevel(level)
-    _log.setLevel(level)
-    _log_openeo.setLevel(level)
+    pipeline_log.setLevel(level)
 
     stream_handler = logging.StreamHandler()
-    _log.addHandler(stream_handler)
-    _log_openeo.addHandler(stream_handler)
-    _pipeline_log.addHandler(stream_handler)
+    pipeline_log.addHandler(stream_handler)
 
     formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s")
     stream_handler.setFormatter(formatter)
@@ -56,12 +50,12 @@ class ManagerLoggerFilter(logging.Filter):
         """Filter to only accept the OpenEO-GFMAP manager logs."""
 
         def filter(self, record):
-            return record.name in [_log.name, _pipeline_log.name, _log_openeo.name]
+            return record.name in [pipeline_log.name]
 
     stream_handler.addFilter(ManagerLoggerFilter())
 
 
-def _buffer_geometry(
+def buffer_geometry(
     geometries: geojson.FeatureCollection, distance_m: int = 320
 ) -> gpd.GeoDataFrame:
     """For each geometry of the colleciton, perform a square buffer of 320
@@ -83,14 +77,14 @@ def _buffer_geometry(
     return gdf
 
 
-def _filter_extract_true(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame:
+def filter_extract_true(geometries: geojson.FeatureCollection) -> gpd.GeoDataFrame:
     """Remove all the geometries from the Feature Collection that have the property
     field `extract` set to `False`"""
     return geojson.FeatureCollection(
         [f for f in geometries.features if f.properties.get("extract", 0) == 1]
     )
 
 
-def _upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, name: str) -> str:
+def upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, name: str) -> str:
     """Upload the given GeoDataFrame to artifactory and return the URL of the
     uploaded file. Necessary as a workaround for Polygon sampling in OpenEO
     using custom CRS.
@@ -122,7 +116,7 @@ def _upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, name: str) -> str:
     return upload_url
 
 
-def _get_job_nb_polygons(row: pd.Series) -> int:
+def get_job_nb_polygons(row: pd.Series) -> int:
     """Get the number of polygons in the geometry."""
     return len(
         list(
@@ -221,12 +215,12 @@ def create_datacube_sar(
     assert isinstance(geometry, geojson.FeatureCollection)
 
     # Filter the geometry to the rows with the extract only flag
-    geometry = _filter_extract_true(geometry)
+    geometry = filter_extract_true(geometry)
     assert len(geometry.features) > 0, "No geometries with the extract flag found"
 
     # Performs a buffer of 64 px around the geometry
-    geometry_df = _buffer_geometry(geometry, distance_m=310)
-    spatial_extent_url = _upload_geoparquet_artifactory(geometry_df, row.name)
+    geometry_df = buffer_geometry(geometry, distance_m=310)
+    spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)
 
     # Backend name and fetching type
     backend = Backend(row.backend_name)
@@ -257,7 +251,7 @@ def create_datacube_sar(
     valid_time = geometry.features[0].properties["valid_time"]
 
     # Increase the memory of the jobs depending on the number of polygons to extract
-    number_polygons = _get_job_nb_polygons(row)
+    number_polygons = get_job_nb_polygons(row)
     _log.debug("Number of polygons to extract %s", number_polygons)
 
     job_options = {
@@ -315,7 +309,7 @@ def post_job_action(
 
 
 if __name__ == "__main__":
-    _setup_logger()
+    setup_logger()
 
     parser = argparse.ArgumentParser(
         description="S1 samples extraction with OpenEO-GFMAP package."
@@ -349,7 +343,7 @@ def post_job_action(
     # Load the input dataframe, and perform dataset splitting using the h3 tile
     # to respect the area of interest. Also filters out the jobs that have
     # no location with the extract=True flag.
-    _pipeline_log.info("Loading input dataframe from %s.", args.input_df)
+    pipeline_log.info("Loading input dataframe from %s.", args.input_df)
     input_df = gpd.read_file(args.input_df)
 
     input_df = _append_h3_index(input_df)
@@ -359,7 +353,7 @@ def post_job_action(
 
     job_df = create_job_dataframe(Backend.CDSE_STAGING, split_dfs)
 
-    _pipeline_log.warning(
+    pipeline_log.warning(
         "Sub-sampling the job dataframe for testing. Remove this for production."
     )
     job_df = job_df.iloc[[0]]
@@ -392,8 +386,8 @@ def post_job_action(
         Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6
     )
 
-    _pipeline_log.info("Launching the jobs from the manager.")
+    pipeline_log.info("Launching the jobs from the manager.")
     manager.run_jobs(job_df, create_datacube_sar, tracking_df_path)
 
-    _pipeline_log.info("Jobs are finished, creating the STAC catalogue...")
+    pipeline_log.info("Jobs are finished, creating the STAC catalogue...")
     manager.create_stac()

From 49e98ab86d0be4dede14e1bfe2c39e51e47b10e1 Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Wed, 22 May 2024 11:51:35 +0200
Subject: [PATCH 09/11] added scaling to meteo

---
 scripts/extractions/extract_meteo.py    | 4 ++++
 scripts/extractions/extract_sar.py      | 2 +-
 src/worldcereal/openeo/preprocessing.py | 1 -
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py
index 1cb0d865..29b6bddd 100644
--- a/scripts/extractions/extract_meteo.py
+++ b/scripts/extractions/extract_meteo.py
@@ -65,6 +65,10 @@ def create_datacube_meteo(
         source=["temperature-mean", "precipitation-flux"],
     )
 
+    # Rescale to uint16, multiplying by 100 first
+    cube = cube * 100
+    cube = cube.linear_scale_range(0, 65534, 0, 65534)
+
     h3index = geometry.features[0].properties["h3index"]
     valid_time = geometry.features[0].properties["valid_time"]
 
diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py
index e85f4a2d..874d041f 100644
--- a/scripts/extractions/extract_sar.py
+++ b/scripts/extractions/extract_sar.py
@@ -252,7 +252,7 @@ def create_datacube_sar(
 
     # Increase the memory of the jobs depending on the number of polygons to extract
     number_polygons = get_job_nb_polygons(row)
-    _log.debug("Number of polygons to extract %s", number_polygons)
+    pipeline_log.debug("Number of polygons to extract %s", number_polygons)
 
     job_options = {
         "executor-memory": executor_memory,
diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py
index 67e2bda0..03a32919 100644
--- a/src/worldcereal/openeo/preprocessing.py
+++ b/src/worldcereal/openeo/preprocessing.py
@@ -3,7 +3,6 @@
 from openeo import UDF, Connection, DataCube
 from openeo_gfmap import (
-    Backend,
     BackendContext,
     BoundingBoxExtent,
     FetchType,

From 2fb819f332f20b3f1f321774c8dc2c0b857417fe Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Wed, 22 May 2024 12:35:10 +0200
Subject: [PATCH 10/11] Updated private functions to public in gfmap and removed unnecessary featureflags

---
 scripts/extractions/extract_sar.py      | 8 ++++----
 src/worldcereal/openeo/preprocessing.py | 9 ---------
 2 files changed, 4 insertions(+), 13 deletions(-)

diff --git a/scripts/extractions/extract_sar.py b/scripts/extractions/extract_sar.py
index 874d041f..199c163d 100644
--- a/scripts/extractions/extract_sar.py
+++ b/scripts/extractions/extract_sar.py
@@ -22,8 +22,8 @@
 from openeo_gfmap.fetching.s1 import build_sentinel1_grd_extractor
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
 from openeo_gfmap.manager.job_splitters import (
-    _append_h3_index,
-    _load_s2_grid,
+    append_h3_index,
+    load_s2_grid,
     split_job_s2grid,
 )
 from shapely.geometry import Point
@@ -346,7 +346,7 @@ def post_job_action(
     pipeline_log.info("Loading input dataframe from %s.", args.input_df)
 
     input_df = gpd.read_file(args.input_df)
-    input_df = _append_h3_index(input_df)
+    input_df = append_h3_index(input_df)
 
     split_dfs = split_job_s2grid(input_df, max_points=args.max_locations)
     split_dfs = [df for df in split_dfs if df.extract.any()]
@@ -368,7 +368,7 @@ def post_job_action(
     # Setup the s2 grid for the output path generation function
     generate_output_path = partial(
         generate_output_path,
-        s2_grid=_load_s2_grid(),
+        s2_grid=load_s2_grid(),
     )
 
     manager = GFMAPJobManager(
diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py
index 03a32919..6991ea41 100644
--- a/src/worldcereal/openeo/preprocessing.py
+++ b/src/worldcereal/openeo/preprocessing.py
@@ -116,15 +116,6 @@ def raw_datacube_S2(
             "eo:cloud_cover": lambda val: val <= 95.0,
         },
     }
-    # if (backend_context.backend is Backend.CDSE_STAGING) and (
-    #     fetch_type == FetchType.POLYGON
-    # ):  # At the moment only available in CDSE staging
-    #     extraction_parameters["update_arguments"] = {
-    #         "featureflags": {
-    #             "tile_size": 128,
-    #             "load_per_product": True,
-    #         }
-    #     }
     if additional_masks:
         extraction_parameters["pre_merge"] = additional_masks
     if filter_tile:

From e9ce691a26e54b4ec0ac8eb736684e4964c30aa0 Mon Sep 17 00:00:00 2001
From: Darius Couchard
Date: Wed, 22 May 2024 12:49:35 +0200
Subject: [PATCH 11/11] Fix Changed imports from private to public

---
 scripts/extractions/extract_meteo.py   | 8 ++++----
 scripts/extractions/extract_optical.py | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/scripts/extractions/extract_meteo.py b/scripts/extractions/extract_meteo.py
index 29b6bddd..91ac84b7 100644
--- a/scripts/extractions/extract_meteo.py
+++ b/scripts/extractions/extract_meteo.py
@@ -20,8 +20,8 @@
 from openeo_gfmap.backend import vito_connection
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
 from openeo_gfmap.manager.job_splitters import (
-    _append_h3_index,
-    _load_s2_grid,
+    append_h3_index,
+    load_s2_grid,
     split_job_s2grid,
 )
 
@@ -123,7 +123,7 @@ def create_datacube_meteo(
     pipeline_log.info("Loading input dataframe from %s.", args.input_df)
 
     input_df = gpd.read_file(args.input_df)
-    input_df = _append_h3_index(input_df, grid_resolution=3)
+    input_df = append_h3_index(input_df, grid_resolution=3)
 
     split_dfs = split_job_s2grid(input_df, max_points=args.max_locations)
     split_dfs = [df for df in split_dfs if df.extract.any()]
@@ -146,7 +146,7 @@ def create_datacube_meteo(
     # Setup the s2 grid for the output path generation function
     generate_output_path = partial(
         generate_output_path,
-        s2_grid=_load_s2_grid(),
+        s2_grid=load_s2_grid(),
    )
 
     manager = GFMAPJobManager(
diff --git a/scripts/extractions/extract_optical.py b/scripts/extractions/extract_optical.py
index 99551f97..7651a7a4 100644
--- a/scripts/extractions/extract_optical.py
+++ b/scripts/extractions/extract_optical.py
@@ -26,7 +26,7 @@
 from openeo_gfmap.backend import cdse_connection
 from openeo_gfmap.manager import _log
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
-from openeo_gfmap.manager.job_splitters import _load_s2_grid, split_job_s2grid
+from openeo_gfmap.manager.job_splitters import load_s2_grid, split_job_s2grid
 from openeo_gfmap.utils.netcdf import update_nc_attributes
 
 from worldcereal.openeo.preprocessing import raw_datacube_S2
@@ -356,7 +356,7
@@ def post_job_action(
     # Setup the s2 grid for the output path generation function
     generate_output_path = partial(
         generate_output_path,
-        s2_grid=_load_s2_grid(),
+        s2_grid=load_s2_grid(),
     )
 
     manager = GFMAPJobManager(