diff --git a/crates/countersyncd/src/actor/otel.rs b/crates/countersyncd/src/actor/otel.rs index 4db909ff..220cec0c 100644 --- a/crates/countersyncd/src/actor/otel.rs +++ b/crates/countersyncd/src/actor/otel.rs @@ -1,20 +1,43 @@ -use std::time::Duration; -use std::pin::Pin; -use tokio::{sync::mpsc::Receiver, sync::oneshot, select}; -use tokio::time::{sleep_until, Instant as TokioInstant, Sleep}; +use std::{ + fmt::{Display, Formatter}, + pin::Pin, + time::Duration, +}; +use tokio::{ + select, + sync::{mpsc::Receiver, oneshot}, + time::{sleep_until, Instant as TokioInstant, Sleep}, +}; +use log::{debug, error, info, warn}; +use tonic::transport::{Channel, Endpoint}; +use opentelemetry::ExportError; use opentelemetry_proto::tonic::{ - common::v1::{KeyValue as ProtoKeyValue, AnyValue, any_value::Value, InstrumentationScope}, - metrics::v1::{Metric, Gauge as ProtoGauge, ResourceMetrics, ScopeMetrics}, + collector::metrics::v1::{ + metrics_service_client::MetricsServiceClient, + ExportMetricsServiceRequest, + }, + common::v1::{ + any_value::Value, + AnyValue, + InstrumentationScope, + KeyValue as ProtoKeyValue, + }, + metrics::v1::{ + Gauge as ProtoGauge, + Metric, + ResourceMetrics, + ScopeMetrics, + }, resource::v1::Resource as ProtoResource, }; use crate::message::{ - saistats::SAIStatsMessage, otel::OtelMetrics, + saistats::SAIStatsMessage, }; -use log::{info, error, debug}; -use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_client::MetricsServiceClient; -use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use tonic::transport::Endpoint; + +const INITIAL_BACKOFF_DELAY_SECS: u64 = 1; +const MAX_BACKOFF_DELAY_SECS: u64 = 10; +const MAX_EXPORT_RETRIES: u64 = 30; /// Configuration for the OtelActor #[derive(Debug, Clone)] @@ -40,12 +63,29 @@ impl Default for OtelActorConfig { } } +#[derive(Debug)] +pub struct OtelActorExportError(String); + +impl std::error::Error for OtelActorExportError {} + +impl ExportError for OtelActorExportError { + fn exporter_name(&self) -> &'static str { + "Otel client exporter" + } +} + +impl Display for OtelActorExportError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + /// Actor that receives SAI statistics and exports to OpenTelemetry pub struct OtelActor { stats_receiver: Receiver, config: OtelActorConfig, shutdown_notifier: Option>, - client: MetricsServiceClient, + client: Option>, // Pre-allocated reusable structures resource: ProtoResource, @@ -61,6 +101,12 @@ pub struct OtelActor { exports_performed: u64, export_failures: u64, console_reports: u64, + + // Reconnecting tracking + consecutive_failures: u64, + + // Shutdown flag + should_shutdown: bool, } impl OtelActor { @@ -70,8 +116,7 @@ impl OtelActor { config: OtelActorConfig, shutdown_notifier: oneshot::Sender<()> ) -> Result> { - let endpoint = config.collector_endpoint.parse::()?; - let client = MetricsServiceClient::connect(endpoint).await?; + let client = None; // Pre-create reusable resource let resource = ProtoResource { @@ -114,11 +159,13 @@ impl OtelActor { exports_performed: 0, export_failures: 0, console_reports: 0, + consecutive_failures: 0, + should_shutdown: false, }) } /// Main run loop - pub async fn run(mut self) { + pub async fn run(mut self) -> Result<(), Box> { info!("OtelActor started"); let mut flush_timer = Box::pin(sleep_until(self.flush_deadline)); @@ -128,29 +175,36 @@ impl OtelActor { stats_msg = self.stats_receiver.recv() => { match stats_msg { Some(stats) => { - self.handle_stats_message(stats).await; + self.handle_stats_message(stats).await?; self.reset_flush_timer(&mut flush_timer); } - None => { + _none => { info!("Stats receiver channel closed, shutting down OtelActor"); break; } } } _ = &mut flush_timer => { - self.flush_buffer().await; + self.flush_buffer().await?; self.reset_flush_timer(&mut flush_timer); } } + + // Check for shutdown flag + if self.should_shutdown { + info!("Shutdown flag set, exiting Otel run loop"); + break; + } } // Flush any remaining buffered metrics before shutdown - self.flush_buffer().await; + self.flush_buffer().await?; self.shutdown().await; + Ok(()) } /// Handle incoming SAI statistics message - async fn handle_stats_message(&mut self, stats: SAIStatsMessage) { + async fn handle_stats_message(&mut self, stats: SAIStatsMessage) -> Result<(), Box>{ self.messages_received += 1; debug!("Received SAI stats with {} entries, observation_time: {}", @@ -176,9 +230,11 @@ impl OtelActor { // Force flush when counter threshold is reached if self.buffered_counters >= self.config.max_counters_per_export { - self.flush_buffer().await; + self.flush_buffer().await?; self.flush_deadline = TokioInstant::now() + self.config.flush_timeout; } + + Ok(()) } async fn print_otel_metrics(&mut self, otel_metrics: &OtelMetrics) { @@ -214,16 +270,75 @@ impl OtelActor { } } - debug!("Raw Gauge: {:#?}", gauge); + info!("Raw Gauge: {:#?}", gauge); } } } + // Exponential backoff + async fn backoff(&self, attempt: u64) { + let delay_secs = std::cmp::min(INITIAL_BACKOFF_DELAY_SECS * 2u64.pow(attempt as u32 - 1), MAX_BACKOFF_DELAY_SECS); + tokio::time::sleep(Duration::from_secs(delay_secs)).await; + } + + // Get or create the Otel MetricsServiceClient + fn get_client(&mut self) -> Option<&mut MetricsServiceClient> { + if self.client.is_none() { + let endpoint = match self.config.collector_endpoint.parse::() { + Ok(e) => e, + Err(e) => { + warn!("Invalid Otel endpoint: {}", e); + return None; + } + }; + + let channel = endpoint.connect_lazy(); + self.client = Some(MetricsServiceClient::new(channel)); + } + + self.client.as_mut() + } + + async fn send_request( + &mut self, + request: ExportMetricsServiceRequest, + ) -> Result<(), Box> { + for attempt in 1..=MAX_EXPORT_RETRIES { + // Ensure we have a client + let client = match self.get_client() { + Some(c) => c, // Use existing or newly created client + _none => { // Failed to create client + self.client = None; + self.backoff(attempt).await; // Wait before retrying + continue; + } + }; + + // Attempt to send the request + match client.export(request.clone()).await { + Ok(_) => { // Successful export + self.exports_performed += 1; + self.consecutive_failures = 0; + return Ok(()); + } + Err(e) => { + warn!("Export attempt {} failed: {}", attempt, e); + self.client = None; // Drop broken client + self.consecutive_failures += 1; + self.backoff(attempt).await; // Wait before retrying + } + } + } + + // All retries exhausted + Err(Box::new(OtelActorExportError("Max export retries exceeded".to_string()))) + } + // Export buffered metrics to OpenTelemetry collector - async fn flush_buffer(&mut self) { + async fn flush_buffer(&mut self) -> Result<(), Box> { if self.buffer.is_empty() { - return; + return Ok(()); } let mut proto_metrics: Vec = Vec::new(); @@ -251,7 +366,7 @@ impl OtelActor { if proto_metrics.is_empty() { self.buffer.clear(); self.buffered_counters = 0; - return; + return Ok(()); } let resource_metrics = ResourceMetrics { @@ -268,19 +383,21 @@ impl OtelActor { resource_metrics: vec![resource_metrics], }; - match self.client.export(request).await { - Ok(_) => { - self.exports_performed += 1; - debug!("Exported buffered metrics to collector"); - } - Err(e) => { - self.export_failures += 1; - error!("Failed to export metrics: {}", e); - } + // Send the export request + let result = self.send_request(request).await; + + if let Err(e) = &result { + self.export_failures += 1; + error!( + "Failed to export buffered metrics (consecutive failures {}): {:?}", + self.consecutive_failures, e + ); } self.buffer.clear(); self.buffered_counters = 0; + + result } fn reset_flush_timer(&self, timer: &mut Pin>) { diff --git a/crates/countersyncd/src/exit_codes.rs b/crates/countersyncd/src/exit_codes.rs new file mode 100644 index 00000000..55a0c032 --- /dev/null +++ b/crates/countersyncd/src/exit_codes.rs @@ -0,0 +1,11 @@ +//! Process exit codes for countersyncd + +/// Successful completion +pub const EXIT_SUCCESS: i32 = 0; + +/// General failure +pub const EXIT_FAILURE: i32 = 1; + +/// Actor failure +// OpenTelemetry export retries exhausted +pub const EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED: i32 = 125; diff --git a/crates/countersyncd/src/lib.rs b/crates/countersyncd/src/lib.rs index d9fc7b8f..3090dffc 100644 --- a/crates/countersyncd/src/lib.rs +++ b/crates/countersyncd/src/lib.rs @@ -2,3 +2,4 @@ pub mod actor; pub mod message; pub mod sai; +pub mod exit_codes; diff --git a/crates/countersyncd/src/main.rs b/crates/countersyncd/src/main.rs index 53c23501..b4983b16 100644 --- a/crates/countersyncd/src/main.rs +++ b/crates/countersyncd/src/main.rs @@ -20,6 +20,9 @@ use crate::actor::{ otel::{OtelActor, OtelActorConfig}, }; +// Internal exit codes +use countersyncd::exit_codes::EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED; + /// Initialize logging based on command line arguments fn init_logging(log_level: &str, log_format: &str) { use env_logger::{Builder, Target, WriteStyle}; @@ -402,8 +405,9 @@ async fn main() -> Result<(), Box> { let otel_handle = if let Some(otel_actor) = otel_actor { Some(spawn(async move { info!("OpenTelemetry actor started"); - OtelActor::run(otel_actor).await; + let result = OtelActor::run(otel_actor).await; info!("OpenTelemetry actor terminated"); + result })) } else { info!("OpenTelemetry export disabled - not starting OpenTelemetry actor"); @@ -448,7 +452,7 @@ async fn main() -> Result<(), Box> { counter_db_result.as_ref().unwrap(), otel_result.as_ref().unwrap() ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) + (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) ) } (true, true, false) => { @@ -476,7 +480,7 @@ async fn main() -> Result<(), Box> { reporter_result.as_ref().unwrap(), otel_result.as_ref().unwrap() ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) + (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) ) } (false, true, true) => { @@ -490,7 +494,7 @@ async fn main() -> Result<(), Box> { counter_db_result.as_ref().unwrap(), otel_result.as_ref().unwrap() ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) + (Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) ) } (true, false, false) => { @@ -529,7 +533,7 @@ async fn main() -> Result<(), Box> { &swss_result, otel_result.as_ref().unwrap() ), - (Ok(()), Ok(()), Ok(()), Ok(()), Ok(())) + (Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(()))) ) } (false, false, false) => { @@ -583,7 +587,7 @@ async fn main() -> Result<(), Box> { Err(e.into()) } else if let Some(Err(e)) = otel_result { error!("OpenTelemetry actor failed: {:?}", e); - Err(e.into()) + std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED); } else { error!("Unknown actor failure"); Err("Unknown actor failure".into())