From ec3ab17e31662b81cf03777d3e697e272421e3ab Mon Sep 17 00:00:00 2001 From: xiazen Date: Thu, 15 Feb 2018 16:32:23 -0800 Subject: [PATCH] Sms stream provider ported to use options and lifecycle. --- .../Streams/EventHub/EventHubStreamOptions.cs | 3 - .../Configuration/ConfigurationExtensions.cs | 48 ----------- .../Streams/ClientStreamExtensions.cs | 43 ++++++++++ .../SimpleMessageStreamProvider.cs | 80 +++++++------------ .../SimpleMessageStreamProviderOptions.cs | 17 ++++ .../Hosting/StreamHostingExtensions.cs | 43 ++++++++++ .../Streaming/SQSStreamTests.cs | 6 +- .../Streaming/PubSubStreamTests.cs | 4 +- test/Tester/GrainCallFilterTests.cs | 19 ++++- ...grammaticSubscribeTestSMSStreamProvider.cs | 21 +++-- ...scribeTestsWithImplicitSubscrbingGrains.cs | 30 +++++-- .../StreamingTests/SMSClientStreamTests.cs | 23 +++++- .../StreamingTests/SMSDeactivationTests.cs | 21 ++++- .../SMSSubscriptionMultiplicityTests.cs | 24 +++++- .../StreamingTests/SampleStreamingTests.cs | 23 +++++- .../StreamFilteringTests_SMS.cs | 23 +++++- .../Streaming/AQStreamingTests.cs | 5 +- .../Streaming/HaloStreamSubscribeTests.cs | 7 +- .../Streaming/StreamLifecycleTests.cs | 10 +-- .../Streaming/StreamLimitTests.cs | 8 +- .../Streaming/StreamReliabilityTests.cs | 8 +- .../MultiClusterRegistrationTests.cs | 14 +++- .../GeoClusterTests/TestingClusterHost.cs | 26 ++++-- test/TesterInternal/StreamProvidersTests.cs | 17 ++-- .../StreamingTests/SMSStreamingTests.cs | 58 +++++++------- .../StreamPubSubReliabilityTests.cs | 23 +++++- 26 files changed, 390 insertions(+), 214 deletions(-) create mode 100644 src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs index 931b9174190..cd727075ffa 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs @@ -1,8 +1,5 @@  using System; -using System.Collections.Generic; -using Microsoft.Extensions.Options; -using Orleans.Runtime.Configuration; namespace Orleans.Configuration { diff --git a/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs b/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs index b7fe35ef7be..47eaa945992 100644 --- a/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs +++ b/src/Orleans.Core.Legacy/Configuration/ConfigurationExtensions.cs @@ -10,40 +10,6 @@ namespace Orleans.Runtime.Configuration /// public static class ConfigurationExtensions { - /// - /// Adds a stream provider of type - /// - /// The cluster configuration object to add provider to. - /// The provider name. - /// Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues. - /// If set to true items transfered via the stream are always wrapped in Immutable for delivery.> - /// Specifies how can grains subscribe to this stream. - public static void AddSimpleMessageStreamProvider(this ClientConfiguration config, string providerName, - bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY, - bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA, - StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE) - { - var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType); - config.RegisterStreamProvider(providerName, properties); - } - - /// - /// Adds a stream provider of type - /// - /// The cluster configuration object to add provider to. - /// The provider name. - /// Specifies whether the producer waits for the consumer to process the event before continuing. Setting this to false is useful for troubleshooting serialization issues. - /// If set to true items transfered via the stream are always wrapped in Immutable for delivery. - /// Specifies how can grains subscribe to this stream. - public static void AddSimpleMessageStreamProvider(this ClusterConfiguration config, string providerName, - bool fireAndForgetDelivery = SimpleMessageStreamProvider.DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY, - bool optimizeForImmutableData = SimpleMessageStreamProvider.DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA, - StreamPubSubType pubSubType = SimpleMessageStreamProvider.DEFAULT_STREAM_PUBSUB_TYPE) - { - var properties = GetSimpleMessageStreamProviderConfiguration(providerName, fireAndForgetDelivery, optimizeForImmutableData, pubSubType); - config.Globals.RegisterStreamProvider(providerName, properties); - } - /// /// Configures all cluster nodes to use the specified startup class for dependency injection. /// @@ -59,19 +25,5 @@ public static void UseStartupType(this ClusterConfiguration config) config.Defaults.StartupTypeName = startupName; } - - private static Dictionary GetSimpleMessageStreamProviderConfiguration(string providerName, bool fireAndForgetDelivery, bool optimizeForImmutableData, StreamPubSubType pubSubType) - { - if (string.IsNullOrWhiteSpace(providerName)) throw new ArgumentNullException(nameof(providerName)); - - var properties = new Dictionary - { - { SimpleMessageStreamProvider.FIRE_AND_FORGET_DELIVERY, fireAndForgetDelivery.ToString() }, - { SimpleMessageStreamProvider.OPTIMIZE_FOR_IMMUTABLE_DATA, optimizeForImmutableData.ToString() }, - { SimpleMessageStreamProvider.STREAM_PUBSUB_TYPE, pubSubType.ToString() }, - }; - - return properties; - } } } \ No newline at end of file diff --git a/src/Orleans.Core/Streams/ClientStreamExtensions.cs b/src/Orleans.Core/Streams/ClientStreamExtensions.cs index 89793b9c112..16bc61b7c68 100644 --- a/src/Orleans.Core/Streams/ClientStreamExtensions.cs +++ b/src/Orleans.Core/Streams/ClientStreamExtensions.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection; using Orleans.Configuration; using Orleans.Providers.Streams.Common; +using Orleans.Providers.Streams.SimpleMessageStream; using Orleans.Runtime; using Orleans.Streams; @@ -49,5 +50,47 @@ public static IServiceCollection AddClusterClientPersistentStreams(thi .AddSingletonNamedService>(name, (s, n) => ((PersistentStreamProvider)s.GetRequiredServiceByName(n)).ParticipateIn()) .AddSingletonNamedService(name, adapterFactory); } + + /// + /// Configure client to use SimpleMessageProvider + /// + public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name, + Action configureOptions) + + { + return builder.ConfigureServices(services => + services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions)); + } + + /// + /// Configure client to use SimpleMessageProvider + /// + public static IClientBuilder AddSimpleMessageStreamProvider(this IClientBuilder builder, string name, + Action> configureOptions = null) + + { + return builder.ConfigureServices(services => + services.AddClusterClientSimpleMessageStreamProvider(name, configureOptions)); + } + + /// + /// Configure client to use simple message stream provider + /// + public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name, + Action configureOptions = null) + { + return services.AddClusterClientSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions)); + } + + /// + /// Configure client to use simple message provider + /// + public static IServiceCollection AddClusterClientSimpleMessageStreamProvider(this IServiceCollection services, string name, + Action> configureOptions = null) + { + configureOptions?.Invoke(services.AddOptions(name)); + return services.ConfigureNamedOptionForLogging(name) + .AddSingletonNamedService(name, SimpleMessageStreamProvider.Create); + } } } diff --git a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs index ec4b055f9ee..ea2c0ad8cd9 100644 --- a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs +++ b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProvider.cs @@ -1,76 +1,49 @@ using System; -using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Orleans.Runtime; using Orleans.Streams; using Orleans.Streams.Core; -using Microsoft.Extensions.Logging; +using Orleans.Serialization; +using Orleans.Configuration; namespace Orleans.Providers.Streams.SimpleMessageStream { - using Orleans.Serialization; - - public class SimpleMessageStreamProvider : IStreamProvider, IProvider, IInternalStreamProvider, IStreamSubscriptionManagerRetriever + public class SimpleMessageStreamProvider : IInternalStreamProvider, IStreamProvider, IStreamSubscriptionManagerRetriever { public string Name { get; private set; } private ILogger logger; private IStreamProviderRuntime providerRuntime; - private bool fireAndForgetDelivery; - private bool optimizeForImmutableData; - private StreamPubSubType pubSubType; - private ProviderStateManager stateManager = new ProviderStateManager(); private IRuntimeClient runtimeClient; private IStreamSubscriptionManager streamSubscriptionManager; private ILoggerFactory loggerFactory; private SerializationManager serializationManager; - - internal const string STREAM_PUBSUB_TYPE = "PubSubType"; - internal const string FIRE_AND_FORGET_DELIVERY = "FireAndForgetDelivery"; - internal const string OPTIMIZE_FOR_IMMUTABLE_DATA = "OptimizeForImmutableData"; - internal const StreamPubSubType DEFAULT_STREAM_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit; - internal const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false; - internal const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true; + private SimpleMessageStreamProviderOptions options; public bool IsRewindable { get { return false; } } - public Task Init(string name, IProviderRuntime providerUtilitiesManager, IProviderConfiguration config) + public SimpleMessageStreamProvider(string name, SimpleMessageStreamProviderOptions options, + ILoggerFactory loggerFactory, IProviderRuntime providerRuntime, SerializationManager serializationManager) { - if (!stateManager.PresetState(ProviderState.Initialized)) return Task.CompletedTask; + this.loggerFactory = loggerFactory; this.Name = name; - providerRuntime = (IStreamProviderRuntime) providerUtilitiesManager; - this.runtimeClient = this.providerRuntime.ServiceProvider.GetRequiredService(); - this.serializationManager = this.providerRuntime.ServiceProvider.GetRequiredService(); - fireAndForgetDelivery = config.GetBoolProperty(FIRE_AND_FORGET_DELIVERY, DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY); - optimizeForImmutableData = config.GetBoolProperty(OPTIMIZE_FOR_IMMUTABLE_DATA, DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA); - - string pubSubTypeString; - pubSubType = !config.Properties.TryGetValue(STREAM_PUBSUB_TYPE, out pubSubTypeString) - ? DEFAULT_STREAM_PUBSUB_TYPE - : (StreamPubSubType)Enum.Parse(typeof(StreamPubSubType), pubSubTypeString); - if (pubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit - || pubSubType == StreamPubSubType.ExplicitGrainBasedOnly) + this.logger = loggerFactory.CreateLogger($"{this.GetType().FullName}.{name}"); + this.options = options; + this.providerRuntime = providerRuntime as IStreamProviderRuntime; + this.runtimeClient = providerRuntime.ServiceProvider.GetService(); + this.serializationManager = serializationManager; + if (this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit + || this.options.PubSubType == StreamPubSubType.ExplicitGrainBasedOnly) { this.streamSubscriptionManager = this.providerRuntime.ServiceProvider - .GetService().GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly); + .GetService() + .GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly); } - this.loggerFactory = providerRuntime.ServiceProvider.GetRequiredService(); - logger = loggerFactory.CreateLogger(this.GetType().FullName); - logger.Info("Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " + - "and PubSubType: {3}", Name, fireAndForgetDelivery, optimizeForImmutableData, pubSubType); - stateManager.CommitState(); - return Task.CompletedTask; - } - - public Task Start() - { - if (stateManager.PresetState(ProviderState.Started)) stateManager.CommitState(); - return Task.CompletedTask; - } - - public Task Close() - { - if (stateManager.PresetState(ProviderState.Closed)) stateManager.CommitState(); - return Task.CompletedTask; + logger.Info( + "Initialized SimpleMessageStreamProvider with name {0} and with property FireAndForgetDelivery: {1}, OptimizeForImmutableData: {2} " + + "and PubSubType: {3}", Name, this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData, + this.options.PubSubType); } public IStreamSubscriptionManager GetStreamSubscriptionManager() @@ -89,7 +62,7 @@ public IAsyncStream GetStream(Guid id, string streamNamespace) IInternalAsyncBatchObserver IInternalStreamProvider.GetProducerInterface(IAsyncStream stream) { return new SimpleMessageStreamProducer((StreamImpl)stream, Name, providerRuntime, - fireAndForgetDelivery, optimizeForImmutableData, providerRuntime.PubSub(pubSubType), IsRewindable, + this.options.FireAndForgetDelivery, this.options.OptimizeForImmutableData, providerRuntime.PubSub(this.options.PubSubType), IsRewindable, this.serializationManager, this.loggerFactory); } @@ -101,7 +74,12 @@ IInternalAsyncObservable IInternalStreamProvider.GetConsumerInterface(IAsy private IInternalAsyncObservable GetConsumerInterfaceImpl(IAsyncStream stream) { return new StreamConsumer((StreamImpl)stream, Name, providerRuntime, - providerRuntime.PubSub(pubSubType), this.logger, IsRewindable); + providerRuntime.PubSub(this.options.PubSubType), this.logger, IsRewindable); + } + + public static IStreamProvider Create(IServiceProvider services, string name) + { + return ActivatorUtilities.CreateInstance(services, name, services.GetService>().Get(name)); } } } diff --git a/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs new file mode 100644 index 00000000000..81784eb0da2 --- /dev/null +++ b/src/Orleans.Core/Streams/SimpleMessageStream/SimpleMessageStreamProviderOptions.cs @@ -0,0 +1,17 @@ + +using Orleans.Streams; + +namespace Orleans.Configuration +{ + public class SimpleMessageStreamProviderOptions + { + public bool FireAndForgetDelivery { get; set; } = DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY; + public const bool DEFAULT_VALUE_FIRE_AND_FORGET_DELIVERY = false; + + public bool OptimizeForImmutableData { get; set; } = DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA; + public const bool DEFAULT_VALUE_OPTIMIZE_FOR_IMMUTABLE_DATA = true; + + public StreamPubSubType PubSubType { get; set; } = DEFAULT_PUBSUB_TYPE; + public static StreamPubSubType DEFAULT_PUBSUB_TYPE = StreamPubSubType.ExplicitGrainBasedAndImplicit; + } +} diff --git a/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs b/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs index 194a033eb55..02043d8c27a 100644 --- a/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs +++ b/src/Orleans.Runtime.Abstractions/Hosting/StreamHostingExtensions.cs @@ -6,6 +6,7 @@ using Orleans.Providers.Streams.Common; using Orleans.Providers; using Orleans.Configuration; +using Orleans.Providers.Streams.SimpleMessageStream; namespace Orleans.Hosting { @@ -51,5 +52,47 @@ public static IServiceCollection AddSiloPersistentStreams(this IServic .AddSingletonNamedService(name, adapterFactory) .AddSingletonNamedService(name, (s, n) => s.GetServiceByName(n) as IControllable); } + + /// + /// Configure silo to use SimpleMessageProvider + /// + public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name, + Action configureOptions) + + { + return builder.ConfigureServices(services => + services.AddSiloSimpleMessageStreamProvider(name, configureOptions)); + } + + /// + /// Configure silo to use SimpleMessageProvider + /// + public static ISiloHostBuilder AddSimpleMessageStreamProvider(this ISiloHostBuilder builder, string name, + Action> configureOptions = null) + + { + return builder.ConfigureServices(services => + services.AddSiloSimpleMessageStreamProvider(name, configureOptions)); + } + + /// + /// Configure silo to use simple message stream provider + /// + public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name, + Action configureOptions = null) + { + return services.AddSiloSimpleMessageStreamProvider(name, ob => ob.Configure(configureOptions)); + } + + /// + /// Configure silo to use simple message provider + /// + public static IServiceCollection AddSiloSimpleMessageStreamProvider(this IServiceCollection services, string name, + Action> configureOptions = null) + { + configureOptions?.Invoke(services.AddOptions(name)); + return services.ConfigureNamedOptionForLogging(name) + .AddSingletonNamedService(name, SimpleMessageStreamProvider.Create); + } } } diff --git a/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs b/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs index 0ee4a6105a7..840baa8b364 100644 --- a/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs +++ b/test/AWSUtils.Tests/Streaming/SQSStreamTests.cs @@ -35,10 +35,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) //from the config files options.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - options.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false); - - options.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false); - //previous silo creation options.ClusterConfiguration.Globals.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString; options.ClientConfiguration.DataConnectionString = AWSTestConstants.DefaultSQSConnectionString; @@ -66,6 +62,7 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator public void Configure(ISiloHostBuilder hostBuilder) { hostBuilder + .AddSimpleMessageStreamProvider("SMSProvider") .AddSqsStreams("SQSProvider", options => { options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString; @@ -82,6 +79,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) { clientBuilder + .AddSimpleMessageStreamProvider("SMSProvider") .AddSqsStreams("SQSProvider", options => { options.ConnectionString = AWSTestConstants.DefaultSQSConnectionString; diff --git a/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs b/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs index b0397e7a41d..0ab8660e171 100644 --- a/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs +++ b/test/GoogleUtils.Tests/Streaming/PubSubStreamTests.cs @@ -31,8 +31,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { //from the config files legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false); legacy.ClusterConfiguration.Globals.RegisterStorageProvider("PubSubStore"); }); @@ -45,6 +43,7 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator public void Configure(ISiloHostBuilder hostBuilder) { hostBuilder + .AddSimpleMessageStreamProvider("SMSProvider") .AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, options => { options.ProjectId = GoogleTestUtils.ProjectId; @@ -60,6 +59,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) { clientBuilder + .AddSimpleMessageStreamProvider("SMSProvider") .AddPubSubStreams(PUBSUB_STREAM_PROVIDER_NAME, options => { options.ProjectId = GoogleTestUtils.ProjectId; diff --git a/test/Tester/GrainCallFilterTests.cs b/test/Tester/GrainCallFilterTests.cs index a2a682fd80b..dd5502b9e0f 100644 --- a/test/Tester/GrainCallFilterTests.cs +++ b/test/Tester/GrainCallFilterTests.cs @@ -32,9 +32,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { legacy.ClusterConfiguration.AddMemoryStorageProvider("Default"); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProvider"); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider("SMSProvider"); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider("SMSProvider"); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider("SMSProvider"); + } } private class SiloInvokerTestSiloBuilderConfigurator : ISiloBuilderConfigurator diff --git a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs index d61740f130a..366b3a56fd7 100644 --- a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs +++ b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestSMSStreamProvider.cs @@ -3,6 +3,9 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using Orleans.Runtime.Configuration; using Orleans.Streams; using Orleans.TestingHost; @@ -23,15 +26,17 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { legacy.ClusterConfiguration.AddMemoryStorageProvider("Default"); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName, - false, - true, - StreamPubSubType.ExplicitGrainBasedAndImplicit); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName2, - false, - true, - StreamPubSubType.ExplicitGrainBasedOnly); }); + builder.AddSiloBuilderConfigurator(); + } + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName); + hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName2, options => options.PubSubType = StreamPubSubType.ExplicitGrainBasedOnly); } } diff --git a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs index ceb38043672..6253a82f5f6 100644 --- a/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs +++ b/test/Tester/StreamingTests/ProgrammaticSubscribeTests/ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains.cs @@ -3,6 +3,9 @@ using Orleans.TestingHost; using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using TestExtensions; using Xunit; using UnitTests.Grains.ProgrammaticSubscribe; @@ -22,18 +25,29 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { legacy.ClusterConfiguration.AddMemoryStorageProvider("Default"); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProviderName, - false, - true, - StreamPubSubType.ImplicitOnly); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProviderName, - false, - true, - StreamPubSubType.ImplicitOnly); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); } } + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamProviderName,options => options.PubSubType = StreamPubSubType.ImplicitOnly); + } + } + + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamProviderName, options => options.PubSubType = StreamPubSubType.ImplicitOnly); + } + } + + public ProgrammaticSubscribeTestsWithImplicitSubscrbingGrains(Fixture fixture) { this.fixture = fixture; diff --git a/test/Tester/StreamingTests/SMSClientStreamTests.cs b/test/Tester/StreamingTests/SMSClientStreamTests.cs index 1831ca2591c..251c24e3d71 100644 --- a/test/Tester/StreamingTests/SMSClientStreamTests.cs +++ b/test/Tester/StreamingTests/SMSClientStreamTests.cs @@ -1,6 +1,9 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.Configuration; using Orleans.TestingHost; @@ -30,11 +33,25 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SMSStreamProviderName); legacy.ClusterConfiguration.Globals.ClientDropTimeout = TimeSpan.FromSeconds(5); - - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SMSStreamProviderName); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(SMSStreamProviderName); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(SMSStreamProviderName); + } } [Fact, TestCategory("SlowBVT"), TestCategory("Functional"), TestCategory("Streaming")] diff --git a/test/Tester/StreamingTests/SMSDeactivationTests.cs b/test/Tester/StreamingTests/SMSDeactivationTests.cs index f10b03845a3..faa6f82b315 100644 --- a/test/Tester/StreamingTests/SMSDeactivationTests.cs +++ b/test/Tester/StreamingTests/SMSDeactivationTests.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; using Orleans; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.Configuration; using Orleans.TestingHost; @@ -30,9 +32,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) legacy.ClusterConfiguration.Globals.ResponseTimeout = TimeSpan.FromMinutes(30); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); + } } [Fact, TestCategory("Functional"), TestCategory("Streaming")] diff --git a/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs b/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs index e4fa09f7023..7c7db478b71 100644 --- a/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs +++ b/test/Tester/StreamingTests/SMSSubscriptionMultiplicityTests.cs @@ -1,5 +1,8 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.Configuration; using Orleans.TestingHost; @@ -10,18 +13,33 @@ namespace UnitTests.StreamingTests { public class SMSSubscriptionMultiplicityTests : OrleansTestingBase, IClassFixture { + public class Fixture : BaseTestClusterFixture { - public const string StreamProvider = StreamTestsConstants.SMS_STREAM_PROVIDER_NAME; + public const string StreamProvider = StreamTestsConstants.SMS_STREAM_PROVIDER_NAME; protected override void ConfigureTestCluster(TestClusterBuilder builder) { builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } } } diff --git a/test/Tester/StreamingTests/SampleStreamingTests.cs b/test/Tester/StreamingTests/SampleStreamingTests.cs index 89df65be3c0..2674e280de9 100644 --- a/test/Tester/StreamingTests/SampleStreamingTests.cs +++ b/test/Tester/StreamingTests/SampleStreamingTests.cs @@ -1,7 +1,10 @@ using System; using System.Linq; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Orleans; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.Configuration; using Orleans.Streams; @@ -27,10 +30,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false); }); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } } } diff --git a/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs b/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs index e1539ae04e5..d1dbc90ef18 100644 --- a/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs +++ b/test/Tester/StreamingTests/StreamFilteringTests_SMS.cs @@ -2,6 +2,9 @@ using Orleans.Runtime.Configuration; using Orleans.TestingHost; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using Xunit; using TestExtensions; using UnitTests.StreamingTests; @@ -19,10 +22,24 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) { legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore"); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamProvider, false); }); + builder.AddClientBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamProvider); + } } } diff --git a/test/TesterAzureUtils/Streaming/AQStreamingTests.cs b/test/TesterAzureUtils/Streaming/AQStreamingTests.cs index b5c86f83068..75ad2e99347 100644 --- a/test/TesterAzureUtils/Streaming/AQStreamingTests.cs +++ b/test/TesterAzureUtils/Streaming/AQStreamingTests.cs @@ -30,9 +30,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider(); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); }); builder.AddSiloBuilderConfigurator(); builder.AddClientBuilderConfigurator(); @@ -43,6 +40,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) { clientBuilder + .AddSimpleMessageStreamProvider(SmsStreamProviderName) .AddAzureQueueStreams(AzureQueueStreamProviderName, options => { @@ -56,6 +54,7 @@ private class SiloBuilderConfigurator : ISiloBuilderConfigurator public void Configure(ISiloHostBuilder hostBuilder) { hostBuilder + .AddSimpleMessageStreamProvider(SmsStreamProviderName) .AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) => { options.ServiceId = silo.Value.ServiceId.ToString(); diff --git a/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs b/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs index f5a02d94384..63d7447c08a 100644 --- a/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs +++ b/test/TesterAzureUtils/Streaming/HaloStreamSubscribeTests.cs @@ -34,11 +34,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", - fireAndForgetDelivery: false, - optimizeForImmutableData: false); }); builder.AddSiloBuilderConfigurator(); } @@ -54,6 +49,8 @@ public void Configure(ISiloHostBuilder hostBuilder) options.ConnectionString = TestDefaultConfiguration.DataConnectionString; options.DeleteStateOnClear = true; })) + .AddSimpleMessageStreamProvider(SmsStreamProviderName) + .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false) .AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure>((options, silo) => { options.ServiceId = silo.Value.ServiceId.ToString(); diff --git a/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs b/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs index e662957fdea..8c44066aff5 100644 --- a/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs +++ b/test/TesterAzureUtils/Streaming/StreamLifecycleTests.cs @@ -39,13 +39,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", - fireAndForgetDelivery: false, - optimizeForImmutableData: false); - - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); }); builder.AddSiloBuilderConfigurator(); builder.AddClientBuilderConfigurator(); @@ -56,6 +49,7 @@ private class MyClientBuilderConfigurator : IClientBuilderConfigurator public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) { clientBuilder + .AddSimpleMessageStreamProvider(SmsStreamProviderName) .AddAzureQueueStreams(AzureQueueStreamProviderName, options => { @@ -69,6 +63,8 @@ private class MySiloBuilderConfigurator : ISiloBuilderConfigurator public void Configure(ISiloHostBuilder hostBuilder) { hostBuilder + .AddSimpleMessageStreamProvider(SmsStreamProviderName) + .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false) .AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) => { options.ServiceId = silo.Value.ServiceId.ToString(); diff --git a/test/TesterAzureUtils/Streaming/StreamLimitTests.cs b/test/TesterAzureUtils/Streaming/StreamLimitTests.cs index 916b1bb006c..7830466109b 100644 --- a/test/TesterAzureUtils/Streaming/StreamLimitTests.cs +++ b/test/TesterAzureUtils/Streaming/StreamLimitTests.cs @@ -44,13 +44,7 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { - legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", - fireAndForgetDelivery: false, - optimizeForImmutableData: false); }); builder.AddSiloBuilderConfigurator(); } @@ -60,6 +54,8 @@ private class SiloBuilderConfigurator : ISiloBuilderConfigurator public void Configure(ISiloHostBuilder hostBuilder) { hostBuilder + .AddSimpleMessageStreamProvider(SmsStreamProviderName) + .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false) .AddAzureTableGrainStorage("AzureStore", builder => builder.Configure>((options, silo) => { options.ServiceId = silo.Value.ServiceId.ToString(); diff --git a/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs b/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs index cba92a18efe..19d1dc80b5b 100644 --- a/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs +++ b/test/TesterAzureUtils/Streaming/StreamReliabilityTests.cs @@ -53,10 +53,6 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false); - - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false); }); builder.AddSiloBuilderConfigurator(); @@ -75,7 +71,8 @@ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder options => { options.ConnectionString = TestDefaultConfiguration.DataConnectionString; - }); + }) + .AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME); } } @@ -94,6 +91,7 @@ public void Configure(ISiloHostBuilder hostBuilder) options.ConnectionString = TestDefaultConfiguration.DataConnectionString; options.DeleteStateOnClear = true; })) + .AddSimpleMessageStreamProvider(SMS_STREAM_PROVIDER_NAME) .AddAzureTableGrainStorage("PubSubStore", builder => builder.Configure>((options, silo) => { options.ServiceId = silo.Value.ServiceId.ToString(); diff --git a/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs b/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs index b81ad697e19..d02c54fa095 100644 --- a/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs +++ b/test/TesterInternal/GeoClusterTests/MultiClusterRegistrationTests.cs @@ -6,11 +6,13 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Orleans.TestingHost; using TestExtensions; using TestGrainInterfaces; using Tests.GeoClusterTests; using Xunit; using Xunit.Abstractions; +using Orleans.Hosting; namespace UnitTests.GeoClusterTests { @@ -95,7 +97,13 @@ private Task StartClustersAndClients(params short[] silos) { return StartClustersAndClients(null, null, silos); } - + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider("SMSProvider"); + } + } private Task StartClustersAndClients(Action config_customizer, Action clientconfig_customizer, params short[] silos) { WriteLog("Creating clusters and clients..."); @@ -111,8 +119,6 @@ private Task StartClustersAndClients(Action config_customi // configuration for cluster Action addtracing = (ClusterConfiguration c) => { - c.AddSimpleMessageStreamProvider("SMSProvider", fireAndForgetDelivery: false); - config_customizer?.Invoke(c); }; // configuration for clients @@ -127,7 +133,7 @@ private Task StartClustersAndClients(Action config_customi var numsilos = silos[i]; var clustername = ClusterNames[i] = ((char)('A' + i)).ToString(); var c = Clients[i] = new ClientWrapper[numsilos]; - NewGeoCluster(globalserviceid, clustername, silos[i], addtracing); + NewGeoCluster(globalserviceid, clustername, silos[i], addtracing); // create one client per silo Parallel.For(0, numsilos, paralleloptions, (j) => c[j] = this.NewClient(clustername, j, ClientWrapper.Factory, ccc)); } diff --git a/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs b/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs index 6c8894b6883..0a1277dbea1 100644 --- a/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs +++ b/test/TesterInternal/GeoClusterTests/TestingClusterHost.cs @@ -136,8 +136,12 @@ private static int GetProxyBase(int clusternumber) #endregion #region Cluster Creation - public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos, Action customizer = null) + { + NewGeoCluster(globalServiceId, clusterId, numSilos, customizer); + } + public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos, Action customizer = null) + where TSiloBuilderConfigurator : ISiloBuilderConfigurator, new() { Action extendedcustomizer = config => { @@ -154,13 +158,17 @@ public void NewGeoCluster(Guid globalServiceId, string clusterId, short numSilos ChannelType = GlobalConfiguration.GossipChannelType.AzureTable, ConnectionString = TestDefaultConfiguration.DataConnectionString }}; - customizer?.Invoke(config); }; - NewCluster(clusterId, numSilos, extendedcustomizer); + NewCluster(clusterId, numSilos, extendedcustomizer); + } + private class NoOpSiloBuilderConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + } } - private class TestSiloBuilderConfigurator : ISiloBuilderConfigurator { @@ -179,7 +187,14 @@ public void Configure(ISiloHostBuilder hostBuilder) } } - public void NewCluster(string clusterId, short numSilos, Action customizer = null) + public void NewCluster(string clusterId, short numSilos, + Action customizer = null) + { + NewCluster(clusterId, numSilos, customizer); + } + + public void NewCluster(string clusterId, short numSilos, Action customizer = null) + where TSiloBuilderConfigurator : ISiloBuilderConfigurator, new() { TestCluster testCluster; lock (Clusters) @@ -198,6 +213,7 @@ public void NewCluster(string clusterId, short numSilos, Action(); + builder.AddSiloBuilderConfigurator(); builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("Default"); diff --git a/test/TesterInternal/StreamProvidersTests.cs b/test/TesterInternal/StreamProvidersTests.cs index 301a4f49933..8f32371fb66 100644 --- a/test/TesterInternal/StreamProvidersTests.cs +++ b/test/TesterInternal/StreamProvidersTests.cs @@ -11,6 +11,8 @@ using Xunit; using Xunit.Abstractions; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans.Hosting; using Orleans.Runtime.TestHooks; using Orleans.Runtime; using Orleans.Runtime.Configuration; @@ -112,12 +114,17 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) builder.ConfigureLegacyConfiguration(legacy => { legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); - - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", - fireAndForgetDelivery: false, - optimizeForImmutableData: false); }); + builder.AddSiloBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME) + .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", options => options.OptimizeForImmutableData = false); + } } } diff --git a/test/TesterInternal/StreamingTests/SMSStreamingTests.cs b/test/TesterInternal/StreamingTests/SMSStreamingTests.cs index 7fca0cf6939..160aefc433c 100644 --- a/test/TesterInternal/StreamingTests/SMSStreamingTests.cs +++ b/test/TesterInternal/StreamingTests/SMSStreamingTests.cs @@ -1,5 +1,8 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; using Orleans.Providers; using Orleans.Providers.Streams.SimpleMessageStream; using Orleans.Runtime.Configuration; @@ -16,7 +19,7 @@ public class Fixture : BaseTestClusterFixture private static readonly Guid ServiceId = Guid.NewGuid(); public const string AzureQueueStreamProviderName = StreamTestsConstants.AZURE_QUEUE_STREAM_PROVIDER_NAME; public const string SmsStreamProviderName = "SMSProvider"; - + public const bool SMSFireAndForgetOnSilo = false; public ClusterConfiguration ClusterConfiguration { get; set; } protected override void ConfigureTestCluster(TestClusterBuilder builder) @@ -28,16 +31,32 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); legacy.ClusterConfiguration.AddMemoryStorageProvider("PubSubStore"); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", - fireAndForgetDelivery: false, - optimizeForImmutableData: false); - - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(SmsStreamProviderName, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.Globals.ServiceId = ServiceId; this.ClusterConfiguration = legacy.ClusterConfiguration; }); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(SmsStreamProviderName) + .AddSimpleMessageStreamProvider("SMSProviderDoNotOptimizeForImmutableData", + options => + { + options.OptimizeForImmutableData = false; + options.FireAndForgetDelivery = SMSFireAndForgetOnSilo; + }); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(SmsStreamProviderName); + } } } @@ -48,7 +67,7 @@ public SMSStreamingTests(Fixture fixture) { runner = new SingleStreamTestRunner(fixture.HostedCluster.InternalClient, SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME); // runner = new SingleStreamTestRunner(SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME, 0, false); - fireAndForgetDeliveryProperty = ExtractFireAndForgetDeliveryProperty(fixture); + fireAndForgetDeliveryProperty = Fixture.SMSFireAndForgetOnSilo; } #region Simple Message Stream Tests @@ -144,27 +163,6 @@ public async Task SMS_14_SameGrain_ProducerFirstConsumerLater() await runner.StreamTest_14_SameGrain_ProducerFirstConsumerLater(!fireAndForgetDeliveryProperty); } - private bool ExtractFireAndForgetDeliveryProperty(Fixture fixture) - { - ProviderCategoryConfiguration providerConfigs; - if (fixture.ClusterConfiguration.Globals.ProviderConfigurations.TryGetValue(ProviderCategoryConfiguration.STREAM_PROVIDER_CATEGORY_NAME, - out providerConfigs)) - { - IProviderConfiguration provider; - if (providerConfigs.Providers.TryGetValue(SingleStreamTestRunner.SMS_STREAM_PROVIDER_NAME, out provider)) - { - string fireAndForgetProperty = null; - bool fireAndForget = false; - if (provider.Properties.TryGetValue(SimpleMessageStreamProvider.FIRE_AND_FORGET_DELIVERY, out fireAndForgetProperty)) - { - fireAndForget = Boolean.Parse(fireAndForgetProperty); - } - return fireAndForget; - } - } - throw new Exception("failed to get fire and forget"); - } - //----------------------------------------------// [Fact, TestCategory("Functional"), TestCategory("Streaming")] diff --git a/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs b/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs index 727e5b188c1..a9ffb08d720 100644 --- a/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs +++ b/test/TesterInternal/StreamingTests/StreamPubSubReliabilityTests.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; using Orleans; +using Orleans.Hosting; using Orleans.Runtime; using Orleans.Runtime.Configuration; using Orleans.TestingHost; @@ -24,17 +26,30 @@ protected override void ConfigureTestCluster(TestClusterBuilder builder) legacy.ClusterConfiguration.AddMemoryStorageProvider("MemoryStore", numStorageGrains: 1); legacy.ClusterConfiguration.Globals.RegisterStorageProvider(PubSubStoreProviderName); - legacy.ClusterConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false); - legacy.ClusterConfiguration.Globals.MaxResendCount = 0; legacy.ClusterConfiguration.Globals.ResponseTimeout = TimeSpan.FromSeconds(30); - legacy.ClientConfiguration.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME, fireAndForgetDelivery: false); - legacy.ClientConfiguration.ClientSenderBuckets = 8192; legacy.ClientConfiguration.ResponseTimeout = TimeSpan.FromSeconds(30); legacy.ClientConfiguration.MaxResendCount = 0; }); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + } + + public class SiloConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + hostBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); + } + } + public class ClientConfiguretor : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddSimpleMessageStreamProvider(StreamTestsConstants.SMS_STREAM_PROVIDER_NAME); } }