Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 151 additions & 34 deletions crates/countersyncd/src/actor/otel.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,43 @@
use std::time::Duration;
use std::pin::Pin;
use tokio::{sync::mpsc::Receiver, sync::oneshot, select};
use tokio::time::{sleep_until, Instant as TokioInstant, Sleep};
use std::{
fmt::{Display, Formatter},
pin::Pin,
time::Duration,
};
use tokio::{
select,
sync::{mpsc::Receiver, oneshot},
time::{sleep_until, Instant as TokioInstant, Sleep},
};
use log::{debug, error, info, warn};
use tonic::transport::{Channel, Endpoint};
use opentelemetry::ExportError;
use opentelemetry_proto::tonic::{
common::v1::{KeyValue as ProtoKeyValue, AnyValue, any_value::Value, InstrumentationScope},
metrics::v1::{Metric, Gauge as ProtoGauge, ResourceMetrics, ScopeMetrics},
collector::metrics::v1::{
metrics_service_client::MetricsServiceClient,
ExportMetricsServiceRequest,
},
common::v1::{
any_value::Value,
AnyValue,
InstrumentationScope,
KeyValue as ProtoKeyValue,
},
metrics::v1::{
Gauge as ProtoGauge,
Metric,
ResourceMetrics,
ScopeMetrics,
},
resource::v1::Resource as ProtoResource,
};
use crate::message::{
saistats::SAIStatsMessage,
otel::OtelMetrics,
saistats::SAIStatsMessage,
};
use log::{info, error, debug};
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_client::MetricsServiceClient;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use tonic::transport::Endpoint;

const INITIAL_BACKOFF_DELAY_SECS: u64 = 1;
const MAX_BACKOFF_DELAY_SECS: u64 = 10;
const MAX_EXPORT_RETRIES: u64 = 30;

/// Configuration for the OtelActor
#[derive(Debug, Clone)]
Expand All @@ -40,12 +63,29 @@ impl Default for OtelActorConfig {
}
}

#[derive(Debug)]
pub struct OtelActorExportError(String);

impl std::error::Error for OtelActorExportError {}

impl ExportError for OtelActorExportError {
fn exporter_name(&self) -> &'static str {
"Otel client exporter"
}
}

impl Display for OtelActorExportError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// Actor that receives SAI statistics and exports to OpenTelemetry
pub struct OtelActor {
stats_receiver: Receiver<SAIStatsMessage>,
config: OtelActorConfig,
shutdown_notifier: Option<oneshot::Sender<()>>,
client: MetricsServiceClient<tonic::transport::Channel>,
client: Option<MetricsServiceClient<Channel>>,

// Pre-allocated reusable structures
resource: ProtoResource,
Expand All @@ -61,6 +101,12 @@ pub struct OtelActor {
exports_performed: u64,
export_failures: u64,
console_reports: u64,

// Reconnecting tracking
consecutive_failures: u64,

// Shutdown flag
should_shutdown: bool,
}

impl OtelActor {
Expand All @@ -70,8 +116,7 @@ impl OtelActor {
config: OtelActorConfig,
shutdown_notifier: oneshot::Sender<()>
) -> Result<OtelActor, Box<dyn std::error::Error>> {
let endpoint = config.collector_endpoint.parse::<Endpoint>()?;
let client = MetricsServiceClient::connect(endpoint).await?;
let client = None;

// Pre-create reusable resource
let resource = ProtoResource {
Expand Down Expand Up @@ -114,11 +159,13 @@ impl OtelActor {
exports_performed: 0,
export_failures: 0,
console_reports: 0,
consecutive_failures: 0,
should_shutdown: false,
})
}

/// Main run loop
pub async fn run(mut self) {
pub async fn run(mut self) -> Result<(), Box<dyn ExportError>> {
info!("OtelActor started");

let mut flush_timer = Box::pin(sleep_until(self.flush_deadline));
Expand All @@ -128,29 +175,36 @@ impl OtelActor {
stats_msg = self.stats_receiver.recv() => {
match stats_msg {
Some(stats) => {
self.handle_stats_message(stats).await;
self.handle_stats_message(stats).await?;
self.reset_flush_timer(&mut flush_timer);
}
None => {
_none => {
info!("Stats receiver channel closed, shutting down OtelActor");
break;
}
}
}
_ = &mut flush_timer => {
self.flush_buffer().await;
self.flush_buffer().await?;
self.reset_flush_timer(&mut flush_timer);
}
}

// Check for shutdown flag
if self.should_shutdown {
info!("Shutdown flag set, exiting Otel run loop");
break;
}
}

// Flush any remaining buffered metrics before shutdown
self.flush_buffer().await;
self.flush_buffer().await?;
self.shutdown().await;
Ok(())
}

/// Handle incoming SAI statistics message
async fn handle_stats_message(&mut self, stats: SAIStatsMessage) {
async fn handle_stats_message(&mut self, stats: SAIStatsMessage) -> Result<(), Box<dyn ExportError>>{
self.messages_received += 1;

debug!("Received SAI stats with {} entries, observation_time: {}",
Expand All @@ -176,9 +230,11 @@ impl OtelActor {

// Force flush when counter threshold is reached
if self.buffered_counters >= self.config.max_counters_per_export {
self.flush_buffer().await;
self.flush_buffer().await?;
self.flush_deadline = TokioInstant::now() + self.config.flush_timeout;
}

Ok(())
}

async fn print_otel_metrics(&mut self, otel_metrics: &OtelMetrics) {
Expand Down Expand Up @@ -214,16 +270,75 @@ impl OtelActor {
}
}

debug!("Raw Gauge: {:#?}", gauge);
info!("Raw Gauge: {:#?}", gauge);
}
}

}

// Exponential backoff
async fn backoff(&self, attempt: u64) {
let delay_secs = std::cmp::min(INITIAL_BACKOFF_DELAY_SECS * 2u64.pow(attempt as u32 - 1), MAX_BACKOFF_DELAY_SECS);
tokio::time::sleep(Duration::from_secs(delay_secs)).await;
}

// Get or create the Otel MetricsServiceClient
fn get_client(&mut self) -> Option<&mut MetricsServiceClient<Channel>> {
if self.client.is_none() {
let endpoint = match self.config.collector_endpoint.parse::<Endpoint>() {
Ok(e) => e,
Err(e) => {
warn!("Invalid Otel endpoint: {}", e);
return None;
}
};

let channel = endpoint.connect_lazy();
self.client = Some(MetricsServiceClient::new(channel));
}

self.client.as_mut()
}

async fn send_request(
&mut self,
request: ExportMetricsServiceRequest,
) -> Result<(), Box<dyn ExportError>> {
for attempt in 1..=MAX_EXPORT_RETRIES {
// Ensure we have a client
let client = match self.get_client() {
Some(c) => c, // Use existing or newly created client
_none => { // Failed to create client
self.client = None;
self.backoff(attempt).await; // Wait before retrying
continue;
}
};

// Attempt to send the request
match client.export(request.clone()).await {
Ok(_) => { // Successful export
self.exports_performed += 1;
self.consecutive_failures = 0;
return Ok(());
}
Err(e) => {
warn!("Export attempt {} failed: {}", attempt, e);
self.client = None; // Drop broken client
self.consecutive_failures += 1;
self.backoff(attempt).await; // Wait before retrying
}
}
}

// All retries exhausted
Err(Box::new(OtelActorExportError("Max export retries exceeded".to_string())))
}

// Export buffered metrics to OpenTelemetry collector
async fn flush_buffer(&mut self) {
async fn flush_buffer(&mut self) -> Result<(), Box<dyn ExportError>> {
if self.buffer.is_empty() {
return;
return Ok(());
}

let mut proto_metrics: Vec<Metric> = Vec::new();
Expand Down Expand Up @@ -251,7 +366,7 @@ impl OtelActor {
if proto_metrics.is_empty() {
self.buffer.clear();
self.buffered_counters = 0;
return;
return Ok(());
}

let resource_metrics = ResourceMetrics {
Expand All @@ -268,19 +383,21 @@ impl OtelActor {
resource_metrics: vec![resource_metrics],
};

match self.client.export(request).await {
Ok(_) => {
self.exports_performed += 1;
debug!("Exported buffered metrics to collector");
}
Err(e) => {
self.export_failures += 1;
error!("Failed to export metrics: {}", e);
}
// Send the export request
let result = self.send_request(request).await;

if let Err(e) = &result {
self.export_failures += 1;
error!(
"Failed to export buffered metrics (consecutive failures {}): {:?}",
self.consecutive_failures, e
);
}

self.buffer.clear();
self.buffered_counters = 0;

result
}

fn reset_flush_timer(&self, timer: &mut Pin<Box<Sleep>>) {
Expand Down
11 changes: 11 additions & 0 deletions crates/countersyncd/src/exit_codes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Process exit codes for countersyncd

/// Successful completion
pub const EXIT_SUCCESS: i32 = 0;

/// General failure
pub const EXIT_FAILURE: i32 = 1;

/// Actor failure
// OpenTelemetry export retries exhausted
pub const EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED: i32 = 125;
1 change: 1 addition & 0 deletions crates/countersyncd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
pub mod actor;
pub mod message;
pub mod sai;
pub mod exit_codes;
16 changes: 10 additions & 6 deletions crates/countersyncd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use crate::actor::{
otel::{OtelActor, OtelActorConfig},
};

// Internal exit codes
use countersyncd::exit_codes::EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED;

/// Initialize logging based on command line arguments
fn init_logging(log_level: &str, log_format: &str) {
use env_logger::{Builder, Target, WriteStyle};
Expand Down Expand Up @@ -402,8 +405,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let otel_handle = if let Some(otel_actor) = otel_actor {
Some(spawn(async move {
info!("OpenTelemetry actor started");
OtelActor::run(otel_actor).await;
let result = OtelActor::run(otel_actor).await;
info!("OpenTelemetry actor terminated");
result
}))
} else {
info!("OpenTelemetry export disabled - not starting OpenTelemetry actor");
Expand Down Expand Up @@ -448,7 +452,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
counter_db_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
}
(true, true, false) => {
Expand Down Expand Up @@ -476,7 +480,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
reporter_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
}
(false, true, true) => {
Expand All @@ -490,7 +494,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
counter_db_result.as_ref().unwrap(),
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
}
(true, false, false) => {
Expand Down Expand Up @@ -529,7 +533,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
&swss_result,
otel_result.as_ref().unwrap()
),
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(()))
(Ok(()), Ok(()), Ok(()), Ok(()), Ok(Ok(())))
)
}
(false, false, false) => {
Expand Down Expand Up @@ -583,7 +587,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Err(e.into())
} else if let Some(Err(e)) = otel_result {
error!("OpenTelemetry actor failed: {:?}", e);
Err(e.into())
std::process::exit(EXIT_OTEL_EXPORT_RETRIES_EXHAUSTED);
} else {
error!("Unknown actor failure");
Err("Unknown actor failure".into())
Expand Down
Loading