From 92a48d008ebc6af2dd2a4f1bff406a8919996b59 Mon Sep 17 00:00:00 2001 From: Favyen Bastani Date: Fri, 19 Dec 2025 08:15:14 -0800 Subject: [PATCH] Fix Sentinel-2 vessel detection pipeline and run it for UPF. - Add one_off_projects/2025_12_africa_vessels/ scripts to handle this request. - Update rslp/common/worker.py to support WEKA mount in worker jobs. - Update prediction pipeline to reduce memory usage. With AllPatchesDataset, data loader workers will have entire scenes in memory instead of just individual windows, so we need to reduce the number of workers to compensate. --- .../2025_12_africa_vessels/README.md | 91 ++++++++++++ .../2025_12_africa_vessels/get_aois.py | 49 ++++++ .../2025_12_africa_vessels/get_scene_ids.py | 139 ++++++++++++++++++ rslp/common/worker.py | 8 +- rslp/sentinel2_vessels/predict_pipeline.py | 26 +++- 5 files changed, 310 insertions(+), 3 deletions(-) create mode 100644 one_off_projects/2025_12_africa_vessels/README.md create mode 100644 one_off_projects/2025_12_africa_vessels/get_aois.py create mode 100644 one_off_projects/2025_12_africa_vessels/get_scene_ids.py diff --git a/one_off_projects/2025_12_africa_vessels/README.md b/one_off_projects/2025_12_africa_vessels/README.md new file mode 100644 index 00000000..6f9e9401 --- /dev/null +++ b/one_off_projects/2025_12_africa_vessels/README.md @@ -0,0 +1,91 @@ +This is for UPF request to get vessel detections in Africa. + +First run script to get the 5x5 degree AOIs that are not on land: + +``` +python one_off_projects/2025_12_africa_vessels/get_aois.py +``` + +Then we can get scene IDs for each AOI, e.g.: + +``` +python one_off_projects/2025_12_africa_vessels/get_scene_ids.py \ + --cache_path /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/ \ + --geojson /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/aoi_0_0_5_5.geojson \ + --out_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \ + --geom_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/aoi_0_0_5_5.geojson +``` + +The `--out_fname` has a simple list of scene IDs compatible with +`rslp.main sentinel2_vessels write_entries`, while `--geom_fname` has the detailed +scene geometry that could be useful for UPF. + +Here is batch version: + +```python +import multiprocessing +import os +import subprocess + +import tqdm + +def process(aoi_name: str) -> None: + subprocess.call([ + "python", + "one_off_projects/2025_12_africa_vessels/get_scene_ids.py", + "--cache_path=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/", + f"--geojson=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/{aoi_name}.geojson", + f"--out_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{aoi_name}.json", + f"--geom_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/{aoi_name}.geojson", + ]) + +aoi_names = [fname.split(".")[0] for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/")] +p = multiprocessing.Pool(4) +outputs = p.imap_unordered(process, aoi_names) +for _ in tqdm.tqdm(outputs, total=len(aoi_names)): + pass +p.close() +``` + +Write the jobs to queue: + +``` +python -m rslp.main sentinel2_vessels write_entries \ + --queue_name favyen/sentinel2-vessels-predict \ + --json_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \ + --json_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/ \ + --geojson_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/ \ + --crop_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/ +``` + +Here is batch version: + +```python +import os +import subprocess +for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/"): + label = fname.split(".")[0] + subprocess.call([ + "python", + "-m", + "rslp.main", + "sentinel2_vessels", + "write_entries", + "--queue_name", + "favyen/sentinel2-vessels-predict", + "--json_fname", + f"/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{label}.json", + "--json_out_dir", + "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/", + "--geojson_out_dir", + "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/", + "--crop_out_dir", + "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/", + ]) +``` + +And launch worker jobs: + +``` +python -m rslp.main common launch --image_name favyen/rslpomp20251212c --queue_name favyen/sentinel2-vessels-predict --num_workers 100 --gpus 1 --shared_memory 256GiB --cluster=[ai2/jupiter,ai2/neptune,ai2/saturn] --weka_mounts+='{"bucket_name": "dfive-default", "mount_path": "/weka/dfive-default"}' +``` diff --git a/one_off_projects/2025_12_africa_vessels/get_aois.py b/one_off_projects/2025_12_africa_vessels/get_aois.py new file mode 100644 index 00000000..4dedfd3b --- /dev/null +++ b/one_off_projects/2025_12_africa_vessels/get_aois.py @@ -0,0 +1,49 @@ +"""Get 5x5 degree tiles that are not on land.""" + +import json +from pathlib import Path + +from global_land_mask import globe + +AOI_DIR = "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/" +GRID_SIZE = 5 + +if __name__ == "__main__": + box = (-30, -40, 70, 35) + + for lon in range(box[0], box[2], GRID_SIZE): + for lat in range(box[1], box[3], GRID_SIZE): + coordinates = [ + (lon, lat), + (lon, lat + GRID_SIZE), + (lon + GRID_SIZE, lat + GRID_SIZE), + (lon + GRID_SIZE, lat), + (lon, lat), + ] + # Make sure at least one corner is in the ocean, otherwise we skip this tile. + at_least_one_water = False + for coord in coordinates: + if globe.is_land(coord[1], coord[0]): + continue + at_least_one_water = True + + print(lon, lat, at_least_one_water) + + if not at_least_one_water: + continue + + fname = Path(AOI_DIR) / f"aoi_{lon}_{lat}_{lon+GRID_SIZE}_{lat+GRID_SIZE}.geojson" + with fname.open("w") as f: + feat = { + "type": "Feature", + "properties": {}, + "geometry": { + "type": "Polygon", + "coordinates": [coordinates], + } + } + json.dump({ + "type": "FeatureCollection", + "properties": {}, + "features": [feat], + }, f) diff --git a/one_off_projects/2025_12_africa_vessels/get_scene_ids.py b/one_off_projects/2025_12_africa_vessels/get_scene_ids.py new file mode 100644 index 00000000..b8bd37d8 --- /dev/null +++ b/one_off_projects/2025_12_africa_vessels/get_scene_ids.py @@ -0,0 +1,139 @@ +"""Get Sentinel-2 scene IDs that we should run vessel detection model on.""" + +import argparse +import json +import multiprocessing +from datetime import datetime, timezone, UTC + +import shapely +import tqdm +from rslearn.config import QueryConfig, SpaceMode +from rslearn.const import WGS84_PROJECTION +from rslearn.data_sources.gcp_public_data import Sentinel2, Sentinel2Item +from rslearn.utils.geometry import STGeometry +from rslearn.utils.vector_format import GeojsonVectorFormat, GeojsonCoordinateMode +from rslearn.utils.feature import Feature +from rslearn.utils.mp import star_imap_unordered +from upath import UPath + + +def split_aoi(aoi: STGeometry, size: int = 1) -> list[STGeometry]: + """Split up a big AOI into smaller geometries. + + Args: + aoi: the AOI to split up. + size: the size for sub-tiles to create within the bounds of the AOI. + + Returns: + list of sub-tiles. + """ + # We assume the tile has integer lon/lat coordinates and are 5x5 degrees. + bounds = tuple(int(v) for v in aoi.shp.bounds) + assert (bounds[2] - bounds[0]) == 5 + assert (bounds[3] - bounds[1]) == 5 + + # Only size=1 really makes sense here since 5 has no larger factors besides 5. + num_x_tiles = (bounds[2] - bounds[0]) // size + num_y_tiles = (bounds[3] - bounds[1]) // size + + geoms: list[STGeometry] = [] + for col in range(num_x_tiles): + for row in range(num_y_tiles): + x_start = bounds[0] + col + y_start = bounds[1] + row + geom = STGeometry(WGS84_PROJECTION, shapely.box(x_start, y_start, x_start + size, y_start + size), aoi.time_range) + geoms.append(geom) + + return geoms + + +def get_items(geom: STGeometry, cache_path: UPath) -> list[Sentinel2Item]: + """Get the items matching the given geometry.""" + query_config = QueryConfig(space_mode=SpaceMode.INTERSECTS, max_matches=100000) + sentinel2 = Sentinel2( + index_cache_dir=cache_path, use_rtree_index=False, use_bigquery=True + ) + item_groups = sentinel2.get_items([geom], query_config)[0] + items = [] + for group in item_groups: + if len(group) != 1: + raise ValueError("expected each item group to have one item with INTERSECTS space mode") + items.append(group[0]) + return items + + +if __name__ == "__main__": + multiprocessing.set_start_method("forkserver") + + parser = argparse.ArgumentParser( + description="Get Sentinel-2 scene IDs", + ) + parser.add_argument( + "--cache_path", + type=str, + help="Path to cache stuff", + required=True, + ) + parser.add_argument( + "--geojson", + type=str, + help="GeoJSON filename containing the area of interest", + required=True, + ) + parser.add_argument( + "--out_fname", + type=str, + help="Filename to write scene IDs", + required=True, + ) + parser.add_argument( + "--geom_fname", + type=str, + help="Filename to write scene geometries", + default=None, + ) + args = parser.parse_args() + + vector_format = GeojsonVectorFormat(coordinate_mode=GeojsonCoordinateMode.WGS84) + features = vector_format.decode_from_file(UPath(args.geojson)) + assert len(features) == 1 + feat = features[0] + + geom = STGeometry( + feat.geometry.projection, + feat.geometry.shp, + ( + datetime(2016, 1, 1, tzinfo=UTC), + datetime(2025, 1, 1, tzinfo=UTC), + ), + ) + + # Split up the AOI. + geoms = split_aoi(geom) + print(f"Got {len(geoms)} sub-tiles") + + # Process the AOIs in parallel. + scene_ids = set() + features: list[Feature] = [] + p = multiprocessing.Pool(64) + outputs = star_imap_unordered(p, get_items, [dict( + geom=geom, + cache_path=UPath(args.cache_path) + ) for geom in geoms]) + for item_list in tqdm.tqdm(outputs, total=len(geoms)): + for item in item_list: + if item.name in scene_ids: + continue + scene_ids.add(item.name) + feat = Feature(item.geometry, { + "scene_id": item.name, + }) + features.append(feat) + + print(f"Got {len(scene_ids)} scene IDs after de-duplication") + + with open(args.out_fname, "w") as f: + json.dump(list(scene_ids), f) + + if args.geom_fname: + vector_format.encode_to_file(UPath(args.geom_fname), features) diff --git a/rslp/common/worker.py b/rslp/common/worker.py index 10f46ad5..9f5a1aa3 100644 --- a/rslp/common/worker.py +++ b/rslp/common/worker.py @@ -23,6 +23,7 @@ from rslp.utils.beaker import ( DEFAULT_BUDGET, DEFAULT_WORKSPACE, + WekaMount, create_gcp_credentials_mount, get_base_env_vars, ) @@ -131,6 +132,7 @@ def launch_workers( gpus: int = 0, shared_memory: str | None = None, priority: BeakerJobPriority = BeakerJobPriority.low, + weka_mounts: list[WekaMount] = [], ) -> None: """Start workers for the prediction jobs. @@ -142,11 +144,15 @@ def launch_workers( gpus: number of GPUs to request per worker. shared_memory: shared memory string like "256GiB". priority: priority to assign the Beaker jobs. + weka_mounts: list of weka mounts for Beaker job. """ with Beaker.from_env(default_workspace=DEFAULT_WORKSPACE) as beaker: for _ in tqdm.tqdm(range(num_workers)): env_vars = get_base_env_vars(use_weka_prefix=False) + datasets = [create_gcp_credentials_mount()] + datasets += [weka_mount.to_data_mount() for weka_mount in weka_mounts] + spec = BeakerExperimentSpec.new( budget=DEFAULT_BUDGET, description="worker", @@ -163,7 +169,7 @@ def launch_workers( cluster=cluster, ), preemptible=True, - datasets=[create_gcp_credentials_mount()], + datasets=datasets, env_vars=env_vars, resources=BeakerTaskResources( gpu_count=gpus, shared_memory=shared_memory diff --git a/rslp/sentinel2_vessels/predict_pipeline.py b/rslp/sentinel2_vessels/predict_pipeline.py index 62f771a9..61eb92b4 100644 --- a/rslp/sentinel2_vessels/predict_pipeline.py +++ b/rslp/sentinel2_vessels/predict_pipeline.py @@ -49,6 +49,15 @@ SENTINEL2_RESOLUTION = 10 CROP_WINDOW_SIZE = 128 +# Use lower number of data loader workers for prediction since each worker will read a +# big scene (unlike the small windows used during training). +NUM_DATA_LOADER_WORKERS = 4 + +# We make sure the windows we create for Sentinel-2 scenes are multiples of this amount +# because we store some bands at 1/4 of the input resolution, so the window size needs +# be a multiple of 4. +WINDOW_MIN_MULTIPLE = 4 + # Distance threshold for near marine infrastructure filter in km. # 0.05 km = 50 m INFRA_DISTANCE_THRESHOLD = 0.05 @@ -265,12 +274,21 @@ def get_vessel_detections( windows: list[Window] = [] group = "detector_predict" for scene_idx, scene_data in enumerate(scene_datas): + # Pad the bounds so they are multiple of WINDOW_MIN_MULTIPLE. + padded_bounds = ( + (scene_data.bounds[0] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE, + (scene_data.bounds[1] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE, + ((scene_data.bounds[2] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE) + * WINDOW_MIN_MULTIPLE, + ((scene_data.bounds[3] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE) + * WINDOW_MIN_MULTIPLE, + ) window = Window( storage=dataset.storage, group=group, name=str(scene_idx), projection=scene_data.projection, - bounds=scene_data.bounds, + bounds=padded_bounds, time_range=scene_data.time_range, ) window.save() @@ -305,7 +323,11 @@ def get_vessel_detections( # Run object detector. with time_operation(TimerOperations.RunModelPredict): - run_model_predict(DETECT_MODEL_CONFIG, ds_path) + run_model_predict( + DETECT_MODEL_CONFIG, + ds_path, + extra_args=["--data.init_args.num_workers", str(NUM_DATA_LOADER_WORKERS)], + ) # Read the detections. detections: list[VesselDetection] = []