Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .azure-pipelines/build-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ jobs:
export ENABLE_ASAN=y
fi
./autogen.sh
dpkg-buildpackage -us -uc -b -j$(nproc) && cp ../*.deb .
RUSTFLAGS=-Dwarnings dpkg-buildpackage -us -uc -b -j$(nproc) && cp ../*.deb .
displayName: "Compile sonic swss"
- script: |
cargo test
RUSTFLAGS=-Dwarnings cargo test
displayName: "Test countersyncd"
- publish: $(System.DefaultWorkingDirectory)/
artifact: ${{ parameters.artifact_name }}
Expand Down
22 changes: 18 additions & 4 deletions crates/countersyncd/src/actor/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl OtelActor {
info!("OtelActor started");

let mut flush_timer = Box::pin(sleep_until(self.flush_deadline));
let mut run_error: Option<Box<dyn ExportError>> = None;

loop {
select! {
Expand All @@ -180,7 +181,10 @@ impl OtelActor {
ChannelLabel::IpfixToOtel,
self.stats_receiver.len(),
);
self.handle_stats_message(stats).await?;
if let Err(e) = self.handle_stats_message(stats).await {
run_error = Some(e);
break;
}
self.reset_flush_timer(&mut flush_timer);
}
_none => {
Expand All @@ -190,7 +194,10 @@ impl OtelActor {
}
}
_ = &mut flush_timer => {
self.flush_buffer().await?;
if let Err(e) = self.flush_buffer().await {
run_error = Some(e);
break;
}
self.reset_flush_timer(&mut flush_timer);
}
}
Expand All @@ -203,9 +210,16 @@ impl OtelActor {
}

// Flush any remaining buffered metrics before shutdown
self.flush_buffer().await?;
if run_error.is_none() {
if let Err(e) = self.flush_buffer().await {
run_error = Some(e);
}
}
self.shutdown().await;
Ok(())
match run_error {
Some(e) => Err(e),
None => Ok(()),
}
}

/// Handle incoming SAI statistics message
Expand Down
234 changes: 55 additions & 179 deletions crates/countersyncd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod utilities;
// External dependencies
use clap::Parser;
use log::{error, info};
use opentelemetry::ExportError;
use std::time::Duration;
use tokio::{spawn, sync::mpsc::channel};

Expand All @@ -22,7 +23,7 @@ use crate::actor::{
};

// Internal exit codes
use countersyncd::exit_codes::EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED;
use countersyncd::exit_codes::{EXIT_FAILURE, EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED, EXIT_SUCCESS};
use crate::utilities::{set_comm_capacity, ChannelLabel};

/// Initialize logging based on command line arguments
Expand Down Expand Up @@ -84,6 +85,36 @@ fn init_logging(log_level: &str, log_format: &str) {
builder.init();
}

fn exit_on_join(name: &str, result: Result<(), tokio::task::JoinError>) -> ! {
match result {
Ok(()) => {
info!("{} actor exited normally; shutting down", name);
std::process::exit(EXIT_SUCCESS);
}
Err(e) => {
error!("{} actor join error: {:?}", name, e);
std::process::exit(EXIT_FAILURE);
}
}
}

fn exit_on_otel_join(result: Result<Result<(), Box<dyn ExportError>>, tokio::task::JoinError>) -> ! {
match result {
Ok(Ok(())) => {
info!("OpenTelemetry actor exited normally; shutting down");
std::process::exit(EXIT_SUCCESS);
}
Ok(Err(e)) => {
error!("OpenTelemetry actor failed: {:?}", e);
std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED);
}
Err(e) => {
error!("OpenTelemetry actor join error: {:?}", e);
std::process::exit(EXIT_FAILURE);
}
}
}

/// SONiC High Frequency Telemetry Counter Sync Daemon
///
/// This application processes high-frequency telemetry data from SONiC switches,
Expand Down Expand Up @@ -356,21 +387,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Starting actor tasks...");

// Spawn actor tasks
let data_netlink_handle = spawn(async move {
let mut data_netlink_handle = spawn(async move {
info!("Data netlink actor started");
DataNetlinkActor::run(data_netlink).await;
info!("Data netlink actor terminated");
});

let control_netlink_handle = spawn(async move {
let mut control_netlink_handle = spawn(async move {
info!("Control netlink actor started");
ControlNetlinkActor::run(control_netlink).await;
info!("Control netlink actor terminated");
});

// Use spawn_blocking to ensure IPFIX actor runs on a dedicated thread
// This is important for thread-local variables
let ipfix_handle = tokio::task::spawn_blocking(move || {
let mut ipfix_handle = tokio::task::spawn_blocking(move || {
info!("IPFIX actor started on dedicated thread");
// Create a new runtime for async operations within this blocking thread
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime for IPFIX actor");
Expand All @@ -380,14 +411,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("IPFIX actor terminated");
});

let swss_handle = spawn(async move {
let mut swss_handle = spawn(async move {
info!("SWSS actor started");
SwssActor::run(swss).await;
info!("SWSS actor terminated");
});

// Only spawn stats reporter if enabled
let reporter_handle = if let Some(stats_reporter) = stats_reporter {
let mut reporter_handle = if let Some(stats_reporter) = stats_reporter {
Some(spawn(async move {
info!("Stats reporter actor started");
StatsReporterActor::run(stats_reporter).await;
Expand All @@ -399,7 +430,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

// Only spawn counter DB writer if enabled
let counter_db_handle = if let Some(counter_db) = counter_db {
let mut counter_db_handle = if let Some(counter_db) = counter_db {
Some(spawn(async move {
info!("Counter DB actor started");
CounterDBActor::run(counter_db).await;
Expand All @@ -411,7 +442,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};

// Only spawn OpenTelemetry actor if enabled
let otel_handle = if let Some(otel_actor) = otel_actor {
let mut otel_handle = if let Some(otel_actor) = otel_actor {
Some(spawn(async move {
info!("OpenTelemetry actor started");
let result = OtelActor::run(otel_actor).await;
Expand All @@ -423,183 +454,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None
};

// Wait for all actors to complete and handle any errors
let data_netlink_result = data_netlink_handle.await;
let control_netlink_result = control_netlink_handle.await;
let ipfix_result = ipfix_handle.await.map_err(|e| {
error!("IPFIX blocking task join error: {:?}", e);
e
});
let swss_result = swss_handle.await;
let reporter_result = if let Some(handle) = reporter_handle {
Some(handle.await)
} else {
None
};
let counter_db_result = if let Some(handle) = counter_db_handle {
Some(handle.await)
} else {
None
};
let otel_result = if let Some(handle) = otel_handle {
Some(handle.await)
} else {
None
};

// Handle results based on what actors were enabled
let all_successful = match (reporter_result.is_some(), counter_db_result.is_some(), otel_result.is_some()) {
(true, true, true) => {
// All optional actors enabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
reporter_result.as_ref().unwrap(),
counter_db_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
}
(true, true, false) => {
// Stats reporter and counter DB enabled, OTEL disabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
reporter_result.as_ref().unwrap(),
counter_db_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
)
// Exit the program as soon as any actor completes
tokio::select! {
res = &mut data_netlink_handle => {
exit_on_join("Data netlink", res);
}
(true, false, true) => {
// Stats reporter and OTEL enabled, counter DB disabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
reporter_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
res = &mut control_netlink_handle => {
exit_on_join("Control netlink", res);
}
(false, true, true) => {
// Counter DB and OTEL enabled, stats reporter disabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
counter_db_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
res = &mut ipfix_handle => {
exit_on_join("IPFIX", res);
}
(true, false, false) => {
// Only stats reporter enabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
reporter_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
)
res = &mut swss_handle => {
exit_on_join("SWSS", res);
}
(false, true, false) => {
// Only counter DB enabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
counter_db_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
)
res = async { reporter_handle.as_mut().unwrap().await }, if reporter_handle.is_some() => {
exit_on_join("Stats reporter", res);
}
(false, false, true) => {
// Only OTEL enabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result,
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
res = async { counter_db_handle.as_mut().unwrap().await }, if counter_db_handle.is_some() => {
exit_on_join("Counter DB", res);
}
(false, false, false) => {
// None of the optional actors enabled
matches!(
(
&data_netlink_result,
&control_netlink_result,
&ipfix_result,
&swss_result
),
(Ok(()), Ok(()), Ok(()), Ok(()))
)
}
};

if all_successful {
let status_msg = match (reporter_result.is_some(), counter_db_result.is_some(), otel_result.is_some()) {
(true, true, true) => "All actors completed successfully",
(true, true, false) => "All actors completed successfully (OpenTelemetry disabled)",
(true, false, true) => "All actors completed successfully (counter DB disabled)",
(false, true, true) => "All actors completed successfully (stats reporting disabled)",
(true, false, false) => "All actors completed successfully (counter DB and OpenTelemetry disabled)",
(false, true, false) => "All actors completed successfully (stats reporting and OpenTelemetry disabled)",
(false, false, true) => "All actors completed successfully (stats reporting and counter DB disabled)",
(false, false, false) => {
"All actors completed successfully (stats reporting, counter DB, and OpenTelemetry disabled)"
}
};
info!("{}", status_msg);
Ok(())
} else {
// Check which actor failed
if let Err(e) = data_netlink_result {
error!("Data netlink actor failed: {:?}", e);
Err(e.into())
} else if let Err(e) = control_netlink_result {
error!("Control netlink actor failed: {:?}", e);
Err(e.into())
} else if let Err(e) = ipfix_result {
error!("IPFIX actor failed: {:?}", e);
Err(e.into())
} else if let Err(e) = swss_result {
error!("SWSS actor failed: {:?}", e);
Err(e.into())
} else if let Some(Err(e)) = reporter_result {
error!("Stats reporter actor failed: {:?}", e);
Err(e.into())
} else if let Some(Err(e)) = counter_db_result {
error!("Counter DB actor failed: {:?}", e);
Err(e.into())
} else if let Some(Err(e)) = otel_result {
error!("OpenTelemetry actor failed: {:?}", e);
std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED);
} else {
error!("Unknown actor failure");
Err("Unknown actor failure".into())
res = async { otel_handle.as_mut().unwrap().await }, if otel_handle.is_some() => {
exit_on_otel_join(res);
}
}
}
Loading