91 changes: 91 additions & 0 deletions one_off_projects/2025_12_africa_vessels/README.md
@@ -0,0 +1,91 @@
This is for the UPF request to get vessel detections in Africa.

First, run the script to get the 5x5 degree AOIs that are not entirely on land:

```
python one_off_projects/2025_12_africa_vessels/get_aois.py
```

Then we can get scene IDs for each AOI, e.g.:

```
python one_off_projects/2025_12_africa_vessels/get_scene_ids.py \
--cache_path /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/ \
--geojson /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/aoi_0_0_5_5.geojson \
--out_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \
--geom_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/aoi_0_0_5_5.geojson
```

The `--out_fname` output is a simple list of scene IDs compatible with
`rslp.main sentinel2_vessels write_entries`, while `--geom_fname` contains the
detailed scene geometries that could be useful for UPF.
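
Based on how `get_scene_ids.py` writes these files, a quick way to inspect the
outputs (the scene ID file is a plain JSON array of strings, and the geometry file
is a GeoJSON FeatureCollection whose features carry a `scene_id` property):

```python
import json

with open("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json") as f:
    scene_ids = json.load(f)
print(len(scene_ids), scene_ids[:3])

with open("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/aoi_0_0_5_5.geojson") as f:
    fc = json.load(f)
print(fc["features"][0]["properties"]["scene_id"])
```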

Here is a batch version:

```python
import multiprocessing
import os
import subprocess

import tqdm

def process(aoi_name: str) -> None:
    subprocess.call([
        "python",
        "one_off_projects/2025_12_africa_vessels/get_scene_ids.py",
        "--cache_path=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/cache/sentinel2/",
        f"--geojson=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/{aoi_name}.geojson",
        f"--out_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{aoi_name}.json",
        f"--geom_fname=/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_geojsons/{aoi_name}.geojson",
    ])

aoi_names = [fname.split(".")[0] for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/")]
p = multiprocessing.Pool(4)
outputs = p.imap_unordered(process, aoi_names)
for _ in tqdm.tqdm(outputs, total=len(aoi_names)):
    pass
p.close()
```
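
Note that each `get_scene_ids.py` invocation already runs a 64-process pool
internally, so the outer pool here is deliberately kept small.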

Write the jobs to the queue:

```
python -m rslp.main sentinel2_vessels write_entries \
--queue_name favyen/sentinel2-vessels-predict \
--json_fname /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/aoi_0_0_5_5.json \
--json_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/ \
--geojson_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/ \
--crop_out_dir /weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/
```

Here is a batch version:

```python
import os
import subprocess
for fname in os.listdir("/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/"):
    label = fname.split(".")[0]
    subprocess.call([
        "python",
        "-m",
        "rslp.main",
        "sentinel2_vessels",
        "write_entries",
        "--queue_name",
        "favyen/sentinel2-vessels-predict",
        "--json_fname",
        f"/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/scene_ids/{label}.json",
        "--json_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/json_outputs/",
        "--geojson_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/geojson_outputs/",
        "--crop_out_dir",
        "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/crop_outputs/",
    ])
```

Finally, launch the worker jobs:

```
python -m rslp.main common launch \
    --image_name favyen/rslpomp20251212c \
    --queue_name favyen/sentinel2-vessels-predict \
    --num_workers 100 \
    --gpus 1 \
    --shared_memory 256GiB \
    --cluster=[ai2/jupiter,ai2/neptune,ai2/saturn] \
    --weka_mounts+='{"bucket_name": "dfive-default", "mount_path": "/weka/dfive-default"}'
```
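
If more than one Weka bucket is needed, the `+=` syntax should allow repeating the
flag to append additional mounts (assuming rslp's CLI uses jsonargparse-style list
appends; the second bucket here is hypothetical):

```
python -m rslp.main common launch ... \
    --weka_mounts+='{"bucket_name": "dfive-default", "mount_path": "/weka/dfive-default"}' \
    --weka_mounts+='{"bucket_name": "other-bucket", "mount_path": "/weka/other-bucket"}'
```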
49 changes: 49 additions & 0 deletions one_off_projects/2025_12_africa_vessels/get_aois.py
@@ -0,0 +1,49 @@
"""Get 5x5 degree tiles that are not on land."""

import json
from pathlib import Path

from global_land_mask import globe

AOI_DIR = "/weka/dfive-default/rslearn-eai/projects/2025_12_africa_vessels/aois/"
GRID_SIZE = 5

if __name__ == "__main__":
    # (min lon, min lat, max lon, max lat) covering Africa.
    box = (-30, -40, 70, 35)

    for lon in range(box[0], box[2], GRID_SIZE):
        for lat in range(box[1], box[3], GRID_SIZE):
            coordinates = [
                (lon, lat),
                (lon, lat + GRID_SIZE),
                (lon + GRID_SIZE, lat + GRID_SIZE),
                (lon + GRID_SIZE, lat),
                (lon, lat),
            ]
            # Make sure at least one corner is in the ocean, otherwise we skip
            # this tile. Note that globe.is_land expects (lat, lon).
            at_least_one_water = False
            for coord in coordinates:
                if globe.is_land(coord[1], coord[0]):
                    continue
                at_least_one_water = True

            print(lon, lat, at_least_one_water)

            if not at_least_one_water:
                continue

            fname = Path(AOI_DIR) / f"aoi_{lon}_{lat}_{lon+GRID_SIZE}_{lat+GRID_SIZE}.geojson"
            with fname.open("w") as f:
                feat = {
                    "type": "Feature",
                    "properties": {},
                    "geometry": {
                        "type": "Polygon",
                        "coordinates": [coordinates],
                    },
                }
                json.dump({
                    "type": "FeatureCollection",
                    "properties": {},
                    "features": [feat],
                }, f)
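
As a quick sanity check of the corner test above (a minimal sketch; `globe.is_land`
expects latitude first, which is why the script passes `coord[1], coord[0]`):

```python
from global_land_mask import globe

# The (lon, lat) corners of the tile spanning (0, 0) to (5, 5).
corners = [(0, 0), (0, 5), (5, 5), (5, 0)]
# True if any corner is water; this tile touches the Gulf of Guinea.
print(any(not globe.is_land(lat, lon) for lon, lat in corners))
```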
139 changes: 139 additions & 0 deletions one_off_projects/2025_12_africa_vessels/get_scene_ids.py
@@ -0,0 +1,139 @@
"""Get Sentinel-2 scene IDs that we should run vessel detection model on."""

import argparse
import json
import multiprocessing
from datetime import datetime, UTC

import shapely
import tqdm
from rslearn.config import QueryConfig, SpaceMode
from rslearn.const import WGS84_PROJECTION
from rslearn.data_sources.gcp_public_data import Sentinel2, Sentinel2Item
from rslearn.utils.geometry import STGeometry
from rslearn.utils.vector_format import GeojsonVectorFormat, GeojsonCoordinateMode
from rslearn.utils.feature import Feature
from rslearn.utils.mp import star_imap_unordered
from upath import UPath


def split_aoi(aoi: STGeometry, size: int = 1) -> list[STGeometry]:
    """Split up a big AOI into smaller geometries.

    Args:
        aoi: the AOI to split up.
        size: the size for sub-tiles to create within the bounds of the AOI.

    Returns:
        list of sub-tiles.
    """
    # We assume the tile has integer lon/lat coordinates and is 5x5 degrees.
    bounds = tuple(int(v) for v in aoi.shp.bounds)
    assert (bounds[2] - bounds[0]) == 5
    assert (bounds[3] - bounds[1]) == 5

    # Only size=1 really makes sense here since 5 is prime.
    num_x_tiles = (bounds[2] - bounds[0]) // size
    num_y_tiles = (bounds[3] - bounds[1]) // size

    geoms: list[STGeometry] = []
    for col in range(num_x_tiles):
        for row in range(num_y_tiles):
            x_start = bounds[0] + col * size
            y_start = bounds[1] + row * size
            geom = STGeometry(
                WGS84_PROJECTION,
                shapely.box(x_start, y_start, x_start + size, y_start + size),
                aoi.time_range,
            )
            geoms.append(geom)

    return geoms


def get_items(geom: STGeometry, cache_path: UPath) -> list[Sentinel2Item]:
    """Get the items matching the given geometry."""
    query_config = QueryConfig(space_mode=SpaceMode.INTERSECTS, max_matches=100000)
    sentinel2 = Sentinel2(
        index_cache_dir=cache_path, use_rtree_index=False, use_bigquery=True
    )
    item_groups = sentinel2.get_items([geom], query_config)[0]
    items = []
    for group in item_groups:
        if len(group) != 1:
            raise ValueError(
                "expected each item group to have one item with INTERSECTS space mode"
            )
        items.append(group[0])
    return items


if __name__ == "__main__":
    multiprocessing.set_start_method("forkserver")

    parser = argparse.ArgumentParser(
        description="Get Sentinel-2 scene IDs",
    )
    parser.add_argument(
        "--cache_path",
        type=str,
        help="Path to cache the Sentinel-2 index",
        required=True,
    )
    parser.add_argument(
        "--geojson",
        type=str,
        help="GeoJSON filename containing the area of interest",
        required=True,
    )
    parser.add_argument(
        "--out_fname",
        type=str,
        help="Filename to write scene IDs",
        required=True,
    )
    parser.add_argument(
        "--geom_fname",
        type=str,
        help="Filename to write scene geometries",
        default=None,
    )
    args = parser.parse_args()

    vector_format = GeojsonVectorFormat(coordinate_mode=GeojsonCoordinateMode.WGS84)
    features = vector_format.decode_from_file(UPath(args.geojson))
    assert len(features) == 1
    feat = features[0]

    geom = STGeometry(
        feat.geometry.projection,
        feat.geometry.shp,
        (
            datetime(2016, 1, 1, tzinfo=UTC),
            datetime(2025, 1, 1, tzinfo=UTC),
        ),
    )

    # Split up the AOI.
    geoms = split_aoi(geom)
    print(f"Got {len(geoms)} sub-tiles")

    # Process the sub-tiles in parallel.
    scene_ids = set()
    features: list[Feature] = []
    p = multiprocessing.Pool(64)
    outputs = star_imap_unordered(
        p,
        get_items,
        [dict(geom=geom, cache_path=UPath(args.cache_path)) for geom in geoms],
    )
    for item_list in tqdm.tqdm(outputs, total=len(geoms)):
        for item in item_list:
            if item.name in scene_ids:
                continue
            scene_ids.add(item.name)
            feat = Feature(item.geometry, {
                "scene_id": item.name,
            })
            features.append(feat)

    print(f"Got {len(scene_ids)} scene IDs after de-duplication")

    with open(args.out_fname, "w") as f:
        json.dump(list(scene_ids), f)

    if args.geom_fname:
        vector_format.encode_to_file(UPath(args.geom_fname), features)
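
For intuition, `split_aoi` with `size=1` turns each 5x5 degree AOI into 25 one-degree
sub-tiles. The same grid arithmetic as a standalone sketch (plain shapely, no rslearn
imports):

```python
import shapely

def split_box(bounds: tuple[int, int, int, int], size: int = 1) -> list[shapely.Polygon]:
    """Cover integer (minx, miny, maxx, maxy) bounds with size-degree cells."""
    return [
        shapely.box(x, y, x + size, y + size)
        for x in range(bounds[0], bounds[2], size)
        for y in range(bounds[1], bounds[3], size)
    ]

print(len(split_box((0, 0, 5, 5))))  # 25 sub-tiles
```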
8 changes: 7 additions & 1 deletion rslp/common/worker.py
@@ -23,6 +23,7 @@
from rslp.utils.beaker import (
    DEFAULT_BUDGET,
    DEFAULT_WORKSPACE,
    WekaMount,
    create_gcp_credentials_mount,
    get_base_env_vars,
)
@@ -131,6 +132,7 @@ def launch_workers(
    gpus: int = 0,
    shared_memory: str | None = None,
    priority: BeakerJobPriority = BeakerJobPriority.low,
    weka_mounts: list[WekaMount] = [],
) -> None:
    """Start workers for the prediction jobs.

@@ -142,11 +144,15 @@
        gpus: number of GPUs to request per worker.
        shared_memory: shared memory string like "256GiB".
        priority: priority to assign the Beaker jobs.
        weka_mounts: list of Weka mounts for the Beaker jobs.
    """
    with Beaker.from_env(default_workspace=DEFAULT_WORKSPACE) as beaker:
        for _ in tqdm.tqdm(range(num_workers)):
            env_vars = get_base_env_vars(use_weka_prefix=False)

            datasets = [create_gcp_credentials_mount()]
            datasets += [weka_mount.to_data_mount() for weka_mount in weka_mounts]

            spec = BeakerExperimentSpec.new(
                budget=DEFAULT_BUDGET,
                description="worker",
Expand All @@ -163,7 +169,7 @@ def launch_workers(
                    cluster=cluster,
                ),
                preemptible=True,
-                datasets=[create_gcp_credentials_mount()],
+                datasets=datasets,
                env_vars=env_vars,
                resources=BeakerTaskResources(
                    gpu_count=gpus, shared_memory=shared_memory
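
For reference, the `--weka_mounts+=` JSON in the README is parsed into these
`WekaMount` objects. A plausible shape for the class, consistent with that JSON
(an assumption for illustration; the real definition lives in
`rslp/utils/beaker.py`, and `to_data_mount()` must return whatever Beaker's
`datasets` field expects):

```python
from dataclasses import dataclass

@dataclass
class WekaMount:
    # Field names match the CLI JSON: {"bucket_name": ..., "mount_path": ...}.
    # This sketch is an assumption, not the actual rslp definition.
    bucket_name: str
    mount_path: str
```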
26 changes: 24 additions & 2 deletions rslp/sentinel2_vessels/predict_pipeline.py
@@ -49,6 +49,15 @@
SENTINEL2_RESOLUTION = 10
CROP_WINDOW_SIZE = 128

# Use a lower number of data loader workers for prediction since each worker will
# read a big scene (unlike the small windows used during training).
NUM_DATA_LOADER_WORKERS = 4

# We make sure the bounds of the windows we create for Sentinel-2 scenes are
# multiples of this amount, because some bands are stored at 1/4 of the input
# resolution, so the window size needs to be a multiple of 4.
WINDOW_MIN_MULTIPLE = 4

# Distance threshold for near marine infrastructure filter in km.
# 0.05 km = 50 m
INFRA_DISTANCE_THRESHOLD = 0.05
@@ -265,12 +274,21 @@ def get_vessel_detections(
    windows: list[Window] = []
    group = "detector_predict"
    for scene_idx, scene_data in enumerate(scene_datas):
        # Pad the bounds so they are multiples of WINDOW_MIN_MULTIPLE.
        padded_bounds = (
            (scene_data.bounds[0] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
            (scene_data.bounds[1] // WINDOW_MIN_MULTIPLE) * WINDOW_MIN_MULTIPLE,
            ((scene_data.bounds[2] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE)
            * WINDOW_MIN_MULTIPLE,
            ((scene_data.bounds[3] + WINDOW_MIN_MULTIPLE - 1) // WINDOW_MIN_MULTIPLE)
            * WINDOW_MIN_MULTIPLE,
        )
        window = Window(
            storage=dataset.storage,
            group=group,
            name=str(scene_idx),
            projection=scene_data.projection,
-            bounds=scene_data.bounds,
+            bounds=padded_bounds,
            time_range=scene_data.time_range,
        )
        window.save()
@@ -305,7 +323,11 @@

# Run object detector.
with time_operation(TimerOperations.RunModelPredict):
run_model_predict(DETECT_MODEL_CONFIG, ds_path)
run_model_predict(
DETECT_MODEL_CONFIG,
ds_path,
extra_args=["--data.init_args.num_workers", str(NUM_DATA_LOADER_WORKERS)],
)

# Read the detections.
detections: list[VesselDetection] = []
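
The bounds padding in `get_vessel_detections` above is plain floor/ceil rounding to a
multiple of `WINDOW_MIN_MULTIPLE`. The same arithmetic as a standalone sketch; note
that Python's floor division rounds toward negative infinity, so negative mins are
still padded outward:

```python
def pad_bounds(bounds: tuple[int, int, int, int], multiple: int = 4) -> tuple[int, int, int, int]:
    """Round the mins down and the maxes up to the nearest multiple."""
    return (
        (bounds[0] // multiple) * multiple,
        (bounds[1] // multiple) * multiple,
        ((bounds[2] + multiple - 1) // multiple) * multiple,
        ((bounds[3] + multiple - 1) // multiple) * multiple,
    )

print(pad_bounds((3, -5, 10, 6)))  # (0, -8, 12, 8)
```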