Skip to content

Commit

Permalink
Add ParallelEnumerate overloads with degreeOfParallism (#106)
Browse files Browse the repository at this point in the history
Fixes #103
  • Loading branch information
nietras authored Mar 8, 2024
1 parent 6a3cf86 commit 592ca80
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 7 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,8 @@ namespace nietras.SeparatedValues
public static nietras.SeparatedValues.SepReader FromText(this nietras.SeparatedValues.SepReaderOptions options, string text) { }
public static System.Collections.Generic.IEnumerable<T> ParallelEnumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowFunc<T> select) { }
public static System.Collections.Generic.IEnumerable<T> ParallelEnumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowTryFunc<T> trySelect) { }
public static System.Collections.Generic.IEnumerable<T> ParallelEnumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowFunc<T> select, int degreeOfParallism) { }
public static System.Collections.Generic.IEnumerable<T> ParallelEnumerate<T>(this nietras.SeparatedValues.SepReader reader, nietras.SeparatedValues.SepReader.RowTryFunc<T> trySelect, int degreeOfParallism) { }
public static nietras.SeparatedValues.SepReaderOptions Reader(this nietras.SeparatedValues.Sep sep) { }
public static nietras.SeparatedValues.SepReaderOptions Reader(this nietras.SeparatedValues.Sep? sep) { }
public static nietras.SeparatedValues.SepReaderOptions Reader(this nietras.SeparatedValues.SepSpec spec) { }
Expand Down
24 changes: 24 additions & 0 deletions src/Sep.Test/PackageAssetsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ public void PackageAssetsTest_ParallelEnumerate_NoQuotes(SepCreateToString creat
VerifyEnumerate(text, createToString, (reader, select) => reader.ParallelEnumerate(select));
}

[DataTestMethod]
[DynamicData(nameof(ToStrings))]
public void PackageAssetsTest_ParallelEnumerate_NoQuotes_DegreeOfParallism(SepCreateToString createToString)
{
#if SEPREADERTRACE
var text = NoQuotes;
#else
var text = string.Join(string.Empty, Enumerable.Repeat(NoQuotes, 100));
#endif
VerifyEnumerate(text, createToString, (reader, select) => reader.ParallelEnumerate(select, degreeOfParallism: 5));
}

[DataTestMethod]
[DynamicData(nameof(ToStrings))]
public void PackageAssetsTest_ParallelEnumerate_WithQuotes(SepCreateToString createToString)
Expand All @@ -113,6 +125,18 @@ public void PackageAssetsTest_ParallelEnumerate_RowTryFunc_NoQuotes(SepCreateToS
VerifyEnumerateTry(text, createToString, (reader, select) => reader.ParallelEnumerate(select));
}

[DataTestMethod]
[DynamicData(nameof(ToStrings))]
public void PackageAssetsTest_ParallelEnumerate_RowTryFunc_NoQuotes_DegreeOfParallism(SepCreateToString createToString)
{
#if SEPREADERTRACE
var text = NoQuotes;
#else
var text = string.Join(string.Empty, Enumerable.Repeat(NoQuotes, 100));
#endif
VerifyEnumerateTry(text, createToString, (reader, select) => reader.ParallelEnumerate(select, degreeOfParallism: 5));
}

[DataTestMethod]
[DynamicData(nameof(ToStrings))]
public void PackageAssetsTest_ParallelEnumerate_RowTryFunc_WithQuotes(SepCreateToString createToString)
Expand Down
2 changes: 1 addition & 1 deletion src/Sep/Sep.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<EnableAOTAnalyzer>true</EnableAOTAnalyzer>

<EnablePackageValidation>true</EnablePackageValidation>
<PackageValidationBaselineVersion>0.3.0</PackageValidationBaselineVersion>
<PackageValidationBaselineVersion>0.4.0</PackageValidationBaselineVersion>

<Description>Modern, minimal, fast, zero allocation, reading and writing of separated values (csv, tsv etc.). Cross-platform, trimmable and AOT/NativeAOT compatible.</Description>

Expand Down
35 changes: 29 additions & 6 deletions src/Sep/SepReaderExtensions.Enumeration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,29 @@ public static IEnumerable<T> ParallelEnumerate<T>(this SepReader reader, SepRead
return ParallelEnumerateAsParallel(reader, trySelect);
}

static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepReader.RowFunc<T> select)
public static IEnumerable<T> ParallelEnumerate<T>(this SepReader reader, SepReader.RowFunc<T> select, int degreeOfParallism)
{
ArgumentNullException.ThrowIfNull(reader);
ArgumentNullException.ThrowIfNull(select);
if (!reader.HasRows) { return Array.Empty<T>(); }
return ParallelEnumerateAsParallel(reader, select, p => p.WithDegreeOfParallelism(degreeOfParallism));
}

public static IEnumerable<T> ParallelEnumerate<T>(this SepReader reader, SepReader.RowTryFunc<T> trySelect, int degreeOfParallism)
{
ArgumentNullException.ThrowIfNull(reader);
ArgumentNullException.ThrowIfNull(trySelect);
if (!reader.HasRows) { return Array.Empty<T>(); }
return ParallelEnumerateAsParallel(reader, trySelect, p => p.WithDegreeOfParallelism(degreeOfParallism));
}

static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepReader.RowFunc<T> select,
Func<ParallelQuery<SepReaderState>, ParallelQuery<SepReaderState>>? modifyParallelQuery = null)
{
var statesStack = new ConcurrentStack<SepReaderState>();
try
{
var parallelStates = EnumerateStatesParallel(reader, statesStack);
var parallelStates = EnumerateStatesParallel(reader, statesStack, modifyParallelQuery);
var batches = parallelStates.Select(PooledSelect);
foreach (var batch in batches)
{
Expand Down Expand Up @@ -91,12 +108,13 @@ static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepR
}
}

static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepReader.RowTryFunc<T> trySelect)
static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepReader.RowTryFunc<T> trySelect,
Func<ParallelQuery<SepReaderState>, ParallelQuery<SepReaderState>>? modifyParallelQuery = null)
{
var statesStack = new ConcurrentStack<SepReaderState>();
try
{
var parallelStates = EnumerateStatesParallel(reader, statesStack);
var parallelStates = EnumerateStatesParallel(reader, statesStack, modifyParallelQuery);
var batches = parallelStates.Select(PooledSelect);
foreach (var batch in batches)
{
Expand Down Expand Up @@ -132,10 +150,15 @@ static IEnumerable<T> ParallelEnumerateAsParallel<T>(this SepReader reader, SepR
return (array, index);
}
}
static ParallelQuery<SepReaderState> EnumerateStatesParallel(SepReader reader, ConcurrentStack<SepReaderState> statesStack)

static ParallelQuery<SepReaderState> EnumerateStatesParallel(SepReader reader,
ConcurrentStack<SepReaderState> statesStack,
Func<ParallelQuery<SepReaderState>, ParallelQuery<SepReaderState>>? modifyParallelQuery = null)
{
var states = EnumerateStates(reader, statesStack);
return states.AsParallel().AsOrdered();
// For now always force ordered
var statesParallel = states.AsParallel().AsOrdered();
return modifyParallelQuery is null ? statesParallel : modifyParallelQuery(statesParallel);
}

static IEnumerable<SepReaderState> EnumerateStates(SepReader reader, ConcurrentStack<SepReaderState> states)
Expand Down

0 comments on commit 592ca80

Please sign in to comment.