Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
Add multi file imports
Browse files Browse the repository at this point in the history
  • Loading branch information
CannonLock committed Apr 11, 2024
1 parent f279829 commit b88f144
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 56 deletions.
73 changes: 41 additions & 32 deletions api/routes/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Union

from fastapi import APIRouter, Depends, HTTPException, UploadFile
from starlette.datastructures import UploadFile as StarletteUploadFile
import starlette.requests
from sqlalchemy import insert, select, update, and_, delete
from sqlalchemy.orm import selectinload, joinedload, defer
import minio
Expand Down Expand Up @@ -254,8 +254,13 @@ async def get_ingest_process_objects(id: int):
detail=f"Failed to get secure url for object: {e}"
)

@router.post("/{id}/objects", response_model=Object.Get)
async def create_object(id: int, object: UploadFile, user_has_access: bool = Depends(has_access)):
@router.post("/{id}/objects", response_model=list[Object.Get])
async def create_object(
request: starlette.requests.Request,
id: int,
object: list[UploadFile],
user_has_access: bool = Depends(has_access)
):
"""Create/Register a new object"""

if not user_has_access:
Expand All @@ -264,43 +269,47 @@ async def create_object(id: int, object: UploadFile, user_has_access: bool = Dep
engine = get_engine()
async_session = get_async_session(engine)

response_objects = []

async with async_session() as session:

ingest_stmt = select(IngestProcessSchema).where(IngestProcessSchema.id == id)
ingest_process = await session.scalar(ingest_stmt)

# Upload the file to s3
m = minio.Minio(endpoint=os.environ['S3_HOST'], access_key=os.environ['access_key'],
secret_key=os.environ['secret_key'], secure=True)
if "multipart/form-data" in request.headers['content-type']:

file_length = len(object.file.read())
object.file.seek(0)
files = (await request.form()).getlist("object")
for upload_file in files:

object_file_name = f"{ingest_process.id}/{object.filename}"
m = minio.Minio(endpoint=os.environ['S3_HOST'], access_key=os.environ['access_key'],
secret_key=os.environ['secret_key'], secure=True)

m.put_object(
bucket_name=os.environ['S3_BUCKET'],
object_name=object_file_name,
data=object.file,
content_type=object.content_type,
length=file_length
)
object_file_name = f"{ingest_process.id}/{upload_file.filename}"

# Upload this file pointer to postgres
object = Object.Post(
mime_type=object.content_type,
key=object_file_name,
bucket=os.environ['S3_BUCKET'],
host=os.environ['S3_HOST'],
scheme=schemas.SchemeEnum.http,
object_group_id=ingest_process.object_group_id
)
m.put_object(
bucket_name=os.environ['S3_BUCKET'],
object_name=object_file_name,
data=upload_file.file,
content_type=upload_file.content_type,
length=upload_file.size
)

insert_stmt = insert(schemas.Object)\
.values(**object.model_dump())\
.returning(schemas.Object)
server_object = await session.scalar(insert_stmt)
object = Object.Post(
mime_type=upload_file.content_type,
key=object_file_name,
bucket=os.environ['S3_BUCKET'],
host=os.environ['S3_HOST'],
scheme=schemas.SchemeEnum.http,
object_group_id=ingest_process.object_group_id
)

response = Object.Get(**server_object.__dict__)
await session.commit()
return response
insert_stmt = insert(schemas.Object) \
.values(**object.model_dump()) \
.returning(schemas.Object)
server_object = await session.scalar(insert_stmt)

response_objects.append(Object.Get(**server_object.__dict__))

await session.commit()

return response_objects
52 changes: 29 additions & 23 deletions api/routes/object.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
from typing import Union

import starlette.requests
from fastapi import APIRouter, Depends, HTTPException, File, UploadFile
from starlette.datastructures import UploadFile as StarletteUploadFile
from sqlalchemy import insert, select, update, and_
Expand Down Expand Up @@ -79,34 +80,39 @@ async def get_object(id: int):


@router.post("", response_model=Object.Get)
async def create_object(object: Union[Object.Post, UploadFile], user_has_access: bool = Depends(has_access)):
async def create_object(
request: starlette.requests.Request,
object: Union[Object.Post, list[UploadFile], UploadFile],
user_has_access: bool = Depends(has_access)
):
"""Create/Register a new object"""

if not user_has_access:
raise HTTPException(status_code=403, detail="User does not have access to create object")

if isinstance(object, StarletteUploadFile):
m = minio.Minio(endpoint=os.environ['S3_HOST'], access_key=os.environ['access_key'],
secret_key=os.environ['secret_key'], secure=True)

file_length = len(object.file.read())
object.file.seek(0)

m.put_object(
bucket_name=os.environ['S3_BUCKET'],
object_name=object.filename,
data=object.file,
content_type=object.content_type,
length=file_length
)

object = Object.Post(
mime_type=object.content_type,
key=object.filename,
bucket=os.environ['S3_BUCKET'],
host=os.environ['S3_HOST'],
scheme=schemas.SchemeEnum.http
)
if "multipart/form-data" in request.headers['content-type']:

files = (await request.form()).getlist("object")
for upload_file in files:

m = minio.Minio(endpoint=os.environ['S3_HOST'], access_key=os.environ['access_key'],
secret_key=os.environ['secret_key'], secure=True)

m.put_object(
bucket_name=os.environ['S3_BUCKET'],
object_name=upload_file.filename,
data=upload_file.file,
content_type=upload_file.content_type,
length=upload_file.size
)

object = Object.Post(
mime_type=upload_file.content_type,
key=upload_file.filename,
bucket=os.environ['S3_BUCKET'],
host=os.environ['S3_HOST'],
scheme=schemas.SchemeEnum.http
)

engine = get_engine()
async_session = get_async_session(engine)
Expand Down
34 changes: 33 additions & 1 deletion api/tests/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,26 @@ def test_object_post(self, api_client):
def test_object_file_post(self, api_client):
    """Upload a single file to ``/object`` as multipart form data.

    The fixture file ``./tests/data/test.txt`` is sent under the form field
    ``object`` with a randomized upload filename, and the endpoint is
    expected to answer with HTTP 200.
    """

    # The random component belongs in the *upload* filename, not in the local
    # path: the only fixture on disk is test.txt. Presumably the random name
    # keeps repeated runs from reusing the same object key - confirm against
    # the storage backend.
    random_string = f"test-{random.randint(0, 10000000)}"

    # Open the fixture in a context manager so the handle is closed even if
    # the request raises.
    with open("./tests/data/test.txt", "rb") as upload:
        response = api_client.post(
            "/object",
            files=[
                ("object", (f"{random_string}.txt", upload, "text/plain")),
            ],
        )

    assert response.status_code == 200

def test_object_multi_file_post(self, api_client):
    """Upload two files to ``/object`` in one multipart request.

    Both parts use the same form field name, ``object``, which is how the
    endpoint receives a list of uploads. The request is expected to answer
    with HTTP 200.
    """

    # Context managers guarantee both handles are closed once the request
    # has been sent, instead of leaking them for the rest of the test run.
    with open("./tests/data/test.txt", "rb") as text_file, \
            open("./tests/object.py", "rb") as source_file:
        response = api_client.post(
            "/object",
            files=[
                ("object", ("test.txt", text_file, "text/plain")),
                ("object", ("object.py", source_file, "text/plain")),
            ],
        )

    assert response.status_code == 200
Expand All @@ -50,6 +67,21 @@ def test_object_post_to_ingest_process(self, api_client):

assert response.status_code == 200

def test_object_multi_file_post_to_ingest_process(self, api_client):
    """Upload two files to ``/ingest-process/1/objects`` in one request.

    Both parts are sent under the form field ``object`` with randomized
    upload filenames, and the endpoint is expected to answer with HTTP 200.
    NOTE(review): this relies on an ingest process with id 1 already
    existing - confirm the test setup creates it.
    """

    # Shared random stem for both upload names; presumably this avoids
    # clashing with objects stored by earlier runs - verify against the
    # bucket/key scheme.
    random_string = f"test-{random.randint(0, 10000000)}"

    # Close both fixture handles deterministically after the request.
    with open("./tests/data/test.txt", "rb") as text_file, \
            open("./tests/object.py", "rb") as source_file:
        response = api_client.post(
            "/ingest-process/1/objects",
            files=[
                ("object", (f"{random_string}.txt", text_file, "text/plain")),
                ("object", (f"{random_string}.py", source_file, "text/plain")),
            ],
        )

    assert response.status_code == 200

def test_get_objects(self, api_client):
response = api_client.get("/object")
assert response.status_code == 200
Expand Down

0 comments on commit b88f144

Please sign in to comment.