Improved synchronization of cache with remote in retrieve* functions.
- Delete files that are no longer present in the remote directory.
- Always perform HEAD requests to check for updates to individual files.
  This allows us to simplify the code by removing the update_delay
  for files, which was incorrect anyway, as the delay was being computed
  from the file's modification time rather than from its last HEAD check.
- Don't throw an error when the HEAD fails, to ensure we can still use
  the cache if the request fails, e.g., due to lack of connectivity.
- Bugfix for parsing the modification time from the headers.
LTLA committed Oct 31, 2024
1 parent 44e5d62 commit 3e87e4b
Showing 4 changed files with 60 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/sewerrat/_utils.py
@@ -49,7 +49,7 @@ def parse_remote_last_modified(res) -> time.time:
         return None
     try:
         mod_time = res.headers["last-modified"]
-        return time.mktime(time.strptime(mod_time, "%a, %d %b %Y %H:%M:%S %Z"))
+        return time.mktime(time.strptime(mod_time, "%a, %d %b %Y %H:%M:%S GMT")) - time.mktime(time.gmtime(0))
     except:
         warnings.warn("failed to parse the 'last-modified' header")
         return None
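
A note on the fix: time.strptime() returns a naive struct_time, and time.mktime() interprets it as local time, so the raw result is shifted by the machine's UTC offset. Subtracting time.mktime(time.gmtime(0)), the local-time rendering of the Unix epoch, cancels that shift. A small standard-library sketch of the idea (the header value is an arbitrary example; calendar.timegm() performs the UTC conversion directly, and the two agree whenever the parsed date and the epoch share the same DST status):

    import calendar
    import time

    hdr = "Wed, 01 Jan 2025 12:00:00 GMT"  # arbitrary example header
    parsed = time.strptime(hdr, "%a, %d %b %Y %H:%M:%S GMT")

    # mktime() treats 'parsed' as local time; subtracting the local-time
    # rendering of the epoch cancels the machine's UTC offset.
    fixed = time.mktime(parsed) - time.mktime(time.gmtime(0))

    # timegm() converts a UTC struct_time to epoch seconds directly.
    print(fixed, calendar.timegm(parsed))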
30 changes: 18 additions & 12 deletions src/sewerrat/retrieve_directory.py
@@ -18,18 +18,16 @@ def _full_file_url(url: str, path: str) -> str:
     return url + "/retrieve/file?path=" + urllib.parse.quote_plus(path)


-def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool, update_delay: int) -> str:
+def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool) -> str:
     target = os.path.join(cache, "LOCAL" + path) # os.path.join behaves poorly when 'path' is an absolute path.

     if not overwrite:
         if not os.path.exists(target):
             overwrite = True
         else:
-            last_mod = os.path.getmtime(target)
-            if last_mod + update_delay < time.time(): # only check older files for updates, to avoid excessive queries.
-                with requests.head(_full_file_url(url, path)) as r:
-                    if r.status_code >= 300:
-                        raise format_error(r)
+            with requests.head(_full_file_url(url, path)) as r:
+                if r.status_code < 300:
+                    last_mod = os.path.getmtime(target)
                     modtime = ut.parse_remote_last_modified(r)
                     if modtime is not None and modtime > last_mod:
                         overwrite = True
@@ -50,8 +48,8 @@ def _acquire_file_raw(cache: str, path: str, url: str, overwrite: bool, update_delay: int) -> str:
     return target


-def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool, update_delay: int) -> str:
-    return _acquire_file_raw(cache, path + "/" + name, url, overwrite, update_delay)
+def _acquire_file(cache: str, path: str, name: str, url: str, overwrite: bool) -> str:
+    return _acquire_file_raw(cache, path + "/" + name, url, overwrite)


 def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1, update_delay: int = 3600) -> str:
Expand Down Expand Up @@ -83,8 +81,8 @@ def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_r
Number of concurrent downloads.
update_delay:
Maximum age of a cached file, in seconds. Older files will be
automatically checked for updates.
Delay interval before checking for updates in a cached directory,
seconds. Only used for remote access.
Returns:
Path to the subdirectory on the caller's filesystem. This is either
@@ -109,14 +107,22 @@ def retrieve_directory(path: str, url: str, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, concurrent: int = 1, update_delay: int = 3600) -> str:
             raise ut.format_error(res)
         listing = res.json()

+        # Delete all files that aren't in the listing.
+        ok_files = set(listing)
+        for root, dirs, filenames in os.walk(final):
+            for f in filenames:
+                full = os.path.join(root, f)
+                if os.path.relpath(full, final) not in ok_files:
+                    os.remove(full)
+
         if concurrent == 1:
             for y in listing:
-                _acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite, update_delay=update_delay)
+                _acquire_file(cache, name=y, path=path, url=url, overwrite=overwrite)
         else:
             import multiprocessing
             import functools
             with multiprocessing.Pool(concurrent) as p:
-                p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite, update_delay=update_delay), listing)
+                p.map(functools.partial(_acquire_file, cache, path, url=url, overwrite=overwrite), listing)

         # We use a directory-level OK file to avoid having to scan through all
         # the directory contents to indicate that it's complete.
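
For orientation, here is a self-contained sketch of the per-file freshness check that _acquire_file_raw() now performs, with the header parsing inlined; this illustrates the logic in the diff above rather than reproducing the library's code verbatim:

    import calendar
    import os
    import time
    from typing import Optional

    import requests

    def remote_mtime(res) -> Optional[float]:
        # Parse the 'last-modified' header into seconds since the Unix
        # epoch, mirroring the corrected parse_remote_last_modified().
        try:
            parsed = time.strptime(res.headers["last-modified"], "%a, %d %b %Y %H:%M:%S GMT")
            return calendar.timegm(parsed)
        except (KeyError, ValueError):
            return None

    def needs_refresh(target: str, head_url: str) -> bool:
        if not os.path.exists(target):
            return True  # nothing cached yet, so we must download.
        with requests.head(head_url) as r:
            if r.status_code < 300:  # a failed HEAD falls back to the cache.
                remote = remote_mtime(r)
                if remote is not None and remote > os.path.getmtime(target):
                    return True
        return False

Every retrieval now costs one HEAD request per file, but stale copies are detected immediately instead of waiting out an update_delay window.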
8 changes: 2 additions & 6 deletions src/sewerrat/retrieve_file.py
@@ -3,7 +3,7 @@
 from .retrieve_directory import _local_root, _acquire_file_raw


-def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, update_delay: int = 3600) -> str:
+def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False) -> str:
     """
     Retrieve the path to a single file in a registered directory. This will
     call the REST API if the caller is not on the same filesystem.
@@ -27,10 +27,6 @@ def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, update_delay: int = 3600) -> str:
         overwrite:
             Whether to overwrite existing files in the cache.

-        update_delay:
-            Maximum age of a cached file, in seconds. Older files will be
-            automatically checked for updates.
-
     Returns:
         Path to the subdirectory on the caller's filesystem. This is either
         ``path`` if it is accessible, or a path to a local copy otherwise.
@@ -39,4 +35,4 @@ def retrieve_file(path, url, cache: Optional[str] = None, force_remote: bool = False, overwrite: bool = False, update_delay: int = 3600) -> str:
         return path
     else:
         cache = _local_root(cache, url)
-        return _acquire_file_raw(cache, path, url=url, overwrite=overwrite, update_delay=update_delay)
+        return _acquire_file_raw(cache, path, url=url, overwrite=overwrite)
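
A minimal usage sketch for the updated signature, modelled on the calls in the test suite below (the directory contents and the local test instance are illustrative):

    import os
    import tempfile

    import sewerrat

    # Stand up a test instance and register a directory, as the tests do.
    mydir = tempfile.mkdtemp()
    with open(os.path.join(mydir, "metadata.json"), "w") as handle:
        handle.write('{ "name": "example" }')

    _, url = sewerrat.start_sewerrat()
    sewerrat.register(mydir, ["metadata.json"], url=url)

    # No update_delay argument anymore: the remote copy is HEAD-checked on
    # every call, and the cached file is reused unless it has gone stale.
    p = sewerrat.retrieve_file(os.path.join(mydir, "metadata.json"), url=url, force_remote=True)
    with open(p, "r") as f:
        print(f.read())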
39 changes: 39 additions & 0 deletions tests/test_retrieve.py
@@ -128,3 +128,42 @@ def test_retrieve_directory(setup):
     with open(os.path.join(rdir2, "diet", "metadata.json"), "r") as f:
         meta = json.load(f)
     assert meta["meal"] == "lunch"
+
+
+def test_retrieve_directory_with_updates():
+    mydir2 = tempfile.mkdtemp()
+    with open(os.path.join(mydir2, "metadata.json"), "w") as handle:
+        handle.write('{ "first": "Kanon", "last": "Shibuya" }')
+
+    os.mkdir(os.path.join(mydir2, "2"))
+    with open(os.path.join(mydir2, "2", "metadata.json"), "w") as handle:
+        handle.write('{ "first": "Kinako", "last": "Sakurakouji" }')
+
+    os.mkdir(os.path.join(mydir2, "3"))
+    with open(os.path.join(mydir2, "3", "metadata.json"), "w") as handle:
+        handle.write('{ "first": "Margarete", "last": "Wien" }')
+
+    _, url = sewerrat.start_sewerrat()
+    sewerrat.register(mydir2, ["metadata.json"], url=url)
+
+    cache = tempfile.mkdtemp()
+    dir = sewerrat.retrieve_directory(mydir2, url=url, cache=cache, force_remote=True)
+    with open(os.path.join(dir, "2", "metadata.json"), "r") as f:
+        meta = json.load(f)
+    assert meta["first"] == "Kinako"
+    assert os.path.exists(os.path.join(dir, "3", "metadata.json"))
+
+    # Checking if it responds correctly to remote updates.
+    time.sleep(1.5)
+    import shutil
+    shutil.rmtree(os.path.join(mydir2, "3"))
+    with open(os.path.join(mydir2, "2", "metadata.json"), "w") as handle:
+        handle.write('{ "first": "Mei", "last": "Yoneme" }')
+    time.sleep(1.5)
+
+    dir2 = sewerrat.retrieve_directory(mydir2, url=url, cache=cache, force_remote=True, update_delay=0)
+    assert dir == dir2
+    with open(os.path.join(dir, "2", "metadata.json"), "r") as f:
+        meta = json.load(f)
+    assert meta["first"] == "Mei"
+    assert not os.path.exists(os.path.join(dir, "3", "metadata.json"))
