-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added helper functions to retrieve metadata, files, directories.
- Loading branch information
Showing
5 changed files
with
265 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from typing import Optional | ||
import os | ||
import tempfile | ||
import urllib | ||
import requests | ||
import shutil | ||
from . import _utils as ut | ||
|
||
|
||
def _local_root(cache: Optional[str], url: str) -> str: | ||
if cache is None: | ||
import appdirs | ||
cache = appdirs.user_data_dir("sewerrat", "aaron") | ||
return os.path.join(cache, urllib.parse.quote_plus(url)) | ||
|
||
|
||
def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str: | ||
target = os.path.join(cache, "LOCAL" + path) # os.path.join behaves poorly when 'path' is an absolute path. | ||
|
||
if overwrite or not os.path.exists(target): | ||
tempdir = os.path.join(cache, "TEMP") | ||
os.makedirs(tempdir, exist_ok=True) | ||
os.makedirs(os.path.dirname(target), exist_ok=True) | ||
|
||
tempfid, temppath = tempfile.mkstemp(dir=tempdir) | ||
try: | ||
with requests.get(url + "/retrieve/file?path=" + urllib.parse.quote_plus(path), stream=True) as r: | ||
if r.status_code >= 300: | ||
raise ut.format_error(r) | ||
with os.fdopen(tempfid, 'wb') as f: | ||
shutil.copyfileobj(r.raw, f) | ||
os.rename(temppath, target) # this should be more or less atomic, so no need for locks. | ||
finally: | ||
if os.path.exists(temppath): | ||
os.remove(temppath) | ||
|
||
return target | ||
|
||
|
||
def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str: | ||
return _acquire_file_raw(cache, path + "/" + name, url, overwrite) | ||
|
||
|
||
def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1) -> str: | ||
""" | ||
Obtain the path to a registered directory or one of its subdirectories. | ||
This may create a local copy of the directory's contents if the caller | ||
is not on the same filesystem. | ||
Args: | ||
path: | ||
Relative path to a registered directory or its subdirectories. | ||
url: | ||
URL to the Gobbler REST API. Only used for remote queries. | ||
cache: | ||
Path to a cache directory. If None, an appropriate location is | ||
automatically chosen. Only used for remote access. | ||
force_remote: | ||
Whether to force remote access. This will download all files in the | ||
``path`` via the REST API and cache them locally, even if | ||
``path`` is present on the same filesystem. | ||
overwrite: | ||
Whether to overwrite existing files in the cache. | ||
concurrent: | ||
Number of concurrent downloads. | ||
Returns: | ||
Path to the subdirectory on the caller's filesystem. This is either | ||
``path`` if it is accessible, or a path to a local cache of the | ||
directory's contents otherwise. | ||
""" | ||
if not force_remote and os.path.exists(path): | ||
return path | ||
|
||
cache = _local_root(cache, url) | ||
final = os.path.join(cache, "LOCAL" + path) # os.path.join doesn't like joining of absolute paths. | ||
ok = os.path.join(cache, "SUCCESS" + path, "....OK") | ||
if not overwrite and os.path.exists(ok) and os.path.exists(final): | ||
return final | ||
|
||
res = requests.get(url + "/list?path=" + urllib.parse.quote_plus(path) + "&recursive=true") | ||
if res.status_code >= 300: | ||
raise ut.format_error(res) | ||
listing = res.json() | ||
|
||
if concurrent == 1: | ||
for y in listing: | ||
_acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite) | ||
else: | ||
import multiprocessing | ||
import functools | ||
with multiprocessing.Pool(concurrent) as p: | ||
p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing) | ||
|
||
# We use a directory-level OK file to avoid having to scan through all | ||
# the directory contents to indicate that it's complete. | ||
os.makedirs(os.path.dirname(ok), exist_ok=True) | ||
with open(ok, "w") as handle: | ||
handle.write("") | ||
return final |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from typing import Optional | ||
import os | ||
from .retrieve_directory import _local_root, _acquire_file_raw | ||
|
||
|
||
def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False) -> str: | ||
""" | ||
Retrieve the path to a single file in a registered directory. This will | ||
call the REST API if the caller is not on the same filesystem. | ||
Args: | ||
path: | ||
Relative path to a registered directory or its subdirectories. | ||
url: | ||
URL to the Gobbler REST API. Only used for remote queries. | ||
cache: | ||
Path to a cache directory. If None, an appropriate location is | ||
automatically chosen. Only used for remote access. | ||
force_remote: | ||
Whether to force remote access. This will download ``path`` via the | ||
REST API and cache it locally, even if ``path`` is present on the | ||
same filesystem. | ||
overwrite: | ||
Whether to overwrite existing files in the cache. | ||
Returns: | ||
Path to the subdirectory on the caller's filesystem. This is either | ||
``path`` if it is accessible, or a path to a local copy otherwise. | ||
""" | ||
if not force_remote and os.path.exists(path): | ||
return path | ||
else: | ||
cache = _local_root(cache, url) | ||
return _acquire_file_raw(cache, path, url=url, overwrite=overwrite) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from typing import Dict, Any | ||
import requests | ||
import urllib | ||
from . import _utils as ut | ||
|
||
|
||
def retrieve_metadata(path: str, url: str) -> Dict[str, Any]: | ||
""" | ||
Retrieve a single metadata entry in a registered directory from the | ||
SewerRat API. | ||
Args: | ||
path: | ||
Absolute path to a metadata file in a registered directory. | ||
url: | ||
URL to the SewerRat REST API. | ||
Returns: | ||
Dictionary containing: | ||
- ``path``, the path to the metadata file. | ||
- ``user``, the identity of the owning user. | ||
- ``time``, the Unix time at which the file was modified. | ||
- ``metadata``, the loaded metadata, typically another dictionary | ||
representing a JSON object. | ||
""" | ||
res = requests.get(url + "/retrieve/metadata?path=" + urllib.parse.quote_plus(path)) | ||
if res.status_code >= 300: | ||
raise ut.format_error(res) | ||
return res.json() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import sewerrat | ||
import pytest | ||
import os | ||
import tempfile | ||
import json | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def setup(): | ||
_, url = sewerrat.start_sewerrat() | ||
|
||
mydir = tempfile.mkdtemp() | ||
with open(os.path.join(mydir, "metadata.json"), "w") as handle: | ||
handle.write('{ "first": "Aaron", "last": "Lun" }') | ||
|
||
os.mkdir(os.path.join(mydir, "diet")) | ||
with open(os.path.join(mydir, "diet", "metadata.json"), "w") as handle: | ||
handle.write('{ "meal": "lunch", "ingredients": "water" }') | ||
|
||
sewerrat.register(mydir, ["metadata.json"], url=url) | ||
return mydir | ||
|
||
|
||
def test_retrieve_file(setup): | ||
mydir = setup | ||
_, url = sewerrat.start_sewerrat() | ||
|
||
p = sewerrat.retrieve_file(mydir + "/metadata.json", url=url) | ||
with open(p, "r") as f: | ||
meta = json.load(f) | ||
assert meta["first"] == "Aaron" | ||
|
||
cache = tempfile.mkdtemp() | ||
p = sewerrat.retrieve_file(mydir + "/metadata.json", url=url, cache=cache, force_remote=True) | ||
assert p.startswith(cache) | ||
with open(p, "r") as f: | ||
meta = json.load(f) | ||
assert meta["first"] == "Aaron" | ||
|
||
|
||
def test_retrieve_metadata(setup): | ||
mydir = setup | ||
_, url = sewerrat.start_sewerrat() | ||
|
||
fpath = mydir + "/diet/metadata.json" | ||
meta = sewerrat.retrieve_metadata(fpath, url=url) | ||
assert os.path.normpath(fpath) == os.path.normpath(meta["path"]) | ||
assert meta["metadata"]["meal"] == "lunch" | ||
|
||
|
||
def test_retrieve_directory(setup): | ||
mydir = setup | ||
_, url = sewerrat.start_sewerrat() | ||
|
||
dir = sewerrat.retrieve_directory(mydir, url=url) | ||
with open(os.path.join(dir, "metadata.json"), "r") as f: | ||
meta = json.load(f) | ||
assert meta["first"] == "Aaron" | ||
|
||
subpath = os.path.join(mydir, "diet") | ||
cache = tempfile.mkdtemp() | ||
rdir = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True) | ||
assert rdir.startswith(cache) | ||
with open(os.path.join(rdir, "metadata.json"), "r") as f: | ||
meta = json.load(f) | ||
assert meta["meal"] == "lunch" | ||
|
||
# Subsequent requests are no-ops. | ||
with open(os.path.join(rdir, "metadata.json"), "w") as f: | ||
f.write('{ "meal": "dinner" }') | ||
rdir2 = sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True) | ||
assert rdir == rdir2 | ||
with open(os.path.join(rdir2, "metadata.json"), "r") as f: | ||
meta = json.load(f) | ||
assert meta["meal"] == "dinner" | ||
|
||
# Unless we force an overwrite. | ||
rdir2 == sewerrat.retrieve_directory(subpath, url=url, cache=cache, force_remote=True, overwrite=True) | ||
with open(os.path.join(rdir2, "metadata.json"), "r") as f: | ||
meta = json.load(f) | ||
assert meta["meal"] == "lunch" | ||
|
||
# Trying with multiple cores. | ||
cache = tempfile.mkdtemp() | ||
rdir2 = sewerrat.retrieve_directory(mydir, url=url, cache=cache, force_remote=True, concurrent=2) | ||
with open(os.path.join(rdir2, "diet", "metadata.json"), "r") as f: | ||
meta = json.load(f) | ||
assert meta["meal"] == "lunch" |