Skip to content

Commit

Permalink
allows users to not save enumerated data in the StateManager (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
slorello89 authored Sep 12, 2022
1 parent 925ef38 commit 684e980
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/Redis.OM/RedisConnectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,15 @@ public RedisConnectionProvider(IConnectionMultiplexer connectionMultiplexer)
/// <returns>A RedisCollection.</returns>
public IRedisCollection<T> RedisCollection<T>(int chunkSize = 100)
where T : notnull => new RedisCollection<T>(Connection, chunkSize);

/// <summary>
/// Gets a redis collection.
/// </summary>
/// <typeparam name="T">The type the collection will be retrieving.</typeparam>
/// <param name="saveState">Whether or not the RedisCollection should maintain the state of documents it enumerates.</param>
/// <param name="chunkSize">Size of chunks to use during pagination, larger chunks = larger payloads returned but fewer round trips.</param>
/// <returns>A RedisCollection.</returns>
public IRedisCollection<T> RedisCollection<T>(bool saveState, int chunkSize = 100)
where T : notnull => new RedisCollection<T>(Connection, saveState, chunkSize);
}
}
14 changes: 7 additions & 7 deletions src/Redis.OM/SearchExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static IRedisCollection<T> Where<T>(this IRedisCollection<T> source, Expr
null,
GetMethodInfo(Where, source, expression),
new[] { source.Expression, Expression.Quote(expression) });
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, combined, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, combined, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand All @@ -116,7 +116,7 @@ public static IRedisCollection<TR> Select<T, TR>(this IRedisCollection<T> source
null,
GetMethodInfo(Select, source, expression),
new[] { source.Expression, Expression.Quote(expression) });
return new RedisCollection<TR>((RedisQueryProvider)source.Provider, exp, source.StateManager, null, source.ChunkSize);
return new RedisCollection<TR>((RedisQueryProvider)source.Provider, exp, source.StateManager, null, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand All @@ -134,7 +134,7 @@ public static IRedisCollection<T> Skip<T>(this IRedisCollection<T> source, int c
null,
GetMethodInfo(Skip, source, count),
new[] { source.Expression, Expression.Constant(count) });
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand All @@ -152,7 +152,7 @@ public static IRedisCollection<T> Take<T>(this IRedisCollection<T> source, int c
null,
GetMethodInfo(Take, source, count),
new[] { source.Expression, Expression.Constant(count) });
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand Down Expand Up @@ -480,7 +480,7 @@ public static IRedisCollection<T> GeoFilter<T>(this IRedisCollection<T> source,
Expression.Constant(lat),
Expression.Constant(radius),
Expression.Constant(unit));
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand All @@ -499,7 +499,7 @@ public static IRedisCollection<T> OrderBy<T, TField>(this IRedisCollection<T> so
null,
GetMethodInfo(OrderBy, source, expression),
new[] { source.Expression, Expression.Quote(expression) });
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand All @@ -518,7 +518,7 @@ public static IRedisCollection<T> OrderByDescending<T, TField>(this IRedisCollec
null,
GetMethodInfo(OrderByDescending, source, expression),
new[] { source.Expression, Expression.Quote(expression) });
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.ChunkSize);
return new RedisCollection<T>((RedisQueryProvider)source.Provider, exp, source.StateManager, collection.BooleanExpression, source.SaveState, source.ChunkSize);
}

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Redis.OM/Searching/IRedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ namespace Redis.OM.Searching
/// <typeparam name="T">The type.</typeparam>
public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
{
/// <summary>
/// Gets a value indicating whether gets whether the collection is meant to save the state of the records enumerated into it.
/// </summary>
bool SaveState { get; }

/// <summary>
/// Gets the collection state manager.
/// </summary>
Expand Down
40 changes: 37 additions & 3 deletions src/Redis.OM/Searching/RedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ public class RedisCollection<T> : IRedisCollection<T>
/// <param name="connection">Connection to Redis.</param>
/// <param name="chunkSize">Size of chunks to pull back during pagination, defaults to 100.</param>
public RedisCollection(IRedisConnection connection, int chunkSize = 100)
: this(connection, true, chunkSize)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="RedisCollection{T}"/> class.
/// </summary>
/// <param name="saveState">Determines whether or not the Redis Colleciton will maintain it's state internally.</param>
/// <param name="connection">Connection to Redis.</param>
/// <param name="chunkSize">Size of chunks to pull back during pagination, defaults to 100.</param>
/// <exception cref="ArgumentException">Thrown if the root attribute of the Redis Colleciton is not decorated with a DocumentAttribute.</exception>
public RedisCollection(IRedisConnection connection, bool saveState, int chunkSize)
{
var t = typeof(T);
DocumentAttribute rootAttribute = t.GetCustomAttribute<DocumentAttribute>();
Expand All @@ -41,6 +53,7 @@ public RedisCollection(IRedisConnection connection, int chunkSize = 100)

ChunkSize = chunkSize;
_connection = connection;
SaveState = saveState;
StateManager = new RedisCollectionStateManager(rootAttribute);
Initialize(new RedisQueryProvider(connection, StateManager, rootAttribute, ChunkSize), null, null);
}
Expand All @@ -51,13 +64,15 @@ public RedisCollection(IRedisConnection connection, int chunkSize = 100)
/// <param name="provider">Query Provider.</param>
/// <param name="expression">Expression to be parsed for the query.</param>
/// <param name="stateManager">Manager of the internal state of the collection.</param>
/// <param name="saveState">Whether or not the StateManager will maintain the state.</param>
/// <param name="chunkSize">Size of chunks to pull back during pagination, defaults to 100.</param>
/// <param name="booleanExpression">The expression to build the filter from.</param>
internal RedisCollection(RedisQueryProvider provider, Expression expression, RedisCollectionStateManager stateManager, Expression<Func<T, bool>>? booleanExpression, int chunkSize = 100)
internal RedisCollection(RedisQueryProvider provider, Expression expression, RedisCollectionStateManager stateManager, Expression<Func<T, bool>>? booleanExpression, bool saveState, int chunkSize = 100)
{
StateManager = stateManager;
_connection = provider.Connection;
ChunkSize = chunkSize;
SaveState = saveState;
Initialize(provider, expression, booleanExpression);
}

Expand All @@ -70,6 +85,9 @@ internal RedisCollection(RedisQueryProvider provider, Expression expression, Red
/// <inheritdoc/>
public IQueryProvider Provider { get; private set; } = default!;

/// <inheritdoc />
public bool SaveState { get; }

/// <summary>
/// Gets manages the state of the items queried from Redis.
/// </summary>
Expand Down Expand Up @@ -484,7 +502,7 @@ public T Single(Expression<Func<T, bool>> expression)
public IEnumerator<T> GetEnumerator()
{
StateManager.Clear();
return new RedisCollectionEnumerator<T>(Expression, _connection, ChunkSize, StateManager, BooleanExpression);
return new RedisCollectionEnumerator<T>(Expression, _connection, ChunkSize, StateManager, BooleanExpression, SaveState);
}

/// <inheritdoc/>
Expand All @@ -496,6 +514,14 @@ IEnumerator IEnumerable.GetEnumerator()
/// <inheritdoc/>
public void Save()
{
if (!SaveState)
{
throw new InvalidOperationException(
"The RedisCollection has been instructed to not maintain the state of records enumerated by " +
"Redis making the attempt to Save Invalid. Please initialize the RedisCollection with saveState " +
"set to true to Save documents in the RedisCollection");
}

var diff = StateManager.DetectDifferences();
foreach (var item in diff)
{
Expand All @@ -516,6 +542,14 @@ public void Save()
/// <inheritdoc/>
public async ValueTask SaveAsync()
{
if (!SaveState)
{
throw new InvalidOperationException(
"The RedisCollection has been instructed to not maintain the state of records enumerated by " +
"Redis making the attempt to Save Invalid. Please initialize the RedisCollection with saveState " +
"set to true to Save documents in the RedisCollection");
}

var diff = StateManager.DetectDifferences();
var tasks = new List<Task<int?>>();
foreach (var item in diff)
Expand Down Expand Up @@ -600,7 +634,7 @@ public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToke
{
var provider = (RedisQueryProvider)Provider;
StateManager.Clear();
return new RedisCollectionEnumerator<T>(Expression, provider.Connection, ChunkSize, StateManager, BooleanExpression);
return new RedisCollectionEnumerator<T>(Expression, provider.Connection, ChunkSize, StateManager, BooleanExpression, SaveState);
}

private static MethodInfo GetMethodInfo<T1, T2>(Func<T1, T2> f, T1 unused)
Expand Down
10 changes: 9 additions & 1 deletion src/Redis.OM/Searching/RedisCollectionEnumerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class RedisCollectionEnumerator<T> : IEnumerator<T>, IAsyncEnumerator<T
private readonly RedisQuery _query;
private readonly Type? _primitiveType;
private readonly bool _limited;
private readonly bool _saveState;
private readonly IRedisConnection _connection;
private readonly RedisCollectionStateManager _stateManager;
private SearchResponse<T> _records = new (new RedisReply(new RedisReply[] { 0 }));
Expand All @@ -36,7 +37,8 @@ internal class RedisCollectionEnumerator<T> : IEnumerator<T>, IAsyncEnumerator<T
/// <param name="chunkSize">the size of a chunk to pull back.</param>
/// <param name="stateManager">the state manager.</param>
/// <param name="booleanExpression">The main boolean expression to use to build the filter.</param>
public RedisCollectionEnumerator(Expression exp, IRedisConnection connection, int chunkSize, RedisCollectionStateManager stateManager, Expression<Func<T, bool>>? booleanExpression)
/// <param name="saveState">Determins whether the records from the RedisCollection are stored in the StateManager.</param>
public RedisCollectionEnumerator(Expression exp, IRedisConnection connection, int chunkSize, RedisCollectionStateManager stateManager, Expression<Func<T, bool>>? booleanExpression, bool saveState)
{
Type rootType;
var t = typeof(T);
Expand All @@ -63,6 +65,7 @@ public RedisCollectionEnumerator(Expression exp, IRedisConnection connection, in

_connection = connection;
_stateManager = stateManager;
_saveState = saveState;
}

/// <summary>
Expand Down Expand Up @@ -176,6 +179,11 @@ private async ValueTask<bool> GetNextChunkAsync()

private void ConcatenateRecords()
{
if (!_saveState)
{
return;
}

foreach (var record in _records.Documents)
{
if (_stateManager.Data.ContainsKey(record.Key) || _primitiveType != null)
Expand Down
2 changes: 1 addition & 1 deletion src/Redis.OM/Searching/RedisQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
where TElement : notnull
{
var booleanExpression = expression as Expression<Func<TElement, bool>>;
return new RedisCollection<TElement>(this, expression, StateManager, booleanExpression);
return new RedisCollection<TElement>(this, expression, StateManager, booleanExpression, true);
}

/// <inheritdoc/>
Expand Down
23 changes: 23 additions & 0 deletions test/Redis.OM.Unit.Tests/RediSearchTests/SearchFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -810,5 +810,28 @@ public async Task TestMultipleContains()
Assert.Contains(people, x =>x.Id == person1.Id);
Assert.Contains(people, x =>x.Id == person2.Id);
}

[Fact]
public async Task TestShouldFailForSave()
{
var expectedText = "The RedisCollection has been instructed to not maintain the state of records enumerated by " +
"Redis making the attempt to Save Invalid. Please initialize the RedisCollection with saveState " +
"set to true to Save documents in the RedisCollection";
var collection = new RedisCollection<Person>(_connection, false, 100);
var ex = await Assert.ThrowsAsync<InvalidOperationException>(collection.SaveAsync().AsTask);
Assert.Equal(expectedText, ex.Message);
ex = Assert.Throws<InvalidOperationException>(() =>collection.Save());
Assert.Equal(expectedText, ex.Message);
}

[Fact]
public async Task TestStatelessCollection()
{
var collection = new RedisCollection<Person>(_connection, false, 10000);
var res = await collection.ToListAsync();
Assert.True(res.Count >= 1);
Assert.Equal(0,collection.StateManager.Data.Count);
Assert.Equal(0,collection.StateManager.Snapshot.Count);
}
}
}

0 comments on commit 684e980

Please sign in to comment.