1
1
import os
2
2
3
3
from fastapi import APIRouter , Depends , HTTPException
4
- from sqlalchemy import insert , select , update , and_
4
+ from sqlalchemy import insert , select , update , and_ , delete
5
5
from sqlalchemy .orm import selectinload , joinedload , defer
6
6
import minio
7
7
13
13
from api .routes .security import has_access
14
14
import api .models .ingest as IngestProcessModel
15
15
import api .models .object as Object
16
- from api .schemas import IngestProcess as IngestProcessSchema , ObjectGroup , Sources
16
+ from api .schemas import IngestProcess as IngestProcessSchema , ObjectGroup , Sources , IngestProcessTag
17
17
from api .query_parser import get_filter_query_params , QueryParser
18
18
19
19
router = APIRouter (
@@ -39,7 +39,8 @@ async def get_multiple_ingest_process(page: int = 0, page_size: int = 50, filter
39
39
.limit (page_size )\
40
40
.offset (page_size * page )\
41
41
.where (and_ (query_parser .where_expressions ()))\
42
- .options (joinedload (IngestProcessSchema .source ).defer (Sources .rgeom ).defer (Sources .web_geom ))
42
+ .options (joinedload (IngestProcessSchema .source ).defer (Sources .rgeom ).defer (Sources .web_geom ))\
43
+ .options (selectinload (IngestProcessSchema .tags ))
43
44
44
45
results = await session .execute (select_stmt )
45
46
@@ -56,7 +57,8 @@ async def get_ingest_process(id: int):
56
57
async with async_session () as session :
57
58
58
59
select_stmt = select (IngestProcessSchema ).where (and_ (IngestProcessSchema .id == id ))\
59
- .options (joinedload (IngestProcessSchema .source ).defer (Sources .rgeom ).defer (Sources .web_geom ))
60
+ .options (joinedload (IngestProcessSchema .source ).defer (Sources .rgeom ).defer (Sources .web_geom ))\
61
+ .options (selectinload (IngestProcessSchema .tags ))
60
62
61
63
result = await session .scalar (select_stmt )
62
64
@@ -78,18 +80,25 @@ async def create_ingest_process(object: IngestProcessModel.Post, user_has_access
78
80
79
81
async with async_session () as session :
80
82
81
- object_group_stmt = insert (ObjectGroup ).values ().returning (ObjectGroup )
82
- object_group = await session .scalar (object_group_stmt )
83
-
84
- stmt = insert (IngestProcessSchema ).values (** object .model_dump (), object_group_id = object_group .id ).returning (IngestProcessSchema )
83
+ object_group = ObjectGroup ()
84
+ session .add (object_group )
85
+ await session .commit ()
85
86
86
- server_object = await session .scalar (stmt )
87
+ tags = [IngestProcessTag (tag = tag ) for tag in object .tags ]
88
+ del object .tags
87
89
88
- server_object .source = await session .scalar (select (Sources ).where (Sources .source_id == server_object .source_id ))
90
+ ingest_process = IngestProcessSchema (
91
+ ** object .model_dump (),
92
+ object_group_id = object_group .id ,
93
+ tags = tags
94
+ )
89
95
96
+ session .add (ingest_process )
90
97
await session .commit ()
91
98
92
- return server_object
99
+ ingest_process .source = await session .get (Sources , object .source_id )
100
+
101
+ return ingest_process
93
102
94
103
95
104
@router .patch ("/{id}" , response_model = IngestProcessModel .Get )
@@ -119,6 +128,62 @@ async def patch_ingest_process(
119
128
await session .commit ()
120
129
return response
121
130
131
+ @router .post ("/{id}/tags" , response_model = list [str ])
132
+ async def add_ingest_process_tag (
133
+ id : int ,
134
+ tag : IngestProcessModel .Tag ,
135
+ user_has_access : bool = Depends (has_access )
136
+ ):
137
+ """Add a tag to an ingest process"""
138
+
139
+ if not user_has_access :
140
+ raise HTTPException (status_code = 403 , detail = "User does not have access to create an object" )
141
+
142
+ engine = get_engine ()
143
+ async_session = get_async_session (engine )
144
+
145
+ async with async_session () as session :
146
+
147
+ ingest_process = await session .get (IngestProcessSchema , id )
148
+
149
+ if ingest_process is None :
150
+ raise HTTPException (status_code = 404 , detail = f"IngestProcess with id ({ id } ) not found" )
151
+
152
+ ingest_process .tags .append (IngestProcessTag (tag = tag .tag ))
153
+ await session .commit ()
154
+
155
+ ingest_process = await session .get (IngestProcessSchema , id )
156
+ return [tag .tag for tag in ingest_process .tags ]
157
+
158
+ return None
159
+
160
+ @router .delete ("/{id}/tags/{tag}" , response_model = list [str ])
161
+ async def delete_ingest_process_tag (id : int , tag : str , user_has_access : bool = Depends (has_access )):
162
+ """Delete a tag from an ingest process"""
163
+
164
+ if not user_has_access :
165
+ raise HTTPException (status_code = 403 , detail = "User does not have access to create an object" )
166
+
167
+ engine = get_engine ()
168
+ async_session = get_async_session (engine )
169
+
170
+ async with async_session () as session :
171
+
172
+ ingest_process = await session .get (IngestProcessSchema , id )
173
+
174
+ if ingest_process is None :
175
+ raise HTTPException (status_code = 404 , detail = f"IngestProcess with id ({ id } ) not found" )
176
+
177
+ tag_stmt = delete (IngestProcessTag ).where (and_ (IngestProcessTag .ingest_process_id == id , IngestProcessTag .tag == tag ))
178
+ await session .execute (tag_stmt )
179
+ await session .commit ()
180
+
181
+ ingest_process = await session .get (IngestProcessSchema , id )
182
+
183
+ return [tag .tag for tag in ingest_process .tags ]
184
+
185
+ return ingest_process
186
+
122
187
123
188
@router .get ("/{id}/objects" , response_model = list [Object .GetSecureURL ])
124
189
async def get_ingest_process_objects (id : int ):
0 commit comments