diff --git a/api/models/ingest.py b/api/models/ingest.py index e64915e..0e675fa 100644 --- a/api/models/ingest.py +++ b/api/models/ingest.py @@ -1,33 +1,32 @@ import datetime from typing import Optional +from enum import Enum from pydantic import BaseModel, ConfigDict -from api.models.object import Object +from api.schemas import IngestState -class IngestProcess(BaseModel): +class Post(BaseModel): - object_id: int + state: Optional[IngestState] = None comments: Optional[str] = None + source_id: Optional[int] = None + access_group_id: Optional[int] = None class Config: orm_mode = True extra = "ignore" -class ResponseIngestProcess(IngestProcess): +class Get(Post): + id: int - group_id: Optional[int] = None + object_group_id: int created_on: datetime.datetime completed_on: Optional[datetime.datetime] = None -class ResponseIngestProcessWithObject(ResponseIngestProcess): - object: Object - +class Patch(Post): + pass -class IngestProcessPatch(BaseModel): - group_id: Optional[int] = None - comments: Optional[str] = None - completed_on: Optional[datetime.datetime] = None diff --git a/api/models/object.py b/api/models/object.py index 9341bf8..2a07c07 100644 --- a/api/models/object.py +++ b/api/models/object.py @@ -5,22 +5,42 @@ from api.schemas import SchemeEnum +class Base(BaseModel): -class Object(BaseModel): + source: Optional[dict] = None + mime_type: Optional[str] = None + sha256_hash: Optional[str] = None + + class Config: + orm_mode = True + + +class Post(Base): scheme: SchemeEnum host: str bucket: str key: str - source: dict - mime_type: str - sha256_hash: str class Config: orm_mode = True -class ResponseObject(Object): +class Get(Post): id: int + object_group_id: Optional[int] = None created_on: datetime.datetime updated_on: datetime.datetime deleted_on: Optional[datetime.datetime] = None + + +class GetSecureURL(Base): + secure_url: str + + +class Patch(Base): + + object_group_id: Optional[int] = None + created_on: Optional[datetime.datetime] = None + updated_on: 
Optional[datetime.datetime] = None + deleted_on: Optional[datetime.datetime] = None + diff --git a/api/routes/ingest.py b/api/routes/ingest.py index 278125e..377cec7 100644 --- a/api/routes/ingest.py +++ b/api/routes/ingest.py @@ -11,8 +11,9 @@ results_to_model ) from api.routes.security import get_groups -from api.models.ingest import IngestProcess, ResponseIngestProcess, ResponseIngestProcessWithObject, IngestProcessPatch -from api.schemas import IngestProcess as IngestProcessSchema +import api.models.ingest as IngestProcessModel +import api.models.object as Object +from api.schemas import IngestProcess as IngestProcessSchema, ObjectGroup from api.query_parser import get_filter_query_params, QueryParser router = APIRouter( @@ -22,7 +23,7 @@ ) -@router.get("", response_model=list[ResponseIngestProcess]) +@router.get("", response_model=list[IngestProcessModel.Get]) async def get_multiple_ingest_process(page: int = 0, page_size: int = 50, filter_query_params=Depends(get_filter_query_params), groups: list[str] = Depends(get_groups)): """Get all ingestion processes""" @@ -54,7 +55,7 @@ async def get_multiple_ingest_process(page: int = 0, page_size: int = 50, filter return results.all() -@router.get("/{id}", response_model=ResponseIngestProcess) +@router.get("/{id}", response_model=IngestProcessModel.Get) async def get_ingest_process(id: int, groups: list[str] = Depends(get_groups)): """Get a single object""" @@ -70,12 +71,12 @@ async def get_ingest_process(id: int, groups: list[str] = Depends(get_groups)): if result is None: raise HTTPException(status_code=404, detail=f"IngestProcess with id ({id}) not found") - response = ResponseIngestProcess(**result.__dict__) + response = IngestProcessModel.Get(**result.__dict__) return response -@router.post("", response_model=ResponseIngestProcessWithObject) -async def create_ingest_process(object: IngestProcess, groups: list[str] = Depends(get_groups)): +@router.post("", response_model=IngestProcessModel.Get) +async def 
create_ingest_process(object: IngestProcessModel.Post, groups: list[str] = Depends(get_groups)): """Create/Register a new object""" engine = get_engine() @@ -83,7 +84,10 @@ async def create_ingest_process(object: IngestProcess, groups: list[str] = Depen async with async_session() as session: - stmt = insert(IngestProcessSchema).values(**object.dict()).returning(IngestProcessSchema).options(selectinload(IngestProcessSchema.object)) + object_group_stmt = insert(ObjectGroup).values().returning(ObjectGroup) + object_group = await session.scalar(object_group_stmt) + + stmt = insert(IngestProcessSchema).values(**object.model_dump(), object_group_id=object_group.id).returning(IngestProcessSchema) server_object = await session.scalar(stmt) await session.commit() @@ -91,8 +95,8 @@ async def create_ingest_process(object: IngestProcess, groups: list[str] = Depen return server_object -@router.patch("/{id}", response_model=ResponseIngestProcess) -async def patch_ingest_process(id: int, object: IngestProcessPatch, groups: list[str] = Depends(get_groups)) -> ResponseIngestProcess: +@router.patch("/{id}", response_model=IngestProcessModel.Get) +async def patch_ingest_process(id: int, object: IngestProcessModel.Patch, groups: list[str] = Depends(get_groups)): """Update a object""" engine = get_engine() @@ -102,32 +106,31 @@ async def patch_ingest_process(id: int, object: IngestProcessPatch, groups: list update_stmt = update(IngestProcessSchema)\ .where(IngestProcessSchema.id == id)\ - .values(**object.dict())\ + .values(**object.model_dump(exclude_unset=True))\ .returning(IngestProcessSchema) server_object = await session.scalar(update_stmt) - response = ResponseIngestProcess(**server_object.__dict__) + response = IngestProcessModel.Get(**server_object.__dict__) await session.commit() return response -@router.delete("/{id}") -async def delete_ingest_process(id: int, groups: list[str] = Depends(get_groups)) -> ResponseIngestProcess: - """Delete a object""" 
+@router.get("/{id}/objects", response_model=list[Object.Get]) +async def get_ingest_process_objects(id: int, groups: list[str] = Depends(get_groups)): + """Get all objects for an ingestion process""" engine = get_engine() async_session = get_async_session(engine) + objects = None async with async_session() as session: - delete_stmt = update(IngestProcessSchema)\ - .where(IngestProcessSchema.id == id)\ - .values(deleted_on=datetime.datetime.utcnow())\ - .returning(IngestProcessSchema) + select_stmt = select(IngestProcessSchema).where(and_(IngestProcessSchema.id == id)) + ingest_process = await session.scalar(select_stmt) + + object_stmt = select(ObjectGroup).where(ObjectGroup.id == ingest_process.object_group_id).options(selectinload(ObjectGroup.objects)) + objects_iterator = await session.execute(object_stmt) + objects = [Object.Get(x) for x in objects_iterator.scalar().objects] - server_object = await session.scalar(delete_stmt) - response = ResponseIngestProcess(**server_object.__dict__) - await session.commit() - return response \ No newline at end of file diff --git a/api/routes/object.py b/api/routes/object.py index 7387f71..9019295 100644 --- a/api/routes/object.py +++ b/api/routes/object.py @@ -9,8 +9,8 @@ results_to_model ) from api.routes.security import get_groups -from api.models.object import Object, ResponseObject -from api.schemas import Objects +import api.models.object as Object +import api.schemas as schemas from api.query_parser import get_filter_query_params, QueryParser router = APIRouter( @@ -20,14 +20,14 @@ ) -@router.get("", response_model=list[ResponseObject]) +@router.get("", response_model=list[Object.Get]) async def get_objects(page: int = 0, page_size: int = 50, filter_query_params=Depends(get_filter_query_params), groups: list[str] = Depends(get_groups)): """Get all objects""" engine = get_engine() async_session = get_async_session(engine) - query_parser = QueryParser(columns=Objects.__table__.c, query_params=filter_query_params) + 
query_parser = QueryParser(columns=schemas.Object.__table__.c, query_params=filter_query_params) async with async_session() as session: @@ -35,7 +35,7 @@ async def get_objects(page: int = 0, page_size: int = 50, filter_query_params=De select_stmt = select(*query_parser.get_select_columns())\ .limit(page_size)\ .offset(page_size * page)\ - .where(and_(Objects.deleted_on == None, query_parser.where_expressions())) + .where(and_(schemas.Object.deleted_on == None, query_parser.where_expressions())) # Add grouping if query_parser.get_group_by_column() is not None: @@ -49,10 +49,10 @@ async def get_objects(page: int = 0, page_size: int = 50, filter_query_params=De results = await session.execute(select_stmt) - return results_to_model(results, ResponseObject) + return results_to_model(results, Object.Get) -@router.get("/{id}", response_model=ResponseObject) +@router.get("/{id}", response_model=Object.Get) async def get_object(id: int, groups: list[str] = Depends(get_groups)): """Get a single object""" @@ -61,19 +61,19 @@ async def get_object(id: int, groups: list[str] = Depends(get_groups)): async with async_session() as session: - select_stmt = select(Objects).where(and_(Objects.id == id, Objects.deleted_on == None)) + select_stmt = select(schemas.Object).where(and_(schemas.Object.id == id, schemas.Object.deleted_on == None)) result = await session.scalar(select_stmt) if result is None: raise HTTPException(status_code=404, detail=f"Object with id ({id}) not found") - response = ResponseObject(**result.__dict__) + response = Object.Get(**result.__dict__) return response -@router.post("", response_model=ResponseObject) -async def create_file(object: Object, groups: list[str] = Depends(get_groups)): +@router.post("", response_model=Object.Get) +async def create_object(object: Object.Post, groups: list[str] = Depends(get_groups)): """Create/Register a new object""" engine = get_engine() @@ -81,16 +81,16 @@ async def create_file(object: Object, groups: list[str] = 
Depends(get_groups)): async with async_session() as session: - insert_stmt = insert(Objects).values(**object.dict()).returning(Objects) + insert_stmt = insert(schemas.Object).values(**object.model_dump()).returning(schemas.Object) server_object = await session.scalar(insert_stmt) - response = ResponseObject(**server_object.__dict__) + response = Object.Get(**server_object.__dict__) await session.commit() return response -@router.patch("/{id}", response_model=ResponseObject) -async def patch_object(id: int, object: Object, groups: list[str] = Depends(get_groups)) -> ResponseObject: +@router.patch("/{id}", response_model=Object.Get) +async def patch_object(id: int, object: Object.Patch, groups: list[str] = Depends(get_groups)): """Update a object""" engine = get_engine() @@ -98,20 +98,20 @@ async def patch_object(id: int, object: Object, groups: list[str] = Depends(get_ async with async_session() as session: - update_stmt = update(Objects)\ - .where(Objects.id == id)\ - .values(**object.dict())\ - .returning(Objects) + update_stmt = update(schemas.Object)\ + .where(schemas.Object.id == id)\ + .values(**object.model_dump(exclude_unset=True))\ + .returning(schemas.Object) server_object = await session.scalar(update_stmt) - response = ResponseObject(**server_object.__dict__) + response = Object.Get(**server_object.__dict__) await session.commit() return response -@router.delete("/{id}") -async def delete_object(id: int, groups: list[str] = Depends(get_groups)) -> ResponseObject: +@router.delete("/{id}", response_model=Object.Get) +async def delete_object(id: int, groups: list[str] = Depends(get_groups)): """Delete a object""" engine = get_engine() @@ -119,13 +119,13 @@ async def delete_object(id: int, groups: list[str] = Depends(get_groups)) -> Res async with async_session() as session: - delete_stmt = update(Objects)\ - .where(Objects.id == id)\ + delete_stmt = update(schemas.Object)\ + .where(schemas.Object.id == id)\ .values(deleted_on=datetime.datetime.utcnow())\ - 
.returning(Objects) + .returning(schemas.Object) server_object = await session.scalar(delete_stmt) - response = ResponseObject(**server_object.__dict__) + response = Object.Get(**server_object.__dict__) await session.commit() - return response \ No newline at end of file + return response diff --git a/api/schemas.py b/api/schemas.py index 14f21aa..84ca985 100644 --- a/api/schemas.py +++ b/api/schemas.py @@ -2,7 +2,7 @@ from typing import List import datetime from sqlalchemy import ForeignKey, func, DateTime, Enum, UniqueConstraint -from sqlalchemy.dialects.postgresql import VARCHAR, TEXT, INTEGER, ARRAY, BOOLEAN, JSON +from sqlalchemy.dialects.postgresql import VARCHAR, TEXT, INTEGER, ARRAY, BOOLEAN, JSON, JSONB from sqlalchemy import String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from geoalchemy2 import Geometry @@ -91,20 +91,21 @@ class SchemeEnum(enum.Enum): s3 = "s3" -class Objects(Base): - __tablename__ = "objects" +class Object(Base): + __tablename__ = "object" __table_args__ = ( UniqueConstraint('scheme', 'host', 'bucket', 'key', name='unique_file'), - {'schema': 'macrostrat'} + {'schema': 'storage'} ) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + object_group_id: Mapped[int] = mapped_column(ForeignKey("storage.object_group.id"), nullable=True) scheme: Mapped[str] = mapped_column(Enum(SchemeEnum)) host: Mapped[str] = mapped_column(VARCHAR(255), nullable=False) bucket: Mapped[str] = mapped_column(VARCHAR(255), nullable=False) key: Mapped[str] = mapped_column(VARCHAR(255), nullable=False) - source: Mapped[dict] = mapped_column(JSON) - mime_type: Mapped[str] = mapped_column(VARCHAR(255)) - sha256_hash: Mapped[str] = mapped_column(VARCHAR(255)) + source: Mapped[dict] = mapped_column(JSONB, nullable=True) + mime_type: Mapped[str] = mapped_column(VARCHAR(255), nullable=True) + sha256_hash: Mapped[str] = mapped_column(VARCHAR(255), nullable=True) created_on: Mapped[datetime.datetime] = mapped_column( 
DateTime(timezone=True), server_default=func.now() ) @@ -115,19 +116,44 @@ class Objects(Base): DateTime(timezone=True), nullable=True ) + # Relationships + object_group: Mapped["ObjectGroup"] = relationship(back_populates="objects") + + +class ObjectGroup(Base): + __tablename__ = "object_group" + __table_args__ = {'schema': 'storage'} + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + + # Relationships + objects: Mapped[List["Object"]] = relationship(back_populates="object_group") + ingest_process: Mapped["IngestProcess"] = relationship(back_populates="object_group") + + +class IngestState(enum.Enum): + pending = "pending" + ingested = "ingested" + prepared = "prepared" + failed = "failed" + abandoned = "abandoned" + class IngestProcess(Base): __tablename__ = "ingest_process" __table_args__ = {'schema': 'macrostrat'} id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + state: Mapped[IngestState] = mapped_column(Enum(IngestState, name="ingest_state"), nullable=True) comments: Mapped[str] = mapped_column(TEXT, nullable=True) - group_id: Mapped[int] = mapped_column(ForeignKey("macrostrat_auth.group.id"), nullable=True) - object_id: Mapped[Objects] = mapped_column(ForeignKey("macrostrat.objects.id")) - object: Mapped[Objects] = relationship("Objects", lazy="joined") + source_id: Mapped[int] = mapped_column(ForeignKey("maps.sources.source_id"), nullable=True) + access_group_id: Mapped[int] = mapped_column(ForeignKey("macrostrat_auth.group.id"), nullable=True) + object_group_id: Mapped[int] = mapped_column(ForeignKey("storage.object_group.id")) created_on: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) completed_on: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), nullable=True ) + + # Relationships + object_group: Mapped[ObjectGroup] = relationship(back_populates="ingest_process", lazy="joined") diff --git a/api/tests/main.py b/api/tests/main.py index
ef09d80..07c28f2 100644 --- a/api/tests/main.py +++ b/api/tests/main.py @@ -385,6 +385,24 @@ def test_get_object(self, api_client): assert single_data == data[0] + def test_patch_object(self, api_client): + + # Get a object + response = api_client.get("/object") + assert response.status_code == 200 + object_data = response.json() + assert len(object_data) > 0 + + # Patch Object + response = api_client.patch( + f"/object/{object_data[0]['id']}", + json={"source": {"comments": "test"}} + ) + assert response.status_code == 200 + single_data = response.json() + + assert single_data['source']['comments'] == "test" + def test_delete_object(self, api_client): key = f"test-{random.randint(0,10000000)}" @@ -417,30 +435,9 @@ class TestIngestProcess: def test_add_ingest_process(self, api_client): """Test adding an ingest process""" - key = f"test-{random.randint(0, 10000000)}" - - object_data = { - "scheme": "http", - "host": "test.com", - "bucket": "test", - "key": key, - "source": { - "test_key": "test_value" - }, - "mime_type": "application/json", - "sha256_hash": hashlib.sha256(open(__file__, "rb").read()).hexdigest() - } - - response = api_client.post( - "/object", - json=object_data, - ) - - assert response.status_code == 200 - object = response.json() - ingest_process_data = { - "object_id": object['id'] + "comments": "This is a test comment", + "state": "pending" } response = api_client.post( @@ -449,7 +446,6 @@ def test_add_ingest_process(self, api_client): ) assert response.status_code == 200 - assert response.json()['object']['key'] == key def test_get_ingest_processes(self, api_client): response = api_client.get("/ingest-process") @@ -489,4 +485,61 @@ def test_patch_ingest_process(self, api_client): single_data = response.json() - assert single_data['comments'] == "test" \ No newline at end of file + assert single_data['comments'] == "test" + + def test_pair_object_to_ingest(self, api_client): + response = api_client.get("/ingest-process") + assert 
response.status_code == 200 + ingest_data = response.json()[0] + + response = api_client.get("/object") + assert response.status_code == 200 + object_data = response.json()[0] + + # Pair the object to the ingest process + response = api_client.patch(f"/object/{object_data['id']}", json={"object_group_id": ingest_data['object_group_id']}) + assert response.status_code == 200 + + def test_get_objects(self, api_client): + + # Add an ingest process + ingest_process_data = { + "comments": "This is a test comment", + "state": "pending" + } + ingest_response = api_client.post( + "/ingest-process", + json=ingest_process_data, + ) + assert ingest_response.status_code == 200 + ingest_data = ingest_response.json() + + # Add some objects + keys = [] + for i in range(4): + key = f"test-{random.randint(0,10000000)}" + keys.append(key) + object_data = { + "scheme": "http", + "host": "test.com", + "bucket": "test", + "key": key, + "source": {"test_key": "test_value"}, + "mime_type": "application/json" + } + response = api_client.post("/object", json=object_data) + assert response.status_code == 200 + object_data = response.json() + + # Pair the object to the ingest process + response = api_client.patch(f"/object/{object_data['id']}", json={"object_group_id": ingest_data['object_group_id']}) + assert response.status_code == 200 + + response = api_client.get(f"/ingest-process/{ingest_data['id']}/objects") + assert response.status_code == 200 + objects = response.json() + + assert len(objects) == 4 + for object in objects: + assert object['key'] in keys +