From affff72cb03a13072c55c84038054cfa10196466 Mon Sep 17 00:00:00 2001 From: Maciej Urbanski Date: Fri, 22 Mar 2024 16:44:13 +0100 Subject: [PATCH] Fix `decode_content=True` causing an error when downloading tiny and large encoded files. --- b2sdk/raw_simulator.py | 42 ++++++++++++++--- b2sdk/transfer/inbound/downloader/abstract.py | 24 +++++++++- b2sdk/transfer/inbound/downloader/parallel.py | 1 + b2sdk/transfer/inbound/downloader/simple.py | 14 +++--- changelog.d/+fix_decode_content.fixed.md | 1 + test/integration/test_download.py | 45 +++++++++---------- 6 files changed, 87 insertions(+), 40 deletions(-) create mode 100644 changelog.d/+fix_decode_content.fixed.md diff --git a/b2sdk/raw_simulator.py b/b2sdk/raw_simulator.py index 2a4cfbdf1..96c5120dc 100644 --- a/b2sdk/raw_simulator.py +++ b/b2sdk/raw_simulator.py @@ -10,6 +10,7 @@ from __future__ import annotations import collections +import dataclasses import io import logging import random @@ -448,25 +449,52 @@ def _get_encryption_mode_and_secret(self, encryption: EncryptionSetting | None): return mode, secret -FakeRequest = collections.namedtuple('FakeRequest', 'url headers') +@dataclasses.dataclass +class FakeRequest: + url: str + headers: CaseInsensitiveDict + + +@dataclasses.dataclass +class FakeRaw: + data_bytes: bytes + _position: int = 0 + + def tell(self): + return self._position + + def read(self, size): + data = self.data_bytes[self._position:self._position + size] + self._position += len(data) + return data class FakeResponse: def __init__(self, account_auth_token_or_none, file_sim, url, range_=None): - self.data_bytes = file_sim.data_bytes + self.raw = FakeRaw(file_sim.data_bytes) self.headers = file_sim.as_download_headers(account_auth_token_or_none, range_) self.url = url self.range_ = range_ if range_ is not None: self.data_bytes = self.data_bytes[range_[0]:range_[1] + 1] + @property + def data_bytes(self): + return self.raw.data_bytes + + @data_bytes.setter + def data_bytes(self, value): + 
self.raw = FakeRaw(value) + def iter_content(self, chunk_size=1): - start = 0 rnd = random.Random(self.url) - while start <= len(self.data_bytes): - time.sleep(rnd.random() * 0.01) - yield self.data_bytes[start:start + chunk_size] - start += chunk_size + while True: + chunk = self.raw.read(chunk_size) + if chunk: + time.sleep(rnd.random() * 0.01) + yield chunk + else: + break @property def request(self): diff --git a/b2sdk/transfer/inbound/downloader/abstract.py b/b2sdk/transfer/inbound/downloader/abstract.py index fdb9ad04b..9705807d9 100644 --- a/b2sdk/transfer/inbound/downloader/abstract.py +++ b/b2sdk/transfer/inbound/downloader/abstract.py @@ -41,8 +41,18 @@ def copy(self): class AbstractDownloader(metaclass=B2TraceMetaAbstract): + """ + Abstract class for downloaders. + + :var REQUIRES_SEEKING: if True, the downloader requires the ability to seek in the file object. + :var SUPPORTS_DECODE_CONTENT: if True, the downloader supports decoded HTTP streams. + In practice, this means that the downloader can handle HTTP responses which already + have the content decoded per Content-Encoding and, more likely than not, of a different + length than requested. + """ REQUIRES_SEEKING = True + SUPPORTS_DECODE_CONTENT = True DEFAULT_THREAD_POOL_CLASS = staticmethod(ThreadPoolExecutor) DEFAULT_ALIGN_FACTOR = 4096 @@ -103,6 +113,8 @@ def is_suitable(self, download_version: DownloadVersion, allow_seeking: bool): """ if self.REQUIRES_SEEKING and not allow_seeking: return False + if not self.SUPPORTS_DECODE_CONTENT and download_version.content_encoding and download_version.api.api_config.decode_content: + return False return True @abstractmethod @@ -113,8 +125,16 @@ def download( download_version: DownloadVersion, session: B2Session, encryption: EncryptionSetting | None = None, - ): + ) -> tuple[int, str]: """ - @returns (bytes_read, actual_sha1) + Download target to a file-like object. 
+ + :param file: file-like object to write to + :param response: requests.Response of b2_download_url_by_* endpoint with the target object + :param download_version: DownloadVersion of an object being downloaded + :param session: B2Session to be used for downloading + :param encryption: optional Encryption setting + :return: (bytes_read, actual_sha1) + please note bytes_read may be different from bytes written to a file object if decode_content=True """ pass diff --git a/b2sdk/transfer/inbound/downloader/parallel.py b/b2sdk/transfer/inbound/downloader/parallel.py index a8ee36666..d6222cc18 100644 --- a/b2sdk/transfer/inbound/downloader/parallel.py +++ b/b2sdk/transfer/inbound/downloader/parallel.py @@ -56,6 +56,7 @@ class ParallelDownloader(AbstractDownloader): # cloud file start cloud file end # FINISH_HASHING_BUFFER_SIZE = 1024**2 + SUPPORTS_DECODE_CONTENT = False def __init__(self, min_part_size: int, max_streams: int | None = None, **kwargs): """ diff --git a/b2sdk/transfer/inbound/downloader/simple.py b/b2sdk/transfer/inbound/downloader/simple.py index 03ef68792..99c4f26e4 100644 --- a/b2sdk/transfer/inbound/downloader/simple.py +++ b/b2sdk/transfer/inbound/downloader/simple.py @@ -26,6 +26,7 @@ class SimpleDownloader(AbstractDownloader): REQUIRES_SEEKING = False + SUPPORTS_DECODE_CONTENT = True def _download( self, @@ -39,12 +40,12 @@ def _download( chunk_size = self._get_chunk_size(actual_size) digest = self._get_hasher() - - bytes_read = 0 + decoded_bytes_read = 0 for data in response.iter_content(chunk_size=chunk_size): file.write(data) digest.update(data) - bytes_read += len(data) + decoded_bytes_read += len(data) + bytes_read = response.raw.tell() assert actual_size >= 1 # code below does `actual_size - 1`, but it should never reach that part with an empty file @@ -62,8 +63,8 @@ def _download( # original response is not closed at this point yet, as another layer is responsible for closing it, so a new socket might be allocated, # but this is a very rare 
case and so it is not worth the optimization logger.debug( - 're-download attempts remaining: %i, bytes read already: %i. Getting range %s now.', - retries_left, bytes_read, new_range + 're-download attempts remaining: %i, bytes read: %i (decoded: %i). Getting range %s now.', + retries_left, bytes_read, decoded_bytes_read, new_range ) with session.download_file_from_url( response.request.url, @@ -75,7 +76,8 @@ def _download( ): file.write(data) digest.update(data) - bytes_read += len(data) + decoded_bytes_read += len(data) + bytes_read += followup_response.raw.tell() retries_left -= 1 return bytes_read, digest.hexdigest() diff --git a/changelog.d/+fix_decode_content.fixed.md b/changelog.d/+fix_decode_content.fixed.md new file mode 100644 index 000000000..827198f79 --- /dev/null +++ b/changelog.d/+fix_decode_content.fixed.md @@ -0,0 +1 @@ +Fix `decode_content=True` causing an error when downloading tiny and large files. diff --git a/test/integration/test_download.py b/test/integration/test_download.py index fd73020b6..9a1556f21 100644 --- a/test/integration/test_download.py +++ b/test/integration/test_download.py @@ -97,32 +97,27 @@ def test_small_unverified(self): pprint(f.download_version._get_args_for_clone()) assert not f.download_version.content_sha1_verified - def test_gzip(self): - bucket = self.create_bucket() - with tempfile.TemporaryDirectory() as temp_dir: - temp_dir = pathlib.Path(temp_dir) - source_file = temp_dir / 'compressed_file.gz' - downloaded_uncompressed_file = temp_dir / 'downloaded_uncompressed_file' - downloaded_compressed_file = temp_dir / 'downloaded_compressed_file' - - data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * 100 # too short files failed somehow - with gzip.open(source_file, 'wb') as gzip_file: - gzip_file.write(data_to_write) - file_version = bucket.upload_local_file( - str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'} - ) - 
self.b2_api.download_file_by_id(file_id=file_version.id_).save_to( - str(downloaded_compressed_file) - ) - assert downloaded_compressed_file.read_bytes() == source_file.read_bytes() - decompressing_api, _ = authorize( - self.b2_auth_data, B2HttpApiConfig(decode_content=True) - ) - decompressing_api.download_file_by_id(file_id=file_version.id_).save_to( - str(downloaded_uncompressed_file) - ) - assert downloaded_uncompressed_file.read_bytes() == data_to_write +@pytest.mark.parametrize("size_multiplier", [1, 100]) +def test_gzip(b2_auth_data, bucket, tmp_path, b2_api, size_multiplier): + """Test downloading gzipped files of various sizes with and without content-encoding.""" + source_file = tmp_path / 'compressed_file.gz' + downloaded_uncompressed_file = tmp_path / 'downloaded_uncompressed_file' + downloaded_compressed_file = tmp_path / 'downloaded_compressed_file' + + data_to_write = b"I'm about to be compressed and sent to the cloud, yay!\n" * size_multiplier + source_file.write_bytes(gzip.compress(data_to_write)) + file_version = bucket.upload_local_file( + str(source_file), 'gzipped_file', file_info={'b2-content-encoding': 'gzip'} + ) + b2_api.download_file_by_id(file_id=file_version.id_).save_to(str(downloaded_compressed_file)) + assert downloaded_compressed_file.read_bytes() == source_file.read_bytes() + + decompressing_api, _ = authorize(b2_auth_data, B2HttpApiConfig(decode_content=True)) + decompressing_api.download_file_by_id(file_id=file_version.id_).save_to( + str(downloaded_uncompressed_file) + ) + assert downloaded_uncompressed_file.read_bytes() == data_to_write @pytest.fixture