Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/hamgrd/src/actors/dpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl DpuActor {
let bfd_probe_state = match Self::get_bfd_probe_state(incoming) {
Ok(bfd_probe_state) => Some(bfd_probe_state),
Err(e) => {
info!("Not able to get BFD probe state. Error: {}", e);
debug!("Not able to get BFD probe state. Error: {}", e);
None
}
};
Expand Down
3 changes: 1 addition & 2 deletions crates/sonic-common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ pub fn init(program_name: &'static str, link_swsscommon_logger: bool) -> Result<
let file_subscriber = file_subscriber
.with_line_number(true)
.with_target(false)
.with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE);
.with_ansi(false);

tracing_subscriber::registry()
.with(level_layer.and_then(file_subscriber))
Expand Down
36 changes: 6 additions & 30 deletions crates/swss-common-bridge/src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::TableCache;
use sonic_common::SonicDbTable;
use std::{collections::HashMap, future::Future, sync::Arc};
use std::{future::Future, sync::Arc};
use swbus_actor::ActorMessage;
use swbus_edge::{
simple_client::{MessageBody, OutgoingMessage, SimpleSwbusEdgeClient},
swbus_proto::swbus::ServicePath,
SwbusEdgeRuntime,
};
use swss_common::{
ConsumerStateTable, FieldValues, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable,
ConsumerStateTable, KeyOpFieldValues, KeyOperation, SubscriberStateTable, Table, ZmqConsumerStateTable,
};
use tokio::task::JoinHandle;
use tokio_util::task::AbortOnDropHandle;
Expand Down Expand Up @@ -64,7 +65,9 @@ where
}

// Merge the kfv to get the whole table as an update
let kfv = table_cache.merge_kfv(kfv);
let Some(kfv) = table_cache.merge_kfv(kfv) else {
return; // No change, skip sending
};
if !selector(&kfv) {
return;
}
Expand Down Expand Up @@ -113,33 +116,6 @@ where
})
}

/// An in-memory copy of a table.
/// We keep a copy so that we can send the entire table for each update, rather than just the updated fields.
/// This relieves the need for actors to handle partial updates by caching their own copy.
#[derive(Default)]
struct TableCache(HashMap<String, FieldValues>);

impl TableCache {
/// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table.
fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> KeyOpFieldValues {
match kfv.operation {
KeyOperation::Set => {
let field_values = self.0.entry(kfv.key.clone()).or_default();
field_values.extend(kfv.field_values);
KeyOpFieldValues {
key: kfv.key,
operation: KeyOperation::Set,
field_values: field_values.clone(),
}
}
KeyOperation::Del => {
self.0.remove(&kfv.key);
kfv
}
}
}
}

pub trait ConsumerTable: Send + 'static {
/// Wait for updates
fn read_data(&mut self) -> impl Future<Output = ()> + Send;
Expand Down
62 changes: 62 additions & 0 deletions crates/swss-common-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,64 @@
pub mod consumer;
pub mod producer;

use std::collections::HashMap;
use swss_common::{FieldValues, KeyOpFieldValues, KeyOperation};

/// An in-memory copy of a table.
/// We keep a copy so that we can skip update if there is no change. The cache is established from the previous
/// request sent to the Producer. If the Producer is restarted, the cache will be empty.
#[derive(Default)]
pub(crate) struct TableCache(HashMap<String, FieldValues>);

impl TableCache {
/// Merge the update and return a `KeyOpFieldValues` that contains the state of the entire table.
/// Returns None if the update doesn't change the existing data.
fn merge_kfv(&mut self, kfv: KeyOpFieldValues) -> Option<KeyOpFieldValues> {
match kfv.operation {
KeyOperation::Set => {
let field_values = self.0.entry(kfv.key.clone()).or_default();

// Check if the new field_values would be the same as the existing ones
let mut new_field_values = field_values.clone();
new_field_values.extend(kfv.field_values);

if new_field_values == *field_values {
return None;
}

*field_values = new_field_values;
Some(KeyOpFieldValues {
key: kfv.key,
operation: KeyOperation::Set,
field_values: field_values.clone(),
})
}
KeyOperation::Del => {
self.0.remove(&kfv.key);
Some(kfv)
}
}
}

/// Replace the cached entry with the provided kfv.
/// Returns false if operation is SET and field values are the same as cached ones.
/// For Del it always return true because the local cache is cleared after restart.
fn replace_kfv(&mut self, kfv: KeyOpFieldValues) -> bool {
match kfv.operation {
KeyOperation::Set => {
let field_values = self.0.entry(kfv.key.clone()).or_default();

if kfv.field_values == *field_values {
return false;
}

*field_values = kfv.field_values;
true
}
KeyOperation::Del => {
self.0.remove(&kfv.key);
true
}
}
}
}
101 changes: 97 additions & 4 deletions crates/swss-common-bridge/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::TableCache;
use std::{future::Future, sync::Arc};
use swbus_actor::ActorMessage;
use swbus_edge::{
Expand Down Expand Up @@ -31,6 +32,7 @@ where
{
let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false);
tokio::task::spawn(async move {
let mut table_cache = TableCache::default();
loop {
let Some(msg) = swbus.recv().await else {
// Swbus shut down, we might as well quit.
Expand All @@ -45,8 +47,12 @@ where
let (error_code, error_message) = match ActorMessage::deserialize(&payload) {
Ok(actor_msg) => match actor_msg.deserialize_data::<KeyOpFieldValues>() {
Ok(kfv) => {
table.apply_kfv(kfv).await;
(SwbusErrorCode::Ok, String::new())
if table_cache.replace_kfv(kfv.clone()) {
table.apply_kfv(kfv).await;
(SwbusErrorCode::Ok, String::new())
} else {
(SwbusErrorCode::Ok, "No change in data".to_string())
}
}
Err(e) => (
SwbusErrorCode::InvalidPayload,
Expand Down Expand Up @@ -119,10 +125,10 @@ mod test {
SwbusEdgeRuntime,
};
use swss_common::{
ConsumerStateTable, KeyOpFieldValues, ProducerStateTable, ZmqClient, ZmqConsumerStateTable,
ConsumerStateTable, KeyOpFieldValues, KeyOperation, ProducerStateTable, ZmqClient, ZmqConsumerStateTable,
ZmqProducerStateTable, ZmqServer,
};
use swss_common_testing::{random_kfvs, random_zmq_endpoint, Redis};
use swss_common_testing::{random_fvs, random_kfvs, random_string, random_zmq_endpoint, Redis};
use tokio::time::timeout;

#[tokio::test]
Expand All @@ -132,6 +138,13 @@ mod test {
let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap();
timeout(Duration::from_secs(5), run_test(cst, pst)).await.unwrap();
}
#[tokio::test]
async fn producer_state_table_bridge_check_dup() {
let redis = Redis::start();
let pst = ProducerStateTable::new(redis.db_connector(), "mytable").unwrap();
let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap();
timeout(Duration::from_secs(5), run_dup_test(cst, pst)).await.unwrap();
}

#[tokio::test]
async fn zmq_ponsumer_state_table_bridge() {
Expand All @@ -145,6 +158,18 @@ mod test {
timeout(Duration::from_secs(5), run_test(zcst, zpst)).await.unwrap();
}

#[tokio::test]
async fn zmq_ponsumer_state_table_bridge_check_dup() {
let (zmq_endpoint, _deleter) = random_zmq_endpoint();
let mut zmqs = ZmqServer::new(&zmq_endpoint).unwrap();
let zmqc = ZmqClient::new(&zmq_endpoint).unwrap();

let redis = Redis::start();
let zpst = ZmqProducerStateTable::new(redis.db_connector(), "mytable", zmqc, false).unwrap();
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "mytable", &mut zmqs, None, None).unwrap();
timeout(Duration::from_secs(5), run_dup_test(zcst, zpst)).await.unwrap();
}

async fn run_test<C: ConsumerTable, P: ProducerTable>(mut consumer_table: C, producer_table: P) {
// Setup swbus
let mut swbus_edge = SwbusEdgeRuntime::new("<none>".to_string(), sp("edge"));
Expand Down Expand Up @@ -182,6 +207,74 @@ mod test {
assert_eq!(kfvs, kfvs_received);
}

async fn run_dup_test<C: ConsumerTable, P: ProducerTable>(mut consumer_table: C, producer_table: P) {
// Setup swbus
let mut swbus_edge = SwbusEdgeRuntime::new("<none>".to_string(), sp("edge"));
swbus_edge.start().await.unwrap();
let rt = Arc::new(swbus_edge);

// Create edge client to send updates to the bridge
let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false);

// Spawn the bridge
let _bridge = ProducerBridge::spawn(rt, sp("mytable-bridge"), producer_table);

// Send some updates to the bridge
let mut kfvs = Vec::new();
for _ in 0..10 {
let kfv = KeyOpFieldValues {
key: random_string(),
operation: KeyOperation::Set,
field_values: random_fvs(),
};

kfvs.push(kfv);
}

for kfv in &kfvs {
let msg = OutgoingMessage {
destination: sp("mytable-bridge"),
body: MessageBody::Request {
payload: encode_kfv(kfv),
},
};
swbus.send(msg).await.unwrap();
}

// Receive the updates directly
let mut kfvs_received = Vec::new();
while kfvs_received.len() < kfvs.len() {
consumer_table.read_data().await;
kfvs_received.extend(consumer_table.pops().await);
}

// Assert we got all the same updates
kfvs.sort_unstable();
kfvs_received.sort_unstable();
assert_eq!(kfvs, kfvs_received);

// Resend the same updates to the producer bridge
for kfv in &kfvs {
let msg = OutgoingMessage {
destination: sp("dpu-bridge"),
body: MessageBody::Request {
payload: encode_kfv(kfv),
},
};
swbus.send(msg).await.unwrap();
println!("Sent: {}", kfv.key);
}
let result = timeout(Duration::from_secs(3), consumer_table.read_data()).await;
if result.is_ok() {
// If we got here, it means the bridge did not skip the updates
let received = consumer_table.pops().await;
for kfv in received {
println!("Received: {}", kfv.key);
}
panic!("Expected bridge to skip duplicate updates, but it processed them");
}
}

fn encode_kfv(kfv: &KeyOpFieldValues) -> Vec<u8> {
ActorMessage::new("", kfv).unwrap().serialize()
}
Expand Down