Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/countersyncd/src/actor/counter_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use swss_common::{CxxString, DbConnector};
use tokio::{select, sync::mpsc::Receiver, time::interval};

use crate::message::saistats::SAIStatsMessage;
use crate::utilities::{record_comm_stats, ChannelLabel};
use crate::sai::{
SaiBufferPoolStat, SaiIngressPriorityGroupStat, SaiObjectType, SaiPortStat, SaiQueueStat,
};
Expand Down Expand Up @@ -176,6 +177,10 @@ impl CounterDBActor {
stats_msg = self.stats_receiver.recv() => {
match stats_msg {
Some(msg) => {
record_comm_stats(
ChannelLabel::IpfixToCounterDb,
self.stats_receiver.len(),
);
self.handle_stats_message(msg).await;
}
None => {
Expand Down
77 changes: 50 additions & 27 deletions crates/countersyncd/src/actor/data_netlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::super::message::{
buffer::SocketBufferMessage,
netlink::{NetlinkCommand, SocketConnect},
};
use crate::utilities::{format_hex_lines, record_comm_stats, ChannelLabel};
#[cfg(not(test))]
use super::netlink_utils;

Expand Down Expand Up @@ -397,7 +398,7 @@ impl DataNetlinkActor {
"Creating socket for family '{}' with group_id {}",
family, group_id
);

// Create a raw netlink socket using netlink-sys
let mut socket = match Socket::new(NETLINK_GENERIC) {
Ok(s) => s,
Expand Down Expand Up @@ -508,7 +509,7 @@ impl DataNetlinkActor {
"Creating socket for family '{}' with group_id {}",
family, group_id
);

// Create a raw netlink socket
let mut socket = match Socket::new(NETLINK_GENERIC) {
Ok(s) => s,
Expand Down Expand Up @@ -662,7 +663,6 @@ impl DataNetlinkActor {

// Try to receive with non-blocking mode (socket should already be set to non-blocking)
debug!("Attempting to receive netlink message...");

#[cfg(not(test))]
let result = {
let fd = socket.as_raw_fd();
Expand Down Expand Up @@ -699,10 +699,18 @@ impl DataNetlinkActor {
// Resize buffer to actual received size
buffer.resize(size, 0);

if log::log_enabled!(log::Level::Debug) {
let hex_dump = format_hex_lines(&buffer);
debug!(
"Raw netlink recv buffer ({} bytes):\n{}",
size, hex_dump
);
}

// Parse buffer which may contain multiple messages and/or incomplete messages
let messages = message_parser.parse_buffer(&buffer)?;
debug!("Parsed {} complete messages from {} bytes of data", messages.len(), size);

Ok(messages)
}
size if size == 0 => {
Expand Down Expand Up @@ -802,6 +810,10 @@ impl DataNetlinkActor {

// Check for pending commands first (non-blocking)
if let Ok(command) = actor.command_recipient.try_recv() {
record_comm_stats(
ChannelLabel::ControlNetlinkToDataNetlink,
actor.command_recipient.len(),
);
match command {
NetlinkCommand::SocketConnect(SocketConnect { family, group }) => {
actor.reset(&family, &group);
Expand Down Expand Up @@ -831,19 +843,30 @@ impl DataNetlinkActor {
Ok(messages) => {
consecutive_failures = 0; // Reset failure counter on successful receive
actor.last_data_time = Instant::now(); // Update data reception timestamp



if messages.is_empty() {
debug!("Received data but no complete messages yet (partial message)");
} else {
debug!("Successfully parsed {} complete netlink messages", messages.len());

// Send each complete netlink message individually to all recipients
// This ensures each IPFIX message (contained in one netlink message)
// is sent as a separate operation to the downstream actors
for (i, message) in messages.iter().enumerate() {
if log::log_enabled!(log::Level::Debug) {
let hex_dump = format_hex_lines(message.as_ref());
debug!(
"Outgoing netlink payload {}/{} ({} bytes):\n{}",
i + 1,
messages.len(),
message.len(),
hex_dump
);
}
debug!("Processing netlink message {}/{}: {} bytes",
i + 1, messages.len(), message.len());

// Send this single netlink message to all recipients
for (j, recipient) in actor.buffer_recipients.iter().enumerate() {
debug!("Sending netlink message {}/{} to recipient {}",
Expand All @@ -858,7 +881,7 @@ impl DataNetlinkActor {
}
}
}

debug!("Completed processing {} netlink messages, each sent individually", messages.len());
}
}
Expand Down Expand Up @@ -1163,13 +1186,13 @@ pub mod test {
let mock_msg = create_mock_netlink_message(b"TEST_PAYLOAD");
let actual_len = 20 + b"TEST_PAYLOAD".len(); // 16 (nlmsg) + 4 (genl) + payload
let mut parser = NetlinkMessageParser::new();

let result = parser.parse_buffer(&mock_msg[..actual_len]);
assert!(result.is_ok());

let messages = result.unwrap();
assert_eq!(messages.len(), 1);

let payload = &messages[0];
let payload_str = String::from_utf8(payload.to_vec()).unwrap();
assert_eq!(payload_str, "TEST_PAYLOAD");
Expand Down Expand Up @@ -1200,7 +1223,7 @@ pub mod test {

let result = parser.parse_buffer(&buffer);
assert!(result.is_ok());

// Should have no complete messages due to insufficient data
let messages = result.unwrap();
assert!(messages.is_empty());
Expand All @@ -1210,24 +1233,24 @@ pub mod test {
#[test]
fn test_multiple_messages_in_buffer() {
let mut combined_buffer = Vec::new();

// Create two messages
let msg1 = create_mock_netlink_message(b"MESSAGE1");
let msg1_len = 20 + b"MESSAGE1".len();
let msg2 = create_mock_netlink_message(b"MESSAGE2");
let msg2_len = 20 + b"MESSAGE2".len();

// Combine them in one buffer (simulate receiving multiple messages in one recv)
combined_buffer.extend_from_slice(&msg1[..msg1_len]);
combined_buffer.extend_from_slice(&msg2[..msg2_len]);

let mut parser = NetlinkMessageParser::new();
let result = parser.parse_buffer(&combined_buffer);
assert!(result.is_ok());

let messages = result.unwrap();
assert_eq!(messages.len(), 2);

let payload1_str = String::from_utf8(messages[0].to_vec()).unwrap();
let payload2_str = String::from_utf8(messages[1].to_vec()).unwrap();
assert_eq!(payload1_str, "MESSAGE1");
Expand All @@ -1240,21 +1263,21 @@ pub mod test {
let msg = create_mock_netlink_message(b"FRAGMENTED_MESSAGE");
let msg_len = 20 + b"FRAGMENTED_MESSAGE".len();
let mut parser = NetlinkMessageParser::new();

// Simulate first recv getting only part of the message
let first_part = &msg[..15]; // Less than header size
let result1 = parser.parse_buffer(first_part);
assert!(result1.is_ok());
let messages1 = result1.unwrap();
assert!(messages1.is_empty()); // No complete messages yet

// Simulate second recv getting the rest
let second_part = &msg[15..msg_len];
let result2 = parser.parse_buffer(second_part);
assert!(result2.is_ok());
let messages2 = result2.unwrap();
assert_eq!(messages2.len(), 1);

let payload_str = String::from_utf8(messages2[0].to_vec()).unwrap();
assert_eq!(payload_str, "FRAGMENTED_MESSAGE");
}
Expand All @@ -1263,35 +1286,35 @@ pub mod test {
#[test]
fn test_mixed_complete_and_partial() {
let mut combined_buffer = Vec::new();

// First complete message
let msg1 = create_mock_netlink_message(b"COMPLETE");
let msg1_len = 20 + b"COMPLETE".len();
combined_buffer.extend_from_slice(&msg1[..msg1_len]);

// Partial second message
let msg2 = create_mock_netlink_message(b"PARTIAL_MSG");
let msg2_len = 20 + b"PARTIAL_MSG".len();
combined_buffer.extend_from_slice(&msg2[..25]); // Only part of second message

let mut parser = NetlinkMessageParser::new();
let result1 = parser.parse_buffer(&combined_buffer);
assert!(result1.is_ok());

let messages1 = result1.unwrap();
assert_eq!(messages1.len(), 1); // Only first complete message

let payload1_str = String::from_utf8(messages1[0].to_vec()).unwrap();
assert_eq!(payload1_str, "COMPLETE");

// Send remaining part of second message
let remaining_part = &msg2[25..msg2_len];
let result2 = parser.parse_buffer(remaining_part);
assert!(result2.is_ok());

let messages2 = result2.unwrap();
assert_eq!(messages2.len(), 1); // Second message now complete

let payload2_str = String::from_utf8(messages2[0].to_vec()).unwrap();
assert_eq!(payload2_str, "PARTIAL_MSG");
}
Expand Down
39 changes: 24 additions & 15 deletions crates/countersyncd/src/actor/ipfix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::super::message::{
ipfix::IPFixTemplatesMessage,
saistats::{SAIStat, SAIStats, SAIStatsMessage},
};
use crate::utilities::{record_comm_stats, ChannelLabel};

/// Helper functions for debug logging formatting
impl IpfixActor {
Expand Down Expand Up @@ -114,25 +115,25 @@ impl IpfixActor {
/// Formatted string representation of the sets within the message
fn format_ipfix_sets_for_debug(message_data: &[u8]) -> String {
let mut result = String::new();

// Skip IPFIX message header (16 bytes) to get to sets
if message_data.len() < 16 {
result.push_str(" Error: Message too short for IPFIX header\n");
return result;
}

let mut offset = 16; // Start after IPFIX header
let mut set_count = 0;

result.push_str(" Sets within message:\n");

while offset + 4 <= message_data.len() {
// Each set starts with 4-byte header: set_id (2 bytes) + length (2 bytes)
let set_id = NetworkEndian::read_u16(&message_data[offset..offset + 2]);
let set_length = NetworkEndian::read_u16(&message_data[offset + 2..offset + 4]);

set_count += 1;

// Validate set length
if set_length < 4 {
result.push_str(&format!(
Expand All @@ -141,15 +142,15 @@ impl IpfixActor {
));
break;
}

if offset + set_length as usize > message_data.len() {
result.push_str(&format!(
" Set {}: TRUNCATED (set_id={}, length={}, exceeds message boundary)\n",
set_count, set_id, set_length
));
break;
}

// Determine set type based on set_id
let set_type = if set_id == 2 {
"Template Set"
Expand All @@ -160,12 +161,12 @@ impl IpfixActor {
} else {
"Reserved/Unknown"
};

result.push_str(&format!(
" Set {} (offset: {}, set_id: {}, length: {} bytes, type: {})\n",
set_count, offset, set_id, set_length, set_type
));

// For data sets, show complete structure info
if set_id >= 256 && set_length > 4 {
let data_length = set_length as usize - 4; // Exclude 4-byte set header
Expand All @@ -174,7 +175,7 @@ impl IpfixActor {
" Data payload: {} bytes",
data_length
));

// Show complete data payload
if data_length > 0 {
let data_bytes = &message_data[data_start..data_start + data_length];
Expand All @@ -183,7 +184,7 @@ impl IpfixActor {
.map(|b| format!("{:02x}", b))
.collect::<Vec<_>>()
.join(" ");

// Format with line breaks for better readability if data is long
if data_length <= 32 {
// Short data on single line
Expand All @@ -208,17 +209,17 @@ impl IpfixActor {
result.push_str("\n");
}
}

// Move to next set
offset += set_length as usize;
}

if set_count == 0 {
result.push_str(" No valid sets found\n");
} else {
result.push_str(&format!(" Total sets: {}\n", set_count));
}

result
}

Expand Down Expand Up @@ -921,6 +922,10 @@ impl IpfixActor {
templates = actor.template_recipient.recv() => {
match templates {
Some(templates) => {
record_comm_stats(
ChannelLabel::SwssToIpfixTemplates,
actor.template_recipient.len(),
);
actor.handle_template(templates);
},
None => {
Expand All @@ -931,6 +936,10 @@ impl IpfixActor {
record = actor.record_recipient.recv() => {
match record {
Some(record) => {
record_comm_stats(
ChannelLabel::DataNetlinkToIpfixRecords,
actor.record_recipient.len(),
);
let messages = actor.handle_record(record);
for recipient in &actor.saistats_recipients {
for message in &messages {
Expand Down
Loading
Loading