From 5028472cdc9ed5fa4e08bce1b925e496301d2f85 Mon Sep 17 00:00:00 2001 From: Sonic Build Admin Date: Sun, 31 Aug 2025 18:44:25 +0000 Subject: [PATCH] Suppress update from consumer bridge if there is no change ### why There is very frequent update from DPU_STATE. It appears pmon collecting dpu state and update this table every minute. But in each cycle, it makes multiple no change update to the table. redis doesn't seem to filter it. It sends notification regardlessly. Also hamgrd sends no-change update to ProducerBridge or ZmqProducerBridge. The latter sends notification to DPU via zmq periodically propagated from DPU_STATE update. DashHaOrch treats no-change update as warning and it also pollutes swss.rec in DPU. ### what this PR does - consumer bridge filters no change update. It compares the update to previous state. If there is no change, it will just drop the notification. - remove tracing span lifecycle events if logger is initialized from swss. span events are good for debugging long operations but they are also too chatty. They can be enabled if swbusd/hamgrd are started with RUST_LOG env, which has highest priority in initializing loggers. - producer bridge keeps a cache of previous update and filter out the no-change update. --- crates/hamgrd/src/actors/dpu.rs | 2 +- crates/sonic-common/src/log.rs | 3 +- crates/swss-common-bridge/src/consumer.rs | 36 ++------ crates/swss-common-bridge/src/lib.rs | 62 +++++++++++++ crates/swss-common-bridge/src/producer.rs | 101 +++++++++++++++++++++- 5 files changed, 167 insertions(+), 37 deletions(-) diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index 0e6ae79..1ee801a 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -235,7 +235,7 @@ impl DpuActor { let bfd_probe_state = match Self::get_bfd_probe_state(incoming) { Ok(bfd_probe_state) => Some(bfd_probe_state), Err(e) => { - info!("Not able to get BFD probe state. 
Error: {}", e); + debug!("Not able to get BFD probe state. Error: {}", e); None } }; diff --git a/crates/sonic-common/src/log.rs b/crates/sonic-common/src/log.rs index dd845ba..8f6ddc2 100644 --- a/crates/sonic-common/src/log.rs +++ b/crates/sonic-common/src/log.rs @@ -90,8 +90,7 @@ pub fn init(program_name: &'static str, link_swsscommon_logger: bool) -> Result< let file_subscriber = file_subscriber .with_line_number(true) .with_target(false) - .with_ansi(false) - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE); + .with_ansi(false); tracing_subscriber::registry() .with(level_layer.and_then(file_subscriber)) diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 299272c..8bdb5c3 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -1,5 +1,6 @@ +use crate::TableCache; use sonic_common::SonicDbTable; -use std::{collections::HashMap, future::Future, sync::Arc}; +use std::{future::Future, sync::Arc}; use swbus_actor::ActorMessage; use swbus_edge::{ simple_client::{MessageBody, OutgoingMessage, SimpleSwbusEdgeClient}, @@ -7,7 +8,7 @@ use swbus_edge::{ SwbusEdgeRuntime, }; use swss_common::{ - ConsumerStateTable, FieldValues, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable, + ConsumerStateTable, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable, }; use tokio::task::JoinHandle; use tokio_util::task::AbortOnDropHandle; @@ -64,7 +65,9 @@ where } // Merge the kfv to get the whole table as an update - let kfv = table_cache.merge_kfv(kfv); + let Some(kfv) = table_cache.merge_kfv(kfv) else { + return; // No change, skip sending + }; if !selector(&kfv) { return; } @@ -113,33 +116,6 @@ where }) } -/// An in-memory copy of a table. -/// We keep a copy so that we can send the entire table for each update, rather than just the updated fields. 
-/// This relieves the need for actors to handle partial updates by caching their own copy.
-#[derive(Default)]
-struct TableCache(HashMap<String, FieldValues>);
-
-impl TableCache {
-    /// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table.
-    fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> KeyOpFieldValues {
-        match kfv.operation {
-            KeyOperation::Set => {
-                let field_values = self.0.entry(kfv.key.clone()).or_default();
-                field_values.extend(kfv.field_values);
-                KeyOpFieldValues {
-                    key: kfv.key,
-                    operation: KeyOperation::Set,
-                    field_values: field_values.clone(),
-                }
-            }
-            KeyOperation::Del => {
-                self.0.remove(&kfv.key);
-                kfv
-            }
-        }
-    }
-}
-
 pub trait ConsumerTable: Send + 'static {
     /// Wait for updates
     fn read_data(&mut self) -> impl Future<Output = ()> + Send;
diff --git a/crates/swss-common-bridge/src/lib.rs b/crates/swss-common-bridge/src/lib.rs
index 934046a..913d0f5 100644
--- a/crates/swss-common-bridge/src/lib.rs
+++ b/crates/swss-common-bridge/src/lib.rs
@@ -1,2 +1,64 @@
 pub mod consumer;
 pub mod producer;
+
+use std::collections::HashMap;
+use swss_common::{FieldValues, KeyOpFieldValues, KeyOperation};
+
+/// An in-memory copy of a table.
+/// We keep a copy so that we can skip the update if there is no change. The cache is established from the previous
+/// request sent to the Producer. If the Producer is restarted, the cache will be empty.
+#[derive(Default)]
+pub(crate) struct TableCache(HashMap<String, FieldValues>);
+
+impl TableCache {
+    /// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table.
+    /// Returns None if the update doesn't change the existing data.
+    fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> Option<KeyOpFieldValues> {
+        match kfv.operation {
+            KeyOperation::Set => {
+                let field_values = self.0.entry(kfv.key.clone()).or_default();
+
+                // Check if the new field_values would be the same as the existing ones
+                let mut new_field_values = field_values.clone();
+                new_field_values.extend(kfv.field_values);
+
+                if new_field_values == *field_values {
+                    return None;
+                }
+
+                *field_values = new_field_values;
+                Some(KeyOpFieldValues {
+                    key: kfv.key,
+                    operation: KeyOperation::Set,
+                    field_values: field_values.clone(),
+                })
+            }
+            KeyOperation::Del => {
+                self.0.remove(&kfv.key);
+                Some(kfv)
+            }
+        }
+    }
+
+    /// Replace the cached entry with the provided kfv.
+    /// Returns false if operation is SET and field values are the same as cached ones.
+    /// For Del it always returns true because the local cache is cleared after restart.
+    fn replace_kfv(&mut self, kfv: KeyOpFieldValues) -> bool {
+        match kfv.operation {
+            KeyOperation::Set => {
+                let field_values = self.0.entry(kfv.key.clone()).or_default();
+
+                if kfv.field_values == *field_values {
+                    return false;
+                }
+
+                *field_values = kfv.field_values;
+                true
+            }
+            KeyOperation::Del => {
+                self.0.remove(&kfv.key);
+                true
+            }
+        }
+    }
+}
diff --git a/crates/swss-common-bridge/src/producer.rs b/crates/swss-common-bridge/src/producer.rs
index 68077c6..1b04a4b 100644
--- a/crates/swss-common-bridge/src/producer.rs
+++ b/crates/swss-common-bridge/src/producer.rs
@@ -1,3 +1,4 @@
+use crate::TableCache;
 use std::{future::Future, sync::Arc};
 use swbus_actor::ActorMessage;
 use swbus_edge::{
@@ -31,6 +32,7 @@ where
 {
     let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false);
     tokio::task::spawn(async move {
+        let mut table_cache = TableCache::default();
         loop {
             let Some(msg) = swbus.recv().await else {
                 // Swbus shut down, we might as well quit.
@@ -45,8 +47,12 @@ where
             let (error_code, error_message) = match ActorMessage::deserialize(&payload) {
                 Ok(actor_msg) => match actor_msg.deserialize_data::<KeyOpFieldValues>() {
                     Ok(kfv) => {
-                        table.apply_kfv(kfv).await;
-                        (SwbusErrorCode::Ok, String::new())
+                        if table_cache.replace_kfv(kfv.clone()) {
+                            table.apply_kfv(kfv).await;
+                            (SwbusErrorCode::Ok, String::new())
+                        } else {
+                            (SwbusErrorCode::Ok, "No change in data".to_string())
+                        }
                     }
                     Err(e) => (
                         SwbusErrorCode::InvalidPayload,
@@ -119,10 +125,10 @@ mod test {
         SwbusEdgeRuntime,
     };
     use swss_common::{
-        ConsumerStateTable, KeyOpFieldValues, ProducerStateTable, ZmqClient, ZmqConsumerStateTable,
+        ConsumerStateTable, KeyOpFieldValues, KeyOperation, ProducerStateTable, ZmqClient, ZmqConsumerStateTable,
         ZmqProducerStateTable, ZmqServer,
     };
-    use swss_common_testing::{random_kfvs, random_zmq_endpoint, Redis};
+    use swss_common_testing::{random_fvs, random_kfvs, random_string, random_zmq_endpoint, Redis};
     use tokio::time::timeout;

     #[tokio::test]
@@ -132,6 +138,13 @@ mod test {
         let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap();
         timeout(Duration::from_secs(5), run_test(cst, pst)).await.unwrap();
     }
+    #[tokio::test]
+    async fn producer_state_table_bridge_check_dup() {
+        let redis = Redis::start();
+        let pst = ProducerStateTable::new(redis.db_connector(), "mytable").unwrap();
+        let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap();
+        timeout(Duration::from_secs(5), run_dup_test(cst, pst)).await.unwrap();
+    }

     #[tokio::test]
     async fn zmq_ponsumer_state_table_bridge() {
@@ -145,6 +158,18 @@ mod test {
         timeout(Duration::from_secs(5), run_test(zcst, zpst)).await.unwrap();
     }

+    #[tokio::test]
+    async fn zmq_ponsumer_state_table_bridge_check_dup() {
+        let (zmq_endpoint, _deleter) = random_zmq_endpoint();
+        let mut zmqs = ZmqServer::new(&zmq_endpoint).unwrap();
+        let zmqc = ZmqClient::new(&zmq_endpoint).unwrap();
+
+        let redis = Redis::start();
+        let zpst = ZmqProducerStateTable::new(redis.db_connector(), "mytable", zmqc, false).unwrap();
+        let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "mytable", &mut zmqs, None, None).unwrap();
+        timeout(Duration::from_secs(5), run_dup_test(zcst, zpst)).await.unwrap();
+    }
+
     async fn run_test<C: ConsumerTable, P: ProducerTable>(mut consumer_table: C, producer_table: P) {
         // Setup swbus
         let mut swbus_edge = SwbusEdgeRuntime::new("".to_string(), sp("edge"));
         swbus_edge.start().await.unwrap();
         let rt = Arc::new(swbus_edge);

         // Create edge client to send updates to the bridge
         let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false);

         // Spawn the bridge
         let _bridge = ProducerBridge::spawn(rt, sp("mytable-bridge"), producer_table);

         // Send some updates to the bridge
         let mut kfvs = random_kfvs();
         for kfv in &kfvs {
             let msg = OutgoingMessage {
                 destination: sp("mytable-bridge"),
                 body: MessageBody::Request {
                     payload: encode_kfv(kfv),
                 },
             };
             swbus.send(msg).await.unwrap();
         }

         // Receive the updates directly
         let mut kfvs_received = Vec::new();
         while kfvs_received.len() < kfvs.len() {
             consumer_table.read_data().await;
             kfvs_received.extend(consumer_table.pops().await);
         }

         // Assert we got all the same updates
         kfvs.sort_unstable();
         kfvs_received.sort_unstable();
         assert_eq!(kfvs, kfvs_received);
     }

@@ -182,6 +207,74 @@ mod test {
+    async fn run_dup_test<C: ConsumerTable, P: ProducerTable>(mut consumer_table: C, producer_table: P) {
+        // Setup swbus
+        let mut swbus_edge = SwbusEdgeRuntime::new("".to_string(), sp("edge"));
+        swbus_edge.start().await.unwrap();
+        let rt = Arc::new(swbus_edge);
+
+        // Create edge client to send updates to the bridge
+        let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false);
+
+        // Spawn the bridge
+        let _bridge = ProducerBridge::spawn(rt, sp("mytable-bridge"), producer_table);
+
+        // Send some updates to the bridge
+        let mut kfvs = Vec::new();
+        for _ in 0..10 {
+            let kfv = KeyOpFieldValues {
+                key: random_string(),
+                operation: KeyOperation::Set,
+                field_values: random_fvs(),
+            };
+
+            kfvs.push(kfv);
+        }
+
+        for kfv in &kfvs {
+            let msg = OutgoingMessage {
+                destination: sp("mytable-bridge"),
+                body: MessageBody::Request {
+                    payload: encode_kfv(kfv),
+                },
+            };
+            swbus.send(msg).await.unwrap();
+        }
+
+        // Receive the updates directly
+        let mut kfvs_received = Vec::new();
+        while kfvs_received.len() < kfvs.len() {
+            consumer_table.read_data().await;
+            kfvs_received.extend(consumer_table.pops().await);
+        }
+
+        // Assert we got all the same updates
+        kfvs.sort_unstable();
+        kfvs_received.sort_unstable();
+        assert_eq!(kfvs, kfvs_received);
+
+        // Resend the same updates to the producer bridge
+        for kfv in &kfvs {
+            let msg = OutgoingMessage {
+                destination: sp("mytable-bridge"),
+                body: MessageBody::Request {
+                    payload: encode_kfv(kfv),
+                },
+            };
+            swbus.send(msg).await.unwrap();
+            println!("Sent: {}", kfv.key);
+        }
+
+        let result = timeout(Duration::from_secs(3), consumer_table.read_data()).await;
+        if result.is_ok() {
+            // If we got here, it means the bridge did not skip the updates
+            let received = consumer_table.pops().await;
+            for kfv in received {
+                println!("Received: {}", kfv.key);
+            }
+            panic!("Expected bridge to skip duplicate updates, but it processed them");
+        }
+    }
+
     fn encode_kfv(kfv: &KeyOpFieldValues) -> Vec<u8> {
         ActorMessage::new("", kfv).unwrap().serialize()
     }