diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml index d41355c..28b6b83 100644 --- a/.github/workflows/black.yml +++ b/.github/workflows/black.yml @@ -4,16 +4,16 @@ on: workflow_dispatch: push: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/black.yml' + - src/relic/** + - .github/workflows/black.yml pull_request: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/black.yml' + - src/relic/** + - .github/workflows/black.yml jobs: black: - uses: MAK-Relic-Tool/Workflows/.github/workflows/black.yml@main \ No newline at end of file + uses: MAK-Relic-Tool/Workflows/.github/workflows/black.yml@main diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml index 917e10f..a85e300 100644 --- a/.github/workflows/mypy.yml +++ b/.github/workflows/mypy.yml @@ -4,26 +4,27 @@ on: workflow_dispatch: push: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/mypy.yml' - - 'mypy.ini' - - 'setup.cfg' - - 'setup.py' - - 'MANIFEST.in' + - src/relic/** + - .github/workflows/mypy.yml + - mypy.ini + - setup.cfg + - setup.py + - MANIFEST.in pull_request: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/mypy.yml' - - 'mypy.ini' - - 'setup.cfg' - - 'setup.py' - - 'MANIFEST.in' + - src/relic/** + - .github/workflows/mypy.yml + - mypy.ini + - setup.cfg + - setup.py + - MANIFEST.in jobs: mypy: uses: MAK-Relic-Tool/Workflows/.github/workflows/mypy.yml@main with: - package: "relic.sga.core" \ No newline at end of file + package: relic.sga.core + mypy-config: pyproject.toml diff --git a/.github/workflows/publish-pypi.yml b/.github/workflows/publish-pypi.yml index e7bc18a..6265ab2 100644 --- a/.github/workflows/publish-pypi.yml +++ b/.github/workflows/publish-pypi.yml @@ -10,4 +10,4 @@ jobs: pypi: uses: MAK-Relic-Tool/Workflows/.github/workflows/publish-to-pypi.yml@main secrets: - pypi-token: ${{ secrets.PYPI_API_TOKEN }} \ No newline at end of file + pypi-token: ${{ secrets.PYPI_API_TOKEN }} diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 2f38c27..52955c0 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -4,22 +4,22 @@ on: workflow_dispatch: push: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/pylint.yml' - - '.pylintrc' - - 'requirements.txt' + - src/relic/** + - .github/workflows/pylint.yml + - .pylintrc + - requirements.txt pull_request: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - '.github/workflows/pylint.yml' - - '.pylintrc' - - 'requirements.txt' + - src/relic/** + - .github/workflows/pylint.yml + - .pylintrc + - requirements.txt jobs: pylint: uses: MAK-Relic-Tool/Workflows/.github/workflows/pylint.yml@main with: - path: "src" \ No newline at end of file + path: src diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 319891c..5c13340 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -4,27 +4,27 @@ on: workflow_dispatch: push: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - 'tests/**' - - '.github/workflows/pytest.yml' - - 'setup.cfg' - - 'setup.py' - - 'MANIFEST.in' - - 'test-requirements.txt' + - src/relic/** + - tests/** + - .github/workflows/pytest.yml + - setup.cfg + - setup.py + - MANIFEST.in + - test-requirements.txt pull_request: - branches: [ main ] + branches: [main] paths: - - 'src/relic/**' - - 'tests/**' - - 
'.github/workflows/pytest.yml' - - 'setup.cfg' - - 'setup.py' - - 'MANIFEST.in' - - 'test-requirements.txt' + - src/relic/** + - tests/** + - .github/workflows/pytest.yml + - setup.cfg + - setup.py + - MANIFEST.in + - test-requirements.txt jobs: pytest: - uses: MAK-Relic-Tool/Workflows/.github/workflows/pytest.yml@main \ No newline at end of file + uses: MAK-Relic-Tool/Workflows/.github/workflows/pytest.yml@main diff --git a/.gitignore b/.gitignore index 16fe929..3a4ad22 100644 --- a/.gitignore +++ b/.gitignore @@ -12,14 +12,14 @@ __pycache__/ src/**/*.egg-info/* dist/* -# Ignore local builds -build/** - -# Ignore local test sources -tests/**/sources.json +# Ignore MyPy +.mypy_cache/ # Ignore docs ## Ignore compiled docs docs/build/** ## Ignore autogenerated sources docs/source/generated/** + +# Ignore local builds +build/** diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a8b4308 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,43 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - id: check-case-conflict + - id: check-docstring-first + - id: check-json + - id: pretty-format-json + args: [--autofix] + - id: check-merge-conflict + - id: check-toml + - id: check-yaml + - id: mixed-line-ending + - id: requirements-txt-fixer +- repo: https://github.com/psf/black + rev: 24.1.1 + hooks: + - id: black +# - repo: https://github.com/pre-commit/mirrors-mypy + # rev: v1.8.0 + # hooks: + # - id: mypy + # args: [--explicit-package-bases, --namespace-packages] + # additional_dependencies: + # - relic-tool-core >= 2.0.0 + # - fs + +- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks + rev: v2.12.0 + hooks: + - id: pretty-format-yaml + args: [--autofix] + - id: pretty-format-toml + args: [--autofix] + +- repo: https://github.com/PyCQA/docformatter + rev: v1.7.5 + hooks: + - id: docformatter + args: [--black, --in-place, --recursive] diff --git a/.pylintrc b/.pylintrc index 5c7abb7..14bdc8d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -103,7 +103,7 @@ enable=c-extension-no-member # which contain the number of messages in each category, as well as 'statement' # which is the total number of statements analyzed. This score is used by the # global evaluation report (RP0004). -evaluation=10.0 - ((float(4 * error + 3 * warning + 2 * refactor + 1 * convention) / ((4+3+2+1) * statement)) * 10) +evaluation=10.0 - ((float(4 * error + 3 * warning + 2 * refactor + 1 * convention) / (4 * statement)) * 10) # Template used to display messages. This is a python new-style format string # used to format the message information. See doc for all details. diff --git a/LICENSE.txt b/LICENSE.txt index e72bfdd..f288702 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -671,4 +671,4 @@ into proprietary programs. If your program is a subroutine library, you may consider it more useful to permit linking proprietary applications with the library. If this is what you want to do, use the GNU Lesser General Public License instead of this License. But first, please read -. \ No newline at end of file +. 
diff --git a/MANIFEST.in b/MANIFEST.in index ff70094..1c39e4b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -global-include py.typed \ No newline at end of file +global-include py.typed diff --git a/README.md b/README.md index 124763a..0478b51 100644 --- a/README.md +++ b/README.md @@ -24,4 +24,4 @@ pip install relic-tool-sga-core For more information, see [pip VCS support](https://pip.pypa.io/en/stable/topics/vcs-support/#git) ``` pip install git+https://github.com/MAK-Relic-Tool/Relic-Tool-SGA-Core -``` \ No newline at end of file +``` diff --git a/docs/source/conf.py b/docs/source/conf.py index adc2afd..0e75426 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -13,14 +13,14 @@ import os import sys -sys.path.insert(0, os.path.abspath('../../src')) +sys.path.insert(0, os.path.abspath("../../src")) # -- Project information ----------------------------------------------------- -from relic.sga import __version__ as package_version +from relic.sga.core import __version__ as package_version -project = 'Relic Tool - SGA' -copyright = '2022, Marcus Kertesz' -author = 'Marcus Kertesz' +project = "Relic Tool - SGA" +copyright = "2022, Marcus Kertesz" +author = "Marcus Kertesz" # The full version, including alpha/beta/rc tags release = package_version @@ -32,13 +32,13 @@ # ones. extensions = [ - 'sphinx.ext.autodoc', # Core library for html generation from docstrings - 'sphinx.ext.autosummary', # Create neat summary tables + "sphinx.ext.autodoc", # Core library for html generation from docstrings + "sphinx.ext.autosummary", # Create neat summary tables ] autosummary_generate = True # Turn on sphinx.ext.autosummary # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. @@ -50,9 +50,9 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'alabaster' +html_theme = "alabaster" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". 
-html_static_path = ['_static'] +html_static_path = ["_static"] diff --git a/mypy.ini b/mypy.ini deleted file mode 100644 index 11190db..0000000 --- a/mypy.ini +++ /dev/null @@ -1,31 +0,0 @@ -[mypy] -;ignore_missing_imports = False ;Moved to module_level ignores -mypy_path = $MYPY_CONFIG_FILE_DIR/src - -;strict = True -; Manually specify strict flags -warn_unused_configs = True -disallow_any_generics = True -disallow_subclassing_any = True -disallow_untyped_calls = True -disallow_untyped_defs = True -disallow_incomplete_defs = True -check_untyped_defs = True -disallow_untyped_decorators = True -no_implicit_optional = True -warn_redundant_casts = True -warn_unused_ignores = True -warn_return_any = True -no_implicit_reexport = True -strict_equality = True -strict_concatenate = True - -[mypy-serialization_tools.*] -ignore_missing_imports = True - -[mypy-pkg_resources.*] -ignore_missing_imports = True - -[mypy-tests.*] -ignore_missing_imports = True -ignore_errors = True \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b5a3c46..be154f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,142 @@ [build-system] -requires = [ - "setuptools>=42", - "wheel" +build-backend = "setuptools.build_meta" +requires = ["setuptools>=61.2"] + +[project] +authors = [{name = "Marcus Kertesz"}] +classifiers = [ + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "License :: OSI Approved :: GNU General Public License v3 (GPLv3)" ] -build-backend = "setuptools.build_meta" \ No newline at end of file +dependencies = [ + "relic-tool-core >= 2.0.0", + "fs" +] +description = "Core files shared by versioned SGA files." 
+dynamic = ["version"] +name = "relic-tool-sga-core" +requires-python = ">=3.9" + +[project.entry-points] +"relic.cli" = {sga = "relic.sga.core.cli:RelicSgaCli"} + +[project.entry-points."fs.opener"] +sga = "relic.sga.core.essencefs.opener:EssenceFsOpener" + +[project.entry-points."relic.cli.sga"] +info = "relic.sga.core.cli:RelicSgaInfoCli" +pack = "relic.sga.core.cli:RelicSgaPackCli" +repack = "relic.sga.core.cli:RelicSgaRepackCli" +unpack = "relic.sga.core.cli:RelicSgaUnpackCli" + +[project.readme] +content-type = "text/markdown" +file = "README.md" + +[project.urls] +"Bug Tracker" = "https://github.com/MAK-Relic-Tool/Issue-Tracker/issues" +Homepage = "https://github.com/MAK-Relic-Tool/Relic-Tool-SGA-Core" + +[tool.mypy] +check_untyped_defs = true +disallow_any_generics = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +extra_checks = true +mypy_path = "$MYPY_CONFIG_FILE_DIR/src" +no_implicit_optional = true +no_implicit_reexport = true +strict_equality = true +warn_redundant_casts = true +warn_return_any = true +warn_unused_configs = true +warn_unused_ignores = true + +[[tool.mypy.overrides]] +ignore_missing_imports = true +module = ["serialization_tools.*"] + +[[tool.mypy.overrides]] +ignore_missing_imports = true +module = ["pkg_resources.*"] + +[[tool.mypy.overrides]] +ignore_errors = true +ignore_missing_imports = true +module = ["tests.*"] + +[[tool.mypy.overrides]] +ignore_errors = true +ignore_missing_imports = true +module = ["docs.*"] + +[tool.semantic_release] +assets = [] +commit_message = "{version}\n\nAutomatically generated by python-semantic-release" +commit_parser = "angular" +logging_use_named_masks = false +major_on_zero = true +tag_format = "v{version}" + +[tool.semantic_release.branches.main] +match = "(main|master)" +prerelease = false +prerelease_token = "rc" + +[tool.semantic_release.changelog] +changelog_file = "CHANGELOG.md" +exclude_commit_patterns = [] +template_dir = "templates" + +[tool.semantic_release.changelog.environment] +autoescape = true +block_end_string = "%}" +block_start_string = "{%" +comment_end_string = "#}" +comment_start_string = "{#" +extensions = [] +keep_trailing_newline = false +lstrip_blocks = false +newline_sequence = "\n" +trim_blocks = false +variable_end_string = "}}" +variable_start_string = "{{" + +[tool.semantic_release.commit_author] +default = "semantic-release " +env = "GIT_COMMIT_AUTHOR" + +[tool.semantic_release.commit_parser_options] +allowed_tags = ["build", "chore", "ci", "docs", "feat", "fix", "perf", "style", "refactor", "test"] +default_bump_level = 0 +minor_tags = ["feat"] +patch_tags = ["fix", "perf"] + +[tool.semantic_release.publish] +dist_glob_patterns = ["dist/*"] +upload_to_vcs_release = true + +[tool.semantic_release.remote] +ignore_token_for_push = false +name = "origin" +type = "github" + +[tool.semantic_release.remote.token] +env = "GH_TOKEN" + +[tool.setuptools] +include-package-data = true +package-dir = {"" = "src"} + +[tool.setuptools.dynamic] +version = {attr = "relic.sga.core.__version__"} + +[tool.setuptools.packages.find] +namespaces = true +where = ["src"] diff --git a/requirements-dev.txt b/requirements-dev.txt index 294aed3..a852399 100644 Binary files a/requirements-dev.txt and b/requirements-dev.txt differ diff --git a/requirements.txt b/requirements.txt index 8883f38..1688b8e 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/setup.cfg 
b/setup.cfg deleted file mode 100644 index 4bee485..0000000 --- a/setup.cfg +++ /dev/null @@ -1,45 +0,0 @@ -[metadata] -name = relic-tool-sga-core -version = attr: relic.sga.core.__version__ - -author = Marcus Kertesz -description = Core files shared by versioned SGA files. -long_description = file: README.md -long_description_content_type = text/markdown -url = https://github.com/MAK-Relic-Tool/Relic-Tool-SGA-Core -project_urls = - Bug Tracker = https://github.com/MAK-Relic-Tool/Issue-Tracker/issues -classifiers = - Programming Language :: Python :: 3 :: Only - Programming Language :: Python :: 3.9 - Programming Language :: Python :: 3.10 - Programming Language :: Python :: 3.11 - License :: OSI Approved :: GNU General Public License v3 (GPLv3) - -[options] -include_package_data = True -package_dir = - = src -packages = find_namespace: -python_requires = >=3.9 - -install_requires = - mak-serialization-tools >= 2022.0a19 - relic-tool-core >= 1.1.1 - fs - -[options.entry_points] -fs.opener = - sga = relic.sga.core.filesystem:EssenceFSOpener - -relic.cli = - sga = relic.sga.core.cli:RelicSgaCli - -relic.cli.sga = - unpack = relic.sga.core.cli:RelicSgaUnpackCli - pack = relic.sga.core.cli:RelicSgaPackCli - repack = relic.sga.core.cli:RelicSgaRepackCli - info = relic.sga.core.cli:RelicSgaInfoCli - -[options.packages.find] -where = src \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index 6b40b52..0000000 --- a/setup.py +++ /dev/null @@ -1,4 +0,0 @@ -from setuptools import setup - -if __name__ == '__main__': - setup() diff --git a/src/relic/sga/core/__init__.py b/src/relic/sga/core/__init__.py index 19757c1..504fbb4 100644 --- a/src/relic/sga/core/__init__.py +++ b/src/relic/sga/core/__init__.py @@ -1,6 +1,10 @@ -""" -Shared definitions used by several components of the module -""" -from relic.sga.core.definitions import Version, MagicWord, StorageType, VerificationType +"""Shared definitions used by several components of the module.""" + +from relic.sga.core.definitions import ( + Version, + MAGIC_WORD, + StorageType, + VerificationType, +) __version__ = "1.1.4" diff --git a/src/relic/sga/core/cli.py b/src/relic/sga/core/cli.py index 437489c..4c37bae 100644 --- a/src/relic/sga/core/cli.py +++ b/src/relic/sga/core/cli.py @@ -1,43 +1,66 @@ from __future__ import annotations import argparse -import datetime +import dataclasses +import json import os.path from argparse import ArgumentParser, Namespace -from typing import Optional, Callable, Dict, List, Any, Tuple, Set, TextIO +from json import JSONEncoder +from typing import Optional, Callable, Any, Dict -import fs.copy +from fs import open_fs from fs.base import FS -from fs.multifs import MultiFS -from relic.core.cli import CliPluginGroup, _SubParsersAction, CliPlugin +from fs.copy import copy_fs +from relic.core.cli import CliPluginGroup, _SubParsersAction, CliPlugin, RelicArgParser -from relic.sga.core.definitions import StorageType -from relic.sga.core.filesystem import EssenceFS, _EssenceDriveFS +from relic.sga.core.essencefs import EssenceFS - -class RelicSgaCli(CliPluginGroup): - GROUP = "relic.cli.sga" - - def _create_parser( - self, command_group: Optional[_SubParsersAction] = None - ) -> ArgumentParser: - if command_group is None: - return ArgumentParser("sga") - else: - return command_group.add_parser("sga") +_SUCCESS = 0 def _arg_exists_err(value: str) -> argparse.ArgumentTypeError: return argparse.ArgumentTypeError(f"The given path '{value}' does not exist!") +def _get_path_validator(exists: bool) 
-> Callable[[str], str]: + def _path_type(path: str) -> str: + path = os.path.abspath(path) + + def _step(_path: str) -> None: + parent, _ = os.path.split(_path) + + if len(parent) != 0 and parent != _path: + return _step(parent) + + if not os.path.exists(parent): + return None + + if os.path.isfile(parent): + raise argparse.ArgumentTypeError( + f"The given path '{path}' is not a valid path; it treats a file ({parent}) as a directory!" + ) + + return None + + if exists and not os.path.exists(path): + raise _arg_exists_err(path) + + _step(path) # we want step to validate; but we don't care about its result + + return path + + return _path_type + + def _get_dir_type_validator(exists: bool) -> Callable[[str], str]: + validate_path = _get_path_validator(False) + def _dir_type(path: str) -> str: + path = os.path.abspath(path) if not os.path.exists(path): if exists: raise _arg_exists_err(path) - else: - return path + return validate_path(path) if os.path.isdir(path): return path @@ -48,12 +71,14 @@ def _dir_type(path: str) -> str: def _get_file_type_validator(exists: Optional[bool]) -> Callable[[str], str]: + validate_path = _get_path_validator(False) + def _file_type(path: str) -> str: + path = os.path.abspath(path) if not os.path.exists(path): if exists: raise _arg_exists_err(path) - else: - return path + return validate_path(path) if os.path.isfile(path): return path @@ -63,15 +88,31 @@ def _file_type(path: str) -> str: return _file_type +class RelicSgaCli(CliPluginGroup): + GROUP = "relic.cli.sga" + + def _create_parser( + self, command_group: Optional[_SubParsersAction] = None + ) -> ArgumentParser: + name = "sga" + if command_group is None: + return RelicArgParser(name) + return command_group.add_parser(name) + + class RelicSgaUnpackCli(CliPlugin): def _create_parser( self, command_group: Optional[_SubParsersAction] = None ) -> ArgumentParser: parser: ArgumentParser + desc = """Unpack an SGA archive to the filesystem. + If only one root is present in the SGA, '--merge' is implied. + If multiple roots are in the SGA, '--isolate' is implied.
+ Manually specify the flags to override this behaviour.""" if command_group is None: - parser = ArgumentParser("unpack") + parser = RelicArgParser("unpack", description=desc) else: - parser = command_group.add_parser("unpack") + parser = command_group.add_parser("unpack", description=desc) parser.add_argument( "src_sga", @@ -83,196 +124,166 @@ def _create_parser( type=_get_dir_type_validator(exists=False), help="Output Directory", ) + sga_root_flags = parser.add_mutually_exclusive_group() + + sga_root_flags.add_argument( + "-m", + "--merge", + help="SGA roots will always write to the same folder; specified by out_dir", + action="store_true", + ) + sga_root_flags.add_argument( + "-i", + "--isolate", + help="SGA roots will always write to separate folders, one per alias; located within out_dir", + action="store_true", + ) return parser def command(self, ns: Namespace) -> Optional[int]: infile: str = ns.src_sga outdir: str = ns.out_dir + merge: bool = ns.merge + isolate: bool = ns.isolate print(f"Unpacking `{infile}`") - def _callback(_1: FS, srcfile: str, _2: FS, _3: str) -> None: - print(f"\t\tUnpacking File `{srcfile}`") - - fs.copy.copy_fs(f"sga://{infile}", f"osfs://{outdir}", on_copy=_callback) - - return None # To shut-up mypy + def _callback(_1: FS, srcfile: str, _2: FS, dstfile: str) -> None: + print(f"\t\tUnpacking File `{srcfile}`\n\t\tWrote to `{dstfile}`") + + if merge: # we can short circuit the merge flag case + copy_fs( + f"sga://{infile}", + f"osfs://{outdir}", + on_copy=_callback, + preserve_time=True, + ) + return _SUCCESS + + # we need to open the archive to 'isolate' or to determine if we implicit merge + sga: EssenceFS + with open_fs(infile, default_protocol="sga") as sga: # type: ignore + roots = list(sga.iterate_fs()) + # Implicit merge; we reuse sga to avoid reopening the filesystem + if not isolate and len(roots) == 1: + copy_fs(sga, f"osfs://{outdir}", on_copy=_callback, preserve_time=True) + return _SUCCESS + + # Isolate or Implied Isolate + with open_fs(outdir, writeable=True, create=True) as osfs: + for alias, subfs in roots: + with osfs.makedir(alias, recreate=True) as osfs_subfs: + copy_fs( + subfs, osfs_subfs, on_copy=_callback, preserve_time=True + ) + + return _SUCCESS + + +class EssenceInfoEncoder(JSONEncoder): + def default(self, o: Any) -> Any: + if dataclasses.is_dataclass(o): + return dataclasses.asdict(o) + try: + return super().default(o) + except ( + TypeError + ): # Kinda bad; but we don't want to serialize, we want to print; so I think this is an acceptable tradeoff + return str(o) -class RelicSgaPackCli(CliPluginGroup): - GROUP = "relic.cli.sga.pack" +class RelicSgaInfoCli(CliPlugin): + _JSON_MINIFY_KWARGS: Dict[str, Any] = {"separators": (",", ":"), "indent": None} + _JSON_MAXIFY_KWARGS: Dict[str, Any] = {"separators": (", ", ": "), "indent": 4} def _create_parser( self, command_group: Optional[_SubParsersAction] = None ) -> ArgumentParser: parser: ArgumentParser + desc = """Reads an SGA Archive and extracts its metadata to a json object.
+ If out_json is a directory; the name of the file will be '[name of sga].json' + """ if command_group is None: - parser = ArgumentParser("pack") + parser = RelicArgParser("info", description=desc) else: - parser = command_group.add_parser("pack") + parser = command_group.add_parser("info", description=desc) - # pack further delegates to version plugins + parser.add_argument( + "src_sga", + type=_get_file_type_validator(exists=True), + help="Source SGA File", + ) + parser.add_argument( + "out_json", + type=_get_path_validator(exists=False), + help="Output File or Directory", + ) + parser.add_argument( + "-m", + "--minify", + action="store_true", + default=False, + help="Minifies the resulting json by stripping whitespace, newlines, and indentations. Reduces filesize", + ) return parser + def command(self, ns: Namespace) -> Optional[int]: + infile: str = ns.src_sga + outjson: str = ns.out_json + minify: bool = ns.minify -class RelicSgaRepackCli(CliPluginGroup): - """An alternative to pack which 'repacks' an SGA. Intended for testing purposes.""" + print(f"Reading Info `{infile}`") - GROUP = "relic.cli.sga.repack" + # we need to open the archive to 'isolate' or to determine if we implicit merge + sga: EssenceFS + with open_fs(infile, default_protocol="sga") as sga: # type: ignore + info = sga.info_tree() + + outjson_dir, outjson_file = os.path.split(outjson) + if len(outjson_file) == 0: # Directory + # Get name of sga without extension, then add .json extension + outjson_dir = outjson + outjson_file = os.path.splitext(os.path.split(infile)[1])[0] + ".json" + + os.makedirs(outjson_dir, exist_ok=True) + outjson = os.path.join(outjson_dir, outjson_file) + + with open(outjson, "w", encoding=None) as info_h: + json_kwargs: Dict[str, Any] = ( + self._JSON_MINIFY_KWARGS if minify else self._JSON_MAXIFY_KWARGS + ) + json.dump(info, info_h, cls=EssenceInfoEncoder, **json_kwargs) + + return _SUCCESS + + +class RelicSgaPackCli(CliPluginGroup): + GROUP = "relic.cli.sga.pack" def _create_parser( self, command_group: Optional[_SubParsersAction] = None ) -> ArgumentParser: parser: ArgumentParser - desc = "Debug Command; reads and repacks an SGA archive." if command_group is None: - parser = ArgumentParser("repack", description=desc) + parser = RelicArgParser("pack") else: - parser = command_group.add_parser("repack", description=desc) - - # pack further delegates to version plugins + parser = command_group.add_parser("pack") return parser -class RelicSgaInfoCli(CliPlugin): +class RelicSgaRepackCli(CliPluginGroup): + GROUP = "relic.cli.sga.repack" + def _create_parser( self, command_group: Optional[_SubParsersAction] = None ) -> ArgumentParser: parser: ArgumentParser - description = "Dumps metadata packed into an SGA file." 
if command_group is None: - parser = ArgumentParser("info", description=description) + parser = RelicArgParser("repack") else: - parser = command_group.add_parser("info", description=description) + parser = command_group.add_parser("repack") - parser.add_argument( - "sga", - type=_get_file_type_validator(exists=True), - help="SGA File to inspect", - ) - parser.add_argument( - "log_file", - nargs="?", - type=_get_file_type_validator(exists=False), - help="Optional file to write messages to, required if `-q/--quiet` is used", - default=None, - ) - parser.add_argument( - "-q", - "--quiet", - action="store_true", - default=False, - help="When specified, SGA info is not printed to the console", - ) return parser - - def command(self, ns: Namespace) -> Optional[int]: - sga: str = ns.sga - log_file: str = ns.log_file - quiet: bool = ns.quiet - - logger: Optional[TextIO] = None - try: - if log_file is not None: - logger = open(log_file, "w") - - outputs: List[Optional[TextIO]] = [] - if quiet is False: - outputs.append(None) # None is a sentinel for stdout - if logger is not None: - outputs.append(logger) - - if len(outputs) == 0: - print( - "Please specify a `log_file` if using the `-q` or `--quiet` command" - ) - return 1 - - def _print( - *msg: str, sep: Optional[str] = None, end: Optional[str] = None - ) -> None: - for output in outputs: - print(*msg, sep=sep, end=end, file=output) - - def _is_container(d: Any) -> bool: - return isinstance(d, (Dict, List, Tuple, Set)) # type: ignore - - def _stringify(d: Any, indent: int = 0) -> None: - _TAB = "\t" - if isinstance(d, Dict): - for k, v in d.items(): - if _is_container(v): - _print(f"{_TAB * indent}{k}:") - _stringify(v, indent + 1) - else: - _print(f"{_TAB * indent}{k}: {v}") - elif isinstance(d, (List, Tuple, Set)): # type: ignore - _print(f"{_TAB * indent}{', '.join(*d)}") - else: - _print(f"{_TAB * indent}{d}") - - def _getessence(fs: FS, path: str = "/") -> Dict[str, Any]: - return fs.getinfo(path, "essence").raw.get("essence", {}) # type: ignore - - _print(f"File: `{sga}`") - sgafs: EssenceFS - with fs.open_fs(f"sga://{sga}") as sgafs: # type: ignore - _print("Archive Metadata:") - _stringify(sgafs.getmeta("essence"), indent=1) - - drive: _EssenceDriveFS - for alias, drive in sgafs.iterate_fs(): # type: ignore - _print(f"Drive: `{drive.name}` (`{drive.alias}`)") - _print("\tDrive Metadata:") - info = _getessence(drive) - if len(info) > 0: - _stringify(info, indent=2) - else: - _print(f"\t\tNo Metadata") - - _print("\tDrive Files Metadata:") - for f in drive.walk.files(): - _print(f"\t\t`{f}`:") - finfo: Dict[str, Any] = _getessence(drive, f) - finfo = finfo.copy() - # We alter storage_type cause it *should* always be present, if its not, we dont do anything - key = "storage_type" - if key in finfo: - stv: int = finfo[key] - st: StorageType = StorageType(stv) - finfo[key] = f"{stv} ({st.name})" - - # We alter modified too, cause when it is present, its garbage - key = "modified" - if key in finfo: - mtv: int = finfo[key] - mt = datetime.datetime.fromtimestamp( - mtv, datetime.timezone.utc - ) - finfo[key] = str(mt) - - # And CRC32 if it's in bytes; this should be removed ASAP tho # I only put this in because its such a minor patch to V2 - key = "crc32" - if key in finfo: - crcv: bytes = finfo[key] - if isinstance(crcv, bytes): - crc32 = int.from_bytes(crcv, "little", signed=False) - finfo[key] = crc32 - - if len(finfo) > 0: - _stringify(finfo, indent=3) - else: - _print(f"\t\t\tNo Metadata") - - finally: - if logger is not None: - 
logger.close() - - if log_file is not None: - print( - f"Saved to `{os.path.join(os.getcwd(), log_file)}`" - ) # DO NOT USE _PRINT - return None diff --git a/src/relic/sga/core/definitions.py b/src/relic/sga/core/definitions.py index dcfbeeb..b25fa83 100644 --- a/src/relic/sga/core/definitions.py +++ b/src/relic/sga/core/definitions.py @@ -1,23 +1,14 @@ -""" -Definitions expressed concretely in core -""" +"""Definitions expressed concretely in core.""" + from __future__ import annotations from dataclasses import dataclass from enum import Enum -from typing import ClassVar, BinaryIO, Any - -from relic.core.errors import MismatchError -from serialization_tools.magic import MagicWordIO -from serialization_tools.structx import Struct - -MagicWord = MagicWordIO(Struct("< 8s"), "_ARCHIVE".encode("ascii")) +from typing import Any, Tuple, Iterable, Union, List +from relic.core.serialization import MagicWord -def _validate_magic_word(self: MagicWordIO, stream: BinaryIO, advance: bool) -> None: - magic = self.read_magic_word(stream, advance) - if magic != self.word: - raise MismatchError("MagicWord", magic, self.word) +MAGIC_WORD = MagicWord(b"_ARCHIVE", name="SGA Magic Word") @dataclass @@ -32,90 +23,74 @@ class Version: major: int minor: int = 0 - LAYOUT: ClassVar[Struct] = Struct("<2H") - def __str__(self) -> str: return f"Version {self.major}.{self.minor}" + def __iter__(self) -> Iterable[int]: + yield self.major + yield self.minor + + def __len__(self) -> int: + return 2 + + def __getitem__(self, item: Union[int, slice]) -> Union[int, List[int]]: + return self.as_tuple()[item] + + def as_tuple(self) -> Tuple[int, int]: + return tuple(self) # type: ignore + def __eq__(self, other: object) -> bool: - return ( - isinstance(other, Version) - and self.major == other.major - and self.minor == other.minor + return self.as_tuple() == ( + other.as_tuple() if isinstance(other, Version) else other + ) + + def __ne__(self, other: object) -> bool: + return self.as_tuple() != ( + other.as_tuple() if isinstance(other, Version) else other ) def __lt__(self, other: Any) -> bool: - if isinstance(other, Version): - return self.major < other.major or ( - self.major == other.major and self.minor < other.minor - ) - raise TypeError(f"Other is not an instance of `{self.__class__}`!") + cmp: bool = self.as_tuple() < ( + other.as_tuple() if isinstance(other, Version) else other + ) + return cmp def __gt__(self, other: Any) -> bool: - if isinstance(other, Version): - return self.major > other.major or ( - self.major == other.major and self.minor > other.minor - ) - raise TypeError(f"Other is not an instance of `{self.__class__}`!") + cmp: bool = self.as_tuple() > ( + other.as_tuple() if isinstance(other, Version) else other + ) + return cmp def __le__(self, other: Any) -> bool: - if isinstance(other, Version): - return self.major < other.major or ( - self.major == other.major and self.minor <= other.minor - ) - raise TypeError(f"Other is not an instance of `{self.__class__}`!") + cmp: bool = self.as_tuple() <= ( + other.as_tuple() if isinstance(other, Version) else other + ) + return cmp def __ge__(self, other: Any) -> bool: - if isinstance(other, Version): - return self.major > other.major or ( - self.major == other.major and self.minor >= other.minor - ) - raise TypeError(f"Other is not an instance of `{self.__class__}`!") + cmp: bool = self.as_tuple() >= ( + other.as_tuple() if isinstance(other, Version) else other + ) + return cmp def __hash__(self) -> int: - # if this was C we could guarantee the hash was unique - 
# because major/minor would both be 16 bits and the hash would be 32 - # Since python doesn't allow that we just assume data isn't garbage; - # garbage in => garbage out after all - return self.major << 16 + self.minor - - @classmethod - def unpack(cls, stream: BinaryIO) -> Version: - """ - Reads a version from the stream. - :param stream: Data stream to read from. - :return: A new Version instance. - """ - layout: Struct = cls.LAYOUT - args = layout.unpack_stream(stream) - return cls(*args) - - def pack(self, stream: BinaryIO) -> int: - """ - Writes the version to the stream. - :param stream: Data stream to write to. - :return: Number of bytes written. - """ - layout: Struct = self.LAYOUT - args = (self.major, self.minor) - packed: int = layout.pack_stream(stream, *args) - return packed + return self.as_tuple().__hash__() class StorageType(int, Enum): - """ - Specifies whether data is stored as a 'raw blob' or as a 'zlib compressed blob' - """ + """Specifies whether data is stored as a 'raw blob' or as a 'zlib compressed + blob'.""" + # According to modpackager STORE = 0 - BUFFER_COMPRESS = 1 - STREAM_COMPRESS = 2 + STREAM_COMPRESS = 1 + BUFFER_COMPRESS = 2 -class VerificationType(int, Enum): - """ - A 'Flag' used to specify how the data's Redundancy Check is stored. - """ +class VerificationType( + int, Enum +): # TODO; consider not sharing this; this is format specific and wasn't introduced until V4? It could be reimplemented in each version; since each version may have different values + """A 'Flag' used to specify how the data's Redundancy Check is stored.""" NONE = 0 # unknown real values, assuming incremental CRC = 1 # unknown real values, assuming incremental @@ -124,4 +99,4 @@ class VerificationType(int, Enum): SHA1_BLOCKS = 4 # unknown real values, assuming incremental -__all__ = ["MagicWord", "Version", "StorageType", "VerificationType"] +__all__ = ["MAGIC_WORD", "Version", "StorageType", "VerificationType"] diff --git a/src/relic/sga/core/errors.py b/src/relic/sga/core/errors.py index c762715..04180cb 100644 --- a/src/relic/sga/core/errors.py +++ b/src/relic/sga/core/errors.py @@ -1,38 +1,31 @@ -""" -Error definitions for the SGA API -""" -from typing import List, Optional +"""Error definitions for the SGA API.""" + +from typing import List, Optional, Generic, TypeVar from relic.core.errors import MismatchError, RelicToolError from relic.sga.core.definitions import Version +_T = TypeVar("_T") -class VersionMismatchError(MismatchError[Version]): - """ - A version did not match the version expected. - """ - def __init__( - self, received: Optional[Version] = None, expected: Optional[Version] = None - ): - super().__init__("Version", received, expected) +class MagicMismatchError(MismatchError[bytes]): + """The archive did not specify the correct magic word.""" + + def __init__(self, received: Optional[bytes], expected: Optional[bytes] = None): + super().__init__("Magic Word", received, expected) -class MD5MismatchError(MismatchError[bytes]): - """ - An archive or file did not pass the redundancy check. - """ +class VersionMismatchError(MismatchError[Version]): + """A version did not match the version expected.""" def __init__( - self, received: Optional[bytes] = None, expected: Optional[bytes] = None + self, received: Optional[Version] = None, expected: Optional[Version] = None ): - super().__init__("MD5", received, expected) + super().__init__("Version", received, expected) class VersionNotSupportedError(RelicToolError): - """ - An unknown version was provided. 
- """ + """An unknown version was provided.""" def __init__(self, received: Version, allowed: List[Version]): super().__init__() @@ -48,17 +41,33 @@ def str_ver(version: Version) -> str: # dont use str(version); too verbose class DecompressedSizeMismatch(MismatchError[int]): - """ - A file was decompressed, but did not pass the redundancy check. - """ + """A file was decompressed, but did not match the expected size.""" def __init__(self, received: Optional[int] = None, expected: Optional[int] = None): super().__init__("Decompressed Size", received, expected) +class HashMismatchError(MismatchError[_T], Generic[_T]): + """A sentinel class for catching all hash mismatch errors.""" + + +class Md5MismatchError(HashMismatchError[bytes]): ... + + +class Crc32MismatchError(HashMismatchError[int]): ... + + +class Sha1MismatchError(HashMismatchError[bytes]): # + ... + + __all__ = [ "VersionMismatchError", - "MD5MismatchError", "VersionNotSupportedError", "DecompressedSizeMismatch", + "HashMismatchError", + "Md5MismatchError", + "Crc32MismatchError", + "Sha1MismatchError", + "MagicMismatchError", ] diff --git a/src/relic/sga/core/essencefs/__init__.py b/src/relic/sga/core/essencefs/__init__.py new file mode 100644 index 0000000..a6e670c --- /dev/null +++ b/src/relic/sga/core/essencefs/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from relic.sga.core.essencefs.definitions import EssenceFS +from relic.sga.core.essencefs.opener import ( + EssenceFsOpenerPlugin, + EssenceFsOpener, + open_sga, +) + +__all__ = ["EssenceFS", "EssenceFsOpener", "EssenceFsOpenerPlugin", "open_sga"] diff --git a/src/relic/sga/core/essencefs/definitions.py b/src/relic/sga/core/essencefs/definitions.py new file mode 100644 index 0000000..ccbb082 --- /dev/null +++ b/src/relic/sga/core/essencefs/definitions.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from abc import ABC +from typing import Iterator, Tuple, Any, Dict + +from fs.base import FS + + +class EssenceFS(FS, ABC): + def iterate_fs(self) -> Iterator[Tuple[str, FS]]: + raise NotImplementedError + + def info_tree(self, **options: Any) -> Dict[str, Any]: + """Get a dictionary of the Filesystem tree, containing metadata for + files/folders, 'drives' and the root archive. 
+ + :rtype: Dict[str,Any] + :returns: A dictionary representing the file system tree and its metadata + """ + raise NotImplementedError diff --git a/src/relic/sga/core/essencefs/opener.py b/src/relic/sga/core/essencefs/opener.py new file mode 100644 index 0000000..eae7516 --- /dev/null +++ b/src/relic/sga/core/essencefs/opener.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import os +from os.path import expanduser +from typing import ( + Dict, + Optional, + Protocol, + BinaryIO, + TypeVar, + List, + Iterable, + Union, + Type, +) + +import fs.opener +from fs.opener import Opener +from fs.opener.errors import OpenerError +from fs.opener.parse import ParseResult +from relic.core.errors import RelicToolError +from relic.core.lazyio import BinaryProxy, get_proxy +from relic.core.entrytools import EntrypointRegistry + +from relic.sga.core.definitions import Version, MAGIC_WORD +from relic.sga.core.essencefs.definitions import EssenceFS +from relic.sga.core.serialization import ( + VersionSerializer, +) + +_TEssenceFS = TypeVar("_TEssenceFS", bound=EssenceFS) + + +# Reimplement Opener as a Typed-Protocol # Ugly, but it's my ugly +# This should allow it to be used as an opener; or as a plugin-only opener for the EssenceFSOpener +class EssenceFsOpenerPlugin(Protocol[_TEssenceFS]): # type: ignore + @property + def protocols(self) -> List[str]: + raise NotImplementedError + + @property + def versions(self) -> List[Version]: + raise NotImplementedError + + def __repr__(self) -> str: + raise NotImplementedError + + def open_fs( + self, + fs_url: str, + parse_result: ParseResult, + writeable: bool, + create: bool, + cwd: str, + ) -> _TEssenceFS: + raise NotImplementedError + + +def _get_version(file: Union[BinaryProxy, BinaryIO], advance: bool = False) -> Version: + binio = get_proxy(file) + start = binio.tell() + MAGIC_WORD.validate(binio, advance=True) + version = VersionSerializer.read(binio) + if not advance: + binio.seek(start, os.SEEK_CUR) + return version + + +class EssenceFsOpener( + EntrypointRegistry[Version, EssenceFsOpenerPlugin[_TEssenceFS]], Opener +): + EP_GROUP = "relic.sga.opener" + + protocols = ["sga"] + + def __init__( + self, + # data: Optional[Dict[Version, EssenceFsOpenerPlugin]] = None, + autoload: bool = True, + ): + super().__init__( + entry_point_path=self.EP_GROUP, + key_func=self._version2key, # type: ignore # WHY? + auto_key_func=self._value2keys, + # data=data, + autoload=autoload, + ) + + @staticmethod + def _version2key(version: Version) -> str: + return f"v{version.major}.{version.minor}" + + @staticmethod + def _value2keys(plugin: EssenceFsOpenerPlugin[_TEssenceFS]) -> Iterable[Version]: + yield from plugin.versions + + def open_fs( + self, + fs_url: str, + parse_result: ParseResult, + writeable: bool, + create: bool, + cwd: str, + ) -> EssenceFS: + # All EssenceFS should be writable; so we can ignore that + + if parse_result.resource == "": + if create: + raise RelicToolError( + "Cannot create an SGA from fs.open_fs or relic.sga.core.essencefs.open_sga;" + " please manually create an empty FS object from an appropriate SGA Plugin." + ) + raise fs.opener.errors.OpenerError( + "No path was given and opener not marked for 'create'!"
+ ) + + path = os.path.abspath(os.path.join(cwd, expanduser(parse_result.resource))) + with open(path, "rb") as peeker: + version = _get_version( + peeker, True + ) # advance is true to avoid unnecessary seek + try: + opener: Union[Type[EssenceFsOpenerPlugin], EssenceFsOpenerPlugin] = self[version] # type: ignore + except KeyError as e: + raise RelicToolError( + f"Version {version} not supported! Supported SGA Versions '{list(self.keys())}'." + ) + + if isinstance(opener, type): + opener: EssenceFsOpenerPlugin = opener() # type: ignore + + return opener.open_fs(fs_url, parse_result, writeable, create, cwd) # type: ignore + + +registry: EssenceFsOpener[EssenceFS] = EssenceFsOpener() + +open_sga = registry.open_fs diff --git a/src/relic/sga/core/filesystem.py b/src/relic/sga/core/filesystem.py deleted file mode 100644 index 9ec8ac1..0000000 --- a/src/relic/sga/core/filesystem.py +++ /dev/null @@ -1,369 +0,0 @@ -from __future__ import annotations - -import abc -import os -from os.path import expanduser -from typing import ( - Optional, - Dict, - Any, - BinaryIO, - Text, - Collection, - Mapping, - cast, - Protocol, - TypeVar, - Generic, - runtime_checkable, -) - -import fs.opener.errors -import pkg_resources -from fs import ResourceType, errors -from fs.base import FS -from fs.info import Info -from fs.memoryfs import MemoryFS, _DirEntry, _MemoryFile -from fs.multifs import MultiFS -from fs.opener import Opener, registry as fs_registry -from fs.opener.parse import ParseResult -from fs.path import split -from fs.permissions import Permissions -from fs.subfs import SubFS - -from relic.sga.core.definitions import Version, MagicWord, _validate_magic_word -from relic.sga.core.errors import VersionNotSupportedError - -ESSENCE_NAMESPACE = "essence" - -TKey = TypeVar("TKey") -TValue = TypeVar("TValue") - - -class EntrypointRegistry(Generic[TKey, TValue]): - def __init__(self, entry_point_path: str, autoload: bool = False): - self._entry_point_path = entry_point_path - self._mapping: Dict[TKey, TValue] = {} - self._autoload = autoload - - def register(self, key: TKey, value: TValue) -> None: - self._mapping[key] = value - - @abc.abstractmethod - def auto_register(self, value: TValue) -> None: - raise NotImplementedError - - def get(self, key: TKey, default: Optional[TValue] = None) -> Optional[TValue]: - if key in self._mapping: - return self._mapping[key] - - if self._autoload: - try: - entry_point = next( - pkg_resources.iter_entry_points( - self._entry_point_path, self._key2entry_point_path(key) - ) - ) - except StopIteration: - entry_point = None - if entry_point is None: - return default - self._auto_register_entrypoint(entry_point) - if key not in self._mapping: - raise NotImplementedError # TODO specify autoload failed to load in a usable value - return self._mapping[key] - return default - - @abc.abstractmethod - def _key2entry_point_path(self, key: TKey) -> str: - raise NotImplementedError - - def _auto_register_entrypoint(self, entry_point: Any) -> None: - try: - entry_point_result = entry_point.load() - except: # Wrap in exception - raise - return self._register_entrypoint(entry_point_result) - - @abc.abstractmethod - def _register_entrypoint(self, entry_point_result: Any) -> None: - raise NotImplementedError - - -@runtime_checkable -class EssenceFSHandler(Protocol): - version: Version - - def read(self, stream: BinaryIO) -> EssenceFS: - raise NotImplementedError - - def write(self, stream: BinaryIO, essence_fs: EssenceFS) -> int: - raise NotImplementedError - - -class 
EssenceFSFactory(EntrypointRegistry[Version, EssenceFSHandler]): - def _key2entry_point_path(self, key: Version) -> str: - return f"v{key.major}.{key.minor}" - - def _register_entrypoint(self, entry_point_result: Any) -> None: - if isinstance(entry_point_result, EssenceFSHandler): - self.auto_register(entry_point_result) - elif isinstance(entry_point_result, (list, tuple, Collection)): - version, handler = entry_point_result - if not isinstance(handler, EssenceFSHandler): - handler = handler() - self.register(version, handler) - else: - # Callable; register nested result - self._register_entrypoint(entry_point_result()) - - def auto_register(self, value: EssenceFSHandler) -> None: - self.register(value.version, value) - - def __init__(self, autoload: bool = True) -> None: - super().__init__("relic.sga.handler", autoload) - - @staticmethod - def _read_magic_and_version(sga_stream: BinaryIO) -> Version: - # sga_stream.seek(0) - jump_back = sga_stream.tell() - _validate_magic_word(MagicWord, sga_stream, advance=True) - version = Version.unpack(sga_stream) - sga_stream.seek(jump_back) - return version - - def _get_handler(self, version: Version) -> EssenceFSHandler: - handler = self.get(version) - if handler is None: - # This may raise a 'false positive' if a Null handler is registered - raise VersionNotSupportedError(version, list(self._mapping.keys())) - return handler - - def _get_handler_from_stream( - self, sga_stream: BinaryIO, version: Optional[Version] = None - ) -> EssenceFSHandler: - if version is None: - version = self._read_magic_and_version(sga_stream) - return self._get_handler(version) - - def _get_handler_from_fs( - self, sga_fs: EssenceFS, version: Optional[Version] = None - ) -> EssenceFSHandler: - if version is None: - sga_version: Dict[str, int] = sga_fs.getmeta("essence").get("version") # type: ignore - version = Version(sga_version["major"], sga_version["minor"]) - return self._get_handler(version) - - def read( - self, sga_stream: BinaryIO, version: Optional[Version] = None - ) -> EssenceFS: - handler = self._get_handler_from_stream(sga_stream, version) - return handler.read(sga_stream) - - def write( - self, sga_stream: BinaryIO, sga_fs: EssenceFS, version: Optional[Version] = None - ) -> int: - handler = self._get_handler_from_fs(sga_fs, version) - return handler.write(sga_stream, sga_fs) - - -registry = EssenceFSFactory(True) - - -# @fs_registry.install -# Can't use decorator; it breaks subclassing for entrypoints -class EssenceFSOpener(Opener): - def __init__(self, factory: Optional[EssenceFSFactory] = None): - if factory is None: - factory = registry - self.factory = factory - - protocols = ["sga"] - - def open_fs( - self, - fs_url: str, - parse_result: ParseResult, - writeable: bool, - create: bool, - cwd: str, - ) -> FS: - # All EssenceFS should be writable; so we can ignore that - - # Resolve Path - if fs_url == "sga://": - if create: - return EssenceFS() - else: - raise fs.opener.errors.OpenerError( - "No path was given and opener not marked for 'create'!" - ) - - _path = os.path.abspath(os.path.join(cwd, expanduser(parse_result.resource))) - path = os.path.normpath(_path) - - # Create will always create a new EssenceFS if needed - try: - with open(path, "rb") as sga_file: - return self.factory.read(sga_file) - except FileNotFoundError as e: - if create: - return EssenceFS() - else: - raise - - -fs_registry.install(EssenceFSOpener) - - -class _EssenceFile(_MemoryFile): - ... 
# I plan on allowing lazy file loading from the archive; I'll likely need to implement this to do that - - -class _EssenceDirEntry(_DirEntry): - def __init__(self, resource_type: ResourceType, name: Text): - super().__init__(resource_type, name) - self.essence: Dict[str, object] = {} - - def to_info(self, namespaces=None): - # type: (Optional[Collection[Text]]) -> Info - info = super().to_info(namespaces) - if namespaces is not None and ESSENCE_NAMESPACE in namespaces: - info_dict = dict(info.raw) - info_dict[ESSENCE_NAMESPACE] = self.essence.copy() - info = Info(info_dict) - return info - - -class _EssenceDriveFS(MemoryFS): - def __init__(self, alias: str, name: str) -> None: - super().__init__() - self.alias = alias - self.name = name - - def _make_dir_entry( - self, resource_type: ResourceType, name: str - ) -> _EssenceDirEntry: - return _EssenceDirEntry(resource_type, name) - - def validatepath(self, path: str) -> str: - if ":" in path: - parts = path.split(":", 1) - if parts[0].replace("\\", "/")[0] == "/": - parts[0] = parts[0].replace("\\", "/")[1:] - if parts[0] != self.alias: - raise fs.errors.InvalidPath( - path, - f"Alias `{parts[0]}` does not math the Drive's Alias `{self.alias}`", - ) - fixed_path = parts[1] - else: - fixed_path = path - return super().validatepath(fixed_path).replace("\\", "/") - - def setinfo(self, path: str, info: Mapping[str, Mapping[str, object]]) -> None: - _path = self.validatepath(path) - with self._lock: - dir_path, file_name = split(_path) - parent_dir_entry = self._get_dir_entry(dir_path) - - if parent_dir_entry is None or file_name not in parent_dir_entry: - raise errors.ResourceNotFound(path) - - resource_entry = cast( - _EssenceDirEntry, parent_dir_entry.get_entry(file_name) - ) - - if "details" in info: - details = info["details"] - if "accessed" in details: - resource_entry.accessed_time = details["accessed"] # type: ignore - if "modified" in details: - resource_entry.modified_time = details["modified"] # type: ignore - - if ESSENCE_NAMESPACE in info and not resource_entry.is_dir: - essence = info[ESSENCE_NAMESPACE] - resource_entry.essence.clear() - resource_entry.essence.update(essence) - - # if LAZY_NAMESPACE in info and not resource_entry.is_dir: - # lazy - - def getinfo( - self, path, namespaces=None - ): # type: (Text, Optional[Collection[Text]]) -> Info - info = super().getinfo(path, namespaces) - - _path = self.validatepath(path) - if _path == "/" and ( - namespaces is not None and ESSENCE_NAMESPACE in namespaces - ): - raw_info = dict(info.raw) - essence_ns = raw_info[ESSENCE_NAMESPACE] = {} - essence_ns["alias"] = self.alias - essence_ns["name"] = self.name - info = Info(raw_info) - return info - - def getessence(self, path: str) -> Info: - return self.getinfo(path, [ESSENCE_NAMESPACE]) - - def makedirs( - self, - path, # type: Text - permissions=None, # type: Optional[Permissions] - recreate=False, # type: bool - ): # type: (...) 
-> SubFS[FS] - _path = path.replace("\\", "/") # Coerce path seperator - return super().makedirs(_path, permissions, recreate) - - -class EssenceFS(MultiFS): - def __init__(self) -> None: - super().__init__() - self._sga_meta: Dict[str, object] = {} - - def getmeta(self, namespace: str = "standard") -> Mapping[str, object]: - if namespace == ESSENCE_NAMESPACE: - return self._sga_meta.copy() - return super().getmeta(namespace) - - def setmeta(self, meta: Dict[str, Any], namespace: str = "standard") -> None: - if namespace == ESSENCE_NAMESPACE: - self._sga_meta = meta.copy() - else: - raise NotImplementedError - - def getessence(self, path: str) -> Info: - return self.getinfo(path, [ESSENCE_NAMESPACE]) - - def create_drive(self, alias: str, name: str) -> _EssenceDriveFS: - drive = _EssenceDriveFS(alias, name) - first_drive = len([*self.iterate_fs()]) == 0 - self.add_fs( - alias, drive, write=first_drive - ) # TODO see if name would work here, using alias because that is what it originally was - return drive - - def _delegate(self, path): - # type: (Text) -> Optional[FS] - # Resolve path's drive, if present, - # otherwise; use underlying FS - if ":" in path: - parts = path.split(":", 1) - return self.get_fs(parts[0]) - - return super()._delegate(path) - - -__all__ = [ - "ESSENCE_NAMESPACE", - "EssenceFSHandler", - "EssenceFSFactory", - "_EssenceFile", - "_EssenceDirEntry", - "_EssenceDriveFS", - "EssenceFS", - "registry", - "EssenceFSOpener", -] diff --git a/src/relic/sga/core/hashtools.py b/src/relic/sga/core/hashtools.py new file mode 100644 index 0000000..197edbe --- /dev/null +++ b/src/relic/sga/core/hashtools.py @@ -0,0 +1,150 @@ +import hashlib +import zlib +from typing import BinaryIO, Optional, Generic, TypeVar, Type, Union, Protocol + +from relic.core.lazyio import read_chunks + +from relic.sga.core.errors import ( + HashMismatchError, + Md5MismatchError, + Crc32MismatchError, + Sha1MismatchError, +) + +_T_CON = TypeVar("_T_CON", contravariant=True) +_T = TypeVar("_T") + +Hashable = Union[BinaryIO, bytes, bytearray] + + +class _HasherHashFunc(Protocol[_T]): # pylint disable: too-few-public-methods + def __call__( + self, + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[_T] = None, + ) -> _T: + raise NotImplementedError + + +class Hasher(Generic[_T]): + def __init__( + self, + hasher_name: str, + hash_func: _HasherHashFunc[_T], + default_err_cls: Type[HashMismatchError[_T]] = HashMismatchError, + ): + self._hasher_name = hasher_name + self._default_err_cls = default_err_cls + self._hash_func = hash_func + if not hasattr(self, "__name__"): + self.__name__ = self._hasher_name + + def __call__( + self, + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[_T] = None, + ) -> _T: + return self.hash(stream=stream, start=start, size=size, eigen=eigen) + + def hash( + self, + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[_T] = None, + ) -> _T: + return self._hash_func(stream=stream, start=start, size=size, eigen=eigen) + + def check( + self, + stream: Hashable, + expected: _T, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[_T] = None, + ) -> bool: + result = self.hash(stream=stream, start=start, size=size, eigen=eigen) + return result == expected + + def validate( + self, + stream: Hashable, + expected: _T, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[_T] = 
None, + err_cls: Optional[Type[HashMismatchError[_T]]] = None, + name: Optional[str] = None, + ) -> None: + result = self.hash(stream=stream, start=start, size=size, eigen=eigen) + if result != expected: + if err_cls is None: + err_cls = self._default_err_cls + + raise err_cls( + name if name is not None else self._hasher_name, result, expected + ) + + +def _md5( + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[bytes] = None, +) -> bytes: + hasher = ( + hashlib.md5(eigen, usedforsecurity=False) + if eigen is not None + else hashlib.md5(usedforsecurity=False) + ) + for chunk in read_chunks(stream, start, size): + hasher.update(chunk) + return hasher.digest() + + +def _crc32( + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[int] = None, +) -> int: + crc = eigen if eigen is not None else 0 + for chunk in read_chunks(stream, start, size): + crc = zlib.crc32(chunk, crc) + return crc + + +def _sha1( + stream: Hashable, + *, + start: Optional[int] = None, + size: Optional[int] = None, + eigen: Optional[bytes] = None, +) -> bytes: + hasher = ( + hashlib.sha1(eigen, usedforsecurity=False) + if eigen is not None + else hashlib.sha1(usedforsecurity=False) + ) + for chunk in read_chunks(stream, start, size): + hasher.update(chunk) + return hasher.digest() + + +# Create hashers bound to their hash method +md5 = Hasher("MD5", _md5, Md5MismatchError) +crc32 = Hasher("CRC-32", _crc32, Crc32MismatchError) +sha1 = Hasher("SHA-1", _sha1, Sha1MismatchError) + +__all__ = ["Hashable", "md5", "crc32", "sha1", "Hasher"] diff --git a/src/relic/sga/core/protocols.py b/src/relic/sga/core/protocols.py deleted file mode 100644 index 17f6a95..0000000 --- a/src/relic/sga/core/protocols.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -Defines protocols that the SGA API uses. -""" -from __future__ import annotations - -from typing import ( - TypeVar, - Protocol, - BinaryIO, - runtime_checkable, -) - -T = TypeVar("T") - - -@runtime_checkable -class StreamSerializer(Protocol[T]): - """Serializes the Type to/from a binary stream.""" - - def unpack(self, stream: BinaryIO) -> T: - """ - Converts binary data from the stream to parsed data. - - :param stream: The stream to read from. - - :return: The parsed data. - """ - raise NotImplementedError - - def pack(self, stream: BinaryIO, value: T) -> int: - """ - Converts binary data from the stream to parsed data. - - :param stream: The stream to write to. - :param value: The data to convert to binary. - - :return: The number of bytes written. 
- """ - raise NotImplementedError - - -__all__ = ["T", "StreamSerializer"] diff --git a/src/relic/sga/core/serialization.py b/src/relic/sga/core/serialization.py index 3309f17..7caa19d 100644 --- a/src/relic/sga/core/serialization.py +++ b/src/relic/sga/core/serialization.py @@ -1,985 +1,625 @@ from __future__ import annotations -import hashlib -import typing -import zlib -from dataclasses import dataclass from io import BytesIO from typing import ( BinaryIO, - List, - Dict, - Optional, - Callable, + ClassVar, Tuple, - Iterable, - TypeVar, Generic, Type, + Optional, + List, + Protocol, + Union, + Iterable, + TypeVar, + Dict, + Literal, + Iterator, ) -from fs.base import FS - -from serialization_tools.size import KiB, MiB -from serialization_tools.structx import Struct - -from relic.sga.core.definitions import ( - StorageType, - Version, - MagicWord, - _validate_magic_word, -) -from relic.sga.core.errors import ( - MD5MismatchError, - VersionMismatchError, - DecompressedSizeMismatch, +from relic.core.errors import RelicToolError +from relic.core.lazyio import ( + BinaryWindow, + tell_end, + BinaryProxySerializer, + BinaryProxy, + BinaryWrapper, + BinarySerializer, ) -from relic.sga.core.filesystem import EssenceFS, _EssenceDriveFS, EssenceFSHandler -from relic.sga.core.protocols import StreamSerializer, T +from relic.sga.core.definitions import Version, StorageType -@dataclass -class TocBlock: - drive_info: Tuple[int, int] - folder_info: Tuple[int, int] - file_info: Tuple[int, int] - name_info: Tuple[int, int] +_T = TypeVar("_T") - @classmethod - def default(cls) -> TocBlock: - null_pair = (0, 0) - return cls(null_pair, null_pair, null_pair, null_pair) - - -class TocHeaderSerializer(StreamSerializer[TocBlock]): - def __init__(self, layout: Struct): - self.layout = layout - - def unpack(self, stream: BinaryIO) -> TocBlock: - ( - drive_pos, - drive_count, - folder_pos, - folder_count, - file_pos, - file_count, - name_pos, - name_count, - ) = self.layout.unpack_stream(stream) - - return TocBlock( - (drive_pos, drive_count), - (folder_pos, folder_count), - (file_pos, file_count), - (name_pos, name_count), - ) +_NULL_PTR = (None, None) - def pack(self, stream: BinaryIO, value: TocBlock) -> int: - args = ( - value.drive_info[0], - value.drive_info[1], - value.folder_info[0], - value.folder_info[1], - value.file_info[0], - value.file_info[1], - value.name_info[0], - value.name_info[1], - ) - packed: int = self.layout.pack_stream(stream, *args) - return packed - - -@dataclass -class DriveDef: - alias: str - name: str - root_folder: int - folder_range: Tuple[int, int] - file_range: Tuple[int, int] - - -class DriveDefSerializer(StreamSerializer[DriveDef]): - def __init__(self, layout: Struct): - self.layout = layout - - def unpack(self, stream: BinaryIO) -> DriveDef: - encoded_alias: bytes - encoded_name: bytes - ( - encoded_alias, - encoded_name, - folder_start, - folder_end, - file_start, - file_end, - root_folder, - ) = self.layout.unpack_stream(stream) - alias: str = encoded_alias.rstrip(b"\0").decode("ascii") - name: str = encoded_name.rstrip(b"\0").decode("ascii") - folder_range = (folder_start, folder_end) - file_range = (file_start, file_end) - return DriveDef( - alias=alias, - name=name, - root_folder=root_folder, - folder_range=folder_range, - file_range=file_range, - ) - def pack(self, stream: BinaryIO, value: DriveDef) -> int: - alias: bytes = value.alias.encode("ascii") - name: bytes = value.name.encode("ascii") - args = ( - alias, - name, - value.folder_range[0], - 
value.folder_range[1], - value.file_range[0], - value.file_range[1], - value.root_folder, - ) - packed: int = self.layout.pack_stream(stream, *args) - return packed - - -@dataclass -class FolderDef: - name_pos: int - folder_range: Tuple[int, int] - file_range: Tuple[int, int] - - -class FolderDefSerializer(StreamSerializer[FolderDef]): - def __init__(self, layout: Struct): - self.layout = layout - - def unpack(self, stream: BinaryIO) -> FolderDef: - ( - name_pos, - folder_start, - folder_end, - file_start, - file_end, - ) = self.layout.unpack_stream(stream) - folder_range = (folder_start, folder_end) - file_range = (file_start, file_end) - return FolderDef( - name_pos=name_pos, folder_range=folder_range, file_range=file_range - ) +def _safe_get_parent_name( + parent: BinaryIO, default: Optional[str] = None +) -> Optional[str]: + return default if not hasattr(parent, "name") else parent.name - def pack(self, stream: BinaryIO, value: FolderDef) -> int: - args = ( - value.name_pos, - value.folder_range[0], - value.folder_range[1], - value.file_range[0], - value.file_range[1], - ) - packed: int = self.layout.pack_stream(stream, *args) - return packed +class ArchivePtrs(Protocol): + @property + def toc_pos(self) -> int: + raise NotImplementedError -@dataclass -class MetaBlock: - name: str - ptrs: ArchivePtrs + @property + def toc_size(self) -> int: + raise NotImplementedError + @property + def data_pos(self) -> int: + raise NotImplementedError -# TMetadata = TypeVar("TMetadata") -TMetaBlock = TypeVar("TMetaBlock", bound=MetaBlock) -TTocMetaBlock = TypeVar("TTocMetaBlock") + @property + def data_size(self) -> Optional[int]: + raise NotImplementedError -@dataclass -class FileDef: - name_pos: int - data_pos: int - length_on_disk: int - length_in_archive: int - storage_type: StorageType +class SgaHeader(BinaryProxySerializer, ArchivePtrs): + def __init__(self, parent: BinaryIO): + super().__init__(parent) + @property + def name(self) -> str: + raise NotImplementedError -TFileDef = TypeVar("TFileDef", bound=FileDef) -AssembleFileMetaFunc = Callable[[TFileDef], Dict[str, object]] -DisassembleFileMetaFunc = Callable[[Dict[str, object]], TFileDef] -AssembleMetaFunc = Callable[ - [BinaryIO, TMetaBlock, Optional[TTocMetaBlock]], Dict[str, object] -] -DisassembleMetaFunc = Callable[ - [BinaryIO, Dict[str, object]], Tuple[TMetaBlock, TTocMetaBlock] -] + @property + def toc_pos(self) -> int: + raise NotImplementedError + @property + def toc_size(self) -> int: + raise NotImplementedError -def _write_data(data: bytes, stream: BinaryIO) -> int: - """ - Returns the index the data was written to. 
- """ - pos = stream.tell() - stream.write(data) - return pos + @property + def data_pos(self) -> int: + raise NotImplementedError + @property + def data_size(self) -> int: + raise NotImplementedError -def _get_or_write_name(name: str, stream: BinaryIO, lookup: Dict[str, int]) -> int: - # Tools don't like "/" so coerce "/" to "\" - name = name.replace("/", "\\") - if name in lookup: - return lookup[name] - pos = lookup[name] = stream.tell() - enc_name = name.encode("ascii") + b"\0" - stream.write(enc_name) - return pos +class SgaTocHeader(BinaryProxySerializer): + _DRIVE_POS: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _DRIVE_COUNT: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FOLDER_POS: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FOLDER_COUNT: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FILE_POS: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FILE_COUNT: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _NAME_POS: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _NAME_COUNT: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + class TablePointer: + def __init__( + self, parent: SgaTocHeader, pos: Tuple[int, int], count: Tuple[int, int] + ): + self._offset_ptr = pos + self._count_ptr = count + self._serializer = parent._serializer -@dataclass -class TOCSerializationInfo(Generic[TFileDef]): - drive: StreamSerializer[DriveDef] - folder: StreamSerializer[FolderDef] - file: StreamSerializer[TFileDef] - name_toc_is_count: bool + @property + def offset(self) -> int: + return self._serializer.int.read(*self._offset_ptr) + @offset.setter + def offset(self, value: int) -> None: + self._serializer.int.write(value, *self._offset_ptr) -ESSENCE_NAMESPACE = "essence" + @property + def count(self) -> int: + return self._serializer.int.read(*self._count_ptr) + @count.setter + def count(self, value: int) -> None: + self._serializer.int.write(value, *self._count_ptr) -class FSAssembler(Generic[TFileDef]): - """ - A Helper class used to assemble the SGA hierarchy - """ + @property + def info(self) -> Tuple[int, int]: + return self.offset, self.count - def __init__( - self, - stream: BinaryIO, - ptrs: ArchivePtrs, - toc: TocBlock, - toc_serialization_info: TOCSerializationInfo[TFileDef], - build_file_meta: AssembleFileMetaFunc[TFileDef], - ): - self.stream: BinaryIO = stream - self.ptrs: ArchivePtrs = ptrs - self.toc: TocBlock = toc - self.toc_serialization_info: TOCSerializationInfo[ - TFileDef - ] = toc_serialization_info - self.build_file_meta: AssembleFileMetaFunc[TFileDef] = build_file_meta - self.names: Dict[int, str] = {} - - # decompress_files: bool = False - # lazy: bool = False - - def read_toc_part( - self, - toc_info: Tuple[int, int], - serializer: StreamSerializer[T], - ) -> List[T]: - self.stream.seek(self.ptrs.header_pos + toc_info[0]) - return [serializer.unpack(self.stream) for _ in range(toc_info[1])] + @info.setter + def info(self, value: Tuple[int, int]) -> None: + pos, count = value + self.offset = pos + self.count = count - def read_toc( - self, - ) -> Tuple[List[DriveDef], List[FolderDef], List[TFileDef], Dict[int, str]]: - drives = self.read_toc_part( - self.toc.drive_info, self.toc_serialization_info.drive - ) - folders = self.read_toc_part( - self.toc.folder_info, self.toc_serialization_info.folder - ) - files = self.read_toc_part(self.toc.file_info, self.toc_serialization_info.file) - names = ( - _read_toc_names_as_count( - self.stream, self.toc.name_info, self.ptrs.header_pos - ) - if 
self.toc_serialization_info.name_toc_is_count - else _read_toc_names_as_size( - self.stream, self.toc.name_info, self.ptrs.header_pos - ) + def __init__(self, parent: BinaryIO): + super().__init__( + parent, ) - return drives, folders, files, names - - def assemble_file(self, parent_dir: FS, file_def: TFileDef) -> None: - name = self.names[file_def.name_pos] - - metadata = self.build_file_meta(file_def) - file_compressed = file_def.storage_type != StorageType.STORE - lazy_info = FileLazyInfo( - jump_to=self.ptrs.data_pos + file_def.data_pos, - packed_size=file_def.length_in_archive, - unpacked_size=file_def.length_on_disk, - stream=self.stream, - decompress=file_compressed, # self.decompress_files, + self._drive = self.TablePointer(self, self._DRIVE_POS, self._DRIVE_COUNT) + self._folder = self.TablePointer(self, self._FOLDER_POS, self._FOLDER_COUNT) + self._file = self.TablePointer(self, self._FILE_POS, self._FILE_COUNT) + self._name = self.TablePointer(self, self._NAME_POS, self._NAME_COUNT) + + # DRIVE + @property + def drive(self) -> TablePointer: + return self._drive + + @property + def folder(self) -> TablePointer: + return self._folder + + @property + def file(self) -> TablePointer: + return self._file + + @property + def name(self) -> TablePointer: + return self._name + + +class SgaTocDrive(BinaryProxySerializer): + _ALIAS: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _NAME: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FIRST_FOLDER: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _LAST_FOLDER: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FIRST_FILE: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _LAST_FILE: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _ROOT_FOLDER: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _SIZE: ClassVar[int] = _NULL_PTR # type: ignore + _INT_BYTEORDER: ClassVar[Literal["little"]] = "little" + _INT_SIGNED: ClassVar[bool] = False + _STR_ENC = "ascii" + _STR_PAD = "\0" + + def __init__(self, parent: BinaryIO): + super().__init__( + parent, ) - data = lazy_info.read(file_compressed) # self.decompress_files) + @property + def alias(self) -> str: + return self._serializer.c_string.read( + *self._ALIAS, encoding=self._STR_ENC, padding=self._STR_PAD + ) - essence_info: Dict[str, object] = {"storage_type": int(file_def.storage_type)} - if metadata is not None: - essence_info.update(metadata) + @alias.setter + def alias(self, value: str) -> None: + self._serializer.c_string.write( + value, *self._ALIAS, encoding=self._STR_ENC, padding=self._STR_PAD + ) - with parent_dir.open(name, "wb") as file: - file.write(data) + @property + def name(self) -> str: + return self._serializer.c_string.read( + *self._NAME, encoding=self._STR_ENC, padding=self._STR_PAD + ) - info = {ESSENCE_NAMESPACE: essence_info} - parent_dir.setinfo(name, info) + @name.setter + def name(self, value: str) -> None: + self._serializer.c_string.write( + value, *self._NAME, encoding=self._STR_ENC, padding=self._STR_PAD + ) - def _assemble_container( - self, - container: FS, - file_range: Tuple[int, int], - folder_range: Tuple[int, int], - files: List[TFileDef], - folders: List[FolderDef], - file_offset: int, - folder_offset: int, - ) -> None: - offsetted_file_range = [ - file_range[0] - file_offset, - file_range[1] - file_offset, - ] - offsetted_folder_range = [ - folder_range[0] - folder_offset, - folder_range[1] - folder_offset, - ] - - container_files = files[offsetted_file_range[0] : offsetted_file_range[1]] - container_folders = 
folders[ - offsetted_folder_range[0] : offsetted_folder_range[1] - ] - - for file_def in container_files: - self.assemble_file(container, file_def) - - for folder_def in container_folders: - self.assemble_folder( - container, folder_def, files, folders, file_offset, folder_offset - ) + @property + def first_folder(self) -> int: + return self._serializer.int.read( + *self._FIRST_FOLDER, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED + ) - def assemble_folder( - self, - parent_dir: FS, - folder_def: FolderDef, - files: List[TFileDef], - folders: List[FolderDef], - file_offset: int, - folder_offset: int, - ) -> FS: - raw_folder_name = self.names[folder_def.name_pos] - folder_name_as_path = raw_folder_name.split( - "\\" - ) # We could gaurd against '/' but because the official relic mod tools crap themselves, we'll crap ourselves too. # TODO, instead of crappign ourselves, maybe produce a decent error? instead of relying on ResourceNotFound? - folder_name = ( - folder_name_as_path[-1] if len(folder_name_as_path) > 0 else raw_folder_name + @first_folder.setter + def first_folder(self, value: int) -> None: + self._serializer.int.write( + value, + *self._FIRST_FOLDER, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - folder = parent_dir.makedir(folder_name) - self._assemble_container( - folder, - folder_def.file_range, - folder_def.folder_range, - files, - folders, - file_offset, - folder_offset, + @property + def last_folder(self) -> int: + return self._serializer.int.read( + *self._LAST_FOLDER, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED ) - return folder - def assemble_drive( - self, - essence_fs: EssenceFS, - drive_def: DriveDef, - folder_defs: List[FolderDef], - file_defs: List[TFileDef], - ) -> FS: - local_file_defs = file_defs[drive_def.file_range[0] : drive_def.file_range[1]] - local_folder_defs = folder_defs[ - drive_def.folder_range[0] : drive_def.folder_range[1] - ] - - file_offset = drive_def.file_range[0] - folder_offset = drive_def.folder_range[0] - - # make root folder relative to our folder slice - drive_folder_index = drive_def.root_folder - folder_offset - drive_folder_def = local_folder_defs[drive_folder_index] - - drive = essence_fs.create_drive(drive_def.alias, drive_def.name) - self._assemble_container( - drive, - drive_folder_def.file_range, - drive_folder_def.folder_range, - local_file_defs, - local_folder_defs, - file_offset, - folder_offset, + @last_folder.setter + def last_folder(self, value: int) -> None: + self._serializer.int.write( + value, + *self._LAST_FOLDER, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - return drive - def assemble(self, fs: EssenceFS) -> None: - drive_defs, folder_defs, file_defs, names = self.read_toc() - self.names.update(names) - for drive_def in drive_defs: - self.assemble_drive(fs, drive_def, folder_defs, file_defs) + @property + def first_file(self) -> int: + return self._serializer.int.read( + *self._FIRST_FILE, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED + ) + @first_file.setter + def first_file(self, value: int) -> None: + self._serializer.int.write( + value, + *self._FIRST_FILE, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) -class FSDisassembler(Generic[TFileDef]): - def __init__( - self, - fs: EssenceFS, - toc_stream: BinaryIO, - data_stream: BinaryIO, - name_stream: BinaryIO, - toc_serialization_info: TOCSerializationInfo[TFileDef], - meta2def: DisassembleFileMetaFunc[TFileDef], - ): - self.fs = fs - """A stream containing the TOC Block""" - 
self.toc_stream = toc_stream - """A stream containing the DATA Block""" - self.data_stream = data_stream - """A stream containing the NAME Block""" - self.name_stream = name_stream - """A collection containing serializers for DriveDef, FolderDef, FileDef, and a flag to determine whether the NAME Block uses 'size in bytes ~ SIZE' or 'number of elements ~ COUNT'""" - self.toc_serialization_info = toc_serialization_info - """A function which converts FileMetadata to a FileDef""" - self.meta2def = meta2def - """A collection of file definitions laid out sequentially (by folder). This is populated and used inside the assembler.""" - self.flat_files: List[TFileDef] = [] - """A collection of folder definitions laid out sequentially (by drive/parent folder). This is populated and used inside the assembler.""" - self.flat_folders: List[FolderDef] = [] - """A collection of drive definitions), ordered arbitrarily. This is populated and used inside the assembler.""" - self.flat_drives: List[DriveDef] = [] - """A lookup table to find names already written to the NAME block; contains the position of the desired name in the NAME block.""" - self.flat_names: Dict[str, int] = {} - - def disassemble_file(self, container_fs: FS, file_name: str) -> TFileDef: - with container_fs.open(file_name, "rb") as handle: - data = handle.read() - - metadata = dict(container_fs.getinfo(file_name, ["essence"]).raw["essence"]) - - file_def: TFileDef = self.meta2def(metadata) - _storage_type_value: int = metadata["storage_type"] # type: ignore - storage_type = StorageType(_storage_type_value) - if storage_type == StorageType.STORE: - store_data = data - elif storage_type in [ - StorageType.BUFFER_COMPRESS, - StorageType.STREAM_COMPRESS, - ]: - store_data = zlib.compress(data) # TODO process in chunks for large files - else: - raise NotImplementedError + @property + def last_file(self) -> int: + return self._serializer.int.read( + *self._LAST_FILE, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED + ) - file_def.storage_type = storage_type - file_def.length_on_disk = len(data) - file_def.length_in_archive = len(store_data) + @last_file.setter + def last_file(self, value: int) -> None: + self._serializer.int.write( + value, + *self._LAST_FILE, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) - file_def.name_pos = _get_or_write_name( - file_name, self.name_stream, self.flat_names + @property + def root_folder(self) -> int: + return self._serializer.int.read( + *self._ROOT_FOLDER, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED ) - file_def.data_pos = _write_data(store_data, self.data_stream) - - return file_def - - def flatten_file_collection(self, container_fs: FS) -> Tuple[int, int]: - subfile_start = len(self.flat_files) - subfile_defs = [ - self.disassemble_file(container_fs, file_info.name) - for file_info in container_fs.scandir("/") - if not file_info.is_dir - ] - self.flat_files.extend(subfile_defs) - subfile_end = len(self.flat_files) - - if subfile_start == subfile_end: - subfile_start = subfile_end = 0 # - return subfile_start, subfile_end - - def flatten_folder_collection(self, container_fs: FS, path: str) -> Tuple[int, int]: - # Create temporary None folders to ensure a continuous range of child folders; BEFORE entering any child folders - subfolder_start = len(self.flat_folders) - folders = [ - file_info.name - for file_info in container_fs.scandir("/") - if file_info.is_dir - ] - self.flat_folders.extend([None] * len(folders)) # type:ignore - subfolder_end = len(self.flat_folders) - - # 
Enter subfolders, and add them to the flat array - subfolder_defs = [ - self.disassemble_folder(container_fs.opendir(folder), f"{path}/{folder}") - for folder in folders - ] - self.flat_folders[subfolder_start:subfolder_end] = subfolder_defs - return subfolder_start, subfolder_end - - def _flatten_folder_names(self, fs: FS, path: str) -> None: - folders = [file_info.name for file_info in fs.scandir("/") if file_info.is_dir] - files = [file_info.name for file_info in fs.scandir("/") if file_info.is_file] - - if len(path) > 0 and path[0] == "/": - path = path[1:] # strip leading '/' - _get_or_write_name(path, self.name_stream, self.flat_names) - - for fold_path in folders: - full_fold_path = f"{path}/{fold_path}" - full_fold_path = str(full_fold_path).split(":", 1)[ - -1 - ] # Strip 'alias:' from path - if full_fold_path[0] == "/": - full_fold_path = full_fold_path[1:] # strip leading '/' - _get_or_write_name(full_fold_path, self.name_stream, self.flat_names) - - for file_path in files: - _get_or_write_name(file_path, self.name_stream, self.flat_names) - - def disassemble_folder(self, folder_fs: FS, path: str) -> FolderDef: - folder_def = FolderDef(None, None, None) # type: ignore - # Write Name - self._flatten_folder_names(folder_fs, path) - - folder_name = str(path).split(":", 1)[-1] # Strip 'alias:' from path - if folder_name[0] == "/": - folder_name = folder_name[1:] # strip leading '/' - folder_def.name_pos = _get_or_write_name( - folder_name, self.name_stream, self.flat_names + + @root_folder.setter + def root_folder(self, value: int) -> None: + self._serializer.int.write( + value, + *self._ROOT_FOLDER, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - # Subfolders - # # Since Relic typically uses the first folder as the root folder; I will try to preserve that parent folders come before their child folders - subfolder_range = self.flatten_folder_collection(folder_fs, path) - # Subfiles - subfile_range = self.flatten_file_collection(folder_fs) +class SgaTocFolder(BinaryProxySerializer): + _NAME_OFFSET: ClassVar[Tuple[int, int]] = (None, None) # type: ignore + _SUB_FOLDER_START: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _SUB_FOLDER_STOP: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _FIRST_FILE: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _LAST_FILE: ClassVar[Tuple[int, int]] = _NULL_PTR # type: ignore + _SIZE: ClassVar[int] = _NULL_PTR # type: ignore + _INT_BYTEORDER: ClassVar[Literal["little"]] = "little" + _INT_SIGNED: ClassVar[bool] = False - folder_def.file_range = subfile_range - folder_def.folder_range = subfolder_range + def __init__(self, parent: BinaryIO): + super().__init__(parent) - return folder_def + @property + def name_offset(self) -> int: + return self._serializer.int.read( + *self._NAME_OFFSET, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED + ) - def disassemble_drive(self, drive: _EssenceDriveFS) -> DriveDef: - name = drive.name - folder_name = "" - alias = drive.alias - drive_folder_def = FolderDef(None, None, None) # type: ignore - self._flatten_folder_names(drive, folder_name) + @name_offset.setter + def name_offset(self, value: int) -> None: + self._serializer.int.write( + value, + *self._NAME_OFFSET, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) - root_folder = len(self.flat_folders) - folder_start = len(self.flat_folders) - file_start = len(self.flat_files) - self.flat_folders.append(drive_folder_def) + @property + def first_folder(self) -> int: + return self._serializer.int.read( + 
*self._SUB_FOLDER_START, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) - # Name should be an empty string? - drive_folder_def.name_pos = _get_or_write_name( - folder_name, self.name_stream, self.flat_names + @first_folder.setter + def first_folder(self, value: int) -> None: + self._serializer.int.write( + value, + *self._SUB_FOLDER_START, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - drive_folder_def.file_range = self.flatten_file_collection(drive) - drive_folder_def.folder_range = self.flatten_folder_collection( - drive, folder_name + + @property + def last_folder(self) -> int: + return self._serializer.int.read( + *self._SUB_FOLDER_STOP, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - folder_end = len(self.flat_folders) - file_end = len(self.flat_files) + @last_folder.setter + def last_folder(self, value: int) -> None: + self._serializer.int.write( + value, + *self._SUB_FOLDER_STOP, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) - drive_def = DriveDef( - alias, - name, - root_folder, - folder_range=(folder_start, folder_end), - file_range=(file_start, file_end), + @property + def first_file(self) -> int: + return self._serializer.int.read( + *self._FIRST_FILE, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED ) - return drive_def - - def write_toc(self) -> TocBlock: - """ - Writes TOC data to the stream. - - The TocHeader returned is relative to the toc stream's start, does not include the TocHeader itself. - """ - # Normally, this is drive -> folder -> file -> names - # But the TOC can handle an arbitrary order (due to ptrs); so we only do this to match their style - drive_offset = self.toc_stream.tell() - for drive_def in self.flat_drives: - self.toc_serialization_info.drive.pack(self.toc_stream, drive_def) - - folder_offset = self.toc_stream.tell() - for folder_def in self.flat_folders: - self.toc_serialization_info.folder.pack(self.toc_stream, folder_def) - - file_offset = self.toc_stream.tell() - for file_def in self.flat_files: - self.toc_serialization_info.file.pack(self.toc_stream, file_def) - - name_offset = self.toc_stream.tell() - name_size = self.name_stream.tell() - self.name_stream.seek(0) - _chunked_copy(self.name_stream, self.toc_stream, chunk_size=64 * KiB) - return TocBlock( - drive_info=(drive_offset, len(self.flat_drives)), - folder_info=(folder_offset, len(self.flat_folders)), - file_info=(file_offset, len(self.flat_files)), - name_info=( - name_offset, - len(self.flat_names) - if self.toc_serialization_info.name_toc_is_count - else name_size, - ), + + @first_file.setter + def first_file(self, value: int) -> None: + self._serializer.int.write( + value, + *self._FIRST_FILE, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, ) - def disassemble(self) -> TocBlock: - for _, drive_fs in self.fs.iterate_fs(): - drive_fs = typing.cast(_EssenceDriveFS, drive_fs) - drive_def = self.disassemble_drive(drive_fs) - self.flat_drives.append(drive_def) - - return self.write_toc() - - -def _read_toc_names_as_count( - stream: BinaryIO, toc_info: Tuple[int, int], header_pos: int, buffer_size: int = 256 -) -> Dict[int, str]: - NULL = 0 - NULL_CHAR = b"\0" - stream.seek(header_pos + toc_info[0]) - - names: Dict[int, str] = {} - running_buffer = bytearray() - offset = 0 - while len(names) < toc_info[1]: - buffer = stream.read(buffer_size) - if len(buffer) == 0: - raise Exception("Ran out of data!") # TODO, proper exception - terminal_null = buffer[-1] == NULL - parts = buffer.split(NULL_CHAR) - if 
len(parts) > 1: - parts[0] = running_buffer + parts[0] - running_buffer.clear() - if not terminal_null: - running_buffer.extend(parts[-1]) - parts = parts[:-1] # drop empty or partial + @property + def last_file(self) -> int: + return self._serializer.int.read( + *self._LAST_FILE, byteorder=self._INT_BYTEORDER, signed=self._INT_SIGNED + ) - else: - if not terminal_null: - running_buffer.extend(parts[0]) - offset += len(buffer) - continue - - remaining = toc_info[1] - len(names) - available = min(len(parts), remaining) - for _ in range(available): - name = parts[_] - names[offset] = name.decode("ascii") - offset += len(name) + 1 - return names - - -def _read_toc_names_as_size( - stream: BinaryIO, toc_info: Tuple[int, int], header_pos: int -) -> Dict[int, str]: - stream.seek(header_pos + toc_info[0]) - name_buffer = stream.read(toc_info[1]) - parts = name_buffer.split(b"\0") - names: Dict[int, str] = {} - offset = 0 - for part in parts: - names[offset] = part.decode("ascii") - offset += len(part) + 1 - return names - - -def _chunked_read( - stream: BinaryIO, size: Optional[int] = None, chunk_size: Optional[int] = None -) -> Iterable[bytes]: - if size is None and chunk_size is None: - yield stream.read() - elif size is None and chunk_size is not None: + @last_file.setter + def last_file(self, value: int) -> None: + self._serializer.int.write( + value, + *self._LAST_FILE, + byteorder=self._INT_BYTEORDER, + signed=self._INT_SIGNED, + ) + + +class SgaNameWindow(BinaryProxySerializer): + def __init__( + self, + parent: BinaryIO, + offset: int, + count: int, + length_mode: bool = False, + encoding: str = "utf-8", + ) -> None: + size = count if length_mode else tell_end(parent) + self._window = BinaryWindow(parent, offset, size, name="SGA ToC Name Buffer") + super().__init__(self._window) + self._count = count if not length_mode else None + + self._encoding = encoding + self._cacheable = parent.readable() and not parent.writable() + self.length_mode = length_mode + + self._cache: Optional[Dict[int, str]] = None + self._init_cache() + + def _init_cache(self) -> None: + if not self._cacheable: + return + if self._cache is None: + self._cache = {} + + # Length mode can preload the cache + if self.length_mode: + self._serializer.stream.seek(0) + buffer = self._serializer.stream.read() + names: List[bytes] = buffer.split(b"\0") + counter = 0 + for name in names: + self._cache[counter] = name.decode(self._encoding) + counter += len(name) + 1 # +1 for "\0" + self._count = len(self._cache) + + @staticmethod + def _read_until_terminal( + stream: BinaryIO, start: int, buffer_size: int = 64, terminal: bytes = b"\x00" + ) -> bytes: + parts = [] + stream.seek(start) while True: - buffer = stream.read(chunk_size) - yield buffer - if len(buffer) != chunk_size: + buffer = stream.read(buffer_size) + split = buffer.split(terminal, maxsplit=1) + parts.append(split[0]) + if len(split) > 1: break - elif size is not None and chunk_size is None: - yield stream.read(size) - elif size is not None and chunk_size is not None: - chunks = size // chunk_size - for _ in range(chunks): - yield stream.read(chunk_size) - total_read = chunk_size * chunks - if total_read < size: - yield stream.read(size - total_read) - else: - raise Exception("Something impossible happened!") - - -def _chunked_copy( - in_stream: BinaryIO, - out_stream: BinaryIO, - size: Optional[int] = None, - chunk_size: Optional[int] = None, -) -> None: - for chunk in _chunked_read(in_stream, size, chunk_size): - out_stream.write(chunk) - - -@dataclass -class 
Md5ChecksumHelper: - expected: Optional[bytes] - stream: Optional[BinaryIO] - start: int - size: Optional[int] = None - eigen: Optional[bytes] = None - - def read(self, stream: Optional[BinaryIO] = None) -> bytes: - stream = self.stream if stream is None else stream - if stream is None: - raise IOError("No Stream Provided!") - stream.seek(self.start) - md5 = hashlib.md5(self.eigen) if self.eigen is not None else hashlib.md5() - # Safer for large files to read chunked - for chunk in _chunked_read(stream, self.size, 256 * KiB): - md5.update(chunk) - md5_str = md5.hexdigest() - return bytes.fromhex(md5_str) - - def validate(self, stream: Optional[BinaryIO] = None) -> None: - result = self.read(stream) - if self.expected != result: - raise MD5MismatchError(result, self.expected) - - -def _fix_toc(toc: TocBlock, cur_toc_start: int, desired_toc_start: int) -> None: - def _fix(info: Tuple[int, int]) -> Tuple[int, int]: - return info[0] + (cur_toc_start - desired_toc_start), info[1] - - toc.folder_info = _fix(toc.folder_info) - toc.file_info = _fix(toc.file_info) - toc.drive_info = _fix(toc.drive_info) - toc.name_info = _fix(toc.name_info) - - -class EssenceFSSerializer( - EssenceFSHandler, Generic[TFileDef, TMetaBlock, TTocMetaBlock] -): - # Would use a dataclass; but I also want to be able to override defaults in parent dataclasses + return b"".join(parts) + + def get_name(self, name_offset: int) -> str: + if self._cache is not None and name_offset in self._cache: + return self._cache[name_offset] + + name_buffer = self._read_until_terminal(self._serializer.stream, name_offset) + name = name_buffer.decode(self._encoding) + + if self._cache is not None: + self._cache[name_offset] = name + + return name + + +_TocWindowCls = TypeVar("_TocWindowCls", BinaryProxySerializer, BinaryWrapper) + + +class SgaTocInfoArea(Generic[_TocWindowCls]): def __init__( self, - version: Version, - meta_serializer: StreamSerializer[TMetaBlock], - toc_serializer: StreamSerializer[TocBlock], - toc_meta_serializer: Optional[StreamSerializer[TTocMetaBlock]], - toc_serialization_info: TOCSerializationInfo[TFileDef], - assemble_meta: AssembleMetaFunc[TMetaBlock, TTocMetaBlock], - disassemble_meta: DisassembleMetaFunc[TMetaBlock, TTocMetaBlock], - build_file_meta: AssembleFileMetaFunc[TFileDef], - gen_empty_meta: Callable[[], TMetaBlock], - finalize_meta: Callable[[BinaryIO, TMetaBlock], None], - meta2def: Callable[[Dict[str, object]], TFileDef], - assembler: Optional[Type[FSAssembler[TFileDef]]] = None, - disassembler: Optional[Type[FSDisassembler[TFileDef]]] = None, - ): - self.version = version - self.meta_serializer = meta_serializer - self.toc_serializer = toc_serializer - self.toc_meta_serializer = toc_meta_serializer - self.toc_serialization_info = toc_serialization_info - self.assemble_meta = assemble_meta - self.disassemble_meta = disassemble_meta - self.build_file_meta = build_file_meta - self.gen_empty_meta = gen_empty_meta - self.finalize_meta = finalize_meta - self.meta2def = meta2def - self.assembler_type = assembler or FSAssembler - self.disassembler_type = disassembler or FSDisassembler - - def read(self, stream: BinaryIO) -> EssenceFS: - # Magic & Version; skippable so that we can check for a valid file and read the version elsewhere - _validate_magic_word(MagicWord, stream, advance=True) - stream_version = Version.unpack(stream) - if stream_version != self.version: - raise VersionMismatchError(stream_version, self.version) - - meta_block = self.meta_serializer.unpack(stream) - 
stream.seek(meta_block.ptrs.header_pos) - toc_block = self.toc_serializer.unpack(stream) - # Additional TOC information is not present in earlier versions - toc_meta_block = ( - self.toc_meta_serializer.unpack(stream) - if self.toc_meta_serializer is not None - else None - ) - - name, metadata = meta_block.name, self.assemble_meta( - stream, meta_block, toc_meta_block - ) - assembler: FSAssembler[TFileDef] = self.assembler_type( - stream=stream, - ptrs=meta_block.ptrs, - toc=toc_block, - toc_serialization_info=self.toc_serialization_info, - # decompress_files=decompress, - build_file_meta=self.build_file_meta, - # lazy=lazy, - ) - essence_fs = EssenceFS() - assembler.assemble(essence_fs) - essence_info: Dict[str, object] = { - "name": name, - "version": {"major": stream_version.major, "minor": stream_version.minor}, - } - if metadata is not None: - essence_info.update(metadata) - - essence_fs.setmeta(essence_info, ESSENCE_NAMESPACE) - return essence_fs - - def write(self, stream: BinaryIO, essence_fs: EssenceFS) -> int: - archive_metadata: Dict[str, object] = typing.cast( - Dict[str, object], essence_fs.getmeta("essence") - ) - archive_name: str = typing.cast(str, archive_metadata["name"]) - # IDK why I write to a temp stream; maybe to preserve dest stream in case of errors? - with BytesIO() as temp_stream: - MagicWord.write_magic_word(temp_stream) - self.version.pack(temp_stream) - with BytesIO() as data_stream: - with BytesIO() as toc_stream: - with BytesIO() as name_stream: - disassembler: FSDisassembler[TFileDef] = self.disassembler_type( - fs=essence_fs, - toc_stream=toc_stream, - data_stream=data_stream, - name_stream=name_stream, - toc_serialization_info=self.toc_serialization_info, - meta2def=self.meta2def, - ) - - partial_toc = disassembler.disassemble() - - partial_meta, toc_meta = self.disassemble_meta( - temp_stream, archive_metadata - ) - # we need to come back with the correct data - meta_writeback = temp_stream.tell() - empty_meta = self.gen_empty_meta() - self.meta_serializer.pack(temp_stream, empty_meta) - - # the start of the toc stream in the current stream - toc_start = temp_stream.tell() - toc_writeback = toc_start - self.toc_serializer.pack(temp_stream, TocBlock.default()) - - if self.toc_meta_serializer: - self.toc_meta_serializer.pack(temp_stream, toc_meta) - - toc_rel_start = temp_stream.tell() - toc_stream.seek(0) - _chunked_copy(toc_stream, temp_stream, chunk_size=64 * KiB) - toc_end = temp_stream.tell() # The end of the TOC block; - toc_size = toc_end - toc_start - - data_start = temp_stream.tell() - data_stream.seek(0) - _chunked_copy(data_stream, temp_stream, chunk_size=1 * MiB) - data_size = data_stream.tell() - - partial_meta.name = archive_name - partial_meta.ptrs = ArchivePtrs( - toc_start, toc_size, data_start, data_size - ) - _fix_toc(partial_toc, toc_rel_start, toc_start) - - temp_stream.seek(toc_writeback) - self.toc_serializer.pack(temp_stream, partial_toc) - - if self.finalize_meta is not None: - self.finalize_meta(temp_stream, partial_meta) - - temp_stream.seek(meta_writeback) - self.meta_serializer.pack(temp_stream, partial_meta) - - temp_stream.seek(0) - _chunked_copy(temp_stream, stream, chunk_size=16 * MiB) - return temp_stream.tell() - - -# Archives have 7 blocks: -# MagicBlock -# Contains "_ARCHIVE" (8 byte long ASCII string) -# Contains Version (UINT16, UINT16 tuple) -# MetaBlock -# Several Metadata sections -# PTR Block -# TOC Block -# FileBlock -# FolderBlock -# DriveBlock -# NameBlock -# DataBlock - - -@dataclass -class FileLazyInfo: - 
jump_to: int - packed_size: int - unpacked_size: int - stream: BinaryIO - decompress: bool - - def read(self, decompress: Optional[bool] = None) -> bytes: - decompress = self.decompress if decompress is None else decompress - jump_back = self.stream.tell() - self.stream.seek(self.jump_to) - in_buffer = self.stream.read(self.packed_size) - if decompress and self.packed_size != self.unpacked_size: - out_buffer = zlib.decompress(in_buffer) - if len(out_buffer) != self.unpacked_size: - raise DecompressedSizeMismatch(len(out_buffer), self.unpacked_size) + parent: Union[BinaryIO, BinaryProxy], + offset: int, + count: int, + cls: Type[_TocWindowCls], + cls_size: Optional[int] = None, + ) -> None: + self._parent = parent + self._cls: Type[_TocWindowCls] = cls + if hasattr(self._cls, "_SIZE"): + self._cls_size = self._cls._SIZE + elif cls_size is not None: + self._cls_size = cls_size else: - out_buffer = in_buffer - self.stream.seek(jump_back) - return out_buffer + raise RelicToolError("TOC Window size could not be determined!") + + self._windows: Dict[int, _TocWindowCls] = {} + self._info_offset = offset + self._info_count = count + + def __get_window(self, index: int) -> _TocWindowCls: + offset, count = self._info_offset, self._info_count + if not 0 <= index < count: + raise IndexError(index, f"Valid indexes are ['{0}', '{count}')") + + if index not in self._windows: + self._windows[index] = self._cls( + BinaryWindow( + self._parent, + offset + self._cls_size * index, + self._cls_size, + name=f"SGA ToC Info Area ['{index}']", + ) + ) + + return self._windows[index] + + def __getitem__( + self, item: Union[int, slice] + ) -> Union[_TocWindowCls, List[_TocWindowCls]]: + if isinstance(item, slice): + return list( + self.__get_window(index) + for index in range(*item.indices(self._info_count)) + ) + return self.__get_window(item) + + def __len__(self) -> int: + return self._info_count + + def __iter__(self) -> Iterator[_TocWindowCls]: + for index in range(self._info_count): + yield self[index] # type: ignore + + +class SgaTocFile: + @property + def name_offset(self) -> int: + raise NotImplementedError + + @property + def data_offset(self) -> int: + raise NotImplementedError + @property + def compressed_size(self) -> int: # length_in_archive + raise NotImplementedError -@dataclass -class ArchivePtrs: - """ - Contains 'pointers' to the TOC Block (header_pos, header_size) and the DATA Block (data_pos, data_size) - """ + @property + def decompressed_size(self) -> int: # length_on_disk + raise NotImplementedError - header_pos: int - header_size: int - data_pos: int - data_size: Optional[int] = None + @property + def storage_type(self) -> StorageType: + raise NotImplementedError + + +class SgaToc(BinaryProxySerializer): + def __init__(self, parent: BinaryIO): + super().__init__(parent) + + @property + def header(self) -> SgaTocHeader: + raise NotImplementedError + + @property + def drives(self) -> SgaTocInfoArea[SgaTocDrive]: # type: ignore + raise NotImplementedError + + @property + def folders(self) -> SgaTocInfoArea[SgaTocFolder]: # type: ignore + raise NotImplementedError + + @property + def files(self) -> SgaTocInfoArea[SgaTocFile]: # type: ignore + raise NotImplementedError + + @property + def names(self) -> SgaNameWindow: + raise NotImplementedError + + +class SgaFile(BinaryProxySerializer): + _MAGIC_WORD = (0, 8) + _VERSION = (8, 4) + _MAGIC_VERSION_SIZE = 12 + _VERSION_INT_FMT = {"byteorder": "little", "signed": False} + + @property + def magic_word(self) -> bytes: + return 
self._serializer.read_bytes(*self._MAGIC_WORD) + + @property + def version(self) -> Version: + buffer = self._serializer.read_bytes(*self._VERSION) + major = self._serializer.uint16.unpack(buffer[:2], **self._VERSION_INT_FMT) # type: ignore + minor = self._serializer.uint16.unpack(buffer[2:], **self._VERSION_INT_FMT) # type: ignore + return Version(major, minor) + + @version.setter + def version(self, value: Version) -> None: + major = self._serializer.uint16.pack(value.major, **self._VERSION_INT_FMT) # type: ignore + minor = self._serializer.uint16.pack(value.minor, **self._VERSION_INT_FMT) # type: ignore + buffer = b"".join([major, minor]) + self._serializer.write_bytes(buffer, *self._VERSION) + + @property + def meta(self) -> SgaHeader: + raise NotImplementedError + + @property + def table_of_contents(self) -> SgaToc: + raise NotImplementedError + + @property + def data_block(self) -> BinaryWindow: + raise NotImplementedError + + +class VersionSerializer: + _INT_SIZE = 2 + _MAJOR = (0, _INT_SIZE) + _MINOR = (2, _INT_SIZE) + _SIZE = _INT_SIZE * 2 + _INT_BYTEORDER: ClassVar[Literal["little"]] = "little" + _INT_SIGNED = False + + @classmethod + def unpack(cls, buffer: bytes) -> Version: + with BytesIO(buffer) as reader: + serializer = BinarySerializer(reader) + major = serializer.uint16.read( + *cls._MAJOR, byteorder=cls._INT_BYTEORDER, signed=cls._INT_SIGNED + ) + minor = serializer.uint16.read( + *cls._MINOR, byteorder=cls._INT_BYTEORDER, signed=cls._INT_SIGNED + ) + return Version(major, minor) + + @classmethod + def read(cls, stream: BinaryIO) -> Version: + buffer = stream.read(cls._SIZE) + return cls.unpack(buffer) + + @classmethod + def pack(cls, version: Version) -> bytes: + with BytesIO(b"\0" * cls._SIZE) as writer: + serializer = BinarySerializer(writer) + serializer.uint16.write( + version.major, + *cls._MAJOR, + byteorder=cls._INT_BYTEORDER, + signed=cls._INT_SIGNED, + ) + serializer.uint16.write( + version.minor, + *cls._MINOR, + byteorder=cls._INT_BYTEORDER, + signed=cls._INT_SIGNED, + ) + return writer.getvalue() @classmethod - def default(cls) -> ArchivePtrs: - """ - Creates a 'Default' Archive Ptrs Object; used to create a valid placeholder until proper data is supplied. - """ - return cls(0, 0, 0, 0) - - -__all__ = [ - "TocBlock", - "TocHeaderSerializer", - "DriveDef", - "DriveDefSerializer", - "FolderDef", - "FolderDefSerializer", - "MetaBlock", - "TMetaBlock", - "TTocMetaBlock", - "FileDef", - "TFileDef", - "AssembleFileMetaFunc", - "DisassembleFileMetaFunc", - "AssembleMetaFunc", - "DisassembleMetaFunc", - "TOCSerializationInfo", - "FSAssembler", - "FSDisassembler", - "Md5ChecksumHelper", - "EssenceFSSerializer", - "FileLazyInfo", - "ArchivePtrs", -] + def write(cls, stream: BinaryIO, version: Version) -> int: + buffer = cls.pack(version) + return stream.write(buffer) diff --git a/tests/issues/test_issue_39.py b/tests/issues/test_issue_39.py deleted file mode 100644 index 7435d41..0000000 --- a/tests/issues/test_issue_39.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -TestCases for 'EssenceDriveFS not respecting // separator' -https://github.com/MAK-Relic-Tool/Issue-Tracker/issues/39 -""" -import zlib -from contextlib import contextmanager - -import fs -from fs.base import FS -from fs.memoryfs import MemoryFS - -from relic.sga.core import StorageType -from relic.sga.core.filesystem import EssenceFS - - -@contextmanager -def _generate_fake_osfs() -> FS: - raw_text = b"""Ready to unleash 11 barrels of lead. -Where's that artillery?!?! -Orks are da biggust and da strongest. 
-Fix bayonets! -Fear me, but follow! -Call for an earth-shaker? -My mind is too weary to fight on... -We'll be off as soon as the fuel arrives. -Where are those tech priests. -Fire until they see the glow of our barrels!""" - - comp_text = zlib.compress(raw_text) - - with MemoryFS() as fs: - with fs.makedir("/samples") as samples_folder: - with samples_folder.makedir("/strings") as strings_folders: - with strings_folders.openbin("buffer.txt", "wb") as file: - file.write(comp_text) - with strings_folders.openbin("stream.txt", "wb") as file: - file.write(comp_text) - with strings_folders.openbin("store.txt", "wb") as file: - file.write(raw_text) - yield fs - - -_CHUNK_SIZE = 1024 * 1024 * 16 # 16 MiB - - -def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS: - # Create 'SGA' V2 - sga = EssenceFS() - sga.setmeta( - { - "name": name, # Specify name of archive - "header_md5": "0" - * 16, # Must be present due to a bug, recalculated when packed - "file_md5": "0" - * 16, # Must be present due to a bug, recalculated when packed - }, - "essence", - ) - - alias = "data" - name = "test data" - sga_drive = None # sga.create_drive(alias) - for path in osfs.walk.files(): - if ( - sga_drive is None - ): # Lazily create drive, to avoid empty drives from being created - sga_drive = sga.create_drive(alias, name) - - if "stream" in path: - storage = StorageType.STREAM_COMPRESS - elif "buffer" in path: - storage = StorageType.BUFFER_COMPRESS - else: - storage = StorageType.STORE - - with osfs.openbin(path, "r") as unpacked_file: - parent, file = fs.path.split(path) - with sga_drive.makedirs(parent, recreate=True) as folder: - with folder.openbin(file, "w") as packed_file: - while True: - buffer = unpacked_file.read(_CHUNK_SIZE) - if len(buffer) == 0: - break - packed_file.write(buffer) - sga_drive.setinfo(path, {"essence": {"storage_type": storage}}) - return sga - - -def _check_path(sga: EssenceFS, path: str): - left_sep = path.replace("\\", "/") - right_sep = path.replace("/", "\\") - - info = sga.getinfo(path) - l_info = sga.getinfo(left_sep) - r_info = sga.getinfo(right_sep) - - assert info == l_info - assert l_info == r_info - - -def test_fix_39(): - with _generate_fake_osfs() as osfs: - sga = _pack_fake_osfs(osfs, "Test Archive") - for root, folders, files in sga.walk(): - _check_path(sga, root) - # for folder in folders - # folders are checked when we walk into them - for file in files: - full_path = fs.path.join(root, file.name) - _check_path(sga, full_path) diff --git a/tests/issues/test_issue_40.py b/tests/issues/test_issue_40.py index 4a39693..518bc62 100644 --- a/tests/issues/test_issue_40.py +++ b/tests/issues/test_issue_40.py @@ -1,34 +1,49 @@ -r""" -TestCases for more explicit errors when providing invalid path arguments. +r"""TestCases for more explicit errors when providing invalid path arguments. 
+ https://github.com/MAK-Relic-Tool/Issue-Tracker/issues/40 """ -import io + +import os +from argparse import ArgumentError from typing import Iterable -from contextlib import redirect_stderr import pytest + +def _ArgumentError(name, message): + _ = ArgumentError(None, message) + _.argument_name = name + return _ + + _ARGS = [ ( ["sga", "unpack", "nonexistant.sga", "."], - "error: argument src_sga: The given path 'nonexistant.sga' does not exist!", + _ArgumentError( + "src_sga", + f"The given path '{os.path.abspath('nonexistant.sga')}' does not exist!", + ), ), ( ["sga", "unpack", __file__, __file__], - rf"error: argument out_dir: The given path '{__file__}' is not a directory!", + _ArgumentError("out_dir", f"The given path '{__file__}' is not a directory!"), ), ] -@pytest.mark.parametrize(["args", "msg"], _ARGS) -def test_argparse_error(args: Iterable[str], msg: str): - from relic.core.cli import cli_root +@pytest.mark.parametrize(["args", "expected"], _ARGS) +def test_argparse_error(args: Iterable[str], expected: ArgumentError): + from relic.core import CLI - with io.StringIO() as f: - with redirect_stderr(f): - status = cli_root.run_with(*args) - assert status == 2 - f.seek(0) - err = f.read() - print(err) - assert msg in err + try: + _ = CLI.run_with(*args) + except ArgumentError as arg_err: + assert arg_err.argument_name == expected.argument_name, ( + arg_err.argument_name, + expected.argument_name, + ) + assert arg_err.message == expected.message, (arg_err.message, expected.message) + except Exception as exc: + assert False, str(exc) + else: + assert False, str("Did not error!") diff --git a/tests/test_cli.py b/tests/test_cli.py index 2586021..c6a68d9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,35 +10,46 @@ class CommandTests: def test_run(self, args: Sequence[str], output: str, exit_code: int): - _args = ["relic", *args] - cmd = subprocess.run(_args, capture_output=True, text=True) + cmd = subprocess.run(args, capture_output=True, text=True) result = cmd.stdout status = cmd.returncode print(f"'{result}'") # Visual Aid for Debugging - assert output in result assert status == exit_code + assert output in result def test_run_with(self, args: Sequence[str], output: str, exit_code: int): - from relic.core.cli import cli_root + from relic.core.cli import CLI with io.StringIO() as f: with redirect_stdout(f): - status = cli_root.run_with(*args) + status = CLI.run_with(*args) f.seek(0) result = f.read() print(f"'{result}'") # Visual Aid for Debugging - assert output in result assert status == exit_code + assert output in result -_SGA_HELP = ["sga", "-h"], """usage: relic sga [-h] {info,pack,repack,unpack} ...""", 0 -_SGA_PACK_HELP = ["sga", "pack", "-h"], """usage: relic sga pack [-h] {} ...""", 0 -_SGA_UNPACK_HELP = ["sga", "unpack", "-h"], """usage: relic sga unpack [-h]""", 0 - -_TESTS = [_SGA_HELP, _SGA_PACK_HELP, _SGA_UNPACK_HELP] +_SGA_HELP = ( + ["relic", "sga", "-h"], + """usage: relic sga [-h] {info,pack,unpack} ...""", + 0, +) +_SGA_PACK_HELP = ( + ["relic", "sga", "pack", "-h"], + """usage: relic sga pack [-h] {} ...""", + 0, +) +_SGA_UNPACK_HELP = ( + ["relic", "sga", "unpack", "-h"], + """usage: relic sga unpack [-h]""", + 0, +) +_SGA_INFO_HELP = ["relic", "sga", "info", "-h"], """usage: relic sga info [-h]""", 0 + +_TESTS = [_SGA_HELP, _SGA_PACK_HELP, _SGA_UNPACK_HELP, _SGA_INFO_HELP] _TEST_IDS = [" ".join(_[0]) for _ in _TESTS] @pytest.mark.parametrize(["args", "output", "exit_code"], _TESTS, ids=_TEST_IDS) -class TestRelicSgaCli(CommandTests): - ... 
+class TestRelicSgaCli(CommandTests): ... diff --git a/tests/test_filesystem.py b/tests/test_filesystem.py deleted file mode 100644 index df17a4d..0000000 --- a/tests/test_filesystem.py +++ /dev/null @@ -1,27 +0,0 @@ -import unittest - -import fs -from fs.test import FSTestCases - -from relic.sga.core.filesystem import EssenceFS, _EssenceDriveFS - - -class TestEssenceFS(FSTestCases, unittest.TestCase): - def make_fs(self): - essence_fs = EssenceFS() - # EssenceFS shouldn't be writeable by default; - # being an emulator for Window's hard drives. - # With no 'drive' installed, there's nothing to write to! - essence_fs.add_fs("data", _EssenceDriveFS("data", "test"), True) - return essence_fs - - -class TestEssenceDriveFS(FSTestCases, unittest.TestCase): - def make_fs(self): - return _EssenceDriveFS("data", "test") - - -class TestOpener: - def test_open_fs(self): - with fs.open_fs("sga://", create=True) as sga: - pass diff --git a/tests/test_regressions.py b/tests/test_regressions.py deleted file mode 100644 index ee09de9..0000000 --- a/tests/test_regressions.py +++ /dev/null @@ -1,88 +0,0 @@ -""" -Tests which ensures releases do not break backwards-compatibility by failing to expose modules/names -""" - -import importlib -from typing import List, Iterable, Tuple - -import pytest - -core__all__ = [ - "definitions", - "errors", - "filesystem", - "protocols", - "serialization", -] - - -@pytest.mark.parametrize("submodule", core__all__) -def test_import_module(submodule: str): - try: - importlib.import_module(f"relic.sga.core.{submodule}") - except ImportError: - raise AssertionError(f"{submodule} is no longer exposed!") - - -definitions__all__ = ["MagicWord", "Version", "StorageType", "VerificationType"] -errors__all__ = [ - "VersionMismatchError", - "MD5MismatchError", - "VersionNotSupportedError", - "DecompressedSizeMismatch", -] -fs__all__ = [ - "ESSENCE_NAMESPACE", - "EssenceFSHandler", - "EssenceFSFactory", - "_EssenceFile", - "_EssenceDirEntry", - "_EssenceDriveFS", - "EssenceFS", - "registry", - "EssenceFSOpener", -] -protocols__all__ = ["T", "StreamSerializer"] -serialization__all__ = [ - "TocBlock", - "TocHeaderSerializer", - "DriveDef", - "DriveDefSerializer", - "FolderDef", - "FolderDefSerializer", - "MetaBlock", - "TMetaBlock", - "TTocMetaBlock", - "FileDef", - "TFileDef", - "AssembleFileMetaFunc", - "DisassembleFileMetaFunc", - "AssembleMetaFunc", - "DisassembleMetaFunc", - "TOCSerializationInfo", - "FSAssembler", - "FSDisassembler", - "Md5ChecksumHelper", - "EssenceFSSerializer", - "FileLazyInfo", - "ArchivePtrs", -] - - -def module_imports_helper(submodule: str, all: List[str]) -> Iterable[Tuple[str, str]]: - return zip([submodule] * len(all), all) - - -@pytest.mark.parametrize( - ["submodule", "attribute"], - [ - *module_imports_helper("errors", errors__all__), - *module_imports_helper("definitions", definitions__all__), - *module_imports_helper("filesystem", fs__all__), - *module_imports_helper("protocols", protocols__all__), - *module_imports_helper("serialization", serialization__all__), - ], -) -def test_module_imports(submodule: str, attribute: str): - module = importlib.import_module(f"relic.sga.core.{submodule}") - _ = getattr(module, attribute) diff --git a/tests/unittest/test_hashtools.py b/tests/unittest/test_hashtools.py new file mode 100644 index 0000000..b58b1c4 --- /dev/null +++ b/tests/unittest/test_hashtools.py @@ -0,0 +1,262 @@ +import dataclasses +from dataclasses import dataclass +from hashlib import md5 as calc_md5, sha1 as calc_sha1 +from io import BytesIO 
+from typing import Optional, Type, Iterable, Union, Callable +from zlib import crc32 as calc_crc32 + +import pytest + +from relic.sga.core.errors import ( + HashMismatchError, + Crc32MismatchError, + Sha1MismatchError, + Md5MismatchError, +) +from relic.sga.core.hashtools import Hashable, _T, Hasher, md5, sha1, crc32 + + +@dataclass +class HashArgs: + stream: Hashable + start: Optional[int] = None + size: Optional[int] = None + eigen: Optional[_T] = None + err_cls: Optional[Type[HashMismatchError]] = None + name: Optional[str] = None + + @property + def hash_kwargs(self): + hash_kwargs = dataclasses.asdict(self) + del hash_kwargs["err_cls"] + del hash_kwargs["name"] + return hash_kwargs + + @property + def validate_kwargs(self): + return dataclasses.asdict(self) + + +def _invert_hash(t: Union[bytes, int]): + if isinstance(t, int): + return t ^ 0xFFFFFFFF + return bytes(v ^ 255 for v in t) + + +def _hasher_from_input( + items: Iterable[HashArgs], + parse_eigen: Callable[[Optional[int]], Optional[Union[int, bytes]]], + calc_hash: Callable[[bytes, Union[int, bytes]], Union[bytes, int]], + hasher: Hasher, + err_cls: Type[HashMismatchError], +): + TF = [True, False] + for args in items: + buffer: bytes = args.stream + if args.start is not None: + buffer = buffer[args.start :] + if args.size is not None: + buffer = buffer[: args.size] + + eigen = parse_eigen(args.eigen) + calced = calc_hash(buffer, eigen) + for use_bytes in TF: + for should_pass in TF: + expected = _invert_hash(calced) if not should_pass else calced + + hash_args = HashArgs( + BytesIO(args.stream) if not use_bytes else args.stream, + args.start, + args.size, + eigen, + ) + + yield hasher, hash_args, expected, should_pass, err_cls + + +def _md5_from_input(items: Iterable[HashArgs]): + def _eigen(e: Optional[int]) -> Optional[bytes]: + if e is not None: + return e.to_bytes(4, "little", signed=False) + else: + return None + + def _hash(b: bytes, e: Optional[bytes]) -> bytes: + hasher = calc_md5(usedforsecurity=False) + if e is not None: + hasher.update(e) + hasher.update(b) + return hasher.digest() + + yield from _hasher_from_input(items, _eigen, _hash, md5, Md5MismatchError) + + +def _sha1_from_input(items: Iterable[HashArgs]): + def _eigen(e: Optional[int]) -> Optional[bytes]: + if e is not None: + return e.to_bytes(4, "little", signed=False) + else: + return None + + def _hash(b: bytes, e: Optional[bytes]) -> bytes: + hasher = calc_sha1(usedforsecurity=False) + if e is not None: + hasher.update(e) + hasher.update(b) + return hasher.digest() + + yield from _hasher_from_input(items, _eigen, _hash, sha1, Sha1MismatchError) + + +def _crc32_from_input(items: Iterable[HashArgs]): + def _eigen(e: Optional[int]) -> Optional[int]: + return e + + def _hash(b: bytes, e: Optional[int]) -> int: + if e is None: + return calc_crc32(b) + else: + return calc_crc32(b, e) + + yield from _hasher_from_input(items, _eigen, _hash, crc32, Crc32MismatchError) + + +def _convert_input_2_tests(items: Iterable[HashArgs]): + items = list(items) + yield from _md5_from_input(items) + yield from _sha1_from_input(items) + yield from _crc32_from_input(items) + + +_input_args = [ + (HashArgs(b"Alec Baldwin")), + (HashArgs(b"Ben Afleck", start=4)), + (HashArgs(b"Chris Pratt", size=5)), + (HashArgs(b"Donald Glover", start=7, size=5)), + (HashArgs(b"Eric Andre", eigen=5202012)), + (HashArgs(b"Fraser, Branden", start=8, eigen=5041999)), + (HashArgs(b"Gene Simmons", size=4, eigen=2181974)), + (HashArgs(b"Hulk Hogan", start=5, size=3, eigen=1012012)), +] + +_TEST_DATA 
= list(_convert_input_2_tests(_input_args)) + +_HASHER_TESTS = list( + (hasher, args, buffer) + for (hasher, args, buffer, passing, _) in _TEST_DATA + if passing +) +_HASHER_TEST_IDS = list(f"{_[0].__name__} ~ {_[1]} ~ {_[2]}" for _ in _HASHER_TESTS) + +_HASHER_CHECK_TESTS = list( + (hasher, args, buffer, passing) for (hasher, args, buffer, passing, _) in _TEST_DATA +) +_HASHER_CHECK_TEST_IDS = list( + f"{_[0].__name__} ~ {_[1]} ~ {_[2]} ~ {'Match' if _[3] else 'MisMatch'}" + for _ in _HASHER_CHECK_TESTS +) + +_HASHER_VALIDATE_ERR_TESTS = list( + (hasher, args, buffer, err_cls) + for (hasher, args, buffer, passing, err_cls) in _TEST_DATA + if not passing +) +_HASHER_VALIDATE_ERR_IDS = list( + f"{_[0].__name__} ~ {_[1]} ~ {_[2]} ~ {_[3]}" for _ in _HASHER_VALIDATE_ERR_TESTS +) + +_HASHER_VALIDATE_ERR_NAME_TESTS = list( + (hasher, args, buffer, hasher._hasher_name) + for (hasher, args, buffer, passing, _) in _TEST_DATA + if not passing +) +_HASHER_VALIDATE_ERR_NAME_IDS = list( + f"{_[0].__name__} ~ {_[1]} ~ {_[2]} ~ {_[3]}" + for _ in _HASHER_VALIDATE_ERR_NAME_TESTS +) + + +@pytest.mark.parametrize( + ["hasher", "args", "expected"], _HASHER_TESTS, ids=_HASHER_TEST_IDS +) +def test_hasher_hash(hasher: Hasher, args: HashArgs, expected: _T): + if hasattr(args.stream, "seek"): + args.stream.seek(0) + result = hasher.hash(**args.hash_kwargs) + assert result == expected + + +@pytest.mark.parametrize( + ["hasher", "args", "expected", "expected_result"], + _HASHER_CHECK_TESTS, + ids=_HASHER_CHECK_TEST_IDS, +) +def test_hasher_check( + hasher: Hasher, args: HashArgs, expected: _T, expected_result: bool +): + if hasattr(args.stream, "seek"): + args.stream.seek(0) + + result = hasher.check(**args.hash_kwargs, expected=expected) + assert result == expected_result + + +@pytest.mark.parametrize( + ["hasher", "args", "expected", "expected_result"], + _HASHER_CHECK_TESTS, + ids=_HASHER_CHECK_TEST_IDS, +) +def test_hasher_validate( + hasher: Hasher, args: HashArgs, expected: _T, expected_result: bool +): + if hasattr(args.stream, "seek"): + args.stream.seek(0) + + try: + hasher.validate(**args.hash_kwargs, expected=expected) + except HashMismatchError: + assert expected_result is False + else: + assert expected_result is True + + +@pytest.mark.parametrize( + ["hasher", "args", "expected_failure", "expected_err_cls"], + _HASHER_VALIDATE_ERR_TESTS, + ids=_HASHER_VALIDATE_ERR_IDS, +) +def test_hasher_validate_err_cls( + hasher: Hasher, + args: HashArgs, + expected_failure: _T, + expected_err_cls: Type[HashMismatchError], +): + if hasattr(args.stream, "seek"): + args.stream.seek(0) + + try: + hasher.validate(**args.hash_kwargs, expected=expected_failure) + except expected_err_cls: + pass + except Exception as e: + assert isinstance(e, expected_err_cls) + else: + pytest.fail("Hasher did not raise an error!") + + +@pytest.mark.parametrize( + ["hasher", "args", "expected_failure", "expected_name"], + _HASHER_VALIDATE_ERR_NAME_TESTS, + ids=_HASHER_VALIDATE_ERR_NAME_IDS, +) +def test_hasher_validate_err_name( + hasher: Hasher, args: HashArgs, expected_failure: _T, expected_name: str +): + if hasattr(args.stream, "seek"): + args.stream.seek(0) + try: + hasher.validate(**args.validate_kwargs, expected=expected_failure) + except HashMismatchError as e: + assert e.name is expected_name + else: + pytest.fail("Hasher did not raise a HashMismatchError!")
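For reference, a minimal usage sketch of the hashtools API introduced in src/relic/sga/core/hashtools.py; the buffer contents and the eigen seed are illustrative, and the start/size/eigen keywords behave as defined above (bytes or a BinaryIO stream are both accepted, eigen seeds the hash before the windowed data is folded in):

    from relic.sga.core.hashtools import md5, sha1, crc32

    buffer = b"Fear me, but follow!"           # illustrative payload

    digest = md5(buffer)                       # hash an entire bytes buffer (or stream)
    window = sha1(buffer, start=5, size=3)     # hash only bytes [5:8) of the buffer
    seeded = crc32(buffer, eigen=0xDEADBEEF)   # eigen is the initial CRC value

    assert md5.check(buffer, expected=digest)  # check() returns True/False
    md5.validate(buffer, expected=digest)      # validate() raises Md5MismatchError on mismatch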
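The ToC name block handled by SgaNameWindow is a run of NUL-terminated strings keyed by byte offset; a simplified, self-contained sketch of the offset-to-name mapping its length-mode cache builds (the names here are made up, and the trailing empty split piece is dropped for brevity):

    buffer = b"data\0scenarios\0map.sga\0"

    lookup, offset = {}, 0
    for raw in buffer.split(b"\0")[:-1]:       # drop the empty piece after the final NUL
        lookup[offset] = raw.decode("ascii")
        offset += len(raw) + 1                 # +1 for the terminating NUL

    assert lookup == {0: "data", 5: "scenarios", 15: "map.sga"}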
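And a round-trip sketch for the new VersionSerializer, assuming the concrete version numbers are placeholders and that Version compares by value (it is used that way throughout serialization.py); major and minor are packed as consecutive little-endian uint16 values:

    from relic.sga.core.definitions import Version
    from relic.sga.core.serialization import VersionSerializer

    packed = VersionSerializer.pack(Version(2, 0))   # 4 bytes: uint16 major, then uint16 minor
    assert VersionSerializer.unpack(packed) == Version(2, 0)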