diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs
new file mode 100644
index 0000000..f4d59f8
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Chunk.cs
@@ -0,0 +1,43 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Splits the source sequence into contiguous chunks of the specified size.
+ ///
+ /// The element type.
+ /// The source sequence.
+ /// The size of each chunk. Must be positive.
+ /// An of chunks, each chunk represented as a .
+ ///
+ /// Thrown when is .
+ ///
+ ///
+ /// Thrown when is less than or equal to zero.
+ ///
+ public static IEnumerable> Chunk(
+ this IEnumerable source,
+ int chunkSize)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);
+
+ List buffer = new List(chunkSize);
+
+ foreach (T item in source)
+ {
+ buffer.Add(item);
+
+ if (buffer.Count == chunkSize)
+ {
+ yield return new List(buffer);
+ buffer.Clear();
+ }
+ }
+
+ if (buffer.Count > 0)
+ {
+ yield return buffer;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs
new file mode 100644
index 0000000..a94582f
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ChunkAsync.cs
@@ -0,0 +1,50 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously splits an async sequence into contiguous chunks of the specified size.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to split into chunks.
+ /// The size of each chunk. Must be positive.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence of chunks, each represented as a ,
+ /// containing up to elements.
+ ///
+ ///
+ /// Thrown when is .
+ ///
+ ///
+ /// Thrown when is less than or equal to zero.
+ ///
+ public static async IAsyncEnumerable> ChunkAsync(
+ this IAsyncEnumerable source,
+ int chunkSize,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(chunkSize);
+
+ List buffer = new List(chunkSize);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ buffer.Add(item);
+
+ if (buffer.Count == chunkSize)
+ {
+ yield return new List(buffer);
+ buffer.Clear();
+ }
+ }
+
+ if (buffer.Count > 0)
+ {
+ yield return buffer;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs
new file mode 100644
index 0000000..6345400
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctBy.cs
@@ -0,0 +1,35 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Returns distinct elements from a sequence by using a specified key selector.
+ ///
+ /// The element type.
+ /// The type of the key returned by the selector.
+ /// The sequence to remove duplicate elements from.
+ /// A function to extract the key for each element.
+ ///
+ /// An that contains distinct elements from the source sequence.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static IEnumerable DistinctBy(
+ this IEnumerable source,
+ Func keySelector)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(keySelector);
+
+ HashSet seen = new HashSet();
+
+ foreach (T item in source)
+ {
+ if (seen.Add(keySelector(item)))
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs
new file mode 100644
index 0000000..469a3f8
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.DistinctByAsync.cs
@@ -0,0 +1,39 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously returns distinct elements from an async sequence by using a specified key selector.
+ ///
+ /// The element type of the async sequence.
+ /// The type of the key returned by the selector.
+ /// The async sequence to remove duplicate elements from.
+ /// A function to extract the key for each element.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence that contains distinct elements from the source sequence.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static async IAsyncEnumerable DistinctByAsync(
+ this IAsyncEnumerable source,
+ Func keySelector,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(keySelector);
+
+ HashSet seen = new HashSet();
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ if (seen.Add(keySelector(item)))
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs
new file mode 100644
index 0000000..cd766e4
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Filter.cs
@@ -0,0 +1,30 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Filters a sequence of values based on a predicate.
+ ///
+ /// The element type of the sequence.
+ /// The sequence to filter.
+ /// A function to test each element for a condition.
+ /// An that contains elements from the input sequence that satisfy the condition.
+ ///
+ /// Thrown when or is .
+ ///
+ public static IEnumerable Filter(
+ this IEnumerable source,
+ Func predicate)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(predicate);
+
+ foreach (T item in source)
+ {
+ if (predicate(item))
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs
new file mode 100644
index 0000000..68e5467
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.FilterAsync.cs
@@ -0,0 +1,36 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously filters a sequence of values based on a predicate.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to filter.
+ /// A function to test each element for a condition.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence that contains elements from the input sequence that satisfy the condition.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static async IAsyncEnumerable FilterAsync(
+ this IAsyncEnumerable source,
+ Func predicate,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(predicate);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ if (predicate(item))
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs
new file mode 100644
index 0000000..0ee5353
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEach.cs
@@ -0,0 +1,24 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Performs the specified action on each element of the sequence.
+ ///
+ /// The element type.
+ /// The sequence whose elements to process.
+ /// The action to perform on each element.
+ ///
+ /// Thrown when or is .
+ ///
+ public static void ForEach(this IEnumerable source, Action action)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(action);
+
+ foreach (T item in source)
+ {
+ action(item);
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs
new file mode 100644
index 0000000..6b9b1d6
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ForEachAsync.cs
@@ -0,0 +1,29 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously performs the specified action on each element of an async sequence.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence whose elements to process.
+ /// The action to perform on each element.
+ /// The cancellation token used to cancel the asynchronous operation.
+ /// A task that represents the asynchronous operation.
+ ///
+ /// Thrown when or is .
+ ///
+ public static async Task ForEachAsync(
+ this IAsyncEnumerable source,
+ Action action,
+ CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(action);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ action(item);
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs
new file mode 100644
index 0000000..d0379ca
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Map.cs
@@ -0,0 +1,28 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Projects each element of the sequence into a new form.
+ ///
+ /// The source element type.
+ /// The result element type.
+ /// The sequence whose elements to transform.
+ /// A transform function to apply to each element.
+ /// An whose elements are the result of invoking the transform function.
+ ///
+ /// Thrown when or is .
+ ///
+ public static IEnumerable Map(
+ this IEnumerable source,
+ Func selector)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(selector);
+
+ foreach (T item in source)
+ {
+ yield return selector(item);
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs
new file mode 100644
index 0000000..2893a63
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.MapAsync.cs
@@ -0,0 +1,35 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously projects each element of the async sequence into a new form.
+ ///
+ /// The source element type.
+ /// The result element type.
+ /// The async sequence whose elements to transform.
+ /// A transform function to apply to each element.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence whose elements are the result of invoking the transform function
+ /// on each element of the source sequence.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static async IAsyncEnumerable MapAsync(
+ this IAsyncEnumerable source,
+ Func selector,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(selector);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ yield return selector(item);
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs
new file mode 100644
index 0000000..88d533c
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Pairwise.cs
@@ -0,0 +1,36 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Returns a sequence of consecutive element pairs from the source sequence.
+ ///
+ /// The element type.
+ /// The source sequence.
+ ///
+ /// An of pairs, where each pair contains two consecutive
+ /// elements from the source sequence.
+ ///
+ ///
+ /// Thrown when is .
+ ///
+ public static IEnumerable<(T First, T Second)> Pairwise(this IEnumerable source)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ using IEnumerator enumerator = source.GetEnumerator();
+
+ if (!enumerator.MoveNext())
+ {
+ yield break;
+ }
+
+ T previous = enumerator.Current;
+
+ while (enumerator.MoveNext())
+ {
+ yield return (previous, enumerator.Current);
+ previous = enumerator.Current;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs
new file mode 100644
index 0000000..56c5c81
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.PairwiseAsync.cs
@@ -0,0 +1,51 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously returns consecutive element pairs from an async sequence.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence of pairs, where each pair contains two consecutive
+ /// elements from the source sequence.
+ ///
+ ///
+ /// Thrown when is .
+ ///
+ public static async IAsyncEnumerable<(T First, T Second)> PairwiseAsync(
+ this IAsyncEnumerable source,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ IAsyncEnumerator enumerator = source.GetAsyncEnumerator(cancellationToken);
+
+ try
+ {
+ if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
+ {
+ yield break;
+ }
+
+ T previous = enumerator.Current;
+
+ while (await enumerator.MoveNextAsync().ConfigureAwait(false))
+ {
+ yield return (previous, enumerator.Current);
+ previous = enumerator.Current;
+ }
+ }
+ finally
+ {
+ if (enumerator != null)
+ {
+ await enumerator.DisposeAsync().ConfigureAwait(false);
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs
new file mode 100644
index 0000000..5b77260
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Reduce.cs
@@ -0,0 +1,34 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Aggregates the elements of a sequence using the specified seed value and accumulator function.
+ ///
+ /// The element type.
+ /// The accumulator type.
+ /// The source sequence.
+ /// The initial accumulator value.
+ /// The accumulator function.
+ /// The final accumulator value.
+ ///
+ /// Thrown when or is .
+ ///
+ public static TAccumulate Reduce(
+ this IEnumerable source,
+ TAccumulate seed,
+ Func func)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(func);
+
+ TAccumulate acc = seed;
+
+ foreach (T item in source)
+ {
+ acc = func(acc, item);
+ }
+
+ return acc;
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs
new file mode 100644
index 0000000..399df7a
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.ReduceAsync.cs
@@ -0,0 +1,36 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously aggregates the elements of an async sequence using the specified seed value and accumulator function.
+ ///
+ /// The element type.
+ /// The accumulator type.
+ /// The async sequence whose elements to aggregate.
+ /// The initial accumulator value.
+ /// The accumulator function.
+ /// The cancellation token used to cancel the asynchronous operation.
+ /// A task that represents the asynchronous aggregation operation.
+ ///
+ /// Thrown when or is .
+ ///
+ public static async Task ReduceAsync(
+ this IAsyncEnumerable source,
+ TAccumulate seed,
+ Func func,
+ CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(func);
+
+ TAccumulate acc = seed;
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ acc = func(acc, item);
+ }
+
+ return acc;
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs
new file mode 100644
index 0000000..2d37c08
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Skip.cs
@@ -0,0 +1,29 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Bypasses a specified number of elements in a sequence and then returns the remaining elements.
+ ///
+ /// The element type.
+ /// An to return elements from.
+ /// The number of elements to skip.
+ /// An that contains the elements that occur after the specified index.
+ ///
+ /// Thrown when is .
+ ///
+ public static IEnumerable Skip(this IEnumerable source, int count)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ int i = 0;
+
+ foreach (T item in source)
+ {
+ if (i++ >= count)
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs
new file mode 100644
index 0000000..fbab4cc
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.SkipAsync.cs
@@ -0,0 +1,36 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously bypasses a specified number of elements in an async sequence
+ /// and then returns the remaining elements.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to skip elements from.
+ /// The number of elements to skip.
+ /// The cancellation token used to cancel the asynchronous operation.
+ /// An async sequence that contains the elements after the skipped elements.
+ ///
+ /// Thrown when is .
+ ///
+ public static async IAsyncEnumerable SkipAsync(
+ this IAsyncEnumerable source,
+ int count,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ int i = 0;
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ if (i++ >= count)
+ {
+ yield return item;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs
new file mode 100644
index 0000000..a2a9277
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Take.cs
@@ -0,0 +1,31 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Returns a specified number of contiguous elements from the start of a sequence.
+ ///
+ /// The element type.
+ /// The sequence to return elements from.
+ /// The number of elements to return.
+ /// An that contains the specified number of elements.
+ ///
+ /// Thrown when is .
+ ///
+ public static IEnumerable Take(this IEnumerable source, int count)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ int i = 0;
+
+ foreach (T item in source)
+ {
+ if (i++ >= count)
+ {
+ yield break;
+ }
+
+ yield return item;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs
new file mode 100644
index 0000000..fc7365c
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeAsync.cs
@@ -0,0 +1,37 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously returns a specified number of contiguous elements from the start of an async sequence.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to return elements from.
+ /// The number of elements to return.
+ /// The cancellation token used to cancel the asynchronous operation.
+ /// An async sequence that contains up to elements.
+ ///
+ /// Thrown when is .
+ ///
+ public static async IAsyncEnumerable TakeAsync(
+ this IAsyncEnumerable source,
+ int count,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+
+ int i = 0;
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ if (i++ >= count)
+ {
+ yield break;
+ }
+
+ yield return item;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs
new file mode 100644
index 0000000..6212974
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntil.cs
@@ -0,0 +1,36 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Returns elements from the sequence until the specified predicate evaluates to true,
+ /// including the element that satisfies the predicate.
+ ///
+ /// The element type of the sequence.
+ /// The source sequence.
+ /// A function to test each element for a stop condition.
+ ///
+ /// An that returns elements from the input sequence
+ /// until the predicate is satisfied.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static IEnumerable TakeUntil(
+ this IEnumerable source,
+ Func predicate)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(predicate);
+
+ foreach (T item in source)
+ {
+ yield return item;
+
+ if (predicate(item))
+ {
+ yield break;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs
new file mode 100644
index 0000000..6f84d9f
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.TakeUntilAsync.cs
@@ -0,0 +1,39 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously returns elements from an async sequence until the specified predicate is satisfied,
+ /// including the element that satisfies the predicate.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to return elements from.
+ /// A function to test each element for a stop condition.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence that returns elements from the input sequence until the predicate is satisfied.
+ ///
+ ///
+ /// Thrown when or is .
+ ///
+ public static async IAsyncEnumerable TakeUntilAsync(
+ this IAsyncEnumerable source,
+ Func predicate,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentNullException.ThrowIfNull(predicate);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ yield return item;
+
+ if (predicate(item))
+ {
+ yield break;
+ }
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs
new file mode 100644
index 0000000..0b49bff
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.Window.cs
@@ -0,0 +1,43 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Batches the source sequence into windows of the specified size.
+ ///
+ /// The element type of the sequence.
+ /// The source sequence.
+ /// The size of each window. Must be positive.
+ /// An of windows, each represented as a .
+ ///
+ /// Thrown when is .
+ ///
+ ///
+ /// Thrown when is less than or equal to zero.
+ ///
+ public static IEnumerable> Window(
+ this IEnumerable source,
+ int windowSize)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(windowSize);
+
+ List buffer = new List(windowSize);
+
+ foreach (T item in source)
+ {
+ buffer.Add(item);
+
+ if (buffer.Count == windowSize)
+ {
+ yield return new List(buffer);
+ buffer.Clear();
+ }
+ }
+
+ if (buffer.Count > 0)
+ {
+ yield return new List(buffer);
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs
new file mode 100644
index 0000000..0175281
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.WindowAsync.cs
@@ -0,0 +1,49 @@
+using System.Runtime.CompilerServices;
+
+namespace Csdsa.Algorithms.StreamProccessing;
+
+public static partial class StreamProcessing
+{
+ ///
+ /// Asynchronously batches an async sequence into windows of the specified size.
+ ///
+ /// The element type of the async sequence.
+ /// The async sequence to batch.
+ /// The size of each window. Must be positive.
+ /// The cancellation token used to cancel the asynchronous operation.
+ ///
+ /// An async sequence of windows, each represented as a .
+ ///
+ ///
+ /// Thrown when is .
+ ///
+ ///
+ /// Thrown when is less than or equal to zero.
+ ///
+ public static async IAsyncEnumerable> WindowAsync(
+ this IAsyncEnumerable source,
+ int windowSize,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(source);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(windowSize);
+
+ List buffer = new List(windowSize);
+
+ await foreach (T item in source.WithCancellation(cancellationToken))
+ {
+ buffer.Add(item);
+
+ if (buffer.Count == windowSize)
+ {
+ yield return new List(buffer);
+ buffer.Clear();
+ }
+ }
+
+ if (buffer.Count > 0)
+ {
+ yield return buffer;
+ }
+ }
+}
diff --git a/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs
new file mode 100644
index 0000000..3978f75
--- /dev/null
+++ b/src/Csdsa/Algorithms/StreamProccessing/StreamProcessing.cs
@@ -0,0 +1,44 @@
+namespace Csdsa.Algorithms.StreamProccessing;
+
+///
+/// Provides functional-style stream processing helpers for synchronous and asynchronous
+/// sequences, focusing on composability and low-allocation patterns.
+///
+/// Concepts:
+///
+///
+/// -
+/// Functional transformations such as Map, Filter, and Reduce.
+///
+/// -
+/// Batching and grouping via Window, Chunk, and Pairwise operations.
+///
+/// -
+/// Uniqueness and distinctness via DistinctBy and DistinctByAsync.
+///
+/// -
+/// Early termination and control flow using Take, Skip, and TakeUntil variants.
+///
+///
+///
+/// Key practices:
+///
+///
+/// -
+/// Using extension methods for fluent, chainable stream processing.
+///
+/// -
+/// Covering both synchronous ()
+/// and asynchronous () data flows.
+///
+/// -
+/// Being allocation-conscious for high-throughput scenarios.
+///
+/// -
+/// Favoring composability and separation of concerns in algorithmic pipelines.
+///
+///
+///
+public static partial class StreamProcessing
+{
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs
new file mode 100644
index 0000000..d809870
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Chunk.Tests.cs
@@ -0,0 +1,24 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Chunk_GroupsElements()
+ {
+ int[] source = { 1, 2, 3, 4, 5 };
+
+ List> result = StreamProcessing.Chunk(source, 2).ToList();
+
+ Assert.Equal(3, result.Count);
+ Assert.Equal(new[] { 1, 2 }, result[0]);
+ Assert.Equal(new[] { 3, 4 }, result[1]);
+ Assert.Equal(new[] { 5 }, result[2]);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs
new file mode 100644
index 0000000..133ca54
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ChunkAsync.Tests.cs
@@ -0,0 +1,28 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task ChunkAsync_GroupsElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array12345);
+
+ List> result = new List>();
+ await foreach (List chunk in StreamProcessing.ChunkAsync(source, 2))
+ {
+ result.Add(chunk);
+ }
+
+ Assert.Equal(3, result.Count);
+ Assert.Equal(new[] { 1, 2 }, result[0]);
+ Assert.Equal(new[] { 3, 4 }, result[1]);
+ Assert.Equal(new[] { 5 }, result[2]);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs
new file mode 100644
index 0000000..14bbe4b
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctBy.Tests.cs
@@ -0,0 +1,24 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void DistinctBy_ReturnsDistinctItems()
+ {
+ string[] source = { "a", "aa", "b", "bb", "ccc" };
+
+ List result = StreamProcessing.DistinctBy(source, s => s.Length).ToList();
+
+ Assert.Contains("a", result);
+ Assert.Contains("aa", result);
+ Assert.Contains("ccc", result);
+ Assert.Equal(3, result.Count);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs
new file mode 100644
index 0000000..a7ec931
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.DistinctByAsync.Tests.cs
@@ -0,0 +1,28 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task DistinctByAsync_ReturnsDistinctItems()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(StringArray);
+
+ List result = new List();
+ await foreach (string s in StreamProcessing.DistinctByAsync(source, x => x.Length))
+ {
+ result.Add(s);
+ }
+
+ Assert.Contains("a", result);
+ Assert.Contains("aa", result);
+ Assert.Contains("ccc", result);
+ Assert.Equal(3, result.Count);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs
new file mode 100644
index 0000000..134135b
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Filter.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Filter_FiltersElements()
+ {
+ int[] source = { 1, 2, 3, 4 };
+
+ List result = StreamProcessing.Filter(source, x => x % 2 == 0).ToList();
+
+ Assert.Equal(new[] { 2, 4 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs
new file mode 100644
index 0000000..a19ba97
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.FilterAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task FilterAsync_FiltersElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234);
+
+ List result = new List();
+ await foreach (int x in StreamProcessing.FilterAsync(source, y => y % 2 == 0))
+ {
+ result.Add(x);
+ }
+
+ Assert.Equal(new[] { 2, 4 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs
new file mode 100644
index 0000000..f3bb863
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEach.Tests.cs
@@ -0,0 +1,19 @@
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void ForEach_PerformsAction()
+ {
+ int[] source = { 1, 2, 3 };
+ int sum = 0;
+
+ StreamProcessing.ForEach(source, x => sum += x);
+
+ Assert.Equal(6, sum);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs
new file mode 100644
index 0000000..735ca6f
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ForEachAync.Tests.cs
@@ -0,0 +1,21 @@
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task ForEachAsync_PerformsAction()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123);
+ int sum = 0;
+
+ await StreamProcessing.ForEachAsync(source, x => sum += x);
+
+ Assert.Equal(6, sum);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs
new file mode 100644
index 0000000..2011dca
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Helpers.Tests.cs
@@ -0,0 +1,23 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ private static async IAsyncEnumerable ToAsyncEnumerable(IEnumerable source)
+ {
+ foreach (T item in source)
+ {
+ await Task.Yield();
+ yield return item;
+ }
+ }
+
+ private static readonly int[] Int32Array123 = { 1, 2, 3 };
+ private static readonly int[] Int32Array1234 = { 1, 2, 3, 4 };
+ private static readonly int[] Int32Array12345 = { 1, 2, 3, 4, 5 };
+ private static readonly string[] StringArray = { "a", "aa", "b", "bb", "ccc" };
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs
new file mode 100644
index 0000000..b8b44de
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Map.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Map_TransformsElements()
+ {
+ int[] source = { 1, 2, 3 };
+
+ List result = StreamProcessing.Map(source, x => x * 2).ToList();
+
+ Assert.Equal(new[] { 2, 4, 6 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs
new file mode 100644
index 0000000..aee3afb
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.MapAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task MapAsync_TransformsElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123);
+
+ List result = new List();
+ await foreach (int x in StreamProcessing.MapAsync(source, y => y * 2))
+ {
+ result.Add(x);
+ }
+
+ Assert.Equal(new[] { 2, 4, 6 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs
new file mode 100644
index 0000000..c24bc25
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Pairwise.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Pairwise_ReturnsConsecutivePairs()
+ {
+ int[] source = { 1, 2, 3 };
+
+ List<(int First, int Second)> result = StreamProcessing.Pairwise(source).ToList();
+
+ Assert.Equal(new (int, int)[] { (1, 2), (2, 3) }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs
new file mode 100644
index 0000000..ed0205a
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.PairwiseAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task PairwiseAsync_ReturnsConsecutivePairs()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123);
+
+ List<(int First, int Second)> result = new List<(int First, int Second)>();
+ await foreach ((int First, int Second) pair in StreamProcessing.PairwiseAsync(source))
+ {
+ result.Add(pair);
+ }
+
+ Assert.Equal(new (int, int)[] { (1, 2), (2, 3) }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs
new file mode 100644
index 0000000..6e0b78e
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Reduce.Tests.cs
@@ -0,0 +1,18 @@
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Reduce_SumsElements()
+ {
+ int[] source = { 1, 2, 3 };
+
+ int result = StreamProcessing.Reduce(source, 0, (acc, x) => acc + x);
+
+ Assert.Equal(6, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs
new file mode 100644
index 0000000..6512572
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.ReduceAsync.Tests.cs
@@ -0,0 +1,20 @@
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task ReduceAsync_SumsElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array123);
+
+ int sum = await StreamProcessing.ReduceAsync(source, 0, (acc, x) => acc + x);
+
+ Assert.Equal(6, sum);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs
new file mode 100644
index 0000000..6a49351
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Skip.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Skip_SkipsElements()
+ {
+ int[] source = { 1, 2, 3, 4 };
+
+ List result = StreamProcessing.Skip(source, 2).ToList();
+
+ Assert.Equal(new[] { 3, 4 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs
new file mode 100644
index 0000000..32a2a35
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.SkipAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task SkipAsync_SkipsElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234);
+
+ List result = new List();
+ await foreach (int x in StreamProcessing.SkipAsync(source, 2))
+ {
+ result.Add(x);
+ }
+
+ Assert.Equal(new[] { 3, 4 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs
new file mode 100644
index 0000000..9c25e10
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Take.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Take_TakesElements()
+ {
+ int[] source = { 1, 2, 3, 4 };
+
+ List result = StreamProcessing.Take(source, 2).ToList();
+
+ Assert.Equal(new[] { 1, 2 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs
new file mode 100644
index 0000000..ddb9127
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task TakeAsync_TakesElements()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array1234);
+
+ List result = new List();
+ await foreach (int x in StreamProcessing.TakeAsync(source, 2))
+ {
+ result.Add(x);
+ }
+
+ Assert.Equal(new[] { 1, 2 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs
new file mode 100644
index 0000000..646ddef
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntil.Tests.cs
@@ -0,0 +1,21 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void TakeUntil_StopsAtPredicate()
+ {
+ int[] source = { 1, 2, 3, 4, 5 };
+
+ List result = StreamProcessing.TakeUntil(source, x => x == 3).ToList();
+
+ Assert.Equal(new[] { 1, 2, 3 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs
new file mode 100644
index 0000000..ea25863
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.TakeUntilAsync.Tests.cs
@@ -0,0 +1,25 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public async Task TakeUntilAsync_StopsAtPredicate()
+ {
+ IAsyncEnumerable source = ToAsyncEnumerable(Int32Array12345);
+
+ List result = new List();
+ await foreach (int x in StreamProcessing.TakeUntilAsync(source, y => y == 3))
+ {
+ result.Add(x);
+ }
+
+ Assert.Equal(new[] { 1, 2, 3 }, result);
+ }
+}
diff --git a/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs
new file mode 100644
index 0000000..9e0fedb
--- /dev/null
+++ b/tests/Csdsa.Tests/Algorithms/StreamProcessing/StreamProcessing.Window.Tests.cs
@@ -0,0 +1,24 @@
+using System.Collections.Generic;
+using System.Linq;
+
+using Csdsa.Algorithms.StreamProccessing;
+
+using Xunit;
+
+namespace Csdsa.Tests.Algorithms.StreamProccessing;
+
+public sealed partial class StreamProcessingTests
+{
+ [Fact]
+ public void Window_BatchesElements()
+ {
+ int[] source = { 1, 2, 3, 4, 5 };
+
+ List> result = StreamProcessing.Window(source, 2).ToList();
+
+ Assert.Equal(3, result.Count);
+ Assert.Equal(new[] { 1, 2 }, result[0]);
+ Assert.Equal(new[] { 3, 4 }, result[1]);
+ Assert.Equal(new[] { 5 }, result[2]);
+ }
+}