Skip to content

Commit

Permalink
Rename the prefetch-one in cursors.py to "look-ahead" (#302)
Browse files Browse the repository at this point in the history
rename the prefetch-one in cursors.py to look-ahead to avoid ambiguity wrt core/db.py
  • Loading branch information
hemidactylus authored Aug 26, 2024
1 parent d8bb3a5 commit e7cbb59
Showing 1 changed file with 53 additions and 43 deletions.
96 changes: 53 additions & 43 deletions astrapy/cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,66 +160,76 @@ def _hash_document(document: Dict[str, Any]) -> str:
return _item_hash


class _PrefetchIterator:
class _LookAheadIterator:
"""
A class that allows to anticipate reading one element to ensure a call
is made and 'global' (find-wide) properties are read off the first response.
"""

def __init__(self, iterator: Iterator[DocumentType]):
self.iterator = iterator
self.prefetched_item: Optional[DocumentType] = None
self.has_prefetched = False
self.prefetch_exhausted = False
self.preread_item: Optional[DocumentType] = None
self.has_preread = False
self.preread_exhausted = False

def __iter__(self) -> Iterator[DocumentType]:
return self

def __next__(self) -> DocumentType:
if self.has_prefetched:
self.has_prefetched = False
if self.prefetch_exhausted:
if self.has_preread:
self.has_preread = False
if self.preread_exhausted:
raise StopIteration
# if this runs, prefetched_item is filled with a document:
return self.prefetched_item # type: ignore[return-value]
# if this runs, preread_item is filled with a document:
return self.preread_item # type: ignore[return-value]
else:
return next(self.iterator)

def prefetch(self) -> None:
if not self.has_prefetched and not self.prefetch_exhausted:
def preread(self) -> None:
if not self.has_preread and not self.preread_exhausted:
try:
self.prefetched_item = next(self.iterator)
self.has_prefetched = True
self.preread_item = next(self.iterator)
self.has_preread = True
except StopIteration:
self.prefetched_item = None
self.has_prefetched = False
self.prefetch_exhausted = True
self.preread_item = None
self.has_preread = False
self.preread_exhausted = True


class _AsyncPrefetchIterator:
class _AsyncLookAheadIterator:
"""
A class that allows to anticipate reading one element to ensure a call
is made and 'global' (find-wide) properties are read off the first response.
"""

def __init__(self, async_iterator: AsyncIterator[DocumentType]):
self.async_iterator = async_iterator
self.prefetched_item: Optional[DocumentType] = None
self.has_prefetched = False
self.prefetch_exhausted = False
self.preread_item: Optional[DocumentType] = None
self.has_preread = False
self.preread_exhausted = False

def __aiter__(self) -> AsyncIterator[DocumentType]:
return self

async def __anext__(self) -> DocumentType:
if self.has_prefetched:
self.has_prefetched = False
if self.prefetch_exhausted:
if self.has_preread:
self.has_preread = False
if self.preread_exhausted:
raise StopAsyncIteration
# if this runs, prefetched_item is filled with a document:
return self.prefetched_item # type: ignore[return-value]
# if this runs, preread_item is filled with a document:
return self.preread_item # type: ignore[return-value]
else:
return await self.async_iterator.__anext__()

async def prefetch(self) -> None:
if not self.has_prefetched and not self.prefetch_exhausted:
async def preread(self) -> None:
if not self.has_preread and not self.preread_exhausted:
try:
self.prefetched_item = await self.async_iterator.__anext__()
self.has_prefetched = True
self.preread_item = await self.async_iterator.__anext__()
self.has_preread = True
except StopAsyncIteration:
self.prefetched_item = None
self.has_prefetched = False
self.prefetch_exhausted = True
self.preread_item = None
self.has_preread = False
self.preread_exhausted = True


class BaseCursor:
Expand All @@ -244,7 +254,7 @@ class BaseCursor:
_started: bool
_retrieved: int
_alive: bool
_iterator: Optional[Union[_PrefetchIterator, _AsyncPrefetchIterator]] = None
_iterator: Optional[Union[_LookAheadIterator, _AsyncLookAheadIterator]] = None
_api_response_status: Optional[Dict[str, Any]]

def __init__(
Expand Down Expand Up @@ -596,7 +606,7 @@ def __init__(
self._retrieved = 0
self._alive = True
#
self._iterator: Optional[_PrefetchIterator] = None
self._iterator: Optional[_LookAheadIterator] = None
self._api_response_status: Optional[Dict[str, Any]] = None

def __iter__(self) -> Cursor:
Expand Down Expand Up @@ -645,7 +655,7 @@ def get_sort_vector(self) -> Optional[List[float]]:
if self._iterator is None:
self._iterator = self._create_iterator()
self._started = True
self._iterator.prefetch()
self._iterator.preread()
if self._api_response_status:
return self._api_response_status.get("sortVector")
else:
Expand All @@ -660,7 +670,7 @@ def _item_at_index(self, index: int) -> DocumentType:
raise IndexError("no such item for Cursor instance")

@recast_method_sync
def _create_iterator(self) -> _PrefetchIterator:
def _create_iterator(self) -> _LookAheadIterator:
self._ensure_not_started()
self._ensure_alive()
_options = {
Expand Down Expand Up @@ -700,7 +710,7 @@ def _response_setter_callback(raw_response: Dict[str, Any]) -> None:
)
logger.info(f"finished creating iterator on '{self._collection.name}'")
self._started_time_s = time.time()
return _PrefetchIterator(iterator)
return _LookAheadIterator(iterator)

@property
def collection(self) -> Collection:
Expand Down Expand Up @@ -831,7 +841,7 @@ def __init__(
self._retrieved = 0
self._alive = True
#
self._iterator: Optional[_AsyncPrefetchIterator] = None
self._iterator: Optional[_AsyncLookAheadIterator] = None
self._api_response_status: Optional[Dict[str, Any]] = None

def __aiter__(self) -> AsyncCursor:
Expand Down Expand Up @@ -880,7 +890,7 @@ async def get_sort_vector(self) -> Optional[List[float]]:
if self._iterator is None:
self._iterator = self._create_iterator()
self._started = True
await self._iterator.prefetch()
await self._iterator.preread()
if self._api_response_status:
return self._api_response_status.get("sortVector")
else:
Expand All @@ -895,7 +905,7 @@ def _item_at_index(self, index: int) -> DocumentType:
raise IndexError("no such item for AsyncCursor instance")

@recast_method_sync
def _create_iterator(self) -> _AsyncPrefetchIterator:
def _create_iterator(self) -> _AsyncLookAheadIterator:
self._ensure_not_started()
self._ensure_alive()
_options = {
Expand Down Expand Up @@ -935,7 +945,7 @@ def _response_setter_callback(raw_response: Dict[str, Any]) -> None:
)
logger.info(f"finished creating iterator on '{self._collection.name}'")
self._started_time_s = time.time()
return _AsyncPrefetchIterator(iterator)
return _AsyncLookAheadIterator(iterator)

def _to_sync(
self: AsyncCursor,
Expand Down Expand Up @@ -1204,6 +1214,6 @@ def close(self) -> None:


__pdoc__ = {
"_PrefetchIterator": False,
"_AsyncPrefetchIterator": False,
"_LookAheadIterator": False,
"_AsyncLookAheadIterator": False,
}

0 comments on commit e7cbb59

Please sign in to comment.