Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Prompt Instructions

## Coding standards (C#)
- Always put a blank line between properties and at the end of the file.
- Use 4 spaces for indentation.
- Always use curly braces for all control structures.
11 changes: 11 additions & 0 deletions ExampleInMemoryQueue/Customer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace ExampleInMemoryQueue;

public record Customer(string Name, string Email)
{
public int Id { get; init; }

public DateTime CreatedAt { get; init; } = DateTime.UtcNow;

public DateTime LastSyncAt { get; set; } = DateTime.UtcNow;

}
33 changes: 33 additions & 0 deletions ExampleInMemoryQueue/CustomerSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.Extensions.Logging;
using Syncerbell;

namespace ExampleInMemoryQueue;

public class CustomerSync(ILogger<CustomerSync> logger) : IEntitySync
{
public async Task<SyncResult> Run(EntitySyncContext context, CancellationToken cancellationToken = default)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
logger.LogInformation("CustomerSync starting on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

const int totalOperations = 5;
await context.ReportProgress(0, totalOperations);

for (int i = 1; i <= totalOperations; i++)
{
logger.LogInformation("CustomerSync thread {ThreadId}: Processing operation {Operation} of {Total}",
threadId, i, totalOperations);

// Simulate customer data synchronization work
await Task.Delay(Random.Shared.Next(500, 1500), cancellationToken);

await context.ReportProgress(i, totalOperations);
}

logger.LogInformation("CustomerSync completed on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

return new SyncResult(Entity: context.Entity, Success: true);
}
}
21 changes: 21 additions & 0 deletions ExampleInMemoryQueue/ExampleInMemoryQueue.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Syncerbell\Syncerbell.csproj" />
</ItemGroup>

</Project>
15 changes: 15 additions & 0 deletions ExampleInMemoryQueue/Order.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace ExampleInMemoryQueue;

public record Order(int CustomerId, DateTime OrderDate)
{
public int Id { get; init; }

public DateTime CreatedAt { get; init; } = DateTime.UtcNow;

public DateTime LastSyncAt { get; set; } = DateTime.UtcNow;

public decimal TotalAmount { get; set; } = 0m;

public string Status { get; set; } = "Pending";

}
33 changes: 33 additions & 0 deletions ExampleInMemoryQueue/OrderSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.Extensions.Logging;
using Syncerbell;

namespace ExampleInMemoryQueue;

public class OrderSync(ILogger<OrderSync> logger) : IEntitySync
{
public async Task<SyncResult> Run(EntitySyncContext context, CancellationToken cancellationToken = default)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
logger.LogInformation("OrderSync starting on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

const int totalOperations = 4;
await context.ReportProgress(0, totalOperations);

for (int i = 1; i <= totalOperations; i++)
{
logger.LogInformation("OrderSync thread {ThreadId}: Processing operation {Operation} of {Total}",
threadId, i, totalOperations);

// Simulate order processing synchronization work
await Task.Delay(Random.Shared.Next(400, 1200), cancellationToken);

await context.ReportProgress(i, totalOperations);
}

logger.LogInformation("OrderSync completed on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

return new SyncResult(Entity: context.Entity, Success: true);
}
}
13 changes: 13 additions & 0 deletions ExampleInMemoryQueue/Product.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace ExampleInMemoryQueue;

public record Product(string Name, decimal Price, string Category)
{
public int Id { get; init; }

public DateTime CreatedAt { get; init; } = DateTime.UtcNow;

public DateTime LastSyncAt { get; set; } = DateTime.UtcNow;

public int StockQuantity { get; set; } = 0;

}
33 changes: 33 additions & 0 deletions ExampleInMemoryQueue/ProductSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Microsoft.Extensions.Logging;
using Syncerbell;

namespace ExampleInMemoryQueue;

public class ProductSync(ILogger<ProductSync> logger) : IEntitySync
{
public async Task<SyncResult> Run(EntitySyncContext context, CancellationToken cancellationToken = default)
{
var threadId = Thread.CurrentThread.ManagedThreadId;
logger.LogInformation("ProductSync starting on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

const int totalOperations = 7;
await context.ReportProgress(0, totalOperations);

for (int i = 1; i <= totalOperations; i++)
{
logger.LogInformation("ProductSync thread {ThreadId}: Processing operation {Operation} of {Total}",
threadId, i, totalOperations);

// Simulate product inventory synchronization work
await Task.Delay(Random.Shared.Next(300, 1000), cancellationToken);

await context.ReportProgress(i, totalOperations);
}

logger.LogInformation("ProductSync completed on thread {ThreadId} for entity {EntityType}",
threadId, context.Entity.Entity);

return new SyncResult(Entity: context.Entity, Success: true);
}
}
165 changes: 165 additions & 0 deletions ExampleInMemoryQueue/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using ExampleInMemoryQueue;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Syncerbell;
using System.Linq;

// Create service collection and configure services
var services = new ServiceCollection();

// Add logging
services.AddLogging(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Information);
});

// Configure Syncerbell with multiple entities
services.AddSyncerbell(options =>
{
options.DefaultLeaseExpiration = TimeSpan.FromMinutes(30);

// Add Customer entity
options.AddEntity<Customer, CustomerSync>(entity =>
{
entity.LeaseExpiration = TimeSpan.FromMinutes(15);
entity.Parameters = new SortedDictionary<string, object?>()
{
["Region"] = "US-West",
["BatchSize"] = 100
};
entity.Eligibility = new AlwaysEligibleStrategy();
});

// Add Product entity
options.AddEntity<Product, ProductSync>(entity =>
{
entity.LeaseExpiration = TimeSpan.FromMinutes(20);
entity.Parameters = new SortedDictionary<string, object?>()
{
["Category"] = "Electronics",
["BatchSize"] = 50
};
entity.Eligibility = new AlwaysEligibleStrategy();
});

// Add Order entity
options.AddEntity<Order, OrderSync>(entity =>
{
entity.LeaseExpiration = TimeSpan.FromMinutes(10);
entity.Parameters = new SortedDictionary<string, object?>()
{
["Status"] = "Active",
["BatchSize"] = 25
};
entity.Eligibility = new AlwaysEligibleStrategy();
});
});

// Add in-memory persistence for sync logs
services.AddSyncerbellInMemoryPersistence();

// Build service provider
var serviceProvider = services.BuildServiceProvider();

// Get required services
var logger = serviceProvider.GetRequiredService<ILogger<Program>>();
var syncQueueService = serviceProvider.GetRequiredService<ISyncQueueService>();
var syncService = serviceProvider.GetRequiredService<ISyncService>();

logger.LogInformation("=== Syncerbell Threading Example ===");
logger.LogInformation("This example demonstrates fanning out queue operations using multiple threads");

try
{
// Step 1: Create queued sync entries for all entities
logger.LogInformation("Step 1: Creating queued sync entries for all entities...");
var queuedEntries = await syncQueueService.CreateAllQueuedSyncEntries(SyncTriggerType.Manual);

logger.LogInformation("Created {Count} queued sync entries:", queuedEntries.Count);
foreach (var entry in queuedEntries)
{
logger.LogInformation(" - Entry ID: {EntryId}, Entity: {EntityType}", entry.Id, entry.Entity);
}

// Step 2: Simulate recording queue message IDs (as if we put them in a real queue)
logger.LogInformation("Step 2: Recording queue message IDs...");
var queueMessageTasks = queuedEntries.Select(async entry =>
{
var queueMessageId = $"msg_{Guid.NewGuid():N}";
await syncQueueService.RecordQueueMessageId(entry.Id, queueMessageId);
logger.LogInformation("Recorded queue message ID {MessageId} for entry {EntryId}", queueMessageId, entry.Id);
return (entry, queueMessageId);
});

var entriesWithMessages = await Task.WhenAll(queueMessageTasks);

// Step 3: Process sync entries concurrently using multiple threads
logger.LogInformation("Step 3: Processing sync entries concurrently using multiple threads...");
logger.LogInformation("Starting {Count} threads for parallel processing...", queuedEntries.Count);

var processingTasks = entriesWithMessages.Select(async entryInfo =>
{
var (entry, _) = entryInfo;
var threadId = Environment.CurrentManagedThreadId;

logger.LogInformation("Thread {ThreadId}: Starting to process entry {EntryId} (Entity: {EntityType})",
threadId, entry.Id, entry.Entity);

try
{
// Use ISyncService to sync the individual entity from the queued entry
var result = await syncService.SyncEntityIfEligible(entry.Id, SyncTriggerType.Manual);

if (result != null)
{
logger.LogInformation("Thread {ThreadId}: Successfully processed entry {EntryId} - {EntityType}",
threadId, entry.Id, entry.Entity);
return result;
}
else
{
logger.LogWarning("Thread {ThreadId}: Failed to process entry {EntryId} - result was null",
threadId, entry.Id);
return null;
}
}
catch (Exception ex)
{
logger.LogError(ex, "Thread {ThreadId}: Error processing entry {EntryId}",
threadId, entry.Id);
return null;
}
});

// Wait for all threads to complete
var results = await Task.WhenAll(processingTasks);

logger.LogInformation("=== Processing Complete ===");

var successfulResults = results.Where(r => r?.Success == true).ToList();
var failedResults = results.Where(r => r == null || r.Success == false).ToList();

logger.LogInformation("Successfully processed: {SuccessCount} entities", successfulResults.Count);
logger.LogInformation("Failed to process: {FailCount} entities", failedResults.Count);

if (successfulResults.Any())
{
logger.LogInformation("Successful sync results:");
foreach (var result in successfulResults)
{
logger.LogInformation(" - {EntityType}: Success", result?.Entity?.EntityType?.Name ?? "null");
}
}

logger.LogInformation("Example completed successfully!");
}
catch (Exception ex)
{
logger.LogError(ex, "An error occurred during the example execution");
}
finally
{
// Dispose the service provider
serviceProvider.Dispose();
}
8 changes: 8 additions & 0 deletions F23.Syncerbell.sln
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
Directory.Build.props = Directory.Build.props
LICENSE = LICENSE
global.json = global.json
README.md = README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExampleInMemoryQueue", "ExampleInMemoryQueue\ExampleInMemoryQueue.csproj", "{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -46,8 +49,13 @@ Global
{C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C844F729-43EB-4D7E-8CA5-BDCBF83A971E}.Release|Any CPU.Build.0 = Release|Any CPU
{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F8782B44-7714-4AC9-9F58-3B1D8BACBE79}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{BE825FA8-9B20-40C8-B233-65250AAD268D} = {7C18AD40-956C-46D5-96E5-4DA767C38067}
{F8782B44-7714-4AC9-9F58-3B1D8BACBE79} = {7C18AD40-956C-46D5-96E5-4DA767C38067}
EndGlobalSection
EndGlobal
Loading