SGA Packing Patch #12

Merged
merged 5 commits on Oct 19, 2023
1 change: 1 addition & 0 deletions setup.cfg
@@ -38,6 +38,7 @@ relic.cli =
relic.cli.sga =
unpack = relic.sga.core.cli:RelicSgaUnpackCli
pack = relic.sga.core.cli:RelicSgaPackCli
repack = relic.sga.core.cli:RelicSgaRepackCli

[options.packages.find]
where = src
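
For context, a minimal sketch (not part of this diff) of how the new repack entry point can be discovered at runtime. It assumes the Python 3.10+ importlib.metadata API and uses only the group and entry names registered in setup.cfg above.

from importlib.metadata import entry_points

# Enumerate every SGA sub-command registered under the 'relic.cli.sga' group,
# which now includes 'repack' -> relic.sga.core.cli:RelicSgaRepackCli.
for ep in entry_points(group="relic.cli.sga"):
    print(ep.name, "->", ep.value)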
2 changes: 1 addition & 1 deletion src/relic/sga/core/__init__.py
@@ -3,4 +3,4 @@
"""
from relic.sga.core.definitions import Version, MagicWord, StorageType, VerificationType

__version__ = "1.1.1"
__version__ = "1.1.2"
72 changes: 69 additions & 3 deletions src/relic/sga/core/cli.py
@@ -1,7 +1,9 @@
from __future__ import annotations

import argparse
import os.path
from argparse import ArgumentParser, Namespace
from typing import Optional
from typing import Optional, Callable

import fs.copy
from fs.base import FS
@@ -20,6 +22,42 @@ def _create_parser(
return command_group.add_parser("sga")


def _arg_exists_err(value: str) -> argparse.ArgumentTypeError:
return argparse.ArgumentTypeError(f"The given path '{value}' does not exist!")


def _get_dir_type_validator(exists: bool) -> Callable[[str], str]:
def _dir_type(path: str) -> str:
if not os.path.exists(path):
if exists:
raise _arg_exists_err(path)
else:
return path

if os.path.isdir(path):
return path

raise argparse.ArgumentTypeError(f"The given path '{path}' is not a directory!")

return _dir_type


def _get_file_type_validator(exists: Optional[bool]) -> Callable[[str], str]:
def _file_type(path: str) -> str:
if not os.path.exists(path):
if exists:
raise _arg_exists_err(path)
else:
return path

if os.path.isfile(path):
return path

raise argparse.ArgumentTypeError(f"The given path '{path}' is not a file!")

return _file_type


class RelicSgaUnpackCli(CliPlugin):
def _create_parser(
self, command_group: Optional[_SubParsersAction] = None
@@ -30,8 +68,16 @@ def _create_parser(
else:
parser = command_group.add_parser("unpack")

parser.add_argument("src_sga", type=str, help="Source SGA File")
parser.add_argument("out_dir", type=str, help="Output Directory")
parser.add_argument(
"src_sga",
type=_get_file_type_validator(exists=True),
help="Source SGA File",
)
parser.add_argument(
"out_dir",
type=_get_dir_type_validator(exists=False),
help="Output Directory",
)

return parser

@@ -64,3 +110,23 @@ def _create_parser(
# pack further delegates to version plugins

return parser


class RelicSgaRepackCli(CliPluginGroup):
"""An alternative to pack which 'repacks' an SGA. Intended for testing purposes."""

GROUP = "relic.cli.sga.repack"

def _create_parser(
self, command_group: Optional[_SubParsersAction] = None
) -> ArgumentParser:
parser: ArgumentParser
desc = "Debug Command; reads and repacks an SGA archive."
if command_group is None:
parser = ArgumentParser("repack", description=desc)
else:
parser = command_group.add_parser("repack", description=desc)

# pack further delegates to version plugins

return parser
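
A hedged usage sketch (not part of this diff) of the new path validators. It assumes the helpers are importable from relic.sga.core.cli and relies only on standard argparse behaviour: an ArgumentTypeError raised by a type callable is reported as a usage error before the command runs.

from argparse import ArgumentParser
from relic.sga.core.cli import _get_dir_type_validator, _get_file_type_validator

parser = ArgumentParser("unpack")
parser.add_argument("src_sga", type=_get_file_type_validator(exists=True), help="Source SGA File")
parser.add_argument("out_dir", type=_get_dir_type_validator(exists=False), help="Output Directory")

# A missing source file now fails argument parsing instead of erroring mid-unpack,
# e.g.: error: argument src_sga: The given path 'missing.sga' does not exist!
parser.parse_args(["missing.sga", "out"])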
61 changes: 24 additions & 37 deletions src/relic/sga/core/filesystem.py
@@ -240,9 +240,10 @@ def to_info(self, namespaces=None):


class _EssenceDriveFS(MemoryFS):
def __init__(self, alias: str) -> None:
def __init__(self, alias: str, name: str) -> None:
super().__init__()
self.alias = alias
self.name = name

def _make_dir_entry(
self, resource_type: ResourceType, name: str
@@ -292,6 +293,22 @@ def setinfo(self, path: str, info: Mapping[str, Mapping[str, object]]) -> None:
# if LAZY_NAMESPACE in info and not resource_entry.is_dir:
# lazy

def getinfo(
self, path, namespaces=None
): # type: (Text, Optional[Collection[Text]]) -> Info
info = super().getinfo(path, namespaces)

_path = self.validatepath(path)
if _path == "/" and (
namespaces is not None and ESSENCE_NAMESPACE in namespaces
):
raw_info = info.raw
essence_ns = dict(raw_info[ESSENCE_NAMESPACE])
essence_ns["alias"] = self.alias
essence_ns["name"] = self.name
info = Info(raw_info)
return info

def getessence(self, path: str) -> Info:
return self.getinfo(path, [ESSENCE_NAMESPACE])

@@ -324,9 +341,12 @@ def setmeta(self, meta: Dict[str, Any], namespace: str = "standard") -> None:
def getessence(self, path: str) -> Info:
return self.getinfo(path, [ESSENCE_NAMESPACE])

def create_drive(self, name: str) -> _EssenceDriveFS:
drive = _EssenceDriveFS(name)
self.add_fs(name, drive)
def create_drive(self, alias: str, name: str) -> _EssenceDriveFS:
drive = _EssenceDriveFS(alias, name)
first_drive = len([*self.iterate_fs()]) == 0
self.add_fs(
alias, drive, write=first_drive
) # TODO see if name would work here, using alias because that is what it originally was
return drive

def _delegate(self, path):
@@ -340,39 +360,6 @@ def _delegate(self, path):
return super()._delegate(path)


# if __name__ == "__main__":
# test_file = File("test.txt", b"This is a Test!", StorageType.STORE, False, None)
# test_folder = Folder("Test", [], [test_file])
# data_folders = [test_folder]
# data_files = []
# data_drive = Drive("data", "", data_folders, data_files)
# attr_drive = Drive("attr", "", [], [test_file])
# archive = Archive("Test", None, [data_drive, attr_drive])
#
# with SGAFS() as fs:
# data_fs = MemoryFS()
# fs.add_fs("data", data_fs)
# data_dir = data_fs.makedir("Test Data")
# with data_dir.open("sample_data.txt", "wb") as data_sample_text:
# data_sample_text.write(b"Sample Data Text!")
#
# attr_fs = MemoryFS()
# fs.add_fs("attr", attr_fs)
# attr_dir = attr_fs.makedir("Test Attr")
# with attr_dir.open("sample_attr.txt", "wb") as attr_sample_text:
# attr_sample_text.write(b"Sample Attr Text!")
#
# for root, folders, files in fs.walk():
# print(root, "\n\t", folders, "\n\t", files)
#
# for name, sub_fs in fs.iterate_fs():
# print(name)
# for root, folders, files in sub_fs.walk():
# print("\t", root, "\n\t\t", folders, "\n\t\t", files)
#
# print(fs.getinfo("/", ["basic", "access"]).raw)
# pass

__all__ = [
"ESSENCE_NAMESPACE",
"EssenceFSHandler",
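
For reviewers, a minimal sketch (not part of this diff) of the updated two-argument create_drive. The alias/name values mirror the test change below; the write behaviour follows the write= flag passed to add_fs in the diff (MultiFS semantics), and the import path is assumed to match the module shown above.

from relic.sga.core.filesystem import EssenceFS

sga = EssenceFS()
# Only the first drive becomes the write filesystem of the combined FS;
# later drives are still readable but do not receive writes.
data_drive = sga.create_drive("data", "test data")
attr_drive = sga.create_drive("attr", "test attr")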
70 changes: 53 additions & 17 deletions src/relic/sga/core/serialization.py
@@ -16,6 +16,7 @@
Iterable,
TypeVar,
Generic,
Type,
)

from fs.base import FS
@@ -220,6 +221,8 @@ def _write_data(data: bytes, stream: BinaryIO) -> int:


def _get_or_write_name(name: str, stream: BinaryIO, lookup: Dict[str, int]) -> int:
# Tools don't like "/" so coerce "/" to "\"
name = name.replace("/", "\\")
if name in lookup:
return lookup[name]

@@ -399,7 +402,7 @@ def assemble_drive(
drive_folder_index = drive_def.root_folder - folder_offset
drive_folder_def = local_folder_defs[drive_folder_index]

drive = essence_fs.create_drive(drive_def.alias)
drive = essence_fs.create_drive(drive_def.alias, drive_def.name)
self._assemble_container(
drive,
drive_folder_def.file_range,
@@ -508,41 +511,70 @@ def flatten_folder_collection(self, container_fs: FS, path: str) -> Tuple[int, i
self.flat_folders[subfolder_start:subfolder_end] = subfolder_defs
return subfolder_start, subfolder_end

def _flatten_folder_names(self, fs: FS, path: str) -> None:
folders = [file_info.name for file_info in fs.scandir("/") if file_info.is_dir]
files = [file_info.name for file_info in fs.scandir("/") if file_info.is_file]

if len(path) > 0 and path[0] == "/":
path = path[1:] # strip leading '/'
_get_or_write_name(path, self.name_stream, self.flat_names)

for fold_path in folders:
full_fold_path = f"{path}/{fold_path}"
full_fold_path = str(full_fold_path).split(":", 1)[
-1
] # Strip 'alias:' from path
if full_fold_path[0] == "/":
full_fold_path = full_fold_path[1:] # strip leading '/'
_get_or_write_name(full_fold_path, self.name_stream, self.flat_names)

for file_path in files:
_get_or_write_name(file_path, self.name_stream, self.flat_names)

def disassemble_folder(self, folder_fs: FS, path: str) -> FolderDef:
folder_def = FolderDef(None, None, None) # type: ignore

# Subfiles
subfile_range = self.flatten_file_collection(folder_fs)
# Subfolders
# # Since Relic typically uses the first folder as the root folder; I will try to preserve that parent folders come before their child folders
subfolder_range = self.flatten_folder_collection(folder_fs, path)
# Write Name
self._flatten_folder_names(folder_fs, path)

folder_name = str(path).split(":", 1)[-1] # Strip 'alias:' from path

if folder_name[0] == "/":
folder_name = folder_name[1:] # strip leading '/'

folder_def.name_pos = _get_or_write_name(
folder_name, self.name_stream, self.flat_names
)

# Subfolders
# # Since Relic typically uses the first folder as the root folder; I will try to preserve that parent folders come before their child folders
subfolder_range = self.flatten_folder_collection(folder_fs, path)

# Subfiles
subfile_range = self.flatten_file_collection(folder_fs)

folder_def.file_range = subfile_range
folder_def.folder_range = subfolder_range

return folder_def

def disassemble_drive(self, drive: _EssenceDriveFS, alias: str) -> DriveDef:
name = ""
def disassemble_drive(self, drive: _EssenceDriveFS) -> DriveDef:
name = drive.name
folder_name = ""
alias = drive.alias
drive_folder_def = FolderDef(None, None, None) # type: ignore
self._flatten_folder_names(drive, folder_name)

root_folder = len(self.flat_folders)
folder_start = len(self.flat_folders)
file_start = len(self.flat_files)
self.flat_folders.append(drive_folder_def)

# Name should be an empty string?
drive_folder_def.name_pos = _get_or_write_name(
name, self.name_stream, self.flat_names
folder_name, self.name_stream, self.flat_names
)
drive_folder_def.file_range = self.flatten_file_collection(drive)
drive_folder_def.folder_range = self.flatten_folder_collection(drive, name)
drive_folder_def.folder_range = self.flatten_folder_collection(
drive, folder_name
)

folder_end = len(self.flat_folders)
file_end = len(self.flat_files)
@@ -593,9 +625,9 @@ def write_toc(self) -> TocBlock:
)

def disassemble(self) -> TocBlock:
for name, drive_fs in self.fs.iterate_fs():
for _, drive_fs in self.fs.iterate_fs():
drive_fs = typing.cast(_EssenceDriveFS, drive_fs)
drive_def = self.disassemble_drive(drive_fs, name)
drive_def = self.disassemble_drive(drive_fs)
self.flat_drives.append(drive_def)

return self.write_toc()
@@ -740,6 +772,8 @@ def __init__(
gen_empty_meta: Callable[[], TMetaBlock],
finalize_meta: Callable[[BinaryIO, TMetaBlock], None],
meta2def: Callable[[Dict[str, object]], TFileDef],
assembler: Optional[Type[FSAssembler[TFileDef]]] = None,
disassembler: Optional[Type[FSDisassembler[TFileDef]]] = None,
):
self.version = version
self.meta_serializer = meta_serializer
@@ -752,6 +786,8 @@ def __init__(
self.gen_empty_meta = gen_empty_meta
self.finalize_meta = finalize_meta
self.meta2def = meta2def
self.assembler_type = assembler or FSAssembler
self.disassembler_type = disassembler or FSDisassembler

def read(self, stream: BinaryIO) -> EssenceFS:
# Magic & Version; skippable so that we can check for a valid file and read the version elsewhere
@@ -773,7 +809,7 @@ def read(self, stream: BinaryIO) -> EssenceFS:
name, metadata = meta_block.name, self.assemble_meta(
stream, meta_block, toc_meta_block
)
assembler: FSAssembler[TFileDef] = FSAssembler(
assembler: FSAssembler[TFileDef] = self.assembler_type(
stream=stream,
ptrs=meta_block.ptrs,
toc=toc_block,
@@ -806,7 +842,7 @@ def write(self, stream: BinaryIO, essence_fs: EssenceFS) -> int:
with BytesIO() as data_stream:
with BytesIO() as toc_stream:
with BytesIO() as name_stream:
disassembler = FSDisassembler(
disassembler: FSDisassembler[TFileDef] = self.disassembler_type(
fs=essence_fs,
toc_stream=toc_stream,
data_stream=data_stream,
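
A hedged sketch (not part of this diff) of the new assembler/disassembler injection points. The subclass names are hypothetical; only the assembler=/disassembler= keyword arguments and their default fallbacks come from the __init__ change above, and FSAssembler/FSDisassembler are assumed to be importable from relic.sga.core.serialization, where the diff uses them.

from relic.sga.core.serialization import FSAssembler, FSDisassembler

class V2Assembler(FSAssembler):
    """Hypothetical hook for version-specific read behaviour."""

class V2Disassembler(FSDisassembler):
    """Hypothetical hook for version-specific write behaviour."""

# A version plugin would pass the classes (not instances) when constructing its serializer:
#   serializer = SomeVersionSerializer(..., assembler=V2Assembler, disassembler=V2Disassembler)
# read() and write() then instantiate these instead of the default FSAssembler/FSDisassembler.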
7 changes: 4 additions & 3 deletions tests/issues/test_issue_39.py
@@ -44,7 +44,7 @@ def _generate_fake_osfs() -> FS:


def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS:
# Create 'SGA'
# Create 'SGA' V2
sga = EssenceFS()
sga.setmeta(
{
@@ -57,13 +57,14 @@ def _pack_fake_osfs(osfs: FS, name: str) -> EssenceFS:
"essence",
)

alias = "test"
alias = "data"
name = "test data"
sga_drive = None # sga.create_drive(alias)
for path in osfs.walk.files():
if (
sga_drive is None
): # Lazily create drive, to avoid empty drives from being created
sga_drive = sga.create_drive(alias)
sga_drive = sga.create_drive(alias, name)

if "stream" in path:
storage = StorageType.STREAM_COMPRESS