Skip to content

Commit

Permalink
Merge pull request #2857 from calebschoepp/telemetry-fixes
Browse files Browse the repository at this point in the history
Telemetry fixes
  • Loading branch information
rylev authored Oct 7, 2024
2 parents 57a5b88 + 7ee2579 commit f80e5ce
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 19 deletions.
8 changes: 4 additions & 4 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl key_value::HostStore for KeyValueDispatch {
.await)
}

#[instrument(name = "spin_key_value.get", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
#[instrument(name = "spin_key_value.get", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn get(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -95,7 +95,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.get(&key).await)
}

#[instrument(name = "spin_key_value.set", skip(self, store, value), err(level = Level::INFO), fields(otel.kind = "client"))]
#[instrument(name = "spin_key_value.set", skip(self, store, key, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn set(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -106,7 +106,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.set(&key, &value).await)
}

#[instrument(name = "spin_key_value.delete", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
#[instrument(name = "spin_key_value.delete", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn delete(
&mut self,
store: Resource<key_value::Store>,
Expand All @@ -116,7 +116,7 @@ impl key_value::HostStore for KeyValueDispatch {
Ok(store.delete(&key).await)
}

#[instrument(name = "spin_key_value.exists", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
#[instrument(name = "spin_key_value.exists", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn exists(
&mut self,
store: Resource<key_value::Store>,
Expand Down
9 changes: 6 additions & 3 deletions crates/factor-outbound-mysql/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use spin_world::v1::mysql as v1;
use spin_world::v2::mysql::{self as v2, Connection};
use spin_world::v2::rdbms_types as v2_types;
use spin_world::v2::rdbms_types::ParameterValue;
use tracing::field::Empty;
use tracing::{instrument, Level};

use crate::client::Client;
Expand Down Expand Up @@ -38,8 +39,10 @@ impl<C: Client> v2::Host for InstanceState<C> {}

#[async_trait]
impl<C: Client> v2::HostConnection for InstanceState<C> {
#[instrument(name = "spin_outbound_mysql.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql"))]
#[instrument(name = "spin_outbound_mysql.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
async fn open(&mut self, address: String) -> Result<Resource<Connection>, v2::Error> {
spin_factor_outbound_networking::record_address_fields(&address);

if !self
.is_address_allowed(&address)
.await
Expand All @@ -52,7 +55,7 @@ impl<C: Client> v2::HostConnection for InstanceState<C> {
self.open_connection(&address).await
}

#[instrument(name = "spin_outbound_mysql.execute", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql", otel.name = statement))]
#[instrument(name = "spin_outbound_mysql.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql", otel.name = statement))]
async fn execute(
&mut self,
connection: Resource<Connection>,
Expand All @@ -66,7 +69,7 @@ impl<C: Client> v2::HostConnection for InstanceState<C> {
.await?)
}

#[instrument(name = "spin_outbound_mysql.query", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql", otel.name = statement))]
#[instrument(name = "spin_outbound_mysql.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql", otel.name = statement))]
async fn query(
&mut self,
connection: Resource<Connection>,
Expand Down
20 changes: 20 additions & 0 deletions crates/factor-outbound-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use config::{
};

pub use runtime_config::ComponentTlsConfigs;
use url::Url;

pub type SharedFutureResult<T> = Shared<BoxFuture<'static, Result<Arc<T>, Arc<anyhow::Error>>>>;

Expand Down Expand Up @@ -247,3 +248,22 @@ impl<F: Fn(&str, &str) + Send + Sync> DisallowedHostHandler for F {
self(scheme, authority);
}
}

/// Records the address host, port, and database as fields on the current tracing span.
///
/// This should only be called from within a function that has been instrumented with a span.
///
/// The following fields must be pre-declared as empty on the span or they will not show up.
/// ```
/// use tracing::field::Empty;
/// #[tracing::instrument(fields(db.address = Empty, server.port = Empty, db.namespace = Empty))]
/// fn open() {}
/// ```
pub fn record_address_fields(address: &str) {
if let Ok(url) = Url::parse(address) {
let span = tracing::Span::current();
span.record("db.address", url.host_str().unwrap_or_default());
span.record("server.port", url.port().unwrap_or_default());
span.record("db.namespace", url.path().trim_start_matches('/'));
}
}
9 changes: 6 additions & 3 deletions crates/factor-outbound-pg/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use spin_world::v1::rdbms_types as v1_types;
use spin_world::v2::postgres::{self as v2, Connection};
use spin_world::v2::rdbms_types;
use spin_world::v2::rdbms_types::{ParameterValue, RowSet};
use tracing::field::Empty;
use tracing::instrument;
use tracing::Level;

Expand Down Expand Up @@ -63,8 +64,10 @@ impl<C: Send + Sync + Client> v2::Host for InstanceState<C> {}

#[async_trait]
impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
#[instrument(name = "spin_outbound_pg.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql"))]
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
async fn open(&mut self, address: String) -> Result<Resource<Connection>, v2::Error> {
spin_factor_outbound_networking::record_address_fields(&address);

if !self
.is_address_allowed(&address)
.await
Expand All @@ -77,7 +80,7 @@ impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
self.open_connection(&address).await
}

#[instrument(name = "spin_outbound_pg.execute", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
async fn execute(
&mut self,
connection: Resource<Connection>,
Expand All @@ -91,7 +94,7 @@ impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
.await?)
}

#[instrument(name = "spin_outbound_pg.query", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
async fn query(
&mut self,
connection: Resource<Connection>,
Expand Down
5 changes: 4 additions & 1 deletion crates/factor-outbound-redis/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use spin_world::v1::{redis as v1, redis_types};
use spin_world::v2::redis::{
self as v2, Connection as RedisConnection, Error, RedisParameter, RedisResult,
};
use tracing::field::Empty;
use tracing::{instrument, Level};

pub struct InstanceState {
Expand Down Expand Up @@ -53,8 +54,10 @@ impl v2::Host for crate::InstanceState {

#[async_trait]
impl v2::HostConnection for crate::InstanceState {
#[instrument(name = "spin_outbound_redis.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis"))]
#[instrument(name = "spin_outbound_redis.open_connection", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", db.address = Empty, server.port = Empty, db.namespace = Empty))]
async fn open(&mut self, address: String) -> Result<Resource<RedisConnection>, Error> {
spin_factor_outbound_networking::record_address_fields(&address);

if !self
.is_address_allowed(&address)
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/factor-sqlite/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl v2::HostConnection for InstanceState {
.map(Resource::new_own)
}

#[instrument(name = "spin_sqlite.execute", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", otel.name = query, sqlite.backend = Empty))]
#[instrument(name = "spin_sqlite.execute", skip(self, connection, parameters), err(level = Level::INFO), fields(otel.kind = "client", db.system = "sqlite", otel.name = query, sqlite.backend = Empty))]
async fn execute(
&mut self,
connection: Resource<v2::Connection>,
Expand Down
3 changes: 3 additions & 0 deletions crates/telemetry/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use anyhow::{bail, Result};
use opentelemetry::global;
use opentelemetry_otlp::MetricsExporterBuilder;
use opentelemetry_sdk::{
metrics::{
Expand Down Expand Up @@ -57,6 +58,8 @@ pub(crate) fn otel_metrics_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
.with_resource(resource)
.build();

global::set_meter_provider(meter_provider.clone());

Ok(MetricsLayer::new(meter_provider))
}

Expand Down
13 changes: 6 additions & 7 deletions crates/telemetry/src/traces.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use anyhow::bail;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, trace::TracerProvider};
use opentelemetry_otlp::SpanExporterBuilder;
use opentelemetry_sdk::{
resource::{EnvResourceDetector, TelemetryResourceDetector},
Expand Down Expand Up @@ -34,22 +34,21 @@ pub(crate) fn otel_tracing_layer<S: Subscriber + for<'span> LookupSpan<'span>>(
],
);

// This will configure the exporter based on the OTEL_EXPORTER_* environment variables. We
// currently default to using the HTTP exporter but in the future we could select off of the
// combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL to
// determine whether we should use http/protobuf or grpc.
let exporter: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() {
// This will configure the exporter based on the OTEL_EXPORTER_* environment variables.
let exporter_builder: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() {
OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(),
OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(),
OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"),
};

let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_exporter(exporter_builder)
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource))
.install_batch(opentelemetry_sdk::runtime::Tokio)?;

global::set_tracer_provider(tracer_provider.clone());

let env_filter = match EnvFilter::try_from_env("SPIN_OTEL_TRACING_LEVEL") {
Ok(filter) => filter,
// If it isn't set or it fails to parse default to info
Expand Down

0 comments on commit f80e5ce

Please sign in to comment.