From 61f7a9e4aabdac3fd5f39d67dfc94759e3e81205 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Wed, 30 Jul 2025 14:58:51 -0600 Subject: [PATCH 1/7] Add queue tracking properties to sync log entries --- Syncerbell.EntityFrameworkCore/README.md | 8 +++++++- Syncerbell.EntityFrameworkCore/SyncLogEntry.cs | 8 ++++++++ Syncerbell.Tests/ProgressPercentageTests.cs | 2 ++ Syncerbell/ISyncLogEntry.cs | 10 ++++++++++ Syncerbell/InMemorySyncLogPersistence.cs | 8 +++++++- 5 files changed, 34 insertions(+), 2 deletions(-) diff --git a/Syncerbell.EntityFrameworkCore/README.md b/Syncerbell.EntityFrameworkCore/README.md index 1567ed0..038cdbe 100644 --- a/Syncerbell.EntityFrameworkCore/README.md +++ b/Syncerbell.EntityFrameworkCore/README.md @@ -26,6 +26,8 @@ CREATE TABLE [dbo].[SyncLogEntries] ( LeasedAt datetime2 NULL, LeaseExpiresAt datetime2 NULL, LeasedBy varchar(100) NULL, + QueueMessageId varchar(100) NULL, + QueuedAt datetime2 NULL, FinishedAt datetime2 NULL, ResultMessage nvarchar(MAX) NULL, HighWaterMark varchar(100) NULL, @@ -85,6 +87,8 @@ The 0.5.0 release adds several new columns to the SyncLogEntries table for enhan - `ProgressValue` for tracking current progress amount - `ProgressMax` for tracking maximum progress value - `RecordCount` for tracking total number of records processed +- `QueueMessageId` for tracking message IDs when fanning out sync operations via a queue +- `QueuedAt` for tracking when sync operations are queued If you are upgrading from an earlier version and not using dacpac deployment, you need to run the following SQL script to add the new columns: @@ -93,6 +97,8 @@ to add the new columns: ALTER TABLE [dbo].[SyncLogEntries] ADD [ProgressValue] int NULL, [ProgressMax] int NULL, - [RecordCount] int NULL; + [RecordCount] int NULL, + [QueueMessageId] varchar(100) NULL, + [QueuedAt] datetime2 NULL; GO ``` diff --git a/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs b/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs index cc1ec41..999f181 100644 --- a/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs +++ b/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs @@ -55,6 +55,14 @@ public class SyncLogEntry : ISyncLogEntry [Unicode(false)] public string? LeasedBy { get; set; } + /// + [StringLength(maximumLength: 100)] + [Unicode(false)] + public string? QueueMessageId { get; set; } + + /// + public DateTime? QueuedAt { get; set; } + /// public DateTime? FinishedAt { get; set; } diff --git a/Syncerbell.Tests/ProgressPercentageTests.cs b/Syncerbell.Tests/ProgressPercentageTests.cs index 065ea48..6965e51 100644 --- a/Syncerbell.Tests/ProgressPercentageTests.cs +++ b/Syncerbell.Tests/ProgressPercentageTests.cs @@ -9,6 +9,8 @@ private class TestSyncLogEntry : ISyncLogEntry public DateTime? LeasedAt { get; set; } public DateTime? LeaseExpiresAt { get; set; } public string? LeasedBy { get; set; } + public string? QueueMessageId { get; set; } + public DateTime? QueuedAt { get; set; } public DateTime? FinishedAt { get; set; } public string? ResultMessage { get; set; } public string? HighWaterMark { get; set; } diff --git a/Syncerbell/ISyncLogEntry.cs b/Syncerbell/ISyncLogEntry.cs index 13f3a49..9ea1c17 100644 --- a/Syncerbell/ISyncLogEntry.cs +++ b/Syncerbell/ISyncLogEntry.cs @@ -30,6 +30,16 @@ public interface ISyncLogEntry /// string? LeasedBy { get; set; } + /// + /// Gets or sets the message ID when fanning out sync operations via a queue. + /// + string? QueueMessageId { get; set; } + + /// + /// Gets or sets the date and time when the sync operation was queued. + /// + DateTime? QueuedAt { get; set; } + /// /// Gets or sets the date and time when the sync operation finished. /// diff --git a/Syncerbell/InMemorySyncLogPersistence.cs b/Syncerbell/InMemorySyncLogPersistence.cs index 26e491e..b5f62f3 100644 --- a/Syncerbell/InMemorySyncLogPersistence.cs +++ b/Syncerbell/InMemorySyncLogPersistence.cs @@ -152,6 +152,10 @@ private class InMemoryEntry : ISyncLogEntry public string? LeasedBy { get; set; } + public string? QueueMessageId { get; set; } + + public DateTime? QueuedAt { get; set; } + public DateTime? LeaseExpiresAt { get; set; } public DateTime? FinishedAt { get; set; } @@ -177,10 +181,12 @@ public InMemoryEntry Clone() CreatedAt = CreatedAt, LeasedAt = LeasedAt, LeasedBy = LeasedBy, + QueueMessageId = QueueMessageId, + QueuedAt = QueuedAt, + LeaseExpiresAt = LeaseExpiresAt, FinishedAt = FinishedAt, ResultMessage = ResultMessage, HighWaterMark = HighWaterMark, - LeaseExpiresAt = LeaseExpiresAt, ProgressValue = ProgressValue, ProgressMax = ProgressMax, RecordCount = RecordCount From 162ce3c5dd2b823043f43716bad444a639aa0dd2 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Wed, 30 Jul 2025 15:08:29 -0600 Subject: [PATCH 2/7] Promote SyncEntityIfEligible to public interface for sync service --- Syncerbell/ISyncService.cs | 11 +++++++++++ Syncerbell/SyncService.cs | 10 +++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/Syncerbell/ISyncService.cs b/Syncerbell/ISyncService.cs index d4d3a76..63bbae5 100644 --- a/Syncerbell/ISyncService.cs +++ b/Syncerbell/ISyncService.cs @@ -14,4 +14,15 @@ public interface ISyncService Task> SyncAllEligible( SyncTriggerType triggerType, CancellationToken cancellationToken = default); + + /// + /// Synchronizes a specific entity if it's eligible for syncing based on the provided trigger type. + /// + /// The type of trigger that initiated the sync operation. + /// The entity options for the entity to sync. + /// A cancellation token that can be used to cancel the operation. + /// Returns a task that resolves to a object indicating the outcome of the sync operation, or null if the entity was not eligible or could not be processed. + Task SyncEntityIfEligible(SyncTriggerType triggerType, + SyncEntityOptions entity, + CancellationToken cancellationToken = default); } diff --git a/Syncerbell/SyncService.cs b/Syncerbell/SyncService.cs index a0f3c97..bbd4c03 100644 --- a/Syncerbell/SyncService.cs +++ b/Syncerbell/SyncService.cs @@ -16,12 +16,7 @@ public class SyncService( private const string SyncSuccessMessage = "Sync completed successfully."; private const string SyncFailedMessage = "The sync failed. Check logs for details."; - /// - /// Synchronizes all eligible entities based on the specified trigger type. - /// - /// The type of trigger initiating the sync operation. - /// A token to monitor for cancellation requests. - /// A list of objects representing the result of each sync operation. + /// public async Task> SyncAllEligible(SyncTriggerType triggerType, CancellationToken cancellationToken = default) { var entities = new List(options.Entities); @@ -68,7 +63,8 @@ public async Task> SyncAllEligible(SyncTriggerType tri return results; } - private async Task SyncEntityIfEligible(SyncTriggerType triggerType, + /// + public async Task SyncEntityIfEligible(SyncTriggerType triggerType, SyncEntityOptions entity, CancellationToken cancellationToken = default) { From 7558335968326f390a368132aaca0ebb4c16717c Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Thu, 31 Jul 2025 10:21:38 -0600 Subject: [PATCH 3/7] Add support for syncing a queued log entry --- .github/copilot-instructions.md | 6 + .../EntityFrameworkCoreSyncLogPersistence.cs | 131 ++++++++--- Syncerbell.EntityFrameworkCore/README.md | 5 +- .../SyncLogEntry.cs | 9 + .../AlwaysEligibleStrategyTests.cs | 4 +- .../InMemorySyncLogPersistenceTests.cs | 16 +- .../ParameterSerializationTests.cs | 166 ++++++++++++++ Syncerbell.Tests/ProgressPercentageTests.cs | 7 + Syncerbell/ISyncLogEntry.cs | 25 +++ Syncerbell/ISyncLogPersistence.cs | 33 ++- Syncerbell/ISyncQueueService.cs | 41 ++++ Syncerbell/ISyncService.cs | 11 + Syncerbell/InMemorySyncLogPersistence.cs | 203 ++++++++++++------ Syncerbell/ParameterSerialization.cs | 19 ++ Syncerbell/ServiceCollectionExtensions.cs | 1 + Syncerbell/SyncEntityOptions.cs | 5 + Syncerbell/SyncQueueService.cs | 106 +++++++++ Syncerbell/SyncService.cs | 110 +++++++--- Syncerbell/SyncTriggerType.cs | 4 +- 19 files changed, 767 insertions(+), 135 deletions(-) create mode 100644 .github/copilot-instructions.md create mode 100644 Syncerbell.Tests/ParameterSerializationTests.cs create mode 100644 Syncerbell/ISyncQueueService.cs create mode 100644 Syncerbell/ParameterSerialization.cs create mode 100644 Syncerbell/SyncQueueService.cs diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..c7ef767 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,6 @@ +# Prompt Instructions + +## Coding standards (C#) +- Always put a blank line between properties and at the end of the file. +- Use 4 spaces for indentation. +- Always use curly braces for all control structures. diff --git a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs index dd72aef..a369f9c 100644 --- a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs +++ b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs @@ -1,5 +1,5 @@ -using System.Text.Json; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; namespace Syncerbell.EntityFrameworkCore; @@ -8,30 +8,28 @@ namespace Syncerbell.EntityFrameworkCore; /// public class EntityFrameworkCoreSyncLogPersistence( SyncLogDbContext context, - SyncerbellOptions options) + SyncerbellOptions options, + ILogger logger) : ISyncLogPersistence { - /// - /// Attempts to acquire a log entry for the specified entity, creating or updating an entry as needed. - /// - /// The entity options for which to acquire a log entry. - /// A token to monitor for cancellation requests. - /// An containing the acquired log entry and prior sync info, or null if the entry is already leased. - public async Task TryAcquireLogEntry(SyncEntityOptions entity, CancellationToken cancellationToken = default) + /// + public async Task TryAcquireLogEntry(SyncTriggerType triggerType, SyncEntityOptions entity, + CancellationToken cancellationToken = default) { return await ResilientTransaction.New(context).ExecuteAsync(async () => { - var parametersJson = entity.Parameters != null - ? JsonSerializer.Serialize(entity.Parameters) - : null; + var parametersJson = ParameterSerialization.Serialize(entity.Parameters); var logEntry = await GetPendingOrInProgressLogEntry(entity, parametersJson, cancellationToken); if (logEntry?.LeaseExpiresAt != null && logEntry.LeaseExpiresAt < DateTime.UtcNow) { // expired lease, set as expired and allow re-acquisition - logEntry.SyncStatus = SyncStatus.LeaseExpired; + logger.LogWarning( + "Lease expired for log entry {LogEntryId} for entity {Entity}. Setting status to LeaseExpired.", + logEntry.Id, entity.Entity); + logEntry.SyncStatus = SyncStatus.LeaseExpired; await context.SaveChangesAsync(cancellationToken); logEntry = null; // reset logEntry to allow creation of a new one @@ -43,18 +41,66 @@ public class EntityFrameworkCoreSyncLogPersistence( return null; } - var priorSyncInfo = await GetPriorSyncInfo(entity, parametersJson, cancellationToken); + var priorSyncInfo = await GetPriorSyncInfo(entity.Entity, parametersJson, entity.SchemaVersion, cancellationToken); - logEntry = await CreateOrUpdateLogEntry(entity, parametersJson, logEntry, cancellationToken); + logEntry = await CreateOrUpdateLogEntry(triggerType, entity, parametersJson, logEntry, cancellationToken); return new AcquireLogEntryResult(logEntry, priorSyncInfo); }, cancellationToken); } - private async Task GetPriorSyncInfo(SyncEntityOptions entity, string? parametersJson, CancellationToken cancellationToken) + /// + public async Task TryAcquireLogEntry(ISyncLogEntry syncLogEntry, + SyncEntityOptions entity, + CancellationToken cancellationToken = default) + { + return await ResilientTransaction.New(context).ExecuteAsync(async () => + { + if (syncLogEntry is not SyncLogEntry efSyncLogEntry) + { + throw new ArgumentException($"Log entry must be of type {nameof(SyncLogEntry)}", nameof(syncLogEntry)); + } + + // check if the log entry matches the entity and schema version + if (!syncLogEntry.Entity.Equals(entity.Entity) || + syncLogEntry.SchemaVersion != entity.SchemaVersion || + syncLogEntry.ParametersJson != ParameterSerialization.Serialize(entity.Parameters)) + { + logger.LogWarning("Log entry {LogEntryId} does not match entity {Entity}, parameters, or schema version. Cannot acquire.", syncLogEntry.Id, entity.Entity); + return null; + } + + // Check if lease is expired + if (syncLogEntry.LeaseExpiresAt != null && syncLogEntry.LeaseExpiresAt < DateTime.UtcNow) + { + // expired lease, set as expired and allow re-acquisition + syncLogEntry.SyncStatus = SyncStatus.LeaseExpired; + await context.SaveChangesAsync(cancellationToken); + } + else if (syncLogEntry.LeasedAt != null) + { + // If the log entry is already leased or in progress, return null as we can't acquire it yet. + return null; + } + + // Get prior sync info using the log entry data directly + var priorSyncInfo = await GetPriorSyncInfo(syncLogEntry.Entity, syncLogEntry.ParametersJson, syncLogEntry.SchemaVersion, cancellationToken); + + // Lease the log entry + UpdateLogEntryLease(efSyncLogEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + + context.SyncLogEntries.Update(efSyncLogEntry); + await context.SaveChangesAsync(cancellationToken); + + return new AcquireLogEntryResult(efSyncLogEntry, priorSyncInfo); + }, cancellationToken); + } + + private async Task GetPriorSyncInfo(string entity, string? parametersJson, int? schemaVersion, + CancellationToken cancellationToken) { var priorEntriesQuery = context.SyncLogEntries - .Where(e => e.Entity == entity.Entity && e.ParametersJson == parametersJson && e.SchemaVersion == entity.SchemaVersion) + .Where(e => e.Entity == entity && e.ParametersJson == parametersJson && e.SchemaVersion == schemaVersion) .OrderByDescending(e => e.CreatedAt); return new PriorSyncInfo @@ -66,7 +112,8 @@ private async Task GetPriorSyncInfo(SyncEntityOptions entity, str }; } - private async Task GetPendingOrInProgressLogEntry(SyncEntityOptions entity, string? parametersJson, CancellationToken cancellationToken) + private async Task GetPendingOrInProgressLogEntry(SyncEntityOptions entity, string? parametersJson, + CancellationToken cancellationToken) { return await context.SyncLogEntries .Where(e => e.Entity == entity.Entity @@ -76,8 +123,19 @@ private async Task GetPriorSyncInfo(SyncEntityOptions entity, str .SingleOrDefaultAsync(cancellationToken); } - private async Task CreateOrUpdateLogEntry(SyncEntityOptions entity, - string? parametersJson, SyncLogEntry? logEntry, CancellationToken cancellationToken) + private void UpdateLogEntryLease(SyncLogEntry logEntry, TimeSpan leaseExpiration) + { + logEntry.LeasedAt = DateTime.UtcNow; + logEntry.LeasedBy = options.MachineIdProvider(); + logEntry.LeaseExpiresAt = DateTime.UtcNow.Add(leaseExpiration); + } + + private async Task CreateOrUpdateLogEntry( + SyncTriggerType triggerType, + SyncEntityOptions entity, + string? parametersJson, + SyncLogEntry? logEntry, + CancellationToken cancellationToken) { if (logEntry == null) { @@ -86,19 +144,15 @@ private async Task CreateOrUpdateLogEntry(SyncEntityOptions entity Entity = entity.Entity, ParametersJson = parametersJson, SchemaVersion = entity.SchemaVersion, + TriggerType = triggerType, SyncStatus = SyncStatus.Pending, CreatedAt = DateTime.UtcNow, - LeasedAt = DateTime.UtcNow, - LeasedBy = options.MachineIdProvider(), - LeaseExpiresAt = DateTime.UtcNow.Add(entity.LeaseExpiration ?? options.DefaultLeaseExpiration), }; context.SyncLogEntries.Add(logEntry); } else { - logEntry.LeasedAt = DateTime.UtcNow; - logEntry.LeasedBy = options.MachineIdProvider(); - logEntry.LeaseExpiresAt = DateTime.UtcNow.Add(entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + UpdateLogEntryLease(logEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); context.SyncLogEntries.Update(logEntry); } @@ -107,18 +161,25 @@ private async Task CreateOrUpdateLogEntry(SyncEntityOptions entity return logEntry; } - /// - /// Updates an existing log entry for the specified entity. - /// - /// The entity options associated with the log entry. - /// The log entry to update. Must be of type . - /// A token to monitor for cancellation requests. - /// Thrown if is not of type . - /// A completed . - public async Task UpdateLogEntry(SyncEntityOptions entity, ISyncLogEntry logEntry, CancellationToken cancellationToken = default) + /// + public async Task FindById(string id, CancellationToken cancellationToken = default) + { + if (!int.TryParse(id, out var intId)) + { + throw new ArgumentException($"Identifier '{id}' is not a valid integer.", nameof(id)); + } + + return await context.SyncLogEntries + .FirstOrDefaultAsync(e => e.Id == intId, cancellationToken); + } + + /// + public async Task UpdateLogEntry(ISyncLogEntry logEntry, CancellationToken cancellationToken = default) { if (logEntry is not SyncLogEntry entry) + { throw new ArgumentException($"Log entry must be of type {nameof(SyncLogEntry)}", nameof(logEntry)); + } context.SyncLogEntries.Update(entry); await context.SaveChangesAsync(cancellationToken); diff --git a/Syncerbell.EntityFrameworkCore/README.md b/Syncerbell.EntityFrameworkCore/README.md index 038cdbe..5f546b2 100644 --- a/Syncerbell.EntityFrameworkCore/README.md +++ b/Syncerbell.EntityFrameworkCore/README.md @@ -21,6 +21,7 @@ CREATE TABLE [dbo].[SyncLogEntries] ( Entity varchar(100) NOT NULL, ParametersJson nvarchar(100) NULL, SchemaVersion int NULL, + TriggerType int NOT NULL CONSTRAINT [DF_SyncLogEntries_TriggerType] DEFAULT 0, SyncStatus int NOT NULL CONSTRAINT [DF_SyncLogEntries_SyncStatus] DEFAULT 1, CreatedAt datetime2 NOT NULL CONSTRAINT [DF_SyncLogEntries_CreatedAt] DEFAULT GETUTCDATE(), LeasedAt datetime2 NULL, @@ -84,6 +85,7 @@ GO ## Migrating to 0.5.0 The 0.5.0 release adds several new columns to the SyncLogEntries table for enhanced tracking capabilities: +- `TriggerType` for tracking the type of trigger that initiated the sync operation - `ProgressValue` for tracking current progress amount - `ProgressMax` for tracking maximum progress value - `RecordCount` for tracking total number of records processed @@ -95,7 +97,8 @@ to add the new columns: ```tsql ALTER TABLE [dbo].[SyncLogEntries] -ADD [ProgressValue] int NULL, +ADD [TriggerType] int NOT NULL CONSTRAINT [DF_SyncLogEntries_TriggerType] DEFAULT 0, + [ProgressValue] int NULL, [ProgressMax] int NULL, [RecordCount] int NULL, [QueueMessageId] varchar(100) NULL, diff --git a/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs b/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs index 999f181..41b7829 100644 --- a/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs +++ b/Syncerbell.EntityFrameworkCore/SyncLogEntry.cs @@ -37,6 +37,11 @@ public class SyncLogEntry : ISyncLogEntry /// public int? SchemaVersion { get; init; } + /// + /// Gets the type of trigger that initiated the sync operation. + /// + public required SyncTriggerType TriggerType { get; init; } + /// public SyncStatus SyncStatus { get; set; } @@ -84,6 +89,10 @@ public class SyncLogEntry : ISyncLogEntry /// public int? RecordCount { get; set; } + /// + [NotMapped] + string ISyncLogEntry.Id => Id.ToString(); + /// /// Gets or sets the row version for concurrency checking. /// diff --git a/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs b/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs index 4a990ae..bd3fb49 100644 --- a/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs +++ b/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs @@ -6,8 +6,8 @@ public class AlwaysEligibleStrategyTests [InlineData(SyncTriggerType.Manual, 30)] [InlineData(SyncTriggerType.Timer, null)] [InlineData(SyncTriggerType.Timer, 400)] - [InlineData(SyncTriggerType.Custom, null)] - [InlineData(SyncTriggerType.Custom, -1)] // in the future? doesn't matter. + [InlineData(SyncTriggerType.Unknown, null)] + [InlineData(SyncTriggerType.Unknown, -1)] // in the future? doesn't matter. [Theory] public async Task AlwaysEligible_AlwaysReturnsTrue(SyncTriggerType syncTriggerType, int? lastSyncDaysAgo) { diff --git a/Syncerbell.Tests/InMemorySyncLogPersistenceTests.cs b/Syncerbell.Tests/InMemorySyncLogPersistenceTests.cs index 4f611a6..19db3f4 100644 --- a/Syncerbell.Tests/InMemorySyncLogPersistenceTests.cs +++ b/Syncerbell.Tests/InMemorySyncLogPersistenceTests.cs @@ -1,3 +1,5 @@ +using Microsoft.Extensions.Logging; + namespace Syncerbell.Tests; public class InMemorySyncLogPersistenceTests @@ -7,11 +9,12 @@ public async Task InMemorySyncLogPersistence_ShouldCreateNewLogEntryIfNotFound() { // Arrange var options = new SyncerbellOptions(); - var persistence = new InMemorySyncLogPersistence(options); + var logger = new LoggerFactory().CreateLogger(); + var persistence = new InMemorySyncLogPersistence(options, logger); var entity = new SyncEntityOptions("TestEntity", typeof(TestEntitySync)); // Act - var result = await persistence.TryAcquireLogEntry(entity); + var result = await persistence.TryAcquireLogEntry(SyncTriggerType.Manual, entity); // Assert Assert.NotNull(result); @@ -29,18 +32,19 @@ public async Task InMemorySyncLogPersistence_ShouldAcquireExistingLogEntry() { // Arrange var options = new SyncerbellOptions(); - var persistence = new InMemorySyncLogPersistence(options); + var logger = new LoggerFactory().CreateLogger(); + var persistence = new InMemorySyncLogPersistence(options, logger); var entity = new SyncEntityOptions("TestEntity", typeof(TestEntitySync)); - var originalLogEntry = await persistence.TryAcquireLogEntry(entity); + var originalLogEntry = await persistence.TryAcquireLogEntry(SyncTriggerType.Manual, entity); Assert.NotNull(originalLogEntry?.SyncLogEntry); originalLogEntry.SyncLogEntry.LeaseExpiresAt = null; originalLogEntry.SyncLogEntry.LeasedAt = null; originalLogEntry.SyncLogEntry.LeasedBy = null; - await persistence.UpdateLogEntry(entity, originalLogEntry.SyncLogEntry); + await persistence.UpdateLogEntry(originalLogEntry.SyncLogEntry); // Act - var result = await persistence.TryAcquireLogEntry(entity); + var result = await persistence.TryAcquireLogEntry(SyncTriggerType.Manual, entity); // Assert Assert.NotNull(result); diff --git a/Syncerbell.Tests/ParameterSerializationTests.cs b/Syncerbell.Tests/ParameterSerializationTests.cs new file mode 100644 index 0000000..09238bb --- /dev/null +++ b/Syncerbell.Tests/ParameterSerializationTests.cs @@ -0,0 +1,166 @@ +using System.Text.Json; + +namespace Syncerbell.Tests; + +public class ParameterSerializationTests +{ + [Fact] + public void Serialize_WithNullParameters_ReturnsNull() + { + // Arrange + SortedDictionary? parameters = null; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.Null(result); + } + + [Fact] + public void Serialize_WithEmptyDictionary_ReturnsNull() + { + // Arrange + var parameters = new SortedDictionary(); + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.Null(result); + } + + [Fact] + public void Serialize_WithSingleStringParameter_ReturnsValidJson() + { + // Arrange + var parameters = new SortedDictionary + { + { "key1", "value1" } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + Assert.Equal("{\"key1\":\"value1\"}", result); + } + + [Fact] + public void Serialize_WithMultipleParameters_ReturnsValidJson() + { + // Arrange + var parameters = new SortedDictionary + { + { "stringParam", "test" }, + { "intParam", 42 }, + { "boolParam", true } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + Assert.Equal("{\"boolParam\":true,\"intParam\":42,\"stringParam\":\"test\"}", result); + } + + [Fact] + public void Serialize_WithNullValues_ReturnsValidJson() + { + // Arrange + var parameters = new SortedDictionary + { + { "nullParam", null }, + { "stringParam", "test" } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + Assert.Equal("{\"nullParam\":null,\"stringParam\":\"test\"}", result); + } + + [Fact] + public void Serialize_WithComplexObjects_ReturnsValidJson() + { + // Arrange + var complexObject = new { Name = "Test", Value = 123 }; + var parameters = new SortedDictionary + { + { "complex", complexObject }, + { "array", new[] { 1, 2, 3 } } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + Assert.Equal("{\"array\":[1,2,3],\"complex\":{\"Name\":\"Test\",\"Value\":123}}", result); + } + + [Fact] + public void Serialize_PreservesKeyOrder_WhenUsingSortedDictionary() + { + // Arrange + var parameters = new SortedDictionary + { + { "zKey", "last" }, + { "aKey", "first" }, + { "mKey", "middle" } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + Assert.Equal("{\"aKey\":\"first\",\"mKey\":\"middle\",\"zKey\":\"last\"}", result); + } + + [Theory] + [InlineData("")] + [InlineData(" ")] + [InlineData("special-chars!@#$%")] + public void Serialize_WithVariousStringValues_ReturnsValidJson(string value) + { + // Arrange + var parameters = new SortedDictionary + { + { "testKey", value } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + var deserialized = JsonSerializer.Deserialize>(result); + Assert.Equal(value, deserialized!["testKey"].GetString()); + } + + [Theory] + [InlineData(int.MinValue)] + [InlineData(0)] + [InlineData(int.MaxValue)] + public void Serialize_WithVariousIntegerValues_ReturnsValidJson(int value) + { + // Arrange + var parameters = new SortedDictionary + { + { "intKey", value } + }; + + // Act + var result = ParameterSerialization.Serialize(parameters); + + // Assert + Assert.NotNull(result); + var deserialized = JsonSerializer.Deserialize>(result); + Assert.Equal(value, deserialized!["intKey"].GetInt32()); + } +} diff --git a/Syncerbell.Tests/ProgressPercentageTests.cs b/Syncerbell.Tests/ProgressPercentageTests.cs index 6965e51..ef6940f 100644 --- a/Syncerbell.Tests/ProgressPercentageTests.cs +++ b/Syncerbell.Tests/ProgressPercentageTests.cs @@ -4,6 +4,10 @@ public class ProgressPercentageTests { private class TestSyncLogEntry : ISyncLogEntry { + private readonly Guid _id = Guid.NewGuid(); + + public int? SchemaVersion => null; + public SyncTriggerType TriggerType => SyncTriggerType.Manual; public SyncStatus SyncStatus { get; set; } public DateTime CreatedAt { get; } = DateTime.UtcNow; public DateTime? LeasedAt { get; set; } @@ -17,6 +21,9 @@ private class TestSyncLogEntry : ISyncLogEntry public int? ProgressValue { get; set; } public int? ProgressMax { get; set; } public int? RecordCount { get; set; } + public string Id => _id.ToString(); + public string Entity => "TestEntity"; + public string? ParametersJson => null; } [Theory] diff --git a/Syncerbell/ISyncLogEntry.cs b/Syncerbell/ISyncLogEntry.cs index 9ea1c17..5dbc091 100644 --- a/Syncerbell/ISyncLogEntry.cs +++ b/Syncerbell/ISyncLogEntry.cs @@ -5,6 +5,31 @@ /// public interface ISyncLogEntry { + /// + /// Gets an identifier for locating the sync operation in logs or other systems. + /// + string Id { get; } + + /// + /// Gets the name of the entity being synchronized. + /// + string Entity { get; } + + /// + /// Gets the serialized parameters associated with the entity, if any. + /// + string? ParametersJson { get; } + + /// + /// Gets the schema version of the entity, if specified. + /// + int? SchemaVersion { get; } + + /// + /// Gets the type of trigger that initiated the sync operation. + /// + SyncTriggerType TriggerType { get; } + /// /// Gets or sets the current status of the sync operation. /// diff --git a/Syncerbell/ISyncLogPersistence.cs b/Syncerbell/ISyncLogPersistence.cs index 8ca8264..0f259cc 100644 --- a/Syncerbell/ISyncLogPersistence.cs +++ b/Syncerbell/ISyncLogPersistence.cs @@ -11,16 +11,43 @@ public interface ISyncLogPersistence /// If there is an unleased log entry for the entity, it will be returned and leased to the caller. /// If there is a log entry that is already leased, this method will return null. /// + /// + /// Implementations should not modify the trigger type on an existing sync log entry if it differs from the one + /// provided. This value is only used when creating a new log entry. + /// + /// The type of trigger that initiated the sync operation. /// The entity for which to acquire a log entry. /// The cancellation token to observe while waiting for the task to complete. /// Asynchronously returns the acquired log entry details, or null. - Task TryAcquireLogEntry(SyncEntityOptions entity, CancellationToken cancellationToken = default); + Task TryAcquireLogEntry(SyncTriggerType triggerType, + SyncEntityOptions entity, + CancellationToken cancellationToken = default); + + /// + /// Tries to acquire a provided log entry. + /// If the log entry exists and is not already leased, it will be returned and leased to the caller. + /// If the log entry is already leased or does not exist, this method will return null. + /// + /// The sync log entry to acquire. + /// The entity options for the log entry to acquire. + /// The cancellation token to observe while waiting for the task to complete. + /// Asynchronously returns the acquired log entry details, or null. + Task TryAcquireLogEntry(ISyncLogEntry syncLogEntry, + SyncEntityOptions entity, + CancellationToken cancellationToken = default); /// /// Updates the specified log entry in the persistence layer. /// - /// The options for the entity associated with the log entry. /// The log entry to update. /// The cancellation token to observe while waiting for the task to complete. - Task UpdateLogEntry(SyncEntityOptions entity, ISyncLogEntry logEntry, CancellationToken cancellationToken = default); + Task UpdateLogEntry(ISyncLogEntry logEntry, CancellationToken cancellationToken = default); + + /// + /// Finds a sync log entry by its identifier. + /// + /// The unique identifier used to locate the sync log entry. + /// The cancellation token to observe while waiting for the task to complete. + /// The sync log entry if found, otherwise null. + Task FindById(string id, CancellationToken cancellationToken = default); } diff --git a/Syncerbell/ISyncQueueService.cs b/Syncerbell/ISyncQueueService.cs new file mode 100644 index 0000000..97fa06d --- /dev/null +++ b/Syncerbell/ISyncQueueService.cs @@ -0,0 +1,41 @@ +namespace Syncerbell; + +/// +/// Service for creating sync log entries that are marked as queued for distributed, asynchronous processing. +/// This service enables fanning out sync operations across multiple workers or services using a FIFO queue pattern. +/// +public interface ISyncQueueService +{ + /// + /// Creates sync log entries for all entities and marks them as queued for processing. + /// The entries are created in a queued state, ready to be picked up by distributed workers + /// for asynchronous sync processing in FIFO order. + /// + /// + /// This does not actually enqueue the entries in a message queue or similar system. + /// The caller should do that separately, and then call RecordQueueMessage to record the queue message + /// details in the sync log. + /// + /// The type of trigger that initiated this sync operation (e.g., manual, timer). + /// A cancellation token to cancel the operation if needed. + /// + /// A read-only list of sync log entries that were created and queued for processing. + /// Each entry represents a sync operation that needs to be performed by a distributed worker. + /// + /// Thrown when the operation is cancelled via the cancellation token. + Task> CreateAllQueuedSyncEntries(SyncTriggerType syncTrigger, + CancellationToken cancellationToken = default); + + /// + /// Records queue message details in the sync log entry after it has been enqueued in a message queue system. + /// This method should be called after successfully enqueuing a sync log entry to track the queue message information. + /// + /// The unique identifier used to locate the sync log entry in the system, such as a sync log ID or entity name. + /// The unique identifier of the message in the queue system. + /// A cancellation token to cancel the operation if needed. + /// A task representing the asynchronous operation. + /// Thrown when syncLogEntry or queueMessageId is null. + /// Thrown when the operation is cancelled via the cancellation token. + Task RecordQueueMessageId(string syncLogEntryId, string queueMessageId, + CancellationToken cancellationToken = default); +} diff --git a/Syncerbell/ISyncService.cs b/Syncerbell/ISyncService.cs index 63bbae5..3a47451 100644 --- a/Syncerbell/ISyncService.cs +++ b/Syncerbell/ISyncService.cs @@ -25,4 +25,15 @@ Task> SyncAllEligible( Task SyncEntityIfEligible(SyncTriggerType triggerType, SyncEntityOptions entity, CancellationToken cancellationToken = default); + + /// + /// Synchronizes a specific entity from an already-queued sync log entry. + /// + /// The id of the sync log entry to process. + /// The type of trigger that initiated the sync operation. + /// A cancellation token that can be used to cancel the operation. + /// Returns a task that resolves to a object indicating the outcome of the sync operation, or null if the entry was not found or could not be processed. + Task SyncEntityIfEligible(string syncLogEntryId, + SyncTriggerType triggerType, + CancellationToken cancellationToken = default); } diff --git a/Syncerbell/InMemorySyncLogPersistence.cs b/Syncerbell/InMemorySyncLogPersistence.cs index b5f62f3..015b87a 100644 --- a/Syncerbell/InMemorySyncLogPersistence.cs +++ b/Syncerbell/InMemorySyncLogPersistence.cs @@ -1,5 +1,6 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Logging; namespace Syncerbell; @@ -7,23 +8,21 @@ namespace Syncerbell; /// An in-memory implementation of ISyncLogPersistence for testing or lightweight scenarios. /// Not suitable for most production use due to lack of durability. /// -public class InMemorySyncLogPersistence(SyncerbellOptions options) : ISyncLogPersistence +public class InMemorySyncLogPersistence( + SyncerbellOptions options, + ILogger logger) + : ISyncLogPersistence { private readonly ReaderWriterLockSlim _lock = new(); - private readonly List entries = new(); - - /// - /// Attempts to acquire a log entry for the specified entity, creating a new entry if necessary. - /// - /// The entity options for which to acquire a log entry. - /// A token to monitor for cancellation requests. - /// An containing the acquired log entry and prior sync info, or null if the entry is already leased. + private readonly List entries = []; + + /// [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")] - public Task TryAcquireLogEntry(SyncEntityOptions entity, CancellationToken cancellationToken = default) + public Task TryAcquireLogEntry(SyncTriggerType triggerType, + SyncEntityOptions entity, + CancellationToken cancellationToken = default) { - var parametersJson = entity.Parameters != null - ? System.Text.Json.JsonSerializer.Serialize(entity.Parameters) - : null; + var parametersJson = ParameterSerialization.Serialize(entity.Parameters); try { @@ -35,35 +34,12 @@ public class InMemorySyncLogPersistence(SyncerbellOptions options) : ISyncLogPer && e.SchemaVersion == entity.SchemaVersion && e.SyncStatus is SyncStatus.Pending or SyncStatus.InProgress); - if (logEntry?.LeaseExpiresAt != null && logEntry.LeaseExpiresAt < DateTime.UtcNow) + if (!TryProcessExistingEntry(logEntry)) { - // expired lease, set as expired and allow re-acquisition - logEntry.SyncStatus = SyncStatus.LeaseExpired; - - logEntry = null; // reset logEntry to allow creation of a new one - } - else if (logEntry?.LeasedAt != null) - { - // If the log entry is already leased or in progress, return null as we can't acquire it yet. - // This could either be a pending or in-progress entry. return Task.FromResult(null); } - var priorEntriesQuery = entries - .Where(e => e.Entity == entity.Entity - && e.ParametersJson == parametersJson - && e.SchemaVersion == entity.SchemaVersion - && e.SyncStatus != SyncStatus.Pending - && e.SyncStatus != SyncStatus.InProgress) - .OrderByDescending(e => e.CreatedAt); - - var priorSyncInfo = new PriorSyncInfo - { - HighWaterMark = priorEntriesQuery.FirstOrDefault(i => i.HighWaterMark != null)?.HighWaterMark, - LastSyncCompletedAt = priorEntriesQuery.FirstOrDefault(i => i.FinishedAt != null)?.FinishedAt, - LastSyncLeasedAt = priorEntriesQuery.FirstOrDefault(i => i.LeasedAt != null)?.LeasedAt, - LastSyncQueuedAt = priorEntriesQuery.FirstOrDefault()?.CreatedAt, - }; + var priorSyncInfo = GetPriorSyncInfo(entity.Entity, parametersJson, entity.SchemaVersion); if (logEntry == null) { @@ -73,23 +49,55 @@ public class InMemorySyncLogPersistence(SyncerbellOptions options) : ISyncLogPer ParametersJson = parametersJson, SyncStatus = SyncStatus.Pending, SchemaVersion = entity.SchemaVersion, + TriggerType = triggerType, CreatedAt = DateTime.UtcNow, - LeasedAt = DateTime.UtcNow, - LeasedBy = options.MachineIdProvider(), - LeaseExpiresAt = DateTime.UtcNow.Add(entity.LeaseExpiration ?? options.DefaultLeaseExpiration), }; entries.Add(logEntry); } - else + + UpdateLogEntryLease(logEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + + // Clone the log entry so that it can't be modified outside the writer lock + return Task.FromResult(new AcquireLogEntryResult(logEntry.Clone(), priorSyncInfo)); + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// + public Task TryAcquireLogEntry(ISyncLogEntry logEntry, SyncEntityOptions entity, CancellationToken cancellationToken = default) + { + try + { + _lock.EnterWriteLock(); + + if (logEntry is not InMemoryEntry inMemoryEntry) + { + throw new ArgumentException($"Log entry must be of type {nameof(InMemoryEntry)}", nameof(logEntry)); + } + + // check if the log entry matches the entity and schema version + if (!logEntry.Entity.Equals(entity.Entity) || + logEntry.SchemaVersion != entity.SchemaVersion || + logEntry.ParametersJson != ParameterSerialization.Serialize(entity.Parameters)) + { + logger.LogWarning("Log entry {LogEntryId} does not match entity {Entity}, parameters, or schema version. Cannot acquire.", logEntry.Id, entity.Entity); + return Task.FromResult(null); + } + + if (!TryProcessExistingEntry(inMemoryEntry)) { - logEntry.LeasedAt = DateTime.UtcNow; - logEntry.LeasedBy = options.MachineIdProvider(); - logEntry.LeaseExpiresAt = - DateTime.UtcNow.Add(entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + return Task.FromResult(null); } + var priorSyncInfo = GetPriorSyncInfo(logEntry.Entity, logEntry.ParametersJson, logEntry.SchemaVersion); + + UpdateLogEntryLease(inMemoryEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + // Clone the log entry so that it can't be modified outside the writer lock - return Task.FromResult(new AcquireLogEntryResult(logEntry.Clone(), priorSyncInfo)); + return Task.FromResult(new AcquireLogEntryResult(inMemoryEntry.Clone(), priorSyncInfo)); } finally { @@ -97,35 +105,89 @@ public class InMemorySyncLogPersistence(SyncerbellOptions options) : ISyncLogPer } } - /// - /// Updates an existing log entry for the specified entity. - /// - /// The entity options associated with the log entry. - /// The log entry to update. Must be of type . - /// A token to monitor for cancellation requests. - /// Thrown if is not of type . - /// Thrown if the log entry cannot be found for update. - /// A completed . - public Task UpdateLogEntry(SyncEntityOptions entity, ISyncLogEntry logEntry, CancellationToken cancellationToken = default) + private static bool TryProcessExistingEntry(InMemoryEntry? logEntry) + { + if (logEntry?.LeaseExpiresAt != null && logEntry.LeaseExpiresAt < DateTime.UtcNow) + { + // expired lease, set as expired and allow re-acquisition + logEntry.SyncStatus = SyncStatus.LeaseExpired; + return true; + } + + if (logEntry?.LeasedAt != null) + { + // If the log entry is already leased or in progress, return false as we can't acquire it yet. + return false; + } + + return true; + } + + [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")] + private PriorSyncInfo GetPriorSyncInfo(string entity, string? parametersJson, int? schemaVersion) + { + var priorEntriesQuery = entries + .Where(e => e.Entity == entity + && e.ParametersJson == parametersJson + && e.SchemaVersion == schemaVersion + && e.SyncStatus != SyncStatus.Pending + && e.SyncStatus != SyncStatus.InProgress) + .OrderByDescending(e => e.CreatedAt); + + return new PriorSyncInfo + { + HighWaterMark = priorEntriesQuery.FirstOrDefault(i => i.HighWaterMark != null)?.HighWaterMark, + LastSyncCompletedAt = priorEntriesQuery.FirstOrDefault(i => i.FinishedAt != null)?.FinishedAt, + LastSyncLeasedAt = priorEntriesQuery.FirstOrDefault(i => i.LeasedAt != null)?.LeasedAt, + LastSyncQueuedAt = priorEntriesQuery.FirstOrDefault()?.CreatedAt, + }; + } + + private void UpdateLogEntryLease(InMemoryEntry logEntry, TimeSpan leaseExpiration) + { + logEntry.LeasedAt = DateTime.UtcNow; + logEntry.LeasedBy = options.MachineIdProvider(); + logEntry.LeaseExpiresAt = DateTime.UtcNow.Add(leaseExpiration); + } + + /// + public Task FindById(string id, CancellationToken cancellationToken = default) + { + try + { + _lock.EnterReadLock(); + + var entry = entries.FirstOrDefault(e => e.Id == id); + return Task.FromResult(entry?.Clone()); + } + finally + { + _lock.ExitReadLock(); + } + } + + /// + public Task UpdateLogEntry(ISyncLogEntry logEntry, CancellationToken cancellationToken = default) { if (logEntry is not InMemoryEntry entry) + { throw new ArgumentException($"Log entry must be of type {nameof(InMemoryEntry)}", nameof(logEntry)); + } try { _lock.EnterWriteLock(); - var existingEntryIndex = entries.FindIndex(e => e.Entity == entry.Entity && e.ParametersJson == entry.ParametersJson && e.SchemaVersion == entry.SchemaVersion); + var existingEntryIndex = entries.FindIndex(e => e.Id == logEntry.Id); if (existingEntryIndex < 0) { - throw new InvalidOperationException("Log entry not found for update."); + throw new InvalidOperationException($"Log entry with identifier '{logEntry.Id}' not found for update."); } Debug.Assert(!ReferenceEquals(entries[existingEntryIndex], entry)); - // Replace the existing entry with a clone of the updated one, - // so that the caller cannot modify it outside the lock after calling this method. + // Replace the existing entry with a clone of the updated one entries[existingEntryIndex] = entry.Clone(); } finally @@ -138,12 +200,26 @@ public Task UpdateLogEntry(SyncEntityOptions entity, ISyncLogEntry logEntry, Can private class InMemoryEntry : ISyncLogEntry { + public InMemoryEntry() + { + _id = Guid.NewGuid(); + } + + private InMemoryEntry(Guid id) + { + _id = id; + } + + private readonly Guid _id; + public required string Entity { get; init; } public string? ParametersJson { get; init; } public int? SchemaVersion { get; init; } + public SyncTriggerType TriggerType { get; set; } + public SyncStatus SyncStatus { get; set; } public DateTime CreatedAt { get; init; } @@ -170,13 +246,16 @@ private class InMemoryEntry : ISyncLogEntry public int? RecordCount { get; set; } + public string Id => _id.ToString(); + public InMemoryEntry Clone() { - return new InMemoryEntry + return new InMemoryEntry(_id) { Entity = Entity, ParametersJson = ParametersJson, SchemaVersion = SchemaVersion, + TriggerType = TriggerType, SyncStatus = SyncStatus, CreatedAt = CreatedAt, LeasedAt = LeasedAt, diff --git a/Syncerbell/ParameterSerialization.cs b/Syncerbell/ParameterSerialization.cs new file mode 100644 index 0000000..d2189ba --- /dev/null +++ b/Syncerbell/ParameterSerialization.cs @@ -0,0 +1,19 @@ +using System.Text.Json; + +namespace Syncerbell; + +/// +/// Provides methods for serializing parameters used in synchronization operations. +/// +public static class ParameterSerialization +{ + /// + /// Serializes a dictionary of parameters into a JSON string. + /// + /// The parameters to serialize. + /// A JSON string representation of the parameters, or null if the parameters are null or empty. + public static string? Serialize(SortedDictionary? parameters) + => parameters == null || parameters.Count == 0 + ? null + : JsonSerializer.Serialize(parameters); +} diff --git a/Syncerbell/ServiceCollectionExtensions.cs b/Syncerbell/ServiceCollectionExtensions.cs index 8eb40a3..973ed58 100644 --- a/Syncerbell/ServiceCollectionExtensions.cs +++ b/Syncerbell/ServiceCollectionExtensions.cs @@ -20,6 +20,7 @@ public static IServiceCollection AddSyncerbell(this IServiceCollection services, services.AddSingleton(options); services.AddSingleton(); + services.AddSingleton(); foreach (var entity in options.Entities) { diff --git a/Syncerbell/SyncEntityOptions.cs b/Syncerbell/SyncEntityOptions.cs index 64ecd80..1401774 100644 --- a/Syncerbell/SyncEntityOptions.cs +++ b/Syncerbell/SyncEntityOptions.cs @@ -62,6 +62,11 @@ public class SyncEntityOptions(string entity, Type entitySyncType) /// public SortedDictionary? Parameters { get; set; } + /// + /// Gets the serialized JSON representation of the parameters, or null if no parameters are set. + /// + public string? ParametersJson => ParameterSerialization.Serialize(Parameters); + /// /// Gets or sets the strategy to determine if the entity is eligible for synchronization. /// diff --git a/Syncerbell/SyncQueueService.cs b/Syncerbell/SyncQueueService.cs new file mode 100644 index 0000000..8ec77a6 --- /dev/null +++ b/Syncerbell/SyncQueueService.cs @@ -0,0 +1,106 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Syncerbell; + +/// +/// Implementation of ISyncQueueService that creates queued sync log entries for distributed processing. +/// +public class SyncQueueService( + SyncerbellOptions options, + IServiceProvider serviceProvider, + ISyncLogPersistence syncLogPersistence, + ILogger logger) + : ISyncQueueService +{ + /// + public async Task> CreateAllQueuedSyncEntries(SyncTriggerType syncTrigger, + CancellationToken cancellationToken = default) + { + var entities = new List(options.Entities); + + if (options.EntityProviderType is { } entityProviderType) + { + var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider + ?? throw new InvalidOperationException( + $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); + + var additionalEntities = await entityProvider.GetEntities(cancellationToken); + + if (additionalEntities.Count == 0) + { + logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); + } + else + { + logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); + entities.AddRange(additionalEntities); + } + } + + if (entities.Count == 0) + { + logger.LogWarning("No entities registered for sync. Skipping queue creation operation."); + return []; + } + + var queuedEntries = new List(); + var currentTime = DateTime.UtcNow; + + foreach (var entity in entities) + { + try + { + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(syncTrigger, entity, cancellationToken); + + if (acquireResult is not { SyncLogEntry: { } logEntry }) + { + logger.LogDebug("No log entry acquired for entity {EntityName}. Skipping queue creation.", entity.Entity); + continue; + } + + // Mark the entry as queued + logEntry.SyncStatus = SyncStatus.Pending; + logEntry.QueuedAt = currentTime; + + await syncLogPersistence.UpdateLogEntry(logEntry, cancellationToken); + + queuedEntries.Add(logEntry); + + logger.LogDebug("Created queued sync entry for entity {EntityName}.", entity.Entity); + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to create queued sync entry for entity {EntityName}.", entity.Entity); + // Continue with other entities even if one fails + } + } + + logger.LogInformation("Created {Count} queued sync entries for distributed processing.", queuedEntries.Count); + + return queuedEntries; + } + + /// + public async Task RecordQueueMessageId(string syncLogEntryId, string queueMessageId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(syncLogEntryId); + ArgumentException.ThrowIfNullOrWhiteSpace(queueMessageId); + + var syncLogEntry = await syncLogPersistence.FindById(syncLogEntryId, cancellationToken); + + if (syncLogEntry == null) + { + logger.LogWarning("Sync log entry with identifier {Id} not found.", syncLogEntryId); + throw new InvalidOperationException($"Sync log entry with identifier '{syncLogEntryId}' not found."); + } + + syncLogEntry.QueueMessageId = queueMessageId; + + await syncLogPersistence.UpdateLogEntry(syncLogEntry, cancellationToken); + + logger.LogDebug("Recorded queue message ID {QueueMessageId} for sync log entry with identifier {Id}.", + queueMessageId, syncLogEntryId); + } +} diff --git a/Syncerbell/SyncService.cs b/Syncerbell/SyncService.cs index bbd4c03..f0bed5b 100644 --- a/Syncerbell/SyncService.cs +++ b/Syncerbell/SyncService.cs @@ -19,26 +19,7 @@ public class SyncService( /// public async Task> SyncAllEligible(SyncTriggerType triggerType, CancellationToken cancellationToken = default) { - var entities = new List(options.Entities); - - if (options.EntityProviderType is { } entityProviderType) - { - var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider - ?? throw new InvalidOperationException( - $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); - - var additionalEntities = await entityProvider.GetEntities(cancellationToken); - - if (additionalEntities.Count == 0) - { - logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); - } - else - { - logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); - entities.AddRange(additionalEntities); - } - } + var entities = await GetAllEntities(cancellationToken); if (entities.Count == 0) { @@ -68,16 +49,97 @@ public async Task> SyncAllEligible(SyncTriggerType tri SyncEntityOptions entity, CancellationToken cancellationToken = default) { - var acquireResult = await syncLogPersistence.TryAcquireLogEntry(entity, cancellationToken); + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(triggerType, entity, cancellationToken); if (acquireResult is not { SyncLogEntry: { } log, PriorSyncInfo: { } priorSyncInfo }) { // If no log entry was acquired, we skip the sync for this entity. // This could be because the entity is already being processed or has a pending sync. - logger.LogDebug("No log entry acquired for entity {EntityName}. Skipping sync.", entity.Entity); + logger.LogInformation("No log entry acquired for entity {EntityName}. Skipping sync.", entity.Entity); + return null; + } + + return await ProcessSyncLogEntry(log, priorSyncInfo, triggerType, entity, cancellationToken); + } + + /// + public async Task SyncEntityIfEligible(string syncLogEntryId, + SyncTriggerType triggerType, + CancellationToken cancellationToken = default) + { + var log = await syncLogPersistence.FindById(syncLogEntryId, cancellationToken); + + if (log is null) + { + throw new InvalidOperationException($"Sync log entry with identifier '{syncLogEntryId}' not found."); + } + + var allEntities = await GetAllEntities(cancellationToken); + + var entity = allEntities.FirstOrDefault(e => e.Entity == log.Entity && e.SchemaVersion == log.SchemaVersion && e.ParametersJson == log.ParametersJson) + ?? throw new InvalidOperationException($"No entity configuration found for entity {log.Entity} " + + $"with parameters {log.ParametersJson ?? "null"} " + + $"and schema version {log.SchemaVersion?.ToString() ?? "null"} " + + $"from log entry {syncLogEntryId}."); + + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(log, entity, cancellationToken); + + if (acquireResult is not { PriorSyncInfo: { } priorSyncInfo }) + { + // If no log entry was acquired, it might not exist or is already being processed + logger.LogInformation("No log entry acquired for ID {SyncLogEntryId}. Entry may not exist or is already being processed.", syncLogEntryId); return null; } + return await ProcessSyncLogEntry(log, priorSyncInfo, triggerType, entity, cancellationToken); + } + + /// + /// Gets all entities configured for synchronization, including both statically configured and dynamically provided entities. + /// + /// A cancellation token that can be used to cancel the operation. + /// A list of all sync entity options. + private async Task> GetAllEntities(CancellationToken cancellationToken) + { + var entities = new List(options.Entities); + + if (options.EntityProviderType is { } entityProviderType) + { + var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider + ?? throw new InvalidOperationException( + $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); + + var additionalEntities = await entityProvider.GetEntities(cancellationToken); + + if (additionalEntities.Count == 0) + { + logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); + } + else + { + logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); + entities.AddRange(additionalEntities); + } + } + + return entities; + } + + /// + /// Processes a sync log entry by checking eligibility and executing the sync if eligible. + /// + /// The sync log entry to process. + /// The prior sync information. + /// The trigger type for the sync operation. + /// The entity configuration. + /// A cancellation token that can be used to cancel the operation. + /// Returns a task that resolves to a object indicating the outcome of the sync operation, or null if not eligible. + private async Task ProcessSyncLogEntry(ISyncLogEntry log, + PriorSyncInfo priorSyncInfo, + SyncTriggerType triggerType, + SyncEntityOptions entity, + CancellationToken cancellationToken) + { var trigger = new SyncTrigger { PriorSyncInfo = priorSyncInfo, @@ -154,7 +216,7 @@ private async Task ReportProgress(SyncEntityOptions entity, ISyncLogEntry logEnt logger.LogDebug("Reporting progress for entity {EntityName}: {ProgressValue}/{ProgressMax} ({ProgressPercentage:P})", entity.Entity, progress.Value, progress.Max, logEntry.ProgressPercentage); - await syncLogPersistence.UpdateLogEntry(entity, logEntry, cancellationToken); + await syncLogPersistence.UpdateLogEntry(logEntry, cancellationToken); } private async Task UpdateLogEntry(ISyncLogEntry log, SyncStatus status, SyncResult syncResult, CancellationToken cancellationToken) @@ -163,6 +225,6 @@ private async Task UpdateLogEntry(ISyncLogEntry log, SyncStatus status, SyncResu log.ResultMessage = syncResult.Message ?? (syncResult.Success ? SyncSuccessMessage : SyncFailedMessage); log.FinishedAt = DateTime.UtcNow; log.HighWaterMark = syncResult.HighWaterMark; - await syncLogPersistence.UpdateLogEntry(syncResult.Entity, log, cancellationToken); + await syncLogPersistence.UpdateLogEntry(log, cancellationToken); } } diff --git a/Syncerbell/SyncTriggerType.cs b/Syncerbell/SyncTriggerType.cs index 99b9314..c34444f 100644 --- a/Syncerbell/SyncTriggerType.cs +++ b/Syncerbell/SyncTriggerType.cs @@ -6,9 +6,9 @@ namespace Syncerbell; public enum SyncTriggerType { /// - /// A custom trigger type, typically used for user-defined or application-specific scenarios. + /// An unknown trigger type, used when the trigger is not specified or recognized. /// - Custom = 0, + Unknown = 0, /// /// A timer-based trigger, such as a scheduled or periodic sync. /// From 80b20acc993f6a4ecbfde1eda67edc601ffba3b3 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Thu, 31 Jul 2025 12:16:54 -0600 Subject: [PATCH 4/7] Add example project; some bug fixes; add AcquireLeaseBehavior --- ExampleInMemoryQueue/Customer.cs | 11 ++ ExampleInMemoryQueue/CustomerSync.cs | 33 ++++ .../ExampleInMemoryQueue.csproj | 21 +++ ExampleInMemoryQueue/Order.cs | 15 ++ ExampleInMemoryQueue/OrderSync.cs | 33 ++++ ExampleInMemoryQueue/Product.cs | 13 ++ ExampleInMemoryQueue/ProductSync.cs | 33 ++++ ExampleInMemoryQueue/Program.cs | 165 ++++++++++++++++++ F23.Syncerbell.sln | 7 + .../EntityFrameworkCoreSyncLogPersistence.cs | 23 ++- Syncerbell/AcquireLeaseBehavior.cs | 27 +++ Syncerbell/ISyncLogPersistence.cs | 6 + Syncerbell/InMemorySyncLogPersistence.cs | 24 ++- Syncerbell/SyncQueueService.cs | 2 +- Syncerbell/SyncService.cs | 4 +- 15 files changed, 399 insertions(+), 18 deletions(-) create mode 100644 ExampleInMemoryQueue/Customer.cs create mode 100644 ExampleInMemoryQueue/CustomerSync.cs create mode 100644 ExampleInMemoryQueue/ExampleInMemoryQueue.csproj create mode 100644 ExampleInMemoryQueue/Order.cs create mode 100644 ExampleInMemoryQueue/OrderSync.cs create mode 100644 ExampleInMemoryQueue/Product.cs create mode 100644 ExampleInMemoryQueue/ProductSync.cs create mode 100644 ExampleInMemoryQueue/Program.cs create mode 100644 Syncerbell/AcquireLeaseBehavior.cs diff --git a/ExampleInMemoryQueue/Customer.cs b/ExampleInMemoryQueue/Customer.cs new file mode 100644 index 0000000..50668b5 --- /dev/null +++ b/ExampleInMemoryQueue/Customer.cs @@ -0,0 +1,11 @@ +namespace ExampleInMemoryQueue; + +public record Customer(string Name, string Email) +{ + public int Id { get; init; } + + public DateTime CreatedAt { get; init; } = DateTime.UtcNow; + + public DateTime LastSyncAt { get; set; } = DateTime.UtcNow; + +} diff --git a/ExampleInMemoryQueue/CustomerSync.cs b/ExampleInMemoryQueue/CustomerSync.cs new file mode 100644 index 0000000..09ebbeb --- /dev/null +++ b/ExampleInMemoryQueue/CustomerSync.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.Logging; +using Syncerbell; + +namespace ExampleInMemoryQueue; + +public class CustomerSync(ILogger logger) : IEntitySync +{ + public async Task Run(EntitySyncContext context, CancellationToken cancellationToken = default) + { + var threadId = Thread.CurrentThread.ManagedThreadId; + logger.LogInformation("CustomerSync starting on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + const int totalOperations = 5; + await context.ReportProgress(0, totalOperations); + + for (int i = 1; i <= totalOperations; i++) + { + logger.LogInformation("CustomerSync thread {ThreadId}: Processing operation {Operation} of {Total}", + threadId, i, totalOperations); + + // Simulate customer data synchronization work + await Task.Delay(Random.Shared.Next(500, 1500), cancellationToken); + + await context.ReportProgress(i, totalOperations); + } + + logger.LogInformation("CustomerSync completed on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + return new SyncResult(Entity: context.Entity, Success: true); + } +} diff --git a/ExampleInMemoryQueue/ExampleInMemoryQueue.csproj b/ExampleInMemoryQueue/ExampleInMemoryQueue.csproj new file mode 100644 index 0000000..78137da --- /dev/null +++ b/ExampleInMemoryQueue/ExampleInMemoryQueue.csproj @@ -0,0 +1,21 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + + diff --git a/ExampleInMemoryQueue/Order.cs b/ExampleInMemoryQueue/Order.cs new file mode 100644 index 0000000..bbc8b3d --- /dev/null +++ b/ExampleInMemoryQueue/Order.cs @@ -0,0 +1,15 @@ +namespace ExampleInMemoryQueue; + +public record Order(int CustomerId, DateTime OrderDate) +{ + public int Id { get; init; } + + public DateTime CreatedAt { get; init; } = DateTime.UtcNow; + + public DateTime LastSyncAt { get; set; } = DateTime.UtcNow; + + public decimal TotalAmount { get; set; } = 0m; + + public string Status { get; set; } = "Pending"; + +} diff --git a/ExampleInMemoryQueue/OrderSync.cs b/ExampleInMemoryQueue/OrderSync.cs new file mode 100644 index 0000000..eeac351 --- /dev/null +++ b/ExampleInMemoryQueue/OrderSync.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.Logging; +using Syncerbell; + +namespace ExampleInMemoryQueue; + +public class OrderSync(ILogger logger) : IEntitySync +{ + public async Task Run(EntitySyncContext context, CancellationToken cancellationToken = default) + { + var threadId = Thread.CurrentThread.ManagedThreadId; + logger.LogInformation("OrderSync starting on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + const int totalOperations = 4; + await context.ReportProgress(0, totalOperations); + + for (int i = 1; i <= totalOperations; i++) + { + logger.LogInformation("OrderSync thread {ThreadId}: Processing operation {Operation} of {Total}", + threadId, i, totalOperations); + + // Simulate order processing synchronization work + await Task.Delay(Random.Shared.Next(400, 1200), cancellationToken); + + await context.ReportProgress(i, totalOperations); + } + + logger.LogInformation("OrderSync completed on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + return new SyncResult(Entity: context.Entity, Success: true); + } +} diff --git a/ExampleInMemoryQueue/Product.cs b/ExampleInMemoryQueue/Product.cs new file mode 100644 index 0000000..6990575 --- /dev/null +++ b/ExampleInMemoryQueue/Product.cs @@ -0,0 +1,13 @@ +namespace ExampleInMemoryQueue; + +public record Product(string Name, decimal Price, string Category) +{ + public int Id { get; init; } + + public DateTime CreatedAt { get; init; } = DateTime.UtcNow; + + public DateTime LastSyncAt { get; set; } = DateTime.UtcNow; + + public int StockQuantity { get; set; } = 0; + +} diff --git a/ExampleInMemoryQueue/ProductSync.cs b/ExampleInMemoryQueue/ProductSync.cs new file mode 100644 index 0000000..437868c --- /dev/null +++ b/ExampleInMemoryQueue/ProductSync.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.Logging; +using Syncerbell; + +namespace ExampleInMemoryQueue; + +public class ProductSync(ILogger logger) : IEntitySync +{ + public async Task Run(EntitySyncContext context, CancellationToken cancellationToken = default) + { + var threadId = Thread.CurrentThread.ManagedThreadId; + logger.LogInformation("ProductSync starting on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + const int totalOperations = 7; + await context.ReportProgress(0, totalOperations); + + for (int i = 1; i <= totalOperations; i++) + { + logger.LogInformation("ProductSync thread {ThreadId}: Processing operation {Operation} of {Total}", + threadId, i, totalOperations); + + // Simulate product inventory synchronization work + await Task.Delay(Random.Shared.Next(300, 1000), cancellationToken); + + await context.ReportProgress(i, totalOperations); + } + + logger.LogInformation("ProductSync completed on thread {ThreadId} for entity {EntityType}", + threadId, context.Entity.Entity); + + return new SyncResult(Entity: context.Entity, Success: true); + } +} diff --git a/ExampleInMemoryQueue/Program.cs b/ExampleInMemoryQueue/Program.cs new file mode 100644 index 0000000..b387e9b --- /dev/null +++ b/ExampleInMemoryQueue/Program.cs @@ -0,0 +1,165 @@ +using ExampleInMemoryQueue; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Syncerbell; +using System.Linq; + +// Create service collection and configure services +var services = new ServiceCollection(); + +// Add logging +services.AddLogging(builder => +{ + builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Information); +}); + +// Configure Syncerbell with multiple entities +services.AddSyncerbell(options => +{ + options.DefaultLeaseExpiration = TimeSpan.FromMinutes(30); + + // Add Customer entity + options.AddEntity(entity => + { + entity.LeaseExpiration = TimeSpan.FromMinutes(15); + entity.Parameters = new SortedDictionary() + { + ["Region"] = "US-West", + ["BatchSize"] = 100 + }; + entity.Eligibility = new AlwaysEligibleStrategy(); + }); + + // Add Product entity + options.AddEntity(entity => + { + entity.LeaseExpiration = TimeSpan.FromMinutes(20); + entity.Parameters = new SortedDictionary() + { + ["Category"] = "Electronics", + ["BatchSize"] = 50 + }; + entity.Eligibility = new AlwaysEligibleStrategy(); + }); + + // Add Order entity + options.AddEntity(entity => + { + entity.LeaseExpiration = TimeSpan.FromMinutes(10); + entity.Parameters = new SortedDictionary() + { + ["Status"] = "Active", + ["BatchSize"] = 25 + }; + entity.Eligibility = new AlwaysEligibleStrategy(); + }); +}); + +// Add in-memory persistence for sync logs +services.AddSyncerbellInMemoryPersistence(); + +// Build service provider +var serviceProvider = services.BuildServiceProvider(); + +// Get required services +var logger = serviceProvider.GetRequiredService>(); +var syncQueueService = serviceProvider.GetRequiredService(); +var syncService = serviceProvider.GetRequiredService(); + +logger.LogInformation("=== Syncerbell Threading Example ==="); +logger.LogInformation("This example demonstrates fanning out queue operations using multiple threads"); + +try +{ + // Step 1: Create queued sync entries for all entities + logger.LogInformation("Step 1: Creating queued sync entries for all entities..."); + var queuedEntries = await syncQueueService.CreateAllQueuedSyncEntries(SyncTriggerType.Manual); + + logger.LogInformation("Created {Count} queued sync entries:", queuedEntries.Count); + foreach (var entry in queuedEntries) + { + logger.LogInformation(" - Entry ID: {EntryId}, Entity: {EntityType}", entry.Id, entry.Entity); + } + + // Step 2: Simulate recording queue message IDs (as if we put them in a real queue) + logger.LogInformation("Step 2: Recording queue message IDs..."); + var queueMessageTasks = queuedEntries.Select(async entry => + { + var queueMessageId = $"msg_{Guid.NewGuid():N}"; + await syncQueueService.RecordQueueMessageId(entry.Id, queueMessageId); + logger.LogInformation("Recorded queue message ID {MessageId} for entry {EntryId}", queueMessageId, entry.Id); + return (entry, queueMessageId); + }); + + var entriesWithMessages = await Task.WhenAll(queueMessageTasks); + + // Step 3: Process sync entries concurrently using multiple threads + logger.LogInformation("Step 3: Processing sync entries concurrently using multiple threads..."); + logger.LogInformation("Starting {Count} threads for parallel processing...", queuedEntries.Count); + + var processingTasks = entriesWithMessages.Select(async entryInfo => + { + var (entry, _) = entryInfo; + var threadId = Environment.CurrentManagedThreadId; + + logger.LogInformation("Thread {ThreadId}: Starting to process entry {EntryId} (Entity: {EntityType})", + threadId, entry.Id, entry.Entity); + + try + { + // Use ISyncService to sync the individual entity from the queued entry + var result = await syncService.SyncEntityIfEligible(entry.Id, SyncTriggerType.Manual); + + if (result != null) + { + logger.LogInformation("Thread {ThreadId}: Successfully processed entry {EntryId} - {EntityType}", + threadId, entry.Id, entry.Entity); + return result; + } + else + { + logger.LogWarning("Thread {ThreadId}: Failed to process entry {EntryId} - result was null", + threadId, entry.Id); + return null; + } + } + catch (Exception ex) + { + logger.LogError(ex, "Thread {ThreadId}: Error processing entry {EntryId}", + threadId, entry.Id); + return null; + } + }); + + // Wait for all threads to complete + var results = await Task.WhenAll(processingTasks); + + logger.LogInformation("=== Processing Complete ==="); + + var successfulResults = results.Where(r => r?.Success == true).ToList(); + var failedResults = results.Where(r => r == null || r.Success == false).ToList(); + + logger.LogInformation("Successfully processed: {SuccessCount} entities", successfulResults.Count); + logger.LogInformation("Failed to process: {FailCount} entities", failedResults.Count); + + if (successfulResults.Any()) + { + logger.LogInformation("Successful sync results:"); + foreach (var result in successfulResults) + { + logger.LogInformation(" - {EntityType}: Success", result?.Entity?.EntityType?.Name ?? "null"); + } + } + + logger.LogInformation("Example completed successfully!"); +} +catch (Exception ex) +{ + logger.LogError(ex, "An error occurred during the example execution"); +} +finally +{ + // Dispose the service provider + serviceProvider.Dispose(); +} diff --git a/F23.Syncerbell.sln b/F23.Syncerbell.sln index e081a1f..52f1585 100644 --- a/F23.Syncerbell.sln +++ b/F23.Syncerbell.sln @@ -20,6 +20,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution global.json = global.json EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExampleInMemoryQueue", "ExampleInMemoryQueue\ExampleInMemoryQueue.csproj", "{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -46,8 +48,13 @@ Global {C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Debug|Any CPU.Build.0 = Debug|Any CPU {C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Release|Any CPU.ActiveCfg = Release|Any CPU {C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Release|Any CPU.Build.0 = Release|Any CPU + {F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {BE825FA8-9B20-40C8-B233-65250AAD268D} = {7C18AD40-956C-46D5-96E5-4DA767C38067} + {F8782B44-7714-4AC9-9F58-3B1D8BACBE79} = {7C18AD40-956C-46D5-96E5-4DA767C38067} EndGlobalSection EndGlobal diff --git a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs index a369f9c..cde8aba 100644 --- a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs +++ b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs @@ -13,7 +13,9 @@ public class EntityFrameworkCoreSyncLogPersistence( : ISyncLogPersistence { /// - public async Task TryAcquireLogEntry(SyncTriggerType triggerType, SyncEntityOptions entity, + public async Task TryAcquireLogEntry(SyncTriggerType triggerType, + SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, CancellationToken cancellationToken = default) { return await ResilientTransaction.New(context).ExecuteAsync(async () => @@ -34,7 +36,7 @@ public class EntityFrameworkCoreSyncLogPersistence( logEntry = null; // reset logEntry to allow creation of a new one } - else if (logEntry?.LeasedAt != null) + else if (behavior != AcquireLeaseBehavior.ForceAcquire && logEntry?.LeasedAt != null) { // If the log entry is already leased or in progress, return null as we can't acquire it yet. // This could either be a pending or in-progress entry. @@ -43,7 +45,7 @@ public class EntityFrameworkCoreSyncLogPersistence( var priorSyncInfo = await GetPriorSyncInfo(entity.Entity, parametersJson, entity.SchemaVersion, cancellationToken); - logEntry = await CreateOrUpdateLogEntry(triggerType, entity, parametersJson, logEntry, cancellationToken); + logEntry = await CreateOrUpdateLogEntry(behavior, triggerType, entity, parametersJson, logEntry, cancellationToken); return new AcquireLogEntryResult(logEntry, priorSyncInfo); }, cancellationToken); @@ -52,6 +54,7 @@ public class EntityFrameworkCoreSyncLogPersistence( /// public async Task TryAcquireLogEntry(ISyncLogEntry syncLogEntry, SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, CancellationToken cancellationToken = default) { return await ResilientTransaction.New(context).ExecuteAsync(async () => @@ -77,7 +80,7 @@ public class EntityFrameworkCoreSyncLogPersistence( syncLogEntry.SyncStatus = SyncStatus.LeaseExpired; await context.SaveChangesAsync(cancellationToken); } - else if (syncLogEntry.LeasedAt != null) + else if (behavior != AcquireLeaseBehavior.ForceAcquire && syncLogEntry.LeasedAt != null) { // If the log entry is already leased or in progress, return null as we can't acquire it yet. return null; @@ -87,7 +90,10 @@ public class EntityFrameworkCoreSyncLogPersistence( var priorSyncInfo = await GetPriorSyncInfo(syncLogEntry.Entity, syncLogEntry.ParametersJson, syncLogEntry.SchemaVersion, cancellationToken); // Lease the log entry - UpdateLogEntryLease(efSyncLogEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + if (behavior != AcquireLeaseBehavior.DoNotAcquire) + { + UpdateLogEntryLease(efSyncLogEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + } context.SyncLogEntries.Update(efSyncLogEntry); await context.SaveChangesAsync(cancellationToken); @@ -131,6 +137,7 @@ private void UpdateLogEntryLease(SyncLogEntry logEntry, TimeSpan leaseExpiration } private async Task CreateOrUpdateLogEntry( + AcquireLeaseBehavior behavior, SyncTriggerType triggerType, SyncEntityOptions entity, string? parametersJson, @@ -149,15 +156,15 @@ private async Task CreateOrUpdateLogEntry( CreatedAt = DateTime.UtcNow, }; context.SyncLogEntries.Add(logEntry); + await context.SaveChangesAsync(cancellationToken); } - else + else if (behavior != AcquireLeaseBehavior.DoNotAcquire) { UpdateLogEntryLease(logEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); context.SyncLogEntries.Update(logEntry); + await context.SaveChangesAsync(cancellationToken); } - await context.SaveChangesAsync(cancellationToken); - return logEntry; } diff --git a/Syncerbell/AcquireLeaseBehavior.cs b/Syncerbell/AcquireLeaseBehavior.cs new file mode 100644 index 0000000..873fcb1 --- /dev/null +++ b/Syncerbell/AcquireLeaseBehavior.cs @@ -0,0 +1,27 @@ +namespace Syncerbell; + +/// +/// Represents the behavior for acquiring a lease of a sync log entry during synchronization operations. +/// +public enum AcquireLeaseBehavior +{ + /// + /// Do not acquire a lease for the sync log entry. + /// If the entry is unleased or created, this will leave the entry unleased, + /// allowing other processes to acquire it if needed. + /// If the entry is already leased, it will fail to acquire the lease. + /// + DoNotAcquire = 0, + + /// + /// Try to acquire a lease for the sync log entry only if it is not already leased. + /// If the entry is already leased, it will fail to acquire the lease. + /// + AcquireIfNotLeased = 1, + + /// + /// Acquire a lease for the sync log entry, regardless of its current leased state. + /// This will override any existing lease, effectively leasing the entry to the current process. + /// + ForceAcquire = 2, +} diff --git a/Syncerbell/ISyncLogPersistence.cs b/Syncerbell/ISyncLogPersistence.cs index 0f259cc..114fc8a 100644 --- a/Syncerbell/ISyncLogPersistence.cs +++ b/Syncerbell/ISyncLogPersistence.cs @@ -17,10 +17,13 @@ public interface ISyncLogPersistence /// /// The type of trigger that initiated the sync operation. /// The entity for which to acquire a log entry. + /// The behavior for acquiring the lease of the log entry. + /// See the documentation for for details. /// The cancellation token to observe while waiting for the task to complete. /// Asynchronously returns the acquired log entry details, or null. Task TryAcquireLogEntry(SyncTriggerType triggerType, SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, CancellationToken cancellationToken = default); /// @@ -30,10 +33,13 @@ public interface ISyncLogPersistence /// /// The sync log entry to acquire. /// The entity options for the log entry to acquire. + /// The behavior for acquiring the lease of the log entry. + /// See the documentation for for details. /// The cancellation token to observe while waiting for the task to complete. /// Asynchronously returns the acquired log entry details, or null. Task TryAcquireLogEntry(ISyncLogEntry syncLogEntry, SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, CancellationToken cancellationToken = default); /// diff --git a/Syncerbell/InMemorySyncLogPersistence.cs b/Syncerbell/InMemorySyncLogPersistence.cs index 015b87a..fd36293 100644 --- a/Syncerbell/InMemorySyncLogPersistence.cs +++ b/Syncerbell/InMemorySyncLogPersistence.cs @@ -20,6 +20,7 @@ public class InMemorySyncLogPersistence( [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")] public Task TryAcquireLogEntry(SyncTriggerType triggerType, SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, CancellationToken cancellationToken = default) { var parametersJson = ParameterSerialization.Serialize(entity.Parameters); @@ -34,7 +35,7 @@ public class InMemorySyncLogPersistence( && e.SchemaVersion == entity.SchemaVersion && e.SyncStatus is SyncStatus.Pending or SyncStatus.InProgress); - if (!TryProcessExistingEntry(logEntry)) + if (!TryProcessExistingEntry(behavior, logEntry)) { return Task.FromResult(null); } @@ -55,7 +56,10 @@ public class InMemorySyncLogPersistence( entries.Add(logEntry); } - UpdateLogEntryLease(logEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + if (behavior != AcquireLeaseBehavior.DoNotAcquire) + { + UpdateLogEntryLease(logEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + } // Clone the log entry so that it can't be modified outside the writer lock return Task.FromResult(new AcquireLogEntryResult(logEntry.Clone(), priorSyncInfo)); @@ -67,7 +71,10 @@ public class InMemorySyncLogPersistence( } /// - public Task TryAcquireLogEntry(ISyncLogEntry logEntry, SyncEntityOptions entity, CancellationToken cancellationToken = default) + public Task TryAcquireLogEntry(ISyncLogEntry logEntry, + SyncEntityOptions entity, + AcquireLeaseBehavior behavior = AcquireLeaseBehavior.AcquireIfNotLeased, + CancellationToken cancellationToken = default) { try { @@ -87,14 +94,17 @@ public class InMemorySyncLogPersistence( return Task.FromResult(null); } - if (!TryProcessExistingEntry(inMemoryEntry)) + if (!TryProcessExistingEntry(behavior, inMemoryEntry)) { return Task.FromResult(null); } var priorSyncInfo = GetPriorSyncInfo(logEntry.Entity, logEntry.ParametersJson, logEntry.SchemaVersion); - UpdateLogEntryLease(inMemoryEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + if (behavior != AcquireLeaseBehavior.DoNotAcquire) + { + UpdateLogEntryLease(inMemoryEntry, entity.LeaseExpiration ?? options.DefaultLeaseExpiration); + } // Clone the log entry so that it can't be modified outside the writer lock return Task.FromResult(new AcquireLogEntryResult(inMemoryEntry.Clone(), priorSyncInfo)); @@ -105,7 +115,7 @@ public class InMemorySyncLogPersistence( } } - private static bool TryProcessExistingEntry(InMemoryEntry? logEntry) + private static bool TryProcessExistingEntry(AcquireLeaseBehavior behavior, InMemoryEntry? logEntry) { if (logEntry?.LeaseExpiresAt != null && logEntry.LeaseExpiresAt < DateTime.UtcNow) { @@ -114,7 +124,7 @@ private static bool TryProcessExistingEntry(InMemoryEntry? logEntry) return true; } - if (logEntry?.LeasedAt != null) + if (behavior != AcquireLeaseBehavior.ForceAcquire && logEntry?.LeasedAt != null) { // If the log entry is already leased or in progress, return false as we can't acquire it yet. return false; diff --git a/Syncerbell/SyncQueueService.cs b/Syncerbell/SyncQueueService.cs index 8ec77a6..cd1b5c8 100644 --- a/Syncerbell/SyncQueueService.cs +++ b/Syncerbell/SyncQueueService.cs @@ -51,7 +51,7 @@ public async Task> CreateAllQueuedSyncEntries(SyncT { try { - var acquireResult = await syncLogPersistence.TryAcquireLogEntry(syncTrigger, entity, cancellationToken); + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(syncTrigger, entity, AcquireLeaseBehavior.DoNotAcquire, cancellationToken); if (acquireResult is not { SyncLogEntry: { } logEntry }) { diff --git a/Syncerbell/SyncService.cs b/Syncerbell/SyncService.cs index f0bed5b..c742352 100644 --- a/Syncerbell/SyncService.cs +++ b/Syncerbell/SyncService.cs @@ -49,7 +49,7 @@ public async Task> SyncAllEligible(SyncTriggerType tri SyncEntityOptions entity, CancellationToken cancellationToken = default) { - var acquireResult = await syncLogPersistence.TryAcquireLogEntry(triggerType, entity, cancellationToken); + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(triggerType, entity, AcquireLeaseBehavior.AcquireIfNotLeased, cancellationToken); if (acquireResult is not { SyncLogEntry: { } log, PriorSyncInfo: { } priorSyncInfo }) { @@ -82,7 +82,7 @@ public async Task> SyncAllEligible(SyncTriggerType tri $"and schema version {log.SchemaVersion?.ToString() ?? "null"} " + $"from log entry {syncLogEntryId}."); - var acquireResult = await syncLogPersistence.TryAcquireLogEntry(log, entity, cancellationToken); + var acquireResult = await syncLogPersistence.TryAcquireLogEntry(log, entity, AcquireLeaseBehavior.AcquireIfNotLeased, cancellationToken); if (acquireResult is not { PriorSyncInfo: { } priorSyncInfo }) { From ab3debdbd604114d556f79ef9d4d089f55f974b8 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Thu, 31 Jul 2025 14:45:26 -0600 Subject: [PATCH 5/7] Rename LastSyncQueuedAt to LastSyncCreatedAt --- .../EntityFrameworkCoreSyncLogPersistence.cs | 2 +- Syncerbell.Tests/AlwaysEligibleStrategyTests.cs | 2 +- Syncerbell.Tests/EntitySyncContextTests.cs | 14 +++++++------- .../IntervalEligibilityStrategyTests.cs | 2 +- Syncerbell/InMemorySyncLogPersistence.cs | 2 +- Syncerbell/PriorSyncInfo.cs | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs index cde8aba..3a39cc3 100644 --- a/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs +++ b/Syncerbell.EntityFrameworkCore/EntityFrameworkCoreSyncLogPersistence.cs @@ -112,7 +112,7 @@ private async Task GetPriorSyncInfo(string entity, string? parame return new PriorSyncInfo { HighWaterMark = (await priorEntriesQuery.FirstOrDefaultAsync(i => i.HighWaterMark != null, cancellationToken))?.HighWaterMark, - LastSyncQueuedAt = (await priorEntriesQuery.FirstOrDefaultAsync(cancellationToken))?.CreatedAt, + LastSyncCreatedAt = (await priorEntriesQuery.FirstOrDefaultAsync(cancellationToken))?.CreatedAt, LastSyncLeasedAt = (await priorEntriesQuery.FirstOrDefaultAsync(i => i.LeasedAt != null, cancellationToken))?.LeasedAt, LastSyncCompletedAt = (await priorEntriesQuery.FirstOrDefaultAsync(i => i.FinishedAt != null, cancellationToken))?.FinishedAt, }; diff --git a/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs b/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs index bd3fb49..ab508e5 100644 --- a/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs +++ b/Syncerbell.Tests/AlwaysEligibleStrategyTests.cs @@ -20,7 +20,7 @@ public async Task AlwaysEligible_AlwaysReturnsTrue(SyncTriggerType syncTriggerTy PriorSyncInfo = new PriorSyncInfo() { LastSyncLeasedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, - LastSyncQueuedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, + LastSyncCreatedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, LastSyncCompletedAt = null, HighWaterMark = null, }, diff --git a/Syncerbell.Tests/EntitySyncContextTests.cs b/Syncerbell.Tests/EntitySyncContextTests.cs index 3ed5a5d..37265a8 100644 --- a/Syncerbell.Tests/EntitySyncContextTests.cs +++ b/Syncerbell.Tests/EntitySyncContextTests.cs @@ -22,7 +22,7 @@ Task ProgressReporter(Progress progress) PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -62,7 +62,7 @@ Task ProgressReporter(Progress progress) PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -91,7 +91,7 @@ public async Task ReportProgress_WithNegativeValue_ThrowsArgumentOutOfRangeExcep PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -117,7 +117,7 @@ public async Task ReportProgress_WithNonPositiveMax_ThrowsArgumentOutOfRangeExce PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -142,7 +142,7 @@ public async Task ReportProgress_WithValueGreaterThanMax_ThrowsArgumentOutOfRang PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -173,7 +173,7 @@ async Task ProgressReporter(Progress progress) PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } @@ -205,7 +205,7 @@ Task ProgressReporter(Progress progress) PriorSyncInfo = new PriorSyncInfo { HighWaterMark = null, - LastSyncQueuedAt = null, + LastSyncCreatedAt = null, LastSyncLeasedAt = null, LastSyncCompletedAt = null } diff --git a/Syncerbell.Tests/IntervalEligibilityStrategyTests.cs b/Syncerbell.Tests/IntervalEligibilityStrategyTests.cs index 2503c65..fbe6251 100644 --- a/Syncerbell.Tests/IntervalEligibilityStrategyTests.cs +++ b/Syncerbell.Tests/IntervalEligibilityStrategyTests.cs @@ -19,7 +19,7 @@ public async Task IntervalEligibility_AlwaysReturnsTrue(int intervalDays, int? l PriorSyncInfo = new PriorSyncInfo() { LastSyncLeasedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, - LastSyncQueuedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, + LastSyncCreatedAt = lastSyncDaysAgo.HasValue ? DateTime.UtcNow.AddDays(-1 * lastSyncDaysAgo.Value) : null, LastSyncCompletedAt = null, HighWaterMark = null, }, diff --git a/Syncerbell/InMemorySyncLogPersistence.cs b/Syncerbell/InMemorySyncLogPersistence.cs index fd36293..cce1509 100644 --- a/Syncerbell/InMemorySyncLogPersistence.cs +++ b/Syncerbell/InMemorySyncLogPersistence.cs @@ -149,7 +149,7 @@ private PriorSyncInfo GetPriorSyncInfo(string entity, string? parametersJson, in HighWaterMark = priorEntriesQuery.FirstOrDefault(i => i.HighWaterMark != null)?.HighWaterMark, LastSyncCompletedAt = priorEntriesQuery.FirstOrDefault(i => i.FinishedAt != null)?.FinishedAt, LastSyncLeasedAt = priorEntriesQuery.FirstOrDefault(i => i.LeasedAt != null)?.LeasedAt, - LastSyncQueuedAt = priorEntriesQuery.FirstOrDefault()?.CreatedAt, + LastSyncCreatedAt = priorEntriesQuery.FirstOrDefault()?.CreatedAt, }; } diff --git a/Syncerbell/PriorSyncInfo.cs b/Syncerbell/PriorSyncInfo.cs index d00d14c..f7a5e8f 100644 --- a/Syncerbell/PriorSyncInfo.cs +++ b/Syncerbell/PriorSyncInfo.cs @@ -11,9 +11,9 @@ public class PriorSyncInfo public required string? HighWaterMark { get; init; } /// - /// Gets or sets the date and time when the last sync was queued for the entity. + /// Gets or sets the date and time when the last sync log entry was created for the entity. /// - public required DateTime? LastSyncQueuedAt { get; init; } + public required DateTime? LastSyncCreatedAt { get; init; } /// /// Gets or sets the date and time when the last sync was leased for the entity. From 0f606a67df48d1fa65b076b1ede96910005b9876 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Thu, 31 Jul 2025 15:13:07 -0600 Subject: [PATCH 6/7] Add README --- F23.Syncerbell.sln | 1 + README.md | 261 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 README.md diff --git a/F23.Syncerbell.sln b/F23.Syncerbell.sln index 52f1585..5e32372 100644 --- a/F23.Syncerbell.sln +++ b/F23.Syncerbell.sln @@ -18,6 +18,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Directory.Build.props = Directory.Build.props LICENSE = LICENSE global.json = global.json + README.md = README.md EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExampleInMemoryQueue", "ExampleInMemoryQueue\ExampleInMemoryQueue.csproj", "{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}" diff --git a/README.md b/README.md new file mode 100644 index 0000000..7af17df --- /dev/null +++ b/README.md @@ -0,0 +1,261 @@ +# Syncerbell + +A sync orchestration library for .NET. + +## Core Concepts + +Syncerbell is a library designed to help you synchronize entities between +your application and a remote service or database. It provides a simple and +flexible way to define synchronization logic, manage the state of entities, +and handle the persistence of synchronization logs. + +Syncerbell is built around the concept of "entities" that need to be +synchronized. An entity is an abstract concept that represents a piece of +data that needs to be kept in sync with a remote service or database. This +could be a database record, a file, or any other piece of data that needs to +be synchronized. It could even be an entire database, or a collection of +entities that need to be synchronized together. + +Syncerbell is not aware of any external concerns like HTTP requests, +your app's database, or any other external service. It is designed to just +be an orchestration layer that manages the synchronization process, +calling your custom synchronization logic when needed, and keeping track of +the state of entities and their synchronization history. + +Syncerbell uses a concept called "eligibility" to determine when an entity +is ready to be synchronized. An entity is eligible for synchronization when +it meets certain criteria defined by the developer. This could be based on +time intervals, changes in the entity's state, or any other custom logic. +Syncerbell provides two built-in eligibility strategies: +- `IntervalEligibilityStrategy`: This strategy allows you to specify a time + interval after which the entity is considered eligible for synchronization. +- `AlwaysEligibleStrategy`: This strategy makes the entity always eligible for + synchronization, meaning it will be synchronized every time the sync service + runs. + +You can also implement your own custom eligibility strategies by +implementing the `IEligibilityStrategy` interface. This allows you to define +custom logic for determining when an entity is eligible for synchronization. + +Syncerbell also provides a persistence mechanism for storing the state of +entities and their synchronization history. See the +[persistence section](#persistence-configuration) for more details on how to +configure the persistence mechanism. + +Each sync run for an entity is represented by an `ISyncLogEntry` instance, which +contains information about the synchronization run, such as the entity being +synchronized, the time of the run, the result of the synchronization, and any +additional data that may be relevant to the synchronization process. Syncerbell +automatically manages the creation and storage of these log entries, allowing +you to focus on the synchronization logic itself. You do not normally need +to interact with these log entries directly, but they can be useful for +debugging and monitoring purposes. + +Syncerbell supports progress reporting for long-running synchronization +operations. You can report progress using the `ReportProgress` method +on the `EntitySyncContext` provided to your synchronization logic. This progress +will automatically be persisted in the sync log entries. + +Some sync operations may take a long time to complete, and Syncerbell +provides a way to handle these long-running operations by allowing you to +"fan out" the synchronization of an entity into multiple smaller +synchronization operations. This is done by calling the `ISyncQueueService` to +create pending sync log entries for all configured entities, enqueueing the +resulting log entries (which is up to you to implement), and then for each one, +calling the `ISyncService.SyncEntityIfEligible` method to process just that one +entity. This allows you to break down large synchronization operations into +smaller, manageable pieces, while still keeping track of the overall synchronization +state of the entity. Note that this currently will not consider eligibility +when determining whether to enqueue the entity, because that will be done when +the `SyncEntityIfEligible` method is called. This means that you can enqueue +entities that are not currently eligible for synchronization, and in case they +become eligible before the next sync run, they will be processed as part of that run. + +## Installation + +The easiest and most common way to get started with Syncerbell is to install +the Entity Framework Core package via NuGet. You can do this using the +command line or by adding it to your project file. This package will +transitively install the core Syncerbell library, and is suitable for use +in most applications. + +```bash +dotnet add package F23.Syncerbell.EntityFrameworkCore +``` + +Alternatively, you can install the core Syncerbell library directly if you +are not using Entity Framework Core, or if you want to use a different +persistence mechanism. This package is suitable for use in most applications. + +```bash +dotnet add package F23.Syncerbell +``` + +After installing the package, there are three main things you need to do: +1. [Configure](#configuration) Syncerbell in your application. +2. Set up the [hosting](#hosting) mechanism for Syncerbell. +3. Implement the [synchronization logic](#synchronization-logic) for your entities. + +## Configuration + +Syncerbell must be configured in your application to work correctly. The +configuration is typically done in the `Program.cs` file of your application, +and is designed to work well with `Microsoft.Extensions.DependencyInjection`. + +### Core Configuration + +You must first add the Syncerbell services to your service collection, and +configure at least one entity type to be synchronized. This is done using +the `AddSyncerbell` method. + +```c# +services.AddSyncerbell(options => +{ + options.AddEntity(entity => + { + // sync every hour + entity.Eligibility = new IntervalEligibilityStrategy(TimeSpan.FromHours(1)); + }); +}); +``` + +For more advanced use cases of statically-registered entities, you can +see the documentation for the non-generic `AddEntity` overload. + +If you need to generate entities to sync at runtime (such as in a multi-tenant application), +you can register a custom `IEntityProvider` implementation. + +### Persistence Configuration + +Syncerbell supports multiple persistence mechanisms. The most common is +Entity Framework Core, but you can also use custom persistence mechanisms +by implementing the `ISyncLogPersistence` interface. + +#### In-Memory + +For testing or development purposes, you can use the in-memory persistence +mechanism. This is not suitable for production use, but can be useful for +quick prototyping or testing. + +```c# +services.AddSyncerbellInMemoryPersistence(); +``` + +#### Entity Framework Core + +To use Entity Framework Core as the persistence mechanism, you need to +add the `F23.Syncerbell.EntityFrameworkCore` package and configure it in your +`Program.cs` file. This is the most common and recommended way to use Syncerbell +in production applications. + +```c# +services.AddSyncerbellEntityFrameworkCorePersistence(options => +{ + options.ConfigureDbContextOptions = dbContextOptions => + { + // configure the EF Core context like you normally would + dbContextOptions.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnectionString"), + sqlOptions => + { + sqlOptions.EnableRetryOnFailure(); + }); + }; +}); +``` + +Currently, there are no migrations for the Syncerbell database schema. +You can find the schema for the `SyncLogEntries` table in the +[Syncerbell.EntityFrameworkCore project README](Syncerbell.EntityFrameworkCore/README.md). + +## Hosting + +Syncerbell is designed to be hosted as a background service of some kind. +You can easily use Syncerbell in Azure Functions, Windows or Linux services, +ASP.NET Core applications as a background service, or any other +background service host. + +Note that Syncerbell should be called periodically to check for +entities that are eligible for synchronization. The frequency of these checks +should be at least as frequent as your smallest eligibility interval, but not +so frequent that it causes performance issues or excessive load on your +application or the remote service you are synchronizing with. + +### Azure Functions + +To use Syncerbell in Azure Functions, you can create a function that +triggers on a schedule or an event, and then use the `Syncerbell` library +to perform the synchronization. + +```c# +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; +using Syncerbell; + +namespace MyApp.Functions; + +public class SyncerbellTimer(ISyncService syncService, ILogger logger) +{ + private const bool RunOnStartup = + #if DEBUG + true; + #else + false; + #endif + + [Function(nameof(SyncerbellTimer))] + public async Task Run([TimerTrigger("0 0 * * * *", RunOnStartup = RunOnStartup)] TimerInfo timerInfo) + { + logger.LogInformation("Syncerbell timer function triggered at: {Time}", DateTime.UtcNow); + + try + { + await syncService.SyncAllEligible(SyncTriggerType.Timer); + logger.LogInformation("Syncerbell timer completed successfully."); + } + catch (Exception ex) + { + logger.LogError(ex, "Error during Syncerbell timer execution"); + throw; + } + } +} +``` + +### Hosted Service + +To use Syncerbell in an ASP.NET Core application, Windows or Linux service, or +any other command-line-invoked application, you can create a hosted service +using the HostedService library: + +```bash +dotnet add package F23.Syncerbell.HostedService +``` + +Then, you can register the hosted service in your `Program.cs`: +```c# +services.AddSyncerbellHostedService(options => +{ + // NOTE: This is purposefully very short to demonstrate the library. + options.StartupDelay = TimeSpan.FromSeconds(5); + options.CheckInterval = TimeSpan.FromSeconds(30); +}); +``` + +See the `ExampleApiUsage` project for an example of how to configure a hosted service. + +## Synchronization Logic + +To implement the synchronization logic for your entities, you need to create +a class that implements the `IEntitySync` interface. This class will contain the +logic for synchronizing the entity with the remote service or database. + +This interface defines a `Run` method that will be called by Syncerbell +when the entity is eligible for synchronization. You can implement this method +to perform the actual synchronization logic, such as making HTTP requests, +updating the database, or any other necessary operations. + +It will provide you with a context object that contains information about the +entity being synchronized, and prior run details (such as the last "high water mark" for the entity). +Your implementation should return a `SyncResult` object that indicates +whether the synchronization was successful, and an optional "high water mark" +that can be used for incremental synchronization in the future sync runs. From 3525d94400683d06ce3f0fd2f924afc7fac528a0 Mon Sep 17 00:00:00 2001 From: Paul Irwin Date: Thu, 31 Jul 2025 15:43:08 -0600 Subject: [PATCH 7/7] PR feedback: refactor entity resolver code into shared class --- Syncerbell/ServiceCollectionExtensions.cs | 1 + Syncerbell/SyncEntityResolver.cs | 45 +++++++++++++++++++++++ Syncerbell/SyncQueueService.cs | 25 +------------ Syncerbell/SyncService.cs | 37 ++----------------- 4 files changed, 51 insertions(+), 57 deletions(-) create mode 100644 Syncerbell/SyncEntityResolver.cs diff --git a/Syncerbell/ServiceCollectionExtensions.cs b/Syncerbell/ServiceCollectionExtensions.cs index 973ed58..5aee86b 100644 --- a/Syncerbell/ServiceCollectionExtensions.cs +++ b/Syncerbell/ServiceCollectionExtensions.cs @@ -21,6 +21,7 @@ public static IServiceCollection AddSyncerbell(this IServiceCollection services, services.AddSingleton(options); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); foreach (var entity in options.Entities) { diff --git a/Syncerbell/SyncEntityResolver.cs b/Syncerbell/SyncEntityResolver.cs new file mode 100644 index 0000000..a66a991 --- /dev/null +++ b/Syncerbell/SyncEntityResolver.cs @@ -0,0 +1,45 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Syncerbell; + +/// +/// Resolves sync entities by combining configured entities with additional entities from an optional entity provider. +/// +public class SyncEntityResolver( + SyncerbellOptions options, + IServiceProvider serviceProvider, + ILogger logger) +{ + /// + /// Resolves all sync entities by combining configured entities with additional entities from the entity provider. + /// + /// A cancellation token to cancel the operation. + /// A read-only list of sync entity options containing all resolved entities. + /// Thrown when the entity provider type is not registered or does not implement . + public async Task> ResolveEntities(CancellationToken cancellationToken = default) + { + var entities = new List(options.Entities); + + if (options.EntityProviderType is { } entityProviderType) + { + var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider + ?? throw new InvalidOperationException( + $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); + + var additionalEntities = await entityProvider.GetEntities(cancellationToken); + + if (additionalEntities.Count == 0) + { + logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); + } + else + { + logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); + entities.AddRange(additionalEntities); + } + } + + return entities; + } +} diff --git a/Syncerbell/SyncQueueService.cs b/Syncerbell/SyncQueueService.cs index cd1b5c8..2c8944f 100644 --- a/Syncerbell/SyncQueueService.cs +++ b/Syncerbell/SyncQueueService.cs @@ -1,4 +1,3 @@ -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace Syncerbell; @@ -7,8 +6,7 @@ namespace Syncerbell; /// Implementation of ISyncQueueService that creates queued sync log entries for distributed processing. /// public class SyncQueueService( - SyncerbellOptions options, - IServiceProvider serviceProvider, + SyncEntityResolver entityResolver, ISyncLogPersistence syncLogPersistence, ILogger logger) : ISyncQueueService @@ -17,26 +15,7 @@ public class SyncQueueService( public async Task> CreateAllQueuedSyncEntries(SyncTriggerType syncTrigger, CancellationToken cancellationToken = default) { - var entities = new List(options.Entities); - - if (options.EntityProviderType is { } entityProviderType) - { - var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider - ?? throw new InvalidOperationException( - $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); - - var additionalEntities = await entityProvider.GetEntities(cancellationToken); - - if (additionalEntities.Count == 0) - { - logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); - } - else - { - logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); - entities.AddRange(additionalEntities); - } - } + var entities = await entityResolver.ResolveEntities(cancellationToken); if (entities.Count == 0) { diff --git a/Syncerbell/SyncService.cs b/Syncerbell/SyncService.cs index c742352..c14b2d8 100644 --- a/Syncerbell/SyncService.cs +++ b/Syncerbell/SyncService.cs @@ -7,7 +7,7 @@ namespace Syncerbell; /// Provides synchronization services for all registered entities, handling eligibility, logging, and execution. /// public class SyncService( - SyncerbellOptions options, + SyncEntityResolver syncEntityResolver, IServiceProvider serviceProvider, ISyncLogPersistence syncLogPersistence, ILogger logger) @@ -19,7 +19,7 @@ public class SyncService( /// public async Task> SyncAllEligible(SyncTriggerType triggerType, CancellationToken cancellationToken = default) { - var entities = await GetAllEntities(cancellationToken); + var entities = await syncEntityResolver.ResolveEntities(cancellationToken); if (entities.Count == 0) { @@ -74,7 +74,7 @@ public async Task> SyncAllEligible(SyncTriggerType tri throw new InvalidOperationException($"Sync log entry with identifier '{syncLogEntryId}' not found."); } - var allEntities = await GetAllEntities(cancellationToken); + var allEntities = await syncEntityResolver.ResolveEntities(cancellationToken); var entity = allEntities.FirstOrDefault(e => e.Entity == log.Entity && e.SchemaVersion == log.SchemaVersion && e.ParametersJson == log.ParametersJson) ?? throw new InvalidOperationException($"No entity configuration found for entity {log.Entity} " + @@ -94,37 +94,6 @@ public async Task> SyncAllEligible(SyncTriggerType tri return await ProcessSyncLogEntry(log, priorSyncInfo, triggerType, entity, cancellationToken); } - /// - /// Gets all entities configured for synchronization, including both statically configured and dynamically provided entities. - /// - /// A cancellation token that can be used to cancel the operation. - /// A list of all sync entity options. - private async Task> GetAllEntities(CancellationToken cancellationToken) - { - var entities = new List(options.Entities); - - if (options.EntityProviderType is { } entityProviderType) - { - var entityProvider = serviceProvider.GetRequiredService(entityProviderType) as IEntityProvider - ?? throw new InvalidOperationException( - $"Entity provider type {entityProviderType.FullName} is not registered or does not implement {nameof(IEntityProvider)}."); - - var additionalEntities = await entityProvider.GetEntities(cancellationToken); - - if (additionalEntities.Count == 0) - { - logger.LogWarning("Entity provider returned no additional entities. Using configured entities only."); - } - else - { - logger.LogInformation("Entity provider returned {Count} additional entities.", additionalEntities.Count); - entities.AddRange(additionalEntities); - } - } - - return entities; - } - /// /// Processes a sync log entry by checking eligibility and executing the sync if eligible. ///