Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ stages:
runBranch: 'refs/heads/master'
path: $(Build.ArtifactStagingDirectory)/download/common
patterns: |
target/debs/bullseye/libnl-3-200_*.deb
target/debs/bullseye/libnl-3-dev_*.deb
target/debs/bullseye/libnl-genl-3-200_*.deb
target/debs/bullseye/libnl-genl-3-dev_*.deb
target/debs/bullseye/libnl-route-3-200_*.deb
target/debs/bullseye/libnl-route-3-dev_*.deb
target/debs/bullseye/libnl-nf-3-200_*.deb
target/debs/bullseye/libnl-nf-3-dev_*.deb
target/debs/bullseye/libyang_*.deb
target/debs/bookworm/libnl-3-200_*.deb
target/debs/bookworm/libnl-3-dev_*.deb
target/debs/bookworm/libnl-genl-3-200_*.deb
target/debs/bookworm/libnl-genl-3-dev_*.deb
target/debs/bookworm/libnl-route-3-200_*.deb
target/debs/bookworm/libnl-route-3-dev_*.deb
target/debs/bookworm/libnl-nf-3-200_*.deb
target/debs/bookworm/libnl-nf-3-dev_*.deb
target/debs/bookworm/libyang_*.deb
displayName: "Download common-libs deb packages"

- task: DownloadPipelineArtifact@2
Expand Down
22 changes: 20 additions & 2 deletions crates/hamgrd/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use swbus_edge::swbus_proto::message_id_generator::MessageIdGenerator;
use swbus_edge::swbus_proto::result::*;
use swbus_edge::swbus_proto::swbus::{swbus_message::Body, DataRequest, ServicePath, SwbusErrorCode, SwbusMessage};
use swbus_edge::SwbusEdgeRuntime;
use swss_common::{KeyOpFieldValues, KeyOperation, SubscriberStateTable, ZmqClient, ZmqProducerStateTable};
use swss_common::{
KeyOpFieldValues, KeyOperation, ProducerStateTable, SubscriberStateTable, ZmqClient, ZmqProducerStateTable,
};
use swss_common_bridge::{consumer::ConsumerBridge, producer::spawn_producer_bridge};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -108,7 +110,7 @@ where
SwbusError::RouteError { code, detail } => (code, detail),
};
let response = SwbusMessage::new_response(
&msg,
msg.header.as_ref().unwrap(),
Some(&self.sp),
code,
&err_msg,
Expand Down Expand Up @@ -277,3 +279,19 @@ where
anyhow::bail!("Failed to connect to ZMQ server at {}", zmq_endpoint);
}
}

pub async fn spawn_vanilla_producer_bridge<T>(edge_runtime: Arc<SwbusEdgeRuntime>) -> AnyhowResult<JoinHandle<()>>
where
T: SonicDbTable + 'static,
{
let db = crate::db_for_table::<T>().await?;
let pst = ProducerStateTable::new(db, T::table_name()).unwrap();

let sp = crate::common_bridge_sp::<T>(&edge_runtime);
info!(
"spawned producer bridge for {} at {}",
T::table_name(),
sp.to_longest_path()
);
Ok(spawn_producer_bridge(edge_runtime.clone(), sp, pst))
}
119 changes: 80 additions & 39 deletions crates/hamgrd/src/actors/dpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationTyp
use crate::ServicePath;
use anyhow::{anyhow, Result};
use sonic_common::SonicDbTable;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, ActorMessage, Context, State};
use swbus_edge::SwbusEdgeRuntime;
use swss_common::{KeyOpFieldValues, KeyOperation, SubscriberStateTable};
use swss_common_bridge::consumer::ConsumerBridge;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, instrument};

use super::spawn_consumer_bridge_for_actor_with_selector;

Expand Down Expand Up @@ -60,19 +60,30 @@ impl DpuActor {
RemoteDpu::table_name()
}

fn get_dpu_state(incoming: &Incoming) -> Result<DpuState> {
let dpu_state_kfv: KeyOpFieldValues = incoming.get(DpuState::table_name())?.deserialize_data()?;
Ok(swss_serde::from_field_values(&dpu_state_kfv.field_values)?)
fn get_dpu_state(incoming: &Incoming) -> Result<Option<DpuState>> {
match incoming.get(DpuState::table_name()) {
Some(msg) => {
let dpu_state_kfv: KeyOpFieldValues = msg.deserialize_data()?;
Ok(Some(swss_serde::from_field_values(&dpu_state_kfv.field_values)?))
}
None => Ok(None),
}
}

fn get_bfd_probe_state(incoming: &Incoming) -> Result<DashBfdProbeState> {
let bfd_probe_kfv: KeyOpFieldValues = incoming.get(DashBfdProbeState::table_name())?.deserialize_data()?;
Ok(swss_serde::from_field_values(&bfd_probe_kfv.field_values)?)
fn get_bfd_probe_state(incoming: &Incoming) -> Result<Option<DashBfdProbeState>> {
match incoming.get(DashBfdProbeState::table_name()) {
Some(msg) => {
let bfd_probe_kfv: KeyOpFieldValues = msg.deserialize_data()?;
Ok(Some(swss_serde::from_field_values(&bfd_probe_kfv.field_values)?))
}
None => Ok(None),
}
}

fn get_dash_ha_global_config(incoming: &Incoming) -> Result<DashHaGlobalConfig> {
let ha_global_config_kfv: KeyOpFieldValues =
incoming.get(DashHaGlobalConfig::table_name())?.deserialize_data()?;
let ha_global_config_kfv: KeyOpFieldValues = incoming
.get_or_fail(DashHaGlobalConfig::table_name())?
.deserialize_data()?;
Ok(swss_serde::from_field_values(&ha_global_config_kfv.field_values)?)
}

Expand Down Expand Up @@ -125,10 +136,17 @@ impl DpuActor {
Ok(bridges)
}

fn do_cleanup(&mut self, _context: &mut Context, state: &mut State) {
if let Err(e) = self.update_bfd_sessions(state, true) {
error!("Failed to cleanup BFD sessions: {}", e);
}
}

async fn handle_dpu_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> {
let (_internal, incoming, outgoing) = state.get_all();
let dpu_kfv: KeyOpFieldValues = incoming.get(key)?.deserialize_data()?;
let dpu_kfv: KeyOpFieldValues = incoming.get_or_fail(key)?.deserialize_data()?;
if dpu_kfv.operation == KeyOperation::Del {
self.do_cleanup(context, state);
context.stop();
return Ok(());
}
Expand Down Expand Up @@ -226,16 +244,16 @@ impl DpuActor {
}
// Check pmon state from DPU_STATE table
let dpu_state = match Self::get_dpu_state(incoming) {
Ok(dpu_state) => Some(dpu_state),
Ok(dpu_state) => dpu_state,
Err(e) => {
info!("Not able to get DPU state. Assume DPU is down. Error: {}", e);
error!("Failed to decode DPU_STATE. Error: {}", e);
None
}
};
let bfd_probe_state = match Self::get_bfd_probe_state(incoming) {
Ok(bfd_probe_state) => Some(bfd_probe_state),
Ok(bfd_probe_state) => bfd_probe_state,
Err(e) => {
debug!("Not able to get BFD probe state. Error: {}", e);
error!("Failed to decode DASH_BFD_PROBE_STATE. Error: {}", e);
None
}
};
Expand Down Expand Up @@ -303,7 +321,9 @@ impl DpuActor {

// handle DPU state registration request. In response to the request, this actor will send its current state.
fn handle_dpu_state_registration(&mut self, key: &str, incoming: &Incoming, outgoing: &mut Outgoing) -> Result<()> {
let entry = incoming.get_entry(key)?;
let entry = incoming
.get_entry(key)
.ok_or_else(|| anyhow!("Entry not found for key: {}", key))?;
let ActorRegistration { active, .. } = entry.msg.deserialize_data()?;
if active {
self.update_dpu_state(incoming, outgoing, Some(entry.source.clone()))?;
Expand All @@ -316,38 +336,50 @@ impl DpuActor {
peer_ip: &str,
global_cfg: &DashHaGlobalConfig,
outgoing: &mut Outgoing,
remove: bool,
) -> Result<()> {
// todo: this needs to wait until HaScope has been activated.
let Some(DpuData::LocalDpu { ref dpu, .. }) = self.dpu else {
debug!("DPU is not managed by this HA instance. Ignore BFD session creation");
return Ok(());
};
let bfd_session = BfdSessionTable {
tx_interval: global_cfg.dpu_bfd_probe_interval_in_ms,
rx_interval: global_cfg.dpu_bfd_probe_interval_in_ms,
multiplier: global_cfg.dpu_bfd_probe_multiplier,
multihop: true,
local_addr: dpu.pa_ipv4.clone(),
session_type: Some("passive".to_string()),
shutdown: false,
};

let fv = swss_serde::to_field_values(&bfd_session)?;
let sep = BfdSessionTable::key_separator();
let kfv = KeyOpFieldValues {
key: format!("default{sep}default{sep}{peer_ip}"),
operation: KeyOperation::Set,
field_values: fv,
let key = format!("default{sep}default{sep}{peer_ip}");

let kfv = if remove {
KeyOpFieldValues {
key,
operation: KeyOperation::Del,
field_values: HashMap::new(),
}
} else {
let bfd_session = BfdSessionTable {
tx_interval: global_cfg.dpu_bfd_probe_interval_in_ms,
rx_interval: global_cfg.dpu_bfd_probe_interval_in_ms,
multiplier: global_cfg.dpu_bfd_probe_multiplier,
multihop: true,
local_addr: dpu.pa_ipv4.clone(),
session_type: Some("passive".to_string()),
shutdown: false,
};

let fv = swss_serde::to_field_values(&bfd_session)?;
KeyOpFieldValues {
key,
operation: KeyOperation::Set,
field_values: fv,
}
};

let msg = ActorMessage::new(self.id.clone(), &kfv)?;
outgoing.send(outgoing.common_bridge_sp::<BfdSessionTable>(), msg);
Ok(())
}

fn update_bfd_sessions(&self, state: &mut State) -> Result<()> {
fn update_bfd_sessions(&self, state: &mut State, remove: bool) -> Result<()> {
if !self.is_local_managed() {
debug!("DPU is not managed by this HA instance. Ignore BFD session creation");
debug!("DPU is not managed by this HA instance. Ignore BFD session creation or deletion");
return Ok(());
}
let (_internal, incoming, outgoing) = state.get_all();
Expand Down Expand Up @@ -375,7 +407,7 @@ impl DpuActor {
remote_npus.push(npu_ipv4.clone());
remote_npus.sort();
for npu in remote_npus {
self.update_bfd_session(&npu, &global_cfg, outgoing)?;
self.update_bfd_session(&npu, &global_cfg, outgoing, remove)?;
}
Ok(())
}
Expand All @@ -387,7 +419,7 @@ impl DpuActor {
context: &mut Context,
) -> Result<()> {
let (_internal, incoming, outgoing) = state.get_all();
let dpu_kfv: KeyOpFieldValues = incoming.get(key)?.deserialize_data()?;
let dpu_kfv: KeyOpFieldValues = incoming.get_or_fail(key)?.deserialize_data()?;
if dpu_kfv.operation == KeyOperation::Del {
context.stop();
return Ok(());
Expand All @@ -402,13 +434,13 @@ impl DpuActor {

fn handle_remote_dpu_message_to_local_dpu(&mut self, state: &mut State, key: &str) -> Result<()> {
let (_internal, incoming, outgoing) = state.get_all();
let dpu_kfv: KeyOpFieldValues = incoming.get(key)?.deserialize_data()?;
let dpu_kfv: KeyOpFieldValues = incoming.get_or_fail(key)?.deserialize_data()?;

let remote_dpu: RemoteDpu = swss_serde::from_field_values(&dpu_kfv.field_values)?;

// create bfd session
let global_cfg = Self::get_dash_ha_global_config(incoming)?;
self.update_bfd_session(&remote_dpu.npu_ipv4, &global_cfg, outgoing)?;
self.update_bfd_session(&remote_dpu.npu_ipv4, &global_cfg, outgoing, false)?;
Ok(())
}

Expand All @@ -421,7 +453,7 @@ impl DpuActor {
}

fn handle_dash_ha_global_config(&mut self, state: &mut State) -> Result<()> {
self.update_bfd_sessions(state)?;
self.update_bfd_sessions(state, false)?;
Ok(())
}

Expand Down Expand Up @@ -550,7 +582,7 @@ mod test {
recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.3.0", "operation": "Set", "field_values": bfd_fvs},
addr: crate::common_bridge_sp::<BfdSessionTable>(&runtime.get_swbus_edge()) },

send! { key: DashBfdProbeState::table_name(), data: { "key": "", "operation": "Set", "field_values":serde_json::to_value(to_field_values(&dpu_bfd_up_state).unwrap()).unwrap()} },
send! { key: DashBfdProbeState::table_name(), data: { "key": "dash_ha", "operation": "Set", "field_values":serde_json::to_value(to_field_values(&dpu_bfd_up_state).unwrap()).unwrap()} },
recv! { key: "DPUStateUpdate|switch0_dpu0", data: dpu_actor_up_state, addr: runtime.sp("vdpu", "test-vdpu") },

// Simulate DPU_STATE planes going down then up
Expand All @@ -560,12 +592,21 @@ mod test {
recv! { key: "DPUStateUpdate|switch0_dpu0", data: dpu_actor_up_state, addr: runtime.sp("vdpu", "test-vdpu") },

// Simulate BFD probe going down
send! { key: DashBfdProbeState::table_name(), data: { "key": "", "operation": "Set", "field_values": serde_json::to_value(to_field_values(&dpu_bfd_down_state).unwrap()).unwrap()} },
send! { key: DashBfdProbeState::table_name(), data: { "key": "dash_ha", "operation": "Set", "field_values": serde_json::to_value(to_field_values(&dpu_bfd_down_state).unwrap()).unwrap()} },
recv! { key: "DPUStateUpdate|switch0_dpu0", data: dpu_actor_bfd_down_state, addr: runtime.sp("vdpu", "test-vdpu") },

// simulate delete of Dpu entry
send! { key: Dpu::table_name(), data: { "key": DpuActor::dpu_table_name(), "operation": "Del", "field_values": dpu_fvs},
addr: crate::common_bridge_sp::<Dpu>(&runtime.get_swbus_edge()) },

recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.0.0", "operation": "Del", "field_values": {}},
addr: crate::common_bridge_sp::<BfdSessionTable>(&runtime.get_swbus_edge()) },
recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.1.0", "operation": "Del", "field_values": {}},
addr: crate::common_bridge_sp::<BfdSessionTable>(&runtime.get_swbus_edge()) },
recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.2.0", "operation": "Del", "field_values": {}},
addr: crate::common_bridge_sp::<BfdSessionTable>(&runtime.get_swbus_edge()) },
recv! { key: "switch0_dpu0", data: {"key": "default:default:10.0.3.0", "operation": "Del", "field_values": {}},
addr: crate::common_bridge_sp::<BfdSessionTable>(&runtime.get_swbus_edge()) },
];
test::run_commands(&runtime, runtime.sp("dpu", "switch0_dpu0"), &commands).await;
if tokio::time::timeout(Duration::from_secs(1), handle).await.is_err() {
Expand Down
Loading