From b1cc890cbe12187d5dc5f8c1054d1edbf1f00dad Mon Sep 17 00:00:00 2001
From: LTLA
Date: Thu, 25 Apr 2024 11:05:07 -0700
Subject: [PATCH] Added helper functions to retrieve metadata, files, directories.

---
 src/sewerrat/__init__.py           |   3 +
 src/sewerrat/retrieve_directory.py | 105 +++++++++++++++++++++++++++++
 src/sewerrat/retrieve_file.py      |  38 +++++++++++
 src/sewerrat/retrieve_metadata.py  |  31 +++++++++
 tests/test_retrieve.py             |  88 ++++++++++++++++++++++++
 5 files changed, 265 insertions(+)
 create mode 100644 src/sewerrat/retrieve_directory.py
 create mode 100644 src/sewerrat/retrieve_file.py
 create mode 100644 src/sewerrat/retrieve_metadata.py
 create mode 100644 tests/test_retrieve.py

diff --git a/src/sewerrat/__init__.py b/src/sewerrat/__init__.py
index 248ebbb..2575212 100644
--- a/src/sewerrat/__init__.py
+++ b/src/sewerrat/__init__.py
@@ -20,3 +20,6 @@
 from .query import query
 from .start_sewerrat import start_sewerrat, stop_sewerrat
 from .list_files import list_files
+from .retrieve_directory import retrieve_directory
+from .retrieve_file import retrieve_file
+from .retrieve_metadata import retrieve_metadata
diff --git a/src/sewerrat/retrieve_directory.py b/src/sewerrat/retrieve_directory.py
new file mode 100644
index 0000000..3353811
--- /dev/null
+++ b/src/sewerrat/retrieve_directory.py
@@ -0,0 +1,105 @@
+from typing import Optional
+import os
+import tempfile
+import urllib.parse
+import requests
+import shutil
+from . import _utils as ut
+
+
+def _local_root(cache: Optional[str], url: str) -> str:
+    if cache is None:
+        import appdirs
+        cache = appdirs.user_data_dir("sewerrat", "aaron")
+    return os.path.join(cache, urllib.parse.quote_plus(url))
+
+
+def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
+    target = os.path.join(cache, "LOCAL" + path) # os.path.join behaves poorly when 'path' is an absolute path.
+
+    if overwrite or not os.path.exists(target):
+        tempdir = os.path.join(cache, "TEMP")
+        os.makedirs(tempdir, exist_ok=True)
+        os.makedirs(os.path.dirname(target), exist_ok=True)
+
+        tempfid, temppath = tempfile.mkstemp(dir=tempdir)
+        try:
+            with os.fdopen(tempfid, 'wb') as f: # own the descriptor first, so it is closed even if the request fails.
+                with requests.get(url + "/retrieve/file?path=" + urllib.parse.quote_plus(path), stream=True) as r:
+                    if r.status_code >= 300:
+                        raise ut.format_error(r)
+                    shutil.copyfileobj(r.raw, f)
+            os.replace(temppath, target) # atomic on POSIX; unlike os.rename(), also replaces an existing target on Windows.
+        finally:
+            if os.path.exists(temppath):
+                os.remove(temppath)
+
+    return target
+
+
+def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str:
+    return _acquire_file_raw(cache, path + "/" + name, url, overwrite)
+
+
+def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1) -> str:
+    """
+    Obtain the path to a registered directory or one of its subdirectories.
+    This may create a local copy of the directory's contents if the caller
+    is not on the same filesystem.
+
+    Args:
+        path:
+            Absolute path to a registered directory or its subdirectories.
+
+        url:
+            URL to the SewerRat REST API. Only used for remote queries.
+
+        cache:
+            Path to a cache directory. If None, an appropriate location is
+            automatically chosen. Only used for remote access.
+
+        force_remote:
+            Whether to force remote access. This will download all files in the
+            ``path`` via the REST API and cache them locally, even if
+            ``path`` is present on the same filesystem.
+
+        overwrite:
+            Whether to overwrite existing files in the cache.
+
+        concurrent:
+            Number of concurrent downloads.
+
+    Returns:
+        Path to the subdirectory on the caller's filesystem. This is either
+        ``path`` if it is accessible, or a path to a local cache of the
+        directory's contents otherwise.
+    """
+    if not force_remote and os.path.exists(path):
+        return path
+
+    cache = _local_root(cache, url)
+    final = os.path.join(cache, "LOCAL" + path) # os.path.join doesn't like joining of absolute paths.
+    ok = os.path.join(cache, "SUCCESS" + path, "....OK")
+    if not overwrite and os.path.exists(ok) and os.path.exists(final):
+        return final
+
+    res = requests.get(url + "/list?path=" + urllib.parse.quote_plus(path) + "&recursive=true")
+    if res.status_code >= 300:
+        raise ut.format_error(res)
+    listing = res.json()
+
+    if concurrent == 1:
+        for y in listing:
+            _acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite)
+    else:
+        import multiprocessing
+        import functools
+        with multiprocessing.Pool(concurrent) as p:
+            p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing)
+
+    # We use a directory-level OK file to avoid having to scan through all
+    # the directory contents to indicate that it's complete.
+    os.makedirs(os.path.dirname(ok), exist_ok=True)
+    with open(ok, "w") as handle:
+        handle.write("")
+    return final
diff --git a/src/sewerrat/retrieve_file.py b/src/sewerrat/retrieve_file.py
new file mode 100644
index 0000000..816eb03
--- /dev/null
+++ b/src/sewerrat/retrieve_file.py
@@ -0,0 +1,38 @@
+from typing import Optional
+import os
+from .retrieve_directory import _local_root, _acquire_file_raw
+
+
+def retrieve_file(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False) -> str:
+    """
+    Retrieve the path to a single file in a registered directory. This will
+    call the REST API if the caller is not on the same filesystem.
+
+    Args:
+        path:
+            Absolute path to a file in a registered directory.
+
+        url:
+            URL to the SewerRat REST API. Only used for remote queries.
+
+        cache:
+            Path to a cache directory. If None, an appropriate location is
+            automatically chosen. Only used for remote access.
+
+        force_remote:
+            Whether to force remote access. This will download ``path`` via the
+            REST API and cache it locally, even if ``path`` is present on the
+            same filesystem.
+
+        overwrite:
+            Whether to overwrite existing files in the cache.
+
+    Returns:
+        Path to the file on the caller's filesystem. This is either
+        ``path`` if it is accessible, or a path to a local copy otherwise.
+    """
+    if not force_remote and os.path.exists(path):
+        return path
+    else:
+        cache = _local_root(cache, url)
+        return _acquire_file_raw(cache, path, url=url, overwrite=overwrite)
diff --git a/src/sewerrat/retrieve_metadata.py b/src/sewerrat/retrieve_metadata.py
new file mode 100644
index 0000000..72915c0
--- /dev/null
+++ b/src/sewerrat/retrieve_metadata.py
@@ -0,0 +1,31 @@
+from typing import Dict, Any
+import requests
+import urllib.parse
+from . import _utils as ut
+
+
+def retrieve_metadata(path: str, url: str) -> Dict[str, Any]:
+    """
+    Retrieve a single metadata entry in a registered directory from the
+    SewerRat API.
+
+    Args:
+        path:
+            Absolute path to a metadata file in a registered directory.
+
+        url:
+            URL to the SewerRat REST API.
+
+    Returns:
+        Dictionary containing:
+
+        - ``path``, the path to the metadata file.
+        - ``user``, the identity of the owning user.
+        - ``time``, the Unix time at which the file was modified.
+        - ``metadata``, the loaded metadata, typically another dictionary
+          representing a JSON object.
+    """
+    res = requests.get(url + "/retrieve/metadata?path=" + urllib.parse.quote_plus(path))
+    if res.status_code >= 300:
+        raise ut.format_error(res)
+    return res.json()
diff --git a/tests/test_retrieve.py b/tests/test_retrieve.py
new file mode 100644
index 0000000..9e75e15
--- /dev/null
+++ b/tests/test_retrieve.py
@@ -0,0 +1,88 @@
+import sewerrat
+import pytest
+import os
+import tempfile
+import json
+
+
+@pytest.fixture(scope="module")
+def setup():
+    _, url = sewerrat.start_sewerrat()
+
+    mydir = tempfile.mkdtemp()
+    with open(os.path.join(mydir, "metadata.json"), "w") as handle:
+        handle.write('{ "first": "Aaron", "last": "Lun" }')
+
+    os.mkdir(os.path.join(mydir, "diet"))
+    with open(os.path.join(mydir, "diet", "metadata.json"), "w") as handle:
+        handle.write('{ "meal": "lunch", "ingredients": "water" }')
+
+    sewerrat.register(mydir, ["metadata.json"], url=url)
+    return mydir
+
+
+def test_retrieve_file(setup):
+    mydir = setup
+    _, url = sewerrat.start_sewerrat()
+
+    p = sewerrat.retrieve_file(mydir + "/metadata.json", url=url)
+    with open(p, "r") as f:
+        meta = json.load(f)
+        assert meta["first"] == "Aaron"
+
+    cache = tempfile.mkdtemp()
+    p = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True)
+    assert p.startswith(cache)
+    with open(p, "r") as f:
+        meta = json.load(f)
+        assert meta["first"] == "Aaron"
+
+
+def test_retrieve_metadata(setup):
+    mydir = setup
+    _, url = sewerrat.start_sewerrat()
+
+    fpath = mydir + "/diet/metadata.json"
+    meta = sewerrat.retrieve_metadata(fpath, url=url)
+    assert os.path.normpath(fpath) == os.path.normpath(meta["path"])
+    assert meta["metadata"]["meal"] == "lunch"
+
+
+def test_retrieve_directory(setup):
+    mydir = setup
+    _, url = sewerrat.start_sewerrat()
+
+    ldir = sewerrat.retrieve_directory(mydir, url=url)
+    with open(os.path.join(ldir, "metadata.json"), "r") as f:
+        meta = json.load(f)
+        assert meta["first"] == "Aaron"
+
+    subpath = os.path.join(mydir, "diet")
+    cache = tempfile.mkdtemp()
+    rdir = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True)
+    assert rdir.startswith(cache)
+    with open(os.path.join(rdir, "metadata.json"), "r") as f:
+        meta = json.load(f)
+        assert meta["meal"] == "lunch"
+
+    # Subsequent requests are no-ops.
+    with open(os.path.join(rdir, "metadata.json"), "w") as f:
+        f.write('{ "meal": "dinner" }')
+    rdir2 = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True)
+    assert rdir == rdir2
+    with open(os.path.join(rdir2, "metadata.json"), "r") as f:
+        meta = json.load(f)
+        assert meta["meal"] == "dinner"
+
+    # Unless we force an overwrite.
+    rdir2 = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True, overwrite=True)
+    with open(os.path.join(rdir2, "metadata.json"), "r") as f:
+        meta = json.load(f)
+        assert meta["meal"] == "lunch"
+
+    # Trying with multiple cores.
+    cache = tempfile.mkdtemp()
+    rdir2 = sewerrat.retrieve_directory(mydir, url=url, cache=cache, force_remote=True, concurrent=2)
+    with open(os.path.join(rdir2, "diet", "metadata.json"), "r") as f:
+        meta = json.load(f)
+        assert meta["meal"] == "lunch"