diff --git a/crates/countersyncd/src/actor/counter_db.rs b/crates/countersyncd/src/actor/counter_db.rs index 5410d508b6b..1d327e9c6e1 100644 --- a/crates/countersyncd/src/actor/counter_db.rs +++ b/crates/countersyncd/src/actor/counter_db.rs @@ -6,6 +6,7 @@ use swss_common::{CxxString, DbConnector}; use tokio::{select, sync::mpsc::Receiver, time::interval}; use crate::message::saistats::SAIStatsMessage; +use crate::utilities::{record_comm_stats, ChannelLabel}; use crate::sai::{ SaiBufferPoolStat, SaiIngressPriorityGroupStat, SaiObjectType, SaiPortStat, SaiQueueStat, }; @@ -176,6 +177,10 @@ impl CounterDBActor { stats_msg = self.stats_receiver.recv() => { match stats_msg { Some(msg) => { + record_comm_stats( + ChannelLabel::IpfixToCounterDb, + self.stats_receiver.len(), + ); self.handle_stats_message(msg).await; } None => { diff --git a/crates/countersyncd/src/actor/data_netlink.rs b/crates/countersyncd/src/actor/data_netlink.rs index 5db7b9d83db..574e3aa1f7a 100644 --- a/crates/countersyncd/src/actor/data_netlink.rs +++ b/crates/countersyncd/src/actor/data_netlink.rs @@ -22,6 +22,7 @@ use super::super::message::{ buffer::SocketBufferMessage, netlink::{NetlinkCommand, SocketConnect}, }; +use crate::utilities::{format_hex_lines, record_comm_stats, ChannelLabel}; #[cfg(not(test))] use super::netlink_utils; @@ -397,7 +398,7 @@ impl DataNetlinkActor { "Creating socket for family '{}' with group_id {}", family, group_id ); - + // Create a raw netlink socket using netlink-sys let mut socket = match Socket::new(NETLINK_GENERIC) { Ok(s) => s, @@ -508,7 +509,7 @@ impl DataNetlinkActor { "Creating socket for family '{}' with group_id {}", family, group_id ); - + // Create a raw netlink socket let mut socket = match Socket::new(NETLINK_GENERIC) { Ok(s) => s, @@ -662,7 +663,6 @@ impl DataNetlinkActor { // Try to receive with non-blocking mode (socket should already be set to non-blocking) debug!("Attempting to receive netlink message..."); - #[cfg(not(test))] let result = { let fd = socket.as_raw_fd(); @@ -699,10 +699,18 @@ impl DataNetlinkActor { // Resize buffer to actual received size buffer.resize(size, 0); + if log::log_enabled!(log::Level::Debug) { + let hex_dump = format_hex_lines(&buffer); + debug!( + "Raw netlink recv buffer ({} bytes):\n{}", + size, hex_dump + ); + } + // Parse buffer which may contain multiple messages and/or incomplete messages let messages = message_parser.parse_buffer(&buffer)?; debug!("Parsed {} complete messages from {} bytes of data", messages.len(), size); - + Ok(messages) } size if size == 0 => { @@ -802,6 +810,10 @@ impl DataNetlinkActor { // Check for pending commands first (non-blocking) if let Ok(command) = actor.command_recipient.try_recv() { + record_comm_stats( + ChannelLabel::ControlNetlinkToDataNetlink, + actor.command_recipient.len(), + ); match command { NetlinkCommand::SocketConnect(SocketConnect { family, group }) => { actor.reset(&family, &group); @@ -831,19 +843,30 @@ impl DataNetlinkActor { Ok(messages) => { consecutive_failures = 0; // Reset failure counter on successful receive actor.last_data_time = Instant::now(); // Update data reception timestamp - + + if messages.is_empty() { debug!("Received data but no complete messages yet (partial message)"); } else { debug!("Successfully parsed {} complete netlink messages", messages.len()); - + // Send each complete netlink message individually to all recipients // This ensures each IPFIX message (contained in one netlink message) // is sent as a separate operation to the downstream actors for (i, message) in messages.iter().enumerate() { + if log::log_enabled!(log::Level::Debug) { + let hex_dump = format_hex_lines(message.as_ref()); + debug!( + "Outgoing netlink payload {}/{} ({} bytes):\n{}", + i + 1, + messages.len(), + message.len(), + hex_dump + ); + } debug!("Processing netlink message {}/{}: {} bytes", i + 1, messages.len(), message.len()); - + // Send this single netlink message to all recipients for (j, recipient) in actor.buffer_recipients.iter().enumerate() { debug!("Sending netlink message {}/{} to recipient {}", @@ -858,7 +881,7 @@ impl DataNetlinkActor { } } } - + debug!("Completed processing {} netlink messages, each sent individually", messages.len()); } } @@ -1163,13 +1186,13 @@ pub mod test { let mock_msg = create_mock_netlink_message(b"TEST_PAYLOAD"); let actual_len = 20 + b"TEST_PAYLOAD".len(); // 16 (nlmsg) + 4 (genl) + payload let mut parser = NetlinkMessageParser::new(); - + let result = parser.parse_buffer(&mock_msg[..actual_len]); assert!(result.is_ok()); let messages = result.unwrap(); assert_eq!(messages.len(), 1); - + let payload = &messages[0]; let payload_str = String::from_utf8(payload.to_vec()).unwrap(); assert_eq!(payload_str, "TEST_PAYLOAD"); @@ -1200,7 +1223,7 @@ pub mod test { let result = parser.parse_buffer(&buffer); assert!(result.is_ok()); - + // Should have no complete messages due to insufficient data let messages = result.unwrap(); assert!(messages.is_empty()); @@ -1210,24 +1233,24 @@ pub mod test { #[test] fn test_multiple_messages_in_buffer() { let mut combined_buffer = Vec::new(); - + // Create two messages let msg1 = create_mock_netlink_message(b"MESSAGE1"); let msg1_len = 20 + b"MESSAGE1".len(); let msg2 = create_mock_netlink_message(b"MESSAGE2"); let msg2_len = 20 + b"MESSAGE2".len(); - + // Combine them in one buffer (simulate receiving multiple messages in one recv) combined_buffer.extend_from_slice(&msg1[..msg1_len]); combined_buffer.extend_from_slice(&msg2[..msg2_len]); - + let mut parser = NetlinkMessageParser::new(); let result = parser.parse_buffer(&combined_buffer); assert!(result.is_ok()); - + let messages = result.unwrap(); assert_eq!(messages.len(), 2); - + let payload1_str = String::from_utf8(messages[0].to_vec()).unwrap(); let payload2_str = String::from_utf8(messages[1].to_vec()).unwrap(); assert_eq!(payload1_str, "MESSAGE1"); @@ -1240,21 +1263,21 @@ pub mod test { let msg = create_mock_netlink_message(b"FRAGMENTED_MESSAGE"); let msg_len = 20 + b"FRAGMENTED_MESSAGE".len(); let mut parser = NetlinkMessageParser::new(); - + // Simulate first recv getting only part of the message let first_part = &msg[..15]; // Less than header size let result1 = parser.parse_buffer(first_part); assert!(result1.is_ok()); let messages1 = result1.unwrap(); assert!(messages1.is_empty()); // No complete messages yet - + // Simulate second recv getting the rest let second_part = &msg[15..msg_len]; let result2 = parser.parse_buffer(second_part); assert!(result2.is_ok()); let messages2 = result2.unwrap(); assert_eq!(messages2.len(), 1); - + let payload_str = String::from_utf8(messages2[0].to_vec()).unwrap(); assert_eq!(payload_str, "FRAGMENTED_MESSAGE"); } @@ -1263,35 +1286,35 @@ pub mod test { #[test] fn test_mixed_complete_and_partial() { let mut combined_buffer = Vec::new(); - + // First complete message let msg1 = create_mock_netlink_message(b"COMPLETE"); let msg1_len = 20 + b"COMPLETE".len(); combined_buffer.extend_from_slice(&msg1[..msg1_len]); - + // Partial second message let msg2 = create_mock_netlink_message(b"PARTIAL_MSG"); let msg2_len = 20 + b"PARTIAL_MSG".len(); combined_buffer.extend_from_slice(&msg2[..25]); // Only part of second message - + let mut parser = NetlinkMessageParser::new(); let result1 = parser.parse_buffer(&combined_buffer); assert!(result1.is_ok()); - + let messages1 = result1.unwrap(); assert_eq!(messages1.len(), 1); // Only first complete message - + let payload1_str = String::from_utf8(messages1[0].to_vec()).unwrap(); assert_eq!(payload1_str, "COMPLETE"); - + // Send remaining part of second message let remaining_part = &msg2[25..msg2_len]; let result2 = parser.parse_buffer(remaining_part); assert!(result2.is_ok()); - + let messages2 = result2.unwrap(); assert_eq!(messages2.len(), 1); // Second message now complete - + let payload2_str = String::from_utf8(messages2[0].to_vec()).unwrap(); assert_eq!(payload2_str, "PARTIAL_MSG"); } diff --git a/crates/countersyncd/src/actor/ipfix.rs b/crates/countersyncd/src/actor/ipfix.rs index 7e509079ffc..b5c2e0df29a 100644 --- a/crates/countersyncd/src/actor/ipfix.rs +++ b/crates/countersyncd/src/actor/ipfix.rs @@ -20,6 +20,7 @@ use super::super::message::{ ipfix::IPFixTemplatesMessage, saistats::{SAIStat, SAIStats, SAIStatsMessage}, }; +use crate::utilities::{record_comm_stats, ChannelLabel}; /// Helper functions for debug logging formatting impl IpfixActor { @@ -114,25 +115,25 @@ impl IpfixActor { /// Formatted string representation of the sets within the message fn format_ipfix_sets_for_debug(message_data: &[u8]) -> String { let mut result = String::new(); - + // Skip IPFIX message header (16 bytes) to get to sets if message_data.len() < 16 { result.push_str(" Error: Message too short for IPFIX header\n"); return result; } - + let mut offset = 16; // Start after IPFIX header let mut set_count = 0; - + result.push_str(" Sets within message:\n"); - + while offset + 4 <= message_data.len() { // Each set starts with 4-byte header: set_id (2 bytes) + length (2 bytes) let set_id = NetworkEndian::read_u16(&message_data[offset..offset + 2]); let set_length = NetworkEndian::read_u16(&message_data[offset + 2..offset + 4]); - + set_count += 1; - + // Validate set length if set_length < 4 { result.push_str(&format!( @@ -141,7 +142,7 @@ impl IpfixActor { )); break; } - + if offset + set_length as usize > message_data.len() { result.push_str(&format!( " Set {}: TRUNCATED (set_id={}, length={}, exceeds message boundary)\n", @@ -149,7 +150,7 @@ impl IpfixActor { )); break; } - + // Determine set type based on set_id let set_type = if set_id == 2 { "Template Set" @@ -160,12 +161,12 @@ impl IpfixActor { } else { "Reserved/Unknown" }; - + result.push_str(&format!( " Set {} (offset: {}, set_id: {}, length: {} bytes, type: {})\n", set_count, offset, set_id, set_length, set_type )); - + // For data sets, show complete structure info if set_id >= 256 && set_length > 4 { let data_length = set_length as usize - 4; // Exclude 4-byte set header @@ -174,7 +175,7 @@ impl IpfixActor { " Data payload: {} bytes", data_length )); - + // Show complete data payload if data_length > 0 { let data_bytes = &message_data[data_start..data_start + data_length]; @@ -183,7 +184,7 @@ impl IpfixActor { .map(|b| format!("{:02x}", b)) .collect::>() .join(" "); - + // Format with line breaks for better readability if data is long if data_length <= 32 { // Short data on single line @@ -208,17 +209,17 @@ impl IpfixActor { result.push_str("\n"); } } - + // Move to next set offset += set_length as usize; } - + if set_count == 0 { result.push_str(" No valid sets found\n"); } else { result.push_str(&format!(" Total sets: {}\n", set_count)); } - + result } @@ -921,6 +922,10 @@ impl IpfixActor { templates = actor.template_recipient.recv() => { match templates { Some(templates) => { + record_comm_stats( + ChannelLabel::SwssToIpfixTemplates, + actor.template_recipient.len(), + ); actor.handle_template(templates); }, None => { @@ -931,6 +936,10 @@ impl IpfixActor { record = actor.record_recipient.recv() => { match record { Some(record) => { + record_comm_stats( + ChannelLabel::DataNetlinkToIpfixRecords, + actor.record_recipient.len(), + ); let messages = actor.handle_record(record); for recipient in &actor.saistats_recipients { for message in &messages { diff --git a/crates/countersyncd/src/actor/otel.rs b/crates/countersyncd/src/actor/otel.rs index 220cec0c002..1d316586ebc 100644 --- a/crates/countersyncd/src/actor/otel.rs +++ b/crates/countersyncd/src/actor/otel.rs @@ -34,6 +34,7 @@ use crate::message::{ otel::OtelMetrics, saistats::SAIStatsMessage, }; +use crate::utilities::{record_comm_stats, ChannelLabel}; const INITIAL_BACKOFF_DELAY_SECS: u64 = 1; const MAX_BACKOFF_DELAY_SECS: u64 = 10; @@ -95,7 +96,7 @@ pub struct OtelActor { buffer: Vec, buffered_counters: usize, flush_deadline: TokioInstant, - + // Statistics tracking messages_received: u64, exports_performed: u64, @@ -175,6 +176,10 @@ impl OtelActor { stats_msg = self.stats_receiver.recv() => { match stats_msg { Some(stats) => { + record_comm_stats( + ChannelLabel::IpfixToOtel, + self.stats_receiver.len(), + ); self.handle_stats_message(stats).await?; self.reset_flush_timer(&mut flush_timer); } @@ -273,7 +278,6 @@ impl OtelActor { info!("Raw Gauge: {:#?}", gauge); } } - } // Exponential backoff diff --git a/crates/countersyncd/src/actor/stats_reporter.rs b/crates/countersyncd/src/actor/stats_reporter.rs index c7142f117b6..16033277874 100644 --- a/crates/countersyncd/src/actor/stats_reporter.rs +++ b/crates/countersyncd/src/actor/stats_reporter.rs @@ -13,6 +13,7 @@ use super::super::message::saistats::SAIStatsMessage; use crate::sai::{ SaiBufferPoolStat, SaiIngressPriorityGroupStat, SaiObjectType, SaiPortStat, SaiQueueStat, }; +use crate::utilities::{record_comm_stats, ChannelLabel}; /// Unique key for identifying a specific counter based on the triplet /// (object_name, type_id, stat_id) @@ -479,6 +480,10 @@ impl StatsReporterActor { stats_msg = actor.stats_receiver.recv() => { match stats_msg { Some(stats) => { + record_comm_stats( + ChannelLabel::IpfixToStatsReporter, + actor.stats_receiver.len(), + ); actor.update_stats(stats); } None => { diff --git a/crates/countersyncd/src/lib.rs b/crates/countersyncd/src/lib.rs index 3090dffc300..a4afbe5c306 100644 --- a/crates/countersyncd/src/lib.rs +++ b/crates/countersyncd/src/lib.rs @@ -3,3 +3,4 @@ pub mod actor; pub mod message; pub mod sai; pub mod exit_codes; +pub mod utilities; diff --git a/crates/countersyncd/src/main.rs b/crates/countersyncd/src/main.rs index b4983b16d6b..53b6a60f409 100644 --- a/crates/countersyncd/src/main.rs +++ b/crates/countersyncd/src/main.rs @@ -2,6 +2,7 @@ mod actor; mod message; mod sai; +mod utilities; // External dependencies use clap::Parser; @@ -22,6 +23,7 @@ use crate::actor::{ // Internal exit codes use countersyncd::exit_codes::EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED; +use crate::utilities::{set_comm_capacity, ChannelLabel}; /// Initialize logging based on command line arguments fn init_logging(log_level: &str, log_format: &str) { @@ -252,6 +254,13 @@ async fn main() -> Result<(), Box> { let (otel_sender, otel_receiver) = channel(args.otel_capacity); let (otel_shutdown_sender, _otel_shutdown_receiver) = tokio::sync::oneshot::channel(); + set_comm_capacity(ChannelLabel::ControlNetlinkToDataNetlink, 10); + set_comm_capacity(ChannelLabel::DataNetlinkToIpfixRecords, args.data_netlink_capacity); + set_comm_capacity(ChannelLabel::SwssToIpfixTemplates, 10); + set_comm_capacity(ChannelLabel::IpfixToStatsReporter, args.stats_reporter_capacity); + set_comm_capacity(ChannelLabel::IpfixToCounterDb, args.counter_db_capacity); + set_comm_capacity(ChannelLabel::IpfixToOtel, args.otel_capacity); + // Get netlink family and group configuration from SONiC constants let (family, group) = get_genl_family_group(); info!("Using netlink family: '{}', group: '{}'", family, group); diff --git a/crates/countersyncd/src/utilities/mod.rs b/crates/countersyncd/src/utilities/mod.rs new file mode 100644 index 00000000000..3cba6299639 --- /dev/null +++ b/crates/countersyncd/src/utilities/mod.rs @@ -0,0 +1,184 @@ +/// Utility helpers shared across countersyncd modules. + +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +use log::info; +use once_cell::sync::Lazy; + +/// Formats a binary buffer into a hex string with 4 bytes per line. +/// +/// Each line contains up to 4 bytes, formatted as two-digit lowercase hex +/// separated by a single space. +pub fn format_hex_lines(buffer: &[u8]) -> String { + const BYTES_PER_LINE: usize = 4; + if buffer.is_empty() { + return String::new(); + } + + let mut lines = Vec::new(); + for chunk in buffer.chunks(BYTES_PER_LINE) { + let line = chunk + .iter() + .map(|byte| format!("{:02x}", byte)) + .collect::>() + .join(" "); + lines.push(line); + } + + lines.join("\n") +} + +/// Log interval for communication stats. +const COMM_LOG_INTERVAL: Duration = Duration::from_secs(600); + +/// Channel labels for actor-to-actor communication. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ChannelLabel { + ControlNetlinkToDataNetlink, + DataNetlinkToIpfixRecords, + SwssToIpfixTemplates, + IpfixToStatsReporter, + IpfixToCounterDb, + IpfixToOtel, +} + +impl ChannelLabel { + fn as_str(self) -> &'static str { + match self { + ChannelLabel::ControlNetlinkToDataNetlink => "control_netlink.data_netlink_cmd", + ChannelLabel::DataNetlinkToIpfixRecords => "data_netlink.ipfix_records", + ChannelLabel::SwssToIpfixTemplates => "swss.ipfix_templates", + ChannelLabel::IpfixToStatsReporter => "ipfix.stats_reporter", + ChannelLabel::IpfixToCounterDb => "ipfix.counter_db", + ChannelLabel::IpfixToOtel => "ipfix.otel", + } + } +} + +#[derive(Debug, Clone)] +struct CommStats { + /// Total number of samples recorded in the current reporting window. + /// Use to normalize sums and compare workload across windows. + count: u64, + /// Sum of sampled channel lengths (used to compute average). + /// Higher sum with same count means consistently higher queue occupancy. + sum: u64, + /// Peak channel length observed in the current window. + /// Spikes here indicate bursty producers or downstream backpressure. + max: usize, + /// Minimum channel length observed in the current window. + /// Useful to confirm idle periods (min == 0) or steady load (min > 0). + min: usize, + /// Most recent sampled channel length. + /// Helps correlate with immediate behavior when reading logs. + last: usize, + /// Sum of squared channel lengths (used to compute RMS). + /// RMS > AVG implies variability/peaks; RMS ~= AVG implies stable load. + sum_sq: u128, + /// Number of samples where channel length was non-zero. + /// Non-zero ratio hints at sustained pressure vs. intermittent bursts. + nonzero_count: u64, + /// Configured channel capacity (0 means unknown/not set). + /// Enables utilization analysis: avg/capacity and peak/capacity. + capacity: usize, + /// Last time we emitted a log for this label. + last_log: Instant, +} + +impl Default for CommStats { + fn default() -> Self { + Self { + count: 0, + sum: 0, + max: 0, + min: 0, + last: 0, + sum_sq: 0, + nonzero_count: 0, + capacity: 0, + last_log: Instant::now(), + } + } +} + +static COMM_STATS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +/// Records a communication channel length sample and logs periodically. +pub fn record_comm_stats(label: ChannelLabel, channel_len: usize) { + let mut stats_map = COMM_STATS + .lock() + .expect("COMM_STATS mutex poisoned"); + + let stats = stats_map.entry(label).or_insert_with(CommStats::default); + + stats.count = stats.count.saturating_add(1); + stats.sum = stats.sum.saturating_add(channel_len as u64); + stats.sum_sq = stats + .sum_sq + .saturating_add((channel_len as u128).saturating_mul(channel_len as u128)); + stats.last = channel_len; + if channel_len > 0 { + stats.nonzero_count = stats.nonzero_count.saturating_add(1); + } + if stats.count == 1 || channel_len < stats.min { + stats.min = channel_len; + } + if channel_len > stats.max { + stats.max = channel_len; + } + + let now = Instant::now(); + if now.duration_since(stats.last_log) >= COMM_LOG_INTERVAL { + let avg = stats.sum as f64 / stats.count as f64; + let rms = (stats.sum_sq as f64 / stats.count as f64).sqrt(); + if stats.capacity > 0 { + let avg_util = avg / stats.capacity as f64; + let peak_util = stats.max as f64 / stats.capacity as f64; + info!( + "Comm stats [{}]: count={}, avg_len={:.2}, peak_len={}, min_len={}, last_len={}, rms_len={:.2}, nonzero_count={}, capacity={}, avg_util={:.2}, peak_util={:.2}", + label.as_str(), + stats.count, + avg, + stats.max, + stats.min, + stats.last, + rms, + stats.nonzero_count, + stats.capacity, + avg_util, + peak_util + ); + } else { + info!( + "Comm stats [{}]: count={}, avg_len={:.2}, peak_len={}, min_len={}, last_len={}, rms_len={:.2}, nonzero_count={}", + label.as_str(), + stats.count, + avg, + stats.max, + stats.min, + stats.last, + rms, + stats.nonzero_count + ); + } + let capacity = stats.capacity; + *stats = CommStats { + capacity, + last_log: now, + ..CommStats::default() + }; + } +} + +/// Sets channel capacity for utilization analysis (optional). +/// Call this once during initialization if capacity is known. +pub fn set_comm_capacity(label: ChannelLabel, capacity: usize) { + let mut stats_map = COMM_STATS + .lock() + .expect("COMM_STATS mutex poisoned"); + let stats = stats_map.entry(label).or_insert_with(CommStats::default); + stats.capacity = capacity; +}