Added per-queue timeout value (#640)
* Added per-queue timeout value

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>

* lint fixes

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>

* Update README

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>

* Remove prototype

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>

* lint fixes

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>

---------

Signed-off-by: Cliff Burdick <cburdick@nvidia.com>
Co-authored-by: Tom Birdsong <40648863+tbirdso@users.noreply.github.com>
cliffburdick and tbirdso authored Jan 6, 2025
1 parent 9ac961e commit 872d865
Showing 4 changed files with 42 additions and 4 deletions.
2 changes: 2 additions & 0 deletions operators/advanced_network/README.md
@@ -297,6 +297,8 @@ Too low means risk of dropped packets from NIC having nowhere to write (Rx) or h
- type: `string`
- **`memory_regions`**: List of memory regions where buffers are stored. Memory region names are configured in the [Memory Regions](#memory-regions) section
- type: `list`
- **`timeout_us`**: Timeout in microseconds after which a partially filled batch is delivered even if fewer than `batch_size` packets have arrived (0 disables the timeout); see the example configuration after this hunk
- type: `integer`

- **`flows`**: List of flows - rules to apply to packets, mostly to divert to the right queue. (<mark>Not in use for Rivermax manager</mark>)
- type: `list`
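For reference, a minimal sketch of a receive-queue entry using the new `timeout_us` key. Only `memory_regions` and `timeout_us` come from this diff; the other keys and all values are illustrative assumptions about the surrounding queue layout, not part of this commit.

```yaml
queues:
  - name: "rx_q0"            # illustrative queue entry
    id: 0
    cpu_core: "8"            # worker core, as a string
    batch_size: 10240        # packets per batch handed to the operator
    memory_regions:          # names defined in the Memory Regions section
      - "data_rx_gpu"
    timeout_us: 500          # flush a partial batch after 500 µs (0 = never)
```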
5 changes: 5 additions & 0 deletions operators/advanced_network/adv_network_common.h
@@ -557,6 +557,11 @@ struct YAML::convert<holoscan::ops::AdvNetConfigYaml> {
HOLOSCAN_LOG_ERROR("Failed to parse RxQueueConfig");
return false;
}

try {
  // timeout_us is optional; fall back to 0 (timeout disabled) when the key is absent
  q.timeout_us_ = q_item["timeout_us"].as<uint64_t>();
} catch (const std::exception& e) { q.timeout_us_ = 0; }

rx_cfg.queues_.emplace_back(std::move(q));
}

1 change: 1 addition & 0 deletions operators/advanced_network/adv_network_types.h
@@ -331,6 +331,7 @@ struct MemoryRegion {

struct RxQueueConfig {
CommonQueueConfig common_;
uint64_t timeout_us_;  // Flush a partial batch after this many microseconds (0 = disabled)
std::string output_port_;
};

38 changes: 34 additions & 4 deletions operators/advanced_network/managers/dpdk/adv_network_dpdk_mgr.cpp
@@ -47,6 +47,7 @@ struct RxWorkerParams {
int port;
int queue;
int num_segs;
uint64_t timeout_us;
uint32_t batch_size;
struct rte_ring* ring;
struct rte_mempool* flowid_pool;
@@ -1199,6 +1200,7 @@ void DpdkMgr::run() {
params->flowid_pool = rx_flow_id_buffer;
params->meta_pool = rx_meta;
params->batch_size = q.common_.batch_size_;
params->timeout_us = q.timeout_us_;
rte_eal_remote_launch(
rx_worker, (void*)params, strtol(q.common_.cpu_core_.c_str(), NULL, 10));
}
@@ -1237,6 +1239,7 @@ void DpdkMgr::flush_packets(int port) {
while (rte_eth_rx_burst(port, 0, &rx_mbuf, 1) != 0) { rte_pktmbuf_free(rx_mbuf); }
}


////////////////////////////////////////////////////////////////////////////////
///
/// \brief
@@ -1246,9 +1249,11 @@ int DpdkMgr::rx_core_worker(void* arg) {
RxWorkerParams* tparams = (RxWorkerParams*)arg;
struct rte_mbuf* rx_mbufs[DEFAULT_NUM_RX_BURST];
int ret = 0;
uint64_t freq = rte_get_tsc_hz();
uint64_t timeout_ticks = freq * 0.02; // expect all packets within 20ms

// In the future we may want to periodically update this if the CPU clock drifts
uint64_t freq = rte_get_tsc_hz();
uint64_t timeout_cycles = freq * (tparams->timeout_us/1e6);
uint64_t last_cycles = rte_get_tsc_cycles();
uint64_t total_pkts = 0;

flush_packets(tparams->port);
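
As a side note, here is a standalone sketch of the microseconds-to-TSC-cycles conversion performed above, using integer arithmetic instead of the `1e6` double division. The helper name is hypothetical; only `rte_get_tsc_hz()`, already used in this file, is assumed from DPDK.

```cpp
#include <cstdint>
#include <rte_cycles.h>  // rte_get_tsc_hz()

// Hypothetical helper: convert a per-queue timeout in microseconds to TSC cycles.
// Returns 0 when timeout_us is 0, which the worker loop treats as "timeout disabled".
static inline uint64_t timeout_us_to_tsc_cycles(uint64_t timeout_us) {
  const uint64_t hz = rte_get_tsc_hz();   // TSC ticks per second
  return (hz * timeout_us) / 1000000ULL;  // integer math; assumes hz * timeout_us fits in 64 bits
}
```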
@@ -1336,7 +1341,21 @@
reinterpret_cast<rte_mbuf**>(&mbuf_arr[0]),
DEFAULT_NUM_RX_BURST);

if (nb_rx == 0) { continue; }
if (nb_rx == 0) {
if (burst->hdr.hdr.num_pkts > 0 && timeout_cycles > 0) {
const auto cur_cycles = rte_get_tsc_cycles();

// We hit our timeout. Send the partial batch immediately
if ((cur_cycles - last_cycles) > timeout_cycles) {
cur_pkt_in_batch = 0;
rte_ring_enqueue(tparams->ring, reinterpret_cast<void*>(burst));
last_cycles = cur_cycles;
break;
}
}

continue;
}

to_copy = std::min(nb_rx, (int)(tparams->batch_size - burst->hdr.hdr.num_pkts));
memcpy(&burst->pkts[0][burst->hdr.hdr.num_pkts], &mbuf_arr, sizeof(rte_mbuf*) * to_copy);
@@ -1366,9 +1385,20 @@
nb_rx -= to_copy;

if (burst->hdr.hdr.num_pkts == tparams->batch_size) {
cur_pkt_in_batch = 0;
rte_ring_enqueue(tparams->ring, reinterpret_cast<void*>(burst));
cur_pkt_in_batch = 0;
last_cycles = rte_get_tsc_cycles();
break;
} else if (timeout_cycles > 0) {
const auto cur_cycles = rte_get_tsc_cycles();

// We hit our timeout. Send the partial batch immediately
if ((cur_cycles - last_cycles) > timeout_cycles) {
rte_ring_enqueue(tparams->ring, reinterpret_cast<void*>(burst));
cur_pkt_in_batch = 0;
last_cycles = cur_cycles;
break;
}
}
} while (!force_quit.load());
}
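
The flush-on-timeout check now appears twice in `rx_core_worker` (the empty-burst path and the partially filled path). Below is a sketch of how that shared condition could be expressed as a helper; the function and its signature are hypothetical, not part of this commit.

```cpp
#include <cstdint>

// Hypothetical helper: true when a timeout is configured, at least one packet is
// buffered, and more than timeout_cycles TSC ticks have elapsed since the last flush.
static inline bool should_flush_partial_batch(uint64_t num_pkts, uint64_t timeout_cycles,
                                              uint64_t last_cycles, uint64_t cur_cycles) {
  return timeout_cycles > 0 && num_pkts > 0 && (cur_cycles - last_cycles) > timeout_cycles;
}
```

Either call site would then enqueue the burst, reset `cur_pkt_in_batch`, and update `last_cycles`, exactly as the inline checks above do.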
