-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic_demo.py
131 lines (112 loc) · 5.22 KB
/
basic_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
import random
import shutil
import time
from eade.ead_engine import EaDEngine
from eade.rad_engine import RaDEngine
def test_file(test_total_shares: int, test_remove_segments: int):
    """Round-trip a random blob through EaDEngine and RaDEngine and compare hashes.

    A random binary file of 50-100 MiB is written under ``tests/random_files/``,
    split with ``EaDEngine`` into ``test_total_shares`` segments (required
    shares are fixed at half of the total, rounded down), optionally thinned by
    dropping random segments, and then rebuilt with ``RaDEngine``. The two
    ``.sha256`` files found under ``tests/dist/<radengine.id>/`` are compared
    and the outcome is printed. The random input file is always deleted on
    exit.

    Args:
        test_total_shares: Total number of shares passed to ``EaDEngine``.
        test_remove_segments: How many of the produced segments to discard at
            random before restoring. Clamped to the number of segments that
            actually exist.

    Returns:
        None. Progress and results are reported on stdout only.
    """
    # create a random file blob that is a random size between 50MB and 100MB;
    # the name carries the size in MB, the payload length is in bytes
    random_file_size_mb = random.randint(50, 100)
    random_file_size = random_file_size_mb * 1024 * 1024
    random_file_path = f"tests/random_files/{random_file_size_mb}_mb_blob.bin"

    # make required shares = total shares / 2
    test_required_shares = int(test_total_shares / 2)

    # Segments handed over by the distribute callback (minus the removed ones).
    test_segments = []
    # Last progress value that was printed; shared by both engines' callbacks.
    last_percent = 0

    # NOTE(review): the callback parameter is named `id` (shadowing the
    # builtin) on purpose - how the engines invoke callbacks is not visible
    # here, so the original parameter names are kept.
    def on_progress(id, progress):
        """Print progress on multiples of 10, skipping immediate repeats."""
        nonlocal last_percent
        if progress % 10 != 0 or last_percent == progress:
            return
        print(f"{id} - Progress: {progress}%")
        last_percent = progress

    def on_distribute_complete(id, segments):
        """Collect the produced segments, print headers, then drop some."""
        print(f"{id} - Distribute complete. File split into {len(segments)} segments.")
        print("Segment headers:")
        for segment in segments:
            # append the segment to the test segments
            test_segments.append(segment)
            # display the header of each segment
            header = eadengine.decode_header(segment)
            print(f" L Segment (index) {header['which_segment']}, Data Len: {header['data_len']}, Parity: {header['is_parity']}")
        # for testing, remove the segments from random positions; never try
        # to remove more segments than exist (randint(0, -1) would raise)
        if test_remove_segments > 0:
            removable = min(test_remove_segments, len(test_segments))
            for _ in range(removable):
                random_index = random.randint(0, len(test_segments) - 1)
                test_segments.pop(random_index)
                print(f"Removed segment at index {random_index}.")

    def on_rebuild_complete(id, decrypted_file_path):
        """Report where the restored file was written."""
        print(f"{id} - Restored file saved to {decrypted_file_path} using {len(test_segments)} segments.")

    def on_exception(id, exception):
        """Report an exception forwarded by either engine."""
        print(f"{id} - An exception occurred: {exception}")

    try:
        # Create the input inside the try block so a partially written file is
        # still cleaned up, and make sure the target directory exists first.
        os.makedirs(os.path.dirname(random_file_path), exist_ok=True)
        with open(random_file_path, "wb") as blob_file:
            # Written without keeping a reference, so the large buffer can be
            # released as soon as the write finishes.
            blob_file.write(os.urandom(random_file_size))

        print(f"\nRandom file created: {random_file_path}, testing with {test_total_shares} total shares, {test_required_shares} required shares and {test_remove_segments} missing segments.")

        # keys: fixed 32-byte key and 16-byte IV, demo values only
        key = "UlW_heN5uzEle9rb4_CqrO5nkFgS4HH1".encode()
        iv = "tswn0VXbEK0KXqhS".encode()

        # create an instance of the engine and distribute the file
        print(f"Distributing file with {test_total_shares} shares, {test_required_shares} required shares.")
        eadengine = EaDEngine(
            file_path=random_file_path,
            total_shares=test_total_shares,
            required_shares=test_required_shares,
            output_path="tests/dist/",
            key=key,
            iv=iv,
            success_callback_func=on_distribute_complete,
            exception_callback_func=on_exception,
            progress_callback_func=on_progress)
        eadengine.split_file()

        # wait for the engine to complete
        if not eadengine.wait_on_complete():
            print("Encryption and distribution failed!")
            return
        print("Encryption and distribution complete!")

        # restore the file
        print(f"Restoring file with {len(test_segments)} segments.")
        radengine = RaDEngine(
            output_path="tests/dist/",
            key=key,
            iv=iv,
            segments=test_segments,
            success_callback_func=on_rebuild_complete,
            exception_callback_func=on_exception,
            progress_callback_func=on_progress)
        radengine.restore_file()

        # wait for the engine to complete
        if not radengine.wait_on_complete():
            print("Decryption and restoration failed!")
            return
        print("Decryption and restoration complete!")

        # compare the original file with the restored file by reading the
        # hash files (presumably written by the engines - verify against
        # the eade package)
        original_hash_file_name = os.path.join(f"tests/dist/{radengine.id}/", os.path.basename(random_file_path)) + ".sha256"
        restored_hash_file_name = f"tests/dist/{radengine.id}/restored.blob.sha256"
        with open(original_hash_file_name, "r") as file:
            original_hash = file.read()
        with open(restored_hash_file_name, "r") as file:
            restored_hash = file.read()

        if original_hash == restored_hash:
            print(f"File hashes match! {original_hash} == {restored_hash}")
        else:
            print("File hashes DO NOT match!")
    finally:
        # remove the random file; it may be missing if creation itself failed,
        # and raising here would hide the original error
        if os.path.exists(random_file_path):
            os.remove(random_file_path)
if __name__ == '__main__':
    # remove all files in the test output directory; ignore_errors means a
    # missing directory (e.g. on the very first run) is not an error
    shutil.rmtree("tests/dist/", ignore_errors=True)
    # NOTE(review): presumably a grace period for the filesystem after the
    # delete - confirm whether this pause is still needed
    time.sleep(1)

    # run the test N times
    for i in range(5):
        print(f"Test run {i + 1}")
        # Only the total share count and the number of dropped segments are
        # randomized; test_file derives the required shares itself.
        random_total_files = random.randint(5, 10)
        random_missing_files = random.randint(0, 2)
        test_file(test_total_shares=random_total_files, test_remove_segments=random_missing_files)