From 8d8e041cfddfb7e0187ac078a35b1e568612c442 Mon Sep 17 00:00:00 2001 From: Dario Marasco Date: Fri, 14 Nov 2025 18:31:40 -0800 Subject: [PATCH 1/5] add otel-metrics to spanner --- spanner/Cargo.toml | 2 + spanner/src/apiv1/spanner_client.rs | 105 +++++- spanner/src/client.rs | 11 +- spanner/src/lib.rs | 1 + spanner/src/metrics.rs | 473 ++++++++++++++++++++++++++++ spanner/src/session.rs | 107 ++++++- 6 files changed, 669 insertions(+), 30 deletions(-) create mode 100644 spanner/src/metrics.rs diff --git a/spanner/Cargo.toml b/spanner/Cargo.toml index e19012d5..3f2eb81c 100644 --- a/spanner/Cargo.toml +++ b/spanner/Cargo.toml @@ -28,6 +28,7 @@ token-source = "1.0" google-cloud-longrunning = { package = "gcloud-longrunning", version = "1.3.0", path = "../foundation/longrunning" } google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax" } google-cloud-googleapis = { package = "gcloud-googleapis", version = "1.3.0", path = "../googleapis", features = ["spanner"]} +opentelemetry = { version = "0.31", optional = true, default-features = false, features = ["metrics"] } google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false } @@ -45,3 +46,4 @@ auth = ["google-cloud-auth"] default-tls = ["google-cloud-auth?/default-tls"] rustls-tls = ["google-cloud-auth?/rustls-tls"] external-account = ["google-cloud-auth?/external-account"] +otel-metrics = ["opentelemetry"] diff --git a/spanner/src/apiv1/spanner_client.rs b/spanner/src/apiv1/spanner_client.rs index a26edff5..fb6dcecf 100644 --- a/spanner/src/apiv1/spanner_client.rs +++ b/spanner/src/apiv1/spanner_client.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use google_cloud_gax::conn::Channel; @@ -13,6 +14,8 @@ use google_cloud_googleapis::spanner::v1::{ PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction, }; +use crate::metrics::MetricsRecorder; + 
pub(crate) fn ping_query_request(session_name: impl Into) -> ExecuteSqlRequest { ExecuteSqlRequest { session: session_name.into(), @@ -46,6 +49,7 @@ fn default_setting() -> RetrySetting { pub struct Client { inner: SpannerClient, metadata: MetadataMap, + metrics: Arc, } impl Client { @@ -55,6 +59,7 @@ impl Client { Client { inner: inner.max_decoding_message_size(i32::MAX as usize), metadata: Default::default(), + metrics: Arc::new(MetricsRecorder::default()), } } @@ -63,9 +68,15 @@ impl Client { Client { inner: self.inner, metadata, + metrics: self.metrics, } } + pub(crate) fn with_metrics(mut self, metrics: Arc) -> Client { + self.metrics = metrics; + self + } + /// create_session creates a new session. A session can be used to perform /// transactions that read and/or modify data in a Cloud Spanner database. /// Sessions are meant to be reused for many consecutive @@ -93,15 +104,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("database={database}"), req.clone()); this.inner.create_session(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "createSession", response); + }) } /// batch_create_sessions creates multiple new sessions. @@ -116,15 +131,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("database={database}"), req.clone()); this.inner.batch_create_sessions(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "batchCreateSessions", response); + }) } /// get_session gets a session. 
Returns NOT_FOUND if the session does not exist. @@ -137,15 +156,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let name = &req.name; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("name={name}"), req.clone()); this.inner.get_session(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "getSession", response); + }) } /// list_sessions lists all sessions in a given database. @@ -157,15 +180,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let database = &req.database; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("database={database}"), req.clone()); this.inner.list_sessions(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "listSessions", response); + }) } /// delete_session ends a session, releasing server resources associated with it. This will @@ -179,15 +206,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let name = &req.name; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("name={name}"), req.clone()); this.inner.delete_session(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "deleteSession", response); + }) } /// execute_sql executes an SQL statement, returning all results in a single reply. 
This @@ -209,15 +240,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.execute_sql(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeSql", response); + }) } /// execute_streaming_sql like ExecuteSql, except returns the result @@ -233,15 +268,19 @@ impl Client { ) -> Result>, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeStreamingSql", response); + }) } /// execute_batch_dml executes a batch of SQL DML statements. 
This method allows many statements @@ -263,6 +302,7 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { @@ -283,9 +323,12 @@ impl Client { Err(err) => Err((err, this)), } }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "executeBatchDml", response); + }) } /// read reads rows from the database using key lookups and scans, as a @@ -305,15 +348,19 @@ impl Client { pub async fn read(&mut self, req: ReadRequest, retry: Option) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.read(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "read", response); + }) } /// streaming_read like read, except returns the result set as a @@ -329,15 +376,19 @@ impl Client { ) -> Result>, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.streaming_read(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "streamingRead", response); + }) } /// BeginTransaction begins a new transaction. 
This step can often be skipped: @@ -352,15 +403,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.begin_transaction(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "beginTransaction", response); + }) } /// Commit commits a transaction. The request includes the mutations to be @@ -385,15 +440,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.commit(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "commit", response); + }) } /// Rollback rolls back a transaction, releasing any locks it holds. 
It is a good @@ -412,15 +471,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.rollback(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "rollback", response); + }) } /// PartitionQuery creates a set of partition tokens that can be used to execute a query @@ -442,15 +505,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.partition_query(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "partitionQuery", response); + }) } /// PartitionRead creates a set of partition tokens that can be used to execute a read @@ -474,15 +541,19 @@ impl Client { ) -> Result, Status> { let setting = retry.unwrap_or_else(default_setting); let session = &req.session; + let metrics = Arc::clone(&self.metrics); invoke_fn( Some(setting), |this| async { let request = this.create_request(format!("session={session}"), req.clone()); this.inner.partition_read(request).await.map_err(|e| (e, this)) }, - self, + &mut *self, ) .await + .inspect(move |response| { + Self::record_gfe(metrics.as_ref(), "partitionRead", response); + }) } fn create_request(&self, param_string: String, into_request: impl grpc::IntoRequest) -> grpc::Request { @@ -500,4 +571,8 @@ impl Client { } req } + + fn record_gfe(metrics: &MetricsRecorder, method: &'static str, response: &Response) { + metrics.record_server_timing(method, response.metadata()); + } } diff --git 
a/spanner/src/client.rs b/spanner/src/client.rs index 61f2b262..66e16f72 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -12,6 +12,7 @@ use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options, use token_source::NoopTokenSourceProvider; use crate::apiv1::conn_pool::{ConnectionManager, SPANNER}; +use crate::metrics::{MetricsConfig, MetricsRecorder}; use crate::retry::TransactionRetrySetting; use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager}; use crate::statement::Statement; @@ -80,6 +81,8 @@ pub struct ClientConfig { pub endpoint: String, /// Runtime project pub environment: Environment, + /// Metrics configuration for emitting OpenTelemetry signals. + pub metrics: MetricsConfig, } impl Default for ClientConfig { @@ -92,6 +95,7 @@ impl Default for ClientConfig { Some(v) => Environment::Emulator(v), None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})), }, + metrics: MetricsConfig::default(), }; config.session_config.min_opened = config.channel_config.num_channels * 4; config.session_config.max_opened = config.channel_config.num_channels * 100; @@ -180,6 +184,11 @@ impl Client { ))); } + let database: String = database.into(); + let metrics = Arc::new( + MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?, + ); + let pool_size = config.channel_config.num_channels; let options = ConnectionOptions { timeout: Some(config.channel_config.timeout), @@ -187,7 +196,7 @@ impl Client { }; let conn_pool = ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?; - let session_manager = SessionManager::new(database, conn_pool, config.session_config).await?; + let session_manager = SessionManager::new(database, conn_pool, config.session_config, metrics.clone()).await?; Ok(Client { sessions: session_manager, diff --git a/spanner/src/lib.rs b/spanner/src/lib.rs index aa879e55..5a0bf03c 100644 --- 
a/spanner/src/lib.rs +++ b/spanner/src/lib.rs @@ -636,6 +636,7 @@ pub mod admin; pub mod apiv1; pub mod client; pub mod key; +pub mod metrics; pub mod mutation; pub mod reader; pub mod retry; diff --git a/spanner/src/metrics.rs b/spanner/src/metrics.rs new file mode 100644 index 00000000..d5e53e6a --- /dev/null +++ b/spanner/src/metrics.rs @@ -0,0 +1,473 @@ +#[cfg(feature = "otel-metrics")] +use std::collections::HashMap; +#[cfg(feature = "otel-metrics")] +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use google_cloud_gax::grpc::metadata::MetadataMap; +use thiserror::Error; + +#[derive(Clone, Default)] +pub struct MetricsConfig { + /// Enables OpenTelemetry metrics emission when the `otel-metrics` feature is active. + pub enabled: bool, + #[cfg(feature = "otel-metrics")] + pub meter_provider: Option>, +} + +impl std::fmt::Debug for MetricsConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("MetricsConfig"); + ds.field("enabled", &self.enabled); + #[cfg(feature = "otel-metrics")] + { + let provider = self.meter_provider.is_some(); + ds.field("meter_provider_present", &provider); + } + ds.finish() + } +} + +#[derive(Clone, Default)] +pub(crate) struct MetricsRecorder { + #[cfg(feature = "otel-metrics")] + inner: Option>, +} + +#[derive(Debug, Error)] +pub enum MetricsError { + #[error("invalid database name: {0}")] + InvalidDatabase(String), +} + +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub(crate) struct SessionPoolSnapshot { + pub open_sessions: usize, + pub sessions_in_use: usize, + pub idle_sessions: usize, + pub max_allowed_sessions: usize, + pub max_in_use_last_window: usize, + pub has_multiplexed_session: bool, +} + +pub(crate) type SessionPoolStatsFn = Arc SessionPoolSnapshot + Send + Sync>; + +impl MetricsRecorder { + pub fn try_new(database: &str, config: &MetricsConfig) -> Result { + #[cfg(feature = "otel-metrics")] + { + if !config.enabled { + return Ok(Self { inner: None }); + } + 
+ let parsed = parse_database_name(database)?; + let inner = OtelMetrics::new(parsed, config.meter_provider.clone()); + return Ok(Self { + inner: Some(Arc::new(inner)), + }); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = database; + let _ = config; + Ok(Self::default()) + } + } + + pub(crate) fn register_session_pool(&self, stats: SessionPoolStatsFn) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.register_session_pool(stats); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = stats; + } + } + + pub(crate) fn record_session_timeout(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_timeout(); + } + } + + pub(crate) fn record_session_acquired(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_acquired(); + } + } + + pub(crate) fn record_session_released(&self) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + inner.record_session_released(); + } + } + + pub(crate) fn record_server_timing(&self, method: &'static str, metadata: &MetadataMap) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + let metrics = parse_server_timing(metadata); + inner.record_gfe_metrics(method, metrics); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = method; + let _ = metadata; + } + } +} + +#[cfg(feature = "otel-metrics")] +mod otel_impl { + use super::{ + ParsedDatabaseName, ServerTimingMetrics, SessionPoolStatsFn, ATTR_CLIENT_ID, ATTR_DATABASE, ATTR_INSTANCE, + ATTR_IS_MULTIPLEXED, ATTR_LIB_VERSION, ATTR_METHOD, ATTR_PROJECT, ATTR_TYPE, CLIENT_ID_SEQ, GFE_BUCKETS, + GFE_TIMING_HEADER, METRICS_PREFIX, OTEL_SCOPE, + }; + use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, ObservableGauge}; + use opentelemetry::{global, InstrumentationScope, KeyValue}; + use std::sync::atomic::Ordering; + use std::sync::{Arc, Mutex}; + + pub(super) struct OtelMetrics { + 
meter: Meter, + attributes: AttributeSets, + session_gauges: Mutex>, + get_session_timeouts: Counter, + acquired_sessions: Counter, + released_sessions: Counter, + gfe_latency: Histogram, + gfe_header_missing: Counter, + } + + struct SessionGaugeHandles { + _open_session_count: ObservableGauge, + _max_allowed_sessions: ObservableGauge, + _num_sessions: ObservableGauge, + _max_in_use_sessions: ObservableGauge, + } + + impl OtelMetrics { + pub(super) fn new( + parsed: ParsedDatabaseName, + meter_provider: Option>, + ) -> Self { + let scope = InstrumentationScope::builder(OTEL_SCOPE) + .with_version(env!("CARGO_PKG_VERSION")) + .build(); + let meter = if let Some(provider) = meter_provider { + provider.meter_with_scope(scope) + } else { + global::meter_provider().meter_with_scope(scope) + }; + + let attributes = AttributeSets::new(parsed); + + let get_session_timeouts = meter + .u64_counter(METRICS_PREFIX.to_owned() + "get_session_timeouts") + .with_description("The number of get sessions timeouts due to pool exhaustion.") + .with_unit("1") + .build(); + let acquired_sessions = meter + .u64_counter(METRICS_PREFIX.to_owned() + "num_acquired_sessions") + .with_description("The number of sessions acquired from the session pool.") + .with_unit("1") + .build(); + let released_sessions = meter + .u64_counter(METRICS_PREFIX.to_owned() + "num_released_sessions") + .with_description("The number of sessions released by the user and pool maintainer.") + .with_unit("1") + .build(); + let gfe_latency = meter + .f64_histogram(METRICS_PREFIX.to_owned() + "gfe_latency") + .with_description("Latency between Google's network receiving an RPC and reading back the first byte of the response.") + .with_unit("ms") + .with_boundaries(GFE_BUCKETS.to_vec()) + .build(); + let gfe_header_missing = meter + .u64_counter(METRICS_PREFIX.to_owned() + "gfe_header_missing_count") + .with_description("Number of RPC responses received without the server-timing header, most likely meaning the RPC never 
reached Google's network.") + .with_unit("1") + .build(); + + OtelMetrics { + meter, + attributes, + session_gauges: Mutex::new(None), + get_session_timeouts, + acquired_sessions, + released_sessions, + gfe_latency, + gfe_header_missing, + } + } + + pub(super) fn register_session_pool(&self, stats: SessionPoolStatsFn) { + let mut guard = self.session_gauges.lock().unwrap(); + if guard.is_some() { + return; + } + + let open_stats = stats.clone(); + let base = self.attributes.base.clone(); + let multiplexed = self.attributes.with_multiplexed.clone(); + let open_session_count = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "open_session_count") + .with_description("Number of sessions currently opened.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = open_stats(); + if snapshot.has_multiplexed_session { + observer.observe(1, multiplexed.as_ref()); + } + observer.observe(snapshot.open_sessions as i64, base.as_ref()); + }) + .build(); + + let max_allowed_stats = stats.clone(); + let base = self.attributes.base.clone(); + let max_allowed_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_allowed_sessions") + .with_description("The maximum number of sessions allowed. 
Configurable by the user.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = max_allowed_stats(); + observer.observe(snapshot.max_allowed_sessions as i64, base.as_ref()); + }) + .build(); + + let sessions_stats = stats.clone(); + let in_use_attrs = self.attributes.num_in_use.clone(); + let idle_attrs = self.attributes.num_sessions.clone(); + let num_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "num_sessions_in_pool") + .with_description("The number of sessions currently in use.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = sessions_stats(); + observer.observe(snapshot.sessions_in_use as i64, in_use_attrs.as_ref()); + observer.observe(snapshot.idle_sessions as i64, idle_attrs.as_ref()); + }) + .build(); + + let max_in_use_stats = stats; + let attrs = self.attributes.max_in_use.clone(); + let max_in_use_sessions = self + .meter + .i64_observable_gauge(METRICS_PREFIX.to_owned() + "max_in_use_sessions") + .with_description("The maximum number of sessions in use during the last 10 minute interval.") + .with_unit("1") + .with_callback(move |observer| { + let snapshot = max_in_use_stats(); + observer.observe(snapshot.max_in_use_last_window as i64, attrs.as_ref()); + }) + .build(); + + *guard = Some(SessionGaugeHandles { + _open_session_count: open_session_count, + _max_allowed_sessions: max_allowed_sessions, + _num_sessions: num_sessions, + _max_in_use_sessions: max_in_use_sessions, + }); + } + + pub(super) fn record_session_timeout(&self) { + self.get_session_timeouts + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_session_acquired(&self) { + self.acquired_sessions + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_session_released(&self) { + self.released_sessions + .add(1, self.attributes.without_multiplexed.as_ref()); + } + + pub(super) fn record_gfe_metrics(&self, method: &'static str, metrics: ServerTimingMetrics) { 
+ if metrics.is_empty() { + self.gfe_header_missing.add(1, self.attributes.base.as_ref()); + return; + } + + let mut attrs: Vec = self.attributes.base.as_ref().to_vec(); + attrs.push(KeyValue::new(ATTR_METHOD, method)); + + let latency = metrics.value(GFE_TIMING_HEADER); + self.gfe_latency.record(latency, &attrs); + } + } + + #[derive(Clone)] + struct AttributeSets { + base: Arc<[KeyValue]>, + with_multiplexed: Arc<[KeyValue]>, + without_multiplexed: Arc<[KeyValue]>, + num_in_use: Arc<[KeyValue]>, + num_sessions: Arc<[KeyValue]>, + max_in_use: Arc<[KeyValue]>, + } + + impl AttributeSets { + fn new(parsed: ParsedDatabaseName) -> Self { + let client_id = next_client_id(); + let base_vec = vec![ + KeyValue::new(ATTR_CLIENT_ID, client_id), + KeyValue::new(ATTR_DATABASE, parsed.database), + KeyValue::new(ATTR_INSTANCE, parsed.instance), + KeyValue::new(ATTR_PROJECT, parsed.project), + KeyValue::new(ATTR_LIB_VERSION, env!("CARGO_PKG_VERSION")), + ]; + + let mut with_multiplexed_vec = base_vec.clone(); + with_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "true")); + + let mut without_multiplexed_vec = base_vec.clone(); + without_multiplexed_vec.push(KeyValue::new(ATTR_IS_MULTIPLEXED, "false")); + + let mut num_in_use_vec = without_multiplexed_vec.clone(); + num_in_use_vec.push(KeyValue::new(ATTR_TYPE, "num_in_use_sessions")); + + let mut num_sessions_vec = without_multiplexed_vec.clone(); + num_sessions_vec.push(KeyValue::new(ATTR_TYPE, "num_sessions")); + + let max_in_use_vec = without_multiplexed_vec.clone(); + + AttributeSets { + base: base_vec.into(), + with_multiplexed: with_multiplexed_vec.into(), + without_multiplexed: without_multiplexed_vec.into(), + num_in_use: num_in_use_vec.into(), + num_sessions: num_sessions_vec.into(), + max_in_use: max_in_use_vec.into(), + } + } + } + + fn next_client_id() -> String { + let id = CLIENT_ID_SEQ.fetch_add(1, Ordering::Relaxed); + format!("rust-client-{id}") + } +} + +#[cfg(feature = "otel-metrics")] +use 
otel_impl::*; + +#[cfg(feature = "otel-metrics")] +const OTEL_SCOPE: &str = "cloud.google.com/go"; +#[cfg(feature = "otel-metrics")] +const METRICS_PREFIX: &str = "spanner/"; +#[cfg(feature = "otel-metrics")] +const ATTR_CLIENT_ID: &str = "client_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_DATABASE: &str = "database"; +#[cfg(feature = "otel-metrics")] +const ATTR_INSTANCE: &str = "instance_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_PROJECT: &str = "project_id"; +#[cfg(feature = "otel-metrics")] +const ATTR_LIB_VERSION: &str = "library_version"; +#[cfg(feature = "otel-metrics")] +const ATTR_IS_MULTIPLEXED: &str = "is_multiplexed"; +#[cfg(feature = "otel-metrics")] +const ATTR_TYPE: &str = "type"; +#[cfg(feature = "otel-metrics")] +const ATTR_METHOD: &str = "grpc_client_method"; +#[cfg(feature = "otel-metrics")] +const GFE_TIMING_HEADER: &str = "gfet4t7"; +#[cfg(feature = "otel-metrics")] +const SERVER_TIMING_HEADER: &str = "server-timing"; + +#[cfg(feature = "otel-metrics")] +const GFE_BUCKETS: &[f64] = &[ + 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, + 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, + 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0, +]; + +#[cfg(feature = "otel-metrics")] +static CLIENT_ID_SEQ: AtomicU64 = AtomicU64::new(1); + +#[cfg(feature = "otel-metrics")] +#[derive(Clone)] +struct ParsedDatabaseName { + database: String, + instance: String, + project: String, +} + +#[cfg(feature = "otel-metrics")] +fn parse_database_name(name: &str) -> Result { + let parts: Vec<&str> = name.split('/').collect(); + if parts.len() != 6 || parts[0] != "projects" || parts[2] != "instances" || parts[4] != "databases" { + return Err(MetricsError::InvalidDatabase(name.to_string())); + } + Ok(ParsedDatabaseName { + project: parts[1].to_string(), + instance: 
parts[3].to_string(), + database: parts[5].to_string(), + }) +} + +#[cfg(feature = "otel-metrics")] +#[derive(Clone)] +struct ServerTimingMetrics { + values: HashMap, +} + +#[cfg(feature = "otel-metrics")] +impl ServerTimingMetrics { + fn is_empty(&self) -> bool { + self.values.is_empty() + } + + fn value(&self, key: &str) -> f64 { + self.values.get(key).copied().unwrap_or_default() + } +} + +#[cfg(feature = "otel-metrics")] +fn parse_server_timing(metadata: &MetadataMap) -> ServerTimingMetrics { + let mut map = HashMap::new(); + for value in metadata.get_all(SERVER_TIMING_HEADER).iter() { + if let Ok(raw) = value.to_str() { + for part in raw.split(',') { + let trimmed = part.trim(); + if let Some((name, dur_part)) = trimmed.split_once(';') { + let name = name.trim(); + if let Some(duration) = dur_part.trim().strip_prefix("dur=") { + if let Ok(parsed) = duration.trim().parse::() { + map.insert(name.to_string(), parsed); + } + } + } + } + } + } + ServerTimingMetrics { values: map } +} + +#[cfg(all(test, feature = "otel-metrics"))] +mod tests { + use super::*; + + #[test] + fn parses_server_timing_header() { + let mut metadata = MetadataMap::new(); + let previous = metadata + .insert(SERVER_TIMING_HEADER, "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap()); + assert!(previous.is_none()); + let metrics = parse_server_timing(&metadata); + assert!(!metrics.is_empty()); + assert!((metrics.value(GFE_TIMING_HEADER) - 12.5).abs() < f64::EPSILON); + } +} diff --git a/spanner/src/session.rs b/spanner/src/session.rs index 7e1598b2..7c5c4605 100644 --- a/spanner/src/session.rs +++ b/spanner/src/session.rs @@ -20,6 +20,9 @@ use google_cloud_googleapis::spanner::v1::{BatchCreateSessionsRequest, DeleteSes use crate::apiv1::conn_pool::ConnectionManager; use crate::apiv1::spanner_client::{ping_query_request, Client}; +use crate::metrics::{MetricsRecorder, SessionPoolSnapshot, SessionPoolStatsFn}; + +const MAX_IN_USE_WINDOW: Duration = Duration::from_secs(600); /// Session pub struct SessionHandle { @@ -124,6 
+127,11 @@ struct Sessions { /// number of sessions scheduled to be replenished. num_creating: usize, + + /// Maximum observed number of sessions in use during the current window. + max_inuse_window: usize, + /// Start of the rolling window used for `max_inuse_window`. + window_started_at: Instant, } impl Sessions { @@ -146,13 +154,16 @@ impl Sessions { None => None, Some(s) => { self.num_inuse += 1; + self.update_max_in_use(); Some(s) } } } fn release(&mut self, session: SessionHandle) { - self.num_inuse -= 1; + if self.num_inuse > 0 { + self.num_inuse -= 1; + } if session.valid { self.available_sessions.push_back(session); } else if !session.deleted { @@ -197,6 +208,16 @@ impl Sessions { Err(e) => tracing::error!("failed to create new sessions {:?}", e), } } + + fn update_max_in_use(&mut self) { + let now = Instant::now(); + if now.duration_since(self.window_started_at) >= MAX_IN_USE_WINDOW { + self.window_started_at = now; + self.max_inuse_window = self.num_inuse; + } else if self.num_inuse > self.max_inuse_window { + self.max_inuse_window = self.num_inuse; + } + } } #[derive(Clone)] @@ -204,6 +225,7 @@ struct SessionPool { inner: Arc>, session_creation_sender: UnboundedSender, config: Arc, + metrics: Arc, } impl SessionPool { @@ -212,25 +234,32 @@ impl SessionPool { conn_pool: &ConnectionManager, session_creation_sender: UnboundedSender, config: Arc, + metrics: Arc, ) -> Result { - let available_sessions = Self::init_pool(database, conn_pool, config.min_opened).await?; - Ok(SessionPool { + let available_sessions = Self::init_pool(database, conn_pool, config.min_opened, metrics.clone()).await?; + let pool = SessionPool { inner: Arc::new(RwLock::new(Sessions { available_sessions, waiters: VecDeque::new(), orphans: Vec::new(), num_inuse: 0, num_creating: 0, + max_inuse_window: 0, + window_started_at: Instant::now(), })), session_creation_sender, config, - }) + metrics, + }; + pool.metrics.register_session_pool(pool.snapshot_fn()); + Ok(pool) } async fn init_pool( 
database: String, conn_pool: &ConnectionManager, min_opened: usize, + metrics: Arc, ) -> Result, Status> { let channel_num = conn_pool.num(); let creation_count_per_channel = min_opened / channel_num; @@ -245,7 +274,10 @@ impl SessionPool { } else { creation_count_per_channel }; - let next_client = conn_pool.conn().with_metadata(client_metadata(&database)); + let next_client = conn_pool + .conn() + .with_metrics(metrics.clone()) + .with_metadata(client_metadata(&database)); let database = database.clone(); tasks.spawn(async move { batch_create_sessions(next_client, &database, creation_count).await }); } @@ -276,6 +308,7 @@ impl SessionPool { if sessions.waiters.is_empty() { if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); + self.metrics.record_session_acquired(); return Ok(ManagedSession::new(self.clone(), s)); } } @@ -296,6 +329,7 @@ impl SessionPool { let mut sessions = self.inner.write(); if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); + self.metrics.record_session_acquired(); return Ok(ManagedSession::new(self.clone(), s)); } else { continue; // another waiter raced for session @@ -314,6 +348,7 @@ impl SessionPool { "Timeout acquiring session" ); } + self.metrics.record_session_timeout(); return Err(SessionError::SessionGetTimeout); } } @@ -326,6 +361,7 @@ impl SessionPool { /// If the session is invalid /// - Discard the session. If the number of sessions falls below the threshold as a result of discarding, the session replenishment process is called. 
fn recycle(&self, mut session: SessionHandle) { + self.metrics.record_session_released(); if session.valid { let mut sessions = self.inner.write(); let waiter = sessions.take_waiter(); @@ -366,6 +402,22 @@ impl SessionPool { self.remove_orphans().await; } + fn snapshot_fn(&self) -> SessionPoolStatsFn { + let inner = self.inner.clone(); + let max_allowed = self.config.max_opened; + Arc::new(move || { + let sessions = inner.read(); + SessionPoolSnapshot { + open_sessions: sessions.num_opened(), + sessions_in_use: sessions.num_inuse, + idle_sessions: sessions.available_sessions.len(), + max_allowed_sessions: max_allowed, + max_in_use_last_window: sessions.max_inuse_window, + has_multiplexed_session: false, + } + }) + } + async fn remove_orphans(&self) { let empty = vec![]; let deleting_sessions = { mem::replace(&mut self.inner.write().orphans, empty) }; @@ -458,10 +510,12 @@ impl SessionManager { database: impl Into, conn_pool: ConnectionManager, config: SessionConfig, + metrics: Arc, ) -> Result, Status> { let database = database.into(); let (sender, receiver) = mpsc::unbounded_channel(); - let session_pool = SessionPool::new(database.clone(), &conn_pool, sender, Arc::new(config.clone())).await?; + let session_pool = + SessionPool::new(database.clone(), &conn_pool, sender, Arc::new(config.clone()), metrics.clone()).await?; let cancel = CancellationToken::new(); let task_session_cleaner = Self::spawn_health_check_task(config, session_pool.clone(), cancel.clone()); @@ -514,7 +568,10 @@ impl SessionManager { } session_count = rx.recv() => match session_count { Some(session_count) => { - let client = conn_pool.conn().with_metadata(client_metadata(&database)); + let client = conn_pool + .conn() + .with_metrics(session_pool.metrics.clone()) + .with_metadata(client_metadata(&database)); let database = database.clone(); tasks.spawn(async move { (session_count, batch_create_sessions(client, &database, session_count).await) }); }, @@ -670,6 +727,7 @@ mod tests { use 
google_cloud_googleapis::spanner::v1::ExecuteSqlRequest; use crate::apiv1::conn_pool::ConnectionManager; + use crate::metrics::MetricsRecorder; use crate::session::{ batch_create_sessions, client_metadata, health_check, SessionConfig, SessionError, SessionManager, }; @@ -692,7 +750,9 @@ mod tests { ) .await .unwrap(); - let sm = SessionManager::new(DATABASE, cm, config).await.unwrap(); + let sm = SessionManager::new(DATABASE, cm, config, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); let counter = Arc::new(AtomicI64::new(0)); let mut spawns = Vec::with_capacity(100); @@ -732,7 +792,11 @@ mod tests { max_opened: 5, ..Default::default() }; - let sm = std::sync::Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap()); + let sm = std::sync::Arc::new( + SessionManager::new(DATABASE, cm, config, Arc::new(MetricsRecorder::default())) + .await + .unwrap(), + ); sleep(Duration::from_secs(1)).await; let cancel = CancellationToken::new(); @@ -761,7 +825,11 @@ mod tests { max_opened: 5, ..Default::default() }; - let sm = Arc::new(SessionManager::new(DATABASE, cm, config).await.unwrap()); + let sm = Arc::new( + SessionManager::new(DATABASE, cm, config, Arc::new(MetricsRecorder::default())) + .await + .unwrap(), + ); sleep(Duration::from_secs(1)).await; let cancel = CancellationToken::new(); @@ -790,7 +858,9 @@ mod tests { max_opened: 45, ..Default::default() }; - let sm = SessionManager::new(DATABASE, conn_pool, config).await.unwrap(); + let sm = SessionManager::new(DATABASE, conn_pool, config, Arc::new(MetricsRecorder::default())) + .await + .unwrap(); { let mut sessions = Vec::new(); for _ in 0..45 { @@ -830,7 +900,11 @@ mod tests { session_get_timeout: Duration::from_secs(1), ..Default::default() }; - let sm = Arc::new(SessionManager::new(DATABASE, conn_pool, config.clone()).await.unwrap()); + let sm = Arc::new( + SessionManager::new(DATABASE, conn_pool, config.clone(), Arc::new(MetricsRecorder::default())) + .await + .unwrap(), + ); let mu = 
Arc::new(RwLock::new(Vec::new())); let mut awaiters = Vec::with_capacity(100); for _ in 0..100 { @@ -1086,7 +1160,9 @@ mod tests { .await .unwrap(); let config = SessionConfig::default(); - let sm = SessionManager::new(DATABASE, cm, config.clone()).await.unwrap(); + let sm = SessionManager::new(DATABASE, cm, config.clone(), Arc::new(MetricsRecorder::default())) + .await + .unwrap(); assert_eq!(sm.num_opened(), config.min_opened); sm.close().await; assert_eq!(sm.num_opened(), 0); @@ -1104,7 +1180,10 @@ mod tests { ) .await .unwrap(); - let client = cm.conn().with_metadata(client_metadata(DATABASE)); + let client = cm + .conn() + .with_metrics(Arc::new(MetricsRecorder::default())) + .with_metadata(client_metadata(DATABASE)); let session_count = 125; let result = batch_create_sessions(client.clone(), DATABASE, session_count).await; match result { From 7807bdeb317d36ae0898c3f7952c537a15b486e6 Mon Sep 17 00:00:00 2001 From: yoshidan Date: Sun, 16 Nov 2025 11:48:19 +0900 Subject: [PATCH 2/5] fix clippy --- storage/src/http/bucket_access_controls/mod.rs | 9 ++------- storage/src/http/notifications/mod.rs | 8 ++------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/storage/src/http/bucket_access_controls/mod.rs b/storage/src/http/bucket_access_controls/mod.rs index ecafd0ec..2c7bbbd8 100644 --- a/storage/src/http/bucket_access_controls/mod.rs +++ b/storage/src/http/bucket_access_controls/mod.rs @@ -64,15 +64,10 @@ pub struct BucketAccessControl { } /// A set of properties to return in a response. 
-#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Deserialize, serde::Serialize, Debug)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Deserialize, serde::Serialize, Debug, Default)] pub enum BucketACLRole { OWNER, + #[default] READER, WRITER, } - -impl Default for BucketACLRole { - fn default() -> Self { - Self::READER - } -} diff --git a/storage/src/http/notifications/mod.rs b/storage/src/http/notifications/mod.rs index 64bb30c1..2885be31 100644 --- a/storage/src/http/notifications/mod.rs +++ b/storage/src/http/notifications/mod.rs @@ -44,15 +44,11 @@ pub enum EventType { #[derive(Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Debug)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] +#[derive(Default)] pub enum PayloadFormat { /// The payload will be a UTF-8 string containing the resource representation of the object’s metadata. + #[default] JsonApiV1, /// No payload is included with the notification. None, } - -impl Default for PayloadFormat { - fn default() -> Self { - Self::JsonApiV1 - } -} From f0972f5014823f033ece32cedf7e334fd5e1664b Mon Sep 17 00:00:00 2001 From: Dario Marasco Date: Mon, 17 Nov 2025 13:40:55 -0800 Subject: [PATCH 3/5] include get session latency --- spanner/src/metrics.rs | 35 ++++++++++++++++++++++++++++++++++- spanner/src/session.rs | 5 +++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/spanner/src/metrics.rs b/spanner/src/metrics.rs index 469f3e7d..0d30c939 100644 --- a/spanner/src/metrics.rs +++ b/spanner/src/metrics.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; #[cfg(feature = "otel-metrics")] use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::time::Duration; use google_cloud_gax::grpc::metadata::MetadataMap; use thiserror::Error; @@ -107,6 +108,17 @@ impl MetricsRecorder { } } + pub(crate) fn record_session_acquire_latency(&self, duration: Duration) { + #[cfg(feature = "otel-metrics")] + if let Some(inner) = &self.inner { + 
inner.record_session_acquire_latency(duration); + } + #[cfg(not(feature = "otel-metrics"))] + { + let _ = duration; + } + } + pub(crate) fn record_server_timing(&self, method: &'static str, metadata: &MetadataMap) { #[cfg(feature = "otel-metrics")] if let Some(inner) = &self.inner { @@ -126,12 +138,13 @@ mod otel_impl { use super::{ ParsedDatabaseName, ServerTimingMetrics, SessionPoolStatsFn, ATTR_CLIENT_ID, ATTR_DATABASE, ATTR_INSTANCE, ATTR_IS_MULTIPLEXED, ATTR_LIB_VERSION, ATTR_METHOD, ATTR_PROJECT, ATTR_TYPE, CLIENT_ID_SEQ, GFE_BUCKETS, - GFE_TIMING_HEADER, METRICS_PREFIX, OTEL_SCOPE, + GFE_TIMING_HEADER, METRICS_PREFIX, OTEL_SCOPE, SESSION_ACQUIRE_BUCKETS, }; use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, ObservableGauge}; use opentelemetry::{global, InstrumentationScope, KeyValue}; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; + use std::time::Duration; pub(super) struct OtelMetrics { meter: Meter, @@ -142,6 +155,7 @@ mod otel_impl { released_sessions: Counter, gfe_latency: Histogram, gfe_header_missing: Counter, + session_acquire_latency: Histogram, } struct SessionGaugeHandles { @@ -193,6 +207,12 @@ mod otel_impl { .with_description("Number of RPC responses received without the server-timing header, most likely meaning the RPC never reached Google's network.") .with_unit("1") .build(); + let session_acquire_latency = meter + .f64_histogram(METRICS_PREFIX.to_owned() + "session_acquire_latency") + .with_description("Time spent waiting to acquire a session from the pool.") + .with_unit("ms") + .with_boundaries(SESSION_ACQUIRE_BUCKETS.to_vec()) + .build(); OtelMetrics { meter, @@ -203,6 +223,7 @@ mod otel_impl { released_sessions, gfe_latency, gfe_header_missing, + session_acquire_latency, } } @@ -293,6 +314,12 @@ mod otel_impl { .add(1, self.attributes.without_multiplexed.as_ref()); } + pub(super) fn record_session_acquire_latency(&self, duration: Duration) { + let latency_ms = duration.as_secs_f64() * 1000.0; + 
self.session_acquire_latency + .record(latency_ms, self.attributes.base.as_ref()); + } + pub(super) fn record_gfe_metrics(&self, method: &'static str, metrics: ServerTimingMetrics) { if metrics.is_empty() { self.gfe_header_missing.add(1, self.attributes.base.as_ref()); @@ -387,6 +414,12 @@ const GFE_TIMING_HEADER: &str = "gfet4t7"; #[cfg(feature = "otel-metrics")] const SERVER_TIMING_HEADER: &str = "server-timing"; +#[cfg(feature = "otel-metrics")] +const SESSION_ACQUIRE_BUCKETS: &[f64] = &[ + 0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 75.0, 100.0, 150.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 1500.0, + 2000.0, 3000.0, 4000.0, 5000.0, 7500.0, 10000.0, 15000.0, 30000.0, 60000.0, +]; + #[cfg(feature = "otel-metrics")] const GFE_BUCKETS: &[f64] = &[ 0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, diff --git a/spanner/src/session.rs b/spanner/src/session.rs index 68897ece..7aac1edf 100644 --- a/spanner/src/session.rs +++ b/spanner/src/session.rs @@ -305,6 +305,7 @@ impl SessionPool { /// The client on the waiting list will be notified when another client's session has finished and /// when the process of replenishing the available sessions is complete. 
async fn acquire(&self) -> Result { + let request_started_at = Instant::now(); loop { let (on_session_acquired, session_count) = { let mut sessions = self.inner.write(); @@ -314,6 +315,8 @@ impl SessionPool { if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); self.metrics.record_session_acquired(); + self.metrics + .record_session_acquire_latency(request_started_at.elapsed()); return Ok(ManagedSession::new(self.clone(), s)); } } @@ -335,6 +338,8 @@ impl SessionPool { if let Some(mut s) = sessions.take() { s.last_used_at = Instant::now(); self.metrics.record_session_acquired(); + self.metrics + .record_session_acquire_latency(request_started_at.elapsed()); return Ok(ManagedSession::new(self.clone(), s)); } else { continue; // another waiter raced for session From 78646a7bd192b9db60f98bf9a24f49d83c35382b Mon Sep 17 00:00:00 2001 From: Dario Marasco Date: Mon, 17 Nov 2025 14:02:27 -0800 Subject: [PATCH 4/5] remove mut --- spanner/src/apiv1/spanner_client.rs | 30 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/spanner/src/apiv1/spanner_client.rs b/spanner/src/apiv1/spanner_client.rs index 34ee122c..3f35013d 100644 --- a/spanner/src/apiv1/spanner_client.rs +++ b/spanner/src/apiv1/spanner_client.rs @@ -114,7 +114,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone()); this.inner.create_session(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -142,7 +142,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone()); this.inner.batch_create_sessions(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -168,7 +168,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone()); this.inner.get_session(request).await.map_err(|e| (e, 
this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -193,7 +193,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone()); this.inner.list_sessions(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -220,7 +220,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone()); this.inner.delete_session(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -255,7 +255,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.execute_sql(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -284,7 +284,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -333,7 +333,7 @@ impl Client { Err(err) => Err((err, this)), } }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -370,7 +370,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.read(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -399,7 +399,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.streaming_read(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -427,7 +427,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.begin_transaction(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await 
.inspect(move |response| { @@ -465,7 +465,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.commit(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -497,7 +497,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.rollback(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -532,7 +532,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.partition_query(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { @@ -569,7 +569,7 @@ impl Client { let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone()); this.inner.partition_read(request).await.map_err(|e| (e, this)) }, - &mut *self, + self, ) .await .inspect(move |response| { From 84fcc5aceb1d4c9bdeacc3df15cd568b2a3b5937 Mon Sep 17 00:00:00 2001 From: Dario Marasco Date: Mon, 24 Nov 2025 14:18:57 -0800 Subject: [PATCH 5/5] update test --- spanner/src/metrics.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spanner/src/metrics.rs b/spanner/src/metrics.rs index 0d30c939..5dac6ba3 100644 --- a/spanner/src/metrics.rs +++ b/spanner/src/metrics.rs @@ -496,9 +496,7 @@ mod tests { #[test] fn parses_server_timing_header() { let mut metadata = MetadataMap::new(); - metadata - .insert(SERVER_TIMING_HEADER, "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap()) - .unwrap(); + metadata.insert(SERVER_TIMING_HEADER, "gfet4t7;dur=12.5,another-metric;dur=3.5".parse().unwrap()); let metrics = parse_server_timing(&metadata); assert!(!metrics.is_empty()); assert!((metrics.value(GFE_TIMING_HEADER) - 12.5).abs() < f64::EPSILON);