Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Splits the source sequence into contiguous chunks of the specified size.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <param name="source">The source sequence.</param>
/// <param name="chunkSize">The size of each chunk. Must be positive.</param>
/// <returns>An <see cref="IEnumerable{T}"/> of chunks, each chunk represented as a <see cref="List{T}"/>.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="chunkSize"/> is less than or equal to zero.
/// </exception>
public static IEnumerable<List<T>> Chunk<T>(
this IEnumerable<T> source,
int chunkSize)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);

List<T> buffer = new List<T>(chunkSize);

foreach (T item in source)
{
buffer.Add(item);

if (buffer.Count == chunkSize)
{
yield return new List<T>(buffer);
buffer.Clear();
}
}

if (buffer.Count > 0)
{
yield return buffer;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Runtime.CompilerServices;

namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Asynchronously splits an async sequence into contiguous chunks of the specified size.
/// </summary>
/// <typeparam name="T">The element type of the async sequence.</typeparam>
/// <param name="source">The async sequence to split into chunks.</param>
/// <param name="chunkSize">The size of each chunk. Must be positive.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the asynchronous operation.</param>
/// <returns>
/// An async sequence of chunks, each represented as a <see cref="List{T}"/>,
/// containing up to <paramref name="chunkSize"/> elements.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="chunkSize"/> is less than or equal to zero.
/// </exception>
public static async IAsyncEnumerable<List<T>> ChunkAsync<T>(
this IAsyncEnumerable<T> source,
int chunkSize,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);

List<T> buffer = new List<T>(chunkSize);

await foreach (T item in source.WithCancellation(cancellationToken))
{
buffer.Add(item);

if (buffer.Count == chunkSize)
{
yield return new List<T>(buffer);
buffer.Clear();
}
}

if (buffer.Count > 0)
{
yield return buffer;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Returns distinct elements from a sequence by using a specified key selector.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <typeparam name="TKey">The type of the key returned by the selector.</typeparam>
/// <param name="source">The sequence to remove duplicate elements from.</param>
/// <param name="keySelector">A function to extract the key for each element.</param>
/// <returns>
/// An <see cref="IEnumerable{T}"/> that contains distinct elements from the source sequence.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.
/// </exception>
public static IEnumerable<T> DistinctBy<T, TKey>(
this IEnumerable<T> source,
Func<T, TKey> keySelector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(keySelector);

HashSet<TKey> seen = new HashSet<TKey>();

foreach (T item in source)
{
if (seen.Add(keySelector(item)))
{
yield return item;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Runtime.CompilerServices;

namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Asynchronously returns distinct elements from an async sequence by using a specified key selector.
/// </summary>
/// <typeparam name="T">The element type of the async sequence.</typeparam>
/// <typeparam name="TKey">The type of the key returned by the selector.</typeparam>
/// <param name="source">The async sequence to remove duplicate elements from.</param>
/// <param name="keySelector">A function to extract the key for each element.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the asynchronous operation.</param>
/// <returns>
/// An async sequence that contains distinct elements from the source sequence.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.
/// </exception>
public static async IAsyncEnumerable<T> DistinctByAsync<T, TKey>(
this IAsyncEnumerable<T> source,
Func<T, TKey> keySelector,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(keySelector);

HashSet<TKey> seen = new HashSet<TKey>();

await foreach (T item in source.WithCancellation(cancellationToken))
{
if (seen.Add(keySelector(item)))
{
yield return item;
}
}
}
}
30 changes: 30 additions & 0 deletions src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Filters a sequence of values based on a predicate.
/// </summary>
/// <typeparam name="T">The element type of the sequence.</typeparam>
/// <param name="source">The sequence to filter.</param>
/// <param name="predicate">A function to test each element for a condition.</param>
/// <returns>An <see cref="IEnumerable{T}"/> that contains elements from the input sequence that satisfy the condition.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="predicate"/> is <see langword="null"/>.
/// </exception>
public static IEnumerable<T> Filter<T>(
this IEnumerable<T> source,
Func<T, bool> predicate)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(predicate);

foreach (T item in source)
{
if (predicate(item))
{
yield return item;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.Runtime.CompilerServices;

namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Asynchronously filters a sequence of values based on a predicate.
/// </summary>
/// <typeparam name="T">The element type of the async sequence.</typeparam>
/// <param name="source">The async sequence to filter.</param>
/// <param name="predicate">A function to test each element for a condition.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the asynchronous operation.</param>
/// <returns>
/// An async sequence that contains elements from the input sequence that satisfy the condition.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="predicate"/> is <see langword="null"/>.
/// </exception>
public static async IAsyncEnumerable<T> FilterAsync<T>(
this IAsyncEnumerable<T> source,
Func<T, bool> predicate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(predicate);

await foreach (T item in source.WithCancellation(cancellationToken))
{
if (predicate(item))
{
yield return item;
}
}
}
}
24 changes: 24 additions & 0 deletions src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Performs the specified action on each element of the sequence.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <param name="source">The sequence whose elements to process.</param>
/// <param name="action">The action to perform on each element.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="action"/> is <see langword="null"/>.
/// </exception>
public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(action);

foreach (T item in source)
{
action(item);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Asynchronously performs the specified action on each element of an async sequence.
/// </summary>
/// <typeparam name="T">The element type of the async sequence.</typeparam>
/// <param name="source">The async sequence whose elements to process.</param>
/// <param name="action">The action to perform on each element.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the asynchronous operation.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="action"/> is <see langword="null"/>.
/// </exception>
public static async Task ForEachAsync<T>(
this IAsyncEnumerable<T> source,
Action<T> action,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(action);

await foreach (T item in source.WithCancellation(cancellationToken))
{
action(item);
}
}
}
28 changes: 28 additions & 0 deletions src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Projects each element of the sequence into a new form.
/// </summary>
/// <typeparam name="T">The source element type.</typeparam>
/// <typeparam name="TResult">The result element type.</typeparam>
/// <param name="source">The sequence whose elements to transform.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <returns>An <see cref="IEnumerable{T}"/> whose elements are the result of invoking the transform function.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="selector"/> is <see langword="null"/>.
/// </exception>
public static IEnumerable<TResult> Map<T, TResult>(
this IEnumerable<T> source,
Func<T, TResult> selector)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);

foreach (T item in source)
{
yield return selector(item);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Runtime.CompilerServices;

namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Asynchronously projects each element of the async sequence into a new form.
/// </summary>
/// <typeparam name="T">The source element type.</typeparam>
/// <typeparam name="TResult">The result element type.</typeparam>
/// <param name="source">The async sequence whose elements to transform.</param>
/// <param name="selector">A transform function to apply to each element.</param>
/// <param name="cancellationToken">The cancellation token used to cancel the asynchronous operation.</param>
/// <returns>
/// An async sequence whose elements are the result of invoking the transform function
/// on each element of the source sequence.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> or <paramref name="selector"/> is <see langword="null"/>.
/// </exception>
public static async IAsyncEnumerable<TResult> MapAsync<T, TResult>(
this IAsyncEnumerable<T> source,
Func<T, TResult> selector,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(selector);

await foreach (T item in source.WithCancellation(cancellationToken))
{
yield return selector(item);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace Csdsa.Algorithms.StreamProccessing;

public static partial class StreamProcessing
{
/// <summary>
/// Returns a sequence of consecutive element pairs from the source sequence.
/// </summary>
/// <typeparam name="T">The element type.</typeparam>
/// <param name="source">The source sequence.</param>
/// <returns>
/// An <see cref="IEnumerable{T}"/> of pairs, where each pair contains two consecutive
/// elements from the source sequence.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="source"/> is <see langword="null"/>.
/// </exception>
public static IEnumerable<(T First, T Second)> Pairwise<T>(this IEnumerable<T> source)
{
ArgumentNullException.ThrowIfNull(source);

using IEnumerator<T> enumerator = source.GetEnumerator();

if (!enumerator.MoveNext())
{
yield break;
}

T previous = enumerator.Current;

while (enumerator.MoveNext())
{
yield return (previous, enumerator.Current);
previous = enumerator.Current;
}
}
}
Loading