Skip to content

Commit

Permalink
feat: Unified File Management API (#6100)
Browse files Browse the repository at this point in the history
* feat: First pass at file management API

* [autofix.ci] apply automated fixes

* Add delete and edit endpoints

* [autofix.ci] apply automated fixes

* Add file size and duplicate name handling

* Ensure the File model has a unique name

* Ensure count is before extension

* [autofix.ci] apply automated fixes

* Add the correct path to the return

* Added function to handle list of paths in File component

* [autofix.ci] apply automated fixes

* Update input_mixin.py

* Refactor to a v2 endpoint

* Add unit tests

* Update test_files.py

* Update frontend.ts

* [autofix.ci] apply automated fixes

* Remove extension from name

* Cast the string type for like

* Update files.py

* Update base.py

* Update base.py

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Lucas Oliveira <lucas.edu.oli@hotmail.com>
  • Loading branch information
3 people authored Feb 6, 2025
1 parent cc3417b commit 28e07be
Show file tree
Hide file tree
Showing 18 changed files with 635 additions and 15 deletions.
1 change: 1 addition & 0 deletions scripts/aws/lib/construct/frontend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export class Web extends Construct {
defaultBehavior: { origin: s3SpaOrigin },
additionalBehaviors: {
'/api/v1/*': albBehaviorOptions,
'/api/v2/*': albBehaviorOptions,
'/health' : albBehaviorOptions,
},
enableLogging: true, // ログ出力設定
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Add V2 File Table
Revision ID: dd9e0804ebd1
Revises: e3162c1804e6
Create Date: 2025-02-03 11:47:16.101523
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import sqlmodel
from langflow.utils import migration


# revision identifiers, used by Alembic.
revision: str = 'dd9e0804ebd1'
down_revision: Union[str, None] = 'e3162c1804e6'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the v2 ``file`` table (skipped if it already exists)."""
    conn = op.get_bind()
    if not migration.table_exists("file", conn):
        # ### commands auto generated by Alembic - please adjust! ###
        op.create_table(
            "file",
            sa.Column("id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
            sa.Column("user_id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
            # Uniqueness of "name" is enforced by the explicit UniqueConstraint
            # below; the previous column-level unique=True duplicated it and
            # could create a second unique index on some backends.
            sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
            sa.Column("path", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
            sa.Column("size", sa.Integer(), nullable=False),
            sa.Column("provider", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
            sa.Column("created_at", sa.DateTime(), nullable=False),
            sa.Column("updated_at", sa.DateTime(), nullable=False),
            sa.PrimaryKeyConstraint("id"),
            sa.ForeignKeyConstraint(["user_id"], ["user.id"], name="fk_file_user_id_user"),
            sa.UniqueConstraint("name"),
        )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the v2 ``file`` table, but only when it is actually present."""
    # ### commands auto generated by Alembic - please adjust! ###
    bind = op.get_bind()
    if migration.table_exists("file", bind):
        op.drop_table("file")
    # ### end Alembic commands ###
4 changes: 2 additions & 2 deletions src/backend/base/langflow/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from langflow.api.health_check_router import health_check_router
from langflow.api.log_router import log_router
from langflow.api.router import router
from langflow.api.router import router, router_v2

__all__ = ["health_check_router", "log_router", "router"]
__all__ = ["health_check_router", "log_router", "router", "router_v2"]
8 changes: 8 additions & 0 deletions src/backend/base/langflow/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
validate_router,
variables_router,
)
from langflow.api.v2 import files_router as files_router_v2

router = APIRouter(
prefix="/api/v1",
)

router_v2 = APIRouter(
prefix="/api/v2",
)

router.include_router(chat_router)
router.include_router(endpoints_router)
router.include_router(validate_router)
Expand All @@ -33,3 +39,5 @@
router.include_router(monitor_router)
router.include_router(folders_router)
router.include_router(starter_projects_router)

router_v2.include_router(files_router_v2)
14 changes: 14 additions & 0 deletions src/backend/base/langflow/api/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pathlib import Path
from uuid import UUID

from pydantic import BaseModel


class UploadFileResponse(BaseModel):
    """File upload response schema.

    Returned by the v2 files endpoints after an upload or a rename.
    """

    # Database id of the stored file record (also used as the on-disk name).
    id: UUID
    # Human-readable name; the upload endpoint strips the extension and may
    # append a "(N)" suffix to keep names unique.
    name: str
    # Storage path; presumably "<user_id>/<id><ext>" — verify against the
    # upload endpoint's path construction.
    path: Path
    # File size as reported by the storage service (assumed bytes — confirm).
    size: int
    # Storage provider identifier; None when not set by the backend.
    provider: str | None = None
5 changes: 5 additions & 0 deletions src/backend/base/langflow/api/v2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from langflow.api.v2.files import router as files_router

__all__ = [
"files_router",
]
228 changes: 228 additions & 0 deletions src/backend/base/langflow/api/v2/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import uuid
from collections.abc import AsyncGenerator
from http import HTTPStatus
from pathlib import Path
from typing import Annotated

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import StreamingResponse
from sqlmodel import String, cast, select

from langflow.api.schemas import UploadFileResponse
from langflow.api.utils import CurrentActiveUser, DbSession
from langflow.services.database.models.file import File as UserFile
from langflow.services.deps import get_settings_service, get_storage_service
from langflow.services.storage.service import StorageService

router = APIRouter(tags=["Files"], prefix="/files")


async def byte_stream_generator(file_bytes: bytes, chunk_size: int = 8192) -> AsyncGenerator[bytes, None]:
    """Yield ``file_bytes`` as successive slices of at most ``chunk_size`` bytes."""
    offset = 0
    total = len(file_bytes)
    while offset < total:
        yield file_bytes[offset : offset + chunk_size]
        offset += chunk_size


async def fetch_file_object(file_id: uuid.UUID, current_user: CurrentActiveUser, session: DbSession):
    """Look up a file row by id and enforce that the current user owns it.

    Raises:
        HTTPException: 404 when no row matches, 403 when the row belongs to
            another user.
    """
    result = await session.exec(select(UserFile).where(UserFile.id == file_id))
    file = result.first()

    if not file:
        raise HTTPException(status_code=404, detail="File not found")

    # Ownership check: only the uploading user may touch the record.
    if file.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="You don't have access to this file")

    return file


@router.post("", status_code=HTTPStatus.CREATED)
async def upload_user_file(
    file: Annotated[UploadFile, File(...)],
    session: DbSession,
    current_user: CurrentActiveUser,
    storage_service=Depends(get_storage_service),
    settings_service=Depends(get_settings_service),
) -> UploadFileResponse:
    """Upload a file for the current user and track it in the database.

    The content is stored under ``<user_id>/<uuid><ext>`` via the storage
    service; the DB record keeps an extension-less display name that gets a
    ``(N)`` suffix when similarly named files already exist.

    Raises:
        HTTPException: 400 when no file is provided, 413 when the file exceeds
            the configured limit, 500 on settings/storage/database errors.
    """
    # Get the max allowed file size from settings (in MB)
    try:
        max_file_size_upload = settings_service.settings.max_file_size_upload
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Settings error: {e}") from e

    # Validate that a file is actually provided
    if not file or not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    # Validate file size (convert MB to bytes)
    # NOTE(review): UploadFile.size may be None when the client omits a
    # Content-Length header, which would make this comparison raise — confirm.
    if file.size > max_file_size_upload * 1024 * 1024:
        raise HTTPException(
            status_code=413,
            detail=f"File size is larger than the maximum file size {max_file_size_upload}MB.",
        )

    # Read file content and create a unique file name
    try:
        # Create a unique file name
        file_id = uuid.uuid4()
        file_content = await file.read()

        # Get file extension of the file
        file_extension = "." + file.filename.split(".")[-1] if file.filename and "." in file.filename else ""
        anonymized_file_name = f"{file_id!s}{file_extension}"

        # Here we use the current user's id as the folder name
        folder = str(current_user.id)
        # Save the file using the storage service.
        await storage_service.save_file(flow_id=folder, file_name=anonymized_file_name, data=file_content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error saving file: {e}") from e

    # Create a new database record for the uploaded file.
    try:
        # Enforce unique constraint on name
        # Name it as filename (1), (2), etc.
        # Check if the file name already exists
        new_filename = file.filename
        try:
            root_filename, _ = new_filename.rsplit(".", 1)
        except ValueError:
            # No "." in the filename: keep it whole, no extension to strip.
            root_filename, _ = new_filename, ""

        # Check if there are files with the same name
        # NOTE(review): this LIKE is a prefix match ("report" also counts
        # "report-final") and is not filtered by user_id, so the count can be
        # inflated and the "(N)" name may still collide with the table-wide
        # unique constraint — verify whether an exact, per-user match is wanted.
        stmt = select(UserFile).where(cast(UserFile.name, String).like(f"{root_filename}%"))
        existing_files = await session.exec(stmt)
        files = existing_files.all()  # Fetch all matching records

        # If there are files with the same name, append a count to the filename
        if files:
            count = len(files)  # Count occurrences

            # Split the extension from the filename
            root_filename = f"{root_filename} ({count})"

        # Compute the file size based on the path
        file_size = await storage_service.get_file_size(flow_id=folder, file_name=anonymized_file_name)

        # Compute the file path
        file_path = f"{folder}/{anonymized_file_name}"

        # Create a new file record
        new_file = UserFile(
            id=file_id,
            user_id=current_user.id,
            name=root_filename,
            path=file_path,
            size=file_size,
        )
        session.add(new_file)

        await session.commit()
        await session.refresh(new_file)
    except Exception as e:
        # Optionally, you could also delete the file from disk if the DB insert fails.
        # NOTE(review): as written, the stored object is orphaned on DB failure.
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e

    return UploadFileResponse(id=new_file.id, name=new_file.name, path=Path(new_file.path), size=new_file.size)


@router.get("")
async def list_files(
    current_user: CurrentActiveUser,
    session: DbSession,
) -> list[UserFile]:
    """List the files available to the current user."""
    try:
        # Only rows owned by the requesting user are returned.
        query = select(UserFile).where(UserFile.user_id == current_user.id)
        rows = await session.exec(query)
        return list(rows)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error listing files: {e}") from e


@router.get("/{file_id}")
async def download_file(
    file_id: uuid.UUID,
    current_user: CurrentActiveUser,
    session: DbSession,
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """Download a file by its ID as an octet-stream attachment.

    Raises:
        HTTPException: 404/403 from ownership checks, 500 on storage errors.
    """
    # Fetch outside the broad try below so the 404/403 raised by
    # fetch_file_object keeps its status code instead of becoming a 500.
    file = await fetch_file_object(file_id, current_user, session)

    try:
        # The stored object name is the basename of the recorded path.
        file_name = file.path.split("/")[-1]

        # Get file stream
        file_stream = await storage_service.get_file(flow_id=str(current_user.id), file_name=file_name)

        # Ensure file_stream is an async iterator returning bytes
        byte_stream = byte_stream_generator(file_stream)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error downloading file: {e}") from e

    # Return the file as a streaming response
    return StreamingResponse(
        byte_stream,
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{file.name}"'},
    )


@router.put("/{file_id}")
async def edit_file_name(
    file_id: uuid.UUID,
    name: str,
    current_user: CurrentActiveUser,
    session: DbSession,
) -> UploadFileResponse:
    """Edit the name of a file by its ID.

    Raises:
        HTTPException: 404/403 from ownership checks, 500 on database errors.
    """
    # Fetch outside the try so the 404/403 raised by fetch_file_object keeps
    # its status code instead of being re-wrapped as a 500 below.
    file = await fetch_file_object(file_id, current_user, session)

    try:
        # Update the file name
        file.name = name
        await session.commit()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error editing file: {e}") from e

    return UploadFileResponse(id=file.id, name=file.name, path=file.path, size=file.size)


@router.delete("/{file_id}")
async def delete_file(
    file_id: uuid.UUID,
    current_user: CurrentActiveUser,
    session: DbSession,
    storage_service: Annotated[StorageService, Depends(get_storage_service)],
):
    """Delete a file by its ID from both storage and the database.

    Raises:
        HTTPException: 404/403 from ownership checks, 500 on storage/DB errors.
    """
    # Fetch outside the try so the 404/403 raised by fetch_file_object keeps
    # its status code instead of being re-wrapped as a 500 below. (The old
    # `if not file` check was dead code — fetch_file_object already raises.)
    file = await fetch_file_object(file_id, current_user, session)

    try:
        # The storage layer expects the object's basename (as in download_file);
        # passing the full "<user_id>/<name>" path with flow_id=<user_id> would
        # target "<user_id>/<user_id>/<name>" and miss the object.
        file_name = file.path.split("/")[-1]
        await storage_service.delete_file(flow_id=str(current_user.id), file_name=file_name)

        # Delete from the database; commit flushes pending changes itself.
        await session.delete(file)
        await session.commit()

    except Exception as e:
        await session.rollback()  # Rollback on failure
        raise HTTPException(status_code=500, detail=f"Error deleting file: {e}") from e

    return {"message": "File deleted successfully"}
26 changes: 21 additions & 5 deletions src/backend/base/langflow/base/data/base_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ def __str__(self):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Dynamically update FileInput to include valid extensions and bundles
self._base_inputs[0].file_types = [*self.valid_extensions, *self.SUPPORTED_BUNDLE_EXTENSIONS]
self._base_inputs[0].file_types = [
*self.valid_extensions,
*self.SUPPORTED_BUNDLE_EXTENSIONS,
]

file_types = ", ".join(self.valid_extensions)
bundles = ", ".join(self.SUPPORTED_BUNDLE_EXTENSIONS)
Expand Down Expand Up @@ -342,8 +345,13 @@ def add_file(data: Data, path: str | Path, *, delete_after_processing: bool):

if self.path and not file_path:
# Wrap self.path into a Data object
data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: self.path})
add_file(data=data_obj, path=self.path, delete_after_processing=False)
if isinstance(self.path, list):
for path in self.path:
data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: path})
add_file(data=data_obj, path=path, delete_after_processing=False)
else:
data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: self.path})
add_file(data=data_obj, path=self.path, delete_after_processing=False)
elif file_path:
for obj in file_path:
server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME)
Expand Down Expand Up @@ -384,7 +392,11 @@ def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]:
# Recurse into directories
collected_files.extend(
[
BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)
BaseFileComponent.BaseFile(
data,
sub_path,
delete_after_processing=delete_after_processing,
)
for sub_path in path.rglob("*")
if sub_path.is_file()
]
Expand All @@ -399,7 +411,11 @@ def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]:
self.log(f"Unpacked bundle {path.name} into {subpaths}")
collected_files.extend(
[
BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)
BaseFileComponent.BaseFile(
data,
sub_path,
delete_after_processing=delete_after_processing,
)
for sub_path in subpaths
]
)
Expand Down
Loading

0 comments on commit 28e07be

Please sign in to comment.