Skip to content

Commit

Permalink
feat(query): Support IAsyncEnumerable
Browse files Browse the repository at this point in the history
  • Loading branch information
skarllot committed Jul 30, 2023
1 parent 00868d0 commit c5aee6d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
8 changes: 8 additions & 0 deletions src/Expressions.Database/Queries/QueryLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,17 @@ internal static class QueryLog
4,
"Error trying to query the single element");

private static readonly Action<ILogger, Exception?> s_asyncEnumerableErrorCallback = LoggerMessage.Define(
LogLevel.Error,
5,
"Error trying to enumerate found elements");

public static void AnyError(ILogger logger, Exception exception) => s_anyErrorCallback(logger, exception);
public static void CountError(ILogger logger, Exception exception) => s_countErrorCallback(logger, exception);
public static void FirstError(ILogger logger, Exception exception) => s_firstErrorCallback(logger, exception);
public static void ListError(ILogger logger, Exception exception) => s_listErrorCallback(logger, exception);
public static void SingleError(ILogger logger, Exception exception) => s_singleErrorCallback(logger, exception);

public static void AsyncEnumerableError(ILogger logger, Exception exception) =>
s_asyncEnumerableErrorCallback(logger, exception);
}
26 changes: 25 additions & 1 deletion src/Expressions.EntityFrameworkCore/Queries/EFQuery.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.EntityFrameworkCore;
using System.Runtime.CompilerServices;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Raiqub.Expressions.Queries;

Expand Down Expand Up @@ -131,4 +132,27 @@ and not InvalidOperationException
throw;
}
}

public async IAsyncEnumerable<TResult> ToAsyncEnumerable(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
IAsyncEnumerable<TResult> enumerable;
try
{
enumerable = _dataSource.AsAsyncEnumerable();
}
catch (Exception exception) when (exception is not ArgumentNullException
and not InvalidOperationException)
{
QueryLog.AsyncEnumerableError(_logger, exception);
throw;
}

await foreach (TResult result in enumerable
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return result;
}
}
}
5 changes: 5 additions & 0 deletions src/Expressions.Marten/Queries/MartenQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,9 @@ and not InvalidOperationException
throw;
}
}

public IAsyncEnumerable<TResult> ToAsyncEnumerable(CancellationToken cancellationToken = default)
{
return _dataSource.ToAsyncEnumerable(cancellationToken);
}
}
8 changes: 8 additions & 0 deletions src/Expressions.Reading/Queries/IQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@ public interface IQuery<T>
/// <exception cref="InvalidOperationException">Query contains more than one element.</exception>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken" /> is canceled.</exception>
Task<T?> SingleOrDefaultAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Execute this query to an <see cref="IAsyncEnumerable{T}"/>. This is valuable for reading
/// and processing large result sets without having to keep the entire result set in memory
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns>The query results.</returns>
IAsyncEnumerable<T> ToAsyncEnumerable(CancellationToken cancellationToken = default);
}

0 comments on commit c5aee6d

Please sign in to comment.