diff --git a/setup.cfg b/setup.cfg index aface05..854c8e3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,6 +38,7 @@ relic.cli = relic.cli.sga = unpack = relic.sga.core.cli:RelicSgaUnpackCli pack = relic.sga.core.cli:RelicSgaPackCli + repack = relic.sga.core.cli:RelicSgaRepackCli [options.packages.find] where = src \ No newline at end of file diff --git a/src/relic/sga/core/__init__.py b/src/relic/sga/core/__init__.py index 2809536..dd87194 100644 --- a/src/relic/sga/core/__init__.py +++ b/src/relic/sga/core/__init__.py @@ -3,4 +3,4 @@ """ from relic.sga.core.definitions import Version, MagicWord, StorageType, VerificationType -__version__ = "1.1.1" +__version__ = "1.1.2" diff --git a/src/relic/sga/core/cli.py b/src/relic/sga/core/cli.py index a4b7423..17aae81 100644 --- a/src/relic/sga/core/cli.py +++ b/src/relic/sga/core/cli.py @@ -1,7 +1,9 @@ from __future__ import annotations +import argparse +import os.path from argparse import ArgumentParser, Namespace -from typing import Optional +from typing import Optional, Callable import fs.copy from fs.base import FS @@ -20,6 +22,42 @@ def _create_parser( return command_group.add_parser("sga") +def _arg_exists_err(value: str) -> argparse.ArgumentTypeError: + return argparse.ArgumentTypeError(f"The given path '{value}' does not exist!") + + +def _get_dir_type_validator(exists: bool) -> Callable[[str], str]: + def _dir_type(path: str) -> str: + if not os.path.exists(path): + if exists: + raise _arg_exists_err(path) + else: + return path + + if os.path.isdir(path): + return path + + raise argparse.ArgumentTypeError(f"The given path '{path}' is not a directory!") + + return _dir_type + + +def _get_file_type_validator(exists: Optional[bool]) -> Callable[[str], str]: + def _file_type(path: str) -> str: + if not os.path.exists(path): + if exists: + raise _arg_exists_err(path) + else: + return path + + if os.path.isfile(path): + return path + + raise argparse.ArgumentTypeError(f"The given path '{path}' is not a file!") + + return _file_type + + class RelicSgaUnpackCli(CliPlugin): def _create_parser( self, command_group: Optional[_SubParsersAction] = None @@ -30,8 +68,16 @@ def _create_parser( else: parser = command_group.add_parser("unpack") - parser.add_argument("src_sga", type=str, help="Source SGA File") - parser.add_argument("out_dir", type=str, help="Output Directory") + parser.add_argument( + "src_sga", + type=_get_file_type_validator(exists=True), + help="Source SGA File", + ) + parser.add_argument( + "out_dir", + type=_get_dir_type_validator(exists=False), + help="Output Directory", + ) return parser @@ -64,3 +110,23 @@ def _create_parser( # pack further delegates to version plugins return parser + + +class RelicSgaRepackCli(CliPluginGroup): + """An alternative to pack which 'repacks' an SGA. Intended for testing purposes.""" + + GROUP = "relic.cli.sga.repack" + + def _create_parser( + self, command_group: Optional[_SubParsersAction] = None + ) -> ArgumentParser: + parser: ArgumentParser + desc = "Debug Command; reads and repacks an SGA archive." + if command_group is None: + parser = ArgumentParser("repack", description=desc) + else: + parser = command_group.add_parser("repack", description=desc) + + # pack further delegates to version plugins + + return parser diff --git a/src/relic/sga/core/filesystem.py b/src/relic/sga/core/filesystem.py index b2ebbf3..fb16593 100644 --- a/src/relic/sga/core/filesystem.py +++ b/src/relic/sga/core/filesystem.py @@ -240,9 +240,10 @@ def to_info(self, namespaces=None): class _EssenceDriveFS(MemoryFS): - def __init__(self, alias: str) -> None: + def __init__(self, alias: str, name: str) -> None: super().__init__() self.alias = alias + self.name = name def _make_dir_entry( self, resource_type: ResourceType, name: str @@ -292,6 +293,22 @@ def setinfo(self, path: str, info: Mapping[str, Mapping[str, object]]) -> None: # if LAZY_NAMESPACE in info and not resource_entry.is_dir: # lazy + def getinfo( + self, path, namespaces=None + ): # type: (Text, Optional[Collection[Text]]) -> Info + info = super().getinfo(path, namespaces) + + _path = self.validatepath(path) + if _path == "/" and ( + namespaces is not None and ESSENCE_NAMESPACE in namespaces + ): + raw_info = info.raw + essence_ns = dict(raw_info[ESSENCE_NAMESPACE]) + essence_ns["alias"] = self.alias + essence_ns["name"] = self.name + info = Info(raw_info) + return info + def getessence(self, path: str) -> Info: return self.getinfo(path, [ESSENCE_NAMESPACE]) @@ -324,9 +341,12 @@ def setmeta(self, meta: Dict[str, Any], namespace: str = "standard") -> None: def getessence(self, path: str) -> Info: return self.getinfo(path, [ESSENCE_NAMESPACE]) - def create_drive(self, name: str) -> _EssenceDriveFS: - drive = _EssenceDriveFS(name) - self.add_fs(name, drive) + def create_drive(self, alias: str, name: str) -> _EssenceDriveFS: + drive = _EssenceDriveFS(alias, name) + first_drive = len([*self.iterate_fs()]) == 0 + self.add_fs( + alias, drive, write=first_drive + ) # TODO see if name would work here, using alias because that is what it originally was return drive def _delegate(self, path): @@ -340,39 +360,6 @@ def _delegate(self, path): return super()._delegate(path) -# if __name__ == "__main__": -# test_file = File("test.txt", b"This is a Test!", StorageType.STORE, False, None) -# test_folder = Folder("Test", [], [test_file]) -# data_folders = [test_folder] -# data_files = [] -# data_drive = Drive("data", "", data_folders, data_files) -# attr_drive = Drive("attr", "", [], [test_file]) -# archive = Archive("Test", None, [data_drive, attr_drive]) -# -# with SGAFS() as fs: -# data_fs = MemoryFS() -# fs.add_fs("data", data_fs) -# data_dir = data_fs.makedir("Test Data") -# with data_dir.open("sample_data.txt", "wb") as data_sample_text: -# data_sample_text.write(b"Sample Data Text!") -# -# attr_fs = MemoryFS() -# fs.add_fs("attr", attr_fs) -# attr_dir = attr_fs.makedir("Test Attr") -# with attr_dir.open("sample_attr.txt", "wb") as attr_sample_text: -# attr_sample_text.write(b"Sample Attr Text!") -# -# for root, folders, files in fs.walk(): -# print(root, "\n\t", folders, "\n\t", files) -# -# for name, sub_fs in fs.iterate_fs(): -# print(name) -# for root, folders, files in sub_fs.walk(): -# print("\t", root, "\n\t\t", folders, "\n\t\t", files) -# -# print(fs.getinfo("/", ["basic", "access"]).raw) -# pass - __all__ = [ "ESSENCE_NAMESPACE", "EssenceFSHandler", diff --git a/src/relic/sga/core/serialization.py b/src/relic/sga/core/serialization.py index 1944331..7862582 100644 --- a/src/relic/sga/core/serialization.py +++ b/src/relic/sga/core/serialization.py @@ -16,6 +16,7 @@ Iterable, TypeVar, Generic, + Type, ) from fs.base import FS @@ -220,6 +221,8 @@ def _write_data(data: bytes, stream: BinaryIO) -> int: def _get_or_write_name(name: str, stream: BinaryIO, lookup: Dict[str, int]) -> int: + # Tools don't like "/" so coerce "/" to "\" + name = name.replace("/", "\\") if name in lookup: return lookup[name] @@ -399,7 +402,7 @@ def assemble_drive( drive_folder_index = drive_def.root_folder - folder_offset drive_folder_def = local_folder_defs[drive_folder_index] - drive = essence_fs.create_drive(drive_def.alias) + drive = essence_fs.create_drive(drive_def.alias, drive_def.name) self._assemble_container( drive, drive_folder_def.file_range, @@ -508,41 +511,70 @@ def flatten_folder_collection(self, container_fs: FS, path: str) -> Tuple[int, i self.flat_folders[subfolder_start:subfolder_end] = subfolder_defs return subfolder_start, subfolder_end + def _flatten_folder_names(self, fs: FS, path: str) -> None: + folders = [file_info.name for file_info in fs.scandir("/") if file_info.is_dir] + files = [file_info.name for file_info in fs.scandir("/") if file_info.is_file] + + if len(path) > 0 and path[0] == "/": + path = path[1:] # strip leading '/' + _get_or_write_name(path, self.name_stream, self.flat_names) + + for fold_path in folders: + full_fold_path = f"{path}/{fold_path}" + full_fold_path = str(full_fold_path).split(":", 1)[ + -1 + ] # Strip 'alias:' from path + if full_fold_path[0] == "/": + full_fold_path = full_fold_path[1:] # strip leading '/' + _get_or_write_name(full_fold_path, self.name_stream, self.flat_names) + + for file_path in files: + _get_or_write_name(file_path, self.name_stream, self.flat_names) + def disassemble_folder(self, folder_fs: FS, path: str) -> FolderDef: folder_def = FolderDef(None, None, None) # type: ignore - - # Subfiles - subfile_range = self.flatten_file_collection(folder_fs) - # Subfolders - # # Since Relic typically uses the first folder as the root folder; I will try to preserve that parent folders come before their child folders - subfolder_range = self.flatten_folder_collection(folder_fs, path) + # Write Name + self._flatten_folder_names(folder_fs, path) folder_name = str(path).split(":", 1)[-1] # Strip 'alias:' from path - if folder_name[0] == "/": folder_name = folder_name[1:] # strip leading '/' - folder_def.name_pos = _get_or_write_name( folder_name, self.name_stream, self.flat_names ) + + # Subfolders + # # Since Relic typically uses the first folder as the root folder; I will try to preserve that parent folders come before their child folders + subfolder_range = self.flatten_folder_collection(folder_fs, path) + + # Subfiles + subfile_range = self.flatten_file_collection(folder_fs) + folder_def.file_range = subfile_range folder_def.folder_range = subfolder_range return folder_def - def disassemble_drive(self, drive: _EssenceDriveFS, alias: str) -> DriveDef: - name = "" + def disassemble_drive(self, drive: _EssenceDriveFS) -> DriveDef: + name = drive.name + folder_name = "" + alias = drive.alias drive_folder_def = FolderDef(None, None, None) # type: ignore + self._flatten_folder_names(drive, folder_name) + root_folder = len(self.flat_folders) folder_start = len(self.flat_folders) file_start = len(self.flat_files) self.flat_folders.append(drive_folder_def) + # Name should be an empty string? drive_folder_def.name_pos = _get_or_write_name( - name, self.name_stream, self.flat_names + folder_name, self.name_stream, self.flat_names ) drive_folder_def.file_range = self.flatten_file_collection(drive) - drive_folder_def.folder_range = self.flatten_folder_collection(drive, name) + drive_folder_def.folder_range = self.flatten_folder_collection( + drive, folder_name + ) folder_end = len(self.flat_folders) file_end = len(self.flat_files) @@ -593,9 +625,9 @@ def write_toc(self) -> TocBlock: ) def disassemble(self) -> TocBlock: - for name, drive_fs in self.fs.iterate_fs(): + for _, drive_fs in self.fs.iterate_fs(): drive_fs = typing.cast(_EssenceDriveFS, drive_fs) - drive_def = self.disassemble_drive(drive_fs, name) + drive_def = self.disassemble_drive(drive_fs) self.flat_drives.append(drive_def) return self.write_toc() @@ -740,6 +772,8 @@ def __init__( gen_empty_meta: Callable[[], TMetaBlock], finalize_meta: Callable[[BinaryIO, TMetaBlock], None], meta2def: Callable[[Dict[str, object]], TFileDef], + assembler: Optional[Type[FSAssembler[TFileDef]]] = None, + disassembler: Optional[Type[FSDisassembler[TFileDef]]] = None, ): self.version = version self.meta_serializer = meta_serializer @@ -752,6 +786,8 @@ def __init__( self.gen_empty_meta = gen_empty_meta self.finalize_meta = finalize_meta self.meta2def = meta2def + self.assembler_type = assembler or FSAssembler + self.disassembler_type = disassembler or FSDisassembler def read(self, stream: BinaryIO) -> EssenceFS: # Magic & Version; skippable so that we can check for a valid file and read the version elsewhere @@ -773,7 +809,7 @@ def read(self, stream: BinaryIO) -> EssenceFS: name, metadata = meta_block.name, self.assemble_meta( stream, meta_block, toc_meta_block ) - assembler: FSAssembler[TFileDef] = FSAssembler( + assembler: FSAssembler[TFileDef] = self.assembler_type( stream=stream, ptrs=meta_block.ptrs, toc=toc_block, @@ -806,7 +842,7 @@ def write(self, stream: BinaryIO, essence_fs: EssenceFS) -> int: with BytesIO() as data_stream: with BytesIO() as toc_stream: with BytesIO() as name_stream: - disassembler = FSDisassembler( + disassembler: FSDisassembler[TFileDef] = self.disassembler_type( fs=essence_fs, toc_stream=toc_stream, data_stream=data_stream, diff --git a/tests/issues/test_issue_39.py b/tests/issues/test_issue_39.py index f9a42c3..7435d41 100644 --- a/tests/issues/test_issue_39.py +++ b/tests/issues/test_issue_39.py @@ -44,7 +44,7 @@ def _generate_fake_osfs() -> FS: def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS: - # Create 'SGA' + # Create 'SGA' V2 sga = EssenceFS() sga.setmeta( { @@ -57,13 +57,14 @@ def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS: "essence", ) - alias = "test" + alias = "data" + name = "test data" sga_drive = None # sga.create_drive(alias) for path in osfs.walk.files(): if ( sga_drive is None ): # Lazily create drive, to avoid empty drives from being created - sga_drive = sga.create_drive(alias) + sga_drive = sga.create_drive(alias, name) if "stream" in path: storage = StorageType.STREAM_COMPRESS diff --git a/tests/issues/test_issue_40.py b/tests/issues/test_issue_40.py new file mode 100644 index 0000000..4a39693 --- /dev/null +++ b/tests/issues/test_issue_40.py @@ -0,0 +1,34 @@ +r""" +TestCases for more explicit errors when providing invalid path arguments. +https://github.com/MAK-Relic-Tool/Issue-Tracker/issues/40 +""" +import io +from typing import Iterable +from contextlib import redirect_stderr + +import pytest + +_ARGS = [ + ( + ["sga", "unpack", "nonexistant.sga", "."], + "error: argument src_sga: The given path 'nonexistant.sga' does not exist!", + ), + ( + ["sga", "unpack", __file__, __file__], + rf"error: argument out_dir: The given path '{__file__}' is not a directory!", + ), +] + + +@pytest.mark.parametrize(["args", "msg"], _ARGS) +def test_argparse_error(args: Iterable[str], msg: str): + from relic.core.cli import cli_root + + with io.StringIO() as f: + with redirect_stderr(f): + status = cli_root.run_with(*args) + assert status == 2 + f.seek(0) + err = f.read() + print(err) + assert msg in err diff --git a/tests/test_cli.py b/tests/test_cli.py index 5bce432..28a71cc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -31,7 +31,7 @@ def test_run_with(self, args: Sequence[str], output: str, exit_code: int): assert status == exit_code -_SGA_HELP = ["sga", "-h"], """usage: relic sga [-h] {pack,unpack} ...""", 0 +_SGA_HELP = ["sga", "-h"], """usage: relic sga [-h] {pack,repack,unpack} ...""", 0 _SGA_PACK_HELP = ["sga", "pack", "-h"], """usage: relic sga pack [-h] {} ...""", 0 _SGA_UNPACK_HELP = ["sga", "unpack", "-h"], """usage: relic sga unpack [-h]""", 0 diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py index 69a5082..df17a4d 100644 --- a/tests/test_filesystem.py +++ b/tests/test_filesystem.py @@ -12,13 +12,13 @@ def make_fs(self): # EssenceFS shouldn't be writeable by default; # being an emulator for Window's hard drives. # With no 'drive' installed, there's nothing to write to! - essence_fs.add_fs("data", _EssenceDriveFS("data"), True) + essence_fs.add_fs("data", _EssenceDriveFS("data", "test"), True) return essence_fs class TestEssenceDriveFS(FSTestCases, unittest.TestCase): def make_fs(self): - return _EssenceDriveFS("") + return _EssenceDriveFS("data", "test") class TestOpener: