diff --git a/bedrock/src/migration/README.md b/bedrock/src/migration/README.md
new file mode 100644
index 00000000..f3e25bc4
--- /dev/null
+++ b/bedrock/src/migration/README.md
@@ -0,0 +1,50 @@
+## Migration Controller
+The migration `controller.rs` is a simple state machine that iterates over a series of processors and executes each one. A processor contains the logic for performing an individual migration and conforms to a simple interface:
+
+```rust
+trait Process {
+    /// Determines whether the migration should run.
+    fn is_applicable(&self) -> bool;
+
+    /// Business logic that performs the migration.
+    fn execute(&self) -> Result<(), MigrationError>;
+}
+```
+
+(This is the conceptual shape; the actual trait is `MigrationProcessor` in `processor.rs`, which is async, exposes a `migration_id()`, and returns a `ProcessorResult` from `execute()`.)
+
+The migration system is a permanent artifact of the app and is run on app start to bring the app to an expected state. The processors are expected to be idempotent.
+
+## States
+The `controller.rs` stores a key-value mapping between the id of the migration and a record of the migration. The record most importantly contains the status of the migration, but also useful monitoring and debugging information such as `started_at` and `last_attempted_at`.
+
+The possible states are:
+- `NotStarted` - migration has not been performed
+- `InProgress` - migration started, but was interrupted
+- `Succeeded` - migration successfully completed
+- `FailedRetryable` - migration failed, but can be retried (e.g. there was a network error)
+- `FailedTerminal` - migration failed and represents a terminal state. It cannot be retried.
+
+The migration state storage optimizes subsequent app starts by skipping `Succeeded` and `FailedTerminal` migrations without calling `process.is_applicable()`. For `NotStarted` and `FailedRetryable` migrations, `process.is_applicable()` is called each time to detect when they become applicable. This ensures migrations can respond to changing app state.
+
+## State transitions
+1. `NotStarted`
+   - → `InProgress` when `is_applicable()` returns true and migration execution begins
+   - Remains `NotStarted` if `is_applicable()` returns false, fails, or times out (it will be checked again on the next app start)
+
+2. `InProgress`
+   - → `Succeeded` when `execute()` completes successfully
+   - → `FailedRetryable` if `execute()` times out or fails with a retryable error
+   - → `FailedTerminal` if `execute()` fails with a terminal error
+   - → `FailedRetryable` if detected as stale (app crashed mid-migration)
+   - Remains `InProgress` (skipped for this run) if the last attempt is recent enough not to be considered stale
+
+3. `Succeeded`
+   - Terminal state. No further transitions. Migration is skipped on subsequent runs.
+
+4. `FailedRetryable`
+   - → `InProgress` when a retry is attempted (after the exponential backoff period)
+   - → `Succeeded` if the retry succeeds
+   - → `FailedTerminal` if the retry fails with a terminal error
+   - Remains `FailedRetryable` if the retry fails again with a retryable error (the backoff period increases)
+
+5. `FailedTerminal`
+   - Terminal state. No further transitions. Migration is permanently skipped on subsequent runs.
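For illustration, a hypothetical processor written against the simplified `Process` trait above might look like the following. This is a minimal sketch, not part of the diff: the `CredentialV4Migration` struct and its field are invented, and the "does the v4 credential exist?" check simply echoes the example given in the comments in `controller.rs`.

```rust
/// Stand-in error type so the sketch is self-contained; the real crate
/// defines its own `MigrationError`.
#[derive(Debug)]
pub struct MigrationError(pub String);

/// Simplified trait, mirroring the README's `Process` example above.
pub trait Process {
    fn is_applicable(&self) -> bool;
    fn execute(&self) -> Result<(), MigrationError>;
}

/// Hypothetical migration that upgrades a stored credential to v4.
pub struct CredentialV4Migration {
    // In a real processor this would query actual credential storage.
    has_v4_credential: bool,
}

impl Process for CredentialV4Migration {
    fn is_applicable(&self) -> bool {
        // Check actual app state ("does the v4 credential exist?") rather than
        // the migration record, so the answer stays correct even if the records
        // are wiped, e.g. after a reinstall.
        !self.has_v4_credential
    }

    fn execute(&self) -> Result<(), MigrationError> {
        // Must be idempotent: re-running after a partial attempt should
        // converge on the same end state rather than corrupting it.
        Ok(())
    }
}
```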
diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs index 6e540818..617152e3 100644 --- a/bedrock/src/migration/controller.rs +++ b/bedrock/src/migration/controller.rs @@ -2,8 +2,9 @@ use crate::migration::error::MigrationError; use crate::migration::processor::{MigrationProcessor, ProcessorResult}; use crate::migration::state::{MigrationRecord, MigrationStatus}; use crate::primitives::key_value_store::{DeviceKeyValueStore, KeyValueStoreError}; +use crate::primitives::logger::LogContext; use chrono::{Duration, Utc}; -use log::{error, info, warn}; +use log::warn; use once_cell::sync::Lazy; use std::sync::Arc; use tokio::sync::Mutex; @@ -12,6 +13,17 @@ const MIGRATION_KEY_PREFIX: &str = "migration:"; const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day +#[cfg(not(test))] +const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production +#[cfg(test)] +const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests + +// Consider InProgress migrations stale after this duration (indicates crash/abandon) +#[cfg(not(test))] +const STALE_IN_PROGRESS_MINS: i64 = 5; // 5 minutes in production +#[cfg(test)] +const STALE_IN_PROGRESS_MINS: i64 = 0; // Immediately stale in tests (for testing) + /// Global lock to prevent concurrent migration runs across all controller instances. /// This is a process-wide coordination mechanism that ensures only one migration /// can execute at a time, regardless of how many [`MigrationController`] instances exist. @@ -34,8 +46,6 @@ pub struct MigrationRunSummary { pub failed_retryable: i32, /// Number of migrations that failed with terminal errors (won't retry) pub failed_terminal: i32, - /// Number of migrations blocked pending user action - pub blocked: i32, /// Number of migrations that were skipped (already completed or not applicable) pub skipped: i32, } @@ -91,6 +101,16 @@ impl MigrationController { /// If another migration is already in progress when this method is called, it will return /// immediately with an `InvalidOperation` error rather than waiting. /// + /// # Timeouts + /// + /// Each migration is subject to a 20-second timeout for both applicability checks and execution. + /// If a migration times out: + /// - During `is_applicable()`: The migration is skipped + /// - During `execute()`: The migration is marked as retryable and scheduled for retry with exponential backoff + /// + /// Subsequent migrations will continue regardless of timeouts, ensuring one slow migration + /// doesn't block the entire migration system. + /// /// # Errors /// /// Returns `MigrationError::InvalidOperation` if another migration run is already in progress. @@ -126,7 +146,16 @@ impl MigrationController { async fn run_migrations_async( &self, ) -> Result { - info!("Migration run started"); + let _ctx = LogContext::new("MigrationController"); + + // Store start time for duration tracking + let run_start_time = Utc::now(); + + crate::info!( + "migration_run.started total_processors={} timestamp={}", + self.processors.len(), + run_start_time.to_rfc3339() + ); // Summary of this migration run for analytics. Not stored. 
let mut summary = MigrationRunSummary { @@ -134,7 +163,6 @@ impl MigrationController { succeeded: 0, failed_retryable: 0, failed_terminal: 0, - blocked: 0, skipped: 0, }; @@ -145,54 +173,146 @@ impl MigrationController { // Load the current record for this migration (or create new one if first time) let mut record = self.load_record(&migration_id)?; - // Skip if already succeeded - if matches!(record.status, MigrationStatus::Succeeded) { - info!("Migration {migration_id} already succeeded, skipping"); - summary.skipped += 1; - continue; - } - - // Skip if terminal failure (non-retryable) - if matches!(record.status, MigrationStatus::FailedTerminal) { - info!("Migration {migration_id} failed terminally, skipping"); - summary.skipped += 1; - continue; - } + // Determine if this migration should be attempted based on its current status + let should_attempt = match record.status { + MigrationStatus::Succeeded => { + // Terminal state - migration completed successfully + crate::info!( + "migration.skipped id={} reason=already_succeeded timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); + summary.skipped += 1; + false + } - // Check if we should retry (based on next_attempt_at) - if let Some(next_attempt) = record.next_attempt_at { - let now = Utc::now(); - if now < next_attempt { - info!("Migration {migration_id} scheduled for retry at {next_attempt}, skipping"); + MigrationStatus::FailedTerminal => { + // Terminal state - migration failed permanently + crate::info!( + "migration.skipped id={} reason=terminal_failure timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); summary.skipped += 1; - continue; + false + } + + MigrationStatus::InProgress => { + // Check for staleness (app crash recovery) + // InProgress without timestamp is treated as stale + let is_stale = + record.last_attempted_at.is_none_or(|last_attempted| { + let elapsed = Utc::now() - last_attempted; + elapsed > Duration::minutes(STALE_IN_PROGRESS_MINS) + }); + + if is_stale { + crate::warn!( + "migration.stale_recovery id={} last_attempted={} elapsed_mins={} timestamp={}", + migration_id, + record.last_attempted_at.map(|t| t.to_rfc3339()).unwrap_or_default(), + STALE_IN_PROGRESS_MINS, + Utc::now().to_rfc3339() + ); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("STALE_IN_PROGRESS".to_string()); + record.last_error_message = Some( + "Migration was stuck in InProgress state, likely due to app crash" + .to_string(), + ); + // Schedule immediate retry + record.next_attempt_at = Some(Utc::now()); + self.save_record(&migration_id, &record)?; + true // Proceed to retry + } else { + // Fresh InProgress - skip to avoid concurrent execution + crate::info!( + "migration.skipped id={} reason=in_progress timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); + summary.skipped += 1; + false + } + } + + MigrationStatus::FailedRetryable => { + // Check retry timing (exponential backoff) + // No retry time set = attempt immediately + record.next_attempt_at.is_none_or(|next_attempt| { + if Utc::now() < next_attempt { + crate::info!( + "migration.skipped id={} reason=retry_backoff next_attempt={} timestamp={}", + migration_id, + next_attempt.to_rfc3339(), + Utc::now().to_rfc3339() + ); + summary.skipped += 1; + false + } else { + true // Ready to retry + } + }) + } + + MigrationStatus::NotStarted => { + // First time attempting this migration + true } + }; + + if !should_attempt { + continue; } // Check if migration is applicable and should run, based on processor defined logic. 
// This checks actual state (e.g., "does v4 credential exist?") to ensure idempotency // even if migration record is deleted (reinstall scenario). - match processor.is_applicable().await { - Ok(false) => { - info!("Migration {migration_id} not applicable, skipping"); + // Wrap in timeout to prevent hanging indefinitely + let is_applicable_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.is_applicable(), + ) + .await; + + match is_applicable_result { + Ok(Ok(false)) => { + crate::info!( + "migration.skipped id={} reason=not_applicable timestamp={}", + migration_id, + Utc::now().to_rfc3339() + ); + summary.skipped += 1; + continue; + } + Ok(Err(e)) => { + crate::error!( + "Failed to check applicability for {migration_id}: {e:?}" + ); summary.skipped += 1; continue; } - Err(e) => { - error!("Failed to check applicability for {migration_id}: {e:?}"); + Err(_) => { + crate::error!( + "migration.applicability_timeout id={} timeout_secs={} timestamp={}", + migration_id, + MIGRATION_TIMEOUT_SECS, + Utc::now().to_rfc3339() + ); summary.skipped += 1; continue; } - Ok(true) => { + Ok(Ok(true)) => { // Continue with execution. Fall through to the execution block below. } } // Execute the migration - info!( - "Starting migration: {} (attempt {})", + crate::info!( + "migration.started id={} attempt={} timestamp={}", migration_id, - record.attempts + 1 + record.attempts + 1, + Utc::now().to_rfc3339() ); // Update record for execution @@ -206,61 +326,27 @@ impl MigrationController { // Save record before execution so we don't lose progress if the app crashes mid-migration self.save_record(&migration_id, &record)?; - // Execute - match processor.execute().await { - Ok(ProcessorResult::Success) => { - info!("Migration {migration_id} succeeded"); - record.status = MigrationStatus::Succeeded; - record.completed_at = Some(Utc::now()); - record.last_error_code = None; - record.last_error_message = None; - record.next_attempt_at = None; // Clear retry time - summary.succeeded += 1; - } - Ok(ProcessorResult::Retryable { - error_code, - error_message, - retry_after_ms, - }) => { - warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - - // Retry time is calculated according to exponential backoff and set on the - // record.next_attempt_at field. When the app is next opened and the - // migration is run again; the controller will check whether to run the - // migration again based on the record.next_attempt_at field. 
- let retry_delay_ms = retry_after_ms - .unwrap_or_else(|| calculate_backoff_delay(record.attempts)); - record.next_attempt_at = - Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); - - summary.failed_retryable += 1; - } - Ok(ProcessorResult::Terminal { - error_code, - error_message, - }) => { - error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}"); - record.status = MigrationStatus::FailedTerminal; - record.last_error_code = Some(error_code); - record.last_error_message = Some(error_message); - record.next_attempt_at = None; // Clear retry time - summary.failed_terminal += 1; - } - Ok(ProcessorResult::BlockedUserAction { reason }) => { - warn!("Migration {migration_id} blocked: {reason}"); - record.status = MigrationStatus::BlockedUserAction; - record.last_error_message = Some(reason); - record.next_attempt_at = None; // Clear retry time - summary.blocked += 1; - } - Err(e) => { - error!("Migration {migration_id} threw error: {e:?}"); + // Execute with timeout to prevent indefinite hangs + let execute_result = tokio::time::timeout( + tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS), + processor.execute(), + ) + .await; + + match execute_result { + Err(_) => { + // Timeout occurred + crate::error!( + "migration.timeout id={} attempt={} timeout_secs={} timestamp={}", + migration_id, + record.attempts, + MIGRATION_TIMEOUT_SECS, + Utc::now().to_rfc3339() + ); record.status = MigrationStatus::FailedRetryable; - record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); - record.last_error_message = Some(format!("{e:?}")); + record.last_error_code = Some("MIGRATION_TIMEOUT".to_string()); + record.last_error_message = + Some(format!("Migration execution timed out after {MIGRATION_TIMEOUT_SECS} seconds")); // Schedule retry with backoff let retry_delay_ms = calculate_backoff_delay(record.attempts); @@ -269,15 +355,106 @@ impl MigrationController { summary.failed_retryable += 1; } + Ok(result) => match result { + Ok(ProcessorResult::Success) => { + let duration_ms = record + .started_at + .map_or(0, |start| (Utc::now() - start).num_milliseconds()); + + crate::info!( + "migration.succeeded id={} attempts={} duration_ms={} timestamp={}", + migration_id, + record.attempts, + duration_ms, + Utc::now().to_rfc3339() + ); + record.status = MigrationStatus::Succeeded; + record.completed_at = Some(Utc::now()); + record.last_error_code = None; + record.last_error_message = None; + record.next_attempt_at = None; // Clear retry time + summary.succeeded += 1; + } + Ok(ProcessorResult::Retryable { + error_code, + error_message, + retry_after_ms, + }) => { + // Retry time is calculated according to exponential backoff and set on the + // record.next_attempt_at field. When the app is next opened and the + // migration is run again; the controller will check whether to run the + // migration again based on the record.next_attempt_at field. 
+ let retry_delay_ms = retry_after_ms.unwrap_or_else(|| { + calculate_backoff_delay(record.attempts) + }); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + crate::warn!( + "migration.failed_retryable id={} attempt={} error_code={} error_message={} retry_delay_ms={} next_attempt={} timestamp={}", + migration_id, + record.attempts, + error_code, + error_message, + retry_delay_ms, + record.next_attempt_at.map(|t| t.to_rfc3339()).unwrap_or_default(), + Utc::now().to_rfc3339() + ); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + + summary.failed_retryable += 1; + } + Ok(ProcessorResult::Terminal { + error_code, + error_message, + }) => { + crate::error!( + "migration.failed_terminal id={} attempt={} error_code={} error_message={} timestamp={}", + migration_id, + record.attempts, + error_code, + error_message, + Utc::now().to_rfc3339() + ); + record.status = MigrationStatus::FailedTerminal; + record.last_error_code = Some(error_code); + record.last_error_message = Some(error_message); + record.next_attempt_at = None; // Clear retry time + summary.failed_terminal += 1; + } + Err(e) => { + crate::error!("Migration {migration_id} threw error: {e:?}"); + record.status = MigrationStatus::FailedRetryable; + record.last_error_code = Some("UNEXPECTED_ERROR".to_string()); + record.last_error_message = Some(format!("{e:?}")); + + // Schedule retry with backoff + let retry_delay_ms = calculate_backoff_delay(record.attempts); + record.next_attempt_at = + Some(Utc::now() + Duration::milliseconds(retry_delay_ms)); + + summary.failed_retryable += 1; + } + }, } // Save the final result (success/failure) to storage self.save_record(&migration_id, &record)?; } - info!( - "Migration run completed: {} succeeded, {} retryable, {} terminal, {} blocked, {} skipped", - summary.succeeded, summary.failed_retryable, summary.failed_terminal, summary.blocked, summary.skipped + let run_duration_ms = (Utc::now() - run_start_time).num_milliseconds(); + + crate::info!( + "migration_run.completed total={} succeeded={} failed_retryable={} failed_terminal={} skipped={} duration_ms={} timestamp={}", + summary.total, + summary.succeeded, + summary.failed_retryable, + summary.failed_terminal, + summary.skipped, + run_duration_ms, + Utc::now().to_rfc3339() ); Ok(summary) @@ -725,4 +902,640 @@ mod tests { // Processor should only execute once assert_eq!(processor.execution_count(), 1); } + + /// Test processor that hangs indefinitely during execute + struct HangingExecuteProcessor { + id: String, + } + + impl HangingExecuteProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingExecuteProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + Ok(true) + } + + async fn execute(&self) -> Result { + // Hang longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(ProcessorResult::Success) + } + } + + /// Test processor that hangs indefinitely during `is_applicable` check + struct HangingApplicabilityProcessor { + id: String, + } + + impl HangingApplicabilityProcessor { + fn new(id: &str) -> Self { + Self { id: id.to_string() } + } + } + + #[async_trait] + impl MigrationProcessor for HangingApplicabilityProcessor { + fn migration_id(&self) -> String { + self.id.clone() + } + + async fn is_applicable(&self) -> Result { + // Hang 
longer than the test timeout (1 second) + sleep(Duration::from_secs(10)).await; + Ok(true) + } + + async fn execute(&self) -> Result { + Ok(ProcessorResult::Success) + } + } + + #[tokio::test] + #[serial] + async fn test_execute_timeout_marks_migration_as_retryable() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.failed_retryable, 1); + assert_eq!(summary.succeeded, 0); + + // Verify the migration record was saved with timeout error + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert!(matches!(record.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + assert!(record + .last_error_message + .unwrap() + .contains("timed out after")); + assert!(record.next_attempt_at.is_some()); // Should schedule retry + assert_eq!(record.attempts, 1); + } + + #[tokio::test] + #[serial] + async fn test_is_applicable_timeout_skips_migration() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = + Arc::new(HangingApplicabilityProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations - should skip due to is_applicable timeout + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration record was NOT created or updated (skipped before execution) + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_result = kv_store.get(key); + // Record might exist from attempt to check applicability, but should not show InProgress + if let Ok(record_json) = record_result { + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + // Should not be InProgress or Succeeded + assert!( + !matches!(record.status, MigrationStatus::InProgress) + && !matches!(record.status, MigrationStatus::Succeeded) + ); + } + } + + #[tokio::test] + #[serial] + async fn test_timeout_does_not_block_subsequent_migrations() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + + // First processor will timeout, second should succeed + let processor1 = Arc::new(HangingExecuteProcessor::new("test.migration1.v1")); + let processor2 = Arc::new(TestProcessor::new("test.migration2.v1")); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor1, processor2.clone()], + ); + + // Run migrations + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 2); + assert_eq!(summary.failed_retryable, 1); // First migration timed out + assert_eq!(summary.succeeded, 1); // Second migration succeeded + + // Verify first migration is marked as retryable + let key1 = 
format!("{MIGRATION_KEY_PREFIX}test.migration1.v1"); + let record1_json = kv_store.get(key1).expect("Migration 1 record should exist"); + let record1: MigrationRecord = + serde_json::from_str(&record1_json).expect("Should deserialize"); + assert!(matches!(record1.status, MigrationStatus::FailedRetryable)); + assert_eq!( + record1.last_error_code, + Some("MIGRATION_TIMEOUT".to_string()) + ); + + // Verify second migration succeeded + let key2 = format!("{MIGRATION_KEY_PREFIX}test.migration2.v1"); + let record2_json = kv_store.get(key2).expect("Migration 2 record should exist"); + let record2: MigrationRecord = + serde_json::from_str(&record2_json).expect("Should deserialize"); + assert!(matches!(record2.status, MigrationStatus::Succeeded)); + + // Verify second processor actually executed + assert_eq!(processor2.execution_count(), 1); + } + + #[tokio::test] + #[serial] + async fn test_timeout_with_exponential_backoff() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1")); + let controller = + MigrationController::with_processors(kv_store.clone(), vec![processor]); + + // Run migrations multiple times to test backoff + for attempt in 1..=3 { + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key.clone()).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + + assert_eq!(record.attempts, attempt); + assert!(record.next_attempt_at.is_some()); + + // Clear next_attempt_at to allow immediate retry in test + let mut record_for_retry = record; + record_for_retry.next_attempt_at = None; + let json = serde_json::to_string(&record_for_retry).unwrap(); + kv_store.set(key, json).unwrap(); + } + } + + #[tokio::test] + #[serial] + async fn test_stale_in_progress_resets_and_retries() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create a stale InProgress record (simulating app crash) + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; + record.attempts = 1; + // Set last_attempted_at to 10 minutes ago (stale) + record.last_attempted_at = Some(Utc::now() - chrono::Duration::minutes(10)); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should detect staleness and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); // Should succeed after reset + assert_eq!(summary.failed_retryable, 0); + + // Verify the migration was executed + assert_eq!(processor.execution_count(), 1); + + // Verify the final status is Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + // Attempt counter should have incremented + assert_eq!(updated_record.attempts, 2); + } + + #[tokio::test] + #[serial] + async fn 
test_fresh_in_progress_not_treated_as_stale() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create a fresh InProgress record (recent) + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; + record.attempts = 1; + // Set last_attempted_at to now (not stale) + record.last_attempted_at = Some(Utc::now()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should treat as normal InProgress and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + + // Verify the migration was executed + assert_eq!(processor.execution_count(), 1); + } + + #[tokio::test] + #[serial] + async fn test_succeeded_state_is_terminal_and_skipped() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Manually create a Succeeded record + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::Succeeded; + record.completed_at = Some(Utc::now()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should skip + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + + // Verify is_applicable() was NOT called (processor was never executed) + assert_eq!(processor.execution_count(), 0); + + // Verify status is still Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + } + + #[tokio::test] + #[serial] + async fn test_failed_terminal_state_is_permanent_and_skipped() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Manually create a FailedTerminal record + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedTerminal; + record.last_error_code = Some("TERMINAL_ERROR".to_string()); + record.last_error_message = Some("Permanent failure".to_string()); + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations multiple times - should always skip + for _ in 0..3 { + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + } + + // Verify processor was never executed + assert_eq!(processor.execution_count(), 0); + + // Verify status is still 
FailedTerminal + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::FailedTerminal + )); + } + + #[tokio::test] + #[serial] + async fn test_not_started_state_checks_applicability_and_executes() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - NotStarted should check is_applicable and execute + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let record_json = kv_store.get(key).expect("Record should exist"); + let record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(record.status, MigrationStatus::Succeeded)); + assert_eq!(record.attempts, 1); + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_respects_backoff_timing() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record with retry scheduled for future + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 1; + record.last_error_code = Some("NETWORK_ERROR".to_string()); + record.next_attempt_at = Some(Utc::now() + chrono::Duration::hours(1)); // 1 hour in future + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should skip due to retry timing + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.skipped, 1); + assert_eq!(summary.succeeded, 0); + + // Verify processor was NOT executed + assert_eq!(processor.execution_count(), 0); + + // Verify status is still FailedRetryable + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!( + updated_record.status, + MigrationStatus::FailedRetryable + )); + assert_eq!(updated_record.attempts, 1); // Attempts should not increment + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_retries_when_backoff_expires() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record with retry scheduled for past + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 1; + record.last_error_code = Some("NETWORK_ERROR".to_string()); + record.next_attempt_at = Some(Utc::now() - chrono::Duration::minutes(5)); // 5 minutes ago + + let key = 
format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should retry and succeed + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 1); + assert_eq!(summary.succeeded, 1); + assert_eq!(summary.skipped, 0); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + assert_eq!(updated_record.attempts, 2); // Should increment from 1 to 2 + } + + #[tokio::test] + #[serial] + async fn test_failed_retryable_without_retry_time_executes_immediately() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create FailedRetryable record without next_attempt_at + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::FailedRetryable; + record.attempts = 2; + record.next_attempt_at = None; // No retry time set + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should execute immediately + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify status transitioned to Succeeded + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + } + + #[tokio::test] + #[serial] + async fn test_in_progress_without_timestamp_treated_as_stale() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + let processor = Arc::new(TestProcessor::new("test.migration.v1")); + + // Create InProgress record without last_attempted_at timestamp + let mut record = MigrationRecord::new(); + record.status = MigrationStatus::InProgress; + record.attempts = 1; + record.last_attempted_at = None; // No timestamp + + let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1"); + let json = serde_json::to_string(&record).unwrap(); + kv_store.set(key.clone(), json).unwrap(); + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![processor.clone()], + ); + + // Run migrations - should treat as stale and retry + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.succeeded, 1); + + // Verify processor was executed + assert_eq!(processor.execution_count(), 1); + + // Verify the record was reset and completed + let record_json = kv_store.get(key).expect("Record should exist"); + let updated_record: MigrationRecord = + serde_json::from_str(&record_json).expect("Should 
deserialize"); + assert!(matches!(updated_record.status, MigrationStatus::Succeeded)); + } + + #[tokio::test] + #[serial] + async fn test_state_based_execution_order() { + let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new()); + + // Create 5 processors with different states + let processor1 = Arc::new(TestProcessor::new("test.succeeded.v1")); + let processor2 = Arc::new(TestProcessor::new("test.terminal.v1")); + let processor3 = Arc::new(TestProcessor::new("test.retryable.v1")); + let processor4 = Arc::new(TestProcessor::new("test.in_progress.v1")); + let processor5 = Arc::new(TestProcessor::new("test.not_started.v1")); + + // Set up initial states + let mut succeeded = MigrationRecord::new(); + succeeded.status = MigrationStatus::Succeeded; + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.succeeded.v1"), + serde_json::to_string(&succeeded).unwrap(), + ) + .unwrap(); + + let mut terminal = MigrationRecord::new(); + terminal.status = MigrationStatus::FailedTerminal; + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.terminal.v1"), + serde_json::to_string(&terminal).unwrap(), + ) + .unwrap(); + + let mut retryable = MigrationRecord::new(); + retryable.status = MigrationStatus::FailedRetryable; + retryable.next_attempt_at = Some(Utc::now() - chrono::Duration::minutes(1)); // Ready to retry + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.retryable.v1"), + serde_json::to_string(&retryable).unwrap(), + ) + .unwrap(); + + let mut in_progress = MigrationRecord::new(); + in_progress.status = MigrationStatus::InProgress; + in_progress.last_attempted_at = Some(Utc::now()); + kv_store + .set( + format!("{MIGRATION_KEY_PREFIX}test.in_progress.v1"), + serde_json::to_string(&in_progress).unwrap(), + ) + .unwrap(); + + // NotStarted doesn't need setup - it's the default + + let controller = MigrationController::with_processors( + kv_store.clone(), + vec![ + processor1.clone(), + processor2.clone(), + processor3.clone(), + processor4.clone(), + processor5.clone(), + ], + ); + + // Run migrations + let result = controller.run_migrations().await; + assert!(result.is_ok()); + + let summary = result.unwrap(); + assert_eq!(summary.total, 5); + // In test mode (STALE_IN_PROGRESS_MINS=0), InProgress is treated as stale and executed + assert_eq!(summary.succeeded, 3); // retryable + in_progress + not_started + assert_eq!(summary.skipped, 2); // succeeded + terminal + + // Verify execution counts + assert_eq!(processor1.execution_count(), 0); // Succeeded - skipped + assert_eq!(processor2.execution_count(), 0); // Terminal - skipped + assert_eq!(processor3.execution_count(), 1); // Retryable - executed + assert_eq!(processor4.execution_count(), 1); // InProgress - treated as stale, executed + assert_eq!(processor5.execution_count(), 1); // NotStarted - executed + } } diff --git a/bedrock/src/migration/processor.rs b/bedrock/src/migration/processor.rs index 7c637fd0..b8b8a9c5 100644 --- a/bedrock/src/migration/processor.rs +++ b/bedrock/src/migration/processor.rs @@ -24,15 +24,24 @@ pub enum ProcessorResult { /// Human-readable error message error_message: String, }, - - /// Migration blocked pending user action - BlockedUserAction { - /// Reason why the migration is blocked - reason: String, - }, } /// Trait that all migration processors must implement +/// +/// # Timeouts and Cancellation Safety +/// +/// Both [`is_applicable`](Self::is_applicable) and [`execute`](Self::execute) are subject to timeouts +/// (20 seconds in production). 
When a timeout occurs, the future is dropped and the migration +/// is marked as failed (for `execute`) or skipped (for `is_applicable`). +/// +/// **IMPORTANT**: Implementations MUST be cancellation-safe: +/// +/// - **DO NOT** spawn background tasks using `tokio::spawn`, `std::thread::spawn`, or similar +/// that will continue running after the timeout +/// - **DO NOT** use blocking operations or FFI calls without proper cleanup +/// - **ENSURE** all work stops when the future is dropped (cooperative cancellation) +/// - **MAKE** migrations idempotent so partial execution can be safely retried +/// #[uniffi::export(with_foreign)] #[async_trait] pub trait MigrationProcessor: Send + Sync { @@ -46,7 +55,6 @@ pub trait MigrationProcessor: Send + Sync { /// to determine if the migration needs to run. This ensures the system is /// truly idempotent and handles edge cases gracefully. /// - /// /// # Returns /// - `Ok(true)` if the migration should run /// - `Ok(false)` if the migration should be skipped @@ -54,6 +62,5 @@ pub trait MigrationProcessor: Send + Sync { async fn is_applicable(&self) -> Result; /// Execute the migration - /// Called by the controller when the migration is ready to run async fn execute(&self) -> Result; } diff --git a/bedrock/src/migration/state.rs b/bedrock/src/migration/state.rs index 123ceca7..04645cf2 100644 --- a/bedrock/src/migration/state.rs +++ b/bedrock/src/migration/state.rs @@ -14,8 +14,6 @@ pub enum MigrationStatus { FailedRetryable, /// Migration failed with terminal error (won't retry) FailedTerminal, - /// Migration blocked pending user action - BlockedUserAction, } /// Record of a single migration's execution state
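To make the cancellation-safety notes on `MigrationProcessor` concrete, here is a minimal sketch of a processor whose `execute()` stops cleanly when the controller's timeout drops the future. Only the trait shape mirrors the real `MigrationProcessor`; the item-migration names are invented for this sketch, and the simplified `ProcessorResult` and `MigrationError` below are stand-ins so the snippet compiles on its own.

```rust
use async_trait::async_trait;

// Simplified stand-ins; the real crate defines richer versions of these types.
#[derive(Debug)]
pub enum ProcessorResult {
    Success,
}

#[derive(Debug)]
pub struct MigrationError(pub String);

#[async_trait]
pub trait MigrationProcessor: Send + Sync {
    fn migration_id(&self) -> String;
    async fn is_applicable(&self) -> Result<bool, MigrationError>;
    async fn execute(&self) -> Result<ProcessorResult, MigrationError>;
}

/// Invented example that migrates a list of items one at a time.
pub struct ItemByItemMigration {
    pending_item_ids: Vec<u64>,
}

#[async_trait]
impl MigrationProcessor for ItemByItemMigration {
    fn migration_id(&self) -> String {
        "example.item_by_item.v1".to_string()
    }

    async fn is_applicable(&self) -> Result<bool, MigrationError> {
        // Applicable only while there is still work left to do.
        Ok(!self.pending_item_ids.is_empty())
    }

    async fn execute(&self) -> Result<ProcessorResult, MigrationError> {
        for id in &self.pending_item_ids {
            // Work happens inline and is awaited per item. Nothing is handed to
            // `tokio::spawn`, so if the controller's timeout drops this future,
            // execution stops at the next await point instead of leaking a
            // background task. Each item migration is idempotent, so a later
            // retry can safely pick up where this run left off.
            migrate_single_item(*id).await?;
        }
        Ok(ProcessorResult::Success)
    }
}

async fn migrate_single_item(_id: u64) -> Result<(), MigrationError> {
    // Hypothetical per-item work (e.g. rewriting one record in local storage).
    Ok(())
}
```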