diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs
index 931b9174190..cd727075ffa 100644
--- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs
+++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs
@@ -1,8 +1,5 @@
using System;
-using System.Collections.Generic;
-using Microsoft.Extensions.Options;
-using Orleans.Runtime.Configuration;
namespace Orleans.Configuration
{
diff --git a/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs b/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs
index b7fe35ef7be..47eaa945992 100644
--- a/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs
+++ b/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs
@@ -10,40 +10,6 @@ namespace Orleans.Runtime.Configuration
///
public static class ConfigurationExtensions
{
- ///
- /// Adds a stream provider of type
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name.
- /// Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues.
- /// If set to true items transfered via the stream are always wrapped in Immutable for delivery.>
- /// Specifies how can grains subscribe to this stream.
- public static void AddSimpleMessageStreamProvider(this ClientConfiguration config, string providerName,
- bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY,
- bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA,
- StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE)
- {
- var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
- config.RegisterStreamProvider(providerName, properties);
- }
-
- ///
- /// Adds a stream provider of type
- ///
- /// The cluster configuration object to add provider to.
- /// The provider name.
- /// Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues.
- /// If set to true items transfered via the stream are always wrapped in Immutable for delivery.
- /// Specifies how can grains subscribe to this stream.
- public static void AddSimpleMessageStreamProvider(this ClusterConfiguration config, string providerName,
- bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY,
- bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA,
- StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE)
- {
- var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
- config.Globals.RegisterStreamProvider(providerName, properties);
- }
-
///
/// Configures all cluster nodes to use the specified startup class for dependency injection.
///
@@ -59,19 +25,5 @@ public static void UseStartupType(this ClusterConfiguration config)
config.Defaults.StartupTypeName = startupName;
}
-
- private static Dictionary GetSimpleMessageStreamProviderConfiguration(string providerName, bool fireAndForgetDelivery, bool optimizeForImmutableData, StreamPubSubType pubSubType)
- {
- if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException(nameof(providerName));
-
- var properties = new Dictionary
- {
- { SimpleMessageStreamProvider.FIRE_AND_FORGET_DELIVERY, fireAndForgetDelivery.ToString() },
- { SimpleMessageStreamProvider.OPTIMIZE_FOR_IMMUTABLE_DATA, optimizeForImmutableData.ToString() },
- { SimpleMessageStreamProvider.STREAM_PUBSUB_TYPE, pubSubType.ToString() },
- };
-
- return properties;
- }
}
}
\ No newline at end of file
diff --git a/src/Orleans.Core/Streams/ClientStreamExtensions.cs b/src/Orleans.Core/Streams/ClientStreamExtensions.cs
index 89793b9c112..16bc61b7c68 100644
--- a/src/Orleans.Core/Streams/ClientStreamExtensions.cs
+++ b/src/Orleans.Core/Streams/ClientStreamExtensions.cs
@@ -3,6 +3,7 @@
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
+using Orleans.Providers.Streams.SimpleMessageStream;
using Orleans.Runtime;
using Orleans.Streams;
@@ -49,5 +50,47 @@ public static IServiceCollection AddClusterClientPersistentStreams(thi
.AddSingletonNamedService>(name, (s, n) => ((PersistentStreamProvider)s.GetRequiredServiceByName(n)).ParticipateIn())
.AddSingletonNamedService(name, adapterFactory);
}
+
+ ///
+ /// Configure client to use SimpleMessageProvider
+ ///
+ public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name,
+ Action configureOptions)
+
+ {
+ return builder.ConfigureServices(services =>
+ services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions));
+ }
+
+ ///
+ /// Configure client to use SimpleMessageProvider
+ ///
+ public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name,
+ Action> configureOptions = null)
+
+ {
+ return builder.ConfigureServices(services =>
+ services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions));
+ }
+
+ ///
+ /// Configure client to use simple message stream provider
+ ///
+ public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name,
+ Action configureOptions = null)
+ {
+ return services.AddClusterClientSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure client to use simple message provider
+ ///
+ public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ configureOptions?.Invoke(services.AddOptions(name));
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddSingletonNamedService(name, SimpleMessageStreamProvider.Create);
+ }
}
}
diff --git a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs
index ec4b055f9ee..ea2c0ad8cd9 100644
--- a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs
+++ b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs
@@ -1,76 +1,49 @@
using System;
-using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using Orleans.Runtime;
using Orleans.Streams;
using Orleans.Streams.Core;
-using Microsoft.Extensions.Logging;
+using Orleans.Serialization;
+using Orleans.Configuration;
namespace Orleans.Providers.Streams.SimpleMessageStream
{
- using Orleans.Serialization;
-
- public class SimpleMessageStreamProvider : IStreamProvider, IProvider, IInternalStreamProvider, IStreamSubscriptionManagerRetriever
+ public class SimpleMessageStreamProvider : IInternalStreamProvider, IStreamProvider, IStreamSubscriptionManagerRetriever
{
public string Name { get; private set; }
private ILogger logger;
private IStreamProviderRuntime providerRuntime;
- private bool fireAndForgetDelivery;
- private bool optimizeForImmutableData;
- private StreamPubSubType pubSubType;
- private ProviderStateManager stateManager = new ProviderStateManager();
private IRuntimeClient runtimeClient;
private IStreamSubscriptionManager streamSubscriptionManager;
private ILoggerFactory loggerFactory;
private SerializationManager serializationManager;
-
- internal const string STREAM_PUBSUB_TYPE = "PubSubType";
- internal const string FIRE_AND_FORGET_DELIVERY = "FireAndForgetDelivery";
- internal const string OPTIMIZE_FOR_IMMUTABLE_DATA = "OptimizeForImmutableData";
- internal const StreamPubSubType DEFAULT_STREAM_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit;
- internal const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false;
- internal const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true;
+ private SimpleMessageStreamProviderOptions options;
public bool IsRewindable { get { return false; } }
- public Task Init(string name, IProviderRuntime providerUtilitiesManager, IProviderConfiguration config)
+ public SimpleMessageStreamProvider(string name, SimpleMessageStreamProviderOptions options,
+ ILoggerFactory loggerFactory, IProviderRuntime providerRuntime, SerializationManager serializationManager)
{
- if (!stateManager.PresetState(ProviderState.Initialized)) return Task.CompletedTask;
+ this.loggerFactory = loggerFactory;
this.Name = name;
- providerRuntime = (IStreamProviderRuntime) providerUtilitiesManager;
- this.runtimeClient = this.providerRuntime.ServiceProvider.GetRequiredService();
- this.serializationManager = this.providerRuntime.ServiceProvider.GetRequiredService();
- fireAndForgetDelivery = config.GetBoolProperty(FIRE_AND_FORGET_DELIVERY, DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY);
- optimizeForImmutableData = config.GetBoolProperty(OPTIMIZE_FOR_IMMUTABLE_DATA, DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA);
-
- string pubSubTypeString;
- pubSubType = !config.Properties.TryGetValue(STREAM_PUBSUB_TYPE, out pubSubTypeString)
- ? DEFAULT_STREAM_PUBSUB_TYPE
- : (StreamPubSubType)Enum.Parse(typeof(StreamPubSubType), pubSubTypeString);
- if (pubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit
- || pubSubType == StreamPubSubType.ExplicitGrainBasedOnly)
+ this.logger = loggerFactory.CreateLogger($"{this.GetType().FullName}.{name}");
+ this.options = options;
+ this.providerRuntime = providerRuntime as IStreamProviderRuntime;
+ this.runtimeClient = providerRuntime.ServiceProvider.GetService();
+ this.serializationManager = serializationManager;
+ if (this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit
+ || this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedOnly)
{
this.streamSubscriptionManager = this.providerRuntime.ServiceProvider
- .GetService().GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
+ .GetService()
+ .GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
}
- this.loggerFactory = providerRuntime.ServiceProvider.GetRequiredService();
- logger = loggerFactory.CreateLogger(this.GetType().FullName);
- logger.Info("Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " +
- "and PubSubType: {3}", Name, fireAndForgetDelivery, optimizeForImmutableData, pubSubType);
- stateManager.CommitState();
- return Task.CompletedTask;
- }
-
- public Task Start()
- {
- if (stateManager.PresetState(ProviderState.Started)) stateManager.CommitState();
- return Task.CompletedTask;
- }
-
- public Task Close()
- {
- if (stateManager.PresetState(ProviderState.Closed)) stateManager.CommitState();
- return Task.CompletedTask;
+ logger.Info(
+ "Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " +
+ "and PubSubType: {3}", Name, this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData,
+ this.options.PubSubType);
}
public IStreamSubscriptionManager GetStreamSubscriptionManager()
@@ -89,7 +62,7 @@ public IAsyncStream GetStream(Guid id, string streamNamespace)
IInternalAsyncBatchObserver IInternalStreamProvider.GetProducerInterface(IAsyncStream stream)
{
return new SimpleMessageStreamProducer((StreamImpl)stream, Name, providerRuntime,
- fireAndForgetDelivery, optimizeForImmutableData, providerRuntime.PubSub(pubSubType), IsRewindable,
+ this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData, providerRuntime.PubSub(this.options.PubSubType), IsRewindable,
this.serializationManager, this.loggerFactory);
}
@@ -101,7 +74,12 @@ IInternalAsyncObservable IInternalStreamProvider.GetConsumerInterface(IAsy
private IInternalAsyncObservable GetConsumerInterfaceImpl(IAsyncStream stream)
{
return new StreamConsumer((StreamImpl)stream, Name, providerRuntime,
- providerRuntime.PubSub(pubSubType), this.logger, IsRewindable);
+ providerRuntime.PubSub(this.options.PubSubType), this.logger, IsRewindable);
+ }
+
+ public static IStreamProvider Create(IServiceProvider services, string name)
+ {
+ return ActivatorUtilities.CreateInstance(services, name, services.GetService>().Get(name));
}
}
}
diff --git a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs
new file mode 100644
index 00000000000..81784eb0da2
--- /dev/null
+++ b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs
@@ -0,0 +1,17 @@
+
+using Orleans.Streams;
+
+namespace Orleans.Configuration
+{
+ public class SimpleMessageStreamProviderOptions
+ {
+ public bool FireAndForgetDelivery { get; set; } = DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY;
+ public const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false;
+
+ public bool OptimizeForImmutableData { get; set; } = DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA;
+ public const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true;
+
+ public StreamPubSubType PubSubType { get; set; } = DEFAULT_PUBSUB_TYPE;
+ public static StreamPubSubType DEFAULT_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit;
+ }
+}
diff --git a/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs b/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs
index 194a033eb55..02043d8c27a 100644
--- a/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs
+++ b/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs
@@ -6,6 +6,7 @@
using Orleans.Providers.Streams.Common;
using Orleans.Providers;
using Orleans.Configuration;
+using Orleans.Providers.Streams.SimpleMessageStream;
namespace Orleans.Hosting
{
@@ -51,5 +52,47 @@ public static IServiceCollection AddSiloPersistentStreams(this IServic
.AddSingletonNamedService(name, adapterFactory)
.AddSingletonNamedService(name, (s, n) => s.GetServiceByName(n) as IControllable);
}
+
+ ///
+ /// Configure silo to use SimpleMessageProvider
+ ///
+ public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name,
+ Action configureOptions)
+
+ {
+ return builder.ConfigureServices(services =>
+ services.AddSiloSimpleMessageStreamProvider(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use SimpleMessageProvider
+ ///
+ public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name,
+ Action> configureOptions = null)
+
+ {
+ return builder.ConfigureServices(services =>
+ services.AddSiloSimpleMessageStreamProvider(name, configureOptions));
+ }
+
+ ///
+ /// Configure silo to use simple message stream provider
+ ///
+ public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name,
+ Action configureOptions = null)
+ {
+ return services.AddSiloSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions));
+ }
+
+ ///
+ /// Configure silo to use simple message provider
+ ///
+ public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name,
+ Action> configureOptions = null)
+ {
+ configureOptions?.Invoke(services.AddOptions(name));
+ return services.ConfigureNamedOptionForLogging(name)
+ .AddSingletonNamedService(name, SimpleMessageStreamProvider.Create);
+ }
}
}
diff --git a/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs b/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs
index 0ee4a6105a7..840baa8b364 100644
--- a/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs
+++ b/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs
@@ -35,10 +35,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
//from the config files
options.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
- options.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);
-
- options.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);
-
//previous silo creation
options.ClusterConfiguration.Globals.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString;
options.ClientConfiguration.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString;
@@ -66,6 +62,7 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
+ .AddSimpleMessageStreamProvider("SMSProvider")
.AddSqsStreams("SQSProvider", options =>
{
options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString;
@@ -82,6 +79,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder
+ .AddSimpleMessageStreamProvider("SMSProvider")
.AddSqsStreams("SQSProvider", options =>
{
options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString;
diff --git a/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs b/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs
index b0397e7a41d..0ab8660e171 100644
--- a/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs
+++ b/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs
@@ -31,8 +31,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
//from the config files
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);
legacy.ClusterConfiguration.Globals.RegisterStorageProvider("PubSubStore");
});
@@ -45,6 +43,7 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
+ .AddSimpleMessageStreamProvider("SMSProvider")
.AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, options =>
{
options.ProjectId = GoogleTestUtils.ProjectId;
@@ -60,6 +59,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder
+ .AddSimpleMessageStreamProvider("SMSProvider")
.AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, options =>
{
options.ProjectId = GoogleTestUtils.ProjectId;
diff --git a/test/Tester/GrainCallFilterTests.cs b/test/Tester/GrainCallFilterTests.cs
index a2a682fd80b..dd5502b9e0f 100644
--- a/test/Tester/GrainCallFilterTests.cs
+++ b/test/Tester/GrainCallFilterTests.cs
@@ -32,9 +32,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("Default");
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider");
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider");
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider("SMSProvider");
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");
+ }
}
private class SiloInvokerTestSiloBuilderConfigurator : ISiloBuilderConfigurator
diff --git a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs
index d61740f130a..366b3a56fd7 100644
--- a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs
+++ b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs
@@ -3,6 +3,9 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime.Configuration;
using Orleans.Streams;
using Orleans.TestingHost;
@@ -23,15 +26,17 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("Default");
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName,
- false,
- true,
- StreamPubSubType.ExplicitGrainBasedAndImplicit);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName2,
- false,
- true,
- StreamPubSubType.ExplicitGrainBasedOnly);
});
+ builder.AddSiloBuilderConfigurator();
+ }
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName);
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName2, options => options.PubSubType = StreamPubSubType.ExplicitGrainBasedOnly);
}
}
diff --git a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs
index ceb38043672..6253a82f5f6 100644
--- a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs
+++ b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs
@@ -3,6 +3,9 @@
using Orleans.TestingHost;
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using TestExtensions;
using Xunit;
using UnitTests.Grains.ProgrammaticSubscribe;
@@ -22,18 +25,29 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("Default");
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName,
- false,
- true,
- StreamPubSubType.ImplicitOnly);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProviderName,
- false,
- true,
- StreamPubSubType.ImplicitOnly);
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
}
}
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName,options => options.PubSubType = StreamPubSubType.ImplicitOnly);
+ }
+ }
+
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamProviderName, options => options.PubSubType = StreamPubSubType.ImplicitOnly);
+ }
+ }
+
+
public ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains(Fixture fixture)
{
this.fixture = fixture;
diff --git a/test/Tester/StreamingTests/SMSClientStreamTests.cs b/test/Tester/StreamingTests/SMSClientStreamTests.cs
index 1831ca2591c..251c24e3d71 100644
--- a/test/Tester/StreamingTests/SMSClientStreamTests.cs
+++ b/test/Tester/StreamingTests/SMSClientStreamTests.cs
@@ -1,6 +1,9 @@
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.TestingHost;
@@ -30,11 +33,25 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SMSStreamProviderName);
legacy.ClusterConfiguration.Globals.ClientDropTimeout = TimeSpan.FromSeconds(5);
-
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SMSStreamProviderName);
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(SMSStreamProviderName);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(SMSStreamProviderName);
+ }
}
[Fact, TestCategory("SlowBVT"), TestCategory("Functional"), TestCategory("Streaming")]
diff --git a/test/Tester/StreamingTests/SMSDeactivationTests.cs b/test/Tester/StreamingTests/SMSDeactivationTests.cs
index f10b03845a3..faa6f82b315 100644
--- a/test/Tester/StreamingTests/SMSDeactivationTests.cs
+++ b/test/Tester/StreamingTests/SMSDeactivationTests.cs
@@ -1,6 +1,8 @@
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.TestingHost;
@@ -30,9 +32,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
legacy.ClusterConfiguration.Globals.ResponseTimeout = TimeSpan.FromMinutes(30);
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
+ }
}
[Fact, TestCategory("Functional"), TestCategory("Streaming")]
diff --git a/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs b/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs
index e4fa09f7023..7c7db478b71 100644
--- a/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs
+++ b/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs
@@ -1,5 +1,8 @@
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.TestingHost;
@@ -10,18 +13,33 @@ namespace UnitTests.StreamingTests
{
public class SMSSubscriptionMultiplicityTests : OrleansTestingBase, IClassFixture
{
+
public class Fixture : BaseTestClusterFixture
{
- public const string StreamProvider = StreamTestsConstants.SMS_STREAM_PROVIDER_NAME;
+ public const string StreamProvider = StreamTestsConstants.SMS_STREAM_PROVIDER_NAME;
protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider);
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
+ }
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
}
}
diff --git a/test/Tester/StreamingTests/SampleStreamingTests.cs b/test/Tester/StreamingTests/SampleStreamingTests.cs
index 89df65be3c0..2674e280de9 100644
--- a/test/Tester/StreamingTests/SampleStreamingTests.cs
+++ b/test/Tester/StreamingTests/SampleStreamingTests.cs
@@ -1,7 +1,10 @@
using System;
using System.Linq;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
+using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.Streams;
@@ -27,10 +30,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false);
});
+ builder.AddSiloBuilderConfigurator();
+ builder.AddClientBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
}
}
diff --git a/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs b/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs
index e1539ae04e5..d1dbc90ef18 100644
--- a/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs
+++ b/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs
@@ -2,6 +2,9 @@
using Orleans.Runtime.Configuration;
using Orleans.TestingHost;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using Xunit;
using TestExtensions;
using UnitTests.StreamingTests;
@@ -19,10 +22,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore");
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false);
});
+ builder.AddClientBuilderConfigurator();
+ builder.AddSiloBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamProvider);
+ }
}
}
diff --git a/test/TesterAzureUtils/Streaming/AQStreamingTests.cs b/test/TesterAzureUtils/Streaming/AQStreamingTests.cs
index b5c86f83068..75ad2e99347 100644
--- a/test/TesterAzureUtils/Streaming/AQStreamingTests.cs
+++ b/test/TesterAzureUtils/Streaming/AQStreamingTests.cs
@@ -30,9 +30,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider();
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
});
builder.AddSiloBuilderConfigurator();
builder.AddClientBuilderConfigurator();
@@ -43,6 +40,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
.AddAzureQueueStreams(AzureQueueStreamProviderName,
options =>
{
@@ -56,6 +54,7 @@ private class SiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
.AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) =>
{
options.ServiceId = silo.Value.ServiceId.ToString();
diff --git a/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs b/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs
index f5a02d94384..63d7447c08a 100644
--- a/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs
+++ b/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs
@@ -34,11 +34,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
- fireAndForgetDelivery: false,
- optimizeForImmutableData: false);
});
builder.AddSiloBuilderConfigurator();
}
@@ -54,6 +49,8 @@ public void Configure(ISiloHostBuilder hostBuilder)
options.ConnectionString = TestDefaultConfiguration.DataConnectionString;
options.DeleteStateOnClear = true;
}))
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
+ .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false)
.AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure>((options, silo) =>
{
options.ServiceId = silo.Value.ServiceId.ToString();
diff --git a/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs b/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs
index e662957fdea..8c44066aff5 100644
--- a/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs
+++ b/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs
@@ -39,13 +39,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
- fireAndForgetDelivery: false,
- optimizeForImmutableData: false);
-
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
});
builder.AddSiloBuilderConfigurator();
builder.AddClientBuilderConfigurator();
@@ -56,6 +49,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
.AddAzureQueueStreams(AzureQueueStreamProviderName,
options =>
{
@@ -69,6 +63,8 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
+ .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false)
.AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) =>
{
options.ServiceId = silo.Value.ServiceId.ToString();
diff --git a/test/TesterAzureUtils/Streaming/StreamLimitTests.cs b/test/TesterAzureUtils/Streaming/StreamLimitTests.cs
index 916b1bb006c..7830466109b 100644
--- a/test/TesterAzureUtils/Streaming/StreamLimitTests.cs
+++ b/test/TesterAzureUtils/Streaming/StreamLimitTests.cs
@@ -44,13 +44,7 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
-
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
- fireAndForgetDelivery: false,
- optimizeForImmutableData: false);
});
builder.AddSiloBuilderConfigurator();
}
@@ -60,6 +54,8 @@ private class SiloBuilderConfigurator : ISiloBuilderConfigurator
public void Configure(ISiloHostBuilder hostBuilder)
{
hostBuilder
+ .AddSimpleMessageStreamProvider(SmsStreamProviderName)
+ .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false)
.AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) =>
{
options.ServiceId = silo.Value.ServiceId.ToString();
diff --git a/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs b/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs
index cba92a18efe..19d1dc80b5b 100644
--- a/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs
+++ b/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs
@@ -53,10 +53,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false);
-
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false);
});
builder.AddSiloBuilderConfigurator();
@@ -75,7 +71,8 @@ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder
options =>
{
options.ConnectionString = TestDefaultConfiguration.DataConnectionString;
- });
+ })
+ .AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME);
}
}
@@ -94,6 +91,7 @@ public void Configure(ISiloHostBuilder hostBuilder)
options.ConnectionString = TestDefaultConfiguration.DataConnectionString;
options.DeleteStateOnClear = true;
}))
+ .AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME)
.AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure>((options, silo) =>
{
options.ServiceId = silo.Value.ServiceId.ToString();
diff --git a/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs b/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs
index b81ad697e19..d02c54fa095 100644
--- a/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs
+++ b/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs
@@ -6,11 +6,13 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
+using Orleans.TestingHost;
using TestExtensions;
using TestGrainInterfaces;
using Tests.GeoClusterTests;
using Xunit;
using Xunit.Abstractions;
+using Orleans.Hosting;
namespace UnitTests.GeoClusterTests
{
@@ -95,7 +97,13 @@ private Task StartClustersAndClients(params short[] silos)
{
return StartClustersAndClients(null, null, silos);
}
-
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider("SMSProvider");
+ }
+ }
private Task StartClustersAndClients(Action config_customizer, Action clientconfig_customizer, params short[] silos)
{
WriteLog("Creating clusters and clients...");
@@ -111,8 +119,6 @@ private Task StartClustersAndClients(Action config_customi
// configuration for cluster
Action addtracing = (ClusterConfiguration c) =>
{
- c.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false);
-
config_customizer?.Invoke(c);
};
// configuration for clients
@@ -127,7 +133,7 @@ private Task StartClustersAndClients(Action config_customi
var numsilos = silos[i];
var clustername = ClusterNames[i] = ((char)('A' + i)).ToString();
var c = Clients[i] = new ClientWrapper[numsilos];
- NewGeoCluster(globalserviceid, clustername, silos[i], addtracing);
+ NewGeoCluster(globalserviceid, clustername, silos[i], addtracing);
// create one client per silo
Parallel.For(0, numsilos, paralleloptions, (j) => c[j] = this.NewClient(clustername, j, ClientWrapper.Factory, ccc));
}
diff --git a/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs b/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs
index 6c8894b6883..0a1277dbea1 100644
--- a/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs
+++ b/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs
@@ -136,8 +136,12 @@ private static int GetProxyBase(int clusternumber)
#endregion
#region Cluster Creation
-
public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos, Action customizer = null)
+ {
+ NewGeoCluster(globalServiceId, clusterId, numSilos, customizer);
+ }
+ public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos, Action customizer = null)
+ where TSiloBuilderConfigurator : ISiloBuilderConfigurator, new()
{
Action extendedcustomizer = config =>
{
@@ -154,13 +158,17 @@ public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos
ChannelType = GlobalConfiguration.GossipChannelType.AzureTable,
ConnectionString = TestDefaultConfiguration.DataConnectionString
}};
-
customizer?.Invoke(config);
};
- NewCluster(clusterId, numSilos, extendedcustomizer);
+ NewCluster(clusterId, numSilos, extendedcustomizer);
+ }
+ private class NoOpSiloBuilderConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ }
}
-
private class TestSiloBuilderConfigurator : ISiloBuilderConfigurator
{
@@ -179,7 +187,14 @@ public void Configure(ISiloHostBuilder hostBuilder)
}
}
- public void NewCluster(string clusterId, short numSilos, Action customizer = null)
+ public void NewCluster(string clusterId, short numSilos,
+ Action customizer = null)
+ {
+ NewCluster(clusterId, numSilos, customizer);
+ }
+
+ public void NewCluster(string clusterId, short numSilos, Action customizer = null)
+ where TSiloBuilderConfigurator : ISiloBuilderConfigurator, new()
{
TestCluster testCluster;
lock (Clusters)
@@ -198,6 +213,7 @@ public void NewCluster(string clusterId, short numSilos, Action();
+ builder.AddSiloBuilderConfigurator();
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("Default");
diff --git a/test/TesterInternal/StreamProvidersTests.cs b/test/TesterInternal/StreamProvidersTests.cs
index 301a4f49933..8f32371fb66 100644
--- a/test/TesterInternal/StreamProvidersTests.cs
+++ b/test/TesterInternal/StreamProvidersTests.cs
@@ -11,6 +11,8 @@
using Xunit;
using Xunit.Abstractions;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans.Hosting;
using Orleans.Runtime.TestHooks;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
@@ -112,12 +114,17 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
builder.ConfigureLegacyConfiguration(legacy =>
{
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
-
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
- fireAndForgetDelivery: false,
- optimizeForImmutableData: false);
});
+ builder.AddSiloBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME)
+ .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false);
+ }
}
}
diff --git a/test/TesterInternal/StreamingTests/SMSStreamingTests.cs b/test/TesterInternal/StreamingTests/SMSStreamingTests.cs
index 7fca0cf6939..160aefc433c 100644
--- a/test/TesterInternal/StreamingTests/SMSStreamingTests.cs
+++ b/test/TesterInternal/StreamingTests/SMSStreamingTests.cs
@@ -1,5 +1,8 @@
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
+using Orleans;
+using Orleans.Hosting;
using Orleans.Providers;
using Orleans.Providers.Streams.SimpleMessageStream;
using Orleans.Runtime.Configuration;
@@ -16,7 +19,7 @@ public class Fixture : BaseTestClusterFixture
private static readonly Guid ServiceId = Guid.NewGuid();
public const string AzureQueueStreamProviderName = StreamTestsConstants.AZURE_QUEUE_STREAM_PROVIDER_NAME;
public const string SmsStreamProviderName = "SMSProvider";
-
+ public const bool SMSFireAndForgetOnSilo = false;
public ClusterConfiguration ClusterConfiguration { get; set; }
protected override void ConfigureTestCluster(TestClusterBuilder builder)
@@ -28,16 +31,32 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore");
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
- fireAndForgetDelivery: false,
- optimizeForImmutableData: false);
-
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false);
-
legacy.ClusterConfiguration.Globals.ServiceId = ServiceId;
this.ClusterConfiguration = legacy.ClusterConfiguration;
});
+ builder.AddSiloBuilderConfigurator();
+ builder.AddClientBuilderConfigurator();
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(SmsStreamProviderName)
+ .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData",
+ options =>
+ {
+ options.OptimizeForImmutableData = false;
+ options.FireAndForgetDelivery = SMSFireAndForgetOnSilo;
+ });
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(SmsStreamProviderName);
+ }
}
}
@@ -48,7 +67,7 @@ public SMSStreamingTests(Fixture fixture)
{
runner = new SingleStreamTestRunner(fixture.HostedCluster.InternalClient, SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME);
// runner = new SingleStreamTestRunner(SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME, 0, false);
- fireAndForgetDeliveryProperty = ExtractFireAndForgetDeliveryProperty(fixture);
+ fireAndForgetDeliveryProperty = Fixture.SMSFireAndForgetOnSilo;
}
#region Simple Message Stream Tests
@@ -144,27 +163,6 @@ public async Task SMS_14_SameGrain_ProducerFirstConsumerLater()
await runner.StreamTest_14_SameGrain_ProducerFirstConsumerLater(!fireAndForgetDeliveryProperty);
}
- private bool ExtractFireAndForgetDeliveryProperty(Fixture fixture)
- {
- ProviderCategoryConfiguration providerConfigs;
- if (fixture.ClusterConfiguration.Globals.ProviderConfigurations.TryGetValue(ProviderCategoryConfiguration.STREAM_PROVIDER_CATEGORY_NAME,
- out providerConfigs))
- {
- IProviderConfiguration provider;
- if (providerConfigs.Providers.TryGetValue(SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME, out provider))
- {
- string fireAndForgetProperty = null;
- bool fireAndForget = false;
- if (provider.Properties.TryGetValue(SimpleMessageStreamProvider.FIRE_AND_FORGET_DELIVERY, out fireAndForgetProperty))
- {
- fireAndForget = Boolean.Parse(fireAndForgetProperty);
- }
- return fireAndForget;
- }
- }
- throw new Exception("failed to get fire and forget");
- }
-
//----------------------------------------------//
[Fact, TestCategory("Functional"), TestCategory("Streaming")]
diff --git a/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs b/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs
index 727e5b188c1..a9ffb08d720 100644
--- a/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs
+++ b/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs
@@ -1,6 +1,8 @@
using System;
using System.Threading.Tasks;
+using Microsoft.Extensions.Configuration;
using Orleans;
+using Orleans.Hosting;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.TestingHost;
@@ -24,17 +26,30 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder)
legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1);
legacy.ClusterConfiguration.Globals.RegisterStorageProvider(PubSubStoreProviderName);
- legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false);
-
legacy.ClusterConfiguration.Globals.MaxResendCount = 0;
legacy.ClusterConfiguration.Globals.ResponseTimeout = TimeSpan.FromSeconds(30);
- legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false);
-
legacy.ClientConfiguration.ClientSenderBuckets = 8192;
legacy.ClientConfiguration.ResponseTimeout = TimeSpan.FromSeconds(30);
legacy.ClientConfiguration.MaxResendCount = 0;
});
+ builder.AddSiloBuilderConfigurator();
+ builder.AddClientBuilderConfigurator();
+ }
+ }
+
+ public class SiloConfigurator : ISiloBuilderConfigurator
+ {
+ public void Configure(ISiloHostBuilder hostBuilder)
+ {
+ hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
+ }
+ }
+ public class ClientConfiguretor : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME);
}
}