diff --git a/.paket/Paket.Restore.targets b/.paket/Paket.Restore.targets index e7c1bc0c..52f41c60 100644 --- a/.paket/Paket.Restore.targets +++ b/.paket/Paket.Restore.targets @@ -11,23 +11,49 @@ $(MSBuildThisFileDirectory)..\ $(PaketRootPath)paket-files\paket.restore.cached $(PaketRootPath)paket.lock + classic + proj + assembly + native /Library/Frameworks/Mono.framework/Commands/mono mono - - $(PaketRootPath)paket.exe - $(PaketToolsPath)paket.exe - "$(PaketExePath)" - $(MonoPath) --runtime=v4.0.30319 "$(PaketExePath)" - + + $(PaketRootPath)paket.bootstrapper.exe + $(PaketToolsPath)paket.bootstrapper.exe + $([System.IO.Path]::GetDirectoryName("$(PaketBootStrapperExePath)"))\ + + + + + $(PaketRootPath)paket.exe + $(PaketToolsPath)paket.exe + $(PaketToolsPath)paket.exe + $(_PaketBootStrapperExeDir)paket.exe + paket.exe + + + $(PaketRootPath)paket + $(PaketToolsPath)paket + $(PaketToolsPath)paket + + + $(PaketRootPath)paket.exe + $(PaketToolsPath)paket.exe + + + $(PaketBootStrapperExeDir)paket.exe + + + paket + + <_PaketExeExtension>$([System.IO.Path]::GetExtension("$(PaketExePath)")) - dotnet "$(PaketExePath)" + dotnet "$(PaketExePath)" + $(MonoPath) --runtime=v4.0.30319 "$(PaketExePath)" + "$(PaketExePath)" - - "$(PaketExePath)" - $(PaketRootPath)paket.bootstrapper.exe - $(PaketToolsPath)paket.bootstrapper.exe "$(PaketBootStrapperExePath)" $(MonoPath) --runtime=v4.0.30319 "$(PaketBootStrapperExePath)" @@ -36,30 +62,40 @@ true true + + + True - + + + + + true - $(NoWarn);NU1603 + $(NoWarn);NU1603;NU1604;NU1605;NU1608 - /usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }' - /usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }' + /usr/bin/shasum "$(PaketRestoreCacheFile)" | /usr/bin/awk '{ print $1 }' + /usr/bin/shasum "$(PaketLockFilePath)" | /usr/bin/awk '{ print $1 }' - + - + + + + $([System.IO.File]::ReadAllText('$(PaketRestoreCacheFile)')) @@ -69,11 +105,26 @@ true + + + true + + - + + + + + + + + $(MSBuildProjectDirectory)\obj\$(MSBuildProjectFile).paket.references.cached @@ -82,7 +133,9 @@ $(MSBuildProjectDirectory)\$(MSBuildProjectName).paket.references $(MSBuildProjectDirectory)\paket.references - $(MSBuildProjectDirectory)\obj\$(MSBuildProjectFile).$(TargetFramework).paket.resolved + + false + true true references-file-or-cache-not-found @@ -101,32 +154,43 @@ - + true - target-framework '$(TargetFramework)' + target-framework '$(TargetFramework)' or '$(TargetFrameworks)' files @(PaketResolvedFilePaths) - + + - + + false + true + + - + - + + $([System.String]::Copy('%(PaketReferencesFileLines.Identity)').Split(',').Length) $([System.String]::Copy('%(PaketReferencesFileLines.Identity)').Split(',')[0]) $([System.String]::Copy('%(PaketReferencesFileLines.Identity)').Split(',')[1]) $([System.String]::Copy('%(PaketReferencesFileLines.Identity)').Split(',')[4]) + $([System.String]::Copy('%(PaketReferencesFileLines.Identity)').Split(',')[5]) %(PaketReferencesFileLinesInfo.PackageVersion) - All + All + runtime + runtime + true + true @@ -158,19 +222,27 @@ false + $(MSBuildVersion) + 15.8.0 <_NuspecFilesNewLocation Include="$(BaseIntermediateOutputPath)$(Configuration)\*.nuspec"/> + + $(MSBuildProjectDirectory)/$(MSBuildProjectFile) true - false - true + false + true + false + true + false + true $(BaseIntermediateOutputPath)$(Configuration) $(BaseIntermediateOutputPath) @@ -183,11 +255,54 @@ - - + - + + - > Monitors = new Dictionary>(); - + public ConcurrentDictionary> Monitors = new ConcurrentDictionary>(); public void Dispose() { } - public ComponentHealthMonitor Get(string component) { throw new NotImplementedException(); } - public Dictionary GetData(string component) { throw new NotImplementedException(); } - public ComponentHealthMonitor SetHealthFunction(string component, Func check, Func> healthData = null) { - Monitors[component] = check; + Monitors.AddOrUpdate(component, k => check, (k, v) => check); return new ComponentHealthMonitor(component, check); } } diff --git a/Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs b/Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs index 7dcc81c7..65b97844 100644 --- a/Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs +++ b/Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs @@ -35,48 +35,74 @@ using Gigya.ServiceContract.HttpService; using Metrics; using System.Threading.Tasks.Dataflow; +using Gigya.Microdot.SharedLogic.Collections; + +// ReSharper disable InconsistentlySynchronizedField // Stats/Metrics filed related namespace Gigya.Microdot.ServiceProxy.Caching { - public sealed class AsyncCache : IMemoryCacheManager, IServiceProvider, IDisposable { + private class Statistics + { + private const double Mb = 1048576; + private MetricsContext Metrics { get; } + public MetricsContext Hits { get; } + public MetricsContext Misses { get; } + public MetricsContext JoinedTeam { get; } + public MetricsContext AwaitingResult { get; } + public MetricsContext Failed { get; } + public Counter ClearCache { get; } + public MetricsContext Items { get; } + public MetricsContext Revokes { get; } + + public Statistics(AsyncCache subject, MetricsContext metrics) + { + // Gauges + Metrics = metrics; + Metrics.Gauge("Size", () => subject.LastCacheSizeBytes / Mb, Unit.MegaBytes); + Metrics.Gauge("Entries", () => subject.MemoryCache.GetCount(), Unit.Items); + Metrics.Gauge("SizeLimit", () => subject.MemoryCache.CacheMemoryLimit / Mb, Unit.MegaBytes); + Metrics.Gauge("RamUsageLimit", () => subject.MemoryCache.PhysicalMemoryLimit, Unit.Percent); + + // Counters + AwaitingResult = Metrics.Context("AwaitingResult"); + ClearCache = Metrics.Counter("ClearCache", Unit.Calls); + + // Meters + Hits = Metrics.Context("Hits"); + Misses = Metrics.Context("Misses"); + JoinedTeam = Metrics.Context("JoinedTeam"); + Failed = Metrics.Context("Failed"); + + Items = Metrics.Context("Items"); + Revokes = Metrics.Context("Revoke"); + } + } private IDateTime DateTime { get; } private Func GetRevokeConfig { get; } private ILog Log { get; } - private MemoryCache MemoryCache { get; set; } + private MemoryCache MemoryCache { get; set; } // , where cache key = hash(method name + params) private long LastCacheSizeBytes { get; set; } - private MetricsContext Metrics { get; } - - internal ConcurrentDictionary> RevokeKeyToCacheKeysIndex { get; set; } = new ConcurrentDictionary>(); - - private MetricsContext Hits { get; set; } - private MetricsContext Misses { get; set; } - private MetricsContext JoinedTeam { get; set; } - private MetricsContext AwaitingResult { get; set; } - private MetricsContext Failed { get; set; } - private Counter ClearCache { get; set; } - - private MetricsContext Items { get; set; } - private MetricsContext Revokes { get; set; } + private Statistics Stats { get; } + private readonly CancellationTokenSource _cleanUpToken; + private readonly TimeBoundConcurrentQueue _revokesQueue = new TimeBoundConcurrentQueue(); + internal ConcurrentDictionary RevokeKeyToCacheKeysIndex { get; set; } = new ConcurrentDictionary(); private IDisposable RevokeDisposable { get; } - private const double MB = 1048576.0; - private int _clearCount; - - public int RevokeKeysCount => RevokeKeyToCacheKeysIndex.Count; + private int _clearCount; /// /// Not thread safe used for testing /// - internal int CacheKeyCount => RevokeKeyToCacheKeysIndex.Sum(item => item.Value.Count); - + internal int CacheKeyCount => RevokeKeyToCacheKeysIndex.Sum(item => item.Value.CacheKeysSet.Count); public AsyncCache(ILog log, MetricsContext metrics, IDateTime dateTime, IRevokeListener revokeListener, Func getRevokeConfig) { DateTime = dateTime; GetRevokeConfig = getRevokeConfig; + Stats = new Statistics(this, metrics); Log = log; if (ObjectCache.Host == null) @@ -86,171 +112,160 @@ public AsyncCache(ILog log, MetricsContext metrics, IDateTime dateTime, IRevokeL Clear(); - Metrics = metrics; - InitMetrics(); - var onRevoke = new ActionBlock(OnRevoke); - RevokeDisposable = revokeListener.RevokeSource.LinkTo(onRevoke); + RevokeDisposable = revokeListener.RevokeSource.LinkTo(new ActionBlock(OnRevoke)); + // Clean up queue of revokes periodically + _cleanUpToken = new CancellationTokenSource(); + Task.Run(OnMaintain).ContinueWith(_ => { try{_cleanUpToken.Dispose();}catch (Exception ){ /*ignore already disposed*/ }}); } + private Task OnRevoke(string revokeKey) { - var shouldLog = GetRevokeConfig().LogRevokes; + var logRevokes = GetRevokeConfig().LogRevokes; if (string.IsNullOrEmpty(revokeKey)) { - Log.Warn("Error while revoking cache, revokeKey can't be null"); + Log.Warn("Error while revoking cache, revokeKey can't be null or empty"); return Task.FromResult(false); } try { - if (shouldLog) - Log.Info(x=>x("Revoke request received", unencryptedTags: new {revokeKey})); + if (logRevokes) + Log.Info(x => x("Revoke request received", unencryptedTags: new {revokeKey})); + + // Save before gaining control over reverse item + var now = DateTime.UtcNow; - if (RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey, out HashSet cacheKeys)) + // We need to handle race between Enqueue and factory/AlreadyRevoked + var rItem = RevokeKeyToCacheKeysIndex.GetOrAdd(revokeKey, k => { - lock (cacheKeys) - { - var arrayOfCacheKeys = cacheKeys.ToArray();// To prevent iteration over modified collection. - if (shouldLog && arrayOfCacheKeys.Length==0) - Log.Info(x => x("There is no CacheKey to Revoke", unencryptedTags: new { revokeKey })); + _revokesQueue.Enqueue(now, revokeKey); + return new ReverseItem { WhenRevoked = now }; + }); - foreach (var cacheKey in arrayOfCacheKeys) - { - if (shouldLog) - Log.Info(x => x("Revoking cacheKey", unencryptedTags: new { revokeKey, cacheKey })); + rItem.WhenRevoked = now; - var unused = (AsyncCacheItem)MemoryCache.Remove(cacheKey); - } - } - Revokes.Meter("Succeeded", Unit.Events).Mark(); - } - else + // Lock wide while processing ALL the keys, to compete with possible call (and insertion to cache) to be in consistent state + lock (rItem) { - if (shouldLog) - Log.Info(x => x("Key is not cached. No revoke is needed", unencryptedTags: new { revokeKey })); + // We have to copy aside, else MemoryCache remove call back 100% modifying CacheKeysSet + var cacheKeys = rItem.CacheKeysSet.ToArray(); - Revokes.Meter("Discarded", Unit.Events).Mark(); - } + if (logRevokes && cacheKeys.Length == 0) + Log.Info(x => x("There is no CacheKey to Revoke", unencryptedTags: new { revokeKey })); + + foreach (var cacheKey in cacheKeys) + { + if (logRevokes) + Log.Info(x => x("Revoking cacheKey", unencryptedTags: new { revokeKey, cacheKey })); + + MemoryCache.Remove(cacheKey); + } + } + Stats.Revokes.Meter("Succeeded", Unit.Events).Mark(); } catch (Exception ex) { - Revokes.Meter("Failed", Unit.Events).Mark(); + Stats.Revokes.Meter("Failed", Unit.Events).Mark(); Log.Warn("Error while revoking cache", exception: ex, unencryptedTags: new {revokeKey}); } return Task.FromResult(true); } - - private void InitMetrics() + /// + /// Cleanup revoke keys that are has no associated cache keys and older/equal than interval. + /// + private async Task OnMaintain() { - // Gauges - Metrics.Gauge("Size", () => LastCacheSizeBytes / MB, Unit.MegaBytes); - Metrics.Gauge("Entries", () => MemoryCache.GetCount(), Unit.Items); - Metrics.Gauge("SizeLimit", () => MemoryCache.CacheMemoryLimit / MB, Unit.MegaBytes); - Metrics.Gauge("RamUsageLimit", () => MemoryCache.PhysicalMemoryLimit, Unit.Percent); - - // Counters - AwaitingResult = Metrics.Context("AwaitingResult"); - ClearCache = Metrics.Counter("ClearCache", Unit.Calls); - - // Meters - Hits = Metrics.Context("Hits"); - Misses = Metrics.Context("Misses"); - JoinedTeam = Metrics.Context("JoinedTeam"); - Failed = Metrics.Context("Failed"); - - Items = Metrics.Context("Items"); - Revokes = Metrics.Context("Revoke"); + try + { + while (!_cleanUpToken.Token.IsCancellationRequested) + { + var periodMs = GetRevokeConfig().RevokesCleanupMs; + var revokeKeys = _revokesQueue.Dequeue(DateTime.UtcNow.AddMilliseconds(-1 * periodMs)); + + foreach (var revokeKey in revokeKeys) + if (RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey.Data, out var reverseItem)) + if (!reverseItem.CacheKeysSet.Any()) + // We compete with possible call, adding the value to cache, exactly when dequeue-ing values + lock (reverseItem.CacheKeysSet) + if (!reverseItem.CacheKeysSet.Any()) + RevokeKeyToCacheKeysIndex.TryRemove(revokeKey.Data, out _); + + await Task.Delay(periodMs, _cleanUpToken.Token).ConfigureAwait(false); + } + } + catch (TaskCanceledException) + { + } } - - public Task GetOrAdd(string key, Func factory, Type taskResultType, CacheItemPolicyEx policy, string groupName, string logData, string[] metricsKeys) + public Task GetOrAdd(string cacheKey, Func factory, Type taskResultType, CacheItemPolicyEx policy, string cacheGroup, string cacheData, string[] metricsKeys) { - var getValueTask = GetOrAdd(key, () => TaskConverter.ToWeaklyTypedTask(factory(), taskResultType), policy, groupName, logData, metricsKeys, taskResultType); + var getValueTask = GetOrAdd(cacheKey, () => TaskConverter.ToWeaklyTypedTask(factory(), taskResultType), policy, cacheGroup, cacheData, metricsKeys, taskResultType); return TaskConverter.ToStronglyTypedTask(getValueTask, taskResultType); } - - private Task GetOrAdd(string key, Func> factory, CacheItemPolicyEx policy, string groupName, string logData, string[] metricsKeys, Type taskResultType) + private Task GetOrAdd(string cacheKey, Func> factory, CacheItemPolicyEx policy, string cacheGroup, string cacheData, string[] metricsKeys, Type taskResultType) { - var shouldLog = ShouldLog(groupName); + var shouldLog = ShouldLog(cacheGroup); - async Task WrappedFactory(bool removeOnException) + async Task WrappedFactory(AsyncCacheItem item, bool removeOnException) { try { if (shouldLog) - Log.Info(x => x("Cache item is waiting for value to be resolved", unencryptedTags: new - { - cacheKey = key, - cacheGroup = groupName, - cacheData = logData - })); + Log.Info(x => x("Cache item is waiting for value to be resolved", unencryptedTags: new { cacheKey, cacheGroup,cacheData })); + + // Indicate when data source was called for actual result (beginning of request) + var whenCalled = DateTime.UtcNow; var result = await factory().ConfigureAwait(false); + if (shouldLog) + Log.Info(x => x("Cache item value is resolved", unencryptedTags: new { cacheKey, cacheGroup, cacheData, value = GetValueForLogging(result)})); + + var revokeKeys = (result as IRevocable)?.RevokeKeys?.ToArray(); + + // Could happen the item evicted from cache by a policy + if(revokeKeys != null && MemoryCache.Contains(cacheKey)) { - Log.Info(x => x("Cache item value is resolved", unencryptedTags: new - { - cacheKey = key, - cacheGroup = groupName, - cacheData = logData, - value = GetValueForLogging(result) - })); - } - //Can happen if item removed before task is completed - if(MemoryCache.Contains(key)) - { - var revocableResult = result as IRevocable; - if(revocableResult?.RevokeKeys != null) + // Add items in reverse index for revoke keys + foreach (var revokeKey in revokeKeys) { - foreach(var revokeKey in revocableResult.RevokeKeys) - { - var cacheKeys = RevokeKeyToCacheKeysIndex.GetOrAdd(revokeKey, k => new HashSet()); + var reverseEntry = RevokeKeyToCacheKeysIndex.GetOrAdd(revokeKey, k => new ReverseItem()); - lock(cacheKeys) - { - cacheKeys.Add(key); - } - Log.Info(x=>x("RevokeKey added to reverse index", unencryptedTags: new - { - revokeKey = revokeKey, - cacheKey = key, - cacheGroup = groupName, - cacheData = logData - })); - } + lock (reverseEntry) + reverseEntry.CacheKeysSet.Add(cacheKey); + + if (shouldLog) + Log.Info(x => x("RevokeKey added to reverse index", unencryptedTags: new { revokeKey, cacheKey, cacheGroup, cacheData })); } + + AlreadyRevoked(item, whenCalled, revokeKeys, shouldLog, cacheGroup, cacheData); } - AwaitingResult.Decrement(metricsKeys); + + Stats.AwaitingResult.Decrement(metricsKeys); return result; } catch(Exception exception) { - Log.Info(x=>x("Error resolving value for cache item", unencryptedTags: new - { - cacheKey = key, - cacheGroup = groupName, - cacheData = logData, - removeOnException, - errorMessage = exception.Message - })); + Log.Warn(x => x("Error resolving value for cache item", unencryptedTags: new {cacheKey, cacheGroup, cacheData, removeOnException}, exception: exception)); if(removeOnException) - MemoryCache.Remove(key); // Do not cache exceptions. + MemoryCache.Remove(cacheKey); // Do not cache exceptions. - AwaitingResult.Decrement(metricsKeys); - Failed.Mark(metricsKeys); + Stats.AwaitingResult.Decrement(metricsKeys); + Stats.Failed.Mark(metricsKeys); throw; } } - - var newItem = shouldLog ? - new AsyncCacheItem {GroupName = string.Intern(groupName), LogData = logData} : - new AsyncCacheItem (); // if log is not needed, then do not cache unnecessary details which will blow up the memory + var newItem = shouldLog == false + ? new AsyncCacheItem () // if log is not needed, then do not cache unnecessary details which will blow up the memory + : new AsyncCacheItem { GroupName = string.Intern(cacheGroup), LogData = cacheData }; Task resultTask; @@ -262,24 +277,18 @@ async Task WrappedFactory(bool removeOnException) if (typeof(IRevocable).IsAssignableFrom(taskResultType)) policy.RemovedCallback += ItemRemovedCallback; - // Surprisingly, when using MemoryCache.AddOrGetExisting() where the item doesn't exist in the cache, - // null is returned. - var existingItem = (AsyncCacheItem)MemoryCache.AddOrGetExisting(key, newItem, policy); + // Null is returned when the item doesn't exist in the cache! + var existingItem = (AsyncCacheItem)MemoryCache.AddOrGetExisting(cacheKey, newItem, policy); if (existingItem == null) { - Misses.Mark(metricsKeys); - AwaitingResult.Increment(metricsKeys); - newItem.CurrentValueTask = WrappedFactory(true); + Stats.Misses.Mark(metricsKeys); + Stats.AwaitingResult.Increment(metricsKeys); + newItem.CurrentValueTask = WrappedFactory(newItem, true); newItem.NextRefreshTime = DateTime.UtcNow + policy.RefreshTime; resultTask = newItem.CurrentValueTask; if (shouldLog) - Log.Info(x => x("Item added to cache", unencryptedTags: new - { - cacheKey = key, - cacheGroup = groupName, - cacheData = logData - })); + Log.Info(x => x("Item added to cache", unencryptedTags: new {cacheKey, cacheGroup, cacheData})); } else { @@ -289,18 +298,20 @@ async Task WrappedFactory(bool removeOnException) { resultTask = existingItem.CurrentValueTask; - // Start refresh if an existing refresh ins't in progress and we've passed the next refresh time. - if (existingItem.RefreshTask?.IsCompleted != false && DateTime.UtcNow >= existingItem.NextRefreshTime) + // Start refresh, if an existing refresh isn't in progress and we've passed the next refresh time. + if (existingItem.CurrentValueTask.IsCompleted && + existingItem.RefreshTask?.IsCompleted != false && DateTime.UtcNow >= existingItem.NextRefreshTime) { existingItem.RefreshTask = ((Func)(async () => { try { - var getNewValue = WrappedFactory(false); + var getNewValue = WrappedFactory(existingItem, false); await getNewValue.ConfigureAwait(false); existingItem.CurrentValueTask = getNewValue; existingItem.NextRefreshTime = DateTime.UtcNow + policy.RefreshTime; - MemoryCache.Set(new CacheItem(key, existingItem), policy); + if(!existingItem.AlreadyRevoked) + MemoryCache.Set(new CacheItem(cacheKey, existingItem), policy); } catch { @@ -311,15 +322,54 @@ async Task WrappedFactory(bool removeOnException) } if (resultTask.GetAwaiter().IsCompleted) - Hits.Mark(metricsKeys); + Stats.Hits.Mark(metricsKeys); else - JoinedTeam.Mark(metricsKeys); + Stats.JoinedTeam.Mark(metricsKeys); } } return resultTask; } + /// + /// Handle the case the revoke received in the middle of call to data source so the cacheItem should be removed. + /// If won't be removed the cache item contains stall value unless evicted by policy, meaning "wrong" value returned. + /// + private void AlreadyRevoked(AsyncCacheItem cacheItem, DateTime whenCalled, IEnumerable revokeKeys, bool shouldLog, string cacheGroup, string cacheData) + { + foreach (var revokeKey in revokeKeys) + { + if (!RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey, out ReverseItem reverseItem)) + continue; + + // The cached item should be removed as revoke received after calling to data source + if (reverseItem.WhenRevoked < whenCalled) + continue; + + // Signal to refresh task don't cache + cacheItem.AlreadyRevoked = true; + + // Race with OnRevoke, avoid possible modification exception + string[] cacheKeys; + lock (reverseItem) + cacheKeys = reverseItem.CacheKeysSet.ToArray(); + + foreach (var cacheKey in cacheKeys) + { + // Null returned if not found, triggering remove callback (cleaning reverse index) + var isRemoved = MemoryCache.Remove(cacheKey) != null; + + if (shouldLog) + { + var removed = isRemoved; // changing closure in loop + Log.Info(x => x("Removing cacheKey (revoked before call)", unencryptedTags: new { + cacheKey, cacheGroup, cacheData, removed, revokeKey, diff = whenCalled - reverseItem.WhenRevoked + })); + } + } + } + } + private ConcurrentDictionary _revocableValueFieldPerType = new ConcurrentDictionary(); private string GetValueForLogging(object value) { @@ -337,59 +387,51 @@ private string GetValueForLogging(object value) } /// - /// For revocable items , move over all revoke ids in cache index and remove them. + /// For revocable items, move over all revoke keys in cache index and remove them. /// private void ItemRemovedCallback(CacheEntryRemovedArguments arguments) { + var removeReason = arguments.RemovedReason.ToString(); + var cacheKey = arguments.CacheItem.Key; var cacheItem = arguments.CacheItem.Value as AsyncCacheItem; - var shouldLog = ShouldLog(cacheItem?.GroupName); + var cacheGroup = cacheItem?.GroupName; + var cacheData = cacheItem?.LogData; + var shouldLog = ShouldLog(cacheGroup); if (shouldLog) - Log.Info(x=>x("Item removed from cache", unencryptedTags: new - { - cacheKey = arguments.CacheItem.Key, - removeReason = arguments.RemovedReason.ToString(), - cacheGroup = cacheItem?.GroupName, - cacheData = cacheItem?.LogData - })); - - var cachedItem = ((AsyncCacheItem)arguments.CacheItem.Value).CurrentValueTask; - if(cachedItem.Status == TaskStatus.RanToCompletion && (cachedItem.Result as IRevocable)?.RevokeKeys!=null) + Log.Info(x => x("Item removed from cache", unencryptedTags: new{ cacheKey, removeReason, cacheGroup,cacheData})); + + // Task can be null, if wasn't cached and revoked before call + var resultTask = cacheItem?.CurrentValueTask; + + // What if it is not completed? + // We can try to add a continuation allowing us to handle it same way when done? + if(resultTask?.Status == TaskStatus.RanToCompletion) { - foreach(var revokeKey in ((IRevocable)cachedItem.Result).RevokeKeys) + var revokeKeys = (resultTask.Result as IRevocable)?.RevokeKeys ?? Enumerable.Empty(); + foreach(var revokeKey in revokeKeys) { - if (RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey, out HashSet cacheKeys)) + if (RevokeKeyToCacheKeysIndex.TryGetValue(revokeKey, out ReverseItem reverseItem)) { - lock (cacheKeys) + lock (reverseItem) { + var cacheKeys = reverseItem.CacheKeysSet; cacheKeys.Remove(arguments.CacheItem.Key); - Log.Info(x => x("RevokeKey removed from reverse index", unencryptedTags: new - { - cacheKey = arguments.CacheItem.Key, - revokeKey = revokeKey, - removeReason = arguments.RemovedReason.ToString(), - cacheGroup = cacheItem?.GroupName, - cacheData = cacheItem?.LogData - })); + + if (shouldLog) + Log.Info(x => x("CacheKey removed from reverse index", unencryptedTags: new{ cacheKey, revokeKey, removeReason, cacheGroup, cacheData})); if (!cacheKeys.Any()) { if (RevokeKeyToCacheKeysIndex.TryRemove(revokeKey, out _) && shouldLog) - Log.Info(x => x("Reverse index for cache item was removed", unencryptedTags: new - { - cacheKey = arguments.CacheItem.Key, - removeReason = arguments.RemovedReason.ToString(), - cacheGroup = cacheItem?.GroupName, - cacheData = cacheItem?.LogData - })); - + Log.Info(x => x("Reverse index for cache item was removed", unencryptedTags: new{ cacheKey, revokeKey, removeReason, cacheGroup, cacheData})); } } } } } - Items.Meter(arguments.RemovedReason.ToString(), Unit.Items).Mark(); + Stats.Items.Meter(arguments.RemovedReason.ToString(), Unit.Items).Mark(); } private bool ShouldLog(string groupName) @@ -404,21 +446,21 @@ private bool ShouldLog(string groupName) return false; } - public void Clear() { + // TODO: as we aren't completely sure what is go on with ongoing tasks completing on edge of reference replacement var oldMemoryCache = MemoryCache; MemoryCache = new MemoryCache(nameof(AsyncCache) + Interlocked.Increment(ref _clearCount), new NameValueCollection { { "PollingInterval", "00:00:01" } }); if (oldMemoryCache != null) { // Disposing of MemoryCache can be a CPU intensive task and should therefore not block the current thread. + // Triggering callback to run and clean up the reverse index Task.Run(() => oldMemoryCache.Dispose()); - ClearCache.Increment(); + Stats.ClearCache.Increment(); } } - public void UpdateCacheSize(long size, MemoryCache cache) { if (cache != MemoryCache) @@ -427,21 +469,18 @@ public void UpdateCacheSize(long size, MemoryCache cache) LastCacheSizeBytes = size; } - public void ReleaseCache(MemoryCache cache) { } - public object GetService(Type serviceType) { return serviceType == typeof(IMemoryCacheManager) ? this : null; } - public void Dispose() { + try{ _cleanUpToken.Cancel(); }catch (Exception ){ /*ignore already disposed*/ } MemoryCache?.Dispose(); RevokeDisposable?.Dispose(); } } - } \ No newline at end of file diff --git a/Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs b/Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs index 137ee2d6..b402eeb9 100644 --- a/Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs +++ b/Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs @@ -42,5 +42,10 @@ public class AsyncCacheItem /// Extra data for log purposes (e.g. arguments list) /// public string LogData { get; set; } + + /// + /// Should not be cached, as was revoked while calling to factory + /// + public volatile bool AlreadyRevoked; } } \ No newline at end of file diff --git a/Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs b/Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs index f0b84049..6c7cd5f5 100644 --- a/Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs +++ b/Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs @@ -72,8 +72,8 @@ public object Memoize(object dataSource, MethodInfo method, object[] args, Cache private string GetArgumentHash(object[] args) { - var stream = new MemoryStream(); using (ComputeArgumentHash.NewContext()) + using (var stream = new MemoryStream()) using (var writer = new StreamWriter(stream) { AutoFlush = true }) using (SHA1 sha = new SHA1CryptoServiceProvider()) { diff --git a/Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs b/Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs index d5cda6be..be5aebca 100644 --- a/Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs +++ b/Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs @@ -8,6 +8,12 @@ namespace Gigya.Microdot.ServiceProxy.Caching public class CacheConfig: IConfigObject { public bool LogRevokes { get; set; } = false; + + /// + /// Configure the interval in ms to clean revokes without associated cache keys (call ahead revokes) + /// + public int RevokesCleanupMs { get; set; } = 600_000; + public Dictionary Groups { get; } = new Dictionary(StringComparer.InvariantCultureIgnoreCase); } diff --git a/Gigya.Microdot.ServiceProxy/Caching/ReverseItem.cs b/Gigya.Microdot.ServiceProxy/Caching/ReverseItem.cs new file mode 100644 index 00000000..f46ea146 --- /dev/null +++ b/Gigya.Microdot.ServiceProxy/Caching/ReverseItem.cs @@ -0,0 +1,33 @@ +#region Copyright +// Copyright 2017 Gigya Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +#endregion + +using System; +using System.Collections.Generic; + +namespace Gigya.Microdot.ServiceProxy.Caching +{ + public class ReverseItem + { + public HashSet CacheKeysSet = new HashSet(); + public DateTime WhenRevoked = DateTime.MinValue; + } +} diff --git a/Gigya.Microdot.ServiceProxy/Gigya.Microdot.ServiceProxy.csproj b/Gigya.Microdot.ServiceProxy/Gigya.Microdot.ServiceProxy.csproj index 3d6106f7..cdfb7a4b 100644 --- a/Gigya.Microdot.ServiceProxy/Gigya.Microdot.ServiceProxy.csproj +++ b/Gigya.Microdot.ServiceProxy/Gigya.Microdot.ServiceProxy.csproj @@ -56,6 +56,7 @@ + diff --git a/Gigya.Microdot.SharedLogic/Collections/TimeBoundConcurrentQueue.cs b/Gigya.Microdot.SharedLogic/Collections/TimeBoundConcurrentQueue.cs new file mode 100644 index 00000000..ddb345d7 --- /dev/null +++ b/Gigya.Microdot.SharedLogic/Collections/TimeBoundConcurrentQueue.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Gigya.Microdot.SharedLogic.Utils; + +namespace Gigya.Microdot.SharedLogic.Collections +{ + /// + /// A general purpose queue (FIFO) to keep items queued after a cut off time. + /// + /// + /// Items expected to be queued sequentially in time while the next queued greater or equal to previous 'now'. + /// If condition violated, the dequeue will keep items out of expected order. + /// + /// + public class TimeBoundConcurrentQueue + { + public struct Item + { + public DateTimeOffset Time; + public T Data; + } + private readonly ConcurrentQueue _queue = new ConcurrentQueue(); + + public int Count => _queue.Count; + + public void Enqueue(DateTimeOffset now, T data) + { + _queue.Enqueue(new Item { Time = now, Data = data }); + } + + /// + /// Dequeues and returns items from the queue as long as their is older or equal to the provided time. + /// + /// The cut off time to dequeue items older or equal than. + public ICollection Dequeue(DateTimeOffset olderThanOrEqual) + { + var oldItems = new List(); + lock (_queue) + // Break, if an empty queue or an item is younger + while (_queue.TryPeek(out var item) && item.Time <= olderThanOrEqual) + if (_queue.TryDequeue(out item)) + oldItems.Add(item); + else + GAssert.Fail("Failed to dequeue the item."); + return oldItems; + } + } +} \ No newline at end of file diff --git a/Gigya.Microdot.SharedLogic/Gigya.Microdot.SharedLogic.csproj b/Gigya.Microdot.SharedLogic/Gigya.Microdot.SharedLogic.csproj index 882555a8..02f6d5c5 100644 --- a/Gigya.Microdot.SharedLogic/Gigya.Microdot.SharedLogic.csproj +++ b/Gigya.Microdot.SharedLogic/Gigya.Microdot.SharedLogic.csproj @@ -46,6 +46,7 @@ + diff --git a/Gigya.Microdot.Testing.Shared/Utils/EventWaiter.cs b/Gigya.Microdot.Testing.Shared/Utils/EventWaiter.cs index d18618fb..823ef39b 100644 --- a/Gigya.Microdot.Testing.Shared/Utils/EventWaiter.cs +++ b/Gigya.Microdot.Testing.Shared/Utils/EventWaiter.cs @@ -81,10 +81,9 @@ public Task> WhenEventsReceived(int? expectedNumberOfEvents, TimeSpan? t var cancel = new CancellationTokenSource(timeout.Value); var wait = new TaskCompletionSource>(); - cancel.Token.Register( - () => wait.TrySetException( - new Exception( - $"Expected events: {expectedNumberOfEvents}. Received events: {_events.Count}. Timeout after {timeout.Value.TotalMilliseconds} ms"))); + cancel.Token.Register(() => wait.TrySetException( new Exception( $"Expected events: {expectedNumberOfEvents}. " + + $"Received events: {_events.Count}. " + + $"Timeout after {timeout.Value.TotalMilliseconds} ms"))); wait.Task.ContinueWith(x => cancel.Dispose(), TaskContinuationOptions.OnlyOnRanToCompletion); _waiting.Add(new KeyValuePair>>(expectedNumberOfEvents.Value, wait)); diff --git a/SolutionVersion.cs b/SolutionVersion.cs index b199cc20..37706cd6 100644 --- a/SolutionVersion.cs +++ b/SolutionVersion.cs @@ -28,9 +28,9 @@ [assembly: AssemblyCopyright("© 2018 Gigya Inc.")] [assembly: AssemblyDescription("Microdot Framework")] -[assembly: AssemblyVersion("1.13.2.0")] -[assembly: AssemblyFileVersion("1.13.2.0")] -[assembly: AssemblyInformationalVersion("1.13.2.0")] +[assembly: AssemblyVersion("1.13.3.0")] +[assembly: AssemblyFileVersion("1.13.3.0")] +[assembly: AssemblyInformationalVersion("1.13.3.0")] // Setting ComVisible to false makes the types in this assembly not visible // to COM components. If you need to access a type in this assembly from diff --git a/tests/Gigya.Microdot.ServiceContract.UnitTests/Gigya.Microdot.ServiceContract.UnitTests.csproj b/tests/Gigya.Microdot.ServiceContract.UnitTests/Gigya.Microdot.ServiceContract.UnitTests.csproj index d0f9e894..796cc69c 100644 --- a/tests/Gigya.Microdot.ServiceContract.UnitTests/Gigya.Microdot.ServiceContract.UnitTests.csproj +++ b/tests/Gigya.Microdot.ServiceContract.UnitTests/Gigya.Microdot.ServiceContract.UnitTests.csproj @@ -57,16 +57,6 @@ - - - {db6d3561-835e-40d5-b9d4-83951cf426df} - Gigya.ServiceContract - - - {db6d3561-835e-40d5-b9d4-83951cf426df} - Gigya.ServiceContract - - 1) | + | | + |_inMiddleOf.Set(); | + 2)-----------------------------------> + | | + | _inMiddleOf.WaitOne(); + | | _cacheRevoker.Revoke(key); | + | 3)-------------------------------> + | | 4)OnRevoke() + | 5)await eventWaiter; 8.1) |--------> Enqueue + | | | Maintainer + _revokeSent.WaitOne(); 6)revokeSent.Set() | + | | | + 7)<----------------------------------| | + | | | + 8) AlreadyRevoked(...) | | + | | | + <-------------------------+ + + */ + + [Test] + [Repeat(1)] + public async Task RevokeBeforeServiceResultReceived_ShouldRevokeStaleValue() + { + _configDic = new Dictionary(); + _kernel = new TestingKernel(mockConfig: _configDic); + + _kernel.Rebind(typeof(CachingProxyProvider<>)).ToSelf().InTransientScope(); + _kernel.Rebind().ToConstant(new FakeRevokingManager()); + var c = _kernel.Get>()(); // required + + SetupServiceMock(); + SetupDateTime(); + + _proxy = _kernel.Get(); + _cacheRevoker = _kernel.Get(); + _revokeListener = _kernel.Get(); + + + var key = Guid.NewGuid().ToString(); + await ClearCachingPolicyConfig(); + + // Init return value explicitly + _serviceResult = FirstResult; + + // Simulate race between revoke and AddGet + _revokeSent = new ManualResetEvent(false); + _inMiddleOf = new ManualResetEvent(false); + + Task.WaitAll( + + // Call to service to cache FirstResult (and stuck until _revokeDelay signaled) + Task.Run(async () => + { + var result = await _proxy.CallRevocableService(key); + result.Value.ShouldBe(FirstResult, "Result should have been cached"); + }), + + // Revoke the key (not truly, as value is not actually cached, yet). + Task.Run(async() => + { + _inMiddleOf.WaitOne(); + var eventWaiter = _revokeListener.RevokeSource.WhenEventReceived(TimeSpan.FromMinutes(1)); + await _cacheRevoker.Revoke(key); + await eventWaiter; // Wait the revoke will be processed + _revokeSent.Set(); // Signal to continue adding/getting + }) + ); + + // Init return value and expect to be returned, if not cached the first one! + _serviceResult = SecondResult; + await ResultRevocableShouldBe(SecondResult, key, "Result shouldn't have been cached"); + } + + [Test] + public async Task RevokeMaintainer_ShouldCleanupOnlyOlderThan() + { + // Tear down using kernel, despite test doesn't + _configDic = new Dictionary(); + _kernel = new TestingKernel(mockConfig: _configDic); + + var dateTime = DateTime.UtcNow; + var total = 500; + var maintainer = new TimeBoundConcurrentQueue(); + + // Add items, half older, half younger, IT IS A fifo QUEUE! + for (int i = 0; i < total/2; i++) + maintainer.Enqueue(dateTime - TimeSpan.FromHours(1), "revokeKey"); // older + + for (int i = 0; i < total/2; i++) + maintainer.Enqueue(dateTime + TimeSpan.FromHours(1), "revokeKey"); // younger + + // expect to dequeue half + var keys = maintainer.Dequeue(dateTime - TimeSpan.FromSeconds(30)); + + maintainer.Count.ShouldBe(total / 2); + keys.Count.ShouldBe(total / 2); + } + + private void SetupDateTime() + { + _now = DateTime.UtcNow; + var dateTimeMock = Substitute.For(); + dateTimeMock.UtcNow.Returns(_=> _isTrueTime() ? DateTime.UtcNow : _now); + _kernel.Rebind().ToConstant(dateTimeMock); + } + + + private async Task SetCachingPolicyConfig(params string[][] keyValues) + { + bool changed = _configDic.Values.Count != 0 && keyValues.Length == 0; + + _configDic.Clear(); + foreach (var keyValue in keyValues) + { + var key = keyValue[0]; + var value = keyValue[1]; + if (key != null && value != null) + { + _kernel.Get() + .SetValue($"Discovery.Services.CachingTestService.CachingPolicy.{key}", value); + changed = true; + } + } + if (changed) + { + await _kernel.Get().ApplyChanges(); + await Task.Delay(200); + } + } + + private async Task ClearCachingPolicyConfig() + { + await SetCachingPolicyConfig(); + } + + private async Task ResultRevocableShouldBe(string expectedResult, string key, string message = null) + { + var result = await _proxy.CallRevocableService(key); + result.Value.ShouldBe(expectedResult, message); + } + } +} diff --git a/tests/Gigya.Microdot.UnitTests/Gigya.Microdot.UnitTests.csproj b/tests/Gigya.Microdot.UnitTests/Gigya.Microdot.UnitTests.csproj index 034c4f19..8dffe27e 100644 --- a/tests/Gigya.Microdot.UnitTests/Gigya.Microdot.UnitTests.csproj +++ b/tests/Gigya.Microdot.UnitTests/Gigya.Microdot.UnitTests.csproj @@ -49,6 +49,7 @@ +