Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch span processor dedicated thread test #43

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = sdktrace::TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-jaeger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result<opentelemetry_sdk::trace::TracerProvider, Tr
.build()?;

Ok(TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder()
.with_service_name("tracing-jaeger")
Expand Down
5 changes: 1 addition & 4 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;

Ok(TracerProvider::builder()
// TODO: Enable BatchExporter after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
.with_simple_exporter(exporter)
.with_batch_exporter(exporter)
.with_resource(RESOURCE.clone())
.build())
}
Expand All @@ -73,7 +71,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric

// #[tokio::main]
// TODO: Re-enable tokio::main, if needed, after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let logger_provider = init_logs()?;

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;
Ok(sdktrace::TracerProvider::builder()
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.build())
}

Expand Down
46 changes: 45 additions & 1 deletion opentelemetry-otlp/tests/integration_test/tests/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
let exporter = exporter_builder.build()?;

Ok(opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder_empty()
.with_service_name("basic-otlp-tracing-example")
Expand Down Expand Up @@ -141,6 +141,50 @@ pub fn test_serde() -> Result<()> {
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn span_batch_non_tokio_main() -> Result<()> {
// Initialize the tracer provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
// created by BatchSpanProcessor.

use anyhow::Ok;
let rt = tokio::runtime::Runtime::new()?;
let tracer_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
let _ = test_utils::start_collector_container().await;
init_tracer_provider()
})?;

let tracer = global::tracer("ex.com/basic");

tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![KeyValue::new("bogons", 100)],
);
span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes"));

tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(KeyValue::new(LEMONS_KEY, "five"));

span.add_event("Sub span event", vec![]);
});
});

tracer_provider.shutdown()?;

// Give it a second to flush
std::thread::sleep(Duration::from_secs(2));

// Validate results
assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ async fn smoke_tracer() {
.with_metadata(metadata)
.build()
.expect("NON gzip-tonic SpanExporter failed to build"),
opentelemetry_sdk::runtime::Tokio,
)
.build();

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ internal-logs = ["tracing"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]


[[bench]]
name = "context"
Expand Down
23 changes: 10 additions & 13 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
Expand Down Expand Up @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
Expand All @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) {
}));
}
futures_util::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
let _ = Arc::<BatchSpanProcessor>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
///# async fn main() {
/// let exporter = InMemorySpanExporterBuilder::new().build();
/// let provider = TracerProvider::builder()
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build())
/// .build();
///
/// global::set_tracer_provider(provider.clone());
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ mod sampler;
mod span;
mod span_limit;
mod span_processor;
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
/// Experimental feature to use async runtime with batch span processor.
pub mod span_processor_with_async_runtime;
mod tracer;

pub use config::{config, Config};
Expand All @@ -30,11 +33,13 @@ pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
SimpleSpanProcessor, SpanProcessor,
};

pub use tracer::Tracer;

#[cfg(feature = "jaeger_remote_sampler")]
pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder};

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
#[cfg(test)]
mod runtime_tests;

Expand Down
9 changes: 2 additions & 7 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
/// provider.shutdown();
/// }
/// ```
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
};
Expand Down Expand Up @@ -296,12 +295,8 @@ impl Builder {
}

/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
) -> Self {
let batch = BatchSpanProcessor::builder(exporter, runtime).build();
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
let batch = BatchSpanProcessor::builder(exporter).build();
self.with_span_processor(batch)
}

Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ fn build_batch_tracer_provider<R: RuntimeChannel>(
runtime: R,
) -> crate::trace::TracerProvider {
use crate::trace::TracerProvider;
let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
exporter, runtime,
)
.build();
TracerProvider::builder()
.with_batch_exporter(exporter, runtime)
.with_span_processor(processor)
.build()
}

Expand Down
Loading
Loading