From 65f8aa3905febcf09b2534ebdaf5c454e90e5add Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 31 Jan 2024 23:26:49 +0000 Subject: [PATCH 01/31] pull from remote --- python/src/space/core/ops/append.py | 4 ++ python/src/space/core/proto/metadata_pb2.pyi | 3 ++ python/src/space/core/runners.py | 16 +++--- python/src/space/core/storage.py | 57 ++++++++++++++------ 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 0c39c5d..4a9e443 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -79,6 +79,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, file_options: FileOptions, + branch: Optional[str] = None, record_address_input: bool = False): """ Args: @@ -114,6 +115,9 @@ def __init__(self, self._record_manifest_writer = RecordManifestWriter(self._metadata_dir) self._patch = rt.Patch() + if branch: + self._patch.branch = branch + def write(self, data: InputData) -> None: if not isinstance(data, pa.Table): diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index 0581e6c..9ad0be6 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -366,6 +366,8 @@ global___StorageStatistics = StorageStatistics class ChangeLog(google.protobuf.message.Message): """Change log stores changes made by a snapshot. NEXT_ID: 3 + TODO: to replace RowBitmap list by runtime.FileSet (not backward + compatible). """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -391,6 +393,7 @@ global___ChangeLog = ChangeLog @typing_extensions.final class RowBitmap(google.protobuf.message.Message): """Mark rows in a file by bitmap. + TODO: to replace it by runtime.DataFile (not backward compatible). NEXT_ID: 5 """ diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 855e459..219d6b4 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -143,13 +143,13 @@ def __init__(self, self._file_options = file_options or FileOptions() @abstractmethod - def append(self, data: InputData) -> JobResult: + def append(self, data: InputData, branch: Optional[str] = None) -> JobResult: """Append data into the dataset.""" @abstractmethod def append_from( self, source_fns: Union[InputIteratorFn, - List[InputIteratorFn]]) -> JobResult: + List[InputIteratorFn]], branch: Optional[str] = None) -> JobResult: """Append data into the dataset from an iterator source. source_fns contains a list of no args functions that return iterators. It @@ -159,7 +159,7 @@ def append_from( @abstractmethod def append_array_record(self, pattern: str, - index_fn: ArrayRecordIndexFn) -> JobResult: + index_fn: ArrayRecordIndexFn, branch: Optional[str] = None ) -> JobResult: """Append data from ArrayRecord files without copying data. Args: @@ -170,7 +170,7 @@ def append_array_record(self, pattern: str, """ @abstractmethod - def append_parquet(self, pattern: str) -> JobResult: + def append_parquet(self, pattern: str, branch: Optional[str] = None) -> JobResult: """Append data from Parquet files without copying data. 
Args: @@ -238,18 +238,18 @@ def diff(self,
ReadOptions(batch_size=batch_size)) @StorageMixin.transactional - def append(self, data: InputData) -> Optional[rt.Patch]: + def append(self, data: InputData, branch: Optional[str] = None) -> Optional[rt.Patch]: op = LocalAppendOp(self._storage.location, self._storage.metadata, - self._file_options) + self._file_options, branch) op.write(data) return op.finish() @StorageMixin.transactional def append_from( - self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]] + self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]], branch: Optional[str] = None ) -> Optional[rt.Patch]: op = LocalAppendOp(self._storage.location, self._storage.metadata, - self._file_options) + self._file_options, branch) if not isinstance(source_fns, list): source_fns = [source_fns] diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 14e4470..5a127d4 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -51,6 +51,8 @@ # Initial snapshot ID. _INIT_SNAPSHOT_ID = 0 +_RESERVED_REFERENCE = ["main"] + # pylint: disable=too-many-public-methods class Storage(paths.StoragePathsMixin): @@ -210,24 +212,38 @@ def _lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: """Add tag to a snapshot""" + self._add_reference(tag, meta.SnapshotReference.TAG, snapshot_id) + + def add_branch(self, branch: str, snapshot_id: Optional[int] = None) -> None: + """Add branch to a snapshot""" + self._add_reference(branch, meta.SnapshotReference.BRANCH, snapshot_id) + + def _add_reference(self, + reference_name: str, + reference_type: meta.SnapshotReference.ReferenceType, + snapshot_id: Optional[int] = None) -> None: + """Add reference to a snapshot""" if snapshot_id is None: snapshot_id = self._metadata.current_snapshot_id if snapshot_id not in self._metadata.snapshots: raise errors.VersionNotFoundError(f"Snapshot {snapshot_id} is not found") - if len(tag) == 0: - raise errors.UserInputError("Tag cannot be empty") + if len(reference_name) == 0: + raise errors.UserInputError(f"{reference_type} cannot be empty") - if tag in self._metadata.refs: - raise errors.VersionAlreadyExistError(f"Reference {tag} already exist") + if reference_name in _RESERVED_REFERENCE: + raise errors.UserInputError(f"{reference_name} is reserved") + + if reference_name in self._metadata.refs: + raise errors.VersionAlreadyExistError(f"Reference {reference_name} already exist") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) - tag_ref = meta.SnapshotReference(reference_name=tag, - snapshot_id=snapshot_id, - type=meta.SnapshotReference.TAG) - new_metadata.refs[tag].CopyFrom(tag_ref) + ref = meta.SnapshotReference(reference_name=reference_name, + snapshot_id=snapshot_id, + type=reference_type) + new_metadata.refs[reference_name].CopyFrom(ref) new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata @@ -235,13 +251,20 @@ def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: def remove_tag(self, tag: str) -> None: """Remove tag from metadata""" - if (tag not in self._metadata.refs or - self._metadata.refs[tag].type != meta.SnapshotReference.TAG): - raise errors.VersionNotFoundError(f"Tag {tag} is not found") + self._remove_reference(tag, meta.SnapshotReference.TAG) + + def remove_branch(self, branch: str) -> None: + """Remove branch from metadata""" +
self._remove_reference(branch, meta.SnapshotReference.BRANCH) + + def _remove_reference(self, reference_name:str, reference_type: meta.SnapshotReference.ReferenceType)-> None: + if (reference_name not in self._metadata.refs or + self._metadata.refs[reference_name].type != reference_type): + raise errors.VersionNotFoundError(f"{reference_type} {reference_name} is not found") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) - del new_metadata.refs[tag] + del new_metadata.refs[reference_name] new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata @@ -256,12 +279,16 @@ def commit(self, patch: rt.Patch) -> None: Args: patch: a patch describing changes made to the storage. """ - current_snapshot = self.snapshot() - new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) new_snapshot_id = self._next_snapshot_id() - new_metadata.current_snapshot_id = new_snapshot_id + if patch.branch: + branch_snapshot_id = self._lookup_reference(patch.branch) + current_snapshot = self.snapshot(branch_snapshot_id) + new_metadata.refs[patch.branch].snapshot_id = new_snapshot_id + else: + new_metadata.current_snapshot_id = new_snapshot_id + current_snapshot = self.snapshot() new_metadata.last_update_time.CopyFrom(proto_now()) new_metadata_path = self.new_metadata_path() From 212a9a75abcb402893b978685b1fe2e168edfc83 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 31 Jan 2024 23:29:05 +0000 Subject: [PATCH 02/31] rebase from remote --- python/src/space/ray/ops/append.py | 6 ++++-- python/src/space/ray/runners.py | 14 ++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py index ce87144..2e29a40 100644 --- a/python/src/space/ray/ops/append.py +++ b/python/src/space/ray/ops/append.py @@ -38,6 +38,7 @@ def __init__(self, metadata: meta.StorageMetadata, ray_options: RayOptions, file_options: FileOptions, + branch: Optional[str] = None, record_address_input: bool = False): """ Args: @@ -46,7 +47,7 @@ def __init__(self, self._ray_options = ray_options self._actors = [ _AppendActor.remote( # type: ignore[attr-defined] # pylint: disable=no-member - location, metadata, file_options, record_address_input) + location, metadata, file_options, branch, record_address_input) for _ in range(self._ray_options.max_parallelism) ] @@ -101,8 +102,9 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, file_options: FileOptions, + branch: Optional[str] = None, record_address_input: bool = False): - self._op = LocalAppendOp(location, metadata, file_options, + self._op = LocalAppendOp(location, metadata, file_options, branch, record_address_input) def write_from(self, source_fn: InputIteratorFn) -> None: diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 51ff67b..3696153 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -273,16 +273,15 @@ def __init__(self, self._ray_options = ray_options or RayOptions() @StorageMixin.transactional - def append(self, data: InputData) -> Optional[rt.Patch]: + def append(self, data: InputData, branch: Optional[str] = None) -> Optional[rt.Patch]: op = RayAppendOp(self._storage.location, self._storage.metadata, - self._ray_options, self._file_options) + self._ray_options, self._file_options, branch) op.write(data) return op.finish() @StorageMixin.transactional def append_from( - self, source_fns: 
Union[InputIteratorFn, List[InputIteratorFn]] - ) -> Optional[rt.Patch]: + self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]], branch: Optional[str] = None) -> Optional[rt.Patch]: if not isinstance(source_fns, list): source_fns = [source_fns] @@ -290,8 +289,11 @@ def append_from( ray_options.max_parallelism = min(len(source_fns), ray_options.max_parallelism) - return _append_from(self._storage, source_fns, ray_options, - self._file_options) + op = RayAppendOp(self._storage.location, self._storage.metadata, + ray_options, self._file_options, branch) + op.write_from(source_fns) + + return op.finish() @StorageMixin.transactional def append_array_record(self, pattern: str, From 122a815b2482edd344b58a6549807a6167b0c750 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 31 Jan 2024 23:31:12 +0000 Subject: [PATCH 03/31] rebase1 --- python/src/space/core/datasets.py | 18 +++++++ python/src/space/core/ops/append.py | 3 -- python/src/space/core/runners.py | 14 +++--- python/src/space/core/storage.py | 63 ++++++++++++++++++------ python/src/space/ray/runners.py | 8 +-- python/tests/core/ops/test_delete.py | 4 +- python/tests/core/ops/test_read.py | 4 +- python/tests/core/test_runners.py | 73 ++++++++++++++++++++++++++++ python/tests/core/test_storage.py | 73 ++++++++++++++++++++++++++-- python/tests/ray/test_runners.py | 21 ++++++++ 10 files changed, 243 insertions(+), 38 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 6912354..1935dec 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -22,6 +22,7 @@ from space.core.ops.utils import FileOptions from space.core.options import JoinOptions, ReadOptions +from space.core.utils import errors from space.core.runners import LocalRunner from space.core.storage import Storage from space.core.transform.plans import LogicalPlanBuilder @@ -35,6 +36,7 @@ class Dataset(View): def __init__(self, storage: Storage): self._storage = storage + self._current_branche = "main" @property def storage(self) -> Storage: @@ -81,6 +83,22 @@ def remove_tag(self, tag: str): """Remove tag from a snapshot.""" self._storage.remove_tag(tag) + @property + def current_branch(self) -> str: + """Return the current branch.""" + return self._current_branch + + def add_branch(self, branch: str): + """Add branch to a snapshot.""" + self._storage.add_branch(branch=branch) + + def remove_branch(self, branch: str): + """Remove tag from a snapshot.""" + self._storage.remove_branch(branch) + + def set_current_branch(self, branch: str): + self.storage.set_current_branch(branch) + def local(self, file_options: Optional[FileOptions] = None) -> LocalRunner: """Get a runner that runs operations locally.""" return LocalRunner(self._storage, file_options) diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 4a9e443..a11e542 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -79,7 +79,6 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, file_options: FileOptions, - branch: Optional[str] = None, record_address_input: bool = False): """ Args: @@ -115,8 +114,6 @@ def __init__(self, self._record_manifest_writer = RecordManifestWriter(self._metadata_dir) self._patch = rt.Patch() - if branch: - self._patch.branch = branch def write(self, data: InputData) -> None: diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 219d6b4..6fe4d37 100644 --- 
a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -143,13 +143,13 @@ def __init__(self, self._file_options = file_options or FileOptions() @abstractmethod - def append(self, data: InputData, branch: Optional[str] = None) -> JobResult: + def append(self, data: InputData) -> JobResult: """Append data into the dataset.""" @abstractmethod def append_from( self, source_fns: Union[InputIteratorFn, - List[InputIteratorFn]], branch: Optional[str] = None) -> JobResult: + List[InputIteratorFn]]) -> JobResult: """Append data into the dataset from an iterator source. source_fns contains a list of no args functions that return iterators. It @@ -159,7 +159,7 @@ def append_from( @abstractmethod def append_array_record(self, pattern: str, - index_fn: ArrayRecordIndexFn, branch: Optional[str] = None ) -> JobResult: + index_fn: ArrayRecordIndexFn) -> JobResult: """Append data from ArrayRecord files without copying data. Args: @@ -238,18 +238,18 @@ def diff(self, ReadOptions(batch_size=batch_size)) @StorageMixin.transactional - def append(self, data: InputData, branch: Optional[str] = None) -> Optional[rt.Patch]: + def append(self, data: InputData) -> Optional[rt.Patch]: op = LocalAppendOp(self._storage.location, self._storage.metadata, - self._file_options, branch) + self._file_options) op.write(data) return op.finish() @StorageMixin.transactional def append_from( - self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]], branch: Optional[str] = None + self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]] ) -> Optional[rt.Patch]: op = LocalAppendOp(self._storage.location, self._storage.metadata, - self._file_options, branch) + self._file_options) if not isinstance(source_fns, list): source_fns = [source_fns] diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 5a127d4..53fea2d 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -50,8 +50,8 @@ # Initial snapshot ID. 
_INIT_SNAPSHOT_ID = 0 - -_RESERVED_REFERENCE = ["main"] +_MAIN_BRANCH = "main" +_RESERVED_REFERENCE = [_MAIN_BRANCH] # pylint: disable=too-many-public-methods @@ -62,7 +62,7 @@ class Storage(paths.StoragePathsMixin): """ def __init__(self, location: str, metadata_file: str, - metadata: meta.StorageMetadata): + metadata: meta.StorageMetadata, current_branch: Optional[str] = None): super().__init__(location) self._fs = create_fs(location) self._metadata = metadata @@ -78,13 +78,22 @@ def __init__(self, location: str, metadata_file: str, self._field_name_ids: Dict[str, int] = arrow.field_name_to_id_dict( self._physical_schema) - self._primary_keys = set(self._metadata.schema.primary_keys) + self._primary_keys = set(self._metadata.schema.primary_keys) + self._current_branch = current_branch + self._max_snapshot_id = max([ref[1].snapshot_id for ref in self._metadata.refs.items()] + [self._metadata.current_snapshot_id]) @property def metadata(self) -> meta.StorageMetadata: """Return the storage metadata.""" return self._metadata + @property + def current_branch(self) -> str: + """Return the current branch.""" + if not self._current_branch: + return _MAIN_BRANCH + return self._current_branch + @property def primary_keys(self) -> List[str]: """Return the storage primary keys.""" @@ -187,7 +196,7 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__(self.location, entry_point.metadata_file, metadata) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + self.__init__(self.location, entry_point.metadata_file, metadata, self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") return True @@ -214,9 +223,17 @@ def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: """Add tag to a snapshot""" self._add_reference(tag, meta.SnapshotReference.TAG, snapshot_id) - def add_branch(self, branch: str, snapshot_id: Optional[int] = None) -> None: + def add_branch(self, branch: str) -> None: """Add branch to a snapshot""" - self._add_reference(branch, meta.SnapshotReference.BRANCH, snapshot_id) + self._add_reference(branch, meta.SnapshotReference.BRANCH, None) + + def set_current_branch(self, branch: str): + """Set current branch for the snapshot.""" + if branch != "main": + snapshot_ref = self._lookup_reference(branch) + if snapshot_ref.type != meta.SnapshotReference.BRANCH: + raise errors.UserInputError("{branch} is not a branch.") + self._current_branch = branch def _add_reference(self, reference_name: str, @@ -255,6 +272,8 @@ def remove_tag(self, tag: str) -> None: def remove_branch(self, branch: str) -> None: """Remove tag from metadata""" + if branch == self._current_branch: + raise errors.UserInputError("Cannot remove the current branch.") self._remove_reference(branch, meta.SnapshotReference.BRANCH) def _remove_reference(self, reference_name:str, reference_type: meta.SnapshotReference.ReferenceType)-> None: @@ -270,7 +289,7 @@ def _remove_reference(self, reference_name:str, reference_type: meta.SnapshotRef self._metadata = new_metadata self._metadata_file = new_metadata_path - def commit(self, patch: rt.Patch) -> None: + def commit(self, patch: rt.Patch, branch:str) -> None: """Commit changes to the storage. TODO: only support a single writer; to ensure atomicity in commit by @@ -278,14 +297,19 @@ def commit(self, patch: rt.Patch) -> None: Args: patch: a patch describing changes made to the storage. 
+ branch: the branch this commit is writing to. """ new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) new_snapshot_id = self._next_snapshot_id() - if patch.branch: - branch_snapshot_id = self._lookup_reference(patch.branch) - current_snapshot = self.snapshot(branch_snapshot_id) - new_metadata.refs[patch.branch].snapshot_id = new_snapshot_id + if branch and branch != _MAIN_BRANCH: + branch_snapshot = self._lookup_reference(branch) + # Block the case where the branch is deleted and a tag reuses its name during commit + # TODO: move this check out of commit() + if branch_snapshot.type != meta.SnapshotReference.BRANCH: + raise errors.UserInputError(f"Branch {branch} no longer exists.") + current_snapshot = self.snapshot(branch_snapshot.snapshot_id) + new_metadata.refs[branch].snapshot_id = new_snapshot_id else: new_metadata.current_snapshot_id = new_snapshot_id current_snapshot = self.snapshot() new_metadata.last_update_time.CopyFrom(proto_now()) new_metadata_path = self.new_metadata_path() @@ -316,6 +340,7 @@ def commit(self, patch: rt.Patch) -> None: self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata self._metadata_file = new_metadata_path + print(self._metadata) def data_files(self, filter_: Optional[pc.Expression] = None, @@ -444,7 +469,8 @@ def _initialize_files(self, metadata_path: str) -> None: raise errors.StorageExistError(str(e)) from None def _next_snapshot_id(self) -> int: - return self._metadata.current_snapshot_id + 1 + self._max_snapshot_id = self._max_snapshot_id + 1 + return self._max_snapshot_id def _write_metadata( self, @@ -500,7 +526,7 @@ def __init__(self, storage: Storage): self._txn_id = uuid_() # The storage snapshot ID when the transaction starts. self._snapshot_id: Optional[int] = None - + self._branch = storage.current_branch self._result: Optional[JobResult] = None def commit(self, patch: Optional[rt.Patch]) -> None: @@ -510,7 +536,10 @@ def commit(self, patch: Optional[rt.Patch]) -> None: # Check that no other commit has taken place. assert self._snapshot_id is not None self._storage.reload() - if self._snapshot_id != self._storage.metadata.current_snapshot_id: + current_snapshot_id = self._storage.metadata.current_snapshot_id + if self._branch != _MAIN_BRANCH: + current_snapshot_id = self._storage.version_to_snapshot_id(self._branch) + if self._snapshot_id != current_snapshot_id: self._result = JobResult( JobResult.State.FAILED, None, "Abort commit because the storage has been modified.") @@ -520,7 +549,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: self._result = JobResult(JobResult.State.SKIPPED) return - self._storage.commit(patch) + self._storage.commit(patch, self._branch) self._result = JobResult(JobResult.State.SUCCEEDED, patch.storage_statistics_update) @@ -537,6 +566,8 @@ def __enter__(self) -> Transaction: # mutations.
self._storage.reload() self._snapshot_id = self._storage.metadata.current_snapshot_id + if self._branch != _MAIN_BRANCH: + self._snapshot_id = self._storage._lookup_reference(self._branch).snapshot_id logging.info(f"Start transaction {self._txn_id}") return self diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 3696153..7bd7dd0 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -273,15 +273,15 @@ def __init__(self, self._ray_options = ray_options or RayOptions() @StorageMixin.transactional - def append(self, data: InputData, branch: Optional[str] = None) -> Optional[rt.Patch]: + def append(self, data: InputData) -> Optional[rt.Patch]: op = RayAppendOp(self._storage.location, self._storage.metadata, - self._ray_options, self._file_options, branch) + self._ray_options, self._file_options) op.write(data) return op.finish() @StorageMixin.transactional def append_from( - self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]], branch: Optional[str] = None) -> Optional[rt.Patch]: + self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: if not isinstance(source_fns, list): source_fns = [source_fns] @@ -290,7 +290,7 @@ def append_from( ray_options.max_parallelism) op = RayAppendOp(self._storage.location, self._storage.metadata, - ray_options, self._file_options, branch) + ray_options, self._file_options) op.write_from(source_fns) return op.finish() diff --git a/python/tests/core/ops/test_delete.py b/python/tests/core/ops/test_delete.py index db616d5..9825a0b 100644 --- a/python/tests/core/ops/test_delete.py +++ b/python/tests/core/ops/test_delete.py @@ -42,7 +42,7 @@ def test_delete_all_types(self, tmp_path, all_types_schema, for batch in input_data: append_op.write(batch) - storage.commit(append_op.finish()) + storage.commit(append_op.finish(), "main") old_data_files = storage.data_files() delete_op = FileSetDeleteOp( @@ -54,7 +54,7 @@ def test_delete_all_types(self, tmp_path, all_types_schema, _default_file_options) patch = delete_op.delete() assert patch is not None - storage.commit(patch) + storage.commit(patch, "main") # Verify storage metadata after patch. 
new_data_files = storage.data_files() diff --git a/python/tests/core/ops/test_read.py b/python/tests/core/ops/test_read.py index f6c053c..c0c3cec 100644 --- a/python/tests/core/ops/test_read.py +++ b/python/tests/core/ops/test_read.py @@ -42,7 +42,7 @@ def test_read_all_types(self, tmp_path, all_types_schema, for batch in input_data: append_op.write(batch) - storage.commit(append_op.finish()) + storage.commit(append_op.finish(), "main") read_op = FileSetReadOp(str(location), storage.metadata, storage.data_files()) @@ -80,7 +80,7 @@ def test_read_with_record_filters(self, tmp_path, record_fields_schema, for batch in input_data: append_op.write(batch) - storage.commit(append_op.finish()) + storage.commit(append_op.finish(), "main") data_files = storage.data_files() read_op = FileSetReadOp(str(location), storage.metadata, data_files) diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py index c094bb0..8f4cb85 100644 --- a/python/tests/core/test_runners.py +++ b/python/tests/core/test_runners.py @@ -200,6 +200,79 @@ def test_add_read_remove_tag(self, sample_dataset): assert "Version insert1 is not found" in str(excinfo.value) + def test_concurrent_write_to_different_branch(self, sample_dataset): + ds = sample_dataset + ds.add_branch("exp1") + local_runner = ds.local() + lock1 = threading.Lock() + lock2 = threading.Lock() + lock1.acquire() + lock2.acquire() + + sample_data = _generate_data([1, 2]) + + def make_iter(): + yield sample_data + lock2.release() + lock1.acquire() + yield sample_data + lock1.release() + + job_result = [None] + + def append_data(): + job_result[0] = local_runner.append_from(make_iter) + + t = threading.Thread(target=append_data) + t.start() + lock2.acquire() + ds.set_current_branch("exp1") + local_runner.append(sample_data) + lock2.release() + lock1.release() + t.join() + + assert local_runner.read_all() == pa.concat_tables( + [sample_data, sample_data]) + assert local_runner.read_all(version="exp1") == pa.concat_tables( + [sample_data]) + + def test_add_read_with_branch(self, sample_dataset): + ds = sample_dataset + local_runner = ds.local() + + sample_data1 = _generate_data([1, 2]) + local_runner.append(sample_data1) + + ds.add_branch("exp1") + + assert local_runner.read_all() == sample_data1 + + create_time0 = datetime.utcfromtimestamp( + ds.storage.metadata.snapshots[0].create_time.seconds).replace( + tzinfo=pytz.utc) + create_time1 = datetime.utcfromtimestamp( + ds.storage.metadata.snapshots[1].create_time.seconds).replace( + tzinfo=pytz.utc) + assert ds.versions().to_pydict() == { + "snapshot_id": [1, 0], + "tag_or_branch": ["exp1", None], + "create_time": [create_time1, create_time0] + } + + sample_data2 = _generate_data([3, 4]) + local_runner.append(sample_data2) + + ds.set_current_branch("exp1") + + sample_data3 = _generate_data([5, 6]) + local_runner.append(sample_data3) + + assert local_runner.read_all() == pa.concat_tables( + [sample_data1, sample_data2]) + assert local_runner.read_all(version="exp1") == pa.concat_tables( + [sample_data1, sample_data3]) + def test_dataset_with_file_type(self, tmp_path): schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("file", File(directory="test_folder"))]) diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py index cc7efc0..41b2f4e 100644 --- a/python/tests/core/test_storage.py +++ b/python/tests/core/test_storage.py @@ -137,7 +137,7 @@ def test_commit(self, tmp_path): record_uncompressed_bytes=30) patch = rt.Patch(addition=added_manifest_files, 
storage_statistics_update=added_storage_statistics) - storage.commit(patch) + storage.commit(patch,"main") assert storage.snapshot(0) is not None new_snapshot = storage.snapshot(1) @@ -155,7 +155,7 @@ def test_commit(self, tmp_path): index_manifest_files=["data/index_manifest1"], record_manifest_files=["data/record_manifest1"]), storage_statistics_update=added_storage_statistics2) - storage.commit(patch) + storage.commit(patch,"main") new_snapshot = storage.snapshot(2) assert new_snapshot.manifest_files == meta.ManifestFiles( @@ -177,7 +177,7 @@ def test_commit(self, tmp_path): index_compressed_bytes=-10, index_uncompressed_bytes=-20, record_uncompressed_bytes=-30)) - storage.commit(patch) + storage.commit(patch, "main") new_snapshot = storage.snapshot(3) assert new_snapshot.manifest_files.index_manifest_files == [ "data/index_manifest1" @@ -202,7 +202,7 @@ def create_index_manifest_writer(): def commit_add_index_manifest(manifest_path: str): patch = rt.Patch(addition=meta.ManifestFiles( index_manifest_files=[storage.short_path(manifest_path)])) - storage.commit(patch) + storage.commit(patch, "main") manifest_writer = create_index_manifest_writer() manifest_writer.write( @@ -375,3 +375,68 @@ def test_tags(self, tmp_path): "tag_or_branch": ["tag2"], "create_time": [create_time1] } + + def test_branches(self, tmp_path): + location = tmp_path / "dataset" + storage = Storage.create(location=str(location), + schema=_SCHEMA, + primary_keys=["int64"], + record_fields=[]) + + create_time1 = datetime.utcfromtimestamp( + storage.metadata.snapshots[0].create_time.seconds).replace( + tzinfo=pytz.utc) + assert storage.versions().to_pydict() == { + "snapshot_id": [0], + "tag_or_branch": [None], + "create_time": [create_time1] + } + + storage.add_branch("branch1") + + with pytest.raises(errors.UserInputError, match=r".*already exist.*"): + storage.add_branch("branch1") + + storage.add_branch("branch2") + + snapshot_id1 = storage.version_to_snapshot_id("branch1") + snapshot_id2 = storage.version_to_snapshot_id("branch2") + + metadata = storage.metadata + assert len(metadata.refs) == 2 + assert snapshot_id1 == metadata.current_snapshot_id + assert snapshot_id2 == metadata.current_snapshot_id + + versions = storage.versions().to_pydict() + versions["tag_or_branch"].sort() + assert versions == { + "snapshot_id": [0, 0], + "tag_or_branch": ["branch1", "branch2"], + "create_time": [create_time1, create_time1] + } + + storage.remove_branch("branch1") + + with pytest.raises(errors.UserInputError, match=r".*not found.*"): + storage.remove_branch("branch1") + assert len(storage.metadata.refs) == 1 + + patch = rt.Patch(addition=meta.ManifestFiles( + index_manifest_files=["data/index_manifest1"], + record_manifest_files=["data/record_manifest1"]), + storage_statistics_update=meta.StorageStatistics( + num_rows=100, + index_compressed_bytes=100, + index_uncompressed_bytes=200, + record_uncompressed_bytes=300)) + storage.commit(patch, "branch2") + + create_time2 = datetime.utcfromtimestamp( + storage.metadata.snapshots[1].create_time.seconds).replace( + tzinfo=pytz.utc) + + assert storage.versions().to_pydict() == { + "snapshot_id": [1, 0], + "tag_or_branch": ["branch2", None], + "create_time": [create_time2,create_time1] + } diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 88ec2a9..10d0035 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -159,6 +159,27 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block, }) 
]).sort_by("int64")) + # Test insert. + sample_dataset.add_branch("branch") + sample_dataset.set_current_branch("branch") + result = runner.insert(generate_data([7, 12])) + assert result.state == JobResult.State.FAILED + assert "Primary key to insert already exist" in result.error_message + + runner.upsert(generate_data([7, 12])) + assert_equal( + runner.read_all(version="branch").sort_by("int64"), + pa.concat_tables([ + input_data0, input_data1, input_data2, input_data3, input_data4, + generate_data([12]) + ]).sort_by("int64")) + + # Test delete. + runner.delete(pc.field("int64") < 10) + assert_equal( + runner.read_all(version="branch").sort_by("int64"), + pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) + @pytest.mark.parametrize("enable_row_range_block", [(True,), (False,)]) def test_read_batch_size(self, tmp_path, sample_schema, enable_row_range_block): From 7761d95bafccb44d74e761984252d43c3e37af6e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Thu, 1 Feb 2024 01:59:47 +0000 Subject: [PATCH 04/31] fix insert and upsert --- python/src/space/core/storage.py | 6 +- python/src/space/ray/ops/append.py | 5 +- python/tests/ray/test_runners.py | 169 +++++++++++++---------------- 3 files changed, 82 insertions(+), 98 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 53fea2d..9c4a326 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -123,7 +123,10 @@ def snapshot(self, snapshot_id: Optional[int] = None) -> meta.Snapshot: if not specified. """ if snapshot_id is None: - snapshot_id = self._metadata.current_snapshot_id + if self.current_branch == _MAIN_BRANCH: + snapshot_id = self._metadata.current_snapshot_id + else: + snapshot_id = self.version_to_snapshot_id(self.current_branch) if snapshot_id in self._metadata.snapshots: return self._metadata.snapshots[snapshot_id] @@ -340,7 +343,6 @@ def commit(self, patch: rt.Patch, branch:str) -> None: self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata self._metadata_file = new_metadata_path - print(self._metadata) def data_files(self, filter_: Optional[pc.Expression] = None, diff --git a/python/src/space/ray/ops/append.py b/python/src/space/ray/ops/append.py index 2e29a40..cc7bc60 100644 --- a/python/src/space/ray/ops/append.py +++ b/python/src/space/ray/ops/append.py @@ -47,7 +47,7 @@ def __init__(self, self._ray_options = ray_options self._actors = [ _AppendActor.remote( # type: ignore[attr-defined] # pylint: disable=no-member - location, metadata, file_options, branch, record_address_input) + location, metadata, file_options, record_address_input) for _ in range(self._ray_options.max_parallelism) ] @@ -102,9 +102,8 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, file_options: FileOptions, - branch: Optional[str] = None, record_address_input: bool = False): - self._op = LocalAppendOp(location, metadata, file_options, branch, + self._op = LocalAppendOp(location, metadata, file_options, record_address_input) def write_from(self, source_fn: InputIteratorFn) -> None: diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 10d0035..f6b3337 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -83,102 +83,85 @@ class TestRayReadWriteRunner: ]) def test_write_read_dataset(self, sample_dataset, enable_row_range_block, batch_size): + sample_dataset.add_branch("branch1") runner = sample_dataset.ray(ray_options=RayOptions( max_parallelism=4, 
enable_row_range_block=enable_row_range_block)) + for branch in ["branch1", "main"]: + sample_dataset.set_current_branch(branch) + # Test append. + input_data0 = generate_data([1, 2, 3]) + runner.append(input_data0) + + assert_equal( + runner.read_all(batch_size=batch_size, version=branch if branch !="main" else None).sort_by("int64"), + input_data0.sort_by("int64")) + + input_data1 = generate_data([4, 5]) + input_data2 = generate_data([6, 7]) + input_data3 = generate_data([8]) + input_data4 = generate_data([9, 10, 11]) + + runner.append_from([ + lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]), + lambda: iter([input_data4]) + ]) + + assert_equal( + runner.read_all(batch_size=batch_size, version=branch if branch !="main" else None).sort_by("int64"), + pa.concat_tables( + [input_data0, input_data1, input_data2, input_data3, + input_data4]).sort_by("int64")) + + # Test insert. + result = runner.insert(generate_data([7, 12])) + assert result.state == JobResult.State.FAILED + assert "Primary key to insert already exist" in result.error_message + + runner.upsert(generate_data([7, 12])) + assert_equal( + runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), + pa.concat_tables([ + input_data0, input_data1, input_data2, input_data3, input_data4, + generate_data([12]) + ]).sort_by("int64")) + + # Test delete. + runner.delete(pc.field("int64") < 10) + assert_equal( + runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), + pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) + + # Test reading views. + view = sample_dataset.map_batches(fn=_sample_map_udf, + output_schema=sample_dataset.schema, + output_record_fields=["binary"]) + assert_equal( + view.ray(DEFAULT_RAY_OPTIONS).read_all( + batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ + pa.Table.from_pydict({ + "int64": [10, 11, 12], + "float64": [v / 10 + 1 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + }) + ]).sort_by("int64")) + + # Test a transform on a view. + transform_on_view = view.map_batches(fn=_sample_map_udf, + output_schema=view.schema, + output_record_fields=["binary"]) + assert_equal( + transform_on_view.ray(DEFAULT_RAY_OPTIONS).read_all( + batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ + pa.Table.from_pydict({ + "int64": [10, 11, 12], + "float64": [v / 10 + 2 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + }) + ]).sort_by("int64")) + - # Test append. - input_data0 = generate_data([1, 2, 3]) - runner.append(input_data0) - - assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - input_data0.sort_by("int64")) - - input_data1 = generate_data([4, 5]) - input_data2 = generate_data([6, 7]) - input_data3 = generate_data([8]) - input_data4 = generate_data([9, 10, 11]) - - runner.append_from([ - lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]), - lambda: iter([input_data4]) - ]) - - assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - pa.concat_tables( - [input_data0, input_data1, input_data2, input_data3, - input_data4]).sort_by("int64")) - - # Test insert. 
- result = runner.insert(generate_data([7, 12])) - assert result.state == JobResult.State.FAILED - assert "Primary key to insert already exist" in result.error_message - - runner.upsert(generate_data([7, 12])) - assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - pa.concat_tables([ - input_data0, input_data1, input_data2, input_data3, input_data4, - generate_data([12]) - ]).sort_by("int64")) - - # Test delete. - runner.delete(pc.field("int64") < 10) - assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) - - # Test reading views. - view = sample_dataset.map_batches(fn=_sample_map_udf, - output_schema=sample_dataset.schema, - output_record_fields=["binary"]) - assert_equal( - view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), - pa.concat_tables([ - pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 1 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] - }) - ]).sort_by("int64")) - - # Test a transform on a view. - transform_on_view = view.map_batches(fn=_sample_map_udf, - output_schema=view.schema, - output_record_fields=["binary"]) - assert_equal( - transform_on_view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), - pa.concat_tables([ - pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 2 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] - }) - ]).sort_by("int64")) - - # Test insert. - sample_dataset.add_branch("branch") - sample_dataset.set_current_branch("branch") - result = runner.insert(generate_data([7, 12])) - assert result.state == JobResult.State.FAILED - assert "Primary key to insert already exist" in result.error_message - - runner.upsert(generate_data([7, 12])) - assert_equal( - runner.read_all(version="branch").sort_by("int64"), - pa.concat_tables([ - input_data0, input_data1, input_data2, input_data3, input_data4, - generate_data([12]) - ]).sort_by("int64")) - - # Test delete. 
- runner.delete(pc.field("int64") < 10) - assert_equal( - runner.read_all(version="branch").sort_by("int64"), - pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) @pytest.mark.parametrize("enable_row_range_block", [(True,), (False,)]) def test_read_batch_size(self, tmp_path, sample_schema, From 02e4c4ac2cbf7820ba456e138f968d5d5567748f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 6 Feb 2024 06:58:22 +0000 Subject: [PATCH 05/31] fix bug in snapshot resolution --- python/src/space/core/datasets.py | 7 +------ python/src/space/core/runners.py | 2 +- python/src/space/core/storage.py | 18 +++++++++++------- python/src/space/ray/runners.py | 3 ++- python/tests/core/test_runners.py | 2 ++ 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 1935dec..b51714a 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -36,7 +36,6 @@ class Dataset(View): def __init__(self, storage: Storage): self._storage = storage - self._current_branche = "main" @property def storage(self) -> Storage: @@ -83,11 +82,6 @@ def remove_tag(self, tag: str): """Remove tag from a snapshot.""" self._storage.remove_tag(tag) - @property - def current_branch(self) -> str: - """Return the current branch.""" - return self._current_branch - def add_branch(self, branch: str): """Add branch to a snapshot.""" self._storage.add_branch(branch=branch) @@ -97,6 +91,7 @@ def remove_branch(self, branch: str): self._storage.remove_branch(branch) def set_current_branch(self, branch: str): + """Set current branch for the dataset.""" self.storage.set_current_branch(branch) def local(self, file_options: Optional[FileOptions] = None) -> LocalRunner: diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 6fe4d37..855e459 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -170,7 +170,7 @@ def append_array_record(self, pattern: str, """ @abstractmethod - def append_parquet(self, pattern: str, branch: Optional[str] = None) -> JobResult: + def append_parquet(self, pattern: str) -> JobResult: """Append data from Parquet files without copying data. 
Args: diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 9c4a326..4fc9da0 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -62,7 +62,8 @@ class Storage(paths.StoragePathsMixin): """ def __init__(self, location: str, metadata_file: str, - metadata: meta.StorageMetadata, current_branch: Optional[str] = None): + metadata: meta.StorageMetadata, + current_branch: Optional[str] = None): super().__init__(location) self._fs = create_fs(location) self._metadata = metadata @@ -78,9 +79,11 @@ def __init__(self, location: str, metadata_file: str, self._field_name_ids: Dict[str, int] = arrow.field_name_to_id_dict( self._physical_schema) - self._primary_keys = set(self._metadata.schema.primary_keys) + self._primary_keys = set(self._metadata.schema.primary_keys) self._current_branch = current_branch - self._max_snapshot_id = max([ref[1].snapshot_id for ref in self._metadata.refs.items()] + [self._metadata.current_snapshot_id]) + self._max_snapshot_id = max( + [ref[1].snapshot_id for ref in self._metadata.refs.items()] + + [self._metadata.current_snapshot_id]) @property def metadata(self) -> meta.StorageMetadata: @@ -225,7 +228,7 @@ def _lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: """Add tag to a snapshot""" self._add_reference(tag, meta.SnapshotReference.TAG, snapshot_id) - + def add_branch(self, branch: str) -> None: """Add branch to a snapshot""" self._add_reference(branch, meta.SnapshotReference.BRANCH, None) @@ -256,7 +259,8 @@ def _add_reference(self, raise errors.UserInputError("{reference_name} is reserved") if reference_name in self._metadata.refs: - raise errors.VersionAlreadyExistError(f"Reference {reference_name} already exist") + raise errors.VersionAlreadyExistError( + f"Reference {reference_name} already exist") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) @@ -272,7 +276,7 @@ def _add_reference(self, def remove_tag(self, tag: str) -> None: """Remove tag from metadata""" self._remove_reference(tag, meta.SnapshotReference.TAG) - + def remove_branch(self, branch: str) -> None: """Remove tag from metadata""" if branch == self._current_branch: @@ -315,7 +319,7 @@ def commit(self, patch: rt.Patch, branch:str) -> None: new_metadata.refs[branch].snapshot_id = new_snapshot_id else: new_metadata.current_snapshot_id = new_snapshot_id - current_snapshot = self.snapshot() + current_snapshot = self.snapshot(self._metadata.current_snapshot_id) new_metadata.last_update_time.CopyFrom(proto_now()) new_metadata_path = self.new_metadata_path() diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 7bd7dd0..6689da4 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -281,7 +281,8 @@ def append(self, data: InputData) -> Optional[rt.Patch]: @StorageMixin.transactional def append_from( - self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: + self, + source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: if not isinstance(source_fns, list): source_fns = [source_fns] diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py index 8f4cb85..9a1c7bb 100644 --- a/python/tests/core/test_runners.py +++ b/python/tests/core/test_runners.py @@ -232,6 +232,7 @@ def append_data(): lock1.release() t.join() + ds.set_current_branch("main") assert local_runner.read_all() == 
pa.concat_tables( [sample_data, sample_data]) assert local_runner.read_all(version="exp1") == pa.concat_tables( @@ -268,6 +269,7 @@ def test_add_read_with_branch(self, sample_dataset): sample_data3 = _generate_data([5, 6]) local_runner.append(sample_data3) + ds.set_current_branch("main") assert local_runner.read_all() == pa.concat_tables( [sample_data1, sample_data2]) assert local_runner.read_all(version="exp1") == pa.concat_tables( From 856780da24dfa89e5ec83e225b3866fc93978eb2 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 07:46:00 +0000 Subject: [PATCH 06/31] fix lint --- python/src/space/core/datasets.py | 3 +-- python/src/space/core/storage.py | 19 ++++++++++++------- python/src/space/ray/ops/append.py | 1 - python/src/space/ray/runners.py | 4 ++-- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 0a71c3c..1378553 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -20,9 +20,8 @@ import pyarrow as pa from substrait.algebra_pb2 import ReadRel, Rel -from space.core.ops.utils import FileOptions, JoinOptions, ReadOptions +from space.core.ops.utils import FileOptions from space.core.options import JoinOptions, ReadOptions -from space.core.utils import errors from space.core.runners import LocalRunner from space.core.storage import Storage, Version diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 4fc9da0..a0b39b1 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -62,7 +62,7 @@ class Storage(paths.StoragePathsMixin): """ def __init__(self, location: str, metadata_file: str, - metadata: meta.StorageMetadata, + metadata: meta.StorageMetadata, current_branch: Optional[str] = None): super().__init__(location) self._fs = create_fs(location) @@ -202,7 +202,10 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__(self.location, entry_point.metadata_file, metadata, self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + self.__init__(self.location, + entry_point.metadata_file, + metadata, + self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") return True @@ -283,14 +286,16 @@ def remove_branch(self, branch: str) -> None: raise errors.UserInputError("Cannot remove the current branch.") self._remove_reference(branch, meta.SnapshotReference.BRANCH) - def _remove_reference(self, reference_name:str, reference_type: meta.SnapshotReference.ReferenceType)-> None: - if (reference_name not in self._metadata.refs or - self._metadata.refs[reference_name].type != reference_type): - raise errors.VersionNotFoundError(f"{reference_type} {reference_name} is not found") + def _remove_reference(self, + ref_name: str, + ref_type: meta.SnapshotReference.ReferenceType)-> None: + if (ref_name not in self._metadata.refs or + self._metadata.refs[ref_name].type != ref_type): + raise errors.VersionNotFoundError(f"Reference {ref_name} is not found") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) - del new_metadata.refs[reference_name] + del new_metadata.refs[ref_name] new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata diff --git a/python/src/space/ray/ops/append.py 
b/python/src/space/ray/ops/append.py index b5f5834..406c342 100644 --- a/python/src/space/ray/ops/append.py +++ b/python/src/space/ray/ops/append.py @@ -38,7 +38,6 @@ def __init__(self, metadata: meta.StorageMetadata, ray_options: RayOptions, file_options: FileOptions, - branch: Optional[str] = None, record_address_input: bool = False): """ Args: diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 5579970..76d22f1 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -280,8 +280,8 @@ def append(self, data: InputData) -> Optional[rt.Patch]: @StorageMixin.transactional def append_from( - self, - source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: + self, + source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: # pylint: disable=line-too-long if not isinstance(source_fns, list): source_fns = [source_fns] From 0a15b65cf11d1cb988fc456b05656d1fb96ea80d Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 07:56:39 +0000 Subject: [PATCH 07/31] fix lint take 2 --- python/src/space/core/datasets.py | 4 ++-- python/src/space/core/storage.py | 17 ++++++++--------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 1378553..1537353 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -20,8 +20,8 @@ import pyarrow as pa from substrait.algebra_pb2 import ReadRel, Rel -from space.core.ops.utils import FileOptions -from space.core.options import JoinOptions, ReadOptions + +from space.core.options import FileOptions, JoinOptions, ReadOptions from space.core.runners import LocalRunner from space.core.storage import Storage, Version diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index a0b39b1..9248b66 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -202,9 +202,7 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__(self.location, - entry_point.metadata_file, - metadata, + self.__init__(self.location, entry_point.metadata_file, metadata, # type: ignore[misc] # pylint: disable=unnecessary-dunder-call self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") @@ -219,9 +217,9 @@ def version_to_snapshot_id(self, version: Version) -> int: if isinstance(version, int): return version - return self._lookup_reference(version).snapshot_id + return self.lookup_reference(version).snapshot_id - def _lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: + def lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: """Lookup a snapshot reference.""" if tag_or_branch in self._metadata.refs: return self._metadata.refs[tag_or_branch] @@ -239,7 +237,7 @@ def add_branch(self, branch: str) -> None: def set_current_branch(self, branch: str): """Set current branch for the snapshot.""" if branch != "main": - snapshot_ref = self._lookup_reference(branch) + snapshot_ref = self.lookup_reference(branch) if snapshot_ref.type != meta.SnapshotReference.BRANCH: raise errors.UserInputError("{branch} is not a branch.") self._current_branch = branch @@ -286,7 +284,7 @@ def remove_branch(self, branch: str) -> None: raise errors.UserInputError("Cannot remove the current branch.") self._remove_reference(branch, 
meta.SnapshotReference.BRANCH) - def _remove_reference(self, + def _remove_reference(self, ref_name: str, ref_type: meta.SnapshotReference.ReferenceType)-> None: if (ref_name not in self._metadata.refs or @@ -315,7 +313,7 @@ def commit(self, patch: rt.Patch, branch:str) -> None: new_metadata.CopyFrom(self._metadata) new_snapshot_id = self._next_snapshot_id() if branch and branch != _MAIN_BRANCH: - branch_snapshot = self._lookup_reference(branch) + branch_snapshot = self.lookup_reference(branch) # To block the case delete branch and add a tag during commit # TODO: move this check out of commit() if branch_snapshot.type != meta.SnapshotReference.BRANCH: @@ -578,7 +576,8 @@ def __enter__(self) -> Transaction: self._storage.reload() self._snapshot_id = self._storage.metadata.current_snapshot_id if self._branch != _MAIN_BRANCH: - self._snapshot_id = self._storage._lookup_reference(self._branch).snapshot_id + self._snapshot_id = self._storage.lookup_reference( + self._branch).snapshot_id logging.info(f"Start transaction {self._txn_id}") return self From c3f6d496373ab1c3bfc07a4c0670b64eb99f129e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 08:09:33 +0000 Subject: [PATCH 08/31] fix lint for test --- python/tests/core/test_runners.py | 2 +- python/tests/ray/test_runners.py | 131 +++++++++++++++--------------- 2 files changed, 66 insertions(+), 67 deletions(-) diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py index 9a1c7bb..8e5e573 100644 --- a/python/tests/core/test_runners.py +++ b/python/tests/core/test_runners.py @@ -246,7 +246,7 @@ def test_add_read_with_branch(self, sample_dataset): local_runner.append(sample_data1) ds.add_branch("exp1") - + assert local_runner.read_all() == sample_data1 create_time0 = datetime.utcfromtimestamp( diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index e85249a..9cfaeb1 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -86,82 +86,81 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block, sample_dataset.add_branch("branch1") runner = sample_dataset.ray(ray_options=RayOptions( max_parallelism=4, enable_row_range_block=enable_row_range_block)) + input_data0 = generate_data([1, 2, 3]) + input_data1 = generate_data([4, 5]) + input_data2 = generate_data([6, 7]) + input_data3 = generate_data([8]) + input_data4 = generate_data([9, 10, 11]) for branch in ["branch1", "main"]: - sample_dataset.set_current_branch(branch) - # Test append. - input_data0 = generate_data([1, 2, 3]) - runner.append(input_data0) - - assert_equal( - runner.read_all(batch_size=batch_size, version=branch if branch !="main" else None).sort_by("int64"), - input_data0.sort_by("int64")) - - input_data1 = generate_data([4, 5]) - input_data2 = generate_data([6, 7]) - input_data3 = generate_data([8]) - input_data4 = generate_data([9, 10, 11]) - - runner.append_from([ - lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]), - lambda: iter([input_data4]) + sample_dataset.set_current_branch(branch) + # Test append. 
+ runner.append(input_data0) + + assert_equal( + runner.read_all(batch_size=batch_size).sort_by("int64"), + input_data0.sort_by("int64")) + + + runner.append_from([ + lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]), + lambda: iter([input_data4]) ]) - assert_equal( - runner.read_all(batch_size=batch_size, version=branch if branch !="main" else None).sort_by("int64"), + assert_equal( + runner.read_all(batch_size=batch_size).sort_by("int64"), pa.concat_tables( [input_data0, input_data1, input_data2, input_data3, input_data4]).sort_by("int64")) # Test insert. - result = runner.insert(generate_data([7, 12])) - assert result.state == JobResult.State.FAILED - assert "Primary key to insert already exist" in result.error_message - - runner.upsert(generate_data([7, 12])) - assert_equal( - runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), - pa.concat_tables([ - input_data0, input_data1, input_data2, input_data3, input_data4, - generate_data([12]) - ]).sort_by("int64")) - - # Test delete. - runner.delete(pc.field("int64") < 10) - assert_equal( - runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), - pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) - - # Test reading views. - view = sample_dataset.map_batches(fn=_sample_map_udf, - output_schema=sample_dataset.schema, - output_record_fields=["binary"]) - assert_equal( - view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), - pa.concat_tables([ - pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 1 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] - }) - ]).sort_by("int64")) - - # Test a transform on a view. - transform_on_view = view.map_batches(fn=_sample_map_udf, - output_schema=view.schema, - output_record_fields=["binary"]) - assert_equal( - transform_on_view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), + result = runner.insert(generate_data([7, 12])) + assert result.state == JobResult.State.FAILED + assert "Primary key to insert already exist" in result.error_message + + runner.upsert(generate_data([7, 12])) + assert_equal( + runner.read_all(batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ + input_data0, input_data1, input_data2, input_data3, input_data4, + generate_data([12]) + ]).sort_by("int64")) + + # Test delete. + runner.delete(pc.field("int64") < 10) + assert_equal( + runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), + pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) + + # Test reading views. + view = sample_dataset.map_batches(fn=_sample_map_udf, + output_schema=sample_dataset.schema, + output_record_fields=["binary"]) + + assert_equal( + view.ray(DEFAULT_RAY_OPTIONS).read_all( + batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ + pa.Table.from_pydict({ + "int64": [10, 11, 12], + "float64": [v / 10 + 1 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + }) + ]).sort_by("int64")) + + # Test a transform on a view. 
+ transform_on_view = view.map_batches(fn=_sample_map_udf, + output_schema=view.schema, + output_record_fields=["binary"]) + assert_equal( + transform_on_view.ray(DEFAULT_RAY_OPTIONS).read_all( + batch_size=batch_size).sort_by("int64"), pa.concat_tables([ - pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 2 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + pa.Table.from_pydict({ + "int64": [10, 11, 12], + "float64": [v / 10 + 2 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] }) - ]).sort_by("int64")) - - + ]).sort_by("int64")) @pytest.mark.parametrize("enable_row_range_block", [(True,), (False,)]) def test_read_batch_size(self, tmp_path, sample_schema, From 926e3d485c92bf72fcfce56a8c58014e7b4e4b1d Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 08:12:45 +0000 Subject: [PATCH 09/31] fix lint for test2 --- python/tests/ray/test_runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 9cfaeb1..63581e2 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -128,7 +128,7 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block, # Test delete. runner.delete(pc.field("int64") < 10) assert_equal( - runner.read_all(batch_size=batch_size,version=branch if branch !="main" else None).sort_by("int64"), + runner.read_all(batch_size=batch_size).sort_by("int64"), pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) # Test reading views. From a64cc8332b069221671c67ac6d51e6fd94fb5e9c Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 08:26:13 +0000 Subject: [PATCH 10/31] fix type declaration --- python/src/space/core/storage.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 9248b66..58e604d 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -228,11 +228,13 @@ def lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: """Add tag to a snapshot""" - self._add_reference(tag, meta.SnapshotReference.TAG, snapshot_id) + self._add_reference( + tag, meta.SnapshotReference.TAG, snapshot_id) def add_branch(self, branch: str) -> None: """Add branch to a snapshot""" - self._add_reference(branch, meta.SnapshotReference.BRANCH, None) + self._add_reference( + branch, meta.SnapshotReference.BRANCH, None) def set_current_branch(self, branch: str): """Set current branch for the snapshot.""" @@ -242,10 +244,9 @@ def set_current_branch(self, branch: str): raise errors.UserInputError("{branch} is not a branch.") self._current_branch = branch - def _add_reference(self, - reference_name: str, - reference_type: meta.SnapshotReference.ReferenceType, - snapshot_id: Optional[int] = None) -> None: + def _add_reference(self,reference_name: str, + reference_type: meta.SnapshotReference.ReferenceType.ValueType, + snapshot_id: Optional[int] = None) -> None: """Add reference to a snapshot""" if snapshot_id is None: snapshot_id = self._metadata.current_snapshot_id @@ -254,7 +255,7 @@ def _add_reference(self, raise errors.VersionNotFoundError(f"Snapshot {snapshot_id} is not found") if len(reference_name) == 0: - raise errors.UserInputError("{reference_type} cannot be empty") + raise errors.UserInputError("reference name cannot be 
empty") if reference_name in _RESERVED_REFERENCE: raise errors.UserInputError("{reference_name} is reserved") @@ -267,7 +268,7 @@ def _add_reference(self, new_metadata.CopyFrom(self._metadata) ref = meta.SnapshotReference(reference_name=reference_name, snapshot_id=snapshot_id, - type=reference_type) + type=reference_type) new_metadata.refs[reference_name].CopyFrom(ref) new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) @@ -284,9 +285,8 @@ def remove_branch(self, branch: str) -> None: raise errors.UserInputError("Cannot remove the current branch.") self._remove_reference(branch, meta.SnapshotReference.BRANCH) - def _remove_reference(self, - ref_name: str, - ref_type: meta.SnapshotReference.ReferenceType)-> None: + def _remove_reference(self, ref_name: str, + ref_type: meta.SnapshotReference.ReferenceType.ValueType)-> None: if (ref_name not in self._metadata.refs or self._metadata.refs[ref_name].type != ref_type): raise errors.VersionNotFoundError(f"Reference {ref_name} is not found") From 942b4a3a16200f92e57943f0fc47c97988479d60 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 17:54:38 +0000 Subject: [PATCH 11/31] formatting] --- python/src/space/core/datasets.py | 5 +- python/src/space/core/runners.py | 8 +-- python/src/space/core/storage.py | 42 +++++++------ python/src/space/ray/runners.py | 22 ++++--- python/tests/core/test_runners.py | 6 +- python/tests/core/test_storage.py | 33 ++++++----- python/tests/ray/test_runners.py | 97 ++++++++++++++++--------------- 7 files changed, 115 insertions(+), 98 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 1537353..e67d130 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -20,7 +20,6 @@ import pyarrow as pa from substrait.algebra_pb2 import ReadRel, Rel - from space.core.options import FileOptions, JoinOptions, ReadOptions from space.core.runners import LocalRunner @@ -53,8 +52,8 @@ def create(cls, location: str, schema: pa.Schema, primary_keys: List[str], primary_keys: un-enforced primary keys. record_fields: fields stored in row format (ArrayRecord). """ - return Dataset(Storage.create(location, schema, primary_keys, - record_fields)) + return Dataset( + Storage.create(location, schema, primary_keys, record_fields)) @classmethod def load(cls, location: str) -> Dataset: diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 32e0744..5e1f992 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -231,10 +231,10 @@ def diff(self, start_version: Version, end_version: Version, batch_size: Optional[int] = None) -> Iterator[ChangeData]: - return read_change_data(self._storage, - self._storage.version_to_snapshot_id(start_version), - self._storage.version_to_snapshot_id(end_version), - ReadOptions(batch_size=batch_size)) + return read_change_data( + self._storage, self._storage.version_to_snapshot_id(start_version), + self._storage.version_to_snapshot_id(end_version), + ReadOptions(batch_size=batch_size)) @StorageMixin.transactional def append(self, data: InputData) -> Optional[rt.Patch]: diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 58e604d..757e971 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -61,7 +61,9 @@ class Storage(paths.StoragePathsMixin): Not thread safe. 
""" - def __init__(self, location: str, metadata_file: str, + def __init__(self, + location: str, + metadata_file: str, metadata: meta.StorageMetadata, current_branch: Optional[str] = None): super().__init__(location) @@ -82,8 +84,8 @@ def __init__(self, location: str, metadata_file: str, self._primary_keys = set(self._metadata.schema.primary_keys) self._current_branch = current_branch self._max_snapshot_id = max( - [ref[1].snapshot_id for ref in self._metadata.refs.items()] + - [self._metadata.current_snapshot_id]) + [ref[1].snapshot_id for ref in self._metadata.refs.items()] + + [self._metadata.current_snapshot_id]) @property def metadata(self) -> meta.StorageMetadata: @@ -202,8 +204,11 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__(self.location, entry_point.metadata_file, metadata, # type: ignore[misc] # pylint: disable=unnecessary-dunder-call - self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + self.__init__( + self.location, + entry_point.metadata_file, + metadata, # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") return True @@ -228,13 +233,11 @@ def lookup_reference(self, tag_or_branch: str) -> meta.SnapshotReference: def add_tag(self, tag: str, snapshot_id: Optional[int] = None) -> None: """Add tag to a snapshot""" - self._add_reference( - tag, meta.SnapshotReference.TAG, snapshot_id) + self._add_reference(tag, meta.SnapshotReference.TAG, snapshot_id) def add_branch(self, branch: str) -> None: """Add branch to a snapshot""" - self._add_reference( - branch, meta.SnapshotReference.BRANCH, None) + self._add_reference(branch, meta.SnapshotReference.BRANCH, None) def set_current_branch(self, branch: str): """Set current branch for the snapshot.""" @@ -244,7 +247,9 @@ def set_current_branch(self, branch: str): raise errors.UserInputError("{branch} is not a branch.") self._current_branch = branch - def _add_reference(self,reference_name: str, + def _add_reference( + self, + reference_name: str, reference_type: meta.SnapshotReference.ReferenceType.ValueType, snapshot_id: Optional[int] = None) -> None: """Add reference to a snapshot""" @@ -262,7 +267,7 @@ def _add_reference(self,reference_name: str, if reference_name in self._metadata.refs: raise errors.VersionAlreadyExistError( - f"Reference {reference_name} already exist") + f"Reference {reference_name} already exist") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) @@ -280,15 +285,16 @@ def remove_tag(self, tag: str) -> None: self._remove_reference(tag, meta.SnapshotReference.TAG) def remove_branch(self, branch: str) -> None: - """Remove tag from metadata""" + """Remove branch from metadata""" if branch == self._current_branch: raise errors.UserInputError("Cannot remove the current branch.") self._remove_reference(branch, meta.SnapshotReference.BRANCH) - def _remove_reference(self, ref_name: str, - ref_type: meta.SnapshotReference.ReferenceType.ValueType)-> None: - if (ref_name not in self._metadata.refs or - self._metadata.refs[ref_name].type != ref_type): + def _remove_reference( + self, ref_name: str, + ref_type: meta.SnapshotReference.ReferenceType.ValueType) -> None: + if (ref_name not in self._metadata.refs + or self._metadata.refs[ref_name].type != ref_type): raise errors.VersionNotFoundError(f"Reference 
{ref_name} is not found") new_metadata = meta.StorageMetadata() @@ -299,7 +305,7 @@ def _remove_reference(self, ref_name: str, self._metadata = new_metadata self._metadata_file = new_metadata_path - def commit(self, patch: rt.Patch, branch:str) -> None: + def commit(self, patch: rt.Patch, branch: str) -> None: """Commit changes to the storage. TODO: only support a single writer; to ensure atomicity in commit by @@ -577,7 +583,7 @@ def __enter__(self) -> Transaction: self._snapshot_id = self._storage.metadata.current_snapshot_id if self._branch != _MAIN_BRANCH: self._snapshot_id = self._storage.lookup_reference( - self._branch).snapshot_id + self._branch).snapshot_id logging.info(f"Start transaction {self._txn_id}") return self diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 76d22f1..d4af717 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -197,12 +197,13 @@ def refresh(self, previous_snapshot_id: Optional[int] = None txn = self._start_txn() - for change in self.diff_ray(start_snapshot_id, end_snapshot_id, batch_size): + for change in self.diff_ray(start_snapshot_id, end_snapshot_id, + batch_size): assert isinstance(change.data, list) # Commit when changes from the same snapshot end. - if (previous_snapshot_id is not None and - change.snapshot_id != previous_snapshot_id): + if (previous_snapshot_id is not None + and change.snapshot_id != previous_snapshot_id): txn.commit(utils.merge_patches(patches)) patches.clear() @@ -222,7 +223,8 @@ def refresh(self, elif change.type_ == ChangeType.ADD: patches.append(self._process_append(change.data)) else: - raise NotImplementedError(f"Change type {change.type_} not supported") + raise NotImplementedError( + f"Change type {change.type_} not supported") except (errors.SpaceRuntimeError, errors.UserInputError) as e: r = JobResult(JobResult.State.FAILED, None, repr(e)) return job_results + [r] @@ -235,7 +237,8 @@ def refresh(self, return job_results - def _process_delete(self, data: List[ray.data.Dataset]) -> Optional[rt.Patch]: + def _process_delete(self, + data: List[ray.data.Dataset]) -> Optional[rt.Patch]: # Deletion does not use parallel read streams. 
assert len(data) == 1 arrow_data = pa.concat_tables(iter_batches( @@ -250,7 +253,8 @@ def _process_delete(self, data: List[ray.data.Dataset]) -> Optional[rt.Patch]: self._file_options) return op.delete() - def _process_append(self, data: List[ray.data.Dataset]) -> Optional[rt.Patch]: + def _process_append(self, + data: List[ray.data.Dataset]) -> Optional[rt.Patch]: return _append_from(self._storage, [partial(iter_batches, ds) for ds in data], self._ray_options, self._file_options) @@ -281,7 +285,7 @@ def append(self, data: InputData) -> Optional[rt.Patch]: @StorageMixin.transactional def append_from( self, - source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: # pylint: disable=line-too-long + source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]: # pylint: disable=line-too-long if not isinstance(source_fns, list): source_fns = [source_fns] @@ -308,8 +312,8 @@ def append_parquet(self, pattern: str) -> Optional[rt.Patch]: @StorageMixin.transactional def _insert(self, data: InputData, mode: InsertOptions.Mode) -> Optional[rt.Patch]: - op = RayInsertOp(self._storage, InsertOptions(mode=mode), self._ray_options, - self._file_options) + op = RayInsertOp(self._storage, InsertOptions(mode=mode), + self._ray_options, self._file_options) return op.write(data) @StorageMixin.transactional diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py index 8e5e573..9aa4c76 100644 --- a/python/tests/core/test_runners.py +++ b/python/tests/core/test_runners.py @@ -36,7 +36,8 @@ class TestLocalRunner: @pytest.fixture def sample_dataset(self, tmp_path): simple_tf_features_dict = f.FeaturesDict({ - "image": f.Image(shape=(None, None, 3), dtype=np.uint8), + "image": + f.Image(shape=(None, None, 3), dtype=np.uint8), }) schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("feat1", TfFeatures(simple_tf_features_dict)), @@ -297,7 +298,8 @@ def test_dataset_with_file_type(self, tmp_path): def _generate_data(ids: Iterable[int]) -> pa.Table: return pa.Table.from_pydict({ - "id": ids, + "id": + ids, "name": [f"name_{i}" for i in ids], "feat1": [bytes(f"feat1_{i}", "utf-8") for i in ids], "feat2": [bytes(f"feat2_{i}", "utf-8") for i in ids], diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py index 41b2f4e..62e44ec 100644 --- a/python/tests/core/test_storage.py +++ b/python/tests/core/test_storage.py @@ -137,7 +137,7 @@ def test_commit(self, tmp_path): record_uncompressed_bytes=30) patch = rt.Patch(addition=added_manifest_files, storage_statistics_update=added_storage_statistics) - storage.commit(patch,"main") + storage.commit(patch, "main") assert storage.snapshot(0) is not None new_snapshot = storage.snapshot(1) @@ -155,7 +155,7 @@ def test_commit(self, tmp_path): index_manifest_files=["data/index_manifest1"], record_manifest_files=["data/record_manifest1"]), storage_statistics_update=added_storage_statistics2) - storage.commit(patch,"main") + storage.commit(patch, "main") new_snapshot = storage.snapshot(2) assert new_snapshot.manifest_files == meta.ManifestFiles( @@ -274,7 +274,8 @@ def commit_add_index_manifest(manifest_path: str): # Test data_files() with filters. 
index_file1.manifest_file_id = 1 assert storage.data_files(filter_=pc.field("int64") > 1000) == rt.FileSet( - index_files=[index_file1], index_manifest_files={1: manifests_dict2[2]}) + index_files=[index_file1], + index_manifest_files={1: manifests_dict2[2]}) def test_create_storage_schema_validation(self, tmp_path): location = tmp_path / "dataset" @@ -293,15 +294,17 @@ def test_create_storage_schema_validation(self, tmp_path): primary_keys=["not_exist"], record_fields=[]) - with pytest.raises(errors.UserInputError, - match=r".*Record field int64 cannot be a primary key.*"): + with pytest.raises( + errors.UserInputError, + match=r".*Record field int64 cannot be a primary key.*"): Storage.create(location=str(location), schema=pa.schema([pa.field("int64", pa.int64())]), primary_keys=["int64"], record_fields=["int64"]) - with pytest.raises(errors.UserInputError, - match=r".*Record field not_exist not found in schema.*"): + with pytest.raises( + errors.UserInputError, + match=r".*Record field not_exist not found in schema.*"): Storage.create(location=str(location), schema=pa.schema([pa.field("int64", pa.int64())]), primary_keys=["int64"], @@ -310,8 +313,8 @@ def test_create_storage_schema_validation(self, tmp_path): with pytest.raises(errors.UserInputError, match=r".*Primary key type not supported.*"): Storage.create(location=str(location), - schema=pa.schema([pa.field("list", - pa.list_(pa.string()))]), + schema=pa.schema( + [pa.field("list", pa.list_(pa.string()))]), primary_keys=["list"], record_fields=["list"]) @@ -424,11 +427,11 @@ def test_branches(self, tmp_path): patch = rt.Patch(addition=meta.ManifestFiles( index_manifest_files=["data/index_manifest1"], record_manifest_files=["data/record_manifest1"]), - storage_statistics_update=meta.StorageStatistics( - num_rows=100, - index_compressed_bytes=100, - index_uncompressed_bytes=200, - record_uncompressed_bytes=300)) + storage_statistics_update=meta.StorageStatistics( + num_rows=100, + index_compressed_bytes=100, + index_uncompressed_bytes=200, + record_uncompressed_bytes=300)) storage.commit(patch, "branch2") create_time2 = datetime.utcfromtimestamp( @@ -438,5 +441,5 @@ def test_branches(self, tmp_path): assert storage.versions().to_pydict() == { "snapshot_id": [1, 0], "tag_or_branch": ["branch2", None], - "create_time": [create_time2,create_time1] + "create_time": [create_time2, create_time1] } diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 63581e2..29c61c6 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -97,72 +97,71 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block, runner.append(input_data0) assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - input_data0.sort_by("int64")) - + runner.read_all(batch_size=batch_size).sort_by("int64"), + input_data0.sort_by("int64")) runner.append_from([ - lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]), - lambda: iter([input_data4]) - ]) + lambda: iter([input_data1, input_data2]), + lambda: iter([input_data3]), lambda: iter([input_data4]) + ]) assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - pa.concat_tables( - [input_data0, input_data1, input_data2, input_data3, - input_data4]).sort_by("int64")) + runner.read_all(batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ + input_data0, input_data1, input_data2, input_data3, input_data4 + ]).sort_by("int64")) - # Test insert. + # Test insert. 
result = runner.insert(generate_data([7, 12])) assert result.state == JobResult.State.FAILED assert "Primary key to insert already exist" in result.error_message runner.upsert(generate_data([7, 12])) assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), + runner.read_all(batch_size=batch_size).sort_by("int64"), pa.concat_tables([ - input_data0, input_data1, input_data2, input_data3, input_data4, - generate_data([12]) + input_data0, input_data1, input_data2, input_data3, input_data4, + generate_data([12]) ]).sort_by("int64")) # Test delete. runner.delete(pc.field("int64") < 10) assert_equal( - runner.read_all(batch_size=batch_size).sort_by("int64"), - pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) + runner.read_all(batch_size=batch_size).sort_by("int64"), + pa.concat_tables([generate_data([10, 11, 12])]).sort_by("int64")) # Test reading views. view = sample_dataset.map_batches(fn=_sample_map_udf, - output_schema=sample_dataset.schema, - output_record_fields=["binary"]) + output_schema=sample_dataset.schema, + output_record_fields=["binary"]) assert_equal( - view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), + view.ray(DEFAULT_RAY_OPTIONS).read_all( + batch_size=batch_size).sort_by("int64"), pa.concat_tables([ - pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 1 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] - }) - ]).sort_by("int64")) + pa.Table.from_pydict({ + "int64": [10, 11, 12], + "float64": [v / 10 + 1 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + }) + ]).sort_by("int64")) # Test a transform on a view. transform_on_view = view.map_batches(fn=_sample_map_udf, - output_schema=view.schema, - output_record_fields=["binary"]) + output_schema=view.schema, + output_record_fields=["binary"]) assert_equal( transform_on_view.ray(DEFAULT_RAY_OPTIONS).read_all( - batch_size=batch_size).sort_by("int64"), - pa.concat_tables([ + batch_size=batch_size).sort_by("int64"), + pa.concat_tables([ pa.Table.from_pydict({ - "int64": [10, 11, 12], - "float64": [v / 10 + 2 for v in [10, 11, 12]], - "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] - }) - ]).sort_by("int64")) + "int64": [10, 11, 12], + "float64": [v / 10 + 2 for v in [10, 11, 12]], + "binary": [f"b{v}".encode("utf-8") for v in [10, 11, 12]] + }) + ]).sort_by("int64")) - @pytest.mark.parametrize("enable_row_range_block", [(True,), (False,)]) + @pytest.mark.parametrize("enable_row_range_block", [(True, ), (False, )]) def test_read_batch_size(self, tmp_path, sample_schema, enable_row_range_block): ds = Dataset.create(str(tmp_path / f"dataset_{random_id()}"), @@ -181,7 +180,8 @@ def test_read_batch_size(self, tmp_path, sample_schema, assert d.num_rows == 10 @pytest.mark.parametrize("refresh_batch_size", [None, 2]) - def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): + def test_diff_map_batches(self, tmp_path, sample_dataset, + refresh_batch_size): ds = sample_dataset view_schema = pa.schema( @@ -249,7 +249,8 @@ def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): with pytest.raises( errors.VersionNotFoundError, - match=r".*Target snapshot ID 3 higher than source dataset version 2.*"): + match=r".*Target snapshot ID 3 higher than source dataset version 2.*" + ): ray_runner.refresh(3) # Test upsert's diff. 
@@ -405,13 +406,11 @@ def generate_data2(values: Iterable[int]) -> pa.Table: left_fields_ = [ ds1_schema.field(f).remove_metadata() - for f in left_fields or ds1.schema.names - if f != "int64" + for f in left_fields or ds1.schema.names if f != "int64" ] right_fields_ = [ ds2.schema.field(f).remove_metadata() - for f in right_fields or ds2.schema.names - if f != "int64" + for f in right_fields or ds2.schema.names if f != "int64" ] if not swap: @@ -427,7 +426,8 @@ def generate_data2(values: Iterable[int]) -> pa.Table: def generate_expected(values: Iterable[int]) -> pa.Table: return pa.Table.from_pydict({ - "int64": values, + "int64": + values, "float64": [v / 10 for v in values], "binary": [f"b{v}".encode("utf-8") for v in values], "string": [f"s{v}" for v in values] @@ -443,7 +443,8 @@ def generate_expected(values: Iterable[int]) -> pa.Table: # Sanity checks of addresses. address_column = join_values.column("binary").combine_chunks() - assert address_column.field("_FILE")[0].as_py().startswith("data/binary_") + assert address_column.field("_FILE")[0].as_py().startswith( + "data/binary_") assert len(address_column.field("_ROW_ID")) == len(indexes) # Test reading addresses. @@ -494,8 +495,9 @@ def test_join_input_validation(self, tmp_path, sample_dataset): left_fields=["float64"], right_fields=["string"]) - with pytest.raises(errors.UserInputError, - match=r".*Join key must be primary key on both sides.*"): + with pytest.raises( + errors.UserInputError, + match=r".*Join key must be primary key on both sides.*"): ds1.join(ds2, keys=["string"], left_fields=["float64"], @@ -504,7 +506,8 @@ def test_join_input_validation(self, tmp_path, sample_dataset): def generate_data(values: Iterable[int]) -> pa.Table: return pa.Table.from_pydict({ - "int64": values, + "int64": + values, "float64": [v / 10 for v in values], "binary": [f"b{v}".encode("utf-8") for v in values] }) From 200a9841e57371b903e820a8cc99e9443bc6e5bb Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 9 Feb 2024 18:04:26 +0000 Subject: [PATCH 12/31] fix lint --- python/src/space/core/storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 757e971..23fb293 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -204,11 +204,11 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__( + self.__init__( # type: ignore[misc] # pylint: disable=unnecessary-dunder-call self.location, entry_point.metadata_file, - metadata, # type: ignore[misc] # pylint: disable=unnecessary-dunder-call - self.current_branch) # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + metadata, + self.current_branch) logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") return True From 983acf23e3c569d34ce6bc8ba25d581b1b079542 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 13 Feb 2024 02:18:34 +0000 Subject: [PATCH 13/31] format some files --- python/src/space/core/datasets.py | 1 - python/src/space/core/ops/append.py | 5 ++--- python/src/space/core/storage.py | 17 ++++++++--------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index e67d130..b45a4fd 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -21,7 +21,6 @@ from substrait.algebra_pb2 import ReadRel, Rel from space.core.options import FileOptions, 
JoinOptions, ReadOptions - from space.core.runners import LocalRunner from space.core.storage import Storage, Version from space.core.transform.plans import LogicalPlanBuilder diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index 317d618..d0da93a 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -115,7 +115,6 @@ def __init__(self, self._patch = rt.Patch() - def write(self, data: InputData) -> None: if not isinstance(data, pa.Table): data = pa.Table.from_pydict(data) @@ -235,8 +234,8 @@ def _finish_index_writer(self) -> None: file_path = self._index_writer_info.file_path stats = self._index_manifest_writer.write( file_path, self._index_writer_info.writer.writer.metadata) - utils.update_index_storage_stats(base=self._patch.storage_statistics_update, - update=stats) + utils.update_index_storage_stats( + base=self._patch.storage_statistics_update, update=stats) self._patch.change_log.added_rows.append( meta.RowBitmap(file=file_path, all_rows=True, num_rows=stats.num_rows)) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 23fb293..4c0dc5d 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -50,6 +50,7 @@ # Initial snapshot ID. _INIT_SNAPSHOT_ID = 0 +# Name for the main branch, by default the read write are using this branch. _MAIN_BRANCH = "main" _RESERVED_REFERENCE = [_MAIN_BRANCH] @@ -97,6 +98,7 @@ def current_branch(self) -> str: """Return the current branch.""" if not self._current_branch: return _MAIN_BRANCH + return self._current_branch @property @@ -204,10 +206,8 @@ def reload(self) -> bool: return False metadata = _read_metadata(self._fs, self._location, entry_point) - self.__init__( # type: ignore[misc] # pylint: disable=unnecessary-dunder-call - self.location, - entry_point.metadata_file, - metadata, + self.__init__( # type: ignore[misc] # pylint: disable=unnecessary-dunder-call + self.location, entry_point.metadata_file, metadata, self.current_branch) logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") @@ -247,11 +247,10 @@ def set_current_branch(self, branch: str): raise errors.UserInputError("{branch} is not a branch.") self._current_branch = branch - def _add_reference( - self, - reference_name: str, - reference_type: meta.SnapshotReference.ReferenceType.ValueType, - snapshot_id: Optional[int] = None) -> None: + def _add_reference(self, + ref_name: str, + ref_type: meta.SnapshotReference.ReferenceType.ValueType, + snapshot_id: Optional[int] = None) -> None: """Add reference to a snapshot""" if snapshot_id is None: snapshot_id = self._metadata.current_snapshot_id From fb569cd9d4b06762b4be4c39a3cedf080c2449dc Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 13 Feb 2024 02:21:29 +0000 Subject: [PATCH 14/31] fix format --- python/src/space/core/datasets.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index b45a4fd..048a44f 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -73,24 +73,24 @@ def record_fields(self) -> List[str]: return self._storage.record_fields def add_tag(self, tag: str, snapshot_id: Optional[int] = None): - """Add yag to a snapshot.""" + """Add tag to a dataset.""" self._storage.add_tag(tag, snapshot_id) def remove_tag(self, tag: str): - """Remove tag from a snapshot.""" + """Remove tag from a dataset.""" self._storage.remove_tag(tag) 
def add_branch(self, branch: str): - """Add branch to a snapshot.""" - self._storage.add_branch(branch=branch) + """Add branch to a dataset.""" + self._storage.add_branch(branch) def remove_branch(self, branch: str): - """Remove tag from a snapshot.""" + """Remove tag branch a dataset.""" self._storage.remove_branch(branch) def set_current_branch(self, branch: str): """Set current branch for the dataset.""" - self.storage.set_current_branch(branch) + self._storage.set_current_branch(branch) def local(self, file_options: Optional[FileOptions] = None) -> LocalRunner: """Get a runner that runs operations locally.""" From 52d8b62eb91f4022e0560db6b0ca4fa9b354df09 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 13 Feb 2024 02:23:38 +0000 Subject: [PATCH 15/31] fix naming --- python/src/space/core/storage.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 4c0dc5d..e7a1608 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -258,22 +258,22 @@ def _add_reference(self, if snapshot_id not in self._metadata.snapshots: raise errors.VersionNotFoundError(f"Snapshot {snapshot_id} is not found") - if len(reference_name) == 0: + if len(ref_name) == 0: raise errors.UserInputError("reference name cannot be empty") - if reference_name in _RESERVED_REFERENCE: - raise errors.UserInputError("{reference_name} is reserved") + if ref_name in _RESERVED_REFERENCE: + raise errors.UserInputError("{ref_name} is reserved") - if reference_name in self._metadata.refs: + if ref_name in self._metadata.refs: raise errors.VersionAlreadyExistError( - f"Reference {reference_name} already exist") + f"Reference {ref_name} already exist") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) - ref = meta.SnapshotReference(reference_name=reference_name, + ref = meta.SnapshotReference(ref_name=ref_name, snapshot_id=snapshot_id, - type=reference_type) - new_metadata.refs[reference_name].CopyFrom(ref) + type=ref_name) + new_metadata.refs[ref_name].CopyFrom(ref) new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) self._metadata = new_metadata @@ -328,6 +328,7 @@ def commit(self, patch: rt.Patch, branch: str) -> None: else: new_metadata.current_snapshot_id = new_snapshot_id current_snapshot = self.snapshot(self._metadata.current_snapshot_id) + new_metadata.last_update_time.CopyFrom(proto_now()) new_metadata_path = self.new_metadata_path() From b327782d33b605a23e930e63a3e367cb2e5d8474 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 13 Feb 2024 05:41:55 +0000 Subject: [PATCH 16/31] fix ref name --- python/src/space/core/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index e7a1608..8bf6417 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -272,7 +272,7 @@ def _add_reference(self, new_metadata.CopyFrom(self._metadata) ref = meta.SnapshotReference(ref_name=ref_name, snapshot_id=snapshot_id, - type=ref_name) + type=ref_type) new_metadata.refs[ref_name].CopyFrom(ref) new_metadata_path = self.new_metadata_path() self._write_metadata(new_metadata_path, new_metadata) From 25c0eb0c27fea97c2dffc7b006654c4319729b5c Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 13 Feb 2024 05:54:06 +0000 Subject: [PATCH 17/31] fix ref name --- python/src/space/core/storage.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 8bf6417..5e2dec1 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -270,7 +270,7 @@ def _add_reference(self, new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) - ref = meta.SnapshotReference(ref_name=ref_name, + ref = meta.SnapshotReference(reference_name=ref_name, snapshot_id=snapshot_id, type=ref_type) new_metadata.refs[ref_name].CopyFrom(ref) From ea69464e22478d4e03472b18f9b0c0cc26749427 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 16 Feb 2024 01:18:43 +0000 Subject: [PATCH 18/31] resolve --- python/src/space/core/datasets.py | 2 +- python/src/space/core/storage.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 048a44f..de36fff 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -85,7 +85,7 @@ def add_branch(self, branch: str): self._storage.add_branch(branch) def remove_branch(self, branch: str): - """Remove tag branch a dataset.""" + """Remove branch for a dataset.""" self._storage.remove_branch(branch) def set_current_branch(self, branch: str): diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 5e2dec1..4b0e5e2 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -52,6 +52,7 @@ _INIT_SNAPSHOT_ID = 0 # Name for the main branch, by default the read write are using this branch. _MAIN_BRANCH = "main" +# Sets of reference that could not be added as branches or tags by user. _RESERVED_REFERENCE = [_MAIN_BRANCH] @@ -83,9 +84,9 @@ def __init__(self, self._physical_schema) self._primary_keys = set(self._metadata.schema.primary_keys) - self._current_branch = current_branch + self._current_branch = current_branch if current_branch else _MAIN_BRANCH self._max_snapshot_id = max( - [ref[1].snapshot_id for ref in self._metadata.refs.items()] + + [ref.snapshot_id for ref in self._metadata.refs.values()] + [self._metadata.current_snapshot_id]) @property @@ -96,9 +97,6 @@ def metadata(self) -> meta.StorageMetadata: @property def current_branch(self) -> str: """Return the current branch.""" - if not self._current_branch: - return _MAIN_BRANCH - return self._current_branch @property @@ -241,15 +239,16 @@ def add_branch(self, branch: str) -> None: def set_current_branch(self, branch: str): """Set current branch for the snapshot.""" - if branch != "main": + if branch != _MAIN_BRANCH: snapshot_ref = self.lookup_reference(branch) if snapshot_ref.type != meta.SnapshotReference.BRANCH: raise errors.UserInputError("{branch} is not a branch.") + self._current_branch = branch def _add_reference(self, ref_name: str, - ref_type: meta.SnapshotReference.ReferenceType.ValueType, + ref_type: meta.SnapshotReference.ReferenceType, snapshot_id: Optional[int] = None) -> None: """Add reference to a snapshot""" if snapshot_id is None: From 78b6e4d82c81734b64053f2d4e22386a9d1a3d7c Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 16 Feb 2024 01:23:44 +0000 Subject: [PATCH 19/31] use value types --- python/src/space/core/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 4b0e5e2..5ce405c 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -248,7 +248,7 @@ def set_current_branch(self, branch: 
str): def _add_reference(self, ref_name: str, - ref_type: meta.SnapshotReference.ReferenceType, + ref_type: meta.SnapshotReference.ReferenceType.ValueType, snapshot_id: Optional[int] = None) -> None: """Add reference to a snapshot""" if snapshot_id is None: From 0fb4992c4f8c0a926f8b70bbcfa4cec55056dccd Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 16 Feb 2024 01:52:12 +0000 Subject: [PATCH 20/31] reduce flaky tests --- python/tests/core/loaders/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/core/loaders/test_parquet.py b/python/tests/core/loaders/test_parquet.py index 5366493..7c42f8b 100644 --- a/python/tests/core/loaders/test_parquet.py +++ b/python/tests/core/loaders/test_parquet.py @@ -71,4 +71,4 @@ def test_append_parquet(self, tmp_path): ]).combine_chunks().sort_by("int64") assert not ds.index_files(version="empty") - assert ds.index_files(version="after_append") == [file0, file1] + assert sorted(ds.index_files(version="after_append")) == [file0, file1] From 30af48ac8dae58649beafd05372732df76451104 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 16 Feb 2024 02:00:06 +0000 Subject: [PATCH 21/31] reduce flaky tests --- python/tests/ray/test_runners.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 29c61c6..ac3c921 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -451,7 +451,8 @@ def generate_expected(values: Iterable[int]) -> pa.Table: values = read_record_column(ds1.storage, join_values.select(["binary"]), field="binary") - assert values == expected_values.column("binary").combine_chunks() + assert sorted(values) == sorted( + expected_values.column("binary").combine_chunks()) join_values = join_values.drop("binary") expected_values = expected_values.drop("binary") From 36a3ef21478f67d821d2dae19777fbac66136c87 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Fri, 16 Feb 2024 02:05:46 +0000 Subject: [PATCH 22/31] reduce tests --- python/tests/ray/test_runners.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index ac3c921..29c61c6 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -451,8 +451,7 @@ def generate_expected(values: Iterable[int]) -> pa.Table: values = read_record_column(ds1.storage, join_values.select(["binary"]), field="binary") - assert sorted(values) == sorted( - expected_values.column("binary").combine_chunks()) + assert values == expected_values.column("binary").combine_chunks() join_values = join_values.drop("binary") expected_values = expected_values.drop("binary") From 9d3df724fee408bc0dcb2789e72b85c0c5805bec Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:16:25 +0000 Subject: [PATCH 23/31] misc refactor --- python/src/space/core/storage.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 5ce405c..4628f69 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -84,7 +84,7 @@ def __init__(self, self._physical_schema) self._primary_keys = set(self._metadata.schema.primary_keys) - self._current_branch = current_branch if current_branch else _MAIN_BRANCH + self._current_branch = current_branch or _MAIN_BRANCH self._max_snapshot_id = max( [ref.snapshot_id for ref in 
self._metadata.refs.values()] + [self._metadata.current_snapshot_id]) @@ -119,6 +119,13 @@ def physical_schema(self) -> pa.Schema: """Return the physcal schema that uses reference for record fields.""" return self._physical_schema + @property + def current_snapshot_id(self) -> int: + if self._branch != _MAIN_BRANCH: + return self._storage.lookup_reference(self._branch).snapshot_id + + return self.metadata.current_snapshot_id + def serializer(self) -> DictSerializer: """Return a serializer (deserializer) for the dataset.""" return DictSerializer.create(self.logical_schema) @@ -237,7 +244,7 @@ def add_branch(self, branch: str) -> None: """Add branch to a snapshot""" self._add_reference(branch, meta.SnapshotReference.BRANCH, None) - def set_current_branch(self, branch: str): + def set_current_branch(self, branch: str) -> None: """Set current branch for the snapshot.""" if branch != _MAIN_BRANCH: snapshot_ref = self.lookup_reference(branch) @@ -257,11 +264,11 @@ def _add_reference(self, if snapshot_id not in self._metadata.snapshots: raise errors.VersionNotFoundError(f"Snapshot {snapshot_id} is not found") - if len(ref_name) == 0: - raise errors.UserInputError("reference name cannot be empty") + if not ref_name: + raise errors.UserInputError("Reference name cannot be empty.") if ref_name in _RESERVED_REFERENCE: - raise errors.UserInputError("{ref_name} is reserved") + raise errors.UserInputError("{ref_name} is reserved.") if ref_name in self._metadata.refs: raise errors.VersionAlreadyExistError( @@ -286,6 +293,7 @@ def remove_branch(self, branch: str) -> None: """Remove branch from metadata""" if branch == self._current_branch: raise errors.UserInputError("Cannot remove the current branch.") + self._remove_reference(branch, meta.SnapshotReference.BRANCH) def _remove_reference( @@ -550,9 +558,8 @@ def commit(self, patch: Optional[rt.Patch]) -> None: # Check that no other commit has taken place. assert self._snapshot_id is not None self._storage.reload() - current_snapshot_id = self._storage.metadata.current_snapshot_id - if self._branch != _MAIN_BRANCH: - current_snapshot_id = self._storage.version_to_snapshot_id(self._branch) + current_snapshot_id = self.current_snapshot_id + if self._snapshot_id != current_snapshot_id: self._result = JobResult( JobResult.State.FAILED, None, @@ -579,10 +586,7 @@ def __enter__(self) -> Transaction: # All mutations start with a transaction, so storage is always reloaded for # mutations. 
self._storage.reload() - self._snapshot_id = self._storage.metadata.current_snapshot_id - if self._branch != _MAIN_BRANCH: - self._snapshot_id = self._storage.lookup_reference( - self._branch).snapshot_id + self._snapshot_id = self._storage.current_snapshot_id logging.info(f"Start transaction {self._txn_id}") return self From bc88f9b4dcec5315d99610ec26a0aa34685ada78 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:28:05 +0000 Subject: [PATCH 24/31] fix bug of incorrect type about branch --- python/src/space/core/storage.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 4628f69..0a0e22d 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -119,10 +119,9 @@ def physical_schema(self) -> pa.Schema: """Return the physcal schema that uses reference for record fields.""" return self._physical_schema - @property - def current_snapshot_id(self) -> int: - if self._branch != _MAIN_BRANCH: - return self._storage.lookup_reference(self._branch).snapshot_id + def get_current_snapshot_id(self, branch: str) -> int: + if branch != _MAIN_BRANCH: + return self.lookup_reference(branch).snapshot_id return self.metadata.current_snapshot_id @@ -558,7 +557,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: # Check that no other commit has taken place. assert self._snapshot_id is not None self._storage.reload() - current_snapshot_id = self.current_snapshot_id + current_snapshot_id = self.get_current_snapshot_id(self._branch) if self._snapshot_id != current_snapshot_id: self._result = JobResult( @@ -586,7 +585,7 @@ def __enter__(self) -> Transaction: # All mutations start with a transaction, so storage is always reloaded for # mutations. self._storage.reload() - self._snapshot_id = self._storage.current_snapshot_id + self._snapshot_id = self._storage.get_current_snapshot_id(self._branch) logging.info(f"Start transaction {self._txn_id}") return self From c36a06ffd3d3e782423c49a0d9437f839d8fd2e3 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:33:07 +0000 Subject: [PATCH 25/31] fix doc --- python/src/space/core/storage.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 0a0e22d..12951b1 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -120,6 +120,7 @@ def physical_schema(self) -> pa.Schema: return self._physical_schema def get_current_snapshot_id(self, branch: str) -> int: + """Returns the snapshot id for the current branch.""" if branch != _MAIN_BRANCH: return self.lookup_reference(branch).snapshot_id @@ -557,7 +558,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: # Check that no other commit has taken place. 
assert self._snapshot_id is not None self._storage.reload() - current_snapshot_id = self.get_current_snapshot_id(self._branch) + current_snapshot_id = self.storage.get_current_snapshot_id(self._branch) if self._snapshot_id != current_snapshot_id: self._result = JobResult( From 1f05c612f5a8474187af4bf60ce5e4d3acd3a3a4 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:36:28 +0000 Subject: [PATCH 26/31] fix bug --- python/src/space/core/storage.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 12951b1..9ce66ef 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -301,7 +301,9 @@ def _remove_reference( ref_type: meta.SnapshotReference.ReferenceType.ValueType) -> None: if (ref_name not in self._metadata.refs or self._metadata.refs[ref_name].type != ref_type): - raise errors.VersionNotFoundError(f"Reference {ref_name} is not found") + raise errors.VersionNotFoundError( + f"Reference {ref_name} is not found or has a wrong type (tag vs branch)" + ) new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) @@ -558,7 +560,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: # Check that no other commit has taken place. assert self._snapshot_id is not None self._storage.reload() - current_snapshot_id = self.storage.get_current_snapshot_id(self._branch) + current_snapshot_id = self._storage.get_current_snapshot_id(self._branch) if self._snapshot_id != current_snapshot_id: self._result = JobResult( From 55d2e4e7e2ca0a183fe6ea72b812b7b7e0b5a49f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:40:38 +0000 Subject: [PATCH 27/31] fix line too long --- python/src/space/core/storage.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 9ce66ef..55eac25 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -302,8 +302,8 @@ def _remove_reference( if (ref_name not in self._metadata.refs or self._metadata.refs[ref_name].type != ref_type): raise errors.VersionNotFoundError( - f"Reference {ref_name} is not found or has a wrong type (tag vs branch)" - ) + f"Reference {ref_name} is not found or has a wrong type " + "(tag vs branch)") new_metadata = meta.StorageMetadata() new_metadata.CopyFrom(self._metadata) @@ -566,7 +566,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: self._result = JobResult( JobResult.State.FAILED, None, "Abort commit because the storage has been modified.") - return + returnS if patch is None: self._result = JobResult(JobResult.State.SKIPPED) From 08652bcc960d5c03acaa8d3fc6b01a50b76429f3 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 17 Feb 2024 22:43:40 +0000 Subject: [PATCH 28/31] fix typo --- python/src/space/core/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 55eac25..2ec90b5 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -566,7 +566,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None: self._result = JobResult( JobResult.State.FAILED, None, "Abort commit because the storage has been modified.") - returnS + return if patch is None: self._result = JobResult(JobResult.State.SKIPPED) From e9b24a2a9ed115c8cd791830bec0b4462e5ce137 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 18 Feb 2024 00:37:34 +0000 Subject: 
[PATCH 29/31] resolve comments

---
 python/src/space/core/storage.py  | 11 ++++++-----
 python/src/space/ray/runners.py   |  3 ++-
 python/tests/core/test_storage.py |  3 +--
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py
index 2ec90b5..70d1621 100644
--- a/python/src/space/core/storage.py
+++ b/python/src/space/core/storage.py
@@ -119,7 +119,7 @@ def physical_schema(self) -> pa.Schema:
     """Return the physcal schema that uses reference for record fields."""
     return self._physical_schema

-  def get_current_snapshot_id(self, branch: str) -> int:
+  def current_snapshot_id(self, branch: str) -> int:
     """Returns the snapshot id for the current branch."""
     if branch != _MAIN_BRANCH:
       return self.lookup_reference(branch).snapshot_id
@@ -326,14 +326,15 @@ def commit(self, patch: rt.Patch, branch: str) -> None:
     new_metadata = meta.StorageMetadata()
     new_metadata.CopyFrom(self._metadata)
     new_snapshot_id = self._next_snapshot_id()
-    if branch and branch != _MAIN_BRANCH:
+    if branch != _MAIN_BRANCH:
       branch_snapshot = self.lookup_reference(branch)
       # To block the case delete branch and add a tag during commit
       # TODO: move this check out of commit()
       if branch_snapshot.type != meta.SnapshotReference.BRANCH:
         raise errors.UserInputError("Branch {branch} is no longer exists.")
-      current_snapshot = self.snapshot(branch_snapshot.snapshot_id)
+      new_metadata.refs[branch].snapshot_id = new_snapshot_id
+      current_snapshot = self.snapshot(branch_snapshot.snapshot_id)
     else:
       new_metadata.current_snapshot_id = new_snapshot_id
       current_snapshot = self.snapshot(self._metadata.current_snapshot_id)
@@ -560,7 +561,7 @@ def commit(self, patch: Optional[rt.Patch]) -> None:
     # Check that no other commit has taken place.
     assert self._snapshot_id is not None
     self._storage.reload()
-    current_snapshot_id = self._storage.get_current_snapshot_id(self._branch)
+    current_snapshot_id = self._storage.current_snapshot_id(self._branch)

     if self._snapshot_id != current_snapshot_id:
       self._result = JobResult(
@@ -588,7 +589,7 @@ def __enter__(self) -> Transaction:
     # All mutations start with a transaction, so storage is always reloaded for
     # mutations.
self._storage.reload() - self._snapshot_id = self._storage.get_current_snapshot_id(self._branch) + self._snapshot_id = self._storage.current_snapshot_id(self._branch) logging.info(f"Start transaction {self._txn_id}") return self diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index d4af717..b09f575 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -297,7 +297,8 @@ def append_from( ray_options, self._file_options) op.write_from(source_fns) - return op.finish() + return _append_from(self._storage, source_fns, ray_options, + self._file_options) @StorageMixin.transactional def append_array_record(self, pattern: str, diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py index 62e44ec..0015489 100644 --- a/python/tests/core/test_storage.py +++ b/python/tests/core/test_storage.py @@ -407,8 +407,7 @@ def test_branches(self, tmp_path): metadata = storage.metadata assert len(metadata.refs) == 2 - assert snapshot_id1 == metadata.current_snapshot_id - assert snapshot_id2 == metadata.current_snapshot_id + assert snapshot_id1 == snapshot_id2 == metadata.current_snapshot_id versions = storage.versions().to_pydict() versions["tag_or_branch"].sort() From 281471ebbc1d92f9e8166b04e69dce151b21068a Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 18 Feb 2024 00:42:54 +0000 Subject: [PATCH 30/31] revert merge conflict change --- python/src/space/ray/runners.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index b09f575..97f72c6 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -293,10 +293,6 @@ def append_from( ray_options.max_parallelism = min(len(source_fns), ray_options.max_parallelism) - op = RayAppendOp(self._storage.location, self._storage.metadata, - ray_options, self._file_options) - op.write_from(source_fns) - return _append_from(self._storage, source_fns, ray_options, self._file_options) From 6355232ee4c40cbf5b0d3bd393605ab1e765b07b Mon Sep 17 00:00:00 2001 From: coufon Date: Sun, 18 Feb 2024 01:25:40 +0000 Subject: [PATCH 31/31] Format code using Google style --- python/src/space/core/datasets.py | 4 +-- python/src/space/core/ops/append.py | 4 +-- python/src/space/core/runners.py | 8 +++--- python/src/space/core/storage.py | 7 +++--- python/src/space/ray/runners.py | 24 ++++++++---------- python/tests/core/test_runners.py | 6 ++--- python/tests/core/test_storage.py | 17 ++++++------- python/tests/ray/test_runners.py | 38 +++++++++++++---------------- 8 files changed, 47 insertions(+), 61 deletions(-) diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index de36fff..29c7649 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -51,8 +51,8 @@ def create(cls, location: str, schema: pa.Schema, primary_keys: List[str], primary_keys: un-enforced primary keys. record_fields: fields stored in row format (ArrayRecord). 
""" - return Dataset( - Storage.create(location, schema, primary_keys, record_fields)) + return Dataset(Storage.create(location, schema, primary_keys, + record_fields)) @classmethod def load(cls, location: str) -> Dataset: diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index d0da93a..6672323 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -234,8 +234,8 @@ def _finish_index_writer(self) -> None: file_path = self._index_writer_info.file_path stats = self._index_manifest_writer.write( file_path, self._index_writer_info.writer.writer.metadata) - utils.update_index_storage_stats( - base=self._patch.storage_statistics_update, update=stats) + utils.update_index_storage_stats(base=self._patch.storage_statistics_update, + update=stats) self._patch.change_log.added_rows.append( meta.RowBitmap(file=file_path, all_rows=True, num_rows=stats.num_rows)) diff --git a/python/src/space/core/runners.py b/python/src/space/core/runners.py index 5e1f992..32e0744 100644 --- a/python/src/space/core/runners.py +++ b/python/src/space/core/runners.py @@ -231,10 +231,10 @@ def diff(self, start_version: Version, end_version: Version, batch_size: Optional[int] = None) -> Iterator[ChangeData]: - return read_change_data( - self._storage, self._storage.version_to_snapshot_id(start_version), - self._storage.version_to_snapshot_id(end_version), - ReadOptions(batch_size=batch_size)) + return read_change_data(self._storage, + self._storage.version_to_snapshot_id(start_version), + self._storage.version_to_snapshot_id(end_version), + ReadOptions(batch_size=batch_size)) @StorageMixin.transactional def append(self, data: InputData) -> Optional[rt.Patch]: diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 70d1621..7eb32fa 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -212,8 +212,7 @@ def reload(self) -> bool: metadata = _read_metadata(self._fs, self._location, entry_point) self.__init__( # type: ignore[misc] # pylint: disable=unnecessary-dunder-call - self.location, entry_point.metadata_file, metadata, - self.current_branch) + self.location, entry_point.metadata_file, metadata, self.current_branch) logging.info( f"Storage reloaded to snapshot: {self._metadata.current_snapshot_id}") return True @@ -299,8 +298,8 @@ def remove_branch(self, branch: str) -> None: def _remove_reference( self, ref_name: str, ref_type: meta.SnapshotReference.ReferenceType.ValueType) -> None: - if (ref_name not in self._metadata.refs - or self._metadata.refs[ref_name].type != ref_type): + if (ref_name not in self._metadata.refs or + self._metadata.refs[ref_name].type != ref_type): raise errors.VersionNotFoundError( f"Reference {ref_name} is not found or has a wrong type " "(tag vs branch)") diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index 97f72c6..0cfa206 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -197,13 +197,12 @@ def refresh(self, previous_snapshot_id: Optional[int] = None txn = self._start_txn() - for change in self.diff_ray(start_snapshot_id, end_snapshot_id, - batch_size): + for change in self.diff_ray(start_snapshot_id, end_snapshot_id, batch_size): assert isinstance(change.data, list) # Commit when changes from the same snapshot end. 
-      if (previous_snapshot_id is not None
-          and change.snapshot_id != previous_snapshot_id):
+      if (previous_snapshot_id is not None and
+          change.snapshot_id != previous_snapshot_id):
         txn.commit(utils.merge_patches(patches))
         patches.clear()
 
@@ -223,8 +222,7 @@ def refresh(self,
         elif change.type_ == ChangeType.ADD:
           patches.append(self._process_append(change.data))
         else:
-          raise NotImplementedError(
-              f"Change type {change.type_} not supported")
+          raise NotImplementedError(f"Change type {change.type_} not supported")
     except (errors.SpaceRuntimeError, errors.UserInputError) as e:
       r = JobResult(JobResult.State.FAILED, None, repr(e))
       return job_results + [r]
@@ -237,8 +235,7 @@ def refresh(self,
 
     return job_results
 
-  def _process_delete(self,
-                      data: List[ray.data.Dataset]) -> Optional[rt.Patch]:
+  def _process_delete(self, data: List[ray.data.Dataset]) -> Optional[rt.Patch]:
     # Deletion does not use parallel read streams.
     assert len(data) == 1
     arrow_data = pa.concat_tables(iter_batches(
@@ -253,8 +250,7 @@ def _process_delete(self,
                      self._file_options)
     return op.delete()
 
-  def _process_append(self,
-                      data: List[ray.data.Dataset]) -> Optional[rt.Patch]:
+  def _process_append(self, data: List[ray.data.Dataset]) -> Optional[rt.Patch]:
     return _append_from(self._storage,
                         [partial(iter_batches, ds) for ds in data],
                         self._ray_options, self._file_options)
@@ -284,8 +280,8 @@ def append(self, data: InputData) -> Optional[rt.Patch]:
 
   @StorageMixin.transactional
   def append_from(
-      self,
-      source_fns: Union[InputIteratorFn, List[InputIteratorFn]]) -> Optional[rt.Patch]:  # pylint: disable=line-too-long
+      self, source_fns: Union[InputIteratorFn, List[InputIteratorFn]]
+  ) -> Optional[rt.Patch]:
     if not isinstance(source_fns, list):
       source_fns = [source_fns]
 
@@ -309,8 +305,8 @@ def append_parquet(self, pattern: str) -> Optional[rt.Patch]:
   @StorageMixin.transactional
   def _insert(self, data: InputData,
               mode: InsertOptions.Mode) -> Optional[rt.Patch]:
-    op = RayInsertOp(self._storage, InsertOptions(mode=mode),
-                     self._ray_options, self._file_options)
+    op = RayInsertOp(self._storage, InsertOptions(mode=mode), self._ray_options,
+                     self._file_options)
     return op.write(data)
 
   @StorageMixin.transactional
diff --git a/python/tests/core/test_runners.py b/python/tests/core/test_runners.py
index 9aa4c76..8e5e573 100644
--- a/python/tests/core/test_runners.py
+++ b/python/tests/core/test_runners.py
@@ -36,8 +36,7 @@ class TestLocalRunner:
   @pytest.fixture
   def sample_dataset(self, tmp_path):
     simple_tf_features_dict = f.FeaturesDict({
-        "image":
-            f.Image(shape=(None, None, 3), dtype=np.uint8),
+        "image": f.Image(shape=(None, None, 3), dtype=np.uint8),
     })
     schema = pa.schema([("id", pa.int64()), ("name", pa.string()),
                         ("feat1", TfFeatures(simple_tf_features_dict)),
@@ -298,8 +297,7 @@ def test_dataset_with_file_type(self, tmp_path):
 
 def _generate_data(ids: Iterable[int]) -> pa.Table:
   return pa.Table.from_pydict({
-      "id":
-          ids,
+      "id": ids,
       "name": [f"name_{i}" for i in ids],
       "feat1": [bytes(f"feat1_{i}", "utf-8") for i in ids],
       "feat2": [bytes(f"feat2_{i}", "utf-8") for i in ids],
diff --git a/python/tests/core/test_storage.py b/python/tests/core/test_storage.py
index 0015489..de4d96b 100644
--- a/python/tests/core/test_storage.py
+++ b/python/tests/core/test_storage.py
@@ -274,8 +274,7 @@ def commit_add_index_manifest(manifest_path: str):
 
     # Test data_files() with filters.
    index_file1.manifest_file_id = 1
     assert storage.data_files(filter_=pc.field("int64") > 1000) == rt.FileSet(
-        index_files=[index_file1],
-        index_manifest_files={1: manifests_dict2[2]})
+        index_files=[index_file1], index_manifest_files={1: manifests_dict2[2]})
 
   def test_create_storage_schema_validation(self, tmp_path):
     location = tmp_path / "dataset"
@@ -294,17 +293,15 @@ def test_create_storage_schema_validation(self, tmp_path):
                      primary_keys=["not_exist"],
                      record_fields=[])
 
-    with pytest.raises(
-        errors.UserInputError,
-        match=r".*Record field int64 cannot be a primary key.*"):
+    with pytest.raises(errors.UserInputError,
+                       match=r".*Record field int64 cannot be a primary key.*"):
       Storage.create(location=str(location),
                      schema=pa.schema([pa.field("int64", pa.int64())]),
                      primary_keys=["int64"],
                      record_fields=["int64"])
 
-    with pytest.raises(
-        errors.UserInputError,
-        match=r".*Record field not_exist not found in schema.*"):
+    with pytest.raises(errors.UserInputError,
+                       match=r".*Record field not_exist not found in schema.*"):
       Storage.create(location=str(location),
                      schema=pa.schema([pa.field("int64", pa.int64())]),
                      primary_keys=["int64"],
@@ -313,8 +310,8 @@ def test_create_storage_schema_validation(self, tmp_path):
     with pytest.raises(errors.UserInputError,
                        match=r".*Primary key type not supported.*"):
       Storage.create(location=str(location),
-                     schema=pa.schema(
-                         [pa.field("list", pa.list_(pa.string()))]),
+                     schema=pa.schema([pa.field("list",
+                                                pa.list_(pa.string()))]),
                      primary_keys=["list"],
                      record_fields=["list"])
 
diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py
index 29c61c6..c7b61ec 100644
--- a/python/tests/ray/test_runners.py
+++ b/python/tests/ray/test_runners.py
@@ -101,15 +101,15 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block,
                  input_data0.sort_by("int64"))
 
     runner.append_from([
-        lambda: iter([input_data1, input_data2]),
-        lambda: iter([input_data3]), lambda: iter([input_data4])
+        lambda: iter([input_data1, input_data2]), lambda: iter([input_data3]),
+        lambda: iter([input_data4])
     ])
 
     assert_equal(
         runner.read_all(batch_size=batch_size).sort_by("int64"),
-        pa.concat_tables([
-            input_data0, input_data1, input_data2, input_data3, input_data4
-        ]).sort_by("int64"))
+        pa.concat_tables(
+            [input_data0, input_data1, input_data2, input_data3,
+             input_data4]).sort_by("int64"))
 
     # Test insert.
     result = runner.insert(generate_data([7, 12]))
@@ -161,7 +161,7 @@ def test_write_read_dataset(self, sample_dataset, enable_row_range_block,
         })
     ]).sort_by("int64"))
 
-  @pytest.mark.parametrize("enable_row_range_block", [(True, ), (False, )])
+  @pytest.mark.parametrize("enable_row_range_block", [(True,), (False,)])
   def test_read_batch_size(self, tmp_path, sample_schema,
                            enable_row_range_block):
     ds = Dataset.create(str(tmp_path / f"dataset_{random_id()}"),
@@ -180,8 +180,7 @@ def test_read_batch_size(self, tmp_path, sample_schema,
       assert d.num_rows == 10
 
   @pytest.mark.parametrize("refresh_batch_size", [None, 2])
-  def test_diff_map_batches(self, tmp_path, sample_dataset,
-                            refresh_batch_size):
+  def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size):
     ds = sample_dataset
 
     view_schema = pa.schema(
@@ -249,8 +248,7 @@ def test_diff_map_batches(self, tmp_path, sample_dataset,
 
     with pytest.raises(
         errors.VersionNotFoundError,
-        match=r".*Target snapshot ID 3 higher than source dataset version 2.*"
-    ):
+        match=r".*Target snapshot ID 3 higher than source dataset version 2.*"):
       ray_runner.refresh(3)
 
     # Test upsert's diff.
@@ -406,11 +404,13 @@ def generate_data2(values: Iterable[int]) -> pa.Table: left_fields_ = [ ds1_schema.field(f).remove_metadata() - for f in left_fields or ds1.schema.names if f != "int64" + for f in left_fields or ds1.schema.names + if f != "int64" ] right_fields_ = [ ds2.schema.field(f).remove_metadata() - for f in right_fields or ds2.schema.names if f != "int64" + for f in right_fields or ds2.schema.names + if f != "int64" ] if not swap: @@ -426,8 +426,7 @@ def generate_data2(values: Iterable[int]) -> pa.Table: def generate_expected(values: Iterable[int]) -> pa.Table: return pa.Table.from_pydict({ - "int64": - values, + "int64": values, "float64": [v / 10 for v in values], "binary": [f"b{v}".encode("utf-8") for v in values], "string": [f"s{v}" for v in values] @@ -443,8 +442,7 @@ def generate_expected(values: Iterable[int]) -> pa.Table: # Sanity checks of addresses. address_column = join_values.column("binary").combine_chunks() - assert address_column.field("_FILE")[0].as_py().startswith( - "data/binary_") + assert address_column.field("_FILE")[0].as_py().startswith("data/binary_") assert len(address_column.field("_ROW_ID")) == len(indexes) # Test reading addresses. @@ -495,9 +493,8 @@ def test_join_input_validation(self, tmp_path, sample_dataset): left_fields=["float64"], right_fields=["string"]) - with pytest.raises( - errors.UserInputError, - match=r".*Join key must be primary key on both sides.*"): + with pytest.raises(errors.UserInputError, + match=r".*Join key must be primary key on both sides.*"): ds1.join(ds2, keys=["string"], left_fields=["float64"], @@ -506,8 +503,7 @@ def test_join_input_validation(self, tmp_path, sample_dataset): def generate_data(values: Iterable[int]) -> pa.Table: return pa.Table.from_pydict({ - "int64": - values, + "int64": values, "float64": [v / 10 for v in values], "binary": [f"b{v}".encode("utf-8") for v in values] })
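
Taken together, the append_from changes and tests in this series pin down
the source contract: each entry passed to append_from is a no-argument
callable returning an iterator of Arrow tables, and each callable becomes
one write stream (PATCH 30's code caps Ray parallelism at the number of
sources). A minimal end-to-end sketch of that contract follows; the schema,
location, and the ds.local() accessor are illustrative assumptions, not
taken verbatim from these patches.

    import pyarrow as pa
    from space import Dataset  # top-level import path assumed

    ds = Dataset.create("/tmp/example_ds",
                        pa.schema([("int64", pa.int64()),
                                   ("float64", pa.float64())]),
                        primary_keys=["int64"],
                        record_fields=[])
    runner = ds.local()  # local runner accessor; name assumed

    def make_source(lo, hi):
      # No-arg factory: the runner calls it to obtain a fresh iterator.
      def source():
        for i in range(lo, hi):
          yield pa.Table.from_pydict({"int64": [i], "float64": [i / 10]})
      return source

    # Two factories -> two write streams under the Ray runner; the local
    # runner drains them sequentially instead.
    runner.append_from([make_source(0, 5), make_source(5, 10)])
    assert runner.read_all().num_rows == 10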