From 8e4a79ab1c426103adf750785c6657db4b35919e Mon Sep 17 00:00:00 2001 From: dakuchar <60290791+dakuchar@users.noreply.github.com> Date: Wed, 1 Nov 2023 10:44:00 +0100 Subject: [PATCH] [FIX] Update WorkflowInstance state before resuming (#4573) This is needed to resolve concurrency problems, where another Engine Instance has changed the state of the resuming workflow instance. Co-authored-by: danielkuchar --- .../StartupTasks/ContinueRunningWorkflows.cs | 55 ++++++++++++++----- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/src/core/Elsa.Core/StartupTasks/ContinueRunningWorkflows.cs b/src/core/Elsa.Core/StartupTasks/ContinueRunningWorkflows.cs index df956e4893..e8de02a516 100644 --- a/src/core/Elsa.Core/StartupTasks/ContinueRunningWorkflows.cs +++ b/src/core/Elsa.Core/StartupTasks/ContinueRunningWorkflows.cs @@ -63,7 +63,7 @@ private async Task ResumeIdleWorkflows(CancellationToken cancellationToken) if (instances.Any()) _logger.LogInformation("Found {WorkflowInstanceCount} workflows with status 'Idle'. Resuming each one of them", instances.Count); else - _logger.LogInformation("Found no workflows with status 'Id'. Nothing to resume"); + _logger.LogInformation("Found no workflows with status 'Idle'. Nothing to resume"); foreach (var instance in instances) { @@ -76,9 +76,22 @@ private async Task ResumeIdleWorkflows(CancellationToken cancellationToken) } _logger.LogInformation("Resuming {WorkflowInstanceId}", instance.Id); - - var input = await GetWorkflowInputAsync(instance, cancellationToken); - await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(instance.Id, Input: input), cancellationToken); + // Instance state might have changed since resume has started. 
+ var resumingInstance = await _workflowInstanceStore.FindByIdAsync(instance.Id, cancellationToken); + if (resumingInstance == null) + { + _logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it has been removed from storage", instance.Id); + continue; + } + + if (resumingInstance.WorkflowStatus != WorkflowStatus.Idle) + { + _logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it's status changed from 'Idle' to '{WorkflowStatus}'", resumingInstance.Id, resumingInstance.WorkflowStatus); + continue; + } + + var input = await GetWorkflowInputAsync(resumingInstance, cancellationToken); + await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(resumingInstance.Id, Input: input), cancellationToken); } } @@ -102,28 +115,42 @@ private async Task ResumeRunningWorkflowsAsync(CancellationToken cancellationTok } _logger.LogInformation("Resuming {WorkflowInstanceId}", instance.Id); - var scheduledActivities = instance.ScheduledActivities; + // Instance state might have changed since resume has started. 
+ var resumingInstance = await _workflowInstanceStore.FindByIdAsync(instance.Id, cancellationToken); + if (resumingInstance == null) + { + _logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it has been removed from storage", instance.Id); + continue; + } + + if (resumingInstance.WorkflowStatus != WorkflowStatus.Running) + { + _logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it's status changed from 'Running' to '{WorkflowStatus}'", resumingInstance.Id, resumingInstance.WorkflowStatus); + continue; + } + + var scheduledActivities = resumingInstance.ScheduledActivities; - if (instance.CurrentActivity == null && !scheduledActivities.Any()) + if (resumingInstance.CurrentActivity == null && !scheduledActivities.Any()) { - if (instance.BlockingActivities.Any()) + if (resumingInstance.BlockingActivities.Any()) { _logger.LogWarning( "Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities not has a currently executing one. However, it does have blocking activities, so switching to Suspended status", - instance.Id); + resumingInstance.Id); - instance.WorkflowStatus = WorkflowStatus.Suspended; - await _workflowInstanceStore.SaveAsync(instance, cancellationToken); + resumingInstance.WorkflowStatus = WorkflowStatus.Suspended; + await _workflowInstanceStore.SaveAsync(resumingInstance, cancellationToken); continue; } - _logger.LogWarning("Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities nor has a currently executing one", instance.Id); + _logger.LogWarning("Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities nor has a currently executing one", resumingInstance.Id); continue; } - var scheduledActivity = instance.CurrentActivity ?? 
instance.ScheduledActivities.Peek(); - var input = await GetWorkflowInputAsync(instance, cancellationToken); - await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(instance.Id, scheduledActivity.ActivityId, input), cancellationToken); + var scheduledActivity = resumingInstance.CurrentActivity ?? resumingInstance.ScheduledActivities.Peek(); + var input = await GetWorkflowInputAsync(resumingInstance, cancellationToken); + await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(resumingInstance.Id, scheduledActivity.ActivityId, input), cancellationToken); } }