Skip to content

Commit

Permalink
fix: testing aggregate 'Then' with identifier filters out the specifi…
Browse files Browse the repository at this point in the history
…c aggregate
  • Loading branch information
Arne Dumarey authored and ArneD committed Apr 9, 2021
1 parent d5b2c7f commit 01d79f7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ namespace Be.Vlaanderen.Basisregisters.AggregateSource.Testing.SqlStreamStore
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using EventHandling;
using global::SqlStreamStore;
Expand Down Expand Up @@ -31,7 +30,21 @@ public StreamStoreFactRepository(
public async Task<Fact[]> RetrieveFacts(long fromPositionExclusive)
{
var results = new List<Fact>();
var page = await _streamStore.ReadAllForwards(fromPositionExclusive<0?Position.Start:fromPositionExclusive, 10);
var page = await _streamStore.ReadAllForwards(fromPositionExclusive < 0 ? Position.Start : fromPositionExclusive, 10);
results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact));
while (!page.IsEnd)
{
page = await page.ReadNext();
results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact));
}

return results.ToArray();
}

public async Task<Fact[]> RetrieveFactsByStream(long fromPositionExclusive, string aggregateIdentifier)
{
var results = new List<Fact>();
var page = await _streamStore.ReadStreamForwards(new StreamId(aggregateIdentifier), fromPositionExclusive < 0 ? Convert.ToInt32(Position.Start) : Convert.ToInt32(fromPositionExclusive), 10);
results.AddRange(page.Messages.Where(m => m.Position != fromPositionExclusive).Select(MapToFact));
while (!page.IsEnd)
{
Expand All @@ -57,7 +70,7 @@ public async Task<long> PersistFacts(Fact[] facts)
_eventMapping.GetEventName(e.Event.GetType()),
_eventSerializer.SerializeObject(e.Event))).ToArray());

return result?.CurrentPosition??await _streamStore.ReadHeadPosition();
return result?.CurrentPosition ?? await _streamStore.ReadHeadPosition();
}

private Fact MapToFact(StreamMessage streamMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ private async Task<EventCentricTestResult> RunAsync(EventCentricTestSpecificatio
if (result.HasValue)
return spec.Fail(result.Value);

var actualEvents = await _factReader.RetrieveFacts(position);
var actualEvents =
spec.Thens.Length == 0
? await _factReader.RetrieveFacts(position)
: await _factReader.RetrieveFactsByStream(position, spec.Thens.First().Identifier);

return actualEvents.SequenceEqual(spec.Thens, new WrappedFactComparerEqualityComparer(_comparer))
? spec.Pass(actualEvents)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ namespace Be.Vlaanderen.Basisregisters.AggregateSource.Testing
public interface IFactReader
{
Task<Fact[]> RetrieveFacts(long fromPositionExclusive);
Task<Fact[]> RetrieveFactsByStream(long fromPositionExclusive, string aggregateIdentifier);
}
}

0 comments on commit 01d79f7

Please sign in to comment.