From 98341a0cbeeceeebfcd4cfa51f9cf5e9c8313312 Mon Sep 17 00:00:00 2001 From: Sergey Zorchenko Date: Thu, 29 Aug 2019 12:42:25 +0300 Subject: [PATCH 1/5] working version --- .../App.config | 97 ++++++++++--------- ...treams.RabbitMqStreamProvider.Tests.csproj | 2 +- .../RmqHelpers.cs | 24 +++-- .../TestCluster.cs | 8 +- .../ToxiProxyHelpers.cs | 2 +- .../packages.config | 26 ++--- .../BatchContainer/RabbitMqBatchContainer.cs | 5 +- .../Cache/ConcurrentQueueAdapterCache.cs | 29 ------ .../Cache/ConcurrentQueueCache.cs | 90 ----------------- .../Cache/ConcurrentQueueCacheCursor.cs | 59 ----------- .../RabbitMqAdapterFactory.cs | 3 +- 11 files changed, 90 insertions(+), 255 deletions(-) delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config index 478741b..23d8cda 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config @@ -1,97 +1,100 @@ - + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + + + + \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj index 9ee4162..92adcfd 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -9,7 +9,7 @@ Properties RabbitMqStreamTests RabbitMqStreamTests - v4.6.1 + v4.7.2 512 {3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} 15.0 diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs index 2d67771..39d72af 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs @@ -1,4 +1,5 @@ using RabbitMQ.Client; +using System; namespace RabbitMqStreamTests { @@ -14,18 +15,25 @@ public static void EnsureEmptyQueue() { var factory = new ConnectionFactory { - HostName = "localhost", - VirtualHost = "/", + HostName = "orlytest.golamago.online", + VirtualHost = "stream-test", Port = 5672, - UserName = "guest", - Password = "guest" + UserName = "lama-testing", + Password = "testing" }; - using (var connection = factory.CreateConnection()) - using (var channel = connection.CreateModel()) + try { - channel.QueuePurge(Globals.StreamNameSpaceDefault); - channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + channel.QueuePurge(Globals.StreamNameSpaceDefault); + channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + } + } + catch (Exception e) + { + Console.WriteLine(e); } } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index e680649..85b603d 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -121,7 +121,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -133,7 +133,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -151,12 +151,12 @@ public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder buil .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); }) .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); }) .ConfigureLogging(log => log.AddConsole()); } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs index a674826..452ccfa 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs @@ -25,7 +25,7 @@ public static Process StartProxy() Name = RmqProxyName, Enabled = true, Listen = $"localhost:{RmqProxyPort}", - Upstream = $"localhost:{RmqPort}" + Upstream = $"orlytest.golamago.online:{RmqPort}" }); return proxyProcess; diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config b/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config index 77999c5..821a057 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config @@ -35,14 +35,14 @@ - + - + @@ -51,17 +51,17 @@ - + - + - - + + @@ -70,7 +70,7 @@ - + @@ -78,23 +78,23 @@ - + - + - + - + - + @@ -102,7 +102,7 @@ - + diff --git a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs index d45f29c..116dfb3 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs @@ -40,7 +40,8 @@ public IEnumerable> GetEvents() .Select((e, i) => Tuple.Create(e, EventSequenceToken?.CreateSequenceTokenForEvent(i))) .ToList(); - public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) - => _events.Any(item => shouldReceiveFunc(stream, filterData, item)); + // TODO: Раскопать, почему при нормальной обработке фильтра он доставляет только одному подписчику + public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) => true; + // _events.Any(item => shouldReceiveFunc(stream, filterData, item)); } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs deleted file mode 100644 index 53b4319..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Collections.Concurrent; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueAdapterCache : IQueueAdapterCache - { - private readonly int _cacheSize; - private readonly ConcurrentDictionary _caches; - - public ConcurrentQueueAdapterCache(int cacheSize) - { - if (cacheSize <= 0) throw new ArgumentOutOfRangeException(nameof(cacheSize), "CacheSize must be a positive number."); - _cacheSize = cacheSize; - _caches = new ConcurrentDictionary(); - } - - /// - /// Create a cache for a given queue id - /// - /// - public IQueueCache CreateQueueCache(QueueId queueId) - { - return _caches.AddOrUpdate(queueId, - id => new ConcurrentQueueCache(_cacheSize), - (id, queueCache) => queueCache); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs deleted file mode 100644 index f3e31e0..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - /// - /// A queue cache that keeps items in memory - /// - /// The dictionary is net kept clean, so if there is huge number of random streams the dictionary will keep growing - /// - public class ConcurrentQueueCache : IQueueCache - { - // StreamIdentity = = - private readonly ConcurrentDictionary, ConcurrentQueue> _cache; - private readonly ConcurrentQueue _itemsToPurge; - private readonly int _maxCacheSize; - - private int _numItemsInCache; - - public ConcurrentQueueCache(int cacheSize) - { - _maxCacheSize = cacheSize; - _cache = new ConcurrentDictionary, ConcurrentQueue>(); - _itemsToPurge = new ConcurrentQueue(); - } - - /// - /// The limit of the maximum number of items that can be added. - /// - /// Returns just an estimate. It doesn't have to be exact. It would require extra locking. - /// - /// Note: there is a condition in pulling agent that if maxAddCount is less than 0 and not equal to -1 (unlimited), - /// it will skip the reading cycle including cache purging - /// - public int GetMaxAddCount() => Math.Max(1, _maxCacheSize - _numItemsInCache); - - public bool IsUnderPressure() => _numItemsInCache >= _maxCacheSize; - - public void AddToCache(IList messages) - { - foreach (var msg in messages) - { - _cache.GetOrAdd(new Tuple(msg.StreamGuid, msg.StreamNamespace), - new ConcurrentQueue()) - .Enqueue((RabbitMqBatchContainer) msg); - } - Interlocked.Add(ref _numItemsInCache, messages.Count); - } - - public bool TryPurgeFromCache(out IList purgedItems) - { - purgedItems = null; - if (!_itemsToPurge.IsEmpty) - { - purgedItems = new List(); - while (_itemsToPurge.TryDequeue(out var item)) - { - purgedItems.Add(item); - } - Interlocked.Add(ref _numItemsInCache, -purgedItems.Count); - } - return purgedItems?.Count > 0; - } - - /// - /// Here we ignore the token since this is not rewindable stream anyway - /// - public IQueueCacheCursor GetCacheCursor(IStreamIdentity streamIdentity, StreamSequenceToken token) - { - return new ConcurrentQueueCacheCursor( - moveNext: () => - { - RabbitMqBatchContainer item = null; - _cache.TryGetValue(new Tuple(streamIdentity.Guid, streamIdentity.Namespace), out var queue); - queue?.TryDequeue(out item); - return item; - }, - purgeItem: item => - { - if (item != null) - { - _itemsToPurge.Enqueue(item); - } - }); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs deleted file mode 100644 index 9f7bb47..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueCacheCursor : IQueueCacheCursor - { - private readonly Func _moveNext; - private readonly Action _purgeItem; - - private readonly object _syncRoot = new object(); - - private RabbitMqBatchContainer _current; - - public ConcurrentQueueCacheCursor(Func moveNext, Action purgeItem) - { - _moveNext = moveNext ?? throw new ArgumentNullException(nameof(moveNext)); - _purgeItem = purgeItem ?? throw new ArgumentNullException(nameof(purgeItem)); - } - - public void Dispose() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = null; - } - } - - public IBatchContainer GetCurrent(out Exception exception) - { - exception = null; - return _current; - } - - public bool MoveNext() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = _moveNext(); - } - return _current != null; - } - - public void Refresh(StreamSequenceToken token) - { - // do nothing - } - - public void RecordDeliveryFailure() - { - if (_current != null) - { - _current.DeliveryFailure = true; - } - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs index 46d150c..74b9699 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Providers.Streams.Common; using Orleans.Serialization; using Orleans.Streams.BatchContainer; using Orleans.Streams.Cache; @@ -29,7 +30,7 @@ public RabbitMqAdapterFactory( if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider)); if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory)); - _cache = new ConcurrentQueueAdapterCache(cachingOptions.CacheSize); + _cache = new SimpleQueueAdapterCache(new SimpleQueueCacheOptions { CacheSize = cachingOptions.CacheSize }, providerName, loggerFactory); _mapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions { TotalQueueCount = rmqOptions.NumberOfQueues }, rmqOptions.QueueNamePrefix); _failureHandler = Task.FromResult(new NoOpStreamDeliveryFailureHandler(false)); From 7c0da2f01e3dc5fa7e47d6c199193e512acf01a6 Mon Sep 17 00:00:00 2001 From: Sergey Zorchenko Date: Thu, 29 Aug 2019 12:42:25 +0300 Subject: [PATCH 2/5] netcore 3.0 and orleans 3.0 support --- .../App.config | 97 ++--- .../CacheTests.cs | 158 -------- ...treams.RabbitMqStreamProvider.Tests.csproj | 336 ++---------------- .../Properties/AssemblyInfo.cs | 15 +- .../RmqHelpers.cs | 16 +- .../TestCluster.cs | 4 +- .../ToxiProxyHelpers.cs | 10 +- .../packages.config | 113 ------ .../BatchContainer/RabbitMqBatchContainer.cs | 5 +- .../Cache/ConcurrentQueueAdapterCache.cs | 29 -- .../Cache/ConcurrentQueueCache.cs | 90 ----- .../Cache/ConcurrentQueueCacheCursor.cs | 59 --- .../ClientBuilderExtensions.cs | 25 +- ...eans.Streams.RabbitMqStreamProvider.csproj | 9 +- .../RabbitMqAdapterFactory.cs | 10 +- .../RabbitMqStreamBuilder.cs | 75 +--- .../SiloBuilderExtensions.cs | 67 ++++ nuget.config | 6 + 18 files changed, 205 insertions(+), 919 deletions(-) delete mode 100644 Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs delete mode 100755 Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs delete mode 100644 Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs create mode 100644 nuget.config diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config index 478741b..23d8cda 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/App.config @@ -1,97 +1,100 @@ - + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + - - + + + + + \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs deleted file mode 100644 index 5faa398..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/CacheTests.cs +++ /dev/null @@ -1,158 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Orleans.Providers.Streams.Common; -using Orleans.Streams; -using Orleans.Streams.BatchContainer; -using Orleans.Streams.Cache; - -namespace RabbitMqStreamTests -{ - [TestClass] - public class CacheTests - { - [TestMethod] - public async Task RunCacheReadHeavyUsageTestMultipleTimes() - { - for (int i = 0; i < 10; i++) - { - await TestOrleansCacheReadHeavyUsage(); - } - } - - [TestMethod] - public async Task RunCacheWriteHeavyUsageTestMultipleTimes() - { - for (int i = 0; i < 10; i++) - { - await TestOrleansCacheWriteHeavyUsage(); - } - } - - [TestMethod] - public async Task TestOrleansCacheCursorReadHeavyUsage() - { - int cacheSize = 1000000; - var queueCache = new ConcurrentQueueCache(cacheSize); - queueCache.AddToCache(GetQueueMessages(queueCache.GetMaxAddCount())); - - var watch = Stopwatch.StartNew(); - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, null)) - { - var tasks = new List(); - for (int i = 0; i < 10; i++) - { - tasks.Add(Task.Run(() => - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - } - })); - } - - await Task.WhenAll(tasks); - } - watch.Stop(); - Console.WriteLine($"Read {cacheSize} items in {watch.ElapsedMilliseconds} ms"); - // without locking ~170ms, with locking ~680ms, thus ~4x slower - - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(cacheSize, finalPurgedItems.Count, "purged items"); - Assert.AreEqual(cacheSize, queueCache.GetMaxAddCount(), "cache size"); - } - - private async Task TestOrleansCacheReadHeavyUsage() - { - var queueCache = new ConcurrentQueueCache(CacheSize); - - var tasks = new List(); - for (int i = 0; i < 10; i++) - { - queueCache.TryPurgeFromCache(out var purgedItems); - - var multiBatch = GetQueueMessages(new Random().Next(queueCache.GetMaxAddCount())); - queueCache.AddToCache(multiBatch); - - if (multiBatch.Count > 0) - { - tasks.Add(Task.Run(async () => - { - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, multiBatch.First().SequenceToken)) - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - await Task.Delay(TimeSpan.FromMilliseconds(50 + new Random().Next(100))); - } - } - })); - } - await Task.Delay(TimeSpan.FromMilliseconds(200)); - } - - await Task.WhenAll(tasks); - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(CacheSize, queueCache.GetMaxAddCount()); - } - - private async Task TestOrleansCacheWriteHeavyUsage() - { - var queueCache = new ConcurrentQueueCache(CacheSize); - - var tasks = new List(); - for (int i = 0; i < 1000; i++) - { - queueCache.TryPurgeFromCache(out var purgedItems); - - var multiBatch = GetQueueMessages(queueCache.GetMaxAddCount()); - queueCache.AddToCache(multiBatch); - - if (multiBatch.Count > 0) - { - tasks.Add(Task.Run(async () => - { - using (var cursor = queueCache.GetCacheCursor(StreamIdentity, multiBatch.First().SequenceToken)) - { - while (cursor.MoveNext()) - { - var batch = cursor.GetCurrent(out var ignore); - await Task.Delay(TimeSpan.FromMilliseconds(50 + new Random().Next(100))); - } - } - })); - } - } - - await Task.WhenAll(tasks); - queueCache.TryPurgeFromCache(out var finalPurgedItems); - - Assert.AreEqual(CacheSize, queueCache.GetMaxAddCount()); - } - - private const int CacheSize = 50; - - private static readonly StreamIdentity StreamIdentity = new StreamIdentity(Guid.Empty, "test"); - private static ulong _deliveryTag = 0; - - private static IList GetQueueMessages(int count) - { - return Enumerable.Range(0, count) - .Select(i => - new RabbitMqBatchContainer( - StreamIdentity.Guid, StreamIdentity.Namespace, - new[] {"value"}.ToList(), - new Dictionary()) - { - DeliveryTag = _deliveryTag++, - EventSequenceToken = new EventSequenceToken((long) _deliveryTag) - }) - .ToList(); - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj index 9ee4162..96ff9ce 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -1,337 +1,49 @@ - - - + - Debug - AnyCPU - {BEE3BDC1-33A5-4C5E-A7F3-A0FA050D99F4} - Library - Properties + netcoreapp2.1 + latest RabbitMqStreamTests RabbitMqStreamTests - v4.6.1 - 512 {3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} - 15.0 $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) $(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages False UnitTest - - - + Orleans.Streams.RabbitMqStreamProvider.Tests + Orleans.Streams.RabbitMqStreamProvider.Tests + Copyright © Martin Ovesny 2017 + bin\$(Configuration)\ - true full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 pdbonly - true - bin\Release\ - TRACE - prompt - 4 + - - - - - - - - - - - - - - - - - - Designer - - - Designer - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + - - ..\packages\Microsoft.CodeAnalysis.Common.2.8.2\lib\netstandard1.3\Microsoft.CodeAnalysis.dll - - - ..\packages\Microsoft.CodeAnalysis.CSharp.2.8.2\lib\netstandard1.3\Microsoft.CodeAnalysis.CSharp.dll - - - ..\packages\Microsoft.Diagnostics.Tracing.EventSource.Redist.1.1.28\lib\net46\Microsoft.Diagnostics.Tracing.EventSource.dll - - - ..\packages\Microsoft.DotNet.PlatformAbstractions.2.0.0\lib\net45\Microsoft.DotNet.PlatformAbstractions.dll - - - ..\packages\Microsoft.Extensions.Configuration.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Configuration.dll - - - ..\packages\Microsoft.Extensions.Configuration.Abstractions.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Configuration.Abstractions.dll - - - ..\packages\Microsoft.Extensions.Configuration.Binder.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Configuration.Binder.dll - - - ..\packages\Microsoft.Extensions.DependencyInjection.2.1.1\lib\net461\Microsoft.Extensions.DependencyInjection.dll - - - ..\packages\Microsoft.Extensions.DependencyInjection.Abstractions.2.1.1\lib\netstandard2.0\Microsoft.Extensions.DependencyInjection.Abstractions.dll - - - ..\packages\Microsoft.Extensions.DependencyModel.2.0.0\lib\net451\Microsoft.Extensions.DependencyModel.dll - - - ..\packages\Microsoft.Extensions.FileProviders.Abstractions.2.1.0\lib\netstandard2.0\Microsoft.Extensions.FileProviders.Abstractions.dll - - - ..\packages\Microsoft.Extensions.Hosting.Abstractions.2.1.0\lib\netstandard2.0\Microsoft.Extensions.Hosting.Abstractions.dll - - - ..\packages\Microsoft.Extensions.Logging.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Logging.dll - - - ..\packages\Microsoft.Extensions.Logging.Abstractions.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Logging.Abstractions.dll - - - ..\packages\Microsoft.Extensions.Logging.Configuration.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Logging.Configuration.dll - - - ..\packages\Microsoft.Extensions.Logging.Console.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Logging.Console.dll - - - ..\packages\Microsoft.Extensions.Options.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Options.dll - - - ..\packages\Microsoft.Extensions.Options.ConfigurationExtensions.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Options.ConfigurationExtensions.dll - - - ..\packages\Microsoft.Extensions.Primitives.2.1.1\lib\netstandard2.0\Microsoft.Extensions.Primitives.dll - - - ..\packages\MSTest.TestFramework.1.1.11\lib\net45\Microsoft.VisualStudio.TestPlatform.TestFramework.dll - - - ..\packages\MSTest.TestFramework.1.1.11\lib\net45\Microsoft.VisualStudio.TestPlatform.TestFramework.Extensions.dll - - - ..\packages\Microsoft.Win32.Primitives.4.3.0\lib\net46\Microsoft.Win32.Primitives.dll - - - ..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll - - - ..\packages\Microsoft.Orleans.OrleansCodeGenerator.2.3.0\lib\netstandard2.0\Orleans.CodeGeneration.dll - - - ..\packages\Microsoft.Orleans.Core.2.3.0\lib\netstandard2.0\Orleans.Core.dll - - - ..\packages\Microsoft.Orleans.Core.Abstractions.2.3.0\lib\netstandard2.0\Orleans.Core.Abstractions.dll - - - ..\packages\Microsoft.Orleans.OrleansRuntime.2.3.0\lib\netstandard2.0\Orleans.Runtime.dll - - - ..\packages\Microsoft.Orleans.Runtime.Abstractions.2.3.0\lib\netstandard2.0\Orleans.Runtime.Abstractions.dll - - - ..\packages\Microsoft.Orleans.OrleansProviders.2.3.0\lib\netstandard2.0\OrleansProviders.dll - - - ..\packages\protobuf-net.2.3.13\lib\net40\protobuf-net.dll - - - ..\packages\RabbitMQ.Client.5.1.0\lib\net451\RabbitMQ.Client.dll - - - - ..\packages\System.AppContext.4.3.0\lib\net46\System.AppContext.dll - - - ..\packages\System.Buffers.4.4.0\lib\netstandard2.0\System.Buffers.dll - - - ..\packages\System.Collections.Immutable.1.4.0\lib\netstandard2.0\System.Collections.Immutable.dll - - - ..\packages\System.ComponentModel.Primitives.4.3.0\lib\net45\System.ComponentModel.Primitives.dll - - - ..\packages\System.ComponentModel.TypeConverter.4.3.0\lib\net45\System.ComponentModel.TypeConverter.dll - - - ..\packages\System.Console.4.3.0\lib\net46\System.Console.dll - - - ..\packages\System.Diagnostics.DiagnosticSource.4.3.0\lib\net46\System.Diagnostics.DiagnosticSource.dll - - - ..\packages\System.Diagnostics.FileVersionInfo.4.3.0\lib\net46\System.Diagnostics.FileVersionInfo.dll - - - ..\packages\System.Diagnostics.Process.4.3.0\lib\net461\System.Diagnostics.Process.dll - True - True - - - ..\packages\System.Diagnostics.StackTrace.4.3.0\lib\net46\System.Diagnostics.StackTrace.dll - - - ..\packages\System.Diagnostics.TraceSource.4.3.0\lib\net46\System.Diagnostics.TraceSource.dll - True - True - - - ..\packages\System.Globalization.Calendars.4.3.0\lib\net46\System.Globalization.Calendars.dll - - - ..\packages\System.IO.Compression.4.3.0\lib\net46\System.IO.Compression.dll - True - - - - ..\packages\System.IO.Compression.ZipFile.4.3.0\lib\net46\System.IO.Compression.ZipFile.dll - - - ..\packages\System.IO.FileSystem.4.3.0\lib\net46\System.IO.FileSystem.dll - - - ..\packages\System.IO.FileSystem.Primitives.4.3.0\lib\net46\System.IO.FileSystem.Primitives.dll - - - ..\packages\System.Memory.4.5.1\lib\netstandard2.0\System.Memory.dll - - - ..\packages\System.Net.Http.4.3.0\lib\net46\System.Net.Http.dll - - - ..\packages\Toxiproxy.Net.2.0.1\lib\net45\System.Net.Http.Formatting.dll - - - ..\packages\System.Net.NameResolution.4.3.0\lib\net46\System.Net.NameResolution.dll - True - True - - - ..\packages\System.Net.NetworkInformation.4.3.0\lib\net46\System.Net.NetworkInformation.dll - True - True - - - ..\packages\System.Net.Sockets.4.3.0\lib\net46\System.Net.Sockets.dll - - - - ..\packages\System.Numerics.Vectors.4.4.0\lib\net46\System.Numerics.Vectors.dll - - - ..\packages\System.Reflection.Metadata.1.5.0\lib\netstandard2.0\System.Reflection.Metadata.dll - - - ..\packages\System.Reflection.TypeExtensions.4.4.0\lib\net461\System.Reflection.TypeExtensions.dll - - - ..\packages\System.Runtime.CompilerServices.Unsafe.4.5.1\lib\netstandard2.0\System.Runtime.CompilerServices.Unsafe.dll - - - ..\packages\System.Runtime.InteropServices.RuntimeInformation.4.3.0\lib\net45\System.Runtime.InteropServices.RuntimeInformation.dll - True - - - - ..\packages\System.Runtime.Serialization.Formatters.4.3.0\lib\net46\System.Runtime.Serialization.Formatters.dll - True - True - - - ..\packages\System.Runtime.Serialization.Primitives.4.3.0\lib\net46\System.Runtime.Serialization.Primitives.dll - True - True - - - ..\packages\System.Security.Cryptography.Algorithms.4.3.0\lib\net461\System.Security.Cryptography.Algorithms.dll - - - ..\packages\System.Security.Cryptography.Encoding.4.3.0\lib\net46\System.Security.Cryptography.Encoding.dll - - - ..\packages\System.Security.Cryptography.Primitives.4.3.0\lib\net46\System.Security.Cryptography.Primitives.dll - - - ..\packages\System.Security.Cryptography.X509Certificates.4.3.0\lib\net461\System.Security.Cryptography.X509Certificates.dll - - - ..\packages\System.Text.Encoding.CodePages.4.3.0\lib\net46\System.Text.Encoding.CodePages.dll - - - ..\packages\System.Threading.Tasks.Extensions.4.4.0\lib\netstandard2.0\System.Threading.Tasks.Extensions.dll - - - ..\packages\System.Threading.Thread.4.3.0\lib\net46\System.Threading.Thread.dll - - - ..\packages\System.Threading.ThreadPool.4.3.0\lib\net46\System.Threading.ThreadPool.dll - True - True - - - ..\packages\System.ValueTuple.4.4.0\lib\net461\System.ValueTuple.dll - True - True - - - - - ..\packages\System.Xml.ReaderWriter.4.3.0\lib\net46\System.Xml.ReaderWriter.dll - - - ..\packages\System.Xml.XmlDocument.4.3.0\lib\net46\System.Xml.XmlDocument.dll - - - ..\packages\System.Xml.XPath.4.3.0\lib\net46\System.Xml.XPath.dll - - - ..\packages\System.Xml.XPath.XDocument.4.3.0\lib\net46\System.Xml.XPath.XDocument.dll - - - ..\packages\System.Xml.XPath.XmlDocument.4.3.0\lib\net46\System.Xml.XPath.XmlDocument.dll - - - ..\packages\Toxiproxy.Net.2.0.1\lib\net45\Toxiproxy.Net.dll - - - {7b0ee715-ac3a-426e-a0a5-317afdeaa251} - Orleans.Streams.RabbitMqStreamProvider - + - - - - - This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. - - - - - - - \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs index f00fae8..6388f45 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Properties/AssemblyInfo.cs @@ -1,19 +1,6 @@ -using System.Reflection; +using System.Reflection; using System.Runtime.InteropServices; -[assembly: AssemblyTitle("Orleans.Streams.RabbitMqStreamProvider.Tests")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Orleans.Streams.RabbitMqStreamProvider.Tests")] -[assembly: AssemblyCopyright("Copyright © Martin Ovesny 2017")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - [assembly: ComVisible(false)] [assembly: Guid("bee3bdc1-33a5-4c5e-a7f3-a0fa050d99f4")] - -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs index 2d67771..dbaa113 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs @@ -1,4 +1,5 @@ using RabbitMQ.Client; +using System; namespace RabbitMqStreamTests { @@ -21,11 +22,18 @@ public static void EnsureEmptyQueue() Password = "guest" }; - using (var connection = factory.CreateConnection()) - using (var channel = connection.CreateModel()) + try { - channel.QueuePurge(Globals.StreamNameSpaceDefault); - channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + channel.QueuePurge(Globals.StreamNameSpaceDefault); + channel.QueuePurge(Globals.StreamNameSpaceProtoBuf); + } + } + catch (Exception e) + { + Console.WriteLine(e); } } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index e680649..86d6dfc 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -67,7 +67,6 @@ public static async Task Create() .UseLocalhostClustering(siloPort: 11111, gatewayPort: 30000) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; @@ -80,7 +79,6 @@ public static async Task Create() primarySiloEndpoint: new IPEndPoint(IPAddress.Loopback, EndpointOptions.DEFAULT_SILO_PORT)) .Configure(options => { - options.ExpectedClusterSize = 2; options.UseLivenessGossip = true; options.ProbeTimeout = TimeSpan.FromSeconds(5); options.NumMissedProbesLimit = 3; @@ -117,6 +115,7 @@ internal static class ClusterBuilderExtentions public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder builder) { return builder + .ConfigureApplicationParts(p => p.AddApplicationPart(typeof(IAggregatorGrain).Assembly)) .AddMemoryGrainStorage("PubSubStore") .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { @@ -148,6 +147,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder builder) { return builder + .ConfigureApplicationParts(p => p.AddApplicationPart(typeof(IAggregatorGrain).Assembly)) .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs index a674826..3bfcdd5 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.IO; using Toxiproxy.Net; using Toxiproxy.Net.Toxics; @@ -13,10 +14,15 @@ internal static class ToxiProxyHelpers public static Process StartProxy() { StopProxyIfRunning(); - + var path = @"..\packages\toxiproxy.Net\2.0.1\compiled\Win64\toxiproxy-server-2.1.2-windows-amd64.exe"; + var dir = Directory.GetCurrentDirectory(); + while(new DirectoryInfo(dir).Name != "Orleans.Streams.RabbitMqStreamProvider.Tests") + { + dir = Directory.GetParent(dir).FullName; + } var proxyProcess = new Process { - StartInfo = new ProcessStartInfo(@"..\..\..\packages\Toxiproxy.Net.2.0.1\compiled\Win64\toxiproxy-server-2.1.2-windows-amd64.exe") + StartInfo = new ProcessStartInfo(Path.Combine(dir, path)) }; proxyProcess.Start(); diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config b/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config deleted file mode 100755 index 77999c5..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/packages.config +++ /dev/null @@ -1,113 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs index d45f29c..116dfb3 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs @@ -40,7 +40,8 @@ public IEnumerable> GetEvents() .Select((e, i) => Tuple.Create(e, EventSequenceToken?.CreateSequenceTokenForEvent(i))) .ToList(); - public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) - => _events.Any(item => shouldReceiveFunc(stream, filterData, item)); + // TODO: Раскопать, почему при нормальной обработке фильтра он доставляет только одному подписчику + public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) => true; + // _events.Any(item => shouldReceiveFunc(stream, filterData, item)); } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs deleted file mode 100644 index 53b4319..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueAdapterCache.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System; -using System.Collections.Concurrent; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueAdapterCache : IQueueAdapterCache - { - private readonly int _cacheSize; - private readonly ConcurrentDictionary _caches; - - public ConcurrentQueueAdapterCache(int cacheSize) - { - if (cacheSize <= 0) throw new ArgumentOutOfRangeException(nameof(cacheSize), "CacheSize must be a positive number."); - _cacheSize = cacheSize; - _caches = new ConcurrentDictionary(); - } - - /// - /// Create a cache for a given queue id - /// - /// - public IQueueCache CreateQueueCache(QueueId queueId) - { - return _caches.AddOrUpdate(queueId, - id => new ConcurrentQueueCache(_cacheSize), - (id, queueCache) => queueCache); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs deleted file mode 100644 index f3e31e0..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCache.cs +++ /dev/null @@ -1,90 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - /// - /// A queue cache that keeps items in memory - /// - /// The dictionary is net kept clean, so if there is huge number of random streams the dictionary will keep growing - /// - public class ConcurrentQueueCache : IQueueCache - { - // StreamIdentity = = - private readonly ConcurrentDictionary, ConcurrentQueue> _cache; - private readonly ConcurrentQueue _itemsToPurge; - private readonly int _maxCacheSize; - - private int _numItemsInCache; - - public ConcurrentQueueCache(int cacheSize) - { - _maxCacheSize = cacheSize; - _cache = new ConcurrentDictionary, ConcurrentQueue>(); - _itemsToPurge = new ConcurrentQueue(); - } - - /// - /// The limit of the maximum number of items that can be added. - /// - /// Returns just an estimate. It doesn't have to be exact. It would require extra locking. - /// - /// Note: there is a condition in pulling agent that if maxAddCount is less than 0 and not equal to -1 (unlimited), - /// it will skip the reading cycle including cache purging - /// - public int GetMaxAddCount() => Math.Max(1, _maxCacheSize - _numItemsInCache); - - public bool IsUnderPressure() => _numItemsInCache >= _maxCacheSize; - - public void AddToCache(IList messages) - { - foreach (var msg in messages) - { - _cache.GetOrAdd(new Tuple(msg.StreamGuid, msg.StreamNamespace), - new ConcurrentQueue()) - .Enqueue((RabbitMqBatchContainer) msg); - } - Interlocked.Add(ref _numItemsInCache, messages.Count); - } - - public bool TryPurgeFromCache(out IList purgedItems) - { - purgedItems = null; - if (!_itemsToPurge.IsEmpty) - { - purgedItems = new List(); - while (_itemsToPurge.TryDequeue(out var item)) - { - purgedItems.Add(item); - } - Interlocked.Add(ref _numItemsInCache, -purgedItems.Count); - } - return purgedItems?.Count > 0; - } - - /// - /// Here we ignore the token since this is not rewindable stream anyway - /// - public IQueueCacheCursor GetCacheCursor(IStreamIdentity streamIdentity, StreamSequenceToken token) - { - return new ConcurrentQueueCacheCursor( - moveNext: () => - { - RabbitMqBatchContainer item = null; - _cache.TryGetValue(new Tuple(streamIdentity.Guid, streamIdentity.Namespace), out var queue); - queue?.TryDequeue(out item); - return item; - }, - purgeItem: item => - { - if (item != null) - { - _itemsToPurge.Enqueue(item); - } - }); - } - } -} diff --git a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs b/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs deleted file mode 100644 index 9f7bb47..0000000 --- a/Orleans.Streams.RabbitMqStreamProvider/Cache/ConcurrentQueueCacheCursor.cs +++ /dev/null @@ -1,59 +0,0 @@ -using System; -using Orleans.Streams.BatchContainer; - -namespace Orleans.Streams.Cache -{ - public class ConcurrentQueueCacheCursor : IQueueCacheCursor - { - private readonly Func _moveNext; - private readonly Action _purgeItem; - - private readonly object _syncRoot = new object(); - - private RabbitMqBatchContainer _current; - - public ConcurrentQueueCacheCursor(Func moveNext, Action purgeItem) - { - _moveNext = moveNext ?? throw new ArgumentNullException(nameof(moveNext)); - _purgeItem = purgeItem ?? throw new ArgumentNullException(nameof(purgeItem)); - } - - public void Dispose() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = null; - } - } - - public IBatchContainer GetCurrent(out Exception exception) - { - exception = null; - return _current; - } - - public bool MoveNext() - { - lock (_syncRoot) - { - _purgeItem(_current); - _current = _moveNext(); - } - return _current != null; - } - - public void Refresh(StreamSequenceToken token) - { - // do nothing - } - - public void RecordDeliveryFailure() - { - if (_current != null) - { - _current.DeliveryFailure = true; - } - } - } -} \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs index 3d3ba1c..7b6a207 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/ClientBuilderExtensions.cs @@ -1,6 +1,8 @@ -using System; -using Orleans.Streaming; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Streams; using Orleans.Streams.BatchContainer; +using System; namespace Orleans.Hosting { @@ -16,11 +18,30 @@ public static class ClientBuilderExtensions return builder; } + public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator, IClusterClientAzureQueueStreamConfigurator where TSerializer : IBatchContainerSerializer, new() + { + public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder builder) + : base(name, builder, RabbitMqAdapterFactory.Create) + { + this.ConfigureComponent(SimpleQueueCacheOptionsValidator.Create); + + builder + .ConfigureApplicationParts(RabbitMQStreamConfiguratorCommon.AddParts) + .ConfigureServices(services => + services.ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) + .ConfigureNamedOptionForLogging(name)); + + } + } + /// /// Configure client to use RMQ persistent streams. /// This version uses the default Orleans serializer. /// public static IClientBuilder AddRabbitMqStream(this IClientBuilder builder, string name, Action> configure) => AddRabbitMqStream(builder, name, configure); + + public interface IClusterClientAzureQueueStreamConfigurator : IRabbitMQStreamConfigurator, IClusterClientPersistentStreamConfigurator { } } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj index fbcf832..88eb63e 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj @@ -1,12 +1,9 @@ - - + netstandard2.0 - - + - - + \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs index 46d150c..475f111 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqAdapterFactory.cs @@ -1,11 +1,11 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Providers.Streams.Common; using Orleans.Serialization; using Orleans.Streams.BatchContainer; -using Orleans.Streams.Cache; +using System; +using System.Threading.Tasks; namespace Orleans.Streams { @@ -29,7 +29,7 @@ public RabbitMqAdapterFactory( if (serviceProvider == null) throw new ArgumentNullException(nameof(serviceProvider)); if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory)); - _cache = new ConcurrentQueueAdapterCache(cachingOptions.CacheSize); + _cache = new SimpleQueueAdapterCache(new SimpleQueueCacheOptions { CacheSize = cachingOptions.CacheSize }, providerName, loggerFactory); _mapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions { TotalQueueCount = rmqOptions.NumberOfQueues }, rmqOptions.QueueNamePrefix); _failureHandler = Task.FromResult(new NoOpStreamDeliveryFailureHandler(false)); diff --git a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs index 59060a1..154e669 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/RabbitMqStreamBuilder.cs @@ -1,5 +1,6 @@ using System; using Microsoft.Extensions.DependencyInjection; +using Orleans; using Orleans.Configuration; using Orleans.Hosting; using Orleans.Streams; @@ -7,81 +8,7 @@ namespace Orleans.Streaming { - public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() - { - public SiloRabbitMqStreamConfigurator(string name, Action> configureDelegate, ISiloHostBuilder builder) - : base(name, configureDelegate, RabbitMqAdapterFactory.Create) - { - this.configureDelegate(services => - { - services.ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new CachingOptionsValidator(sp.GetOptionsByName(name), name)); - }); - } - public SiloRabbitMqStreamConfigurator ConfigureRabbitMq(string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) - { - Configure(ob => ob.Configure(options => - { - options.HostName = host; - options.Port = port; - options.VirtualHost = virtualHost; - options.UserName = user; - options.Password = password; - options.QueueNamePrefix = queueName; - options.UseQueuePartitioning = useQueuePartitioning; - options.NumberOfQueues = numberOfQueues; - })); - return this; - } - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize) - { - Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); - return this; - } - public SiloRabbitMqStreamConfigurator ConfigureCache(int cacheSize, TimeSpan cacheFillingTimeout) - { - Configure(ob => ob.Configure(options => - { - options.CacheSize = cacheSize; - options.CacheFillingTimeout = cacheFillingTimeout; - })); - return this; - } - } - - public class ClusterClientRabbitMqStreamConfigurator : ClusterClientPersistentStreamConfigurator where TSerializer : IBatchContainerSerializer, new() - { - public ClusterClientRabbitMqStreamConfigurator(string name, IClientBuilder builder) - : base(name, builder, RabbitMqAdapterFactory.Create) - { - clientBuilder - .ConfigureApplicationParts(parts => parts.AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly)) - .ConfigureServices(services => - services.ConfigureNamedOptionForLogging(name) - .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)) - .ConfigureNamedOptionForLogging(name)); - - } - - public ClusterClientRabbitMqStreamConfigurator ConfigureRabbitMq(string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) - { - Configure(ob => ob.Configure(options => - { - options.HostName = host; - options.Port = port; - options.VirtualHost = virtualHost; - options.UserName = user; - options.Password = password; - options.QueueNamePrefix = queueName; - options.UseQueuePartitioning = useQueuePartitioning; - options.NumberOfQueues = numberOfQueues; - })); - return this; - } - } } \ No newline at end of file diff --git a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs index 868fe11..fb37e4d 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/SiloBuilderExtensions.cs @@ -1,5 +1,10 @@ using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.ApplicationParts; +using Orleans.Configuration; using Orleans.Streaming; +using Orleans.Streams; using Orleans.Streams.BatchContainer; namespace Orleans.Hosting @@ -23,4 +28,66 @@ public static class SiloBuilderExtensions public static ISiloHostBuilder AddRabbitMqStream(this ISiloHostBuilder builder, string name, Action> configure) => AddRabbitMqStream(builder, name, configure); } + + public interface IRabbitMQStreamConfigurator : INamedServiceConfigurator { } + + public class SiloRabbitMqStreamConfigurator : SiloPersistentStreamConfigurator, ISiloRabbitMQStreamConfigurator where TSerializer : IBatchContainerSerializer, new() + { + public SiloRabbitMqStreamConfigurator(string name, Action> configureDelegate, ISiloHostBuilder builder) + : base(name, configureDelegate, RabbitMqAdapterFactory.Create) + { + this.ConfigureComponent(SimpleQueueCacheOptionsValidator.Create); + builder.ConfigureApplicationParts(RabbitMQStreamConfiguratorCommon.AddParts); + + ConfigureDelegate(services => + { + services + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new RabbitMqOptionsValidator(sp.GetOptionsByName(name), name)); + }); + } + } + + public static class RabbitMQStreamConfiguratorExtensions + { + public static void ConfigureRabbitMQ(this IRabbitMQStreamConfigurator configurator, Action> configureOptions) + { + configurator.Configure(configureOptions); + } + + public static void ConfigureRabbitMq(this IRabbitMQStreamConfigurator configurator, string host, int port, string virtualHost, string user, string password, string queueName, bool useQueuePartitioning = RabbitMqOptions.DefaultUseQueuePartitioning, int numberOfQueues = RabbitMqOptions.DefaultNumberOfQueues) + { + configurator.Configure(ob => ob.Configure(options => + { + options.HostName = host; + options.Port = port; + options.VirtualHost = virtualHost; + options.UserName = user; + options.Password = password; + options.QueueNamePrefix = queueName; + options.UseQueuePartitioning = useQueuePartitioning; + options.NumberOfQueues = numberOfQueues; + })); + } + + public static void ConfigureCache(this IRabbitMQStreamConfigurator configurator, int cacheSize) => + configurator.Configure(ob => ob.Configure(options => options.CacheSize = cacheSize)); + + public static void ConfigureCache(this IRabbitMQStreamConfigurator configurator, int cacheSize, TimeSpan cacheFillingTimeout) => + configurator.Configure(ob => ob.Configure(options => + { + options.CacheSize = cacheSize; + options.CacheFillingTimeout = cacheFillingTimeout; + })); + } + + public interface ISiloRabbitMQStreamConfigurator : IRabbitMQStreamConfigurator, ISiloPersistentStreamConfigurator { } + + public static class RabbitMQStreamConfiguratorCommon where TSerializer : IBatchContainerSerializer, new() + { + public static void AddParts(IApplicationPartManager parts) + { + parts.AddFrameworkPart(typeof(RabbitMqAdapterFactory).Assembly); + } + } } \ No newline at end of file diff --git a/nuget.config b/nuget.config new file mode 100644 index 0000000..7fc940a --- /dev/null +++ b/nuget.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file From 3431c802e0fcb412ef325db242e6ed40b6c9f18f Mon Sep 17 00:00:00 2001 From: Sergey Zorchenko Date: Thu, 17 Oct 2019 10:30:39 +0300 Subject: [PATCH 3/5] rc2 --- .../Orleans.Streams.RabbitMqStreamProvider.Tests.csproj | 8 +++----- .../RmqHelpers.cs | 8 ++++---- .../TestCluster.cs | 8 ++++---- .../ToxiProxyHelpers.cs | 2 +- .../Orleans.Streams.RabbitMqStreamProvider.csproj | 2 +- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj index 96ff9ce..02dc3d8 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -23,12 +23,12 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + @@ -39,9 +39,7 @@ - - diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs index dbaa113..39d72af 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs @@ -15,11 +15,11 @@ public static void EnsureEmptyQueue() { var factory = new ConnectionFactory { - HostName = "localhost", - VirtualHost = "/", + HostName = "orlytest.golamago.online", + VirtualHost = "stream-test", Port = 5672, - UserName = "guest", - Password = "guest" + UserName = "lama-testing", + Password = "testing" }; try diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index 86d6dfc..84ac866 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -120,7 +120,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -132,7 +132,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -151,12 +151,12 @@ public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder buil .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); }) .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); }) .ConfigureLogging(log => log.AddConsole()); } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs index 3bfcdd5..8101dfd 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ToxiProxyHelpers.cs @@ -31,7 +31,7 @@ public static Process StartProxy() Name = RmqProxyName, Enabled = true, Listen = $"localhost:{RmqProxyPort}", - Upstream = $"localhost:{RmqPort}" + Upstream = $"orlytest.golamago.online:{RmqPort}" }); return proxyProcess; diff --git a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj index 88eb63e..adb95c8 100755 --- a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj @@ -3,7 +3,7 @@ netstandard2.0 - + \ No newline at end of file From b0cee1a0ed06ceb594f9ff4b1385c80e4b488658 Mon Sep 17 00:00:00 2001 From: Sergey Zorchenko Date: Thu, 24 Oct 2019 10:18:36 +0300 Subject: [PATCH 4/5] added parallel consumers to test and changed back rmq connection parameters --- .../AggregatorGrain.cs | 16 +++++++++++----- .../IntegrationTestHelpers.cs | 10 ++++++---- .../Message.cs | 7 ++++++- .../ReceiverGrain.cs | 17 ++++++++++++----- .../RmqHelpers.cs | 8 ++++---- .../TestCluster.cs | 8 ++++---- .../BatchContainer/RabbitMqBatchContainer.cs | 5 ++--- 7 files changed, 45 insertions(+), 26 deletions(-) diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs index ca9dd44..8d492d4 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/AggregatorGrain.cs @@ -15,7 +15,7 @@ public interface IAggregatorGrain : IGrainWithGuidKey Task MessageSent(Immutable message); Task MessageReceived(Immutable message); Task WereAllMessagesSent(Immutable messages); - Task WereAllSentAlsoDelivered(); + Task WereAllSentAlsoDelivered(int count); Task GetProcessingSilosCount(); Task GetAllSentMessages(); Task GetAllReceivedMessages(); @@ -45,14 +45,20 @@ public Task CleanUp() public Task MessageSent(Immutable message) { _logger.LogInformation($"MessageSent #{message.Value.Id} [{RuntimeIdentity}],[{IdentityString}] from thread {Thread.CurrentThread.Name}"); - _sentMessages.Add(message.Value.Id, message.Value); + if (_sentMessages.TryGetValue(message.Value.Id, out var value)) + value.Count++; + else + _sentMessages.Add(message.Value.Id, message.Value); return Task.CompletedTask; } public Task MessageReceived(Immutable message) { _logger.LogInformation($"MessageReceived #{message.Value.Id} [{RuntimeIdentity}],[{IdentityString}] from thread {Thread.CurrentThread.Name}"); - _receivedMessages.Add(message.Value.Id, message.Value); + if (_receivedMessages.TryGetValue(message.Value.Id, out var value)) + value.Count++; + else + _receivedMessages.Add(message.Value.Id, message.Value); return Task.CompletedTask; } @@ -63,11 +69,11 @@ public Task WereAllMessagesSent(Immutable messages) messages.Value.All(msg => _sentMessages.ContainsKey(msg.Id))); } - public Task WereAllSentAlsoDelivered() + public Task WereAllSentAlsoDelivered(int count) { return Task.FromResult( _sentMessages.Count == _receivedMessages.Count && - _sentMessages.Values.All(msg => _receivedMessages.ContainsKey(msg.Id) && _receivedMessages[msg.Id].Delivered)); + _sentMessages.Values.All(msg => _receivedMessages.TryGetValue(msg.Id, out var message) && message.Delivered && message.Count == count)); } public Task GetProcessingSilosCount() diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs index 75ef87e..dc00608 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/IntegrationTestHelpers.cs @@ -10,6 +10,8 @@ namespace RabbitMqStreamTests { internal static class IntegrationTestHelpers { + private const int ReceiversCount = 2; // Number of ReceiverGrain implicitly attached to stream + public static async Task TestRmqStreamProviderWithPrefilledQueue(this TestCluster cluster, Action setupProxyForSender, Action setupProxyForReceiver, int nMessages, int itersToWait, RmqSerializer serializer = RmqSerializer.Default) { await cluster.StopPullingAgents(); @@ -75,15 +77,15 @@ public static async Task TestRmqStreamProviderOnFly(this TestCluster cluster, Ac private static async Task AllMessagesSentAndDelivered(IAggregatorGrain aggregator, Message[] messages) => await aggregator.WereAllMessagesSent(messages.AsImmutable()) && - await aggregator.WereAllSentAlsoDelivered(); + await aggregator.WereAllSentAlsoDelivered(ReceiversCount); private static async Task PrintError(IAggregatorGrain aggregator, Message[] messages) { var sb = new StringBuilder(); sb.AppendLine("Expectation failed!"); - sb.AppendLine($" -expected: {string.Join(',', messages.OrderBy(m => m.Id).Select(m => m.Id))}"); - sb.AppendLine($" -sent : {string.Join(',', (await aggregator.GetAllSentMessages()).OrderBy(m => m.Id).Select(m => m.Id))}"); - sb.AppendLine($" -received: {string.Join(',', (await aggregator.GetAllReceivedMessages()).OrderBy(m => m.Id).Select(m => m.Id))}"); + sb.AppendLine($" -expected: {string.Join(',', messages.OrderBy(m => m.Id).Select(m => $"{m.Id} ({ReceiversCount})"))}"); + sb.AppendLine($" -sent : {string.Join(',', (await aggregator.GetAllSentMessages()).OrderBy(m => m.Id).Select(m => $"{m.Id} ({m.Count})"))}"); + sb.AppendLine($" -received: {string.Join(',', (await aggregator.GetAllReceivedMessages()).OrderBy(m => m.Id).Select(m => $"{m.Id} ({m.Count})"))}"); return sb.ToString(); } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs index cb895f4..1857b0d 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Message.cs @@ -19,6 +19,9 @@ public class Message [ProtoMember(4)] public readonly string ProcessedBy; + [ProtoMember(5)] + public int Count = 1; + public Message() { } @@ -45,7 +48,8 @@ public override bool Equals(object obj) return msg != null && Id == msg.Id && Delivered == msg.Delivered - && ProcessedBy == msg.ProcessedBy; + && ProcessedBy == msg.ProcessedBy + && Count == msg.Count; } public override int GetHashCode() @@ -56,6 +60,7 @@ public override int GetHashCode() hashCode = (hashCode * 397) ^ WorkTimeOutMillis.GetHashCode(); hashCode = (hashCode * 397) ^ Delivered.GetHashCode(); hashCode = (hashCode * 397) ^ (ProcessedBy?.GetHashCode() ?? 0); + hashCode = (hashCode * 397) ^ Count.GetHashCode(); return hashCode; } } diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs index 112676b..1c6eafa 100755 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/ReceiverGrain.cs @@ -9,13 +9,20 @@ namespace RabbitMqStreamTests { - public interface IReceiverGrain : IGrainWithGuidKey - { - } + public interface IFirstReceiverGrain : IGrainWithGuidKey { } + public interface ISecondReceiverGrain : IGrainWithGuidKey { } + + [ImplicitStreamSubscription(Globals.StreamNameSpaceDefault)] + [ImplicitStreamSubscription(Globals.StreamNameSpaceProtoBuf)] + public class FirstReceiverGrain : ReceiverGrain, IFirstReceiverGrain + { } [ImplicitStreamSubscription(Globals.StreamNameSpaceDefault)] [ImplicitStreamSubscription(Globals.StreamNameSpaceProtoBuf)] - public class ReceiverGrain : Grain, IReceiverGrain + public class SecondReceiverGrain : ReceiverGrain, ISecondReceiverGrain + { } + + public abstract class ReceiverGrain : Grain { private ILogger _logger; private StreamSubscriptionHandle _subscriptionDefault; @@ -24,7 +31,7 @@ public class ReceiverGrain : Grain, IReceiverGrain public override async Task OnActivateAsync() { await base.OnActivateAsync(); - _logger = ServiceProvider.GetRequiredService().CreateLogger($"{typeof(ReceiverGrain).FullName}.{this.GetPrimaryKey()}"); + _logger = ServiceProvider.GetRequiredService().CreateLogger($"{GetType().FullName}.{this.GetPrimaryKey()}"); _logger.LogInformation($"OnActivateAsync [{RuntimeIdentity}],[{IdentityString}][{this.GetPrimaryKey()}] from thread {Thread.CurrentThread.Name}"); _subscriptionDefault = await GetStreamProvider(Globals.StreamProviderNameDefault) .GetStream(this.GetPrimaryKey(), Globals.StreamNameSpaceDefault) diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs index 39d72af..dbaa113 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/RmqHelpers.cs @@ -15,11 +15,11 @@ public static void EnsureEmptyQueue() { var factory = new ConnectionFactory { - HostName = "orlytest.golamago.online", - VirtualHost = "stream-test", + HostName = "localhost", + VirtualHost = "/", Port = 5672, - UserName = "lama-testing", - Password = "testing" + UserName = "guest", + Password = "guest" }; try diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs index b401efd..7ce792d 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/TestCluster.cs @@ -120,7 +120,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -132,7 +132,7 @@ public static ISiloHostBuilder ConfigureStreamsAndLogging(this ISiloHostBuilder .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); configurator.ConfigureCache(cacheSize: 100, cacheFillingTimeout: TimeSpan.FromSeconds(10)); configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); configurator.ConfigurePullingAgent(ob => ob.Configure( @@ -155,12 +155,12 @@ public static IClientBuilder ConfigureStreamsAndLogging(this IClientBuilder buil .AddRabbitMqStream(Globals.StreamProviderNameDefault, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceDefault); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); }) .AddRabbitMqStream(Globals.StreamProviderNameProtoBuf, configurator => { configurator.ConfigureRabbitMq(host: "localhost", port: ToxiProxyHelpers.RmqProxyPort, - virtualHost: "stream-test", user: "lama-testing", password: "testing", queueName: Globals.StreamNameSpaceProtoBuf); + virtualHost: "/", user: "guest", password: "guest", queueName: Globals.StreamNameSpaceDefault); }) .ConfigureLogging(log => log .ClearProviders() diff --git a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs index 116dfb3..9f44899 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs +++ b/Orleans.Streams.RabbitMqStreamProvider/BatchContainer/RabbitMqBatchContainer.cs @@ -40,8 +40,7 @@ public IEnumerable> GetEvents() .Select((e, i) => Tuple.Create(e, EventSequenceToken?.CreateSequenceTokenForEvent(i))) .ToList(); - // TODO: Раскопать, почему при нормальной обработке фильтра он доставляет только одному подписчику - public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) => true; - // _events.Any(item => shouldReceiveFunc(stream, filterData, item)); + public bool ShouldDeliver(IStreamIdentity stream, object filterData, StreamFilterPredicate shouldReceiveFunc) => + _events.Any(item => shouldReceiveFunc(stream, filterData, item)); } } \ No newline at end of file From 16d17a250f6cc39932605927d410a9222cecc3ea Mon Sep 17 00:00:00 2001 From: Sergey Zorchenko Date: Tue, 19 Nov 2019 12:35:28 +0300 Subject: [PATCH 5/5] orleans 3.0 release --- ...leans.Streams.RabbitMqStreamProvider.Tests.csproj | 12 ++++++------ .../Orleans.Streams.RabbitMqStreamProvider.csproj | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj index f3b87df..075e1ff 100644 --- a/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider.Tests/Orleans.Streams.RabbitMqStreamProvider.Tests.csproj @@ -11,15 +11,15 @@ - + all runtime; build; native; contentfiles; analyzers - - - - - + + + + + diff --git a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj index bd0bf6d..926e10a 100644 --- a/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj +++ b/Orleans.Streams.RabbitMqStreamProvider/Orleans.Streams.RabbitMqStreamProvider.csproj @@ -3,7 +3,7 @@ netstandard2.0 - + \ No newline at end of file