Skip to content

Commit

Permalink
Refactor context property handling with JSON serialization
Browse files Browse the repository at this point in the history
Refactor property handling in ParallelForEachT to use JSON serialization for tag lists. Simplify the storage and retrieval of tags using helper methods GetTagList and SetTagList. Update storage driver types for variables to use WorkflowInstanceStorageDriver.
  • Loading branch information
sfmskywalker committed Sep 6, 2024
1 parent 48f0f72 commit 760a3e2
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions src/modules/Elsa.Workflows.Core/Activities/ParallelForEachT.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Nodes;
using Elsa.Expressions.Helpers;
using Elsa.Extensions;
using Elsa.Workflows.Attributes;
Expand Down Expand Up @@ -42,19 +44,19 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
var tags = new List<Guid>();
var currentIndex = 0;

context.SetProperty(ScheduledTagsProperty, tags);
context.SetProperty(CompletedTagsProperty, new List<Guid>());
SetTagList(context, ScheduledTagsProperty, tags);
SetTagList(context, CompletedTagsProperty, new List<Guid>());

await foreach (var item in items)
{
// For each item, declare a new variable for the work to be scheduled.
var currentValueVariable = new Variable<T>("CurrentValue", item)
{
// TODO: This should be configurable, because this won't work for e.g. file streams and other non-serializable types.
StorageDriverType = typeof(WorkflowStorageDriver)
StorageDriverType = typeof(WorkflowInstanceStorageDriver)
};

var currentIndexVariable = new Variable<int>("CurrentIndex", currentIndex++) { StorageDriverType = typeof(WorkflowStorageDriver) };
var currentIndexVariable = new Variable<int>("CurrentIndex", currentIndex++) { StorageDriverType = typeof(WorkflowInstanceStorageDriver) };
var variables = new List<Variable> { currentValueVariable, currentIndexVariable };

// Schedule a body of work for each item.
Expand All @@ -71,20 +73,32 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
private async ValueTask OnChildCompleted(ActivityCompletedContext context)
{
var targetContext = context.TargetContext;
var scheduledTags = targetContext.GetProperty<List<Guid>>(ScheduledTagsProperty)!;
var scheduledTags = GetTagList(targetContext, ScheduledTagsProperty);
var completedTag = targetContext.Tag.ConvertTo<Guid>();

var completedTags = new HashSet<Guid>(targetContext.UpdateProperty<List<Guid>>(CompletedTagsProperty, completedTags =>
{
completedTags!.Add(completedTag);
return completedTags;
}));

var completedTags = GetTagList(targetContext, CompletedTagsProperty);

completedTags.Add(completedTag);
SetTagList(targetContext, CompletedTagsProperty, completedTags);

// If not all scheduled activities have completed yet, we're not done yet.
if (!scheduledTags.IsEqualTo(completedTags))
return;

// We're done, so complete the activity.
await targetContext.CompleteActivityAsync();
}

private ICollection<Guid> GetTagList(ActivityExecutionContext context, string propertyName)
{
// Read the list of tags from the context using the specified property name. The value is stored as JsonArray, so we need to deserialize it.
var jsonArray = context.GetProperty<JsonArray>(propertyName);
return jsonArray.Select(x => x.ConvertTo<Guid>()).ToList();
}

private void SetTagList(ActivityExecutionContext context, string propertyName, ICollection<Guid> tags)
{
// Serialize the list of tags to a JsonArray and store it in the context using the specified property name.
var jsonArray = JsonSerializer.SerializeToNode(tags) as JsonArray;
context.SetProperty(propertyName, jsonArray);
}
}

0 comments on commit 760a3e2

Please sign in to comment.