-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtesting.py
187 lines (149 loc) · 5.26 KB
/
testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/python
from config import *
from config_local import *
from mailboxsize import *
import os
import subprocess
import shutil
import random
# Scratch directory for the tests, located under the current working directory.
ROOT_DIR = os.getcwd() + '/test'

# Column layout of the throw-away table the tests write usage records to.
TEST_TABLE = dict(
    # Name of the table.
    name='test_table',
    # Primary key of the table; it is auto-incremented.
    pk_column='UsageId',
    # Foreign-key column that takes a unique identifier for each user
    # (e.g. username, user id, email).
    relational_column='username',
    # Column recording the total usage size.
    size_column='UsageSize',
    # Column recording the time the usage was detected.
    timestamp_column='TimeDetected',
)

# CREATE TABLE statement assembled from the column names in TEST_TABLE.
TEST_DESCRIPTION = (
    f"CREATE TABLE {TEST_TABLE['name']}("
    f" {TEST_TABLE['pk_column']} int NOT NULL AUTO_INCREMENT,"
    f" {TEST_TABLE['relational_column']} varchar(255) NOT NULL,"
    f" {TEST_TABLE['size_column']} double,"
    f" {TEST_TABLE['timestamp_column']} timestamp,"
    f" PRIMARY KEY ({TEST_TABLE['pk_column']})"
    ")"
)

# Usage sizes recorded by the tests, keyed by user identifier.
USAGE_SIZES = dict()
def _write_random_file(file_path):
    """Write between 1 and 10 MiB of random bytes to *file_path*; return the byte count."""
    size = 1024 * 1024 * random.randint(1, 10)
    with open(file_path, 'wb') as fout:
        fout.write(os.urandom(size))
    return size


def init():
    """Create the test table and a randomly filled directory tree per user.

    The test table is created when it does not exist yet.  For every row
    returned by ``select_user_paths()`` (element 0 is an identifier containing
    "@", element 1 a path relative to ``ROOT_DIR``) this creates:

    * a directory named after the part of the identifier following "@",
    * the account directory ``ROOT_DIR/<row[1]>``,
    * ``test_one/bin_file`` and ``test_two/bin_file`` inside it,
    * a ``bin_file`` directly inside the account directory.

    The byte size of that last file is stored in ``USAGE_SIZES`` under the
    identifier.  An ``OSError`` while preparing one user is printed and the
    remaining users are still processed.
    """
    if not table_exists(TEST_TABLE['name']):
        create_table(TABLE_DESCRIPTION=TEST_DESCRIPTION)

    # Create exactly the directory that is used everywhere else (ROOT_DIR)
    # rather than probing './test', and tolerate leftovers from an earlier
    # run that was not cleaned up.
    os.makedirs(ROOT_DIR, exist_ok=True)

    for path in select_user_paths():
        try:
            # Only the part after "@" is needed; taking index 1 avoids the
            # ValueError a two-name unpack raises on identifiers with
            # more than one "@".
            user = path[0].split("@")[1]
            os.makedirs(os.path.join(ROOT_DIR, user), exist_ok=True)

            account_dir = os.path.join(ROOT_DIR, path[1])
            os.makedirs(account_dir, exist_ok=True)

            for test_name in ('test_one', 'test_two'):
                test_dir = os.path.join(account_dir, test_name)
                os.makedirs(test_dir, exist_ok=True)
                _write_random_file(os.path.join(test_dir, 'bin_file'))

            # Joined with a separator so the file is created inside the
            # account directory instead of next to it as '<account>bin_file'.
            size = _write_random_file(os.path.join(account_dir, 'bin_file'))
            USAGE_SIZES[path[0]] = size
        except OSError as e:
            print(e)
def clean():
    """Remove the test directory tree and drop the test table.

    A missing ``ROOT_DIR`` is not an error: the table is still dropped, so a
    failed ``init()`` does not leave the test table behind.  A
    ``ProgrammingError`` raised by the ``DROP TABLE`` statement is printed
    rather than propagated.  The cursor and the connection are always closed.
    """
    try:
        shutil.rmtree(ROOT_DIR)
    except FileNotFoundError:
        # Nothing to delete; carry on with the database cleanup.
        pass

    cnx = create_connection()
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute(f"DROP TABLE {TEST_TABLE['name']}")
        except mysql.connector.errors.ProgrammingError as e:
            print(e)
        finally:
            cursor.close()
    finally:
        cnx.close()
def test_database_records():
    """Check that the sizes written to the test table match the sizes on disk.

    Runs ``init()``, lets ``update_usage_sizes`` fill the test table, measures
    every user directory with ``get_directory_size_in_megabytes`` (rounded to
    six decimals) and compares each stored row against that measurement.
    ``clean()`` is always called afterwards.

    Returns:
        bool: True when every stored row equals the measured size; False on
        any mismatch or when an exception occurred during the check.
    """
    init()
    print("Testing for correct database records.... ", end="")
    passed = True
    try:
        update_usage_sizes(USAGE_TABLE=TEST_TABLE, ROOT_DIR=ROOT_DIR)

        for x in select_user_paths():
            curr_dir = os.path.join(ROOT_DIR, x[1])
            USAGE_SIZES[str(x[0])] = round(get_directory_size_in_megabytes(curr_dir), 6)

        query = f"SELECT {TEST_TABLE['relational_column']}, {TEST_TABLE['size_column']}, {TEST_TABLE['timestamp_column']} FROM {TEST_TABLE['name']}"
        cnx = create_connection()
        try:
            cursor = cnx.cursor()
            try:
                cursor.execute(query)
                query_result = list(cursor)
            finally:
                cursor.close()
        finally:
            # Release the connection even when the query or the fetch fails.
            cnx.close()

        for result in query_result:
            if USAGE_SIZES[result[0]] != result[1]:
                passed = False
                break
    except Exception as e:
        # Database errors included: an error means the check could not be
        # completed, so the test must not be reported as passed.
        print("Error! \n", e)
        passed = False
    finally:
        clean()

    # Single place where the verdict is printed.
    if passed:
        print("Passed!")
    else:
        print("Failed!")
    return passed
def test_size_function():
    """Check ``get_directory_size_in_megabytes`` against the ``du`` command.

    Runs ``init()``, then for every row of ``select_user_paths()`` compares
    the size reported by ``get_directory_size_in_megabytes`` (rounded to six
    decimals) with the value derived from ``du -sb`` for the same directory.
    ``clean()`` is always called afterwards.

    Returns:
        bool: True when all directories agree; False on the first mismatch or
        when an exception occurred (including a failing ``du`` call).
    """
    init()
    print("Testing size calculation.... ", end="")
    passed = True
    try:
        for path in select_user_paths():
            curr_dir = os.path.join(ROOT_DIR, path[1])
            measured = round(get_directory_size_in_megabytes(curr_dir), 6)

            # check=True turns a failing du into an exception instead of
            # letting empty output reach the parser.
            out = subprocess.run(
                ["du", "-sb", curr_dir], capture_output=True, check=True
            )
            # du prints "<bytes><TAB><path>"; take the first whitespace-
            # separated field so the parse does not depend on the path text.
            sys_size = out.stdout.decode('ascii', errors='replace')
            size = float(sys_size.split(None, 1)[0])
            # NOTE(review): the constant 4 is carried over unchanged; its
            # purpose is not evident from this file - confirm.
            size = round((size - 4) / (1024 * 1024), 6)

            if size != measured:
                passed = False
                break
    except Exception as e:
        # An error means the check could not be completed, so the test must
        # not be reported as passed.
        print("Error! \n", e)
        passed = False
    finally:
        clean()

    # Single place where the verdict is printed.
    if passed:
        print("Passed!")
    else:
        print("Failed!")
    return passed
def main():
    """Run the size-calculation test followed by the database-records test."""
    for run_test in (test_size_function, test_database_records):
        run_test()


# Script entry point
if __name__ == "__main__":
    main()