@@ -47,6 +47,7 @@ struct RxWorkerParams {
47
47
int port;
48
48
int queue;
49
49
int num_segs;
50
+ uint64_t timeout_us;
50
51
uint32_t batch_size;
51
52
struct rte_ring * ring;
52
53
struct rte_mempool * flowid_pool;
@@ -1199,6 +1200,7 @@ void DpdkMgr::run() {
1199
1200
params->flowid_pool = rx_flow_id_buffer;
1200
1201
params->meta_pool = rx_meta;
1201
1202
params->batch_size = q.common_ .batch_size_ ;
1203
+ params->timeout_us = q.timeout_us_ ;
1202
1204
rte_eal_remote_launch (
1203
1205
rx_worker, (void *)params, strtol (q.common_ .cpu_core_ .c_str (), NULL , 10 ));
1204
1206
}
@@ -1237,6 +1239,30 @@ void DpdkMgr::flush_packets(int port) {
1237
1239
while (rte_eth_rx_burst (port, 0 , &rx_mbuf, 1 ) != 0 ) { rte_pktmbuf_free (rx_mbuf); }
1238
1240
}
1239
1241
1242
+ inline uint64_t DpdkMgr::get_tsc_hz () {
1243
+ #define CYC_PER_1MHZ 1E6
1244
+ /* Use the generic counter ticks to calculate the PMU
1245
+ * cycle frequency.
1246
+ */
1247
+ uint64_t ticks;
1248
+ uint64_t start_ticks, cur_ticks;
1249
+ uint64_t start_pmu_cycles, end_pmu_cycles;
1250
+
1251
+ /* Number of ticks for 1/10 second */
1252
+ ticks = __rte_arm64_cntfrq () / 10 ;
1253
+
1254
+ start_ticks = __rte_arm64_cntvct_precise ();
1255
+ start_pmu_cycles = rte_rdtsc_precise ();
1256
+ do {
1257
+ cur_ticks = __rte_arm64_cntvct ();
1258
+ } while ((cur_ticks - start_ticks) < ticks);
1259
+ end_pmu_cycles = rte_rdtsc_precise ();
1260
+
1261
+ /* Adjust the cycles to next 1Mhz */
1262
+ return RTE_ALIGN_MUL_CEIL (end_pmu_cycles - start_pmu_cycles,
1263
+ CYC_PER_1MHZ) * 10 ;
1264
+ }
1265
+
1240
1266
// //////////////////////////////////////////////////////////////////////////////
1241
1267
// /
1242
1268
// / \brief
@@ -1246,9 +1272,11 @@ int DpdkMgr::rx_core_worker(void* arg) {
1246
1272
RxWorkerParams* tparams = (RxWorkerParams*)arg;
1247
1273
struct rte_mbuf * rx_mbufs[DEFAULT_NUM_RX_BURST];
1248
1274
int ret = 0 ;
1249
- uint64_t freq = rte_get_tsc_hz ();
1250
- uint64_t timeout_ticks = freq * 0.02 ; // expect all packets within 20ms
1251
1275
1276
+ // In the future we may want to periodically update this if the CPU clock drifts
1277
+ uint64_t freq = rte_get_tsc_hz ();
1278
+ uint64_t timeout_cycles = freq * (tparams->timeout_us /1e6 );
1279
+ uint64_t last_cycles = rte_get_tsc_cycles ();
1252
1280
uint64_t total_pkts = 0 ;
1253
1281
1254
1282
flush_packets (tparams->port );
@@ -1336,7 +1364,21 @@ int DpdkMgr::rx_core_worker(void* arg) {
1336
1364
reinterpret_cast <rte_mbuf**>(&mbuf_arr[0 ]),
1337
1365
DEFAULT_NUM_RX_BURST);
1338
1366
1339
- if (nb_rx == 0 ) { continue ; }
1367
+ if (nb_rx == 0 ) {
1368
+ if (burst->hdr .hdr .num_pkts > 0 && timeout_cycles > 0 ) {
1369
+ const auto cur_cycles = rte_get_tsc_cycles ();
1370
+
1371
+ // We hit our timeout. Send the partial batch immediately
1372
+ if ((cur_cycles - last_cycles) > timeout_cycles) {
1373
+ cur_pkt_in_batch = 0 ;
1374
+ rte_ring_enqueue (tparams->ring , reinterpret_cast <void *>(burst));
1375
+ last_cycles = cur_cycles;
1376
+ break ;
1377
+ }
1378
+ }
1379
+
1380
+ continue ;
1381
+ }
1340
1382
1341
1383
to_copy = std::min (nb_rx, (int )(tparams->batch_size - burst->hdr .hdr .num_pkts ));
1342
1384
memcpy (&burst->pkts [0 ][burst->hdr .hdr .num_pkts ], &mbuf_arr, sizeof (rte_mbuf*) * to_copy);
@@ -1366,10 +1408,22 @@ int DpdkMgr::rx_core_worker(void* arg) {
1366
1408
nb_rx -= to_copy;
1367
1409
1368
1410
if (burst->hdr .hdr .num_pkts == tparams->batch_size ) {
1369
- cur_pkt_in_batch = 0 ;
1370
1411
rte_ring_enqueue (tparams->ring , reinterpret_cast <void *>(burst));
1412
+ cur_pkt_in_batch = 0 ;
1413
+ last_cycles = rte_get_tsc_cycles ();
1371
1414
break ;
1372
1415
}
1416
+ else if (timeout_cycles > 0 ) {
1417
+ const auto cur_cycles = rte_get_tsc_cycles ();
1418
+
1419
+ // We hit our timeout. Send the partial batch immediately
1420
+ if ((cur_cycles - last_cycles) > timeout_cycles) {
1421
+ rte_ring_enqueue (tparams->ring , reinterpret_cast <void *>(burst));
1422
+ cur_pkt_in_batch = 0 ;
1423
+ last_cycles = cur_cycles;
1424
+ break ;
1425
+ }
1426
+ }
1373
1427
} while (!force_quit.load ());
1374
1428
}
1375
1429
0 commit comments