Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add Open Telemetry attributes to grpc spans #698

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ itertools = { workspace = true }
miden-block-prover = { git = "https://github.com/0xPolygonMiden/miden-base.git", branch = "next" }
miden-lib = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-utils = { workspace = true, features = ["testing"] }
miden-objects = { workspace = true }
miden-processor = { workspace = true }
miden-tx = { workspace = true }
Expand Down
4 changes: 0 additions & 4 deletions crates/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ workspace = true
[features]
# Enables dependencies intended for build script generation of version metadata.
vergen = ["dep:vergen", "dep:vergen-gitcl"]
# Enables utility functions for testing traces created by some other crate's stack.
testing = ["dep:tokio"]

[dependencies]
anyhow = { version = "1.0" }
Expand All @@ -38,8 +36,6 @@ tracing-forest = { version = "0.1", optional = true, features = ["chrono"
tracing-opentelemetry = { version = "0.29" }
tracing-subscriber = { workspace = true }

# Optional dependencies enabled by `testing` feature.
tokio = { workspace = true, optional = true }
# Optional dependencies enabled by `vergen` feature.
# This must match the version expected by `vergen-gitcl`.
vergen = { "version" = "9.0", optional = true }
Expand Down
18 changes: 0 additions & 18 deletions crates/utils/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,6 @@ pub fn setup_tracing(otel: OpenTelemetry) -> Result<()> {
tracing::subscriber::set_global_default(subscriber).map_err(Into::into)
}

/// Installs a global tracing subscriber for tests and returns the test exporter's receivers.
///
/// A `TraceContextPropagator` is registered as the global text-map propagator, after which a
/// registry made of the stdout layer and an OpenTelemetry layer fed by the tokio test exporter
/// becomes the global default subscriber. Each layer gets its own `env_or_default_filter()`.
///
/// The returned pair is `(rx_export, rx_shutdown)` as produced by `new_tokio_test_exporter` —
/// presumably exported spans and shutdown notifications respectively; confirm against the
/// exporter's documentation.
///
/// # Errors
///
/// Fails if the global default subscriber cannot be set.
#[cfg(feature = "testing")]
pub fn setup_test_tracing() -> Result<(
    tokio::sync::mpsc::UnboundedReceiver<opentelemetry_sdk::trace::SpanData>,
    tokio::sync::mpsc::UnboundedReceiver<()>,
)> {
    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    let (span_exporter, exported_spans, shutdown_signal) =
        opentelemetry_sdk::testing::trace::new_tokio_test_exporter();
    let telemetry = open_telemetry_layer(span_exporter);

    let registry = Registry::default()
        .with(stdout_layer().with_filter(env_or_default_filter()))
        .with(telemetry.with_filter(env_or_default_filter()));

    tracing::subscriber::set_global_default(registry)
        .map(|()| (exported_spans, shutdown_signal))
        .map_err(Into::into)
}

fn open_telemetry_layer<S>(
exporter: impl SpanExporter + 'static,
) -> Box<dyn tracing_subscriber::Layer<S> + Send + Sync + 'static>
Expand Down
117 changes: 94 additions & 23 deletions crates/utils/src/tracing/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
use super::OpenTelemetrySpanExt;
/// Builds an info-level [`tracing::Span`] describing a single RPC method.
///
/// The span is named `<service>/<method>` and records the `rpc.service` and `rpc.method`
/// fields. Both arguments are passed through `concat!`, so they have to be literals.
macro_rules! rpc_span {
    ($svc:expr, $rpc:expr) => {
        tracing::info_span!(concat!($svc, "/", $rpc), rpc.service = $svc, rpc.method = $rpc)
    };
}

/// A [`trace_fn`](tonic::transport::server::Server) implementation for the block producer which
/// adds open-telemetry information to the span.
Expand All @@ -8,14 +17,12 @@ use super::OpenTelemetrySpanExt;
/// to the client's origin trace.
pub fn block_producer_trace_fn<T>(request: &http::Request<T>) -> tracing::Span {
let span = if let Some("SubmitProvenTransaction") = request.uri().path().rsplit('/').next() {
tracing::info_span!("block-producer.rpc/SubmitProvenTransaction")
rpc_span!("block-producer.rpc", "SubmitProvenTransaction")
} else {
tracing::info_span!("block-producer.rpc/Unknown")
rpc_span!("block-producer.rpc", "Unknown")
};

span.set_parent(request);
span.set_http_attributes(request);
span
add_otel_span_attributes(span, request)
}

/// A [`trace_fn`](tonic::transport::server::Server) implementation for the store which adds
Expand All @@ -26,25 +33,68 @@ pub fn block_producer_trace_fn<T>(request: &http::Request<T>) -> tracing::Span {
/// client's origin trace.
pub fn store_trace_fn<T>(request: &http::Request<T>) -> tracing::Span {
let span = match request.uri().path().rsplit('/').next() {
Some("ApplyBlock") => tracing::info_span!("store.rpc/ApplyBlock"),
Some("CheckNullifiers") => tracing::info_span!("store.rpc/CheckNullifiers"),
Some("CheckNullifiersByPrefix") => tracing::info_span!("store.rpc/CheckNullifiersByPrefix"),
Some("GetAccountDetails") => tracing::info_span!("store.rpc/GetAccountDetails"),
Some("GetAccountProofs") => tracing::info_span!("store.rpc/GetAccountProofs"),
Some("GetAccountStateDelta") => tracing::info_span!("store.rpc/GetAccountStateDelta"),
Some("GetBlockByNumber") => tracing::info_span!("store.rpc/GetBlockByNumber"),
Some("GetBlockHeaderByNumber") => tracing::info_span!("store.rpc/GetBlockHeaderByNumber"),
Some("GetBlockInputs") => tracing::info_span!("store.rpc/GetBlockInputs"),
Some("GetBatchInputs") => tracing::info_span!("store.rpc/GetBatchInputs"),
Some("GetNotesById") => tracing::info_span!("store.rpc/GetNotesById"),
Some("GetTransactionInputs") => tracing::info_span!("store.rpc/GetTransactionInputs"),
Some("SyncNotes") => tracing::info_span!("store.rpc/SyncNotes"),
Some("SyncState") => tracing::info_span!("store.rpc/SyncState"),
_ => tracing::info_span!("store.rpc/Unknown"),
Some("ApplyBlock") => rpc_span!("store.rpc", "ApplyBlock"),
Some("CheckNullifiers") => rpc_span!("store.rpc", "CheckNullifiers"),
Some("CheckNullifiersByPrefix") => rpc_span!("store.rpc", "CheckNullifiersByPrefix"),
Some("GetAccountDetails") => rpc_span!("store.rpc", "GetAccountDetails"),
Some("GetAccountProofs") => rpc_span!("store.rpc", "GetAccountProofs"),
Some("GetAccountStateDelta") => rpc_span!("store.rpc", "GetAccountStateDelta"),
Some("GetBlockByNumber") => rpc_span!("store.rpc", "GetBlockByNumber"),
Some("GetBlockHeaderByNumber") => rpc_span!("store.rpc", "GetBlockHeaderByNumber"),
Some("GetBlockInputs") => rpc_span!("store.rpc", "GetBlockInputs"),
Some("GetBatchInputs") => rpc_span!("store.rpc", "GetBatchInputs"),
Some("GetNotesById") => rpc_span!("store.rpc", "GetNotesById"),
Some("GetTransactionInputs") => rpc_span!("store.rpc", "GetTransactionInputs"),
Some("SyncNotes") => rpc_span!("store.rpc", "SyncNotes"),
Some("SyncState") => rpc_span!("store.rpc", "SyncState"),
_ => rpc_span!("store.rpc", "Unknown"),
};

span.set_parent(request);
span.set_http_attributes(request);
add_otel_span_attributes(span, request)
}

/// Adds remote tracing context to the span.
///
/// Could be expanded in the future by adding in more open-telemetry properties.
fn add_otel_span_attributes<T>(span: tracing::Span, request: &http::Request<T>) -> tracing::Span {
use super::OpenTelemetrySpanExt;
// Pull the open-telemetry parent context using the HTTP extractor. We could make a more
// generic gRPC extractor by utilising the gRPC metadata. However that
// (a) requires cloning headers,
// (b) we would have to write this ourselves, and
// (c) gRPC metadata is transferred using HTTP headers in any case.
let otel_ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&MetadataExtractor(&tonic::metadata::MetadataMap::from_headers(
request.headers().clone(),
)))
});
tracing_opentelemetry::OpenTelemetrySpanExt::set_parent(&span, otel_ctx);

// Set HTTP attributes.
// See https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#server-attributes.
span.set_attribute("rpc.system", "grpc");
if let Some(host) = request.uri().host() {
span.set_attribute("server.address", host);
}
if let Some(host_port) = request.uri().port() {
span.set_attribute("server.port", host_port.as_str());
}
let remote_addr = request
.extensions()
.get::<tonic::transport::server::TcpConnectInfo>()
.and_then(tonic::transport::server::TcpConnectInfo::remote_addr);
if let Some(addr) = remote_addr {
span.set_attribute("client.address", addr.ip());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the one that the spec says may be the DNS name if no DNS lookup is required.

Client address - domain name if available without reverse DNS lookup; otherwise, IP address or Unix domain socket name.

I'm wondering if we shouldn't use the text_map_propagator to cheat the system a bit, and on the client side inject block-producer and rpc as an additional property.

We could then use that here, and otherwise default to the IP if it's missing.

Could also be done as a follow-up PR - I imagine you might be very much over this back-and-forth :D

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also be done as a follow-up PR - I imagine you might be very much over this back-and-forth :D

Not at all - happy to iterate on this PR as much as required.

I'm wondering if we shouldn't use the text_map_propagator to cheat the system a bit, and on the client side inject block-producer and rpc as an additional property.
We could then use that here, and otherwise default to the IP if it's missing.

I'm a bit dubious as to the utility of that functionality but happy to add that to this PR if you think it is worthwhile.

Copy link
Contributor

@Mirko-von-Leipzig Mirko-von-Leipzig Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason I'm considering it is because our components can (and currently do) run on the same server instance (and same process). Which means for the inter-component comms, the client and server IP address will just be the same since they're the same for all three components.

So I think it would help having the "caller" identifiable at least. Though maybe instead we should be hosting different api endpoints for this internal communication in any case e.g. store.block-producer/get_block_inputs and store.rpc/get_state, which would do the same job and provide some additional decoupling.

I'm happy to just punt this to a separate issue instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok yea that makes sense. Thanks for elaborating

span.set_attribute("client.port", addr.port());
span.set_attribute("network.peer.address", addr.ip());
span.set_attribute("network.peer.port", addr.port());
span.set_attribute("network.transport", "tcp");
match addr.ip() {
std::net::IpAddr::V4(_) => span.set_attribute("network.type", "ipv4"),
std::net::IpAddr::V6(_) => span.set_attribute("network.type", "ipv6"),
}
}

span
}

Expand All @@ -57,6 +107,7 @@ impl tonic::service::Interceptor for OtelInterceptor {
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let ctx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut MetadataInjector(request.metadata_mut()));
Expand All @@ -66,6 +117,26 @@ impl tonic::service::Interceptor for OtelInterceptor {
}
}

/// Borrowed wrapper around a tonic `MetadataMap` that lets an Open Telemetry propagator read
/// from it.
struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap);

impl opentelemetry::propagation::Extractor for MetadataExtractor<'_> {
    /// Looks up `key` in the wrapped `MetadataMap`.
    ///
    /// Yields `None` when the key is absent or when its value cannot be viewed as a `&str`.
    fn get(&self, key: &str) -> Option<&str> {
        let value = self.0.get(key)?;
        value.to_str().ok()
    }

    /// Lists every key held by the wrapped `MetadataMap`, ASCII and binary alike.
    fn keys(&self) -> Vec<&str> {
        use tonic::metadata::KeyRef;

        let mut names = Vec::new();
        for key in self.0.keys() {
            names.push(match key {
                KeyRef::Ascii(ascii) => ascii.as_str(),
                KeyRef::Binary(binary) => binary.as_str(),
            });
        }
        names
    }
}

struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
impl opentelemetry::propagation::Injector for MetadataInjector<'_> {
/// Set a key and value in the `MetadataMap`. Does nothing if the key or value are not valid
Expand Down
119 changes: 40 additions & 79 deletions crates/utils/src/tracing/span_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::time::Duration;
use std::net::SocketAddr;
use std::net::IpAddr;

use miden_objects::{block::BlockNumber, Digest};
use opentelemetry::{trace::Status, Key, Value};
Expand All @@ -9,16 +9,6 @@ pub trait ToValue {
fn to_value(&self) -> Value;
}

/// Renders an optional socket address as a string value.
///
/// A present address is formatted with `to_string()`; a missing one becomes the literal
/// `"no_remote_addr"`.
impl ToValue for Option<SocketAddr> {
    fn to_value(&self) -> Value {
        match self {
            Some(addr) => addr.to_string().into(),
            None => "no_remote_addr".into(),
        }
    }
}

impl ToValue for Duration {
fn to_value(&self) -> Value {
self.as_secs_f64().into()
Expand All @@ -31,81 +21,73 @@ impl ToValue for Digest {
}
}

/// Floating point numbers are handed to [`Value`] as-is.
impl ToValue for f64 {
    fn to_value(&self) -> Value {
        let number = *self;
        number.into()
    }
}

/// A block number is reported as an `i64`, widened from its `as_u32()` representation.
impl ToValue for BlockNumber {
    fn to_value(&self) -> Value {
        let widened = i64::from(self.as_u32());
        widened.into()
    }
}

impl ToValue for u32 {
fn to_value(&self) -> Value {
i64::from(*self).into()
}
/// Emits an `impl ToValue` for each listed type by going through its `to_string()` output.
///
/// Takes a comma-separated list of types; every type must provide `to_string()`.
macro_rules! impl_to_string_to_value {
    ($($stringy:ty),*) => {
        $(
            impl ToValue for $stringy {
                fn to_value(&self) -> Value {
                    let text = self.to_string();
                    text.into()
                }
            }
        )*
    };
}

impl ToValue for i64 {
fn to_value(&self) -> Value {
(*self).into()
}
impl_to_string_to_value!(IpAddr, &str);

/// Emits an `impl ToValue` for each listed integer type by converting it with `i64::from`.
///
/// Takes a comma-separated list of types; every type must be accepted by `i64::from`.
macro_rules! impl_int_to_value {
    ($($int:ty),*) => {
        $(
            impl ToValue for $int {
                fn to_value(&self) -> Value {
                    let widened = i64::from(*self);
                    widened.into()
                }
            }
        )*
    };
}
impl_int_to_value!(u16, u32, i64);

/// Emits an `impl ToValue` for each listed type by copying the value and calling `into()`.
///
/// Takes a comma-separated list of types; every type must be `Copy` and `Into<Value>`.
macro_rules! impl_to_value {
    ($($direct:ty),*) => {
        $(
            impl ToValue for $direct {
                fn to_value(&self) -> Value {
                    let copied = *self;
                    copied.into()
                }
            }
        )*
    };
}
impl_to_value!(f64);

/// Utility functions based on [`tracing_opentelemetry::OpenTelemetrySpanExt`].
///
/// This is a sealed trait. It cannot be implemented outside of this module.
pub trait OpenTelemetrySpanExt: private::Sealed {
fn set_parent<T>(&self, request: &http::Request<T>);
fn set_attribute(&self, key: impl Into<Key>, value: impl ToValue);
fn set_http_attributes<T>(&self, request: &http::Request<T>);
fn set_error(&self, err: &dyn std::error::Error);
fn context(&self) -> opentelemetry::Context;
}

impl<S> OpenTelemetrySpanExt for S
where
S: tracing_opentelemetry::OpenTelemetrySpanExt,
{
/// Sets the parent context by extracting HTTP metadata from the request.
fn set_parent<T>(&self, request: &http::Request<T>) {
// Pull the open-telemetry parent context using the HTTP extractor. We could make a more
// generic gRPC extractor by utilising the gRPC metadata. However that
// (a) requires cloning headers,
// (b) we would have to write this ourselves, and
// (c) gRPC metadata is transferred using HTTP headers in any case.
let otel_ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&MetadataExtractor(&tonic::metadata::MetadataMap::from_headers(
request.headers().clone(),
)))
});
tracing_opentelemetry::OpenTelemetrySpanExt::set_parent(self, otel_ctx);
}

/// Returns the context of `Span`.
fn context(&self) -> opentelemetry::Context {
tracing_opentelemetry::OpenTelemetrySpanExt::context(self)
}

/// Sets an attribute on `Span`.
///
/// Implementations for `ToValue` should be added to this crate (miden-node-utils).
fn set_attribute(&self, key: impl Into<Key>, value: impl ToValue) {
tracing_opentelemetry::OpenTelemetrySpanExt::set_attribute(self, key, value.to_value());
}

/// Sets standard attributes to the `Span` based on an associated HTTP request.
fn set_http_attributes<T>(&self, request: &http::Request<T>) {
let remote_addr = request
.extensions()
.get::<tonic::transport::server::TcpConnectInfo>()
.and_then(tonic::transport::server::TcpConnectInfo::remote_addr);
OpenTelemetrySpanExt::set_attribute(self, "remote_addr", remote_addr);
}

/// Sets a status on `Span` based on an error.
fn set_error(&self, err: &dyn std::error::Error) {
// Coalesce all sources into one string.
Expand All @@ -125,24 +107,3 @@ mod private {
pub trait Sealed {}
impl<S> Sealed for S where S: tracing_opentelemetry::OpenTelemetrySpanExt {}
}

/// Facilitates Open Telemetry metadata extraction for Tonic `MetadataMap`.
struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap);
impl opentelemetry::propagation::Extractor for MetadataExtractor<'_> {
/// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str,
/// returns None
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
}

/// Collect all the keys from the `MetadataMap`.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|key| match key {
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
})
.collect::<Vec<_>>()
}
}