class SilkyArcToolGUI:
    """Tkinter window for packing and unpacking Silky Engine .arc archives.

    The window holds two language buttons, an entry/"..." pair for the archive
    file, an entry/"..." pair for the resources directory, "unpack" and "pack"
    buttons and a help button.  Constructing the object builds the window and
    enters the Tk main loop, so the constructor returns only when the window
    is closed.
    """

    default_width = 300
    default_height = 300

    possible_languages = ("eng", "rus")

    # Every translatable widget carries a ``lang_index`` attribute which is an
    # index into these tuples (see _init_strings).
    _strings_lib = {
        'eng': (
            "SilkyArcTool by Tester",
            "English",
            "Русский",
            "...",
            "Silky archive file (.arc):",
            "Resources directory:",  # 5
            "Filename choice",
            "Directory choice",
            "*.arc",
            "Silky Archives",
            "*",  # 10
            "All files",
            "Unpack archive",
            "Pack archive",
            "Warning",
            "Archive name not stated.",  # 15
            "Directory name not stated.",
            "Error",
            "Help",
        ),
        'rus': (
            "SilkyArcTool от Tester-а",
            "English",
            "Русский",
            "...",
            "Архивный файл Silky (.arc):",
            "Директория с ресурсами:",  # 5
            "Выбор имени файла",
            "Выбор директории",
            "*.arc",
            "Архивы Silky",
            "*",  # 10
            "Все файлы",
            "Распаковать архив",
            "Запаковать архив",
            "Предупреждение",
            "Имя архива не указано.",  # 15
            "Имя директории не указано.",
            "Ошибка",
            "Справка",
        )
    }

    programm_help = {
        'eng': """
Dual languaged (rus+eng) GUI tool for packing and unpacking archives of Silky Engine.
This type of .arc archive also used in Ai6WIN engine (and possibly in Ai5WIN) by Silky.
If you want to work with Silky Engine's .mes scripts, use mesScriptAsseAndDisassembler instead.

Why this tool was created, if there are other tools that can work with this type of archive?
The answer is simple: because there was no actually good enough tools. One tool can only extract the data, other -- only
pack, but without using original compression, that resulting in outrageous big output archives. My tool solves all the
issues -- not only it can extract archives, but also pack them from files, compressing it by algorithm (variation of
LZSS), extraction of which was implemented by Silky Engine. Through the tool has one problem -- it works quite slow,
especially for packing, so you may need to wait for some minutes (due to implementation compression algorithm on
Python).

>>> Usage.

1. Run the tool (main.py or .exe).
2. Print filename (with extension!!!) or choose it by clicking on button "...".
3. Print directory or choose it by clicking on button "...".
4. Print "0", if thou want to unpack, or "1", if thou want to pack.
5. Just wait until it done.
""",
        'rus': """
Двуязычное средство (рус+англ) для распаковки и запаковки архивов Silky Engine. Сей вид архива также используется в
движке Ai6WIN (и, возможно, в Ai5WIN) от Silky. Ежели вам нужно работать со скриптами .mes Silky Engine, используйте
mesScriptAsseAndDisassembler.

Почему же это средство было создано, ежель и так есть средства, что могут работать с сим типом архива? Ответ прост: ни
одно из тех существующих средств не является достаточно хорошим. Одно может только извлекать, другое -- только
запаковывать, однако ж без использования оригинального алгоритма сжатия, из-за чего архивы получаются большими сверх
всякой меры. Но моё средство исправляет эти проблемы: оно может как распаковывать данные, так и запаковывать их, причём
сжимая файлы так, как их хочет видеть Silky Engine (разновидностью LZSS). Единственная, однако, проблема у средства есть
-- несколько медленно работает оно, особенно при запаковке, так что может придётся прождать несколько минут (ввиду
реализации алгоритма сжатия на Python).

>>> Использование.
1. Запустите пакет средств (main.py иль .exe).
2. Введите имя архива (с расширением!!!) или выберите его, нажав на кнопку "...".
3. Введите имя директории файлов или выберите его, нажав на кнопку "...".
4. Введите "0", коли распаковать желаете, али "1", коли запаковать желаете.
5. Ждите завершения.
"""
    }

    def __init__(self, **kwargs):
        """Build the window and run the Tk main loop.

        Keyword arguments:
        width, height -- window size in pixels (defaults: default_width, default_height);
        language -- "eng" or "rus" (default: guessed by init_language()).  An
            unsupported value falls back to the guessed language instead of
            failing later with a KeyError.
        """
        self._width = kwargs.get("width", self.default_width)
        self._height = kwargs.get("height", self.default_height)
        self._language = kwargs.get("language", self.init_language())
        if self._language not in self.possible_languages:
            self._language = self.init_language()

        self._root = tk.Tk()
        self._root.lang_index = 0

        self._arc_name = tk.StringVar()
        self._arc_name.set("")
        self._dir_name = tk.StringVar()
        self._dir_name.set("")

        # Centre the window on the screen.
        self._root.geometry('{}x{}+{}+{}'.format(
            self._width,
            self._height,
            self._root.winfo_screenwidth() // 2 - self._width // 2,
            self._root.winfo_screenheight() // 2 - self._height // 2))
        self._root["bg"] = 'grey'

        self._top_frame = tk.Frame(master=self._root,
                                   background="white",
                                   borderwidth=5,
                                   relief=tk.RAISED)
        self._bottom_frame = tk.Frame(master=self._root,
                                      background="grey",
                                      borderwidth=5,
                                      relief=tk.SUNKEN)

        self._language_buttons = []
        for i in range(2):
            new_button = tk.Button(
                master=self._top_frame,
                background="white",
                font=("Helvetica", 14),
                # i=i binds the current value; a plain closure would see the last i.
                command=lambda i=i: self.translate(self.possible_languages[i]))
            new_button.lang_index = i + 1
            self._language_buttons.append(new_button)

        self._entry_combinations = []
        entry_btns_commands = (self._choose_file, self._choose_dir)
        entry_vars = (self._arc_name, self._dir_name)
        for i in range(2):
            new_lbl = tk.Label(master=self._bottom_frame,
                               background="white",
                               font=("Helvetica", 14))
            new_lbl.lang_index = 4 + i
            new_entry = tk.Entry(master=self._bottom_frame,
                                 background="white",
                                 borderwidth=2,
                                 textvariable=entry_vars[i],
                                 font=("Helvetica", 12),
                                 relief=tk.SUNKEN)
            new_btn = tk.Button(master=self._bottom_frame,
                                background="white",
                                command=entry_btns_commands[i],
                                font=("Helvetica", 14))
            new_btn.lang_index = 3
            # Label, Entry, Button.
            self._entry_combinations.append([new_lbl, new_entry, new_btn])

        self._action_btns = []
        actions = (self._unpack, self._pack)
        for i in range(2):
            new_btn = tk.Button(
                master=self._bottom_frame,
                background="white",
                font=("Helvetica", 14),
                command=actions[i],
            )
            new_btn.lang_index = 12 + i
            self._action_btns.append(new_btn)

        self._help_btn = tk.Button(
            master=self._bottom_frame,
            background="white",
            font=("Helvetica", 14),
            command=lambda: showinfo(self._strings_lib[self._language][18], self.programm_help[self._language]),
        )
        self._help_btn.lang_index = 18

        self._init_strings()

        for num, widget in enumerate(self._language_buttons):
            widget.place(relx=0.5 * num, rely=0.0, relwidth=0.5, relheight=1.0)
        self._top_frame.place(relx=0.0, rely=0.0, relwidth=1.0, relheight=0.2)

        for num, widget_list in enumerate(self._entry_combinations):
            widget_list[0].place(relx=0.0, rely=0.2 * num, relwidth=1.0, relheight=0.1)
            widget_list[1].place(relx=0.0, rely=0.1 + 0.2 * num, relwidth=0.8, relheight=0.1)
            widget_list[2].place(relx=0.8, rely=0.1 + 0.2 * num, relwidth=0.2, relheight=0.1)
        for num, widget in enumerate(self._action_btns):
            widget.place(relx=0.0, rely=0.4 + 0.2 * num, relwidth=1.0, relheight=0.2)
        self._help_btn.place(relx=0.0, rely=0.8, relwidth=1.0, relheight=0.2)
        self._bottom_frame.place(relx=0.0, rely=0.2, relwidth=1.0, relheight=0.8)

        self._root.mainloop()

    # Getters.

    def get_width(self) -> int:
        """Get the GUI's window width."""
        return self._width

    def get_height(self) -> int:
        """Get the GUI's window height."""
        return self._height

    def get_language(self) -> str:
        """Get the GUI's language."""
        return self._language

    # Technical methods for packing and unpacking.

    def _unpack(self) -> None:
        """Unpack archive (button callback): validate the fields and start a worker thread."""
        self._start_job(self._unpack_this_archive)

    def _unpack_this_archive(self, arc_name, dir_name) -> None:
        """Worker-thread body: unpack archive arc_name into directory dir_name."""
        self._run_archive_job(arc_name, dir_name, "unpack")

    def _pack(self) -> None:
        """Pack archive (button callback): validate the fields and start a worker thread."""
        self._start_job(self._pack_this_archive)

    def _pack_this_archive(self, arc_name, dir_name) -> None:
        """Worker-thread body: pack directory dir_name into archive arc_name."""
        self._run_archive_job(arc_name, dir_name, "pack")

    def _start_job(self, target) -> None:
        """Show a warning if a field is empty, otherwise run target(arc_name, dir_name) in a thread."""
        can_i = self.which_problems_i_have()
        if can_i:
            showwarning(title=can_i[0], message=can_i[1])
            return
        # Disable the action buttons before the thread exists, so a quick
        # second click cannot start a second job on the same files.
        self.lock_activity()
        worker = threading.Thread(daemon=False, target=target,
                                  args=(self._arc_name.get(), self._dir_name.get()))
        worker.start()

    def _run_archive_job(self, arc_name, dir_name, action: str) -> None:
        """Call SilkyArc(arc_name, dir_name).<action>() and report any failure in an error box.

        action is the name of the SilkyArc method to call ("unpack" or "pack").
        The action buttons are re-enabled whether or not the job succeeded.
        """
        try:
            self.lock_activity()
            arc_archive = SilkyArc(arc_name, dir_name, verbose=True, integrity_check=False)
            getattr(arc_archive, action)()
        except Exception as e:
            # Top-level boundary of the worker thread: everything is shown to the user.
            showerror(self._strings_lib[self._language][17], str(e))
        finally:
            self.unlock_activity()

    # Technical methods for locking/unlocking activity.

    def lock_activity(self) -> None:
        """Disable the unpack/pack buttons."""
        for btn in self._action_btns:
            btn["state"] = tk.DISABLED

    def unlock_activity(self) -> None:
        """Enable the unpack/pack buttons."""
        for btn in self._action_btns:
            btn["state"] = tk.NORMAL

    # Technical methods for validation.

    def which_problems_i_have(self):
        """Return (title, message) for the first empty input field, or None when both are filled."""
        if self._arc_name.get() == "":
            return (self._strings_lib[self._language][14], self._strings_lib[self._language][15])
        if self._dir_name.get() == "":
            return (self._strings_lib[self._language][14], self._strings_lib[self._language][16])
        return None

    # Technical methods for files and dirs.

    @staticmethod
    def _shorten_path(path: str) -> str:
        """Normalise path and return its form relative to the working directory when that is shallower.

        "Shallower" means fewer path separators.  The normalised path itself is
        returned when no relative form exists (os.path.relpath raises
        ValueError, e.g. for another drive on Windows).
        """
        path = os.path.normpath(path)
        try:
            relpath = os.path.relpath(path, os.getcwd())
        except ValueError:
            return path
        if relpath.count(os.sep) < path.count(os.sep):
            return relpath
        return path

    def _choose_file(self) -> None:
        """Choose the archive file."""
        file_types = (
            (self._strings_lib[self._language][9], self._strings_lib[self._language][8]),
            (self._strings_lib[self._language][11], self._strings_lib[self._language][10]),
        )
        file_name = askopenfilename(filetypes=file_types, initialdir=os.getcwd(),
                                    title=self._strings_lib[self._language][6])
        # A cancelled dialog yields an empty value.  It must be tested before
        # normalisation: os.path.normpath("") is ".", which would overwrite the field.
        if not file_name:
            return
        self._arc_name.set(self._shorten_path(file_name))

    def _choose_dir(self) -> None:
        """Choose the directory."""
        dir_name = askdirectory(initialdir=os.getcwd(), title=self._strings_lib[self._language][7])
        if not dir_name:  # Dialog cancelled; see _choose_file.
            return
        self._dir_name.set(self._shorten_path(dir_name))

    # Language methods.

    def translate(self, language: str) -> None:
        """Change the GUI language on "rus" or "eng"."""
        if language not in self.possible_languages:
            print("Error! Incorrect language!/Ошибка! Некорректный язык!")
            return
        self._language = language
        self._init_strings()

    def _init_strings(self) -> None:
        """Initialize strings of the GUI's widgets."""

        def _init_all_children_strings(widget):
            # Walk the widget tree; any child tagged with lang_index gets its
            # text from the current language table, frames are descended into.
            for elem in widget.winfo_children():
                if hasattr(elem, "lang_index"):
                    elem["text"] = self._strings_lib[self._language][elem.lang_index]
                if isinstance(elem, tk.Frame):
                    _init_all_children_strings(elem)

        self._root.title(self._strings_lib[self._language][self._root.lang_index])
        _init_all_children_strings(self._root)

    @staticmethod
    def init_language() -> str:
        """Get default language from the system. Works only on Windows.

        Returns "rus" when the two-letter UI locale is one of the listed
        locales, otherwise (and on any failure to query it) "eng".
        """
        lang_num = 0
        try:
            windll = ctypes.windll.kernel32
            super_locale = locale.windows_locale[windll.GetUserDefaultUILanguage()][:2]
            to_rus_locales = ('ru', 'uk', 'sr', 'bg', 'kk', 'be', 'hy', 'az')
            if super_locale in to_rus_locales:
                lang_num = 1
        except Exception:
            # Deliberate best effort: ctypes.windll does not exist off Windows
            # and the locale id may be unknown; English is the fallback.
            pass
        return SilkyArcToolGUI.possible_languages[lang_num]
Pick one of the mods: +{"unpack", "pack", "decompress", "compress", "compare_dirs", "compare_files"}""" + import filecmp + import os + from silky_arc import SilkyArc + + arc_file = 'Script.arc' + true_arc_file = 'Script.arc_good' + folder = 'Script' + etalon = "_good" + good_folder = folder + etalon + comp_file = 'AP_SCENE_01.MES_i' + dec_file = "AP_SCENE_01.MES_o" + true_comp_file = 'AP_SCENE_01.MES_gi' + true_dec_file = "AP_SCENE_01.MES_go" + if mode == "unpack": + arc_archive = SilkyArc(arc_file, folder, integrity_check=True) + arc_archive.unpack() + elif mode == "pack": + arc_archive = SilkyArc(arc_file, folder, integrity_check=True) + arc_archive.pack() + elif mode == "decompress": + with open(comp_file, 'rb') as iner, open(dec_file, 'wb') as outer: + outer.write(SilkyArc.lzss_decompress(iner.read())) + print(filecmp.cmp(dec_file, true_dec_file, shallow=False)) + elif mode == "compress": + with open(comp_file, 'wb') as outer, open(dec_file, 'rb') as iner: + outer.write(SilkyArc.lzss_compress(iner.read())) + print(filecmp.cmp(comp_file, true_comp_file, shallow=False)) + elif mode == "compare_dirs": + for root, dirs, files in os.walk(folder): + for file in files: + if not filecmp.cmp(os.path.join(folder, file), os.path.join(good_folder, file), shallow=False): + print(file) + elif mode == "compare_files": + print("Равны ль файлы? 
Ответ: {}".format(filecmp.cmp(arc_file, true_arc_file))) + + +def main(): + new_gui = SilkyArcToolGUI() + + +if __name__ == '__main__': + if debug: + test("compare_files") + else: + main() diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c3c887a --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +import sys +import cx_Freeze + +base = None + +if (sys.platform == 'win32'): + base = "Win32GUI" + + +executables = [cx_Freeze.Executable("main.py", + shortcut_name="SilkyArcTool", + shortcut_dir="SilkyArcTool", + base="Win32GUI")] + +cx_Freeze.setup( + name="SilkyArcTool", + version="1.0", + description="Dual languaged (rus+eng) tool for packing and unpacking archives of Silky Engine.\n" + "Двуязычное средство (рус+англ) для распаковки и запаковки архивов Silky Engine.", + options={"build_exe": {"packages": []}}, + executables=executables +) \ No newline at end of file diff --git a/silky_arc.py b/silky_arc.py new file mode 100644 index 0000000..c2934a2 --- /dev/null +++ b/silky_arc.py @@ -0,0 +1,230 @@ +import struct +import os +import tempfile +from silky_lzss import SilkyLZSS + +# 4 байта I: длина описания файлов. +# Далее идёт следующий сегмент (с повторениями): +##1 байт длина имени файла. +###Далее столько идёт длина имени файла. +###Далее 4 байта >I размер после lzss-компрессии. +###Далее 4 байта >I размер до lzss-компрессии. +###Далее 4 байта >I смещение относительно начала файла. +# Далее идёт сегмент, где сплошняком записываются сами файлы по параметрам в описании. 
class SilkyArc:
    """Packer/unpacker for Silky Engine ``.arc`` archives.

    Archive layout as read and written by this class:

    * 4 bytes, little-endian unsigned int: length of the index in bytes
      (the 4 bytes themselves are not counted).
    * The index, one record per file:
        1 byte   -- length of the obfuscated file name,
        n bytes  -- obfuscated file name (see encrypt_name),
        4 bytes  -- big-endian size of the LZSS-compressed data,
        4 bytes  -- big-endian size of the data before compression,
        4 bytes  -- big-endian offset of the data from the start of the archive.
    * The compressed data of all files, one after another.
    """

    name_encoding = "cp932"

    def __init__(self, arc: str, dir: str, verbose: bool = True, integrity_check: bool = False):
        """Parameters:
arc: name of the archive file,
dir: name of the directory,
verbose: False (no progress messages) or True (enable progress messages),
integrity_check: True to report files whose sizes disagree with the archive index."""
        self._arc_name = arc
        self._dir_name = dir

        self._verbose = verbose
        self._integrity_check = integrity_check

        self._names = []
        # One list per file:
        # 0 -- name length, 1 -- name, 2 -- compressed in lzss size, 3 -- size after lzss decompression,
        # 4 -- offset from the beginning of file.

    # Main user methods.

    def unpack(self) -> None:
        """Read the archive index and extract every file into the directory."""
        if self._verbose:
            print("=== === === UNPACKING OF {0} STARTS!/РАСПАКОВКА {0} НАЧАТА! === === ===".format(self._arc_name))
        self._names = self._unpack_names()
        if self._verbose:
            print("=== Header of {0} unpacked!/Заголовок {0} распакован! ===".format(self._arc_name))
        self._unpack_files()
        if self._verbose:
            print("=== Files of {0} unpacked!/Файлы {0} распакованы! ===".format(self._arc_name))
            print("=== === === UNPACKING OF {0} ENDS!/РАСПАКОВКА {0} ЗАКОНЧЕНА! === === ===".format(self._arc_name))

    def pack(self) -> None:
        """Compress every file of the directory and write a new archive.

        An existing archive of the same name is first renamed to ``<name>.bak``
        on a best-effort basis.
        """
        if self._verbose:
            print("=== === === PACKING OF {0} STARTS!/ЗАПАКОВКА {0} НАЧАТА! === === ===".format(self._arc_name))
        head_len, self._names, temp_file = self._pack_names_and_files()
        try:
            if self._verbose:
                print("=== Data of {0} initialized!/Данные {0} определены! ===".format(self._arc_name))
            try:
                os.rename(self._arc_name, self._arc_name + '.bak')
            except OSError:
                # Best effort only: there may be no previous archive, or the
                # backup name may already be taken.
                pass
            self._pack_files(head_len, temp_file)
        finally:
            temp_file.close()  # Closing twice is harmless; guarantees cleanup on errors.
        if self._verbose:
            print("=== Archive {0} successfully compiled!/Архив {0} успешно собран! ===".format(self._arc_name))
            print("=== === === PACKING OF {0} ENDS!/ЗАПАКОВКА {0} ЗАКОНЧЕНА! === === ===".format(self._arc_name))

    # Special technical methods: Silky Engine's implementation of lzss.

    @staticmethod
    def lzss_compress(byter: bytes) -> bytes:
        """Compress bytes with lzss."""
        return SilkyLZSS(byter).encode()

    @staticmethod
    def lzss_decompress(byter: bytes) -> bytes:
        """Decompress bytes with lzss."""
        return SilkyLZSS(byter).decode()

    # Unpacking methods.

    @staticmethod
    def _read_exact(filer, size: int) -> bytes:
        """Read exactly size bytes from filer; raise ValueError if the file ends earlier."""
        chunk = filer.read(size)
        if len(chunk) != size:
            raise ValueError("Archive index is truncated!/Заголовок архива обрезан!")
        return chunk

    def _read_header(self, filer) -> int:
        """Read the 4-byte index length from the current position of filer."""
        # '<I' instead of native 'I': same bytes on little-endian hosts, but
        # independent of the machine the tool runs on.
        return struct.unpack('<I', self._read_exact(filer, 4))[0]

    def _unpack_names(self) -> list:
        """Parse the archive index.

        Returns a list of [name length, decoded name, compressed size,
        decompressed size, data offset], one per file.
        """
        array_name = []
        with open(self._arc_name, 'rb') as input_file:
            # The stored length does not include the 4 header bytes themselves.
            index_end = 4 + self._read_header(input_file)
            while input_file.tell() < index_end:
                name_len = self._read_exact(input_file, 1)[0]
                name = self.decrypt_name(self._read_exact(input_file, name_len))
                # 0 - compressed size, 1 - size after lzss decompression, 2 - data offset.
                prms = struct.unpack('>III', self._read_exact(input_file, 12))
                array_name.append([name_len, name, prms[0], prms[1], prms[2]])
        return array_name

    def _unpack_files(self) -> None:
        """Extract every file listed in self._names into the directory."""
        os.makedirs(self._dir_name, exist_ok=True)
        # Entry names come from the archive; they must not lead outside the
        # output directory (e.g. through "..").
        allowed_prefix = os.path.join(os.path.abspath(self._dir_name), '')

        with open(self._arc_name, 'rb') as input_file:
            for i in self._names:
                this_file_name = os.path.normpath(os.path.join(self._dir_name, i[1]))
                if not os.path.abspath(this_file_name).startswith(allowed_prefix):
                    print("!!! File {0} skipped: its path leaves the output directory!/"
                          "Файл {0} пропущен: его путь выходит за пределы директории!".format(i[1]))
                    continue
                input_file.seek(i[4], 0)
                new_file_bytes = input_file.read(i[2])
                # Plain comparisons instead of assert: asserts vanish under "python -O".
                if self._integrity_check and len(new_file_bytes) != i[2]:
                    print("!!! File {0} compressed size is incorrect!/Размер сжатого файла {0} некорректен!".
                          format(i[1]))
                new_file_bytes = self.lzss_decompress(new_file_bytes)
                if self._integrity_check and len(new_file_bytes) != i[3]:
                    print("!!! File {0} true size is incorrect!/Истинный размер файла {0} некорректен!".
                          format(i[1]))
                parent = os.path.dirname(this_file_name)
                if parent:
                    os.makedirs(parent, exist_ok=True)  # Entry names may contain sub-directories.
                with open(this_file_name, 'wb') as this_file:
                    this_file.write(new_file_bytes)
                if self._verbose:
                    print("> File {0} successfully unpacked!/Файл {0} успешно распакован!".format(i[1]))

    # Packing methods.

    def _pack_names_and_files(self) -> tuple:
        """Compress every file under the directory into a temporary file and build the index.

        Returns (index length in bytes, list of index records, temporary file
        holding the compressed data in index order).  The caller owns the
        temporary file and must close it.
        """
        names = []
        index_len = 0

        temp_file = tempfile.TemporaryFile(mode="w+b")
        try:
            for root, dirs, files in os.walk(self._dir_name):
                for filename in files:
                    full_name = os.path.join(root, filename)
                    # Files are stored under their bare name.  Stripping
                    # "root + separator" from the normalised path gave the same
                    # result, except that it left the directory inside the name
                    # when the directory was given un-normalised ("./Script").
                    end_name = filename
                    encrypted_name = self.encrypt_name(end_name)
                    if len(encrypted_name) > 255:
                        raise ValueError("File name {0} is too long for the archive index!/"
                                         "Имя файла {0} слишком длинно для заголовка архива!".format(end_name))

                    with open(full_name, 'rb') as this_file:
                        this_bytes = this_file.read()
                    encrypted_bytes = self.lzss_compress(this_bytes)
                    temp_file.write(encrypted_bytes)

                    names.append([
                        len(encrypted_name),   # Length of encrypted name.
                        encrypted_name,        # Filename (encrypted).
                        len(encrypted_bytes),  # Size after lzss compression.
                        len(this_bytes),       # Size before lzss compression.
                        None,                  # Offset from the start of file (filled in below).
                    ])

                    # 1 byte of name length, the name, then three >I parameters.
                    index_len += len(encrypted_name) + 13

                    if self._verbose:
                        print("> File {0} successfully managed!/Файл {0} успешно обработан!".format(end_name))
        except BaseException:
            temp_file.close()
            raise

        # The first file starts right after the 4-byte header and the index.
        offset = index_len + 4
        for record in names:
            record[4] = offset
            offset += record[2]
        if self._verbose:
            print(">>> File offsets successfully calculated!/Смещения файлов успешно подсчитаны!")

        return index_len, names, temp_file

    def _pack_files(self, head_len: int, temp_file: tempfile.TemporaryFile) -> None:
        """Write the archive: header, index from self._names, then the data from temp_file.

        temp_file is closed before returning, also when writing fails.
        """
        try:
            with open(self._arc_name, 'wb') as new_archive:
                new_archive.write(struct.pack('<I', head_len))

                for i in self._names:
                    new_archive.write(struct.pack('B', i[0]))
                    new_archive.write(i[1])
                    new_archive.write(struct.pack('>III', i[2], i[3], i[4]))
                if self._verbose:
                    print(">>> Archive header successfully created!/Заголовок архива успешно создан!")

                temp_file.seek(0)
                for i in self._names:
                    new_bytes = temp_file.read(i[2])
                    if self._integrity_check and len(new_bytes) != i[2]:
                        print("!!! File {0} compressed size is incorrect!/Размер сжатого файла {0} некорректен!".format(
                            self.decrypt_name(i[1])))
                    new_archive.write(new_bytes)
                if self._verbose:
                    print(">>> Archive files data successfully packed!/Данные файлов архива успешно запакованы!")
        finally:
            temp_file.close()

    # Other technical methods.

    @staticmethod
    def decrypt_name(test: bytes) -> str:
        """Undo encrypt_name and decode the result with name_encoding.

        The last byte is increased by 1, the one before it by 2, and so on.
        Sums wrap around at 256, so names of any length can be decoded.
        """
        size = len(test)
        plain = bytes((byte + (size - index)) & 0xFF for index, byte in enumerate(test))
        return plain.decode(SilkyArc.name_encoding)

    @staticmethod
    def encrypt_name(test: str) -> bytes:
        """Encode test with name_encoding and obfuscate it for the archive index.

        The last byte is decreased by 1, the one before it by 2, and so on.
        Differences wrap around at 256, so names of any length can be encoded.
        """
        text_array = test.encode(SilkyArc.name_encoding)
        size = len(text_array)
        return bytes((byte - (size - index)) & 0xFF for index, byte in enumerate(text_array))
class SilkyLZSS:
    """LZSS codec in the flavour used by Silky Engine archives.

    Partially based on the 4/6/1989 LZSS implementation by Haruhiko Okumura
    (binary search trees over a ring buffer), with differences that are not
    only cosmetic; most visibly the ring buffer is pre-filled with the padding
    byte (zero by default).

    Stream format: groups of up to eight items, each group preceded by a flag
    byte read from the lowest bit upwards.  A set bit means one literal byte.
    A clear bit means two bytes holding a 12-bit ring-buffer position
    (low 8 bits in the first byte, high 4 bits in the upper nibble of the
    second) and, in the lower nibble of the second byte, the match length
    minus ``threshold + 1``.
    """

    def __init__(self, buffer: bytes, N: int = 4096, F: int = 18, threshold: int = 2, null: int = None,
                 debug: bool = False, progress_print: int = 2**15, padding_byte=b'\x00'):
        """Parameters:
buffer: data to encode or decode,
N: ring buffer size; positions are wrapped with ``& (N - 1)``, so it has to be a power of two,
F: upper limit of the match length,
threshold: matches not longer than this are stored as literal bytes,
null: index meaning "no node" in the search trees (defaults to N),
debug: True to print progress and statistics while encoding,
progress_print: step, in input bytes, of the debug progress print,
padding_byte: initial content of the ring buffer, an int or a bytes object (its first byte is used)."""

        self.debug = debug
        self.input_buffer = buffer

        # None cannot be used as the marker itself: the marker also serves as
        # an index into the tree arrays.
        if null is None:
            self.null = N
        else:
            self.null = null

        if isinstance(padding_byte, int):
            self.padding_byte = self.unsigned_char(padding_byte)
        else:
            self.padding_byte = padding_byte[0]
        self.progress_print = progress_print  # Step of the progress print.

        self.N = N  # Buffer size (4096 = 2^12 by default).
        self.F = F  # Match length limit. In original implementation it is 18.
        self.threshold = threshold  # Minimum limit of match length to encode as position and length.
        # The defaults fill the on-disk fields exactly: 12 bits of position, 4 bits of length.

        self.text_buffer = [0] * (self.N + self.F - 1)
        self.match_position = 0
        self.match_length = 0

        self.lson = [0] * (self.N + 1)
        self.rson = [0] * (self.N + 257)
        self.dad = [0] * (self.N + 1)

        self.length_crutch = 0

    def _fresh_window(self) -> list:
        """Return a new ring buffer: N - F padding bytes followed by zeros."""
        filled = max(self.N - self.F, 0)
        return [self.padding_byte] * filled + [0] * (self.N + self.F - 1 - filled)

    def init_tree(self) -> None:
        """Empty the 256 search trees (one per first byte) and detach every node."""
        for i in range(self.N + 1, self.N + 257):
            self.rson[i] = self.null
        for i in range(self.N):
            self.dad[i] = self.null

    def insert_node(self, r: int) -> None:
        """Insert the string of length F starting at text_buffer[r] into its tree.

        Leaves the longest match found on the way in match_position and
        match_length.  When a match of full length F is found, the old node is
        replaced by the new one.
        """
        text = self.text_buffer
        lson, rson, dad = self.lson, self.rson, self.dad
        null = self.null
        limit = self.F

        cmp = 1
        p = self.N + 1 + text[r]  # Root of the tree for this first byte.
        rson[r] = null
        lson[r] = null
        self.match_length = 0
        while True:
            if cmp >= 0:
                if rson[p] != null:
                    p = rson[p]
                else:
                    rson[p] = r
                    dad[r] = p
                    return
            else:
                if lson[p] != null:
                    p = lson[p]
                else:
                    lson[p] = r
                    dad[r] = p
                    return
            # Length of the common prefix of the strings at r and p (first bytes are equal).
            i = 1
            while i < limit:
                cmp = text[r + i] - text[p + i]
                if cmp != 0:
                    break
                i += 1
            if i > self.match_length:
                self.match_position = p
                self.match_length = i
                if i >= limit:
                    break
        # Full-length match: r takes the place of p in the tree.
        dad[r] = dad[p]
        lson[r] = lson[p]
        rson[r] = rson[p]
        dad[lson[p]] = r
        dad[rson[p]] = r
        if rson[dad[p]] == p:
            rson[dad[p]] = r
        else:
            lson[dad[p]] = r
        dad[p] = null

    def delete_node(self, p: int) -> None:
        """Delete node p."""
        lson, rson, dad = self.lson, self.rson, self.dad
        null = self.null

        if dad[p] == null:
            return  # Not in a tree.
        if rson[p] == null:
            q = lson[p]
        elif lson[p] == null:
            q = rson[p]
        else:
            # Two children: the rightmost node of the left subtree replaces p.
            q = lson[p]
            if rson[q] != null:
                q = rson[q]
                while rson[q] != null:
                    q = rson[q]
                rson[dad[q]] = lson[q]
                dad[lson[q]] = dad[q]
                lson[q] = lson[p]
                dad[lson[p]] = q
            rson[q] = rson[p]
            dad[rson[p]] = q
        dad[q] = dad[p]
        if rson[dad[p]] == p:
            rson[dad[p]] = q
        else:
            lson[dad[p]] = q
        dad[p] = null

    def encode(self) -> bytes:
        """Compress input_buffer and return the compressed bytes (b'' for empty input).

        The ring buffer and the trees are reset on entry, so repeated calls on
        the same object give the same result.
        """
        data = self.input_buffer
        data_len = len(data)
        wrap = self.N - 1
        r = self.N - self.F
        s = 0

        self.init_tree()
        self.text_buffer = self._fresh_window()
        text = self.text_buffer

        # Pre-load up to F bytes of input behind the padding.
        length = min(self.F, data_len)
        if length <= 0:
            return b''
        for k in range(length):
            text[r + k] = data[k]
        # Insert the padding strings just before r, then r itself; the latter
        # leaves the first match in match_position / match_length.
        for i in range(1, self.F + 1):
            self.insert_node(r - i)
        self.insert_node(r)

        pos = length  # Index of the next input byte to read.
        print_count = 0

        output = bytearray()  # bytearray: linear-time appends (bytes += is quadratic).
        code_buf = [0] * 17   # Flag byte + up to 8 items of at most 2 bytes.
        code_buf_ptr = 1
        mask = 1

        while True:
            if self.match_length > length:
                self.match_length = length
            if self.match_length <= self.threshold:
                # Too short to pay off: emit one literal byte.
                self.match_length = 1
                code_buf[0] |= mask
                code_buf[code_buf_ptr] = text[r]
                code_buf_ptr += 1
            else:
                code_buf[code_buf_ptr] = self.unsigned_char(self.match_position)
                code_buf_ptr += 1
                code_buf[code_buf_ptr] = self.unsigned_char(((self.match_position >> 4) & 0xf0) |
                                                            (self.match_length - (self.threshold + 1)))
                code_buf_ptr += 1
            mask = (mask << 1) % 256  # The mask is an unsigned char in the original.
            if mask == 0:
                # Eight items collected: flush the group.
                output.extend(code_buf[:code_buf_ptr])
                code_buf[0] = 0
                code_buf_ptr = 1
                mask = 1
            last_match_length = self.match_length

            # Replace the bytes just encoded with fresh input, as far as there is any.
            consumed = 0
            while consumed < last_match_length and pos < data_len:
                self.delete_node(s)
                c = data[pos]
                pos += 1
                text[s] = c
                if s < (self.F - 1):
                    # Mirror the start of the buffer past its end, so strings
                    # can be compared without wrapping.
                    text[s + self.N] = c
                s = (s + 1) & wrap
                r = (r + 1) & wrap
                self.insert_node(r)
                consumed += 1

            if pos > print_count:
                if self.debug:
                    print("{}\r".format(pos))
                print_count += self.progress_print

            # Input exhausted: just advance and shrink the look-ahead.
            while consumed < last_match_length:
                consumed += 1
                self.delete_node(s)
                s = (s + 1) & wrap
                r = (r + 1) & wrap
                length -= 1
                if length:
                    self.insert_node(r)
            if length <= 0:
                break

        if code_buf_ptr > 1:
            output.extend(code_buf[:code_buf_ptr])
        if self.debug:
            print("In: {} bytes.".format(data_len))
            print("Out: {} bytes.".format(len(output)))
            # NOTE(review): the value printed is input size / output size,
            # although the label reads "Out/In" -- left as it was.
            print("Out/In: {}.".format(round(data_len / len(output), 4)))

        return bytes(output)

    def decode(self) -> bytes:
        """Decode bytes from lzss.

        Decoding stops quietly at the end of input_buffer, also in the middle
        of an item.  The ring buffer is reset on entry.
        """
        data = self.input_buffer
        data_len = len(data)
        wrap = self.N - 1
        r = self.N - self.F

        self.text_buffer = self._fresh_window()
        window = self.text_buffer

        output = bytearray()
        flags = 0
        current_pos = 0

        while True:
            flags >>= 1
            if (flags & 256) == 0:
                # All eight flags used: read the next flag byte.  The high
                # byte counts the eight shifts.
                if current_pos >= data_len:
                    break
                flags = data[current_pos] | 0xff00
                current_pos += 1
            if flags & 1:
                if current_pos >= data_len:
                    break
                c = data[current_pos]
                current_pos += 1
                output.append(c)
                window[r] = c
                r = (r + 1) & wrap
            else:
                if current_pos + 1 >= data_len:
                    break
                low = data[current_pos]
                high = data[current_pos + 1]
                current_pos += 2
                position = low | ((high & 0xf0) << 4)
                count = (high & 0x0f) + self.threshold + 1
                # Byte by byte: the source may overlap the bytes being written.
                for k in range(count):
                    c = window[(position + k) & wrap]
                    output.append(c)
                    window[r] = c
                    r = (r + 1) & wrap
        return bytes(output)

    @staticmethod
    def unsigned_char(char: int):
        """Convert into unsigned char."""
        return char % 256