Skip to content
This repository has been archived by the owner on Jan 7, 2025. It is now read-only.

Commit

Permalink
Update the ingest Process
Browse files Browse the repository at this point in the history
  • Loading branch information
CannonLock committed Feb 6, 2024
1 parent 246830f commit b1c1fd0
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 102 deletions.
23 changes: 11 additions & 12 deletions api/models/ingest.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
import datetime
from typing import Optional
from enum import Enum

from pydantic import BaseModel, ConfigDict

from api.models.object import Object
from api.schemas import IngestState


class IngestProcess(BaseModel):
class Post(BaseModel):

object_id: int
state: Optional[IngestState] = None
comments: Optional[str] = None
source_id: Optional[int] = None
access_group_id: Optional[int] = None

class Config:
orm_mode = True
extra = "ignore"


class ResponseIngestProcess(IngestProcess):
class Get(Post):

id: int
group_id: Optional[int] = None
object_group_id: int
created_on: datetime.datetime
completed_on: Optional[datetime.datetime] = None


class ResponseIngestProcessWithObject(ResponseIngestProcess):
object: Object

class Patch(Post):
pass

class IngestProcessPatch(BaseModel):
group_id: Optional[int] = None
comments: Optional[str] = None
completed_on: Optional[datetime.datetime] = None
30 changes: 25 additions & 5 deletions api/models/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,42 @@

from api.schemas import SchemeEnum

class Base(BaseModel):

class Object(BaseModel):
source: Optional[dict] = None
mime_type: Optional[str] = None
sha256_hash: Optional[str] = None

class Config:
orm_mode = True


class Post(Base):
scheme: SchemeEnum
host: str
bucket: str
key: str
source: dict
mime_type: str
sha256_hash: str

class Config:
orm_mode = True


class ResponseObject(Object):
class Get(Post):
id: int
object_group_id: Optional[int] = None
created_on: datetime.datetime
updated_on: datetime.datetime
deleted_on: Optional[datetime.datetime] = None


class GetSecureURL(Base):
secure_url: str


class Patch(Base):

object_group_id: Optional[int] = None
created_on: Optional[datetime.datetime] = None
updated_on: Optional[datetime.datetime] = None
deleted_on: Optional[datetime.datetime] = None

49 changes: 26 additions & 23 deletions api/routes/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
results_to_model
)
from api.routes.security import get_groups
from api.models.ingest import IngestProcess, ResponseIngestProcess, ResponseIngestProcessWithObject, IngestProcessPatch
from api.schemas import IngestProcess as IngestProcessSchema
import api.models.ingest as IngestProcessModel
import api.models.object as Object
from api.schemas import IngestProcess as IngestProcessSchema, ObjectGroup
from api.query_parser import get_filter_query_params, QueryParser

router = APIRouter(
Expand All @@ -22,7 +23,7 @@
)


@router.get("", response_model=list[ResponseIngestProcess])
@router.get("", response_model=list[IngestProcessModel.Get])
async def get_multiple_ingest_process(page: int = 0, page_size: int = 50, filter_query_params=Depends(get_filter_query_params), groups: list[str] = Depends(get_groups)):
"""Get all ingestion processes"""

Expand Down Expand Up @@ -54,7 +55,7 @@ async def get_multiple_ingest_process(page: int = 0, page_size: int = 50, filter
return results.all()


@router.get("/{id}", response_model=ResponseIngestProcess)
@router.get("/{id}", response_model=IngestProcessModel.Get)
async def get_ingest_process(id: int, groups: list[str] = Depends(get_groups)):
"""Get a single object"""

Expand All @@ -70,29 +71,32 @@ async def get_ingest_process(id: int, groups: list[str] = Depends(get_groups)):
if result is None:
raise HTTPException(status_code=404, detail=f"IngestProcess with id ({id}) not found")

response = ResponseIngestProcess(**result.__dict__)
response = IngestProcessModel.Get(**result.__dict__)
return response


@router.post("", response_model=ResponseIngestProcessWithObject)
async def create_ingest_process(object: IngestProcess, groups: list[str] = Depends(get_groups)):
@router.post("", response_model=IngestProcessModel.Get)
async def create_ingest_process(object: IngestProcessModel.Post, groups: list[str] = Depends(get_groups)):
"""Create/Register a new object"""

engine = get_engine()
async_session = get_async_session(engine, expire_on_commit=False)

async with async_session() as session:

stmt = insert(IngestProcessSchema).values(**object.dict()).returning(IngestProcessSchema).options(selectinload(IngestProcessSchema.object))
object_group_stmt = insert(ObjectGroup).values().returning(ObjectGroup)
object_group = await session.scalar(object_group_stmt)

stmt = insert(IngestProcessSchema).values(**object.model_dump(), object_group_id=object_group.id).returning(IngestProcessSchema)
server_object = await session.scalar(stmt)

await session.commit()

return server_object


@router.patch("/{id}", response_model=ResponseIngestProcess)
async def patch_ingest_process(id: int, object: IngestProcessPatch, groups: list[str] = Depends(get_groups)) -> ResponseIngestProcess:
@router.patch("/{id}", response_model=IngestProcessModel.Get)
async def patch_ingest_process(id: int, object: IngestProcessModel.Patch, groups: list[str] = Depends(get_groups)):
"""Update a object"""

engine = get_engine()
Expand All @@ -102,32 +106,31 @@ async def patch_ingest_process(id: int, object: IngestProcessPatch, groups: list

update_stmt = update(IngestProcessSchema)\
.where(IngestProcessSchema.id == id)\
.values(**object.dict())\
.values(**object.model_dump(exclude_unset=True))\
.returning(IngestProcessSchema)

server_object = await session.scalar(update_stmt)

response = ResponseIngestProcess(**server_object.__dict__)
response = IngestProcessModel.Get(**server_object.__dict__)
await session.commit()
return response


@router.delete("/{id}")
async def delete_ingest_process(id: int, groups: list[str] = Depends(get_groups)) -> ResponseIngestProcess:
"""Delete a object"""
@router.get("/{id}/objects", response_model=list[Object.Get])
async def get_ingest_process_objects(id: int, groups: list[str] = Depends(get_groups)):
"""Get all objects for an ingestion process"""

engine = get_engine()
async_session = get_async_session(engine)

objects = None
async with async_session() as session:

delete_stmt = update(IngestProcessSchema)\
.where(IngestProcessSchema.id == id)\
.values(deleted_on=datetime.datetime.utcnow())\
.returning(IngestProcessSchema)
select_stmt = select(IngestProcessSchema).where(and_(IngestProcessSchema.id == id))
ingest_process = await session.scalar(select_stmt)

object_stmt = select(ObjectGroup).where(ObjectGroup.id == ingest_process.object_group_id).options(selectinload(ObjectGroup.objects))
objects_iterator = await session.execute(object_stmt)
objects = [Object.Get(x) for x in objects_iterator.scalar().objects]

server_object = await session.scalar(delete_stmt)

response = ResponseIngestProcess(**server_object.__dict__)
await session.commit()
return response
54 changes: 27 additions & 27 deletions api/routes/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
results_to_model
)
from api.routes.security import get_groups
from api.models.object import Object, ResponseObject
from api.schemas import Objects
import api.models.object as Object
import api.schemas as schemas
from api.query_parser import get_filter_query_params, QueryParser

router = APIRouter(
Expand All @@ -20,22 +20,22 @@
)


@router.get("", response_model=list[ResponseObject])
@router.get("", response_model=list[Object.Get])
async def get_objects(page: int = 0, page_size: int = 50, filter_query_params=Depends(get_filter_query_params), groups: list[str] = Depends(get_groups)):
"""Get all objects"""

engine = get_engine()
async_session = get_async_session(engine)

query_parser = QueryParser(columns=Objects.__table__.c, query_params=filter_query_params)
query_parser = QueryParser(columns=schemas.Object.__table__.c, query_params=filter_query_params)

async with async_session() as session:

# TODO: This flow should likely be refactored into a function, lets see it used once more before making the move
select_stmt = select(*query_parser.get_select_columns())\
.limit(page_size)\
.offset(page_size * page)\
.where(and_(Objects.deleted_on == None, query_parser.where_expressions()))
.where(and_(schemas.Object.deleted_on == None, query_parser.where_expressions()))

# Add grouping
if query_parser.get_group_by_column() is not None:
Expand All @@ -49,10 +49,10 @@ async def get_objects(page: int = 0, page_size: int = 50, filter_query_params=De

results = await session.execute(select_stmt)

return results_to_model(results, ResponseObject)
return results_to_model(results, Object.Get)


@router.get("/{id}", response_model=ResponseObject)
@router.get("/{id}", response_model=Object.Get)
async def get_object(id: int, groups: list[str] = Depends(get_groups)):
"""Get a single object"""

Expand All @@ -61,71 +61,71 @@ async def get_object(id: int, groups: list[str] = Depends(get_groups)):

async with async_session() as session:

select_stmt = select(Objects).where(and_(Objects.id == id, Objects.deleted_on == None))
select_stmt = select(schemas.Object).where(and_(schemas.Object.id == id, schemas.Object.deleted_on == None))

result = await session.scalar(select_stmt)

if result is None:
raise HTTPException(status_code=404, detail=f"Object with id ({id}) not found")

response = ResponseObject(**result.__dict__)
response = Object.Get(**result.__dict__)
return response


@router.post("", response_model=ResponseObject)
async def create_file(object: Object, groups: list[str] = Depends(get_groups)):
@router.post("", response_model=Object.Get)
async def create_object(object: Object.Post, groups: list[str] = Depends(get_groups)):
"""Create/Register a new object"""

engine = get_engine()
async_session = get_async_session(engine)

async with async_session() as session:

insert_stmt = insert(Objects).values(**object.dict()).returning(Objects)
insert_stmt = insert(schemas.Object).values(**object.model_dump()).returning(schemas.Object)
server_object = await session.scalar(insert_stmt)

response = ResponseObject(**server_object.__dict__)
response = Object.Get(**server_object.__dict__)
await session.commit()
return response


@router.patch("/{id}", response_model=ResponseObject)
async def patch_object(id: int, object: Object, groups: list[str] = Depends(get_groups)) -> ResponseObject:
@router.patch("/{id}", response_model=Object.Get)
async def patch_object(id: int, object: Object.Patch, groups: list[str] = Depends(get_groups)):
"""Update a object"""

engine = get_engine()
async_session = get_async_session(engine)

async with async_session() as session:

update_stmt = update(Objects)\
.where(Objects.id == id)\
.values(**object.dict())\
.returning(Objects)
update_stmt = update(schemas.Object)\
.where(schemas.Object.id == id)\
.values(**object.model_dump(exclude_unset=True))\
.returning(schemas.Object)

server_object = await session.scalar(update_stmt)

response = ResponseObject(**server_object.__dict__)
response = Object.Get(**server_object.__dict__)
await session.commit()
return response


@router.delete("/{id}")
async def delete_object(id: int, groups: list[str] = Depends(get_groups)) -> ResponseObject:
@router.delete("/{id}", response_model=Object.Get)
async def delete_object(id: int, groups: list[str] = Depends(get_groups)):
"""Delete a object"""

engine = get_engine()
async_session = get_async_session(engine)

async with async_session() as session:

delete_stmt = update(Objects)\
.where(Objects.id == id)\
delete_stmt = update(schemas.Object)\
.where(schemas.Object.id == id)\
.values(deleted_on=datetime.datetime.utcnow())\
.returning(Objects)
.returning(schemas.Object)

server_object = await session.scalar(delete_stmt)

response = ResponseObject(**server_object.__dict__)
response = Object.Get(**server_object.__dict__)
await session.commit()
return response
return response
Loading

0 comments on commit b1c1fd0

Please sign in to comment.