-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathexample.py
More file actions
executable file
·103 lines (74 loc) · 2.37 KB
/
example.py
File metadata and controls
executable file
·103 lines (74 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python3
"""
Example testbench exercising the AXI-Streaming and AXI-MM interfaces of the
'example' module through vpw.
"""
import random
import vpw
import vpw.axim
import vpw.axim2ram
import vpw.axis
if __name__ == '__main__':
    # Create the device under test from module 'example' (clock 'clk') and
    # initialise vpw with it.
    dut = vpw.create(module='example', clock='clk')
    vpw.init(dut)

    # Build and register the interface objects, one after the other:
    # up stream master, down stream slave, AXI-MM master, axim2ram memory.
    up_stream = vpw.axis.Master("up_axis", 32, concat=2)
    vpw.register(up_stream)

    dn_stream = vpw.axis.Slave("dn_axis", 32, concat=2)
    vpw.register(dn_stream)

    axim = vpw.axim.Master("axim", 128, 16)
    vpw.register(axim)

    ram = vpw.axim2ram.Memory("axim2ram", 128, 16)
    vpw.register(ram)

    # ------------------------------------------------------------------
    # test AXI-Streaming interface
    # ------------------------------------------------------------------
    first_payload = list(range(1, 17))    # 1..16
    second_payload = list(range(17, 25))  # 17..24

    up_stream.send(first_payload, position=0)
    up_stream.send(second_payload, position=1)

    # Raise ready on both down stream positions before letting time pass.
    for lane in (0, 1):
        dn_stream.ready(True, position=lane)

    vpw.idle(100)

    print("First stream received")
    for word in dn_stream.recv(position=0):
        print(f"stream 1: {word}")

    print("Second stream received")
    for word in dn_stream.recv(position=1):
        print(f"stream 2: {word}")

    # Randomly toggle the down stream ready each tick until something shows
    # up in the position-0 receive queue.
    print("Intermittent ready on down stream receive")
    up_stream.send(list(range(1, 11)), position=0)
    while len(dn_stream.queue[0]) == 0:
        accept = bool(random.getrandbits(1))
        dn_stream.ready(accept)
        vpw.tick()

    for word in dn_stream.recv(position=0):
        print(f"intermittent 0: {word}")

    vpw.idle(100)

    # Randomly pause the up stream (position 1) each tick until something
    # shows up in the position-1 receive queue.
    print("Intermittent valid on up stream send")
    up_stream.send(list(range(1, 11)), position=1)
    while len(dn_stream.queue[1]) == 0:
        hold = bool(random.getrandbits(1))
        up_stream.pause(hold, 1)
        vpw.tick()

    for word in dn_stream.recv(position=1):
        print(f"intermittent 1: {word}")

    # ------------------------------------------------------------------
    # test AXI-MM interface
    # ------------------------------------------------------------------
    axim.send_write(0, list(range(1, 129)))

    vpw.idle(1000)

    axim.send_read(0, 128)

    # Tick until recv_read() hands back a non-empty result.
    burst = None
    while not burst:
        vpw.tick()
        burst = axim.recv_read()

    print(burst)
    # The values 1..128 were written, so beat k (1-based) must equal k.
    for expected, beat in enumerate(burst, start=1):
        assert expected == beat

    vpw.idle(10)

    # ------------------------------------------------------------------
    # test AXI-MM interface with large write/read pair
    # ------------------------------------------------------------------
    payload = list(range(1, 513))

    # NOTE(review): vpw.tick is handed over as a callable, presumably so
    # write()/read() can advance the simulation themselves -- confirm
    # against the vpw.axim API.
    axim.write(vpw.tick, 256, payload, 1)
    read_back = axim.read(vpw.tick, 256, len(payload) * 128 // 8, 1)

    for sent, got in zip(payload, read_back):
        assert sent == got

    vpw.idle(100)

    vpw.finish()