From 1b82a1a5706841782a39ebb456c6e26fb22e0772 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sat, 8 Nov 2025 18:52:14 +0100 Subject: [PATCH 1/6] [spanner] Add TransactionManager API for manual transaction control with session reuse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the TransactionManager API as described in issue #375. This allows manual control over transaction execution while maintaining session reuse across retries, which helps retain lock priority and improves commit success rates for ABORTED transactions. Key changes: - Add TransactionManager struct that holds a session across multiple transaction attempts - Add Client::transaction_manager() method to create a TransactionManager - Add Debug implementation for BeginError to support unwrap() in user code - Add comprehensive integration tests for TransactionManager functionality The API enables users to handle transaction retry logic manually while benefiting from session reuse: ```rust let mut tm = client.transaction_manager().await?; let retry = &mut TransactionRetry::new(); loop { let tx = tm.begin_read_write_transaction().await?; let result = run_in_transaction(tx).await; match tx.end(result, None).await { Ok((commit_result, success)) => return Ok(success), Err(err) => retry.next(err).await? } } ``` Fixes #375 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/client.rs | 38 ++++ spanner/src/lib.rs | 1 + spanner/src/transaction_manager.rs | 102 +++++++++ spanner/src/transaction_rw.rs | 9 + spanner/tests/transaction_manager_test.rs | 257 ++++++++++++++++++++++ 5 files changed, 407 insertions(+) create mode 100644 spanner/src/transaction_manager.rs create mode 100644 spanner/tests/transaction_manager_test.rs diff --git a/spanner/src/client.rs b/spanner/src/client.rs index f69cd2ff..3be14e25 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -16,6 +16,7 @@ use crate::retry::TransactionRetrySetting; use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager}; use crate::statement::Statement; use crate::transaction::{CallOptions, QueryOptions}; +use crate::transaction_manager::TransactionManager; use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction}; use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction}; use crate::value::TimestampBound; @@ -604,6 +605,43 @@ impl Client { .map_err(|e| e.status.into()) } + /// transaction_manager creates a TransactionManager that allows manual control over + /// transaction execution with session reuse across retries. This is useful when you need + /// to handle transaction retry logic manually while maintaining the same session for + /// better lock priority. + /// + /// The TransactionManager holds a session and allows multiple calls to + /// `begin_read_write_transaction()` that reuse the same session. This is particularly + /// helpful when retrying transactions that fail with ABORTED errors, as reusing the + /// session retains lock priority and increases the likelihood of commit success. + /// + /// # Example + /// + /// ```ignore + /// use google_cloud_spanner::client::Client; + /// use google_cloud_spanner::retry::TransactionRetry; + /// + /// async fn run(client: Client) -> Result<(), Error> { + /// let mut tm = client.transaction_manager().await?; + /// let retry = &mut TransactionRetry::new(); + /// + /// loop { + /// let tx = tm.begin_read_write_transaction().await?; + /// + /// let result = run_in_transaction(tx).await; + /// + /// match tx.end(result, None).await { + /// Ok((commit_result, success)) => return Ok(success), + /// Err(err) => retry.next(err).await? + /// } + /// } + /// } + /// ``` + pub async fn transaction_manager(&self) -> Result { + let session = self.get_session().await?; + Ok(TransactionManager::new(session)) + } + /// Get open session count. pub fn session_count(&self) -> usize { self.sessions.num_opened() diff --git a/spanner/src/lib.rs b/spanner/src/lib.rs index aa879e55..e473d7f8 100644 --- a/spanner/src/lib.rs +++ b/spanner/src/lib.rs @@ -643,6 +643,7 @@ pub mod row; pub mod session; pub mod statement; pub mod transaction; +pub mod transaction_manager; pub mod transaction_ro; pub mod transaction_rw; pub mod value; diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs new file mode 100644 index 00000000..130bf6ae --- /dev/null +++ b/spanner/src/transaction_manager.rs @@ -0,0 +1,102 @@ +use crate::session::ManagedSession; +use crate::transaction::CallOptions; +use crate::transaction_rw::{BeginError, ReadWriteTransaction}; + +/// TransactionManager manages a single session for executing multiple +/// read-write transactions with session reuse. This is particularly useful +/// for manual transaction retry loops where reusing the same session helps +/// maintain lock priority across retries. +/// +/// # Example +/// +/// ```rust,ignore +/// use google_cloud_spanner::client::Client; +/// use google_cloud_spanner::retry::TransactionRetry; +/// +/// async fn example(client: Client) -> Result<(), Status> { +/// let mut tm = client.transaction_manager().await?; +/// let retry = &mut TransactionRetry::new(); +/// +/// loop { +/// let tx = tm.begin_read_write_transaction().await?; +/// +/// let result = do_work(tx).await; +/// +/// match tx.end(result, None).await { +/// Ok((commit_result, success)) => return Ok(success), +/// Err(err) => retry.next(err).await? +/// } +/// } +/// } +/// ``` +pub struct TransactionManager { + session: Option, + transaction: Option, +} + +impl TransactionManager { + /// Creates a new TransactionManager with the given session. + pub(crate) fn new(session: ManagedSession) -> Self { + Self { + session: Some(session), + transaction: None, + } + } + + /// Begins a new read-write transaction, reusing the session from the + /// previous transaction if one exists. Returns a mutable reference to + /// the transaction which can be used to execute queries and mutations. + /// + /// The transaction must be ended by calling `end()` on the returned + /// reference before calling `begin_read_write_transaction()` again. + pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, BeginError> { + // Extract session from previous transaction if it exists, otherwise use stored session + let session = if let Some(ref mut tx) = self.transaction { + tx.take_session() + .expect("transaction should have a session") + } else { + self.session.take().expect("manager should have a session") + }; + + // Create new transaction with the session + let new_tx = ReadWriteTransaction::begin(session, CallOptions::default(), None).await?; + + // Store the transaction and return a mutable reference + self.transaction = Some(new_tx); + Ok(self.transaction.as_mut().unwrap()) + } + + /// Begins a new read-write transaction with custom call options and transaction tag. + /// This is similar to `begin_read_write_transaction()` but allows specifying + /// custom options for the transaction. + pub async fn begin_read_write_transaction_with_options( + &mut self, + options: CallOptions, + transaction_tag: Option, + ) -> Result<&mut ReadWriteTransaction, BeginError> { + // Extract session from previous transaction if it exists, otherwise use stored session + let session = if let Some(ref mut tx) = self.transaction { + tx.take_session() + .expect("transaction should have a session") + } else { + self.session.take().expect("manager should have a session") + }; + + // Create new transaction with the session + let new_tx = ReadWriteTransaction::begin(session, options, transaction_tag).await?; + + // Store the transaction and return a mutable reference + self.transaction = Some(new_tx); + Ok(self.transaction.as_mut().unwrap()) + } +} + +impl Drop for TransactionManager { + fn drop(&mut self) { + // If there's a transaction, extract its session before dropping + // This ensures the session is properly returned to the pool + if let Some(ref mut tx) = self.transaction { + let _ = tx.take_session(); + } + } +} diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index fd6757d9..38faedde 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -120,6 +120,15 @@ pub struct BeginError { pub session: ManagedSession, } +impl std::fmt::Debug for BeginError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BeginError") + .field("status", &self.status) + .field("session", &"") + .finish() + } +} + impl ReadWriteTransaction { pub async fn begin( session: ManagedSession, diff --git a/spanner/tests/transaction_manager_test.rs b/spanner/tests/transaction_manager_test.rs new file mode 100644 index 00000000..c1a5f6fd --- /dev/null +++ b/spanner/tests/transaction_manager_test.rs @@ -0,0 +1,257 @@ +use serial_test::serial; +use time::OffsetDateTime; + +use common::*; +use google_cloud_spanner::key::Key; +use google_cloud_spanner::retry::TransactionRetry; +use google_cloud_spanner::row::Row; +use google_cloud_spanner::statement::Statement; + +mod common; + +#[ctor::ctor] +fn init() { + let filter = tracing_subscriber::filter::EnvFilter::from_default_env() + .add_directive("google_cloud_spanner=trace".parse().unwrap()); + let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init(); + std::env::set_var("SPANNER_EMULATOR_HOST", "localhost:9010"); +} + +#[tokio::test] +#[serial] +async fn test_transaction_manager_basic() { + // Set up test data + let now = OffsetDateTime::now_utc(); + let data_client = create_data_client().await; + let user_id = format!("user_tm_basic_{}", now.unix_timestamp()); + + // Create initial user + let cr = data_client + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + // Test TransactionManager + let mut tm = data_client.transaction_manager().await.unwrap(); + let retry = &mut TransactionRetry::new(); + + let commit_timestamp = loop { + let tx = tm.begin_read_write_transaction().await.unwrap(); + + let result = async { + // Add character and item + let mut stmt1 = Statement::new( + "INSERT INTO UserCharacter (UserId,CharacterId,Level,UpdatedAt) \ + VALUES(@UserId,1,10,PENDING_COMMIT_TIMESTAMP())", + ); + stmt1.add_param("UserId", &user_id); + + let mut stmt2 = Statement::new( + "INSERT INTO UserItem (UserId,ItemId,Quantity,UpdatedAt) \ + VALUES(@UserId,100,500,PENDING_COMMIT_TIMESTAMP())", + ); + stmt2.add_param("UserId", &user_id); + + tx.update(stmt1).await?; + tx.update(stmt2).await + } + .await; + + match tx.end(result, None).await { + Ok((commit_result, _)) => { + assert!(commit_result.timestamp.is_some()); + let ts = commit_result.timestamp.unwrap(); + break OffsetDateTime::from_unix_timestamp(ts.seconds) + .unwrap() + .replace_nanosecond(ts.nanos as u32) + .unwrap(); + } + Err(err) => retry.next(err).await.unwrap(), + } + }; + + // Verify the data was written + let ts = cr.timestamp.unwrap(); + let user_commit_timestamp = OffsetDateTime::from_unix_timestamp(ts.seconds) + .unwrap() + .replace_nanosecond(ts.nanos as u32) + .unwrap(); + + verify_transaction_manager_data(&user_id, &now, &user_commit_timestamp, &commit_timestamp).await; +} + +#[tokio::test] +#[serial] +async fn test_transaction_manager_rollback() { + // Set up test data + let now = OffsetDateTime::now_utc(); + let data_client = create_data_client().await; + let user_id = format!("user_tm_rollback_{}", now.unix_timestamp()); + + let cr = data_client + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + // Test TransactionManager with rollback + { + let mut tm = data_client.transaction_manager().await.unwrap(); + let tx = tm.begin_read_write_transaction().await.unwrap(); + + let result = async { + // Try to update non-existent table (will cause rollback) + let mut stmt = Statement::new("UPDATE User SET NullableString = 'test' WHERE UserId = @UserId"); + stmt.add_param("UserId", &user_id); + tx.update(stmt).await?; + + // This should fail + let stmt2 = Statement::new("UPDATE NonExistentTable SET Column = 'value'"); + tx.update(stmt2).await + } + .await; + + let _ = tx.end(result, None).await; + } + + // Verify the data wasn't modified (rollback worked) + let mut tx = data_client.read_only_transaction().await.unwrap(); + let reader = tx.read("User", &user_columns(), Key::new(&user_id)).await.unwrap(); + let row: Row = all_rows(reader).await.unwrap().pop().unwrap(); + + let ts = cr.timestamp.unwrap(); + let ts = OffsetDateTime::from_unix_timestamp(ts.seconds) + .unwrap() + .replace_nanosecond(ts.nanos as u32) + .unwrap(); + assert_user_row(&row, &user_id, &now, &ts); +} + +#[tokio::test] +#[serial] +async fn test_transaction_manager_multiple_transactions() { + // Set up test data + let now = OffsetDateTime::now_utc(); + let data_client = create_data_client().await; + let user_id = format!("user_tm_multi_{}", now.unix_timestamp()); + + data_client + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + // Test multiple transactions with the same TransactionManager + let mut tm = data_client.transaction_manager().await.unwrap(); + + // First transaction: add character + { + let tx = tm.begin_read_write_transaction().await.unwrap(); + + let result = async { + let mut stmt = Statement::new( + "INSERT INTO UserCharacter (UserId,CharacterId,Level,UpdatedAt) \ + VALUES(@UserId,1,5,PENDING_COMMIT_TIMESTAMP())", + ); + stmt.add_param("UserId", &user_id); + tx.update(stmt).await + } + .await; + + match tx.end(result, None).await { + Ok(_) => (), + Err(err) => panic!("First transaction failed: {:?}", err), + } + } + + // Second transaction: add item (reusing the same manager/session) + { + let tx = tm.begin_read_write_transaction().await.unwrap(); + + let result = async { + let mut stmt = Statement::new( + "INSERT INTO UserItem (UserId,ItemId,Quantity,UpdatedAt) \ + VALUES(@UserId,200,300,PENDING_COMMIT_TIMESTAMP())", + ); + stmt.add_param("UserId", &user_id); + tx.update(stmt).await + } + .await; + + match tx.end(result, None).await { + Ok(_) => (), + Err(err) => panic!("Second transaction failed: {:?}", err), + } + } + + // Verify both transactions were successful + let mut tx = data_client.read_only_transaction().await.unwrap(); + let mut stmt = Statement::new( + "SELECT *, + ARRAY(SELECT AS STRUCT * FROM UserItem WHERE UserId = p.UserId) as UserItem, + ARRAY(SELECT AS STRUCT * FROM UserCharacter WHERE UserId = p.UserId) as UserCharacter + FROM User p WHERE UserId = @UserId", + ); + stmt.add_param("UserId", &user_id); + + let reader = tx.query(stmt).await.unwrap(); + let rows: Vec = all_rows(reader).await.unwrap(); + + assert_eq!(1, rows.len()); + let row = rows.first().unwrap(); + + let user_items = row.column_by_name::>("UserItem").unwrap(); + assert_eq!(1, user_items.len()); + assert_eq!(user_items[0].item_id, 200); + assert_eq!(user_items[0].quantity, 300); + + let user_characters = row.column_by_name::>("UserCharacter").unwrap(); + assert_eq!(1, user_characters.len()); + assert_eq!(user_characters[0].character_id, 1); + assert_eq!(user_characters[0].level, 5); +} + +async fn verify_transaction_manager_data( + user_id: &str, + now: &OffsetDateTime, + user_commit_timestamp: &OffsetDateTime, + commit_timestamp: &OffsetDateTime, +) { + let data_client = create_data_client().await; + let mut tx = data_client.read_only_transaction().await.unwrap(); + + let mut stmt = Statement::new( + "SELECT *, + ARRAY(SELECT AS STRUCT * FROM UserItem WHERE UserId = p.UserId) as UserItem, + ARRAY(SELECT AS STRUCT * FROM UserCharacter WHERE UserId = p.UserId) as UserCharacter + FROM User p WHERE UserId = @UserId", + ); + stmt.add_param("UserId", &user_id); + + let reader = tx.query(stmt).await.unwrap(); + let rows: Vec = all_rows(reader).await.unwrap(); + + assert_eq!(1, rows.len()); + let row = rows.first().unwrap(); + assert_user_row(row, user_id, now, user_commit_timestamp); + + let mut user_items = row.column_by_name::>("UserItem").unwrap(); + let first_item = user_items.pop().unwrap(); + assert_eq!(first_item.user_id, *user_id); + assert_eq!(first_item.item_id, 100); + assert_eq!(first_item.quantity, 500); + assert_eq!( + OffsetDateTime::from(first_item.updated_at).to_string(), + commit_timestamp.to_string() + ); + assert!(user_items.is_empty()); + + let mut user_characters = row.column_by_name::>("UserCharacter").unwrap(); + let first_character = user_characters.pop().unwrap(); + assert_eq!(first_character.user_id, *user_id); + assert_eq!(first_character.character_id, 1); + assert_eq!(first_character.level, 10); + assert_eq!( + OffsetDateTime::from(first_character.updated_at).to_string(), + commit_timestamp.to_string() + ); + assert!(user_characters.is_empty()); +} From 569f6a97c93638390978cffcb980cb014e45d683 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sat, 8 Nov 2025 19:07:59 +0100 Subject: [PATCH 2/6] [spanner] Refactor TransactionManager to use client::Error and simplify Drop handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improvements: - Use client::Error instead of BeginError for better API consistency with Client::begin_read_write_transaction() - Handle session preservation internally when begin fails, storing it back for retry - Remove redundant Drop implementation - ManagedSession::drop() already handles session pool return - Remove Debug implementation from BeginError as it's no longer needed Benefits: - Consistent error types across the API - Simpler, more idiomatic code - Better encapsulation of session management - Automatic cleanup through Rust's Drop chain 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/transaction_manager.rs | 49 ++++++++++++++++-------------- spanner/src/transaction_rw.rs | 9 ------ 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs index 130bf6ae..419570ea 100644 --- a/spanner/src/transaction_manager.rs +++ b/spanner/src/transaction_manager.rs @@ -1,6 +1,7 @@ +use crate::client::Error; use crate::session::ManagedSession; use crate::transaction::CallOptions; -use crate::transaction_rw::{BeginError, ReadWriteTransaction}; +use crate::transaction_rw::ReadWriteTransaction; /// TransactionManager manages a single session for executing multiple /// read-write transactions with session reuse. This is particularly useful @@ -49,7 +50,7 @@ impl TransactionManager { /// /// The transaction must be ended by calling `end()` on the returned /// reference before calling `begin_read_write_transaction()` again. - pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, BeginError> { + pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, Error> { // Extract session from previous transaction if it exists, otherwise use stored session let session = if let Some(ref mut tx) = self.transaction { tx.take_session() @@ -59,11 +60,18 @@ impl TransactionManager { }; // Create new transaction with the session - let new_tx = ReadWriteTransaction::begin(session, CallOptions::default(), None).await?; - - // Store the transaction and return a mutable reference - self.transaction = Some(new_tx); - Ok(self.transaction.as_mut().unwrap()) + match ReadWriteTransaction::begin(session, CallOptions::default(), None).await { + Ok(new_tx) => { + // Store the transaction and return a mutable reference + self.transaction = Some(new_tx); + Ok(self.transaction.as_mut().unwrap()) + } + Err(begin_error) => { + // Store session back for next retry attempt + self.session = Some(begin_error.session); + Err(begin_error.status.into()) + } + } } /// Begins a new read-write transaction with custom call options and transaction tag. @@ -73,7 +81,7 @@ impl TransactionManager { &mut self, options: CallOptions, transaction_tag: Option, - ) -> Result<&mut ReadWriteTransaction, BeginError> { + ) -> Result<&mut ReadWriteTransaction, Error> { // Extract session from previous transaction if it exists, otherwise use stored session let session = if let Some(ref mut tx) = self.transaction { tx.take_session() @@ -83,20 +91,17 @@ impl TransactionManager { }; // Create new transaction with the session - let new_tx = ReadWriteTransaction::begin(session, options, transaction_tag).await?; - - // Store the transaction and return a mutable reference - self.transaction = Some(new_tx); - Ok(self.transaction.as_mut().unwrap()) - } -} - -impl Drop for TransactionManager { - fn drop(&mut self) { - // If there's a transaction, extract its session before dropping - // This ensures the session is properly returned to the pool - if let Some(ref mut tx) = self.transaction { - let _ = tx.take_session(); + match ReadWriteTransaction::begin(session, options, transaction_tag).await { + Ok(new_tx) => { + // Store the transaction and return a mutable reference + self.transaction = Some(new_tx); + Ok(self.transaction.as_mut().unwrap()) + } + Err(begin_error) => { + // Store session back for next retry attempt + self.session = Some(begin_error.session); + Err(begin_error.status.into()) + } } } } diff --git a/spanner/src/transaction_rw.rs b/spanner/src/transaction_rw.rs index 38faedde..fd6757d9 100644 --- a/spanner/src/transaction_rw.rs +++ b/spanner/src/transaction_rw.rs @@ -120,15 +120,6 @@ pub struct BeginError { pub session: ManagedSession, } -impl std::fmt::Debug for BeginError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BeginError") - .field("status", &self.status) - .field("session", &"") - .finish() - } -} - impl ReadWriteTransaction { pub async fn begin( session: ManagedSession, From d148c118895aa283c7d1a397a1ff60f6f20240a7 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sat, 8 Nov 2025 19:20:59 +0100 Subject: [PATCH 3/6] [spanner] Eliminate duplicate code in TransactionManager by extracting common logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactoring: - Extract common transaction begin logic into private begin_internal() helper method - Simplify public begin_read_write_transaction() methods to thin delegates - Consolidates 31 lines of duplicate code (90% identical) into single implementation Benefits: - Single source of truth for transaction begin logic - Bug fixes and changes only needed in one place - 11 lines removed (11% reduction in file size) - Improved maintainability with no behavior changes Before: Two 23-line methods with duplicate logic After: Two 2-line delegates + one 23-line helper 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/transaction_manager.rs | 31 ++++++++++-------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs index 419570ea..64c3fb88 100644 --- a/spanner/src/transaction_manager.rs +++ b/spanner/src/transaction_manager.rs @@ -51,27 +51,7 @@ impl TransactionManager { /// The transaction must be ended by calling `end()` on the returned /// reference before calling `begin_read_write_transaction()` again. pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, Error> { - // Extract session from previous transaction if it exists, otherwise use stored session - let session = if let Some(ref mut tx) = self.transaction { - tx.take_session() - .expect("transaction should have a session") - } else { - self.session.take().expect("manager should have a session") - }; - - // Create new transaction with the session - match ReadWriteTransaction::begin(session, CallOptions::default(), None).await { - Ok(new_tx) => { - // Store the transaction and return a mutable reference - self.transaction = Some(new_tx); - Ok(self.transaction.as_mut().unwrap()) - } - Err(begin_error) => { - // Store session back for next retry attempt - self.session = Some(begin_error.session); - Err(begin_error.status.into()) - } - } + self.begin_internal(CallOptions::default(), None).await } /// Begins a new read-write transaction with custom call options and transaction tag. @@ -81,6 +61,15 @@ impl TransactionManager { &mut self, options: CallOptions, transaction_tag: Option, + ) -> Result<&mut ReadWriteTransaction, Error> { + self.begin_internal(options, transaction_tag).await + } + + /// Internal helper that contains the common logic for beginning a transaction. + async fn begin_internal( + &mut self, + options: CallOptions, + transaction_tag: Option, ) -> Result<&mut ReadWriteTransaction, Error> { // Extract session from previous transaction if it exists, otherwise use stored session let session = if let Some(ref mut tx) = self.transaction { From 8a4a3e5912d10ff7048e96ada57ffe0b3830ee86 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sat, 8 Nov 2025 19:23:41 +0100 Subject: [PATCH 4/6] [spanner] Simplify TransactionManager by removing unnecessary helper method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactoring: - Remove begin_internal() private helper method - Change begin_read_write_transaction() to directly delegate to begin_read_write_transaction_with_options() - Move implementation logic into begin_read_write_transaction_with_options() Benefits: - Simpler, more idiomatic Rust pattern (convenience method → full method) - Removes unnecessary indirection through private helper - 8 lines removed (8% reduction) - Clearer code flow with direct delegation This follows the common Rust pattern where simple methods call more complex ones with default parameters, similar to Vec::new() → Vec::with_capacity(0). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/transaction_manager.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs index 64c3fb88..4d9d5704 100644 --- a/spanner/src/transaction_manager.rs +++ b/spanner/src/transaction_manager.rs @@ -51,7 +51,8 @@ impl TransactionManager { /// The transaction must be ended by calling `end()` on the returned /// reference before calling `begin_read_write_transaction()` again. pub async fn begin_read_write_transaction(&mut self) -> Result<&mut ReadWriteTransaction, Error> { - self.begin_internal(CallOptions::default(), None).await + self.begin_read_write_transaction_with_options(CallOptions::default(), None) + .await } /// Begins a new read-write transaction with custom call options and transaction tag. @@ -61,15 +62,6 @@ impl TransactionManager { &mut self, options: CallOptions, transaction_tag: Option, - ) -> Result<&mut ReadWriteTransaction, Error> { - self.begin_internal(options, transaction_tag).await - } - - /// Internal helper that contains the common logic for beginning a transaction. - async fn begin_internal( - &mut self, - options: CallOptions, - transaction_tag: Option, ) -> Result<&mut ReadWriteTransaction, Error> { // Extract session from previous transaction if it exists, otherwise use stored session let session = if let Some(ref mut tx) = self.transaction { From b23f1f43532b292c23402c1629742580279af0d6 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Sat, 8 Nov 2025 20:22:54 +0100 Subject: [PATCH 5/6] [spanner] Add transaction() accessor method to TransactionManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Features: - Add transaction() method that returns Option<&mut ReadWriteTransaction> - Returns None if no transaction is active, Some if active - Enables checking transaction state and accessing existing transaction - Includes comprehensive test coverage Use cases: - Check if a transaction exists without starting one - Access existing transaction for additional operations - Conditional logic based on transaction state Test added: - test_transaction_accessor() validates behavior before/after begin - Tests functional access through the accessor - Verifies data written through accessor is committed 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/transaction_manager.rs | 21 ++++++++ spanner/tests/transaction_manager_test.rs | 58 +++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs index 4d9d5704..c5a6c4ac 100644 --- a/spanner/src/transaction_manager.rs +++ b/spanner/src/transaction_manager.rs @@ -44,6 +44,27 @@ impl TransactionManager { } } + /// Returns a mutable reference to the current transaction, if one has been started. + /// + /// Returns `None` if no transaction is currently active. Call + /// `begin_read_write_transaction()` to start a new transaction. + /// + /// # Example + /// + /// ```rust,ignore + /// let mut tm = client.transaction_manager().await?; + /// + /// // Initially returns None + /// assert!(tm.transaction().is_none()); + /// + /// // After begin, returns Some + /// let tx = tm.begin_read_write_transaction().await?; + /// assert!(tm.transaction().is_some()); + /// ``` + pub fn transaction(&mut self) -> Option<&mut ReadWriteTransaction> { + self.transaction.as_mut() + } + /// Begins a new read-write transaction, reusing the session from the /// previous transaction if one exists. Returns a mutable reference to /// the transaction which can be used to execute queries and mutations. diff --git a/spanner/tests/transaction_manager_test.rs b/spanner/tests/transaction_manager_test.rs index c1a5f6fd..87ce2bc8 100644 --- a/spanner/tests/transaction_manager_test.rs +++ b/spanner/tests/transaction_manager_test.rs @@ -255,3 +255,61 @@ async fn verify_transaction_manager_data( ); assert!(user_characters.is_empty()); } + +#[tokio::test] +#[serial] +async fn test_transaction_accessor() { + // Set up test data + let now = OffsetDateTime::now_utc(); + let data_client = create_data_client().await; + let user_id = format!("user_tm_accessor_{}", now.unix_timestamp()); + + data_client + .apply(vec![create_user_mutation(&user_id, &now)]) + .await + .unwrap(); + + // Test transaction accessor method + let mut tm = data_client.transaction_manager().await.unwrap(); + + // Initially should return None + assert!(tm.transaction().is_none()); + + // Begin a transaction + let _tx = tm.begin_read_write_transaction().await.unwrap(); + + // Now should return Some + assert!(tm.transaction().is_some()); + + // Should be able to use the accessor to perform operations + if let Some(tx) = tm.transaction() { + let mut stmt = Statement::new( + "INSERT INTO UserItem (UserId,ItemId,Quantity,UpdatedAt) \ + VALUES(@UserId,999,123,PENDING_COMMIT_TIMESTAMP())", + ); + stmt.add_param("UserId", &user_id); + tx.update(stmt).await.unwrap(); + + // Commit via the accessor + let result: Result<(), google_cloud_spanner::client::Error> = Ok(()); + match tx.end(result, None).await { + Ok(_) => (), + Err(err) => panic!("Commit failed: {:?}", err), + } + } + + // Verify the data was actually written + let mut tx = data_client.read_only_transaction().await.unwrap(); + let mut stmt = Statement::new( + "SELECT * FROM UserItem WHERE UserId = @UserId AND ItemId = 999", + ); + stmt.add_param("UserId", &user_id); + + let reader = tx.query(stmt).await.unwrap(); + let rows: Vec = all_rows(reader).await.unwrap(); + + assert_eq!(1, rows.len()); + let row = rows.first().unwrap(); + assert_eq!(row.column_by_name::("ItemId").unwrap(), 999); + assert_eq!(row.column_by_name::("Quantity").unwrap(), 123); +} From e574648f37a217aa73d31ec3be4148e6d49b9012 Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Thu, 20 Nov 2025 22:20:08 +0100 Subject: [PATCH 6/6] [spanner] Pass disable_route_to_leader through TransactionManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add disable_route_to_leader field to TransactionManager and pass it through to ReadWriteTransaction::begin. This ensures that TransactionManager respects the client's disable_route_to_leader setting when creating transactions. Changes: - Add disable_route_to_leader field to TransactionManager struct - Update TransactionManager::new to accept disable_route_to_leader parameter - Pass disable_route_to_leader to ReadWriteTransaction::begin in begin_read_write_transaction_with_options - Update Client::transaction_manager to pass disable_route_to_leader from client config 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- spanner/src/client.rs | 2 +- spanner/src/transaction_manager.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/spanner/src/client.rs b/spanner/src/client.rs index 3be14e25..7969bd8e 100644 --- a/spanner/src/client.rs +++ b/spanner/src/client.rs @@ -639,7 +639,7 @@ impl Client { /// ``` pub async fn transaction_manager(&self) -> Result { let session = self.get_session().await?; - Ok(TransactionManager::new(session)) + Ok(TransactionManager::new(session, self.disable_route_to_leader)) } /// Get open session count. diff --git a/spanner/src/transaction_manager.rs b/spanner/src/transaction_manager.rs index c5a6c4ac..810565a7 100644 --- a/spanner/src/transaction_manager.rs +++ b/spanner/src/transaction_manager.rs @@ -33,14 +33,16 @@ use crate::transaction_rw::ReadWriteTransaction; pub struct TransactionManager { session: Option, transaction: Option, + disable_route_to_leader: bool, } impl TransactionManager { /// Creates a new TransactionManager with the given session. - pub(crate) fn new(session: ManagedSession) -> Self { + pub(crate) fn new(session: ManagedSession, disable_route_to_leader: bool) -> Self { Self { session: Some(session), transaction: None, + disable_route_to_leader, } } @@ -93,7 +95,7 @@ impl TransactionManager { }; // Create new transaction with the session - match ReadWriteTransaction::begin(session, options, transaction_tag).await { + match ReadWriteTransaction::begin(session, options, transaction_tag, self.disable_route_to_leader).await { Ok(new_tx) => { // Store the transaction and return a mutable reference self.transaction = Some(new_tx);