Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#41458, async cache revoke (in ahead) #233

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 147 additions & 32 deletions .paket/Paket.Restore.targets

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions Gigya.Microdot.Fakes/FakeHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Gigya.Microdot.SharedLogic.Monitor;
using Metrics;
Expand All @@ -29,30 +30,26 @@ namespace Gigya.Microdot.Fakes
{
public class FakeHealthMonitor : IHealthMonitor
{
public Dictionary<string, Func<HealthCheckResult>> Monitors = new Dictionary<string, Func<HealthCheckResult>>();

public ConcurrentDictionary<string, Func<HealthCheckResult>> Monitors = new ConcurrentDictionary<string, Func<HealthCheckResult>>();

public void Dispose()
{

}


public ComponentHealthMonitor Get(string component)
{
throw new NotImplementedException();
}


public Dictionary<string, string> GetData(string component)
{
throw new NotImplementedException();
}


public ComponentHealthMonitor SetHealthFunction(string component, Func<HealthCheckResult> check, Func<Dictionary<string, string>> healthData = null)
{
Monitors[component] = check;
Monitors.AddOrUpdate(component, k => check, (k, v) => check);
return new ComponentHealthMonitor(component, check);
}
}
Expand Down
407 changes: 223 additions & 184 deletions Gigya.Microdot.ServiceProxy/Caching/AsyncCache.cs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Gigya.Microdot.ServiceProxy/Caching/AsyncCacheItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,10 @@ public class AsyncCacheItem
/// Extra data for log purposes (e.g. arguments list)
/// </summary>
public string LogData { get; set; }

/// <summary>
/// Should not be cached, as was revoked while calling to factory
/// </summary>
public volatile bool AlreadyRevoked;
}
}
2 changes: 1 addition & 1 deletion Gigya.Microdot.ServiceProxy/Caching/AsyncMemoizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public object Memoize(object dataSource, MethodInfo method, object[] args, Cache

private string GetArgumentHash(object[] args)
{
var stream = new MemoryStream();
using (ComputeArgumentHash.NewContext())
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream) { AutoFlush = true })
using (SHA1 sha = new SHA1CryptoServiceProvider())
{
Expand Down
6 changes: 6 additions & 0 deletions Gigya.Microdot.ServiceProxy/Caching/CacheConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ namespace Gigya.Microdot.ServiceProxy.Caching
public class CacheConfig: IConfigObject
{
public bool LogRevokes { get; set; } = false;

/// <summary>
/// Configure the interval in ms to clean revokes without associated cache keys (call ahead revokes)
/// </summary>
public int RevokesCleanupMs { get; set; } = 600_000;

public Dictionary<string, CacheGroupConfig> Groups { get; } = new Dictionary<string, CacheGroupConfig>(StringComparer.InvariantCultureIgnoreCase);
}

Expand Down
33 changes: 33 additions & 0 deletions Gigya.Microdot.ServiceProxy/Caching/ReverseItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#region Copyright
// Copyright 2017 Gigya Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
#endregion

using System;
using System.Collections.Generic;

namespace Gigya.Microdot.ServiceProxy.Caching
{
public class ReverseItem
{
public HashSet<string> CacheKeysSet = new HashSet<string>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheKeysSet is use by one thread at a time?

public DateTime WhenRevoked = DateTime.MinValue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why magic number DateTime.MinValue instead of DateTime?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<Compile Include="Caching\AsyncCache.cs" />
<Compile Include="Caching\MetricsExtensions.cs" />
<Compile Include="Caching\CacheConfig.cs" />
<Compile Include="Caching\ReverseItem.cs" />
<Compile Include="CoreFX\DispatchProxy\DispatchProxy.cs" />
<Compile Include="CoreFX\DispatchProxy\DispatchProxyGenerator.cs" />
<Compile Include="DelegatingDispatchProxy.cs" />
Expand Down
49 changes: 49 additions & 0 deletions Gigya.Microdot.SharedLogic/Collections/TimeBoundConcurrentQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing Copyright

using System.Collections.Concurrent;
using System.Collections.Generic;
using Gigya.Microdot.SharedLogic.Utils;

namespace Gigya.Microdot.SharedLogic.Collections
{
/// <summary>
/// A general purpose queue (FIFO) to keep items queued after a cut off time.
/// </summary>
/// <remarks>
/// Items expected to be queued sequentially in time while the next queued greater or equal to previous 'now'.
/// If condition violated, the dequeue will keep items out of expected order.
/// </remarks>
/// <typeparam name="T"></typeparam>
public class TimeBoundConcurrentQueue<T>
{
public struct Item
{
public DateTimeOffset Time;
public T Data;
}
private readonly ConcurrentQueue<Item> _queue = new ConcurrentQueue<Item>();

public int Count => _queue.Count;

public void Enqueue(DateTimeOffset now, T data)
{
_queue.Enqueue(new Item { Time = now, Data = data });
}

/// <summary>
/// Dequeues and returns items from the queue as long as their <see cref="Item.Time"/> is older or equal to the provided time.
/// </summary>
/// <param name="olderThanOrEqual">The cut off time to dequeue items older or equal than.</param>
public ICollection<Item> Dequeue(DateTimeOffset olderThanOrEqual)
{
var oldItems = new List<Item>();
lock (_queue)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this lock?
Can we active it with out locking?

// Break, if an empty queue or an item is younger
while (_queue.TryPeek(out var item) && item.Time <= olderThanOrEqual)
if (_queue.TryDequeue(out item))
oldItems.Add(item);
else
GAssert.Fail("Failed to dequeue the item.");
return oldItems;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
</Compile>
<Compile Include="ApplicationDirectoryProvider.cs" />
<Compile Include="AssemblyProvider.cs" />
<Compile Include="Collections\TimeBoundConcurrentQueue.cs" />
<Compile Include="Configurations\LoadShedding.cs" />
<Compile Include="Events\MetadataPropertiesCache.cs" />
<Compile Include="Events\EventSerializer.cs" />
Expand Down
7 changes: 3 additions & 4 deletions Gigya.Microdot.Testing.Shared/Utils/EventWaiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ public Task<List<T>> WhenEventsReceived(int? expectedNumberOfEvents, TimeSpan? t

var cancel = new CancellationTokenSource(timeout.Value);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not relay related but we should try to reduce the use of multiple timers for better performance

var wait = new TaskCompletionSource<List<T>>();
cancel.Token.Register(
() => wait.TrySetException(
new Exception(
$"Expected events: {expectedNumberOfEvents}. Received events: {_events.Count}. Timeout after {timeout.Value.TotalMilliseconds} ms")));
cancel.Token.Register(() => wait.TrySetException( new Exception( $"Expected events: {expectedNumberOfEvents}. " +
$"Received events: {_events.Count}. " +
$"Timeout after {timeout.Value.TotalMilliseconds} ms")));
wait.Task.ContinueWith(x => cancel.Dispose(), TaskContinuationOptions.OnlyOnRanToCompletion);

_waiting.Add(new KeyValuePair<int, TaskCompletionSource<List<T>>>(expectedNumberOfEvents.Value, wait));
Expand Down
6 changes: 3 additions & 3 deletions SolutionVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
[assembly: AssemblyCopyright("© 2018 Gigya Inc.")]
[assembly: AssemblyDescription("Microdot Framework")]

[assembly: AssemblyVersion("1.13.2.0")]
[assembly: AssemblyFileVersion("1.13.2.0")]
[assembly: AssemblyInformationalVersion("1.13.2.0")]
[assembly: AssemblyVersion("1.13.3.0")]
[assembly: AssemblyFileVersion("1.13.3.0")]
[assembly: AssemblyInformationalVersion("1.13.3.0")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Gigya.ServiceContract\Gigya.ServiceContract.csproj">
<Project>{db6d3561-835e-40d5-b9d4-83951cf426df}</Project>
<Name>Gigya.ServiceContract</Name>
</ProjectReference>
<ProjectReference Include="..\..\Gigya.ServiceContract\Gigya.ServiceContract.csproj">
<Project>{db6d3561-835e-40d5-b9d4-83951cf426df}</Project>
<Name>Gigya.ServiceContract</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
48 changes: 23 additions & 25 deletions tests/Gigya.Microdot.UnitTests/Caching/AsyncMemoizerRevokesTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
Expand Down Expand Up @@ -29,8 +30,9 @@ public class AsyncMemoizerRevokesTests

private AsyncCache CreateCache(ISourceBlock<string> revokeSource = null)
{

return new AsyncCache(new ConsoleLog(), Metric.Context(cacheContextName), TimeFake, new EmptyRevokeListener { RevokeSource = revokeSource }, ()=>new CacheConfig());

var consoleLog = new ConsoleLog();
return new AsyncCache(consoleLog, Metric.Context(cacheContextName), TimeFake, new EmptyRevokeListener { RevokeSource = revokeSource }, () => new CacheConfig());
}

private IMemoizer CreateMemoizer(AsyncCache cache)
Expand Down Expand Up @@ -82,34 +84,25 @@ public void SetUp()
}

[Test]
public async Task MemoizeAsync_RevokeBeforeRetrivalTaskCompletedCaused_NoIssues()
public async Task MemoizeAsync_RevokeBeforeRetrievalTaskCompletedCaused_NoIssues()
{
var completionSource = new TaskCompletionSource<Revocable<Thing>>();
var dataSource = CreateRevokableDataSource(null, completionSource);
var revokesSource = new OneTimeSynchronousSourceBlock<string>();
var cache = CreateCache(revokesSource);
var memoizer = CreateMemoizer(cache);

//Call method to get results
// Call method to get results
var resultTask = (Task<Revocable<Thing>>)memoizer.Memoize(dataSource, ThingifyTaskRevokabkle, new object[] { "someString" }, GetPolicy());

//Post revoke message while results had not arrived
// Post revoke message while results had not arrived
revokesSource.PostMessageSynced("revokeKey");

//Should have a single discarded revoke in meter
GetMetricsData("Revoke").AssertEquals(new MetricsDataEquatable
{
MetersSettings = new MetricsCheckSetting { CheckValues = true },
Meters = new List<MetricDataEquatable> {
new MetricDataEquatable {Name = "Discarded", Unit = Unit.Events, Value = 1},
}
});

//Wait before sending results
// Wait before sending results
await Task.Delay(100);
completionSource.SetResult(new Revocable<Thing> { Value = new Thing { Id = 5 }, RevokeKeys = new[] { "revokeKey" } });

//Results should arive now
// Results should arrive now
var actual = await resultTask;
dataSource.Received(1).ThingifyTaskRevokable("someString");
actual.Value.Id.ShouldBe(5);
Expand All @@ -134,21 +127,21 @@ public async Task MemoizeAsync_RevokableObjectShouldBeCachedAndRevoked()
dataSource.Received(1).ThingifyTaskRevokable("someString");
actual.Value.Id.ShouldBe(5);

//Read value from cache should be still 5
// Read value from cache should be still 5
actual = await CallWithMemoize(memoizer, dataSource);
dataSource.Received(1).ThingifyTaskRevokable("someString");
actual.Value.Id.ShouldBe(5);
//A single cache key should be stored in index
// A single cache key should be stored in index
cache.CacheKeyCount.ShouldBe(1);

//No metric for Revoke
// No metric for Revoke
GetMetricsData("Revoke").AssertEquals(new MetricsDataEquatable { Meters = new List<MetricDataEquatable> ()});

//Post revoke message, no cache keys should be stored
// Post revoke message, no cache keys should be stored
revokesSource.PostMessageSynced("revokeKey");
cache.CacheKeyCount.ShouldBe(0);

//Should have a single revoke in meter
// Should have a single revoke in meter
GetMetricsData("Revoke").AssertEquals(new MetricsDataEquatable
{
MetersSettings = new MetricsCheckSetting { CheckValues = true },
Expand All @@ -157,7 +150,7 @@ public async Task MemoizeAsync_RevokableObjectShouldBeCachedAndRevoked()
}
});

//Should have a single item removed in meter
// Should have a single item removed in meter
GetMetricsData("Items").AssertEquals(new MetricsDataEquatable
{
MetersSettings = new MetricsCheckSetting { CheckValues = true },
Expand All @@ -167,14 +160,19 @@ public async Task MemoizeAsync_RevokableObjectShouldBeCachedAndRevoked()
});


//Value should change to 6
// Value should change to 6
actual = await CallWithMemoize(memoizer, dataSource);
dataSource.Received(2).ThingifyTaskRevokable("someString");
actual.Value.Id.ShouldBe(6);
cache.CacheKeyCount.ShouldBe(1);

actual = await CallWithMemoize(memoizer, dataSource);
dataSource.Received(2).ThingifyTaskRevokable("someString");
actual.Value.Id.ShouldBe(6);
cache.CacheKeyCount.ShouldBe(1);

//Post revoke message to not existing key value still should be 6
revokesSource.PostMessageSynced("NotExistin-RevokeKey");
// Post revoke message to not existing key value still should be 6
revokesSource.PostMessageSynced("NotExisting-RevokeKey");


actual = await CallWithMemoize(memoizer, dataSource);
Expand Down
15 changes: 11 additions & 4 deletions tests/Gigya.Microdot.UnitTests/Caching/AsyncMemoizerTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
Expand Down Expand Up @@ -48,9 +49,10 @@ private AsyncCache CreateCache(ISourceBlock<string> revokeSource = null)
if(revokeSource != null)
revokeListener.RevokeSource = revokeSource;

return new AsyncCache(new ConsoleLog(), Metric.Context("AsyncCache"), TimeFake, revokeListener, () => new CacheConfig());
var consoleLog = new ConsoleLog();
return new AsyncCache(consoleLog, Metric.Context("AsyncCache"), TimeFake, revokeListener, () => new CacheConfig());
}

private IMemoizer CreateMemoizer(AsyncCache cache)
{
var metadataProvider = new MetadataProvider();
Expand Down Expand Up @@ -276,7 +278,9 @@ public async Task MemoizeAsync_CallAfterRefreshTime_TTLNotExpired()
var dataSource = CreateDataSource(5, 7, 9);
var args = new object[] { "someString" };

IMemoizer memoizer = new AsyncMemoizer(new AsyncCache(new ConsoleLog(), Metric.Context("AsyncCache"), new DateTimeImpl(), new EmptyRevokeListener(), () => new CacheConfig()), new MetadataProvider(), Metric.Context("Tests"));
var consoleLog = new ConsoleLog();
IMemoizer memoizer = new AsyncMemoizer(new AsyncCache(consoleLog, Metric.Context("AsyncCache"), new DateTimeImpl(), new EmptyRevokeListener(), () => new CacheConfig()),
new MetadataProvider(), Metric.Context("Tests"));

// T = 0s. No data in cache, should retrieve value from source (5).
(await (Task<Thing>)memoizer.Memoize(dataSource, ThingifyTaskThing, args, GetPolicy(4, 1))).Id.ShouldBe(5);
Expand Down Expand Up @@ -311,7 +315,10 @@ public async Task MemoizeAsync_BackgroundRefreshFails_TTLNotExtended()
refreshTask.SetException(new MissingFieldException("Boo!!"));
var dataSource = CreateDataSource(870, refreshTask, 1002);

IMemoizer memoizer = new AsyncMemoizer(new AsyncCache(new ConsoleLog(), Metric.Context("AsyncCache"), new DateTimeImpl(), new EmptyRevokeListener(), () => new CacheConfig()), new MetadataProvider(), Metric.Context("Tests"));
var consoleLog = new ConsoleLog();
IMemoizer memoizer = new AsyncMemoizer(new AsyncCache(consoleLog, Metric.Context("AsyncCache"), new DateTimeImpl(), new EmptyRevokeListener(), () => new CacheConfig()),
new MetadataProvider(), Metric.Context("Tests"));


// T = 0s. No data in cache, should retrieve value from source (870).
(await (Task<Thing>)memoizer.Memoize(dataSource, ThingifyTaskThing, args, GetPolicy(5, 1, 100))).Id.ShouldBe(870);
Expand Down
Loading