Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions spanner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::retry::TransactionRetrySetting;
use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager};
use crate::statement::Statement;
use crate::transaction::{CallOptions, QueryOptions};
use crate::transaction_manager::TransactionManager;
use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
use crate::value::TimestampBound;
Expand Down Expand Up @@ -604,6 +605,43 @@ impl Client {
.map_err(|e| e.status.into())
}

/// transaction_manager creates a TransactionManager that allows manual control over
/// transaction execution with session reuse across retries. This is useful when you need
/// to handle transaction retry logic manually while maintaining the same session for
/// better lock priority.
///
/// The TransactionManager holds a session and allows multiple calls to
/// `begin_read_write_transaction()` that reuse the same session. This is particularly
/// helpful when retrying transactions that fail with ABORTED errors, as reusing the
/// session retains lock priority and increases the likelihood of commit success.
///
/// # Example
///
/// ```ignore
/// use google_cloud_spanner::client::Client;
/// use google_cloud_spanner::retry::TransactionRetry;
///
/// async fn run(client: Client) -> Result<(), Error> {
/// let mut tm = client.transaction_manager().await?;
/// let retry = &mut TransactionRetry::new();
///
/// loop {
/// let tx = tm.begin_read_write_transaction().await?;
///
/// let result = run_in_transaction(tx).await;
///
/// match tx.end(result, None).await {
/// Ok((commit_result, success)) => return Ok(success),
/// Err(err) => retry.next(err).await?
/// }
/// }
/// }
/// ```
pub async fn transaction_manager(&self) -> Result<TransactionManager, Error> {
let session = self.get_session().await?;
Ok(TransactionManager::new(session, self.disable_route_to_leader))
}

/// Get open session count.
pub fn session_count(&self) -> usize {
self.sessions.num_opened()
Expand Down
1 change: 1 addition & 0 deletions spanner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ pub mod row;
pub mod session;
pub mod statement;
pub mod transaction;
pub mod transaction_manager;
pub mod transaction_ro;
pub mod transaction_rw;
pub mod value;
Expand Down
111 changes: 111 additions & 0 deletions spanner/src/transaction_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::client::Error;
use crate::session::ManagedSession;
use crate::transaction::CallOptions;
use crate::transaction_rw::ReadWriteTransaction;

/// TransactionManager manages a single session for executing multiple
/// read-write transactions with session reuse. This is particularly useful
/// for manual transaction retry loops where reusing the same session helps
/// maintain lock priority across retries.
///
/// # Example
///
/// ```rust,ignore
/// use google_cloud_spanner::client::Client;
/// use google_cloud_spanner::retry::TransactionRetry;
///
/// async fn example(client: Client) -> Result<(), Status> {
/// let mut tm = client.transaction_manager().await?;
/// let retry = &mut TransactionRetry::new();
///
/// loop {
/// let tx = tm.begin_read_write_transaction().await?;
///
/// let result = do_work(tx).await;
///
/// match tx.end(result, None).await {
/// Ok((commit_result, success)) => return Ok(success),
/// Err(err) => retry.next(err).await?
/// }
/// }
/// }
/// ```
pub struct TransactionManager {
session: Option<ManagedSession>,
transaction: Option<ReadWriteTransaction>,
disable_route_to_leader: bool,
}

impl TransactionManager {
/// Creates a new TransactionManager with the given session.
pub(crate) fn new(session: ManagedSession, disable_route_to_leader: bool) -> Self {
Self {
session: Some(session),
transaction: None,
disable_route_to_leader,
}
}

/// Returns a mutable reference to the current transaction, if one has been started.
///
/// Returns `None` if no transaction is currently active. Call
/// `begin_read_write_transaction()` to start a new transaction.
///
/// # Example
///
/// ```rust,ignore
/// let mut tm = client.transaction_manager().await?;
///
/// // Initially returns None
/// assert!(tm.transaction().is_none());
///
/// // After begin, returns Some
/// let tx = tm.begin_read_write_transaction().await?;
/// assert!(tm.transaction().is_some());
/// ```
pub fn transaction(&mut self) -> Option<&mut ReadWriteTransaction> {
self.transaction.as_mut()
}

/// Begins a new read-write transaction, reusing the session from the
/// previous transaction if one exists. Returns a mutable reference to
/// the transaction which can be used to execute queries and mutations.
///
/// The transaction must be ended by calling `end()` on the returned
/// reference before calling `begin_read_write_transaction()` again.
pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, Error> {
self.begin_read_write_transaction_with_options(CallOptions::default(), None)
.await
}

/// Begins a new read-write transaction with custom call options and transaction tag.
/// This is similar to `begin_read_write_transaction()` but allows specifying
/// custom options for the transaction.
pub async fn begin_read_write_transaction_with_options(
&mut self,
options: CallOptions,
transaction_tag: Option<String>,
) -> Result<&mut ReadWriteTransaction, Error> {
// Extract session from previous transaction if it exists, otherwise use stored session
let session = if let Some(ref mut tx) = self.transaction {
tx.take_session()
.expect("transaction should have a session")
} else {
self.session.take().expect("manager should have a session")
};

// Create new transaction with the session
match ReadWriteTransaction::begin(session, options, transaction_tag, self.disable_route_to_leader).await {
Ok(new_tx) => {
// Store the transaction and return a mutable reference
self.transaction = Some(new_tx);
Ok(self.transaction.as_mut().unwrap())
}
Err(begin_error) => {
// Store session back for next retry attempt
self.session = Some(begin_error.session);
Err(begin_error.status.into())
}
}
}
}
Loading