diff --git a/Dockerfile.dev b/Dockerfile.dev index bfff40e..bbd70a7 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -17,6 +17,7 @@ RUN apt-get update && \ git \ make \ libbz2-dev \ + libffi-dev \ libncurses-dev \ libreadline-dev \ libssl-dev \ @@ -43,6 +44,8 @@ COPY ./docker/install_pyenv.sh /app/docker/install_pyenv.sh RUN /app/docker/install_pyenv.sh +ENV MAKE_OPTS="-j 8" + RUN pyenv install "$PYTHON_VERSION" RUN pyenv global "$PYTHON_VERSION" diff --git a/README.md b/README.md index 0395a25..fe68628 100644 --- a/README.md +++ b/README.md @@ -265,7 +265,7 @@ distinguish between these circumstances. ## Requirements - The `baton-do` executable from the [baton](https://github.com/wtsi-npg/baton) - iRODS client distribution. + iRODS client distribution. Version >=4.3.1 is required. - The unit tests use the [iRODS client icommands](https://github.com/irods/irods_client_icommands) clients. These are not required during normal operation. diff --git a/docker-compose.yml b/docker-compose.yml index 391445c..8d01127 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: irods-server: platform: linux/amd64 container_name: irods-server - image: "ghcr.io/wtsi-npg/ub-16.04-irods-4.2.7:latest" + image: "ghcr.io/wtsi-npg/ub-18.04-irods-4.2.11:latest" ports: - "127.0.0.1:1247:1247" - "127.0.0.1:20000-20199:20000-20199" @@ -26,6 +26,7 @@ services: environment: IRODS_ENVIRONMENT_FILE: "/home/appuser/.irods/irods_environment.json" IRODS_PASSWORD: "irods" + IRODS_VERSION: "4.2.11" depends_on: irods-server: condition: service_healthy diff --git a/src/partisan/irods.py b/src/partisan/irods.py index 76a5b14..c9780b3 100644 --- a/src/partisan/irods.py +++ b/src/partisan/irods.py @@ -21,13 +21,16 @@ from __future__ import annotations # Will not be needed in Python 3.10 import atexit +import hashlib import json +import os import re import subprocess import threading from abc import abstractmethod from collections import defaultdict from contextlib import 
contextmanager +from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum, unique from functools import total_ordering, wraps @@ -35,7 +38,17 @@ from pathlib import Path, PurePath from queue import LifoQueue, Queue from threading import Thread -from typing import Annotated, Any, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import ( + Annotated, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Type, + Union, +) import dateutil.parser from structlog import get_logger @@ -188,6 +201,7 @@ def list( item: Dict, acl=False, avu=False, + checksum=False, contents=False, replicas=False, size=False, @@ -202,6 +216,7 @@ def list( this must be suitable input for baton-do. acl: Include ACL information in the result. avu: Include AVU information in the result. + checksum: Include checksum information in the result (for a data object). contents: Include contents in the result (for a collection item). replicas: Include replica information in the result. size: Include size information in the result (for a data object). @@ -218,6 +233,7 @@ def list( { "acl": acl, "avu": avu, + "checksum": checksum, "contents": contents, "replicate": replicas, "size": size, @@ -349,7 +365,7 @@ def get( self, item: Dict, local_path: Path, - verify_checksum=True, + verify_checksum=False, force=True, timeout=None, tries=1, @@ -416,9 +432,9 @@ def put( self, item: Dict, local_path: Path, - calculate_checksum=True, - verify_checksum=True, - force=True, + calculate_checksum=False, + verify_checksum=False, + force=False, timeout=None, tries=1, ): @@ -1869,6 +1885,18 @@ class DataObject(RodsItem): DataObject is a PathLike for the iRODS path it represents. 
""" + EMPTY_FILE_CHECKSUM = "d41d8cd98f00b204e9800998ecf8427e" + + @dataclass(frozen=True) + class Version: + """A record of a data object's state at a specific time.""" + + checksum: str + timestamp: datetime + + def __repr__(self): + return f"({self.checksum}, {self.timestamp.isoformat(timespec='seconds')})" + def __init__( self, remote_path: Union[PurePath, str], @@ -1884,6 +1912,7 @@ def __init__( """ super().__init__(PurePath(remote_path).parent, check_type=check_type, pool=pool) self.name = PurePath(remote_path).name + self.versions = [] @classmethod def query_metadata( @@ -1973,17 +2002,20 @@ def checksum( Returns: A checksum """ - - item = self._to_dict() - with client(self.pool) as c: - return c.checksum( - item, - calculate_checksum=calculate_checksum, - recalculate_checksum=recalculate_checksum, - verify_checksum=verify_checksum, - timeout=timeout, - tries=tries, - ) + if calculate_checksum or recalculate_checksum or verify_checksum: + item = self._to_dict() + with client(self.pool) as c: + return c.checksum( + item, + calculate_checksum=calculate_checksum, + recalculate_checksum=recalculate_checksum, + verify_checksum=verify_checksum, + timeout=timeout, + tries=tries, + ) + else: + item = self._list(checksum=True, timeout=timeout, tries=tries).pop() + return item[Baton.CHECKSUM] @rods_type_check def size(self, timeout=None, tries=1) -> int: @@ -2113,7 +2145,7 @@ def replicas(self, timeout=None, tries=1) -> List[Replica]: @rods_type_check def get( - self, local_path: Union[Path, str], verify_checksum=True, timeout=None, tries=1 + self, local_path: Union[Path, str], verify_checksum=False, timeout=None, tries=1 ): """Get the data object from iRODS and save to a local file. 
@@ -2137,29 +2169,90 @@ def put( self, local_path: Union[Path, str], calculate_checksum=False, - verify_checksum=True, + verify_checksum=False, + local_checksum=None, + compare_checksums=False, + fill=False, force=True, timeout=None, tries=1, - ): + ) -> DataObject: """Put the data object into iRODS. + If the put operation overwrites an existing data object, the previous version's + checksum and timestamp are recorded in the versions attribute. Multiple versions + are supported. Versions are not recorded if the data object is new. Version + changes that occurred outside the lifetime of the DataObject instance are not + retained. + Args: local_path: The local path of a file to put into iRODS at the path specified by this data object. - calculate_checksum: Calculate remote checksums for all replicas. If - checksums exist, this is a no-op. - verify_checksum: Verify the local checksum against the remote checksum. - Verification implies checksum calculation. + calculate_checksum: Calculate remote checksums for all replicas on the iRODS + server after the put operation. If checksums exist, this is a no-op. + verify_checksum: Verify the local checksum calculated by the iRODS C API + against the remote checksum calculated by the iRODS server for data + objects. + local_checksum: A caller-supplied checksum of the local file. This may be a + string, a path to a file containing a string, or a file name + transformation function. If the latter, it must accept the local path as + its only argument and return a string checksum. Typically, this is + useful when this checksum is available from an earlier process that + calculated it. + compare_checksums: Compare the local checksum to the remote checksum + calculated by the iRODS server after the put operation. If the checksums + do not match, raise an error. This is in addition to the comparison + provided by the verify_checksum option. + fill: Fill in a missing data object in iRODS.
If the data object already + exists, the operation is skipped. That option may be combined with + compare_checksums to ensure that the data object is up to date. force: Overwrite any data object already present in iRODS. timeout: Operation timeout in seconds. tries: Number of times to try the operation. + + Returns: + The DataObject. """ - item = self._to_dict() - with client(self.pool) as c: - c.put( - item, - Path(local_path), + + if compare_checksums: + if local_checksum is None: + chk = _calculate_local_checksum(local_path) + log.info( + "Calculated checksum for file", path=local_path, local_checksum=chk + ) + elif callable(local_checksum): + chk = local_checksum(local_path) + log.info( + "Calculated checksum for file", path=local_path, local_checksum=chk + ) + elif isinstance(local_checksum, os.PathLike): + with open(local_checksum, "r") as f: + chk = f.read() + log.debug( + "Read checksum from file", + path=local_checksum, + local_checksum=chk, + ) + elif isinstance(local_checksum, str): + chk = local_checksum + log.info("Using provided checksum", local_checksum=chk) + else: + raise ValueError( + f"Invalid type for local_checksum: {type(local_checksum)}; must be " + "a string or a path of a file containing a string" + ) + + if fill and self.exists() and self.checksum() == chk: + log.info( + "Data object already exists in iRODS with matching checksum; skipping", + path=self, + local_checksum=chk, + remote_checksum=self.checksum(), + ) + return self + + self._put( + local_path, calculate_checksum=calculate_checksum, verify_checksum=verify_checksum, force=force, @@ -2167,6 +2260,43 @@ def put( tries=tries, ) + if self.checksum() != chk: + raise ValueError( + f"Checksum mismatch after put: {self.checksum()} != {chk}" + ) + + log.info( + "Added data object to iRODS", + path=self, + prev_version=self.versions[-1] if self.versions else None, + local_checksum=chk, + remote_checksum=self.checksum(), + ) + + return self + + if fill and self.exists(): + log.info("Data 
object already exists in iRODS", path=self) + return self + + self._put( + local_path, + calculate_checksum=calculate_checksum, + verify_checksum=verify_checksum, + force=force, + timeout=timeout, + tries=tries, + ) + + log.info( + "Added data object to iRODS", + path=self, + prev_version=self.versions[-1] if self.versions else None, + remote_checksum=self.checksum(), + ) + + return self + @rods_type_check def read(self, timeout=None, tries=1) -> str: """Read the data object from iRODS into a string. This operation is supported @@ -2219,6 +2349,82 @@ def trim_replicas(self, min_replicas=2, valid=False, invalid=True) -> (int, int) return valid_trimmed, invalid_trimmed + def is_consistent_size(self, timeout=None, tries=1) -> bool: + """Return true if the data object in iRODS is internally consistent. + This is defined as: + + 1. If the file is zero length, it has the checksum of an empty file. + 2. If the file is not zero length, it does not have the checksum of an empty file. + + In iRODS <= 4.2.8 it is possible for a data object to get into a bad state + where it has zero length, but still reports as not stale and having the + checksum of the full-length file. + + We can trigger this behaviour in iRODS by having more than one client uploading + to a single path. iRODS <= 4.2.8 does not support any form of locking and allows + uncoordinated writes to the filesystem. It does recognise this as a failure, + but does not clean up the damaged file. + + This method looks for data object size and checksum consistency. It checks the + values that iRODS reports for the whole data object; it does not check + individual replicas. + + If the data object is absent, this method returns true as there can be no + conflict where neither value exists. + + If the data object has no checksum, this method returns true as there is no + evidence to dispute its reported size. + + Args: + timeout: Operation timeout in seconds. + tries: Number of times to try the operation. 
+ + Returns: + True if the data object is internally consistent, False otherwise. + """ + if not self.exists(): + return True + + chk = self.checksum(timeout=timeout, tries=tries) + if chk is None: + return True + + if self.size(timeout=timeout, tries=tries) == 0: + return chk == DataObject.EMPTY_FILE_CHECKSUM + + return chk != DataObject.EMPTY_FILE_CHECKSUM + + def _put( + self, + local_path: Union[Path, str], + calculate_checksum=False, + verify_checksum=False, + force=False, + timeout=None, + tries=1, + ) -> DataObject: + item = self._to_dict() + + prev = None + if self.exists(): + prev = DataObject.Version(self.checksum(), self.modified()) + + with client(self.pool) as c: + c.put( + item, + Path(local_path), + calculate_checksum=calculate_checksum, + verify_checksum=verify_checksum, + force=force, + timeout=timeout, + tries=tries, + ) + + if prev is not None and self.checksum() != prev.checksum: + self.versions.append(prev) + + return self + def _list(self, **kwargs) -> List[dict]: item = self._to_dict() with client(self.pool) as c: @@ -2295,7 +2501,9 @@ def __init__( """ super().__init__(remote_path, check_type=check_type, pool=pool) - def create(self, parents=False, exist_ok=False, timeout=None, tries=1): + def create( + self, parents=False, exist_ok=False, timeout=None, tries=1 + ) -> Collection: """Create a new, empty Collection on the server side. Args: @@ -2303,13 +2511,17 @@ def create(self, parents=False, exist_ok=False, timeout=None, tries=1): exist_ok: If the collection exists, do not raise an error. timeout: Operation timeout in seconds. tries: Number of times to try the operation. + + Returns: + The Collection. 
""" if exist_ok and self.exists(): - return + return self item = self._to_dict() with client(self.pool) as c: c.create_collection(item, parents=parents, timeout=timeout, tries=tries) + return self @property def rods_type(self): @@ -2438,17 +2650,89 @@ def get(self, local_path: Union[Path, str], **kwargs): """ raise NotImplementedError() - def put(self, local_path: Union[Path, str], recurse=True, timeout=None, tries=1): + def put( + self, + local_path: Union[Path, str], + recurse=False, + calculate_checksum=False, + verify_checksum=False, + compare_checksums=False, + fill=False, + force=True, + timeout=None, + tries=1, + ) -> Collection: """Put the collection into iRODS. Args: local_path: The local path of a directory to put into iRODS at the path specified by this collection. recurse: Recurse through subdirectories. + calculate_checksum: Calculate remote checksums for all data object replicas. + See DataObject.put() for more information. + verify_checksum: Verify the local checksum calculated by the iRODS C API + against the remote checksum calculated by the iRODS server for data + objects. See DataObject.put() for more information. + compare_checksums: Compare caller-supplied local checksums to the remote + checksums calculated by the iRODS server after the put operation for + data objects. If the checksums do not match, raise an error. See + DataObject.put() for more information. + fill: Fill in missing data objects in iRODS. If the data object already + exists, the operation is skipped. See DataObject.put() for more + information. + force: Overwrite any data objects already present in iRODS. timeout: Operation timeout in seconds. tries: Number of times to try the operation. + + Returns: + The Collection. 
""" - raise NotImplementedError() + if not Path(local_path).is_dir(): + raise ValueError(f"Local path '{local_path}' is not a directory") + + self.create(exist_ok=True, timeout=timeout, tries=tries) + + if recurse: + for root, dirs, files in os.walk(local_path): + for d in dirs: + p = Path(root, d) + r = PurePath(self.path, p.relative_to(local_path)) + log.debug(f"Creating collection", local_path=d, remote_path=r) + Collection(r).create(exist_ok=True, timeout=timeout) + for f in files: + p = Path(root, f) + r = PurePath(self.path, p.relative_to(local_path)) + log.debug(f"Putting data object", local_path=p, remote_path=r) + DataObject(r).put( + p, + calculate_checksum=calculate_checksum, + verify_checksum=verify_checksum, + compare_checksums=compare_checksums, + fill=fill, + force=force, + timeout=timeout, + tries=tries, + ) + else: + for p in Path(local_path).iterdir(): + if p.is_dir(): + r = PurePath(self.path, p.relative_to(local_path)) + log.debug(f"Creating collection", local_path=p, remote_path=r) + Collection(r).create(exist_ok=True, timeout=timeout) + else: + r = PurePath(self.path, p.name) + log.debug(f"Putting data object", local_path=p, remote_path=r) + DataObject(r).put( + p, + calculate_checksum=calculate_checksum, + verify_checksum=verify_checksum, + compare_checksums=compare_checksums, + force=force, + timeout=timeout, + tries=tries, + ) + + return self def add_permissions(self, *acs: AC, recurse=False, timeout=None, tries=1) -> int: """Add access controls to the collection. Return the number of access @@ -2602,3 +2886,14 @@ def _make_rods_item(item: Dict, pool: BatonPool) -> Union[DataObject, Collection return Collection(PurePath(c), pool=pool) case _: raise BatonError(f"{Baton.COLL} key missing from {item}") + + +def _calculate_local_checksum(local_path: Union[Path, str]) -> str: + """Calculate the MD5 checksum of a local file. + + Args: + local_path: A local file path. + + Returns: The checksum of the file. 
+ """ + return hashlib.md5(Path(local_path).read_bytes()).hexdigest() diff --git a/tests/conftest.py b/tests/conftest.py index 40382c4..061c721 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,7 +31,6 @@ from partisan.icommands import ( add_specific_sql, have_admin, - iinit, imkdir, iput, iquest, @@ -195,6 +194,22 @@ def simple_data_object(tmp_path): irm(root_path, force=True, recurse=True) +@pytest.fixture(scope="function") +def empty_data_object(tmp_path): + """A fixture providing a collection containing a single data object containing + no data.""" + root_path = PurePath("/testZone/home/irods/test") + rods_path = add_rods_path(root_path, tmp_path) + + obj_path = rods_path / "empty.txt" + iput("./tests/data/simple/data_object/empty.txt", obj_path) + + try: + yield obj_path + finally: + irm(root_path, force=True, recurse=True) + + @pytest.fixture(scope="function") def annotated_data_object(simple_data_object): """A fixture providing a collection containing a single, annotated data object diff --git a/tests/data/simple/data_object/empty.txt b/tests/data/simple/data_object/empty.txt new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_examples.py b/tests/test_examples.py index c51b91f..6c12ca7 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -117,7 +117,7 @@ def test_data_object_examples(self, ont_gridion, tmp_path): # is the expected size and matches the expected checksum. 
local_path = Path(tmp_path, obj.name) - obj.get(local_path) + obj.get(local_path, verify_checksum=True) with open(local_path, "rb") as f: m = hashlib.md5() diff --git a/tests/test_irods.py b/tests/test_irods.py index c9d4370..70036f7 100644 --- a/tests/test_irods.py +++ b/tests/test_irods.py @@ -20,7 +20,7 @@ import hashlib import os.path from datetime import datetime, timezone -from pathlib import PurePath +from pathlib import Path, PurePath import pytest from pytest import mark as m @@ -737,6 +737,44 @@ def test_super_ac_collection_recur(self, full_collection): for item in coll.contents(recurse=True): assert item.acl() == new_acl, "Collection content ACL updated" + @m.context("When a Collection does not exist") + @m.it("When put non-recursively") + @m.it("Is created, with its immediate contents") + def test_put_collection(self, simple_collection): + coll = Collection(simple_collection / "sub") + assert not coll.exists() + + local_path = Path("./tests/data/simple").absolute() + coll.put(local_path, recurse=False) + assert coll.exists() + assert coll.contents() == [Collection(coll.path / "data_object")] + + @m.context("When a Collection does not exist") + @m.it("When put recursively") + @m.it("Is created, with descendants and their contents") + def test_put_collection_recur(self, simple_collection): + coll = Collection(simple_collection / "sub") + assert not coll.exists() + + local_path = Path("./tests/data/simple").absolute() + coll.put(local_path, recurse=True, verify_checksum=True) + assert coll.exists() + + sub = Collection(coll.path / "data_object") + empty = DataObject(sub.path / "empty.txt") + lorem = DataObject(sub.path / "lorem.txt") + utf8 = DataObject(sub.path / "utf-8.txt") + + assert coll.contents() == [sub] + assert sub.contents() == [empty, lorem, utf8] + + assert empty.size() == 0 + assert empty.checksum() == "d41d8cd98f00b204e9800998ecf8427e" + assert lorem.size() == 555 + assert lorem.checksum() == "39a4aa291ca849d601e4e5b8ed627a04" + assert 
utf8.size() == 2522 + assert utf8.checksum() == "500cec3fbb274064e2a25fa17a69638a" + @m.describe("DataObject") class TestDataObject: @@ -768,6 +806,30 @@ def test_make_data_object_collection_path(self, simple_collection): with pytest.raises(BatonError, match="Invalid iRODS path"): DataObject(p).exists() + @m.describe("Putting new data objects") + @m.context("When a DataObject does not exist") + @m.it("Can be put from a local file without checksum creation") + def test_data_object_put_no_checksum(self, simple_collection): + obj = DataObject(simple_collection / "new.txt") + assert not obj.exists() + + local_path = Path("./tests/data/simple/data_object/lorem.txt").absolute() + obj.put(local_path, calculate_checksum=False, verify_checksum=False) + assert obj.exists() + assert obj.size() == 555 + assert obj.checksum() is None + + @m.it("Can be put from a local file with checksum creation") + def test_data_object_put_checksum_no_verify(self, simple_collection): + obj = DataObject(simple_collection / "new.txt") + assert not obj.exists() + + local_path = Path("./tests/data/simple/data_object/lorem.txt").absolute() + obj.put(local_path, calculate_checksum=True, verify_checksum=False) + assert obj.exists() + assert obj.size() == 555 + assert obj.checksum() == "39a4aa291ca849d601e4e5b8ed627a04" + @m.describe("Operations on an existing DataObject") @m.context("When a DataObject exists") @m.it("Can be detected") @@ -789,7 +851,7 @@ def test_get_data_object(self, tmp_path, simple_data_object): obj = DataObject(simple_data_object) local_path = tmp_path / simple_data_object.name - size = obj.get(local_path) + size = obj.get(local_path, verify_checksum=True) assert size == 555 md5 = hashlib.md5(open(local_path, "rb").read()).hexdigest() @@ -810,6 +872,36 @@ def test_data_object_size(self, simple_data_object): assert obj.size() == 555 assert len(obj.read()) == 555 + @m.it("Can have its checksum and size consistency verified") + def test_verify_data_object_consistency(self, 
simple_collection): + obj = DataObject(simple_collection / "new.txt") + obj.put( + Path("./tests/data/simple/data_object/lorem.txt"), + calculate_checksum=False, + verify_checksum=False, + ) + assert obj.size() == 555 + assert obj.checksum() is None + assert obj.is_consistent_size() + chk = obj.checksum(calculate_checksum=True) + assert obj.checksum() == chk + assert chk == "39a4aa291ca849d601e4e5b8ed627a04" + assert obj.is_consistent_size() + + empty = DataObject(simple_collection / "empty.txt") + empty.put( + Path("./tests/data/simple/data_object/empty.txt"), + calculate_checksum=False, + verify_checksum=False, + ) + assert empty.size() == 0 + assert empty.checksum() is None + assert empty.is_consistent_size() + chk = empty.checksum(calculate_checksum=True) + assert empty.checksum() == chk + assert chk == "d41d8cd98f00b204e9800998ecf8427e" # Checksum of an empty file + assert empty.is_consistent_size() + @m.it("Has a checksum") def test_get_checksum(self, simple_data_object): obj = DataObject(simple_data_object) @@ -824,7 +916,7 @@ def test_verify_checksum_good(self, simple_data_object): obj = DataObject(simple_data_object) # Note that in iRODS >= 4.2.10, this always passes, even if the remote file - # is the wrong size of has a mismatching checksum, because of this iRODS bug: + # is the wrong size or has a mismatching checksum, because of this iRODS bug: # https://github.com/irods/irods/issues/5843 assert obj.checksum(verify_checksum=True) @@ -840,6 +932,26 @@ def test_verify_checksum_bad(self, invalid_checksum_data_object): obj.checksum(verify_checksum=True) assert e.value.code == -407000 # CHECK_VERIFICATION_RESULTS + @m.it("Fails checksum verification if it has no checksum") + @pytest.mark.skipif( + irods_version() <= (4, 2, 10), + reason=f"requires iRODS server >4.2.10; version is {irods_version()}", + ) + def test_verify_checksum_missing(self, simple_collection): + obj = DataObject(simple_collection / "new.txt") + obj.put( + 
Path("./tests/data/simple/data_object/lorem.txt"), + calculate_checksum=False, + verify_checksum=False, + ) + + assert obj.size() == 555 + assert obj.checksum() is None + assert obj.is_consistent_size() + with pytest.raises(RodsError, match="checksum") as e: + obj.checksum(verify_checksum=True) + assert e.value.code == -407000 # CHECK_VERIFICATION_RESULTS + @m.it("Has replicas") def test_replicas(self, simple_data_object): obj = DataObject(simple_data_object)