diff --git a/bedrock/src/migration/controller.rs b/bedrock/src/migration/controller.rs
index 6e540818..5364e70b 100644
--- a/bedrock/src/migration/controller.rs
+++ b/bedrock/src/migration/controller.rs
@@ -12,6 +12,11 @@
 const MIGRATION_KEY_PREFIX: &str = "migration:";
 const DEFAULT_RETRY_DELAY_MS: i64 = 60_000; // 1 minute
 const MAX_RETRY_DELAY_MS: i64 = 86_400_000; // 1 day
+#[cfg(not(test))]
+const MIGRATION_TIMEOUT_SECS: u64 = 20; // 20 seconds in production
+#[cfg(test)]
+const MIGRATION_TIMEOUT_SECS: u64 = 1; // 1 second in tests
+
 /// Global lock to prevent concurrent migration runs across all controller instances.
 /// This is a process-wide coordination mechanism that ensures only one migration
 /// can execute at a time, regardless of how many [`MigrationController`] instances exist.
@@ -91,6 +96,16 @@ impl MigrationController {
     /// If another migration is already in progress when this method is called, it will return
     /// immediately with an `InvalidOperation` error rather than waiting.
     ///
+    /// # Timeouts
+    ///
+    /// Each migration is subject to a 20-second timeout for both applicability checks and execution.
+    /// If a migration times out:
+    /// - During `is_applicable()`: The migration is skipped
+    /// - During `execute()`: The migration is marked as retryable and scheduled for retry with exponential backoff
+    ///
+    /// Subsequent migrations will continue regardless of timeouts, ensuring one slow migration
+    /// doesn't block the entire migration system.
+    ///
     /// # Errors
     ///
     /// Returns `MigrationError::InvalidOperation` if another migration run is already in progress.
@@ -172,18 +187,30 @@ impl MigrationController {
             // Check if migration is applicable and should run, based on processor defined logic.
             // This checks actual state (e.g., "does v4 credential exist?") to ensure idempotency
             // even if migration record is deleted (reinstall scenario).
-            match processor.is_applicable().await {
-                Ok(false) => {
+            // Wrap in timeout to prevent hanging indefinitely
+            let is_applicable_result = tokio::time::timeout(
+                tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS),
+                processor.is_applicable(),
+            )
+            .await;
+
+            match is_applicable_result {
+                Ok(Ok(false)) => {
                     info!("Migration {migration_id} not applicable, skipping");
                     summary.skipped += 1;
                     continue;
                 }
-                Err(e) => {
+                Ok(Err(e)) => {
                     error!("Failed to check applicability for {migration_id}: {e:?}");
                     summary.skipped += 1;
                     continue;
                 }
-                Ok(true) => {
+                Err(_) => {
+                    error!("Migration {migration_id} is_applicable() timed out after {MIGRATION_TIMEOUT_SECS} seconds, skipping");
+                    summary.skipped += 1;
+                    continue;
+                }
+                Ok(Ok(true)) => {
                     // Continue with execution. Fall through to the execution block below.
                }
            }
@@ -206,61 +233,21 @@ impl MigrationController {
             // Save record before execution so we don't lose progress if the app crashes mid-migration
             self.save_record(&migration_id, &record)?;
 
-            // Execute
-            match processor.execute().await {
-                Ok(ProcessorResult::Success) => {
-                    info!("Migration {migration_id} succeeded");
-                    record.status = MigrationStatus::Succeeded;
-                    record.completed_at = Some(Utc::now());
-                    record.last_error_code = None;
-                    record.last_error_message = None;
-                    record.next_attempt_at = None; // Clear retry time
-                    summary.succeeded += 1;
-                }
-                Ok(ProcessorResult::Retryable {
-                    error_code,
-                    error_message,
-                    retry_after_ms,
-                }) => {
-                    warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}");
-                    record.status = MigrationStatus::FailedRetryable;
-                    record.last_error_code = Some(error_code);
-                    record.last_error_message = Some(error_message);
-
-                    // Retry time is calculated according to exponential backoff and set on the
-                    // record.next_attempt_at field. When the app is next opened and the
-                    // migration is run again; the controller will check whether to run the
-                    // migration again based on the record.next_attempt_at field.
-                    let retry_delay_ms = retry_after_ms
-                        .unwrap_or_else(|| calculate_backoff_delay(record.attempts));
-                    record.next_attempt_at =
-                        Some(Utc::now() + Duration::milliseconds(retry_delay_ms));
+            // Execute with timeout to prevent indefinite hangs
+            let execute_result = tokio::time::timeout(
+                tokio::time::Duration::from_secs(MIGRATION_TIMEOUT_SECS),
+                processor.execute(),
+            )
+            .await;
 
-                    summary.failed_retryable += 1;
-                }
-                Ok(ProcessorResult::Terminal {
-                    error_code,
-                    error_message,
-                }) => {
-                    error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}");
-                    record.status = MigrationStatus::FailedTerminal;
-                    record.last_error_code = Some(error_code);
-                    record.last_error_message = Some(error_message);
-                    record.next_attempt_at = None; // Clear retry time
-                    summary.failed_terminal += 1;
-                }
-                Ok(ProcessorResult::BlockedUserAction { reason }) => {
-                    warn!("Migration {migration_id} blocked: {reason}");
-                    record.status = MigrationStatus::BlockedUserAction;
-                    record.last_error_message = Some(reason);
-                    record.next_attempt_at = None; // Clear retry time
-                    summary.blocked += 1;
-                }
-                Err(e) => {
-                    error!("Migration {migration_id} threw error: {e:?}");
+            match execute_result {
+                Err(_) => {
+                    // Timeout occurred
+                    error!("Migration {migration_id} timed out after {MIGRATION_TIMEOUT_SECS} seconds");
                     record.status = MigrationStatus::FailedRetryable;
-                    record.last_error_code = Some("UNEXPECTED_ERROR".to_string());
-                    record.last_error_message = Some(format!("{e:?}"));
+                    record.last_error_code = Some("MIGRATION_TIMEOUT".to_string());
+                    record.last_error_message =
+                        Some(format!("Migration execution timed out after {MIGRATION_TIMEOUT_SECS} seconds"));
 
                     // Schedule retry with backoff
                     let retry_delay_ms = calculate_backoff_delay(record.attempts);
@@ -269,6 +256,70 @@
 
                     summary.failed_retryable += 1;
                 }
+                Ok(result) => match result {
+                    Ok(ProcessorResult::Success) => {
+                        info!("Migration {migration_id} succeeded");
+                        record.status = MigrationStatus::Succeeded;
+                        record.completed_at = Some(Utc::now());
+                        record.last_error_code = None;
+                        record.last_error_message = None;
+                        record.next_attempt_at = None; // Clear retry time
+                        summary.succeeded += 1;
+                    }
+                    Ok(ProcessorResult::Retryable {
+                        error_code,
+                        error_message,
+                        retry_after_ms,
+                    }) => {
+                        warn!("Migration {migration_id} failed (retryable): {error_code} - {error_message}");
+                        record.status = MigrationStatus::FailedRetryable;
+                        record.last_error_code = Some(error_code);
+                        record.last_error_message = Some(error_message);
+
+                        // Retry time is calculated according to exponential backoff and set on the
+                        // record.next_attempt_at field. When the app is next opened and the
+                        // migration is run again, the controller checks record.next_attempt_at
+                        // to decide whether the migration is due for another attempt.
+                        let retry_delay_ms = retry_after_ms.unwrap_or_else(|| {
+                            calculate_backoff_delay(record.attempts)
+                        });
+                        record.next_attempt_at =
+                            Some(Utc::now() + Duration::milliseconds(retry_delay_ms));
+
+                        summary.failed_retryable += 1;
+                    }
+                    Ok(ProcessorResult::Terminal {
+                        error_code,
+                        error_message,
+                    }) => {
+                        error!("Migration {migration_id} failed (terminal): {error_code} - {error_message}");
+                        record.status = MigrationStatus::FailedTerminal;
+                        record.last_error_code = Some(error_code);
+                        record.last_error_message = Some(error_message);
+                        record.next_attempt_at = None; // Clear retry time
+                        summary.failed_terminal += 1;
+                    }
+                    Ok(ProcessorResult::BlockedUserAction { reason }) => {
+                        warn!("Migration {migration_id} blocked: {reason}");
+                        record.status = MigrationStatus::BlockedUserAction;
+                        record.last_error_message = Some(reason);
+                        record.next_attempt_at = None; // Clear retry time
+                        summary.blocked += 1;
+                    }
+                    Err(e) => {
+                        error!("Migration {migration_id} threw error: {e:?}");
+                        record.status = MigrationStatus::FailedRetryable;
+                        record.last_error_code = Some("UNEXPECTED_ERROR".to_string());
+                        record.last_error_message = Some(format!("{e:?}"));
+
+                        // Schedule retry with backoff
+                        let retry_delay_ms = calculate_backoff_delay(record.attempts);
+                        record.next_attempt_at =
+                            Some(Utc::now() + Duration::milliseconds(retry_delay_ms));
+
+                        summary.failed_retryable += 1;
+                    }
+                },
             }
 
             // Save the final result (success/failure) to storage
@@ -725,4 +776,204 @@ mod tests {
         // Processor should only execute once
         assert_eq!(processor.execution_count(), 1);
     }
+
+    /// Test processor that hangs during `execute` for longer than the test timeout
+    struct HangingExecuteProcessor {
+        id: String,
+    }
+
+    impl HangingExecuteProcessor {
+        fn new(id: &str) -> Self {
+            Self { id: id.to_string() }
+        }
+    }
+
+    #[async_trait]
+    impl MigrationProcessor for HangingExecuteProcessor {
+        fn migration_id(&self) -> String {
+            self.id.clone()
+        }
+
+        async fn is_applicable(&self) -> Result {
+            Ok(true)
+        }
+
+        async fn execute(&self) -> Result {
+            // Hang longer than the test timeout (1 second)
+            sleep(Duration::from_secs(10)).await;
+            Ok(ProcessorResult::Success)
+        }
+    }
+
+    /// Test processor that hangs during the `is_applicable` check for longer than the test timeout
+    struct HangingApplicabilityProcessor {
+        id: String,
+    }
+
+    impl HangingApplicabilityProcessor {
+        fn new(id: &str) -> Self {
+            Self { id: id.to_string() }
+        }
+    }
+
+    #[async_trait]
+    impl MigrationProcessor for HangingApplicabilityProcessor {
+        fn migration_id(&self) -> String {
+            self.id.clone()
+        }
+
+        async fn is_applicable(&self) -> Result {
+            // Hang longer than the test timeout (1 second)
+            sleep(Duration::from_secs(10)).await;
+            Ok(true)
+        }
+
+        async fn execute(&self) -> Result {
+            Ok(ProcessorResult::Success)
+        }
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_execute_timeout_marks_migration_as_retryable() {
+        let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new());
+        let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1"));
+        let controller =
+            MigrationController::with_processors(kv_store.clone(), vec![processor]);
+
+        // Run migrations - should time out
+        let result = controller.run_migrations().await;
+        assert!(result.is_ok());
+
+        let summary = result.unwrap();
+        assert_eq!(summary.total, 1);
+        assert_eq!(summary.failed_retryable, 1);
+        assert_eq!(summary.succeeded, 0);
+
+        // Verify the migration record was saved with timeout error
+        let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1");
+        let record_json = kv_store.get(key).expect("Record should exist");
+        let record: MigrationRecord =
+            serde_json::from_str(&record_json).expect("Should deserialize");
+
+        assert!(matches!(record.status, MigrationStatus::FailedRetryable));
+        assert_eq!(
+            record.last_error_code,
+            Some("MIGRATION_TIMEOUT".to_string())
+        );
+        assert!(record
+            .last_error_message
+            .unwrap()
+            .contains("timed out after"));
+        assert!(record.next_attempt_at.is_some()); // Should schedule retry
+        assert_eq!(record.attempts, 1);
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_is_applicable_timeout_skips_migration() {
+        let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new());
+        let processor =
+            Arc::new(HangingApplicabilityProcessor::new("test.migration.v1"));
+        let controller =
+            MigrationController::with_processors(kv_store.clone(), vec![processor]);
+
+        // Run migrations - should skip due to is_applicable timeout
+        let result = controller.run_migrations().await;
+        assert!(result.is_ok());
+
+        let summary = result.unwrap();
+        assert_eq!(summary.total, 1);
+        assert_eq!(summary.skipped, 1);
+        assert_eq!(summary.succeeded, 0);
+        assert_eq!(summary.failed_retryable, 0);
+
+        // Verify the migration was not recorded as started or completed (skipped before execution)
+        let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1");
+        let record_result = kv_store.get(key);
+        // Record might exist from attempt to check applicability, but should not show InProgress
+        if let Ok(record_json) = record_result {
+            let record: MigrationRecord =
+                serde_json::from_str(&record_json).expect("Should deserialize");
+            // Should not be InProgress or Succeeded
+            assert!(
+                !matches!(record.status, MigrationStatus::InProgress)
+                    && !matches!(record.status, MigrationStatus::Succeeded)
+            );
+        }
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_timeout_does_not_block_subsequent_migrations() {
+        let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new());
+
+        // First processor will time out, second should succeed
+        let processor1 = Arc::new(HangingExecuteProcessor::new("test.migration1.v1"));
+        let processor2 = Arc::new(TestProcessor::new("test.migration2.v1"));
+
+        let controller = MigrationController::with_processors(
+            kv_store.clone(),
+            vec![processor1, processor2.clone()],
+        );
+
+        // Run migrations
+        let result = controller.run_migrations().await;
+        assert!(result.is_ok());
+
+        let summary = result.unwrap();
+        assert_eq!(summary.total, 2);
+        assert_eq!(summary.failed_retryable, 1); // First migration timed out
+        assert_eq!(summary.succeeded, 1); // Second migration succeeded
+
+        // Verify first migration is marked as retryable
+        let key1 = format!("{MIGRATION_KEY_PREFIX}test.migration1.v1");
+        let record1_json = kv_store.get(key1).expect("Migration 1 record should exist");
+        let record1: MigrationRecord =
+            serde_json::from_str(&record1_json).expect("Should deserialize");
+        assert!(matches!(record1.status, MigrationStatus::FailedRetryable));
+        assert_eq!(
+            record1.last_error_code,
+            Some("MIGRATION_TIMEOUT".to_string())
+        );
+
+        // Verify second migration succeeded
+        let key2 = format!("{MIGRATION_KEY_PREFIX}test.migration2.v1");
+        let record2_json = kv_store.get(key2).expect("Migration 2 record should exist");
+        let record2: MigrationRecord =
+            serde_json::from_str(&record2_json).expect("Should deserialize");
+        assert!(matches!(record2.status, MigrationStatus::Succeeded));
+
+        // Verify second processor actually executed
+        assert_eq!(processor2.execution_count(), 1);
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn test_timeout_with_exponential_backoff() {
+        let kv_store = Arc::new(InMemoryDeviceKeyValueStore::new());
+        let processor = Arc::new(HangingExecuteProcessor::new("test.migration.v1"));
+        let controller =
+            MigrationController::with_processors(kv_store.clone(), vec![processor]);
+
+        // Run migrations multiple times to test backoff
+        for attempt in 1..=3 {
+            let result = controller.run_migrations().await;
+            assert!(result.is_ok());
+
+            let key = format!("{MIGRATION_KEY_PREFIX}test.migration.v1");
+            let record_json = kv_store.get(key.clone()).expect("Record should exist");
+            let record: MigrationRecord =
+                serde_json::from_str(&record_json).expect("Should deserialize");
+
+            assert_eq!(record.attempts, attempt);
+            assert!(record.next_attempt_at.is_some());
+
+            // Clear next_attempt_at to allow immediate retry in test
+            let mut record_for_retry = record;
+            record_for_retry.next_attempt_at = None;
+            let json = serde_json::to_string(&record_for_retry).unwrap();
+            kv_store.set(key, json).unwrap();
+        }
+    }
 }
diff --git a/bedrock/src/migration/processor.rs b/bedrock/src/migration/processor.rs
index 7c637fd0..ce82ed43 100644
--- a/bedrock/src/migration/processor.rs
+++ b/bedrock/src/migration/processor.rs
@@ -33,6 +33,21 @@ pub enum ProcessorResult {
 }
 
 /// Trait that all migration processors must implement
+///
+/// # Timeouts and Cancellation Safety
+///
+/// Both [`is_applicable`](Self::is_applicable) and [`execute`](Self::execute) are subject to timeouts
+/// (20 seconds in production). When a timeout occurs, the future is dropped and the migration
+/// is marked as failed (for `execute`) or skipped (for `is_applicable`).
+///
+/// **IMPORTANT**: Implementations MUST be cancellation-safe:
+///
+/// - **DO NOT** spawn background tasks using `tokio::spawn`, `std::thread::spawn`, or similar
+///   that will continue running after the timeout
+/// - **DO NOT** use blocking operations or FFI calls without proper cleanup
+/// - **ENSURE** all work stops when the future is dropped (cooperative cancellation)
+/// - **MAKE** migrations idempotent so partial execution can be safely retried
+///
 #[uniffi::export(with_foreign)]
 #[async_trait]
 pub trait MigrationProcessor: Send + Sync {
@@ -46,7 +61,6 @@ pub trait MigrationProcessor: Send + Sync {
     /// to determine if the migration needs to run. This ensures the system is
     /// truly idempotent and handles edge cases gracefully.
     ///
-    ///
     /// # Returns
     /// - `Ok(true)` if the migration should run
     /// - `Ok(false)` if the migration should be skipped
@@ -54,6 +68,5 @@
     async fn is_applicable(&self) -> Result;
 
     /// Execute the migration
-    /// Called by the controller when the migration is ready to run
     async fn execute(&self) -> Result;
 }
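
For illustration, a minimal sketch of a processor that satisfies the cancellation-safety rules documented above. The `LegacyCredentialProcessor` name, its key names, error codes, and `kv_store` field are hypothetical; only the `MigrationProcessor` trait shape, the `ProcessorResult` variants, and the store's `get`/`set` calls mirror the code in this diff, and the crate's types are assumed to be in scope (return types are written as in the trait above).

use std::sync::Arc;

use async_trait::async_trait;

/// Hypothetical processor: copies a legacy credential to its v4 key.
/// All work runs inline in this future, so dropping the future when the
/// controller's timeout fires stops the work cleanly: no spawned tasks,
/// no background threads, nothing left running after cancellation.
struct LegacyCredentialProcessor {
    // Hypothetical field: any store exposing get/set like the one used in the tests above.
    kv_store: Arc<InMemoryDeviceKeyValueStore>,
}

#[async_trait]
impl MigrationProcessor for LegacyCredentialProcessor {
    fn migration_id(&self) -> String {
        "example.credential.v4".to_string()
    }

    async fn is_applicable(&self) -> Result {
        // Check real state rather than the migration record: applicable only
        // while the v4 key is still missing, which keeps the migration
        // idempotent even if its record is deleted (reinstall scenario).
        Ok(self.kv_store.get("credential:v4".to_string()).is_err())
    }

    async fn execute(&self) -> Result {
        // A single read-then-write with no background work. If the timeout
        // drops this future midway, a later retry is safe: is_applicable()
        // reports false once the v4 key has been written.
        let legacy = match self.kv_store.get("legacy:credential".to_string()) {
            Ok(value) => value,
            Err(e) => {
                return Ok(ProcessorResult::Retryable {
                    error_code: "LEGACY_READ_FAILED".to_string(),
                    error_message: format!("{e:?}"),
                    retry_after_ms: None,
                })
            }
        };
        if let Err(e) = self.kv_store.set("credential:v4".to_string(), legacy) {
            return Ok(ProcessorResult::Retryable {
                error_code: "V4_WRITE_FAILED".to_string(),
                error_message: format!("{e:?}"),
                retry_after_ms: None,
            });
        }
        Ok(ProcessorResult::Success)
    }
}

The key property is that every step is awaited on the processor's own future, so the controller's `tokio::time::timeout` can cancel the whole migration by dropping that future.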