diff --git a/src/relic/sga/v2/cli.py b/src/relic/sga/v2/cli.py index 6995cc2..19927ea 100644 --- a/src/relic/sga/v2/cli.py +++ b/src/relic/sga/v2/cli.py @@ -2,21 +2,21 @@ import os from argparse import ArgumentParser, Namespace from pathlib import Path -from typing import Optional, Dict +from typing import Optional, Dict, Any from relic.sga.core.filesystem import EssenceFS from relic.core.cli import CliPlugin, _SubParsersAction from relic.sga.v2.serialization import essence_fs_serializer as v2_serializer -from relic.sga.core import StorageType +from relic.sga.core.definitions import StorageType _CHUNK_SIZE = 1024 * 1024 * 4 # 4 MiB -def _resolve_storage_type(s: Optional[str]): +def _resolve_storage_type(s: Optional[str]) -> StorageType: _HELPER = { "STORE": StorageType.STORE, "BUFFER": StorageType.BUFFER_COMPRESS, - "STREAM": StorageType.STREAM_COMPRESS + "STREAM": StorageType.STREAM_COMPRESS, } if s is None: return StorageType.STORE @@ -29,8 +29,9 @@ def _resolve_storage_type(s: Optional[str]): class RelicSgaPackV2Cli(CliPlugin): - - def _create_parser(self, command_group: Optional[_SubParsersAction] = None) -> ArgumentParser: + def _create_parser( + self, command_group: Optional[_SubParsersAction] = None + ) -> ArgumentParser: parser: ArgumentParser if command_group is None: parser = ArgumentParser("v2") @@ -44,13 +45,12 @@ def _create_parser(self, command_group: Optional[_SubParsersAction] = None) -> A return parser def command(self, ns: Namespace) -> Optional[int]: - # Extract Args working_dir: str = ns.src_dir outfile: str = ns.out_sga config_file: str = ns.config_file with open(config_file) as json_h: - config: Dict = json.load(json_h) + config: Dict[str, Any] = json.load(json_h) # Execute Command print(f"Packing `{outfile}`") @@ -58,11 +58,16 @@ def command(self, ns: Namespace) -> Optional[int]: # Create 'SGA' sga = EssenceFS() name = os.path.basename(outfile) - sga.setmeta({ - "name": name, # Specify name of archive - "header_md5": "0" * 16, # Must be present due to a bug, recalculated when packed - "file_md5": "0" * 16, # Must be present due to a bug, recalculated when packed - }, "essence") + sga.setmeta( + { + "name": name, # Specify name of archive + "header_md5": "0" + * 16, # Must be present due to a bug, recalculated when packed + "file_md5": "0" + * 16, # Must be present due to a bug, recalculated when packed + }, + "essence", + ) # Walk Drives for alias, drive in config.items(): @@ -70,7 +75,7 @@ def command(self, ns: Namespace) -> Optional[int]: sga_drive = None # sga.create_drive(alias) # CWD for drive operations - drive_cwd = os.path.join(working_dir, drive.get("path","")) + drive_cwd = os.path.join(working_dir, drive.get("path", "")) # Try to pack files print(f"\tScanning files in `{drive_cwd}`") @@ -103,9 +108,13 @@ def command(self, ns: Namespace) -> Optional[int]: # match found, copy file to FS # EssenceFS is unfortunately, - print(f"\t\tPacking File `{os.path.relpath(full_path, drive_cwd)}` w/ `{storage.name}`") + print( + f"\t\tPacking File `{os.path.relpath(full_path, drive_cwd)}` w/ `{storage.name}`" + ) frontier.add(full_path) - if sga_drive is None: # Lazily create drive, to avoid empty drives from being created + if ( + sga_drive is None + ): # Lazily create drive, to avoid empty drives from being created sga_drive = sga.create_drive(alias) with open(full_path, "rb") as unpacked_file: @@ -117,7 +126,9 @@ def command(self, ns: Namespace) -> Optional[int]: if len(buffer) == 0: break packed_file.write(buffer) - sga_drive.setinfo(path_in_sga, {"essence": {"storage_type": storage}}) + sga_drive.setinfo( + path_in_sga, {"essence": {"storage_type": storage}} + ) print(f"Writing `{outfile}` to disk") # Write to binary file: diff --git a/tests/test_cli.py b/tests/test_cli.py index 1ffc450..a206c10 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,7 @@ import io import os.path import subprocess + # Local testing requires running `pip install -e "."` import tempfile from contextlib import redirect_stdout @@ -25,6 +26,7 @@ def test_run(self, args: Sequence[str], output: str, exit_code: int): def test_run_with(self, args: Sequence[str], output: str, exit_code: int): from relic.core.cli import cli_root + with io.StringIO() as f: with redirect_stdout(f): status = cli_root.run_with(*args) @@ -36,10 +38,14 @@ def test_run_with(self, args: Sequence[str], output: str, exit_code: int): _SGA_PACK_HELP = ["sga", "pack", "-h"], """usage: relic sga pack [-h] {v2} ...""", 0 -_SGA_PACK_v2_HELP = ["sga", "pack", "v2", "-h"], """sage: relic sga pack v2 [-h] src_dir out_sga config_file""", 0 +_SGA_PACK_v2_HELP = ( + ["sga", "pack", "v2", "-h"], + """sage: relic sga pack v2 [-h] src_dir out_sga config_file""", + 0, +) _TESTS = [_SGA_PACK_HELP, _SGA_PACK_v2_HELP] -_TEST_IDS = [' '.join(_[0]) for _ in _TESTS] +_TEST_IDS = [" ".join(_[0]) for _ in _TESTS] @pytest.mark.parametrize(["args", "output", "exit_code"], _TESTS, ids=_TEST_IDS) @@ -73,18 +79,21 @@ def test_cli_unpack_pack_one_to_one(src: str): }""" from relic.core.cli import cli_root as cli + with tempfile.TemporaryDirectory() as temp_dir: cli.run_with("sga", "unpack", src, temp_dir) cfg_file_name = None repacked_file_name = None try: - with tempfile.NamedTemporaryFile("w+",delete=False) as config_file: + with tempfile.NamedTemporaryFile("w+", delete=False) as config_file: config_file.write(cfg) cfg_file_name = config_file.name - with tempfile.NamedTemporaryFile("rb",delete=False) as repacked: + with tempfile.NamedTemporaryFile("rb", delete=False) as repacked: repacked_file_name = repacked.name - status = cli.run_with("sga", "pack", "v2", temp_dir, repacked_file_name, cfg_file_name) + status = cli.run_with( + "sga", "pack", "v2", temp_dir, repacked_file_name, cfg_file_name + ) assert status == 0 def check_against(src: FS, dest: FS): @@ -92,20 +101,18 @@ def check_against(src: FS, dest: FS): assert dest.exists(root) with dest.opendir(root) as root_folder: for file in files: - file:Info + file: Info assert root_folder.exists(file.name) for file in files: file: Info - path = fs.path.join(root,file.name) + path = fs.path.join(root, file.name) with src.openbin(path) as src_file: with dest.openbin(path) as dest_file: src_data = src_file.read() dest_data = dest_file.read() assert dest_data == src_data - - with fs.open_fs(f"sga://{src}") as src_sga: with fs.open_fs(f"sga://{repacked_file_name}") as dst_sga: check_against(dst_sga, src_sga) @@ -118,4 +125,4 @@ def check_against(src: FS, dest: FS): try: os.unlink(repacked_file_name) except: - ... \ No newline at end of file + ... diff --git a/tests/test_opener.py b/tests/test_opener.py index 7b10723..51947de 100644 --- a/tests/test_opener.py +++ b/tests/test_opener.py @@ -105,7 +105,7 @@ def test_read(self, v2_file_path): "name": "SampleSGA-v2", "file_md5": "89ff85a0c700eceab446cc725784e299", "header_md5": "1d0571c708f9957815b8b660ceafe631", - "version":{"major":2,"minor":0} + "version": {"major": 2, "minor": 0}, } ]