diff --git a/spanner/Cargo.toml b/spanner/Cargo.toml index e19012d5..3f2eb81c 100644 --- a/spanner/Cargo.toml +++ b/spanner/Cargo.toml @@ -28,6 +28,7 @@ token-source = "1.0" google-cloud-longrunning = { package = "gcloud-longrunning", version = "1.3.0", path = "../foundation/longrunning" } google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax" } google-cloud-googleapis = { package = "gcloud-googleapis", version = "1.3.0", path = "../googleapis", features = ["spanner"]} +opentelemetry = { version = "0.31", optional = true, default-features = false, features = ["metrics"] } google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false } @@ -45,3 +46,4 @@ auth = ["google-cloud-auth"] default-tls = ["google-cloud-auth?/default-tls"] rustls-tls = ["google-cloud-auth?/rustls-tls"] external-account = ["google-cloud-auth?/external-account"] +otel-metrics = ["opentelemetry"] diff --git a/spanner/src/apiv1/spanner_client.rs b/spanner/src/apiv1/spanner_client.rs index a7c20eb6..3f35013d 100644 --- a/spanner/src/apiv1/spanner_client.rs +++ b/spanner/src/apiv1/spanner_client.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use google_cloud_gax::conn::Channel; @@ -13,6 +14,8 @@ use google_cloud_googleapis::spanner::v1::{ PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction, }; +use crate::metrics::MetricsRecorder; + const ROUTE_TO_LEADER_HEADER: &str = "x-goog-spanner-route-to-leader"; pub(crate) fn ping_query_request(session_name: impl Into) -> ExecuteSqlRequest { @@ -48,6 +51,7 @@ fn default_setting() -> RetrySetting { pub struct Client { inner: SpannerClient, metadata: MetadataMap, + metrics: Arc, } impl Client { @@ -57,6 +61,7 @@ impl Client { Client { inner: inner.max_decoding_message_size(i32::MAX as usize), metadata: Default::default(), + metrics: Arc::new(MetricsRecorder::default()), } } @@ -65,9 +70,15 @@ impl Client { Client { inner: self.inner, metadata, + metrics: self.metrics, } } + pub(crate) fn with_metrics(mut self, metrics: Arc) -> Client { + self.metrics = metrics; + self + } + /// create_session creates a new session. A session can be used to perform /// transactions that read and/or modify data in a Cloud Spanner database. /// Sessions are meant to be reused for many consecutive @@ -96,6 +107,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -105,6 +117,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "createSession", response); + }) } /// batch_create_sessions creates multiple new sessions. @@ -120,6 +135,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -129,6 +145,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "batchCreateSessions", response); + }) } /// get_session gets a session. Returns NOT_FOUND if the session does not exist. @@ -142,6 +161,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let name = &req.name; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -151,6 +171,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "getSession", response); + }) } /// list_sessions lists all sessions in a given database. @@ -163,6 +186,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -172,6 +196,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "listSessions", response); + }) } /// delete_session ends a session, releasing server resources associated with it. This will @@ -186,6 +213,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let name = &req.name; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -195,6 +223,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "deleteSession", response); + }) } /// execute_sql executes an SQL statement, returning all results in a single reply. This @@ -217,6 +248,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -226,6 +258,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeSql", response); + }) } /// execute_streaming_sql like ExecuteSql, except returns the result @@ -242,6 +277,7 @@ impl Client { ) -> Result>, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -251,6 +287,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeStreamingSql", response); + }) } /// execute_batch_dml executes a batch of SQL DML statements. This method allows many statements @@ -273,6 +312,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -296,6 +336,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeBatchDml", response); + }) } /// read reads rows from the database using key lookups and scans, as a @@ -320,6 +363,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -329,6 +373,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "read", response); + }) } /// streaming_read like read, except returns the result set as a @@ -345,6 +392,7 @@ impl Client { ) -> Result>, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -354,6 +402,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "streamingRead", response); + }) } /// BeginTransaction begins a new transaction. This step can often be skipped: @@ -369,6 +420,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -378,6 +430,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "beginTransaction", response); + }) } /// Commit commits a transaction. The request includes the mutations to be @@ -403,6 +458,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -412,6 +468,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "commit", response); + }) } /// Rollback rolls back a transaction, releasing any locks it holds. It is a good @@ -431,6 +490,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -440,6 +500,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "rollback", response); + }) } /// PartitionQuery creates a set of partition tokens that can be used to execute a query @@ -462,6 +525,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -471,6 +535,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "partitionQuery", response); + }) } /// PartitionRead creates a set of partition tokens that can be used to execute a read @@ -495,6 +562,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -504,6 +572,9 @@ impl Client { self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "partitionRead", response); + }) } fn create_request( @@ -529,4 +600,8 @@ impl Client { } req } + + fn record_gfe(metrics: &MetricsRecorder, method: &'static str, response: &Response) { + metrics.record_server_timing(method, response.metadata()); + } } diff --git a/spanner/src/client.rs b/spanner/src/client.rs index f69cd2ff..1c453a5a 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -12,6 +12,7 @@ use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, use token_source::NoopTokenSourceProvider; use crate::apiv1::conn_pool::{ConnectionManager, SPANNER}; +use crate::metrics::{MetricsConfig, MetricsRecorder}; use crate::retry::TransactionRetrySetting; use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager}; use crate::statement::Statement; @@ -82,6 +83,8 @@ pub struct ClientConfig { pub environment: Environment, /// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region. pub disable_route_to_leader: bool, + /// Metrics configuration for emitting OpenTelemetry signals. + pub metrics: MetricsConfig, } impl Default for ClientConfig { @@ -95,6 +98,7 @@ impl Default for ClientConfig { None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})), }, disable_route_to_leader: false, + metrics: MetricsConfig::default(), }; config.session_config.min_opened = config.channel_config.num_channels * 4; config.session_config.max_opened = config.channel_config.num_channels * 100; @@ -184,6 +188,11 @@ impl Client { ))); } + let database: String = database.into(); + let metrics = Arc::new( + MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?, + ); + let pool_size = config.channel_config.num_channels; let options = ConnectionOptions { timeout: Some(config.channel_config.timeout), @@ -191,8 +200,14 @@ impl Client { }; let conn_pool = ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?; - let session_manager = - SessionManager::new(database, conn_pool, config.session_config, config.disable_route_to_leader).await?; + let session_manager = SessionManager::new( + database, + conn_pool, + config.session_config, + config.disable_route_to_leader, + metrics.clone(), + ) + .await?; Ok(Client { sessions: session_manager, diff --git a/spanner/src/lib.rs b/spanner/src/lib.rs index aa879e55..5a0bf03c 100644 --- a/spanner/src/lib.rs +++ b/spanner/src/lib.rs @@ -636,6 +636,7 @@ pub mod admin; pub mod apiv1; pub mod client; pub mod key; +pub mod metrics; pub mod mutation; pub mod reader; pub mod retry; diff --git a/spanner/src/metrics.rs b/spanner/src/metrics.rs new file mode 100644 index 00000000..5dac6ba3 --- /dev/null +++ b/spanner/src/metrics.rs @@ -0,0 +1,504 @@ +#[cfg(feature = "otel-metrics")] +use std::collections::HashMap; +#[cfg(feature = "otel-metrics")] +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::time::Duration; + +use google_cloud_gax::grpc::metadata::MetadataMap; +use thiserror::Error; + +#[derive(Clone, Default)] +pub struct MetricsConfig { + /// Enables OpenTelemetry metrics emission when the `otel-metrics` feature is active. + pub enabled: bool, + #[cfg(feature = "otel-metrics")] + pub meter_provider: Option>, +} + +impl std::fmt::Debug for MetricsConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("MetricsConfig"); + ds.field("enabled", &self.enabled); + #[cfg(feature = "otel-metrics")] + { + let provider = self.meter_provider.is_some(); + ds.field("meter_provider_present", &provider); + } + ds.finish() + } +} + +#[derive(Clone, Default)] +pub(crate) struct MetricsRecorder { + #[cfg(feature = "otel-metrics")] + inner: Option>, +} + +#[derive(Debug, Error)] +pub enum MetricsError { + #[error("invalid database name: {0}")] + InvalidDatabase(String), +} + +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub(crate) struct SessionPoolSnapshot { + pub open_sessions: usize, + pub sessions_in_use: usize, + pub idle_sessions: usize, + pub max_allowed_sessions: usize, + pub max_in_use_last_window: usize, + pub has_multiplexed_session: bool, +} + +pub(crate) type SessionPoolStatsFn = Arc SessionPoolSnapshot + Send + Sync>; + +impl MetricsRecorder { + pub fn try_new(database: &str, config: &MetricsConfig) -> Result { + #[cfg(feature = "otel-metrics")] + { + if config.enabled { + let parsed = parse_database_name(database)?; + let inner = OtelMetrics::new(parsed, config.meter_provider.clone()); + Ok(Self { + inner: Some(Arc::new(inner)), + }) + } else { + Ok(Self { inner: None }) + } + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = database; + let _ = config; + Ok(Self::default()) + } + } + + pub(crate) fn register_session_pool(&self, stats: SessionPoolStatsFn) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.register_session_pool(stats); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = stats; + } + } + + pub(crate) fn record_session_timeout(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_timeout(); + } + } + + pub(crate) fn record_session_acquired(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_acquired(); + } + } + + pub(crate) fn record_session_released(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_released(); + } + } + + pub(crate) fn record_session_acquire_latency(&self, duration: Duration) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_acquire_latency(duration); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = duration; + } + } + + pub(crate) fn record_server_timing(&self, method: &'static str, metadata: &MetadataMap) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + let metrics = parse_server_timing(metadata); + inner.record_gfe_metrics(method, metrics); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = method; + let _ = metadata; + } + } +} + +#[cfg(feature = "otel-metrics")] +mod otel_impl { + use super::{ + ParsedDatabaseName, ServerTimingMetrics, SessionPoolStatsFn, ATTR_CLIENT_ID, ATTR_DATABASE, ATTR_INSTANCE, + ATTR_IS_MULTIPLEXED, ATTR_LIB_VERSION, ATTR_METHOD, ATTR_PROJECT, ATTR_TYPE, CLIENT_ID_SEQ, GFE_BUCKETS, + GFE_TIMING_HEADER, METRICS_PREFIX, OTEL_SCOPE, SESSION_ACQUIRE_BUCKETS, + }; + use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, ObservableGauge}; + use opentelemetry::{global, InstrumentationScope, KeyValue}; + use std::sync::atomic::Ordering; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + pub(super) struct OtelMetrics { + meter: Meter, + attributes: AttributeSets, + session_gauges: Mutex>, + get_session_timeouts: Counter, + acquired_sessions: Counter, + released_sessions: Counter, + gfe_latency: Histogram, + gfe_header_missing: Counter, + session_acquire_latency: Histogram, + } + + struct SessionGaugeHandles { + _open_session_count: ObservableGauge, + _max_allowed_sessions: ObservableGauge, + _num_sessions: ObservableGauge, + _max_in_use_sessions: ObservableGauge, + } + + impl OtelMetrics { + pub(super) fn new( + parsed: ParsedDatabaseName, + meter_provider: Option>, + ) -> Self { + let scope = InstrumentationScope::builder(OTEL_SCOPE) + .with_version(env!("CARGO_PKG_VERSION")) + .build(); + let meter = if let Some(provider) = meter_provider { + provider.meter_with_scope(scope) + } else { + global::meter_provider().meter_with_scope(scope) + }; + + let attributes = AttributeSets::new(parsed); + + let get_session_timeouts = meter + .u64_counter(METRICS_PREFIX.to_owned() + "get_session_timeouts") + .with_description("The number of get sessions timeouts due to pool exhaustion.") + .with_unit("1") + .build(); + let acquired_sessions = meter + .u64_counter(METRICS_PREFIX.to_owned() + "num_acquired_sessions") + .with_description("The number of sessions acquired from the session pool.") + .with_unit("1") + .build(); + let released_sessions = meter + .u64_counter(METRICS_PREFIX.to_owned() + "num_released_sessions") + .with_description("The number of sessions released by the user and pool maintainer.") + .with_unit("1") + .build(); + let gfe_latency = meter + .f64_histogram(METRICS_PREFIX.to_owned() + "gfe_latency") + .with_description("Latency between Google's network receiving an RPC and reading back the first byte of the response.") + .with_unit("ms") + .with_boundaries(GFE_BUCKETS.to_vec()) + .build(); + let gfe_header_missing = meter + .u64_counter(METRICS_PREFIX.to_owned() + "gfe_header_missing_count") + .with_description("Number of RPC responses received without the server-timing header, most likely meaning the RPC never reached Google's network.") + .with_unit("1") + .build(); + let session_acquire_latency = meter + .f64_histogram(METRICS_PREFIX.to_owned() + "session_acquire_latency") + .with_description("Time spent waiting to acquire a session from the pool.") + .with_unit("ms") + .with_boundaries(SESSION_ACQUIRE_BUCKETS.to_vec()) + .build(); + + OtelMetrics { + meter, + attributes, + session_gauges: Mutex::new(None), + get_session_timeouts, + acquired_sessions, + released_sessions, + gfe_latency, + gfe_header_missing, + session_acquire_latency, + } + } + + pub(super) fn register_session_pool(&self, stats: SessionPoolStatsFn) { + let mut guard = self.session_gauges.lock().unwrap(); + if guard.is_some() { + return; + } + + let open_stats = stats.clone(); + let base = self.attributes.base.clone(); + let multiplexed = self.attributes.with_multiplexed.clone(); + let open_session_count = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "open_session_count") + .with_description("Number of sessions currently opened.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = open_stats(); + if snapshot.has_multiplexed_session { + observer.observe(1, multiplexed.as_ref()); + } + observer.observe(snapshot.open_sessions as i64, base.as_ref()); + }) + .build(); + + let max_allowed_stats = stats.clone(); + let base = self.attributes.base.clone(); + let max_allowed_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_allowed_sessions") + .with_description("The maximum number of sessions allowed. Configurable by the user.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = max_allowed_stats(); + observer.observe(snapshot.max_allowed_sessions as i64, base.as_ref()); + }) + .build(); + + let sessions_stats = stats.clone(); + let in_use_attrs = self.attributes.num_in_use.clone(); + let idle_attrs = self.attributes.num_sessions.clone(); + let num_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "num_sessions_in_pool") + .with_description("The number of sessions currently in use.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = sessions_stats(); + observer.observe(snapshot.sessions_in_use as i64, in_use_attrs.as_ref()); + observer.observe(snapshot.idle_sessions as i64, idle_attrs.as_ref()); + }) + .build(); + + let max_in_use_stats = stats; + let attrs = self.attributes.max_in_use.clone(); + let max_in_use_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_in_use_sessions") + .with_description("The maximum number of sessions in use during the last 10 minute interval.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = max_in_use_stats(); + observer.observe(snapshot.max_in_use_last_window as i64, attrs.as_ref()); + }) + .build(); + + *guard = Some(SessionGaugeHandles { + _open_session_count: open_session_count, + _max_allowed_sessions: max_allowed_sessions, + _num_sessions: num_sessions, + _max_in_use_sessions: max_in_use_sessions, + }); + } + + pub(super) fn record_session_timeout(&self) { + self.get_session_timeouts + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_session_acquired(&self) { + self.acquired_sessions + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_session_released(&self) { + self.released_sessions + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_session_acquire_latency(&self, duration: Duration) { + let latency_ms = duration.as_secs_f64() * 1000.0; + self.session_acquire_latency + .record(latency_ms, self.attributes.base.as_ref()); + } + + pub(super) fn record_gfe_metrics(&self, method: &'static str, metrics: ServerTimingMetrics) { + if metrics.is_empty() { + self.gfe_header_missing.add(1, self.attributes.base.as_ref()); + return; + } + + let mut attrs: Vec = self.attributes.base.as_ref().to_vec(); + attrs.push(KeyValue::new(ATTR_METHOD, method)); + + let latency = metrics.value(GFE_TIMING_HEADER); + self.gfe_latency.record(latency, &attrs); + } + } + + #[derive(Clone)] + struct AttributeSets { + base: Arc<[KeyValue]>, + with_multiplexed: Arc<[KeyValue]>, + without_multiplexed: Arc<[KeyValue]>, + num_in_use: Arc<[KeyValue]>, + num_sessions: Arc<[KeyValue]>, + max_in_use: Arc<[KeyValue]>, + } + + impl AttributeSets { + fn new(parsed: ParsedDatabaseName) -> Self { + let client_id = next_client_id(); + let base_vec = vec![ + KeyValue::new(ATTR_CLIENT_ID, client_id), + KeyValue::new(ATTR_DATABASE, parsed.database), + KeyValue::new(ATTR_INSTANCE, parsed.instance), + KeyValue::new(ATTR_PROJECT, parsed.project), + KeyValue::new(ATTR_LIB_VERSION, env!("CARGO_PKG_VERSION")), + ]; + + let mut with_multiplexed_vec = base_vec.clone(); + with_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "true")); + + let mut without_multiplexed_vec = base_vec.clone(); + without_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "false")); + + let mut num_in_use_vec = without_multiplexed_vec.clone(); + num_in_use_vec.push(KeyValue::new(ATTR_TYPE, "num_in_use_sessions")); + + let mut num_sessions_vec = without_multiplexed_vec.clone(); + num_sessions_vec.push(KeyValue::new(ATTR_TYPE, "num_sessions")); + + let max_in_use_vec = without_multiplexed_vec.clone(); + + AttributeSets { + base: base_vec.into(), + with_multiplexed: with_multiplexed_vec.into(), + without_multiplexed: without_multiplexed_vec.into(), + num_in_use: num_in_use_vec.into(), + num_sessions: num_sessions_vec.into(), + max_in_use: max_in_use_vec.into(), + } + } + } + + fn next_client_id() -> String { + let id = CLIENT_ID_SEQ.fetch_add(1, Ordering::Relaxed); + format!("rust-client-{id}") + } +} + +#[cfg(feature = "otel-metrics")] +use otel_impl::*; + +#[cfg(feature = "otel-metrics")] +const OTEL_SCOPE: &str = "cloud.google.com/go"; +#[cfg(feature = "otel-metrics")] +const METRICS_PREFIX: &str = "spanner/"; +#[cfg(feature = "otel-metrics")] +const ATTR_CLIENT_ID: &str = "client_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_DATABASE: &str = "database"; +#[cfg(feature = "otel-metrics")] +const ATTR_INSTANCE: &str = "instance_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_PROJECT: &str = "project_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_LIB_VERSION: &str = "library_version"; +#[cfg(feature = "otel-metrics")] +const ATTR_IS_MULTIPLEXED: &str = "is_multiplexed"; +#[cfg(feature = "otel-metrics")] +const ATTR_TYPE: &str = "type"; +#[cfg(feature = "otel-metrics")] +const ATTR_METHOD: &str = "grpc_client_method"; +#[cfg(feature = "otel-metrics")] +const GFE_TIMING_HEADER: &str = "gfet4t7"; +#[cfg(feature = "otel-metrics")] +const SERVER_TIMING_HEADER: &str = "server-timing"; + +#[cfg(feature = "otel-metrics")] +const SESSION_ACQUIRE_BUCKETS: &[f64] = &[ + 0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 75.0, 100.0, 150.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 1500.0, + 2000.0, 3000.0, 4000.0, 5000.0, 7500.0, 10000.0, 15000.0, 30000.0, 60000.0, +]; + +#[cfg(feature = "otel-metrics")] +const GFE_BUCKETS: &[f64] = &[ + 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, + 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, + 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0, +]; + +#[cfg(feature = "otel-metrics")] +static CLIENT_ID_SEQ: AtomicU64 = AtomicU64::new(1); + +#[cfg(feature = "otel-metrics")] +#[derive(Clone)] +struct ParsedDatabaseName { + database: String, + instance: String, + project: String, +} + +#[cfg(feature = "otel-metrics")] +fn parse_database_name(name: &str) -> Result { + let parts: Vec<&str> = name.split('/').collect(); + if parts.len() != 6 || parts[0] != "projects" || parts[2] != "instances" || parts[4] != "databases" { + return Err(MetricsError::InvalidDatabase(name.to_string())); + } + Ok(ParsedDatabaseName { + project: parts[1].to_string(), + instance: parts[3].to_string(), + database: parts[5].to_string(), + }) +} + +#[cfg(feature = "otel-metrics")] +#[derive(Clone)] +struct ServerTimingMetrics { + values: HashMap, +} + +#[cfg(feature = "otel-metrics")] +impl ServerTimingMetrics { + fn is_empty(&self) -> bool { + self.values.is_empty() + } + + fn value(&self, key: &str) -> f64 { + self.values.get(key).copied().unwrap_or_default() + } +} + +#[cfg(feature = "otel-metrics")] +fn parse_server_timing(metadata: &MetadataMap) -> ServerTimingMetrics { + let mut map = HashMap::new(); + for value in metadata.get_all(SERVER_TIMING_HEADER).iter() { + if let Ok(raw) = value.to_str() { + for part in raw.split(',') { + let trimmed = part.trim(); + if let Some((name, dur_part)) = trimmed.split_once(';') { + let name = name.trim(); + if let Some(duration) = dur_part.trim().strip_prefix("dur=") { + if let Ok(parsed) = duration.trim().parse::() { + map.insert(name.to_string(), parsed); + } + } + } + } + } + } + ServerTimingMetrics { values: map } +} + +#[cfg(all(test, feature = "otel-metrics"))] +mod tests { + use super::*; + + #[test] + fn parses_server_timing_header() { + let mut metadata = MetadataMap::new(); + metadata.insert(SERVER_TIMING_HEADER, "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap()); + let metrics = parse_server_timing(&metadata); + assert!(!metrics.is_empty()); + assert!((metrics.value(GFE_TIMING_HEADER) - 12.5).abs() < f64::EPSILON); + } +} diff --git a/spanner/src/session.rs b/spanner/src/session.rs index de05cffd..7aac1edf 100644 --- a/spanner/src/session.rs +++ b/spanner/src/session.rs @@ -20,6 +20,9 @@ use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSes use crate::apiv1::conn_pool::ConnectionManager; use crate::apiv1::spanner_client::{ping_query_request, Client}; +use crate::metrics::{MetricsRecorder, SessionPoolSnapshot, SessionPoolStatsFn}; + +const MAX_IN_USE_WINDOW: Duration = Duration::from_secs(600); /// Session pub struct SessionHandle { @@ -124,6 +127,11 @@ struct Sessions { /// number of sessions scheduled to be replenished. num_creating: usize, + + /// Maximum observed number of sessions in use during the current window. + max_inuse_window: usize, + /// Start of the rolling window used for `max_inuse_window`. + window_started_at: Instant, } impl Sessions { @@ -146,13 +154,16 @@ impl Sessions { None => None, Some(s) => { self.num_inuse += 1; + self.update_max_in_use(); Some(s) } } } fn release(&mut self, session: SessionHandle) { - self.num_inuse -= 1; + if self.num_inuse > 0 { + self.num_inuse -= 1; + } if session.valid { self.available_sessions.push_back(session); } else if !session.deleted { @@ -197,6 +208,16 @@ impl Sessions { Err(e) => tracing::error!("failed to create new sessions {:?}", e), } } + + fn update_max_in_use(&mut self) { + let now = Instant::now(); + if now.duration_since(self.window_started_at) >= MAX_IN_USE_WINDOW { + self.window_started_at = now; + self.max_inuse_window = self.num_inuse; + } else if self.num_inuse > self.max_inuse_window { + self.max_inuse_window = self.num_inuse; + } + } } #[derive(Clone)] @@ -204,6 +225,7 @@ struct SessionPool { inner: Arc>, session_creation_sender: UnboundedSender, config: Arc, + metrics: Arc, } impl SessionPool { @@ -213,20 +235,26 @@ impl SessionPool { session_creation_sender: UnboundedSender, config: Arc, disable_route_to_leader: bool, + metrics: Arc, ) -> Result { let available_sessions = - Self::init_pool(database, conn_pool, config.min_opened, disable_route_to_leader).await?; - Ok(SessionPool { + Self::init_pool(database, conn_pool, config.min_opened, disable_route_to_leader, metrics.clone()).await?; + let pool = SessionPool { inner: Arc::new(RwLock::new(Sessions { available_sessions, waiters: VecDeque::new(), orphans: Vec::new(), num_inuse: 0, num_creating: 0, + max_inuse_window: 0, + window_started_at: Instant::now(), })), session_creation_sender, config, - }) + metrics, + }; + pool.metrics.register_session_pool(pool.snapshot_fn()); + Ok(pool) } async fn init_pool( @@ -234,6 +262,7 @@ impl SessionPool { conn_pool: &ConnectionManager, min_opened: usize, disable_route_to_leader: bool, + metrics: Arc, ) -> Result, Status> { let channel_num = conn_pool.num(); let creation_count_per_channel = min_opened / channel_num; @@ -248,7 +277,10 @@ impl SessionPool { } else { creation_count_per_channel }; - let next_client = conn_pool.conn().with_metadata(client_metadata(&database)); + let next_client = conn_pool + .conn() + .with_metrics(metrics.clone()) + .with_metadata(client_metadata(&database)); let database = database.clone(); tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count, disable_route_to_leader).await @@ -273,6 +305,7 @@ impl SessionPool { /// The client on the waiting list will be notified when another client's session has finished and /// when the process of replenishing the available sessions is complete. async fn acquire(&self) -> Result { + let request_started_at = Instant::now(); loop { let (on_session_acquired, session_count) = { let mut sessions = self.inner.write(); @@ -281,6 +314,9 @@ impl SessionPool { if sessions.waiters.is_empty() { if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); + self.metrics.record_session_acquired(); + self.metrics + .record_session_acquire_latency(request_started_at.elapsed()); return Ok(ManagedSession::new(self.clone(), s)); } } @@ -301,6 +337,9 @@ impl SessionPool { let mut sessions = self.inner.write(); if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); + self.metrics.record_session_acquired(); + self.metrics + .record_session_acquire_latency(request_started_at.elapsed()); return Ok(ManagedSession::new(self.clone(), s)); } else { continue; // another waiter raced for session @@ -319,6 +358,7 @@ impl SessionPool { "Timeout acquiring session" ); } + self.metrics.record_session_timeout(); return Err(SessionError::SessionGetTimeout); } } @@ -331,6 +371,7 @@ impl SessionPool { /// If the session is invalid /// - Discard the session. If the number of sessions falls below the threshold as a result of discarding, the session replenishment process is called. fn recycle(&self, mut session: SessionHandle) { + self.metrics.record_session_released(); if session.valid { let mut sessions = self.inner.write(); let waiter = sessions.take_waiter(); @@ -371,6 +412,22 @@ impl SessionPool { self.remove_orphans().await; } + fn snapshot_fn(&self) -> SessionPoolStatsFn { + let inner = self.inner.clone(); + let max_allowed = self.config.max_opened; + Arc::new(move || { + let sessions = inner.read(); + SessionPoolSnapshot { + open_sessions: sessions.num_opened(), + sessions_in_use: sessions.num_inuse, + idle_sessions: sessions.available_sessions.len(), + max_allowed_sessions: max_allowed, + max_in_use_last_window: sessions.max_inuse_window, + has_multiplexed_session: false, + } + }) + } + async fn remove_orphans(&self) { let empty = vec![]; let deleting_sessions = { mem::replace(&mut self.inner.write().orphans, empty) }; @@ -464,6 +521,7 @@ impl SessionManager { conn_pool: ConnectionManager, config: SessionConfig, disable_route_to_leader: bool, + metrics: Arc, ) -> Result, Status> { let database = database.into(); let (sender, receiver) = mpsc::unbounded_channel(); @@ -473,6 +531,7 @@ impl SessionManager { sender, Arc::new(config.clone()), disable_route_to_leader, + metrics.clone(), ) .await?; @@ -534,7 +593,10 @@ impl SessionManager { } session_count = rx.recv() => match session_count { Some(session_count) => { - let client = conn_pool.conn().with_metadata(client_metadata(&database)); + let client = conn_pool + .conn() + .with_metrics(session_pool.metrics.clone()) + .with_metadata(client_metadata(&database)); let database = database.clone(); tasks.spawn(async move { (session_count, batch_create_sessions(client, &database, session_count, disable_route_to_leader).await) }); }, @@ -701,6 +763,7 @@ mod tests { use google_cloud_googleapis::spanner::v1::ExecuteSqlRequest; use crate::apiv1::conn_pool::ConnectionManager; + use crate::metrics::MetricsRecorder; use crate::session::{ batch_create_sessions, client_metadata, health_check, SessionConfig, SessionError, SessionManager, }; @@ -723,7 +786,9 @@ mod tests { ) .await .unwrap(); - let sm = SessionManager::new(DATABASE, cm, config, false).await.unwrap(); + let sm = SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); let counter = Arc::new(AtomicI64::new(0)); let mut spawns = Vec::with_capacity(100); @@ -763,7 +828,11 @@ mod tests { max_opened: 5, ..Default::default() }; - let sm = std::sync::Arc::new(SessionManager::new(DATABASE, cm, config, false).await.unwrap()); + let sm = std::sync::Arc::new( + SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(), + ); sleep(Duration::from_secs(1)).await; let cancel = CancellationToken::new(); @@ -792,7 +861,11 @@ mod tests { max_opened: 5, ..Default::default() }; - let sm = Arc::new(SessionManager::new(DATABASE, cm, config, false).await.unwrap()); + let sm = Arc::new( + SessionManager::new(DATABASE, cm, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(), + ); sleep(Duration::from_secs(1)).await; let cancel = CancellationToken::new(); @@ -821,7 +894,9 @@ mod tests { max_opened: 45, ..Default::default() }; - let sm = SessionManager::new(DATABASE, conn_pool, config, false).await.unwrap(); + let sm = SessionManager::new(DATABASE, conn_pool, config, false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); { let mut sessions = Vec::new(); for _ in 0..45 { @@ -862,7 +937,7 @@ mod tests { ..Default::default() }; let sm = Arc::new( - SessionManager::new(DATABASE, conn_pool, config.clone(), false) + SessionManager::new(DATABASE, conn_pool, config.clone(), false, Arc::new(MetricsRecorder::default())) .await .unwrap(), ); @@ -1121,7 +1196,9 @@ mod tests { .await .unwrap(); let config = SessionConfig::default(); - let sm = SessionManager::new(DATABASE, cm, config.clone(), false).await.unwrap(); + let sm = SessionManager::new(DATABASE, cm, config.clone(), false, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); assert_eq!(sm.num_opened(), config.min_opened); sm.close().await; assert_eq!(sm.num_opened(), 0); @@ -1139,7 +1216,10 @@ mod tests { ) .await .unwrap(); - let client = cm.conn().with_metadata(client_metadata(DATABASE)); + let client = cm + .conn() + .with_metrics(Arc::new(MetricsRecorder::default())) + .with_metadata(client_metadata(DATABASE)); let session_count = 125; let result = batch_create_sessions(client.clone(), DATABASE, session_count, false).await; match result {