#!/usr/bin/env python3
print()
print('Bundlr node2 can theoretically provide free uploads at a rate of 1MB/s,')
print(' if uploads are broken into 100kb transactions and limited to 600/min.')
print()
print('Can we accomplish this speed?')
print()
print(' [note: after exploration, it appears the speed is uncapped atm,')
print(' and speeds near and above 1MB/s are just rate control artefacts.]')
print()
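# Worked arithmetic behind the claim above: 100 KiB per item at a cap of
# 600 items/min is 102400 * 600 / 60 = 1,024,000 B/s, i.e. just over 1 MB/s,
# so sustaining ~10 items/second saturates the nominal free rate.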
import os, random, time
import tqdm
import concurrent.futures, threading, multiprocessing.pool
import joblib  # only used by the disabled joblib attempt below
from ar import Wallet, DataItem
from bundlr import Node
import toys.node2_pump
# The serialized size of an empty DataItem gives the fixed per-item envelope
# overhead; payloads are sized so each serialized item is exactly raw_size.
di_header_size = len(DataItem(data=b'').tobytes())
raw_size = 100*1024
payload_size = raw_size - di_header_size
print(f'Raw size: {raw_size}B')
print(f'Payload size: {payload_size}B')
TOTAL_COUNT = 1200
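# 1200 items at 600/min is two minutes of traffic, ~117 MiB in total.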
print('Prepping data ...')
wallet = (
    Wallet('testwallet.json')
    if os.path.exists('testwallet.json')
    else Wallet.generate(jwk_file='testwallet.json')
)
# Grow testdata.bin with random bytes until it holds TOTAL_COUNT payloads,
# then slice it into per-item payloads.
with open('testdata.bin', 'ab+') as f:
    if f.tell() < TOTAL_COUNT*payload_size:
        f.write(random.randbytes(TOTAL_COUNT*payload_size - f.tell()))
    f.seek(0)
    data = [f.read(payload_size) for idx in tqdm.tqdm(range(TOTAL_COUNT), total=TOTAL_COUNT, desc='Reading', leave=False, unit='blk')]
def encode():
    # Sign each payload into a DataItem and keep the serialized bytes.
    global dataitems
    dataitems = []
    for idx in tqdm.tqdm(range(len(data)), total=len(data), desc='Encoding', leave=False, unit='di'):
        di = DataItem(data=data[idx])
        di.sign(wallet.rsa)
        byts = di.tobytes()
        assert len(byts) == raw_size and len(di.data) == payload_size
        dataitems.append(byts)
node = Node()
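# Each section below sends the same 1200 items with a different concurrency
# strategy, pacing toward the nominal 10 items/second (~1 MB/s).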
# A joblib-based attempt, left commented out for reference:
#print('joblib ...')
#encode()
#mark = time.time() - 1
#expected_sent = 0
#def send(di):
#    node = Node()
#    return node.send_tx(di)
#    #pbar.update(len(di))
#    #return
#with tqdm.tqdm(total=raw_size*TOTAL_COUNT, unit='B', unit_scale=True, smoothing=0) as pbar, multiprocessing.pool.ThreadPool() as pool:
#    for chunk_offset in range(0, TOTAL_COUNT, 10):
#        results = joblib.Parallel(n_jobs=10)(joblib.delayed(send)(di) for di in dataitems[chunk_offset:chunk_offset+10])
#        pbar.update(10*raw_size)
#        #expected_sent += 10 * raw_size
#        #assert pbar.n == expected_sent
#        now = time.time()
#        mark += 1
#        if now < mark:
#            pbar.display(f'sleeping for {mark - now}')
#            time.sleep(mark - now)
'''
print('toys/node2_pump InsertableRateQueueIterable with ThreadPoolExecutor ...')
encode()
queue = toys.node2_pump.InsertableRateQueueIterable(at_once=10, period_secs=1)
for di in dataitems:
    queue.add(di)
with tqdm.tqdm(total=raw_size*TOTAL_COUNT, unit='B', unit_scale=True, smoothing=0) as pbar, concurrent.futures.ThreadPoolExecutor() as pool:
    for result in pool.map(node.send_tx, queue):
        pbar.update(raw_size)
'''
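# toys.node2_pump.Pump is a local helper module; judging from its use here,
# feed() queues signed DataItems and fetch(n) yields n send results, with
# throughput capped at bytes_per_period per period_secs.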
print('toys.node2_pump.Pump ...')
with toys.node2_pump.Pump(node, bytes_per_period=raw_size*10, period_secs=1) as pump:
#with toys.node2_pump.Pump(node, bytes_per_period=None, period_secs=1) as pump:
    with tqdm.tqdm(total=raw_size*len(data), unit='B', unit_scale=True, smoothing=0) as pbar:
        for idx in tqdm.tqdm(range(len(data)), total=len(data), desc='Encoding', leave=False, unit='di'):
            di = DataItem(data=data[idx])
            di.sign(wallet.rsa)
            pump.feed(di)
        for result in pump.fetch(len(data)):
            pbar.update(raw_size)
print('concurrent.futures.ThreadPoolExecutor ...')
encode()
mark = time.time() - 1
lock = threading.Lock()
expected_sent = 0
def send(di):
    result = node.send_tx(di)
    with lock:  # serialize tqdm updates from worker threads
        pbar.update(raw_size)  # == len(di)
    return result
with tqdm.tqdm(total=raw_size*TOTAL_COUNT, unit='B', unit_scale=True, smoothing=0) as pbar, concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
    for chunk_offset in range(0, TOTAL_COUNT, 10):
        results = pool.map(send, dataitems[chunk_offset:chunk_offset+10])
        results = list(results)  # drain the lazy map so all 10 sends complete
        expected_sent += 10 * raw_size
        assert pbar.n == expected_sent
        # Pace chunks to one per second: advance the deadline, sleep off surplus.
        now = time.time()
        mark += 1
        if now < mark:
            pbar.display(f'sleeping for {mark - now}')
            time.sleep(mark - now)
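# yarl39's SyncThreadPump is fed (size, payload) pairs rather than DataItems;
# from the call pattern it appears to rate-limit by the declared sizes and
# send via the node on worker threads.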
import yarl39
print('yarl39 ...')
#with yarl39.SyncThreadPump(node.send_tx, size_per_period=None, period_secs=1) as pump:
with yarl39.SyncThreadPump(node, bytes_per_period=raw_size*10, period_secs=1) as pump:
    with tqdm.tqdm(total=raw_size*len(data), unit='B', unit_scale=True, smoothing=0) as pbar:
        for idx in tqdm.tqdm(range(len(data)), total=len(data), desc='Encoding', leave=False, unit='di'):
            di = DataItem(data=data[idx])
            di.sign(wallet.rsa)
            byts = di.tobytes()
            pump.feed(len(byts), byts)
        for result in pump.fetch(len(data)):
            pbar.update(raw_size)
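# Same chunked pacing loop as the ThreadPoolExecutor section, but on
# multiprocessing.pool.ThreadPool (still threads, despite the module name).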
print('multiprocessing.pool.ThreadPool ...')
encode()
mark = time.time() - 1
lock = multiprocessing.Lock()
expected_sent = 0
def send(di):
    result = node.send_tx(di)
    with lock:
        pbar.update(raw_size)  # == len(di)
    return result
with tqdm.tqdm(total=raw_size*TOTAL_COUNT, unit='B', unit_scale=True, smoothing=0) as pbar, multiprocessing.pool.ThreadPool(processes=10) as pool:
    for chunk_offset in range(0, TOTAL_COUNT, 10):
        results = pool.map(send, dataitems[chunk_offset:chunk_offset+10])
        expected_sent += 10 * raw_size
        assert pbar.n == expected_sent
        now = time.time()
        mark += 1
        if now < mark:
            pbar.display(f'sleeping for {mark - now}')
            time.sleep(mark - now)
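# Baseline: the identical pacing loop with strictly serial sends, to show how
# much throughput the thread pools actually buy.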
print('single-threaded serial sends ...')
encode()
mark = time.time() - 1
expected_sent = 0
def send(di):
    node.send_tx(di)
    pbar.update(raw_size)  # == len(di)
with tqdm.tqdm(total=raw_size*TOTAL_COUNT, unit='B', unit_scale=True, smoothing=0) as pbar:
    for chunk_offset in range(0, TOTAL_COUNT, 10):
        for di in dataitems[chunk_offset:chunk_offset+10]:
            send(di)
        expected_sent += 10 * raw_size
        assert pbar.n == expected_sent
        now = time.time()
        mark += 1
        if now < mark:
            pbar.display(f'sleeping for {mark - now}')
            time.sleep(mark - now)