From 4822e26b47ca0a49c7cdb2d070a867f8ede720e4 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 12:57:58 -0800 Subject: [PATCH 1/8] feat: add timeout protection --- bedrock/src/migration/controller.rs | 48 +++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 6e540818..4339e636 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -11,6 +11,7 @@ use tokio::sync::Mutex; const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day +const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds /// Global lock to prevent concurrent migration runs across all controller instances. /// This is a process-wide coordination mechanism that ensures only one migration @@ -172,18 +173,30 @@ impl MigrationController { // Check if migration is applicable and should run, based on processor defined logic. // This checks actual state (e.g., "does v4 credential exist?") to ensure idempotency // even if migration record is deleted (reinstall scenario). - match processor.is_applicable().await { - Ok(false) => { + // Wrap in timeout to prevent hanging indefinitely + let is_applicable_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.is_applicable(), + ) + .await; + + match is_applicable_result { + Ok(Ok(false)) => { info!("Migration {migration_id} not applicable, skipping"); summary.skipped += 1; continue; } - Err(e) => { + Ok(Err(e)) => { error!("Failed to check applicability for {migration_id}: {e:?}"); summary.skipped += 1; continue; } - Ok(true) => { + Err(_) => { + error!("Migration {migration_id} is_applicable() timed out after {MIGRATION_TIMEOUT_SECS} seconds, skipping"); + summary.skipped += 1; + continue; + } + Ok(Ok(true)) => { // Continue with execution. Fall through to the execution block below. } } @@ -206,8 +219,30 @@ impl MigrationController { // Save record before execution so we don't lose progress if the app crashes mid-migration self.save_record(&migration_id, &record)?; - // Execute - match processor.execute().await { + // Execute with timeout to prevent indefinite hangs + let execute_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.execute(), + ) + .await; + + match execute_result { + Err(_) => { + // Timeout occurred + error!("Migration {migration_id} timed out after {MIGRATION_TIMEOUT_SECS} seconds"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("MIGRATION_TIMEOUT".to_string()); + record.last_error_message = + Some(format!("Migration execution timed out after {MIGRATION_TIMEOUT_SECS} seconds")); + + // Schedule retry with backoff + let retry_delay_ms = calculate_backoff_delay(record.attempts); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + summary.failed_retryable += 1; + } + Ok(result) => match result { Ok(ProcessorResult::Success) => { info!("Migration {migration_id} succeeded"); record.status = MigrationStatus::Succeeded; @@ -269,6 +304,7 @@ impl MigrationController { summary.failed_retryable += 1; } + } } // Save the final result (success/failure) to storage From 4d229a5e6ff6187a4bf9bec44f9657efd0c097d7 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:03:26 -0800 Subject: [PATCH 2/8] style: lint --- bedrock/src/migration/controller.rs | 121 ++++++++++++++-------------- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 4339e636..71b92e1c 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -243,68 +243,69 @@ impl MigrationController { summary.failed_retryable += 1; } Ok(result) => match result { - Ok(ProcessorResult::Success) => { - info!("Migration {migration_id} succeeded"); - record.status = MigrationStatus::Succeeded; - record.completed_at = Some(Utc::now()); - record.last_error_code = None; - record.last_error_message = None; - record.next_attempt_at = None; // Clear retry time - summary.succeeded += 1; - } - Ok(ProcessorResult::Retryable { - error_code, - error_message, - retry_after_ms, - }) => { - warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - - // Retry time is calculated according to exponential backoff and set on the - // record.next_attempt_at field. When the app is next opened and the - // migration is run again; the controller will check whether to run the - // migration again based on the record.next_attempt_at field. - let retry_delay_ms = retry_after_ms - .unwrap_or_else(|| calculate_backoff_delay(record.attempts)); - record.next_attempt_at = - Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); - - summary.failed_retryable += 1; - } - Ok(ProcessorResult::Terminal { - error_code, - error_message, - }) => { - error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedTerminal; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - record.next_attempt_at = None; // Clear retry time - summary.failed_terminal += 1; - } - Ok(ProcessorResult::BlockedUserAction { reason }) => { - warn!("Migration {migration_id} blocked: {reason}"); - record.status = MigrationStatus::BlockedUserAction; - record.last_error_message = Some(reason); - record.next_attempt_at = None; // Clear retry time - summary.blocked += 1; - } - Err(e) => { - error!("Migration {migration_id} threw error: {e:?}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); - record.last_error_message = Some(format!("{e:?}")); + Ok(ProcessorResult::Success) => { + info!("Migration {migration_id} succeeded"); + record.status = MigrationStatus::Succeeded; + record.completed_at = Some(Utc::now()); + record.last_error_code = None; + record.last_error_message = None; + record.next_attempt_at = None; // Clear retry time + summary.succeeded += 1; + } + Ok(ProcessorResult::Retryable { + error_code, + error_message, + retry_after_ms, + }) => { + warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + + // Retry time is calculated according to exponential backoff and set on the + // record.next_attempt_at field. When the app is next opened and the + // migration is run again; the controller will check whether to run the + // migration again based on the record.next_attempt_at field. + let retry_delay_ms = retry_after_ms.unwrap_or_else(|| { + calculate_backoff_delay(record.attempts) + }); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + summary.failed_retryable += 1; + } + Ok(ProcessorResult::Terminal { + error_code, + error_message, + }) => { + error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); + record.status = MigrationStatus::FailedTerminal; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + record.next_attempt_at = None; // Clear retry time + summary.failed_terminal += 1; + } + Ok(ProcessorResult::BlockedUserAction { reason }) => { + warn!("Migration {migration_id} blocked: {reason}"); + record.status = MigrationStatus::BlockedUserAction; + record.last_error_message = Some(reason); + record.next_attempt_at = None; // Clear retry time + summary.blocked += 1; + } + Err(e) => { + error!("Migration {migration_id} threw error: {e:?}"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); + record.last_error_message = Some(format!("{e:?}")); - // Schedule retry with backoff - let retry_delay_ms = calculate_backoff_delay(record.attempts); - record.next_attempt_at = - Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + // Schedule retry with backoff + let retry_delay_ms = calculate_backoff_delay(record.attempts); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); - summary.failed_retryable += 1; - } - } + summary.failed_retryable += 1; + } + }, } // Save the final result (success/failure) to storage From 578492779be3d4e4c1eed75f9eb828506eadbe49 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:03:36 -0800 Subject: [PATCH 3/8] test: units for timeouts --- bedrock/src/migration/controller.rs | 200 ++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 71b92e1c..d5b25709 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -762,4 +762,204 @@ mod tests { // Processor should only execute once assert_eq!(processor.execution_count(), 1); } + + /// Test processor that hangs indefinitely during execute + struct HangingExecuteProcessor { + id: String, + } + + impl HangingExecuteProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingExecuteProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + Ok(true) + } + + async fn execute(&self) -> Result { + // Hang longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(ProcessorResult::Success) + } + } + + /// Test processor that hangs indefinitely during is_applicable check + struct HangingApplicabilityProcessor { + id: String, + } + + impl HangingApplicabilityProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingApplicabilityProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + // Hang longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(true) + } + + async fn execute(&self) -> Result { + Ok(ProcessorResult::Success) + } + } + + #[tokio::test] + #[serial] + async fn test_execute_timeout_marks_migration_as_retryable() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.failed_retryable, 1); + assert_eq!(summary.succeeded, 0); + + // Verify the migration record was saved with timeout error + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert!(matches!(record.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + assert!(record + .last_error_message + .unwrap() + .contains("timed out after")); + assert!(record.next_attempt_at.is_some()); // Should schedule retry + assert_eq!(record.attempts, 1); + } + + #[tokio::test] + #[serial] + async fn test_is_applicable_timeout_skips_migration() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = + Arc::new(HangingApplicabilityProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should skip due to is_applicable timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration record was NOT created or updated (skipped before execution) + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_result = kv_store.get(key); + // Record might exist from attempt to check applicability, but should not show InProgress + if let Ok(record_json) = record_result { + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + // Should not be InProgress or Succeeded + assert!( + !matches!(record.status, MigrationStatus::InProgress) + && !matches!(record.status, MigrationStatus::Succeeded) + ); + } + } + + #[tokio::test] + #[serial] + async fn test_timeout_does_not_block_subsequent_migrations() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + + // First processor will timeout, second should succeed + let processor1 = Arc::new(HangingExecuteProcessor::new("test.migration1.v1")); + let processor2 = Arc::new(TestProcessor::new("test.migration2.v1")); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor1, processor2.clone()], + ); + + // Run migrations + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 2); + assert_eq!(summary.failed_retryable, 1); // First migration timed out + assert_eq!(summary.succeeded, 1); // Second migration succeeded + + // Verify first migration is marked as retryable + let key1 = format!("{MIGRATION_KEY_PREFIX}test.migration1.v1"); + let record1_json = kv_store.get(key1).expect("Migration 1 record should exist"); + let record1: MigrationRecord = + serde_json::from_str(&record1_json).expect("Should deserialize"); + assert!(matches!(record1.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record1.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + + // Verify second migration succeeded + let key2 = format!("{MIGRATION_KEY_PREFIX}test.migration2.v1"); + let record2_json = kv_store.get(key2).expect("Migration 2 record should exist"); + let record2: MigrationRecord = + serde_json::from_str(&record2_json).expect("Should deserialize"); + assert!(matches!(record2.status, MigrationStatus::Succeeded)); + + // Verify second processor actually executed + assert_eq!(processor2.execution_count(), 1); + } + + #[tokio::test] + #[serial] + async fn test_timeout_with_exponential_backoff() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations multiple times to test backoff + for attempt in 1..=3 { + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key.clone()).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert_eq!(record.attempts, attempt); + assert!(record.next_attempt_at.is_some()); + + // Clear next_attempt_at to allow immediate retry in test + let mut record_for_retry = record; + record_for_retry.next_attempt_at = None; + let json = serde_json::to_string(&record_for_retry).unwrap(); + kv_store.set(key, json).unwrap(); + } + } } From 29bd749d56f0c0f82ea2493f42ea7cb5178e0b98 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:05:52 -0800 Subject: [PATCH 4/8] test: reduce timeout threshold for tests --- bedrock/src/migration/controller.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index d5b25709..792e9c8d 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -11,7 +11,12 @@ use tokio::sync::Mutex; const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day -const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds + +// Use a shorter timeout in tests to speed up test execution +#[cfg(not(test))] +const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production +#[cfg(test)] +const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests /// Global lock to prevent concurrent migration runs across all controller instances. /// This is a process-wide coordination mechanism that ensures only one migration From c7a17e2081f2c5b6b12746d91338f5f48136fb3c Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:15:38 -0800 Subject: [PATCH 5/8] style: various lint fixes --- bedrock/src/migration/controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 792e9c8d..bf1cfc83 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -12,9 +12,9 @@ const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day -// Use a shorter timeout in tests to speed up test execution #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production +// Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests @@ -796,7 +796,7 @@ mod tests { } } - /// Test processor that hangs indefinitely during is_applicable check + /// Test processor that hangs indefinitely during `is_applicable` check struct HangingApplicabilityProcessor { id: String, } From 1dd4a026315eec556ffe6d24abad8a842921a027 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:17:12 -0800 Subject: [PATCH 6/8] style: more lint --- bedrock/src/migration/controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index bf1cfc83..d8844052 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -14,7 +14,7 @@ const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production -// Use a shorter timeout in tests to speed up test ex + // Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests From a00dad52a4318950bfc57aa3f95b3e2aaa9d533a Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:13:01 -0800 Subject: [PATCH 7/8] docs: timeout strategy --- bedrock/src/migration/controller.rs | 10 ++++++++++ bedrock/src/migration/processor.rs | 17 +++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index d8844052..5bc633ea 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -97,6 +97,16 @@ impl MigrationController { /// If another migration is already in progress when this method is called, it will return /// immediately with an `InvalidOperation` error rather than waiting. /// + /// # Timeouts + /// + /// Each migration is subject to a 20-second timeout for both applicability checks and execution. + /// If a migration times out: + /// - During `is_applicable()`: The migration is skipped + /// - During `execute()`: The migration is marked as retryable and scheduled for retry with exponential backoff + /// + /// Subsequent migrations will continue regardless of timeouts, ensuring one slow migration + /// doesn't block the entire migration system. + /// /// # Errors /// /// Returns `MigrationError::InvalidOperation` if another migration run is already in progress. diff --git a/bedrock/src/migration/processor.rs b/bedrock/src/migration/processor.rs index 7c637fd0..ce82ed43 100644 --- a/bedrock/src/migration/processor.rs +++ b/bedrock/src/migration/processor.rs @@ -33,6 +33,21 @@ pub enum ProcessorResult { } /// Trait that all migration processors must implement +/// +/// # Timeouts and Cancellation Safety +/// +/// Both [`is_applicable`](Self::is_applicable) and [`execute`](Self::execute) are subject to timeouts +/// (20 seconds in production). When a timeout occurs, the future is dropped and the migration +/// is marked as failed (for `execute`) or skipped (for `is_applicable`). +/// +/// **IMPORTANT**: Implementations MUST be cancellation-safe: +/// +/// - **DO NOT** spawn background tasks using `tokio::spawn`, `std::thread::spawn`, or similar +/// that will continue running after the timeout +/// - **DO NOT** use blocking operations or FFI calls without proper cleanup +/// - **ENSURE** all work stops when the future is dropped (cooperative cancellation) +/// - **MAKE** migrations idempotent so partial execution can be safely retried +/// #[uniffi::export(with_foreign)] #[async_trait] pub trait MigrationProcessor: Send + Sync { @@ -46,7 +61,6 @@ pub trait MigrationProcessor: Send + Sync { /// to determine if the migration needs to run. This ensures the system is /// truly idempotent and handles edge cases gracefully. /// - /// /// # Returns /// - `Ok(true)` if the migration should run /// - `Ok(false)` if the migration should be skipped @@ -54,6 +68,5 @@ pub trait MigrationProcessor: Send + Sync { async fn is_applicable(&self) -> Result; /// Execute the migration - /// Called by the controller when the migration is ready to run async fn execute(&self) -> Result; } From 2b53c9f7d109d5407b126547818a528663cd6c5f Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:15:08 -0800 Subject: [PATCH 8/8] docs: cleanup comment --- bedrock/src/migration/controller.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 5bc633ea..5364e70b 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -14,7 +14,6 @@ const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production - // Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests