diff --git a/src/SIL.Harmony.Tests/DataModelTestBase.cs b/src/SIL.Harmony.Tests/DataModelTestBase.cs index ea4b305..279ea8f 100644 --- a/src/SIL.Harmony.Tests/DataModelTestBase.cs +++ b/src/SIL.Harmony.Tests/DataModelTestBase.cs @@ -73,7 +73,7 @@ public void SetCurrentDate(DateTime dateTime) public async ValueTask WriteNextChange(IChange change, bool add = true) { - return await WriteChange(_localClientId, NextDate(), change, add); + return await WriteChange(clientId: _localClientId, NextDate(), change, add); } public async ValueTask WriteNextChange(IEnumerable changes, bool add = true) diff --git a/src/SIL.Harmony/Changes/ChangeContext.cs b/src/SIL.Harmony/Changes/ChangeContext.cs index 97ab601..a93e75b 100644 --- a/src/SIL.Harmony/Changes/ChangeContext.cs +++ b/src/SIL.Harmony/Changes/ChangeContext.cs @@ -7,19 +7,15 @@ internal class ChangeContext : IChangeContext private readonly SnapshotWorker _worker; private readonly CrdtConfig _crdtConfig; - internal ChangeContext(Commit commit, int commitIndex, IDictionary intermediateSnapshots, SnapshotWorker worker, CrdtConfig crdtConfig) + internal ChangeContext(Commit commit, SnapshotWorker worker, CrdtConfig crdtConfig) { _worker = worker; _crdtConfig = crdtConfig; Commit = commit; - CommitIndex = commitIndex; - IntermediateSnapshots = intermediateSnapshots; } CommitBase IChangeContext.Commit => Commit; public Commit Commit { get; } - public int CommitIndex { get; } - public IDictionary IntermediateSnapshots { get; } public async ValueTask GetSnapshot(Guid entityId) => await _worker.GetSnapshot(entityId); public IAsyncEnumerable GetObjectsReferencing(Guid entityId, bool includeDeleted = false) { diff --git a/src/SIL.Harmony/Db/DbSetExtensions.cs b/src/SIL.Harmony/Db/DbSetExtensions.cs index 9bb0066..abc6086 100644 --- a/src/SIL.Harmony/Db/DbSetExtensions.cs +++ b/src/SIL.Harmony/Db/DbSetExtensions.cs @@ -21,6 +21,14 @@ public static IQueryable DefaultOrderDescending(this IQueryable< .ThenByDescending(c => c.Commit.Id); } + public static IEnumerable DefaultOrder(this IEnumerable queryable) + { + return queryable + .OrderBy(c => c.Commit.HybridDateTime.DateTime) + .ThenBy(c => c.Commit.HybridDateTime.Counter) + .ThenBy(c => c.Commit.Id); + } + public static IEnumerable DefaultOrderDescending(this IEnumerable queryable) { return queryable diff --git a/src/SIL.Harmony/SnapshotWorker.cs b/src/SIL.Harmony/SnapshotWorker.cs index 612346f..ccc859b 100644 --- a/src/SIL.Harmony/SnapshotWorker.cs +++ b/src/SIL.Harmony/SnapshotWorker.cs @@ -5,6 +5,43 @@ namespace SIL.Harmony; +internal class PendingSnapshots +{ + private readonly Dictionary _entitySnapshots = []; + private readonly List _snapshots = []; + + + internal PendingSnapshots(Dictionary entitySnapshots) + { + _entitySnapshots = entitySnapshots; + _snapshots = [.. entitySnapshots.Values.DefaultOrder()]; + } + + public void AddCurrentSnapshot(ObjectSnapshot snapshot) + { + // we only support one snapshot per entity per commit + _snapshots.RemoveAll(s => s.EntityId == snapshot.EntityId && s.CommitId == snapshot.CommitId); + _snapshots.Add(snapshot); + _entitySnapshots[snapshot.EntityId] = snapshot; + } + + public ObjectSnapshot? GetSnapshot(Guid entityId) + { + return _entitySnapshots.GetValueOrDefault(entityId); + } + + + public IEnumerable GetSnapshots() + { + return _snapshots; + } + + public IEnumerable GetLatestSnapshots() + { + return _entitySnapshots.Values; + } +} + /// /// helper service to update snapshots and apply commits to them, has mutable state, don't reuse /// @@ -13,16 +50,14 @@ internal class SnapshotWorker private readonly Dictionary _snapshotLookup; private readonly CrdtRepository _crdtRepository; private readonly CrdtConfig _crdtConfig; - private readonly Dictionary _pendingSnapshots = []; - private readonly Dictionary _rootSnapshots = []; - private readonly List _newIntermediateSnapshots = []; + private readonly PendingSnapshots _pendingSnapshots = new([]); private SnapshotWorker(Dictionary snapshots, Dictionary snapshotLookup, CrdtRepository crdtRepository, CrdtConfig crdtConfig) { - _pendingSnapshots = snapshots; + _pendingSnapshots = new PendingSnapshots(snapshots); _crdtRepository = crdtRepository; _snapshotLookup = snapshotLookup; _crdtConfig = crdtConfig; @@ -45,7 +80,7 @@ internal static async Task> ApplyCommitsToSnaps /// internal SnapshotWorker(Dictionary snapshotLookup, CrdtRepository crdtRepository, - CrdtConfig crdtConfig): this([], snapshotLookup, crdtRepository, crdtConfig) + CrdtConfig crdtConfig) : this([], snapshotLookup, crdtRepository, crdtConfig) { } @@ -53,13 +88,30 @@ public async Task UpdateSnapshots(Commit oldestAddedCommit, Commit[] newCommits) { var previousCommit = await _crdtRepository.FindPreviousCommit(oldestAddedCommit); var commits = await _crdtRepository.GetCommitsAfter(previousCommit); - await ApplyCommitChanges(commits.UnionBy(newCommits, c => c.Id), true, previousCommit?.Hash ?? CommitBase.NullParentHash); + var allCommits = commits.UnionBy(newCommits, c => c.Id).DefaultOrder().ToArray(); + await ApplyCommitChanges(allCommits, true, previousCommit?.Hash ?? CommitBase.NullParentHash); + + var seenEntities = new HashSet(); + var snapshots = _pendingSnapshots.GetSnapshots() + .Reverse() // ensure we see the latest snapshots first + .Where(s => + { + if (!seenEntities.Contains(s.EntityId)) + { + seenEntities.Add(s.EntityId); + return true; + } + + if (s.IsRoot) return true; - await _crdtRepository.AddSnapshots([ - .._rootSnapshots.Values, - .._newIntermediateSnapshots, - .._pendingSnapshots.Values - ]); + var commitIndex = Array.IndexOf(allCommits, s.Commit) + 1; + return commitIndex % 2 == 0; + }) + // reverse back to original order, so snapshot data is more intuitive + // (the repository sorts them as well, but only by commit. This reverse seems to keep snapshots within a single commit in the order they were made) + .Reverse(); + + await _crdtRepository.AddSnapshots(snapshots); } private async ValueTask ApplyCommitChanges(IEnumerable commits, bool updateCommitHash, string? previousCommitHash) @@ -80,7 +132,7 @@ private async ValueTask ApplyCommitChanges(IEnumerable commits, bool upd { IObjectBase entity; var prevSnapshot = await GetSnapshot(commitChange.EntityId); - var changeContext = new ChangeContext(commit, commitIndex, intermediateSnapshots, this, _crdtConfig); + var changeContext = new ChangeContext(commit, this, _crdtConfig); bool wasDeleted; if (prevSnapshot is not null) { @@ -100,11 +152,9 @@ private async ValueTask ApplyCommitChanges(IEnumerable commits, bool upd { await MarkDeleted(entity.Id, changeContext); } - + await GenerateSnapshotForEntity(entity, prevSnapshot, changeContext); } - _newIntermediateSnapshots.AddRange(intermediateSnapshots.Values); - intermediateSnapshots.Clear(); } } @@ -141,15 +191,8 @@ private async ValueTask MarkDeleted(Guid deletedEntityId, ChangeContext context) public async ValueTask GetSnapshot(Guid entityId) { - if (_pendingSnapshots.TryGetValue(entityId, out var snapshot)) - { - return snapshot; - } - - if (_rootSnapshots.TryGetValue(entityId, out var rootSnapshot)) - { - return rootSnapshot; - } + var snapshot = _pendingSnapshots.GetSnapshot(entityId); + if (snapshot is not null) return snapshot; if (_snapshotLookup.TryGetValue(entityId, out var snapshotId)) { @@ -173,24 +216,17 @@ internal async IAsyncEnumerable GetSnapshotsWhere(Expression !_pendingSnapshots.ContainsKey(s.EntityId))) - { - yield return snapshot; - } - await foreach (var snapshot in _crdtRepository.CurrentSnapshots() .Where(predicateExpression) .AsAsyncEnumerable()) { - if (_pendingSnapshots.ContainsKey(snapshot.EntityId) || _rootSnapshots.ContainsKey(snapshot.EntityId)) + if (_pendingSnapshots.GetSnapshot(snapshot.EntityId) is not null) continue; yield return snapshot; } @@ -210,48 +246,9 @@ private async Task GenerateSnapshotForEntity(IObjectBase entity, ObjectSnapshot? //when both snapshots are for the same commit we don't want to keep the previous, therefore the new snapshot should be root var isRoot = prevSnapshot is null || (prevSnapshot.IsRoot && prevSnapshot.CommitId == context.Commit.Id); var newSnapshot = new ObjectSnapshot(entity, context.Commit, isRoot); - //if both snapshots are for the same commit then we don't want to keep the previous snapshot - if (prevSnapshot is null || prevSnapshot.CommitId == context.Commit.Id) - { - //do nothing, will cause prevSnapshot to be overriden in _pendingSnapshots if it exists - } - else if (context.CommitIndex % 2 == 0 && !prevSnapshot.IsRoot && IsNew(prevSnapshot)) - { - context.IntermediateSnapshots[prevSnapshot.Entity.Id] = prevSnapshot; - } await _crdtConfig.BeforeSaveObject.Invoke(entity.DbObject, newSnapshot); - AddSnapshot(newSnapshot); - } - - private void AddSnapshot(ObjectSnapshot snapshot) - { - if (snapshot.IsRoot) - { - _rootSnapshots[snapshot.Entity.Id] = snapshot; - } - else - { - //if there was already a pending snapshot there's no need to store it as both may point to the same commit - _pendingSnapshots[snapshot.Entity.Id] = snapshot; - } - } - - /// - /// snapshot is not from the database - /// - private bool IsNew(ObjectSnapshot snapshot) - { - var entityId = snapshot.EntityId; - if (_pendingSnapshots.TryGetValue(entityId, out var pendingSnapshot)) - { - return pendingSnapshot.Id == snapshot.Id; - } - if (_rootSnapshots.TryGetValue(entityId, out var rootSnapshot)) - { - return rootSnapshot.Id == snapshot.Id; - } - return false; + _pendingSnapshots.AddCurrentSnapshot(newSnapshot); } }