Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions spanner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ token-source = "1.0"
google-cloud-longrunning = { package = "gcloud-longrunning", version = "1.3.0", path = "../foundation/longrunning" }
google-cloud-gax = { package = "gcloud-gax", version = "1.3.1", path = "../foundation/gax" }
google-cloud-googleapis = { package = "gcloud-googleapis", version = "1.3.0", path = "../googleapis", features = ["spanner"]}
opentelemetry = { version = "0.31", optional = true, default-features = false, features = ["metrics"] }

google-cloud-auth = { package = "gcloud-auth", optional = true, version = "1.1.2", path="../foundation/auth", default-features=false }

Expand All @@ -45,3 +46,4 @@ auth = ["google-cloud-auth"]
default-tls = ["google-cloud-auth?/default-tls"]
rustls-tls = ["google-cloud-auth?/rustls-tls"]
external-account = ["google-cloud-auth?/external-account"]
otel-metrics = ["opentelemetry"]
75 changes: 75 additions & 0 deletions spanner/src/apiv1/spanner_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;

use google_cloud_gax::conn::Channel;
Expand All @@ -13,6 +14,8 @@ use google_cloud_googleapis::spanner::v1::{
PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
};

use crate::metrics::MetricsRecorder;

const ROUTE_TO_LEADER_HEADER: &str = "x-goog-spanner-route-to-leader";

pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
Expand Down Expand Up @@ -48,6 +51,7 @@ fn default_setting() -> RetrySetting {
pub struct Client {
inner: SpannerClient<Channel>,
metadata: MetadataMap,
metrics: Arc<MetricsRecorder>,
}

impl Client {
Expand All @@ -57,6 +61,7 @@ impl Client {
Client {
inner: inner.max_decoding_message_size(i32::MAX as usize),
metadata: Default::default(),
metrics: Arc::new(MetricsRecorder::default()),
}
}

Expand All @@ -65,9 +70,15 @@ impl Client {
Client {
inner: self.inner,
metadata,
metrics: self.metrics,
}
}

pub(crate) fn with_metrics(mut self, metrics: Arc<MetricsRecorder>) -> Client {
self.metrics = metrics;
self
}

/// create_session creates a new session. A session can be used to perform
/// transactions that read and/or modify data in a Cloud Spanner database.
/// Sessions are meant to be reused for many consecutive
Expand Down Expand Up @@ -96,6 +107,7 @@ impl Client {
) -> Result<Response<Session>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let database = &req.database;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -105,6 +117,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "createSession", response);
})
}

/// batch_create_sessions creates multiple new sessions.
Expand All @@ -120,6 +135,7 @@ impl Client {
) -> Result<Response<BatchCreateSessionsResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let database = &req.database;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -129,6 +145,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "batchCreateSessions", response);
})
}

/// get_session gets a session. Returns NOT_FOUND if the session does not exist.
Expand All @@ -142,6 +161,7 @@ impl Client {
) -> Result<Response<Session>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let name = &req.name;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -151,6 +171,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "getSession", response);
})
}

/// list_sessions lists all sessions in a given database.
Expand All @@ -163,6 +186,7 @@ impl Client {
) -> Result<Response<ListSessionsResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let database = &req.database;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -172,6 +196,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "listSessions", response);
})
}

/// delete_session ends a session, releasing server resources associated with it. This will
Expand All @@ -186,6 +213,7 @@ impl Client {
) -> Result<Response<()>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let name = &req.name;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -195,6 +223,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "deleteSession", response);
})
}

/// execute_sql executes an SQL statement, returning all results in a single reply. This
Expand All @@ -217,6 +248,7 @@ impl Client {
) -> Result<Response<ResultSet>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -226,6 +258,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "executeSql", response);
})
}

/// execute_streaming_sql like ExecuteSql, except returns the result
Expand All @@ -242,6 +277,7 @@ impl Client {
) -> Result<Response<Streaming<PartialResultSet>>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -251,6 +287,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "executeStreamingSql", response);
})
}

/// execute_batch_dml executes a batch of SQL DML statements. This method allows many statements
Expand All @@ -273,6 +312,7 @@ impl Client {
) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -296,6 +336,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "executeBatchDml", response);
})
}

/// read reads rows from the database using key lookups and scans, as a
Expand All @@ -320,6 +363,7 @@ impl Client {
) -> Result<Response<ResultSet>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -329,6 +373,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "read", response);
})
}

/// streaming_read like read, except returns the result set as a
Expand All @@ -345,6 +392,7 @@ impl Client {
) -> Result<Response<Streaming<PartialResultSet>>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -354,6 +402,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "streamingRead", response);
})
}

/// BeginTransaction begins a new transaction. This step can often be skipped:
Expand All @@ -369,6 +420,7 @@ impl Client {
) -> Result<Response<Transaction>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -378,6 +430,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "beginTransaction", response);
})
}

/// Commit commits a transaction. The request includes the mutations to be
Expand All @@ -403,6 +458,7 @@ impl Client {
) -> Result<Response<CommitResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -412,6 +468,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "commit", response);
})
}

/// Rollback rolls back a transaction, releasing any locks it holds. It is a good
Expand All @@ -431,6 +490,7 @@ impl Client {
) -> Result<Response<()>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -440,6 +500,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "rollback", response);
})
}

/// PartitionQuery creates a set of partition tokens that can be used to execute a query
Expand All @@ -462,6 +525,7 @@ impl Client {
) -> Result<Response<PartitionResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -471,6 +535,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "partitionQuery", response);
})
}

/// PartitionRead creates a set of partition tokens that can be used to execute a read
Expand All @@ -495,6 +562,7 @@ impl Client {
) -> Result<Response<PartitionResponse>, Status> {
let setting = retry.unwrap_or_else(default_setting);
let session = &req.session;
let metrics = Arc::clone(&self.metrics);
invoke_fn(
Some(setting),
|this| async {
Expand All @@ -504,6 +572,9 @@ impl Client {
self,
)
.await
.inspect(move |response| {
Self::record_gfe(metrics.as_ref(), "partitionRead", response);
})
}

fn create_request<T>(
Expand All @@ -529,4 +600,8 @@ impl Client {
}
req
}

fn record_gfe<T>(metrics: &MetricsRecorder, method: &'static str, response: &Response<T>) {
metrics.record_server_timing(method, response.metadata());
}
}
19 changes: 17 additions & 2 deletions spanner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use google_cloud_googleapis::spanner::v1::{commit_request, transaction_options,
use token_source::NoopTokenSourceProvider;

use crate::apiv1::conn_pool::{ConnectionManager, SPANNER};
use crate::metrics::{MetricsConfig, MetricsRecorder};
use crate::retry::TransactionRetrySetting;
use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
use crate::statement::Statement;
Expand Down Expand Up @@ -82,6 +83,8 @@ pub struct ClientConfig {
pub environment: Environment,
/// DisableRouteToLeader specifies if all the requests of type read-write and PDML need to be routed to the leader region.
pub disable_route_to_leader: bool,
/// Metrics configuration for emitting OpenTelemetry signals.
pub metrics: MetricsConfig,
}

impl Default for ClientConfig {
Expand All @@ -95,6 +98,7 @@ impl Default for ClientConfig {
None => Environment::GoogleCloud(Box::new(NoopTokenSourceProvider {})),
},
disable_route_to_leader: false,
metrics: MetricsConfig::default(),
};
config.session_config.min_opened = config.channel_config.num_channels * 4;
config.session_config.max_opened = config.channel_config.num_channels * 100;
Expand Down Expand Up @@ -184,15 +188,26 @@ impl Client {
)));
}

let database: String = database.into();
let metrics = Arc::new(
MetricsRecorder::try_new(&database, &config.metrics).map_err(|e| Error::InvalidConfig(e.to_string()))?,
);

let pool_size = config.channel_config.num_channels;
let options = ConnectionOptions {
timeout: Some(config.channel_config.timeout),
connect_timeout: Some(config.channel_config.connect_timeout),
};
let conn_pool =
ConnectionManager::new(pool_size, &config.environment, config.endpoint.as_str(), &options).await?;
let session_manager =
SessionManager::new(database, conn_pool, config.session_config, config.disable_route_to_leader).await?;
let session_manager = SessionManager::new(
database,
conn_pool,
config.session_config,
config.disable_route_to_leader,
metrics.clone(),
)
.await?;

Ok(Client {
sessions: session_manager,
Expand Down
1 change: 1 addition & 0 deletions spanner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ pub mod admin;
pub mod apiv1;
pub mod client;
pub mod key;
pub mod metrics;
pub mod mutation;
pub mod reader;
pub mod retry;
Expand Down
Loading