From 4822e26b47ca0a49c7cdb2d070a867f8ede720e4 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 12:57:58 -0800 Subject: [PATCH 01/18] feat: add timeout protection --- bedrock/src/migration/controller.rs | 48 +++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 6e540818..4339e636 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -11,6 +11,7 @@ use tokio::sync::Mutex; const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day +const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds /// Global lock to prevent concurrent migration runs across all controller instances. /// This is a process-wide coordination mechanism that ensures only one migration @@ -172,18 +173,30 @@ impl MigrationController { // Check if migration is applicable and should run, based on processor defined logic. // This checks actual state (e.g., "does v4 credential exist?") to ensure idempotency // even if migration record is deleted (reinstall scenario). - match processor.is_applicable().await { - Ok(false) => { + // Wrap in timeout to prevent hanging indefinitely + let is_applicable_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.is_applicable(), + ) + .await; + + match is_applicable_result { + Ok(Ok(false)) => { info!("Migration {migration_id} not applicable, skipping"); summary.skipped += 1; continue; } - Err(e) => { + Ok(Err(e)) => { error!("Failed to check applicability for {migration_id}: {e:?}"); summary.skipped += 1; continue; } - Ok(true) => { + Err(_) => { + error!("Migration {migration_id} is_applicable() timed out after {MIGRATION_TIMEOUT_SECS} seconds, skipping"); + summary.skipped += 1; + continue; + } + Ok(Ok(true)) => { // Continue with execution. Fall through to the execution block below. 
} } @@ -206,8 +219,30 @@ impl MigrationController { // Save record before execution so we don't lose progress if the app crashes mid-migration self.save_record(&migration_id, &record)?; - // Execute - match processor.execute().await { + // Execute with timeout to prevent indefinite hangs + let execute_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.execute(), + ) + .await; + + match execute_result { + Err(_) => { + // Timeout occurred + error!("Migration {migration_id} timed out after {MIGRATION_TIMEOUT_SECS} seconds"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("MIGRATION_TIMEOUT".to_string()); + record.last_error_message = + Some(format!("Migration execution timed out after {MIGRATION_TIMEOUT_SECS} seconds")); + + // Schedule retry with backoff + let retry_delay_ms = calculate_backoff_delay(record.attempts); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + summary.failed_retryable += 1; + } + Ok(result) => match result { Ok(ProcessorResult::Success) => { info!("Migration {migration_id} succeeded"); record.status = MigrationStatus::Succeeded; @@ -269,6 +304,7 @@ impl MigrationController { summary.failed_retryable += 1; } + } } // Save the final result (success/failure) to storage From 4d229a5e6ff6187a4bf9bec44f9657efd0c097d7 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:03:26 -0800 Subject: [PATCH 02/18] style: lint --- bedrock/src/migration/controller.rs | 121 ++++++++++++++-------------- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 4339e636..71b92e1c 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -243,68 +243,69 @@ impl MigrationController { summary.failed_retryable += 1; } Ok(result) => match result { - Ok(ProcessorResult::Success) => { - info!("Migration {migration_id} succeeded"); - record.status = MigrationStatus::Succeeded; - record.completed_at = Some(Utc::now()); - record.last_error_code = None; - record.last_error_message = None; - record.next_attempt_at = None; // Clear retry time - summary.succeeded += 1; - } - Ok(ProcessorResult::Retryable { - error_code, - error_message, - retry_after_ms, - }) => { - warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - - // Retry time is calculated according to exponential backoff and set on the - // record.next_attempt_at field. When the app is next opened and the - // migration is run again; the controller will check whether to run the - // migration again based on the record.next_attempt_at field. 
- let retry_delay_ms = retry_after_ms - .unwrap_or_else(|| calculate_backoff_delay(record.attempts)); - record.next_attempt_at = - Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); - - summary.failed_retryable += 1; - } - Ok(ProcessorResult::Terminal { - error_code, - error_message, - }) => { - error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedTerminal; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - record.next_attempt_at = None; // Clear retry time - summary.failed_terminal += 1; - } - Ok(ProcessorResult::BlockedUserAction { reason }) => { - warn!("Migration {migration_id} blocked: {reason}"); - record.status = MigrationStatus::BlockedUserAction; - record.last_error_message = Some(reason); - record.next_attempt_at = None; // Clear retry time - summary.blocked += 1; - } - Err(e) => { - error!("Migration {migration_id} threw error: {e:?}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); - record.last_error_message = Some(format!("{e:?}")); + Ok(ProcessorResult::Success) => { + info!("Migration {migration_id} succeeded"); + record.status = MigrationStatus::Succeeded; + record.completed_at = Some(Utc::now()); + record.last_error_code = None; + record.last_error_message = None; + record.next_attempt_at = None; // Clear retry time + summary.succeeded += 1; + } + Ok(ProcessorResult::Retryable { + error_code, + error_message, + retry_after_ms, + }) => { + warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + + // Retry time is calculated according to exponential backoff and set on the + // record.next_attempt_at field. When the app is next opened and the + // migration is run again; the controller will check whether to run the + // migration again based on the record.next_attempt_at field. 
+ let retry_delay_ms = retry_after_ms.unwrap_or_else(|| { + calculate_backoff_delay(record.attempts) + }); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + summary.failed_retryable += 1; + } + Ok(ProcessorResult::Terminal { + error_code, + error_message, + }) => { + error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); + record.status = MigrationStatus::FailedTerminal; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + record.next_attempt_at = None; // Clear retry time + summary.failed_terminal += 1; + } + Ok(ProcessorResult::BlockedUserAction { reason }) => { + warn!("Migration {migration_id} blocked: {reason}"); + record.status = MigrationStatus::BlockedUserAction; + record.last_error_message = Some(reason); + record.next_attempt_at = None; // Clear retry time + summary.blocked += 1; + } + Err(e) => { + error!("Migration {migration_id} threw error: {e:?}"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); + record.last_error_message = Some(format!("{e:?}")); - // Schedule retry with backoff - let retry_delay_ms = calculate_backoff_delay(record.attempts); - record.next_attempt_at = - Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + // Schedule retry with backoff + let retry_delay_ms = calculate_backoff_delay(record.attempts); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); - summary.failed_retryable += 1; - } - } + summary.failed_retryable += 1; + } + }, } // Save the final result (success/failure) to storage From 578492779be3d4e4c1eed75f9eb828506eadbe49 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:03:36 -0800 Subject: [PATCH 03/18] test: units for timeouts --- bedrock/src/migration/controller.rs | 200 ++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 71b92e1c..d5b25709 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -762,4 +762,204 @@ mod tests { // Processor should only execute once assert_eq!(processor.execution_count(), 1); } + + /// Test processor that hangs indefinitely during execute + struct HangingExecuteProcessor { + id: String, + } + + impl HangingExecuteProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingExecuteProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + Ok(true) + } + + async fn execute(&self) -> Result { + // Hang longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(ProcessorResult::Success) + } + } + + /// Test processor that hangs indefinitely during is_applicable check + struct HangingApplicabilityProcessor { + id: String, + } + + impl HangingApplicabilityProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingApplicabilityProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + // Hang longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(true) + } + + async fn execute(&self) -> Result { + Ok(ProcessorResult::Success) + } + } + + #[tokio::test] + #[serial] + async fn 
test_execute_timeout_marks_migration_as_retryable() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.failed_retryable, 1); + assert_eq!(summary.succeeded, 0); + + // Verify the migration record was saved with timeout error + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert!(matches!(record.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + assert!(record + .last_error_message + .unwrap() + .contains("timed out after")); + assert!(record.next_attempt_at.is_some()); // Should schedule retry + assert_eq!(record.attempts, 1); + } + + #[tokio::test] + #[serial] + async fn test_is_applicable_timeout_skips_migration() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = + Arc::new(HangingApplicabilityProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should skip due to is_applicable timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration record was NOT created or updated (skipped before execution) + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_result = kv_store.get(key); + // Record might exist from attempt to check applicability, but should not show InProgress + if let Ok(record_json) = record_result { + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + // Should not be InProgress or Succeeded + assert!( + !matches!(record.status, MigrationStatus::InProgress) + && !matches!(record.status, MigrationStatus::Succeeded) + ); + } + } + + #[tokio::test] + #[serial] + async fn test_timeout_does_not_block_subsequent_migrations() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + + // First processor will timeout, second should succeed + let processor1 = Arc::new(HangingExecuteProcessor::new("test.migration1.v1")); + let processor2 = Arc::new(TestProcessor::new("test.migration2.v1")); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor1, processor2.clone()], + ); + + // Run migrations + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 2); + assert_eq!(summary.failed_retryable, 1); // First migration timed out + assert_eq!(summary.succeeded, 1); // Second migration succeeded + + // Verify first migration is marked as retryable + let key1 = format!("{MIGRATION_KEY_PREFIX}test.migration1.v1"); + let record1_json = kv_store.get(key1).expect("Migration 1 record should exist"); + let record1: MigrationRecord = + serde_json::from_str(&record1_json).expect("Should 
deserialize"); + assert!(matches!(record1.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record1.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + + // Verify second migration succeeded + let key2 = format!("{MIGRATION_KEY_PREFIX}test.migration2.v1"); + let record2_json = kv_store.get(key2).expect("Migration 2 record should exist"); + let record2: MigrationRecord = + serde_json::from_str(&record2_json).expect("Should deserialize"); + assert!(matches!(record2.status, MigrationStatus::Succeeded)); + + // Verify second processor actually executed + assert_eq!(processor2.execution_count(), 1); + } + + #[tokio::test] + #[serial] + async fn test_timeout_with_exponential_backoff() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations multiple times to test backoff + for attempt in 1..=3 { + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key.clone()).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert_eq!(record.attempts, attempt); + assert!(record.next_attempt_at.is_some()); + + // Clear next_attempt_at to allow immediate retry in test + let mut record_for_retry = record; + record_for_retry.next_attempt_at = None; + let json = serde_json::to_string(&record_for_retry).unwrap(); + kv_store.set(key, json).unwrap(); + } + } } From 29bd749d56f0c0f82ea2493f42ea7cb5178e0b98 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:05:52 -0800 Subject: [PATCH 04/18] test: reduce timeout threshold for tests --- bedrock/src/migration/controller.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index d5b25709..792e9c8d 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -11,7 +11,12 @@ use tokio::sync::Mutex; const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day -const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds + +// Use a shorter timeout in tests to speed up test execution +#[cfg(not(test))] +const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production +#[cfg(test)] +const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests /// Global lock to prevent concurrent migration runs across all controller instances. 
/// This is a process-wide coordination mechanism that ensures only one migration From c7a17e2081f2c5b6b12746d91338f5f48136fb3c Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:15:38 -0800 Subject: [PATCH 05/18] style: various lint fixes --- bedrock/src/migration/controller.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 792e9c8d..bf1cfc83 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -12,9 +12,9 @@ const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day -// Use a shorter timeout in tests to speed up test execution #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production +// Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests @@ -796,7 +796,7 @@ mod tests { } } - /// Test processor that hangs indefinitely during is_applicable check + /// Test processor that hangs indefinitely during `is_applicable` check struct HangingApplicabilityProcessor { id: String, } From 1dd4a026315eec556ffe6d24abad8a842921a027 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 13:17:12 -0800 Subject: [PATCH 06/18] style: more lint --- bedrock/src/migration/controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index bf1cfc83..d8844052 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -14,7 +14,7 @@ const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production -// Use a shorter timeout in tests to speed up test ex + // Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests From a00dad52a4318950bfc57aa3f95b3e2aaa9d533a Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:13:01 -0800 Subject: [PATCH 07/18] docs: timeout strategy --- bedrock/src/migration/controller.rs | 10 ++++++++++ bedrock/src/migration/processor.rs | 17 +++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index d8844052..5bc633ea 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -97,6 +97,16 @@ impl MigrationController { /// If another migration is already in progress when this method is called, it will return /// immediately with an `InvalidOperation` error rather than waiting. /// + /// # Timeouts + /// + /// Each migration is subject to a 20-second timeout for both applicability checks and execution. + /// If a migration times out: + /// - During `is_applicable()`: The migration is skipped + /// - During `execute()`: The migration is marked as retryable and scheduled for retry with exponential backoff + /// + /// Subsequent migrations will continue regardless of timeouts, ensuring one slow migration + /// doesn't block the entire migration system. + /// /// # Errors /// /// Returns `MigrationError::InvalidOperation` if another migration run is already in progress. 
diff --git a/bedrock/src/migration/processor.rs b/bedrock/src/migration/processor.rs index 7c637fd0..ce82ed43 100644 --- a/bedrock/src/migration/processor.rs +++ b/bedrock/src/migration/processor.rs @@ -33,6 +33,21 @@ pub enum ProcessorResult { } /// Trait that all migration processors must implement +/// +/// # Timeouts and Cancellation Safety +/// +/// Both [`is_applicable`](Self::is_applicable) and [`execute`](Self::execute) are subject to timeouts +/// (20 seconds in production). When a timeout occurs, the future is dropped and the migration +/// is marked as failed (for `execute`) or skipped (for `is_applicable`). +/// +/// **IMPORTANT**: Implementations MUST be cancellation-safe: +/// +/// - **DO NOT** spawn background tasks using `tokio::spawn`, `std::thread::spawn`, or similar +/// that will continue running after the timeout +/// - **DO NOT** use blocking operations or FFI calls without proper cleanup +/// - **ENSURE** all work stops when the future is dropped (cooperative cancellation) +/// - **MAKE** migrations idempotent so partial execution can be safely retried +/// #[uniffi::export(with_foreign)] #[async_trait] pub trait MigrationProcessor: Send + Sync { @@ -46,7 +61,6 @@ pub trait MigrationProcessor: Send + Sync { /// to determine if the migration needs to run. This ensures the system is /// truly idempotent and handles edge cases gracefully. /// - /// /// # Returns /// - `Ok(true)` if the migration should run /// - `Ok(false)` if the migration should be skipped @@ -54,6 +68,5 @@ pub trait MigrationProcessor: Send + Sync { async fn is_applicable(&self) -> Result; /// Execute the migration - /// Called by the controller when the migration is ready to run async fn execute(&self) -> Result; } From 2b53c9f7d109d5407b126547818a528663cd6c5f Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:15:08 -0800 Subject: [PATCH 08/18] docs: cleanup comment --- bedrock/src/migration/controller.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 5bc633ea..5364e70b 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -14,7 +14,6 @@ const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day #[cfg(not(test))] const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production - // Use a shorter timeout in tests to speed up test ex #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests From 02bddf76f8db348a7b01199737920daca4eaf22f Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:24:34 -0800 Subject: [PATCH 09/18] feat: skip running migration if blocked pending user action --- bedrock/src/migration/controller.rs | 48 +++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 5364e70b..6954fd0c 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -174,6 +174,13 @@ impl MigrationController { continue; } + // Skip if blocked pending user action + if matches!(record.status, MigrationStatus::BlockedUserAction) { + info!("Migration {migration_id} blocked pending user action, skipping"); + summary.blocked += 1; + continue; + } + // Check if we should retry (based on next_attempt_at) if let Some(next_attempt) = record.next_attempt_at { let now = Utc::now(); @@ -976,4 +983,45 @@ mod tests { kv_store.set(key, json).unwrap(); } } + + #[tokio::test] + #[serial] + async fn 
test_blocked_user_action_skips_migration() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Manually create a record with BlockedUserAction status + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::BlockedUserAction; + record.last_error_message = Some("Waiting for user consent".to_string()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should skip blocked migration + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.blocked, 1); + assert_eq!(summary.succeeded, 0); + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration was NOT executed + assert_eq!(processor.execution_count(), 0); + + // Verify the status is still BlockedUserAction + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::BlockedUserAction + )); + } } From 28836b9e8eff1ece06da5f04980168ac0edd0122 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 14:47:01 -0800 Subject: [PATCH 10/18] feat: detect stale in progress migrations --- bedrock/src/migration/controller.rs | 111 +++++++++++++++++++++++++++- 1 file changed, 109 insertions(+), 2 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 6954fd0c..798489d3 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -17,6 +17,12 @@ const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production #[cfg(test)] const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests +// Consider InProgress migrations stale after this duration (indicates crash/abandon) +#[cfg(not(test))] +const STALE_IN_PROGRESS_MINS: i64 = 5; // 5 minutes in production +#[cfg(test)] +const STALE_IN_PROGRESS_MINS: i64 = 0; // Immediately stale in tests (for testing) + /// Global lock to prevent concurrent migration runs across all controller instances. /// This is a process-wide coordination mechanism that ensures only one migration /// can execute at a time, regardless of how many [`MigrationController`] instances exist. 
@@ -181,6 +187,28 @@ impl MigrationController { continue; } + // Check for stale InProgress status (likely from app crash) + if matches!(record.status, MigrationStatus::InProgress) { + if let Some(last_attempted) = record.last_attempted_at { + let elapsed = Utc::now() - last_attempted; + if elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) { + warn!( + "Migration {migration_id} stuck in InProgress for {elapsed:?}, resetting to retryable" + ); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("STALE_IN_PROGRESS".to_string()); + record.last_error_message = Some(format!( + "Migration was stuck in InProgress state for {elapsed:?}, likely due to app crash" + )); + // Don't increment attempts since this wasn't a real attempt + // Schedule immediate retry (no backoff for crash recovery) + record.next_attempt_at = Some(Utc::now()); + self.save_record(&migration_id, &record)?; + // Continue to retry logic below + } + } + } + // Check if we should retry (based on next_attempt_at) if let Some(next_attempt) = record.next_attempt_at { let now = Utc::now(); @@ -999,8 +1027,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should skip blocked migration let result = controller.run_migrations().await; @@ -1024,4 +1054,81 @@ mod tests { MigrationStatus::BlockedUserAction )); } + + #[tokio::test] + #[serial] + async fn test_stale_in_progress_resets_and_retries() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create a stale InProgress record (simulating app crash) + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; + record.attempts = 1; + // Set last_attempted_at to 10 minutes ago (stale) + record.last_attempted_at = Some(Utc::now() - chrono::Duration::minutes(10)); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should detect staleness and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); // Should succeed after reset + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration was executed + assert_eq!(processor.execution_count(), 1); + + // Verify the final status is Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + // Attempt counter should have incremented + assert_eq!(updated_record.attempts, 2); + } + + #[tokio::test] + #[serial] + async fn test_fresh_in_progress_not_treated_as_stale() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create a fresh InProgress record (recent) + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; 
+ record.attempts = 1; + // Set last_attempted_at to now (not stale) + record.last_attempted_at = Some(Utc::now()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should treat as normal InProgress and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + + // Verify the migration was executed + assert_eq!(processor.execution_count(), 1); + } } From d1940054fe62e89e4b58d707d980b75afe4ea092 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 15:00:54 -0800 Subject: [PATCH 11/18] refactor: remove blocked user action state --- bedrock/src/migration/controller.rs | 64 +---------------------------- bedrock/src/migration/processor.rs | 6 --- bedrock/src/migration/state.rs | 2 - 3 files changed, 2 insertions(+), 70 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 798489d3..e1429737 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -45,8 +45,6 @@ pub struct MigrationRunSummary { pub failed_retryable: i32, /// Number of migrations that failed with terminal errors (won't retry) pub failed_terminal: i32, - /// Number of migrations blocked pending user action - pub blocked: i32, /// Number of migrations that were skipped (already completed or not applicable) pub skipped: i32, } @@ -155,7 +153,6 @@ impl MigrationController { succeeded: 0, failed_retryable: 0, failed_terminal: 0, - blocked: 0, skipped: 0, }; @@ -180,13 +177,6 @@ impl MigrationController { continue; } - // Skip if blocked pending user action - if matches!(record.status, MigrationStatus::BlockedUserAction) { - info!("Migration {migration_id} blocked pending user action, skipping"); - summary.blocked += 1; - continue; - } - // Check for stale InProgress status (likely from app crash) if matches!(record.status, MigrationStatus::InProgress) { if let Some(last_attempted) = record.last_attempted_at { @@ -334,13 +324,6 @@ impl MigrationController { record.next_attempt_at = None; // Clear retry time summary.failed_terminal += 1; } - Ok(ProcessorResult::BlockedUserAction { reason }) => { - warn!("Migration {migration_id} blocked: {reason}"); - record.status = MigrationStatus::BlockedUserAction; - record.last_error_message = Some(reason); - record.next_attempt_at = None; // Clear retry time - summary.blocked += 1; - } Err(e) => { error!("Migration {migration_id} threw error: {e:?}"); record.status = MigrationStatus::FailedRetryable; @@ -362,8 +345,8 @@ impl MigrationController { } info!( - "Migration run completed: {} succeeded, {} retryable, {} terminal, {} blocked, {} skipped", - summary.succeeded, summary.failed_retryable, summary.failed_terminal, summary.blocked, summary.skipped + "Migration run completed: {} succeeded, {} retryable, {} terminal, {} skipped", + summary.succeeded, summary.failed_retryable, summary.failed_terminal, summary.skipped ); Ok(summary) @@ -1012,49 +995,6 @@ mod tests { } } - #[tokio::test] - #[serial] - async fn test_blocked_user_action_skips_migration() { - let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); - let processor = Arc::new(TestProcessor::new("test.migration.v1")); - - // Manually create a record with 
BlockedUserAction status - let mut record = MigrationRecord::new(); - record.status = MigrationStatus::BlockedUserAction; - record.last_error_message = Some("Waiting for user consent".to_string()); - - let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); - let json = serde_json::to_string(&record).unwrap(); - kv_store.set(key.clone(), json).unwrap(); - - let controller = MigrationController::with_processors( - kv_store.clone(), - vec![processor.clone()], - ); - - // Run migrations - should skip blocked migration - let result = controller.run_migrations().await; - assert!(result.is_ok()); - - let summary = result.unwrap(); - assert_eq!(summary.total, 1); - assert_eq!(summary.blocked, 1); - assert_eq!(summary.succeeded, 0); - assert_eq!(summary.failed_retryable, 0); - - // Verify the migration was NOT executed - assert_eq!(processor.execution_count(), 0); - - // Verify the status is still BlockedUserAction - let record_json = kv_store.get(key).expect("Record should exist"); - let updated_record: MigrationRecord = - serde_json::from_str(&record_json).expect("Should deserialize"); - assert!(matches!( - updated_record.status, - MigrationStatus::BlockedUserAction - )); - } - #[tokio::test] #[serial] async fn test_stale_in_progress_resets_and_retries() { diff --git a/bedrock/src/migration/processor.rs b/bedrock/src/migration/processor.rs index ce82ed43..b8b8a9c5 100644 --- a/bedrock/src/migration/processor.rs +++ b/bedrock/src/migration/processor.rs @@ -24,12 +24,6 @@ pub enum ProcessorResult { /// Human-readable error message error_message: String, }, - - /// Migration blocked pending user action - BlockedUserAction { - /// Reason why the migration is blocked - reason: String, - }, } /// Trait that all migration processors must implement diff --git a/bedrock/src/migration/state.rs b/bedrock/src/migration/state.rs index 123ceca7..04645cf2 100644 --- a/bedrock/src/migration/state.rs +++ b/bedrock/src/migration/state.rs @@ -14,8 +14,6 @@ pub enum MigrationStatus { FailedRetryable, /// Migration failed with terminal error (won't retry) FailedTerminal, - /// Migration blocked pending user action - BlockedUserAction, } /// Record of a single migration's execution state From b801b7260bfc7d54047792aedb6dfc386457790b Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 16:50:13 -0800 Subject: [PATCH 12/18] docs: add state transition docs for controller --- bedrock/src/migration/README.md | 51 +++++++++++++++++++++++++++++ bedrock/src/migration/controller.rs | 2 +- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 bedrock/src/migration/README.md diff --git a/bedrock/src/migration/README.md b/bedrock/src/migration/README.md new file mode 100644 index 00000000..57db3bf0 --- /dev/null +++ b/bedrock/src/migration/README.md @@ -0,0 +1,51 @@ +## Migration Controller +The Migration `controller.rs` is a simple state machine that runs a for loop over a series of processors and executes the processors. The processors contain logic around performing an individual migration and conform to a simple interface: + +```rust +trait Process { + /// Determines whether the migration should run. + fn is_applicable(&self) -> bool; + + /// Business logic that performs the migration. + fn execute(&self) -> Result<(), MigrationError>; +} +``` + +The migration system is a permanent artifact of the app and is run on app start to bring the app to a expected state. The processors are expected to be idempotent. 
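To make the contract concrete, here is a minimal sketch of a processor implementing the simplified interface above. The struct name, the key names, and the in-memory `HashMap` standing in for storage are purely illustrative (real processors implement the async `MigrationProcessor` trait in `processor.rs` and use the app's key-value storage), but the shape is the same: applicability is derived from actual state, and `execute` is safe to re-run.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical example: a processor that renames a legacy settings key.
struct RenameSettingsKeyProcessor {
    store: Mutex<HashMap<String, String>>,
}

impl Process for RenameSettingsKeyProcessor {
    /// Applicability is derived from actual state (does the old key still
    /// exist?), not from the stored migration record, so the check remains
    /// correct even if the record is wiped (e.g. after a reinstall).
    fn is_applicable(&self) -> bool {
        self.store.lock().unwrap().contains_key("settings.legacy")
    }

    /// Idempotent by construction: if the old key is already gone this is a
    /// no-op, so re-running after a crash or partial attempt is safe.
    fn execute(&self) -> Result<(), MigrationError> {
        let mut store = self.store.lock().unwrap();
        if let Some(value) = store.remove("settings.legacy") {
            store.insert("settings.v2".to_string(), value);
        }
        Ok(())
    }
}
```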

## States
The `controller.rs` stores a key-value mapping between the id of the migration and a record of the migration. The record most importantly contains the status of the migration, but also useful monitoring and debugging information such as `started_at` and `last_attempted_at`.

The possible states are:
- `NotStarted` - migration has not been performed
- `InProgress` - migration execution began but has not completed; a record left in this state indicates the previous run was interrupted
- `Succeeded` - migration successfully completed
- `FailedRetryable` - migration failed, but can be retried (e.g. there was a network error)
- `FailedTerminal` - migration failed and represents a terminal state. It can not be retried.

Note, this migration state storage is only present to act as an optimisation so that we don't need to repeatedly call `process.is_applicable()` on each app open. 
If the app data is wiped, the `controller.rs` would iterate through processors, call `process.is_applicable()` and rebuild the state. +The migration state storage optimizes subsequent app starts by skipping `Succeeded` and `FailedTerminal` migrations without calling `process.is_applicable()`. For `NotStarted` and `FailedRetryable` migrations, `process.is_applicable()` is called each time to detect when they become applicable. This ensures migrations can respond to changing app state. ## State transitions 1. `NotStarted` - - → `InProgress` when migration execution begins - - → `Succeeded` if `is_applicable()` returns false (migration not needed) + - → `InProgress` when `is_applicable()` returns true and migration execution begins + - Remains `NotStarted` if `is_applicable()` returns false (will be checked again on next app start) - → `FailedRetryable` if `is_applicable()` fails or times out - - → `FailedTerminal` if execution fails with terminal error 2. `InProgress` - → `Succeeded` when `execute()` completes successfully From 6cc6e872d8cd808ddb481f8a83527a650a09322f Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 17:08:46 -0800 Subject: [PATCH 14/18] refactor: use clearer state based detection pattern --- bedrock/src/migration/controller.rs | 86 ++++++++++++++++++----------- 1 file changed, 55 insertions(+), 31 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index e7409abe..8a2f8bbf 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -163,50 +163,74 @@ impl MigrationController { // Load the current record for this migration (or create new one if first time) let mut record = self.load_record(&migration_id)?; - // Skip if already succeeded - if matches!(record.status, MigrationStatus::Succeeded) { - info!("Migration {migration_id} already succeeded, skipping"); - summary.skipped += 1; - continue; - } + // Determine if this migration should be attempted based on its current status + let should_attempt = match record.status { + MigrationStatus::Succeeded => { + // Terminal state - migration completed successfully + info!("Migration {migration_id} already succeeded, skipping"); + summary.skipped += 1; + false + } - // Skip if terminal failure (non-retryable) - if matches!(record.status, MigrationStatus::FailedTerminal) { - info!("Migration {migration_id} failed terminally, skipping"); - summary.skipped += 1; - continue; - } + MigrationStatus::FailedTerminal => { + // Terminal state - migration failed permanently + info!("Migration {migration_id} failed terminally, skipping"); + summary.skipped += 1; + false + } + + MigrationStatus::InProgress => { + // Check for staleness (app crash recovery) + // InProgress without timestamp is treated as stale + let is_stale = record.last_attempted_at.is_none_or(|last_attempted| { + let elapsed = Utc::now() - last_attempted; + elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) + }); - // Check for stale InProgress status (likely from app crash) - if matches!(record.status, MigrationStatus::InProgress) { - if let Some(last_attempted) = record.last_attempted_at { - let elapsed = Utc::now() - last_attempted; - if elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) { + if is_stale { warn!( - "Migration {migration_id} stuck in InProgress for {elapsed:?}, resetting to retryable" + "Migration {migration_id} stuck in InProgress, resetting to retryable" ); record.status = MigrationStatus::FailedRetryable; record.last_error_code = 
Some("STALE_IN_PROGRESS".to_string()); - record.last_error_message = Some(format!( - "Migration was stuck in InProgress state for {elapsed:?}, likely due to app crash" - )); - // Don't increment attempts since this wasn't a real attempt + record.last_error_message = Some( + "Migration was stuck in InProgress state, likely due to app crash" + .to_string(), + ); // Schedule immediate retry record.next_attempt_at = Some(Utc::now()); self.save_record(&migration_id, &record)?; - // Continue to retry logic below + true // Proceed to retry + } else { + // Fresh InProgress - skip to avoid concurrent execution + info!("Migration {migration_id} currently in progress, skipping"); + summary.skipped += 1; + false } } - } - // Check if we should retry (based on next_attempt_at) - if let Some(next_attempt) = record.next_attempt_at { - let now = Utc::now(); - if now < next_attempt { - info!("Migration {migration_id} scheduled for retry at {next_attempt}, skipping"); - summary.skipped += 1; - continue; + MigrationStatus::FailedRetryable => { + // Check retry timing (exponential backoff) + // No retry time set = attempt immediately + record.next_attempt_at.is_none_or(|next_attempt| { + if Utc::now() < next_attempt { + info!("Migration {migration_id} scheduled for retry at {next_attempt}, skipping"); + summary.skipped += 1; + false + } else { + true // Ready to retry + } + }) } + + MigrationStatus::NotStarted => { + // First time attempting this migration + true + } + }; + + if !should_attempt { + continue; } // Check if migration is applicable and should run, based on processor defined logic. From eeee9f305ca22d5182fc7307bff7e274af834d3e Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 17:12:59 -0800 Subject: [PATCH 15/18] test: add state based tests --- bedrock/src/migration/controller.rs | 357 ++++++++++++++++++++++++++++ 1 file changed, 357 insertions(+) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 8a2f8bbf..fa016017 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -1095,4 +1095,361 @@ mod tests { // Verify the migration was executed assert_eq!(processor.execution_count(), 1); } + + #[tokio::test] + #[serial] + async fn test_succeeded_state_is_terminal_and_skipped() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Manually create a Succeeded record + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::Succeeded; + record.completed_at = Some(Utc::now()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should skip + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + + // Verify is_applicable() was NOT called (processor was never executed) + assert_eq!(processor.execution_count(), 0); + + // Verify status is still Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::Succeeded + )); + } + + 
#[tokio::test] + #[serial] + async fn test_failed_terminal_state_is_permanent_and_skipped() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Manually create a FailedTerminal record + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedTerminal; + record.last_error_code = Some("TERMINAL_ERROR".to_string()); + record.last_error_message = Some("Permanent failure".to_string()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations multiple times - should always skip + for _ in 0..3 { + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + } + + // Verify processor was never executed + assert_eq!(processor.execution_count(), 0); + + // Verify status is still FailedTerminal + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::FailedTerminal + )); + } + + #[tokio::test] + #[serial] + async fn test_not_started_state_checks_applicability_and_executes() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - NotStarted should check is_applicable and execute + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(record.status, MigrationStatus::Succeeded)); + assert_eq!(record.attempts, 1); + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_respects_backoff_timing() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record with retry scheduled for future + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 1; + record.last_error_code = Some("NETWORK_ERROR".to_string()); + record.next_attempt_at = Some(Utc::now() + chrono::Duration::hours(1)); // 1 hour in future + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should skip due to retry timing + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); 
+ assert_eq!(summary.succeeded, 0); + + // Verify processor was NOT executed + assert_eq!(processor.execution_count(), 0); + + // Verify status is still FailedRetryable + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::FailedRetryable + )); + assert_eq!(updated_record.attempts, 1); // Attempts should not increment + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_retries_when_backoff_expires() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record with retry scheduled for past + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 1; + record.last_error_code = Some("NETWORK_ERROR".to_string()); + record.next_attempt_at = Some(Utc::now() - chrono::Duration::minutes(5)); // 5 minutes ago + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should retry and succeed + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + assert_eq!(summary.skipped, 0); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::Succeeded + )); + assert_eq!(updated_record.attempts, 2); // Should increment from 1 to 2 + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_without_retry_time_executes_immediately() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record without next_attempt_at + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 2; + record.next_attempt_at = None; // No retry time set + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should execute immediately + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::Succeeded + )); + } + + #[tokio::test] + #[serial] + async fn test_in_progress_without_timestamp_treated_as_stale() { + let kv_store = 
Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create InProgress record without last_attempted_at timestamp + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; + record.attempts = 1; + record.last_attempted_at = None; // No timestamp + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + + // Run migrations - should treat as stale and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify the record was reset and completed + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::Succeeded + )); + } + + #[tokio::test] + #[serial] + async fn test_state_based_execution_order() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + + // Create 5 processors with different states + let processor1 = Arc::new(TestProcessor::new("test.succeeded.v1")); + let processor2 = Arc::new(TestProcessor::new("test.terminal.v1")); + let processor3 = Arc::new(TestProcessor::new("test.retryable.v1")); + let processor4 = Arc::new(TestProcessor::new("test.in_progress.v1")); + let processor5 = Arc::new(TestProcessor::new("test.not_started.v1")); + + // Set up initial states + let mut succeeded = MigrationRecord::new(); + succeeded.status = MigrationStatus::Succeeded; + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.succeeded.v1"), + serde_json::to_string(&succeeded).unwrap(), + ) + .unwrap(); + + let mut terminal = MigrationRecord::new(); + terminal.status = MigrationStatus::FailedTerminal; + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.terminal.v1"), + serde_json::to_string(&terminal).unwrap(), + ) + .unwrap(); + + let mut retryable = MigrationRecord::new(); + retryable.status = MigrationStatus::FailedRetryable; + retryable.next_attempt_at = Some(Utc::now() - chrono::Duration::minutes(1)); // Ready to retry + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.retryable.v1"), + serde_json::to_string(&retryable).unwrap(), + ) + .unwrap(); + + let mut in_progress = MigrationRecord::new(); + in_progress.status = MigrationStatus::InProgress; + in_progress.last_attempted_at = Some(Utc::now()); + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.in_progress.v1"), + serde_json::to_string(&in_progress).unwrap(), + ) + .unwrap(); + + // NotStarted doesn't need setup - it's the default + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![ + processor1.clone(), + processor2.clone(), + processor3.clone(), + processor4.clone(), + processor5.clone(), + ], + ); + + // Run migrations + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 5); + // In test mode (STALE_IN_PROGRESS_MINS=0), InProgress is treated as stale and executed + assert_eq!(summary.succeeded, 3); // retryable + in_progress + not_started + assert_eq!(summary.skipped, 2); // succeeded + terminal + + // Verify 
execution counts + assert_eq!(processor1.execution_count(), 0); // Succeeded - skipped + assert_eq!(processor2.execution_count(), 0); // Terminal - skipped + assert_eq!(processor3.execution_count(), 1); // Retryable - executed + assert_eq!(processor4.execution_count(), 1); // InProgress - treated as stale, executed + assert_eq!(processor5.execution_count(), 1); // NotStarted - executed + } } From d49bb3ffb67eee04219132163a9b47fdbb600890 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Tue, 3 Feb 2026 17:14:51 -0800 Subject: [PATCH 16/18] style: formatting --- bedrock/src/migration/controller.rs | 75 +++++++++++++++-------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index fa016017..749ee629 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -182,10 +182,11 @@ impl MigrationController { MigrationStatus::InProgress => { // Check for staleness (app crash recovery) // InProgress without timestamp is treated as stale - let is_stale = record.last_attempted_at.is_none_or(|last_attempted| { - let elapsed = Utc::now() - last_attempted; - elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) - }); + let is_stale = + record.last_attempted_at.is_none_or(|last_attempted| { + let elapsed = Utc::now() - last_attempted; + elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) + }); if is_stale { warn!( @@ -203,7 +204,9 @@ impl MigrationController { true // Proceed to retry } else { // Fresh InProgress - skip to avoid concurrent execution - info!("Migration {migration_id} currently in progress, skipping"); + info!( + "Migration {migration_id} currently in progress, skipping" + ); summary.skipped += 1; false } @@ -1111,8 +1114,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should skip let result = controller.run_migrations().await; @@ -1130,10 +1135,7 @@ mod tests { let record_json = kv_store.get(key).expect("Record should exist"); let updated_record: MigrationRecord = serde_json::from_str(&record_json).expect("Should deserialize"); - assert!(matches!( - updated_record.status, - MigrationStatus::Succeeded - )); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); } #[tokio::test] @@ -1152,8 +1154,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations multiple times - should always skip for _ in 0..3 { @@ -1184,8 +1188,10 @@ mod tests { let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); let processor = Arc::new(TestProcessor::new("test.migration.v1")); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - NotStarted should check is_applicable and execute let result = controller.run_migrations().await; @@ -1224,8 +1230,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); 
kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should skip due to retry timing let result = controller.run_migrations().await; @@ -1267,8 +1275,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should retry and succeed let result = controller.run_migrations().await; @@ -1286,10 +1296,7 @@ mod tests { let record_json = kv_store.get(key).expect("Record should exist"); let updated_record: MigrationRecord = serde_json::from_str(&record_json).expect("Should deserialize"); - assert!(matches!( - updated_record.status, - MigrationStatus::Succeeded - )); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); assert_eq!(updated_record.attempts, 2); // Should increment from 1 to 2 } @@ -1309,8 +1316,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should execute immediately let result = controller.run_migrations().await; @@ -1326,10 +1335,7 @@ mod tests { let record_json = kv_store.get(key).expect("Record should exist"); let updated_record: MigrationRecord = serde_json::from_str(&record_json).expect("Should deserialize"); - assert!(matches!( - updated_record.status, - MigrationStatus::Succeeded - )); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); } #[tokio::test] @@ -1348,8 +1354,10 @@ mod tests { let json = serde_json::to_string(&record).unwrap(); kv_store.set(key.clone(), json).unwrap(); - let controller = - MigrationController::with_processors(kv_store.clone(), vec![processor.clone()]); + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); // Run migrations - should treat as stale and retry let result = controller.run_migrations().await; @@ -1365,10 +1373,7 @@ mod tests { let record_json = kv_store.get(key).expect("Record should exist"); let updated_record: MigrationRecord = serde_json::from_str(&record_json).expect("Should deserialize"); - assert!(matches!( - updated_record.status, - MigrationStatus::Succeeded - )); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); } #[tokio::test] From c5e3c2bab59e35dc6187e0c52ccb622326332572 Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Wed, 4 Feb 2026 17:26:37 -0800 Subject: [PATCH 17/18] feat: import logger, add first logs --- bedrock/src/migration/controller.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 749ee629..9e40684d 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -2,8 +2,9 @@ use crate::migration::error::MigrationError; use crate::migration::processor::{MigrationProcessor, ProcessorResult}; use crate::migration::state::{MigrationRecord, MigrationStatus}; use 
crate::primitives::key_value_store::{DeviceKeyValueStore, KeyValueStoreError}; +use crate::primitives::logger::LogContext; use chrono::{Duration, Utc}; -use log::{error, info, warn}; +use log::warn; use once_cell::sync::Lazy; use std::sync::Arc; use tokio::sync::Mutex; @@ -145,7 +146,16 @@ impl MigrationController { async fn run_migrations_async( &self, ) -> Result { - info!("Migration run started"); + let _ctx = LogContext::new("MigrationController"); + + // Store start time for duration tracking + let run_start_time = Utc::now(); + + crate::info!( + "migration_run.started total_processors={} timestamp={}", + self.processors.len(), + run_start_time.to_rfc3339() + ); // Summary of this migration run for analytics. Not stored. let mut summary = MigrationRunSummary { @@ -167,14 +177,22 @@ impl MigrationController { let should_attempt = match record.status { MigrationStatus::Succeeded => { // Terminal state - migration completed successfully - info!("Migration {migration_id} already succeeded, skipping"); + crate::info!( + "migration.skipped id={} reason=already_succeeded timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); summary.skipped += 1; false } MigrationStatus::FailedTerminal => { // Terminal state - migration failed permanently - info!("Migration {migration_id} failed terminally, skipping"); + crate::info!( + "migration.skipped id={} reason=terminal_failure timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); summary.skipped += 1; false } From 950a626e08a95ea90e2ea844d35a14bc6120e76e Mon Sep 17 00:00:00 2001 From: Tom Waite Date: Wed, 4 Feb 2026 17:30:12 -0800 Subject: [PATCH 18/18] feat: add full logs --- bedrock/src/migration/controller.rs | 109 ++++++++++++++++++++++------ 1 file changed, 86 insertions(+), 23 deletions(-) diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 9e40684d..617152e3 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -207,8 +207,12 @@ impl MigrationController { }); if is_stale { - warn!( - "Migration {migration_id} stuck in InProgress, resetting to retryable" + crate::warn!( + "migration.stale_recovery id={} last_attempted={} elapsed_mins={} timestamp={}", + migration_id, + record.last_attempted_at.map(|t| t.to_rfc3339()).unwrap_or_default(), + STALE_IN_PROGRESS_MINS, + Utc::now().to_rfc3339() ); record.status = MigrationStatus::FailedRetryable; record.last_error_code = Some("STALE_IN_PROGRESS".to_string()); @@ -222,8 +226,10 @@ impl MigrationController { true // Proceed to retry } else { // Fresh InProgress - skip to avoid concurrent execution - info!( - "Migration {migration_id} currently in progress, skipping" + crate::info!( + "migration.skipped id={} reason=in_progress timestamp={}", + migration_id, + Utc::now().to_rfc3339() ); summary.skipped += 1; false @@ -235,7 +241,12 @@ impl MigrationController { // No retry time set = attempt immediately record.next_attempt_at.is_none_or(|next_attempt| { if Utc::now() < next_attempt { - info!("Migration {migration_id} scheduled for retry at {next_attempt}, skipping"); + crate::info!( + "migration.skipped id={} reason=retry_backoff next_attempt={} timestamp={}", + migration_id, + next_attempt.to_rfc3339(), + Utc::now().to_rfc3339() + ); summary.skipped += 1; false } else { @@ -266,17 +277,28 @@ impl MigrationController { match is_applicable_result { Ok(Ok(false)) => { - info!("Migration {migration_id} not applicable, skipping"); + crate::info!( + "migration.skipped id={} reason=not_applicable timestamp={}", + 
migration_id, + Utc::now().to_rfc3339() + ); summary.skipped += 1; continue; } Ok(Err(e)) => { - error!("Failed to check applicability for {migration_id}: {e:?}"); + crate::error!( + "Failed to check applicability for {migration_id}: {e:?}" + ); summary.skipped += 1; continue; } Err(_) => { - error!("Migration {migration_id} is_applicable() timed out after {MIGRATION_TIMEOUT_SECS} seconds, skipping"); + crate::error!( + "migration.applicability_timeout id={} timeout_secs={} timestamp={}", + migration_id, + MIGRATION_TIMEOUT_SECS, + Utc::now().to_rfc3339() + ); summary.skipped += 1; continue; } @@ -286,10 +308,11 @@ impl MigrationController { } // Execute the migration - info!( - "Starting migration: {} (attempt {})", + crate::info!( + "migration.started id={} attempt={} timestamp={}", migration_id, - record.attempts + 1 + record.attempts + 1, + Utc::now().to_rfc3339() ); // Update record for execution @@ -313,7 +336,13 @@ impl MigrationController { match execute_result { Err(_) => { // Timeout occurred - error!("Migration {migration_id} timed out after {MIGRATION_TIMEOUT_SECS} seconds"); + crate::error!( + "migration.timeout id={} attempt={} timeout_secs={} timestamp={}", + migration_id, + record.attempts, + MIGRATION_TIMEOUT_SECS, + Utc::now().to_rfc3339() + ); record.status = MigrationStatus::FailedRetryable; record.last_error_code = Some("MIGRATION_TIMEOUT".to_string()); record.last_error_message = @@ -328,7 +357,17 @@ impl MigrationController { } Ok(result) => match result { Ok(ProcessorResult::Success) => { - info!("Migration {migration_id} succeeded"); + let duration_ms = record + .started_at + .map_or(0, |start| (Utc::now() - start).num_milliseconds()); + + crate::info!( + "migration.succeeded id={} attempts={} duration_ms={} timestamp={}", + migration_id, + record.attempts, + duration_ms, + Utc::now().to_rfc3339() + ); record.status = MigrationStatus::Succeeded; record.completed_at = Some(Utc::now()); record.last_error_code = None; @@ -341,11 +380,6 @@ impl MigrationController { error_message, retry_after_ms, }) => { - warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - // Retry time is calculated according to exponential backoff and set on the // record.next_attempt_at field. 
When the app is next opened and the // migration is run again; the controller will check whether to run the @@ -356,13 +390,34 @@ impl MigrationController { record.next_attempt_at = Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + crate::warn!( + "migration.failed_retryable id={} attempt={} error_code={} error_message={} retry_delay_ms={} next_attempt={} timestamp={}", + migration_id, + record.attempts, + error_code, + error_message, + retry_delay_ms, + record.next_attempt_at.map(|t| t.to_rfc3339()).unwrap_or_default(), + Utc::now().to_rfc3339() + ); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + summary.failed_retryable += 1; } Ok(ProcessorResult::Terminal { error_code, error_message, }) => { - error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); + crate::error!( + "migration.failed_terminal id={} attempt={} error_code={} error_message={} timestamp={}", + migration_id, + record.attempts, + error_code, + error_message, + Utc::now().to_rfc3339() + ); record.status = MigrationStatus::FailedTerminal; record.last_error_code = Some(error_code); record.last_error_message = Some(error_message); @@ -370,7 +425,7 @@ impl MigrationController { summary.failed_terminal += 1; } Err(e) => { - error!("Migration {migration_id} threw error: {e:?}"); + crate::error!("Migration {migration_id} threw error: {e:?}"); record.status = MigrationStatus::FailedRetryable; record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); record.last_error_message = Some(format!("{e:?}")); @@ -389,9 +444,17 @@ impl MigrationController { self.save_record(&migration_id, &record)?; } - info!( - "Migration run completed: {} succeeded, {} retryable, {} terminal, {} skipped", - summary.succeeded, summary.failed_retryable, summary.failed_terminal, summary.skipped + let run_duration_ms = (Utc::now() - run_start_time).num_milliseconds(); + + crate::info!( + "migration_run.completed total={} succeeded={} failed_retryable={} failed_terminal={} skipped={} duration_ms={} timestamp={}", + summary.total, + summary.succeeded, + summary.failed_retryable, + summary.failed_terminal, + summary.skipped, + run_duration_ms, + Utc::now().to_rfc3339() ); Ok(summary)
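
The Ok(Ok(..)) / Ok(Err(..)) / Err(_) arms in the hunks above come from tokio::time::timeout layering its own Result on top of the processor's: the outer Err means the deadline elapsed before the inner future finished. A minimal self-contained sketch of that shape (not part of the patches above), assuming a hypothetical is_applicable_stub in place of the real processor trait and an arbitrary deadline:

use std::time::Duration;

// Hypothetical stand-in for a processor's is_applicable() call; the real
// trait, error type, and timeout constant are not reproduced here.
async fn is_applicable_stub() -> Result<bool, String> {
    Ok(true)
}

#[tokio::main]
async fn main() {
    let result = tokio::time::timeout(Duration::from_secs(5), is_applicable_stub()).await;

    match result {
        // Outer Ok: the future completed in time; the inner Result is the
        // processor's own success/failure.
        Ok(Ok(applicable)) => println!("completed, applicable = {applicable}"),
        Ok(Err(e)) => println!("completed, but the check failed: {e}"),
        // Outer Err (tokio::time::error::Elapsed): the deadline passed and the
        // future was dropped without completing.
        Err(_) => println!("timed out before the check completed"),
    }
}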
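
Both retryable branches schedule the next attempt via calculate_backoff_delay(record.attempts) and store the result in record.next_attempt_at, but the helper's body is not included in this series. A minimal sketch of what such an exponential-backoff helper could look like, assuming a u32 attempts counter and placeholder base/cap values:

// Hypothetical illustration only: the real base delay, growth factor, and
// cap used by the controller are not shown in these patches.
fn calculate_backoff_delay(attempts: u32) -> i64 {
    const ASSUMED_BASE_DELAY_MS: i64 = 60_000; // placeholder: one minute
    const ASSUMED_MAX_DELAY_MS: i64 = 86_400_000; // placeholder: one day

    // Double the base delay for each completed attempt, saturating at the cap:
    // attempt 1 -> base, attempt 2 -> 2x base, attempt 3 -> 4x base, ...
    let exponent = attempts.saturating_sub(1).min(31);
    ASSUMED_BASE_DELAY_MS
        .saturating_mul(1_i64 << exponent)
        .min(ASSUMED_MAX_DELAY_MS)
}

The returned millisecond delay would feed directly into record.next_attempt_at = Some(Utc::now() + Duration::milliseconds(retry_delay_ms)), so keeping the arithmetic saturating and capped avoids overflow as attempts grows.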