Skip to content

Commit

Permalink
Updated Custom Field Definition repo to throw if it cannot acquire lock
Browse files Browse the repository at this point in the history
  • Loading branch information
niemyjski committed Sep 27, 2024
1 parent 65e3861 commit f88defa
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -46,7 +46,7 @@ public CustomFieldDefinitionRepository(CustomFieldDefinitionIndex index, ILockPr
public async Task<IDictionary<string, CustomFieldDefinition>> GetFieldMappingAsync(string entityType, string tenantKey)
{
string cacheKey = GetMappingCacheKey(entityType, tenantKey);
var cachedMapping = await _cache.GetAsync<Dictionary<string, CustomFieldDefinition>>(cacheKey);
var cachedMapping = await _cache.GetAsync<Dictionary<string, CustomFieldDefinition>>(cacheKey).AnyContext();
if (cachedMapping.HasValue)
return cachedMapping.Value;

Expand All @@ -55,16 +55,16 @@ public async Task<IDictionary<string, CustomFieldDefinition>> GetFieldMappingAsy
var fields = await FindAsync(q => q
.FieldEquals(cf => cf.EntityType, entityType)
.FieldEquals(cf => cf.TenantKey, tenantKey),
o => o.PageLimit(1000));
o => o.PageLimit(1000)).AnyContext();

do
{
foreach (var customField in fields.Documents)
fieldMapping[customField.Name] = customField;
} while (await fields.NextPageAsync());
} while (await fields.NextPageAsync().AnyContext());

if (fieldMapping.Count > 0)
await _cache.AddAsync(cacheKey, fieldMapping, TimeSpan.FromMinutes(15));
await _cache.AddAsync(cacheKey, fieldMapping, TimeSpan.FromMinutes(15)).AnyContext();

return fieldMapping;
}
Expand Down Expand Up @@ -99,9 +99,11 @@ public Task<CustomFieldDefinition> AddFieldAsync(string entityType, string tenan

public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents, ICommandOptions options = null)
{
var fieldScopes = documents.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType));
var lockKeys = fieldScopes.Select(f => GetLockName(f.Key.EntityType, f.Key.TenantKey, f.Key.IndexType));
await using var _ = await _lockProvider.AcquireAsync(lockKeys, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
var fieldScopes = documents.GroupBy(d => (d.EntityType, d.TenantKey, d.IndexType)).ToArray();
string[] lockKeys = fieldScopes.Select(f => GetLockName(f.Key.EntityType, f.Key.TenantKey, f.Key.IndexType)).ToArray();
await using var lck = await _lockProvider.AcquireAsync(lockKeys, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1)).AnyContext();
if (lck is null)
throw new Exception($"Failed to acquire lock: {String.Join(", ", lockKeys)}");

foreach (var fieldScope in fieldScopes)
{
Expand All @@ -110,15 +112,15 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents

var usedNames = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
var availableSlots = new Queue<int>();
var availableSlotsCache = await _cache.GetListAsync<int>(slotFieldScopeKey);
var usedNamesCache = await _cache.GetListAsync<string>(namesFieldScopeKey);
var availableSlotsCache = await _cache.GetListAsync<int>(slotFieldScopeKey).AnyContext();
var usedNamesCache = await _cache.GetListAsync<string>(namesFieldScopeKey).AnyContext();

if (availableSlotsCache.HasValue && usedNamesCache.HasValue && availableSlotsCache.Value.Count > 0)
{
foreach (var availableSlot in availableSlotsCache.Value.OrderBy(s => s))
foreach (int availableSlot in availableSlotsCache.Value.OrderBy(s => s))
availableSlots.Enqueue(availableSlot);

foreach (var usedName in usedNamesCache.Value.ToArray())
foreach (string usedName in usedNamesCache.Value.ToArray())
usedNames.Add(usedName);

_logger.LogTrace("Got cached list of {SlotCount} available slots for {FieldScope}", availableSlots.Count, slotFieldScopeKey);
Expand All @@ -133,13 +135,13 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
.Include(cf => cf.IndexSlot)
.Include(cf => cf.Name)
.Include(cf => cf.IsDeleted),
o => o.IncludeSoftDeletes().PageLimit(1000).QueryLogLevel(Microsoft.Extensions.Logging.LogLevel.Information));
o => o.IncludeSoftDeletes().PageLimit(1000).QueryLogLevel(Microsoft.Extensions.Logging.LogLevel.Information)).AnyContext();

do
{
usedSlots.AddRange(existingFields.Documents.Select(d => d.IndexSlot));
usedNames.AddRange(existingFields.Documents.Where(d => !d.IsDeleted).Select(d => d.Name));
} while (await existingFields.NextPageAsync());
} while (await existingFields.NextPageAsync().AnyContext());

int slotBatchSize = fieldScope.Count() + 25;
int slot = 1;
Expand All @@ -152,8 +154,8 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
}

_logger.LogTrace("Found {FieldCount} fields with {SlotCount} used slots for {FieldScope}", existingFields.Total, usedSlots.Count, slotFieldScopeKey);
await _cache.ListAddAsync(slotFieldScopeKey, availableSlots.ToArray(), TimeSpan.FromMinutes(5));
await _cache.ListAddAsync(namesFieldScopeKey, usedNames.ToArray(), TimeSpan.FromMinutes(5));
await _cache.ListAddAsync(slotFieldScopeKey, availableSlots.ToArray(), TimeSpan.FromMinutes(5)).AnyContext();
await _cache.ListAddAsync(namesFieldScopeKey, usedNames.ToArray(), TimeSpan.FromMinutes(5)).AnyContext();
}

foreach (var doc in fieldScope)
Expand All @@ -167,13 +169,13 @@ public override async Task AddAsync(IEnumerable<CustomFieldDefinition> documents
int availableSlot = availableSlots.Dequeue();
doc.IndexSlot = availableSlot;

await _cache.ListRemoveAsync(slotFieldScopeKey, new[] { availableSlot });
await _cache.ListAddAsync(namesFieldScopeKey, new[] { doc.Name });
await _cache.ListRemoveAsync(slotFieldScopeKey, [availableSlot]).AnyContext();
await _cache.ListAddAsync(namesFieldScopeKey, [doc.Name]).AnyContext();
_logger.LogTrace("New field {FieldName} using slot {IndexSlot} for {FieldScope}", doc.Name, doc.IndexSlot, slotFieldScopeKey);
}
}

await base.AddAsync(documents, options);
await base.AddAsync(documents, options).AnyContext();
}

protected override Task ValidateAndThrowAsync(CustomFieldDefinition document)
Expand Down Expand Up @@ -209,7 +211,7 @@ private async Task OnDocumentsChanged(object source, DocumentsChangeEventArgs<Cu
if (doc.Value.IsDeleted)
{
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name });
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name }).AnyContext();
}
}
}
Expand All @@ -219,8 +221,8 @@ private async Task OnDocumentsChanged(object source, DocumentsChangeEventArgs<Cu
{
string slotFieldScopeKey = GetSlotFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
string namesFieldScopeKey = GetNamesFieldScopeCacheKey(doc.Value.EntityType, doc.Value.TenantKey, doc.Value.IndexType);
await _cache.ListAddAsync(slotFieldScopeKey, new[] { doc.Value.IndexSlot });
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name });
await _cache.ListAddAsync(slotFieldScopeKey, new[] { doc.Value.IndexSlot }).AnyContext();
await _cache.ListRemoveAsync(namesFieldScopeKey, new[] { doc.Value.Name }).AnyContext();
}
}
}
Expand All @@ -247,29 +249,29 @@ private string GetNamesFieldScopeCacheKey(string entityType, string tenantKey, s

protected override async Task InvalidateCacheByQueryAsync(IRepositoryQuery<CustomFieldDefinition> query)
{
await base.InvalidateCacheByQueryAsync(query);
await base.InvalidateCacheByQueryAsync(query).AnyContext();

var conditions = query.GetFieldConditions();
var entityTypeCondition = conditions.FirstOrDefault(c => c.Field == InferField(d => d.EntityType) && c.Operator == ComparisonOperator.Equals);
if (entityTypeCondition == null || String.IsNullOrEmpty(entityTypeCondition.Value?.ToString()))
return;

await _cache.RemoveAsync(GetMappingCacheKey(entityTypeCondition.Value.ToString(), GetTenantKey(query)));
await _cache.RemoveAsync(GetMappingCacheKey(entityTypeCondition.Value.ToString(), GetTenantKey(query))).AnyContext();
}

protected override async Task InvalidateCacheAsync(IReadOnlyCollection<ModifiedDocument<CustomFieldDefinition>> documents, ChangeType? changeType = null)
{
await base.InvalidateCacheAsync(documents, changeType);
await base.InvalidateCacheAsync(documents, changeType).AnyContext();

if (documents.Count == 0)
{
await _cache.RemoveByPrefixAsync("customfield");
_logger.LogInformation("Cleared all custom field mappings from cache due to change {ChangeType}.", changeType);
await _cache.RemoveByPrefixAsync("customfield").AnyContext();
_logger.LogInformation("Cleared all custom field mappings from cache due to change {ChangeType}", changeType);
}

var cacheKeys = documents.Select(d => GetMappingCacheKey(d.Value.EntityType, d.Value.TenantKey)).Distinct().ToList();
foreach (var cacheKey in cacheKeys)
await _cache.RemoveByPrefixAsync(cacheKey);
foreach (string cacheKey in cacheKeys)
await _cache.RemoveByPrefixAsync(cacheKey).AnyContext();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -226,7 +226,7 @@ public async Task CanAddNewFieldsAndReserveSlotsConcurrentlyAcrossTenantsAndFiel
Log.SetLogLevel<CustomFieldDefinitionRepository>(LogLevel.Information);

const int COUNT = 1000;
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (index, ct) =>
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), new ParallelOptions { MaxDegreeOfParallelism = 2 }, async (index, _) =>
{
var customField = await _customFieldDefinitionRepository.AddAsync(new CustomFieldDefinition
{
Expand Down Expand Up @@ -330,7 +330,7 @@ public async Task CanHandleWrongFieldValueType()
[Fact]
public async Task CanUseCalculatedFieldType()
{
await _customFieldDefinitionRepository.AddAsync(new[] {
await _customFieldDefinitionRepository.AddAsync([
new CustomFieldDefinition {
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Expand All @@ -347,11 +347,11 @@ await _customFieldDefinitionRepository.AddAsync(new[] {
EntityType = nameof(EmployeeWithCustomFields),
TenantKey = "1",
Name = "Calculated",
IndexType = CalculatedIntegerFieldType.IndexType,
IndexType = IntegerFieldType.IndexType,
ProcessMode = CustomFieldProcessMode.AlwaysProcess,
Data = new Dictionary<string, object> { { "Expression", "source.Data.Field1 + source.Data.Field2" } }
}
});
]);

var employee = EmployeeWithCustomFieldsGenerator.Generate(age: 19);
employee.CompanyId = "1";
Expand Down

0 comments on commit f88defa

Please sign in to comment.