From dd948b09e84ef642c6e02c139ca537461b21ced8 Mon Sep 17 00:00:00 2001 From: Yue Gao Date: Thu, 17 Jul 2025 17:02:10 -0400 Subject: [PATCH 1/2] Suppress update from consumer bridge if there is no change --- Cargo.lock | 1 + crates/hamgrd/src/actors/dpu.rs | 2 +- crates/sonic-common/src/log.rs | 3 +-- crates/swss-common-bridge/Cargo.toml | 1 + crates/swss-common-bridge/src/consumer.rs | 26 +++++++++++++++++------ 5 files changed, 24 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b6ffc58..335518b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2145,6 +2145,7 @@ dependencies = [ name = "swss-common-bridge" version = "0.1.0" dependencies = [ + "anyhow", "swbus-actor", "swbus-edge", "swss-common", diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index b16233b5..1932571a 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -234,7 +234,7 @@ impl DpuActor { let bfd_probe_state = match Self::get_bfd_probe_state(incoming) { Ok(bfd_probe_state) => Some(bfd_probe_state), Err(e) => { - info!("Not able to get BFD probe state. Error: {}", e); + debug!("Not able to get BFD probe state. Error: {}", e); None } }; diff --git a/crates/sonic-common/src/log.rs b/crates/sonic-common/src/log.rs index dd845ba4..8f6ddc20 100644 --- a/crates/sonic-common/src/log.rs +++ b/crates/sonic-common/src/log.rs @@ -90,8 +90,7 @@ pub fn init(program_name: &'static str, link_swsscommon_logger: bool) -> Result< let file_subscriber = file_subscriber .with_line_number(true) .with_target(false) - .with_ansi(false) - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE); + .with_ansi(false); tracing_subscriber::registry() .with(level_layer.and_then(file_subscriber)) diff --git a/crates/swss-common-bridge/Cargo.toml b/crates/swss-common-bridge/Cargo.toml index 0fd2bffc..f1fadc64 100644 --- a/crates/swss-common-bridge/Cargo.toml +++ b/crates/swss-common-bridge/Cargo.toml @@ -14,6 +14,7 @@ swbus-edge = { path = "../swbus-edge" } tokio.workspace = true tokio-util.workspace = true swbus-actor = { path = "../swbus-actor" } +anyhow.workspace = true [lints] workspace = true diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 420c3aa1..51d0b7d2 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, Result}; use std::{collections::HashMap, future::Future, sync::Arc}; use swbus_actor::ActorMessage; use swbus_edge::{ @@ -57,7 +58,10 @@ where let mut table_cache = TableCache::default(); let mut send_kfv = async |kfv: KeyOpFieldValues| { // Merge the kfv to get the whole table as an update - let kfv = table_cache.merge_kfv(kfv); + let kfv = match table_cache.merge_kfv(kfv) { + Ok(kfv) => kfv, + Err(_) => return, // No change, skip sending + }; if !selector(&kfv) { return; } @@ -114,20 +118,30 @@ struct TableCache(HashMap); impl TableCache { /// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table. - fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> KeyOpFieldValues { + /// Returns an error if the update doesn't change the existing data. + fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> Result { match kfv.operation { KeyOperation::Set => { let field_values = self.0.entry(kfv.key.clone()).or_default(); - field_values.extend(kfv.field_values); - KeyOpFieldValues { + + // Check if the new field_values would be the same as the existing ones + let mut new_field_values = field_values.clone(); + new_field_values.extend(kfv.field_values); + + if new_field_values == *field_values { + return Err(anyhow!("No change")); + } + + *field_values = new_field_values; + Ok(KeyOpFieldValues { key: kfv.key, operation: KeyOperation::Set, field_values: field_values.clone(), - } + }) } KeyOperation::Del => { self.0.remove(&kfv.key); - kfv + Ok(kfv) } } } From 60bb2da356d0d82cb94b4a25ba6faaaaa8f5fd0b Mon Sep 17 00:00:00 2001 From: Yue Gao Date: Wed, 6 Aug 2025 17:05:46 -0400 Subject: [PATCH 2/2] Skip no-change update through ProducerBridge --- crates/swss-common-bridge/src/consumer.rs | 48 ++-------- crates/swss-common-bridge/src/lib.rs | 62 +++++++++++++ crates/swss-common-bridge/src/producer.rs | 101 +++++++++++++++++++++- 3 files changed, 164 insertions(+), 47 deletions(-) diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 185625a2..8bdb5c3c 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -1,6 +1,6 @@ +use crate::TableCache; use sonic_common::SonicDbTable; -use anyhow::{anyhow, Result}; -use std::{collections::HashMap, future::Future, sync::Arc}; +use std::{future::Future, sync::Arc}; use swbus_actor::ActorMessage; use swbus_edge::{ simple_client::{MessageBody, OutgoingMessage, SimpleSwbusEdgeClient}, @@ -8,7 +8,7 @@ use swbus_edge::{ SwbusEdgeRuntime, }; use swss_common::{ - ConsumerStateTable, FieldValues, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable, + ConsumerStateTable, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable, }; use tokio::task::JoinHandle; use tokio_util::task::AbortOnDropHandle; @@ -65,9 +65,8 @@ where } // Merge the kfv to get the whole table as an update - let kfv = match table_cache.merge_kfv(kfv) { - Ok(kfv) => kfv, - Err(_) => return, // No change, skip sending + let Some(kfv) = table_cache.merge_kfv(kfv) else { + return; // No change, skip sending }; if !selector(&kfv) { return; @@ -117,43 +116,6 @@ where }) } -/// An in-memory copy of a table. -/// We keep a copy so that we can send the entire table for each update, rather than just the updated fields. -/// This relieves the need for actors to handle partial updates by caching their own copy. -#[derive(Default)] -struct TableCache(HashMap); - -impl TableCache { - /// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table. - /// Returns an error if the update doesn't change the existing data. - fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> Result { - match kfv.operation { - KeyOperation::Set => { - let field_values = self.0.entry(kfv.key.clone()).or_default(); - - // Check if the new field_values would be the same as the existing ones - let mut new_field_values = field_values.clone(); - new_field_values.extend(kfv.field_values); - - if new_field_values == *field_values { - return Err(anyhow!("No change")); - } - - *field_values = new_field_values; - Ok(KeyOpFieldValues { - key: kfv.key, - operation: KeyOperation::Set, - field_values: field_values.clone(), - }) - } - KeyOperation::Del => { - self.0.remove(&kfv.key); - Ok(kfv) - } - } - } -} - pub trait ConsumerTable: Send + 'static { /// Wait for updates fn read_data(&mut self) -> impl Future + Send; diff --git a/crates/swss-common-bridge/src/lib.rs b/crates/swss-common-bridge/src/lib.rs index 934046ab..913d0f54 100644 --- a/crates/swss-common-bridge/src/lib.rs +++ b/crates/swss-common-bridge/src/lib.rs @@ -1,2 +1,64 @@ pub mod consumer; pub mod producer; + +use std::collections::HashMap; +use swss_common::{FieldValues, KeyOpFieldValues, KeyOperation}; + +/// An in-memory copy of a table. +/// We keep a copy so that we can skip update if there is no change. The cache is established from the previous +/// request sent to the Producer. If the Producer is restarted, the cache will be empty. +#[derive(Default)] +pub(crate) struct TableCache(HashMap); + +impl TableCache { + /// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table. + /// Returns None if the update doesn't change the existing data. + fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> Option { + match kfv.operation { + KeyOperation::Set => { + let field_values = self.0.entry(kfv.key.clone()).or_default(); + + // Check if the new field_values would be the same as the existing ones + let mut new_field_values = field_values.clone(); + new_field_values.extend(kfv.field_values); + + if new_field_values == *field_values { + return None; + } + + *field_values = new_field_values; + Some(KeyOpFieldValues { + key: kfv.key, + operation: KeyOperation::Set, + field_values: field_values.clone(), + }) + } + KeyOperation::Del => { + self.0.remove(&kfv.key); + Some(kfv) + } + } + } + + /// Replace the cached entry with the provided kfv. + /// Returns false if operation is SET and field values are the same as cached ones. + /// For Del it always return true because the local cache is cleared after restart. + fn replace_kfv(&mut self, kfv: KeyOpFieldValues) -> bool { + match kfv.operation { + KeyOperation::Set => { + let field_values = self.0.entry(kfv.key.clone()).or_default(); + + if kfv.field_values == *field_values { + return false; + } + + *field_values = kfv.field_values; + true + } + KeyOperation::Del => { + self.0.remove(&kfv.key); + true + } + } + } +} diff --git a/crates/swss-common-bridge/src/producer.rs b/crates/swss-common-bridge/src/producer.rs index 68077c63..1b04a4bd 100644 --- a/crates/swss-common-bridge/src/producer.rs +++ b/crates/swss-common-bridge/src/producer.rs @@ -1,3 +1,4 @@ +use crate::TableCache; use std::{future::Future, sync::Arc}; use swbus_actor::ActorMessage; use swbus_edge::{ @@ -31,6 +32,7 @@ where { let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false); tokio::task::spawn(async move { + let mut table_cache = TableCache::default(); loop { let Some(msg) = swbus.recv().await else { // Swbus shut down, we might as well quit. @@ -45,8 +47,12 @@ where let (error_code, error_message) = match ActorMessage::deserialize(&payload) { Ok(actor_msg) => match actor_msg.deserialize_data::() { Ok(kfv) => { - table.apply_kfv(kfv).await; - (SwbusErrorCode::Ok, String::new()) + if table_cache.replace_kfv(kfv.clone()) { + table.apply_kfv(kfv).await; + (SwbusErrorCode::Ok, String::new()) + } else { + (SwbusErrorCode::Ok, "No change in data".to_string()) + } } Err(e) => ( SwbusErrorCode::InvalidPayload, @@ -119,10 +125,10 @@ mod test { SwbusEdgeRuntime, }; use swss_common::{ - ConsumerStateTable, KeyOpFieldValues, ProducerStateTable, ZmqClient, ZmqConsumerStateTable, + ConsumerStateTable, KeyOpFieldValues, KeyOperation, ProducerStateTable, ZmqClient, ZmqConsumerStateTable, ZmqProducerStateTable, ZmqServer, }; - use swss_common_testing::{random_kfvs, random_zmq_endpoint, Redis}; + use swss_common_testing::{random_fvs, random_kfvs, random_string, random_zmq_endpoint, Redis}; use tokio::time::timeout; #[tokio::test] @@ -132,6 +138,13 @@ mod test { let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap(); timeout(Duration::from_secs(5), run_test(cst, pst)).await.unwrap(); } + #[tokio::test] + async fn producer_state_table_bridge_check_dup() { + let redis = Redis::start(); + let pst = ProducerStateTable::new(redis.db_connector(), "mytable").unwrap(); + let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap(); + timeout(Duration::from_secs(5), run_dup_test(cst, pst)).await.unwrap(); + } #[tokio::test] async fn zmq_ponsumer_state_table_bridge() { @@ -145,6 +158,18 @@ mod test { timeout(Duration::from_secs(5), run_test(zcst, zpst)).await.unwrap(); } + #[tokio::test] + async fn zmq_ponsumer_state_table_bridge_check_dup() { + let (zmq_endpoint, _deleter) = random_zmq_endpoint(); + let mut zmqs = ZmqServer::new(&zmq_endpoint).unwrap(); + let zmqc = ZmqClient::new(&zmq_endpoint).unwrap(); + + let redis = Redis::start(); + let zpst = ZmqProducerStateTable::new(redis.db_connector(), "mytable", zmqc, false).unwrap(); + let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "mytable", &mut zmqs, None, None).unwrap(); + timeout(Duration::from_secs(5), run_dup_test(zcst, zpst)).await.unwrap(); + } + async fn run_test(mut consumer_table: C, producer_table: P) { // Setup swbus let mut swbus_edge = SwbusEdgeRuntime::new("".to_string(), sp("edge")); @@ -182,6 +207,74 @@ mod test { assert_eq!(kfvs, kfvs_received); } + async fn run_dup_test(mut consumer_table: C, producer_table: P) { + // Setup swbus + let mut swbus_edge = SwbusEdgeRuntime::new("".to_string(), sp("edge")); + swbus_edge.start().await.unwrap(); + let rt = Arc::new(swbus_edge); + + // Create edge client to send updates to the bridge + let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false); + + // Spawn the bridge + let _bridge = ProducerBridge::spawn(rt, sp("mytable-bridge"), producer_table); + + // Send some updates to the bridge + let mut kfvs = Vec::new(); + for _ in 0..10 { + let kfv = KeyOpFieldValues { + key: random_string(), + operation: KeyOperation::Set, + field_values: random_fvs(), + }; + + kfvs.push(kfv); + } + + for kfv in &kfvs { + let msg = OutgoingMessage { + destination: sp("mytable-bridge"), + body: MessageBody::Request { + payload: encode_kfv(kfv), + }, + }; + swbus.send(msg).await.unwrap(); + } + + // Receive the updates directly + let mut kfvs_received = Vec::new(); + while kfvs_received.len() < kfvs.len() { + consumer_table.read_data().await; + kfvs_received.extend(consumer_table.pops().await); + } + + // Assert we got all the same updates + kfvs.sort_unstable(); + kfvs_received.sort_unstable(); + assert_eq!(kfvs, kfvs_received); + + // Resend the same updates to the producer bridge + for kfv in &kfvs { + let msg = OutgoingMessage { + destination: sp("dpu-bridge"), + body: MessageBody::Request { + payload: encode_kfv(kfv), + }, + }; + swbus.send(msg).await.unwrap(); + println!("Sent: {}", kfv.key); + } + let result = timeout(Duration::from_secs(3), consumer_table.read_data()).await; + if result.is_ok() { + // If we got here, it means the bridge did not skip the updates + let received = consumer_table.pops().await; + for kfv in received { + println!("Received: {}", kfv.key); + } + panic!("Expected bridge to skip duplicate updates, but it processed them"); + } + } + fn encode_kfv(kfv: &KeyOpFieldValues) -> Vec { ActorMessage::new("", kfv).unwrap().serialize() }