diff --git a/.azure-pipelines/build-template.yml b/.azure-pipelines/build-template.yml index 6052e25a..7717a025 100644 --- a/.azure-pipelines/build-template.yml +++ b/.azure-pipelines/build-template.yml @@ -203,10 +203,10 @@ jobs: export ENABLE_ASAN=y fi ./autogen.sh - dpkg-buildpackage -us -uc -b -j$(nproc) && cp ../*.deb . + RUSTFLAGS=-Dwarnings dpkg-buildpackage -us -uc -b -j$(nproc) && cp ../*.deb . displayName: "Compile sonic swss" - script: | - cargo test + RUSTFLAGS=-Dwarnings cargo test displayName: "Test countersyncd" - publish: $(System.DefaultWorkingDirectory)/ artifact: ${{ parameters.artifact_name }} diff --git a/crates/countersyncd/src/actor/otel.rs b/crates/countersyncd/src/actor/otel.rs index 1d316586..c3ece9eb 100644 --- a/crates/countersyncd/src/actor/otel.rs +++ b/crates/countersyncd/src/actor/otel.rs @@ -170,6 +170,7 @@ impl OtelActor { info!("OtelActor started"); let mut flush_timer = Box::pin(sleep_until(self.flush_deadline)); + let mut run_error: Option> = None; loop { select! { @@ -180,7 +181,10 @@ impl OtelActor { ChannelLabel::IpfixToOtel, self.stats_receiver.len(), ); - self.handle_stats_message(stats).await?; + if let Err(e) = self.handle_stats_message(stats).await { + run_error = Some(e); + break; + } self.reset_flush_timer(&mut flush_timer); } _none => { @@ -190,7 +194,10 @@ impl OtelActor { } } _ = &mut flush_timer => { - self.flush_buffer().await?; + if let Err(e) = self.flush_buffer().await { + run_error = Some(e); + break; + } self.reset_flush_timer(&mut flush_timer); } } @@ -203,9 +210,16 @@ impl OtelActor { } // Flush any remaining buffered metrics before shutdown - self.flush_buffer().await?; + if run_error.is_none() { + if let Err(e) = self.flush_buffer().await { + run_error = Some(e); + } + } self.shutdown().await; - Ok(()) + match run_error { + Some(e) => Err(e), + None => Ok(()), + } } /// Handle incoming SAI statistics message diff --git a/crates/countersyncd/src/main.rs b/crates/countersyncd/src/main.rs index 53b6a60f..1a3451c1 100644 --- a/crates/countersyncd/src/main.rs +++ b/crates/countersyncd/src/main.rs @@ -7,6 +7,7 @@ mod utilities; // External dependencies use clap::Parser; use log::{error, info}; +use opentelemetry::ExportError; use std::time::Duration; use tokio::{spawn, sync::mpsc::channel}; @@ -22,7 +23,7 @@ use crate::actor::{ }; // Internal exit codes -use countersyncd::exit_codes::EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED; +use countersyncd::exit_codes::{EXIT_FAILURE, EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED, EXIT_SUCCESS}; use crate::utilities::{set_comm_capacity, ChannelLabel}; /// Initialize logging based on command line arguments @@ -84,6 +85,36 @@ fn init_logging(log_level: &str, log_format: &str) { builder.init(); } +fn exit_on_join(name: &str, result: Result<(), tokio::task::JoinError>) -> ! { + match result { + Ok(()) => { + info!("{} actor exited normally; shutting down", name); + std::process::exit(EXIT_SUCCESS); + } + Err(e) => { + error!("{} actor join error: {:?}", name, e); + std::process::exit(EXIT_FAILURE); + } + } +} + +fn exit_on_otel_join(result: Result>, tokio::task::JoinError>) -> ! { + match result { + Ok(Ok(())) => { + info!("OpenTelemetry actor exited normally; shutting down"); + std::process::exit(EXIT_SUCCESS); + } + Ok(Err(e)) => { + error!("OpenTelemetry actor failed: {:?}", e); + std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED); + } + Err(e) => { + error!("OpenTelemetry actor join error: {:?}", e); + std::process::exit(EXIT_FAILURE); + } + } +} + /// SONiC High Frequency Telemetry Counter Sync Daemon /// /// This application processes high-frequency telemetry data from SONiC switches, @@ -356,13 +387,13 @@ async fn main() -> Result<(), Box> { info!("Starting actor tasks..."); // Spawn actor tasks - let data_netlink_handle = spawn(async move { + let mut data_netlink_handle = spawn(async move { info!("Data netlink actor started"); DataNetlinkActor::run(data_netlink).await; info!("Data netlink actor terminated"); }); - let control_netlink_handle = spawn(async move { + let mut control_netlink_handle = spawn(async move { info!("Control netlink actor started"); ControlNetlinkActor::run(control_netlink).await; info!("Control netlink actor terminated"); @@ -370,7 +401,7 @@ async fn main() -> Result<(), Box> { // Use spawn_blocking to ensure IPFIX actor runs on a dedicated thread // This is important for thread-local variables - let ipfix_handle = tokio::task::spawn_blocking(move || { + let mut ipfix_handle = tokio::task::spawn_blocking(move || { info!("IPFIX actor started on dedicated thread"); // Create a new runtime for async operations within this blocking thread let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime for IPFIX actor"); @@ -380,14 +411,14 @@ async fn main() -> Result<(), Box> { info!("IPFIX actor terminated"); }); - let swss_handle = spawn(async move { + let mut swss_handle = spawn(async move { info!("SWSS actor started"); SwssActor::run(swss).await; info!("SWSS actor terminated"); }); // Only spawn stats reporter if enabled - let reporter_handle = if let Some(stats_reporter) = stats_reporter { + let mut reporter_handle = if let Some(stats_reporter) = stats_reporter { Some(spawn(async move { info!("Stats reporter actor started"); StatsReporterActor::run(stats_reporter).await; @@ -399,7 +430,7 @@ async fn main() -> Result<(), Box> { }; // Only spawn counter DB writer if enabled - let counter_db_handle = if let Some(counter_db) = counter_db { + let mut counter_db_handle = if let Some(counter_db) = counter_db { Some(spawn(async move { info!("Counter DB actor started"); CounterDBActor::run(counter_db).await; @@ -411,7 +442,7 @@ async fn main() -> Result<(), Box> { }; // Only spawn OpenTelemetry actor if enabled - let otel_handle = if let Some(otel_actor) = otel_actor { + let mut otel_handle = if let Some(otel_actor) = otel_actor { Some(spawn(async move { info!("OpenTelemetry actor started"); let result = OtelActor::run(otel_actor).await; @@ -423,183 +454,28 @@ async fn main() -> Result<(), Box> { None }; - // Wait for all actors to complete and handle any errors - let data_netlink_result = data_netlink_handle.await; - let control_netlink_result = control_netlink_handle.await; - let ipfix_result = ipfix_handle.await.map_err(|e| { - error!("IPFIX blocking task join error: {:?}", e); - e - }); - let swss_result = swss_handle.await; - let reporter_result = if let Some(handle) = reporter_handle { - Some(handle.await) - } else { - None - }; - let counter_db_result = if let Some(handle) = counter_db_handle { - Some(handle.await) - } else { - None - }; - let otel_result = if let Some(handle) = otel_handle { - Some(handle.await) - } else { - None - }; - - // Handle results based on what actors were enabled - let all_successful = match (reporter_result.is_some(), counter_db_result.is_some(), otel_result.is_some()) { - (true, true, true) => { - // All optional actors enabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - reporter_result.as_ref().unwrap(), - counter_db_result.as_ref().unwrap(), - otel_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) - ) - } - (true, true, false) => { - // Stats reporter and counter DB enabled, OTEL disabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - reporter_result.as_ref().unwrap(), - counter_db_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) - ) + // Exit the program as soon as any actor completes + tokio::select! { + res = &mut data_netlink_handle => { + exit_on_join("Data netlink", res); } - (true, false, true) => { - // Stats reporter and OTEL enabled, counter DB disabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - reporter_result.as_ref().unwrap(), - otel_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) - ) + res = &mut control_netlink_handle => { + exit_on_join("Control netlink", res); } - (false, true, true) => { - // Counter DB and OTEL enabled, stats reporter disabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - counter_db_result.as_ref().unwrap(), - otel_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) - ) + res = &mut ipfix_handle => { + exit_on_join("IPFIX", res); } - (true, false, false) => { - // Only stats reporter enabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - reporter_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) - ) + res = &mut swss_handle => { + exit_on_join("SWSS", res); } - (false, true, false) => { - // Only counter DB enabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - counter_db_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) - ) + res = async { reporter_handle.as_mut().unwrap().await }, if reporter_handle.is_some() => { + exit_on_join("Stats reporter", res); } - (false, false, true) => { - // Only OTEL enabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result, - otel_result.as_ref().unwrap() - ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) - ) + res = async { counter_db_handle.as_mut().unwrap().await }, if counter_db_handle.is_some() => { + exit_on_join("Counter DB", res); } - (false, false, false) => { - // None of the optional actors enabled - matches!( - ( - &data_netlink_result, - &control_netlink_result, - &ipfix_result, - &swss_result - ), - (Ok(()), Ok(()), Ok(()), Ok(())) - ) - } - }; - - if all_successful { - let status_msg = match (reporter_result.is_some(), counter_db_result.is_some(), otel_result.is_some()) { - (true, true, true) => "All actors completed successfully", - (true, true, false) => "All actors completed successfully (OpenTelemetry disabled)", - (true, false, true) => "All actors completed successfully (counter DB disabled)", - (false, true, true) => "All actors completed successfully (stats reporting disabled)", - (true, false, false) => "All actors completed successfully (counter DB and OpenTelemetry disabled)", - (false, true, false) => "All actors completed successfully (stats reporting and OpenTelemetry disabled)", - (false, false, true) => "All actors completed successfully (stats reporting and counter DB disabled)", - (false, false, false) => { - "All actors completed successfully (stats reporting, counter DB, and OpenTelemetry disabled)" - } - }; - info!("{}", status_msg); - Ok(()) - } else { - // Check which actor failed - if let Err(e) = data_netlink_result { - error!("Data netlink actor failed: {:?}", e); - Err(e.into()) - } else if let Err(e) = control_netlink_result { - error!("Control netlink actor failed: {:?}", e); - Err(e.into()) - } else if let Err(e) = ipfix_result { - error!("IPFIX actor failed: {:?}", e); - Err(e.into()) - } else if let Err(e) = swss_result { - error!("SWSS actor failed: {:?}", e); - Err(e.into()) - } else if let Some(Err(e)) = reporter_result { - error!("Stats reporter actor failed: {:?}", e); - Err(e.into()) - } else if let Some(Err(e)) = counter_db_result { - error!("Counter DB actor failed: {:?}", e); - Err(e.into()) - } else if let Some(Err(e)) = otel_result { - error!("OpenTelemetry actor failed: {:?}", e); - std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED); - } else { - error!("Unknown actor failure"); - Err("Unknown actor failure".into()) + res = async { otel_handle.as_mut().unwrap().await }, if otel_handle.is_some() => { + exit_on_otel_join(res); } } }