diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj index 0667c5b..8199fcb 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.2 + netcoreapp3.0 false false @@ -11,19 +11,16 @@ - + all runtime; build; native; contentfiles; analyzers - - - - - + + + - diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index 325aff3..660bf9f 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -67,7 +67,6 @@ public static async Task Create() .UseLocalhostClustering(siloPort: 11111, gatewayPort: 30000) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; @@ -80,7 +79,6 @@ public static async Task Create() primarySiloEndpoint: new IPEndPoint(IPAddress.Loopback, EndpointOptions.DEFAULT_SILO_PORT)) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs deleted file mode 100644 index 53b4319..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Collections.Concurrent; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueAdapterCache : IQueueAdapterCache - { - private readonly int _cacheSize; - private readonly ConcurrentDictionary _caches; - - public ConcurrentQueueAdapterCache(int cacheSize) - { - if (cacheSize <= 0) throw new ArgumentOutOfRangeException(nameof(cacheSize), "CacheSize must be a positive number."); - _cacheSize = cacheSize; - _caches = new ConcurrentDictionary(); - } - - /// - /// Create a cache for a given queue id - /// - /// - public IQueueCache CreateQueueCache(QueueId queueId) - { - return _caches.AddOrUpdate(queueId, - id => new ConcurrentQueueCache(_cacheSize), - (id, queueCache) => queueCache); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs deleted file mode 100644 index f3e31e0..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - /// - /// A queue cache that keeps items in memory - /// - /// The dictionary is net kept clean, so if there is huge number of random streams the dictionary will keep growing - /// - public class ConcurrentQueueCache : IQueueCache - { - // StreamIdentity = = - private readonly ConcurrentDictionary, ConcurrentQueue> _cache; - private readonly ConcurrentQueue _itemsToPurge; - private readonly int _maxCacheSize; - - private int _numItemsInCache; - - public ConcurrentQueueCache(int cacheSize) - { - _maxCacheSize = cacheSize; - _cache = new ConcurrentDictionary, ConcurrentQueue>(); - _itemsToPurge = new ConcurrentQueue(); - } - - /// - /// The limit of the maximum number of items that can be added. - /// - /// Returns just an estimate. It doesn't have to be exact. It would require extra locking. - /// - /// Note: there is a condition in pulling agent that if maxAddCount is less than 0 and not equal to -1 (unlimited), - /// it will skip the reading cycle including cache purging - /// - public int GetMaxAddCount() => Math.Max(1, _maxCacheSize - _numItemsInCache); - - public bool IsUnderPressure() => _numItemsInCache >= _maxCacheSize; - - public void AddToCache(IList messages) - { - foreach (var msg in messages) - { - _cache.GetOrAdd(new Tuple(msg.StreamGuid, msg.StreamNamespace), - new ConcurrentQueue()) - .Enqueue((RabbitMqBatchContainer) msg); - } - Interlocked.Add(ref _numItemsInCache, messages.Count); - } - - public bool TryPurgeFromCache(out IList purgedItems) - { - purgedItems = null; - if (!_itemsToPurge.IsEmpty) - { - purgedItems = new List(); - while (_itemsToPurge.TryDequeue(out var item)) - { - purgedItems.Add(item); - } - Interlocked.Add(ref _numItemsInCache, -purgedItems.Count); - } - return purgedItems?.Count > 0; - } - - /// - /// Here we ignore the token since this is not rewindable stream anyway - /// - public IQueueCacheCursor GetCacheCursor(IStreamIdentity streamIdentity, StreamSequenceToken token) - { - return new ConcurrentQueueCacheCursor( - moveNext: () => - { - RabbitMqBatchContainer item = null; - _cache.TryGetValue(new Tuple(streamIdentity.Guid, streamIdentity.Namespace), out var queue); - queue?.TryDequeue(out item); - return item; - }, - purgeItem: item => - { - if (item != null) - { - _itemsToPurge.Enqueue(item); - } - }); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs deleted file mode 100644 index 9f7bb47..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueCacheCursor : IQueueCacheCursor - { - private readonly Func _moveNext; - private readonly Action _purgeItem; - - private readonly object _syncRoot = new object(); - - private RabbitMqBatchContainer _current; - - public ConcurrentQueueCacheCursor(Func moveNext, Action purgeItem) - { - _moveNext = moveNext ?? throw new ArgumentNullException(nameof(moveNext)); - _purgeItem = purgeItem ?? throw new ArgumentNullException(nameof(purgeItem)); - } - - public void Dispose() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = null; - } - } - - public IBatchContainer GetCurrent(out Exception exception) - { - exception = null; - return _current; - } - - public bool MoveNext() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = _moveNext(); - } - return _current != null; - } - - public void Refresh(StreamSequenceToken token) - { - // do nothing - } - - public void RecordDeliveryFailure() - { - if (_current != null) - { - _current.DeliveryFailure = true; - } - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs index 3d3ba1c..c04731e 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs @@ -1,4 +1,5 @@ using System; +using Orleans.Configuration; using Orleans.Streaming; using Orleans.Streams.BatchContainer; @@ -6,21 +7,61 @@ namespace Orleans.Hosting { public static class ClientBuilderExtensions { + /// + /// Configure client to use RMQ persistent streams, using the . + /// + [Obsolete("Use 'UseRabbitMqStreams'")] + public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure = null) + { + return UseRabbitMqStreams(builder, name, configure); + } + /// /// Configure client to use RMQ persistent streams. - /// This version enables to inject a custom BacthContainer serializer. /// - public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure) where TSerializer : IBatchContainerSerializer, new() + [Obsolete("Use 'UseRabbitMqStreams'")] + public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure = null) + where TSerializer : IBatchContainerSerializer, new() { - configure?.Invoke(new ClusterClientRabbitMqStreamConfigurator(name, builder)); - return builder; + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action options) + { + return UseRabbitMqStreams(builder, name, options); } /// /// Configure client to use RMQ persistent streams. - /// This version uses the default Orleans serializer. /// - public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure) - => AddRabbitMqStream(builder, name, configure); + public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action options) + where TSerializer : IBatchContainerSerializer, new() + { + return builder.UseRabbitMqStreams(name, b => b.ConfigureRabbitMq(ob => ob.Configure(options))); + + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action> configure = null) + { + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static IClientBuilder UseRabbitMqStreams(this IClientBuilder builder, string name, Action> configure = null) + where TSerializer : IBatchContainerSerializer, new() + { + var configurator = new ClusterClientRabbitMqStreamConfigurator(name, builder); + configure?.Invoke(configurator); + + return builder; + } } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Configuration/RabbitMqOptions.cs b/Orleans.Streams.RabbitMqStreamProvider/Configuration/RabbitMqOptions.cs index f1373cc..7ce0e77 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/Configuration/RabbitMqOptions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/Configuration/RabbitMqOptions.cs @@ -4,15 +4,15 @@ namespace Orleans.Configuration { public class RabbitMqOptions { - public string HostName; - public int Port; - public string VirtualHost; - public string UserName; - public string Password; - - public string QueueNamePrefix; - public bool UseQueuePartitioning = DefaultUseQueuePartitioning; - public int NumberOfQueues = DefaultNumberOfQueues; + public string HostName { get; set; } + public int Port { get; set; } + public string VirtualHost { get; set; } + public string UserName { get; set; } + public string Password { get; set; } + + public string QueueNamePrefix { get; set; } + public bool UseQueuePartitioning { get; set; } = DefaultUseQueuePartitioning; + public int NumberOfQueues { get; set; } = DefaultNumberOfQueues; public const bool DefaultUseQueuePartitioning = false; diff --git a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj index e004f90..e1d375f 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj @@ -1,11 +1,11 @@ - + netstandard2.0 - + diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs index 46d150c..3049499 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs @@ -3,9 +3,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Providers.Streams.Common; using Orleans.Serialization; using Orleans.Streams.BatchContainer; -using Orleans.Streams.Cache; namespace Orleans.Streams { @@ -29,7 +29,9 @@ public RabbitMqAdapterFactory( if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider)); if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory)); - _cache = new ConcurrentQueueAdapterCache(cachingOptions.CacheSize); + _cache = new SimpleQueueAdapterCache(new SimpleQueueCacheOptions() {CacheSize = cachingOptions.CacheSize}, providerName, loggerFactory); + + _mapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions { TotalQueueCount = rmqOptions.NumberOfQueues }, rmqOptions.QueueNamePrefix); _failureHandler = Task.FromResult(new NoOpStreamDeliveryFailureHandler(false)); @@ -46,10 +48,13 @@ public RabbitMqAdapterFactory( public IStreamQueueMapper GetStreamQueueMapper() => _mapper; public static RabbitMqAdapterFactory Create(IServiceProvider services, string name) - => ActivatorUtilities.CreateInstance>( - services, - name, - services.GetOptionsByName(name), - services.GetOptionsByName(name)); + { + var rabbitMqOptions = services.GetOptionsByName(name); + var cachingOptions = services.GetOptionsByName(name); + + var factory = ActivatorUtilities.CreateInstance>(services, name, rabbitMqOptions, cachingOptions); + + return factory; + } } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs index 07b578e..acf428c 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs @@ -1,78 +1,53 @@ using System; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.ApplicationParts; using Orleans.Configuration; +using Orleans.Hosting; +using Orleans.Providers.Streams.Common; using Orleans.Streams; using Orleans.Streams.BatchContainer; namespace Orleans.Streaming { - public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() + + public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator + where TSerializer : IBatchContainerSerializer, new() { + [Obsolete] public SiloRabbitMqStreamConfigurator(string name, Action> configureDelegate) - : base(name, configureDelegate, RabbitMqAdapterFactory.Create) + : this(name, configureDelegate, configureAppPartsDelegate => { }) { - this.configureDelegate(services => - { - services.ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new CachingOptionsValidator(sp.GetOptionsByName(name), name)); - }); - } - public SiloRabbitMqStreamConfigurator ConfigureRabbitMq(string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) - { - Configure(ob => ob.Configure(options => - { - options.HostName = host; - options.Port = port; - options.VirtualHost = virtualHost; - options.UserName = user; - options.Password = password; - options.QueueNamePrefix = queueName; - options.UseQueuePartitioning = useQueuePartitioning; - options.NumberOfQueues = numberOfQueues; - })); - return this; } - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize) + public SiloRabbitMqStreamConfigurator(string name, Action> configureServicesDelegate, Action> configureAppPartsDelegate) + : base(name, configureServicesDelegate, RabbitMqAdapterFactory.Create) { - Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); - return this; - } - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize, TimeSpan cacheFillingTimeout) - { - Configure(ob => ob.Configure(options => - { - options.CacheSize = cacheSize; - options.CacheFillingTimeout = cacheFillingTimeout; - })); - return this; - } - } + configureAppPartsDelegate(parts => + { + parts + .AddFrameworkPart(typeof(RabbitMqAdapter).Assembly) + .AddFrameworkPart(typeof(EventSequenceTokenV2).Assembly); + }); - public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() - { - public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder builder) - : base(name, builder, RabbitMqAdapterFactory.Create) - { - clientBuilder - .ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly)) - .ConfigureServices(services => services + ConfigureDelegate(services => + { + services .ConfigureNamedOptionForLogging(name) + .ConfigureNamedOptionForLogging(name) .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name)); - + .AddTransient(sp => new CachingOptionsValidator(sp.GetOptionsByName(name), name)); + }); } - public ClusterClientRabbitMqStreamConfigurator ConfigureRabbitMq( + public SiloRabbitMqStreamConfigurator ConfigureRabbitMq( string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) { - Configure(ob => ob.Configure(options => + return ConfigureRabbitMq(configureOptions => configureOptions.Configure(options => { options.HostName = host; options.Port = port; @@ -83,6 +58,47 @@ public ClusterClientRabbitMqStreamConfigurator ConfigureRabbitMq( options.UseQueuePartitioning = useQueuePartitioning; options.NumberOfQueues = numberOfQueues; })); + } + + public SiloRabbitMqStreamConfigurator ConfigureRabbitMq(Action> configureOptions) + { + this.Configure(configureOptions); + + return this; + } + + public SiloRabbitMqStreamConfigurator ConfigureCache(Action> configureOptions) + { + this.Configure(configureOptions); + + return this; + } + } + + public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator + where TSerializer : IBatchContainerSerializer, new() + { + public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder clientBuilder) + : base(name, clientBuilder, RabbitMqAdapterFactory.Create) + { + clientBuilder + .ConfigureApplicationParts(parts => + { + parts + .AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly) + .AddApplicationPart(typeof(EventSequenceTokenV2).Assembly); + }) + .ConfigureServices(services => + { + services + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)); + }); + } + + public ClusterClientRabbitMqStreamConfigurator ConfigureRabbitMq(Action> configureOptions) + { + this.Configure(configureOptions); return this; } } diff --git a/Orleans.Streams.RabbitMqStreamProvider/ServiceCollectionExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..6cbe6d4 --- /dev/null +++ b/Orleans.Streams.RabbitMqStreamProvider/ServiceCollectionExtensions.cs @@ -0,0 +1,28 @@ +using System; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; + +namespace Orleans.Hosting +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddRabbitMqStreams(this IServiceCollection services, string name, IConfiguration configuration) + { + return AddRabbitMqStreams(services, name, configureOptions => configureOptions.Bind(configuration)); + } + + public static IServiceCollection AddRabbitMqStreams(this IServiceCollection services, string name, Action options) + { + return AddRabbitMqStreams(services, name, configureOptions => configureOptions.Configure(options)); + } + + public static IServiceCollection AddRabbitMqStreams(this IServiceCollection services, string name, Action> configureOptions) + { + configureOptions(services.AddOptions(name)); + + return services; + } + } +} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs index cbca06c..5b6e4cf 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs @@ -1,26 +1,157 @@ using System; +using Microsoft.Extensions.Options; +using Orleans.Configuration; using Orleans.Streaming; using Orleans.Streams.BatchContainer; namespace Orleans.Hosting { + public static class SiloHostBuilderExtensions + { + /// + /// Configure client to use RMQ persistent streams, using the . + /// + [Obsolete("Use 'UseRabbitMqStream")] + public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure = null) + { + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + [Obsolete("Use 'UseRabbitMqStream")] + public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure = null) + where TSerializer : IBatchContainerSerializer, new() + { + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action configureOptions) + { + return UseRabbitMqStreams(builder, name, configureOptions); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action configureOptions) + where TSerializer : IBatchContainerSerializer, new() + { + builder.UseRabbitMqStreams(name, b => b.ConfigureRabbitMq(ob => ob.Configure(configureOptions))); + + return builder; + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action> configureOptions) + { + return UseRabbitMqStreams(builder, name, configureOptions); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action> configureOptions) + where TSerializer : IBatchContainerSerializer, new() + { + builder.UseRabbitMqStreams(name, b => b.ConfigureRabbitMq(configureOptions)); + + return builder; + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action> configure = null) + { + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static ISiloHostBuilder UseRabbitMqStreams(this ISiloHostBuilder builder, string name, Action> configure = null) + where TSerializer : IBatchContainerSerializer, new() + { + var configurator = new SiloRabbitMqStreamConfigurator(name, + configureServicesDelegate => builder.ConfigureServices(configureServicesDelegate), + configureAppPartsDelegate => builder.ConfigureApplicationParts(configureAppPartsDelegate) + ); + + configure?.Invoke(configurator); + + return builder; + } + } + public static class SiloBuilderExtensions { /// - /// Configure silo to use RMQ persistent streams. - /// This version enables to inject a custom BacthContainer serializer. + /// Configure client to use RMQ persistent streams, using the . + /// + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action configureOptions) + { + return UseRabbitMqStreams(builder, name, configureOptions); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action configureOptions) + where TSerializer : IBatchContainerSerializer, new() + { + builder.UseRabbitMqStreams(name, b => b.ConfigureRabbitMq(ob => ob.Configure(configureOptions))); + + return builder; + } + + /// + /// Configure client to use RMQ persistent streams, using the . + /// + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action> configureOptions) + { + return UseRabbitMqStreams(builder, name, configureOptions); + } + + /// + /// Configure client to use RMQ persistent streams. /// - public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure) where TSerializer : IBatchContainerSerializer, new() + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action> configureOptions) + where TSerializer : IBatchContainerSerializer, new() { - configure?.Invoke(new SiloRabbitMqStreamConfigurator(name, configDelegate => builder.ConfigureServices(configDelegate))); + builder.UseRabbitMqStreams(name, b => b.ConfigureRabbitMq(configureOptions)); + return builder; } /// - /// Configure silo to use RMQ persistent streams. - /// This version uses the default Orleans serializer. + /// Configure client to use RMQ persistent streams, using the . /// - public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure) - => AddRabbitMqStream(builder, name, configure); + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action> configure = null) + { + return UseRabbitMqStreams(builder, name, configure); + } + + /// + /// Configure client to use RMQ persistent streams. + /// + public static ISiloBuilder UseRabbitMqStreams(this ISiloBuilder builder, string name, Action> configure = null) + where TSerializer : IBatchContainerSerializer, new() + { + var configurator = new SiloRabbitMqStreamConfigurator(name, + configureServicesDelegate => builder.ConfigureServices(configureServicesDelegate), + configureAppPartsDelegate => builder.ConfigureApplicationParts(configureAppPartsDelegate) + ); + + configure?.Invoke(configurator); + + return builder; + } } } \ No newline at end of file diff --git a/appveyor.yml b/appveyor.yml index 3615cc1..096cf58 100755 --- a/appveyor.yml +++ b/appveyor.yml @@ -18,7 +18,7 @@ install: - start /B /WAIT %rabbitmq_installer_path% /S - ps: (Get-Service -Name RabbitMQ).Status -version: 2.4.3.{build} +version: 3.0.0.{build} assembly_info: patch: true file: AssemblyInfo.cs