diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..cdd7116 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,18 @@ + +**What I did** + +**Why I did it** + +**How I verified it** + +**Details if related** \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index d504ecf..38ec0ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2144,7 +2144,7 @@ dependencies = [ [[package]] name = "swss-common" version = "0.1.0" -source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#1484a851dbfdd4b122c361cd7ea03eca0afe5d63" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#7407a2e6060f9e8c0423e5886b96f1146dbca315" dependencies = [ "bindgen", "getset", @@ -2173,12 +2173,13 @@ dependencies = [ "swss-common-testing", "tokio", "tokio-util", + "tracing", ] [[package]] name = "swss-common-testing" version = "0.1.0" -source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#1484a851dbfdd4b122c361cd7ea03eca0afe5d63" +source = "git+https://github.com/sonic-net/sonic-swss-common.git?branch=master#7407a2e6060f9e8c0423e5886b96f1146dbca315" dependencies = [ "lazy_static", "rand", diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index 0c994e4..f714e83 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -1,6 +1,7 @@ use crate::actors::{spawn_consumer_bridge_for_actor, ActorCreator}; use crate::db_structs::{ - BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuPmonStateType, DpuState, RemoteDpu, + BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuPmonStateType, DpuState, NeighResolveTable, + RemoteDpu, }; use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType}; use crate::ServicePath; @@ -169,6 +170,10 @@ impl DpuActor { if is_managed { if first_time { + if let Err(e) = self.set_neigh_resolve(outgoing) { + error!("Failed 
to set NEIGH_RESOLVE_TABLE: {}", e); + } + // DASH_HA_GLOBAL_CONFIG from common-bridge sent to this actor instance only. // Key is DASH_HA_GLOBAL_CONFIG self.bridges.push( @@ -463,6 +468,30 @@ impl DpuActor { } false } + + fn set_neigh_resolve(&self, outgoing: &mut Outgoing) -> Result<()> { + let Some(DpuData::LocalDpu { ref dpu, .. }) = self.dpu else { + debug!("DPU is not managed by this HA instance, do not set NEIGH_RESOLVE_TABLE"); + return Ok(()); + }; + + let swss_key = format!("{}:{}", "", dpu.pa_ipv4.clone()); + let entry = NeighResolveTable { + mac: "00:00:00:00:00:00".to_string(), + }; + + let fv = swss_serde::to_field_values(&entry)?; + let kfv = KeyOpFieldValues { + key: swss_key, + operation: KeyOperation::Set, + field_values: fv, + }; + + let msg = ActorMessage::new(self.id.clone(), &kfv)?; + outgoing.send(outgoing.common_bridge_sp::(), msg); + + Ok(()) + } } impl Actor for DpuActor { @@ -504,8 +533,9 @@ mod test { dpu::DpuActor, test::{self, *}, }; - use crate::db_structs::{BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuState, RemoteDpu}; - + use crate::db_structs::{ + BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuState, NeighResolveTable, RemoteDpu, + }; use crate::ha_actor_messages::DpuActorState; use sonic_common::SonicDbTable; use std::time::Duration; @@ -514,6 +544,9 @@ mod test { #[tokio::test] async fn dpu_actor() { + // To enable trace, set ENABLE_TRACE=1 to run test + sonic_common::log::init_logger_for_test(); + let _ = Redis::start_config_db(); let runtime = test::create_actor_runtime(0, "10.0.0.0", "10::").await; // prepare test data @@ -566,6 +599,9 @@ mod test { // Receiving DPU config-db object from swss-common bridge send! { key: Dpu::table_name(), data: { "key": "switch0_dpu0", "operation": "Set", "field_values": dpu_fvs}, addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + // Check for NeighResolveTable message + recv! 
{ key: "switch0_dpu0", data: {"key": format!("{}:{}", "".to_string(), dpu_actor_state_wo_bfd.pa_ipv4.clone()), "operation": "Set", "field_values": {"mac":"00:00:00:00:00:00"}}, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, send! { key: "REMOTE_DPU|switch1_dpu0", data: { "key": "REMOTE_DPU|switch1_dpu0", "operation": "Set", "field_values": serde_json::to_value(&remote_dpu1_fvs).unwrap() }}, send! { key: DashHaGlobalConfig::table_name(), data: { "key": DashHaGlobalConfig::table_name(), "operation": "Set", "field_values": dash_global_cfg_fvs} }, recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.0.0", "operation": "Set", "field_values": bfd_fvs}, diff --git a/crates/hamgrd/src/actors/ha_set.rs b/crates/hamgrd/src/actors/ha_set.rs index 949a4fa..70467ef 100644 --- a/crates/hamgrd/src/actors/ha_set.rs +++ b/crates/hamgrd/src/actors/ha_set.rs @@ -199,8 +199,8 @@ impl HaSetActor { for vdpu_ext in vdpus { if !vdpu_ext.vdpu.dpu.remote_dpu { - // if it is locally managed dpu, use local nexthop as endpoint - endpoint.push(vdpu_ext.vdpu.dpu.local_nexthop_ip.clone()); + // if it is locally managed dpu, use PA address as endpoint + endpoint.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); } else { endpoint.push(vdpu_ext.vdpu.dpu.npu_ipv4.clone()); } @@ -208,7 +208,7 @@ impl HaSetActor { endpoint_monitor.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); if vdpu_ext.is_primary { if !vdpu_ext.vdpu.dpu.remote_dpu { - primary.push(vdpu_ext.vdpu.dpu.local_nexthop_ip.clone()); + primary.push(vdpu_ext.vdpu.dpu.pa_ipv4.clone()); } else { primary.push(vdpu_ext.vdpu.dpu.npu_ipv4.clone()); } @@ -228,7 +228,7 @@ impl HaSetActor { let vnet_route = VnetRouteTunnelTable { endpoint, endpoint_monitor: Some(endpoint_monitor), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(primary), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, @@ -564,7 +564,7 @@ mod test { 
vdpu0_state_obj.dpu.pa_ipv4.clone(), vdpu1_state_obj.dpu.pa_ipv4.clone(), ]), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(vec![vdpu0_state_obj.dpu.pa_ipv4.clone()]), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, @@ -657,7 +657,7 @@ mod test { vdpu0_state_obj.dpu.pa_ipv4.clone(), vdpu1_state_obj.dpu.pa_ipv4.clone(), ]), - monitoring: None, + monitoring: Some("custom_bfd".into()), primary: Some(vec![vdpu0_state_obj.dpu.npu_ipv4.clone()]), rx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, tx_monitor_timer: global_cfg.dpu_bfd_probe_interval_in_ms, diff --git a/crates/hamgrd/src/actors/test.rs b/crates/hamgrd/src/actors/test.rs index 567277f..e254f6a 100644 --- a/crates/hamgrd/src/actors/test.rs +++ b/crates/hamgrd/src/actors/test.rs @@ -373,7 +373,6 @@ pub fn make_dpu_object(switch: u16, dpu: u32) -> Dpu { vip_ipv6: Some(normalize_ipv6(&format!("3:2:{switch_pair_id}::{dpu}"))), pa_ipv4: format!("18.0.{switch}.{dpu}"), pa_ipv6: Some(normalize_ipv6(&format!("18:0:{switch}::{dpu}"))), - local_nexthop_ip: format!("18.0.{switch}.{dpu}"), dpu_id: dpu, vdpu_id: Some(format!("vdpu{}", switch * 8 + dpu as u16)), orchagent_zmq_port: 8100, @@ -453,7 +452,6 @@ pub fn to_local_dpu(dpu_actor_state: &DpuActorState) -> Dpu { vip_ipv6: dpu_actor_state.vip_ipv6.clone(), pa_ipv4: dpu_actor_state.pa_ipv4.clone(), pa_ipv6: dpu_actor_state.pa_ipv6.clone(), - local_nexthop_ip: dpu_actor_state.local_nexthop_ip.clone(), dpu_id: dpu_actor_state.dpu_id, vdpu_id: dpu_actor_state.vdpu_id.clone(), orchagent_zmq_port: dpu_actor_state.orchagent_zmq_port, diff --git a/crates/hamgrd/src/db_structs.rs b/crates/hamgrd/src/db_structs.rs index 5fb3c0a..36e8c63 100644 --- a/crates/hamgrd/src/db_structs.rs +++ b/crates/hamgrd/src/db_structs.rs @@ -43,7 +43,6 @@ pub struct Dpu { pub vip_ipv6: Option, pub pa_ipv4: String, pub pa_ipv6: Option, - pub local_nexthop_ip: String, pub dpu_id: u32, pub 
vdpu_id: Option, pub orchagent_zmq_port: u16, @@ -486,6 +485,14 @@ pub fn get_dpu_config_from_db(dpu_id: u32) -> Result { Err(anyhow::anyhow!("DPU entry not found for slot {}", dpu_id)) } +#[skip_serializing_none] +#[serde_as] +#[derive(Default, Debug, Deserialize, Serialize, PartialEq, SonicDb)] +#[sonicdb(table_name = "NEIGH_RESOLVE_TABLE", key_separator = ":", db_name = "APPL_DB")] +pub struct NeighResolveTable { + pub mac: String, +} + #[cfg(test)] mod test { use super::*; @@ -500,7 +507,6 @@ mod test { "operation": "Set", "field_values": { "pa_ipv4": "1.2.3.4", - "local_nexthop_ip": "2.2.2.5", "dpu_id": "1", "orchagent_zmq_port": "8100", "swbus_port": "23606", @@ -552,7 +558,6 @@ mod test { pa_ipv4: "1.2.3.6".to_string(), vip_ipv4: Some("4.5.6.6".to_string()), pa_ipv6: None, - local_nexthop_ip: "2.2.2.5".to_string(), dpu_id: 6, orchagent_zmq_port: 8100, swbus_port: 23612, @@ -571,7 +576,6 @@ mod test { for d in 6..8 { let dpu_fvs = vec![ ("pa_ipv4".to_string(), Ipv4Addr::new(1, 2, 3, d).to_string()), - ("local_nexthop_ip".to_string(), Ipv4Addr::new(2, 2, 2, 5).to_string()), ("vip_ipv4".to_string(), Ipv4Addr::new(4, 5, 6, d).to_string()), ("dpu_id".to_string(), d.to_string()), ("orchagent_zmq_port".to_string(), "8100".to_string()), diff --git a/crates/hamgrd/src/ha_actor_messages.rs b/crates/hamgrd/src/ha_actor_messages.rs index f6e5582..934da16 100644 --- a/crates/hamgrd/src/ha_actor_messages.rs +++ b/crates/hamgrd/src/ha_actor_messages.rs @@ -25,7 +25,6 @@ pub struct DpuActorState { pub vip_ipv6: Option, pub pa_ipv4: String, pub pa_ipv6: Option, - pub local_nexthop_ip: String, pub dpu_id: u32, pub vdpu_id: Option, pub orchagent_zmq_port: u16, @@ -57,7 +56,6 @@ impl DpuActorState { vip_ipv6: dpu.vip_ipv6.clone(), pa_ipv4: dpu.pa_ipv4.clone(), pa_ipv6: dpu.pa_ipv6.clone(), - local_nexthop_ip: dpu.local_nexthop_ip.clone(), dpu_id: dpu.dpu_id, vdpu_id: dpu.vdpu_id.clone(), orchagent_zmq_port: dpu.orchagent_zmq_port, @@ -81,7 +79,6 @@ impl DpuActorState { 
vip_ipv6: None, pa_ipv4: rdpu.pa_ipv4.clone(), pa_ipv6: rdpu.pa_ipv6.clone(), - local_nexthop_ip: "".to_string(), dpu_id: rdpu.dpu_id, vdpu_id: None, orchagent_zmq_port: 0, diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index 4fc064b..c9aea26 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -24,7 +24,9 @@ mod ha_actor_messages; use actors::{dpu::DpuActor, ha_scope::HaScopeActor, ha_set::HaSetActor, vdpu::VDpuActor, DbBasedActor}; use actors::{spawn_vanilla_producer_bridge, spawn_zmq_producer_bridge}; use anyhow::Result; -use db_structs::{BfdSessionTable, DashHaScopeTable, DashHaSetTable, Dpu, VDpu, VnetRouteTunnelTable}; +use db_structs::{ + BfdSessionTable, DashHaScopeTable, DashHaSetTable, Dpu, NeighResolveTable, VDpu, VnetRouteTunnelTable, +}; use lazy_static::lazy_static; use sonic_dash_api_proto::{ha_scope_config::HaScopeConfig, ha_set_config::HaSetConfig}; use std::any::Any; @@ -154,6 +156,10 @@ async fn spawn_producer_bridges(edge_runtime: Arc, dpu: &Dpu) let handle = spawn_vanilla_producer_bridge::(edge_runtime.clone()).await?; handles.push(handle); + + let handle = spawn_vanilla_producer_bridge::(edge_runtime.clone()).await?; + handles.push(handle); + Ok(handles) } diff --git a/crates/swbus-cli/src/main.rs b/crates/swbus-cli/src/main.rs index 9b33fb8..2504390 100644 --- a/crates/swbus-cli/src/main.rs +++ b/crates/swbus-cli/src/main.rs @@ -262,7 +262,6 @@ mod tests { #[test] fn test_get_swbus_config() { let slot = 1; - let npu_ipv4 = "10.0.1.0"; let _ = Redis::start_config_db(); // Mock the config database with a sample configuration @@ -271,15 +270,7 @@ mod tests { std::env::set_var("DEV", format!("dpu{slot}")); let config = get_swbus_config(None).unwrap(); assert_eq!(config.endpoint.to_string(), format!("{}:{}", "10.0.1.0", 23606 + slot)); - let expected_sp = ServicePath::with_node( - "region-a", - "cluster-a", - &format!("{npu_ipv4}-dpu{slot}"), - "", - "", - "", - "", - ); + let expected_sp = 
ServicePath::with_node("region-a", "cluster-a", &format!("host1-dpu{slot}"), "", "", "", ""); assert!(config .routes .iter() diff --git a/crates/swbus-cli/src/show/hamgrd/actor.rs b/crates/swbus-cli/src/show/hamgrd/actor.rs index 5769887..cbb7a91 100644 --- a/crates/swbus-cli/src/show/hamgrd/actor.rs +++ b/crates/swbus-cli/src/show/hamgrd/actor.rs @@ -44,6 +44,8 @@ fn unix_secs_to_string(unix_secs: u64) -> String { impl IncomingStateDisplay { fn from_incoming_state((key, state): (&String, &IncomingTableEntry)) -> Self { + let formatted_value = Self::format_message_value(&state.msg.data); + let details = vec![ KeyValue { attribute: "source".to_string(), @@ -63,7 +65,7 @@ impl IncomingStateDisplay { }, KeyValue { attribute: "message/value".to_string(), - value: to_string_pretty(&state.msg.data).unwrap_or("INV".to_string()), + value: formatted_value, }, KeyValue { attribute: "created-time".to_string(), @@ -88,6 +90,25 @@ impl IncomingStateDisplay { details: table, } } + + fn format_message_value(data: &serde_json::Value) -> String { + let formatted = Self::try_format_protobuf_json(data); + formatted.unwrap_or_else(|| to_string_pretty(data).unwrap_or_else(|_| "INV".to_string())) + } + + fn try_format_protobuf_json(data: &serde_json::Value) -> Option { + let obj = data.as_object()?; + let field_values = obj.get("field_values")?.as_object()?; + let json_str = field_values.get("json")?.as_str()?; + + let parsed = serde_json::from_str::(json_str).ok()?; + + let mut new_obj = obj.clone(); + let new_field_values = new_obj.get_mut("field_values")?.as_object_mut()?; + new_field_values.insert("json".to_string(), parsed); + + to_string_pretty(&new_obj).ok() + } } #[derive(Tabled)] @@ -227,7 +248,7 @@ impl InternalStateDisplay { ]; let table_meta = Table::new(table_meta).with(Style::ascii().remove_frame()).to_string(); - let committed_fvs = state + let mut committed_fvs = state .backup_fvs .iter() .map(|(key, value)| KeyValue { @@ -236,6 +257,8 @@ impl InternalStateDisplay { }) 
.collect::>(); + committed_fvs.sort_by(|a, b| a.attribute.cmp(&b.attribute)); + let committed_fvs = Table::new(committed_fvs) .with(Style::ascii().remove_frame()) .to_string(); @@ -272,11 +295,14 @@ impl ShowCmdHandler for ShowActorCmd { let state: ActorStateDump = serde_json::from_str(result).unwrap(); // convert to table for display - let incoming_state_display = state + let mut incoming_state_display: Vec = state .incoming .iter() .map(IncomingStateDisplay::from_incoming_state) .collect::>(); + + incoming_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let incoming_state_table = Table::new(incoming_state_display) .with(Panel::header("Incoming State")) .with(Modify::list(Rows::first(), Alignment::center())) @@ -286,11 +312,14 @@ impl ShowCmdHandler for ShowActorCmd { info!("{}", incoming_state_table); // convert to table for display - let internal_state_display = state + let mut internal_state_display = state .internal .iter() .map(InternalStateDisplay::from_internal_state) .collect::>(); + + internal_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let internal_state_table = Table::new(internal_state_display) .with(Panel::header("Internal State")) .with(Modify::list(Rows::first(), Alignment::center())) @@ -300,12 +329,15 @@ impl ShowCmdHandler for ShowActorCmd { info!("{}", internal_state_table); // convert to table for display - let outgoing_sent_state_display = state + let mut outgoing_sent_state_display = state .outgoing .outgoing_sent .iter() .map(OutgoingSentStateDisplay::from_outgoing_state) .collect::>(); + + outgoing_sent_state_display.sort_by(|a, b| a.key.cmp(&b.key)); + let outgoing_sent_state_table = Table::new(outgoing_sent_state_display) .with(Panel::header("Outgoing Sent Message State")) .with(Modify::list(Rows::first(), Alignment::center())) diff --git a/crates/swbus-config/src/lib.rs b/crates/swbus-config/src/lib.rs index c2f7925..50f6e78 100644 --- a/crates/swbus-config/src/lib.rs +++ b/crates/swbus-config/src/lib.rs @@ -31,8 +31,6 @@ pub 
struct RouteConfig { #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct PeerConfig { - #[serde(deserialize_with = "deserialize_service_path")] - pub id: ServicePath, pub endpoint: SocketAddr, pub conn_type: ConnectionType, } @@ -126,37 +124,29 @@ pub struct ConfigDBDeviceMetadataEntry { #[serde(rename = "type")] pub device_type: Option, pub sub_type: Option, + pub hostname: Option, } #[instrument] -fn route_config_from_dpu_entry(dpu_entry: &ConfigDBDPUEntry, region: &str, cluster: &str) -> Result> { +fn route_config_from_dpu_entry( + dpu_entry: &ConfigDBDPUEntry, + region: &str, + cluster: &str, + hostname: &str, +) -> Vec { let mut routes = Vec::new(); let dpu_id = dpu_entry.dpu_id; debug!("Collecting routes for local dpu{}", dpu_id); - if let Some(npu_ipv4) = dpu_entry.npu_ipv4.as_ref() { - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv4}-dpu{dpu_id}"), "", "", "", ""); - routes.push(RouteConfig { - key: sp, - scope: RouteScope::InCluster, - }); - } - - if let Some(npu_ipv6) = dpu_entry.npu_ipv6.as_ref() { - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv6}-dpu{dpu_id}"), "", "", "", ""); - routes.push(RouteConfig { - key: sp, - scope: RouteScope::InCluster, - }); - } - - if routes.is_empty() { - SwbusConfigError::InvalidConfig(format!("No valid routes found in local dpu{dpu_id}")); - } + let sp = ServicePath::with_node(region, cluster, &format!("{hostname}-dpu{dpu_id}"), "", "", "", ""); + routes.push(RouteConfig { + key: sp, + scope: RouteScope::InCluster, + }); debug!("Routes collected: {:?}", &routes); - Ok(routes) + routes } #[instrument] @@ -184,29 +174,22 @@ fn peer_config_from_dpu_entry( "swbusd_port is not found in dpu {key} is not found" )))?; - let dpu_id = dpu_entry.dpu_id; - + // dual stack is not supported. Either all ipv4 or all ipv6. 
if let Some(npu_ipv4) = dpu_entry.npu_ipv4 { let npu_ipv4 = npu_ipv4 .parse::() .map_err(|_| SwbusConfigError::InvalidConfig(format!("Invalid IPv4 address: {npu_ipv4}")))?; - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv4}-dpu{dpu_id}"), "", "", "", ""); peers.push(PeerConfig { - id: sp, endpoint: SocketAddr::new(IpAddr::V4(npu_ipv4), swbusd_port), conn_type: ConnectionType::InCluster, }); - } - - if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { + } else if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { let npu_ipv6 = npu_ipv6 .parse::() .map_err(|_| SwbusConfigError::InvalidConfig(format!("Invalid IPv6 address: {npu_ipv6}")))?; - let sp = ServicePath::with_node(region, cluster, &format!("{npu_ipv6}-dpu{dpu_id}"), "", "", "", ""); peers.push(PeerConfig { - id: sp, endpoint: SocketAddr::new(IpAddr::V6(npu_ipv6), swbusd_port), conn_type: ConnectionType::InCluster, }); @@ -221,7 +204,7 @@ fn peer_config_from_dpu_entry( } #[instrument] -fn get_device_info() -> Result<(String, String)> { +fn get_device_info() -> Result<(String, String, String)> { let db = DbConnector::new_named(CONFIG_DB, false, 0).map_err(|e| ("connecting to config_db".into(), e))?; let table = Table::new(db, "DEVICE_METADATA").map_err(|e| ("opening DEVICE_METADATA table".into(), e))?; @@ -235,9 +218,12 @@ fn get_device_info() -> Result<(String, String)> { "cluster not found in DEVICE_METADATA table".into(), ))?; - debug!("Region: {}, Cluster: {}", region, cluster); + let hostname = metadata.hostname.ok_or(SwbusConfigError::InvalidConfig( + "hostname not found in DEVICE_METADATA table".into(), + ))?; + debug!("Region: {}, Cluster: {}, Hostname: {}", region, cluster, hostname); - Ok((region, cluster)) + Ok((region, cluster, hostname)) } #[instrument] @@ -288,7 +274,7 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { let mut myroutes: Option> = None; let mut myendpoint: Option = None; - let (region, cluster) = get_device_info()?; + let (region, cluster, hostname) = get_device_info()?; 
// Get the Loopback0 address let (my_ipv4, my_ipv6) = get_loopback_address(0)?; @@ -314,10 +300,7 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { "swbusd_port is not found in dpu{dpu_id} is not found" )))?; - myroutes = Some(route_config_from_dpu_entry(&dpu, ®ion, &cluster).map_err(|e| { - error!("Failed to collect routes for dpu{dpu_id}: {e}"); - e - })?); + myroutes = Some(route_config_from_dpu_entry(&dpu, ®ion, &cluster, &hostname)); if let Some(npu_ipv4) = dpu.npu_ipv4 { myendpoint = Some(SocketAddr::new(std::net::IpAddr::V4(npu_ipv4), swbusd_port)); @@ -405,6 +388,7 @@ pub mod test_utils { cluster: Some("cluster-a".to_string()), device_type: Some("SpineRouter".to_string()), sub_type: Some("SmartSwitch".to_string()), + hostname: Some("host1".to_string()), }; to_table(&metadata, &table, "localhost").unwrap(); @@ -489,47 +473,25 @@ mod tests { let mut config_fromdb = swbus_config_from_db(0).unwrap(); - assert_eq!(config_fromdb.routes.len(), 2); - assert_eq!(config_fromdb.peers.len(), 10); + assert_eq!(config_fromdb.routes.len(), 1); + assert_eq!(config_fromdb.peers.len(), 5); // create equivalent config in yaml let yaml_content = r#" endpoint: "10.0.1.0:23606" routes: - - key: "region-a.cluster-a.10.0.1.0-dpu0" - scope: "InCluster" - - key: "region-a.cluster-a.2001:db8:1::-dpu0" + - key: "region-a.cluster-a.host1-dpu0" scope: "InCluster" peers: - - id: "region-a.cluster-a.10.0.1.0-dpu1" - endpoint: "10.0.1.0:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::-dpu1" - endpoint: "[2001:db8:1::]:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.1-dpu0" - endpoint: "10.0.1.1:23606" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::1-dpu0" - endpoint: "[2001:db8:1::1]:23606" + - endpoint: "10.0.1.0:23607" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.1-dpu1" - endpoint: "10.0.1.1:23607" + - endpoint: "10.0.1.1:23606" conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::1-dpu1" - 
endpoint: "[2001:db8:1::1]:23607" + - endpoint: "10.0.1.1:23607" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.2-dpu0" - endpoint: "10.0.1.2:23606" + - endpoint: "10.0.1.2:23606" conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::2-dpu0" - endpoint: "[2001:db8:1::2]:23606" - conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.1.2-dpu1" - endpoint: "10.0.1.2:23607" - conn_type: "InCluster" - - id: "region-a.cluster-a.2001:db8:1::2-dpu1" - endpoint: "[2001:db8:1::2]:23607" + - endpoint: "10.0.1.2:23607" conn_type: "InCluster" "#; @@ -543,9 +505,9 @@ mod tests { expected.npu_ipv6 = Some(Ipv6Addr::from_str("2001:db8:1::").unwrap()); // sort before compare config_fromdb.routes.sort_by(|a, b| a.key.cmp(&b.key)); - config_fromdb.peers.sort_by(|a, b| a.id.cmp(&b.id)); + config_fromdb.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); expected.routes.sort_by(|a, b| a.key.cmp(&b.key)); - expected.peers.sort_by(|a, b| a.id.cmp(&b.id)); + expected.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); assert_eq!(config_fromdb, expected); cleanup_configdb_for_test(); @@ -559,11 +521,9 @@ mod tests { - key: "region-a.cluster-a.10.0.0.1-dpu0" scope: "InCluster" peers: - - id: "region-a.cluster-a.10.0.0.2-dpu0" - endpoint: "10.0.0.2:8000" + - endpoint: "10.0.0.2:8000" conn_type: "InCluster" - - id: "region-a.cluster-a.10.0.0.3-dpu0" - endpoint: "10.0.0.3:8000" + - endpoint: "10.0.0.3:8000" conn_type: "InCluster" "#; @@ -590,19 +550,11 @@ mod tests { ); assert_eq!(config.routes[0].scope, RouteScope::InCluster); - assert_eq!( - config.peers[0].id, - ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap() - ); assert_eq!( config.peers[0].endpoint, "10.0.0.2:8000".parse().expect("not expecting error") ); assert_eq!(config.peers[0].conn_type, ConnectionType::InCluster); - assert_eq!( - config.peers[1].id, - ServicePath::from_string("region-a.cluster-a.10.0.0.3-dpu0").unwrap() - ); assert_eq!( config.peers[1].endpoint, 
"10.0.0.3:8000".parse().expect("not expecting error") diff --git a/crates/swbus-core/src/mux/conn.rs b/crates/swbus-core/src/mux/conn.rs index 8a996bf..0714c67 100644 --- a/crates/swbus-core/src/mux/conn.rs +++ b/crates/swbus-core/src/mux/conn.rs @@ -62,7 +62,7 @@ impl SwbusConn { // Client-side connection factory and task entry impl SwbusConn { pub async fn connect( - conn_info: Arc, + conn_info: &SwbusConnInfo, mux: Arc, conn_store: Arc, ) -> Result { @@ -85,16 +85,16 @@ impl SwbusConn { } async fn start_client_worker_task( - conn_info: Arc, + conn_info: &SwbusConnInfo, mut client: SwbusServiceClient, mux: Arc, conn_store: Arc, ) -> Result { let (send_queue_tx, send_queue_rx) = mpsc::channel(16); - let mut conn = SwbusConn::new(&conn_info, send_queue_tx); - let request_stream = ReceiverStream::new(send_queue_rx) - .map(|result| result.expect("Not expecting grpc client adding messages with error status")); + let request_stream = ReceiverStream::new(send_queue_rx).map(|result: Result| { + result.expect("Not expecting grpc client adding messages with error status") + }); let mut stream_message_request = Request::new(request_stream); @@ -111,8 +111,43 @@ impl SwbusConn { MetadataValue::from_str(conn_info.connection_type().as_str_name()).unwrap(), ); - let incoming_stream = match client.stream_messages(stream_message_request).await { - Ok(response) => response.into_inner(), + let (incoming_stream, conn_info_for_worker) = match client.stream_messages(stream_message_request).await { + Ok(response) => { + // Extract server service path from response metadata and update remote_service_path + let server_service_path = match response.metadata().get(SWBUS_SERVER_SERVICE_PATH) { + Some(path) => match ServicePath::from_string(path.to_str().unwrap()) { + Ok(service_path) => { + info!("Received server service path: {}", service_path.to_string()); + service_path + } + Err(e) => { + error!("Failed to parse server service path: {:?}", e); + return Err(SwbusError::connection( + 
SwbusErrorCode::InvalidHeader, + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid server service path: {:?}", e), + ), + )); + } + }, + None => { + error!("Server service path not found in response metadata"); + return Err(SwbusError::connection( + SwbusErrorCode::InvalidHeader, + io::Error::new( + io::ErrorKind::InvalidData, + "Server service path not found in response metadata", + ), + )); + } + }; + + // Update conn_info's remote_service_path with actual server service path + let updated_conn_info = Arc::new(conn_info.clone().with_remote_service_path(server_service_path)); + + (response.into_inner(), updated_conn_info) + } Err(e) => { error!("Failed to establish message streaming: {}.", e); return Err(SwbusError::connection( @@ -121,8 +156,7 @@ impl SwbusConn { )); } }; - - let conn_info_for_worker = conn.info().clone(); + let mut conn = SwbusConn::new(&conn_info_for_worker, send_queue_tx); let shutdown_ct_for_worker = conn.shutdown_ct.clone(); let worker_task = tokio::spawn(async move { diff --git a/crates/swbus-core/src/mux/conn_info.rs b/crates/swbus-core/src/mux/conn_info.rs index 8e93f53..38b2ffe 100644 --- a/crates/swbus-core/src/mux/conn_info.rs +++ b/crates/swbus-core/src/mux/conn_info.rs @@ -24,21 +24,17 @@ pub struct SwbusConnInfo { connection_type: ConnectionType, #[getset(get = "pub")] - remote_service_path: ServicePath, + remote_service_path: Option, } impl SwbusConnInfo { - pub fn new_client( - conn_type: ConnectionType, - remote_addr: SocketAddr, - remote_service_path: ServicePath, - ) -> SwbusConnInfo { + pub fn new_client(conn_type: ConnectionType, remote_addr: SocketAddr) -> SwbusConnInfo { SwbusConnInfo { id: format!("swbs-to://{}:{}", remote_addr.ip(), remote_addr.port()), mode: SwbusConnMode::Client, remote_addr, connection_type: conn_type, - remote_service_path, + remote_service_path: None, } } @@ -52,9 +48,14 @@ impl SwbusConnInfo { mode: SwbusConnMode::Server, remote_addr, connection_type: conn_type, - remote_service_path, 
+ remote_service_path: Some(remote_service_path), } } + + pub fn with_remote_service_path(mut self, remote_service_path: ServicePath) -> Self { + self.remote_service_path = Some(remote_service_path); + self + } } #[cfg(test)] @@ -65,14 +66,13 @@ mod tests { #[test] fn new_client_conn_info_should_succeed() { let remote_addr = "127.0.0.1:8080".parse().unwrap(); - let remote_service_path = ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(); - let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, remote_addr, remote_service_path.clone()); + let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, remote_addr); assert_eq!(conn_info.id(), "swbs-to://127.0.0.1:8080"); assert_eq!(conn_info.mode(), SwbusConnMode::Client); assert_eq!(conn_info.remote_addr(), remote_addr); assert_eq!(conn_info.connection_type(), ConnectionType::InCluster); - assert_eq!(conn_info.remote_service_path(), &remote_service_path); + assert_eq!(conn_info.remote_service_path(), &None); } #[test] @@ -85,6 +85,6 @@ mod tests { assert_eq!(conn_info.mode(), SwbusConnMode::Server); assert_eq!(conn_info.remote_addr(), remote_addr); assert_eq!(conn_info.connection_type(), ConnectionType::InRegion); - assert_eq!(conn_info.remote_service_path(), &remote_service_path); + assert_eq!(conn_info.remote_service_path(), &Some(remote_service_path)); } } diff --git a/crates/swbus-core/src/mux/conn_store.rs b/crates/swbus-core/src/mux/conn_store.rs index 8604050..15d49aa 100644 --- a/crates/swbus-core/src/mux/conn_store.rs +++ b/crates/swbus-core/src/mux/conn_store.rs @@ -17,7 +17,7 @@ enum ConnTracker { pub struct SwbusConnStore { mux: Arc, - connections: DashMap, ConnTracker>, + connections: DashMap, } impl SwbusConnStore { @@ -29,8 +29,7 @@ impl SwbusConnStore { } #[instrument(skip(self, conn_info), fields(conn_id=conn_info.id()))] - fn start_connect_task(self: &Arc, conn_info: Arc, reconnect: bool) { - let conn_info_clone = conn_info.clone(); + fn 
start_connect_task(self: &Arc, conn_info: &SwbusConnInfo, reconnect: bool) { info!("Starting connection task to the peer"); let retry_interval = match reconnect { true => Duration::from_millis(1), @@ -42,13 +41,14 @@ impl SwbusConnStore { let token = CancellationToken::new(); let child_token = token.clone(); + let conn_info_clone = conn_info.clone(); tokio::spawn( async move { loop { if child_token.is_cancelled() { return; } - match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await { + match SwbusConn::connect(&conn_info_clone, mux_clone.clone(), conn_store.clone()).await { Ok(conn) => { info!("Successfully connect to the peer"); // register the new connection and update the route table @@ -63,21 +63,18 @@ impl SwbusConnStore { } .instrument(current_span.clone()), ); - self.connections.insert(conn_info_clone, ConnTracker::Task(token)); + self.connections + .insert(conn_info.id().to_string(), ConnTracker::Task(token)); } pub fn add_peer(self: &Arc, peer: PeerConfig) { - let conn_info = Arc::new(SwbusConnInfo::new_client( - peer.conn_type, - peer.endpoint, - peer.id.clone(), - )); - self.start_connect_task(conn_info, false); + let conn_info = SwbusConnInfo::new_client(peer.conn_type, peer.endpoint); + self.start_connect_task(&conn_info, false); } - pub fn conn_lost(self: &Arc, conn_info: Arc) { + pub fn conn_lost(self: &Arc, conn_info: &SwbusConnInfo) { // First, we remove the connection from the connection table. - self.connections.remove(&conn_info); + self.connections.remove(conn_info.id()); // If connection is client mode, we start a new connection task. 
if conn_info.mode() == SwbusConnMode::Client { @@ -88,7 +85,7 @@ impl SwbusConnStore { pub fn conn_established(&self, conn: SwbusConn) { self.mux.register(conn.info(), conn.new_proxy()); self.connections - .insert(conn.info().clone(), ConnTracker::SwbusConn(conn)); + .insert(conn.info().id().to_string(), ConnTracker::SwbusConn(conn)); } pub async fn shutdown(&self) { @@ -124,7 +121,6 @@ mod tests { let peer_config = PeerConfig { conn_type: ConnectionType::InNode, endpoint: "127.0.0.1:8080".to_string().parse().unwrap(), - id: ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap(), }; let route_config = RouteConfig { key: ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(), @@ -137,7 +133,7 @@ mod tests { conn_store.add_peer(peer_config); assert!(conn_store.connections.iter().any(|entry| { - entry.key().id() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) + entry.key() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) })); } @@ -150,15 +146,11 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( - ConnectionType::InCluster, - "127.0.0.1:8080".parse().unwrap(), - ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), - )); - conn_store.conn_lost(conn_info.clone()); + let conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap()); + conn_store.conn_lost(&conn_info); assert!(conn_store.connections.iter().any(|entry| { - entry.key().id() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) + entry.key() == "swbs-to://127.0.0.1:8080" && matches!(entry.value(), ConnTracker::Task(_)) })); } @@ -172,11 +164,10 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = 
Arc::new(SwbusConnInfo::new_client( - ConnectionType::InCluster, - "127.0.0.1:8080".parse().unwrap(), - ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), - )); + let mut conn_info = SwbusConnInfo::new_client(ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap()); + conn_info = + conn_info.with_remote_service_path(ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap()); + let conn_info = Arc::new(conn_info); let (send_queue_tx, _) = mpsc::channel(16); let conn = SwbusConn::new(&conn_info, send_queue_tx); conn_store.conn_established(conn); @@ -184,6 +175,6 @@ mod tests { assert!(conn_store .connections .iter() - .any(|entry| entry.key().id() == conn_info.id() && matches!(entry.value(), ConnTracker::SwbusConn(_)))); + .any(|entry| entry.key() == conn_info.id() && matches!(entry.value(), ConnTracker::SwbusConn(_)))); } } diff --git a/crates/swbus-core/src/mux/conn_worker.rs b/crates/swbus-core/src/mux/conn_worker.rs index 7a38ed2..888211e 100644 --- a/crates/swbus-core/src/mux/conn_worker.rs +++ b/crates/swbus-core/src/mux/conn_worker.rs @@ -36,6 +36,10 @@ where mux: Arc, conn_store: Arc, ) -> Self { + assert!( + info.remote_service_path().is_some(), + "remote_service_path must be set before creating SwbusConnWorker" + ); Self { info, shutdown_ct, @@ -59,7 +63,7 @@ where self.unregister_from_mux()?; if result.is_err() { info!("Reporting connection lost."); - self.conn_store.conn_lost(self.info.clone()); + self.conn_store.conn_lost(&self.info); } result } @@ -148,9 +152,15 @@ where } } Some(swbus_message::Body::RouteAnnouncement(route_entries)) => { - // drop route announcement message - debug!("Received route announcement"); - self.mux.process_route_announcement(route_entries, &self.info)?; + let my_sp = self.mux.get_my_service_path(); + if message.header.as_ref().unwrap().destination.as_ref().unwrap() == my_sp { + // Message is destined for this service, process it locally + debug!("Received route announcement"); + 
self.mux.process_route_announcement(route_entries, &self.info)?; + } else { + // drop route announcement message + debug!("Dropping route announcement not destined for me"); + } } Some(swbus_message::Body::ManagementRequest(ref mgmt_request)) => { let my_sp = self.mux.get_my_service_path(); @@ -264,7 +274,7 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -301,7 +311,7 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -328,7 +338,7 @@ mod tests { let mux = Arc::new(SwbusMultiplexer::new(vec![route_config])); let conn_store = Arc::new(SwbusConnStore::new(mux.clone())); - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), diff --git a/crates/swbus-core/src/mux/multiplexer.rs b/crates/swbus-core/src/mux/multiplexer.rs index 2fd463e..63954df 100644 --- a/crates/swbus-core/src/mux/multiplexer.rs +++ b/crates/swbus-core/src/mux/multiplexer.rs @@ -153,19 +153,35 @@ impl SwbusMultiplexer { /// get route from conn_info based on connection type. This is a direct route which means it is to a immediate nexthop (1 hop away). 
fn route_from_conn(&self, conn_info: &Arc) -> String { + let remote_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set"); match conn_info.connection_type() { // direct route Global, InRegion, InCluster connection type is always to node level ConnectionType::Global | ConnectionType::InRegion | ConnectionType::InCluster => { - conn_info.remote_service_path().to_incluster_prefix() + remote_sp.to_incluster_prefix() } // both Client (for CLI) and InNode route by service prefix. IOW, service prefix uniquely // identify the endpoint to the client (CLI or hamgrd) - ConnectionType::InNode | ConnectionType::Client => conn_info.remote_service_path().to_service_prefix(), + ConnectionType::InNode | ConnectionType::Client => remote_sp.to_service_prefix(), } } pub(crate) fn register(&self, conn_info: &Arc, proxy: SwbusConnProxy) { // Update the route table. + let remote_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set"); + if self.my_routes.contains_key(remote_sp) { + error!( + "Conflicting service path to my routes detected from connection: {}", + conn_info.id() + ); + return; + } + let direct_route = self.route_from_conn(conn_info); let nexthop = SwbusNextHop::new_remote(conn_info.clone(), proxy, 1); @@ -205,7 +221,7 @@ impl SwbusMultiplexer { match Self::key_from_route_entry(entry) { Ok(route_key) => { let old_nh = dummy_nh.clone_with_hop_count(entry.hop_count + 1); - let route_removed = self.remove_route_to_nh(&route_key, &old_nh); + let (_, route_removed) = self.remove_route_to_nh(&route_key, &old_nh); if route_removed { need_announce = true; } @@ -217,7 +233,7 @@ impl SwbusMultiplexer { } // remove the direct route - let route_removed = self.remove_route_to_nh(&direct_route, &dummy_nh); + let (_, route_removed) = self.remove_route_to_nh(&direct_route, &dummy_nh); if route_removed { need_announce = true; } @@ -236,34 +252,42 @@ impl SwbusMultiplexer { } } - // Update route for the 
give service path with the new nexthop. If the route is updated, return true. - // Otherwise, return false. + // Update route for the given service path with the new nexthop. It returns a tuple of (nh_added, route_added). + // if nh_added is true, the nexthop is newly inserted or updated (hop count changed). + // if route_added is true, the route entry is newly created. #[instrument(name = "update_route", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))] - fn update_route(&self, route_key: String, nexthop: SwbusNextHop) -> bool { + fn update_route(&self, route_key: String, nexthop: SwbusNextHop) -> (bool, bool) { // If route entry doesn't exist, we insert the next hop as a new one. - let inserted = self.routes.entry(route_key).or_default().insert(nexthop.clone()); - info!("Update route entry: inserted={}", inserted); - inserted + let mut route_added = false; + if self.routes.get(&route_key).is_none() { + route_added = true; + } + let nh_added = self.routes.entry(route_key).or_default().insert(nexthop.clone()); + info!("Update route entry: nh_added={nh_added}, route_added={route_added}"); + (nh_added, route_added) } /// remove a specified route and to the specified nexthop + /// It returns a tuple of (nh_removed, route_removed). + /// if nh_removed is true, the nexthop is removed. + /// if route_removed is true, the route entry is completely removed. 
#[instrument(name = "remove_route_to_nh", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))] - fn remove_route_to_nh(&self, route_key: &str, nexthop: &SwbusNextHop) -> bool { - let mut removed = false; - let mut remove_all = false; + fn remove_route_to_nh(&self, route_key: &str, nexthop: &SwbusNextHop) -> (bool, bool) { + let mut nh_removed = false; + let mut route_removed = false; if let Some(mut entry) = self.routes.get_mut(route_key) { - removed = entry.remove(nexthop); + nh_removed = entry.remove(nexthop); if entry.is_empty() { - remove_all = true; + route_removed = true; // can't remove route here because we are holding the lock on the entry. } } - if remove_all { + if route_removed { self.routes.remove(route_key); } - info!("Remove route entry: removed={}", removed); - removed + info!("Remove route entry: nh_removed={nh_removed}, route_removed={route_removed}"); + (nh_removed, route_removed) } /// remove a specified route @@ -284,7 +308,7 @@ impl SwbusMultiplexer { SwbusError::internal( SwbusErrorCode::Fail, format!( - "Receive route announcement from {} but route to the connection {} is not found ", + "Receive route announcement from {:?} but route to the connection {} is not found ", conn_info.remote_service_path(), direct_route ), @@ -302,7 +326,7 @@ impl SwbusMultiplexer { Err(SwbusError::internal( SwbusErrorCode::Fail, format!( - "Receive route announcement from {} but nh of the route is not from same connection: {}", + "Receive route announcement from {:?} but nh of the route is not from same connection: {}", conn_info.remote_service_path(), direct_route ), @@ -337,7 +361,7 @@ impl SwbusMultiplexer { conn_info: &Arc, ) -> Result<()> { debug!( - "Begin processing route announcement from {}", + "Begin processing route announcement from {:?}", conn_info.remote_service_path() ); @@ -350,7 +374,7 @@ impl 
SwbusMultiplexer { for entry in routes.entries.into_iter() { if entry.service_path.is_none() { error!( - "Received route announcement with missing service path from {}", + "Received route announcement with missing service path from {:?}", conn_info.remote_service_path() ); continue; @@ -377,7 +401,7 @@ impl SwbusMultiplexer { } }; let old_nh = nh.clone_with_hop_count(entry.hop_count + 1); - let route_removed = self.remove_route_to_nh(&route_key, &old_nh); + let (_, route_removed) = self.remove_route_to_nh(&route_key, &old_nh); if route_removed { need_announce = true; } @@ -394,8 +418,8 @@ impl SwbusMultiplexer { }; let new_nh = nh.clone_with_hop_count(entry.hop_count + 1); - let route_changed = self.update_route(route_key, new_nh); - if route_changed { + let (_, route_added) = self.update_route(route_key, new_nh); + if route_added { need_announce = true; } } @@ -406,7 +430,7 @@ impl SwbusMultiplexer { } *old_routes = new_routes; debug!( - "Finished processing route announcement from {}", + "Finished processing route announcement from {:?}", conn_info.remote_service_path() ); Ok(()) @@ -547,7 +571,7 @@ impl SwbusMultiplexer { ), hop_count: nh.hop_count(), nh_id: nh.conn_info().as_ref().unwrap().id().to_string(), - nh_service_path: Some(nh.conn_info().as_ref().unwrap().remote_service_path().clone()), + nh_service_path: nh.conn_info().as_ref().unwrap().remote_service_path().clone(), route_scope: ServicePath::from_string(entry.key()).unwrap().route_scope() as i32, }) .collect::>() @@ -680,7 +704,7 @@ pub mod test_utils { sp: &str, nh_endpoint: &str, ) -> (SwbusConn, mpsc::Receiver>) { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( conn_type, nh_endpoint.parse().unwrap(), ServicePath::from_string(sp).unwrap(), @@ -696,12 +720,13 @@ pub mod test_utils { conn: &SwbusConn, ) -> Option { let nexthop_nh1 = SwbusNextHop::new_remote(conn.info().clone(), conn.new_proxy(), hop_count); - if 
mux.update_route(route_key.to_string(), nexthop_nh1) { + let (nh_added, _) = mux.update_route(route_key.to_string(), nexthop_nh1); + if nh_added { Some(RouteEntry { service_path: Some(ServicePath::from_string(route_key).unwrap()), hop_count, nh_id: conn.info().id().to_string(), - nh_service_path: Some(conn.info().remote_service_path().clone()), + nh_service_path: conn.info().remote_service_path().clone(), route_scope: ServicePath::from_string(route_key).unwrap().route_scope() as i32, }) } else { @@ -1160,28 +1185,28 @@ mod tests { add_route(&mux, "region-a.cluster-a.node1", 1, &conn1).unwrap(); let conn1_nh = SwbusNextHop::new_remote(conn1.info().clone(), conn1.new_proxy(), 1); - // create some route entries + // create some route entries for node 1 // route entry matches my routes. Should be skipped - let node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 0); + let n1_node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 0); // negative test: route announcement with scope lower than InCluster - let node1_re2 = new_route_entry_for_ra("region-a.cluster-a.node1/ha/0", 1); - let node2_re1 = new_route_entry_for_ra("region-a.cluster-a.node2", 1); - let clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); + let n1_node1_re2 = new_route_entry_for_ra("region-a.cluster-a.node1/ha/0", 1); + let n1_node2_re1 = new_route_entry_for_ra("region-a.cluster-a.node2", 1); + let n1_clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); - // Step1: process a route announcement + // Step1: process a route announcement from node1 let routes = RouteEntries { entries: vec![ - node1_re1.clone(), - node1_re2.clone(), - node2_re1.clone(), - clusterb_re1.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), + n1_node2_re1.clone(), + n1_clusterb_re1.clone(), ], }; let expected_route_entries = BTreeSet::from_iter(vec![ - node1_re1.clone(), - node1_re2.clone(), - node2_re1.clone(), - clusterb_re1.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), 
+ n1_node2_re1.clone(), + n1_clusterb_re1.clone(), ]); mux.process_route_announcement(routes.clone(), conn1.info()).unwrap(); // Expect: @@ -1196,15 +1221,15 @@ mod tests { BTreeSet::from([SwbusNextHop::new_local()]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node1_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node1_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(1)]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node2_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(2)]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&clusterb_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_clusterb_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(3)]), ); @@ -1223,7 +1248,7 @@ mod tests { assert_eq!(*routes_by_conn, expected_route_entries); } - // Step 2: process a route announcement with the same routes + // Step 2: process a route announcement with the same routes from node1 mux.process_route_announcement(routes.clone(), conn1.info()).unwrap(); // Expect: // 1. 
no route is updated @@ -1240,16 +1265,16 @@ mod tests { // process a route announcement let routes = RouteEntries { entries: vec![ - node1_re1.clone(), - node1_re2.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), node2_re1_changed.clone(), clusterb_re1.clone(), node3_re1.clone(), ], }; let expected_route_entries = BTreeSet::from_iter(vec![ - node1_re1.clone(), - node1_re2.clone(), + n1_node1_re1.clone(), + n1_node1_re2.clone(), node2_re1_changed.clone(), clusterb_re1.clone(), node3_re1.clone(), ]); @@ -1261,7 +1286,7 @@ mod tests { BTreeSet::from([SwbusNextHop::new_local()]), ); expected.insert( - SwbusMultiplexer::key_from_route_entry(&node1_re1).unwrap(), + SwbusMultiplexer::key_from_route_entry(&n1_node1_re1).unwrap(), BTreeSet::from([conn1_nh.clone_with_hop_count(1)]), ); expected.insert( @@ -1297,6 +1322,74 @@ mod tests { let routes_by_conn = mux.routes_by_conn.get(conn1.info()).unwrap(); assert_eq!(*routes_by_conn, expected_route_entries); } + let expected_before_node2 = expected.clone(); + // Step 4: process a route announcement from node2 + // add a direct route to the node2 + let (conn2, _) = + new_conn_for_test_with_endpoint(ConnectionType::InCluster, "region-a.cluster-a.node2", "127.0.0.1:8081"); + add_route(&mux, "region-a.cluster-a.node2", 1, &conn2).unwrap(); + let conn2_nh = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); + + // create some route entries for node 2 + let n2_node1_re1 = new_route_entry_for_ra("region-a.cluster-a.node1", 1); + let n2_clusterb_re1 = new_route_entry_for_ra("region-a.cluster-b", 2); + + let routes = RouteEntries { + entries: vec![n2_node1_re1.clone(), n2_clusterb_re1.clone()], + }; + mux.process_route_announcement(routes.clone(), conn2.info()).unwrap(); + + // Expect: + // 1. no route announcement task is created as no new route is added + // 2. 
route_entries are updated with new paths + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(1)); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n2_node1_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(2)); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n2_clusterb_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(3)); + let actual: HashMap> = mux + .routes + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + assert_eq!(actual, expected); + assert_eq!( + route_announce_task_rx.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + ); + + // Step 5: process a route announcement with routes removed from node2 + // Expect: + // 1. no route announcement task is created as only paths are removed but no route is removed + // 2. route_entries are updated with paths removed + let routes = RouteEntries { entries: vec![] }; + mux.process_route_announcement(routes.clone(), conn2.info()).unwrap(); + + let mut expected = expected_before_node2.clone(); + expected + .entry(SwbusMultiplexer::key_from_route_entry(&n1_node2_re1).unwrap()) + .or_default() + .insert(conn2_nh.clone_with_hop_count(1)); + + let actual: HashMap> = mux + .routes + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect(); + + assert_eq!(actual, expected); + + assert_eq!( + route_announce_task_rx.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + ); } #[tokio::test] @@ -1312,7 +1405,7 @@ mod tests { add_route(&mux, "region-a.cluster-a.10.0.0.1-dpu0", 1, &conn1).unwrap(); // add the same route with a different connection - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8081".parse().unwrap(), ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(), @@ -1365,9 
+1458,9 @@ mod tests { add_route(&mux, route_key, 1, &conn2).unwrap(); let nexthop2 = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); - assert!(mux.remove_route_to_nh(route_key, &nexthop1)); + assert!(mux.remove_route_to_nh(route_key, &nexthop1).0); assert!(mux.routes.contains_key(route_key)); - assert!(mux.remove_route_to_nh(route_key, &nexthop2)); + assert!(mux.remove_route_to_nh(route_key, &nexthop2).0); assert!(!mux.routes.contains_key(route_key)); // Ensure route is removed when empty } @@ -1395,7 +1488,8 @@ mod tests { ); let nexthop2 = SwbusNextHop::new_remote(conn2.info().clone(), conn2.new_proxy(), 1); - assert!(!mux.remove_route_to_nh(route_key, &nexthop2)); + let (nh_remove, route_removed) = mux.remove_route_to_nh(route_key, &nexthop2); + assert!(!nh_remove && !route_removed); assert!(mux.routes.contains_key(route_key)); // Ensure route is not removed } @@ -1501,4 +1595,30 @@ mod tests { ); assert!(!mux.connections.contains(conn1.info())); } + + #[test] + fn test_register_with_conflict_sp() { + // create mux + // create a mux with my routes and add a dummy route announcer + // create a mux with 2 my-routes + let my_route1 = RouteConfig { + key: ServicePath::from_string("region-a.cluster-a.node0").unwrap(), + scope: RouteScope::InCluster, + }; + let mut mux = SwbusMultiplexer::new(vec![my_route1.clone()]); + let (route_announce_task_tx, _) = mpsc::channel(16); + mux.set_route_announcer(route_announce_task_tx, CancellationToken::new()); + let mux = Arc::new(mux); + + // register a connection + let (conn1, _) = new_conn_for_test(ConnectionType::InCluster, "region-a.cluster-a.node0"); + mux.register(conn1.info(), conn1.new_proxy()); + let route_key = mux.route_from_conn(conn1.info()); + // make sure the conflicting route is not added + if let Some(nh_set) = mux.routes.get(&route_key) { + assert!(!nh_set + .iter() + .any(|nh| nh.nh_type() == NextHopType::Remote && nh.conn_info().as_ref().unwrap() == conn1.info())); + }; + } } diff --git 
a/crates/swbus-core/src/mux/nexthop.rs b/crates/swbus-core/src/mux/nexthop.rs index 3efe52c..f8f9768 100644 --- a/crates/swbus-core/src/mux/nexthop.rs +++ b/crates/swbus-core/src/mux/nexthop.rs @@ -114,7 +114,7 @@ impl SwbusNextHop { NextHopType::Remote => { let header: &mut SwbusMessageHeader = message.header.as_mut().expect("missing header"); // should not happen otherwise it won't reach here header.ttl -= 1; - if header.ttl == 0 { + if header.ttl < self.hop_count { debug!("TTL expired"); let response = SwbusMessage::new_response( message.header.as_ref().unwrap(), @@ -195,7 +195,7 @@ mod tests { #[tokio::test] async fn test_new_remote() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -280,7 +280,7 @@ mod tests { #[tokio::test] async fn test_queue_message_remote_ttl_expired() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), @@ -328,7 +328,7 @@ mod tests { #[test] fn test_clone_with_hop_count() { - let conn_info = Arc::new(SwbusConnInfo::new_client( + let conn_info = Arc::new(SwbusConnInfo::new_server( ConnectionType::InCluster, "127.0.0.1:8080".parse().unwrap(), ServicePath::from_string("regiona.clustera.10.0.0.2-dpu0").unwrap(), diff --git a/crates/swbus-core/src/mux/route_annoucer.rs b/crates/swbus-core/src/mux/route_annoucer.rs index 7ad9097..2ffe75e 100644 --- a/crates/swbus-core/src/mux/route_annoucer.rs +++ b/crates/swbus-core/src/mux/route_annoucer.rs @@ -29,7 +29,7 @@ impl std::fmt::Display for RouteAnnounceTask { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( f, - "trigger: {:?}, conn_info: {}", + "trigger: {:?}, conn_info: {:?}", self.trigger, 
self.conn_info.remote_service_path() ) @@ -88,7 +88,7 @@ impl RouteAnnouncer { .await .map_err(|e| { error!( - "Failed to send route announcement to {}: {}", + "Failed to send route announcement to {:?}: {}", conn_info.remote_service_path(), e ); @@ -98,8 +98,12 @@ impl RouteAnnouncer { #[instrument(name = "send_route_announcement", level = "debug", skip_all)] async fn send_route_announcement(&self, conn_info: &SwbusConnInfo, routes: &RouteEntries) -> Result<()> { - let dest_sp = conn_info.remote_service_path().clone(); - let msg = SwbusMessage { + let dest_sp = conn_info + .remote_service_path() + .as_ref() + .expect("remote_service_path should be set") + .clone(); + let mut msg = SwbusMessage { header: Some(SwbusMessageHeader::new( self.mux.get_my_service_path().clone(), dest_sp, @@ -107,8 +111,10 @@ impl RouteAnnouncer { )), body: Some(swbus_message::Body::RouteAnnouncement(routes.clone())), }; + // count itself as 1 hop. The message is only meant for direct neighbors + msg.header.as_mut().unwrap().ttl = 2; debug!( - "Sending route announcement to {}, conn_info {:?}, message {:?}", + "Sending route announcement to {:?}, conn_info {:?}, message {:?}", conn_info.remote_service_path(), conn_info, &msg @@ -167,7 +173,7 @@ mod tests { "header": { "version": 1, "flag": 0, - "ttl": 63, + "ttl": 1, "source": "region-a.cluster-a.node0", "destination": "region-a.cluster-a.node1" }, @@ -211,7 +217,7 @@ mod tests { "header": { "version": 1, "flag": 0, - "ttl": 63, + "ttl": 1, "source": "region-a.cluster-a.node0", "destination": "region-a.cluster-a.node2" }, diff --git a/crates/swbus-core/src/mux/service.rs b/crates/swbus-core/src/mux/service.rs index 22715f9..07b4d9d 100644 --- a/crates/swbus-core/src/mux/service.rs +++ b/crates/swbus-core/src/mux/service.rs @@ -173,6 +173,14 @@ impl SwbusService for SwbusServiceHost { .await; self.conn_store.as_ref().unwrap().conn_established(conn); let out_stream = ReceiverStream::new(out_rx); - Ok(Response::new(Box::pin(out_stream) as 
Self::StreamMessagesStream)) + + // Send server service path in response metadata + let mut response = Response::new(Box::pin(out_stream) as Self::StreamMessagesStream); + let server_service_path = self.mux.as_ref().unwrap().get_my_service_path().to_string(); + response + .metadata_mut() + .insert(SWBUS_SERVER_SERVICE_PATH, server_service_path.parse().unwrap()); + + Ok(response) } } diff --git a/crates/swbus-core/tests/data/b2b/topo.json b/crates/swbus-core/tests/data/b2b/topo.json index 73089ab..27ef330 100644 --- a/crates/swbus-core/tests/data/b2b/topo.json +++ b/crates/swbus-core/tests/data/b2b/topo.json @@ -11,7 +11,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.10.0.0.2-dpu0", "endpoint": "127.0.0.1:60002", "conn_type": "InCluster" } @@ -27,7 +26,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.10.0.0.1-dpu0", "endpoint": "127.0.0.1:60001", "conn_type": "InCluster" } diff --git a/crates/swbus-core/tests/data/inter-cluster/topo.json b/crates/swbus-core/tests/data/inter-cluster/topo.json index 044fd09..c167ddf 100644 --- a/crates/swbus-core/tests/data/inter-cluster/topo.json +++ b/crates/swbus-core/tests/data/inter-cluster/topo.json @@ -11,7 +11,6 @@ ], "peers": [ { - "id": "region-a.cluster-a.gw", "endpoint": "127.0.0.1:60002", "conn_type": "InCluster" } @@ -31,12 +30,10 @@ ], "peers": [ { - "id": "region-a.cluster-a.node1", "endpoint": "127.0.0.1:60001", "conn_type": "InCluster" }, { - "id": "region-a.cluster-b.gw", "endpoint": "127.0.0.1:60003", "conn_type": "InRegion" } @@ -56,17 +53,15 @@ ], "peers": [ { - "id": "region-a.cluster-a.gw", "endpoint": "127.0.0.1:60002", "conn_type": "InRegion" }, { - "id": "region-a.cluster-b.node1", "endpoint": "127.0.0.1:60004", "conn_type": "InCluster" } ] - }, + }, "swbusd_cluster-b.node1": { "endpoint": "127.0.0.1:60004", "routes": [ @@ -77,7 +72,6 @@ ], "peers": [ { - "id": "region-a.cluster-b.gw", "endpoint": "127.0.0.1:60003", "conn_type": "InCluster" } diff --git a/crates/swbus-proto/src/swbus.rs 
b/crates/swbus-proto/src/swbus.rs index 3e4e939..92a981e 100644 --- a/crates/swbus-proto/src/swbus.rs +++ b/crates/swbus-proto/src/swbus.rs @@ -7,6 +7,8 @@ use crate::swbus::request_response::ResponseBody; /// Service path attribute in gRPC request meta data pub const SWBUS_CLIENT_SERVICE_PATH: &str = "x-swbus-service-path"; +/// Service path attribute in gRPC response meta data +pub const SWBUS_SERVER_SERVICE_PATH: &str = "x-swbus-server-service-path"; /// Service path scope of the connection pub const SWBUS_CONNECTION_TYPE: &str = "x-swbus-connection-type"; diff --git a/crates/swss-common-bridge/Cargo.toml b/crates/swss-common-bridge/Cargo.toml index 9192228..bca7ce6 100644 --- a/crates/swss-common-bridge/Cargo.toml +++ b/crates/swss-common-bridge/Cargo.toml @@ -22,7 +22,7 @@ serde.workspace = true serde_json.workspace = true sonicdb-derive.workspace = true sonic-common.workspace = true - +tracing.workspace = true [lints] workspace = true diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 9bdea5f..c07f6f4 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -12,7 +12,7 @@ use swss_common::{ }; use tokio::task::JoinHandle; use tokio_util::task::AbortOnDropHandle; - +use tracing::debug; pub struct ConsumerBridge { _task: AbortOnDropHandle<()>, } @@ -56,6 +56,7 @@ where F: FnMut(&KeyOpFieldValues) -> (ServicePath, String) + Send + 'static, S: Fn(&KeyOpFieldValues) -> bool + Sync + Send + 'static, { + let my_sp = addr.clone(); let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false); tokio::task::spawn(async move { let mut table_cache = TableCache::default(); @@ -63,12 +64,17 @@ where if P::is_proto() { P::convert_pb_to_json(&mut kfv); } - + debug!("{}: receiving update: {:?}", my_sp.to_longest_path(), kfv); // Merge the kfv to get the whole table as an update let Some(kfv) = table_cache.merge_kfv(kfv) else { + debug!("{}: No change in table, skipping 
send", my_sp.to_longest_path()); return; // No change, skip sending }; if !selector(&kfv) { + debug!( + "{}: update does not match selector, skipping send", + my_sp.to_longest_path() + ); return; } @@ -175,7 +181,30 @@ macro_rules! impl_consumertable { }; } -impl_consumertable! { ConsumerStateTable[true] SubscriberStateTable[true] ZmqConsumerStateTable[false] } +impl_consumertable! { ConsumerStateTable[true] ZmqConsumerStateTable[false] } + +// Custom implementation for SubscriberStateTable to handle rehydration differently +impl ConsumerTable for SubscriberStateTable { + async fn read_data(&mut self) { + SubscriberStateTable::read_data_async(self) + .await + .expect("SubscriberStateTable::read_data_async io error"); + } + + async fn pops(&mut self) -> Vec { + SubscriberStateTable::pops_async(self) + .await + .expect("SubscriberStateTable::pops_async threw an exception") + } + + async fn rehydrate(&mut self) -> Vec { + // Custom rehydration for SubscriberStateTable: + // The constructor already populated m_buffer with initial snapshot, + // so we just drain it here to get the rehydration data. + // This also prevents the stale buffer from being mixed with live updates. + self.pops().await + } +} #[cfg(test)] mod test { diff --git a/debian/install b/debian/install index 7b3f864..a4624e8 100644 --- a/debian/install +++ b/debian/install @@ -1,3 +1,4 @@ target/release/hamgrd usr/bin target/release/swbusd usr/bin target/release/swbus-cli usr/bin +scripts/wait_for_loopback.py usr/bin diff --git a/scripts/wait_for_loopback.py b/scripts/wait_for_loopback.py new file mode 100644 index 0000000..30ac2d0 --- /dev/null +++ b/scripts/wait_for_loopback.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 + +""" + wait_for_loopback.py + + Script to wait for LOOPBACK_INTERFACE to be configured. + + This script polls the loopback interface using ip and ip -6 commands + to check if the interface exists and has IP addresses assigned. 
+ It will continue polling until the interface is found with addresses + or a maximum number of retries is reached. +""" + +import sys +import time +import syslog +import subprocess + +# Configuration +LOOPBACK_INTERFACE = "Loopback0" +MAX_RETRIES = 300 # Maximum number of retries +POLL_INTERVAL = 1 # Poll interval in seconds + + +def log_info(msg): + """Log info message to syslog""" + syslog.syslog(syslog.LOG_INFO, f"wait_for_loopback: {msg}") + + +def log_err(msg): + """Log error message to syslog""" + syslog.syslog(syslog.LOG_ERR, f"wait_for_loopback: {msg}") + + +def log_debug(msg): + """Log debug message to syslog""" + syslog.syslog(syslog.LOG_DEBUG, f"wait_for_loopback: {msg}") + + +def check_loopback_interface(): + """ + Check if LOOPBACK_INTERFACE interface is configured with IP addresses. + + Returns: + bool: True if LOOPBACK_INTERFACE interface exists and has IP addresses, False otherwise. + """ + try: + # Check IPv4 addresses + result_ipv4 = subprocess.run( + ["ip", "-4", "addr", "show", LOOPBACK_INTERFACE], + capture_output=True, + text=True, + timeout=5 + ) + + # Check IPv6 addresses + result_ipv6 = subprocess.run( + ["ip", "-6", "addr", "show", LOOPBACK_INTERFACE], + capture_output=True, + text=True, + timeout=5 + ) + + # Check if interface exists (return code 0) + if result_ipv4.returncode != 0 and result_ipv6.returncode != 0: + log_debug(f"{LOOPBACK_INTERFACE} interface not found") + return False + + # Check if there are any IP addresses configured + has_ipv4 = "inet " in result_ipv4.stdout + has_ipv6 = "inet6 " in result_ipv6.stdout + + if has_ipv4 or has_ipv6: + if has_ipv4: + log_debug(f"Found IPv4 addresses on {LOOPBACK_INTERFACE}") + if has_ipv6: + log_debug(f"Found IPv6 addresses on {LOOPBACK_INTERFACE}") + return True + else: + log_debug(f"{LOOPBACK_INTERFACE} interface exists but has no IP addresses") + return False + + except subprocess.TimeoutExpired: + log_err(f"Timeout checking {LOOPBACK_INTERFACE} interface") + return False + except 
Exception as e: + log_err(f"Error checking loopback interface: {e}") + return False + + +def wait_for_loopback(): + """ + Main function to wait for LOOPBACK_INTERFACE to be programmed. + + Returns: + int: Exit code (0 for success, 1 for failure) + """ + log_info(f"Wait for {LOOPBACK_INTERFACE} interface...") + + retry_count = 0 + + while retry_count < MAX_RETRIES: + if check_loopback_interface(): + log_info(f"{LOOPBACK_INTERFACE} interface is programmed") + return 0 + + retry_count += 1 + + time.sleep(POLL_INTERVAL) + + log_err(f"{LOOPBACK_INTERFACE} interface was not programmed after {MAX_RETRIES} retries") + return 1 + + +def main(): + """Main entry point""" + syslog.openlog("wait_for_loopback", syslog.LOG_PID) + + try: + exit_code = wait_for_loopback() + sys.exit(exit_code) + except Exception as e: + log_err(f"Unexpected error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/test_utils/hamgrd/redis_data_set.cmd b/test_utils/hamgrd/redis_data_set.cmd index b9bcc6c..06d9ab6 100644 --- a/test_utils/hamgrd/redis_data_set.cmd +++ b/test_utils/hamgrd/redis_data_set.cmd @@ -2,6 +2,7 @@ select 4 hset DEVICE_METADATA|localhost region region-a cluster cluster-a HSET "LOOPBACK_INTERFACE|Loopback0|127.0.0.1/32" "NULL" "NULL" hset LOOPBACK_INTERFACE|Loopback0 NULL NULL +HSET "INTERFACE|Ethernet0|18.0.202.0/31" "NULL" "NULL" HSET "DPU|switch0_dpu0" "dpu_id" "0" HSET "DPU|switch0_dpu0" "gnmi_port" "50051" HSET "DPU|switch0_dpu0" "local_port" "8080" @@ -70,4 +71,4 @@ HSET DPU_STATE|dpu7 dpu_data_plane_state up select 0 HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 pb "0a013112050d00010203220576647075302205766470753128013a057664707530" -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 pb "0a01311001180122067465737469642802" +HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 pb "0a01311001180122086861736574305f302802"