diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..5ff3258 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "crates/sonic-dash-api-proto/sonic-dash-api"] + path = crates/sonic-dash-api-proto/sonic-dash-api + url = https://github.com/sonic-net/sonic-dash-api.git diff --git a/Cargo.lock b/Cargo.lock index 6327102..d07c256 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -791,11 +791,14 @@ dependencies = [ "anyhow", "chrono", "clap", + "hex", "lazy_static", + "prost", "serde", "serde_json", "serde_with", "sonic-common", + "sonic-dash-api-proto", "sonicdb-derive", "swbus-actor", "swbus-config", @@ -1937,12 +1940,29 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "sonic-dash-api-proto" +version = "0.1.0" +dependencies = [ + "hex", + "prost", + "prost-build", + "serde", + "serde_json", + "sonic-common", + "sonicdb-derive", + "strum", + "strum_macros", + "swss-common", +] + [[package]] name = "sonicdb-derive" version = "0.1.0" dependencies = [ "proc-macro2", "quote", + "sonic-common", "swss-common", "syn 2.0.58", ] @@ -1989,6 +2009,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "sonic-common", "swbus-edge", "swss-common", "swss-common-testing", @@ -2138,6 +2159,14 @@ dependencies = [ name = "swss-common-bridge" version = "0.1.0" dependencies = [ + "anyhow", + "hex", + "prost", + "serde", + "serde_json", + "sonic-common", + "sonic-dash-api-proto", + "sonicdb-derive", "swbus-actor", "swbus-edge", "swss-common", diff --git a/Cargo.toml b/Cargo.toml index dea146f..d3cfb05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "crates/swss-common-bridge", "crates/container", "crates/sonicdb-derive", + "crates/sonic-dash-api-proto" ] exclude = [] @@ -68,9 +69,12 @@ contracts = "0.6" derivative = "2" derive_builder = "0.20" getset = "0.1" +hex = "0.4" lazy_static = "1.4" owning_ref = "0.4" +prost-build = "0.13" strum = { version = "0.26", features = ["derive"] } +strum_macros = "0.26" regex = "1" dashmap = "6" itertools = "0.13" @@ -92,6 +96,8 @@ swbus-config = { version = "0.1.0", path = "crates/swbus-config" } swss-serde = { version = "0.1.0", path = "crates/swss-serde" } swbus-actor = { version = "0.1.0", path = "crates/swbus-actor" } sonicdb-derive = { version = "0.1.0", path = "crates/sonicdb-derive" } +sonic-dash-api-proto = { version = "0.1.0", path = "crates/sonic-dash-api-proto" } +swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } # Dev dependencies criterion = "0.5" diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ab418a1..e7d4c21 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -22,6 +22,10 @@ stages: vmImage: 'ubuntu-22.04' steps: + - checkout: self + clean: true + submodules: true + - script: | set -exuo pipefail # dash-ha build deps diff --git a/crates/hamgrd/Cargo.toml b/crates/hamgrd/Cargo.toml index 504a271..9f13be4 100644 --- a/crates/hamgrd/Cargo.toml +++ b/crates/hamgrd/Cargo.toml @@ -30,7 +30,8 @@ clap.workspace = true tracing.workspace = true chrono.workspace = true uuid.workspace = true +sonic-dash-api-proto.workspace = true +prost.workspace = true +hex.workspace = true lazy_static.workspace = true - -[dev-dependencies] serde_json.workspace = true diff --git a/crates/hamgrd/src/actors.rs b/crates/hamgrd/src/actors.rs index 0fa107a..4cae90b 100644 --- a/crates/hamgrd/src/actors.rs +++ b/crates/hamgrd/src/actors.rs @@ -9,15 +9,14 @@ pub mod vdpu; #[cfg(test)] pub mod test; use anyhow::Result as AnyhowResult; +use sonic_common::SonicDbTable; use std::sync::Arc; use swbus_actor::{spawn, Actor, ActorMessage}; use swbus_edge::swbus_proto::message_id_generator::MessageIdGenerator; use swbus_edge::swbus_proto::result::*; use swbus_edge::swbus_proto::swbus::{swbus_message::Body, DataRequest, ServicePath, SwbusErrorCode, SwbusMessage}; use swbus_edge::SwbusEdgeRuntime; -use swss_common::{ - KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable, ZmqClient, ZmqProducerStateTable, -}; +use swss_common::{KeyOpFieldValues, KeyOperation, SubscriberStateTable, ZmqClient, ZmqProducerStateTable}; use swss_common_bridge::{consumer::ConsumerBridge, producer::spawn_producer_bridge}; use tokio::sync::mpsc::{channel, Receiver}; use tokio::task::JoinHandle; @@ -48,7 +47,7 @@ pub trait DbBasedActor: Actor { let sst = SubscriberStateTable::new_async(config_db, T::table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - Ok(vec![ConsumerBridge::spawn( + Ok(vec![ConsumerBridge::spawn::( edge_runtime.clone(), addr, sst, @@ -220,7 +219,7 @@ where if actor_id.is_some() { let sp = edge_runtime.new_sp(actor_name, actor_id.unwrap()); - Ok(ConsumerBridge::spawn( + Ok(ConsumerBridge::spawn::( edge_runtime, addr, sst, @@ -235,7 +234,7 @@ where )) } else { let base_addr = edge_runtime.get_base_sp(); - Ok(ConsumerBridge::spawn( + Ok(ConsumerBridge::spawn::( edge_runtime, addr, sst, diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index b16233b..0e6ae79 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -5,11 +5,12 @@ use crate::db_structs::{ use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType}; use crate::ServicePath; use anyhow::{anyhow, Result}; +use sonic_common::SonicDbTable; use std::collections::HashSet; use std::sync::Arc; use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, ActorMessage, Context, State}; use swbus_edge::SwbusEdgeRuntime; -use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable}; +use swss_common::{KeyOpFieldValues, KeyOperation, SubscriberStateTable}; use swss_common_bridge::consumer::ConsumerBridge; use tracing::{debug, error, info, instrument}; @@ -92,7 +93,7 @@ impl DpuActor { let sst = SubscriberStateTable::new_async(config_db, Self::dpu_table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - bridges.push(ConsumerBridge::spawn( + bridges.push(ConsumerBridge::spawn::( edge_runtime.clone(), addr, sst, @@ -109,7 +110,7 @@ impl DpuActor { let sst = SubscriberStateTable::new_async(config_db, Self::remote_dpu_table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - bridges.push(ConsumerBridge::spawn( + bridges.push(ConsumerBridge::spawn::( edge_runtime.clone(), addr, sst, @@ -474,8 +475,8 @@ mod test { use crate::db_structs::{BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuState, RemoteDpu}; use crate::ha_actor_messages::DpuActorState; + use sonic_common::SonicDbTable; use std::time::Duration; - use swss_common::SonicDbTable; use swss_common_testing::Redis; use swss_serde::to_field_values; diff --git a/crates/hamgrd/src/actors/ha_scope.rs b/crates/hamgrd/src/actors/ha_scope.rs index 74eafa1..f91e32d 100644 --- a/crates/hamgrd/src/actors/ha_scope.rs +++ b/crates/hamgrd/src/actors/ha_scope.rs @@ -3,13 +3,16 @@ use crate::db_structs::*; use crate::ha_actor_messages::{ActorRegistration, HaSetActorState, RegistrationType, VDpuActorState}; use crate::{HaSetActor, VDpuActor}; use anyhow::Result; +use sonic_common::SonicDbTable; +use sonic_dash_api_proto::decode_from_field_values; +use sonic_dash_api_proto::ha_scope_config::{DesiredHaState, HaScopeConfig}; use std::collections::HashMap; use swbus_actor::{ state::{incoming::Incoming, internal::Internal, outgoing::Outgoing}, Actor, ActorMessage, Context, State, }; use swss_common::Table; -use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use swss_common::{KeyOpFieldValues, KeyOperation}; use swss_common_bridge::consumer::ConsumerBridge; use tracing::{debug, error, info, instrument}; use uuid::Uuid; @@ -18,7 +21,7 @@ pub struct HaScopeActor { id: String, ha_scope_id: String, vdpu_id: String, - dash_ha_scope_config: Option, + dash_ha_scope_config: Option, bridges: Vec, // we need to keep track the previous dpu_ha_scope_state to detect state change dpu_ha_scope_state: Option, @@ -26,7 +29,7 @@ pub struct HaScopeActor { impl DbBasedActor for HaScopeActor { fn new(key: String) -> Result { - if let Some((vdpu_id, ha_scope_id)) = key.split_once(DashHaScopeConfigTable::key_separator()) { + if let Some((vdpu_id, ha_scope_id)) = key.split_once(HaScopeConfig::key_separator()) { Ok(HaScopeActor { id: key.to_string(), vdpu_id: vdpu_id.to_string(), @@ -41,7 +44,7 @@ impl DbBasedActor for HaScopeActor { } fn table_name() -> &'static str { - DashHaScopeConfigTable::table_name() + HaScopeConfig::table_name() } fn name() -> &'static str { @@ -185,39 +188,42 @@ impl HaScopeActor { let mut activate_role_requested = false; let mut flow_reconcile_requested = false; - if let Some(approved_ops) = dash_ha_scope_config.approved_pending_operation_ids.as_ref() { - if !approved_ops.is_empty() { - let pending_operations = self.get_pending_operations(internal, None)?; - for op_id in approved_ops { - let Some(op) = pending_operations.get(op_id) else { - // has been removed from pending list - continue; - }; - match op.as_str() { - "switchover" => { - // todo: this is for switch driven ha - } - "activate_role" => { - activate_role_requested = true; - } - "flow_reconcile" => { - flow_reconcile_requested = true; - } - "brainsplit_recover" => { - // todo: what's the action here? - } - _ => { - error!("Unknown operation type {}", op); - } + let approved_ops = dash_ha_scope_config.approved_pending_operation_ids.clone(); + if !approved_ops.is_empty() { + let pending_operations = self.get_pending_operations(internal, None)?; + for op_id in approved_ops { + let Some(op) = pending_operations.get(&op_id) else { + // has been removed from pending list + continue; + }; + match op.as_str() { + "switchover" => { + // todo: this is for switch driven ha + } + "activate_role" => { + activate_role_requested = true; + } + "flow_reconcile" => { + flow_reconcile_requested = true; + } + "brainsplit_recover" => { + // todo: what's the action here? + } + _ => { + error!("Unknown operation type {}", op); } } } } let dash_ha_scope = DashHaScopeTable { - version: dash_ha_scope_config.version, + version: dash_ha_scope_config.version.parse().unwrap(), disabled: dash_ha_scope_config.disabled, - ha_role: dash_ha_scope_config.desired_ha_state.clone(), /*todo, how switching_to_active is derived. Is it relevant to dpu driven mode */ + ha_role: format!( + "{}", + DesiredHaState::try_from(dash_ha_scope_config.desired_ha_state).unwrap() + ) + .to_lowercase(), /*todo, how switching_to_active is derived. Is it relevant to dpu driven mode */ flow_reconcile_requested, activate_role_requested, }; @@ -375,7 +381,13 @@ impl HaScopeActor { npu_ha_scope_state.local_ha_state_last_updated_reason = Some("dpu initiated".to_string()); // The target HA state in ASIC. This is the state that hamgrd generates and asking DPU to move to. - npu_ha_scope_state.local_target_asic_ha_state = Some(dash_ha_scope_config.desired_ha_state.clone()); + npu_ha_scope_state.local_target_asic_ha_state = Some( + format!( + "{}", + DesiredHaState::try_from(dash_ha_scope_config.desired_ha_state).unwrap() + ) + .to_lowercase(), + ); // The HA state that ASIC acked. npu_ha_scope_state.local_acked_asic_ha_state = Some(dpu_ha_scope_state.ha_role.clone()); @@ -415,7 +427,7 @@ impl HaScopeActor { return Ok(()); } let first_time = self.dash_ha_scope_config.is_none(); - let dash_ha_scope_config: DashHaScopeConfigTable = swss_serde::from_field_values(&kfv.field_values)?; + let dash_ha_scope_config: HaScopeConfig = decode_from_field_values(&kfv.field_values)?; // Update internal config self.dash_ha_scope_config = Some(dash_ha_scope_config); @@ -439,15 +451,13 @@ impl HaScopeActor { self.update_npu_ha_scope_state_ha_state(state)?; // need to update operation list if approved_pending_operation_ids is not empty - let approved_pending_operation_ids = match self + let approved_pending_operation_ids = self .dash_ha_scope_config .as_ref() .unwrap() .approved_pending_operation_ids - { - Some(ref ids) => ids.clone(), - None => Vec::new(), - }; + .clone(); + if !approved_pending_operation_ids.is_empty() { self.update_npu_ha_scope_state_pending_operations(state, Vec::new(), approved_pending_operation_ids)?; } @@ -589,13 +599,14 @@ mod test { vdpu::VDpuActor, DbBasedActor, }, - db_structs::{ - now_in_millis, DashHaScopeConfigTable, DashHaScopeTable, DpuDashHaScopeState, NpuDashHaScopeState, - }, + db_structs::{now_in_millis, DashHaScopeTable, DpuDashHaScopeState, NpuDashHaScopeState}, ha_actor_messages::*, }; + use sonic_common::SonicDbTable; + use sonic_dash_api_proto::ha_scope_config::{DesiredHaState, HaScopeConfig}; + use sonic_dash_api_proto::types::HaOwner; use std::time::Duration; - use swss_common::{SonicDbTable, Table}; + use swss_common::Table; use swss_common_testing::*; use swss_serde::to_field_values; @@ -644,10 +655,10 @@ mod test { #[rustfmt::skip] let commands = [ // Send DASH_HA_SCOPE_CONFIG_TABLE to actor with admin state disabled - send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", - "field_values": {"version": "1", "disabled": "true", "desired_ha_state": "active", "approved_pending_operation_ids": "" }, + send! { key: HaScopeConfig::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"json": format!("{{\"version\":\"1\",\"disabled\":true,\"desired_ha_state\":{},\"owner\":{},\"ha_set_id\":\"test_id\",\"approved_pending_operation_ids\":[]}}", DesiredHaState::Active as i32, HaOwner::Dpu as i32)}, }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, // Recv registration to vDPU and ha-set recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &scope_id), data: { "active": true }, addr: runtime.sp(VDpuActor::name(), &vdpu0_id) }, @@ -677,10 +688,10 @@ mod test { chkdb! { type: NpuDashHaScopeState, key: &scope_id_in_state, data: npu_ha_scope_state_fvs2 }, // Send DASH_HA_SCOPE_CONFIG_TABLE to actor with admin state enabled - send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", - "field_values": {"version": "2", "disabled": "false", "desired_ha_state": "active", "approved_pending_operation_ids": "" }, + send! { key: HaScopeConfig::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"json": format!("{{\"version\":\"2\",\"disabled\":false,\"desired_ha_state\":{},\"owner\":{},\"ha_set_id\":\"test_id\",\"approved_pending_operation_ids\":[]}}", DesiredHaState::Active as i32, HaOwner::Dpu as i32)}, }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, // Recv update to DPU DASH_HA_SCOPE_TABLE with disabled = false recv! { key: &ha_set_id, data: { "key": &ha_set_id, "operation": "Set", @@ -708,7 +719,10 @@ mod test { let db = crate::db_for_table::().await.unwrap(); let table = Table::new(db, NpuDashHaScopeState::table_name()).unwrap(); let npu_ha_scope_state: NpuDashHaScopeState = swss_serde::from_table(&table, &scope_id_in_state).unwrap(); - let op_id = npu_ha_scope_state.pending_operation_ids.unwrap().pop().unwrap(); + let op_id = format!( + "[\"{}\"]", + npu_ha_scope_state.pending_operation_ids.unwrap().pop().unwrap() + ); // continue the test to activate the role let mut npu_ha_scope_state4 = npu_ha_scope_state3.clone(); @@ -732,9 +746,9 @@ mod test { let commands = [ // Send DASH_HA_SCOPE_CONFIG_TABLE with activation approved send! { key: HaScopeActor::table_name(), data: { "key": &scope_id, "operation": "Set", - "field_values": {"version": "3", "disabled": "false", "desired_ha_state": "active", "approved_pending_operation_ids": &op_id }, + "field_values": {"json": format!("{{\"version\":\"3\",\"disabled\":false,\"desired_ha_state\":{},\"owner\":{},\"ha_set_id\":\"test_id\",\"approved_pending_operation_ids\":{}}}", DesiredHaState::Active as i32, HaOwner::Dpu as i32, &op_id)}, }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, // Recv update to DPU DASH_HA_SCOPE_TABLE with activate_role_requested=true recv! { key: &ha_set_id, data: { "key": &ha_set_id, "operation": "Set", @@ -775,10 +789,10 @@ mod test { #[rustfmt::skip] let commands = [ // Send DASH_HA_SCOPE_CONFIG_TABLE with desired_ha_state = dead - send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", - "field_values": {"version": "2", "disabled": "false", "desired_ha_state": "dead", "approved_pending_operation_ids": "" }, + send! { key: HaScopeConfig::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"json": format!("{{\"version\":\"2\",\"disabled\":false,\"desired_ha_state\":{},\"owner\":{},\"ha_set_id\":\"test_id\",\"approved_pending_operation_ids\":[]}}", DesiredHaState::Dead as i32, HaOwner::Dpu as i32)}, }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, // Check NPU DASH_HA_SCOPE_STATE is updated with desired_ha_state = dead chkdb! { type: NpuDashHaScopeState, @@ -786,9 +800,10 @@ mod test { exclude: "pending_operation_list_last_updated_time_in_ms" }, // simulate delete of ha-scope entry - send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Del", - "field_values": {"version": "2", "disabled": "false", "desired_ha_state": "dead", "approved_pending_operation_ids": "" }}, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + send! { key: HaScopeConfig::table_name(), data: { "key": &scope_id, "operation": "Del", + "field_values": {"json": format!("{{\"version\":\"2\",\"disabled\":false,\"desired_ha_state\":{},\"owner\":{},\"ha_set_id\":\"test_id\",\"approved_pending_operation_ids\":[]}}", DesiredHaState::Dead as i32, HaOwner::Dpu as i32)}, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, ]; test::run_commands(&runtime, runtime.sp(HaScopeActor::name(), &scope_id), &commands).await; diff --git a/crates/hamgrd/src/actors/ha_set.rs b/crates/hamgrd/src/actors/ha_set.rs index c6675bd..8b5151d 100644 --- a/crates/hamgrd/src/actors/ha_set.rs +++ b/crates/hamgrd/src/actors/ha_set.rs @@ -3,18 +3,22 @@ use crate::actors::{spawn_consumer_bridge_for_actor, DbBasedActor}; use crate::db_structs::*; use crate::ha_actor_messages::{ActorRegistration, HaSetActorState, RegistrationType, VDpuActorState}; use anyhow::{anyhow, Result}; +use sonic_common::SonicDbTable; +use sonic_dash_api_proto::decode_from_field_values; +use sonic_dash_api_proto::ha_set_config::HaSetConfig; +use sonic_dash_api_proto::ip_to_string; use swbus_actor::{ state::{incoming::Incoming, internal::Internal, outgoing::Outgoing}, Actor, ActorMessage, Context, State, }; use swss_common::Table; -use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use swss_common::{KeyOpFieldValues, KeyOperation}; use swss_common_bridge::consumer::ConsumerBridge; use tracing::{debug, error, info, instrument}; pub struct HaSetActor { id: String, - dash_ha_set_config: Option, + dash_ha_set_config: Option, bridges: Vec, } @@ -29,7 +33,7 @@ impl DbBasedActor for HaSetActor { } fn table_name() -> &'static str { - DashHaSetConfigTable::table_name() + HaSetConfig::table_name() } fn name() -> &'static str { @@ -95,10 +99,14 @@ impl HaSetActor { let dash_ha_set = DashHaSetTable { version: dash_ha_set_config.version.clone(), - vip_v4: dash_ha_set_config.vip_v4.clone(), - vip_v6: dash_ha_set_config.vip_v6.clone(), - owner: dash_ha_set_config.owner.clone(), - scope: dash_ha_set_config.scope.clone(), + vip_v4: dash_ha_set_config.vip_v4.as_ref().map(ip_to_string).unwrap_or_default(), + vip_v6: dash_ha_set_config.vip_v6.as_ref().map(ip_to_string), + scope: sonic_dash_api_proto::types::HaScope::try_from(dash_ha_set_config.scope) + .map(|s| { + let name = s.as_str_name(); + name.strip_prefix("SCOPE_").unwrap_or(name).to_lowercase() + }) + .ok(), local_npu_ip: local_vdpu.dpu.npu_ipv4.clone(), local_ip: local_vdpu.dpu.pa_ipv4.clone(), peer_ip: remote_vdpu.dpu.pa_ipv4.clone(), @@ -109,6 +117,7 @@ impl HaSetActor { dp_channel_probe_interval_ms: global_cfg.dp_channel_probe_interval_ms, dp_channel_probe_fail_threshold: global_cfg.dp_channel_probe_fail_threshold, }; + Ok(Some(dash_ha_set)) } @@ -154,7 +163,13 @@ impl HaSetActor { global_cfg .vnet_name .ok_or(anyhow!("Missing vnet_name in global config"))?, - self.dash_ha_set_config.as_ref().unwrap().vip_v4 + self.dash_ha_set_config + .as_ref() + .unwrap() + .vip_v4 + .as_ref() + .map(ip_to_string) + .unwrap_or_default() ); if !internal.has_entry(VnetRouteTunnelTable::table_name(), &swss_key) { @@ -235,13 +250,18 @@ impl HaSetActor { // Collect all preferred (primary) vdpus first let mut seen = std::collections::HashSet::new(); - if let Some(prefered_vdpu_ids) = ha_set_cfg.preferred_vdpu_ids.as_ref() { - for id in prefered_vdpu_ids.iter().filter(|id| !id.is_empty()) { + if !ha_set_cfg.preferred_vdpu_id.is_empty() { + for id in ha_set_cfg + .preferred_vdpu_id + .split(',') + .map(str::trim) + .filter(|id| !id.is_empty()) + { result.push( self.get_vdpu(incoming, id) .map(|vdpu| VDpuStateExt { vdpu, is_primary: true }), ); - seen.insert(id); + seen.insert(id.to_string()); } } @@ -249,7 +269,7 @@ impl HaSetActor { for id in ha_set_cfg .vdpu_ids .iter() - .filter(|id| !id.is_empty() && !seen.contains(id)) + .filter(|id| !id.is_empty() && !seen.contains(*id)) { result.push(self.get_vdpu(incoming, id).map(|vdpu| VDpuStateExt { vdpu, @@ -291,7 +311,7 @@ impl HaSetActor { } let first_time = self.dash_ha_set_config.is_none(); - self.dash_ha_set_config = Some(swss_serde::from_field_values(&dpu_kfv.field_values)?); + self.dash_ha_set_config = Some(decode_from_field_values(&dpu_kfv.field_values).unwrap()); // Subscribe to the DPU Actor for state updates. self.register_to_vdpu_actor(outgoing, true).await?; @@ -387,6 +407,7 @@ impl Actor for HaSetActor { #[cfg(test)] mod test { + use crate::actors::KeyOperation; use crate::{ actors::{ ha_set::HaSetActor, @@ -394,13 +415,30 @@ mod test { vdpu::VDpuActor, DbBasedActor, }, - db_structs::{DashHaGlobalConfig, DashHaSetConfigTable, DashHaSetTable, VnetRouteTunnelTable}, + db_structs::*, ha_actor_messages::*, }; + use sonic_common::SonicDbTable; + use sonic_dash_api_proto::ha_set_config::HaSetConfig; + use sonic_dash_api_proto::ip_to_string; + use std::collections::HashMap; use std::time::Duration; - use swss_common::SonicDbTable; + use swss_common::CxxString; + use swss_common::KeyOpFieldValues; use swss_common_testing::*; + fn protobuf_struct_to_kfv(cfg: &T) -> HashMap { + let json = serde_json::to_string(&cfg).unwrap(); + let mut kfv = KeyOpFieldValues { + key: HaSetActor::table_name().to_string(), + operation: KeyOperation::Set, + field_values: HashMap::new(), + }; + kfv.field_values.clear(); + kfv.field_values.insert("json".to_string(), json.into()); + kfv.field_values.clone() + } + #[tokio::test] async fn ha_set_actor() { // To enable trace, set ENABLE_TRACE=1 to run test @@ -414,7 +452,7 @@ mod test { let global_cfg_fvs = serde_json::to_value(swss_serde::to_field_values(&global_cfg).unwrap()).unwrap(); let (ha_set_id, ha_set_cfg) = make_dpu_scope_ha_set_config(0, 0); - let ha_set_cfg_fvs = serde_json::to_value(swss_serde::to_field_values(&ha_set_cfg).unwrap()).unwrap(); + let ha_set_cfg_fvs = protobuf_struct_to_kfv(&ha_set_cfg); let dpu0 = make_local_dpu_actor_state(0, 0, true, None, None); let dpu1 = make_remote_dpu_actor_state(1, 0); let (vdpu0_id, vdpu0_state_obj) = make_vdpu_actor_state(true, &dpu0); @@ -454,7 +492,7 @@ mod test { let commands = [ // Send DASH_HA_SET_CONFIG_TABLE config send! { key: HaSetActor::table_name(), data: { "key": HaSetActor::table_name(), "operation": "Set", "field_values": ha_set_cfg_fvs }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &ha_set_id), data: { "active": true }, addr: runtime.sp(VDpuActor::name(), &vdpu0_id) }, recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &ha_set_id), data: { "active": true }, @@ -473,10 +511,10 @@ mod test { // Verify that haset actor state is sent to ha-scope actor recv! { key: HaSetActorState::msg_key(&ha_set_id), data: { "up": true, "ha_set": &ha_set_obj }, addr: runtime.sp("ha-scope", &format!("vdpu0:{ha_set_id}")) }, - chkdb! { type: VnetRouteTunnelTable, key: &format!("{}:{}", global_cfg.vnet_name.unwrap(), ha_set_cfg.vip_v4), data: expected_vnet_route }, + chkdb! { type: VnetRouteTunnelTable, key: &format!("{}:{}", global_cfg.vnet_name.unwrap(), ip_to_string(&ha_set_cfg.vip_v4.unwrap())), data: expected_vnet_route }, // simulate delete of ha-set entry send! { key: HaSetActor::table_name(), data: { "key": HaSetActor::table_name(), "operation": "Del", "field_values": ha_set_cfg_fvs }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, ]; test::run_commands(&runtime, runtime.sp(HaSetActor::name(), &ha_set_id), &commands).await; @@ -499,7 +537,7 @@ mod test { let global_cfg_fvs = serde_json::to_value(swss_serde::to_field_values(&global_cfg).unwrap()).unwrap(); let (ha_set_id, ha_set_cfg) = make_dpu_scope_ha_set_config(2, 0); - let ha_set_cfg_fvs = serde_json::to_value(swss_serde::to_field_values(&ha_set_cfg).unwrap()).unwrap(); + let ha_set_cfg_fvs = protobuf_struct_to_kfv(&ha_set_cfg); let dpu0 = make_remote_dpu_actor_state(2, 0); let dpu1 = make_remote_dpu_actor_state(3, 0); let (vdpu0_id, vdpu0_state_obj) = make_vdpu_actor_state(true, &dpu0); @@ -535,7 +573,7 @@ mod test { let commands = [ // Send DASH_HA_SET_CONFIG_TABLE config send! { key: HaSetActor::table_name(), data: { "key": HaSetActor::table_name(), "operation": "Set", "field_values": ha_set_cfg_fvs }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &ha_set_id), data: { "active": true }, addr: runtime.sp(VDpuActor::name(), &vdpu0_id) }, recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &ha_set_id), data: { "active": true }, @@ -546,11 +584,12 @@ mod test { // Simulate VDPU state update for vdpu1 (backup) send! { key: VDpuActorState::msg_key(&vdpu1_id), data: vdpu1_state, addr: runtime.sp("vdpu", &vdpu1_id) }, // Verify that the DASH_HA_SET_TABLE was updated - chkdb! { type: VnetRouteTunnelTable, key: &format!("{}:{}", global_cfg.vnet_name.unwrap(), ha_set_cfg.vip_v4), + + chkdb! { type: VnetRouteTunnelTable, key: &format!("{}:{}", global_cfg.vnet_name.unwrap(), ip_to_string(&ha_set_cfg.vip_v4.unwrap())), data: expected_vnet_route }, // simulate delete of ha-set entry send! { key: HaSetActor::table_name(), data: { "key": HaSetActor::table_name(), "operation": "Del", "field_values": ha_set_cfg_fvs }, - addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, ]; test::run_commands(&runtime, runtime.sp(HaSetActor::name(), &ha_set_id), &commands).await; diff --git a/crates/hamgrd/src/actors/test.rs b/crates/hamgrd/src/actors/test.rs index 297447b..acbf1b8 100644 --- a/crates/hamgrd/src/actors/test.rs +++ b/crates/hamgrd/src/actors/test.rs @@ -3,6 +3,8 @@ use crate::ha_actor_messages::*; use crate::RuntimeData; use anyhow::Result; use serde_json::Value; +use sonic_dash_api_proto::ip_to_string; +use sonic_dash_api_proto::{ha_set_config::HaSetConfig, types::ip_address::Ip, types::*}; use std::{collections::HashMap, future::Future, time::Duration}; use std::{net::Ipv4Addr, net::Ipv6Addr, sync::Arc}; use swbus_actor::{ActorMessage, ActorRuntime}; @@ -433,26 +435,41 @@ pub fn make_vdpu_actor_state(up: bool, dpu_state: &DpuActorState) -> (String, VD ) } +pub fn string_to_ip(s: String) -> Option { + if s.is_empty() { + return None; + } + if let Ok(v4) = s.parse::() { + Some(IpAddress { + ip: Some(Ip::Ipv4(u32::from(v4))), + }) + } else if let Ok(v6) = s.parse::() { + Some(IpAddress { + ip: Some(Ip::Ipv6(v6.octets().to_vec())), + }) + } else { + None + } +} + /// Create a DashHaSetConfigTable for a given DPU /// The allocation scheme is as follows: /// switch_n/dpu_x and switch_n+1/dpu_x are in the same HA set, where n is even /// vdpu id is vdpu{switch id * 8}-{dpu} -pub fn make_dpu_scope_ha_set_config(switch: u16, dpu: u16) -> (String, DashHaSetConfigTable) { +pub fn make_dpu_scope_ha_set_config(switch: u16, dpu: u16) -> (String, HaSetConfig) { let switch_pair_id = switch / 2; let vdpu0_id = format!("vdpu{}-{dpu}", switch_pair_id * 2); let vdpu1_id = format!("vdpu{}-{dpu}", switch_pair_id * 2 + 1); - let ha_set = DashHaSetConfigTable { + let ha_set = HaSetConfig { version: "1".to_string(), - vip_v4: format!("3.2.{switch_pair_id}.{dpu}"), - vip_v6: Some(normalize_ipv6(&format!("3:2:{switch_pair_id}::{dpu}"))), - // dpu or switch - owner: Some("dpu".to_string()), + vip_v4: string_to_ip(format!("3.2.{switch_pair_id}.{dpu}")), + vip_v6: string_to_ip(normalize_ipv6(&format!("3:2:{switch_pair_id}::{dpu}"))), // dpu or eni - scope: Some("dpu".to_string()), + scope: HaScope::Dpu as i32, vdpu_ids: vec![vdpu0_id.clone(), vdpu1_id.clone()], - pinned_vdpu_bfd_probe_states: None, - preferred_vdpu_ids: Some(vec![vdpu0_id]), - preferred_standalone_vdpu_index: Some(0), + pinned_vdpu_bfd_probe_states: vec!["".to_string()], + preferred_vdpu_id: vdpu0_id, + preferred_standalone_vdpu_index: 0, }; (format!("haset{switch_pair_id}-{dpu}"), ha_set) } @@ -463,10 +480,9 @@ pub fn make_dpu_scope_ha_set_obj(switch: u16, dpu: u16) -> (String, DashHaSetTab let global_cfg = make_dash_ha_global_config(); let ha_set = DashHaSetTable { version: "1".to_string(), - vip_v4: haset_cfg.vip_v4, - vip_v6: haset_cfg.vip_v6, - owner: haset_cfg.owner, - scope: haset_cfg.scope, + vip_v4: ip_to_string(&haset_cfg.vip_v4.unwrap()), + vip_v6: Some(ip_to_string(&haset_cfg.vip_v6.unwrap())), + scope: Some("ha_scope_dpu".to_string()), local_npu_ip: format!("10.0.{switch}.{dpu}"), local_ip: format!("18.0.{switch}.{dpu}"), peer_ip: format!("18.0.{}.{dpu}", switch_pair_id * 2 + 1), diff --git a/crates/hamgrd/src/actors/vdpu.rs b/crates/hamgrd/src/actors/vdpu.rs index 6db4410..33e0f12 100644 --- a/crates/hamgrd/src/actors/vdpu.rs +++ b/crates/hamgrd/src/actors/vdpu.rs @@ -3,9 +3,10 @@ use crate::actors::DbBasedActor; use crate::db_structs::VDpu; use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType, VDpuActorState}; use anyhow::Result; +use sonic_common::SonicDbTable; use swbus_actor::Context; use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, State}; -use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use swss_common::{KeyOpFieldValues, KeyOperation}; use tracing::{error, instrument}; pub struct VDpuActor { @@ -150,8 +151,8 @@ mod test { }, ha_actor_messages::*, }; + use sonic_common::SonicDbTable; use std::time::Duration; - use swss_common::SonicDbTable; #[tokio::test] async fn vdpu_actor() { diff --git a/crates/hamgrd/src/db_structs.rs b/crates/hamgrd/src/db_structs.rs index dc6f80a..f370b72 100644 --- a/crates/hamgrd/src/db_structs.rs +++ b/crates/hamgrd/src/db_structs.rs @@ -213,27 +213,6 @@ pub fn now_in_millis() -> i64 { chrono::Utc::now().timestamp_millis() } -/// -#[serde_as] -#[skip_serializing_none] -#[derive(Serialize, Deserialize, SonicDb)] -#[sonicdb(table_name = "DASH_HA_SET_CONFIG_TABLE", key_separator = ":", db_name = "APPL_DB")] -pub struct DashHaSetConfigTable { - pub version: String, - pub vip_v4: String, - pub vip_v6: Option, - // dpu or switch - pub owner: Option, - // dpu or eni - pub scope: Option, - #[serde_as(as = "StringWithSeparator::")] - pub vdpu_ids: Vec, - pub pinned_vdpu_bfd_probe_states: Option, - #[serde_as(as = "Option>")] - pub preferred_vdpu_ids: Option>, - pub preferred_standalone_vdpu_index: Option, -} - /// #[skip_serializing_none] #[derive(Serialize, Deserialize, Default, PartialEq, Eq, SonicDb)] @@ -250,8 +229,6 @@ pub struct DashHaSetTable { pub vip_v4: String, // IPv4 Data path VIP. pub vip_v6: Option, - // Owner of HA state machine. It can be controller, switch. - pub owner: Option, // Scope of HA set. It can be dpu, eni. pub scope: Option, // The IP address of local NPU. It can be IPv4 or IPv6. Used for setting up the BFD session. @@ -292,19 +269,6 @@ pub struct VnetRouteTunnelTable { pub check_directly_connected: Option, } -/// -#[skip_serializing_none] -#[serde_as] -#[derive(Debug, Deserialize, Serialize, PartialEq, SonicDb)] -#[sonicdb(table_name = "DASH_HA_SCOPE_CONFIG_TABLE", key_separator = ":", db_name = "APPL_DB")] -pub struct DashHaScopeConfigTable { - pub version: u32, - pub disabled: bool, - pub desired_ha_state: String, - #[serde_as(as = "Option>")] - pub approved_pending_operation_ids: Option>, -} - /// #[skip_serializing_none] #[serde_as] diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index 882118c..8b6404d 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Ok}; use clap::Parser; use sonic_common::log; +use sonic_common::SonicDbTable; use std::net::{Ipv4Addr, Ipv6Addr}; use std::{ sync::{Arc, Mutex}, @@ -9,7 +10,7 @@ use std::{ use swbus_actor::{set_global_runtime, ActorRuntime}; use swbus_config::swbus_config_from_db; use swbus_edge::{simple_client::SimpleSwbusEdgeClient, swbus_proto::swbus::ServicePath, RuntimeEnv, SwbusEdgeRuntime}; -use swss_common::{sonic_db_config_initialize_global, DbConnector, SonicDbTable}; +use swss_common::{sonic_db_config_initialize_global, DbConnector}; use swss_common_bridge::consumer::ConsumerBridge; use tokio::{signal, task::JoinHandle, time::timeout}; use tracing::error; @@ -19,10 +20,9 @@ mod ha_actor_messages; use actors::spawn_zmq_producer_bridge; use actors::{dpu::DpuActor, ha_scope::HaScopeActor, ha_set::HaSetActor, vdpu::VDpuActor, DbBasedActor}; use anyhow::Result; -use db_structs::{ - BfdSessionTable, DashHaScopeConfigTable, DashHaScopeTable, DashHaSetConfigTable, DashHaSetTable, Dpu, VDpu, -}; +use db_structs::{BfdSessionTable, DashHaScopeTable, DashHaSetTable, Dpu, VDpu}; use lazy_static::lazy_static; +use sonic_dash_api_proto::{ha_scope_config::HaScopeConfig, ha_set_config::HaSetConfig}; use std::any::Any; lazy_static! { @@ -153,8 +153,8 @@ async fn start_actor_creators(edge_runtime: &Arc) -> Result = Vec::new(); bridges.append(&mut DpuActor::start_actor_creator(edge_runtime.clone()).await?); bridges.append(&mut VDpuActor::start_actor_creator::(edge_runtime.clone()).await?); - bridges.append(&mut HaSetActor::start_actor_creator::(edge_runtime.clone()).await?); - bridges.append(&mut HaScopeActor::start_actor_creator::(edge_runtime.clone()).await?); + bridges.append(&mut HaSetActor::start_actor_creator::(edge_runtime.clone()).await?); + bridges.append(&mut HaScopeActor::start_actor_creator::(edge_runtime.clone()).await?); Ok(bridges) } @@ -221,7 +221,7 @@ impl RuntimeData { pub fn common_bridge_sp(runtime: &SwbusEdgeRuntime) -> ServicePath where - T: swss_common::SonicDbTable + 'static, + T: sonic_common::SonicDbTable + 'static, { let mut new_sp = runtime.get_base_sp(); new_sp.resource_type = "swss-common-bridge".into(); diff --git a/crates/sonic-common/src/lib.rs b/crates/sonic-common/src/lib.rs index 2f2dd5c..84262bd 100644 --- a/crates/sonic-common/src/lib.rs +++ b/crates/sonic-common/src/lib.rs @@ -1,2 +1,18 @@ pub mod log; pub mod panic; +use swss_common::KeyOpFieldValues; + +/// Trait for objects that can be stored in a Sonic DB table. +pub trait SonicDbTable { + fn key_separator() -> char; + fn table_name() -> &'static str; + fn db_name() -> &'static str; + fn is_proto() -> bool { + false + } + fn convert_pb_to_json(_kfv: &mut KeyOpFieldValues) { + // Default implementation does nothing. + // This can be overridden by the macro to convert protobuf to JSON. + } + fn is_dpu() -> bool; +} diff --git a/crates/sonic-dash-api-proto/Cargo.toml b/crates/sonic-dash-api-proto/Cargo.toml new file mode 100644 index 0000000..2dc0b81 --- /dev/null +++ b/crates/sonic-dash-api-proto/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sonic-dash-api-proto" +version.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true +keywords.workspace = true +edition.workspace = true + +[dependencies] +prost.workspace = true +serde.workspace = true +sonicdb-derive.workspace = true +swss-common.workspace = true +sonic-common.workspace = true +hex.workspace = true +serde_json.workspace = true +strum.workspace = true +strum_macros.workspace = true + +[build-dependencies] +prost-build.workspace = true diff --git a/crates/sonic-dash-api-proto/build.rs b/crates/sonic-dash-api-proto/build.rs new file mode 100644 index 0000000..f43ae69 --- /dev/null +++ b/crates/sonic-dash-api-proto/build.rs @@ -0,0 +1,68 @@ +fn main() { + let proto_path = "sonic-dash-api/proto".to_string(); + + let mut proto_config = prost_build::Config::new(); + + proto_config.type_attribute("dash.ha_scope_config.HaScopeConfig", "use sonicdb_derive::SonicDb;"); + + proto_config.type_attribute("dash.ha_scope_config.HaScopeConfig", "use prost::Message;"); + + proto_config.type_attribute("dash.ha_scope_config.HaScopeConfig", "#[derive(SonicDb)]"); + + proto_config.type_attribute( + "dash.ha_scope_config.HaScopeConfig", + "#[sonicdb(table_name = \"DASH_HA_SCOPE_CONFIG_TABLE\", key_separator = \":\", db_name = \"APPL_DB\", is_proto = \"true\",)]", + ); + + proto_config.type_attribute( + "dash.ha_scope_config.HaScopeConfig", + "#[derive(serde::Serialize, serde::Deserialize)]", + ); + + proto_config.type_attribute( + "dash.ha_scope_config.DesiredHaState", + "#[derive(strum_macros::Display)]", + ); + + proto_config.type_attribute("dash.ha_set_config.HaSetConfig", "use sonicdb_derive::SonicDb;"); + + proto_config.type_attribute("dash.ha_set_config.HaSetConfig", "use prost::Message;"); + + proto_config.type_attribute("dash.ha_set_config.HaSetConfig", "#[derive(SonicDb)]"); + + proto_config.type_attribute( + "dash.ha_set_config.HaSetConfig", + "#[sonicdb(table_name = \"DASH_HA_SET_CONFIG_TABLE\", key_separator = \":\", db_name = \"APPL_DB\", is_proto = \"true\",)]", + ); + + proto_config.type_attribute( + "dash.ha_set_config.HaSetConfig", + "#[derive(serde::Serialize, serde::Deserialize)]", + ); + + proto_config.type_attribute( + "dash.types.IpAddress", + "#[derive(serde::Serialize, serde::Deserialize)]", + ); + + proto_config + .compile_protos( + &[ + format!("{proto_path}/ha_set_config.proto"), + format!("{proto_path}/ha_scope_config.proto"), + format!("{proto_path}/types.proto"), + ], + &[proto_path], + ) + .unwrap(); + + // --- PATCH GENERATED CODE FOR SERDE ON ONEOF ENUMS --- + let out_dir = std::env::var("OUT_DIR").unwrap(); + let types_rs = std::path::Path::new(&out_dir).join("dash.types.rs"); + let content = std::fs::read_to_string(&types_rs).unwrap(); + let patched = content.replace( + "#[derive(Clone, PartialEq, ::prost::Oneof)]\n pub enum Ip {", + "#[derive(Clone, PartialEq, ::prost::Oneof)]\n #[derive(serde::Serialize, serde::Deserialize)]\n pub enum Ip {" + ); + std::fs::write(&types_rs, patched).unwrap(); +} diff --git a/crates/sonic-dash-api-proto/src/lib.rs b/crates/sonic-dash-api-proto/src/lib.rs new file mode 100644 index 0000000..3d57dd4 --- /dev/null +++ b/crates/sonic-dash-api-proto/src/lib.rs @@ -0,0 +1,36 @@ +use crate::types::ip_address::Ip; +use crate::types::IpAddress; +use std::collections::HashMap; +use swss_common::CxxString; + +pub mod ha_scope_config { + include!(concat!(env!("OUT_DIR"), "/dash.ha_scope_config.rs")); +} + +pub mod ha_set_config { + include!(concat!(env!("OUT_DIR"), "/dash.ha_set_config.rs")); +} + +pub mod types { + include!(concat!(env!("OUT_DIR"), "/dash.types.rs")); +} + +pub fn ip_to_string(ip: &IpAddress) -> String { + match &ip.ip { + Some(Ip::Ipv4(addr)) => std::net::Ipv4Addr::from(*addr).to_string(), + Some(Ip::Ipv6(addr)) => { + use std::net::Ipv6Addr; + let bytes: [u8; 16] = addr.clone().try_into().unwrap_or([0; 16]); + Ipv6Addr::from(bytes).to_string() + } + _ => "".to_string(), + } +} + +pub fn decode_from_field_values serde::Deserialize<'de>>( + field_values: &HashMap, +) -> Result { + let json_str = field_values.get("json").unwrap(); + let s = json_str.to_string_lossy().into_owned(); + serde_json::from_str(&s) +} diff --git a/crates/sonicdb-derive/Cargo.toml b/crates/sonicdb-derive/Cargo.toml index e3d8479..32b1a8f 100644 --- a/crates/sonicdb-derive/Cargo.toml +++ b/crates/sonicdb-derive/Cargo.toml @@ -13,6 +13,7 @@ syn = "2.0" # For parsing Rust code quote = "1.0" # For generating Rust code proc-macro2 = "1.0" # For working with procedural macros swss-common = { git = "https://github.com/sonic-net/sonic-swss-common.git", branch = "master" } +sonic-common.workspace = true [lib] proc-macro = true diff --git a/crates/sonicdb-derive/src/lib.rs b/crates/sonicdb-derive/src/lib.rs index f31c1dc..351a257 100644 --- a/crates/sonicdb-derive/src/lib.rs +++ b/crates/sonicdb-derive/src/lib.rs @@ -15,6 +15,7 @@ pub fn serde_sonicdb_derive(input: TokenStream) -> TokenStream { let mut table_name: String = "".to_string(); let mut key_separator: char = 'a'; let mut db_name: String = "".to_string(); + let mut protobuf_encoded: bool = false; let mut is_dpu: bool = false; for attr in &input.attrs { if attr.path().is_ident("sonicdb") { @@ -40,6 +41,14 @@ pub fn serde_sonicdb_derive(input: TokenStream) -> TokenStream { let s: LitStr = value.parse()?; db_name = s.value(); Ok(()) + } else if meta.path.is_ident("is_proto") { + let value = meta.value()?; // this parses the `=` + let proto_bool: LitStr = value.parse()?; + match proto_bool.value().to_lowercase().as_str() { + "true" => protobuf_encoded = true, + _ => protobuf_encoded = false, + }; + Ok(()) } else if meta.path.is_ident("is_dpu") { let value = meta.value()?; let s: LitStr = value.parse()?; @@ -67,10 +76,49 @@ pub fn serde_sonicdb_derive(input: TokenStream) -> TokenStream { panic!("Missing key_separator attribute"); } + let convert_pb_to_json_impl = if protobuf_encoded { + quote! { + fn is_proto() -> bool { + true + } + fn convert_pb_to_json(kfv: &mut swss_common::KeyOpFieldValues) { + let value_hex = match kfv.field_values.get("pb") { + Some(v) => v.to_str().ok(), + None => None, + }; + let value_hex = match value_hex { + Some(s) if !s.is_empty() => s, + _ => return, + }; + let value_bytes = match hex::decode(value_hex) { + Ok(bytes) => bytes, + Err(_) => return, + }; + let config = match #struct_name::decode(&*value_bytes) { + Ok(cfg) => cfg, + Err(_) => return, + }; + + let json = match serde_json::to_string(&config) { + Ok(j) => j, + Err(_) => return, + }; + kfv.field_values.clear(); + kfv.field_values.insert("json".to_string(), json.into()); + } + } + } else { + quote! { + fn is_proto() -> bool { + false + } + } + }; + let is_dpu_value = is_dpu; let expanded = quote! { - impl swss_common::SonicDbTable for #struct_name { + impl sonic_common::SonicDbTable for #struct_name { fn key_separator() -> char { #key_separator } @@ -86,9 +134,10 @@ pub fn serde_sonicdb_derive(input: TokenStream) -> TokenStream { fn is_dpu() -> bool { #is_dpu_value } + + #convert_pb_to_json_impl } }; - // Return the generated code TokenStream::from(expanded) } diff --git a/crates/sonicdb-derive/tests/basic.rs b/crates/sonicdb-derive/tests/basic.rs index 11cb4fe..7e6b4bc 100644 --- a/crates/sonicdb-derive/tests/basic.rs +++ b/crates/sonicdb-derive/tests/basic.rs @@ -1,6 +1,6 @@ #![allow(unused)] +use sonic_common::SonicDbTable; use sonicdb_derive::SonicDb; -use swss_common::SonicDbTable; #[test] fn test_attributes() { #[derive(SonicDb)] diff --git a/crates/swbus-actor/Cargo.toml b/crates/swbus-actor/Cargo.toml index 338ed3b..eb77038 100644 --- a/crates/swbus-actor/Cargo.toml +++ b/crates/swbus-actor/Cargo.toml @@ -17,6 +17,7 @@ serde_json.workspace = true serde_with.workspace = true anyhow.workspace = true tracing.workspace = true +sonic-common.workspace = true [lints] workspace = true diff --git a/crates/swbus-actor/src/state/outgoing.rs b/crates/swbus-actor/src/state/outgoing.rs index 3a3d87d..24215b3 100644 --- a/crates/swbus-actor/src/state/outgoing.rs +++ b/crates/swbus-actor/src/state/outgoing.rs @@ -144,7 +144,7 @@ impl Outgoing { pub fn common_bridge_sp(&self) -> ServicePath where - T: swss_common::SonicDbTable + 'static, + T: sonic_common::SonicDbTable + 'static, { let resource_id = format!("{}|{}", T::db_name(), T::table_name()); self.from_my_sp("swss-common-bridge", &resource_id) diff --git a/crates/swss-common-bridge/Cargo.toml b/crates/swss-common-bridge/Cargo.toml index 2153471..9192228 100644 --- a/crates/swss-common-bridge/Cargo.toml +++ b/crates/swss-common-bridge/Cargo.toml @@ -14,6 +14,14 @@ swbus-edge = { path = "../swbus-edge" } tokio.workspace = true tokio-util.workspace = true swbus-actor = { path = "../swbus-actor" } +anyhow.workspace = true +sonic-dash-api-proto.workspace = true +prost.workspace = true +hex.workspace = true +serde.workspace = true +serde_json.workspace = true +sonicdb-derive.workspace = true +sonic-common.workspace = true [lints] workspace = true diff --git a/crates/swss-common-bridge/src/consumer.rs b/crates/swss-common-bridge/src/consumer.rs index 420c3aa..299272c 100644 --- a/crates/swss-common-bridge/src/consumer.rs +++ b/crates/swss-common-bridge/src/consumer.rs @@ -1,3 +1,4 @@ +use sonic_common::SonicDbTable; use std::{collections::HashMap, future::Future, sync::Arc}; use swbus_actor::ActorMessage; use swbus_edge::{ @@ -21,7 +22,7 @@ impl ConsumerBridge { /// `dest_generator` is a function that takes a `&KeyOpFieldValues` read from `table` /// and generates the `ServicePath` address and `String` input table key that /// the data will be sent to. - pub fn spawn( + pub fn spawn( rt: Arc, addr: ServicePath, table: T, @@ -29,18 +30,19 @@ impl ConsumerBridge { selector: S, ) -> Self where + P: SonicDbTable, T: ConsumerTable, F: FnMut(&KeyOpFieldValues) -> (ServicePath, String) + Send + 'static, S: Fn(&KeyOpFieldValues) -> bool + Sync + Send + 'static, { - let task = spawn_consumer_bridge(rt, addr, table, dest_generator, selector); + let task = spawn_consumer_bridge::(rt, addr, table, dest_generator, selector); ConsumerBridge { _task: AbortOnDropHandle::new(task), } } } -pub fn spawn_consumer_bridge( +pub fn spawn_consumer_bridge( rt: Arc, addr: ServicePath, mut table: T, @@ -48,6 +50,7 @@ pub fn spawn_consumer_bridge( selector: S, ) -> JoinHandle<()> where + P: SonicDbTable, T: ConsumerTable, F: FnMut(&KeyOpFieldValues) -> (ServicePath, String) + Send + 'static, S: Fn(&KeyOpFieldValues) -> bool + Sync + Send + 'static, @@ -55,7 +58,11 @@ where let swbus = SimpleSwbusEdgeClient::new(rt, addr, false, false); tokio::task::spawn(async move { let mut table_cache = TableCache::default(); - let mut send_kfv = async |kfv: KeyOpFieldValues| { + let mut send_kfv = async |mut kfv: KeyOpFieldValues| { + if P::is_proto() { + P::convert_pb_to_json(&mut kfv); + } + // Merge the kfv to get the whole table as an update let kfv = table_cache.merge_kfv(kfv); if !selector(&kfv) { @@ -198,6 +205,9 @@ impl_consumertable! { ConsumerStateTable[true] SubscriberStateTable[true] ZmqCon mod test { use super::{spawn_consumer_bridge, ConsumerTable}; use crate::producer::ProducerTable; + use sonic_dash_api_proto::ha_set_config::HaSetConfig; + use sonicdb_derive::SonicDb; + use std::collections::HashMap; use std::{sync::Arc, time::Duration}; use swbus_actor::ActorMessage; use swbus_edge::{ @@ -212,6 +222,13 @@ mod test { use swss_common_testing::{random_kfvs, random_zmq_endpoint, Redis}; use tokio::time::timeout; + #[derive(SonicDb)] + #[sonicdb(table_name = "MY_STRUCT", key_separator = ":", db_name = "db1")] + struct MyStruct { + _id1: String, + _attr1: Option, + } + #[tokio::test] async fn consumer_state_table_bridge() { let redis = Redis::start(); @@ -223,6 +240,14 @@ mod test { .unwrap(); } + #[tokio::test] + async fn consumer_state_proto_table_bridge() { + let redis = Redis::start(); + let pst = ProducerStateTable::new(redis.db_connector(), "mytable").unwrap(); + let cst = ConsumerStateTable::new(redis.db_connector(), "mytable", None, None).unwrap(); + timeout(Duration::from_secs(5), run_proto_test(cst, pst)).await.unwrap(); + } + #[tokio::test] async fn zmq_consumer_state_table_bridge() { let (zmq_endpoint, _deleter) = random_zmq_endpoint(); @@ -251,7 +276,7 @@ mod test { let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false); // Spawn the bridge - let bridge = spawn_consumer_bridge( + let bridge = spawn_consumer_bridge::( rt.clone(), sp("mytable-bridge"), consumer_table, @@ -277,7 +302,7 @@ mod test { // Test rehydration if let Some(rehydrate_table) = rehydrate_table { // Spawn new bridge to rehydrate with - let _bridge_rehydrate = spawn_consumer_bridge( + let _bridge_rehydrate = spawn_consumer_bridge::( rt, sp("mytable-bridge"), rehydrate_table, @@ -315,4 +340,58 @@ mod test { fn sp(s: &str) -> ServicePath { ServicePath::from_string(&format!("test.test.test/test/test/test/{s}")).unwrap() } + + async fn run_proto_test(consumer_table: C, mut producer_table: P) { + // Setup swbus + let mut swbus_edge = SwbusEdgeRuntime::new("".to_string(), sp("edge")); + swbus_edge.start().await.unwrap(); + let rt = Arc::new(swbus_edge); + + // Create edge client to receive updates from the bridge + let swbus = SimpleSwbusEdgeClient::new(rt.clone(), sp("receiver"), true, false); + + // Spawn the bridge + let bridge = spawn_consumer_bridge::( + rt.clone(), + sp("mytable-bridge"), + consumer_table, + |_| (sp("receiver"), "".into()), + |_| true, + ); + + // Send some updates we should receive + let kfvs = KeyOpFieldValues { + key: "haset0_0".to_string(), + operation: swss_common::KeyOperation::Set, + field_values: { + let mut map = HashMap::new(); + map.insert( + "pb".to_string(), + "0a013112050d00010203220576647075302205766470753128013a057664707530" + .to_string() + .into(), + ); + map + }, + }; + + producer_table.apply_kfv(kfvs.clone()).await; + + let kfvs_expected = KeyOpFieldValues { + key: "haset0_0".to_string(), + operation: swss_common::KeyOperation::Set, + field_values: { + let mut map = HashMap::new(); + map.insert("json".to_string(), "{\"version\":\"1\",\"vip_v4\":{\"ip\":{\"Ipv4\":50462976}},\"vip_v6\":null,\"vdpu_ids\":[\"vdpu0\",\"vdpu1\"],\"scope\":1,\"pinned_vdpu_bfd_probe_states\":[],\"preferred_vdpu_id\":\"vdpu0\",\"preferred_standalone_vdpu_index\":0}".to_string().into()); + map + }, + }; + + // Receive the updates + let kfvs_received = receive_n_messages(1, &swbus).await; + + // Assert we received the decoded protobuf + assert_eq!(kfvs_expected, kfvs_received[0]); + bridge.abort(); + } } diff --git a/test_utils/hamgrd/redis_data_set.cmd b/test_utils/hamgrd/redis_data_set.cmd index 841fecf..b9bcc6c 100644 --- a/test_utils/hamgrd/redis_data_set.cmd +++ b/test_utils/hamgrd/redis_data_set.cmd @@ -69,17 +69,5 @@ HSET DPU_STATE|dpu7 dpu_control_plane_state up HSET DPU_STATE|dpu7 dpu_data_plane_state up select 0 -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 version "1" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 vip_v4 "3.2.1.0" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 vip_v6 "" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 owner "dpu" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 scope "dpu" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 vdpu_ids "vdpu0,vdpu1" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 pinned_vdpu_bfd_probe_states "" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 preferred_vdpu_ids "vdpu0" -HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 preferred_standalone_vdpu_index "0" - -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 version "1" -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 disabled "true" -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 desired_ha_state "active" -HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 approved_pending_operation_ids "" +HSET DASH_HA_SET_CONFIG_TABLE:haset0_0 pb "0a013112050d00010203220576647075302205766470753128013a057664707530" +HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 pb "0a01311001180122067465737469642802"