Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
Add a objects endpoint for ingest process
Browse files Browse the repository at this point in the history
  • Loading branch information
CannonLock committed Feb 7, 2024
1 parent b1c1fd0 commit 4548b52
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
7 changes: 4 additions & 3 deletions api/models/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ class Base(BaseModel):
source: Optional[dict] = None
mime_type: Optional[str] = None
sha256_hash: Optional[str] = None
object_group_id: Optional[int] = None

class Config:
orm_mode = True


class Post(Base):

scheme: SchemeEnum
host: str
bucket: str
Expand All @@ -27,14 +29,13 @@ class Config:

class Get(Post):
id: int
object_group_id: Optional[int] = None
created_on: datetime.datetime
updated_on: datetime.datetime
deleted_on: Optional[datetime.datetime] = None


class GetSecureURL(Base):
secure_url: str
class GetSecureURL(Get):
pre_signed_url: str


class Patch(Base):
Expand Down
26 changes: 21 additions & 5 deletions api/routes/ingest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime
import os

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import insert, select, update, and_
from sqlalchemy.orm import selectinload, joinedload
from pydantic import parse_obj_as
import minio

from api.database import (
get_async_session,
Expand Down Expand Up @@ -116,21 +116,37 @@ async def patch_ingest_process(id: int, object: IngestProcessModel.Patch, groups
return response


@router.get("/{id}/objects", response_model=list[Object.Get])
@router.get("/{id}/objects", response_model=list[Object.GetSecureURL])
async def get_ingest_process_objects(id: int, groups: list[str] = Depends(get_groups)):
"""Get all objects for an ingestion process"""

engine = get_engine()
async_session = get_async_session(engine)

objects = None
async with async_session() as session:

select_stmt = select(IngestProcessSchema).where(and_(IngestProcessSchema.id == id))
ingest_process = await session.scalar(select_stmt)

object_stmt = select(ObjectGroup).where(ObjectGroup.id == ingest_process.object_group_id).options(selectinload(ObjectGroup.objects))
objects_iterator = await session.execute(object_stmt)
objects = [Object.Get(x) for x in objects_iterator.scalar().objects]
schema_objects = objects_iterator.scalar().objects

if len(schema_objects) == 0:
return []

try:
# Attach the secure url
first_object = schema_objects[0]
m = minio.Minio(endpoint=first_object.host, access_key=os.environ['access_key'], secret_key=os.environ['secret_key'], secure=True)

for obj in schema_objects:
obj.pre_signed_url = m.presigned_get_object(bucket_name=obj.bucket, object_name=obj.key)

return schema_objects

except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to get secure url for object: {e}"
)
11 changes: 11 additions & 0 deletions api/tests/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,3 +543,14 @@ def test_get_objects(self, api_client):
for object in objects:
assert object['key'] in keys

#@pytest.skip("Manual testing only")
def test_get_objects_known_ingest_process(self, api_client):

ingest_process_id = 1

response = api_client.get(f"/ingest-process/{ingest_process_id}/objects")
assert response.status_code == 200
objects = response.json()

assert len(objects) > 0
assert objects[0]['pre_signed_url'] is not None

0 comments on commit 4548b52

Please sign in to comment.