Skip to content

Commit

Permalink
update: otel refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Nov 4, 2024
1 parent 85aacf6 commit 6ad6261
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- OTEL config refactor
- wait for transaction logic in ethereum settlement client
- y_0 point evaluation in build kzg proof for ethereum settlement
- fixed metrics name, signoz dashboard.
Expand Down
65 changes: 39 additions & 26 deletions crates/orchestrator/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::str::FromStr;
use std::time::Duration;

use lazy_static::lazy_static;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
Expand All @@ -16,15 +15,16 @@ use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt as _;
use tracing_subscriber::util::SubscriberInitExt as _;
use tracing_subscriber::EnvFilter;
use utils::env_utils::{get_env_var_optional, get_env_var_or_default, get_env_var_or_panic};
use utils::env_utils::{get_env_var_optional, get_env_var_or_default};

lazy_static! {
#[derive(Debug)]
pub static ref OTEL_SERVICE_NAME: String = get_env_var_or_panic("OTEL_SERVICE_NAME");
pub struct OTELConfig {
endpoint: String,
service_name: String,
}

pub fn setup_analytics() -> Option<SdkMeterProvider> {
let otel_endpoint = get_env_var_optional("OTEL_COLLECTOR_ENDPOINT").expect("Failed to get OTEL_COLLECTOR_ENDPOINT");
let otel_config = get_otel_config();

let log_level = get_env_var_or_default("RUST_LOG", "INFO");
let level = Level::from_str(&log_level).unwrap_or(Level::INFO);

Expand All @@ -33,15 +33,15 @@ pub fn setup_analytics() -> Option<SdkMeterProvider> {
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env());

if let Some(otel_endpoint) = otel_endpoint {
let meter_provider = init_metric_provider(&otel_endpoint);
let tracer = init_tracer_provider(&otel_endpoint);
if let Some(otel_config) = otel_config {
let meter_provider = init_metric_provider(&otel_config);
let tracer = init_tracer_provider(&otel_config);

// Opentelemetry will not provide a global API to manage the logger
// provider. Application users must manage the lifecycle of the logger
// provider on their own. Dropping logger providers will disable log
// emitting.
let logger_provider = init_logs(&otel_endpoint).unwrap();
let logger_provider = init_logs(&otel_config).unwrap();
// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
let layer = OpenTelemetryTracingBridge::new(&logger_provider);

Expand All @@ -53,11 +53,24 @@ pub fn setup_analytics() -> Option<SdkMeterProvider> {
}
}

fn get_otel_config() -> Option<OTELConfig> {
let otel_endpoint = get_env_var_optional("OTEL_COLLECTOR_ENDPOINT").expect("Failed to get OTEL_COLLECTOR_ENDPOINT");
let otel_service_name = get_env_var_optional("OTEL_SERVICE_NAME").expect("Failed to get OTEL_SERVICE_NAME");

match (otel_endpoint, otel_service_name) {
(Some(endpoint), Some(service_name)) => Some(OTELConfig { endpoint, service_name }),
_ => {
tracing::warn!("OTEL_COLLECTOR_ENDPOINT or OTEL_SERVICE_NAME is not set");
None
}
}
}

pub fn shutdown_analytics(meter_provider: Option<SdkMeterProvider>) {
let otel_endpoint = get_env_var_or_panic("OTEL_COLLECTOR_ENDPOINT");
let otel_config = get_otel_config();

// guard clause if otel is disabled
if otel_endpoint.is_empty() {
if otel_config.is_none() {
return;
}

Expand All @@ -67,29 +80,29 @@ pub fn shutdown_analytics(meter_provider: Option<SdkMeterProvider>) {
}
}

pub fn init_tracer_provider(otel_endpoint: &str) -> Tracer {
pub fn init_tracer_provider(otel_config: &OTELConfig) -> Tracer {
let batch_config = BatchConfigBuilder::default()
// Increasing the queue size and batch size, only increase in queue size delays full channel error.
.build();

let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic().with_endpoint(otel_endpoint.to_string()))
.with_exporter(opentelemetry_otlp::new_exporter().tonic().with_endpoint(otel_config.endpoint.to_string()))
.with_trace_config(Config::default().with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
format!("{}{}", *OTEL_SERVICE_NAME, "_trace_service"),
format!("{}{}", otel_config.service_name, "_trace_service"),
)])))
.with_batch_config(batch_config)
.install_batch(runtime::Tokio)
.expect("Failed to install tracer provider");

global::set_tracer_provider(provider.clone());

provider.tracer(format!("{}{}", *OTEL_SERVICE_NAME, "_subscriber"))
provider.tracer(format!("{}{}", otel_config.service_name, "_subscriber"))
}

pub fn init_metric_provider(otel_endpoint: &str) -> SdkMeterProvider {
let export_config = ExportConfig { endpoint: otel_endpoint.to_string(), ..ExportConfig::default() };
pub fn init_metric_provider(otel_config: &OTELConfig) -> SdkMeterProvider {
let export_config = ExportConfig { endpoint: otel_config.endpoint.to_string(), ..ExportConfig::default() };

// Creates and builds the OTLP exporter
let exporter = opentelemetry_otlp::new_exporter().tonic().with_export_config(export_config).build_metrics_exporter(
Expand All @@ -109,21 +122,21 @@ pub fn init_metric_provider(otel_endpoint: &str) -> SdkMeterProvider {
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
format!("{}{}", *OTEL_SERVICE_NAME, "_meter_service"),
format!("{}{}", otel_config.service_name, "_meter_service"),
)]))
.build();
global::set_meter_provider(provider.clone());
provider
}

fn init_logs(otel_endpoint: &str) -> Result<LoggerProvider, opentelemetry::logs::LogError> {
fn init_logs(otel_config: &OTELConfig) -> Result<LoggerProvider, opentelemetry::logs::LogError> {
opentelemetry_otlp::new_pipeline()
.logging()
.with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
format!("{}{}", *OTEL_SERVICE_NAME, "_logs_service"),
format!("{}{}", otel_config.service_name, "_logs_service"),
)]))
.with_exporter(opentelemetry_otlp::new_exporter().tonic().with_endpoint(otel_endpoint.to_string()))
.with_exporter(opentelemetry_otlp::new_exporter().tonic().with_endpoint(otel_config.endpoint.to_string()))
.install_batch(runtime::Tokio)
}

Expand All @@ -145,11 +158,11 @@ mod tests {
env::set_var("OTEL_COLLECTOR_ENDPOINT", "http://localhost:4317");
env::set_var("OTEL_SERVICE_NAME", "test_service");

let otel_endpoint = get_env_var_or_panic("OTEL_COLLECTOR_ENDPOINT");
let otel_config = get_otel_config().unwrap();

// Call the function and check if it doesn't panic
let result = std::panic::catch_unwind(|| {
let _provider = init_metric_provider(&otel_endpoint);
let _provider = init_metric_provider(&otel_config);
});

// Check if the global meter provider is set
Expand All @@ -163,11 +176,11 @@ mod tests {
// Set up necessary environment variables
env::set_var("OTEL_COLLECTOR_ENDPOINT", "http://localhost:4317");
env::set_var("OTEL_SERVICE_NAME", "test_service");
let otel_endpoint = get_env_var_or_panic("OTEL_COLLECTOR_ENDPOINT");
let otel_config = get_otel_config().unwrap();

// Call the function and check if it doesn't panic
let result = std::panic::catch_unwind(|| {
let _tracer = init_tracer_provider(otel_endpoint.as_str());
let _tracer = init_tracer_provider(&otel_config);
});

assert!(result.is_ok(), "init_tracer_provider() panicked");
Expand Down

0 comments on commit 6ad6261

Please sign in to comment.