diff --git a/pangeo_forge_recipes/recipes/xarray_zarr.py b/pangeo_forge_recipes/recipes/xarray_zarr.py
index f42de13b..b1b9f4e2 100644
--- a/pangeo_forge_recipes/recipes/xarray_zarr.py
+++ b/pangeo_forge_recipes/recipes/xarray_zarr.py
@@ -2,10 +2,7 @@
 A Pangeo Forge Recipe
 """
 
-import json
 import logging
-import os
-import tempfile
 import warnings
 from contextlib import ExitStack, contextmanager
 from dataclasses import dataclass, field
@@ -13,13 +10,12 @@
 from typing import Callable, Dict, List, Optional, Sequence, Tuple
 
 import dask
-import fsspec
 import numpy as np
 import xarray as xr
 import zarr
 
 from ..patterns import FilePattern
-from ..storage import AbstractTarget, UninitializedTarget, UninitializedTargetError
+from ..storage import AbstractTarget, CacheFSSpecTarget, MetadataTarget, file_opener
 from ..utils import (
     chunk_bounds_and_conflicts,
     chunked_iterable,
@@ -47,59 +43,12 @@ def _chunk_metadata_fname(chunk_key) -> str:
     return "chunk-meta-" + _encode_key(chunk_key) + ".json"
 
 
-def _copy_btw_filesystems(input_opener, output_opener, BLOCK_SIZE=10_000_000):
-    with input_opener as source:
-        with output_opener as target:
-            while True:
-                data = source.read(BLOCK_SIZE)
-                if not data:
-                    break
-                target.write(data)
-
-
-@contextmanager
-def _maybe_open_or_copy_to_local(opener, copy_to_local, orig_name):
-    _, suffix = os.path.splitext(orig_name)
-    if copy_to_local:
-        ntf = tempfile.NamedTemporaryFile(suffix=suffix)
-        tmp_name = ntf.name
-        logger.info(f"Copying {orig_name} to local file {tmp_name}")
-        target_opener = open(tmp_name, mode="wb")
-        _copy_btw_filesystems(opener, target_opener)
-        yield tmp_name
-        ntf.close()  # cleans up the temporary file
-    else:
-        with opener as fp:
-            with fp as fp2:
-                yield fp2
-
-
-@contextmanager
-def _fsspec_safe_open(fname, **kwargs):
-    # workaround for inconsistent behavior of fsspec.open
-    # https://github.com/intake/filesystem_spec/issues/579
-    with fsspec.open(fname, **kwargs) as fp:
-        with fp as fp2:
-            yield fp2
-
-
-def _get_url_size(fname):
-    with fsspec.open(fname, mode="rb") as of:
-        size = of.size
-    return size
-
+ChunkKey = Tuple[int]
+InputKey = Tuple[int]
 
 # Notes about dataclasses:
 # - https://www.python.org/dev/peps/pep-0557/#inheritance
 # - https://stackoverflow.com/questions/51575931/class-inheritance-in-python-3-7-dataclasses
-# The main awkward thing here is that, because we are using multiple inheritance
-# with dataclasses, _ALL_ fields must have default values. This makes it impossible
-# to have "required" keyword arguments--everything needs some default.
-# That's whay we end up with `UninitializedTarget` and `_variable_sequence_pattern_default_factory`
-
-
-ChunkKey = Tuple[int]
-InputKey = Tuple[int]
 
 
 @dataclass
@@ -140,9 +89,9 @@ class XarrayZarrRecipe(BaseRecipe):
     file_pattern: FilePattern
     inputs_per_chunk: Optional[int] = 1
     target_chunks: Dict[str, int] = field(default_factory=dict)
-    target: AbstractTarget = field(default_factory=UninitializedTarget)
-    input_cache: AbstractTarget = field(default_factory=UninitializedTarget)
-    metadata_cache: AbstractTarget = field(default_factory=UninitializedTarget)
+    target: Optional[AbstractTarget] = None
+    input_cache: Optional[CacheFSSpecTarget] = None
+    metadata_cache: Optional[MetadataTarget] = None
     cache_inputs: bool = True
     copy_input_to_local_file: bool = False
     consolidate_zarr: bool = True
@@ -247,6 +196,8 @@ def _set_target_chunks(self):
     @property  # type: ignore
     @closure
     def prepare_target(self) -> None:
+        if self.target is None:
+            raise ValueError("target is not set.")
         try:
             ds = self.open_target()
             logger.info("Found an existing dataset in target")
@@ -307,38 +258,31 @@ def prepare_target(self) -> None:
         self.expand_target_dim(self._concat_dim, n_sequence)
 
         if self._cache_metadata:
+            if self.metadata_cache is None:
+                raise ValueError("metadata_cache is not set")
             # if nitems_per_input is not constant, we need to cache this info
             recipe_meta = {"input_sequence_lens": input_sequence_lens}
-            meta_mapper = self.metadata_cache.get_mapper()
-            # we are saving a dictionary with one key (input_sequence_lens)
-            logger.info("Caching global metadata")
-            meta_mapper[_GLOBAL_METADATA_KEY] = json.dumps(recipe_meta).encode("utf-8")
+            self.metadata_cache[_GLOBAL_METADATA_KEY] = recipe_meta
 
     # TODO: figure out how to make mypy happy with this convoluted structure
     @property  # type: ignore
     @closure
     def cache_input(self, input_key: InputKey) -> None:  # type: ignore
-        logger.info(f"Caching input {input_key}")
-        fname = self.file_pattern[input_key]
+        if self.cache_inputs:
+            if self.input_cache is None:
+                raise ValueError("input_cache is not set.")
+            logger.info(f"Caching input '{input_key}'")
+            fname = self.file_pattern[input_key]
+            self.input_cache.cache_file(fname, **self.fsspec_open_kwargs)
-        # check and see if the file already exists in the cache
-        if self.input_cache.exists(fname):
-            cached_size = self.input_cache.size(fname)
-            remote_size = _get_url_size(fname)
-            if cached_size == remote_size:
-                logger.info(f"Input {input_key} file {fname} is already cached")
-                return
-
-        input_opener = _fsspec_safe_open(fname, mode="rb", **self.fsspec_open_kwargs)
-        target_opener = self.input_cache.open(fname, mode="wb")
-        _copy_btw_filesystems(input_opener, target_opener)
-
         # TODO: make it so we can cache metadata WITHOUT copying the file
         if self._cache_metadata:
            self.cache_input_metadata(input_key)
 
     @property  # type: ignore
     @closure
     def store_chunk(self, chunk_key: ChunkKey) -> None:  # type: ignore
+        if self.target is None:
+            raise ValueError("target has not been set.")
         with self.open_chunk(chunk_key) as ds_chunk:
             # writing a region means that all the variables MUST have concat_dim
             to_drop = [v for v in ds_chunk.variables if self._concat_dim not in ds_chunk[v].dims]
@@ -377,42 +321,20 @@ def store_chunk(self, chunk_key: ChunkKey) -> None:  # type: ignore
     @property  # type: ignore
     @closure
     def finalize_target(self) -> None:
+        if self.target is None:
+            raise ValueError("target has not been set.")
         if self.consolidate_zarr:
             logger.info("Consolidating Zarr metadata")
             target_mapper = self.target.get_mapper()
             zarr.consolidate_metadata(target_mapper)
 
-    @contextmanager
-    def input_opener(self, fname: str):
-        try:
-            logger.info(f"Opening '{fname}' from cache")
-            opener = self.input_cache.open(fname, mode="rb")
-            with _maybe_open_or_copy_to_local(opener, self.copy_input_to_local_file, fname) as fp:
-                yield fp
-        except (IOError, FileNotFoundError, UninitializedTargetError) as err:
-            if self.cache_inputs:
-                raise Exception(
-                    f"You are trying to open input {fname}, but the file is "
-                    "not cached yet. First call `cache_input` or set "
-                    "`cache_inputs=False`."
-                ) from err
-            logger.info(f"No cache found. Opening input `{fname}` directly.")
-            opener = _fsspec_safe_open(fname, mode="rb", **self.fsspec_open_kwargs)
-            with _maybe_open_or_copy_to_local(opener, self.copy_input_to_local_file, fname) as fp:
-                yield fp
-
     @contextmanager
     def open_input(self, input_key: InputKey):
         fname = self.file_pattern[input_key]
         logger.info(f"Opening input with Xarray {input_key}: '{fname}'")
-        with self.input_opener(fname) as f:
+        cache = self.input_cache if self.cache_inputs else None
+        with file_opener(fname, cache=cache, copy_to_local=self.copy_input_to_local_file) as f:
             ds = xr.open_dataset(f, **self.xarray_open_kwargs)
-            # Explicitly load into memory;
-            # if we don't do this, we get a ValueError: seek of closed file.
-            # But there will be some cases where we really don't want to load.
-            # how to keep around the open file object?
-            # ds = ds.load()
-
         ds = fix_scalar_attr_encoding(ds)
 
         if self.delete_input_encoding:
@@ -426,11 +348,12 @@ def open_input(self, input_key: InputKey):
         yield ds
 
     def cache_input_metadata(self, input_key: InputKey):
+        if self.metadata_cache is None:
+            raise ValueError("metadata_cache is not set.")
         logger.info(f"Caching metadata for input '{input_key}'")
         with self.open_input(input_key) as ds:
-            metadata = ds.to_dict(data=False)
-            mapper = self.metadata_cache.get_mapper()
-            mapper[_input_metadata_fname(input_key)] = json.dumps(metadata).encode("utf-8")
+            input_metadata = ds.to_dict(data=False)
+            self.metadata_cache[_input_metadata_fname(input_key)] = input_metadata
 
     @contextmanager
     def open_chunk(self, chunk_key: ChunkKey):
@@ -498,9 +421,10 @@ def region_and_conflicts_for_chunk(self, chunk_key: ChunkKey):
                 self._concat_dim  # type: ignore
             ]
         else:
-            input_sequence_lens = json.loads(
-                self.metadata_cache.get_mapper()[_GLOBAL_METADATA_KEY]
-            )["input_sequence_lens"]
+            if self.metadata_cache is None:
+                raise ValueError("metadata_cache is not set.")
+            global_metadata = self.metadata_cache[_GLOBAL_METADATA_KEY]
+            input_sequence_lens = global_metadata["input_sequence_lens"]
 
         chunk_bounds, all_chunk_conflicts = chunk_bounds_and_conflicts(
             input_sequence_lens, self._concat_dim_chunks  # type: ignore
@@ -523,10 +447,10 @@ def iter_chunks(self):
             yield k
 
     def get_input_meta(self, *input_keys: Sequence[InputKey]) -> Dict:
-        meta_mapper = self.metadata_cache.get_mapper()
         # getitems should be async; much faster than serial calls
-        all_meta_raw = meta_mapper.getitems([_input_metadata_fname(k) for k in input_keys])
-        return {k: json.loads(raw_bytes) for k, raw_bytes in all_meta_raw.items()}
+        if self.metadata_cache is None:
+            raise ValueError("metadata_cache is not set.")
+        return self.metadata_cache.getitems([_input_metadata_fname(k) for k in input_keys])
 
     def input_position(self, input_key):
         # returns the index position of an input key wrt the concat_dim
diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py
index a5e332f4..d7d13856 100644
--- a/pangeo_forge_recipes/storage.py
+++ b/pangeo_forge_recipes/storage.py
@@ -1,14 +1,47 @@
+import hashlib
+import json
+import logging
 import os
 import re
+import tempfile
 import unicodedata
-import zlib
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
 from dataclasses import dataclass
-from typing import Iterator
+from typing import Any, Iterator, Optional, Sequence, Union
 
 import fsspec
 
+logger = logging.getLogger(__name__)
+
+# fsspec doesn't provide type hints, so I'm not sure what the write type is for open files
+OpenFileType = Any
+
+
+def _get_url_size(fname):
+    with fsspec.open(fname, mode="rb") as of:
+        size = of.size
+    return size
+
+
+@contextmanager
+def _fsspec_safe_open(fname: str, **kwargs) -> Iterator[OpenFileType]:
+    # workaround for inconsistent behavior of fsspec.open
+    # https://github.com/intake/filesystem_spec/issues/579
+    with fsspec.open(fname, **kwargs) as fp:
+        with fp as fp2:
+            yield fp2
+
+
+def _copy_btw_filesystems(input_opener, output_opener, BLOCK_SIZE=10_000_000):
+    with input_opener as source:
+        with output_opener as target:
+            while True:
+                data = source.read(BLOCK_SIZE)
+                if not data:
+                    break
+                target.write(data)
+
 
 class AbstractTarget(ABC):
     @abstractmethod
@@ -89,7 +122,7 @@ class FlatFSSpecTarget(FSSpecTarget):
 
     def _full_path(self, path: str) -> str:
         # this is just in case _slugify(path) is non-unique
-        prefix = hex(zlib.adler32(str(path).encode("utf8")))[2:10]
+        prefix = hashlib.md5(path.encode()).hexdigest()
         slug = _slugify(path)
         new_path = "-".join([prefix, slug])
         return os.path.join(self.root_path, new_path)
@@ -98,7 +131,74 @@ def _full_path(self, path: str) -> str:
 class CacheFSSpecTarget(FlatFSSpecTarget):
     """Alias for FlatFSSpecTarget"""
 
-    pass
+    def cache_file(self, fname: str, **open_kwargs) -> None:
+        # check and see if the file already exists in the cache
+        logger.info(f"Caching file '{fname}'")
+        if self.exists(fname):
+            cached_size = self.size(fname)
+            remote_size = _get_url_size(fname)
+            if cached_size == remote_size:
+                # TODO: add checksumming here
+                logger.info(f"File '{fname}' is already cached")
+                return
+
+        input_opener = _fsspec_safe_open(fname, mode="rb", **open_kwargs)
+        target_opener = self.open(fname, mode="wb")
+        logger.info(f"Copying remote file '{fname}' to cache")
+        _copy_btw_filesystems(input_opener, target_opener)
+
+
+class MetadataTarget(FSSpecTarget):
+    """Target for storing metadata dictionaries as json."""
+
+    def __setitem__(self, key: str, value: dict) -> None:
+        mapper = self.get_mapper()
+        mapper[key] = json.dumps(value).encode("utf-8")
+
+    def __getitem__(self, key: str) -> dict:
+        return json.loads(self.get_mapper()[key])
+
+    def getitems(self, keys: Sequence[str]) -> dict:
+        mapper = self.get_mapper()
+        all_meta_raw = mapper.getitems(keys)
+        return {k: json.loads(raw_bytes) for k, raw_bytes in all_meta_raw.items()}
+
+
+@contextmanager
+def file_opener(
+    fname: str,
+    cache: Optional[CacheFSSpecTarget] = None,
+    copy_to_local: bool = False,
+    **open_kwargs,
+) -> Iterator[Union[OpenFileType, str]]:
+    """
+    Context manager for opening files.
+
+    :param fname: The filename / url to open. Fsspec will inspect the protocol
+        (e.g. http, ftp) and determine the appropriate filesystem type to use.
+    :param cache: A target where the file may have been cached. If None, the file
+        will be opened directly.
+    :param copy_to_local: If True, always copy the file to a local temporary file
+        before opening. In this case, the function yields a path name rather than an open file.
+    """
+    if cache is not None:
+        logger.info(f"Opening '{fname}' from cache")
+        opener = cache.open(fname, mode="rb")
+    else:
+        logger.info(f"Opening '{fname}' directly.")
+        opener = _fsspec_safe_open(fname, mode="rb", **open_kwargs)
+    if copy_to_local:
+        _, suffix = os.path.splitext(fname)
+        ntf = tempfile.NamedTemporaryFile(suffix=suffix)
+        tmp_name = ntf.name
+        logger.info(f"Copying '{fname}' to local file '{tmp_name}'")
+        target_opener = open(tmp_name, mode="wb")
+        _copy_btw_filesystems(opener, target_opener)
+        yield tmp_name
+        ntf.close()  # cleans up the temporary file
+    else:
+        with opener as fp:
+            yield fp
 
 
 def _slugify(value: str) -> str:
@@ -109,32 +209,3 @@ def _slugify(value: str) -> str:
     value = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
     value = re.sub(r"[^.\w\s-]+", "_", value.lower())
     return re.sub(r"[-\s]+", "-", value).strip("-_")
-
-
-class UninitializedTarget(AbstractTarget):
-    def get_mapper(self):
-        raise UninitializedTargetError
-
-    def exists(self, path: str) -> bool:
-        raise UninitializedTargetError
-
-    def rm(self, path: str) -> None:
-        raise UninitializedTargetError
-
-    def open(self, path: str, **kwargs):  # don't know how to type hint this
-        raise UninitializedTargetError
-
-    def size(self, path: str, **kwargs):
-        raise UninitializedTargetError
-
-
-class TargetError(Exception):
-    """Base class for exceptions in this module."""
-
-    pass
-
-
-class UninitializedTargetError(TargetError):
-    """Operation on an uninitialized Target."""
-
-    pass
diff --git a/tests/conftest.py b/tests/conftest.py
index 18a9a8b5..42747ff2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -23,7 +23,7 @@
     MergeDim,
     pattern_from_file_sequence,
 )
-from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, UninitializedTarget
+from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget
 
 
 def pytest_addoption(parser):
@@ -120,39 +120,41 @@ def netcdf_local_paths_by_variable(daily_xarray_dataset, tmpdir_factory, request
 
 # TODO: refactor to allow netcdf_local_paths_by_variable to be passed without
 # duplicating the whole test.
-@pytest.fixture()
-def netcdf_http_server(netcdf_local_paths, request):
+@pytest.fixture(scope="session")
+def netcdf_http_paths(netcdf_local_paths, request):
     paths, items_per_file = netcdf_local_paths
 
-    def make_netcdf_http_server(username="", password=""):
-        first_path = paths[0]
-        # assume that all files are in the same directory
-        basedir = first_path.dirpath()
-        fnames = [path.basename for path in paths]
-
-        this_dir = os.path.dirname(os.path.abspath(__file__))
-        port = get_open_port()
-        command_list = [
-            "python",
-            os.path.join(this_dir, "http_auth_server.py"),
-            port,
-            "127.0.0.1",
-            username,
-            password,
-        ]
-        if username:
-            command_list += [username, password]
-        p = subprocess.Popen(command_list, cwd=basedir)
-        url = f"http://127.0.0.1:{port}"
-        time.sleep(1)  # let the server start up
-
-        def teardown():
-            p.kill()
-
-        request.addfinalizer(teardown)
-        return url, fnames, items_per_file
-
-    return make_netcdf_http_server
+    username = ""
+    password = ""
+
+    first_path = paths[0]
+    # assume that all files are in the same directory
+    basedir = first_path.dirpath()
+    fnames = [path.basename for path in paths]
+
+    this_dir = os.path.dirname(os.path.abspath(__file__))
+    port = get_open_port()
+    command_list = [
+        "python",
+        os.path.join(this_dir, "http_auth_server.py"),
+        port,
+        "127.0.0.1",
+        username,
+        password,
+    ]
+    if username:
+        command_list += [username, password]
+    p = subprocess.Popen(command_list, cwd=basedir)
+    url = f"http://127.0.0.1:{port}"
+    time.sleep(2)  # let the server start up
+
+    def teardown():
+        p.kill()
+
+    request.addfinalizer(teardown)
+
+    all_urls = ["/".join([url, str(fname)]) for fname in fnames]
+    return all_urls, items_per_file
 
 
 @pytest.fixture()
@@ -171,21 +173,31 @@ def tmp_cache(tmpdir_factory):
 
 
 @pytest.fixture()
-def uninitialized_target():
-    return UninitializedTarget()
+def tmp_metadata_target(tmpdir_factory):
+    path = str(tmpdir_factory.mktemp("cache"))
+    fs = fsspec.get_filesystem_class("file")()
+    cache = MetadataTarget(fs, path)
+    return cache
 
 
 @pytest.fixture
-def netCDFtoZarr_sequential_recipe(daily_xarray_dataset, netcdf_local_paths, tmp_target, tmp_cache):
+def netCDFtoZarr_sequential_recipe(
+    daily_xarray_dataset, netcdf_local_paths, tmp_target, tmp_cache, tmp_metadata_target
+):
     paths, items_per_file = netcdf_local_paths
     file_pattern = pattern_from_file_sequence([str(path) for path in paths], "time", items_per_file)
-    kwargs = dict(inputs_per_chunk=1, target=tmp_target, input_cache=tmp_cache,)
+    kwargs = dict(
+        inputs_per_chunk=1,
+        target=tmp_target,
+        input_cache=tmp_cache,
+        metadata_cache=tmp_metadata_target,
+    )
     return recipes.XarrayZarrRecipe, file_pattern, kwargs, daily_xarray_dataset, tmp_target
 
 
 @pytest.fixture
 def netCDFtoZarr_sequential_multi_variable_recipe(
-    daily_xarray_dataset, netcdf_local_paths_by_variable, tmp_target, tmp_cache
+    daily_xarray_dataset, netcdf_local_paths_by_variable, tmp_target, tmp_cache, tmp_metadata_target
 ):
     paths, items_per_file, fnames_by_variable, path_format = netcdf_local_paths_by_variable
     time_index = list(range(len(paths) // 2))
@@ -198,7 +210,12 @@ def format_function(variable, time):
         ConcatDim("time", time_index, items_per_file),
         MergeDim("variable", ["foo", "bar"]),
     )
-    kwargs = dict(inputs_per_chunk=1, target=tmp_target, input_cache=tmp_cache,)
+    kwargs = dict(
+        inputs_per_chunk=1,
+        target=tmp_target,
+        input_cache=tmp_cache,
+        metadata_cache=tmp_metadata_target,
+    )
     return recipes.XarrayZarrRecipe, file_pattern, kwargs, daily_xarray_dataset, tmp_target
 
 
diff --git a/tests/test_fixtures.py b/tests/test_fixtures.py
index 1a22cc78..f2bda8b9 100644
--- a/tests/test_fixtures.py
+++ b/tests/test_fixtures.py
@@ -1,8 +1,6 @@
 import fsspec
-import pytest
 import xarray as xr
 
-from pangeo_forge_recipes.storage import UninitializedTargetError
 from pangeo_forge_recipes.utils import fix_scalar_attr_encoding
 
 
@@ -21,48 +19,9 @@ def test_fixture_local_files_by_variable(daily_xarray_dataset, netcdf_local_path
     assert ds.identical(daily_xarray_dataset)
 
 
-def test_fixture_http_files(daily_xarray_dataset, netcdf_http_server):
-    url, paths, items_per_file = netcdf_http_server()
-    urls = ["/".join([url, str(path)]) for path in paths]
+def test_fixture_http_files(daily_xarray_dataset, netcdf_http_paths):
+    urls, items_per_file = netcdf_http_paths
     open_files = [fsspec.open(url).open() for url in urls]
     ds = xr.open_mfdataset(open_files, combine="nested", concat_dim="time").load()
     ds = fix_scalar_attr_encoding(ds)
     assert ds.identical(daily_xarray_dataset)
-
-
-def test_target(tmp_target):
-    mapper = tmp_target.get_mapper()
-    mapper["foo"] = b"bar"
-    with open(tmp_target.root_path + "/foo") as f:
-        res = f.read()
-    assert res == "bar"
-    with pytest.raises(FileNotFoundError):
-        tmp_target.rm("baz")
-    with pytest.raises(FileNotFoundError):
-        with tmp_target.open("baz"):
-            pass
-
-
-def test_uninitialized_target(uninitialized_target):
-    target = uninitialized_target
-    with pytest.raises(UninitializedTargetError):
-        target.get_mapper()
-    with pytest.raises(UninitializedTargetError):
-        target.exists("foo")
-    with pytest.raises(UninitializedTargetError):
-        target.rm("foo")
-    with pytest.raises(UninitializedTargetError):
-        with target.open("foo"):
-            pass
-
-
-def test_cache(tmp_cache):
-    assert not tmp_cache.exists("foo")
-    with tmp_cache.open("foo", mode="w") as f:
-        f.write("bar")
-    assert tmp_cache.exists("foo")
-    assert tmp_cache.size("foo") == 3
-    with tmp_cache.open("foo", mode="r") as f:
-        assert f.read() == "bar"
-    tmp_cache.rm("foo")
-    assert not tmp_cache.exists("foo")
diff --git a/tests/test_recipes.py b/tests/test_recipes.py
index 8ccd7d0f..6a6ad2cd 100644
--- a/tests/test_recipes.py
+++ b/tests/test_recipes.py
@@ -115,7 +115,6 @@ def test_chunks(
     for cdim in file_pattern.combine_dims:
         if hasattr(cdim, "nitems_per_file"):
             cdim.nitems_per_file = None
-    kwargs["metadata_cache"] = kwargs["input_cache"]
 
     with chunk_expectation as excinfo:
         rec = RecipeClass(file_pattern, **kwargs)
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 00000000..f5b57a8f
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1,68 @@
+import pytest
+from pytest_lazyfixture import lazy_fixture
+
+from pangeo_forge_recipes.storage import file_opener
+
+
+def test_target(tmp_target):
+    mapper = tmp_target.get_mapper()
+    mapper["foo"] = b"bar"
+    with open(tmp_target.root_path + "/foo") as f:
+        res = f.read()
+    assert res == "bar"
+    with pytest.raises(FileNotFoundError):
+        tmp_target.rm("baz")
+    with pytest.raises(FileNotFoundError):
+        with tmp_target.open("baz"):
+            pass
+
+
+def test_cache(tmp_cache):
+    assert not tmp_cache.exists("foo")
+    with tmp_cache.open("foo", mode="w") as f:
+        f.write("bar")
+    assert tmp_cache.exists("foo")
+    assert tmp_cache.size("foo") == 3
+    with tmp_cache.open("foo", mode="r") as f:
+        assert f.read() == "bar"
+    tmp_cache.rm("foo")
+    assert not tmp_cache.exists("foo")
+
+
+def test_metadata_target(tmp_metadata_target):
+    data = {"foo": 1, "bar": "baz"}
+    tmp_metadata_target["key1"] = data
+    assert tmp_metadata_target["key1"] == data
+    assert tmp_metadata_target.getitems(["key1"]) == {"key1": data}
+
+
+@pytest.mark.parametrize(
+    "file_paths", [lazy_fixture("netcdf_local_paths"), lazy_fixture("netcdf_http_paths")]
+)
+@pytest.mark.parametrize("copy_to_local", [False, True])
+@pytest.mark.parametrize("use_cache, cache_first", [(False, False), (True, False), (True, True)])
+def test_file_opener(file_paths, tmp_cache, copy_to_local, use_cache, cache_first):
+    all_paths, _ = file_paths
+    path = str(all_paths[0])
+
+    cache = tmp_cache if use_cache else None
+    if cache_first:
+        cache.cache_file(path)
+        assert cache.exists(path)
+        details = cache.fs.ls(cache.root_path, detail=True)
+        cache.cache_file(path)
+        # check that nothing happened
+        assert cache.fs.ls(cache.root_path, detail=True) == details
+    opener = file_opener(path, cache, copy_to_local=copy_to_local)
+    if use_cache and not cache_first:
+        with pytest.raises(FileNotFoundError):
+            with opener as fp:
+                pass
+    else:
+        with opener as fp:
+            if copy_to_local:
+                assert isinstance(fp, str)
+                with open(fp, mode="rb") as fp2:
+                    _ = fp2.read()
+            else:
+                _ = fp.read()
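
A minimal usage sketch of how the storage pieces introduced in this diff fit together, for orientation only; the filesystem, directory paths, URL, and metadata key below are illustrative placeholders, not values taken from the changes above:

    import fsspec
    from pangeo_forge_recipes.storage import CacheFSSpecTarget, MetadataTarget, file_opener

    fs = fsspec.get_filesystem_class("file")()  # same construction the conftest fixtures use
    cache = CacheFSSpecTarget(fs, "/tmp/pgf-cache")  # hypothetical cache directory
    meta = MetadataTarget(fs, "/tmp/pgf-meta")  # hypothetical metadata directory

    url = "https://example.com/data/file0.nc"  # hypothetical remote input
    cache.cache_file(url)  # copies the file; skipped if a same-sized copy is already cached
    meta["example-meta.json"] = {"input_sequence_lens": [10, 10]}  # stored as JSON bytes

    # With a cache passed, file_opener reads the cached copy; with copy_to_local=True it
    # would yield a local temporary path instead of an open file object.
    with file_opener(url, cache=cache, copy_to_local=False) as f:
        first_bytes = f.read(64)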