Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/SIL.Harmony.Tests/DataModelTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void SetCurrentDate(DateTime dateTime)

public async ValueTask<Commit> WriteNextChange(IChange change, bool add = true)
{
return await WriteChange(_localClientId, NextDate(), change, add);
return await WriteChange(clientId: _localClientId, NextDate(), change, add);
}

public async ValueTask<Commit> WriteNextChange(IEnumerable<IChange> changes, bool add = true)
Expand Down
6 changes: 1 addition & 5 deletions src/SIL.Harmony/Changes/ChangeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@ internal class ChangeContext : IChangeContext
private readonly SnapshotWorker _worker;
private readonly CrdtConfig _crdtConfig;

internal ChangeContext(Commit commit, int commitIndex, IDictionary<Guid, ObjectSnapshot> intermediateSnapshots, SnapshotWorker worker, CrdtConfig crdtConfig)
internal ChangeContext(Commit commit, SnapshotWorker worker, CrdtConfig crdtConfig)
{
_worker = worker;
_crdtConfig = crdtConfig;
Commit = commit;
CommitIndex = commitIndex;
IntermediateSnapshots = intermediateSnapshots;
}

CommitBase IChangeContext.Commit => Commit;
public Commit Commit { get; }
public int CommitIndex { get; }
public IDictionary<Guid, ObjectSnapshot> IntermediateSnapshots { get; }
public async ValueTask<IObjectSnapshot?> GetSnapshot(Guid entityId) => await _worker.GetSnapshot(entityId);
public IAsyncEnumerable<object> GetObjectsReferencing(Guid entityId, bool includeDeleted = false)
{
Expand Down
8 changes: 8 additions & 0 deletions src/SIL.Harmony/Db/DbSetExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public static IQueryable<ObjectSnapshot> DefaultOrderDescending(this IQueryable<
.ThenByDescending(c => c.Commit.Id);
}

public static IEnumerable<ObjectSnapshot> DefaultOrder(this IEnumerable<ObjectSnapshot> queryable)
{
return queryable
.OrderBy(c => c.Commit.HybridDateTime.DateTime)
.ThenBy(c => c.Commit.HybridDateTime.Counter)
.ThenBy(c => c.Commit.Id);
}

public static IEnumerable<ObjectSnapshot> DefaultOrderDescending(this IEnumerable<ObjectSnapshot> queryable)
{
return queryable
Expand Down
143 changes: 70 additions & 73 deletions src/SIL.Harmony/SnapshotWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,43 @@

namespace SIL.Harmony;

internal class PendingSnapshots
{
private readonly Dictionary<Guid, ObjectSnapshot> _entitySnapshots = [];
private readonly List<ObjectSnapshot> _snapshots = [];


internal PendingSnapshots(Dictionary<Guid, ObjectSnapshot> entitySnapshots)
{
_entitySnapshots = entitySnapshots;
_snapshots = [.. entitySnapshots.Values.DefaultOrder()];
}

public void AddCurrentSnapshot(ObjectSnapshot snapshot)
{
// we only support one snapshot per entity per commit
_snapshots.RemoveAll(s => s.EntityId == snapshot.EntityId && s.CommitId == snapshot.CommitId);
_snapshots.Add(snapshot);
_entitySnapshots[snapshot.EntityId] = snapshot;
}

public ObjectSnapshot? GetSnapshot(Guid entityId)
{
return _entitySnapshots.GetValueOrDefault(entityId);
}


public IEnumerable<ObjectSnapshot> GetSnapshots()
{
return _snapshots;
}

public IEnumerable<ObjectSnapshot> GetLatestSnapshots()
{
return _entitySnapshots.Values;
}
}

/// <summary>
/// helper service to update snapshots and apply commits to them, has mutable state, don't reuse
/// </summary>
Expand All @@ -13,16 +50,14 @@ internal class SnapshotWorker
private readonly Dictionary<Guid, Guid?> _snapshotLookup;
private readonly CrdtRepository _crdtRepository;
private readonly CrdtConfig _crdtConfig;
private readonly Dictionary<Guid, ObjectSnapshot> _pendingSnapshots = [];
private readonly Dictionary<Guid, ObjectSnapshot> _rootSnapshots = [];
private readonly List<ObjectSnapshot> _newIntermediateSnapshots = [];
private readonly PendingSnapshots _pendingSnapshots = new([]);

private SnapshotWorker(Dictionary<Guid, ObjectSnapshot> snapshots,
Dictionary<Guid, Guid?> snapshotLookup,
CrdtRepository crdtRepository,
CrdtConfig crdtConfig)
{
_pendingSnapshots = snapshots;
_pendingSnapshots = new PendingSnapshots(snapshots);
_crdtRepository = crdtRepository;
_snapshotLookup = snapshotLookup;
_crdtConfig = crdtConfig;
Expand All @@ -45,21 +80,38 @@ internal static async Task<Dictionary<Guid, ObjectSnapshot>> ApplyCommitsToSnaps
/// <param name="crdtConfig"></param>
internal SnapshotWorker(Dictionary<Guid, Guid?> snapshotLookup,
CrdtRepository crdtRepository,
CrdtConfig crdtConfig): this([], snapshotLookup, crdtRepository, crdtConfig)
CrdtConfig crdtConfig) : this([], snapshotLookup, crdtRepository, crdtConfig)
{
}

public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits)
{
var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit);
var commits = await _crdtRepository.GetCommitsAfter(previousCommit);
await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash);
var allCommits = commits.UnionBy(newCommits, c => c.Id).DefaultOrder().ToArray();
await ApplyCommitChanges(allCommits, true, previousCommit?.Hash ?? CommitBase.NullParentHash);

var seenEntities = new HashSet<Guid>();
var snapshots = _pendingSnapshots.GetSnapshots()
.Reverse() // ensure we see the latest snapshots first
.Where(s =>
{
if (!seenEntities.Contains(s.EntityId))
{
seenEntities.Add(s.EntityId);
return true;
}

if (s.IsRoot) return true;

await _crdtRepository.AddSnapshots([
.._rootSnapshots.Values,
.._newIntermediateSnapshots,
.._pendingSnapshots.Values
]);
var commitIndex = Array.IndexOf(allCommits, s.Commit) + 1;
return commitIndex % 2 == 0;
})
// reverse back to original order, so snapshot data is more intuitive
// (the repository sorts them as well, but only by commit. This reverse seems to keep snapshots within a single commit in the order they were made)
.Reverse();

await _crdtRepository.AddSnapshots(snapshots);
}

private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool updateCommitHash, string? previousCommitHash)
Expand All @@ -80,7 +132,7 @@ private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool upd
{
IObjectBase entity;
var prevSnapshot = await GetSnapshot(commitChange.EntityId);
var changeContext = new ChangeContext(commit, commitIndex, intermediateSnapshots, this, _crdtConfig);
var changeContext = new ChangeContext(commit, this, _crdtConfig);
bool wasDeleted;
if (prevSnapshot is not null)
{
Expand All @@ -100,11 +152,9 @@ private async ValueTask ApplyCommitChanges(IEnumerable<Commit> commits, bool upd
{
await MarkDeleted(entity.Id, changeContext);
}

await GenerateSnapshotForEntity(entity, prevSnapshot, changeContext);
}
_newIntermediateSnapshots.AddRange(intermediateSnapshots.Values);
intermediateSnapshots.Clear();
}
}

Expand Down Expand Up @@ -141,15 +191,8 @@ private async ValueTask MarkDeleted(Guid deletedEntityId, ChangeContext context)

public async ValueTask<ObjectSnapshot?> GetSnapshot(Guid entityId)
{
if (_pendingSnapshots.TryGetValue(entityId, out var snapshot))
{
return snapshot;
}

if (_rootSnapshots.TryGetValue(entityId, out var rootSnapshot))
{
return rootSnapshot;
}
var snapshot = _pendingSnapshots.GetSnapshot(entityId);
if (snapshot is not null) return snapshot;

if (_snapshotLookup.TryGetValue(entityId, out var snapshotId))
{
Expand All @@ -173,24 +216,17 @@ internal async IAsyncEnumerable<ObjectSnapshot> GetSnapshotsWhere(Expression<Fun
var predicate = predicateExpression.Compile();

// foreaches ordered by most to least up-to-date, so we don't return snapshots that are out of date
foreach (var snapshot in _pendingSnapshots.Values
foreach (var snapshot in _pendingSnapshots.GetLatestSnapshots()
.Where(predicate))
{
yield return snapshot;
}

foreach (var snapshot in _rootSnapshots.Values
.Where(predicate)
.Where(s => !_pendingSnapshots.ContainsKey(s.EntityId)))
{
yield return snapshot;
}

await foreach (var snapshot in _crdtRepository.CurrentSnapshots()
.Where(predicateExpression)
.AsAsyncEnumerable())
{
if (_pendingSnapshots.ContainsKey(snapshot.EntityId) || _rootSnapshots.ContainsKey(snapshot.EntityId))
if (_pendingSnapshots.GetSnapshot(snapshot.EntityId) is not null)
continue;
yield return snapshot;
}
Expand All @@ -210,48 +246,9 @@ private async Task GenerateSnapshotForEntity(IObjectBase entity, ObjectSnapshot?
//when both snapshots are for the same commit we don't want to keep the previous, therefore the new snapshot should be root
var isRoot = prevSnapshot is null || (prevSnapshot.IsRoot && prevSnapshot.CommitId == context.Commit.Id);
var newSnapshot = new ObjectSnapshot(entity, context.Commit, isRoot);
//if both snapshots are for the same commit then we don't want to keep the previous snapshot
if (prevSnapshot is null || prevSnapshot.CommitId == context.Commit.Id)
{
//do nothing, will cause prevSnapshot to be overriden in _pendingSnapshots if it exists
}
else if (context.CommitIndex % 2 == 0 && !prevSnapshot.IsRoot && IsNew(prevSnapshot))
{
context.IntermediateSnapshots[prevSnapshot.Entity.Id] = prevSnapshot;
}

await _crdtConfig.BeforeSaveObject.Invoke(entity.DbObject, newSnapshot);

AddSnapshot(newSnapshot);
}

private void AddSnapshot(ObjectSnapshot snapshot)
{
if (snapshot.IsRoot)
{
_rootSnapshots[snapshot.Entity.Id] = snapshot;
}
else
{
//if there was already a pending snapshot there's no need to store it as both may point to the same commit
_pendingSnapshots[snapshot.Entity.Id] = snapshot;
}
}

/// <summary>
/// snapshot is not from the database
/// </summary>
private bool IsNew(ObjectSnapshot snapshot)
{
var entityId = snapshot.EntityId;
if (_pendingSnapshots.TryGetValue(entityId, out var pendingSnapshot))
{
return pendingSnapshot.Id == snapshot.Id;
}
if (_rootSnapshots.TryGetValue(entityId, out var rootSnapshot))
{
return rootSnapshot.Id == snapshot.Id;
}
return false;
_pendingSnapshots.AddCurrentSnapshot(newSnapshot);
}
}
Loading