Skip to content

Commit

Permalink
[FIX] Update WorkflowInstance state before resuming (#4573)
Browse files Browse the repository at this point in the history
This is needed to resolve concurrency problems, where another Engine Instance has changed the state of the resuming workflowinstance.

Co-authored-by: danielkuchar <daniel.kuchar@mmi-media.de>
  • Loading branch information
dakuchar and dkuchar committed Nov 1, 2023
1 parent 2caab0c commit 8e4a79a
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions src/core/Elsa.Core/StartupTasks/ContinueRunningWorkflows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private async Task ResumeIdleWorkflows(CancellationToken cancellationToken)
if (instances.Any())
_logger.LogInformation("Found {WorkflowInstanceCount} workflows with status 'Idle'. Resuming each one of them", instances.Count);
else
_logger.LogInformation("Found no workflows with status 'Id'. Nothing to resume");
_logger.LogInformation("Found no workflows with status 'Idle'. Nothing to resume");

foreach (var instance in instances)
{
Expand All @@ -76,9 +76,22 @@ private async Task ResumeIdleWorkflows(CancellationToken cancellationToken)
}

_logger.LogInformation("Resuming {WorkflowInstanceId}", instance.Id);

var input = await GetWorkflowInputAsync(instance, cancellationToken);
await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(instance.Id, Input: input), cancellationToken);
// Instance state might have change since resume has started.
var resumingInstance = await _workflowInstanceStore.FindByIdAsync(instance.Id, cancellationToken);
if (resumingInstance == null)
{
_logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it has been removed from storage", instance.Id);
continue;
}

if (resumingInstance.WorkflowStatus != WorkflowStatus.Idle)
{
_logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it's status changed from 'Idle' to '{WorkflowStatus}'", resumingInstance.Id, resumingInstance.WorkflowStatus);
continue;
}

var input = await GetWorkflowInputAsync(resumingInstance, cancellationToken);
await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(resumingInstance.Id, Input: input), cancellationToken);
}
}

Expand All @@ -102,28 +115,42 @@ private async Task ResumeRunningWorkflowsAsync(CancellationToken cancellationTok
}

_logger.LogInformation("Resuming {WorkflowInstanceId}", instance.Id);
var scheduledActivities = instance.ScheduledActivities;
// Instance state might have change since resume has started.
var resumingInstance = await _workflowInstanceStore.FindByIdAsync(instance.Id, cancellationToken);
if (resumingInstance == null)
{
_logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it has been removed from storage", instance.Id);
continue;
}

if (resumingInstance.WorkflowStatus != WorkflowStatus.Running)
{
_logger.LogInformation("Resuming {WorkflowInstanceId} aborted, because it's status changed from 'Running' to '{WorkflowStatus}'", resumingInstance.Id, resumingInstance.WorkflowStatus);
continue;
}

var scheduledActivities = resumingInstance.ScheduledActivities;

if (instance.CurrentActivity == null && !scheduledActivities.Any())
if (resumingInstance.CurrentActivity == null && !scheduledActivities.Any())
{
if (instance.BlockingActivities.Any())
if (resumingInstance.BlockingActivities.Any())
{
_logger.LogWarning(
"Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities not has a currently executing one. However, it does have blocking activities, so switching to Suspended status",
instance.Id);
resumingInstance.Id);

instance.WorkflowStatus = WorkflowStatus.Suspended;
await _workflowInstanceStore.SaveAsync(instance, cancellationToken);
resumingInstance.WorkflowStatus = WorkflowStatus.Suspended;
await _workflowInstanceStore.SaveAsync(resumingInstance, cancellationToken);
continue;
}

_logger.LogWarning("Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities nor has a currently executing one", instance.Id);
_logger.LogWarning("Workflow '{WorkflowInstanceId}' was in the Running state, but has no scheduled activities nor has a currently executing one", resumingInstance.Id);
continue;
}

var scheduledActivity = instance.CurrentActivity ?? instance.ScheduledActivities.Peek();
var input = await GetWorkflowInputAsync(instance, cancellationToken);
await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(instance.Id, scheduledActivity.ActivityId, input), cancellationToken);
var scheduledActivity = resumingInstance.CurrentActivity ?? resumingInstance.ScheduledActivities.Peek();
var input = await GetWorkflowInputAsync(resumingInstance, cancellationToken);
await _workflowInstanceDispatcher.DispatchAsync(new ExecuteWorkflowInstanceRequest(resumingInstance.Id, scheduledActivity.ActivityId, input), cancellationToken);
}
}

Expand Down

0 comments on commit 8e4a79a

Please sign in to comment.