Skip to content

Commit

Permalink
DML for idiomatic + necessary changes around (#238)
Browse files Browse the repository at this point in the history
* astrapy-level 'chunked_delete_many' plain implementation

* collection delete_many tests put chunking to test

* projection filters through find_one_and_replace (also vector_), sync+async

* (idiomatic) find_one_and_replace

* projection support for (astrapy) find_one_and_update's

* find_one_and_delete

* find_one_and_update

* replace_one

* update_one

* update_many
  • Loading branch information
hemidactylus authored Mar 6, 2024
1 parent 37c94df commit 619bfe6
Show file tree
Hide file tree
Showing 9 changed files with 1,522 additions and 55 deletions.
82 changes: 72 additions & 10 deletions astrapy/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ def find_one_and_replace(
self,
replacement: Dict[str, Any],
*,
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
sort: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Expand All @@ -517,6 +518,7 @@ def find_one_and_replace(
json_query = make_payload(
top_level="findOneAndReplace",
filter=filter,
projection=projection,
replacement=replacement,
options=options,
sort=sort,
Expand Down Expand Up @@ -547,7 +549,7 @@ def vector_find_one_and_replace(
dict or None: either the matched document or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
convert_vector_to_floats(vector),
fields=fields,
)
Expand All @@ -556,6 +558,7 @@ def vector_find_one_and_replace(
raw_find_result = self.find_one_and_replace(
replacement=replacement,
filter=filter,
projection=projection,
sort=sort,
)

Expand All @@ -567,6 +570,7 @@ def find_one_and_update(
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Find a single document and update it.
Expand All @@ -584,6 +588,7 @@ def find_one_and_update(
update=update,
options=options,
sort=sort,
projection=projection,
)

response = self._request(
Expand Down Expand Up @@ -614,7 +619,7 @@ def vector_find_one_and_update(
update operation, or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
convert_vector_to_floats(vector),
fields=fields,
)
Expand All @@ -624,6 +629,7 @@ def vector_find_one_and_update(
update=update,
filter=filter,
sort=sort,
projection=projection,
)

return cast(Union[API_DOC, None], raw_find_result["data"]["document"])
Expand Down Expand Up @@ -873,7 +879,10 @@ def update_one(
return response

def update_many(
self, filter: Dict[str, Any], update: Dict[str, Any]
self,
filter: Dict[str, Any],
update: Dict[str, Any],
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Updates multiple documents in the collection.
Expand All @@ -883,7 +892,12 @@ def update_many(
Returns:
dict: The response from the database after the update operation.
"""
json_query = make_payload(top_level="updateMany", filter=filter, update=update)
json_query = make_payload(
top_level="updateMany",
filter=filter,
update=update,
options=options,
)

response = self._request(
method=http_methods.POST,
Expand Down Expand Up @@ -973,6 +987,23 @@ def delete_many(self, filter: Dict[str, Any]) -> API_RESPONSE:

return response

def chunked_delete_many(self, filter: Dict[str, Any]) -> List[API_RESPONSE]:
"""
Delete many documents from the collection based on a filter condition,
chaining several API calls until exhaustion of the documents to delete.
Args:
filter (dict): Criteria to identify the documents to delete.
Returns:
List[dict]: The responses from the database from all the calls
"""
responses = []
must_proceed = True
while must_proceed:
dm_response = self.delete_many(filter=filter)
responses.append(dm_response)
must_proceed = dm_response.get("status", {}).get("moreData", False)
return responses

def clear(self) -> API_RESPONSE:
"""
Clear the collection, deleting all documents
Expand Down Expand Up @@ -1534,8 +1565,9 @@ async def find_one_and_replace(
self,
replacement: Dict[str, Any],
*,
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
sort: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Expand All @@ -1551,6 +1583,7 @@ async def find_one_and_replace(
json_query = make_payload(
top_level="findOneAndReplace",
filter=filter,
projection=projection,
replacement=replacement,
options=options,
sort=sort,
Expand Down Expand Up @@ -1581,7 +1614,7 @@ async def vector_find_one_and_replace(
dict or None: either the matched document or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
vector,
fields=fields,
)
Expand All @@ -1590,6 +1623,7 @@ async def vector_find_one_and_replace(
raw_find_result = await self.find_one_and_replace(
replacement=replacement,
filter=filter,
projection=projection,
sort=sort,
)

Expand All @@ -1601,6 +1635,7 @@ async def find_one_and_update(
sort: Optional[Dict[str, Any]] = {},
filter: Optional[Dict[str, Any]] = None,
options: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Find a single document and update it.
Expand All @@ -1618,6 +1653,7 @@ async def find_one_and_update(
update=update,
options=options,
sort=sort,
projection=projection,
)

response = await self._request(
Expand Down Expand Up @@ -1648,7 +1684,7 @@ async def vector_find_one_and_update(
update operation, or None if nothing found
"""
# Pre-process the included arguments
sort, _ = self._recast_as_sort_projection(
sort, projection = self._recast_as_sort_projection(
vector,
fields=fields,
)
Expand All @@ -1658,6 +1694,7 @@ async def vector_find_one_and_update(
update=update,
filter=filter,
sort=sort,
projection=projection,
)

return cast(Union[API_DOC, None], raw_find_result["data"]["document"])
Expand Down Expand Up @@ -1881,7 +1918,10 @@ async def update_one(
return response

async def update_many(
self, filter: Dict[str, Any], update: Dict[str, Any]
self,
filter: Dict[str, Any],
update: Dict[str, Any],
options: Optional[Dict[str, Any]] = None,
) -> API_RESPONSE:
"""
Updates multiple documents in the collection.
Expand All @@ -1891,7 +1931,12 @@ async def update_many(
Returns:
dict: The response from the database after the update operation.
"""
json_query = make_payload(top_level="updateMany", filter=filter, update=update)
json_query = make_payload(
top_level="updateMany",
filter=filter,
update=update,
options=options,
)

response = await self._request(
method=http_methods.POST,
Expand Down Expand Up @@ -1972,6 +2017,23 @@ async def delete_many(self, filter: Dict[str, Any]) -> API_RESPONSE:

return response

async def chunked_delete_many(self, filter: Dict[str, Any]) -> List[API_RESPONSE]:
"""
Delete many documents from the collection based on a filter condition,
chaining several API calls until exhaustion of the documents to delete.
Args:
filter (dict): Criteria to identify the documents to delete.
Returns:
List[dict]: The responses from the database from all the calls
"""
responses = []
must_proceed = True
while must_proceed:
dm_response = await self.delete_many(filter=filter)
responses.append(dm_response)
must_proceed = dm_response.get("status", {}).get("moreData", False)
return responses

async def clear(self) -> API_RESPONSE:
"""
Clear the collection, deleting all documents
Expand Down
Loading

0 comments on commit 619bfe6

Please sign in to comment.