Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have MergedLinesEnumerable implement IAsyncEnumerable<string> #109

Open
wants to merge 30 commits into
base: release-1.7
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3fddb89
Update README.md
madelson Nov 15, 2020
15815ce
Update README.md
madelson Mar 10, 2023
ca60f2a
Update README.md
madelson Mar 10, 2023
da45635
Update README.md
madelson Mar 10, 2023
e45f1f6
Update README.md
madelson Apr 4, 2023
87316df
Close #98: Have MergedLinesEnumerable implement IAsyncEnumerable<string>
Bartleby2718 Feb 26, 2024
7976a96
Merge branch 'release-1.7' into IAsyncEnumerable
Bartleby2718 Mar 7, 2024
d3720b9
Fix spaces, preprocessor directives, pull out common assertions, Add …
Bartleby2718 Mar 7, 2024
990605e
Use var
Bartleby2718 Mar 7, 2024
0ab96b0
Pass CancellationToken to GetAsyncEnumeratorInternal
Bartleby2718 Mar 7, 2024
fdd9137
Unfixed
Bartleby2718 Mar 8, 2024
750b1e0
Fix all bugs to pass all tests (but seeing some transient failures of…
Bartleby2718 Mar 10, 2024
caa9428
Clean up per Visual Studio's suggestions and remove comments
Bartleby2718 Mar 10, 2024
3b99204
Remove System.Linq.Async and use an extension method instead
Bartleby2718 Mar 10, 2024
6a2baae
Fix Condition for Microsoft.Bcl.AsyncInterfaces
Bartleby2718 Mar 10, 2024
983a28f
Revert primary constructor changes to fix CI failures
Bartleby2718 Mar 10, 2024
7104cea
Also revert collection expressions changes
Bartleby2718 Mar 10, 2024
f6a555a
Revert primary constructor in AsyncEnumerableAdapter
Bartleby2718 Mar 10, 2024
9030874
Revert primary constructor in AsyncEnumeratorAdapter
Bartleby2718 Mar 10, 2024
b2b7bbe
Revert collection expression in MergedLinesEnumerableTestBase
Bartleby2718 Mar 10, 2024
f9f0813
Fix spacing and variable names
Bartleby2718 Mar 11, 2024
8d18b9d
Fix preprocessor directives for IAsyncEnumerable/IAsyncEnumerator
Bartleby2718 Mar 11, 2024
5151a25
Replace AsyncEnumerableAdapter with AsAsyncEnumerable
Bartleby2718 Mar 11, 2024
97198a3
Revert to doing Task.WaitAll(task1, task2, consumeTask)
Bartleby2718 Mar 11, 2024
1420db9
Revert AsAsyncEnumerable changes to disallow repeated consumptions
Bartleby2718 Mar 11, 2024
4c68836
Minor style changes
Bartleby2718 Jun 29, 2024
263d7dd
Do not wait consumeTask with other tasks, and revert everything else
Bartleby2718 Jun 29, 2024
16b38df
Merge branch 'release-1.7' into IAsyncEnumerable
Bartleby2718 Jun 29, 2024
7830e3e
Try bumping the timeout, considering the CI pipeline
Bartleby2718 Jun 29, 2024
ff9af41
Try replacing Task.Run with an async method
Bartleby2718 Jul 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions MedallionShell.Tests/MedallionShell.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<IsPackable>false</IsPackable>
<LangVersion>Latest</LangVersion>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<CodeAnalysisRuleSet>..\stylecop.analyzers.ruleset</CodeAnalysisRuleSet>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<CodeAnalysisRuleSet>..\stylecop.analyzers.ruleset</CodeAnalysisRuleSet>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<NoWarn>1591</NoWarn>
<RootNamespace>Medallion.Shell.Tests</RootNamespace>
</PropertyGroup>
Expand All @@ -17,15 +17,16 @@
<PackageReference Include="nunit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.4.0-beta.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435">
<PackageReference Include="Moq" Version="4.7.63" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.435">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\MedallionShell\MedallionShell.csproj" />
<ProjectReference Include="..\SampleCommand\SampleCommand.csproj" />
<ProjectReference Include="..\SampleCommand\SampleCommand.csproj" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
Expand Down
39 changes: 39 additions & 0 deletions MedallionShell.Tests/Streams/AsyncEnumerableAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Medallion.Shell.Tests.Streams;

Bartleby2718 marked this conversation as resolved.
Show resolved Hide resolved
public class AsyncEnumerableAdapter : IAsyncEnumerable<string>
{
private readonly IEnumerable<string> strings;

public AsyncEnumerableAdapter(IEnumerable<string> strings)
{
this.strings = strings;
}

public IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default) =>
// this does not allow consuming the same IEnumerable twice
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson TestConsumeTwice started failing after making this change. Explanation from ChatGPT

In the AsyncEnumerableAdapter class, the GetAsyncEnumerator() method creates a new AsyncEnumeratorAdapter instance that wraps the original IEnumerator<string>. This means that it's directly using the original enumerator from the IEnumerable<string>, which can only be enumerated once. If you try to get the async enumerator twice, it will fail because the underlying enumerator has already been exhausted.

On the other hand, the AsAsyncEnumerable() extension method creates a new enumerator each time it's called. It does this by iterating over the IEnumerable<T> items in a foreach loop. This means you can call GetAsyncEnumerator() multiple times without an issue because each call creates a new enumerator.

So, if your test expects an InvalidOperationException when calling GetAsyncEnumerator() twice, it will fail when using AsAsyncEnumerable() because this method allows multiple enumerations. If you want to preserve the single-use behavior, you should stick with the AsyncEnumerableAdapter class. If you want to allow multiple enumerations, then AsAsyncEnumerable() is the way to go. It all depends on the specific requirements of your code.

For posterity, I added this comment.

new AsyncEnumeratorAdapter(this.strings.GetEnumerator());

private class AsyncEnumeratorAdapter : IAsyncEnumerator<string>
{
private readonly IEnumerator<string> enumerator;

public AsyncEnumeratorAdapter(IEnumerator<string> enumerator)
{
this.enumerator = enumerator;
}

public string Current => this.enumerator.Current;

public ValueTask DisposeAsync()
{
this.enumerator.Dispose();
return default;
}

public ValueTask<bool> MoveNextAsync() => new(this.enumerator.MoveNext());
}
}
137 changes: 0 additions & 137 deletions MedallionShell.Tests/Streams/MergedLinesEnumerableTest.cs

This file was deleted.

13 changes: 13 additions & 0 deletions MedallionShell.Tests/Streams/MergedLinesEnumerableTestAsync.cs
Bartleby2718 marked this conversation as resolved.
Show resolved Hide resolved
Bartleby2718 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_0_OR_GREATER
using System.Collections.Generic;
using System.IO;
using Medallion.Shell.Streams;

namespace Medallion.Shell.Tests.Streams;

public class MergedLinesEnumerableTestAsync : MergedLinesEnumerableTestBase
{
protected override IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2) =>
new MergedLinesEnumerable(reader1, reader2);
}
#endif
142 changes: 142 additions & 0 deletions MedallionShell.Tests/Streams/MergedLinesEnumerableTestBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Medallion.Shell.Streams;
using Moq;
using NUnit.Framework;

namespace Medallion.Shell.Tests.Streams;

public abstract class MergedLinesEnumerableTestBase
{
protected abstract IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2);

[Test]
public async Task TestOneIsEmpty()
{
var empty1 = new StringReader(string.Empty);
var nonEmpty1 = new StringReader("abc\r\ndef\r\nghi\r\njkl");

var enumerable1 = this.Create(empty1, nonEmpty1);
var list1 = await enumerable1.ToListAsync();
list1.SequenceEqual(new[] { "abc", "def", "ghi", "jkl" })
.ShouldEqual(true, string.Join(", ", list1));

var empty2 = new StringReader(string.Empty);
var nonEmpty2 = new StringReader("a\nbb\nccc\n");
var enumerable2 = this.Create(nonEmpty2, empty2);
var list2 = await enumerable2.ToListAsync();
list2.SequenceEqual(new[] { "a", "bb", "ccc" })
.ShouldEqual(true, string.Join(", ", list2));
}

[Test]
public async Task TestBothAreEmpty()
{
var list = await this.Create(new StringReader(string.Empty), new StringReader(string.Empty)).ToListAsync();
list.Count.ShouldEqual(0, string.Join(", ", list));
}

[Test]
public async Task TestBothArePopulatedEqualSizes()
{
var list = await this.Create(
new StringReader("a\nbb\nccc"),
new StringReader("1\r\n22\r\n333")
)
.ToListAsync();
string.Join(", ", list).ShouldEqual("a, 1, bb, 22, ccc, 333");
}

[Test]
public async Task TestBothArePopulatedDifferenceSizes()
{
var lines1 = string.Join("\n", new[] { "x", "y", "z" });
var lines2 = string.Join("\n", new[] { "1", "2", "3", "4", "5" });

var list1 = await this.Create(new StringReader(lines1), new StringReader(lines2))
.ToListAsync();
string.Join(", ", list1).ShouldEqual("x, 1, y, 2, z, 3, 4, 5");

var list2 = await this.Create(new StringReader(lines2), new StringReader(lines1))
.ToListAsync();
string.Join(", ", list2).ShouldEqual("1, x, 2, y, 3, z, 4, 5");
}

[Test]
public void TestConsumeTwice()
{
var asyncEnumerable = this.Create(new StringReader("a"), new StringReader("b"));
asyncEnumerable.GetAsyncEnumerator();
Assert.Throws<InvalidOperationException>(() => asyncEnumerable.GetAsyncEnumerator());
}

[Test]
public void TestOneThrows()
{
void TestOneThrows(bool reverse)
{
var reader1 = new StringReader("a\nb\nc");
var count = 0;
var mockReader = new Mock<TextReader>(MockBehavior.Strict);
mockReader.Setup(r => r.ReadLineAsync())
.ReturnsAsync(() => ++count < 3 ? "LINE" : throw new TimeZoneNotFoundException());

Assert.ThrowsAsync<TimeZoneNotFoundException>(
async () => await this.Create(
reverse ? mockReader.Object : reader1,
reverse ? reader1 : mockReader.Object
).ToListAsync()
);
}

TestOneThrows(reverse: false);
TestOneThrows(reverse: true);
}

[Test, Timeout(5_000)] // something's wrong if it's taking more than 5 seconds
public async Task FuzzTest()
{
Pipe pipe1 = new(), pipe2 = new();

var asyncEnumerable = this.Create(new StreamReader(pipe1.OutputStream), new StreamReader(pipe2.OutputStream));

var strings1 = Enumerable.Range(0, 2000).Select(_ => Guid.NewGuid().ToString()).ToArray();
var strings2 = Enumerable.Range(0, 2300).Select(_ => Guid.NewGuid().ToString()).ToArray();

static void WriteStrings(IReadOnlyList<string> strings, Pipe pipe)
{
SpinWait spinWait = default;
Random random = new(Guid.NewGuid().GetHashCode());
using StreamWriter writer = new(pipe.InputStream);
foreach (var line in strings)
{
if (random.Next(10) == 1)
{
spinWait.SpinOnce();
}

writer.WriteLine(line);
}
}

var task1 = Task.Run(() => WriteStrings(strings1, pipe1));
var task2 = Task.Run(() => WriteStrings(strings2, pipe2));
Task.WaitAll(task1, task2); // need to dispose the writer to end the stream
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bartleby2718 can we try await Task.WhenAll(task1, task2, consumeTask); here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I've been trying to get that working, but this test fails for the async case if I do that. Not sure if the test logic is flawed (i.e. shouldn't await consumeTask if the input streams may not have been closed?) or there's a bug somewhere else.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try specifically with the test being async and using await Task.WhenAll instead of Task.WaitAll? If that doesn't work, could be some kind of threading bug in 1.7

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Yes, MergedLinesEnumerableTestAsync's FuzzTest fails but MergedLinesEnumerableTestSync's FuzzTest is fine. It fails even when it's run alone, but I did notice that TestPipeline(2) always fails with it if all tests are run together:
image

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I somehow got the test to pass with await Task.WhenAll(task1, task2, consumeTask);! Not sure if this is the fix or it means something else needs to be fixed, but this does look promising.

Let me know what you think! (FWIW the test didn't pass within 10 secodns with if (random.Next(4) == 1).)


Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I had to swap the order for tests to pass. Is this a red flag?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's revert this change and make sure it still passes. Also, does this pass or fail on main? You didn't make any changes to Pipe I think so it may be an issue with the release branch.

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I looked more into this and gathered some numbers, but I'm lost as to how I should debug this.

Note:

  • Workarounds used in the test project to run tests on master:
    • updated PackageReferences to nunit 3.13.3, NUnit3TestAdapter 3.17.0, Microsoft.NET.Test.Sdk 15.9.0
    • added NU1902,NU1903 to NoWarn and set CheckEolTargetFramework to false
chance \ Branch master
(net46 / netcoreapp2.2)
release-1.7
(net462 / net6.0)
AsyncEnumerable with Task.WaitAll(task1, task2, consumeTask)
(net462 / net6.0)
25% (current) 516ms / 8s
image
511ms / 8.4s
image
522ms / timeout in the async case, 9.2s in the sync case
image
20% (new) 408ms / 6.4s
image
374ms / 6.2s
image
282ms / timeout in the async case, 5.9s in the sync case
image

However, I noticed that a small change makes a difference.

  1. If I start consume before waiting writes, I get a timeout:
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2);
  1. If I wait for writes before I start consuming, the performance is similar to the first two columns:
            Task.WaitAll(task1, task2);
            var consumeTask = Task.Run(enumerable.ToListAsync);
  • This proves that the problem lies in consumeTask, not SpinWait.
  1. If I do the same as but use much shorter strings1 and strings2, the test completes within 20ms.
            // originally 2000
            var strings1 = Enumerable.Range(0, 20).Select(_ => Guid.NewGuid().ToString()).ToArray();
            // originally 2300
            var strings2 = Enumerable.Range(0, 23).Select(_ => Guid.NewGuid().ToString()).ToArray();
...
            // same as master or release-1.7
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2, consumeTask);

            CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);

Therefore, I believe that my IAsyncEnumerable implementation is flawed in a way that somehow "explodes" for bigger inputs (the threshold fluctuates, but it's somewhere between 70 and low 100s).

Any idea how I should debug this? For one thing, I think replacing Guid.NewGuid() with a human-friendly value will help, but that's all I can think of. I can also temporarily comment out SpinWait-related code.

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found that consumeTask.Status is WaitingForActivation even after a few seconds (if I don't await it or include consumeTask in the WaitAll).

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like ChatGPT came to the rescue again. (It wasn't helping a few hours ago.)

I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Makes sense; we have do dispose the writer to end the stream. Can you point me to the relevant code change?

Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

I'm not sure I follow here. As often as what? Does it fail when it runs more often? In what way? How does the overall time for this test case compare before and after the changes (I would expect it to be the same). Who would be overriding the frequency if it were protected virtual?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson

  1. StreamWriter changes
    FuzzTest is the one where I had to change this. Note the parameter changes of the local function WriteStrings.
  2. spinWait.SpinOnce() changes
    In FuzzTest, specifically https://github.com/madelson/MedallionShell/pull/109/files#diff-68fdbc9634d30b7e1a0bb438ab37b458f0c478766fba188c44ece72f93e41cacR102-R121, I updated the if condition from random.Next(4) == 1 to random.Next(110) == 1, so I'm spinning left often (25% -> 0.91%). If I use a greater value for random.Next (i.e. spin more often), then the test never ends (at least for like 30 minutes, after which I stop the test) only in the async case. Therefore I was wondering if MergedLinesEnumerableTestAsync and MergedLinesEnumerableTestSync should use different frequencies by overriding a protected virtual variable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Let me know if the above makes sense!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Bumping this thread!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test never ends (at least for like 30 minutes, after which I stop the test) only in the async case

This makes me feel like there is a bug somewhere. It could be in the MergedLinesEnumerable changes, it could be in the test code, or it could be in the Pipe code.

What I would suggest is to (temporarily) add some logging statements to the code like this:

static class TempLogger
{
    private static readonly object Lock = new();

    public static void Log(string message)
    {
       lock (Lock)
       {
            File.AppendAllLines(@"c:\dev\log.txt", [$"[{DateTime.Now}] {message}"]);
       }
    }
}

My assumption is that at some point we should stop seeing log statements as the code will enter a hung state. We can then add additional logs to try to get closer and closer to the point where each thread stops.

From there, hopefully we can deduce why it is hanging.

CollectionAssert.AreEquivalent(strings1.Concat(strings2), await asyncEnumerable.ToListAsync());
}
Bartleby2718 marked this conversation as resolved.
Show resolved Hide resolved
}

public static class AsyncEnumerableExtensions
{
public static async Task<List<string>> ToListAsync(this IAsyncEnumerable<string> strings)
{
List<string> result = [];
await foreach (var item in strings) { result.Add(item); }
return result;
}
}
11 changes: 11 additions & 0 deletions MedallionShell.Tests/Streams/MergedLinesEnumerableTestSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System.Collections.Generic;
using System.IO;
using Medallion.Shell.Streams;

namespace Medallion.Shell.Tests.Streams;

public class MergedLinesEnumerableTestSync : MergedLinesEnumerableTestBase
{
protected override IAsyncEnumerable<string> Create(TextReader reader1, TextReader reader2) =>
new AsyncEnumerableAdapter(new MergedLinesEnumerable(reader1, reader2));
}
5 changes: 4 additions & 1 deletion MedallionShell/MedallionShell.csproj
Bartleby2718 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
</PropertyGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.3'">
<PackageReference Include="System.Diagnostics.Process" version="4.3.0" />
<PackageReference Include="System.Diagnostics.Process" version="4.3.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net46' or '$(TargetFramework)' == 'net45'">
<PackageReference Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.3.0" />
Expand Down Expand Up @@ -80,4 +80,7 @@
<LogicalName>MedallionShell.ProcessSignaler.exe</LogicalName>
</EmbeddedResource>
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" />
</ItemGroup>
</Project>
Loading