diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs index ca9dd44..8d492d4 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs @@ -15,7 +15,7 @@ public interface IAggregatorGrain : IGrainWithGuidKey Task MessageSent(Immutable message); Task MessageReceived(Immutable message); Task WereAllMessagesSent(Immutable messages); - Task WereAllSentAlsoDelivered(); + Task WereAllSentAlsoDelivered(int count); Task GetProcessingSilosCount(); Task GetAllSentMessages(); Task GetAllReceivedMessages(); @@ -45,14 +45,20 @@ public Task CleanUp() public Task MessageSent(Immutable message) { _logger.LogInformation($"MessageSent #{message.Value.Id} [{RuntimeIdentity}],[{IdentityString}] from thread {Thread.CurrentThread.Name}"); - _sentMessages.Add(message.Value.Id, message.Value); + if (_sentMessages.TryGetValue(message.Value.Id, out var value)) + value.Count++; + else + _sentMessages.Add(message.Value.Id, message.Value); return Task.CompletedTask; } public Task MessageReceived(Immutable message) { _logger.LogInformation($"MessageReceived #{message.Value.Id} [{RuntimeIdentity}],[{IdentityString}] from thread {Thread.CurrentThread.Name}"); - _receivedMessages.Add(message.Value.Id, message.Value); + if (_receivedMessages.TryGetValue(message.Value.Id, out var value)) + value.Count++; + else + _receivedMessages.Add(message.Value.Id, message.Value); return Task.CompletedTask; } @@ -63,11 +69,11 @@ public Task WereAllMessagesSent(Immutable messages) messages.Value.All(msg => _sentMessages.ContainsKey(msg.Id))); } - public Task WereAllSentAlsoDelivered() + public Task WereAllSentAlsoDelivered(int count) { return Task.FromResult( _sentMessages.Count == _receivedMessages.Count && - _sentMessages.Values.All(msg => _receivedMessages.ContainsKey(msg.Id) && _receivedMessages[msg.Id].Delivered)); + _sentMessages.Values.All(msg => _receivedMessages.TryGetValue(msg.Id, out var message) && message.Delivered && message.Count == count)); } public Task GetProcessingSilosCount() diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config index 478741b..23d8cda 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config @@ -1,97 +1,100 @@ - + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + + + + \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs deleted file mode 100644 index cdba487..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs +++ /dev/null @@ -1,158 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading.Tasks; -using NUnit.Framework; -using Orleans.Providers.Streams.Common; -using Orleans.Streams; -using Orleans.Streams.BatchContainer; -using Orleans.Streams.Cache; - -namespace RabbitMqStreamTests -{ - [TestFixture] - public class CacheTests - { - [Test] - public async Task RunCacheReadHeavyUsageTestMultipleTimes() - { - for (int i = 0; i < 10; i++) - { - await TestOrleansCacheReadHeavyUsage(); - } - } - - [Test] - public async Task RunCacheWriteHeavyUsageTestMultipleTimes() - { - for (int i = 0; i < 10; i++) - { - await TestOrleansCacheWriteHeavyUsage(); - } - } - - [Test] - public async Task TestOrleansCacheCursorReadHeavyUsage() - { - int cacheSize = 1000000; - var queueCache = new ConcurrentQueueCache(cacheSize); - queueCache.AddToCache(GetQueueMessages(queueCache.GetMaxAddCount())); - - var watch = Stopwatch.StartNew(); - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, null)) - { - var tasks = new List(); - for (int i = 0; i < 10; i++) - { - tasks.Add(Task.Run(() => - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - } - })); - } - - await Task.WhenAll(tasks); - } - watch.Stop(); - Console.WriteLine($"Read {cacheSize} items in {watch.ElapsedMilliseconds} ms"); - // without locking ~170ms, with locking ~680ms, thus ~4x slower - - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(cacheSize, finalPurgedItems.Count, "purged items"); - Assert.AreEqual(cacheSize, queueCache.GetMaxAddCount(), "cache size"); - } - - private async Task TestOrleansCacheReadHeavyUsage() - { - var queueCache = new ConcurrentQueueCache(CacheSize); - - var tasks = new List(); - for (int i = 0; i < 10; i++) - { - queueCache.TryPurgeFromCache(out var purgedItems); - - var multiBatch = GetQueueMessages(new Random().Next(queueCache.GetMaxAddCount())); - queueCache.AddToCache(multiBatch); - - if (multiBatch.Count > 0) - { - tasks.Add(Task.Run(async () => - { - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, multiBatch.First().SequenceToken)) - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - await Task.Delay(TimeSpan.FromMilliseconds(50 + new Random().Next(100))); - } - } - })); - } - await Task.Delay(TimeSpan.FromMilliseconds(200)); - } - - await Task.WhenAll(tasks); - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(CacheSize, queueCache.GetMaxAddCount()); - } - - private async Task TestOrleansCacheWriteHeavyUsage() - { - var queueCache = new ConcurrentQueueCache(CacheSize); - - var tasks = new List(); - for (int i = 0; i < 1000; i++) - { - queueCache.TryPurgeFromCache(out var purgedItems); - - var multiBatch = GetQueueMessages(queueCache.GetMaxAddCount()); - queueCache.AddToCache(multiBatch); - - if (multiBatch.Count > 0) - { - tasks.Add(Task.Run(async () => - { - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, multiBatch.First().SequenceToken)) - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - await Task.Delay(TimeSpan.FromMilliseconds(50 + new Random().Next(100))); - } - } - })); - } - } - - await Task.WhenAll(tasks); - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(CacheSize, queueCache.GetMaxAddCount()); - } - - private const int CacheSize = 50; - - private static readonly StreamIdentity StreamIdentity = new StreamIdentity(Guid.Empty, "test"); - private static ulong _deliveryTag = 0; - - private static IList GetQueueMessages(int count) - { - return Enumerable.Range(0, count) - .Select(i => - new RabbitMqBatchContainer( - StreamIdentity.Guid, StreamIdentity.Namespace, - new[] {"value"}.ToList(), - new Dictionary()) - { - DeliveryTag = _deliveryTag++, - EventSequenceToken = new EventSequenceToken((long) _deliveryTag) - }) - .ToList(); - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs index 75ef87e..dc00608 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs @@ -10,6 +10,8 @@ namespace RabbitMqStreamTests { internal static class IntegrationTestHelpers { + private const int ReceiversCount = 2; // Number of ReceiverGrain implicitly attached to stream + public static async Task TestRmqStreamProviderWithPrefilledQueue(this TestCluster cluster, Action setupProxyForSender, Action setupProxyForReceiver, int nMessages, int itersToWait, RmqSerializer serializer = RmqSerializer.Default) { await cluster.StopPullingAgents(); @@ -75,15 +77,15 @@ public static async Task TestRmqStreamProviderOnFly(this TestCluster cluster, Ac private static async Task AllMessagesSentAndDelivered(IAggregatorGrain aggregator, Message[] messages) => await aggregator.WereAllMessagesSent(messages.AsImmutable()) && - await aggregator.WereAllSentAlsoDelivered(); + await aggregator.WereAllSentAlsoDelivered(ReceiversCount); private static async Task PrintError(IAggregatorGrain aggregator, Message[] messages) { var sb = new StringBuilder(); sb.AppendLine("Expectation failed!"); - sb.AppendLine($" -expected: {string.Join(',', messages.OrderBy(m => m.Id).Select(m => m.Id))}"); - sb.AppendLine($" -sent : {string.Join(',', (await aggregator.GetAllSentMessages()).OrderBy(m => m.Id).Select(m => m.Id))}"); - sb.AppendLine($" -received: {string.Join(',', (await aggregator.GetAllReceivedMessages()).OrderBy(m => m.Id).Select(m => m.Id))}"); + sb.AppendLine($" -expected: {string.Join(',', messages.OrderBy(m => m.Id).Select(m => $"{m.Id} ({ReceiversCount})"))}"); + sb.AppendLine($" -sent : {string.Join(',', (await aggregator.GetAllSentMessages()).OrderBy(m => m.Id).Select(m => $"{m.Id} ({m.Count})"))}"); + sb.AppendLine($" -received: {string.Join(',', (await aggregator.GetAllReceivedMessages()).OrderBy(m => m.Id).Select(m => $"{m.Id} ({m.Count})"))}"); return sb.ToString(); } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs index cb895f4..1857b0d 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs @@ -19,6 +19,9 @@ public class Message [ProtoMember(4)] public readonly string ProcessedBy; + [ProtoMember(5)] + public int Count = 1; + public Message() { } @@ -45,7 +48,8 @@ public override bool Equals(object obj) return msg != null && Id == msg.Id && Delivered == msg.Delivered - && ProcessedBy == msg.ProcessedBy; + && ProcessedBy == msg.ProcessedBy + && Count == msg.Count; } public override int GetHashCode() @@ -56,6 +60,7 @@ public override int GetHashCode() hashCode = (hashCode * 397) ^ WorkTimeOutMillis.GetHashCode(); hashCode = (hashCode * 397) ^ Delivered.GetHashCode(); hashCode = (hashCode * 397) ^ (ProcessedBy?.GetHashCode() ?? 0); + hashCode = (hashCode * 397) ^ Count.GetHashCode(); return hashCode; } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj old mode 100755 new mode 100644 index 0667c5b..075e1ff --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -11,15 +11,15 @@ - + all runtime; build; native; contentfiles; analyzers - - - - - + + + + + diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs index f00fae8..6388f45 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs @@ -1,19 +1,6 @@ -using System.Reflection; +using System.Reflection; using System.Runtime.InteropServices; -[assembly: AssemblyTitle("Orleans.Streams.RabbitMqStreamProvider.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Orleans.Streams.RabbitMqStreamProvider.Tests")] -[assembly: AssemblyCopyright("Copyright © Martin Ovesny 2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - [assembly: ComVisible(false)] [assembly: Guid("bee3bdc1-33a5-4c5e-a7f3-a0fa050d99f4")] - -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs index 112676b..1c6eafa 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs @@ -9,13 +9,20 @@ namespace RabbitMqStreamTests { - public interface IReceiverGrain : IGrainWithGuidKey - { - } + public interface IFirstReceiverGrain : IGrainWithGuidKey { } + public interface ISecondReceiverGrain : IGrainWithGuidKey { } + + [ImplicitStreamSubscription(Globals.StreamNameSpaceDefault)] + [ImplicitStreamSubscription(Globals.StreamNameSpaceProtoBuf)] + public class FirstReceiverGrain : ReceiverGrain, IFirstReceiverGrain + { } [ImplicitStreamSubscription(Globals.StreamNameSpaceDefault)] [ImplicitStreamSubscription(Globals.StreamNameSpaceProtoBuf)] - public class ReceiverGrain : Grain, IReceiverGrain + public class SecondReceiverGrain : ReceiverGrain, ISecondReceiverGrain + { } + + public abstract class ReceiverGrain : Grain { private ILogger _logger; private StreamSubscriptionHandle _subscriptionDefault; @@ -24,7 +31,7 @@ public class ReceiverGrain : Grain, IReceiverGrain public override async Task OnActivateAsync() { await base.OnActivateAsync(); - _logger = ServiceProvider.GetRequiredService().CreateLogger($"{typeof(ReceiverGrain).FullName}.{this.GetPrimaryKey()}"); + _logger = ServiceProvider.GetRequiredService().CreateLogger($"{GetType().FullName}.{this.GetPrimaryKey()}"); _logger.LogInformation($"OnActivateAsync [{RuntimeIdentity}],[{IdentityString}][{this.GetPrimaryKey()}] from thread {Thread.CurrentThread.Name}"); _subscriptionDefault = await GetStreamProvider(Globals.StreamProviderNameDefault) .GetStream(this.GetPrimaryKey(), Globals.StreamNameSpaceDefault) diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs index 2d67771..dbaa113 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs @@ -1,4 +1,5 @@ using RabbitMQ.Client; +using System; namespace RabbitMqStreamTests { @@ -21,11 +22,18 @@ public static void EnsureEmptyQueue() Password = "guest" }; - using (var connection = factory.CreateConnection()) - using (var channel = connection.CreateModel()) + try { - channel.QueuePurge(Globals.StreamNameSpaceDefault); - channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + channel.QueuePurge(Globals.StreamNameSpaceDefault); + channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + } + } + catch (Exception e) + { + Console.WriteLine(e); } } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index 325aff3..7ce792d 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -67,7 +67,6 @@ public static async Task Create() .UseLocalhostClustering(siloPort: 11111, gatewayPort: 30000) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; @@ -80,7 +79,6 @@ public static async Task Create() primarySiloEndpoint: new IPEndPoint(IPAddress.Loopback, EndpointOptions.DEFAULT_SILO_PORT)) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; @@ -117,6 +115,7 @@ internal static class ClusterBuilderExtentions public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder builder) { return builder + .ConfigureApplicationParts(p => p.AddApplicationPart(typeof(IAggregatorGrain).Assembly)) .AddMemoryGrainStorage("PubSubStore") .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { @@ -133,7 +132,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -152,6 +151,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder builder) { return builder + .ConfigureApplicationParts(p => p.AddApplicationPart(typeof(IAggregatorGrain).Assembly)) .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, @@ -160,7 +160,7 @@ public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder buil .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); }) .ConfigureLogging(log => log .ClearProviders() diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs index b0ddbb8..90fa36b 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.IO; using Toxiproxy.Net; using Toxiproxy.Net.Toxics; diff --git a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs index d45f29c..9f44899 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs @@ -40,7 +40,7 @@ public IEnumerable> GetEvents() .Select((e, i) => Tuple.Create(e, EventSequenceToken?.CreateSequenceTokenForEvent(i))) .ToList(); - public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) - => _events.Any(item => shouldReceiveFunc(stream, filterData, item)); + public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) => + _events.Any(item => shouldReceiveFunc(stream, filterData, item)); } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs deleted file mode 100644 index 53b4319..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Collections.Concurrent; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueAdapterCache : IQueueAdapterCache - { - private readonly int _cacheSize; - private readonly ConcurrentDictionary _caches; - - public ConcurrentQueueAdapterCache(int cacheSize) - { - if (cacheSize <= 0) throw new ArgumentOutOfRangeException(nameof(cacheSize), "CacheSize must be a positive number."); - _cacheSize = cacheSize; - _caches = new ConcurrentDictionary(); - } - - /// - /// Create a cache for a given queue id - /// - /// - public IQueueCache CreateQueueCache(QueueId queueId) - { - return _caches.AddOrUpdate(queueId, - id => new ConcurrentQueueCache(_cacheSize), - (id, queueCache) => queueCache); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs deleted file mode 100644 index f3e31e0..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - /// - /// A queue cache that keeps items in memory - /// - /// The dictionary is net kept clean, so if there is huge number of random streams the dictionary will keep growing - /// - public class ConcurrentQueueCache : IQueueCache - { - // StreamIdentity = = - private readonly ConcurrentDictionary, ConcurrentQueue> _cache; - private readonly ConcurrentQueue _itemsToPurge; - private readonly int _maxCacheSize; - - private int _numItemsInCache; - - public ConcurrentQueueCache(int cacheSize) - { - _maxCacheSize = cacheSize; - _cache = new ConcurrentDictionary, ConcurrentQueue>(); - _itemsToPurge = new ConcurrentQueue(); - } - - /// - /// The limit of the maximum number of items that can be added. - /// - /// Returns just an estimate. It doesn't have to be exact. It would require extra locking. - /// - /// Note: there is a condition in pulling agent that if maxAddCount is less than 0 and not equal to -1 (unlimited), - /// it will skip the reading cycle including cache purging - /// - public int GetMaxAddCount() => Math.Max(1, _maxCacheSize - _numItemsInCache); - - public bool IsUnderPressure() => _numItemsInCache >= _maxCacheSize; - - public void AddToCache(IList messages) - { - foreach (var msg in messages) - { - _cache.GetOrAdd(new Tuple(msg.StreamGuid, msg.StreamNamespace), - new ConcurrentQueue()) - .Enqueue((RabbitMqBatchContainer) msg); - } - Interlocked.Add(ref _numItemsInCache, messages.Count); - } - - public bool TryPurgeFromCache(out IList purgedItems) - { - purgedItems = null; - if (!_itemsToPurge.IsEmpty) - { - purgedItems = new List(); - while (_itemsToPurge.TryDequeue(out var item)) - { - purgedItems.Add(item); - } - Interlocked.Add(ref _numItemsInCache, -purgedItems.Count); - } - return purgedItems?.Count > 0; - } - - /// - /// Here we ignore the token since this is not rewindable stream anyway - /// - public IQueueCacheCursor GetCacheCursor(IStreamIdentity streamIdentity, StreamSequenceToken token) - { - return new ConcurrentQueueCacheCursor( - moveNext: () => - { - RabbitMqBatchContainer item = null; - _cache.TryGetValue(new Tuple(streamIdentity.Guid, streamIdentity.Namespace), out var queue); - queue?.TryDequeue(out item); - return item; - }, - purgeItem: item => - { - if (item != null) - { - _itemsToPurge.Enqueue(item); - } - }); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs deleted file mode 100644 index 9f7bb47..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueCacheCursor : IQueueCacheCursor - { - private readonly Func _moveNext; - private readonly Action _purgeItem; - - private readonly object _syncRoot = new object(); - - private RabbitMqBatchContainer _current; - - public ConcurrentQueueCacheCursor(Func moveNext, Action purgeItem) - { - _moveNext = moveNext ?? throw new ArgumentNullException(nameof(moveNext)); - _purgeItem = purgeItem ?? throw new ArgumentNullException(nameof(purgeItem)); - } - - public void Dispose() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = null; - } - } - - public IBatchContainer GetCurrent(out Exception exception) - { - exception = null; - return _current; - } - - public bool MoveNext() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = _moveNext(); - } - return _current != null; - } - - public void Refresh(StreamSequenceToken token) - { - // do nothing - } - - public void RecordDeliveryFailure() - { - if (_current != null) - { - _current.DeliveryFailure = true; - } - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs index 3d3ba1c..7b6a207 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs @@ -1,6 +1,8 @@ -using System; -using Orleans.Streaming; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Streams; using Orleans.Streams.BatchContainer; +using System; namespace Orleans.Hosting { @@ -16,11 +18,30 @@ public static class ClientBuilderExtensions return builder; } + public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator, IClusterClientAzureQueueStreamConfigurator where TSerializer : IBatchContainerSerializer, new() + { + public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder builder) + : base(name, builder, RabbitMqAdapterFactory.Create) + { + this.ConfigureComponent(SimpleQueueCacheOptionsValidator.Create); + + builder + .ConfigureApplicationParts(RabbitMQStreamConfiguratorCommon.AddParts) + .ConfigureServices(services => + services.ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) + .ConfigureNamedOptionForLogging(name)); + + } + } + /// /// Configure client to use RMQ persistent streams. /// This version uses the default Orleans serializer. /// public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure) => AddRabbitMqStream(builder, name, configure); + + public interface IClusterClientAzureQueueStreamConfigurator : IRabbitMQStreamConfigurator, IClusterClientPersistentStreamConfigurator { } } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj old mode 100755 new mode 100644 index e004f90..926e10a --- a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj @@ -1,12 +1,9 @@ - - + netstandard2.0 - - + - - + \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs index 46d150c..475f111 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs @@ -1,11 +1,11 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Providers.Streams.Common; using Orleans.Serialization; using Orleans.Streams.BatchContainer; -using Orleans.Streams.Cache; +using System; +using System.Threading.Tasks; namespace Orleans.Streams { @@ -29,7 +29,7 @@ public RabbitMqAdapterFactory( if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider)); if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory)); - _cache = new ConcurrentQueueAdapterCache(cachingOptions.CacheSize); + _cache = new SimpleQueueAdapterCache(new SimpleQueueCacheOptions { CacheSize = cachingOptions.CacheSize }, providerName, loggerFactory); _mapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions { TotalQueueCount = rmqOptions.NumberOfQueues }, rmqOptions.QueueNamePrefix); _failureHandler = Task.FromResult(new NoOpStreamDeliveryFailureHandler(false)); diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs deleted file mode 100644 index 07b578e..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs +++ /dev/null @@ -1,89 +0,0 @@ -using System; -using Microsoft.Extensions.DependencyInjection; -using Orleans.Configuration; -using Orleans.Streams; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streaming -{ - public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() - { - public SiloRabbitMqStreamConfigurator(string name, Action> configureDelegate) - : base(name, configureDelegate, RabbitMqAdapterFactory.Create) - { - this.configureDelegate(services => - { - services.ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new CachingOptionsValidator(sp.GetOptionsByName(name), name)); - }); - } - - public SiloRabbitMqStreamConfigurator ConfigureRabbitMq(string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) - { - Configure(ob => ob.Configure(options => - { - options.HostName = host; - options.Port = port; - options.VirtualHost = virtualHost; - options.UserName = user; - options.Password = password; - options.QueueNamePrefix = queueName; - options.UseQueuePartitioning = useQueuePartitioning; - options.NumberOfQueues = numberOfQueues; - })); - return this; - } - - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize) - { - Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); - return this; - } - - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize, TimeSpan cacheFillingTimeout) - { - Configure(ob => ob.Configure(options => - { - options.CacheSize = cacheSize; - options.CacheFillingTimeout = cacheFillingTimeout; - })); - return this; - } - } - - public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() - { - public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder builder) - : base(name, builder, RabbitMqAdapterFactory.Create) - { - clientBuilder - .ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly)) - .ConfigureServices(services => services - .ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name)); - - } - - public ClusterClientRabbitMqStreamConfigurator ConfigureRabbitMq( - string host, int port, string virtualHost, string user, string password, string queueName, - bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, - int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) - { - Configure(ob => ob.Configure(options => - { - options.HostName = host; - options.Port = port; - options.VirtualHost = virtualHost; - options.UserName = user; - options.Password = password; - options.QueueNamePrefix = queueName; - options.UseQueuePartitioning = useQueuePartitioning; - options.NumberOfQueues = numberOfQueues; - })); - return this; - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs index cbca06c..824a22a 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs @@ -1,6 +1,10 @@ -using System; -using Orleans.Streaming; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.ApplicationParts; +using Orleans.Configuration; +using Orleans.Streams; using Orleans.Streams.BatchContainer; +using System; namespace Orleans.Hosting { @@ -12,7 +16,7 @@ public static class SiloBuilderExtensions /// public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure) where TSerializer : IBatchContainerSerializer, new() { - configure?.Invoke(new SiloRabbitMqStreamConfigurator(name, configDelegate => builder.ConfigureServices(configDelegate))); + configure?.Invoke(new SiloRabbitMqStreamConfigurator(name, configDelegate => builder.ConfigureServices(configDelegate), builder)); return builder; } @@ -23,4 +27,66 @@ public static class SiloBuilderExtensions public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure) => AddRabbitMqStream(builder, name, configure); } + + public interface IRabbitMQStreamConfigurator : INamedServiceConfigurator { } + + public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator, ISiloRabbitMQStreamConfigurator where TSerializer : IBatchContainerSerializer, new() + { + public SiloRabbitMqStreamConfigurator(string name, Action> configureDelegate, ISiloHostBuilder builder) + : base(name, configureDelegate, RabbitMqAdapterFactory.Create) + { + this.ConfigureComponent(SimpleQueueCacheOptionsValidator.Create); + builder.ConfigureApplicationParts(RabbitMQStreamConfiguratorCommon.AddParts); + + ConfigureDelegate(services => + { + services + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)); + }); + } + } + + public static class RabbitMQStreamConfiguratorExtensions + { + public static void ConfigureRabbitMQ(this IRabbitMQStreamConfigurator configurator, Action> configureOptions) + { + configurator.Configure(configureOptions); + } + + public static void ConfigureRabbitMq(this IRabbitMQStreamConfigurator configurator, string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) + { + configurator.Configure(ob => ob.Configure(options => + { + options.HostName = host; + options.Port = port; + options.VirtualHost = virtualHost; + options.UserName = user; + options.Password = password; + options.QueueNamePrefix = queueName; + options.UseQueuePartitioning = useQueuePartitioning; + options.NumberOfQueues = numberOfQueues; + })); + } + + public static void ConfigureCache(this IRabbitMQStreamConfigurator configurator, int cacheSize) => + configurator.Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); + + public static void ConfigureCache(this IRabbitMQStreamConfigurator configurator, int cacheSize, TimeSpan cacheFillingTimeout) => + configurator.Configure(ob => ob.Configure(options => + { + options.CacheSize = cacheSize; + options.CacheFillingTimeout = cacheFillingTimeout; + })); + } + + public interface ISiloRabbitMQStreamConfigurator : IRabbitMQStreamConfigurator, ISiloPersistentStreamConfigurator { } + + public static class RabbitMQStreamConfiguratorCommon where TSerializer : IBatchContainerSerializer, new() + { + public static void AddParts(IApplicationPartManager parts) + { + parts.AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly); + } + } } \ No newline at end of file diff --git a/nuget.config b/nuget.config new file mode 100644 index 0000000..7fc940a --- /dev/null +++ b/nuget.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file