Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Drone Image Processing using Open Drone Map (ODM) #227

Merged
merged 22 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def assemble_db_connection(cls, v: Optional[str], info: ValidationInfo) -> Any:
EMAILS_FROM_EMAIL: Optional[EmailStr] = None
EMAILS_FROM_NAME: Optional[str] = "Drone Tasking Manager"

NODE_ODM_URL: Optional[str] = "http://odm-api:3000"

@computed_field
@property
def emails_enabled(self) -> bool:
Expand Down
155 changes: 155 additions & 0 deletions src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import os
import uuid
import tempfile
import shutil
from pyodm import Node
from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
from loguru import logger as log
from concurrent.futures import ThreadPoolExecutor


class DroneImageProcessor:
    """Process drone imagery with an OpenDroneMap (ODM) node.

    Downloads a task's images from S3/MinIO storage, submits them to a
    NodeODM instance for photogrammetric processing, and uploads the
    resulting assets archive back to S3.
    """

    # Image file suffixes accepted for download/processing (checked
    # case-insensitively, so .JPG/.JPEG/.PNG etc. also match).
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(
        self,
        node_odm_url: str,
        project_id: uuid.UUID,
        task_id: uuid.UUID,
    ):
        """
        Initializes the connection to the ODM node.

        :param node_odm_url: Base URL of the NodeODM API (e.g. http://odm-api:3000).
        :param project_id: The project UUID.
        :param task_id: The task UUID.
        """
        self.node = Node.from_url(node_odm_url)
        self.project_id = project_id
        self.task_id = task_id

    def options_list_to_dict(self, options=None):
        """
        Converts options formatted as a list ([{'name': optionName, 'value': optionValue}, ...])
        to a dictionary {optionName: optionValue, ...}

        :param options: List of option dicts, or None.
        :return: Dict mapping option names to values (empty if options is None).
        """
        opts = {}
        if options is not None:
            for o in options:
                opts[o["name"]] = o["value"]
        return opts

    def download_object(self, bucket_name: str, obj, images_folder: str):
        """
        Downloads a single S3 object into the local images folder,
        skipping anything that is not an image file.

        :param bucket_name: Name of the MinIO bucket.
        :param obj: S3 object listing entry (must expose .object_name).
        :param images_folder: Local directory to save the image into.
        """
        if obj.object_name.lower().endswith(self.IMAGE_EXTENSIONS):
            local_path = os.path.join(images_folder, os.path.basename(obj.object_name))
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            get_file_from_bucket(bucket_name, obj.object_name, local_path)

    def download_images_from_minio(self, bucket_name, local_dir):
        """
        Downloads images from MinIO under this processor's project/task prefix.

        :param bucket_name: Name of the MinIO bucket.
        :param local_dir: Local directory to save the images.
        """
        prefix = f"projects/{self.project_id}/{self.task_id}"

        objects = list_objects_from_bucket(bucket_name, prefix)

        # Download images concurrently; each download is blocking I/O,
        # so a thread pool overlaps the network waits.
        with ThreadPoolExecutor() as executor:
            executor.map(
                lambda obj: self.download_object(bucket_name, obj, local_dir),
                objects,
            )

    def list_images(self, directory):
        """
        Lists all images in the specified directory (recursively).

        :param directory: The directory containing the images.
        :return: List of image file paths.
        """
        images = []
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if file.lower().endswith(self.IMAGE_EXTENSIONS):
                    images.append(os.path.join(root, file))
        return images

    def process_new_task(self, images, name=None, options=None, progress_callback=None):
        """
        Sends a set of images via the API to start processing.

        :param images: List of image file paths.
        :param name: Name of the task.
        :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
        :param progress_callback: Callback function to report upload progress.
        :return: The created task object.
        """
        # Default to producing a DSM, but let caller-supplied options
        # override it (previously the caller's options were discarded).
        opts = {"dsm": True, **self.options_list_to_dict(options)}

        task = self.node.create_task(images, opts, name, progress_callback)
        return task

    def monitor_task(self, task):
        """
        Monitors the task progress until completion.

        :param task: The task object.
        :return: The same task object, after completion.
        """
        log.info(f"Monitoring task {task.uuid}...")
        task.wait_for_completion(interval=5)
        log.info("Task completed.")
        return task

    def download_results(self, task, output_path):
        """
        Downloads all results of the task to the specified output path.

        :param task: The task object.
        :param output_path: The directory where results will be saved.
        :return: Path of the downloaded zip archive.
        """
        log.info(f"Downloading results to {output_path}...")
        path = task.download_zip(output_path)
        log.info("Download completed.")
        return path

    def process_images_from_s3(self, bucket_name, name=None, options=None):
        """
        Processes images from MinIO storage end to end: download,
        process on the ODM node, and upload the resulting assets.

        :param bucket_name: Name of the MinIO bucket.
        :param name: Name of the task.
        :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
        :return: The task object.
        """
        # Create a temporary directory to store downloaded images
        temp_dir = tempfile.mkdtemp()
        try:
            self.download_images_from_minio(bucket_name, temp_dir)

            images_list = self.list_images(temp_dir)

            # Start a new processing task
            task = self.process_new_task(images_list, name=name, options=options)
            # Monitor task progress
            self.monitor_task(task)

            # Download results locally before pushing them to S3
            output_file_path = f"/tmp/{self.project_id}"
            os.makedirs(output_file_path, exist_ok=True)
            path_to_download = self.download_results(task, output_path=output_file_path)

            # Upload the results into s3
            s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
            add_file_to_bucket(bucket_name, path_to_download, s3_path)
            return task

        finally:
            # Clean up temporary directory
            shutil.rmtree(temp_dir)
16 changes: 16 additions & 0 deletions src/backend/app/projects/project_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from io import BytesIO
from app.s3 import add_obj_to_bucket
from app.config import settings
from app.projects.image_processing import DroneImageProcessor


async def upload_file_to_s3(
Expand Down Expand Up @@ -155,3 +156,18 @@ async def preview_split_by_square(boundary: str, meters: int):
meters=meters,
)
)


def process_drone_images(project_id: uuid.UUID, task_id: uuid.UUID):
    """Run ODM processing for a task's drone images stored in S3.

    Builds a processor bound to the configured NodeODM instance and
    kicks off the full download/process/upload pipeline with the
    standard DSM + orthophoto options.
    """
    # Standard processing options used for every drone-imagery task.
    processing_options = [
        {"name": "dsm", "value": True},
        {"name": "orthophoto-resolution", "value": 5},
    ]

    image_processor = DroneImageProcessor(settings.NODE_ODM_URL, project_id, task_id)
    image_processor.process_images_from_s3(
        settings.S3_BUCKET_NAME,
        name=f"DTM-Task-{task_id}",
        options=processing_options,
    )
16 changes: 16 additions & 0 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import uuid
from typing import Annotated, Optional
from uuid import UUID
import geojson
Expand All @@ -13,6 +14,7 @@
File,
Form,
Response,
BackgroundTasks,
)
from geojson_pydantic import FeatureCollection
from loguru import logger as log
Expand All @@ -28,6 +30,7 @@
from app.users.user_schemas import AuthUser
from app.tasks import task_schemas


router = APIRouter(
prefix=f"{settings.API_PREFIX}/projects",
responses={404: {"description": "Not found"}},
Expand Down Expand Up @@ -295,3 +298,16 @@ async def read_project(
):
"""Get a specific project and all associated tasks by ID."""
return project


@router.post("/process_imagery/{project_id}/{task_id}/")
async def process_imagery(
    task_id: uuid.UUID,
    project: Annotated[
        project_schemas.DbProject, Depends(project_deps.get_project_by_id)
    ],
    user_data: Annotated[AuthUser, Depends(login_required)],
    background_tasks: BackgroundTasks,
):
    """Queue ODM processing of a task's imagery as a background job."""
    response = {"message": "Processing started"}
    # Processing can take a long time, so it runs after the response is sent.
    background_tasks.add_task(
        project_logic.process_drone_images, project.id, task_id
    )
    return response
15 changes: 14 additions & 1 deletion src/backend/app/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def add_file_to_bucket(bucket_name: str, file_path: str, s3_path: str):
s3_path = f"/{s3_path}"

client = s3_client()
client.fput_object(bucket_name, file_path, s3_path)
client.fput_object(bucket_name, s3_path, file_path)


def add_obj_to_bucket(
Expand Down Expand Up @@ -167,3 +167,16 @@ def get_image_dir_url(bucket_name: str, image_dir: str):

except Exception as e:
log.error(f"Error checking directory existence: {str(e)}")


def list_objects_from_bucket(bucket_name: str, prefix: str):
    """List every object stored under a prefix in an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        prefix (str): The prefix to filter objects by.

    Returns:
        list: A list of objects in the bucket with the specified prefix.
    """
    # Recursive listing walks the whole "directory" tree under the prefix.
    return s3_client().list_objects(bucket_name, prefix=prefix, recursive=True)
19 changes: 11 additions & 8 deletions src/backend/app/waypoints/waypoint_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,19 @@ async def get_task_waypoint(

if project.is_terrain_follow:
dem_path = f"/tmp/{uuid.uuid4()}/dem.tif"
get_file_from_bucket(
settings.S3_BUCKET_NAME, f"projects/{project_id}/dem.tif", dem_path
)
try:
get_file_from_bucket(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you rightly say below, once everything is working and we are happy, this definitely needs to be changed to get elevation data by querying the cloud-optimised DEM. A DEM can be quite large, so downloading it every time we process imagery is very inefficient.

settings.S3_BUCKET_NAME, f"projects/{project_id}/dem.tif", dem_path
)
# TODO: Do this with inmemory data
outfile_with_elevation = "/tmp/output_file_with_elevation.geojson"
add_elevation_from_dem(dem_path, points, outfile_with_elevation)

# TODO: Do this with inmemory data
outfile_with_elevation = "/tmp/output_file_with_elevation.geojson"
add_elevation_from_dem(dem_path, points, outfile_with_elevation)
inpointsfile = open(outfile_with_elevation, "r")
points_with_elevation = inpointsfile.read()

inpointsfile = open(outfile_with_elevation, "r")
points_with_elevation = inpointsfile.read()
except Exception:
points_with_elevation = points

placemarks = create_placemarks(geojson.loads(points_with_elevation), parameters)
else:
Expand Down
29 changes: 28 additions & 1 deletion src/backend/pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"python-slugify>=8.0.4",
"drone-flightplan==0.3.1rc4",
"psycopg2>=2.9.9",
"pyodm>=1.5.11",
]
requires-python = ">=3.11"
license = {text = "GPL-3.0-only"}
Expand Down