From f7861e9a1cf420f09555243469cdf3f4f68e2c19 Mon Sep 17 00:00:00 2001 From: CtREL0K Date: Fri, 17 Jan 2025 18:07:23 +0500 Subject: [PATCH 1/3] add fragmenter, update main(parser) --- fragmenter.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 73 ++++++++++++++++++++++++++++++++--- 2 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 fragmenter.py diff --git a/fragmenter.py b/fragmenter.py new file mode 100644 index 0000000..2362d92 --- /dev/null +++ b/fragmenter.py @@ -0,0 +1,105 @@ +import struct +import random +from pathlib import Path + +from fat_reader import FatReader +from directory_parser import DirectoryParser + +FAT_END_MASK = 0x0FFFFFF8 +FAT_ENTRY_MASK = 0x0FFFFFFF +FAT_FREE_MASK = 0x00000000 +MIN_VALID_INDEX = 2 + + +class Fragmenter: + """ + Класс для фрагментации файлов в файловой системе FAT32. + """ + + def __init__(self, image_path: Path, fat_reader: FatReader, directory_parser: DirectoryParser) -> None: + self.image_path = image_path + self.fat_reader = fat_reader + self.directory_parser = directory_parser + self.bpb = fat_reader.bpb + self.free_clusters = self._find_free_clusters() + + def _find_free_clusters(self) -> list[int]: + """ + Находит все свободные кластеры. + """ + return [cluster.index for cluster in self.fat_reader.clusters if cluster.next_index == FAT_FREE_MASK] + + def fragment_file(self, file_path: Path) -> None: + """ + Фрагментирует указанный файл, разбивая его на несмежные кластеры. 
+ """ + all_files = self.directory_parser.get_all_files(self.bpb.root_clus) + target_file = next(f for f in all_files if f["path"].lower() == file_path) + + cluster_chain = self.fat_reader.get_cluster_chain(target_file["starting_cluster"]) + print(cluster_chain) + cluster_indices = [cluster.index for cluster in cluster_chain] + + if len(cluster_indices) < 2: + print(f"Файл '{file_path}' слишком мал для фрагментации.") + return + + num_fragments = random.randint(2, min(50, len(cluster_indices))) + clusters_to_move = random.sample(cluster_indices, num_fragments) + print(f"Фрагментируем файл '{file_path}', перемещая кластеры: {clusters_to_move}") + + new_clusters = [] + for old_cluster_index in clusters_to_move: + if not self.free_clusters: + print("Нет свободных кластеров для фрагментации.") + break + index = random.randrange(len(self.free_clusters)) + new_cluster = self.free_clusters.pop(index) + self._copy_cluster_data(old_cluster_index, new_cluster) + new_clusters.append(new_cluster) + self.fat_reader.clusters[old_cluster_index].next_index = FAT_FREE_MASK + + self.directory_parser.update_starting_cluster(target_file["path"], new_clusters[0]) + for i in range(len(new_clusters) - 1): + self.fat_reader.clusters[new_clusters[i]].next_index = new_clusters[i + 1] + self.fat_reader.clusters[new_clusters[-1]].next_index = FAT_ENTRY_MASK + + last_cluster = cluster_chain[-1] + last_cluster.next_index = new_clusters[0] + for i in range(len(new_clusters) - 1): + self.fat_reader.clusters[new_clusters[i]].next_index = new_clusters[i + 1] + self.fat_reader.clusters[new_clusters[-1]].next_index = FAT_ENTRY_MASK + + self._write_fat() + print(f"Файл '{file_path}' успешно фрагментирован.") + + def _copy_cluster_data(self, old_cluster_index: int, new_cluster_index: int) -> None: + """ + Копирует данные из одного кластера в другой. 
+ """ + with open(self.image_path, 'r+b') as f: + old_offset = self.fat_reader.get_cluster_offset(old_cluster_index) + new_offset = self.fat_reader.get_cluster_offset(new_cluster_index) + f.seek(old_offset) + data = f.read(self.fat_reader.cluster_size) + f.seek(new_offset) + f.write(data) + print(f"Скопирован кластер {old_cluster_index} в {new_cluster_index}") + + def _write_fat(self) -> None: + """ + Записывает обновленную таблицу FAT обратно в образ диска. + """ + with open(self.image_path, 'r+b') as f: + fat_start = self.bpb.reserved_sec_cnt * self.bpb.byts_per_sec + fat_size = self.bpb.fat_size_32 * self.bpb.byts_per_sec + fat_data = bytearray() + + for cluster in self.fat_reader.clusters: + fat_entry = cluster.next_index & FAT_ENTRY_MASK + fat_data += struct.pack(" Date: Fri, 17 Jan 2025 21:19:02 +0500 Subject: [PATCH 2/3] add ClusterManager, simplified fragmenter and defragmenter --- cluster_manager.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++ defragmenter.py | 73 ++----------------------------------- fragmenter.py | 81 +++++++---------------------------------- main.py | 36 +++--------------- 4 files changed, 112 insertions(+), 169 deletions(-) create mode 100644 cluster_manager.py diff --git a/cluster_manager.py b/cluster_manager.py new file mode 100644 index 0000000..b52fee7 --- /dev/null +++ b/cluster_manager.py @@ -0,0 +1,91 @@ +import struct +from pathlib import Path + +from cluster import Cluster +from directory_parser import DirectoryParser +from fat_reader import FatReader + +FAT_ENTRY_MASK = 0x0FFFFFFF +FAT_FREE_MASK = 0x00000000 + +class ClusterManager: + """ + Базовый класс для управления кластерами в файловой системе FAT32. 
+ """ + def __init__(self, image_path: Path, fat_reader: FatReader, directory_parser: DirectoryParser) -> None: + self._image_path = image_path + self._fat_reader = fat_reader + self._directory_parser = directory_parser + self._bpb = fat_reader.bpb + self._free_clusters: list[int] = self._find_free_clusters() + + def find_fragmented_files(self, all_files: list[dict]) -> list[dict]: + fragmented_files = [] + for file_entry in all_files: + cluster_chain = self._fat_reader.get_cluster_chain(file_entry["starting_cluster"]) + if self._is_fragmented(cluster_chain): + fragmented_files.append({ + "path": file_entry["path"], + "cluster_chain": [cluster.index for cluster in cluster_chain] + }) + + return fragmented_files + + def _is_fragmented(self, cluster_chain: list[Cluster]) -> bool: + """ + Проверяет, является ли кластерная цепочка фрагментированной. + """ + for cluster_index in range(len(cluster_chain) - 1): + if cluster_chain[cluster_index].next_index != (cluster_chain[cluster_index].index + 1): + return True + return False + + def _write_fat(self) -> None: + """ + Записывает обновлённую FAT таблицу обратно в образ диска. + """ + with open(self._image_path, 'r+b') as f: + fat_start = self._bpb.reserved_sec_cnt * self._bpb.byts_per_sec + fat_size = self._bpb.fat_size_32 * self._bpb.byts_per_sec + fat_data = bytearray() + + for cluster in self._fat_reader.clusters: + fat_entry = cluster.next_index & FAT_ENTRY_MASK + fat_data += struct.pack(" None: + """ + Копирует данные из одного кластера в другой + """ + with open(self._image_path, 'r+b') as f: + old_data = self._fat_reader.read_cluster_data(self._fat_reader.clusters[old_cluster_index]) + f.seek(self._fat_reader.get_cluster_offset(new_cluster_index)) + f.write(old_data) + + def _update_directory_entry(self, file_entry: dict, new_start_cluster_index: int) -> None: + """ + Обновляет поле starting_cluster для файла в каталоге. 
+ """ + self._directory_parser.update_starting_cluster(file_entry['path'], new_start_cluster_index) + + def _update_fat(self, old_clusters_indices: list[int], new_clusters_indices: list[int]) -> None: + """ + Обновляет FAT таблицу: освобождает старые кластеры и связывает новые кластеры. + """ + for cluster in old_clusters_indices: + self._fat_reader.clusters[cluster].next_index = FAT_FREE_MASK + + for i in range(len(new_clusters_indices) - 1): + self._fat_reader.clusters[new_clusters_indices[i]].next_index = new_clusters_indices[i + 1] + self._fat_reader.clusters[new_clusters_indices[-1]].next_index = FAT_ENTRY_MASK diff --git a/defragmenter.py b/defragmenter.py index 11c18bf..4482dba 100644 --- a/defragmenter.py +++ b/defragmenter.py @@ -1,26 +1,20 @@ -import struct from pathlib import Path -from cluster import Cluster +from cluster_manager import ClusterManager from directory_parser import DirectoryParser from fat_reader import FatReader -FAT_END_MASK = 0x0FFFFFF8 FAT_ENTRY_MASK = 0x0FFFFFFF FAT_FREE_MASK = 0x00000000 ClusterIndexList = list[int] -class Defragmenter: +class Defragmenter(ClusterManager): """ Класс для дефрагментации файловой системы FAT32. """ def __init__(self, image_path: Path, fat_reader: FatReader, directory_parser: DirectoryParser) -> None: - self._image_path = image_path - self._fat_reader = fat_reader - self._directory_parser = directory_parser - self._bpb = fat_reader.bpb - self._free_clusters: list[int] = self._find_free_clusters() + super().__init__(image_path, fat_reader, directory_parser) def defragment(self) -> None: """ @@ -49,21 +43,6 @@ def defragment(self) -> None: self._write_fat() print("Дефрагментация завершена успешно.") - def _is_fragmented(self, cluster_chain: list[Cluster]) -> bool: - """ - Проверяет, является ли кластерная цепочка фрагментированной. 
- """ - for cluster_index in range(len(cluster_chain) - 1): - if cluster_chain[cluster_index].next_index != (cluster_chain[cluster_index].index + 1): - return True - return False - - def _find_free_clusters(self): - """ - Находит все свободные кластеры. - """ - return [cluster.index for cluster in self._fat_reader.clusters if cluster.next_index == FAT_FREE_MASK] - def _find_free_blocks(self) -> list[ClusterIndexList]: """ Находит все непрерывные блоки свободных кластеров. @@ -116,50 +95,4 @@ def _allocate_clusters(self, clusters_count: int) -> list[int]: self._free_clusters.remove(cluster) return new_clusters - def _copy_cluster_data(self, old_cluster_index: int, new_cluster_index: int) -> None: - """ - Копирует данные из одного кластера в другой - """ - with open(self._image_path, 'r+b') as f: - old_data = self._fat_reader.read_cluster_data(self._fat_reader.clusters[old_cluster_index]) - f.seek(self._fat_reader.get_cluster_offset(new_cluster_index)) - f.write(old_data) - - def _update_fat(self, old_clusters_indices: list[int], new_clusters_indices: list[int]) -> None: - """ - Обновляет FAT таблицу: освобождает старые кластеры и связывает новые кластеры. - """ - for cluster in old_clusters_indices: - self._fat_reader.clusters[cluster].next_index = FAT_FREE_MASK - - for i in range(len(new_clusters_indices)): - current_cluster = new_clusters_indices[i] - if i < len(new_clusters_indices) - 1: - self._fat_reader.clusters[current_cluster].next_index = new_clusters_indices[i + 1] - else: - self._fat_reader.clusters[current_cluster].next_index = FAT_ENTRY_MASK - print(f"FAT таблица обновлена для новых кластеров: {new_clusters_indices}") - - def _update_directory_entry(self, file_entry: dict, new_start_cluster_index: int) -> None: - """ - Обновляет поле starting_cluster для файла в каталоге. 
- """ - self._directory_parser.update_starting_cluster(file_entry['path'], new_start_cluster_index) - - def _write_fat(self) -> None: - """ - Записывает обновлённую FAT таблицу обратно в образ диска. - """ - with open(self._image_path, 'r+b') as f: - fat_start = self._bpb.reserved_sec_cnt * self._bpb.byts_per_sec - fat_size = self._bpb.fat_size_32 * self._bpb.byts_per_sec - fat_data = bytearray() - - for cluster in self._fat_reader.clusters: - fat_entry = cluster.next_index & FAT_ENTRY_MASK - fat_data += struct.pack(" None: - self.image_path = image_path - self.fat_reader = fat_reader - self.directory_parser = directory_parser - self.bpb = fat_reader.bpb - self.free_clusters = self._find_free_clusters() - - def _find_free_clusters(self) -> list[int]: - """ - Находит все свободные кластеры. - """ - return [cluster.index for cluster in self.fat_reader.clusters if cluster.next_index == FAT_FREE_MASK] + super().__init__(image_path, fat_reader, directory_parser) def fragment_file(self, file_path: Path) -> None: """ Фрагментирует указанный файл, разбивая его на несмежные кластеры. 
""" - all_files = self.directory_parser.get_all_files(self.bpb.root_clus) - target_file = next(f for f in all_files if f["path"].lower() == file_path) + all_files = self._directory_parser.get_all_files(self._bpb.root_clus) + target_file = next(f for f in all_files if f["path"] == file_path) - cluster_chain = self.fat_reader.get_cluster_chain(target_file["starting_cluster"]) + cluster_chain = self._fat_reader.get_cluster_chain(target_file["starting_cluster"]) print(cluster_chain) cluster_indices = [cluster.index for cluster in cluster_chain] @@ -44,62 +32,19 @@ def fragment_file(self, file_path: Path) -> None: print(f"Файл '{file_path}' слишком мал для фрагментации.") return - num_fragments = random.randint(2, min(50, len(cluster_indices))) - clusters_to_move = random.sample(cluster_indices, num_fragments) - print(f"Фрагментируем файл '{file_path}', перемещая кластеры: {clusters_to_move}") + print(f"Фрагментируем файл '{file_path}'") new_clusters = [] - for old_cluster_index in clusters_to_move: - if not self.free_clusters: + for old_cluster_index in cluster_indices: + if not self._free_clusters: print("Нет свободных кластеров для фрагментации.") break - index = random.randrange(len(self.free_clusters)) - new_cluster = self.free_clusters.pop(index) + index = random.randrange(len(self._free_clusters)) + new_cluster = self._free_clusters.pop(index) self._copy_cluster_data(old_cluster_index, new_cluster) new_clusters.append(new_cluster) - self.fat_reader.clusters[old_cluster_index].next_index = FAT_FREE_MASK - - self.directory_parser.update_starting_cluster(target_file["path"], new_clusters[0]) - for i in range(len(new_clusters) - 1): - self.fat_reader.clusters[new_clusters[i]].next_index = new_clusters[i + 1] - self.fat_reader.clusters[new_clusters[-1]].next_index = FAT_ENTRY_MASK - - last_cluster = cluster_chain[-1] - last_cluster.next_index = new_clusters[0] - for i in range(len(new_clusters) - 1): - self.fat_reader.clusters[new_clusters[i]].next_index = 
new_clusters[i + 1] - self.fat_reader.clusters[new_clusters[-1]].next_index = FAT_ENTRY_MASK + self._update_directory_entry(target_file, new_clusters[0]) + self._update_fat(cluster_indices, new_clusters) self._write_fat() print(f"Файл '{file_path}' успешно фрагментирован.") - - def _copy_cluster_data(self, old_cluster_index: int, new_cluster_index: int) -> None: - """ - Копирует данные из одного кластера в другой. - """ - with open(self.image_path, 'r+b') as f: - old_offset = self.fat_reader.get_cluster_offset(old_cluster_index) - new_offset = self.fat_reader.get_cluster_offset(new_cluster_index) - f.seek(old_offset) - data = f.read(self.fat_reader.cluster_size) - f.seek(new_offset) - f.write(data) - print(f"Скопирован кластер {old_cluster_index} в {new_cluster_index}") - - def _write_fat(self) -> None: - """ - Записывает обновленную таблицу FAT обратно в образ диска. - """ - with open(self.image_path, 'r+b') as f: - fat_start = self.bpb.reserved_sec_cnt * self.bpb.byts_per_sec - fat_size = self.bpb.fat_size_32 * self.bpb.byts_per_sec - fat_data = bytearray() - - for cluster in self.fat_reader.clusters: - fat_entry = cluster.next_index & FAT_ENTRY_MASK - fat_data += struct.pack(" Date: Sat, 18 Jan 2025 19:26:43 +0500 Subject: [PATCH 3/3] tests, some refactor --- README.md | 47 +++++++- defragmenter/__init__.py | 0 bpb.py => defragmenter/bpb.py | 0 Cluster.py => defragmenter/cluster.py | 0 .../cluster_manager.py | 6 +- .../defragmenter.py | 6 +- .../directory_parser.py | 4 +- .../fat_attributes.py | 0 fat_reader.py => defragmenter/fat_reader.py | 4 +- fragmenter.py => defragmenter/fragmenter.py | 14 +-- main.py | 16 +-- pytest.ini | 4 + requirements.txt | 7 ++ tests/__init__.py | 0 tests/conftest.py | 34 ++++++ tests/test_bpb.py | 16 +++ tests/test_cluster.py | 7 ++ tests/test_cluster_manager.py | 54 +++++++++ tests/test_defragmenter.py | 63 +++++++++++ tests/test_directory_parser.py | 104 ++++++++++++++++++ tests/test_fragmenter.py | 56 ++++++++++ 21 files changed, 
416 insertions(+), 26 deletions(-) create mode 100644 defragmenter/__init__.py rename bpb.py => defragmenter/bpb.py (100%) rename Cluster.py => defragmenter/cluster.py (100%) rename cluster_manager.py => defragmenter/cluster_manager.py (96%) rename defragmenter.py => defragmenter/defragmenter.py (96%) rename directory_parser.py => defragmenter/directory_parser.py (98%) rename FAT_Attributes.py => defragmenter/fat_attributes.py (100%) rename fat_reader.py => defragmenter/fat_reader.py (97%) rename fragmenter.py => defragmenter/fragmenter.py (78%) create mode 100644 pytest.ini create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_bpb.py create mode 100644 tests/test_cluster.py create mode 100644 tests/test_cluster_manager.py create mode 100644 tests/test_defragmenter.py create mode 100644 tests/test_directory_parser.py create mode 100644 tests/test_fragmenter.py diff --git a/README.md b/README.md index 836d65b..86d5a98 100644 --- a/README.md +++ b/README.md @@ -1 +1,46 @@ -# disk-defragmentation \ No newline at end of file +# Проект "Дефрагментатор" + +## Авторы +* Овчинников Кирилл +* Зуев Кирилл + +## Описание +Дефрагментатор - программа, которая устраняет +фрагментацию файлов на дисках для оптимизации процесса записи и чтения с диска. + +## Требования +* Python 3.8 и выше +* Для работы дефрагментатора не нужно сторонних библиотек, однако для тестирования понадобятся +сторонние библиотеки: +```pip install -r requirements.txt``` + +## Запуск +Дефрагментатор поддерживает три команды: +1. check - выводит на экран все файлы, находящиеся на диске, а также фрагментированные файлы\ +Пример использования:\ +```python main.py check <путь до образа диска> ``` +2. defragment - выполняет дефрагментацию диска\ +Пример использования:\ +```python main.py defragment <путь до образа диска> ``` +3. 
fragment - выполняет фрагментацию определённого файла, если его размер позволяет это сделать\ +Пример использования:\ +```python main.py fragment <путь до образа диска> <путь до файла, который будет фрагментирован>``` + +## Тесты +Код покрыт тестами, процент покрытия - 83.\ +Тесты находятся в директории ```tests``` +``` +Name Stmts Miss Cover Missing +---------------------------------------------------------------- +defragmenter\__init__.py 0 0 100% +defragmenter\bpb.py 13 0 100% +defragmenter\cluster.py 10 0 100% +defragmenter\cluster_manager.py 52 19 63% 47-59, 71-74, 86-91 +defragmenter\defragmenter.py 63 19 70% 73-88, 94-98 +defragmenter\directory_parser.py 140 22 84% 39-41, 106, 136, 191-210 +defragmenter\fat_attributes.py 9 0 100% +defragmenter\fat_reader.py 54 3 94% 38, 52-53 +defragmenter\fragmenter.py 34 0 100% +---------------------------------------------------------------- +TOTAL 375 63 83% +``` diff --git a/defragmenter/__init__.py b/defragmenter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bpb.py b/defragmenter/bpb.py similarity index 100% rename from bpb.py rename to defragmenter/bpb.py diff --git a/Cluster.py b/defragmenter/cluster.py similarity index 100% rename from Cluster.py rename to defragmenter/cluster.py diff --git a/cluster_manager.py b/defragmenter/cluster_manager.py similarity index 96% rename from cluster_manager.py rename to defragmenter/cluster_manager.py index b52fee7..a1f1602 100644 --- a/cluster_manager.py +++ b/defragmenter/cluster_manager.py @@ -1,9 +1,9 @@ import struct from pathlib import Path -from cluster import Cluster -from directory_parser import DirectoryParser -from fat_reader import FatReader +from defragmenter.cluster import Cluster +from defragmenter.directory_parser import DirectoryParser +from defragmenter.fat_reader import FatReader FAT_ENTRY_MASK = 0x0FFFFFFF FAT_FREE_MASK = 0x00000000 diff --git a/defragmenter.py b/defragmenter/defragmenter.py similarity index 96% rename from 
defragmenter.py rename to defragmenter/defragmenter.py index 4482dba..75f44dc 100644 --- a/defragmenter.py +++ b/defragmenter/defragmenter.py @@ -1,8 +1,8 @@ from pathlib import Path -from cluster_manager import ClusterManager -from directory_parser import DirectoryParser -from fat_reader import FatReader +from defragmenter.cluster_manager import ClusterManager +from defragmenter.directory_parser import DirectoryParser +from defragmenter.fat_reader import FatReader FAT_ENTRY_MASK = 0x0FFFFFFF FAT_FREE_MASK = 0x00000000 diff --git a/directory_parser.py b/defragmenter/directory_parser.py similarity index 98% rename from directory_parser.py rename to defragmenter/directory_parser.py index beb8893..a2016e3 100644 --- a/directory_parser.py +++ b/defragmenter/directory_parser.py @@ -1,8 +1,8 @@ import struct from typing import Any -from fat_attributes import FatAttributes -from fat_reader import FatReader +from defragmenter.fat_attributes import FatAttributes +from defragmenter.fat_reader import FatReader ENTRY_SIZE = 32 EMPTY_ENTRY_MARK = 0x00 diff --git a/FAT_Attributes.py b/defragmenter/fat_attributes.py similarity index 100% rename from FAT_Attributes.py rename to defragmenter/fat_attributes.py diff --git a/fat_reader.py b/defragmenter/fat_reader.py similarity index 97% rename from fat_reader.py rename to defragmenter/fat_reader.py index ecff6d6..15e8ead 100644 --- a/fat_reader.py +++ b/defragmenter/fat_reader.py @@ -1,8 +1,8 @@ import struct from pathlib import Path -from bpb import BPB -from cluster import Cluster +from defragmenter.bpb import BPB +from defragmenter.cluster import Cluster FAT_ENTRY_SIZE = 4 FAT_ENTRY_MASK = 0x0FFFFFFF diff --git a/fragmenter.py b/defragmenter/fragmenter.py similarity index 78% rename from fragmenter.py rename to defragmenter/fragmenter.py index 7c4edc2..58dae6e 100644 --- a/fragmenter.py +++ b/defragmenter/fragmenter.py @@ -1,9 +1,9 @@ import random from pathlib import Path -from fat_reader import FatReader -from directory_parser 
import DirectoryParser -from cluster_manager import ClusterManager +from defragmenter.fat_reader import FatReader +from defragmenter.directory_parser import DirectoryParser +from defragmenter.cluster_manager import ClusterManager FAT_ENTRY_MASK = 0x0FFFFFFF FAT_FREE_MASK = 0x00000000 @@ -22,15 +22,15 @@ def fragment_file(self, file_path: Path) -> None: Фрагментирует указанный файл, разбивая его на несмежные кластеры. """ all_files = self._directory_parser.get_all_files(self._bpb.root_clus) - target_file = next(f for f in all_files if f["path"] == file_path) + target_file = next((f for f in all_files if Path(f["path"]) == file_path), None) + if not target_file: + raise FileNotFoundError(f"Файл {file_path} не найден") cluster_chain = self._fat_reader.get_cluster_chain(target_file["starting_cluster"]) - print(cluster_chain) cluster_indices = [cluster.index for cluster in cluster_chain] if len(cluster_indices) < 2: - print(f"Файл '{file_path}' слишком мал для фрагментации.") - return + raise ValueError(f"Файл '{file_path}' слишком мал для фрагментации.") print(f"Фрагментируем файл '{file_path}'") diff --git a/main.py b/main.py index 4c6db3e..35201b9 100644 --- a/main.py +++ b/main.py @@ -2,12 +2,12 @@ import argparse from pathlib import Path -from bpb import BPB -from directory_parser import DirectoryParser -from fat_reader import FatReader -from defragmenter import Defragmenter -from fragmenter import Fragmenter -from cluster_manager import ClusterManager +from defragmenter.bpb import BPB +from defragmenter.directory_parser import DirectoryParser +from defragmenter.fat_reader import FatReader +from defragmenter.defragmenter import Defragmenter +from defragmenter.fragmenter import Fragmenter +from defragmenter.cluster_manager import ClusterManager arg_parser = argparse.ArgumentParser() subparsers = arg_parser.add_subparsers(dest='command', help='Доступные команды', required=True) @@ -51,10 +51,10 @@ print(f"path: {file['path']}, cluster_chain: {file['cluster_chain']}") 
elif command == "defragment": - defragmenter = Defragmenter(image_path, fat_reader, parser) + defragmenter = Defragmenter(final_image_path, fat_reader, parser) defragmenter.defragment() elif command == "fragment": file_path = Path(args.file_path) fragmenter = Fragmenter(final_image_path, fat_reader, parser) - fragmenter.fragment_file(args.file_path) + fragmenter.fragment_file(Path(args.file_path)) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..76501e0 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +minversion = 6.0 +addopts = --cov=defragmenter --cov-report=term-missing +testpaths = tests diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ac96e08 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +colorama==0.4.6 +coverage==7.6.10 +iniconfig==2.0.0 +packaging==24.2 +pluggy==1.5.0 +pytest==8.3.4 +pytest-cov==6.0.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b774c52 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,34 @@ +import pytest +from unittest.mock import MagicMock + +@pytest.fixture +def mock_fat_reader(): + fat_reader = MagicMock() + fat_reader.bpb = MagicMock() + fat_reader.bpb.reserved_sec_cnt = 32 + fat_reader.bpb.byts_per_sec = 512 + fat_reader.bpb.fat_size_32 = 256 + fat_reader.bpb.root_clus = 2 + fat_reader.bpb.sec_per_clus = 8 + fat_reader.bpb.total_sec_32 = 100000 + fat_reader.cluster_size = 4096 + fat_reader.clusters = [] + for _ in range(1000): + fat_reader.clusters.append(MagicMock()) + for i, cluster in enumerate(fat_reader.clusters): + cluster.index = i + cluster.next_index = i + 1 if i <= 500 else i + 2 if i < 998 else 0x0FFFFFFF + fat_reader.get_cluster_offset.side_effect = lambda x: x * fat_reader.cluster_size + #fat_reader.read_cluster_data.side_effect = lambda cluster: b'Data' * 1024 + fat_reader.write_fat.return_value = None + return 
fat_reader + +@pytest.fixture +def mock_directory_parser(): + directory_parser = MagicMock() + directory_parser.get_all_files.return_value = [ + {"path": "DIR1/FILE1.TXT", "starting_cluster": 2, "size": 2048}, + {"path": "DIR2/FILE2.TXT", "starting_cluster": 10, "size": 4096}, + ] + directory_parser.update_starting_cluster.return_value = None + return directory_parser diff --git a/tests/test_bpb.py b/tests/test_bpb.py new file mode 100644 index 0000000..a9ac61e --- /dev/null +++ b/tests/test_bpb.py @@ -0,0 +1,16 @@ +from pathlib import Path + +from defragmenter.bpb import BPB + +def test_init(): + project_root = Path(__file__).resolve().parent.parent + image_path = project_root / "Images" / "FAT_32_32MB" + assert image_path.exists(), f"Файл {image_path} не найден." + bpb = BPB(image_path) + assert bpb.root_clus == 2 + assert bpb.byts_per_sec == 512 + assert bpb.num_fats == 2 + assert bpb.reserved_sec_cnt == 2782 + assert bpb.total_sec_32 == 65536 + assert bpb.fat_size_32 == 14993 + assert bpb.sec_per_clus == 32 \ No newline at end of file diff --git a/tests/test_cluster.py b/tests/test_cluster.py new file mode 100644 index 0000000..c5fc3fd --- /dev/null +++ b/tests/test_cluster.py @@ -0,0 +1,7 @@ +from defragmenter.cluster import Cluster + +def test_is_valid(): + valid_index_cluster = Cluster(100, 101, False) + assert valid_index_cluster.is_valid() + invalid_index_cluster = Cluster(1, 2, False) + assert not invalid_index_cluster.is_valid() diff --git a/tests/test_cluster_manager.py b/tests/test_cluster_manager.py new file mode 100644 index 0000000..f33c367 --- /dev/null +++ b/tests/test_cluster_manager.py @@ -0,0 +1,54 @@ +from pathlib import Path + +from defragmenter.bpb import BPB +from defragmenter.cluster_manager import ClusterManager +from defragmenter.directory_parser import DirectoryParser +from defragmenter.fat_reader import FatReader + + +def test_is_fragmented_not_fragmented(mock_fat_reader, mock_directory_parser): + manager = 
ClusterManager(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser) + cluster_chain = [mock_fat_reader.clusters[i] for i in range(2, 7)] + for i, cluster in enumerate(cluster_chain): + if i < len(cluster_chain) - 1: + cluster.next_index = cluster.index + 1 + else: + cluster.next_index = 0x0FFFFFFF + assert not manager._is_fragmented(cluster_chain) + +def test_is_fragmented_fragmented(mock_fat_reader, mock_directory_parser): + manager = ClusterManager(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser) + cluster_chain = [mock_fat_reader.clusters[i] for i in range(2, 7)] + cluster_chain[2].next_index = 20 + assert manager._is_fragmented(cluster_chain) + +def test_update_directory_entry_success(mock_fat_reader, mock_directory_parser): + manager = ClusterManager(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser) + file_entry = {"path": "DIR1/FILE1.TXT"} + manager._update_directory_entry(file_entry, 5) + mock_directory_parser.update_starting_cluster.assert_called_with("DIR1/FILE1.TXT", 5) + +def test_find_fragmented_files(): + project_root = Path(__file__).resolve().parent.parent + image_path = project_root / "Images" / "FAT_32_fragmented" + bpb = BPB(image_path) + fat_reader = FatReader(image_path, bpb) + directory_parser = DirectoryParser(fat_reader) + all_files = directory_parser.get_all_files(bpb.root_clus) + manager = ClusterManager(image_path, fat_reader, directory_parser) + fragmented_files = manager.find_fragmented_files(all_files) + expected_fragmented_files = [ + { + 'path': 'ASDA.TXT', + 'cluster_chain': [6, 1552, 1553, 1554, 1555, 1556] + }, + { + 'path': 'ipset-discord.txt', + 'cluster_chain': [1337, 1557, 1558, 1559, 1560, 1561] + }, + { + 'path': 'list-discord.txt', + 'cluster_chain': [1546, 1742] + } + ] + assert expected_fragmented_files == fragmented_files diff --git a/tests/test_defragmenter.py b/tests/test_defragmenter.py new file mode 100644 index 0000000..837d494 --- /dev/null +++ 
b/tests/test_defragmenter.py
@@ -0,0 +1,63 @@
+import pytest
+from unittest.mock import MagicMock, patch
+from pathlib import Path
+
+from defragmenter.defragmenter import Defragmenter
+
+def test_defragment_no_fragmented_files(mock_fat_reader, mock_directory_parser):
+    defragmenter = Defragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    defragmenter.logger = MagicMock()
+
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/FILE1.TXT", "starting_cluster": 2, "size": 4096},
+    ]
+    # Build a fully contiguous chain 2..6 so the defragmenter sees no fragmentation.
+    cluster_chain = [mock_fat_reader.clusters[i] for i in range(2, 7)]
+    for i, cluster in enumerate(cluster_chain):
+        if i < len(cluster_chain) - 1:
+            cluster.next_index = cluster.index + 1
+        else:
+            cluster.next_index = 0x0FFFFFFF
+    defragmenter._copy_cluster_data = MagicMock()
+    with patch('defragmenter.defragmenter.ClusterManager._write_fat', side_effect=None):
+        defragmenter.defragment()
+    defragmenter._copy_cluster_data.assert_not_called()
+
+def test_defragment_with_fragmented_files(mock_fat_reader, mock_directory_parser):
+    defragmenter = Defragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    defragmenter.logger = MagicMock()
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/FILE1.TXT", "starting_cluster": 567, "size": 4096},
+    ]
+    # Break the chain at the third cluster to simulate fragmentation.
+    cluster_chain = [mock_fat_reader.clusters[i] for i in range(2, 7)]
+    cluster_chain[2].next_index = 20
+    mock_fat_reader.get_cluster_chain = MagicMock()
+    mock_fat_reader.get_cluster_chain.return_value = cluster_chain
+    defragmenter._copy_cluster_data = MagicMock()
+    defragmenter._update_fat = MagicMock()
+    defragmenter._update_directory_entry = MagicMock()
+    defragmenter._allocate_clusters = MagicMock()
+    defragmenter._allocate_clusters.return_value = list(range(500, 505))
+
+    with patch('defragmenter.defragmenter.ClusterManager._write_fat', side_effect=None):
+        defragmenter.defragment()
+
+    defragmenter._copy_cluster_data.assert_called()
+    defragmenter._update_fat.assert_called()
+    defragmenter._update_directory_entry.assert_called_with(mock_directory_parser.get_all_files.return_value[0],
+                                                            defragmenter._allocate_clusters.return_value[0])
+
+def test_defragment_write_fat_failure(mock_fat_reader, mock_directory_parser):
+    defragmenter = Defragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/FILE1.TXT", "starting_cluster": 2, "size": 4096},
+    ]
+    cluster_chain = [mock_fat_reader.clusters[i] for i in range(2, 7)]
+    cluster_chain[2].next_index = 20
+    with pytest.raises(Exception):
+        with patch('defragmenter.defragmenter.ClusterManager._write_fat', side_effect=Exception("Write FAT failed")):
+            defragmenter.defragment()
+
+def test_find_free_blocks(mock_fat_reader, mock_directory_parser):
+    defragmenter = Defragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    defragmenter._free_clusters = [2, 3, 4, 7, 8, 10, 11, 12, 13]
+    assert defragmenter._find_free_blocks() == [[2, 3, 4], [7, 8], [10, 11, 12, 13]]
\ No newline at end of file
diff --git a/tests/test_directory_parser.py b/tests/test_directory_parser.py
new file mode 100644
index 0000000..368407d
--- /dev/null
+++ b/tests/test_directory_parser.py
@@ -0,0 +1,104 @@
+from pathlib import Path
+
+import pytest
+
+from defragmenter.fat_reader import FatReader
+from defragmenter.directory_parser import DirectoryParser
+from defragmenter.bpb import BPB
+
+
+@pytest.fixture
+def directory_parser():
+    project_root = Path(__file__).resolve().parent.parent
+    image_path = project_root / "Images" / "FAT_32_32MB"
+    assert image_path.exists(), f"Файл {image_path} не найден."
+    fat_reader = FatReader(image_path, BPB(image_path))
+    return DirectoryParser(fat_reader)
+
+def test_init(directory_parser):
+    assert directory_parser is not None
+
+
+def test_parse_directory_entries(directory_parser):
+    first_cluster = directory_parser.fat_reader.clusters[directory_parser.fat_reader.bpb.root_clus]
+    cluster_data = directory_parser.fat_reader.read_cluster_data(first_cluster)
+
+    entries = directory_parser.parse_directory_entries(cluster_data)
+
+    assert isinstance(entries, list)
+
+    expected_file = {
+        'attributes': 32,
+        'name': 'ASDA.TXT',
+        'size': 1644,
+        'starting_cluster': 6
+    }
+
+    assert expected_file in entries, f"{expected_file} не найден в записях каталога."
+
+
+def test_get_all_files(directory_parser):
+    all_files = directory_parser.get_all_files(directory_parser.fat_reader.bpb.root_clus)
+    assert isinstance(all_files, list)
+    expected_files = [
+        {
+            'path': 'ASDA.TXT',
+            'size': 1644,
+            'starting_cluster': 6
+        },
+        {
+            'path': 'PortScan/Parser.py',
+            'size': 2986,
+            'starting_cluster': 9
+        },
+        {
+            'path': 'PortScan/__pycache__/TCP_Scanner.cpython-311.pyc',
+            'starting_cluster': 25,
+            'size': 8589
+        }
+    ]
+
+    for file in expected_files:
+        assert file in all_files, f"Файл {file} не найден в списке всех файлов."
+
+
+def test_find_directory_entry_existing_file(directory_parser):
+    target_name = "ASDA.TXT"
+    root_cluster = directory_parser.fat_reader.bpb.root_clus
+    result = directory_parser.find_directory_entry(root_cluster, target_name)
+    assert result is not None, f"Запись для {target_name} не найдена."
+    entry_offset, cluster_index = result
+    assert isinstance(entry_offset, int)
+    assert isinstance(cluster_index, int)
+
+def test_find_directory_entry_nonexistent_file(directory_parser):
+    target_name = "NONEXISTENT.TXT"
+    root_cluster = directory_parser.fat_reader.bpb.root_clus
+    result = directory_parser.find_directory_entry(root_cluster, target_name)
+    assert result is None, f"Запись для {target_name} должна отсутствовать."
+
+def test_navigate_path_valid_path(directory_parser):
+    # Path components to a nested file known to exist in the test image.
+    path_parts = ['PortScan', '__pycache__', 'TCP_Scanner.cpython-311.pyc']
+    result = directory_parser.navigate_path(path_parts)
+    assert isinstance(result, int)
+    assert result >= 2
+
+def test_navigate_path_invalid_path(directory_parser):
+    # Components that do not exist anywhere in the image.
+    path_parts = ["nonexistent_dir", "file.txt"]
+    result = directory_parser.navigate_path(path_parts)
+    assert result is None, "Путь должен отсутствовать."
+
+def test_find_subdirectory_cluster_existing(directory_parser):
+    subdir_name = "PortScan"
+    root_cluster = directory_parser.fat_reader.bpb.root_clus
+    result = directory_parser.find_subdirectory_cluster(root_cluster, subdir_name)
+    assert isinstance(result, int), "Кластер подкаталога должен быть целым числом."
+    assert result >= 2, "Кластер должен быть валидным (>=2)."
+
+def test_find_subdirectory_cluster_nonexistent(directory_parser):
+    subdir_name = "ghost_dir"
+    root_cluster = directory_parser.fat_reader.bpb.root_clus
+    result = directory_parser.find_subdirectory_cluster(root_cluster, subdir_name)
+    assert result is None, "Кластер подкаталога не должен существовать."
diff --git a/tests/test_fragmenter.py b/tests/test_fragmenter.py
new file mode 100644
index 0000000..19a3792
--- /dev/null
+++ b/tests/test_fragmenter.py
@@ -0,0 +1,56 @@
+import pytest
+from unittest.mock import MagicMock, patch
+from pathlib import Path
+
+from defragmenter.fragmenter import Fragmenter
+
+def test_fragment_file_success(mock_fat_reader, mock_directory_parser):
+    fragmenter = Fragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    fragmenter.logger = MagicMock()
+    fragmenter._write_fat = MagicMock()
+    fragmenter._update_fat = MagicMock()
+    fragmenter._update_directory_entry = MagicMock()
+    fragmenter._copy_cluster_data = MagicMock()
+    fragmenter._free_clusters = list(range(1000))
+
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/FILE1.TXT", "starting_cluster": 2, "size": 4096}
+    ]
+    target_file = {"path": "DIR1/FILE1.TXT", "starting_cluster": 2, "size": 4096}
+    mock_fat_reader.get_cluster_chain.side_effect = lambda start: mock_fat_reader.clusters[start:start+4]
+
+    # Pin random.randrange so the "randomly" chosen free cluster is always 100.
+    with patch('random.randrange', return_value=100):
+        fragmenter.fragment_file(Path("DIR1/FILE1.TXT"))
+
+    fragmenter._copy_cluster_data.assert_called()
+    fragmenter._update_fat.assert_called()
+    fragmenter._update_directory_entry.assert_called_with(target_file, 100)
+
+def test_fragment_file_not_found(mock_fat_reader, mock_directory_parser):
+    fragmenter = Fragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    fragmenter.logger = MagicMock()
+    mock_directory_parser.get_all_files.return_value = []
+    with pytest.raises(FileNotFoundError):
+        fragmenter.fragment_file(Path("DIR1/FILE1.TXT"))
+
+
+def test_fragment_file_small_file(mock_fat_reader, mock_directory_parser):
+    fragmenter = Fragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    fragmenter.logger = MagicMock()
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/SMALL.TXT",
+         "starting_cluster": 2, "size": 512}  # smaller than two clusters
+    ]
+    mock_fat_reader.get_cluster_chain.side_effect = lambda start: mock_fat_reader.clusters[start]
+    with pytest.raises(ValueError):
+        fragmenter.fragment_file(Path("DIR1/SMALL.TXT"))
+
+def test_fragment_file_copy_failure(mock_fat_reader, mock_directory_parser):
+    fragmenter = Fragmenter(Path("Images/FAT_32_32MB"), mock_fat_reader, mock_directory_parser)
+    fragmenter.logger = MagicMock()
+    fragmenter._copy_cluster_data = MagicMock(side_effect=Exception("Copy failed"))
+    mock_directory_parser.get_all_files.return_value = [
+        {"path": "DIR1/FILE1.TXT", "starting_cluster": 600, "size": 4096}
+    ]
+    mock_fat_reader.get_cluster_chain.side_effect = lambda start: mock_fat_reader.clusters[start:start+2]
+    with pytest.raises(Exception):
+        fragmenter.fragment_file(Path("DIR1/FILE1.TXT"))