class DroneImageProcessor:
    """Runs an OpenDroneMap (NodeODM) processing job for one task's images.

    Downloads the task's images from S3/MinIO, submits them to a NodeODM
    node via pyodm, waits for completion, and uploads the resulting
    assets ZIP back into the bucket.
    """

    def __init__(
        self,
        node_odm_url: str,
        project_id: uuid.UUID,
        task_id: uuid.UUID,
    ):
        """
        Initializes the connection to the ODM node.

        :param node_odm_url: Base URL of the NodeODM API (e.g. http://odm-api:3000).
        :param project_id: The project UUID.
        :param task_id: The task UUID.
        """
        self.node = Node.from_url(node_odm_url)
        self.project_id = project_id
        self.task_id = task_id

    def options_list_to_dict(self, options=None):
        """
        Converts options formatted as a list ([{'name': optionName, 'value': optionValue}, ...])
        to a dictionary {optionName: optionValue, ...}

        :param options: List of option dicts, or None.
        :return: Dict mapping option names to values (empty when no options given).
        """
        # FIX: default was a mutable list ([]); use None to avoid the
        # shared-mutable-default pitfall. Behavior is unchanged.
        opts = {}
        if options is not None:
            for o in options:
                opts[o["name"]] = o["value"]
        return opts

    def download_object(self, bucket_name: str, obj, images_folder: str):
        """Download a single bucket object into images_folder; non-image keys are skipped."""
        if obj.object_name.endswith((".jpg", ".jpeg", ".JPG", ".png", ".PNG")):
            local_path = Path(images_folder) / Path(obj.object_name).name
            local_path.parent.mkdir(parents=True, exist_ok=True)
            get_file_from_bucket(bucket_name, obj.object_name, local_path)

    def download_images_from_s3(self, bucket_name, local_dir):
        """
        Downloads this task's images from MinIO under projects/<project_id>/<task_id>.

        :param bucket_name: Name of the MinIO bucket.
        :param local_dir: Local directory to save the images.
        """
        prefix = f"projects/{self.project_id}/{self.task_id}"

        objects = list_objects_from_bucket(bucket_name, prefix)

        # Download concurrently: the work is network I/O, so threads overlap the waits.
        with ThreadPoolExecutor() as executor:
            executor.map(
                lambda obj: self.download_object(bucket_name, obj, local_dir),
                objects,
            )

    def list_images(self, directory):
        """
        Lists all image files (jpg/jpeg/png, any case) in the directory, recursively.

        :param directory: The directory containing the images.
        :return: List of image file paths as strings.
        """
        images = []
        path = Path(directory)

        for file in path.rglob("*"):
            if file.suffix.lower() in {".jpg", ".jpeg", ".png"}:
                images.append(str(file))
        return images

    def process_new_task(self, images, name=None, options=None, progress_callback=None):
        """
        Sends a set of images via the API to start processing.

        :param images: List of image file paths.
        :param name: Name of the task.
        :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
        :param progress_callback: Callback function to report upload progress.
        :return: The created task object.
        """
        # FIX (was a FIXME): the converted options were previously discarded and
        # replaced by a hard-coded {"dsm": True}. Honor the caller's options;
        # keep DSM generation on by default so existing behavior is preserved
        # when no options are supplied.
        opts = self.options_list_to_dict(options)
        opts.setdefault("dsm", True)

        task = self.node.create_task(images, opts, name, progress_callback)
        return task

    def monitor_task(self, task):
        """
        Monitors the task progress until completion (polls every 5 seconds).

        :param task: The task object.
        :return: The same task object, after completion.
        """
        log.info(f"Monitoring task {task.uuid}...")
        task.wait_for_completion(interval=5)
        log.info("Task completed.")
        return task

    def download_results(self, task, output_path):
        """
        Downloads all results of the task as a ZIP to the specified output path.

        :param task: The task object.
        :param output_path: The directory where results will be saved.
        :return: Path of the downloaded ZIP file.
        """
        log.info(f"Downloading results to {output_path}...")
        path = task.download_zip(output_path)
        log.info("Download completed.")
        return path

    def process_images_from_s3(self, bucket_name, name=None, options=None):
        """
        Processes this task's images from MinIO storage end to end:
        download images, run the ODM task, then upload the assets ZIP
        back to projects/<project_id>/<task_id>/assets.zip.

        :param bucket_name: Name of the MinIO bucket.
        :param name: Name of the task.
        :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
        :return: The task object.
        """
        # Temporary workspace for the downloaded images; always cleaned up.
        temp_dir = tempfile.mkdtemp()
        try:
            self.download_images_from_s3(bucket_name, temp_dir)

            images_list = self.list_images(temp_dir)

            # Start a new processing task and block until it finishes.
            task = self.process_new_task(images_list, name=name, options=options)
            self.monitor_task(task)

            # Download results locally, then push the ZIP into the bucket.
            output_file_path = f"/tmp/{self.project_id}"
            path_to_download = self.download_results(task, output_path=output_file_path)

            s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
            add_file_to_bucket(bucket_name, path_to_download, s3_path)
            return task

        finally:
            # Clean up the temporary directory even on failure.
            shutil.rmtree(temp_dir)
async def get_project_info_from_s3(project_id: uuid.UUID, task_id: uuid.UUID):
    """
    Helper function to get the number of images and the URL to download the assets.

    :param project_id: The project UUID.
    :param task_id: The task UUID.
    :return: project_schemas.AssetsInfo with the image count and a presigned
        assets URL (assets_url is None while no processed assets exist yet).
    :raises HTTPException: 500 on unexpected S3 or internal errors.
    """
    # NOTE(review): assumes HTTPException (fastapi) and log (loguru) are
    # imported at module level — confirm, they are not visible in this chunk.
    try:
        # Prefix under which the task's raw images are stored
        images_prefix = f"projects/{project_id}/{task_id}/images/"

        # List and count the images
        objects = list_objects_from_bucket(
            settings.S3_BUCKET_NAME, prefix=images_prefix
        )
        image_extensions = (".jpg", ".jpeg", ".png", ".tif", ".tiff")
        image_count = sum(
            1 for obj in objects if obj.object_name.lower().endswith(image_extensions)
        )

        # FIX: results are uploaded to projects/<project_id>/<task_id>/assets.zip
        # (see DroneImageProcessor.process_images_from_s3); the previous
        # "processed/..." prefix never matched, so assets_url was always None.
        assets_path = f"projects/{project_id}/{task_id}/assets.zip"
        try:
            # stat first so a missing object is distinguishable from other errors
            get_object_metadata(settings.S3_BUCKET_NAME, assets_path)

            # If it exists, generate the presigned URL
            presigned_url = get_presigned_url(
                settings.S3_BUCKET_NAME, assets_path, expires=3600
            )
        except S3Error as e:
            if e.code == "NoSuchKey":
                # The object does not exist: processing hasn't produced assets yet
                log.info(
                    f"Assets ZIP file not found for project {project_id}, task {task_id}."
                )
                presigned_url = None
            else:
                # An unexpected error occurred
                log.error(f"An error occurred while accessing assets file: {e}")
                raise HTTPException(status_code=500, detail=str(e))

        return project_schemas.AssetsInfo(
            project_id=str(project_id),
            task_id=str(task_id),
            image_count=image_count,
            assets_url=presigned_url,
        )
    except HTTPException:
        # FIX: re-raise deliberate HTTP errors instead of re-wrapping them
        # in the generic handler below.
        raise
    except Exception as e:
        log.exception(f"An error occurred while retrieving assets info: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def list_objects_from_bucket(bucket_name: str, prefix: str):
    """List all objects in a bucket with a specified prefix.

    Args:
        bucket_name (str): The name of the S3 bucket.
        prefix (str): The prefix to filter objects by.

    Returns:
        Iterator of minio object entries under the prefix (recursive listing).
    """
    client = s3_client()
    objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
    return objects


def get_presigned_url(bucket_name: str, object_name: str, expires: int = 3600):
    """Generate a presigned URL for an object in an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        object_name (str): The name of the object in the bucket.
        expires (int, optional): The time in seconds until the URL expires.
            Defaults to 3600.

    Returns:
        str: The presigned URL to access the object.
    """
    from datetime import timedelta

    client = s3_client()
    # FIX: minio-py requires `expires` to be a datetime.timedelta; passing
    # the raw int raised a type error. Keep the int-seconds interface for
    # callers and convert here.
    return client.presigned_get_object(
        bucket_name, object_name, expires=timedelta(seconds=expires)
    )


def get_object_metadata(bucket_name: str, object_name: str):
    """Get object metadata from an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        object_name (str): The name of the object in the bucket.

    Returns:
        Stat result describing the object (size, etag, metadata, ...).

    Raises:
        S3Error: e.g. code "NoSuchKey" when the object does not exist.
    """
    client = s3_client()
    return client.stat_object(bucket_name, object_name)
b/src/backend/pdm.lock index b031501c..96339e9a 100644 --- a/src/backend/pdm.lock +++ b/src/backend/pdm.lock @@ -363,6 +363,16 @@ version = "2.9.0" requires_python = ">=3.8" summary = "JSON Web Token implementation in Python" +[[package]] +name = "pyodm" +version = "1.5.11" +summary = "Python SDK for OpenDroneMap" +dependencies = [ + "requests", + "requests-toolbelt", + "urllib3", +] + [[package]] name = "pyproj" version = "3.6.1" @@ -415,6 +425,15 @@ dependencies = [ "requests>=2.0.0", ] +[[package]] +name = "requests-toolbelt" +version = "1.0.0" +requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +summary = "A utility belt for advanced users of python-requests" +dependencies = [ + "requests<3.0.0,>=2.0.1", +] + [[package]] name = "shapely" version = "2.0.5" @@ -492,7 +511,7 @@ summary = "A small Python utility to set file creation time on Windows" lock_version = "4.2" cross_platform = true groups = ["default"] -content_hash = "sha256:7c804d9b1dbeb05a25f788f88ea8652b2fee18316cd160c81ed67840aecdf6f2" +content_hash = "sha256:34e737b7b5f07d56925b5b8892bcdee256dfe9cec5d8986c30772ecccb70e028" [metadata.files] "aiosmtplib 3.0.2" = [ @@ -1143,6 +1162,10 @@ content_hash = "sha256:7c804d9b1dbeb05a25f788f88ea8652b2fee18316cd160c81ed67840a {url = "https://files.pythonhosted.org/packages/79/84/0fdf9b18ba31d69877bd39c9cd6052b47f3761e9910c15de788e519f079f/PyJWT-2.9.0-py3-none-any.whl", hash = "sha256:3b02fb0f44517787776cf48f2ae25d8e14f300e6d7545a4315cee571a415e850"}, {url = "https://files.pythonhosted.org/packages/fb/68/ce067f09fca4abeca8771fe667d89cc347d1e99da3e093112ac329c6020e/pyjwt-2.9.0.tar.gz", hash = "sha256:7e1e5b56cc735432a7369cbfa0efe50fa113ebecdc04ae6922deba8b84582d0c"}, ] +"pyodm 1.5.11" = [ + {url = "https://files.pythonhosted.org/packages/41/d8/186ee7b2a95f7b8e50f2068e152f5b3a58d838fea90eabde0785c79f32c5/pyodm-1.5.11-py3-none-any.whl", hash = "sha256:c0b9a4358db12f2a84a4d13533b9a3ea4c63419d6518420ba7a2ab32842294b8"}, + {url = 
"https://files.pythonhosted.org/packages/52/99/53e514b55916ef91c2b6f8f611c0e03465caa9e5a17c742056b543f0c8ee/pyodm-1.5.11.tar.gz", hash = "sha256:bb97710171bfaee92e145bd97b1ac77db8beef580b89d6585a622ff6fb424c53"}, +] "pyproj 3.6.1" = [ {url = "https://files.pythonhosted.org/packages/0b/64/93232511a7906a492b1b7dfdfc17f4e95982d76a24ef4f86d18cfe7ae2c9/pyproj-3.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:1e9fbaf920f0f9b4ee62aab832be3ae3968f33f24e2e3f7fbb8c6728ef1d9746"}, {url = "https://files.pythonhosted.org/packages/0e/ab/1c2159ec757677c5a6b8803f6be45c2b550dc42c84ec4a228dc219849bbb/pyproj-3.6.1-cp312-cp312-win32.whl", hash = "sha256:2d6ff73cc6dbbce3766b6c0bce70ce070193105d8de17aa2470009463682a8eb"}, @@ -1192,6 +1215,10 @@ content_hash = "sha256:7c804d9b1dbeb05a25f788f88ea8652b2fee18316cd160c81ed67840a {url = "https://files.pythonhosted.org/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36"}, {url = "https://files.pythonhosted.org/packages/42/f2/05f29bc3913aea15eb670be136045bf5c5bbf4b99ecb839da9b422bb2c85/requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9"}, ] +"requests-toolbelt 1.0.0" = [ + {url = "https://files.pythonhosted.org/packages/3f/51/d4db610ef29373b879047326cbf6fa98b6c1969d6f6dc423279de2b1be2c/requests_toolbelt-1.0.0-py2.py3-none-any.whl", hash = "sha256:cccfdd665f0a24fcf4726e690f65639d272bb0637b9b92dfd91a5568ccf6bd06"}, + {url = "https://files.pythonhosted.org/packages/f3/61/d7545dafb7ac2230c70d38d31cbfe4cc64f7144dc41f6e4e4b78ecd9f5bb/requests-toolbelt-1.0.0.tar.gz", hash = "sha256:7681a0a3d047012b5bdc0ee37d7f8f07ebe76ab08caeccfc3921ce23c88d5bc6"}, +] "shapely 2.0.5" = [ {url = "https://files.pythonhosted.org/packages/04/df/8062f14cb7aa502b8bda358103facedc80b87eec41e3391182655ff40615/shapely-2.0.5-cp312-cp312-macosx_10_9_x86_64.whl", 
hash = "sha256:03bd7b5fa5deb44795cc0a503999d10ae9d8a22df54ae8d4a4cd2e8a93466195"}, {url = "https://files.pythonhosted.org/packages/13/56/11150c625bc984e9395913a255f52cf6c7de85b98396339cee66119481d4/shapely-2.0.5-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8af6f7260f809c0862741ad08b1b89cb60c130ae30efab62320bbf4ee9cc71fa"}, diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index b3f5e487..518303c3 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "python-slugify>=8.0.4", "drone-flightplan==0.3.1rc4", "psycopg2>=2.9.9", + "pyodm>=1.5.11", ] requires-python = ">=3.11" license = {text = "GPL-3.0-only"}