diff --git a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore/StreamStoreFactRepository.cs b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore/StreamStoreFactRepository.cs index b229433..7145f98 100755 --- a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore/StreamStoreFactRepository.cs +++ b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore/StreamStoreFactRepository.cs @@ -3,7 +3,6 @@ namespace Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore using System; using System.Collections.Generic; using System.Linq; - using System.Reflection; using System.Threading.Tasks; using EventHandling; using global::SqlStreamStore; @@ -31,7 +30,21 @@ public StreamStoreFactRepository( public async Task RetrieveFacts(long fromPositionExclusive) { var results = new List(); - var page = await _streamStore.ReadAllForwards(fromPositionExclusive<0?Position.Start:fromPositionExclusive, 10); + var page = await _streamStore.ReadAllForwards(fromPositionExclusive < 0 ? Position.Start : fromPositionExclusive, 10); + results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact)); + while (!page.IsEnd) + { + page = await page.ReadNext(); + results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact)); + } + + return results.ToArray(); + } + + public async Task RetrieveFactsByStream(long fromPositionExclusive, string aggregateIdentifier) + { + var results = new List(); + var page = await _streamStore.ReadStreamForwards(new StreamId(aggregateIdentifier), fromPositionExclusive < 0 ? Convert.ToInt32(Position.Start) : Convert.ToInt32(fromPositionExclusive), 10); results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact)); while (!page.IsEnd) { @@ -57,7 +70,7 @@ public async Task PersistFacts(Fact[] facts) _eventMapping.GetEventName(e.Event.GetType()), _eventSerializer.SerializeObject(e.Event))).ToArray()); - return result?.CurrentPosition??await _streamStore.ReadHeadPosition(); + return result?.CurrentPosition ?? await _streamStore.ReadHeadPosition(); } private Fact MapToFact(StreamMessage streamMessage) diff --git a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/EventCentricTestSpecificationRunner.cs b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/EventCentricTestSpecificationRunner.cs index 4e8ab53..4d82054 100755 --- a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/EventCentricTestSpecificationRunner.cs +++ b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/EventCentricTestSpecificationRunner.cs @@ -32,7 +32,10 @@ private async Task RunAsync(EventCentricTestSpecificatio if (result.HasValue) return spec.Fail(result.Value); - var actualEvents = await _factReader.RetrieveFacts(position); + var actualEvents = + spec.Thens.Length == 0 + ? await _factReader.RetrieveFacts(position) + : await _factReader.RetrieveFactsByStream(position, spec.Thens.First().Identifier); return actualEvents.SequenceEqual(spec.Thens, new WrappedFactComparerEqualityComparer(_comparer)) ? spec.Pass(actualEvents) diff --git a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/IFactReader.cs b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/IFactReader.cs index f8204c8..b921ae9 100755 --- a/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/IFactReader.cs +++ b/src/Be.Vlaanderen.Basisregisters.AggregateSource.Testing/IFactReader.cs @@ -5,5 +5,6 @@ namespace Be.Vlaanderen.Basisregisters.AggregateSource.Testing public interface IFactReader { Task RetrieveFacts(long fromPositionExclusive); + Task RetrieveFactsByStream(long fromPositionExclusive, string aggregateIdentifier); } }