From 07c01a89c427453395efa1ca5366531705221712 Mon Sep 17 00:00:00 2001
From: Martastain
Date: Thu, 31 Oct 2024 13:23:32 +0100
Subject: [PATCH] feat: list s3 files

---
 ayon_server/files/common.py          |  4 ++
 ayon_server/files/project_storage.py | 56 ++++++++++++++++------------
 ayon_server/files/s3.py              | 55 +++++++++++++++++++++++++++
 ayon_server/files/utils.py           | 13 +++++++
 4 files changed, 104 insertions(+), 24 deletions(-)
 create mode 100644 ayon_server/files/common.py
 create mode 100644 ayon_server/files/utils.py

diff --git a/ayon_server/files/common.py b/ayon_server/files/common.py
new file mode 100644
index 00000000..99ad05a5
--- /dev/null
+++ b/ayon_server/files/common.py
@@ -0,0 +1,4 @@
+from typing import Literal
+
+StorageType = Literal["local", "s3"]
+FileGroup = Literal["uploads", "thumbnails"]
diff --git a/ayon_server/files/project_storage.py b/ayon_server/files/project_storage.py
index 339d433e..8d378ee3 100644
--- a/ayon_server/files/project_storage.py
+++ b/ayon_server/files/project_storage.py
@@ -1,6 +1,6 @@
 import os
 import time
-from typing import Any, Literal
+from typing import Any
 
 import aiocache
 import aiofiles
@@ -9,6 +9,7 @@
 from fastapi import Request
 from fastapi.responses import RedirectResponse
 from nxtools import log_traceback, logging
+from typing_extensions import AsyncGenerator
 
 from ayon_server.api.files import handle_upload
 from ayon_server.config import ayonconfig
@@ -18,6 +19,7 @@
     delete_s3_file,
     get_signed_url,
     handle_s3_upload,
+    list_s3_files,
     retrieve_s3_file,
     store_s3_file,
     upload_s3_file,
@@ -27,8 +29,8 @@
 from ayon_server.helpers.project_list import ProjectListItem, get_project_info
 from ayon_server.lib.postgres import Postgres
 
-StorageType = Literal["local", "s3"]
-FileGroup = Literal["uploads", "thumbnails"]
+from .common import FileGroup, StorageType
+from .utils import list_local_files
 
 
 class ProjectStorage:
@@ -95,6 +97,19 @@ async def get_root(self) -> str:
 
     # Common file management methods
 
+    async def get_filegroup_dir(self, file_group: FileGroup) -> str:
+        assert file_group in ["uploads", "thumbnails"], "Invalid file group"
+        root = await self.get_root()
+        project_dirname = self.project_name
+        if self.storage_type == "s3":
+            if self.project_info is None:
+                self.project_info = await get_project_info(self.project_name)
+            assert self.project_info  # mypy
+
+            project_timestamp = int(self.project_info.created_at.timestamp())
+            project_dirname = f"{self.project_name}.{project_timestamp}"
+        return os.path.join(root, project_dirname, file_group)
+
     async def get_path(
         self,
         file_id: str,
@@ -106,28 +121,17 @@ async def get_path(
         path from the bucket), while in the case of local storage,
         it's the full path to the file on the disk.
""" - root = await self.get_root() - assert file_group in ["uploads", "thumbnails"], "Invalid file group" _file_id = file_id.replace("-", "") if len(_file_id) != 32: raise ValueError(f"Invalid file ID: {file_id}") - project_dirname = self.project_name - if self.storage_type == "s3": - if self.project_info is None: - self.project_info = await get_project_info(self.project_name) - assert self.project_info # mypy - - project_timestamp = int(self.project_info.created_at.timestamp()) - project_dirname = f"{self.project_name}.{project_timestamp}" + file_group_dir = await self.get_filegroup_dir(file_group) # Take first two characters of the file ID as a subdirectory # to avoid having too many files in a single directory sub_dir = _file_id[:2] return os.path.join( - root, - project_dirname, - file_group, + file_group_dir, sub_dir, _file_id, ) @@ -424,19 +428,23 @@ async def trash(self) -> None: pass # Listing stored files - - async def list_files(self, file_group: FileGroup = "uploads") -> list[str]: + # + async def list_files( + self, file_group: FileGroup = "uploads" + ) -> AsyncGenerator[str, None]: """List all files in the storage for the project""" - files = [] if self.storage_type == "local": projects_root = await self.get_root() project_dir = os.path.join(projects_root, self.project_name) group_dir = os.path.join(project_dir, file_group) + if not os.path.isdir(group_dir): - return [] + return - for root, _, filenames in os.walk(os.path.join(project_dir, file_group)): - for filename in filenames: - files.append(filename) + async for f in list_local_files(group_dir): + yield f + elif self.storage_type == "s3": + async for f in list_s3_files(self, file_group): + yield f - return files + return diff --git a/ayon_server/files/s3.py b/ayon_server/files/s3.py index d0b97d28..e9e23134 100644 --- a/ayon_server/files/s3.py +++ b/ayon_server/files/s3.py @@ -9,6 +9,9 @@ from nxtools import logging from pydantic import BaseModel, Field from starlette.concurrency import run_in_threadpool +from typing_extensions import AsyncGenerator + +from .common import FileGroup if TYPE_CHECKING: from ayon_server.files.project_storage import ProjectStorage @@ -109,6 +112,58 @@ async def delete_s3_file(storage: "ProjectStorage", key: str): await run_in_threadpool(_delete_s3_file, storage, key) +class FileIterator: + def __init__(self, storage: "ProjectStorage", file_group: FileGroup): + self.storage = storage + self.file_group: FileGroup = file_group + + def _init_iterator(self, prefix: str) -> None: + paginator = self.client.get_paginator("list_objects_v2") + page_iterator = paginator.paginate( + Bucket=self.storage.bucket_name, + Prefix=prefix, + PaginationConfig={"PageSize": 1000}, + ) + self.iterator = page_iterator.__iter__() + + async def init_iterator(self): + self.client = await get_s3_client(self.storage) + prefix = await self.storage.get_filegroup_dir(self.file_group) + await run_in_threadpool(self._init_iterator, prefix) + + def _next(self): + try: + page = next(self.iterator) + except StopIteration: + return None + return [obj["Key"] for obj in page.get("Contents", [])] + + async def next(self): + return await run_in_threadpool(self._next) + + async def __aiter__(self): + while True: + try: + contents = await self.next() + if not contents: + break + for obj in contents: + yield obj + except StopIteration: + break + + +async def list_s3_files( + storage: "ProjectStorage", file_group: FileGroup +) -> AsyncGenerator[str, None]: + assert file_group in ["uploads", "thumbnails"], "Invalid file group" + 
+    file_iterator = FileIterator(storage, file_group)
+    await file_iterator.init_iterator()
+    async for key in file_iterator:
+        fname = key.split("/")[-1]
+        yield fname
+
+
 # Multipart upload to S3 with async queue
 # Used for larger files
 
diff --git a/ayon_server/files/utils.py b/ayon_server/files/utils.py
new file mode 100644
index 00000000..3080482b
--- /dev/null
+++ b/ayon_server/files/utils.py
@@ -0,0 +1,13 @@
+from typing import AsyncGenerator
+
+import aiofiles.os
+
+
+async def list_local_files(root: str) -> AsyncGenerator[str, None]:
+    records = await aiofiles.os.scandir(root)
+    for rec in records:
+        if rec.is_dir():
+            async for file in list_local_files(rec.path):
+                yield file
+        else:
+            yield rec.name
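
Reviewer note: this patch changes the contract of list_files from returning
list[str] to being an async generator, so existing call sites can no longer do
"files = await storage.list_files()". A minimal consumption sketch under that
assumption; the count_thumbnails helper and the storage variable below are
hypothetical illustrations, not part of this patch:

    # Hypothetical helper: consume the new async-generator API lazily.
    # "storage" is assumed to be an already constructed ProjectStorage.
    async def count_thumbnails(storage) -> int:
        total = 0
        # Filenames now stream page by page (S3) or dir by dir (local),
        # instead of being accumulated into one list up front.
        async for _name in storage.list_files("thumbnails"):
            total += 1
        return total

    # When a plain list is still needed, collect eagerly:
    #     names = [n async for n in storage.list_files("uploads")]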