Skip to content

Commit

Permalink
More adjustments to the projections between JasperFx and Marten
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Dec 6, 2024
1 parent 660f1a8 commit 23e89f5
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 1 deletion.
93 changes: 93 additions & 0 deletions src/JasperFx/Events/Grouping/EventSlicer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
namespace JasperFx.Events.Grouping;

public class EventSlicer<TDoc, TId>: IEventSlicer<TDoc, TId>
{
private readonly List<Action<SliceGroup<TDoc, TId>, IReadOnlyList<IEvent>>> _configurations = new();
private readonly List<IFanOutRule> _afterGroupingFanoutRules = new();
private readonly List<IFanOutRule> _beforeGroupingFanoutRules = new();

public ValueTask SliceAsync(IReadOnlyList<IEvent> events, SliceGroup<TDoc, TId> grouping)
{
grouping.ApplyFanOutRules(_beforeGroupingFanoutRules);

foreach (var configuration in _configurations)
{
configuration(grouping, events);
}

grouping.ApplyFanOutRules(_afterGroupingFanoutRules);

return new ValueTask();
}

public bool HasAnyRules()
{
return _configurations.Any();
}

public IEnumerable<Type> DetermineEventTypes()
{
foreach (var rule in _beforeGroupingFanoutRules) yield return rule.OriginatingType;

foreach (var rule in _afterGroupingFanoutRules) yield return rule.OriginatingType;
}

public EventSlicer<TDoc, TId> Identity<TEvent>(Func<TEvent, TId> identityFunc)
{
_configurations.Add((group, events) => group.AddEvents(identityFunc, events));

return this;
}

public EventSlicer<TDoc, TId> Identities<TEvent>(Func<TEvent, IReadOnlyList<TId>> identitiesFunc)
{
_configurations.Add((group, events) => group.AddEvents(identitiesFunc, events));

return this;
}

/// <summary>
/// Apply "fan out" operations to the given TEvent type that inserts an enumerable of TChild events right behind the
/// parent
/// event in the event stream
/// </summary>
/// <param name="fanOutFunc"></param>
/// <param name="mode">Should the fan out operation happen after grouping, or before? Default is after</param>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TChild"></typeparam>
public EventSlicer<TDoc, TId> FanOut<TEvent, TChild>(Func<TEvent, IEnumerable<TChild>> fanOutFunc,
FanoutMode mode = FanoutMode.AfterGrouping)
{
return FanOut(new FanOutEventDataOperator<TEvent, TChild>(fanOutFunc) { Mode = mode }, mode);
}

/// <summary>
/// Apply "fan out" operations to the given TEvent type that inserts an enumerable of TChild events right behind the
/// parent
/// event in the event stream
/// </summary>
/// <param name="fanOutFunc"></param>
/// <param name="mode">Should the fan out operation happen after grouping, or before? Default is after</param>
/// <typeparam name="TEvent"></typeparam>
/// <typeparam name="TChild"></typeparam>
public EventSlicer<TDoc, TId> FanOut<TEvent, TChild>(Func<IEvent<TEvent>, IEnumerable<TChild>> fanOutFunc, FanoutMode mode = FanoutMode.AfterGrouping)
{
return FanOut(new FanOutEventOperator<TEvent, TChild>(fanOutFunc) { Mode = mode }, mode);
}

private EventSlicer<TDoc, TId> FanOut(IFanOutRule fanout, FanoutMode mode)
{
switch (mode)
{
case FanoutMode.AfterGrouping:
_afterGroupingFanoutRules.Add(fanout);
break;

case FanoutMode.BeforeGrouping:
_beforeGroupingFanoutRules.Add(fanout);
break;
}

return this;
}
}
2 changes: 2 additions & 0 deletions src/JasperFx/Events/Grouping/SliceGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace JasperFx.Events.Grouping;

// TODO -- make sure the methods can handle IEvent too because duh

/// <summary>
/// Structure to hold and help organize events in "slices" by identity to apply
/// to the matching aggregate document TDoc. Note that TDoc might be a marker type.
Expand Down
11 changes: 11 additions & 0 deletions src/JasperFx/Events/Grouping/TenantRollupSlicer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#nullable enable
namespace JasperFx.Events.Grouping;

public class TenantRollupSlicer<TDoc>: IEventSlicer<TDoc, string>
{
public ValueTask SliceAsync(IReadOnlyList<IEvent> events, SliceGroup<TDoc, string> grouping)
{
grouping.AddEvents<IEvent>(e => e.TenantId, events);
return new ValueTask();
}
}
3 changes: 2 additions & 1 deletion src/JasperFx/Events/NewStuff/NewProjectionCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public interface IProjectionSource<TOperations, TStore, TDatabase>: IReadOnlyPro
bool TryBuildReplayExecutor(TStore store, TDatabase database, out IReplayExecutor executor);

IInlineProjection<TOperations> BuildForInline();

}

public interface ISubscriptionSource<TStore, TDatabase>
Expand All @@ -77,7 +78,7 @@ public interface ISubscriptionSource<TStore, TDatabase>
}

// Assuming that DocumentStore et al will be embedded into this
public interface IAsyncShard<TDatabase>
public interface IAsyncShard<TDatabase> // might have a subclass for projections
{
AsyncOptions Options { get; }
ShardRole Role { get; }
Expand Down
14 changes: 14 additions & 0 deletions src/JasperFx/Events/Projections/EventRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ namespace JasperFx.Events.Projections;
/// </summary>
public class EventRange
{
public EventRange(ShardName name, long floor, long ceiling)
{
ShardName = name;
SequenceFloor = floor;
SequenceCeiling = ceiling;
}

public EventRange(ISubscriptionAgent agent, long floor, long ceiling)
{
ShardName = agent.Name;
Expand All @@ -23,6 +30,12 @@ public EventRange(ISubscriptionAgent agent, long ceiling)
Agent = agent;
SequenceCeiling = ceiling;
}

public EventRange(ShardName shardName, long ceiling)
{
ShardName = shardName;
SequenceCeiling = ceiling;
}

/// <summary>
/// Identifies the projection shard consuming this event range
Expand Down Expand Up @@ -65,6 +78,7 @@ public async ValueTask SliceAsync(IEventSlicer slicer)

private readonly List<object> _groups = new();


public IReadOnlyList<object> Groups => _groups;

protected bool Equals(EventRange other)
Expand Down

0 comments on commit 23e89f5

Please sign in to comment.