From 3999fcb23173d18548d2767ea48b77ec40ea31c4 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Fri, 3 Oct 2025 14:01:37 -0400 Subject: [PATCH 01/10] Fix bouncing route exchange (#117) ### why swbusd incorrectly handling route announce to others that is routed through it due to the queue to the direct connection was full ### what this PR does route announcement should not be routed through others --- crates/swbus-core/src/mux/conn_worker.rs | 12 +++++++++--- crates/swbus-core/src/mux/nexthop.rs | 2 +- crates/swbus-core/src/mux/route_annoucer.rs | 8 +++++--- test_utils/hamgrd/redis_data_set.cmd | 2 +- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/swbus-core/src/mux/conn_worker.rs b/crates/swbus-core/src/mux/conn_worker.rs index 7a38ed2..75babb5 100644 --- a/crates/swbus-core/src/mux/conn_worker.rs +++ b/crates/swbus-core/src/mux/conn_worker.rs @@ -148,9 +148,15 @@ where } } Some(swbus_message::Body::RouteAnnouncement(route_entries)) => { - // drop route announcement message - debug!("Received route announcement"); - self.mux.process_route_announcement(route_entries, &self.info)?; + let my_sp = self.mux.get_my_service_path(); + if message.header.as_ref().unwrap().destination.as_ref().unwrap() == my_sp { + // Message is destined for this service, process it locally + debug!("Received route announcement"); + self.mux.process_route_announcement(route_entries, &self.info)?; + } else { + // drop route announcement message + debug!("Dropping route announcement not destined for me"); + } } Some(swbus_message::Body::ManagementRequest(ref mgmt_request)) => { let my_sp = self.mux.get_my_service_path(); diff --git a/crates/swbus-core/src/mux/nexthop.rs b/crates/swbus-core/src/mux/nexthop.rs index 3efe52c..e121264 100644 --- a/crates/swbus-core/src/mux/nexthop.rs +++ b/crates/swbus-core/src/mux/nexthop.rs @@ -114,7 +114,7 @@ impl SwbusNextHop { NextHopType::Remote => { let header: &mut SwbusMessageHeader = 
message.header.as_mut().expect("missing header"); // should not happen otherwise it won't reach here header.ttl -= 1; - if header.ttl == 0 { + if header.ttl < self.hop_count { debug!("TTL expired"); let response = SwbusMessage::new_response( message.header.as_ref().unwrap(), diff --git a/crates/swbus-core/src/mux/route_annoucer.rs b/crates/swbus-core/src/mux/route_annoucer.rs index 7ad9097..d30ccb2 100644 --- a/crates/swbus-core/src/mux/route_annoucer.rs +++ b/crates/swbus-core/src/mux/route_annoucer.rs @@ -99,7 +99,7 @@ impl RouteAnnouncer { #[instrument(name = "send_route_announcement", level = "debug", skip_all)] async fn send_route_announcement(&self, conn_info: &SwbusConnInfo, routes: &RouteEntries) -> Result<()> { let dest_sp = conn_info.remote_service_path().clone(); - let msg = SwbusMessage { + let mut msg = SwbusMessage { header: Some(SwbusMessageHeader::new( self.mux.get_my_service_path().clone(), dest_sp, @@ -107,6 +107,8 @@ impl RouteAnnouncer { )), body: Some(swbus_message::Body::RouteAnnouncement(routes.clone())), }; + // count itself as 1 hop. 
The message is only meant for direct neighbors + msg.header.as_mut().unwrap().ttl = 2; debug!( "Sending route announcement to {}, conn_info {:?}, message {:?}", conn_info.remote_service_path(), @@ -167,7 +169,7 @@ mod tests { "header": { "version": 1, "flag": 0, - "ttl": 63, + "ttl": 1, "source": "region-a.cluster-a.node0", "destination": "region-a.cluster-a.node1" }, @@ -211,7 +213,7 @@ mod tests { "header": { "version": 1, "flag": 0, - "ttl": 63, + "ttl": 1, "source": "region-a.cluster-a.node0", "destination": "region-a.cluster-a.node2" }, diff --git a/test_utils/hamgrd/redis_data_set.cmd b/test_utils/hamgrd/redis_data_set.cmd index b9bcc6c..c64989e 100644 --- a/test_utils/hamgrd/redis_data_set.cmd +++ b/test_utils/hamgrd/redis_data_set.cmd @@ -70,4 +70,4 @@ HSET DPU_STATE|dpu7 dpu_data_plane_state up select 0 HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 pb "0a013112050d00010203220576647075302205766470753128013a057664707530" -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 pb "0a01311001180122067465737469642802" +HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 pb "0a01311001180122086861736574305f302802" From bc636b59547a044fa2c932fa97345f80135d20f7 Mon Sep 17 00:00:00 2001 From: dypet Date: Tue, 21 Oct 2025 11:46:54 -0600 Subject: [PATCH 02/10] Add neigh resolve (#120) Remove local_nexthop_ip and set NEIGH_RESOLVE_TABLE for PA address configured on interface. 
--- Cargo.lock | 4 +-- crates/hamgrd/src/actors/dpu.rs | 42 ++++++++++++++++++++++++-- crates/hamgrd/src/actors/ha_set.rs | 6 ++-- crates/hamgrd/src/actors/test.rs | 2 -- crates/hamgrd/src/db_structs.rs | 12 +++++--- crates/hamgrd/src/ha_actor_messages.rs | 3 -- crates/hamgrd/src/main.rs | 8 ++++- test_utils/hamgrd/redis_data_set.cmd | 1 + 8 files changed, 60 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d504ecf..e595aaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2144,7 +2144,7 @@ dependencies = [ [[package]] name = "swss-common" version = "0.1.0" -source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#1484a851dbfdd4b122c361cd7ea03eca0afe5d63" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#7407a2e6060f9e8c0423e5886b96f1146dbca315" dependencies = [ "bindgen", "getset", @@ -2178,7 +2178,7 @@ dependencies = [ [[package]] name = "swss-common-testing" version = "0.1.0" -source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#1484a851dbfdd4b122c361cd7ea03eca0afe5d63" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#7407a2e6060f9e8c0423e5886b96f1146dbca315" dependencies = [ "lazy_static", "rand", diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index 0c994e4..f714e83 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -1,6 +1,7 @@ use crate::actors::{spawn_consumer_bridge_for_actor, ActorCreator}; use crate::db_structs::{ - BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuPmonStateType, DpuState, RemoteDpu, + BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuPmonStateType, DpuState, NeighResolveTable, + RemoteDpu, }; use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType}; use crate::ServicePath; @@ -169,6 +170,10 @@ impl DpuActor { if is_managed { if first_time { + if let Err(e) = self.set_neigh_resolve(outgoing) { 
+ error!("Failed to set NEIGH_RESOLVE_TABLE: {}", e); + } + // DASH_HA_GLOBAL_CONFIG from common-bridge sent to this actor instance only. // Key is DASH_HA_GLOBAL_CONFIG self.bridges.push( @@ -463,6 +468,30 @@ impl DpuActor { } false } + + fn set_neigh_resolve(&self, outgoing: &mut Outgoing) -> Result<()> { + let Some(DpuData::LocalDpu { ref dpu, .. }) = self.dpu else { + debug!("DPU is not managed by this HA instance, do not set NEIGH_RESOLVE_TABLE"); + return Ok(()); + }; + + let swss_key = format!("{}:{}", "", dpu.pa_ipv4.clone()); + let entry = NeighResolveTable { + mac: "00:00:00:00:00:00".to_string(), + }; + + let fv = swss_serde::to_field_values(&entry)?; + let kfv = KeyOpFieldValues { + key: swss_key, + operation: KeyOperation::Set, + field_values: fv, + }; + + let msg = ActorMessage::new(self.id.clone(), &kfv)?; + outgoing.send(outgoing.common_bridge_sp::(), msg); + + Ok(()) + } } impl Actor for DpuActor { @@ -504,8 +533,9 @@ mod test { dpu::DpuActor, test::{self, *}, }; - use crate::db_structs::{BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuState, RemoteDpu}; - + use crate::db_structs::{ + BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuState, NeighResolveTable, RemoteDpu, + }; use crate::ha_actor_messages::DpuActorState; use sonic_common::SonicDbTable; use std::time::Duration; @@ -514,6 +544,9 @@ mod test { #[tokio::test] async fn dpu_actor() { + // To enable trace, set ENABLE_TRACE=1 to run test + sonic_common::log::init_logger_for_test(); + let _ = Redis::start_config_db(); let runtime = test::create_actor_runtime(0, "10.0.0.0", "10::").await; // prepare test data @@ -566,6 +599,9 @@ mod test { // Receiving DPU config-db object from swss-common bridge send! { key: Dpu::table_name(), data: { "key": "switch0_dpu0", "operation": "Set", "field_values": dpu_fvs}, addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + // Check for NeighResolveTable message + recv! 
{ key: "switch0_dpu0", data: {"key": format!("{}:{}", "".to_string(), dpu_actor_state_wo_bfd.pa_ipv4.clone()), "operation": "Set", "field_values": {"mac":"00:00:00:00:00:00"}}, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, send! { key: "REMOTE_DPU|switch1_dpu0", data: { "key": "REMOTE_DPU|switch1_dpu0", "operation": "Set", "field_values": serde_json::to_value(&remote_dpu1_fvs).unwrap() }}, send! { key: DashHaGlobalConfig::table_name(), data: { "key": DashHaGlobalConfig::table_name(), "operation": "Set", "field_values": dash_global_cfg_fvs} }, recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.0.0", "operation": "Set", "field_values": bfd_fvs}, diff --git a/crates/hamgrd/src/actors/ha_set.rs b/crates/hamgrd/src/actors/ha_set.rs index 949a4fa..466086a 100644 --- a/crates/hamgrd/src/actors/ha_set.rs +++ b/crates/hamgrd/src/actors/ha_set.rs @@ -199,8 +199,8 @@ impl HaSetActor { for vdpu_ext in vdpus { if !vdpu_ext.vdpu.dpu.remote_dpu { - // if it is locally managed dpu, use local nexthop as endpoint - endpoint.push(vdpu_ext.vdpu.dpu.local_nexthop_ip.clone()); + // if it is locally managed dpu, use PA address as endpoint + endpoint.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); } else { endpoint.push(vdpu_ext.vdpu.dpu.npu_ipv4.clone()); } @@ -208,7 +208,7 @@ impl HaSetActor { endpoint_monitor.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); if vdpu_ext.is_primary { if !vdpu_ext.vdpu.dpu.remote_dpu { - primary.push(vdpu_ext.vdpu.dpu.local_nexthop_ip.clone()); + primary.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); } else { primary.push(vdpu_ext.vdpu.dpu.npu_ipv4.clone()); } diff --git a/crates/hamgrd/src/actors/test.rs b/crates/hamgrd/src/actors/test.rs index 567277f..e254f6a 100644 --- a/crates/hamgrd/src/actors/test.rs +++ b/crates/hamgrd/src/actors/test.rs @@ -373,7 +373,6 @@ pub fn make_dpu_object(switch: u16, dpu: u32) -> Dpu { vip_ipv6: Some(normalize_ipv6(&format!("3:2:{switch_pair_id}::{dpu}"))), pa_ipv4: format!("18.0.{switch}.{dpu}"), pa_ipv6: 
Some(normalize_ipv6(&format!("18:0:{switch}::{dpu}"))), - local_nexthop_ip: format!("18.0.{switch}.{dpu}"), dpu_id: dpu, vdpu_id: Some(format!("vdpu{}", switch * 8 + dpu as u16)), orchagent_zmq_port: 8100, @@ -453,7 +452,6 @@ pub fn to_local_dpu(dpu_actor_state: &DpuActorState) -> Dpu { vip_ipv6: dpu_actor_state.vip_ipv6.clone(), pa_ipv4: dpu_actor_state.pa_ipv4.clone(), pa_ipv6: dpu_actor_state.pa_ipv6.clone(), - local_nexthop_ip: dpu_actor_state.local_nexthop_ip.clone(), dpu_id: dpu_actor_state.dpu_id, vdpu_id: dpu_actor_state.vdpu_id.clone(), orchagent_zmq_port: dpu_actor_state.orchagent_zmq_port, diff --git a/crates/hamgrd/src/db_structs.rs b/crates/hamgrd/src/db_structs.rs index 5fb3c0a..36e8c63 100644 --- a/crates/hamgrd/src/db_structs.rs +++ b/crates/hamgrd/src/db_structs.rs @@ -43,7 +43,6 @@ pub struct Dpu { pub vip_ipv6: Option, pub pa_ipv4: String, pub pa_ipv6: Option, - pub local_nexthop_ip: String, pub dpu_id: u32, pub vdpu_id: Option, pub orchagent_zmq_port: u16, @@ -486,6 +485,14 @@ pub fn get_dpu_config_from_db(dpu_id: u32) -> Result { Err(anyhow::anyhow!("DPU entry not found for slot {}", dpu_id)) } +#[skip_serializing_none] +#[serde_as] +#[derive(Default, Debug, Deserialize, Serialize, PartialEq, SonicDb)] +#[sonicdb(table_name = "NEIGH_RESOLVE_TABLE", key_separator = ":", db_name = "APPL_DB")] +pub struct NeighResolveTable { + pub mac: String, +} + #[cfg(test)] mod test { use super::*; @@ -500,7 +507,6 @@ mod test { "operation": "Set", "field_values": { "pa_ipv4": "1.2.3.4", - "local_nexthop_ip": "2.2.2.5", "dpu_id": "1", "orchagent_zmq_port": "8100", "swbus_port": "23606", @@ -552,7 +558,6 @@ mod test { pa_ipv4: "1.2.3.6".to_string(), vip_ipv4: Some("4.5.6.6".to_string()), pa_ipv6: None, - local_nexthop_ip: "2.2.2.5".to_string(), dpu_id: 6, orchagent_zmq_port: 8100, swbus_port: 23612, @@ -571,7 +576,6 @@ mod test { for d in 6..8 { let dpu_fvs = vec![ ("pa_ipv4".to_string(), Ipv4Addr::new(1, 2, 3, d).to_string()), - 
("local_nexthop_ip".to_string(), Ipv4Addr::new(2, 2, 2, 5).to_string()), ("vip_ipv4".to_string(), Ipv4Addr::new(4, 5, 6, d).to_string()), ("dpu_id".to_string(), d.to_string()), ("orchagent_zmq_port".to_string(), "8100".to_string()), diff --git a/crates/hamgrd/src/ha_actor_messages.rs b/crates/hamgrd/src/ha_actor_messages.rs index f6e5582..934da16 100644 --- a/crates/hamgrd/src/ha_actor_messages.rs +++ b/crates/hamgrd/src/ha_actor_messages.rs @@ -25,7 +25,6 @@ pub struct DpuActorState { pub vip_ipv6: Option, pub pa_ipv4: String, pub pa_ipv6: Option, - pub local_nexthop_ip: String, pub dpu_id: u32, pub vdpu_id: Option, pub orchagent_zmq_port: u16, @@ -57,7 +56,6 @@ impl DpuActorState { vip_ipv6: dpu.vip_ipv6.clone(), pa_ipv4: dpu.pa_ipv4.clone(), pa_ipv6: dpu.pa_ipv6.clone(), - local_nexthop_ip: dpu.local_nexthop_ip.clone(), dpu_id: dpu.dpu_id, vdpu_id: dpu.vdpu_id.clone(), orchagent_zmq_port: dpu.orchagent_zmq_port, @@ -81,7 +79,6 @@ impl DpuActorState { vip_ipv6: None, pa_ipv4: rdpu.pa_ipv4.clone(), pa_ipv6: rdpu.pa_ipv6.clone(), - local_nexthop_ip: "".to_string(), dpu_id: rdpu.dpu_id, vdpu_id: None, orchagent_zmq_port: 0, diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index 4fc064b..c9aea26 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -24,7 +24,9 @@ mod ha_actor_messages; use actors::{dpu::DpuActor, ha_scope::HaScopeActor, ha_set::HaSetActor, vdpu::VDpuActor, DbBasedActor}; use actors::{spawn_vanilla_producer_bridge, spawn_zmq_producer_bridge}; use anyhow::Result; -use db_structs::{BfdSessionTable, DashHaScopeTable, DashHaSetTable, Dpu, VDpu, VnetRouteTunnelTable}; +use db_structs::{ + BfdSessionTable, DashHaScopeTable, DashHaSetTable, Dpu, NeighResolveTable, VDpu, VnetRouteTunnelTable, +}; use lazy_static::lazy_static; use sonic_dash_api_proto::{ha_scope_config::HaScopeConfig, ha_set_config::HaSetConfig}; use std::any::Any; @@ -154,6 +156,10 @@ async fn spawn_producer_bridges(edge_runtime: Arc, dpu: &Dpu) let 
handle = spawn_vanilla_producer_bridge::(edge_runtime.clone()).await?; handles.push(handle); + + let handle = spawn_vanilla_producer_bridge::(edge_runtime.clone()).await?; + handles.push(handle); + Ok(handles) } diff --git a/test_utils/hamgrd/redis_data_set.cmd b/test_utils/hamgrd/redis_data_set.cmd index c64989e..06d9ab6 100644 --- a/test_utils/hamgrd/redis_data_set.cmd +++ b/test_utils/hamgrd/redis_data_set.cmd @@ -2,6 +2,7 @@ select 4 hset DEVICE_METADATA|localhost region region-a cluster cluster-a HSET "LOOPBACK_INTERFACE|Loopback0|127.0.0.1/32" "NULL" "NULL" hset LOOPBACK_INTERFACE|Loopback0 NULL NULL +HSET "INTERFACE|Ethernet0|18.0.202.0/31" "NULL" "NULL" HSET "DPU|switch0_dpu0" "dpu_id" "0" HSET "DPU|switch0_dpu0" "gnmi_port" "50051" HSET "DPU|switch0_dpu0" "local_port" "8080" From 4614a45105186de625b204a587adb82993929299 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:47:19 -0400 Subject: [PATCH 03/10] Bfd probe update lost (#121) ### why We observed the first bfd probe update is lost which is when bfd session comes up the first time after ha established. This is because SubscriberStateTable constructor already populated m_buffer with initial snapshot. At the first time SubscriberStateTable::pops is called, it only returns the data in m_buffer, which are the entries before update. In dash-ha, rehydrate (the operation to pull initial data) is through table get. Then it enters select loop _ = table.read_data() => { for kfv in table.pops().await { send_kfv(kfv).await; } } read_data returns when there is update to the table, which means at the first update, pops returns the old state stored in m_buffer. ### what this PR does for SubscriberStateTable, implement rehydrate using SubscriberStateTable pops, which is by the design of SubscriberStateTable in swss-common. 
--- Cargo.lock | 1 + crates/swss-common-bridge/Cargo.toml | 2 +- crates/swss-common-bridge/src/consumer.rs | 35 +++++++++++++++++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e595aaa..38ec0ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2173,6 +2173,7 @@ dependencies = [ "swss-common-testing", "tokio", "tokio-util", + "tracing", ] [[package]] diff --git a/crates/swss-common-bridge/Cargo.toml b/crates/swss-common-bridge/Cargo.toml index 9192228..bca7ce6 100644 --- a/crates/swss-common-bridge/Cargo.toml +++ b/crates/swss-common-bridge/Cargo.toml @@ -22,7 +22,7 @@ serde.workspace = true serde_json.workspace = true sonicdb-derive.workspace = true sonic-common.workspace = true - +tracing.workspace = true [lints] workspace = true diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 9bdea5f..c07f6f4 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -12,7 +12,7 @@ use swss_common::{ }; use tokio::task::JoinHandle; use tokio_util::task::AbortOnDropHandle; - +use tracing::debug; pub struct ConsumerBridge { _task: AbortOnDropHandle<()>, } @@ -56,6 +56,7 @@ where F: FnMut(&KeyOpFieldValues) -> (ServicePath, String) + Send + 'static, S: Fn(&KeyOpFieldValues) -> bool + Sync + Send + 'static, { + let my_sp = addr.clone(); let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false); tokio::task::spawn(async move { let mut table_cache = TableCache::default(); @@ -63,12 +64,17 @@ where if P::is_proto() { P::convert_pb_to_json(&mut kfv); } - + debug!("{}: receiving update: {:?}", my_sp.to_longest_path(), kfv); // Merge the kfv to get the whole table as an update let Some(kfv) = table_cache.merge_kfv(kfv) else { + debug!("{}: No change in table, skipping send", my_sp.to_longest_path()); return; // No change, skip sending }; if !selector(&kfv) { + debug!( + "{}: update does not match selector, skipping send", + 
my_sp.to_longest_path() + ); return; } @@ -175,7 +181,30 @@ macro_rules! impl_consumertable { }; } -impl_consumertable! { ConsumerStateTable[true] SubscriberStateTable[true] ZmqConsumerStateTable[false] } +impl_consumertable! { ConsumerStateTable[true] ZmqConsumerStateTable[false] } + +// Custom implementation for SubscriberStateTable to handle rehydration differently +impl ConsumerTable for SubscriberStateTable { + async fn read_data(&mut self) { + SubscriberStateTable::read_data_async(self) + .await + .expect("SubscriberStateTable::read_data_async io error"); + } + + async fn pops(&mut self) -> Vec { + SubscriberStateTable::pops_async(self) + .await + .expect("SubscriberStateTable::pops_async threw an exception") + } + + async fn rehydrate(&mut self) -> Vec { + // Custom rehydration for SubscriberStateTable: + // The constructor already populated m_buffer with initial snapshot, + // so we just drain it here to get the rehydration data. + // This also prevents the stale buffer from being mixed with live updates. + self.pops().await + } +} #[cfg(test)] mod test { From 522570c4dbd798c59b319d33cfe83c7604d565bb Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Tue, 28 Oct 2025 20:33:02 -0400 Subject: [PATCH 04/10] Add custom_bfd (#125) ### why new vnetorch implementation requires monitoring='custom_bfd' to be set. 
### what this PR does set monitoring='custom_bfd' when create vnet route --- crates/hamgrd/src/actors/ha_set.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/hamgrd/src/actors/ha_set.rs b/crates/hamgrd/src/actors/ha_set.rs index 466086a..70467ef 100644 --- a/crates/hamgrd/src/actors/ha_set.rs +++ b/crates/hamgrd/src/actors/ha_set.rs @@ -228,7 +228,7 @@ impl HaSetActor { let vnet_route = VnetRouteTunnelTable { endpoint, endpoint_monitor: Some(endpoint_monitor), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(primary), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, @@ -564,7 +564,7 @@ mod test { vdpu0_state_obj.dpu.pa_ipv4.clone(), vdpu1_state_obj.dpu.pa_ipv4.clone(), ]), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(vec![vdpu0_state_obj.dpu.pa_ipv4.clone()]), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, @@ -657,7 +657,7 @@ mod test { vdpu0_state_obj.dpu.pa_ipv4.clone(), vdpu1_state_obj.dpu.pa_ipv4.clone(), ]), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(vec![vdpu0_state_obj.dpu.npu_ipv4.clone()]), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, From 7e54b9a0c71ff57f60fc4e647a847b1d338f9cc3 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:06:56 -0600 Subject: [PATCH 05/10] Fix issue #118: use hostname to build service path (#122) ### why swbus-cli show swbusd route is not working. The issue is introduced by PR https://github.com/sonic-net/sonic-dash-ha/pull/108. In the PR, it compares the dest sp of received ManagementRequest to "my_service_path". the request is processed if they are same. In a setup with both ipv4 and ipv6 endpoints, my_service_path takes one of them. 
If the dest sp of the request picks the other one, they don't match so the request will be dropped. ### what this PR does 1. we use hostname in DEVICE_META|localhost to build my service path. 2. remote service path is exchanged when connection is established. Client sends its own service path in the request and server replies with its one when connection is created. --- crates/swbus-cli/src/main.rs | 11 +- crates/swbus-config/src/lib.rs | 120 ++++++------------ crates/swbus-core/src/mux/conn.rs | 52 ++++++-- crates/swbus-core/src/mux/conn_info.rs | 24 ++-- crates/swbus-core/src/mux/conn_store.rs | 49 +++---- crates/swbus-core/src/mux/conn_worker.rs | 12 +- crates/swbus-core/src/mux/multiplexer.rs | 64 ++++++++-- crates/swbus-core/src/mux/nexthop.rs | 6 +- crates/swbus-core/src/mux/route_annoucer.rs | 12 +- crates/swbus-core/src/mux/service.rs | 10 +- crates/swbus-core/tests/data/b2b/topo.json | 2 - .../tests/data/inter-cluster/topo.json | 8 +- crates/swbus-proto/src/swbus.rs | 2 + 13 files changed, 196 insertions(+), 176 deletions(-) diff --git a/crates/swbus-cli/src/main.rs b/crates/swbus-cli/src/main.rs index 9b33fb8..2504390 100644 --- a/crates/swbus-cli/src/main.rs +++ b/crates/swbus-cli/src/main.rs @@ -262,7 +262,6 @@ mod tests { #[test] fn test_get_swbus_config() { let slot = 1; - let npu_ipv4 = "10.0.1.0"; let _ = Redis::start_config_db(); // Mock the config database with a sample configuration @@ -271,15 +270,7 @@ mod tests { std::env::set_var("DEV", format!("dpu{slot}")); let config = get_swbus_config(None).unwrap(); assert_eq!(config.endpoint.to_string(), format!("{}:{}", "10.0.1.0", 23606 + slot)); - let expected_sp = ServicePath::with_node( - "region-a", - "cluster-a", - &format!("{npu_ipv4}-dpu{slot}"), - "", - "", - "", - "", - ); + let expected_sp = ServicePath::with_node("region-a", "cluster-a", &format!("host1-dpu{slot}"), "", "", "", ""); assert!(config .routes .iter() diff --git a/crates/swbus-config/src/lib.rs b/crates/swbus-config/src/lib.rs 
index c2f7925..50f6e78 100644 --- a/crates/swbus-config/src/lib.rs +++ b/crates/swbus-config/src/lib.rs @@ -31,8 +31,6 @@ pub struct RouteConfig { #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct PeerConfig { - #[serde(deserialize_with = "deserialize_service_path")] - pub id: ServicePath, pub endpoint: SocketAddr, pub conn_type: ConnectionType, } @@ -126,37 +124,29 @@ pub struct ConfigDBDeviceMetadataEntry { #[serde(rename = "type")] pub device_type: Option, pub sub_type: Option, + pub hostname: Option, } #[instrument] -fn route_config_from_dpu_entry(dpu_entry: &ConfigDBDPUEntry, region: &str, cluster: &str) -> Result> { +fn route_config_from_dpu_entry( + dpu_entry: &ConfigDBDPUEntry, + region: &str, + cluster: &str, + hostname: &str, +) -> Vec { let mut routes = Vec::new(); let dpu_id = dpu_entry.dpu_id; debug!("Collecting routes for local dpu{}", dpu_id); - if let Some(npu_ipv4) = dpu_entry.npu_ipv4.as_ref() { - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv4}-dpu{dpu_id}"), "", "", "", ""); - routes.push(RouteConfig { - key: sp, - scope: RouteScope::InCluster, - }); - } - - if let Some(npu_ipv6) = dpu_entry.npu_ipv6.as_ref() { - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv6}-dpu{dpu_id}"), "", "", "", ""); - routes.push(RouteConfig { - key: sp, - scope: RouteScope::InCluster, - }); - } - - if routes.is_empty() { - SwbusConfigError::InvalidConfig(format!("No valid routes found in local dpu{dpu_id}")); - } + let sp = ServicePath::with_node(region, cluster, &format!("{hostname}-dpu{dpu_id}"), "", "", "", ""); + routes.push(RouteConfig { + key: sp, + scope: RouteScope::InCluster, + }); debug!("Routes collected: {:?}", &routes); - Ok(routes) + routes } #[instrument] @@ -184,29 +174,22 @@ fn peer_config_from_dpu_entry( "swbusd_port is not found in dpu {key} is not found" )))?; - let dpu_id = dpu_entry.dpu_id; - + // dual stack is not supported. Either all ipv4 or all ipv6. 
if let Some(npu_ipv4) = dpu_entry.npu_ipv4 { let npu_ipv4 = npu_ipv4 .parse::() .map_err(|_| SwbusConfigError::InvalidConfig(format!("Invalid IPv4 address: {npu_ipv4}")))?; - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv4}-dpu{dpu_id}"), "", "", "", ""); peers.push(PeerConfig { - id: sp, endpoint: SocketAddr::new(IpAddr::V4(npu_ipv4), swbusd_port), conn_type: ConnectionType::InCluster, }); - } - - if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { + } else if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { let npu_ipv6 = npu_ipv6 .parse::() .map_err(|_| SwbusConfigError::InvalidConfig(format!("Invalid IPv6 address: {npu_ipv6}")))?; - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv6}-dpu{dpu_id}"), "", "", "", ""); peers.push(PeerConfig { - id: sp, endpoint: SocketAddr::new(IpAddr::V6(npu_ipv6), swbusd_port), conn_type: ConnectionType::InCluster, }); @@ -221,7 +204,7 @@ fn peer_config_from_dpu_entry( } #[instrument] -fn get_device_info() -> Result<(String, String)> { +fn get_device_info() -> Result<(String, String, String)> { let db = DbConnector::new_named(CONFIG_DB, false, 0).map_err(|e| ("connecting to config_db".into(), e))?; let table = Table::new(db, "DEVICE_METADATA").map_err(|e| ("opening DEVICE_METADATA table".into(), e))?; @@ -235,9 +218,12 @@ fn get_device_info() -> Result<(String, String)> { "cluster not found in DEVICE_METADATA table".into(), ))?; - debug!("Region: {}, Cluster: {}", region, cluster); + let hostname = metadata.hostname.ok_or(SwbusConfigError::InvalidConfig( + "hostname not found in DEVICE_METADATA table".into(), + ))?; + debug!("Region: {}, Cluster: {}, Hostname: {}", region, cluster, hostname); - Ok((region, cluster)) + Ok((region, cluster, hostname)) } #[instrument] @@ -288,7 +274,7 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { let mut myroutes: Option> = None; let mut myendpoint: Option = None; - let (region, cluster) = get_device_info()?; + let (region, cluster, hostname) = get_device_info()?; 
// Get the Loopback0 address let (my_ipv4, my_ipv6) = get_loopback_address(0)?; @@ -314,10 +300,7 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { "swbusd_port is not found in dpu{dpu_id} is not found" )))?; - myroutes = Some(route_config_from_dpu_entry(&dpu, ®ion, &cluster).map_err(|e| { - error!("Failed to collect routes for dpu{dpu_id}: {e}"); - e - })?); + myroutes = Some(route_config_from_dpu_entry(&dpu, ®ion, &cluster, &hostname)); if let Some(npu_ipv4) = dpu.npu_ipv4 { myendpoint = Some(SocketAddr::new(std::net::IpAddr::V4(npu_ipv4), swbusd_port)); @@ -405,6 +388,7 @@ pub mod test_utils { cluster: Some("cluster-a".to_string()), device_type: Some("SpineRouter".to_string()), sub_type: Some("SmartSwitch".to_string()), + hostname: Some("host1".to_string()), }; to_table(&metadata, &table, "localhost").unwrap(); @@ -489,47 +473,25 @@ mod tests { let mut config_fromdb = swbus_config_from_db(0).unwrap(); - assert_eq!(config_fromdb.routes.len(), 2); - assert_eq!(config_fromdb.peers.len(), 10); + assert_eq!(config_fromdb.routes.len(), 1); + assert_eq!(config_fromdb.peers.len(), 5); // create equivalent config in yaml let yaml_content = r#" endpoint: "10.0.1.0:23606" routes: - - key: "region-a.cluster-a.10.0.1.0-dpu0" - scope: "InCluster" - - key: "region-a.cluster-a.2001:db8:1::-dpu0" + - key: "region-a.cluster-a.host1-dpu0" scope: "InCluster" peers: - - id: "region-a.cluster-a.10.0.1.0-dpu1" - endpoint: "10.0.1.0:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::-dpu1" - endpoint: "[2001:db8:1::]:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.1-dpu0" - endpoint: "10.0.1.1:23606" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::1-dpu0" - endpoint: "[2001:db8:1::1]:23606" + - endpoint: "10.0.1.0:23607" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.1-dpu1" - endpoint: "10.0.1.1:23607" + - endpoint: "10.0.1.1:23606" conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::1-dpu1" - 
endpoint: "[2001:db8:1::1]:23607" + - endpoint: "10.0.1.1:23607" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.2-dpu0" - endpoint: "10.0.1.2:23606" + - endpoint: "10.0.1.2:23606" conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::2-dpu0" - endpoint: "[2001:db8:1::2]:23606" - conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.2-dpu1" - endpoint: "10.0.1.2:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::2-dpu1" - endpoint: "[2001:db8:1::2]:23607" + - endpoint: "10.0.1.2:23607" conn_type: "InCluster" "#; @@ -543,9 +505,9 @@ mod tests { expected.npu_ipv6 = Some(Ipv6Addr::from_str("2001:db8:1::").unwrap()); // sort before compare config_fromdb.routes.sort_by(|a, b| a.key.cmp(&b.key)); - config_fromdb.peers.sort_by(|a, b| a.id.cmp(&b.id)); + config_fromdb.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); expected.routes.sort_by(|a, b| a.key.cmp(&b.key)); - expected.peers.sort_by(|a, b| a.id.cmp(&b.id)); + expected.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); assert_eq!(config_fromdb, expected); cleanup_configdb_for_test(); @@ -559,11 +521,9 @@ mod tests { - key: "region-a.cluster-a.10.0.0.1-dpu0" scope: "InCluster" peers: - - id: "region-a.cluster-a.10.0.0.2-dpu0" - endpoint: "10.0.0.2:8000" + - endpoint: "10.0.0.2:8000" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.0.3-dpu0" - endpoint: "10.0.0.3:8000" + - endpoint: "10.0.0.3:8000" conn_type: "InCluster" "#; @@ -590,19 +550,11 @@ mod tests { ); assert_eq!(config.routes[0].scope, RouteScope::InCluster); - assert_eq!( - config.peers[0].id, - ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap() - ); assert_eq!( config.peers[0].endpoint, "10.0.0.2:8000".parse().expect("not expecting error") ); assert_eq!(config.peers[0].conn_type, ConnectionType::InCluster); - assert_eq!( - config.peers[1].id, - ServicePath::from_string("region-a.cluster-a.10.0.0.3-dpu0").unwrap() - ); assert_eq!( config.peers[1].endpoint, 
"10.0.0.3:8000".parse().expect("not expecting error") diff --git a/crates/swbus-core/src/mux/conn.rs b/crates/swbus-core/src/mux/conn.rs index 8a996bf..0714c67 100644 --- a/crates/swbus-core/src/mux/conn.rs +++ b/crates/swbus-core/src/mux/conn.rs @@ -62,7 +62,7 @@ impl SwbusConn { // Client-side connection factory and task entry impl SwbusConn { pub async fn connect( - conn_info: Arc, + conn_info: &SwbusConnInfo, mux: Arc, conn_store: Arc, ) -> Result { @@ -85,16 +85,16 @@ impl SwbusConn { } async fn start_client_worker_task( - conn_info: Arc, + conn_info: &SwbusConnInfo, mut client: SwbusServiceClient, mux: Arc, conn_store: Arc, ) -> Result { let (send_queue_tx, send_queue_rx) = mpsc::channel(16); - let mut conn = SwbusConn::new(&conn_info, send_queue_tx); - let request_stream = ReceiverStream::new(send_queue_rx) - .map(|result| result.expect("Not expecting grpc client adding messages with error status")); + let request_stream = ReceiverStream::new(send_queue_rx).map(|result: Result| { + result.expect("Not expecting grpc client adding messages with error status") + }); let mut stream_message_request = Request::new(request_stream); @@ -111,8 +111,43 @@ impl SwbusConn { MetadataValue::from_str(conn_info.connection_type().as_str_name()).unwrap(), ); - let incoming_stream = match client.stream_messages(stream_message_request).await { - Ok(response) => response.into_inner(), + let (incoming_stream, conn_info_for_worker) = match client.stream_messages(stream_message_request).await { + Ok(response) => { + // Extract server service path from response metadata and update remote_service_path + let server_service_path = match response.metadata().get(SWBUS_SERVER_SERVICE_PATH) { + Some(path) => match ServicePath::from_string(path.to_str().unwrap()) { + Ok(service_path) => { + info!("Received server service path: {}", service_path.to_string()); + service_path + } + Err(e) => { + error!("Failed to parse server service path: {:?}", e); + return Err(SwbusError::connection( + 
SwbusErrorCode::InvalidHeader, + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid server service path: {:?}", e), + ), + )); + } + }, + None => { + error!("Server service path not found in response metadata"); + return Err(SwbusError::connection( + SwbusErrorCode::InvalidHeader, + io::Error::new( + io::ErrorKind::InvalidData, + "Server service path not found in response metadata", + ), + )); + } + }; + + // Update conn_info's remote_service_path with actual server service path + let updated_conn_info = Arc::new(conn_info.clone().with_remote_service_path(server_service_path)); + + (response.into_inner(), updated_conn_info) + } Err(e) => { error!("Failed to establish message streaming: {}.", e); return Err(SwbusError::connection( @@ -121,8 +156,7 @@ impl SwbusConn { )); } }; - - let conn_info_for_worker = conn.info().clone(); + let mut conn = SwbusConn::new(&conn_info_for_worker, send_queue_tx); let shutdown_ct_for_worker = conn.shutdown_ct.clone(); let worker_task = tokio::spawn(async move { diff --git a/crates/swbus-core/src/mux/conn_info.rs b/crates/swbus-core/src/mux/conn_info.rs index 8e93f53..38b2ffe 100644 --- a/crates/swbus-core/src/mux/conn_info.rs +++ b/crates/swbus-core/src/mux/conn_info.rs @@ -24,21 +24,17 @@ pub struct SwbusConnInfo { connection_type: ConnectionType, #[getset(get = "pub")] - remote_service_path: ServicePath, + remote_service_path: Option, } impl SwbusConnInfo { - pub fn new_client( - conn_type: ConnectionType, - remote_addr: SocketAddr, - remote_service_path: ServicePath, - ) -> SwbusConnInfo { + pub fn new_client(conn_type: ConnectionType, remote_addr: SocketAddr) -> SwbusConnInfo { SwbusConnInfo { id: format!("swbs-to://{}:{}", remote_addr.ip(), remote_addr.port()), mode: SwbusConnMode::Client, remote_addr, connection_type: conn_type, - remote_service_path, + remote_service_path: None, } } @@ -52,9 +48,14 @@ impl SwbusConnInfo { mode: SwbusConnMode::Server, remote_addr, connection_type: conn_type, - remote_service_path, 
+ remote_service_path: Some(remote_service_path), } } + + pub fn with_remote_service_path(mut self, remote_service_path: ServicePath) -> Self { + self.remote_service_path = Some(remote_service_path); + self + } } #[cfg(test)] @@ -65,14 +66,13 @@ mod tests { #[test] fn new_client_conn_info_should_succeed() { let remote_addr = "127.0.0.1:8080".parse().unwrap(); - let remote_service_path = ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(); - let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, remote_addr, remote_service_path.clone()); + let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, remote_addr); assert_eq!(conn_info.id(), "swbs-to://127.0.0.1:8080"); assert_eq!(conn_info.mode(), SwbusConnMode::Client); assert_eq!(conn_info.remote_addr(), remote_addr); assert_eq!(conn_info.connection_type(), ConnectionType::InCluster); - assert_eq!(conn_info.remote_service_path(), &remote_service_path); + assert_eq!(conn_info.remote_service_path(), &None); } #[test] @@ -85,6 +85,6 @@ mod tests { assert_eq!(conn_info.mode(), SwbusConnMode::Server); assert_eq!(conn_info.remote_addr(), remote_addr); assert_eq!(conn_info.connection_type(), ConnectionType::InRegion); - assert_eq!(conn_info.remote_service_path(), &remote_service_path); + assert_eq!(conn_info.remote_service_path(), &Some(remote_service_path)); } } diff --git a/crates/swbus-core/src/mux/conn_store.rs b/crates/swbus-core/src/mux/conn_store.rs index 8604050..15d49aa 100644 --- a/crates/swbus-core/src/mux/conn_store.rs +++ b/crates/swbus-core/src/mux/conn_store.rs @@ -17,7 +17,7 @@ enum ConnTracker { pub struct SwbusConnStore { mux: Arc, - connections: DashMap, ConnTracker>, + connections: DashMap, } impl SwbusConnStore { @@ -29,8 +29,7 @@ impl SwbusConnStore { } #[instrument(skip(self, conn_info), fields(conn_id=conn_info.id()))] - fn start_connect_task(self: &Arc, conn_info: Arc, reconnect: bool) { - let conn_info_clone = conn_info.clone(); + fn 
start_connect_task(self: &Arc, conn_info: &SwbusConnInfo, reconnect: bool) { info!("Starting connection task to the peer"); let retry_interval = match reconnect { true => Duration::from_millis(1), @@ -42,13 +41,14 @@ impl SwbusConnStore { let token = CancellationToken::new(); let child_token = token.clone(); + let conn_info_clone = conn_info.clone(); tokio::spawn( async move { loop { if child_token.is_cancelled() { return; } - match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await { + match SwbusConn::connect(&conn_info_clone, mux_clone.clone(), conn_store.clone()).await { Ok(conn) => { info!("Successfully connect to the peer"); // register the new connection and update the route table @@ -63,21 +63,18 @@ impl SwbusConnStore { } .instrument(current_span.clone()), ); - self.connections.insert(conn_info_clone, ConnTracker::Task(token)); + self.connections + .insert(conn_info.id().to_string(), ConnTracker::Task(token)); } pub fn add_peer(self: &Arc, peer: PeerConfig) { - let conn_info = Arc::new(SwbusConnInfo::new_client( - peer.conn_type, - peer.endpoint, - peer.id.clone(), - )); - self.start_connect_task(conn_info, false); + let conn_info = SwbusConnInfo::new_client(peer.conn_type, peer.endpoint); + self.start_connect_task(&conn_info, false); } - pub fn conn_lost(self: &Arc, conn_info: Arc) { + pub fn conn_lost(self: &Arc, conn_info: &SwbusConnInfo) { // First, we remove the connection from the connection table. - self.connections.remove(&conn_info); + self.connections.remove(conn_info.id()); // If connection is client mode, we start a new connection task. 
if conn_info.mode() == SwbusConnMode::Client { @@ -88,7 +85,7 @@ impl SwbusConnStore { pub fn conn_established(&self, conn: SwbusConn) { self.mux.register(conn.info(), conn.new_proxy()); self.connections - .insert(conn.info().clone(), ConnTracker::SwbusConn(conn)); + .insert(conn.info().id().to_string(), ConnTracker::SwbusConn(conn)); } pub async fn shutdown(&self) { @@ -124,7 +121,6 @@ mod tests { let peer_config = PeerConfig { conn_type: ConnectionType::InNode, endpoint: "127.0.0.1:8080".to_string().parse().unwrap(), - id: ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap(), }; let route_config = RouteConfig { key: ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(), @@ -137,7 +133,7 @@ mod tests { conn_store.add_peer(peer_config); assert!(conn_store.connections.iter().any(|entry| { - entry.key().id() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) + entry.key() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) })); } @@ -150,15 +146,11 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( - ConnectionType::InCluster, - "127.0.0.1:8080".parse().unwrap(), - ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), - )); - conn_store.conn_lost(conn_info.clone()); + let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap()); + conn_store.conn_lost(&conn_info); assert!(conn_store.connections.iter().any(|entry| { - entry.key().id() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) + entry.key() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) })); } @@ -172,11 +164,10 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = 
Arc::new(SwbusConnInfo::new_client( - ConnectionType::InCluster, - "127.0.0.1:8080".parse().unwrap(), - ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), - )); + let mut conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap()); + conn_info = + conn_info.with_remote_service_path(ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap()); + let conn_info = Arc::new(conn_info); let (send_queue_tx, _) = mpsc::channel(16); let conn = SwbusConn::new(&conn_info, send_queue_tx); conn_store.conn_established(conn); @@ -184,6 +175,6 @@ mod tests { assert!(conn_store .connections .iter() - .any(|entry| entry.key().id() == conn_info.id() && matches!(entry.value(), ConnTracker::SwbusConn(_)))); + .any(|entry| entry.key() == conn_info.id() && matches!(entry.value(), ConnTracker::SwbusConn(_)))); } } diff --git a/crates/swbus-core/src/mux/conn_worker.rs b/crates/swbus-core/src/mux/conn_worker.rs index 75babb5..888211e 100644 --- a/crates/swbus-core/src/mux/conn_worker.rs +++ b/crates/swbus-core/src/mux/conn_worker.rs @@ -36,6 +36,10 @@ where mux: Arc, conn_store: Arc, ) -> Self { + assert!( + info.remote_service_path().is_some(), + "remote_service_path must be set before creating SwbusConnWorker" + ); Self { info, shutdown_ct, @@ -59,7 +63,7 @@ where self.unregister_from_mux()?; if result.is_err() { info!("Reporting connection lost."); - self.conn_store.conn_lost(self.info.clone()); + self.conn_store.conn_lost(&self.info); } result } @@ -270,7 +274,7 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -307,7 +311,7 @@ mod tests { let mux = 
Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -334,7 +338,7 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), diff --git a/crates/swbus-core/src/mux/multiplexer.rs b/crates/swbus-core/src/mux/multiplexer.rs index 2fd463e..b0fa535 100644 --- a/crates/swbus-core/src/mux/multiplexer.rs +++ b/crates/swbus-core/src/mux/multiplexer.rs @@ -153,19 +153,35 @@ impl SwbusMultiplexer { /// get route from conn_info based on connection type. This is a direct route which means it is to a immediate nexthop (1 hop away). fn route_from_conn(&self, conn_info: &Arc) -> String { + let remote_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set"); match conn_info.connection_type() { // direct route Global, InRegion, InCluster connection type is always to node level ConnectionType::Global | ConnectionType::InRegion | ConnectionType::InCluster => { - conn_info.remote_service_path().to_incluster_prefix() + remote_sp.to_incluster_prefix() } // both Client (for CLI) and InNode route by service prefix. 
IOW, service prefix uniquely // identify the endpoint to the client (CLI or hamgrd) - ConnectionType::InNode | ConnectionType::Client => conn_info.remote_service_path().to_service_prefix(), + ConnectionType::InNode | ConnectionType::Client => remote_sp.to_service_prefix(), } } pub(crate) fn register(&self, conn_info: &Arc, proxy: SwbusConnProxy) { // Update the route table. + let remote_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set"); + if self.my_routes.contains_key(remote_sp) { + error!( + "Conflicting service path to my routes detected from connection: {}", + conn_info.id() + ); + return; + } + let direct_route = self.route_from_conn(conn_info); let nexthop = SwbusNextHop::new_remote(conn_info.clone(), proxy, 1); @@ -284,7 +300,7 @@ impl SwbusMultiplexer { SwbusError::internal( SwbusErrorCode::Fail, format!( - "Receive route announcement from {} but route to the connection {} is not found ", + "Receive route announcement from {:?} but route to the connection {} is not found ", conn_info.remote_service_path(), direct_route ), @@ -302,7 +318,7 @@ impl SwbusMultiplexer { Err(SwbusError::internal( SwbusErrorCode::Fail, format!( - "Receive route announcement from {} but nh of the route is not from same connection: {}", + "Receive route announcement from {:?} but nh of the route is not from same connection: {}", conn_info.remote_service_path(), direct_route ), @@ -337,7 +353,7 @@ impl SwbusMultiplexer { conn_info: &Arc, ) -> Result<()> { debug!( - "Begin processing route announcement from {}", + "Begin processing route announcement from {:?}", conn_info.remote_service_path() ); @@ -350,7 +366,7 @@ impl SwbusMultiplexer { for entry in routes.entries.into_iter() { if entry.service_path.is_none() { error!( - "Received route announcement with missing service path from {}", + "Received route announcement with missing service path from {:?}", conn_info.remote_service_path() ); continue; @@ -406,7 +422,7 @@ impl 
SwbusMultiplexer { } *old_routes = new_routes; debug!( - "Finished processing route announcement from {}", + "Finished processing route announcement from {:?}", conn_info.remote_service_path() ); Ok(()) @@ -547,7 +563,7 @@ impl SwbusMultiplexer { ), hop_count: nh.hop_count(), nh_id: nh.conn_info().as_ref().unwrap().id().to_string(), - nh_service_path: Some(nh.conn_info().as_ref().unwrap().remote_service_path().clone()), + nh_service_path: nh.conn_info().as_ref().unwrap().remote_service_path().clone(), route_scope: ServicePath::from_string(entry.key()).unwrap().route_scope() as i32, }) .collect::>() @@ -680,7 +696,7 @@ pub mod test_utils { sp: &str, nh_endpoint: &str, ) -> (SwbusConn, mpsc::Receiver>) { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( conn_type, nh_endpoint.parse().unwrap(), ServicePath::from_string(sp).unwrap(), @@ -701,7 +717,7 @@ pub mod test_utils { service_path: Some(ServicePath::from_string(route_key).unwrap()), hop_count, nh_id: conn.info().id().to_string(), - nh_service_path: Some(conn.info().remote_service_path().clone()), + nh_service_path: conn.info().remote_service_path().clone(), route_scope: ServicePath::from_string(route_key).unwrap().route_scope() as i32, }) } else { @@ -1312,7 +1328,7 @@ mod tests { add_route(&mux, "region-a.cluster-a.10.0.0.1-dpu0", 1, &conn1).unwrap(); // add the same route with a different connection - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8081".parse().unwrap(), ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(), @@ -1501,4 +1517,30 @@ mod tests { ); assert!(!mux.connections.contains(conn1.info())); } + + #[test] + fn test_register_with_conflict_sp() { + // create mux + // create a mux with my routes and add a dummy route announcer + // create a mux with 2 my-routes + let my_route1 = RouteConfig { + key: 
ServicePath::from_string("region-a.cluster-a.node0").unwrap(), + scope: RouteScope::InCluster, + }; + let mut mux = SwbusMultiplexer::new(vec![my_route1.clone()]); + let (route_announce_task_tx, _) = mpsc::channel(16); + mux.set_route_announcer(route_announce_task_tx, CancellationToken::new()); + let mux = Arc::new(mux); + + // register a connection + let (conn1, _) = new_conn_for_test(ConnectionType::InCluster, "region-a.cluster-a.node0"); + mux.register(conn1.info(), conn1.new_proxy()); + let route_key = mux.route_from_conn(conn1.info()); + // make the conflicting route is not added + if let Some(nh_set) = mux.routes.get(&route_key) { + assert!(!nh_set + .iter() + .any(|nh| nh.nh_type() == NextHopType::Remote && nh.conn_info().as_ref().unwrap() == conn1.info())); + }; + } } diff --git a/crates/swbus-core/src/mux/nexthop.rs b/crates/swbus-core/src/mux/nexthop.rs index e121264..f8f9768 100644 --- a/crates/swbus-core/src/mux/nexthop.rs +++ b/crates/swbus-core/src/mux/nexthop.rs @@ -195,7 +195,7 @@ mod tests { #[tokio::test] async fn test_new_remote() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -280,7 +280,7 @@ mod tests { #[tokio::test] async fn test_queue_message_remote_ttl_expired() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -328,7 +328,7 @@ mod tests { #[test] fn test_clone_with_hop_count() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), diff --git 
a/crates/swbus-core/src/mux/route_annoucer.rs b/crates/swbus-core/src/mux/route_annoucer.rs index d30ccb2..2ffe75e 100644 --- a/crates/swbus-core/src/mux/route_annoucer.rs +++ b/crates/swbus-core/src/mux/route_annoucer.rs @@ -29,7 +29,7 @@ impl std::fmt::Display for RouteAnnounceTask { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, - "trigger: {:?}, conn_info: {}", + "trigger: {:?}, conn_info: {:?}", self.trigger, self.conn_info.remote_service_path() ) @@ -88,7 +88,7 @@ impl RouteAnnouncer { .await .map_err(|e| { error!( - "Failed to send route announcement to {}: {}", + "Failed to send route announcement to {:?}: {}", conn_info.remote_service_path(), e ); @@ -98,7 +98,11 @@ impl RouteAnnouncer { #[instrument(name = "send_route_announcement", level = "debug", skip_all)] async fn send_route_announcement(&self, conn_info: &SwbusConnInfo, routes: &RouteEntries) -> Result<()> { - let dest_sp = conn_info.remote_service_path().clone(); + let dest_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set") + .clone(); let mut msg = SwbusMessage { header: Some(SwbusMessageHeader::new( self.mux.get_my_service_path().clone(), @@ -110,7 +114,7 @@ impl RouteAnnouncer { // count itself as 1 hop. 
The message is only meant for direct neighbors msg.header.as_mut().unwrap().ttl = 2; debug!( - "Sending route announcement to {}, conn_info {:?}, message {:?}", + "Sending route announcement to {:?}, conn_info {:?}, message {:?}", conn_info.remote_service_path(), conn_info, &msg diff --git a/crates/swbus-core/src/mux/service.rs b/crates/swbus-core/src/mux/service.rs index 22715f9..07b4d9d 100644 --- a/crates/swbus-core/src/mux/service.rs +++ b/crates/swbus-core/src/mux/service.rs @@ -173,6 +173,14 @@ impl SwbusService for SwbusServiceHost { .await; self.conn_store.as_ref().unwrap().conn_established(conn); let out_stream = ReceiverStream::new(out_rx); - Ok(Response::new(Box::pin(out_stream) as Self::StreamMessagesStream)) + + // Send server service path in response metadata + let mut response = Response::new(Box::pin(out_stream) as Self::StreamMessagesStream); + let server_service_path = self.mux.as_ref().unwrap().get_my_service_path().to_string(); + response + .metadata_mut() + .insert(SWBUS_SERVER_SERVICE_PATH, server_service_path.parse().unwrap()); + + Ok(response) } } diff --git a/crates/swbus-core/tests/data/b2b/topo.json b/crates/swbus-core/tests/data/b2b/topo.json index 73089ab..27ef330 100644 --- a/crates/swbus-core/tests/data/b2b/topo.json +++ b/crates/swbus-core/tests/data/b2b/topo.json @@ -11,7 +11,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.10.0.0.2-dpu0", "endpoint": "127.0.0.1:60002", "conn_type": "InCluster" } @@ -27,7 +26,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.10.0.0.1-dpu0", "endpoint": "127.0.0.1:60001", "conn_type": "InCluster" } diff --git a/crates/swbus-core/tests/data/inter-cluster/topo.json b/crates/swbus-core/tests/data/inter-cluster/topo.json index 044fd09..c167ddf 100644 --- a/crates/swbus-core/tests/data/inter-cluster/topo.json +++ b/crates/swbus-core/tests/data/inter-cluster/topo.json @@ -11,7 +11,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.gw", "endpoint": "127.0.0.1:60002", "conn_type": "InCluster" } @@ -31,12 +30,10 
@@ ], "peers": [ { - "id": "region-a.cluster-a.node1", "endpoint": "127.0.0.1:60001", "conn_type": "InCluster" }, { - "id": "region-a.cluster-b.gw", "endpoint": "127.0.0.1:60003", "conn_type": "InRegion" } @@ -56,17 +53,15 @@ ], "peers": [ { - "id": "region-a.cluster-a.gw", "endpoint": "127.0.0.1:60002", "conn_type": "InRegion" }, { - "id": "region-a.cluster-b.node1", "endpoint": "127.0.0.1:60004", "conn_type": "InCluster" } ] - }, + }, "swbusd_cluster-b.node1": { "endpoint": "127.0.0.1:60004", "routes": [ @@ -77,7 +72,6 @@ ], "peers": [ { - "id": "region-a.cluster-b.gw", "endpoint": "127.0.0.1:60003", "conn_type": "InCluster" } diff --git a/crates/swbus-proto/src/swbus.rs b/crates/swbus-proto/src/swbus.rs index 3e4e939..92a981e 100644 --- a/crates/swbus-proto/src/swbus.rs +++ b/crates/swbus-proto/src/swbus.rs @@ -7,6 +7,8 @@ use crate::swbus::request_response::ResponseBody; /// Service path attribute in gRPC request meta data pub const SWBUS_CLIENT_SERVICE_PATH: &str = "x-swbus-service-path"; +/// Service path attribute in gRPC response meta data +pub const SWBUS_SERVER_SERVICE_PATH: &str = "x-swbus-server-service-path"; /// Service path scope of the connection pub const SWBUS_CONNECTION_TYPE: &str = "x-swbus-connection-type"; From 83d6e56ed73f124539ea6a22da9fe031991132aa Mon Sep 17 00:00:00 2001 From: dypet Date: Mon, 3 Nov 2025 22:07:47 -0700 Subject: [PATCH 06/10] Add wait for loopback script. (#126) swbusd needs to be started after loopback interface is programmed during startup, adding script to poll for this. 
--- debian/install | 1 + scripts/wait_for_loopback.py | 128 +++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 scripts/wait_for_loopback.py diff --git a/debian/install b/debian/install index 7b3f864..a4624e8 100644 --- a/debian/install +++ b/debian/install @@ -1,3 +1,4 @@ target/release/hamgrd usr/bin target/release/swbusd usr/bin target/release/swbus-cli usr/bin +scripts/wait_for_loopback.py usr/bin diff --git a/scripts/wait_for_loopback.py b/scripts/wait_for_loopback.py new file mode 100644 index 0000000..30ac2d0 --- /dev/null +++ b/scripts/wait_for_loopback.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 + +""" + wait_for_loopback.py + + Script to wait for LOOPBACK_INTERFACE to be configured. + + This script polls the loopback interface using ip and ip -6 commands + to check if the interface exists and has IP addresses assigned. + It will continue polling until the interface is found with addresses + or a maximum number of retries is reached. +""" + +import sys +import time +import syslog +import subprocess + +# Configuration +LOOPBACK_INTERFACE = "Loopback0" +MAX_RETRIES = 300 # Maximum number of retries +POLL_INTERVAL = 1 # Poll interval in seconds + + +def log_info(msg): + """Log info message to syslog""" + syslog.syslog(syslog.LOG_INFO, f"wait_for_loopback: {msg}") + + +def log_err(msg): + """Log error message to syslog""" + syslog.syslog(syslog.LOG_ERR, f"wait_for_loopback: {msg}") + + +def log_debug(msg): + """Log debug message to syslog""" + syslog.syslog(syslog.LOG_DEBUG, f"wait_for_loopback: {msg}") + + +def check_loopback_interface(): + """ + Check if LOOPBACK_INTERFACE interface is configured with IP addresses. + + Returns: + bool: True if LOOPBACK_INTERFACE interface exists and has IP addresses, False otherwise. 
+ """ + try: + # Check IPv4 addresses + result_ipv4 = subprocess.run( + ["ip", "-4", "addr", "show", LOOPBACK_INTERFACE], + capture_output=True, + text=True, + timeout=5 + ) + + # Check IPv6 addresses + result_ipv6 = subprocess.run( + ["ip", "-6", "addr", "show", LOOPBACK_INTERFACE], + capture_output=True, + text=True, + timeout=5 + ) + + # Check if interface exists (return code 0) + if result_ipv4.returncode != 0 and result_ipv6.returncode != 0: + log_debug(f"{LOOPBACK_INTERFACE} interface not found") + return False + + # Check if there are any IP addresses configured + has_ipv4 = "inet " in result_ipv4.stdout + has_ipv6 = "inet6 " in result_ipv6.stdout + + if has_ipv4 or has_ipv6: + if has_ipv4: + log_debug(f"Found IPv4 addresses on {LOOPBACK_INTERFACE}") + if has_ipv6: + log_debug(f"Found IPv6 addresses on {LOOPBACK_INTERFACE}") + return True + else: + log_debug(f"{LOOPBACK_INTERFACE} interface exists but has no IP addresses") + return False + + except subprocess.TimeoutExpired: + log_err(f"Timeout checking {LOOPBACK_INTERFACE} interface") + return False + except Exception as e: + log_err(f"Error checking loopback interface: {e}") + return False + + +def wait_for_loopback(): + """ + Main function to wait for LOOPBACK_INTERFACE to be programmed. 
+ + Returns: + int: Exit code (0 for success, 1 for failure) + """ + log_info(f"Wait for {LOOPBACK_INTERFACE} interface...") + + retry_count = 0 + + while retry_count < MAX_RETRIES: + if check_loopback_interface(): + log_info(f"{LOOPBACK_INTERFACE} interface is programmed") + return 0 + + retry_count += 1 + + time.sleep(POLL_INTERVAL) + + log_err(f"{LOOPBACK_INTERFACE} interface was not programmed after {MAX_RETRIES} retries") + return 1 + + +def main(): + """Main entry point""" + syslog.openlog("wait_for_loopback", syslog.LOG_PID) + + try: + exit_code = wait_for_loopback() + sys.exit(exit_code) + except Exception as e: + log_err(f"Unexpected error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() From 49fc0ad6547aada8cc168174e96d488a027ba179 Mon Sep 17 00:00:00 2001 From: dypet Date: Mon, 3 Nov 2025 22:08:39 -0700 Subject: [PATCH 07/10] Fix format for protobuf fields in show command. (#127) Fixing format for protobuf messages in show command output to match all other message formatting. 
--- crates/swbus-cli/src/show/hamgrd/actor.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/swbus-cli/src/show/hamgrd/actor.rs b/crates/swbus-cli/src/show/hamgrd/actor.rs index 5769887..0729fcf 100644 --- a/crates/swbus-cli/src/show/hamgrd/actor.rs +++ b/crates/swbus-cli/src/show/hamgrd/actor.rs @@ -44,6 +44,8 @@ fn unix_secs_to_string(unix_secs: u64) -> String { impl IncomingStateDisplay { fn from_incoming_state((key, state): (&String, &IncomingTableEntry)) -> Self { + let formatted_value = Self::format_message_value(&state.msg.data); + let details = vec![ KeyValue { attribute: "source".to_string(), @@ -63,7 +65,7 @@ impl IncomingStateDisplay { }, KeyValue { attribute: "message/value".to_string(), - value: to_string_pretty(&state.msg.data).unwrap_or("INV".to_string()), + value: formatted_value, }, KeyValue { attribute: "created-time".to_string(), @@ -88,6 +90,25 @@ impl IncomingStateDisplay { details: table, } } + + fn format_message_value(data: &serde_json::Value) -> String { + let formatted = Self::try_format_protobuf_json(data); + formatted.unwrap_or_else(|| to_string_pretty(data).unwrap_or_else(|_| "INV".to_string())) + } + + fn try_format_protobuf_json(data: &serde_json::Value) -> Option { + let obj = data.as_object()?; + let field_values = obj.get("field_values")?.as_object()?; + let json_str = field_values.get("json")?.as_str()?; + + let parsed = serde_json::from_str::(json_str).ok()?; + + let mut new_obj = obj.clone(); + let new_field_values = new_obj.get_mut("field_values")?.as_object_mut()?; + new_field_values.insert("json".to_string(), parsed); + + to_string_pretty(&new_obj).ok() + } } #[derive(Tabled)] From 5a542bd62279785d9303a6c10000f6caa61607c3 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Mon, 3 Nov 2025 23:08:51 -0600 Subject: [PATCH 08/10] Sort the output of show actor command (#128) ### why The order of output of show actor command is not 
stable. It is better to be stable so user can quickly find the data they need. ### what this PR does sort incoming/internal/outgoing state table by key sort field values in internal table by key --- crates/swbus-cli/src/show/hamgrd/actor.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/swbus-cli/src/show/hamgrd/actor.rs b/crates/swbus-cli/src/show/hamgrd/actor.rs index 0729fcf..cbb7a91 100644 --- a/crates/swbus-cli/src/show/hamgrd/actor.rs +++ b/crates/swbus-cli/src/show/hamgrd/actor.rs @@ -248,7 +248,7 @@ impl InternalStateDisplay { ]; let table_meta = Table::new(table_meta).with(Style::ascii().remove_frame()).to_string(); - let committed_fvs = state + let mut committed_fvs = state .backup_fvs .iter() .map(|(key, value)| KeyValue { @@ -257,6 +257,8 @@ impl InternalStateDisplay { }) .collect::>(); + committed_fvs.sort_by(|a, b| a.attribute.cmp(&b.attribute)); + let committed_fvs = Table::new(committed_fvs) .with(Style::ascii().remove_frame()) .to_string(); @@ -293,11 +295,14 @@ impl ShowCmdHandler for ShowActorCmd { let state: ActorStateDump = serde_json::from_str(result).unwrap(); // convert to table for display - let incoming_state_display = state + let mut incoming_state_display: Vec = state .incoming .iter() .map(IncomingStateDisplay::from_incoming_state) .collect::>(); + + incoming_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let incoming_state_table = Table::new(incoming_state_display) .with(Panel::header("Incoming State")) .with(Modify::list(Rows::first(), Alignment::center())) @@ -307,11 +312,14 @@ impl ShowCmdHandler for ShowActorCmd { info!("{}", incoming_state_table); // convert to table for display - let internal_state_display = state + let mut internal_state_display = state .internal .iter() .map(InternalStateDisplay::from_internal_state) .collect::>(); + + internal_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let internal_state_table = Table::new(internal_state_display) 
.with(Panel::header("Internal State")) .with(Modify::list(Rows::first(), Alignment::center())) @@ -321,12 +329,15 @@ impl ShowCmdHandler for ShowActorCmd { info!("{}", internal_state_table); // convert to table for display - let outgoing_sent_state_display = state + let mut outgoing_sent_state_display = state .outgoing .outgoing_sent .iter() .map(OutgoingSentStateDisplay::from_outgoing_state) .collect::>(); + + outgoing_sent_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let outgoing_sent_state_table = Table::new(outgoing_sent_state_display) .with(Panel::header("Outgoing Sent Message State")) .with(Modify::list(Rows::first(), Alignment::center())) From e0f6e807325f1bbdc367d5847a884061c3f31ea0 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Tue, 4 Nov 2025 08:50:11 -0600 Subject: [PATCH 09/10] Add a PR template (#129) ### why I did it need a PR template to ensure consistent PR description ### what I did Use the same PR template from sonic-swss --- .github/PULL_REQUEST_TEMPLATE.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..cdd7116 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,18 @@ + +**What I did** + +**Why I did it** + +**How I verified it** + +**Details if related** \ No newline at end of file From f1157a5355a15f2e78d289c182e8db2bba217405 Mon Sep 17 00:00:00 2001 From: yue-fred-gao <132678244+yue-fred-gao@users.noreply.github.com> Date: Thu, 13 Nov 2025 14:07:40 -0600 Subject: [PATCH 10/10] Suppress route announcement if no change to advertised routes (#130) **What I did** If no new route is added, don't send route announcement. There might be change to existing routes, such as path added or removed. 
But it won't affect advertised routes because swbusd only advertises one route to its neighbours no matter how many paths the route has. **Why I did it** This suppresses unnecessary route announcement. Current design will send route announcement but the neighbour will not propagate to its neighbour if there is no change to its route table. With the new design, it will not send the announcement in the first place if there is no change. **How I verified it** Run it in a testbed with 2 switches, each with 8 swbusd. **Details if related** --- crates/swbus-core/src/mux/multiplexer.rs | 170 +++++++++++++++++------ 1 file changed, 124 insertions(+), 46 deletions(-) diff --git a/crates/swbus-core/src/mux/multiplexer.rs b/crates/swbus-core/src/mux/multiplexer.rs index b0fa535..63954df 100644 --- a/crates/swbus-core/src/mux/multiplexer.rs +++ b/crates/swbus-core/src/mux/multiplexer.rs @@ -221,7 +221,7 @@ impl SwbusMultiplexer { match Self::key_from_route_entry(entry) { Ok(route_key) => { let old_nh = dummy_nh.clone_with_hop_count(entry.hop_count + 1); - let route_removed = self.remove_route_to_nh(&route_key, &old_nh); + let (_, route_removed) = self.remove_route_to_nh(&route_key, &old_nh); if route_removed { need_announce = true; } @@ -233,7 +233,7 @@ } // remove the direct route - let route_removed = self.remove_route_to_nh(&direct_route, &dummy_nh); + let (_, route_removed) = self.remove_route_to_nh(&direct_route, &dummy_nh); if route_removed { need_announce = true; } @@ -252,34 +252,42 @@ } } - // Update route for the give service path with the new nexthop. If the route is updated, return true. - // Otherwise, return false. + // Update route for the given service path with the new nexthop. It returns a tuple of (nh_added, route_added). + // if nh_added is true, the nexthop is newly inserted or updated (hop count changed). + // if route_added is true, the route entry is newly created.
#[instrument(name = "update_route", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))] - fn update_route(&self, route_key: String, nexthop: SwbusNextHop) -> bool { + fn update_route(&self, route_key: String, nexthop: SwbusNextHop) -> (bool, bool) { // If route entry doesn't exist, we insert the next hop as a new one. - let inserted = self.routes.entry(route_key).or_default().insert(nexthop.clone()); - info!("Update route entry: inserted={}", inserted); - inserted + let mut route_added = false; + if self.routes.get(&route_key).is_none() { + route_added = true; + } + let nh_added = self.routes.entry(route_key).or_default().insert(nexthop.clone()); + info!("Update route entry: nh_added={nh_added}, route_added={route_added}"); + (nh_added, route_added) } /// remove a specified route and to the specified nexthop + /// It returns a tuple of (nh_removed, route_removed). + /// if nh_removed is true, the nexthop is removed. + /// if route_removed is true, the route entry is completely removed. #[instrument(name = "remove_route_to_nh", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))] - fn remove_route_to_nh(&self, route_key: &str, nexthop: &SwbusNextHop) -> bool { - let mut removed = false; - let mut remove_all = false; + fn remove_route_to_nh(&self, route_key: &str, nexthop: &SwbusNextHop) -> (bool, bool) { + let mut nh_removed = false; + let mut route_removed = false; if let Some(mut entry) = self.routes.get_mut(route_key) { - removed = entry.remove(nexthop); + nh_removed = entry.remove(nexthop); if entry.is_empty() { - remove_all = true; + route_removed = true; // can't remove route here because we are holding the lock on the entry. 
} } - if remove_all { + if route_removed { self.routes.remove(route_key); } - info!("Remove route entry: removed={}", removed); - removed + info!("Remove route entry: nh_removed={nh_removed}, route_removed={route_removed}"); + (nh_removed, route_removed) } /// remove a specified route @@ -393,7 +401,7 @@ impl SwbusMultiplexer { } }; let old_nh = nh.clone_with_hop_count(entry.hop_count + 1); - let route_removed = self.remove_route_to_nh(&route_key, &old_nh); + let (_, route_removed) = self.remove_route_to_nh(&route_key, &old_nh); if route_removed { need_announce = true; } @@ -410,8 +418,8 @@ impl SwbusMultiplexer { }; let new_nh = nh.clone_with_hop_count(entry.hop_count + 1); - let route_changed = self.update_route(route_key, new_nh); - if route_changed { + let (_, route_added) = self.update_route(route_key, new_nh); + if route_added { need_announce = true; } } @@ -712,7 +720,8 @@ pub mod test_utils { conn: &SwbusConn, ) -> Option { let nexthop_nh1 = SwbusNextHop::new_remote(conn.info().clone(), conn.new_proxy(), hop_count); - if mux.update_route(route_key.to_string(), nexthop_nh1) { + let (nh_added, _) = mux.update_route(route_key.to_string(), nexthop_nh1); + if nh_added { Some(RouteEntry { service_path: Some(ServicePath::from_string(route_key).unwrap()), hop_count, @@ -1176,28 +1185,28 @@ mod tests { add_route(&mux, "region-a.cluster-a.node1", 1, &conn1).unwrap(); let conn1_nh = SwbusNextHop::new_remote(conn1.info().clone(), conn1.new_proxy(), 1); - // create some route entries + // create some route entries for node 1 // route entry matches my routes. 
Should be skipped - let node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 0); + let n1_node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 0); // negative test: route announcement with scope lower than InCluster - let node1_re2 = new_route_entry_for_ra("region-a.cluster-a.node1/ha/0", 1); - let node2_re1 = new_route_entry_for_ra("region-a.cluster-a.node2", 1); - let clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); + let n1_node1_re2 = new_route_entry_for_ra("region-a.cluster-a.node1/ha/0", 1); + let n1_node2_re1 = new_route_entry_for_ra("region-a.cluster-a.node2", 1); + let n1_clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); - // Step1: process a route announcement + // Step1: process a route announcement from node1 let routes = RouteEntries { entries: vec![ - node1_re1.clone(), - node1_re2.clone(), - node2_re1.clone(), - clusterb_re1.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), + n1_node2_re1.clone(), + n1_clusterb_re1.clone(), ], }; let expected_route_entries = BTreeSet::from_iter(vec![ - node1_re1.clone(), - node1_re2.clone(), - node2_re1.clone(), - clusterb_re1.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), + n1_node2_re1.clone(), + n1_clusterb_re1.clone(), ]); mux.process_route_announcement(routes.clone(), conn1.info()).unwrap(); // Expect: @@ -1212,15 +1221,15 @@ mod tests { BTreeSet::from([SwbusNextHop::new_local()]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node1_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node1_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(1)]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node2_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(2)]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&clusterb_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_clusterb_re1).unwrap(), 
BTreeSet::from([conn1_nh.clone_with_hop_count(3)]), ); @@ -1239,7 +1248,7 @@ mod tests { assert_eq!(*routes_by_conn, expected_route_entries); } - // Step 2: process a route announcement with the same routes + // Step 2: process a route announcement with the same routes from node1 mux.process_route_announcement(routes.clone(), conn1.info()).unwrap(); // Expect: // 1. no route is updated @@ -1256,16 +1265,16 @@ mod tests { // process a route announcement let routes = RouteEntries { entries: vec![ - node1_re1.clone(), - node1_re2.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), node2_re1_changed.clone(), clusterb_re1.clone(), node3_re1.clone(), ], }; let expected_route_entries = BTreeSet::from_iter(vec![ - node1_re1.clone(), - node1_re2.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), node2_re1_changed.clone(), clusterb_re1.clone(), node3_re1.clone(), @@ -1277,7 +1286,7 @@ mod tests { BTreeSet::from([SwbusNextHop::new_local()]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node1_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node1_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(1)]), ); expected.insert( @@ -1313,6 +1322,74 @@ mod tests { let routes_by_conn = mux.routes_by_conn.get(conn1.info()).unwrap(); assert_eq!(*routes_by_conn, expected_route_entries); } + let expected_before_node2 = expected.clone(); + // Step 4: process a route announcement from node2 + // add a direct route to the node2 + let (conn2, _) = + new_conn_for_test_with_endpoint(ConnectionType::InCluster, "region-a.cluster-a.node2", "127.0.0.1:8081"); + add_route(&mux, "region-a.cluster-a.node2", 1, &conn2).unwrap(); + let conn2_nh = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); + + // create some route entries for node 2 + let n2_node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 1); + let n2_clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); + + let routes = RouteEntries { + entries: 
vec![n2_node1_re1.clone(), n2_clusterb_re1.clone()], + }; + mux.process_route_announcement(routes.clone(), conn2.info()).unwrap(); + + // Expect: + // 1. no route announcement task is created as no new route is added + // 2. route_entries are updated with new paths + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(1)); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n2_node1_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(2)); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n2_clusterb_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(3)); + let actual: HashMap> = mux + .routes + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + assert_eq!(actual, expected); + assert_eq!( + route_announce_task_rx.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + ); + + // Step 5: process a route announcement with routes removed from node2 + // Expect: + // 1. no route announcement task is created as only paths are removed but no route is removed + // 2. 
route_entries are updated with paths removed + let routes = RouteEntries { entries: vec![] }; + mux.process_route_announcement(routes.clone(), conn2.info()).unwrap(); + + let mut expected = expected_before_node2.clone(); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(1)); + + let actual: HashMap> = mux + .routes + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + + assert_eq!(actual, expected); + + assert_eq!( + route_announce_task_rx.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + ); } #[tokio::test] @@ -1381,9 +1458,9 @@ mod tests { add_route(&mux, route_key, 1, &conn2).unwrap(); let nexthop2 = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); - assert!(mux.remove_route_to_nh(route_key, &nexthop1)); + assert!(mux.remove_route_to_nh(route_key, &nexthop1).0); assert!(mux.routes.contains_key(route_key)); - assert!(mux.remove_route_to_nh(route_key, &nexthop2)); + assert!(mux.remove_route_to_nh(route_key, &nexthop2).0); assert!(!mux.routes.contains_key(route_key)); // Ensure route is removed when empty } @@ -1411,7 +1488,8 @@ mod tests { ); let nexthop2 = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); - assert!(!mux.remove_route_to_nh(route_key, &nexthop2)); + let (nh_remove, route_removed) = mux.remove_route_to_nh(route_key, &nexthop2); + assert!(!nh_remove && !route_removed); assert!(mux.routes.contains_key(route_key)); // Ensure route is not removed }