Skip to content

Commit

Permalink
Full implementation of async fetch handler for URL lists; codecs; tests (
Browse files Browse the repository at this point in the history
…closes #36)
  • Loading branch information
lmmx committed Aug 12, 2021
1 parent 68604a7 commit a28c578
Show file tree
Hide file tree
Showing 17 changed files with 498 additions and 114 deletions.
15 changes: 15 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ These helper functions help prepare HTTP requests to set up a stream.
:show-inheritance:


Asynchronous fetcher
====================

This helper class handles all of the details of asynchronously
fetching streams, given a list of URLs.

----


.. automodule:: range_streams.async_utils
:members:
:undoc-members:
:show-inheritance:


Overlap handling
=================

Expand Down
6 changes: 3 additions & 3 deletions docs/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Example
.. code:: py
RangeDict{
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com
}
Further ranges are requested by simply calling ``RangeStream.add`` with
Expand All @@ -71,6 +71,6 @@ interval.
.. code:: py
RangeDict{
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com,
RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from github.com
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com,
RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from raw.githubusercontent.com
}
8 changes: 4 additions & 4 deletions src/range_streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
>>> rng = Range(0,3) # doctest: +SKIP
>>> s.add(rng) # doctest: +SKIP
>>> s.ranges # doctest: +SKIP
RangeDict{RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com}
RangeDict{RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com}
Once a request is made for a non-empty range, the :class:`~range_streams.stream.RangeStream`
acquires the first entry in the :class:`~ranges.RangeDict` stored on the
Expand All @@ -58,8 +58,8 @@
>>> s.add(byte_range=(7,9)) # doctest: +SKIP
>>> s.ranges # doctest: +SKIP
RangeDict{
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from github.com,
RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from github.com
RangeSet{Range[0, 3)}: RangeResponse ⠶ [0, 3) @ 'example_text_file.txt' from raw.githubusercontent.com,
RangeSet{Range[7, 9)}: RangeResponse ⠶ [7, 9) @ 'example_text_file.txt' from raw.githubusercontent.com
}
Codecs are available for ``.zip`` (:class:`~range_streams.codecs.zip.ZipStream`) and ``.conda``
Expand All @@ -71,7 +71,7 @@
>>> s = ZipStream(url=_EXAMPLE_ZIP_URL) # doctest: +SKIP
>>> s.ranges # doctest: +SKIP
RangeDict{
RangeSet{Range[51, 62)}: RangeResponse ⠶ "example_text_file.txt" [51, 62) @ 'example_text_file.txt.zip' from github.com
RangeSet{Range[51, 62)}: RangeResponse ⠶ "example_text_file.txt" [51, 62) @ 'example_text_file.txt.zip' from raw.githubusercontent.com
}
The ``.conda`` format is just a particular type of zip for Python packages on the conda
Expand Down
35 changes: 25 additions & 10 deletions src/range_streams/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from functools import partial
from signal import SIGINT, SIGTERM, Signals
from sys import stderr
from typing import TYPE_CHECKING, Callable, Coroutine, Iterator
from typing import TYPE_CHECKING, Callable, Coroutine, Iterator, Type

from aiostream import stream
from ranges import Range, RangeSet
Expand All @@ -19,29 +19,35 @@
from tqdm.asyncio import tqdm_asyncio

from .log_utils import log, set_up_logging
from .stream import RangeStream
from .types import _T as RangeStreamOrSubclass

__all__ = ["SignalHaltError", "AsyncFetcher"]


class AsyncFetcher:
def __init__(
self,
stream_cls: Type[RangeStreamOrSubclass],
urls: list[str],
callback: Callable | None = None,
verbose: bool = False,
show_progress_bar: bool = True,
timeout_s: float = 5.0,
client=None,
**kwargs,
):
"""
Any kwargs are passed through to the stream class constructor.
Args:
callback : A function to be passed 3 values: the AsyncFetcher which is calling
it, the awaited RangeStream, and its source URL (an ``httpx.URL``,
which can be coerced to a string).
"""
if urls == []:
raise ValueError("The list of URLs to fetch cannot be empty")
self.stream_cls = stream_cls
self.stream_cls_kwargs = kwargs
self.url_list = urls
self.callback = callback
self.n = len(urls)
Expand All @@ -64,21 +70,21 @@ def make_calls(self):
if self.show_progress_bar:
self.pbar.close()

async def process_stream(self, rstream: RangeStream):
async def process_stream(self, range_stream: RangeStreamOrSubclass):
"""
Process an awaited RangeStream within an async fetch loop, calling the callback
set on the `~range_streams.async_utils.AsyncFetcher.callback` attribute.
Args:
rstream : The awaited RangeStream
range_stream : The awaited RangeStream (or one of its subclasses)
"""
monostream_response = rstream._ranges[rstream.total_range]
monostream_response = range_stream._ranges[range_stream.total_range]
resp = monostream_response.request.response # httpx.Response
source_url = resp.history[0].url if resp.history else resp.url
# Map the response back to the thing it came from in the url_list
i = next(i for (i, u) in enumerate(self.url_list) if source_url == u)
if self.callback is not None:
await self.callback(self, stream, source_url)
await self.callback(self, range_stream, source_url)
if self.verbose:
log.debug(f"Processed URL in async callback: {source_url}")
if self.show_progress_bar:
Expand Down Expand Up @@ -109,9 +115,18 @@ def fetch_things(self, urls: Iterator[str]):
self.pbar.disable = True
self.pbar.close()

async def fetch(self, client: httpx.AsyncClient, url: httpx.URL) -> RangeStream:
s = RangeStream(
url=str(url), client=client, single_request=True, force_async=True
async def fetch(self, client, url) -> RangeStreamOrSubclass:
"""
Args:
client : ``httpx.AsyncClient``
url : ``httpx.URL``
"""
s = self.stream_cls(
url=str(url),
client=client,
single_request=True,
force_async=True,
**self.stream_cls_kwargs,
)
await s.add_async()
return s
Expand All @@ -137,7 +152,7 @@ async def async_fetch_urlset(
" after using the client in a contextmanager block (which implicitly"
" closes after exiting the block) perhaps?"
)
raise ValueError(msg)
raise ValueError(msg)
# assert self.client is not None # give mypy a clue
processed = await self.fetch_and_process(urls=urls, client=client)
return processed
Expand Down
35 changes: 23 additions & 12 deletions src/range_streams/codecs/conda/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ def __init__(
client=None,
byte_range: Range | tuple[int, int] = Range("[0, 0)"),
pruning_level: int = 0,
scan_contents: bool = True,
single_request: bool = False,
force_async: bool = False,
chunk_size: int | None = None,
scan_contents: bool = True,
):
"""
Set up a stream for the conda (ZIP) archive at ``url``, with either an initial
Expand Down Expand Up @@ -43,23 +45,32 @@ def __init__(
method for further details.
Args:
url : (:class:`str`) The URL of the file to be streamed
client : (:class:`httpx.Client` | ``None``) The HTTPX client
to use for HTTP requests
byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range
of positions on the file to be requested
pruning_level : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'),
or ``2`` ('strict')
scan_contents : (:class:`bool`) Whether to scan the archive contents
upon initialisation and add the archive's file ranges
chunk_size : (:class:`int` | ``None``) The chunk size used for the
``httpx.Response.iter_raw`` response byte iterators
url : (:class:`str`) The URL of the file to be streamed
client : (:class:`httpx.Client` | ``None``) The HTTPX client
to use for HTTP requests
byte_range : (:class:`~ranges.Range` | ``tuple[int,int]``) The range
of positions on the file to be requested
pruning_level : (:class:`int`) Either ``0`` ('replant'), ``1`` ('burn'),
or ``2`` ('strict')
single_request : (:class:`bool`) Whether to use a single GET request and
just add 'windows' onto this rather than create multiple
partial content requests.
force_async : (:class:`bool` | ``None``) Whether to require the client
to be ``httpx.AsyncClient``, and if no client is given,
to create one on initialisation. (Experimental/WIP)
chunk_size : (:class:`int` | ``None``) The chunk size used for the
``httpx.Response.iter_raw`` response byte iterators
scan_contents : (:class:`bool`) Whether to scan the archive contents
upon initialisation and add the archive's file ranges
"""
super().__init__(
url=url,
client=client,
byte_range=byte_range,
pruning_level=pruning_level,
single_request=single_request,
force_async=force_async,
chunk_size=chunk_size,
scan_contents=scan_contents,
)
if scan_contents:
Expand Down
78 changes: 70 additions & 8 deletions src/range_streams/codecs/png/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(
byte_range: Range | tuple[int, int] = Range("[0, 0)"),
pruning_level: int = 0,
single_request: bool = True,
force_async: bool = False,
scan_ihdr: bool = True,
enumerate_chunks: bool = True,
chunk_size: int | None = None,
Expand Down Expand Up @@ -80,6 +81,9 @@ def __init__(
single_request : (:class:`bool`) Whether to use a single GET request and
just add 'windows' onto this rather than create multiple
partial content requests.
force_async : (:class:`bool` | ``None``) Whether to require the client
to be ``httpx.AsyncClient``, and if no client is given,
to create one on initialisation. (Experimental/WIP)
scan_ihdr : (:class:`bool`) Whether to scan the IHDR chunk on
initialisation
enumerate_chunks : (:class:`bool`) Whether to step through each chunk
Expand All @@ -94,12 +98,16 @@ def __init__(
byte_range=byte_range,
pruning_level=pruning_level,
single_request=single_request,
force_async=force_async,
)
if enumerate_chunks:
self.populate_chunks()
self.data = PngData()
if scan_ihdr:
self.scan_ihdr()
if force_async:
self.data = PngData()
else:
if enumerate_chunks:
self.populate_chunks()
self.data = PngData()
if scan_ihdr:
self.scan_ihdr()

def populate_chunks(self):
"""
Expand Down Expand Up @@ -138,7 +146,10 @@ def scan_ihdr(self):
according to the spec.
"""
ihdr_rng = Range(self.data.IHDR.start_pos, self.data.IHDR.end_pos)
self.add(ihdr_rng)
if self.client_is_async:
self.add_async(ihdr_rng)
else:
self.add(ihdr_rng)
ihdr_bytes = self.active_range_response.read()
ihdr_u = struct.unpack(self.data.IHDR.struct, ihdr_bytes)
if None in ihdr_u:
Expand All @@ -151,7 +162,15 @@ def scan_ihdr(self):
self.data.IHDR.filter_method = ihdr_u[self.data.IHDR.parts._IHDR_FILTER_METHOD]
self.data.IHDR.interlacing = ihdr_u[self.data.IHDR.parts._IHDR_INTERLACING]

def enumerate_chunks(self):
def verify_sync(self, msg=""):
    """
    Guard for code paths that need a synchronous client.

    Args:
      msg : Text appended verbatim to the error message (empty by default)

    Raises:
      ValueError : if ``self.client_is_async`` is truthy
    """
    if not self.client_is_async:
        return
    raise ValueError(f"Synchronous client check failed{msg}")

def verify_async(self, msg=""):
    """
    Guard for code paths that need an asynchronous client.

    Args:
      msg : Text appended verbatim to the error message (empty by default)

    Raises:
      ValueError : if ``self.client_is_async`` is falsy
    """
    if self.client_is_async:
        return
    raise ValueError(f"Asynchronous client check failed{msg}")

def enumerate_chunks(self) -> dict[str, list[PngChunkInfo]]:
"""
Parse the length and type chunks, then skip past the chunk data and CRC chunk,
so as to enumerate all chunks in the PNG (but request and read as little as
Expand All @@ -166,6 +185,7 @@ def enumerate_chunks(self):
Portable_Network_Graphics#%22Chunks%22_within_the_file>`_,
or `the W3C <https://www.w3.org/TR/PNG/#5Chunk-layout>`_).
"""
self.verify_sync(msg=": call `enumerate_chunks_async` on an async PngStream")
png_signature = 8 # PNG files start with an 8-byte signature
chunk_preamble_size = 8 # 4-byte length chunk + 4-byte type chunk
chunks: dict[str, list[PngChunkInfo]] = {}
Expand All @@ -189,8 +209,50 @@ def enumerate_chunks(self):
chunks[chunk_type].append(chunk_info)
return chunks

async def enumerate_chunks_async(self) -> dict[str, list[PngChunkInfo]]:
    """
    Asynchronously walk the PNG chunk by chunk, reading only each chunk's
    8-byte preamble (a 4-byte length field followed by a 4-byte type field),
    and collect a :class:`PngChunkInfo` record per chunk until the ``IEND``
    chunk has been recorded.

    The result maps each four-letter chunk type to a list of records, since a
    type (e.g. IDAT) can occur more than once in a file. The position of the
    next preamble is taken from the ``end`` attribute of the record just made.

    See `the official specification
    <http://www.libpng.org/pub/png/spec/1.2/PNG-Chunks.html>`_ for full details
    (or `Wikipedia
    <https://en.wikipedia.org/wiki/
    Portable_Network_Graphics#%22Chunks%22_within_the_file>`_,
    or `the W3C <https://www.w3.org/TR/PNG/#5Chunk-layout>`_).

    Raises:
      ValueError : (via ``verify_async``) if the stream's client is not async
    """
    self.verify_async(msg=": call `enumerate_chunks` on a synchronous PngStream")
    signature_size = 8  # PNG files open with an 8-byte signature
    preamble_size = 8  # 4-byte length field + 4-byte type field
    chunks: dict[str, list[PngChunkInfo]] = {}
    chunk_start = signature_size  # the first chunk follows the signature
    while True:
        preamble_rng = Range(chunk_start, chunk_start + preamble_size)
        await self.add_async(preamble_rng)
        preamble = await self.active_range_response.aread()
        # ">I" reads the length field as a big-endian unsigned 32-bit integer
        (chunk_len,) = struct.unpack(">I", preamble[:4])
        chunk_type = preamble[4:].decode("ascii")
        chunk_info = PngChunkInfo(
            start=chunk_start, type=chunk_type, length=chunk_len
        )
        chunks.setdefault(chunk_type, []).append(chunk_info)
        if chunk_type == "IEND":
            break
        # This chunk's end is where the next chunk's preamble begins
        chunk_start = chunk_info.end
    return chunks

def get_chunk_data(self, chunk_info: PngChunkInfo) -> bytes:
self.add(chunk_info.data_range)
if self.client_is_async:
self.add_async(chunk_info.data_range)
else:
self.add(chunk_info.data_range)
b = self.active_range_response.read()
return b

Expand Down
Loading

0 comments on commit a28c578

Please sign in to comment.