Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for entities to OrchestrationServiceClientShim package #228

Merged
merged 8 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ The client is responsible for interacting with orchestrations from outside the w
<EnableStyleCop>true</EnableStyleCop>
</PropertyGroup>

<PropertyGroup>
<!-- We are still working on this package for entities preview. -->
<VersionPrefix>1.0.5</VersionPrefix>
<VersionSuffix></VersionSuffix> <!-- Need this here to set it back to empty. -->
</PropertyGroup>


<ItemGroup>
<ProjectReference Include="../Core/Client.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

namespace Microsoft.DurableTask.Client;

Expand Down Expand Up @@ -58,17 +60,51 @@ public static IDurableTaskClientBuilder UseOrchestrationService(
builder.Services.AddOptions<ShimDurableTaskClientOptions>(builder.Name)
.PostConfigure<IServiceProvider>((opt, sp) =>
{
if (opt.Client is not null)
{
return;
}

// Try to resolve client from service container.
opt.Client = sp.GetService<IOrchestrationServiceClient>()
?? sp.GetService<IOrchestrationService>() as IOrchestrationServiceClient;
ConfigureClient(sp, opt);
ConfigureEntities(builder.Name, sp, opt);
})
.Validate(x => x.Client is not null, "ShimDurableTaskClientOptions.Client must not be null.");
.Validate(x => x.Client is not null, "ShimDurableTaskClientOptions.Client must not be null.")
.Validate(
x => !x.EnableEntitySupport || x.Entities.Queries is not null,
"ShimDurableTaskClientOptions.Entities.Queries must not be null when entity support is enabled.");

return builder.UseBuildTarget<ShimDurableTaskClient, ShimDurableTaskClientOptions>();
}

static void ConfigureClient(IServiceProvider services, ShimDurableTaskClientOptions options)
{
if (options.Client is not null)
{
return;
}

// Try to resolve client from service container.
options.Client = services.GetService<IOrchestrationServiceClient>()
?? services.GetService<IOrchestrationService>() as IOrchestrationServiceClient;
}

static void ConfigureEntities(string name, IServiceProvider services, ShimDurableTaskClientOptions options)
{
if (options.Entities.Queries is null)
{
options.Entities.Queries = services.GetService<EntityBackendQueries>()
?? GetEntityService(services, options)?.EntityBackendQueries;
}

if (options.Entities.MaxSignalDelayTime is null)
{
EntityBackendProperties? properties = services.GetService<IOptionsMonitor<EntityBackendProperties>>()?.Get(name)
?? GetEntityService(services, options)?.EntityBackendProperties;
options.Entities.MaxSignalDelayTime = properties?.MaximumSignalDelayTime;
}
}

static IEntityOrchestrationService? GetEntityService(
IServiceProvider services, ShimDurableTaskClientOptions options)
{
return services.GetService<IEntityOrchestrationService>()
?? services.GetService<IOrchestrationService>() as IEntityOrchestrationService
?? services.GetService<IOrchestrationServiceClient>() as IEntityOrchestrationService
?? options.Client as IEntityOrchestrationService;
}
}
177 changes: 177 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;

namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;

/// <summary>
/// A shim client for interacting with entities backend via <see cref="IOrchestrationServiceClient"/>.
/// </summary>
class ShimDurableEntityClient : DurableEntityClient
{
readonly ShimDurableTaskClientOptions options;

/// <summary>
/// Initializes a new instance of the <see cref="ShimDurableEntityClient"/> class.
/// </summary>
/// <param name="name">The name of this client.</param>
/// <param name="options">The client options..</param>
public ShimDurableEntityClient(string name, ShimDurableTaskClientOptions options)
: base(name)
{
this.options = Check.NotNull(options);
}

EntityBackendQueries Queries => this.options.Entities.Queries!;

DataConverter Converter => this.options.DataConverter;

/// <inheritdoc/>
public override async Task<CleanEntityStorageResult> CleanEntityStorageAsync(
CleanEntityStorageRequest? request = null,
bool continueUntilComplete = true,
CancellationToken cancellation = default)
{
CleanEntityStorageRequest r = request ?? CleanEntityStorageRequest.Default;
EntityBackendQueries.CleanEntityStorageResult result = await this.Queries.CleanEntityStorageAsync(
new EntityBackendQueries.CleanEntityStorageRequest()
{
RemoveEmptyEntities = r.RemoveEmptyEntities,
ReleaseOrphanedLocks = r.ReleaseOrphanedLocks,
ContinuationToken = r.ContinuationToken,
},
cancellation);

return new()
{
EmptyEntitiesRemoved = result.EmptyEntitiesRemoved,
OrphanedLocksReleased = result.OrphanedLocksReleased,
ContinuationToken = result.ContinuationToken,
};
}

/// <inheritdoc/>
public override AsyncPageable<EntityMetadata> GetAllEntitiesAsync(EntityQuery? filter = null)
=> this.GetAllEntitiesAsync(this.Convert, filter);

/// <inheritdoc/>
public override AsyncPageable<EntityMetadata<T>> GetAllEntitiesAsync<T>(EntityQuery? filter = null)
=> this.GetAllEntitiesAsync(this.Convert<T>, filter);

/// <inheritdoc/>
public override async Task<EntityMetadata?> GetEntityAsync(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert(await this.Queries.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
public override async Task<EntityMetadata<T>?> GetEntityAsync<T>(
EntityInstanceId id, bool includeState = true, CancellationToken cancellation = default)
=> this.Convert<T>(await this.Queries.GetEntityAsync(
new EntityId(id.Name, id.Key), includeState, false, cancellation));

/// <inheritdoc/>
public override async Task SignalEntityAsync(
EntityInstanceId id,
string operationName,
object? input = null,
SignalEntityOptions? options = null,
CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(id.Name);
Check.NotNull(id.Key);

DateTimeOffset? scheduledTime = options?.SignalTime;
string? serializedInput = this.Converter.Serialize(input);

EntityMessageEvent eventToSend = ClientEntityHelpers.EmitOperationSignal(
new OrchestrationInstance() { InstanceId = id.ToString() },
Guid.NewGuid(),
operationName,
serializedInput,
EntityMessageEvent.GetCappedScheduledTime(
DateTime.UtcNow,
this.options.Entities.MaxSignalDelayTimeOrDefault,
scheduledTime?.UtcDateTime));

await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
}

AsyncPageable<TMetadata> GetAllEntitiesAsync<TMetadata>(
Func<EntityBackendQueries.EntityMetadata, TMetadata> select,
EntityQuery? filter)
where TMetadata : notnull
{
bool includeState = filter?.IncludeState ?? true;
bool includeTransient = filter?.IncludeTransient ?? false;
string startsWith = filter?.InstanceIdStartsWith ?? string.Empty;
DateTime? lastModifiedFrom = filter?.LastModifiedFrom?.UtcDateTime;
DateTime? lastModifiedTo = filter?.LastModifiedTo?.UtcDateTime;

return Pageable.Create(async (continuation, size, cancellation) =>
{
size ??= filter?.PageSize;
EntityBackendQueries.EntityQueryResult result = await this.Queries.QueryEntitiesAsync(
new EntityBackendQueries.EntityQuery()
{
InstanceIdStartsWith = startsWith,
LastModifiedFrom = lastModifiedFrom,
LastModifiedTo = lastModifiedTo,
IncludeTransient = includeTransient,
IncludeState = includeState,
ContinuationToken = continuation,
PageSize = size,
},
cancellation);

return new Page<TMetadata>(result.Results.Select(select).ToList(), result.ContinuationToken);
});
}

EntityMetadata<T> Convert<T>(EntityBackendQueries.EntityMetadata metadata)
{
return new(
new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key),
this.Converter.Deserialize<T>(metadata.SerializedState))
{
LastModifiedTime = metadata.LastModifiedTime,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}

EntityMetadata<T>? Convert<T>(EntityBackendQueries.EntityMetadata? metadata)
{
if (metadata is null)
{
return null;
}

return this.Convert<T>(metadata.Value);
}

EntityMetadata Convert(EntityBackendQueries.EntityMetadata metadata)
{
SerializedData? data = metadata.SerializedState is null ? null : new(metadata.SerializedState, this.Converter);
return new(new EntityInstanceId(metadata.EntityId.Name, metadata.EntityId.Key), data)
{
LastModifiedTime = metadata.LastModifiedTime,
BacklogQueueSize = metadata.BacklogQueueSize,
LockedBy = metadata.LockedBy,
};
}

EntityMetadata? Convert(EntityBackendQueries.EntityMetadata? metadata)
{
if (metadata is null)
{
return null;
}

return this.Convert(metadata.Value);
}
}
28 changes: 28 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

using System.Diagnostics.CodeAnalysis;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Core = DurableTask.Core;
Expand All @@ -18,6 +20,7 @@ namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;
class ShimDurableTaskClient : DurableTaskClient
{
readonly ShimDurableTaskClientOptions options;
ShimDurableEntityClient? entities;

/// <summary>
/// Initializes a new instance of the <see cref="ShimDurableTaskClient"/> class.
Expand All @@ -42,6 +45,31 @@ public ShimDurableTaskClient(string name, ShimDurableTaskClientOptions options)
this.options = Check.NotNull(options);
}

/// <inheritdoc/>
public override DurableEntityClient Entities
{
get
{
if (!this.options.EnableEntitySupport)
{
throw new InvalidOperationException("Entity support is not enabled.");
}

if (this.entities is null)
{
if (this.options.Entities.Queries is null)
{
throw new NotSupportedException(
"The configured IOrchestrationServiceClient does not support entities.");
}

this.entities = new(this.Name, this.options);
}

return this.entities;
}
}

DataConverter DataConverter => this.options.DataConverter;

IOrchestrationServiceClient Client => this.options.Client!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the MIT License.

using DurableTask.Core;
using DurableTask.Core.Entities;
using Microsoft.DurableTask.Client.Entities;

namespace Microsoft.DurableTask.Client.OrchestrationServiceClientShim;

Expand All @@ -15,4 +17,33 @@ public sealed class ShimDurableTaskClientOptions : DurableTaskClientOptions
/// If not manually set, this will be resolved from the <see cref="IServiceProvider" />, if available.
/// </summary>
public IOrchestrationServiceClient? Client { get; set; }

/// <summary>
/// Gets the <see cref="ShimDurableTaskEntityOptions"/> to configure entity support.
/// </summary>
public ShimDurableTaskEntityOptions Entities { get; } = new();
}

/// <summary>
/// Options for entities.
/// </summary>
public class ShimDurableTaskEntityOptions
{
/// <summary>
/// Gets or sets the <see cref="EntityBackendQueries"/> to use in the <see cref="DurableEntityClient" />.
/// If not set manually, this will attempt to be resolved automatically by looking for
/// <see cref="IEntityOrchestrationService"/> in the <see cref="IServiceProvider"/>.
/// </summary>
public EntityBackendQueries? Queries { get; set; }

/// <summary>
/// Gets or sets the maximum time span to use for signal delay. If not set manually, will attempt to be resolved
/// through the service container. This will finally default to 3 days if it cannot be any other means.
/// </summary>
public TimeSpan? MaxSignalDelayTime { get; set; }

/// <summary>
/// Gets the max signal delay time.
/// </summary>
internal TimeSpan MaxSignalDelayTimeOrDefault => this.MaxSignalDelayTime ?? TimeSpan.FromDays(3);
}