From 70cf3982dbfb222ee8568e640fe011074703bcfc Mon Sep 17 00:00:00 2001 From: mavantgarderc Date: Sun, 7 Dec 2025 05:45:42 +0330 Subject: [PATCH 1/2] Algorithms(StreamProcessing) --- .../StreamProcessing.Chunk.cs | 43 ++++++++++++++++ .../StreamProcessing.ChunkAsync.cs | 50 ++++++++++++++++++ .../StreamProcessing.DistinctBy.cs | 35 +++++++++++++ .../StreamProcessing.DistinctByAsync.cs | 39 ++++++++++++++ .../StreamProcessing.Filter.cs | 30 +++++++++++ .../StreamProcessing.FilterAsync.cs | 36 +++++++++++++ .../StreamProcessing.ForEach.cs | 24 +++++++++ .../StreamProcessing.ForEachAsync.cs | 29 +++++++++++ .../StreamProccessing/StreamProcessing.Map.cs | 28 ++++++++++ .../StreamProcessing.MapAsync.cs | 35 +++++++++++++ .../StreamProcessing.Pairwise.cs | 36 +++++++++++++ .../StreamProcessing.PairwiseAsync.cs | 51 +++++++++++++++++++ .../StreamProcessing.Reduce.cs | 34 +++++++++++++ .../StreamProcessing.ReduceAsync.cs | 36 +++++++++++++ .../StreamProcessing.Skip.cs | 29 +++++++++++ .../StreamProcessing.SkipAsync.cs | 36 +++++++++++++ .../StreamProcessing.Take.cs | 31 +++++++++++ .../StreamProcessing.TakeAsync.cs | 37 ++++++++++++++ .../StreamProcessing.TakeUntil.cs | 36 +++++++++++++ .../StreamProcessing.TakeUntilAsync.cs | 39 ++++++++++++++ .../StreamProcessing.Window.cs | 43 ++++++++++++++++ .../StreamProcessing.WindowAsync.cs | 49 ++++++++++++++++++ .../StreamProccessing/StreamProcessing.cs | 44 ++++++++++++++++ 23 files changed, 850 insertions(+) create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs create mode 100644 src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs new file mode 100644 index 0000000..f4d59f8 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs @@ -0,0 +1,43 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Splits the source sequence into contiguous chunks of the specified size. + /// + /// The element type. + /// The source sequence. + /// The size of each chunk. Must be positive. + /// An of chunks, each chunk represented as a . + /// + /// Thrown when is . + /// + /// + /// Thrown when is less than or equal to zero. + /// + public static IEnumerable> Chunk( + this IEnumerable source, + int chunkSize) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize); + + List buffer = new List(chunkSize); + + foreach (T item in source) + { + buffer.Add(item); + + if (buffer.Count == chunkSize) + { + yield return new List(buffer); + buffer.Clear(); + } + } + + if (buffer.Count > 0) + { + yield return buffer; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs new file mode 100644 index 0000000..a94582f --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs @@ -0,0 +1,50 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously splits an async sequence into contiguous chunks of the specified size. + /// + /// The element type of the async sequence. + /// The async sequence to split into chunks. + /// The size of each chunk. Must be positive. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence of chunks, each represented as a , + /// containing up to elements. + /// + /// + /// Thrown when is . + /// + /// + /// Thrown when is less than or equal to zero. + /// + public static async IAsyncEnumerable> ChunkAsync( + this IAsyncEnumerable source, + int chunkSize, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize); + + List buffer = new List(chunkSize); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + buffer.Add(item); + + if (buffer.Count == chunkSize) + { + yield return new List(buffer); + buffer.Clear(); + } + } + + if (buffer.Count > 0) + { + yield return buffer; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs new file mode 100644 index 0000000..6345400 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs @@ -0,0 +1,35 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Returns distinct elements from a sequence by using a specified key selector. + /// + /// The element type. + /// The type of the key returned by the selector. + /// The sequence to remove duplicate elements from. + /// A function to extract the key for each element. + /// + /// An that contains distinct elements from the source sequence. + /// + /// + /// Thrown when or is . + /// + public static IEnumerable DistinctBy( + this IEnumerable source, + Func keySelector) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(keySelector); + + HashSet seen = new HashSet(); + + foreach (T item in source) + { + if (seen.Add(keySelector(item))) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs new file mode 100644 index 0000000..469a3f8 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs @@ -0,0 +1,39 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously returns distinct elements from an async sequence by using a specified key selector. + /// + /// The element type of the async sequence. + /// The type of the key returned by the selector. + /// The async sequence to remove duplicate elements from. + /// A function to extract the key for each element. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence that contains distinct elements from the source sequence. + /// + /// + /// Thrown when or is . + /// + public static async IAsyncEnumerable DistinctByAsync( + this IAsyncEnumerable source, + Func keySelector, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(keySelector); + + HashSet seen = new HashSet(); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + if (seen.Add(keySelector(item))) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs new file mode 100644 index 0000000..cd766e4 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs @@ -0,0 +1,30 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Filters a sequence of values based on a predicate. + /// + /// The element type of the sequence. + /// The sequence to filter. + /// A function to test each element for a condition. + /// An that contains elements from the input sequence that satisfy the condition. + /// + /// Thrown when or is . + /// + public static IEnumerable Filter( + this IEnumerable source, + Func predicate) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(predicate); + + foreach (T item in source) + { + if (predicate(item)) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs new file mode 100644 index 0000000..68e5467 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs @@ -0,0 +1,36 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously filters a sequence of values based on a predicate. + /// + /// The element type of the async sequence. + /// The async sequence to filter. + /// A function to test each element for a condition. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence that contains elements from the input sequence that satisfy the condition. + /// + /// + /// Thrown when or is . + /// + public static async IAsyncEnumerable FilterAsync( + this IAsyncEnumerable source, + Func predicate, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(predicate); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + if (predicate(item)) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs new file mode 100644 index 0000000..0ee5353 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs @@ -0,0 +1,24 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Performs the specified action on each element of the sequence. + /// + /// The element type. + /// The sequence whose elements to process. + /// The action to perform on each element. + /// + /// Thrown when or is . + /// + public static void ForEach(this IEnumerable source, Action action) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(action); + + foreach (T item in source) + { + action(item); + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs new file mode 100644 index 0000000..6b9b1d6 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs @@ -0,0 +1,29 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously performs the specified action on each element of an async sequence. + /// + /// The element type of the async sequence. + /// The async sequence whose elements to process. + /// The action to perform on each element. + /// The cancellation token used to cancel the asynchronous operation. + /// A task that represents the asynchronous operation. + /// + /// Thrown when or is . + /// + public static async Task ForEachAsync( + this IAsyncEnumerable source, + Action action, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(action); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + action(item); + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs new file mode 100644 index 0000000..d0379ca --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs @@ -0,0 +1,28 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Projects each element of the sequence into a new form. + /// + /// The source element type. + /// The result element type. + /// The sequence whose elements to transform. + /// A transform function to apply to each element. + /// An whose elements are the result of invoking the transform function. + /// + /// Thrown when or is . + /// + public static IEnumerable Map( + this IEnumerable source, + Func selector) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(selector); + + foreach (T item in source) + { + yield return selector(item); + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs new file mode 100644 index 0000000..2893a63 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs @@ -0,0 +1,35 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously projects each element of the async sequence into a new form. + /// + /// The source element type. + /// The result element type. + /// The async sequence whose elements to transform. + /// A transform function to apply to each element. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence whose elements are the result of invoking the transform function + /// on each element of the source sequence. + /// + /// + /// Thrown when or is . + /// + public static async IAsyncEnumerable MapAsync( + this IAsyncEnumerable source, + Func selector, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(selector); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + yield return selector(item); + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs new file mode 100644 index 0000000..88d533c --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs @@ -0,0 +1,36 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Returns a sequence of consecutive element pairs from the source sequence. + /// + /// The element type. + /// The source sequence. + /// + /// An of pairs, where each pair contains two consecutive + /// elements from the source sequence. + /// + /// + /// Thrown when is . + /// + public static IEnumerable<(T First, T Second)> Pairwise(this IEnumerable source) + { + ArgumentNullException.ThrowIfNull(source); + + using IEnumerator enumerator = source.GetEnumerator(); + + if (!enumerator.MoveNext()) + { + yield break; + } + + T previous = enumerator.Current; + + while (enumerator.MoveNext()) + { + yield return (previous, enumerator.Current); + previous = enumerator.Current; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs new file mode 100644 index 0000000..56c5c81 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs @@ -0,0 +1,51 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously returns consecutive element pairs from an async sequence. + /// + /// The element type of the async sequence. + /// The async sequence. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence of pairs, where each pair contains two consecutive + /// elements from the source sequence. + /// + /// + /// Thrown when is . + /// + public static async IAsyncEnumerable<(T First, T Second)> PairwiseAsync( + this IAsyncEnumerable source, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + + IAsyncEnumerator enumerator = source.GetAsyncEnumerator(cancellationToken); + + try + { + if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + yield break; + } + + T previous = enumerator.Current; + + while (await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + yield return (previous, enumerator.Current); + previous = enumerator.Current; + } + } + finally + { + if (enumerator != null) + { + await enumerator.DisposeAsync().ConfigureAwait(false); + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs new file mode 100644 index 0000000..5b77260 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs @@ -0,0 +1,34 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Aggregates the elements of a sequence using the specified seed value and accumulator function. + /// + /// The element type. + /// The accumulator type. + /// The source sequence. + /// The initial accumulator value. + /// The accumulator function. + /// The final accumulator value. + /// + /// Thrown when or is . + /// + public static TAccumulate Reduce( + this IEnumerable source, + TAccumulate seed, + Func func) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(func); + + TAccumulate acc = seed; + + foreach (T item in source) + { + acc = func(acc, item); + } + + return acc; + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs new file mode 100644 index 0000000..399df7a --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs @@ -0,0 +1,36 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously aggregates the elements of an async sequence using the specified seed value and accumulator function. + /// + /// The element type. + /// The accumulator type. + /// The async sequence whose elements to aggregate. + /// The initial accumulator value. + /// The accumulator function. + /// The cancellation token used to cancel the asynchronous operation. + /// A task that represents the asynchronous aggregation operation. + /// + /// Thrown when or is . + /// + public static async Task ReduceAsync( + this IAsyncEnumerable source, + TAccumulate seed, + Func func, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(func); + + TAccumulate acc = seed; + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + acc = func(acc, item); + } + + return acc; + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs new file mode 100644 index 0000000..2d37c08 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs @@ -0,0 +1,29 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Bypasses a specified number of elements in a sequence and then returns the remaining elements. + /// + /// The element type. + /// An to return elements from. + /// The number of elements to skip. + /// An that contains the elements that occur after the specified index. + /// + /// Thrown when is . + /// + public static IEnumerable Skip(this IEnumerable source, int count) + { + ArgumentNullException.ThrowIfNull(source); + + int i = 0; + + foreach (T item in source) + { + if (i++ >= count) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs new file mode 100644 index 0000000..fbab4cc --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs @@ -0,0 +1,36 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously bypasses a specified number of elements in an async sequence + /// and then returns the remaining elements. + /// + /// The element type of the async sequence. + /// The async sequence to skip elements from. + /// The number of elements to skip. + /// The cancellation token used to cancel the asynchronous operation. + /// An async sequence that contains the elements after the skipped elements. + /// + /// Thrown when is . + /// + public static async IAsyncEnumerable SkipAsync( + this IAsyncEnumerable source, + int count, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + + int i = 0; + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + if (i++ >= count) + { + yield return item; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs new file mode 100644 index 0000000..a2a9277 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs @@ -0,0 +1,31 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Returns a specified number of contiguous elements from the start of a sequence. + /// + /// The element type. + /// The sequence to return elements from. + /// The number of elements to return. + /// An that contains the specified number of elements. + /// + /// Thrown when is . + /// + public static IEnumerable Take(this IEnumerable source, int count) + { + ArgumentNullException.ThrowIfNull(source); + + int i = 0; + + foreach (T item in source) + { + if (i++ >= count) + { + yield break; + } + + yield return item; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs new file mode 100644 index 0000000..fc7365c --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs @@ -0,0 +1,37 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously returns a specified number of contiguous elements from the start of an async sequence. + /// + /// The element type of the async sequence. + /// The async sequence to return elements from. + /// The number of elements to return. + /// The cancellation token used to cancel the asynchronous operation. + /// An async sequence that contains up to elements. + /// + /// Thrown when is . + /// + public static async IAsyncEnumerable TakeAsync( + this IAsyncEnumerable source, + int count, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + + int i = 0; + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + if (i++ >= count) + { + yield break; + } + + yield return item; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs new file mode 100644 index 0000000..6212974 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs @@ -0,0 +1,36 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Returns elements from the sequence until the specified predicate evaluates to true, + /// including the element that satisfies the predicate. + /// + /// The element type of the sequence. + /// The source sequence. + /// A function to test each element for a stop condition. + /// + /// An that returns elements from the input sequence + /// until the predicate is satisfied. + /// + /// + /// Thrown when or is . + /// + public static IEnumerable TakeUntil( + this IEnumerable source, + Func predicate) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(predicate); + + foreach (T item in source) + { + yield return item; + + if (predicate(item)) + { + yield break; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs new file mode 100644 index 0000000..6f84d9f --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs @@ -0,0 +1,39 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously returns elements from an async sequence until the specified predicate is satisfied, + /// including the element that satisfies the predicate. + /// + /// The element type of the async sequence. + /// The async sequence to return elements from. + /// A function to test each element for a stop condition. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence that returns elements from the input sequence until the predicate is satisfied. + /// + /// + /// Thrown when or is . + /// + public static async IAsyncEnumerable TakeUntilAsync( + this IAsyncEnumerable source, + Func predicate, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(predicate); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + yield return item; + + if (predicate(item)) + { + yield break; + } + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs new file mode 100644 index 0000000..0b49bff --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs @@ -0,0 +1,43 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Batches the source sequence into windows of the specified size. + /// + /// The element type of the sequence. + /// The source sequence. + /// The size of each window. Must be positive. + /// An of windows, each represented as a . + /// + /// Thrown when is . + /// + /// + /// Thrown when is less than or equal to zero. + /// + public static IEnumerable> Window( + this IEnumerable source, + int windowSize) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(windowSize); + + List buffer = new List(windowSize); + + foreach (T item in source) + { + buffer.Add(item); + + if (buffer.Count == windowSize) + { + yield return new List(buffer); + buffer.Clear(); + } + } + + if (buffer.Count > 0) + { + yield return new List(buffer); + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs new file mode 100644 index 0000000..0175281 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs @@ -0,0 +1,49 @@ +using System.Runtime.CompilerServices; + +namespace Csdsa.Algorithms.StreamProccessing; + +public static partial class StreamProcessing +{ + /// + /// Asynchronously batches an async sequence into windows of the specified size. + /// + /// The element type of the async sequence. + /// The async sequence to batch. + /// The size of each window. Must be positive. + /// The cancellation token used to cancel the asynchronous operation. + /// + /// An async sequence of windows, each represented as a . + /// + /// + /// Thrown when is . + /// + /// + /// Thrown when is less than or equal to zero. + /// + public static async IAsyncEnumerable> WindowAsync( + this IAsyncEnumerable source, + int windowSize, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(windowSize); + + List buffer = new List(windowSize); + + await foreach (T item in source.WithCancellation(cancellationToken)) + { + buffer.Add(item); + + if (buffer.Count == windowSize) + { + yield return new List(buffer); + buffer.Clear(); + } + } + + if (buffer.Count > 0) + { + yield return buffer; + } + } +} diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs new file mode 100644 index 0000000..3978f75 --- /dev/null +++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs @@ -0,0 +1,44 @@ +namespace Csdsa.Algorithms.StreamProccessing; + +/// +/// Provides functional-style stream processing helpers for synchronous and asynchronous +/// sequences, focusing on composability and low-allocation patterns. +/// +/// Concepts: +/// +/// +/// +/// Functional transformations such as Map, Filter, and Reduce. +/// +/// +/// Batching and grouping via Window, Chunk, and Pairwise operations. +/// +/// +/// Uniqueness and distinctness via DistinctBy and DistinctByAsync. +/// +/// +/// Early termination and control flow using Take, Skip, and TakeUntil variants. +/// +/// +/// +/// Key practices: +/// +/// +/// +/// Using extension methods for fluent, chainable stream processing. +/// +/// +/// Covering both synchronous () +/// and asynchronous () data flows. +/// +/// +/// Being allocation-conscious for high-throughput scenarios. +/// +/// +/// Favoring composability and separation of concerns in algorithmic pipelines. +/// +/// +/// +public static partial class StreamProcessing +{ +} From 973dc080d23931b314c9f8b66c2ac92154dc3753 Mon Sep 17 00:00:00 2001 From: mavantgarderc Date: Sun, 7 Dec 2025 05:56:01 +0330 Subject: [PATCH 2/2] Algorithms.Tests(StreamProcessing) --- .../StreamProcessing.Chunk.Tests.cs | 24 ++++++++++++++++ .../StreamProcessing.ChunkAsync.Tests.cs | 28 +++++++++++++++++++ .../StreamProcessing.DistinctBy.Tests.cs | 24 ++++++++++++++++ .../StreamProcessing.DistinctByAsync.Tests.cs | 28 +++++++++++++++++++ .../StreamProcessing.Filter.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.FilterAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.ForEach.Tests.cs | 19 +++++++++++++ .../StreamProcessing.ForEachAync.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.Helpers.Tests.cs | 23 +++++++++++++++ .../StreamProcessing.Map.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.MapAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.Pairwise.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.PairwiseAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.Reduce.Tests.cs | 18 ++++++++++++ .../StreamProcessing.ReduceAsync.Tests.cs | 20 +++++++++++++ .../StreamProcessing.Skip.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.SkipAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.Take.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.TakeAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.TakeUntil.Tests.cs | 21 ++++++++++++++ .../StreamProcessing.TakeUntilAsync.Tests.cs | 25 +++++++++++++++++ .../StreamProcessing.Window.Tests.cs | 24 ++++++++++++++++ 22 files changed, 505 insertions(+) create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs create mode 100644 tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs new file mode 100644 index 0000000..d809870 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Chunk_GroupsElements() + { + int[] source = { 1, 2, 3, 4, 5 }; + + List> result = StreamProcessing.Chunk(source, 2).ToList(); + + Assert.Equal(3, result.Count); + Assert.Equal(new[] { 1, 2 }, result[0]); + Assert.Equal(new[] { 3, 4 }, result[1]); + Assert.Equal(new[] { 5 }, result[2]); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs new file mode 100644 index 0000000..133ca54 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task ChunkAsync_GroupsElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array12345); + + List> result = new List>(); + await foreach (List chunk in StreamProcessing.ChunkAsync(source, 2)) + { + result.Add(chunk); + } + + Assert.Equal(3, result.Count); + Assert.Equal(new[] { 1, 2 }, result[0]); + Assert.Equal(new[] { 3, 4 }, result[1]); + Assert.Equal(new[] { 5 }, result[2]); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs new file mode 100644 index 0000000..14bbe4b --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void DistinctBy_ReturnsDistinctItems() + { + string[] source = { "a", "aa", "b", "bb", "ccc" }; + + List result = StreamProcessing.DistinctBy(source, s => s.Length).ToList(); + + Assert.Contains("a", result); + Assert.Contains("aa", result); + Assert.Contains("ccc", result); + Assert.Equal(3, result.Count); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs new file mode 100644 index 0000000..a7ec931 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task DistinctByAsync_ReturnsDistinctItems() + { + IAsyncEnumerable source = ToAsyncEnumerable(StringArray); + + List result = new List(); + await foreach (string s in StreamProcessing.DistinctByAsync(source, x => x.Length)) + { + result.Add(s); + } + + Assert.Contains("a", result); + Assert.Contains("aa", result); + Assert.Contains("ccc", result); + Assert.Equal(3, result.Count); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs new file mode 100644 index 0000000..134135b --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Filter_FiltersElements() + { + int[] source = { 1, 2, 3, 4 }; + + List result = StreamProcessing.Filter(source, x => x % 2 == 0).ToList(); + + Assert.Equal(new[] { 2, 4 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs new file mode 100644 index 0000000..a19ba97 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task FilterAsync_FiltersElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234); + + List result = new List(); + await foreach (int x in StreamProcessing.FilterAsync(source, y => y % 2 == 0)) + { + result.Add(x); + } + + Assert.Equal(new[] { 2, 4 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs new file mode 100644 index 0000000..f3bb863 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs @@ -0,0 +1,19 @@ +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void ForEach_PerformsAction() + { + int[] source = { 1, 2, 3 }; + int sum = 0; + + StreamProcessing.ForEach(source, x => sum += x); + + Assert.Equal(6, sum); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs new file mode 100644 index 0000000..735ca6f --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs @@ -0,0 +1,21 @@ +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task ForEachAsync_PerformsAction() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123); + int sum = 0; + + await StreamProcessing.ForEachAsync(source, x => sum += x); + + Assert.Equal(6, sum); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs new file mode 100644 index 0000000..2011dca --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs @@ -0,0 +1,23 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + private static async IAsyncEnumerable ToAsyncEnumerable(IEnumerable source) + { + foreach (T item in source) + { + await Task.Yield(); + yield return item; + } + } + + private static readonly int[] Int32Array123 = { 1, 2, 3 }; + private static readonly int[] Int32Array1234 = { 1, 2, 3, 4 }; + private static readonly int[] Int32Array12345 = { 1, 2, 3, 4, 5 }; + private static readonly string[] StringArray = { "a", "aa", "b", "bb", "ccc" }; +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs new file mode 100644 index 0000000..b8b44de --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Map_TransformsElements() + { + int[] source = { 1, 2, 3 }; + + List result = StreamProcessing.Map(source, x => x * 2).ToList(); + + Assert.Equal(new[] { 2, 4, 6 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs new file mode 100644 index 0000000..aee3afb --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task MapAsync_TransformsElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123); + + List result = new List(); + await foreach (int x in StreamProcessing.MapAsync(source, y => y * 2)) + { + result.Add(x); + } + + Assert.Equal(new[] { 2, 4, 6 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs new file mode 100644 index 0000000..c24bc25 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Pairwise_ReturnsConsecutivePairs() + { + int[] source = { 1, 2, 3 }; + + List<(int First, int Second)> result = StreamProcessing.Pairwise(source).ToList(); + + Assert.Equal(new (int, int)[] { (1, 2), (2, 3) }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs new file mode 100644 index 0000000..ed0205a --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task PairwiseAsync_ReturnsConsecutivePairs() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123); + + List<(int First, int Second)> result = new List<(int First, int Second)>(); + await foreach ((int First, int Second) pair in StreamProcessing.PairwiseAsync(source)) + { + result.Add(pair); + } + + Assert.Equal(new (int, int)[] { (1, 2), (2, 3) }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs new file mode 100644 index 0000000..6e0b78e --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs @@ -0,0 +1,18 @@ +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Reduce_SumsElements() + { + int[] source = { 1, 2, 3 }; + + int result = StreamProcessing.Reduce(source, 0, (acc, x) => acc + x); + + Assert.Equal(6, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs new file mode 100644 index 0000000..6512572 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs @@ -0,0 +1,20 @@ +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task ReduceAsync_SumsElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123); + + int sum = await StreamProcessing.ReduceAsync(source, 0, (acc, x) => acc + x); + + Assert.Equal(6, sum); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs new file mode 100644 index 0000000..6a49351 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Skip_SkipsElements() + { + int[] source = { 1, 2, 3, 4 }; + + List result = StreamProcessing.Skip(source, 2).ToList(); + + Assert.Equal(new[] { 3, 4 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs new file mode 100644 index 0000000..32a2a35 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task SkipAsync_SkipsElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234); + + List result = new List(); + await foreach (int x in StreamProcessing.SkipAsync(source, 2)) + { + result.Add(x); + } + + Assert.Equal(new[] { 3, 4 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs new file mode 100644 index 0000000..9c25e10 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Take_TakesElements() + { + int[] source = { 1, 2, 3, 4 }; + + List result = StreamProcessing.Take(source, 2).ToList(); + + Assert.Equal(new[] { 1, 2 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs new file mode 100644 index 0000000..ddb9127 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task TakeAsync_TakesElements() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234); + + List result = new List(); + await foreach (int x in StreamProcessing.TakeAsync(source, 2)) + { + result.Add(x); + } + + Assert.Equal(new[] { 1, 2 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs new file mode 100644 index 0000000..646ddef --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void TakeUntil_StopsAtPredicate() + { + int[] source = { 1, 2, 3, 4, 5 }; + + List result = StreamProcessing.TakeUntil(source, x => x == 3).ToList(); + + Assert.Equal(new[] { 1, 2, 3 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs new file mode 100644 index 0000000..ea25863 --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs @@ -0,0 +1,25 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public async Task TakeUntilAsync_StopsAtPredicate() + { + IAsyncEnumerable source = ToAsyncEnumerable(Int32Array12345); + + List result = new List(); + await foreach (int x in StreamProcessing.TakeUntilAsync(source, y => y == 3)) + { + result.Add(x); + } + + Assert.Equal(new[] { 1, 2, 3 }, result); + } +} diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs new file mode 100644 index 0000000..9e0fedb --- /dev/null +++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs @@ -0,0 +1,24 @@ +using System.Collections.Generic; +using System.Linq; + +using Csdsa.Algorithms.StreamProccessing; + +using Xunit; + +namespace Csdsa.Tests.Algorithms.StreamProccessing; + +public sealed partial class StreamProcessingTests +{ + [Fact] + public void Window_BatchesElements() + { + int[] source = { 1, 2, 3, 4, 5 }; + + List> result = StreamProcessing.Window(source, 2).ToList(); + + Assert.Equal(3, result.Count); + Assert.Equal(new[] { 1, 2 }, result[0]); + Assert.Equal(new[] { 3, 4 }, result[1]); + Assert.Equal(new[] { 5 }, result[2]); + } +}