From 3641e3f2dfc7be3e8186a67f209c006ff8e4ae41 Mon Sep 17 00:00:00 2001
From: kaibocai <89094811+kaibocai@users.noreply.github.com>
Date: Wed, 13 Apr 2022 13:44:17 -0500
Subject: [PATCH] add multi instance query support logics (#702)
* add multi instance query support logics
* have return type as OrchestrationStatusQueryResult
* remove unnecessary files copied from durable extension
* made property in OrchestrationQueryResult read only
* fix parameter name - make ContinuationToken nullable - throw exception if orchestrationState is null
* make continuationToken null in constructor
---
.../AzureStorageOrchestrationService.cs | 38 +++++++++-
.../Query/IOrchestrationServiceQueryClient.cs | 30 ++++++++
.../Query/OrchestrationQuery.cs | 73 +++++++++++++++++++
.../Query/OrchestrationQueryResult.cs | 46 ++++++++++++
4 files changed, 186 insertions(+), 1 deletion(-)
create mode 100644 src/DurableTask.Core/Query/IOrchestrationServiceQueryClient.cs
create mode 100644 src/DurableTask.Core/Query/OrchestrationQuery.cs
create mode 100644 src/DurableTask.Core/Query/OrchestrationQueryResult.cs
diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
index bf7fcc245..5f8b61bd8 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
@@ -30,6 +30,7 @@ namespace DurableTask.AzureStorage
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
+ using DurableTask.Core.Query;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
@@ -39,7 +40,8 @@ namespace DurableTask.AzureStorage
public sealed class AzureStorageOrchestrationService :
IOrchestrationService,
IOrchestrationServiceClient,
- IDisposable
+ IDisposable,
+ IOrchestrationServiceQueryClient
{
static readonly HistoryEvent[] EmptyHistoryEventList = new HistoryEvent[0];
@@ -1890,6 +1892,40 @@ public void Dispose()
this.orchestrationSessionManager.Dispose();
}
+ ///
+ /// Gets the status of all orchestration instances with paging that match the specified conditions.
+ ///
+ public async Task GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
+ {
+ OrchestrationInstanceStatusQueryCondition convertedCondition = ToAzureStorageCondition(query);
+ DurableStatusQueryResult statusContext = await this.GetOrchestrationStateAsync(convertedCondition, query.PageSize, query.ContinuationToken, cancellationToken);
+ return ConvertFrom(statusContext);
+ }
+
+ private static OrchestrationInstanceStatusQueryCondition ToAzureStorageCondition(OrchestrationQuery condition)
+ {
+ return new OrchestrationInstanceStatusQueryCondition
+ {
+ RuntimeStatus = condition.RuntimeStatus,
+ CreatedTimeFrom = condition.CreatedTimeFrom ?? default(DateTime),
+ CreatedTimeTo = condition.CreatedTimeTo ?? default(DateTime),
+ TaskHubNames = condition.TaskHubNames,
+ InstanceIdPrefix = condition.InstanceIdPrefix,
+ FetchInput = condition.FetchInputsAndOutputs,
+ };
+ }
+
+ private static OrchestrationQueryResult ConvertFrom(DurableStatusQueryResult statusContext)
+ {
+ var results = new List();
+ foreach (var state in statusContext.OrchestrationState)
+ {
+ results.Add(state);
+ }
+
+ return new OrchestrationQueryResult(results, statusContext.ContinuationToken);
+ }
+
class PendingMessageBatch
{
public string OrchestrationInstanceId { get; set; }
diff --git a/src/DurableTask.Core/Query/IOrchestrationServiceQueryClient.cs b/src/DurableTask.Core/Query/IOrchestrationServiceQueryClient.cs
new file mode 100644
index 000000000..cd16bee6b
--- /dev/null
+++ b/src/DurableTask.Core/Query/IOrchestrationServiceQueryClient.cs
@@ -0,0 +1,30 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// ----------------------------------------------------------------------------------
+
+namespace DurableTask.Core.Query
+{
+ using System.Threading.Tasks;
+ using System.Threading;
+
+ ///
+ /// Interface to allow query multi-instance status with filter.
+ ///
+ public interface IOrchestrationServiceQueryClient
+ {
+ ///
+ /// Gets the status of all orchestration instances with paging that match the specified conditions.
+ ///
+ /// Return orchestration instances that match the specified query.
+ /// Cancellation token that can be used to cancel the query operation.
+ /// Returns each page of orchestration status for all instances and continuation token of next page.
+ Task GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken);
+ }
+}
diff --git a/src/DurableTask.Core/Query/OrchestrationQuery.cs b/src/DurableTask.Core/Query/OrchestrationQuery.cs
new file mode 100644
index 000000000..2932d11da
--- /dev/null
+++ b/src/DurableTask.Core/Query/OrchestrationQuery.cs
@@ -0,0 +1,73 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.Core.Query
+{
+ using System;
+ using System.Collections.Generic;
+
+ ///
+ /// Query condition for searching the status of orchestration instances.
+ ///
+ public class OrchestrationQuery
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public OrchestrationQuery() { }
+
+ ///
+ /// Return orchestration instances which matches the runtimeStatus.
+ ///
+ public ICollection? RuntimeStatus { get; set; }
+
+ ///
+ /// Return orchestration instances which were created after this DateTime.
+ ///
+ public DateTime? CreatedTimeFrom { get; set; }
+
+ ///
+ /// Return orchestration instances which were created before this DateTime.
+ ///
+ public DateTime? CreatedTimeTo { get; set; }
+
+ ///
+ /// Return orchestration instances which matches the TaskHubNames.
+ ///
+ public ICollection? TaskHubNames { get; set; }
+
+ ///
+ /// Maximum number of records that can be returned by the request. The default value is 100.
+ ///
+ ///
+ /// Requests may return fewer records than the specified page size, even if there are more records.
+ /// Always check the continuation token to determine whether there are more records.
+ ///
+ public int PageSize { get; set; } = 100;
+
+ ///
+ /// ContinuationToken of the pager.
+ ///
+ public string? ContinuationToken { get; set; }
+
+ ///
+ /// Return orchestration instances that have this instance id prefix.
+ ///
+ public string? InstanceIdPrefix { get; set; }
+
+ ///
+ /// Determines whether the query will include the input of the orchestration.
+ ///
+ public bool FetchInputsAndOutputs { get; set; } = true;
+ }
+}
\ No newline at end of file
diff --git a/src/DurableTask.Core/Query/OrchestrationQueryResult.cs b/src/DurableTask.Core/Query/OrchestrationQueryResult.cs
new file mode 100644
index 000000000..67da64faa
--- /dev/null
+++ b/src/DurableTask.Core/Query/OrchestrationQueryResult.cs
@@ -0,0 +1,46 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.Core.Query
+{
+ using System;
+ using System.Collections.Generic;
+
+ ///
+ /// The status of all orchestration instances with paging for a given query.
+ ///
+ public class OrchestrationQueryResult
+ {
+ ///
+ /// Constructor of OrchestrationQueryResult Class.
+ ///
+ /// A collection of orchestration instance status values.
+ /// A server-generated continuation token or null if there are no further continuations.
+ public OrchestrationQueryResult(IReadOnlyCollection orchestrationState, string? continuationToken)
+ {
+ this.OrchestrationState = orchestrationState ?? throw new ArgumentNullException(nameof(orchestrationState));
+ this.ContinuationToken = continuationToken;
+ }
+ ///
+ /// Gets a collection of statuses of orchestration instances matching the query description.
+ ///
+ /// A collection of orchestration instance status values.
+ public IReadOnlyCollection OrchestrationState { get; }
+
+ ///
+ /// Gets token that can be used to resume the query with data not already returned by this query.
+ ///
+ /// A server-generated continuation token or null if there are no further continuations.
+ public string? ContinuationToken { get; }
+ }
+}