-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathmonkeytest.py
193 lines (170 loc) · 7.4 KB
/
monkeytest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
#!/usr/bin/env python
'''
MonkeyTest -- test your hard drive read-write speed in Python
A simple script showing that system programming tasks like this
are possible and convenient to solve in Python
The file is created, filled with random data, read back at random
offsets and then deleted, so the script doesn't waste space on your drive
(!) Make sure that the file you point to is not something
you need, because it will be overwritten during the test
Runs on both Python 3 and 2, although I prefer 3
Has been tested on 3.5 and 2.7 under ArchLinux
Has been tested on 3.5.2 under Ubuntu Xenial
'''
from __future__ import division, print_function # for compatibility with py2
import os, sys
from random import shuffle
import argparse
import json
# Banner printed by Benchmark.print_result() after the human-readable report.
# It is a raw string so the backslashes in the drawing are kept literally.
# NOTE(review): the drawing's leading/internal spacing looks collapsed in this
# copy of the file -- confirm against the upstream source before relying on it.
ASCIIART = r'''Brought to you by coding monkeys.
Eat bananas, drink coffee & enjoy!
_
,//)
) /
/ /
_,^^,/ /
(G,66<_/
_/\_,_) _
/ _ \ ,' )
/ /"\ \/ ,_\
__(,/ > e ) / (_\.oO
\_ / ( -,_/ \_/
U \_, _)
( /
>/
(.oO
'''
# ASCII art: part of the text image at http://www.ascii-art.de/ascii/mno/monkey.txt
# Its original author seems to be Mic Barendsz (mic aka miK).
# The text image is rather old (1999), so I couldn't find a way to contact the author.
# If you are reading this and you are the author -- feel free to write to me.
# Bind the name `time` to the best available timer function.
try:
    # Python >= 3.3 ships a dedicated high-resolution counter
    from time import perf_counter as time
except ImportError:
    # older interpreters: fall back to the most precise counter per platform
    if sys.platform.startswith('win'):
        from time import clock as time
    else:
        from time import time
def get_args(argv=None):
    '''
    Parse the command-line options of the benchmark.

    argv -- optional list of argument strings; when None (the default)
            argparse reads sys.argv[1:], exactly as before.

    Returns the argparse.Namespace with the attributes file, size,
    write_block_size, read_block_size and json (json is None unless the
    -j/--json option was given).
    '''
    parser = argparse.ArgumentParser(
        description='Arguments',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-f', '--file',
                        required=False,
                        action='store',
                        default='/tmp/monkeytest',
                        help='The file to read/write to')
    parser.add_argument('-s', '--size',
                        required=False,
                        action='store',
                        type=int,
                        default=128,
                        help='Total MB to write')
    # main() hands this value to Benchmark as write_block_kb, which
    # multiplies it by 1024 -- so the unit is KB, not bytes.
    parser.add_argument('-w', '--write-block-size',
                        required=False,
                        action='store',
                        type=int,
                        default=1024,
                        help='The block size for writing in KB')
    parser.add_argument('-r', '--read-block-size',
                        required=False,
                        action='store',
                        type=int,
                        default=512,
                        help='The block size for reading in bytes')
    parser.add_argument('-j', '--json',
                        required=False,
                        action='store',
                        help='Output to json file')
    return parser.parse_args(argv)
class Benchmark:
    '''
    Disk benchmark: writes random data to a file, then reads it back
    at shuffled offsets, timing every single block.

    The whole benchmark runs inside the constructor. Afterwards the
    per-block timings (in seconds) are available as write_results and
    read_results, and can be reported with print_result() or
    get_json_result().
    '''

    def __init__(self, file, write_mb, write_block_kb, read_block_b):
        '''
        file           -- path of the scratch file (it gets overwritten)
        write_mb       -- total amount of data to write, in MB
        write_block_kb -- size of one written block, in KB
        read_block_b   -- size of one read block, in bytes

        Raises ValueError if a block size is not positive or write_mb
        is negative.
        '''
        if write_block_kb <= 0 or read_block_b <= 0:
            raise ValueError('block sizes must be positive')
        if write_mb < 0:
            raise ValueError('the amount of data to write must not be negative')
        self.file = file
        self.write_mb = write_mb
        self.write_block_kb = write_block_kb
        self.read_block_b = read_block_b
        wr_blocks = int(self.write_mb * 1024 / self.write_block_kb)
        rd_blocks = int(self.write_mb * 1024 * 1024 / self.read_block_b)
        self.write_results = self.write_test(1024 * self.write_block_kb, wr_blocks)
        self.read_results = self.read_test(self.read_block_b, rd_blocks)

    @staticmethod
    def _rate(amount, seconds):
        '''Return amount / seconds, or 0.0 when no time was measured.'''
        return amount / seconds if seconds > 0 else 0.0

    @staticmethod
    def _extremes(timings):
        '''Return (shortest, longest) timing, or (0, 0) for an empty list.'''
        if not timings:
            return 0, 0
        return min(timings), max(timings)

    def write_test(self, block_size, blocks_count, show_progress=True):
        '''
        Tests write speed by writing random blocks, at total quantity
        of blocks_count, each at size of block_size bytes to disk.
        Function returns a list of write times in sec of each block.
        '''
        # O_BINARY only exists on Windows; without it the descriptor is
        # opened in text mode there and the random bytes get translated.
        flags = os.O_CREAT | os.O_WRONLY | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.file, flags, 0o777)  # low-level I/O
        took = []
        try:
            for i in range(blocks_count):
                if show_progress:
                    # dirty trick to actually print progress on each iteration
                    sys.stdout.write('\rWriting: {:.2f} %'.format(
                        (i + 1) * 100 / blocks_count))
                    sys.stdout.flush()
                buff = os.urandom(block_size)  # generated outside the timed span
                start = time()
                os.write(fd, buff)
                os.fsync(fd)  # force write to disk
                took.append(time() - start)
        finally:
            os.close(fd)  # do not leak the descriptor if a write fails
        return took

    def read_test(self, block_size, blocks_count, show_progress=True):
        '''
        Performs read speed test by reading random offset blocks from
        file, at maximum of blocks_count, each at size of block_size
        bytes until the End Of File reached.
        Returns a list of read times in sec of each block.
        '''
        flags = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.file, flags)  # low-level I/O
        # generate random read positions
        offsets = list(range(0, blocks_count * block_size, block_size))
        shuffle(offsets)
        # read is faster than write, so try to equalize print period;
        # clamp to 1 so a read block larger than the write block does
        # not lead to a modulo by zero
        stride = max(1, int(self.write_block_kb * 1024 / self.read_block_b))
        took = []
        try:
            for i, offset in enumerate(offsets, 1):
                if show_progress and i % stride == 0:
                    # i is already 1-based, so it is the number of blocks done
                    sys.stdout.write('\rReading: {:.2f} %'.format(
                        i * 100 / blocks_count))
                    sys.stdout.flush()
                start = time()
                os.lseek(fd, offset, os.SEEK_SET)  # set position
                buff = os.read(fd, block_size)  # read from position
                t = time() - start
                if not buff:
                    break  # if EOF reached
                took.append(t)
        finally:
            os.close(fd)  # do not leak the descriptor if a read fails
        return took

    def print_result(self):
        '''
        Print the human-readable report (totals, average, fastest and
        slowest block speed for writing and reading) followed by the
        ASCII-art banner. Speeds are shown as 0.00 when nothing was timed.
        '''
        write_total = sum(self.write_results)
        read_total = sum(self.read_results)
        write_fastest, write_slowest = self._extremes(self.write_results)
        read_fastest, read_slowest = self._extremes(self.read_results)
        result = ('\n\nWritten {} MB in {:.4f} s\nWrite speed is {:.2f} MB/s'
                  '\n max: {max:.2f}, min: {min:.2f}\n'.format(
                      self.write_mb, write_total,
                      self._rate(self.write_mb, write_total),
                      max=self._rate(self.write_block_kb, 1024 * write_fastest),
                      min=self._rate(self.write_block_kb, 1024 * write_slowest)))
        # NOTE: read speed is computed against write_mb, i.e. it assumes
        # that the whole written amount was read back
        result += ('\nRead {} x {} B blocks in {:.4f} s\nRead speed is {:.2f} MB/s'
                   '\n max: {max:.2f}, min: {min:.2f}\n'.format(
                       len(self.read_results), self.read_block_b,
                       read_total, self._rate(self.write_mb, read_total),
                       max=self._rate(self.read_block_b, 1024 * 1024 * read_fastest),
                       min=self._rate(self.read_block_b, 1024 * 1024 * read_slowest)))
        print(result)
        print(ASCIIART)

    def get_json_result(self, output_file):
        '''
        Write the summary of the benchmark as a JSON object to
        output_file. Speeds are reported as 0.0 when nothing was timed.
        '''
        write_total = sum(self.write_results)
        read_total = sum(self.read_results)
        results_json = {}
        results_json["Written MB"] = self.write_mb
        results_json["Write time (sec)"] = round(write_total, 2)
        results_json["Write speed in MB/s"] = round(
            self._rate(self.write_mb, write_total), 2)
        results_json["Read blocks"] = len(self.read_results)
        results_json["Read time (sec)"] = round(read_total, 2)
        results_json["Read speed in MB/s"] = round(
            self._rate(self.write_mb, read_total), 2)
        with open(output_file, 'w') as f:
            json.dump(results_json, f)
def main():
    '''
    Script entry point: parse the command-line options, run the
    benchmark, then either dump the results to the requested JSON file
    or print them, and finally delete the scratch file.
    '''
    args = get_args()
    try:
        benchmark = Benchmark(args.file, args.size,
                              args.write_block_size, args.read_block_size)
        if args.json is not None:
            benchmark.get_json_result(args.json)
        else:
            benchmark.print_result()
    finally:
        # remove the scratch file even when the benchmark or the report
        # failed; it may not exist if the failure happened before creation
        if os.path.exists(args.file):
            os.remove(args.file)


if __name__ == "__main__":
    main()