Skip to content

Commit

Permalink
flowprobe: fix flush callbacks when multiple workers
Browse files Browse the repository at this point in the history
IPFIX buffers are stored on a per worker thread basis. Currently, the
flush callbacks flush only the buffers stored for the main thread.
Buffers for worker threads are not sent until their size reaches the
path MTU configured for the exporter. So if traffic is constant, the
problem is unlikely to be visible: buffers are sent once they reach
the maximum size. However, if traffic stops at some point and a flush
is triggered in order to make the plugin send all currently buffered
data, this does not happen, and collectors do not receive that data.
The plugin keeps the remaining data until traffic starts again and the
buffers reach the maximum size, at which point they are sent.

With this fix, flush buffers for worker threads and for the main thread
when the flush callbacks are triggered.

This allows removing @tag_fixme_vpp_workers from the unit tests
that don't set timers. The tests that set timers will still fail
because of other multi-worker related problems.

Type: fix
Change-Id: I9a7d9cef8ddbec7ee68c79309e48e7bc0953d488
Signed-off-by: Alexander Chernavin <achernavin@netgate.com>
(cherry picked from commit 4c7305f)
  • Loading branch information
achernavin22 authored and askorichenko committed Mar 12, 2024
1 parent a541cfd commit 500ac05
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 2 deletions.
86 changes: 86 additions & 0 deletions src/plugins/flowprobe/node.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ vlib_node_registration_t flowprobe_input_l2_node;
vlib_node_registration_t flowprobe_output_ip4_node;
vlib_node_registration_t flowprobe_output_ip6_node;
vlib_node_registration_t flowprobe_output_l2_node;
/* Flush nodes: the flush callbacks set their interrupt pending on worker
 * threads (see flowprobe_flush_callback_ip4/ip6/l2). */
vlib_node_registration_t flowprobe_flush_ip4_node;
vlib_node_registration_t flowprobe_flush_ip6_node;
vlib_node_registration_t flowprobe_flush_l2_node;

/* No counters at the moment */
#define foreach_flowprobe_error \
Expand Down Expand Up @@ -945,18 +948,57 @@ flush_record (flowprobe_variant_t which)
/*
 * Flush callback for the IP4 flow variant.
 *
 * Sets the "flowprobe-flush-ip4" node interrupt pending on every thread
 * with index >= 1 (the worker threads), then flushes the IP4 record
 * directly for the main thread.
 */
void
flowprobe_flush_callback_ip4 (void)
{
  u32 thread_index;
  vlib_main_t *thread_vm;

  /* Workers: request a flush by raising the flush node interrupt */
  for (thread_index = 1; thread_index < vlib_get_n_threads (); thread_index++)
    {
      thread_vm = vlib_get_main_by_index (thread_index);

      /* No vlib main for this index: nothing to signal */
      if (!thread_vm)
        continue;

      vlib_node_set_interrupt_pending (thread_vm,
                                       flowprobe_flush_ip4_node.index);
    }

  /* Main thread: flush right here */
  flush_record (FLOW_VARIANT_IP4);
}

/*
 * Flush callback for the IP6 flow variant.
 *
 * Sets the "flowprobe-flush-ip6" node interrupt pending on every thread
 * with index >= 1 (the worker threads), then flushes the IP6 record
 * directly for the main thread.
 */
void
flowprobe_flush_callback_ip6 (void)
{
  u32 thread_index;
  vlib_main_t *thread_vm;

  /* Workers: request a flush by raising the flush node interrupt */
  for (thread_index = 1; thread_index < vlib_get_n_threads (); thread_index++)
    {
      thread_vm = vlib_get_main_by_index (thread_index);

      /* No vlib main for this index: nothing to signal */
      if (!thread_vm)
        continue;

      vlib_node_set_interrupt_pending (thread_vm,
                                       flowprobe_flush_ip6_node.index);
    }

  /* Main thread: flush right here */
  flush_record (FLOW_VARIANT_IP6);
}

void
flowprobe_flush_callback_l2 (void)
{
vlib_main_t *worker_vm;
u32 i;

/* Flush for each worker thread */
for (i = 1; i < vlib_get_n_threads (); i++)
{
worker_vm = vlib_get_main_by_index (i);
if (worker_vm)
vlib_node_set_interrupt_pending (worker_vm,
flowprobe_flush_l2_node.index);
}

/* Flush for the main thread */
flush_record (FLOW_VARIANT_L2);
flush_record (FLOW_VARIANT_L2_IP4);
flush_record (FLOW_VARIANT_L2_IP6);
Expand Down Expand Up @@ -1062,6 +1104,32 @@ flowprobe_walker_process (vlib_main_t * vm,
return 0;
}

/*
 * Node function of "flowprobe-flush-ip4".
 *
 * Flushes the IP4 record when the node is dispatched. The vm, rt and f
 * arguments are not used. Always returns 0.
 */
static uword
flowprobe_flush_ip4 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
{
  const uword rv = 0;

  flush_record (FLOW_VARIANT_IP4);
  return rv;
}

/*
 * Node function of "flowprobe-flush-ip6".
 *
 * Flushes the IP6 record when the node is dispatched. The vm, rt and f
 * arguments are not used. Always returns 0.
 */
static uword
flowprobe_flush_ip6 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
{
  const uword rv = 0;

  flush_record (FLOW_VARIANT_IP6);
  return rv;
}

/*
 * Node function of "flowprobe-flush-l2".
 *
 * Flushes the records of the three L2 variants (L2, L2_IP4, L2_IP6), in
 * that order, when the node is dispatched. The vm, rt and f arguments are
 * not used. Always returns 0.
 */
static uword
flowprobe_flush_l2 (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f)
{
  const uword rv = 0;

  /* Plain L2 first, then the L2 variants carrying IP4 / IP6 */
  flush_record (FLOW_VARIANT_L2);
  flush_record (FLOW_VARIANT_L2_IP4);
  flush_record (FLOW_VARIANT_L2_IP6);
  return rv;
}

/* *INDENT-OFF* */
VLIB_REGISTER_NODE (flowprobe_input_ip4_node) = {
.function = flowprobe_input_ip4_node_fn,
Expand Down Expand Up @@ -1135,6 +1203,24 @@ VLIB_REGISTER_NODE (flowprobe_walker_node) = {
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_INTERRUPT,
};
VLIB_REGISTER_NODE (flowprobe_flush_ip4_node) = {
.function = flowprobe_flush_ip4,
.name = "flowprobe-flush-ip4",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_INTERRUPT,
};
VLIB_REGISTER_NODE (flowprobe_flush_ip6_node) = {
.function = flowprobe_flush_ip6,
.name = "flowprobe-flush-ip6",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_INTERRUPT,
};
VLIB_REGISTER_NODE (flowprobe_flush_l2_node) = {
.function = flowprobe_flush_l2,
.name = "flowprobe-flush-l2",
.type = VLIB_NODE_TYPE_INPUT,
.state = VLIB_NODE_STATE_INTERRUPT,
};
/* *INDENT-ON* */

/*
Expand Down
2 changes: 0 additions & 2 deletions test/test_flowprobe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,6 @@ def test_0002(self):
self.logger.info("FFP_TEST_FINISH_0002")


@tag_fixme_vpp_workers
class DatapathTx(MethodHolder, DatapathTestsHolder):
"""Collect info on Ethernet, IP4 and IP6 datapath (TX) (no timers)"""

Expand Down Expand Up @@ -1309,7 +1308,6 @@ def test_rewritten_traffic(self):
ipfix.remove_vpp_config()


@tag_fixme_vpp_workers
class DatapathRx(MethodHolder, DatapathTestsHolder):
"""Collect info on Ethernet, IP4 and IP6 datapath (RX) (no timers)"""

Expand Down

0 comments on commit 500ac05

Please sign in to comment.