Skip to content

Commit

Permalink
Bulk Operations Delete & Update (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeevananthan-23 authored Jan 13, 2023
1 parent ff528ac commit c1461c1
Show file tree
Hide file tree
Showing 5 changed files with 393 additions and 100 deletions.
16 changes: 16 additions & 0 deletions src/Redis.OM/RedisCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,14 @@ public static async Task<IDictionary<string, string>> HGetAllAsync(this IRedisCo
/// <returns>the status.</returns>
public static string Unlink(this IRedisConnection connection, string key) => connection.Execute("UNLINK", key);

/// <summary>
/// Unlinks array of keys.
/// </summary>
/// <param name="connection">the connection.</param>
/// <param name="keys">the keys to unlink.</param>
/// <returns>the status.</returns>
public static string Unlink(this IRedisConnection connection, string[] keys) => connection.Execute("UNLINK", keys);

/// <summary>
/// Unlinks a key.
/// </summary>
Expand All @@ -762,6 +770,14 @@ public static async Task<IDictionary<string, string>> HGetAllAsync(this IRedisCo
/// <returns>the status.</returns>
public static async Task<string> UnlinkAsync(this IRedisConnection connection, string key) => await connection.ExecuteAsync("UNLINK", key);

/// <summary>
/// Unlinks array of keys.
/// </summary>
/// <param name="connection">the connection.</param>
/// <param name="keys">the keys to unlink.</param>
/// <returns>the status.</returns>
public static async Task<string> UnlinkAsync(this IRedisConnection connection, string[] keys) => await connection.ExecuteAsync("UNLINK", keys);

/// <summary>
/// Unlinks the key and then adds an updated value of it.
/// </summary>
Expand Down
50 changes: 35 additions & 15 deletions src/Redis.OM/Searching/IRedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
/// <returns>the Id of the newly inserted item, or null if not inserted.</returns>
string? Insert(T item, WhenKey when, TimeSpan? timeSpan = null);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The items to insert.</param>
/// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The items to insert.</param>
/// <param name="timeSpan">The timespan of the document's (TTL).</param>
/// /// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan);

/// <summary>
/// finds an item by it's ID or keyname.
/// </summary>
Expand Down Expand Up @@ -127,19 +142,39 @@ public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task UpdateAsync(T item);

/// <summary>
/// Updates the provided items in Redis. Document must have a property marked with the <see cref="RedisIdFieldAttribute"/>.
/// </summary>
/// <param name="items">The items to update.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
ValueTask UpdateAsync(IEnumerable<T> items);

/// <summary>
/// Deletes the item from Redis.
/// </summary>
/// <param name="item">The item to be deleted.</param>
void Delete(T item);

/// <summary>
/// Deletes the List of items from Redis.
/// </summary>
/// <param name="items">The items to be deleted.</param>
void Delete(IEnumerable<T> items);

/// <summary>
/// Deletes the item from Redis.
/// </summary>
/// <param name="item">The item to be deleted.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task DeleteAsync(T item);

/// <summary>
/// Deletes the List of items from Redis.
/// </summary>
/// <param name="items">The items to be deleted.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task DeleteAsync(IEnumerable<T> items);

/// <summary>
/// Async method for enumerating the collection to a list.
/// </summary>
Expand Down Expand Up @@ -266,20 +301,5 @@ public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
/// <param name="ids">The Ids to look up.</param>
/// <returns>A dictionary correlating the ids provided to the objects in Redis.</returns>
Task<IDictionary<string, T?>> FindByIdsAsync(IEnumerable<string> ids);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The items to insert.</param>
/// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The items to insert.</param>
/// <param name="timeSpan">The timespan of the document's (TTL).</param>
/// /// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan);
}
}
109 changes: 81 additions & 28 deletions src/Redis.OM/Searching/RedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Redis.OM.Common;
Expand Down Expand Up @@ -191,6 +192,14 @@ public async Task UpdateAsync(T item)
SaveToStateManager(key, item);
}

/// <inheritdoc />
public async ValueTask UpdateAsync(IEnumerable<T> items)
{
var tasks = items.Select(UpdateAsync);

await Task.WhenAll(tasks);
}

/// <inheritdoc />
public void Delete(T item)
{
Expand All @@ -199,6 +208,23 @@ public void Delete(T item)
StateManager.Remove(key);
}

/// <inheritdoc />
public void Delete(IEnumerable<T> items)
{
var keys = items.Select(x => x.GetKey()).ToArray();
if (!keys.Any())
{
return;
}

foreach (var key in keys)
{
StateManager.Remove(key);
}

_connection.Unlink(keys);
}

/// <inheritdoc />
public async Task DeleteAsync(T item)
{
Expand All @@ -207,6 +233,23 @@ public async Task DeleteAsync(T item)
StateManager.Remove(key);
}

/// <inheritdoc />
public async Task DeleteAsync(IEnumerable<T> items)
{
var keys = items.Select(x => x.GetKey()).ToArray();
if (!keys.Any())
{
return;
}

foreach (var key in keys)
{
StateManager.Remove(key);
}

await _connection.UnlinkAsync(keys);
}

/// <inheritdoc />
public async Task<IList<T>> ToListAsync()
{
Expand Down Expand Up @@ -604,6 +647,44 @@ public async Task<string> InsertAsync(T item, TimeSpan timeSpan)
return ((RedisQueryProvider)Provider).Connection.Set(item, when, timeSpan);
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items)
{
var distinct = items.Distinct().ToArray();
if (!distinct.Any())
{
return new List<string>();
}

var tasks = new List<Task<string>>();
foreach (var item in distinct)
{
tasks.Add(((RedisQueryProvider)Provider).Connection.SetAsync(item));
}

var result = await Task.WhenAll(tasks);
return result.ToList();
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan)
{
var distinct = items.Distinct().ToArray();
if (!distinct.Any())
{
return new List<string>();
}

var tasks = new List<Task<string>>();
foreach (var item in distinct)
{
tasks.Add(((RedisQueryProvider)Provider).Connection.SetAsync(item, timeSpan));
}

var result = await Task.WhenAll(tasks);
return result.ToList();
}

/// <inheritdoc/>
public T? FindById(string id)
{
Expand Down Expand Up @@ -640,34 +721,6 @@ public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToke
return new RedisCollectionEnumerator<T>(Expression, provider.Connection, ChunkSize, StateManager, BooleanExpression, SaveState);
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items)
{
var tasks = new List<Task<string>>();
foreach (var item in items.Distinct())
{
tasks.Add(((RedisQueryProvider)Provider).Connection.SetAsync(item));
}

await Task.WhenAll(tasks);
var result = tasks.Select(x => x.Result).ToList();
return result;
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan)
{
var tasks = new List<Task<string>>();
foreach (var item in items.Distinct())
{
tasks.Add(((RedisQueryProvider)Provider).Connection.SetAsync(item, timeSpan));
}

await Task.WhenAll(tasks);
var result = tasks.Select(x => x.Result).ToList();
return result;
}

private static MethodInfo GetMethodInfo<T1, T2>(Func<T1, T2> f, T1 unused)
{
return f.Method;
Expand Down
Loading

0 comments on commit c1461c1

Please sign in to comment.