From 76912f3f0eacf9a1afa7e6ad0bfab9123caf7cfd Mon Sep 17 00:00:00 2001
From: Kamesh Tanneru <36271852+tkamesh@users.noreply.github.com>
Date: Tue, 17 Mar 2020 22:40:42 -0700
Subject: [PATCH] [AzureServiceFabric provider] Implement ContinueAsNew
functionality. (#362)
* [AzureServiceFabric] Implement ContinueAsNew functionality.
* Fixes for ContinueAsNew functionality.
* Add test for RecurringOrchestration.
* Convert RetryPolicy to interface. Remove confusing assert.
---
.../FunctionalTests.cs | 45 +++++++++++
.../OrchestrationTasks/ITestTasks.cs | 13 +++-
.../OrchestrationTasks/TestTasks.cs | 25 ++++++-
.../GenerationBasicOrchestration.cs | 42 +++++++++++
.../Orchestrations/RecurringOrchestration.cs | 75 +++++++++++++++++++
.../RecurringOrchestrationInput.cs | 24 ++++++
.../RecurringTargetOrchestration.cs | 31 ++++++++
.../TestOrchestrationsProvider.cs | 3 +
.../DurableTask.AzureServiceFabric.csproj | 2 +-
.../FabricOrchestrationInstanceStore.cs | 49 ++++++------
.../FabricOrchestrationService.cs | 41 ++++++----
.../FabricOrchestrationServiceClient.cs | 3 +-
...FabricOrchestrationServiceInstanceStore.cs | 2 +-
src/DurableTask.AzureServiceFabric/README.md | 1 -
.../RemoteOrchestrationServiceClient.cs | 2 +-
.../Service/Startup.cs | 2 +
.../CountBasedFixedDelayRetryPolicy.cs | 16 ++--
.../TaskHelpers/RetryHelper.cs | 5 +-
.../TaskHelpers/RetryPolicy.cs | 6 +-
.../SimpleOrchestrations.cs | 2 +-
20 files changed, 327 insertions(+), 62 deletions(-)
create mode 100644 Test/TestFabricApplication/TestApplication.Common/Orchestrations/GenerationBasicOrchestration.cs
create mode 100644 Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestration.cs
create mode 100644 Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestrationInput.cs
create mode 100644 Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringTargetOrchestration.cs
diff --git a/Test/DurableTask.AzureServiceFabric.Integration.Tests/FunctionalTests.cs b/Test/DurableTask.AzureServiceFabric.Integration.Tests/FunctionalTests.cs
index c2a281877..87a7e610a 100644
--- a/Test/DurableTask.AzureServiceFabric.Integration.Tests/FunctionalTests.cs
+++ b/Test/DurableTask.AzureServiceFabric.Integration.Tests/FunctionalTests.cs
@@ -109,6 +109,51 @@ public async Task Orchestration_With_TimeoutWrapper_Test()
Console.WriteLine($"Time for Orchestration with 5 second running task wrapped in 1 second timeout : {result.CompletedTime - result.CreatedTime}");
}
+ [TestMethod]
+ public async Task GenerationBasicTest()
+ {
+ var instanceId = nameof(GenerationBasicOrchestration);
+ var instance = await this.taskHubClient.CreateOrchestrationInstanceAsync(typeof(GenerationBasicOrchestration), instanceId, 2);
+ var result = await this.taskHubClient.WaitForOrchestrationAsync(instance, TimeSpan.FromMinutes(2));
+
+ Assert.AreEqual(OrchestrationStatus.ContinuedAsNew, result.OrchestrationStatus);
+
+ var state = await this.taskHubClient.GetOrchestrationStateAsync(instanceId);
+ result = await this.taskHubClient.WaitForOrchestrationAsync(state.OrchestrationInstance, TimeSpan.FromMinutes(2));
+
+ Assert.AreEqual(OrchestrationStatus.Completed, result.OrchestrationStatus);
+ Assert.AreEqual("2", result.Output, "Orchestration Result is wrong!!!");
+ Assert.AreEqual(result.OrchestrationInstance.InstanceId, instance.InstanceId);
+ Assert.AreNotEqual(result.OrchestrationInstance.ExecutionId, instance.ExecutionId);
+ }
+
+ [TestMethod]
+ public async Task RecurringOrchestrationTest()
+ {
+ var instanceId = nameof(RecurringOrchestration);
+ var input = new RecurringOrchestrationInput
+ {
+ TargetOrchestrationInput = "1",
+ TargetOrchestrationType = typeof(RecurringTargetOrchestration).ToString(),
+ TargetOrchestrationInstanceId = nameof(RecurringTargetOrchestration)
+ };
+ var instance = await this.taskHubClient.CreateOrchestrationInstanceAsync(typeof(RecurringOrchestration), instanceId, input);
+ await Task.Delay(TimeSpan.FromSeconds(1));
+ var result = await this.taskHubClient.GetOrchestrationStateAsync(input.TargetOrchestrationInstanceId, false);
+
+ Assert.AreNotEqual("4", result[0].Output, "Orchestration Result is wrong!!!");
+
+ OrchestrationState state;
+ do
+ {
+ state = await this.taskHubClient.GetOrchestrationStateAsync(instanceId);
+ await Task.Delay(TimeSpan.FromSeconds(1));
+ } while (state.OrchestrationStatus != OrchestrationStatus.Completed);
+
+ Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus);
+ Assert.AreEqual("4", state.Output, "Orchestration Result is wrong!!!");
+ }
+
[TestMethod]
public async Task Orchestration_With_Same_Id_Cant_Be_Started_While_Running()
{
diff --git a/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/ITestTasks.cs b/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/ITestTasks.cs
index f0f58791c..baef9c21e 100644
--- a/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/ITestTasks.cs
+++ b/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/ITestTasks.cs
@@ -14,7 +14,6 @@
namespace TestApplication.Common.OrchestrationTasks
{
using System.Threading.Tasks;
- using DurableTask.Test.Orchestrations.Performance;
public interface ITestTasks
{
@@ -24,5 +23,17 @@ public interface ITestTasks
/// remaining number of attempts
/// bool indicating whether task completed successfully or not.
Task ThrowExceptionAsync(int remainingAttempts);
+
+ ///
+ /// Increments Generation Count variable.
+ ///
+ /// Generation count
+ Task IncrementGenerationCount();
+
+ ///
+ /// Utility method to reset counter at the beginning of test.
+ ///
+ /// Generation coutner value
+ Task ResetGenerationCounter();
}
}
diff --git a/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/TestTasks.cs b/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/TestTasks.cs
index 9b02eeaf3..dd9abfb70 100644
--- a/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/TestTasks.cs
+++ b/Test/TestFabricApplication/TestApplication.Common/OrchestrationTasks/TestTasks.cs
@@ -19,19 +19,40 @@ namespace TestApplication.Common.OrchestrationTasks
public class TestTasks : ITestTasks
{
+ static int generationCount = 0;
+
+ ///
+ /// Increments Generation Count variable.
+ ///
+ /// Generation count
+ public Task IncrementGenerationCount()
+ {
+ return Task.FromResult(++generationCount);
+ }
+
+ ///
+ /// Utility method to reset counter at the beginning of test.
+ ///
+ /// Generation coutner value
+ public Task ResetGenerationCounter()
+ {
+ generationCount = 0;
+ return Task.FromResult(generationCount);
+ }
+
///
/// Throws exception when remainingAttempts > 0. Otherwise succeeds.
///
/// remaining number of attempts
/// bool indicating whether task completed successfully or not.
- public async Task ThrowExceptionAsync(int remainingAttempts)
+ public Task ThrowExceptionAsync(int remainingAttempts)
{
if (remainingAttempts > 0)
{
throw new CounterException(remainingAttempts);
}
- return true;
+ return Task.FromResult(true);
}
}
}
diff --git a/Test/TestFabricApplication/TestApplication.Common/Orchestrations/GenerationBasicOrchestration.cs b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/GenerationBasicOrchestration.cs
new file mode 100644
index 000000000..f3b953a43
--- /dev/null
+++ b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/GenerationBasicOrchestration.cs
@@ -0,0 +1,42 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+namespace TestApplication.Common.Orchestrations
+{
+ using System.Threading.Tasks;
+
+ using DurableTask.Core;
+
+ using TestApplication.Common.OrchestrationTasks;
+
+ public class GenerationBasicOrchestration : TaskOrchestration
+ {
+ // HACK: This is just a hack to communicate result of orchestration back to test
+ public static int Result;
+
+ public override async Task RunTask(OrchestrationContext context, int numberOfGenerations)
+ {
+ var testTasks = context.CreateClient();
+ int count = await testTasks.IncrementGenerationCount();
+ numberOfGenerations--;
+ if (numberOfGenerations > 0)
+ {
+ context.ContinueAsNew(numberOfGenerations);
+ }
+
+ // This is a HACK to get unit test up and running. Should never be done in actual code.
+ Result = count;
+ return count;
+ }
+ }
+}
diff --git a/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestration.cs b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestration.cs
new file mode 100644
index 000000000..3f92aa3d6
--- /dev/null
+++ b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestration.cs
@@ -0,0 +1,75 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+namespace TestApplication.Common.Orchestrations
+{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ using DurableTask.Core;
+
+ using TestApplication.Common.OrchestrationTasks;
+
+ public class RecurringOrchestration : TaskOrchestration
+ {
+ // HACK: This is just a hack to communicate result of orchestration back to test
+ private static int targetOchestrationInvocationsCount = 0;
+
+ public override async Task RunTask(OrchestrationContext context, RecurringOrchestrationInput input)
+ {
+ var testTasks = context.CreateClient();
+
+ if (targetOchestrationInvocationsCount == 0)
+ {
+ // First time, Reset Generation Count variable.
+ await testTasks.ResetGenerationCounter();
+ }
+
+ int result = await context.CreateSubOrchestrationInstance(input.TargetOrchestrationType,
+ GetTargetOrchestrationVersion(),
+ input.TargetOrchestrationInstanceId,
+ input.TargetOrchestrationInput);
+
+ await context.CreateTimer(GetNextExecutionTime(context), true, CancellationToken.None);
+ if (ShouldRepeatTargetOrchestration())
+ {
+ context.ContinueAsNew(input);
+ }
+ else
+ {
+ // Finally, Reset Generation Count variable.
+ await testTasks.ResetGenerationCounter();
+ }
+
+ return result;
+ }
+
+ public virtual DateTime GetNextExecutionTime(OrchestrationContext context)
+ {
+ return context.CurrentUtcDateTime.AddSeconds(Math.Pow(2, targetOchestrationInvocationsCount));
+ }
+
+ public virtual string GetTargetOrchestrationVersion()
+ {
+ return string.Empty;
+ }
+
+ public virtual bool ShouldRepeatTargetOrchestration()
+ {
+ targetOchestrationInvocationsCount++;
+
+ return targetOchestrationInvocationsCount < 4;
+ }
+ }
+}
diff --git a/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestrationInput.cs b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestrationInput.cs
new file mode 100644
index 000000000..dc4214b3e
--- /dev/null
+++ b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringOrchestrationInput.cs
@@ -0,0 +1,24 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+namespace TestApplication.Common.Orchestrations
+{
+ public class RecurringOrchestrationInput
+ {
+ public string TargetOrchestrationType { get; set; }
+
+ public string TargetOrchestrationInput { get; set; }
+
+ public string TargetOrchestrationInstanceId { get; set; }
+ }
+}
diff --git a/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringTargetOrchestration.cs b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringTargetOrchestration.cs
new file mode 100644
index 000000000..68611dab0
--- /dev/null
+++ b/Test/TestFabricApplication/TestApplication.Common/Orchestrations/RecurringTargetOrchestration.cs
@@ -0,0 +1,31 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+
+namespace TestApplication.Common.Orchestrations
+{
+ using System.Threading.Tasks;
+
+ using DurableTask.Core;
+
+ using TestApplication.Common.OrchestrationTasks;
+
+ public class RecurringTargetOrchestration : TaskOrchestration
+ {
+ public override async Task RunTask(OrchestrationContext context, string input)
+ {
+ var testTasks = context.CreateClient();
+ int count = await testTasks.IncrementGenerationCount();
+ return count;
+ }
+ }
+}
diff --git a/Test/TestFabricApplication/TestApplication.StatefulService/TestOrchestrationsProvider.cs b/Test/TestFabricApplication/TestApplication.StatefulService/TestOrchestrationsProvider.cs
index c40fd0de6..1e45902aa 100644
--- a/Test/TestFabricApplication/TestApplication.StatefulService/TestOrchestrationsProvider.cs
+++ b/Test/TestFabricApplication/TestApplication.StatefulService/TestOrchestrationsProvider.cs
@@ -66,6 +66,9 @@ private IEnumerable GetOrchestrationTypes()
typeof(SimpleOrchestrationWithTasks),
typeof(SimpleOrchestrationWithTimer),
typeof(SimpleOrchestrationWithSubOrchestration),
+ typeof(GenerationBasicOrchestration),
+ typeof(RecurringOrchestration),
+ typeof(RecurringTargetOrchestration),
typeof(DriverOrchestration),
typeof(TestOrchestration),
typeof(ExecutionCountingOrchestration)
diff --git a/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj b/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj
index ec487396f..0c5b0c2e5 100644
--- a/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj
+++ b/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj
@@ -5,7 +5,7 @@
net461
true
Microsoft.Azure.DurableTask.AzureServiceFabric
- 2.2.0
+ 2.3.0
$(Version)
$(Version)
diff --git a/src/DurableTask.AzureServiceFabric/FabricOrchestrationInstanceStore.cs b/src/DurableTask.AzureServiceFabric/FabricOrchestrationInstanceStore.cs
index 086df7572..cd41d0841 100644
--- a/src/DurableTask.AzureServiceFabric/FabricOrchestrationInstanceStore.cs
+++ b/src/DurableTask.AzureServiceFabric/FabricOrchestrationInstanceStore.cs
@@ -40,7 +40,7 @@ class FabricOrchestrationInstanceStore : IFabricOrchestrationServiceInstanceStor
const string TimeFormatStringPrefix = "yyyy-MM-dd-";
readonly IReliableStateManager stateManager;
readonly CancellationToken cancellationToken;
- readonly ConcurrentDictionary orchestrationWaiters = new ConcurrentDictionary(OrchestrationInstanceComparer.Default);
+ readonly ConcurrentDictionary orchestrationWaiters = new ConcurrentDictionary(StringComparer.Ordinal);
IReliableDictionary instanceStore;
IReliableDictionary> executionIdStore;
@@ -88,22 +88,19 @@ public async Task WriteEntitiesAsync(ITransaction transaction, IEnumerable { instance.ExecutionId },
- (k, old) =>
+ await this.executionIdStore.AddOrUpdateAsync(transaction, instance.InstanceId, new List { instance.ExecutionId },
+ (k, old) =>
+ {
+ old.Add(instance.ExecutionId);
+ if (old.Count > this.MaxExecutionIdsLength)
{
- old.Add(instance.ExecutionId);
- if (old.Count > this.MaxExecutionIdsLength)
- {
- // Remove first 10% items.
- int skipItemsLength = (int)(this.MaxExecutionIdsLength * 0.1);
- old = old.Skip(skipItemsLength).ToList();
- }
-
- return old;
- });
- }
+ // Remove first 10% items.
+ int skipItemsLength = (int)(this.MaxExecutionIdsLength * 0.1);
+ old = old.Skip(skipItemsLength).ToList();
+ }
+
+ return old;
+ });
await this.instanceStore.AddOrUpdateAsync(transaction, key, state.State, (k, oldValue) => state.State);
}
@@ -132,7 +129,7 @@ public async Task> GetOrchestrationState
throw new NotImplementedException("Querying for state across all executions for an orchestration is not supported, only the latest execution can be queried");
}
- await EnsureStoreInitializedAsync();
+ await this.EnsureStoreInitializedAsync();
string latestExecutionId = (await GetExecutionIds(instanceId))?.Last();
@@ -340,9 +337,15 @@ async Task EnsureStoreInitializedAsync()
}
}
- public async Task WaitForOrchestrationAsync(OrchestrationInstance instance, TimeSpan timeout)
+ public async Task WaitForOrchestrationAsync(string instanceId, TimeSpan timeout)
{
- var currentState = await this.GetOrchestrationStateAsync(instance.InstanceId, instance.ExecutionId);
+ var executionId = (await this.GetExecutionIds(instanceId)).Last();
+ if (executionId == null)
+ {
+ return null;
+ }
+
+ var currentState = await this.GetOrchestrationStateAsync(instanceId, executionId);
// If querying state for an orchestration that's not started or completed and state cleaned up, we will immediately return null.
if (currentState?.State == null)
@@ -355,15 +358,15 @@ public async Task WaitForOrchestrationAsync(Or
return currentState;
}
- var waiter = this.orchestrationWaiters.GetOrAdd(instance, new AsyncManualResetEvent());
+ var waiter = this.orchestrationWaiters.GetOrAdd(instanceId, new AsyncManualResetEvent());
bool completed = await waiter.WaitAsync(timeout, this.cancellationToken);
if (!completed)
{
- this.orchestrationWaiters.TryRemove(instance, out waiter);
+ this.orchestrationWaiters.TryRemove(instanceId, out waiter);
}
- currentState = await this.GetOrchestrationStateAsync(instance.InstanceId, instance.ExecutionId);
+ currentState = (await this.GetOrchestrationStateAsync(instanceId, allInstances: false)).FirstOrDefault();
if (currentState?.State != null && currentState.State.OrchestrationStatus.IsTerminalState())
{
return currentState;
@@ -374,7 +377,7 @@ public async Task WaitForOrchestrationAsync(Or
public void OnOrchestrationCompleted(OrchestrationInstance instance)
{
- if (this.orchestrationWaiters.TryRemove(instance, out AsyncManualResetEvent resetEvent))
+ if (this.orchestrationWaiters.TryRemove(instance.InstanceId, out AsyncManualResetEvent resetEvent))
{
resetEvent.Set();
}
diff --git a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs
index fe9ef78ba..3e3821934 100644
--- a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs
+++ b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs
@@ -117,7 +117,7 @@ public Task DeleteAsync(bool deleteInstanceStore)
public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
{
- return false;
+ return true;
}
public int GetDelayInSecondsAfterOnProcessException(Exception exception)
@@ -164,7 +164,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs
return null;
}
- bool isComplete = currentRuntimeState.OrchestrationStatus.IsTerminalState();
+ bool isComplete = this.IsOrchestrationComplete(currentRuntimeState.OrchestrationStatus);
if (isComplete)
{
await this.HandleCompletedOrchestration(workItem);
@@ -196,7 +196,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs
public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
{
- throw new NotImplementedException();
+ return Task.CompletedTask;
}
public async Task CompleteTaskOrchestrationWorkItemAsync(
@@ -209,13 +209,10 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
OrchestrationState orchestrationState)
{
SessionInformation sessionInfo = GetSessionInfo(workItem.InstanceId);
-
- if (continuedAsNewMessage != null)
- {
- throw new Exception("ContinueAsNew is not supported yet");
- }
-
- bool isComplete = workItem.OrchestrationRuntimeState.OrchestrationStatus.IsTerminalState();
+ ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(workItem.InstanceId,
+ workItem.OrchestrationRuntimeState.OrchestrationInstance?.ExecutionId,
+ $"Current orchestration status: {workItem.OrchestrationRuntimeState.OrchestrationStatus}");
+ bool isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
IList sessionsToEnqueue = null;
List> scheduledMessages = null;
@@ -260,8 +257,19 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>
}
}
+ if (continuedAsNewMessage != null)
+ {
+ await this.orchestrationProvider.AppendMessageAsync(txn, new TaskMessageItem(continuedAsNewMessage));
+ sessionsToEnqueue = new List() { continuedAsNewMessage.OrchestrationInstance };
+ }
+
await this.orchestrationProvider.CompleteMessages(txn, sessionInfo.Instance, sessionInfo.LockTokens);
+ if (workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
+ {
+ await HandleCompletedOrchestration(workItem);
+ }
+
// When an orchestration is completed, we need to drop the session which involves 2 steps (1) Removing the row from sessions
// (2) Dropping the session messages dictionary. The second step is done in background thread for performance so is not
// part of transaction. Since it will happen outside the trasanction, if this transaction fails for some reason and we dropped
@@ -275,7 +283,8 @@ await RetryHelper.ExecuteWithRetryOnTransient(async () =>
// mark it as complete even if it is. So we use the work item's runtime state when 'newOrchestrationRuntimeState' is null
// so that the latest state is what is stored for the session.
// As part of next transaction, we are going to remove the row anyway for the session and it doesn't matter to update it to 'null'.
- await this.orchestrationProvider.UpdateSessionState(txn, sessionInfo.Instance, newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState);
+
+ await this.orchestrationProvider.UpdateSessionState(txn, newOrchestrationRuntimeState.OrchestrationInstance, newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState);
// We skip writing to instanceStore when orchestration reached terminal state to avoid a minor timing issue that
// wait for an orchestration completes but another orchestration with the same name cannot be started immediately
@@ -386,7 +395,7 @@ public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work
public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
- bool isComplete = workItem.OrchestrationRuntimeState.OrchestrationStatus.IsTerminalState();
+ bool isComplete = this.IsOrchestrationComplete(workItem.OrchestrationRuntimeState.OrchestrationStatus);
SessionInformation sessionInfo = TryRemoveSessionInfo(workItem.InstanceId);
if (sessionInfo != null)
@@ -398,9 +407,10 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work
}
public int TaskActivityDispatcherCount => this.settings.TaskActivityDispatcherSettings.DispatcherCount;
+
public int MaxConcurrentTaskActivityWorkItems => this.settings.TaskActivityDispatcherSettings.MaxConcurrentActivities;
- public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; }
+ public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } = BehaviorOnContinueAsNew.Ignore;
// Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation.
public async Task LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
@@ -502,6 +512,11 @@ int GetDelayForFetchOrProcessException(Exception exception)
return 0;
}
+ bool IsOrchestrationComplete(OrchestrationStatus status)
+ {
+ return !(status.IsRunningOrPending() || status == OrchestrationStatus.ContinuedAsNew);
+ }
+
SessionInformation GetSessionInfo(string sessionId)
{
ServiceFabricProviderEventSource.Tracing.TraceMessage(sessionId, $"{nameof(GetSessionInfo)} - Getting session info");
diff --git a/src/DurableTask.AzureServiceFabric/FabricOrchestrationServiceClient.cs b/src/DurableTask.AzureServiceFabric/FabricOrchestrationServiceClient.cs
index 96495d606..1e466d6e9 100644
--- a/src/DurableTask.AzureServiceFabric/FabricOrchestrationServiceClient.cs
+++ b/src/DurableTask.AzureServiceFabric/FabricOrchestrationServiceClient.cs
@@ -177,8 +177,7 @@ public Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, Orches
public async Task WaitForOrchestrationAsync(string instanceId, string executionId, TimeSpan timeout, CancellationToken cancellationToken)
{
instanceId.EnsureValidInstanceId();
- var instance = new OrchestrationInstance() { InstanceId = instanceId, ExecutionId = executionId };
- var state = await this.instanceStore.WaitForOrchestrationAsync(instance, timeout);
+ var state = await this.instanceStore.WaitForOrchestrationAsync(instanceId, timeout);
return state?.State;
}
#endregion
diff --git a/src/DurableTask.AzureServiceFabric/IFabricOrchestrationServiceInstanceStore.cs b/src/DurableTask.AzureServiceFabric/IFabricOrchestrationServiceInstanceStore.cs
index 187e88b2c..16f859243 100644
--- a/src/DurableTask.AzureServiceFabric/IFabricOrchestrationServiceInstanceStore.cs
+++ b/src/DurableTask.AzureServiceFabric/IFabricOrchestrationServiceInstanceStore.cs
@@ -82,7 +82,7 @@ internal interface IFabricOrchestrationServiceInstanceStore
void OnOrchestrationCompleted(OrchestrationInstance instance);
- Task WaitForOrchestrationAsync(OrchestrationInstance instance, TimeSpan timeout);
+ Task WaitForOrchestrationAsync(string instanceId, TimeSpan timeout);
Task> GetExecutionIds(string instanceId);
}
diff --git a/src/DurableTask.AzureServiceFabric/README.md b/src/DurableTask.AzureServiceFabric/README.md
index a4a298472..229674327 100644
--- a/src/DurableTask.AzureServiceFabric/README.md
+++ b/src/DurableTask.AzureServiceFabric/README.md
@@ -20,7 +20,6 @@ However, with the Service fabric provider, the provider runs on the same cluster
## What is pending?
-* ContinueAsNew needs to be implemented.
* Providing support for Azure Storage instance store with Service Fabric based provider. (So that we can persist the state more permanantly)
* External events should work but not tested.
* Support for history event tracking.
diff --git a/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs b/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs
index 066ab7888..5e9c8eb47 100644
--- a/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs
+++ b/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs
@@ -280,7 +280,7 @@ private async Task ConstructEndpointUriAsync(string instanceId, string frag
cancellationToken.ThrowIfCancellationRequested();
string endpoint = await this.partitionProvider.GetPartitionEndPointAsync(instanceId, cancellationToken);
string defaultEndPoint = GetDefaultEndPoint(endpoint);
- return new Uri($"{defaultEndPoint}/{fragment}");
+ return new Uri($"{defaultEndPoint.TrimEnd('/')}/{fragment}");
}
private async Task> GetAllEndpointsAsync(CancellationToken cancellationToken)
diff --git a/src/DurableTask.AzureServiceFabric/Service/Startup.cs b/src/DurableTask.AzureServiceFabric/Service/Startup.cs
index 20ade0f5c..f1ea6279a 100644
--- a/src/DurableTask.AzureServiceFabric/Service/Startup.cs
+++ b/src/DurableTask.AzureServiceFabric/Service/Startup.cs
@@ -57,6 +57,8 @@ void IOwinAppBuilder.Startup(IAppBuilder appBuilder)
config.MapHttpAttributeRoutes();
config.DependencyResolver = new DefaultDependencyResolver(GenerateServiceProvider());
config.IncludeErrorDetailPolicy = IncludeErrorDetailPolicy.Always;
+ config.Formatters.Remove(config.Formatters.XmlFormatter);
+ config.Formatters.Remove(config.Formatters.FormUrlEncodedFormatter);
config.Formatters.JsonFormatter.SerializerSettings.TypeNameHandling = Newtonsoft.Json.TypeNameHandling.All;
appBuilder.UseWebApi(config);
}
diff --git a/src/DurableTask.AzureServiceFabric/TaskHelpers/CountBasedFixedDelayRetryPolicy.cs b/src/DurableTask.AzureServiceFabric/TaskHelpers/CountBasedFixedDelayRetryPolicy.cs
index f2d5d03da..1f562c03e 100644
--- a/src/DurableTask.AzureServiceFabric/TaskHelpers/CountBasedFixedDelayRetryPolicy.cs
+++ b/src/DurableTask.AzureServiceFabric/TaskHelpers/CountBasedFixedDelayRetryPolicy.cs
@@ -15,34 +15,28 @@ namespace DurableTask.AzureServiceFabric.TaskHelpers
{
using System;
- class CountBasedFixedDelayRetryPolicy : RetryPolicy
+ class CountBasedFixedDelayRetryPolicy : IRetryPolicy
{
- readonly int maxNumberOfAttempts;
readonly TimeSpan delay;
int pendingAttempts;
public CountBasedFixedDelayRetryPolicy(int maxNumberOfAttempts, TimeSpan delay)
{
- this.maxNumberOfAttempts = maxNumberOfAttempts;
this.delay = delay;
this.pendingAttempts = maxNumberOfAttempts;
}
- public override bool ShouldExecute()
+ public bool ShouldExecute()
{
return this.pendingAttempts-- > 0;
}
- public override TimeSpan GetNextDelay()
+ public TimeSpan GetNextDelay()
{
- if (this.pendingAttempts < 1)
- {
- return TimeSpan.Zero;
- }
- return this.delay;
+ return this.pendingAttempts < 1 ? TimeSpan.Zero : this.delay;
}
- public static RetryPolicy GetNewDefaultPolicy()
+ public static IRetryPolicy GetNewDefaultPolicy()
{
return new CountBasedFixedDelayRetryPolicy(3, TimeSpan.FromMilliseconds(100));
}
diff --git a/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryHelper.cs b/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryHelper.cs
index 94768d0e9..3441a921f 100644
--- a/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryHelper.cs
+++ b/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryHelper.cs
@@ -26,7 +26,7 @@ public static Task ExecuteWithRetryOnTransient(Func action, string uniqueA
return ExecuteWithRetryOnTransient(action, CountBasedFixedDelayRetryPolicy.GetNewDefaultPolicy(), uniqueActionIdentifier);
}
- static async Task ExecuteWithRetryOnTransient(Func action, RetryPolicy retryPolicy, string uniqueActionIdentifier)
+ static async Task ExecuteWithRetryOnTransient(Func action, IRetryPolicy retryPolicy, string uniqueActionIdentifier)
{
Exception lastException = null;
@@ -37,6 +37,7 @@ static async Task ExecuteWithRetryOnTransient(Func action, RetryPolicy ret
Stopwatch timer = Stopwatch.StartNew();
try
{
+ ServiceFabricProviderEventSource.Tracing.LogOrchestrationInformation(uniqueActionIdentifier, null, "Executing action");
await action();
timer.Stop();
ServiceFabricProviderEventSource.Tracing.LogMeasurement($"{uniqueActionIdentifier}, Attempt Number : {attemptNumber}, Result : Success", timer.ElapsedMilliseconds);
@@ -70,7 +71,7 @@ public static Task ExecuteWithRetryOnTransient(Func ExecuteWithRetryOnTransient(Func> action, RetryPolicy retryPolicy, string uniqueActionIdentifier)
+ static async Task ExecuteWithRetryOnTransient(Func> action, IRetryPolicy retryPolicy, string uniqueActionIdentifier)
{
Exception lastException = null;
diff --git a/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryPolicy.cs b/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryPolicy.cs
index afc4fc7bf..7e2ba8f7b 100644
--- a/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryPolicy.cs
+++ b/src/DurableTask.AzureServiceFabric/TaskHelpers/RetryPolicy.cs
@@ -15,10 +15,10 @@ namespace DurableTask.AzureServiceFabric.TaskHelpers
{
using System;
- abstract class RetryPolicy
+ interface IRetryPolicy
{
- public abstract bool ShouldExecute();
+ bool ShouldExecute();
- public abstract TimeSpan GetNextDelay();
+ TimeSpan GetNextDelay();
}
}
diff --git a/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs b/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs
index a83e1bbdf..8c73060a4 100644
--- a/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs
+++ b/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs
@@ -264,7 +264,7 @@ public override void OnEvent(OrchestrationContext context, string name, string i
}
}
- public sealed class ContinueAsNewThenTimerOrchestration : TaskOrchestration
+ public sealed class ContinueAsNewThenTimerOrchestration : TaskOrchestration
{
public override async Task RunTask(OrchestrationContext context, int input)
{