forked from p4lang/p4app-switchML
-
Notifications
You must be signed in to change notification settings - Fork 0
/
next_step_selector.py
343 lines (291 loc) · 15.3 KB
/
next_step_selector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Copyright 2021 Intel-KAUST-Microsoft
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from control import Control
from common import PacketSize, PacketType
from bfrt_grpc.client import BfruntimeRpcException
from enum import IntEnum
class Flag(IntEnum):
''' First-last flag '''
FIRST = 0x0
LAST = 0x1
class NextStepSelector(Control):
def __init__(self, target, gc, bfrt_info, folded_pipe):
# Set up base class
super(NextStepSelector, self).__init__(target, gc)
self.log = logging.getLogger(__name__)
self.tables = [
bfrt_info.table_get('pipe.Ingress.next_step_selector.next_step')
]
self.table = self.tables[0]
self.folded_pipe = folded_pipe
# Counters
self.broadcast_counter = bfrt_info.table_get(
'pipe.Ingress.next_step_selector.broadcast_counter')
self.retransmit_counter = bfrt_info.table_get(
'pipe.Ingress.next_step_selector.retransmit_counter')
self.recirculate_counter = bfrt_info.table_get(
'pipe.Ingress.next_step_selector.recirculate_counter')
self.drop_counter = bfrt_info.table_get(
'pipe.Ingress.next_step_selector.drop_counter')
# Clear table and counters and add defaults
self._clear()
self.add_default_entries()
def _clear(self):
super(NextStepSelector, self)._clear()
self.reset_counters()
def add_default_entries(self):
''' Add default entries '''
# Special recirculation ports used for harvest passes
port = {1: 452, 2: 324, 3: 448, 4: 196, 5: 192, 6: 64, 7: 68}
# yapf: disable
entries = [
# packet_size | worker_id | packet_type | first_last_flag | bitmap_result | priority | action | port |
# 128B packets not supported at the moment
( PacketSize.MTU_128, None, None, None, None, 0, 'drop', None),
# 256B packets: Pipe 0 only at the moment
# Last packet -> recirculate for harvest
( PacketSize.MTU_256, None, PacketType.CONSUME0, Flag.LAST, None, 1, 'recirculate_for_HARVEST7', port[7]),
## Just consume a CONSUME packet if it is not the last and we haven't seen it before
( PacketSize.MTU_256, None, PacketType.CONSUME0, None, 0, 2, 'finish_consume', None),
## CONSUME packets that are retransmitted packets to a full slot -> recirculate for harvest
( PacketSize.MTU_256, None, PacketType.CONSUME0, Flag.FIRST, None, 3, 'recirculate_for_HARVEST7', port[7]),
## Drop others
( PacketSize.MTU_256, None, PacketType.CONSUME0, None, None, 4, 'drop', None),
## 512B packets not supported at the moment
( PacketSize.MTU_512, None, None, None, None, 5, 'drop', None)]
if self.folded_pipe:
## 1024B packets
entries.extend([
## Pipe 0, packet not seen before -> pipe 1 (load balance on worker id)
(PacketSize.MTU_1024, i, PacketType.CONSUME0, None, 0, 6, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
entries.extend([
## Pipe 0, retransmitted packet to a full slot -> pipe 1
## Run through the same path as novel packets (do not skip to harvest) to ensure ordering
(PacketSize.MTU_1024, i, PacketType.CONSUME0, Flag.FIRST, None, 7, 'recirculate_for_CONSUME1', (1 << 7) + i * 4)
for i in range(16)])
entries.extend([
## Drop other CONSUME0 packets
(PacketSize.MTU_1024, None, PacketType.CONSUME0, None, None, 8, 'drop', None),
## Pipe 1 -> pipe 2
(PacketSize.MTU_1024, None, PacketType.CONSUME1, None, None, 9, 'recirculate_for_CONSUME2_same_port_next_pipe', None),
## Pipe 2 -> pipe 3
(PacketSize.MTU_1024, None, PacketType.CONSUME2, None, None, 10, 'recirculate_for_CONSUME3_same_port_next_pipe', None),
## Pipe 4
## For CONSUME3 packets that are the last packet, recirculate for harvest
## The last pass is a combined consume/harvest pass, so skip directly to HARVEST1
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.LAST, None, 11, 'recirculate_for_HARVEST1', port[1]),
## Just consume any CONSUME3 packets if they're not last and we haven't seen them before
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, 0, 12, 'finish_consume', None),
## CONSUME3 packets that are retransmitted packets to a full slot -> recirculate for harvest
(PacketSize.MTU_1024, None, PacketType.CONSUME3, Flag.FIRST, None, 13, 'recirculate_for_HARVEST1', port[1]),
## Drop others
(PacketSize.MTU_1024, None, PacketType.CONSUME3, None, None, 14, 'drop', None),
## Harvesting 128B at a time
(PacketSize.MTU_1024, None, PacketType.HARVEST0, None, None, 15, 'recirculate_for_HARVEST1', port[1]),
(PacketSize.MTU_1024, None, PacketType.HARVEST1, None, None, 16, 'recirculate_for_HARVEST2', port[2]),
(PacketSize.MTU_1024, None, PacketType.HARVEST2, None, None, 17, 'recirculate_for_HARVEST3', port[3]),
(PacketSize.MTU_1024, None, PacketType.HARVEST3, None, None, 18, 'recirculate_for_HARVEST4', port[4]),
(PacketSize.MTU_1024, None, PacketType.HARVEST4, None, None, 19, 'recirculate_for_HARVEST5', port[5]),
(PacketSize.MTU_1024, None, PacketType.HARVEST5, None, None, 20, 'recirculate_for_HARVEST6', port[6]),
(PacketSize.MTU_1024, None, PacketType.HARVEST6, None, None, 21, 'recirculate_for_HARVEST7', port[7])])
entries.extend([
## Harvest pass 7: final pass
## Read final 128B and broadcast any HARVEST packets that are not
## retransmitted and are the last packet
( None, None, PacketType.HARVEST7, Flag.LAST, 0, 22, 'broadcast', None),
## First packet, not a retranmsmission
## (shouldn't ever get here, because the packet would be dropped in CONSUME)
( None, None, PacketType.HARVEST7, Flag.FIRST, 0, 23, 'drop', None),
## Retransmit any other HARVEST packets
( None, None, PacketType.HARVEST7, Flag.FIRST, None, 24, 'retransmit', None),
## Drop any other HARVEST packets
( None, None, PacketType.HARVEST7, None, None, 25, 'drop', None)
])
# yapf: enable
for e in entries:
success, error_msg = self.add_entry(
**{
'packet_size': e[0],
'worker_id': e[1],
'packet_type': e[2],
'first_last_flag': e[3],
'bitmap_result': e[4],
'priority': e[5],
'action': e[6],
'recirc_dev_port': e[7]
})
if not success:
self.log.critical(error_msg)
def add_entry(self,
action,
recirc_dev_port=None,
packet_size=None,
worker_id=None,
packet_type=None,
first_last_flag=None,
bitmap_result=None,
priority=0):
''' Add next step selector entry. Match arguments can be None, in which case
their mask will be zeroed.
Keyword arguments:
action -- action name
recirc_dev_port -- recirculation port to use for the next pass (if the action requires it)
packet_size -- packet MTU
worker_id -- worker rank, or 2-tuple with (rank,mask)
packet_type -- packet type
first_last_flag -- first or last packet: 1: last 0: first
bitmap_result -- result of bitmap check: 0: not a retransmission, not zero: retransmission
priority -- entry priority (default: 0 = highest)
Returns:
(success flag, None or error message)
'''
# Parameters validation
if packet_size != None and type(packet_size) != PacketSize:
error_msg = 'Invalid packet size {}'.format(packet_size)
self.log.error(error_msg)
return (False, error_msg)
if worker_id != None and worker_id >= 32:
error_msg = 'Worker ID {} too large; only 32 workers supported'.format(
worker_id)
self.log.error(error_msg)
return (False, error_msg)
if packet_type != None and type(packet_type) != PacketType:
error_msg = 'Invalid packet type {}'.format(packet_type)
self.log.error(error_msg)
return (False, error_msg)
if first_last_flag != None and type(first_last_flag) != Flag:
error_msg = 'Invalid first_last_flag type {}'.format(
first_last_flag)
self.log.error(error_msg)
return (False, error_msg)
actions_with_argument = [
'recirculate_for_CONSUME1', 'recirculate_for_HARVEST1',
'recirculate_for_HARVEST2', 'recirculate_for_HARVEST3',
'recirculate_for_HARVEST4', 'recirculate_for_HARVEST5',
'recirculate_for_HARVEST6', 'recirculate_for_HARVEST7'
]
actions_without_argument = [
'recirculate_for_CONSUME2_same_port_next_pipe',
'recirculate_for_CONSUME3_same_port_next_pipe', 'finish_consume',
'broadcast', 'retransmit', 'drop'
]
action_prefix = 'Ingress.next_step_selector.'
if action in actions_with_argument:
if recirc_dev_port == None:
error_msg = 'Missing recirculation port for action {}'.format(
action)
self.log.error(error_msg)
return (False, error_msg)
else:
data = self.table.make_data([
self.gc.DataTuple('recirc_port', recirc_dev_port),
], action_prefix + action)
elif action in actions_without_argument:
data = self.table.make_data([], action_prefix + action)
else:
error_msg = 'Invalid action {}'.format(action)
self.log.error(error_msg)
return (False, error_msg)
# Convert parameters to 2-tuple (value,mask)
packet_size = (0, 0) if packet_size == None else (packet_size,
0x7) # 3 bits
packet_type = (0, 0) if packet_type == None else (packet_type,
0xf) # 4 bits
first_last_flag = (0,
0) if first_last_flag == None else (first_last_flag,
0xff) # 8 bits
bitmap_result = (0, 0) if bitmap_result == None else (
bitmap_result, 0xffffffff) # 32 bits
if worker_id == None:
worker_id = (0, 0)
elif type(worker_id) != tuple:
worker_id = (worker_id, 0xffff) # 16 bits
# Add entry
self.table.entry_add(self.target, [
self.table.make_key([
self.gc.KeyTuple('ig_md.switchml_md.packet_size',
packet_size[0], packet_size[1]),
self.gc.KeyTuple('ig_md.switchml_md.worker_id', worker_id[0],
worker_id[1]),
self.gc.KeyTuple('ig_md.switchml_md.packet_type',
packet_type[0], packet_type[1]),
self.gc.KeyTuple('ig_md.switchml_md.first_last_flag',
first_last_flag[0], first_last_flag[1]),
self.gc.KeyTuple('ig_md.switchml_md.map_result',
bitmap_result[0], bitmap_result[1]),
self.gc.KeyTuple('$MATCH_PRIORITY', priority)
])
], [data])
self.log.debug('Next step entry: {}: packet_size {}, worker_id {}, '
'packet_type {}, first_last_flag {}, bitmap_result {}'
' -> {}({})'.format(priority, packet_size, worker_id,
packet_type, first_last_flag,
bitmap_result, action,
recirc_dev_port))
return (True, None)
def reset_counters(self):
''' Reset counters '''
# Reset indirect counters
self.broadcast_counter.entry_del(self.target)
self.retransmit_counter.entry_del(self.target)
self.recirculate_counter.entry_del(self.target)
self.drop_counter.entry_del(self.target)
def get_counters(self, start=0, count=None):
''' Get the current values of next step counters:
broadcasted, retransmitted, recirculated, dropped packets per slot index.
The parameters can limit the number of returned indices to
[start, start + count]. The default is [0, 8].
'''
if count == None:
count = 8
# Double start and count to get both sets
start = start * 2
count = count * 2
counters = {
'broadcast': self.broadcast_counter,
'retransmit': self.retransmit_counter,
'recirculate': self.recirculate_counter,
'drop': self.drop_counter
}
try:
stats = {}
for counter_name, counter in counters.items():
counter.operations_execute(self.target, 'Sync')
resp = counter.entry_get(self.target, [
counter.make_key([self.gc.KeyTuple('$COUNTER_INDEX', i)])
for i in range(start, start + count)
],
flags={'from_hw': False})
for v, k in resp:
v = v.to_dict()
k = k.to_dict()
pool_index = k['$COUNTER_INDEX']['value'] >> 1
pool_set = k['$COUNTER_INDEX']['value'] & 1
value = v['$COUNTER_SPEC_PKTS']
if ((pool_index, pool_set)) not in stats:
stats[pool_index, pool_set] = {}
stats[pool_index, pool_set][counter_name] = value
except BfruntimeRpcException as bfrte:
# Indices out of bound
self.log.debug(str(bfrte))
return []
# Linearize
values = []
for (pool_index, pool_set), data in stats.items():
data['index'] = pool_index
data['set'] = pool_set
values.append(data)
return values