Skip to content

Commit

Permalink
MyPy & Black fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ModernMAK committed Oct 15, 2023
1 parent 0c3fc9f commit a6ba329
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 28 deletions.
45 changes: 28 additions & 17 deletions src/relic/sga/v2/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
import os
from argparse import ArgumentParser, Namespace
from pathlib import Path
from typing import Optional, Dict
from typing import Optional, Dict, Any

from relic.sga.core.filesystem import EssenceFS
from relic.core.cli import CliPlugin, _SubParsersAction
from relic.sga.v2.serialization import essence_fs_serializer as v2_serializer
from relic.sga.core import StorageType
from relic.sga.core.definitions import StorageType

_CHUNK_SIZE = 1024 * 1024 * 4 # 4 MiB


def _resolve_storage_type(s: Optional[str]):
def _resolve_storage_type(s: Optional[str]) -> StorageType:
_HELPER = {
"STORE": StorageType.STORE,
"BUFFER": StorageType.BUFFER_COMPRESS,
"STREAM": StorageType.STREAM_COMPRESS
"STREAM": StorageType.STREAM_COMPRESS,
}
if s is None:
return StorageType.STORE
Expand All @@ -29,8 +29,9 @@ def _resolve_storage_type(s: Optional[str]):


class RelicSgaPackV2Cli(CliPlugin):

def _create_parser(self, command_group: Optional[_SubParsersAction] = None) -> ArgumentParser:
def _create_parser(
self, command_group: Optional[_SubParsersAction] = None
) -> ArgumentParser:
parser: ArgumentParser
if command_group is None:
parser = ArgumentParser("v2")
Expand All @@ -44,33 +45,37 @@ def _create_parser(self, command_group: Optional[_SubParsersAction] = None) -> A
return parser

def command(self, ns: Namespace) -> Optional[int]:

# Extract Args
working_dir: str = ns.src_dir
outfile: str = ns.out_sga
config_file: str = ns.config_file
with open(config_file) as json_h:
config: Dict = json.load(json_h)
config: Dict[str, Any] = json.load(json_h)

# Execute Command
print(f"Packing `{outfile}`")

# Create 'SGA'
sga = EssenceFS()
name = os.path.basename(outfile)
sga.setmeta({
"name": name, # Specify name of archive
"header_md5": "0" * 16, # Must be present due to a bug, recalculated when packed
"file_md5": "0" * 16, # Must be present due to a bug, recalculated when packed
}, "essence")
sga.setmeta(
{
"name": name, # Specify name of archive
"header_md5": "0"
* 16, # Must be present due to a bug, recalculated when packed
"file_md5": "0"
* 16, # Must be present due to a bug, recalculated when packed
},
"essence",
)

# Walk Drives
for alias, drive in config.items():
print(f"\tPacking Drive `{alias}`")
sga_drive = None # sga.create_drive(alias)

# CWD for drive operations
drive_cwd = os.path.join(working_dir, drive.get("path",""))
drive_cwd = os.path.join(working_dir, drive.get("path", ""))

# Try to pack files
print(f"\tScanning files in `{drive_cwd}`")
Expand Down Expand Up @@ -103,9 +108,13 @@ def command(self, ns: Namespace) -> Optional[int]:

# match found, copy file to FS
# EssenceFS is unfortunately,
print(f"\t\tPacking File `{os.path.relpath(full_path, drive_cwd)}` w/ `{storage.name}`")
print(
f"\t\tPacking File `{os.path.relpath(full_path, drive_cwd)}` w/ `{storage.name}`"
)
frontier.add(full_path)
if sga_drive is None: # Lazily create drive, to avoid empty drives from being created
if (
sga_drive is None
): # Lazily create drive, to avoid empty drives from being created
sga_drive = sga.create_drive(alias)

with open(full_path, "rb") as unpacked_file:
Expand All @@ -117,7 +126,9 @@ def command(self, ns: Namespace) -> Optional[int]:
if len(buffer) == 0:
break
packed_file.write(buffer)
sga_drive.setinfo(path_in_sga, {"essence": {"storage_type": storage}})
sga_drive.setinfo(
path_in_sga, {"essence": {"storage_type": storage}}
)

print(f"Writing `{outfile}` to disk")
# Write to binary file:
Expand Down
27 changes: 17 additions & 10 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import os.path
import subprocess

# Local testing requires running `pip install -e "."`
import tempfile
from contextlib import redirect_stdout
Expand All @@ -25,6 +26,7 @@ def test_run(self, args: Sequence[str], output: str, exit_code: int):

def test_run_with(self, args: Sequence[str], output: str, exit_code: int):
from relic.core.cli import cli_root

with io.StringIO() as f:
with redirect_stdout(f):
status = cli_root.run_with(*args)
Expand All @@ -36,10 +38,14 @@ def test_run_with(self, args: Sequence[str], output: str, exit_code: int):


_SGA_PACK_HELP = ["sga", "pack", "-h"], """usage: relic sga pack [-h] {v2} ...""", 0
_SGA_PACK_v2_HELP = ["sga", "pack", "v2", "-h"], """sage: relic sga pack v2 [-h] src_dir out_sga config_file""", 0
_SGA_PACK_v2_HELP = (
["sga", "pack", "v2", "-h"],
"""sage: relic sga pack v2 [-h] src_dir out_sga config_file""",
0,
)

_TESTS = [_SGA_PACK_HELP, _SGA_PACK_v2_HELP]
_TEST_IDS = [' '.join(_[0]) for _ in _TESTS]
_TEST_IDS = [" ".join(_[0]) for _ in _TESTS]


@pytest.mark.parametrize(["args", "output", "exit_code"], _TESTS, ids=_TEST_IDS)
Expand Down Expand Up @@ -73,39 +79,40 @@ def test_cli_unpack_pack_one_to_one(src: str):
}"""

from relic.core.cli import cli_root as cli

with tempfile.TemporaryDirectory() as temp_dir:
cli.run_with("sga", "unpack", src, temp_dir)
cfg_file_name = None
repacked_file_name = None
try:
with tempfile.NamedTemporaryFile("w+",delete=False) as config_file:
with tempfile.NamedTemporaryFile("w+", delete=False) as config_file:
config_file.write(cfg)
cfg_file_name = config_file.name
with tempfile.NamedTemporaryFile("rb",delete=False) as repacked:
with tempfile.NamedTemporaryFile("rb", delete=False) as repacked:
repacked_file_name = repacked.name

status = cli.run_with("sga", "pack", "v2", temp_dir, repacked_file_name, cfg_file_name)
status = cli.run_with(
"sga", "pack", "v2", temp_dir, repacked_file_name, cfg_file_name
)
assert status == 0

def check_against(src: FS, dest: FS):
for root, _, files in src.walk():
assert dest.exists(root)
with dest.opendir(root) as root_folder:
for file in files:
file:Info
file: Info
assert root_folder.exists(file.name)

for file in files:
file: Info
path = fs.path.join(root,file.name)
path = fs.path.join(root, file.name)
with src.openbin(path) as src_file:
with dest.openbin(path) as dest_file:
src_data = src_file.read()
dest_data = dest_file.read()
assert dest_data == src_data



with fs.open_fs(f"sga://{src}") as src_sga:
with fs.open_fs(f"sga://{repacked_file_name}") as dst_sga:
check_against(dst_sga, src_sga)
Expand All @@ -118,4 +125,4 @@ def check_against(src: FS, dest: FS):
try:
os.unlink(repacked_file_name)
except:
...
...
2 changes: 1 addition & 1 deletion tests/test_opener.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_read(self, v2_file_path):
"name": "SampleSGA-v2",
"file_md5": "89ff85a0c700eceab446cc725784e299",
"header_md5": "1d0571c708f9957815b8b660ceafe631",
"version":{"major":2,"minor":0}
"version": {"major": 2, "minor": 0},
}
]

Expand Down

0 comments on commit a6ba329

Please sign in to comment.