From a28c578ffb9e3bab1536056d23a86ebbc84c1388 Mon Sep 17 00:00:00 2001 From: Louis Maddox Date: Thu, 12 Aug 2021 21:31:35 +0100 Subject: [PATCH] Full implementation of async fetch handler for URL lists; codecs; tests (closes #36) --- docs/api.rst | 15 +++ docs/overview.rst | 6 +- src/range_streams/__init__.py | 8 +- src/range_streams/async_utils.py | 35 +++++-- src/range_streams/codecs/conda/stream.py | 35 ++++--- src/range_streams/codecs/png/stream.py | 78 ++++++++++++-- src/range_streams/codecs/tar/stream.py | 19 +++- src/range_streams/codecs/zip/stream.py | 63 ++++++++--- src/range_streams/overlaps.py | 3 - src/range_streams/request.py | 77 +++++++++----- src/range_streams/response.py | 127 ++++++++++++++++++++--- src/range_streams/stream.py | 26 ++++- src/range_streams/types.py | 11 ++ tests/async_test.py | 94 ++++++++++++++--- tests/data.py | 9 +- tests/range_stream_core_test.py | 2 +- tests/response_test.py | 4 +- 17 files changed, 498 insertions(+), 114 deletions(-) create mode 100644 src/range_streams/types.py diff --git a/docs/api.rst b/docs/api.rst index 0216633..6f9de31 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -38,6 +38,21 @@ These helper functions help prepare HTTP requests to set up a stream. :show-inheritance: +Asynchronous fetcher +==================== + +This helper class handles all of the details of asynchronously +fetching streams, given a list of URLs. + +---- + + +.. automodule:: range_streams.async_utils + :members: + :undoc-members: + :show-inheritance: + + Overlap handling ================= diff --git a/docs/overview.rst b/docs/overview.rst index b7bdeff..9e27341 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -52,7 +52,7 @@ Example .. 
code:: py RangeDict{ - RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com + RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com } Further ranges are requested by simply calling ``RangeStream.add`` with @@ -71,6 +71,6 @@ interval. .. code:: py RangeDict{ - RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com, - RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from github.com + RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com, + RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from raw.githubusercontent.com } diff --git a/src/range_streams/__init__.py b/src/range_streams/__init__.py index 0c0dfd4..eaf3613 100644 --- a/src/range_streams/__init__.py +++ b/src/range_streams/__init__.py @@ -39,7 +39,7 @@ >>> rng = Range(0,3) # doctest: +SKIP >>> s.add(rng) # doctest: +SKIP >>> s.ranges # doctest: +SKIP - RangeDict{RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com} + RangeDict{RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com} Once a request is made for a non-empty range, the :class:`~range_streams.stream.RangeStream` acquires the first entry in the :class:`~ranges.RangeDict` stored on the @@ -58,8 +58,8 @@ >>> s.add(byte_range=(7,9)) # doctest: +SKIP >>> s.ranges # doctest: +SKIP RangeDict{ - RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com, - RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from github.com + RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com, + RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from raw.githubusercontent.com } Codecs are available for ``.zip`` (:class:`~range_streams.codecs.zip.ZipStream`) and 
``.conda`` @@ -71,7 +71,7 @@ >>> s = ZipStream(url=_EXAMPLE_ZIP_URL) # doctest: +SKIP >>> s.ranges # doctest: +SKIP RangeDict{ - RangeSet{Range[51, 62)}: RangeResponse ⠶ "example_text_file.txt" [51, 62) @ 'example_text_file.txt.zip' from github.com + RangeSet{Range[51, 62)}: RangeResponse ⠶ "example_text_file.txt" [51, 62) @ 'example_text_file.txt.zip' from raw.githubusercontent.com } The ``.conda`` format is just a particular type of zip for Python packages on the conda diff --git a/src/range_streams/async_utils.py b/src/range_streams/async_utils.py index 71779f2..93344d4 100644 --- a/src/range_streams/async_utils.py +++ b/src/range_streams/async_utils.py @@ -6,7 +6,7 @@ from functools import partial from signal import SIGINT, SIGTERM, Signals from sys import stderr -from typing import TYPE_CHECKING, Callable, Coroutine, Iterator +from typing import TYPE_CHECKING, Callable, Coroutine, Iterator, Type from aiostream import stream from ranges import Range, RangeSet @@ -19,7 +19,7 @@ from tqdm.asyncio import tqdm_asyncio from .log_utils import log, set_up_logging -from .stream import RangeStream +from .types import _T as RangeStreamOrSubclass __all__ = ["SignalHaltError", "AsyncFetcher"] @@ -27,14 +27,18 @@ class AsyncFetcher: def __init__( self, + stream_cls: Type[RangeStreamOrSubclass], urls: list[str], callback: Callable | None = None, verbose: bool = False, show_progress_bar: bool = True, timeout_s: float = 5.0, client=None, + **kwargs, ): """ + Any kwargs are passed through to the stream class constructor. 
+ Args: callback : A function to be passed 3 values: the AsyncFetcher which is calling it, the awaited RangeStream, and its source URL (a ``httpx.URL``, @@ -42,6 +46,8 @@ def __init__( """ if urls == []: raise ValueError("The list of URLs to fetch cannot be empty") + self.stream_cls = stream_cls + self.stream_cls_kwargs = kwargs self.url_list = urls self.callback = callback self.n = len(urls) @@ -64,21 +70,21 @@ def make_calls(self): if self.show_progress_bar: self.pbar.close() - async def process_stream(self, rstream: RangeStream): + async def process_stream(self, range_stream: RangeStreamOrSubclass): """ Process an awaited RangeStream within an async fetch loop, calling the callback set on the `~range_streams.async_utils.AsyncFetcher.callback` attribute. Args: - rstream : The awaited RangeStream + range_stream : The awaited RangeStream (or one of its subclasses) """ - monostream_response = rstream._ranges[rstream.total_range] + monostream_response = range_stream._ranges[range_stream.total_range] resp = monostream_response.request.response # httpx.Response source_url = resp.history[0].url if resp.history else resp.url # Map the response back to the thing it came from in the url_list i = next(i for (i, u) in enumerate(self.url_list) if source_url == u) if self.callback is not None: - await self.callback(self, stream, source_url) + await self.callback(self, range_stream, source_url) if self.verbose: log.debug(f"Processed URL in async callback: {source_url}") if self.show_progress_bar: @@ -109,9 +115,18 @@ def fetch_things(self, urls: Iterator[str]): self.pbar.disable = True self.pbar.close() - async def fetch(self, client: httpx.AsyncClient, url: httpx.URL) -> RangeStream: - s = RangeStream( - url=str(url), client=client, single_request=True, force_async=True + async def fetch(self, client, url) -> RangeStreamOrSubclass: + """ + Args: + client : ``httpx.AsyncClient`` + url : ``httpx.URL`` + """ + s = self.stream_cls( + url=str(url), + client=client, + 
single_request=True, + force_async=True, + **self.stream_cls_kwargs, ) await s.add_async() return s @@ -137,7 +152,7 @@ async def async_fetch_urlset( " after using the client in a contextmanager block (which implicitly" " closes after exiting the block) perhaps?" ) - raise ValueError(msg) + raise ValueError(msg) # assert self.client is not None # give mypy a clue processed = await self.fetch_and_process(urls=urls, client=client) return processed diff --git a/src/range_streams/codecs/conda/stream.py b/src/range_streams/codecs/conda/stream.py index 6c44cf8..d1cadb4 100644 --- a/src/range_streams/codecs/conda/stream.py +++ b/src/range_streams/codecs/conda/stream.py @@ -14,8 +14,10 @@ def __init__( client=None, byte_range: Range | tuple[int, int] = Range("[0, 0)"), pruning_level: int = 0, - scan_contents: bool = True, + single_request: bool = False, + force_async: bool = False, chunk_size: int | None = None, + scan_contents: bool = True, ): """ Set up a stream for the conda (ZIP) archive at ``url``, with either an initial @@ -43,23 +45,32 @@ def __init__( method for further details. 
Args: - url : (:class:`str`) The URL of the file to be streamed - client : (:class:`httpx.Client` | ``None``) The HTTPX client - to use for HTTP requests - byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range - of positions on the file to be requested - pruning_level : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'), - or ``2`` ('strict') - scan_contents : (:class:`bool`) Whether to scan the archive contents - upon initialisation and add the archive's file ranges - chunk_size : (:class:`int` | ``None``) The chunk size used for the - ``httpx.Response.iter_raw`` response byte iterators + url : (:class:`str`) The URL of the file to be streamed + client : (:class:`httpx.Client` | ``None``) The HTTPX client + to use for HTTP requests + byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range + of positions on the file to be requested + pruning_level : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'), + or ``2`` ('strict') + single_request : (:class:`bool`) Whether to use a single GET request and + just add 'windows' onto this rather than create multiple + partial content requests. + force_async : (:class:`bool` | ``None``) Whether to require the client + to be ``httpx.AsyncClient``, and if no client is given, + to create one on initialisation. 
(Experimental/WIP) + chunk_size : (:class:`int` | ``None``) The chunk size used for the + ``httpx.Response.iter_raw`` response byte iterators + scan_contents : (:class:`bool`) Whether to scan the archive contents + upon initialisation and add the archive's file ranges """ super().__init__( url=url, client=client, byte_range=byte_range, pruning_level=pruning_level, + single_request=single_request, + force_async=force_async, + chunk_size=chunk_size, scan_contents=scan_contents, ) if scan_contents: diff --git a/src/range_streams/codecs/png/stream.py b/src/range_streams/codecs/png/stream.py index 4795da3..e3cd872 100644 --- a/src/range_streams/codecs/png/stream.py +++ b/src/range_streams/codecs/png/stream.py @@ -33,6 +33,7 @@ def __init__( byte_range: Range | tuple[int, int] = Range("[0, 0)"), pruning_level: int = 0, single_request: bool = True, + force_async: bool = False, scan_ihdr: bool = True, enumerate_chunks: bool = True, chunk_size: int | None = None, @@ -80,6 +81,9 @@ def __init__( single_request : (:class:`bool`) Whether to use a single GET request and just add 'windows' onto this rather than create multiple partial content requests. + force_async : (:class:`bool` | ``None``) Whether to require the client + to be ``httpx.AsyncClient``, and if no client is given, + to create one on initialisation. (Experimental/WIP) scan_ihdr : (:class:`bool`) Whether to scan the IHDR chunk on initialisation enumerate_chunks : (:class:`bool`) Whether to step through each chunk @@ -94,12 +98,16 @@ def __init__( byte_range=byte_range, pruning_level=pruning_level, single_request=single_request, + force_async=force_async, ) - if enumerate_chunks: - self.populate_chunks() - self.data = PngData() - if scan_ihdr: - self.scan_ihdr() + if force_async: + self.data = PngData() + else: + if enumerate_chunks: + self.populate_chunks() + self.data = PngData() + if scan_ihdr: + self.scan_ihdr() def populate_chunks(self): """ @@ -138,7 +146,10 @@ def scan_ihdr(self): according to the spec. 
""" ihdr_rng = Range(self.data.IHDR.start_pos, self.data.IHDR.end_pos) - self.add(ihdr_rng) + if self.client_is_async: + self.add_async(ihdr_rng) + else: + self.add(ihdr_rng) ihdr_bytes = self.active_range_response.read() ihdr_u = struct.unpack(self.data.IHDR.struct, ihdr_bytes) if None in ihdr_u: @@ -151,7 +162,15 @@ def scan_ihdr(self): self.data.IHDR.filter_method = ihdr_u[self.data.IHDR.parts._IHDR_FILTER_METHOD] self.data.IHDR.interlacing = ihdr_u[self.data.IHDR.parts._IHDR_INTERLACING] - def enumerate_chunks(self): + def verify_sync(self, msg=""): + if self.client_is_async: + raise ValueError(f"Synchronous client check failed{msg}") + + def verify_async(self, msg=""): + if not self.client_is_async: + raise ValueError(f"Asynchronous client check failed{msg}") + + def enumerate_chunks(self) -> dict[str, list[PngChunkInfo]]: """ Parse the length and type chunks, then skip past the chunk data and CRC chunk, so as to enumerate all chunks in the PNG (but request and read as little as @@ -166,6 +185,7 @@ def enumerate_chunks(self): Portable_Network_Graphics#%22Chunks%22_within_the_file>`_, or `the W3C `_). """ + self.verify_sync(msg=": call `enumerate_chunks_async` on an async PngStream") png_signature = 8 # PNG files start with an 8-byte signature chunk_preamble_size = 8 # 4-byte length chunk + 4-byte type chunk chunks: dict[str, list[PngChunkInfo]] = {} @@ -189,8 +209,50 @@ def enumerate_chunks(self): chunks[chunk_type].append(chunk_info) return chunks + async def enumerate_chunks_async(self) -> dict[str, list[PngChunkInfo]]: + """ + Parse the length and type chunks, then skip past the chunk data and CRC chunk, + so as to enumerate all chunks in the PNG (but request and read as little as + possible). Build a dictionary of all chunks with keys of the chunk type (four + letter strings) and values of lists (since some chunks e.g. IDAT can appear + multiple times in the PNG). 
+ + See `the official specification + `_ for full details + (or `Wikipedia + `_, + or `the W3C `_). + """ + self.verify_async(msg=": call `enumerate_chunks` on a synchronous PngStream") + png_signature = 8 # PNG files start with an 8-byte signature + chunk_preamble_size = 8 # 4-byte length chunk + 4-byte type chunk + chunks: dict[str, list[PngChunkInfo]] = {} + chunk_start = png_signature # Skip PNG file signature to reach first chunk + chunk_type: str | None = None # initialise for while loop condition + while chunk_type != "IEND": + if chunks: + # Increment chunk_start from last iteration + # (last chunk's end is this chunk's start) + chunk_start = chunk_info.end # type: ignore + chunk_length_rng = Range(chunk_start, chunk_start + chunk_preamble_size) + await self.add_async(chunk_length_rng) + b = await self.active_range_response.aread() + chunk_len = struct.unpack(">I", b[:4])[0] + chunk_type = b[4:].decode("ascii") + assert chunk_type is not None # appease mypy + chunks.setdefault(chunk_type, []) + chunk_info = PngChunkInfo( + start=chunk_start, type=chunk_type, length=chunk_len + ) + chunks[chunk_type].append(chunk_info) + return chunks + def get_chunk_data(self, chunk_info: PngChunkInfo) -> bytes: - self.add(chunk_info.data_range) + if self.client_is_async: + self.add_async(chunk_info.data_range) + else: + self.add(chunk_info.data_range) b = self.active_range_response.read() return b diff --git a/src/range_streams/codecs/tar/stream.py b/src/range_streams/codecs/tar/stream.py index 815853a..0e05d69 100644 --- a/src/range_streams/codecs/tar/stream.py +++ b/src/range_streams/codecs/tar/stream.py @@ -42,6 +42,7 @@ def __init__( pruning_level: int = 0, scan_headers: bool = True, single_request: bool = False, + force_async: bool = False, chunk_size: int | None = None, ): """ @@ -90,6 +91,9 @@ def __init__( single_request : (:class:`bool`) Whether to use a single GET request and just add 'windows' onto this rather than create multiple partial content requests. 
+ force_async : (:class:`bool` | ``None``) Whether to require the client + to be ``httpx.AsyncClient``, and if no client is given, + to create one on initialisation. (Experimental/WIP) chunk_size : (:class:`int` | ``None``) The chunk size used for the ``httpx.Response.iter_raw`` response byte iterators """ @@ -153,7 +157,10 @@ def read_file_name(self, start_pos_offset: int = 0) -> str: file_name_rng_start = start_pos_offset + self.data.HEADER._H_FILENAME_START file_name_rng_end = file_name_rng_start + self.data.HEADER._H_FILENAME_SIZE file_name_rng = Range(file_name_rng_start, file_name_rng_end) - self.add(file_name_rng) + if self.client_is_async: + self.add_async(file_name_rng) + else: + self.add(file_name_rng) file_name_b = self.active_range_response.read().rstrip(b"\x00") if file_name_b == b"": raise StopIteration("Expected file name, got padding bytes") @@ -167,7 +174,10 @@ def read_file_size(self, start_pos_offset: int = 0) -> int: file_size_rng_start = start_pos_offset + self.data.HEADER._H_FILE_SIZE_START file_size_rng_end = file_size_rng_start + self.data.HEADER._H_FILE_SIZE_SIZE file_size_rng = Range(file_size_rng_start, file_size_rng_end) - self.add(file_size_rng) + if self.client_is_async: + self.add_async(file_size_rng) + else: + self.add(file_size_rng) file_size_b = self.active_range_response.read() file_size = int(file_size_b, 8) # convert octal number from bitstring return file_size @@ -175,7 +185,10 @@ def read_file_size(self, start_pos_offset: int = 0) -> int: def add_file_ranges(self): for tf_info in self.tarred_files: assert tf_info.filename is not None - self.add(tf_info.file_range, name=tf_info.filename) + if self.client_is_async: + self.add_async(tf_info.file_range, name=tf_info.filename) + else: + self.add(tf_info.file_range, name=tf_info.filename) @property def filename_list(self) -> list[str]: diff --git a/src/range_streams/codecs/zip/stream.py b/src/range_streams/codecs/zip/stream.py index 23fca17..3ac0b20 100644 --- 
a/src/range_streams/codecs/zip/stream.py +++ b/src/range_streams/codecs/zip/stream.py @@ -45,8 +45,9 @@ def __init__( byte_range: Range | tuple[int, int] = Range("[0, 0)"), pruning_level: int = 0, single_request: bool = False, - scan_contents: bool = True, + force_async: bool = False, chunk_size: int | None = None, + scan_contents: bool = True, ): """ Set up a stream for the ZIP archive at ``url``, with either an initial range to @@ -90,10 +91,13 @@ def __init__( single_request : (:class:`bool`) Whether to use a single GET request and just add 'windows' onto this rather than create multiple partial content requests. - scan_contents : (:class:`bool`) Whether to scan the archive contents - upon initialisation and add the archive's file ranges + force_async : (:class:`bool` | ``None``) Whether to require the client + to be ``httpx.AsyncClient``, and if no client is given, + to create one on initialisation. (Experimental/WIP) chunk_size : (:class:`int` | ``None``) The chunk size used for the ``httpx.Response.iter_raw`` response byte iterators + scan_contents : (:class:`bool`) Whether to scan the archive contents + upon initialisation and add the archive's file ranges """ super().__init__( url=url, @@ -101,6 +105,8 @@ def __init__( byte_range=byte_range, pruning_level=pruning_level, single_request=single_request, + force_async=force_async, + chunk_size=chunk_size, ) self.data = ZipData() if scan_contents: @@ -110,7 +116,10 @@ def __init__( def check_head_bytes(self): start_sig = self.data.LOC_F_H.start_sig head_byte_range = Range(0, len(start_sig)) - self.add(head_byte_range) + if self.client_is_async: + self.add_async(head_byte_range) + else: + self.add(head_byte_range) start_bytes = self.active_range_response.read() if start_bytes != start_sig: # pragma: no cover # Actually think this will be if zip is empty @@ -126,7 +135,10 @@ def check_end_of_central_dir_start(self): """ eocd_rng = self.total_range eocd_rng.start = eocd_rng.end - 
self.data.E_O_CTRL_DIR_REC.get_size() - self.add(eocd_rng) + if self.client_is_async: + self.add_async(eocd_rng) + else: + self.add(eocd_rng) eocd_bytes = self.active_range_response.read() start_sig = self.data.E_O_CTRL_DIR_REC.start_sig start_found = eocd_bytes[: len(start_sig)] == start_sig @@ -146,7 +158,10 @@ def check_end_of_central_dir_rec(self): self.check_end_of_central_dir_start() eocd_rng = self.total_range eocd_rng.start = self.data.E_O_CTRL_DIR_REC.start_pos - self.add(eocd_rng) + if self.client_is_async: + self.add_async(eocd_rng) + else: + self.add(eocd_rng) b = self.active_range_response.read()[: self.data.E_O_CTRL_DIR_REC.get_size()] u = struct.unpack(self.data.E_O_CTRL_DIR_REC.struct, b) _ECD_ENTRIES_TOTAL = 4 @@ -173,7 +188,10 @@ def check_central_dir_rec(self): cd_size = self.data.CTRL_DIR_REC.get_size() cd_end = cd_start + cd_size cd_rng = Range(cd_start, cd_end) - self.add(cd_rng) + if self.client_is_async: + self.add_async(cd_rng) + else: + self.add(cd_rng) cd_bytes = self.active_range_response.read() u = struct.unpack(self.data.CTRL_DIR_REC.struct, cd_bytes[:cd_size]) zf_info = ZippedFileInfo.from_central_directory_entry(u) @@ -183,25 +201,31 @@ def check_central_dir_rec(self): raise ValueError(f"Bad Central Directory signature at {cd_start}") fn_len = zf_info.filename_length fn_rng = Range(cd_end, cd_end + fn_len) - self.add(fn_rng) + if self.client_is_async: + self.add_async(fn_rng) + else: + self.add(fn_rng) filename = self.active_range_response.read() flags = zf_info.flags if flags & 0x800: # pragma: no cover # UTF-8 file names extension - filename = filename.decode("utf-8") + fn_str = filename.decode("utf-8") else: # Historical ZIP filename encoding - filename = filename.decode("cp437") + fn_str = filename.decode("cp437") extra_len = zf_info.extra_field_length comment_len = zf_info.comment_length cd_read_offset += cd_size + fn_len + extra_len + comment_len - zf_info = ZippedFileInfo.from_central_directory_entry(u, filename=filename) + 
zf_info = ZippedFileInfo.from_central_directory_entry(u, filename=fn_str) self.zipped_files.append(zf_info) return def add_file_ranges(self): for zf_info in self.zipped_files: - self.add(zf_info.file_range, name=zf_info.filename) + if self.client_is_async: + self.add_async(zf_info.file_range, name=zf_info.filename) + else: + self.add(zf_info.file_range, name=zf_info.filename) def get_central_dir_bytes(self, step=20): """ @@ -215,13 +239,19 @@ def get_central_dir_bytes(self, step=20): self.check_end_of_central_dir_rec() pre_eocd = self.data.E_O_CTRL_DIR_REC.start_pos cent_dir_rng = Range(pre_eocd - step, pre_eocd) - self.add(cent_dir_rng) + if self.client_is_async: + self.add_async(cent_dir_rng) + else: + self.add(cent_dir_rng) target = self.data.CTRL_DIR_REC.start_sig byte_cache = b"" cd_byte_store = b"" cache_miss_size = len(target) - 1 while cent_dir_rng.start > 0: - self.add(cent_dir_rng) + if self.client_is_async: + self.add_async(cent_dir_rng) + else: + self.add(cent_dir_rng) cd_bytes = self.active_range_response.read() cd_byte_store = cd_bytes + cd_byte_store byte_cache = cd_bytes + byte_cache[:cache_miss_size] @@ -305,7 +335,10 @@ def decompress_zipped_file( assert method is not None # because mypy can't follow my logic zf_rng = zf_info.file_range if zf_rng not in self.ranges: # pragma: no cover - self.add(zf_rng) + if self.client_is_async: + self.add_async(zf_rng) + else: + self.add(zf_rng) else: self.set_active_range(zf_rng) zf_bytes = self.active_range_response.read() diff --git a/src/range_streams/overlaps.py b/src/range_streams/overlaps.py index e60178e..8ba19b5 100644 --- a/src/range_streams/overlaps.py +++ b/src/range_streams/overlaps.py @@ -48,9 +48,6 @@ def overlap_whence( Note: same convention as Python io module's :obj:`~io.SEEK_SET`, :obj:`~io.SEEK_CUR`, and :obj:`~io.SEEK_END`. """ - # if rng.start == 3 and rng.end == 7: - # breakpoint() - # print(f"Hit {rng=}") if rng in rng_dict: # Full overlap (i.e. 
in middle of pre-existing range) whence = 1 # type: int | None diff --git a/src/range_streams/request.py b/src/range_streams/request.py index 8938841..41013a3 100644 --- a/src/range_streams/request.py +++ b/src/range_streams/request.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Iterator +from typing import TYPE_CHECKING, AsyncIterator, Iterator MYPY = False # when using mypy will be overrided as True if MYPY or not TYPE_CHECKING: # pragma: no cover @@ -23,7 +23,10 @@ class RangeRequest: :meth:`~range_streams.response.RangeResponse.iter_raw`] on the underlying ``httpx.Response``, suitable for :class:`~range_streams.response.RangeResponse` - to wrap in a :class:`io.BytesIO` buffered stream. + to wrap in a :class:`io.BytesIO` buffered stream. For async clients, + :attr:`~range_streams.response.RangeResponse._aiterator` is set instead + [giving access to + :meth:`~range_streams.response.RangeResponse.aiter_raw`] on the """ def __init__( @@ -58,7 +61,8 @@ def __init__( simulated request). Any read operations will be restricted to this range of positions (as the underlying stream being 'windowed' is a larger one). 
- chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator + chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator (or + ``httpx.Response.aiter_raw`` if using an async client) """ self.range = byte_range self.url = url @@ -76,13 +80,38 @@ def __init__( self._check_resp_req() # Sphinx typing workaround # This shouldn't need to be accessed but set it to be thorough self.content_range = f"{self.range_header}/{range_len(byte_range)}" - # self._iterator = None # must overwrite after initialisation - self._iterator = None if self.is_windowed else self.iter_raw() + if isinstance(self.client, httpx.AsyncClient): + # Note: _aiter_raw is 'stored' uncalled as cannot await here (not async) + # The Callable becomes a Coroutine once `await_aiterator` called + self._aiterator_preinit = None if self.is_windowed else self.aiter_raw + else: + self._iterator = None if self.is_windowed else self.iter_raw() else: # Make and send a partial range request self.setup_stream() self.content_range = self.content_range_header() - self._iterator = self.iter_raw() + if self.client_is_async: + # Note: _aiter_raw is 'stored' uncalled as cannot await here (not async) + self._aiterator_preinit = self.aiter_raw + else: + self._iterator = self.iter_raw() + + async def await_aiterator(self) -> None: + """ + Initialise the async iterator on the + :attr:`~range_streams.response.RangeResponse._aiterator` attribute from the + stored function which when called returns the ``typing.AsyncIterator[bytes]``. 
+ """ + assert self._aiterator_preinit is not None + self._aiterator = await self._aiterator_preinit() + + @property + def client_is_async(self): + return isinstance(self.client, httpx.AsyncClient) + + @property + def aiterator_initialised(self): + return self.client_is_async and hasattr(self, "_aiterator") @classmethod def windowed_request( @@ -104,7 +133,8 @@ def windowed_request( on_request : The sent ``httpx.Request`` tail_mark : The :attr:`~range_streams.response.RangeResponse.tail_mark` to trim the ``byte_range`` (if any). Passed separately - chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator + chunk_size : The chunk size to the ``httpx.Response.iter_raw`` iterator (or + ``httpx.Response.aiter_raw`` if using an async client) """ window_range = Range(byte_range.start, byte_range.end - tail_mark) # Build the request that this object pretends to have sent @@ -118,21 +148,6 @@ def windowed_request( total_content_length = range_request.total_content_length window_on_range = range_request.range window_len = range_len(window_range) - # Avoid having to import ``httpx.Response`` by calling ``type`` on one - # HttpxResp_cls = type(range_request.response) - # windowed_response = HttpxResp_cls( - # status_code=206, # Partial Content - # headers={ - # "accept-ranges": "bytes", - # "content-length": str(window_len), - # "content-range": f"{content_byte_range}/{total_content_length}", - # }, - # stream=None, # do not consume the stream the window is being placed onto - # #stream=range_request.response.stream, # this consumes the parent's stream! - # request=unsent_request, - # ) - # When iterating the stream via iter_raw, supply the source stream's iterator! 
- # windowed_response.iter_raw = range_request.response.iter_raw windowed_response = range_request.response windowed_range_request = cls( byte_range=window_range, @@ -143,7 +158,14 @@ def windowed_request( ) # Calling ``response.iter_raw()`` again raises ``httpx.StreamConsumed`` error # so simply overwrite after initialisation with existing RangeRequest iterator - windowed_range_request._iterator = range_request._iterator + if range_request.client_is_async: + if not range_request.aiterator_initialised: + msg = "aiterator is not initialised" + msg += ": `await_aiterator` after instantiating an async RangeRequest" + raise ValueError(msg) + windowed_range_request._aiterator = range_request._aiterator + else: + windowed_range_request._iterator = range_request._iterator return windowed_range_request @classmethod @@ -176,7 +198,6 @@ def from_get_stream( GET_got=(req, resp), chunk_size=chunk_size, ) - # range_request._iterator = resp.iter_raw() return range_request @property @@ -230,6 +251,14 @@ def iter_raw(self) -> Iterator[bytes]: """ return self.response.iter_raw(chunk_size=self.chunk_size) + async def aiter_raw(self) -> AsyncIterator[bytes]: + """ + Wrap the :meth:`iter_raw` method of the underlying :class:`httpx.Response` + object within the :class:`~range_streams.response.RangeResponse` in + :attr:`~range_streams.request.RangeRequest.response`. 
+ """ + return self.response.aiter_raw(chunk_size=self.chunk_size) + def close(self) -> None: """ Close the :attr:`~range_streams.request.RangeRequest.response` diff --git a/src/range_streams/response.py b/src/range_streams/response.py index 6a23cde..6aff55f 100644 --- a/src/range_streams/response.py +++ b/src/range_streams/response.py @@ -128,17 +128,40 @@ def is_active_buf_range(self) -> bool: """ return self._bytes.active_buf_range == self.request.range + def verify_sync(self, msg=""): + if self.parent_stream.client_is_async: + raise ValueError(f"Synchronous client check failed{msg}") + + def verify_async(self, msg=""): + if not self.parent_stream.client_is_async: + raise ValueError(f"Asynchronous client check failed{msg}") + @property def source_iterator(self): """ The iterator associated with the source range, for a windowed range. """ + self.verify_sync(msg=" when accessing source_iterator property") return self.source_range_response.request._iterator @property def _iterator(self): + self.verify_sync(msg=" when accessing iterator property") return self.source_iterator if self.is_windowed else self.request._iterator + @property + def source_aiterator(self): + """ + The async iterator associated with the source range, for a windowed range. + """ + self.verify_async(msg=" when accessing source_aiterator property") + return self.source_range_response.request._aiterator + + @property + def _aiterator(self): + self.verify_async(msg=" when accessing aiterator property") + return self.source_aiterator if self.is_windowed else self.request._aiterator + def check_is_windowed(self) -> bool: """ Whether the associated request is windowed. 
@@ -226,7 +249,7 @@ def window_offset(self) -> int: has_offset = self.is_windowed and self.request.range > self.source_range return self.request.range.start - self.source_range.start if has_offset else 0 - def prepare_reading_window(self): + def prepare_reading_window(self) -> None: """ Prepare the stream cursor for reading (unclear if this should only be done on initialisation...) Should be done every time if the cursor is shared, but is it? @@ -239,7 +262,7 @@ def prepare_reading_window(self): print("\n---READ_READY removed\n---") @property - def client(self): + def client(self): # Returns: httpx.Client | httpx.AsyncClient """ The request's client. """ @@ -266,6 +289,7 @@ def _load_all(self) -> None: If seeking on a windowed range, then 'loading all' will not really load to the end of the stream, just the end of the window onto it. """ + self.verify_sync(msg=" when loading all") self.buf_keep() if self.is_windowed: # Would need to offset this if source range is non-total range @@ -278,7 +302,26 @@ def _load_all(self) -> None: self._bytes.write(chunk) self.store_tell() - def _load_until(self, goal_position): + async def _aload_all(self) -> None: + """ + If seeking on a windowed range, then 'loading all' will not really + load to the end of the stream, just the end of the window onto it. + """ + self.verify_async(msg=" when loading all") + self.buf_keep() + if self.is_windowed: + # Would need to offset this if source range is non-total range + # (also may need to take into account tail-mark for windows?) 
+ window_end = self.request.range.end + await self._aload_until(window_end) + else: + self._bytes.seek(0, SEEK_END) + async for chunk in self._aiterator: + self._bytes.write(chunk) + self.store_tell() + + def _load_until(self, goal_position: int) -> None: + self.verify_sync(msg=f" when loading until {goal_position}") self.buf_keep() current_position = self._bytes.seek(0, SEEK_END) while current_position < goal_position: @@ -288,6 +331,18 @@ def _load_until(self, goal_position): break self.store_tell() + async def _aload_until(self, goal_position: int) -> None: + self.verify_async(msg=f" when loading until {goal_position}") + self.buf_keep() + current_position = self._bytes.seek(0, SEEK_END) + while current_position < goal_position: + try: + awaited_bytes = await self._aiterator.__anext__() + current_position += self._bytes.write(awaited_bytes) + except StopAsyncIteration: + break + self.store_tell() + def tell(self) -> int: """ File-like tell (position indicator) within the range request stream. @@ -307,19 +362,27 @@ def tell(self) -> int: print(f"{t=} (plain tell)") return t - def read(self, size=None): + def _prepare_to_read(self) -> int: """ - File-like reading within the range request stream, with careful handling of - windowed ranges and tail marks. + Called at the start of :meth:`~range_streams.response.RangeResponse.read` to + ensure the reading window is prepared (on the first read of a windowed range) + and acquire the starting position. """ self.buf_keep() if DEBUG_VERBOSE: print(f"Reading {self.request.range}") - # ... if not self.read_ready: # Only run on the first use after init self.prepare_reading_window() - left_off_at = self._bytes.tell() + return self._bytes.tell() + + def read(self, size: int | None = None) -> bytes: + """ + File-like reading within the range request stream, with careful handling of + windowed ranges and tail marks. 
+ """ + self.verify_sync(msg=f" when reading {size} bytes") + left_off_at = self._prepare_to_read() if size is None: self._load_all() else: @@ -329,7 +392,34 @@ def read(self, size=None): # Probably overshoots the cursor (loads a chunk at a time) self._load_until(goal_position) - # Rewind the cursor to the start position now the bytes to read are loaded + read_bytes = self._get_read_bytes(size=size, left_off_at=left_off_at) + return read_bytes + + async def aread(self, size: int | None = None) -> bytes: + """ + File-like reading within the range request stream, with careful handling of + windowed ranges and tail marks. + """ + self.verify_async(msg=f" when reading {size} bytes") + left_off_at = self._prepare_to_read() + if size is None: + await self._aload_all() + else: + goal_position = left_off_at + size + if DEBUG_VERBOSE: + print(f"{goal_position=} = {left_off_at=} + {size=}") + # Probably overshoots the cursor (loads a chunk at a time) + await self._aload_until(goal_position) + read_bytes = self._get_read_bytes(size=size, left_off_at=left_off_at) + return read_bytes + + def _get_read_bytes(self, size: int | None, left_off_at: int) -> bytes: + """ + Called at the end of :meth:`~range_streams.response.RangeResponse.read` and + :meth:`~range_streams.response.RangeResponse.aread` to rewind the cursor to the + starting position after the bytes to read are loaded [from the a/sync iterator], + read said bytes and return them (ensuring to store the final cursor position). + """ self._bytes.seek(left_off_at) if self.is_windowed: # Convert absolute window end to relative offset on source range @@ -345,13 +435,17 @@ def read(self, size=None): self.store_tell() return read_bytes - def seek(self, position, whence=SEEK_SET): + def seek(self, position: int, whence=SEEK_SET): """ - File-like seeking within the range request stream. + File-like seeking within the range request stream. Synchronous only. 
""" + msg = "No negative seek so `RangeResponse.seek` is synchronous (try `load_all`)" self.buf_keep() if whence == SEEK_END: - self._load_all() + if self.request.client_is_async: + raise NotImplementedError(msg) + else: + self._load_all() if self.is_windowed: position = position + self.window_offset self._bytes.seek(position, whence) @@ -412,4 +506,13 @@ def close(self): Close the associated ``httpx.Response`` object. In single request mode, there is just the one (shared with all the 'windowed' responses). """ + self.verify_sync(msg=f" when closing the request response on {self}") self.request.response.close() + + async def aclose(self): + """ + Close the associated ``httpx.Response`` object. In single request mode, there is + just the one (shared with all the 'windowed' responses). + """ + self.verify_async(msg=f" when closing the request response on {self}") + self.request.response.aclose() diff --git a/src/range_streams/stream.py b/src/range_streams/stream.py index b486a74..28380ad 100644 --- a/src/range_streams/stream.py +++ b/src/range_streams/stream.py @@ -14,7 +14,7 @@ from copy import deepcopy from io import SEEK_SET from pathlib import Path -from typing import TYPE_CHECKING, Coroutine +from typing import TYPE_CHECKING, Callable, Coroutine, Type from urllib.parse import urlparse MYPY = False # when using mypy will be overrided as True @@ -23,6 +23,7 @@ from ranges import Range, RangeDict +from .async_utils import AsyncFetcher from .http_utils import detect_header_value, range_header from .overlaps import get_range_containing, overlap_whence from .range_utils import ( @@ -636,6 +637,7 @@ async def get_async_monostream(self) -> None: resp=resp, chunk_size=self.chunk_size, ) + await range_req.await_aiterator() # Initialise its async stream iterator # then just use the req to create a RangeResponse and register as usual resp = RangeResponse(stream=self, range_request=range_req, range_name="") @@ -869,3 +871,25 @@ def close(self): """ for range_response in 
self._ranges.values(): range_response.close() + + @classmethod + def make_async_fetcher( + cls, + urls: list[str], + callback: Callable | None = None, + verbose: bool = False, + show_progress_bar: bool = True, + timeout_s: float = 5.0, + client=None, + **kwargs, + ): + return AsyncFetcher( + stream_cls=cls, + urls=urls, + callback=callback, + verbose=verbose, + show_progress_bar=show_progress_bar, + timeout_s=timeout_s, + client=client, + **kwargs, # Any other kwargs can be passed through to RangeStream subclass + ) diff --git a/src/range_streams/types.py b/src/range_streams/types.py new file mode 100644 index 0000000..1e54456 --- /dev/null +++ b/src/range_streams/types.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from typing import TypeVar + +import range_streams + +__all__ = ["_T"] + +_T = TypeVar( + "_T", bound="range_streams.stream.RangeStream" +) # RangeStream or a subclass diff --git a/tests/async_test.py b/tests/async_test.py index 44e8eff..069e7b4 100644 --- a/tests/async_test.py +++ b/tests/async_test.py @@ -1,4 +1,5 @@ import asyncio +from functools import partial from signal import SIGINT from pytest import fixture, mark, raises @@ -6,13 +7,16 @@ from range_streams import _EXAMPLE_PNG_URL, _EXAMPLE_ZIP_URL, RangeStream from range_streams.async_utils import AsyncFetcher, SignalHaltError +from range_streams.codecs import PngStream -from .data import EXAMPLE_FILE_LENGTH, EXAMPLE_URL +from .data import EXAMPLE_FILE_LENGTH, EXAMPLE_SMALL_PNG_URL, EXAMPLE_URL # https://tonybaloney.github.io/posts/async-test-patterns-for-pytest-and-unittest.html THREE_URLS = [EXAMPLE_URL, _EXAMPLE_PNG_URL, _EXAMPLE_ZIP_URL] +default_kwargs = dict(stream_cls=RangeStream, show_progress_bar=False) + class CallbackMutatedClass: values = [] @@ -25,56 +29,120 @@ def reset(cls): cls.values = [] -async def demo_callback_func(fetcher, range_stream, url): +async def url_callback_func(fetcher, range_stream, url): + """ + Async function which puts the URL onto the storage 
class's list of values + """ return CallbackMutatedClass.values.append(url) +async def stream_callback_func(fetcher, range_stream, url): + """ + Async function which puts the stream object onto the storage class's list of values + """ + return CallbackMutatedClass.values.append(range_stream) + + +async def read_png_callback_func(fetcher, png_stream, url): + """ + Async function which puts the stream object onto the storage class's list of values + """ + await png_stream.enumerate_chunks_async() + # await png_stream.scan_ihdr_async() + return CallbackMutatedClass.values.append(png_stream) + + async def sigint_callback_func(fetcher, range_stream, url): """ Mimic the act of sending the signal interrupt by raising it in a callback """ - await demo_callback_func(fetcher, range_stream, url) + await url_callback_func(fetcher, range_stream, url) # raise KeyboardInterrupt ? loop = asyncio.get_running_loop() fetcher.immediate_exit(signal_enum=SIGINT, loop=loop) -@mark.parametrize("callback", [None, demo_callback_func]) +@mark.parametrize("cb", [None, url_callback_func]) @mark.parametrize("verbose", [True, False]) @mark.parametrize("error_msg", ["The list of URLs to fetch cannot be empty"]) @mark.parametrize("urls", [([]), (THREE_URLS)]) -def test_fetcher(urls, error_msg, verbose, callback): +def test_fetcher(urls, error_msg, verbose, cb): """ Fetch lists of 0 or 3 URLs asynchronously, with/out a callback, verbosely/quietly. 
""" - args = dict(callback=callback, urls=urls, verbose=verbose, show_progress_bar=False) + kwargs = dict(**default_kwargs, callback=cb, urls=urls, verbose=verbose) if urls == []: with raises(ValueError, match=error_msg): - fetched = AsyncFetcher(**args) + fetched = AsyncFetcher(**kwargs) else: - fetched = AsyncFetcher(**args) + fetched = AsyncFetcher(**kwargs) fetched.make_calls() - expected_values = set() if callback is None else set(urls) + expected_values = set() if cb is None else set(urls) stored_urls = getattr(CallbackMutatedClass, "values") assert set(stored_urls) == set(expected_values) CallbackMutatedClass.reset() -@mark.parametrize("callback", [sigint_callback_func]) +@mark.parametrize("cb", [sigint_callback_func]) @mark.parametrize("error_msg", ["The list of URLs to fetch cannot be empty"]) @mark.parametrize("urls", [(THREE_URLS)]) -def test_fetcher_sigint(urls, error_msg, callback): +def test_fetcher_sigint(urls, error_msg, cb): """ Fetch lists of 3 URLs asynchronously, with/out a callback, verbosely/quietly. Cannot figure out how to emulate passing the SIGINT from this test so can't catch, best I can do here is to check that the loop is stopped at the first callback when ``immediate_exit`` is called. 
""" - args = dict(callback=callback, urls=urls, show_progress_bar=False) - fetched = AsyncFetcher(**args) + kwargs = dict(**default_kwargs, callback=cb, urls=urls, verbose=False) + fetched = AsyncFetcher(**kwargs) # with raises(SignalHaltError, match=error_msg): fetched.make_calls() stored_urls = getattr(CallbackMutatedClass, "values") assert len(stored_urls) == 1 assert set(stored_urls) < set(urls) CallbackMutatedClass.reset() + + +@mark.parametrize("stream_cls", [RangeStream, PngStream]) +@mark.parametrize("cb", [None, stream_callback_func]) +@mark.parametrize("error_msg", ["The list of URLs to fetch cannot be empty"]) +@mark.parametrize( + "urls", + [ + ([]), + ([_EXAMPLE_PNG_URL, EXAMPLE_SMALL_PNG_URL]), + ], +) +def test_fetcher_classmethod(urls, error_msg, cb, stream_cls): + """ + Fetch lists of 0 or 2 URLs asynchronously, with/out a callback, using the + classmethod constructor of the ``stream_cls`` (RangeStream or a subclass). + """ + kwargs = dict(callback=cb, urls=urls, show_progress_bar=False, verbose=False) + if urls == []: + with raises(ValueError, match=error_msg): + fetched = stream_cls.make_async_fetcher(**kwargs) + else: + fetched = stream_cls.make_async_fetcher(**kwargs) + fetched.make_calls() + expected_values = set() if cb is None else set([stream_cls]) + stored_classes = list(map(type, getattr(CallbackMutatedClass, "values"))) + assert set(stored_classes) == set(expected_values) + CallbackMutatedClass.reset() + + +@mark.parametrize("cb", [read_png_callback_func]) +@mark.parametrize("urls", [([_EXAMPLE_PNG_URL, EXAMPLE_SMALL_PNG_URL])]) +def test_fetcher_classmethod_read_png(urls, cb): + """ + Fetch list of 2 PNG URLs asynchronously, with a callback that reads them, using the + ``make_async_fetcher`` classmethod constructor. 
+ """ + kwargs = dict(callback=cb, urls=urls, show_progress_bar=False, verbose=False) + stream_cls = PngStream + fetched = stream_cls.make_async_fetcher(**kwargs) + fetched.make_calls() + expected_values = set() if cb is None else set([stream_cls]) + stored_classes = list(map(type, getattr(CallbackMutatedClass, "values"))) + assert set(stored_classes) == set(expected_values) + CallbackMutatedClass.reset() diff --git a/tests/data.py b/tests/data.py index 26dccf7..d477367 100644 --- a/tests/data.py +++ b/tests/data.py @@ -1,4 +1,7 @@ -EXAMPLE_URL = ( - "https://github.com/lmmx/range-streams/raw/master/data/example_text_file.txt" -) +EXAMPLE_URL = "https://raw.githubusercontent.com/lmmx/range-streams/master/data/example_text_file.txt" + EXAMPLE_FILE_LENGTH = 11 + +EXAMPLE_SMALL_PNG_URL = ( + "https://raw.githubusercontent.com/lmmx/range-streams/master/data/red_square.png" +) diff --git a/tests/range_stream_core_test.py b/tests/range_stream_core_test.py index ab728fd..33b0a84 100644 --- a/tests/range_stream_core_test.py +++ b/tests/range_stream_core_test.py @@ -121,7 +121,7 @@ def test_range_update(full_range_stream_fresh): def test_range_stream_repr(full_range_stream): assert f"{full_range_stream!r}" == ( - "RangeStream ⠶ [0, 11) @@ 'example_text_file.txt' from github.com" + "RangeStream ⠶ [0, 11) @@ 'example_text_file.txt' from raw.githubusercontent.com" ) diff --git a/tests/response_test.py b/tests/response_test.py index 8f8bbee..85882db 100644 --- a/tests/response_test.py +++ b/tests/response_test.py @@ -65,7 +65,7 @@ def test_response(example_response): def test_response_repr(example_response): print(f"{example_response!r}") assert f"{example_response!r}" == ( - "RangeResponse ⠶ [0, 1) @ 'example_text_file.txt' from github.com" + "RangeResponse ⠶ [0, 1) @ 'example_text_file.txt' from raw.githubusercontent.com" ) @@ -118,7 +118,7 @@ def test_example_response_seek_tell(example_response, seek, whence, expected): (-4, SEEK_END, 7), ], ) -def 
test_full_response_seek_tell(seek, whence, expected): +def test_full_response_seek_tell(seek, whence, expected, empty_range_stream): req = make_request(0, EXAMPLE_FILE_LENGTH) full_response = RangeResponse(stream=empty_range_stream, range_request=req) full_response.seek(position=seek, whence=whence)