Skip to content

Commit

Permalink
Fix deadlock in SourceBlockAsyncEnumerator
Browse files Browse the repository at this point in the history
TODO: add test
  • Loading branch information
azyobuzin committed Mar 21, 2021
1 parent af1c6a0 commit 0938a75
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/BiDaFlow.AsyncEnumerable/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Fixed
- `DataflowAsyncEnumerable.AsSourceBlock(IAsyncEnumerable<T>)` would not dispose the enumerator when the cancel is requested
- Fix deadlock in `SourceBlockAsyncEnumerator`, which occurs when the source offers a message while `MoveNextAsync` is running

## [0.2.1] - 2021-03-17
### Fixed
Expand Down
69 changes: 39 additions & 30 deletions src/BiDaFlow.AsyncEnumerable/Internal/SourceBlockAsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,44 +56,53 @@ public SourceBlockAsyncEnumerator(ISourceBlock<T> source, CancellationToken canc

public ValueTask<bool> MoveNextAsync()
{
lock (this.Lock)
if (this._isAwaiting) throw new InvalidOperationException("Do not call MoveNextAsync until the previous task is completed.");

this._taskHelper.Reset();

while (true)
{
this._taskHelper.Reset();
DataflowMessageHeader messageHeader;

if (this._exception != null)
{
this._taskHelper.SetException(this._exception);
}
else if (this._cancellationToken.IsCancellationRequested)
{
this._taskHelper.SetResult(null);
}
else if (this._isCompleted)
{
this._taskHelper.SetResult(false);
}
else
lock (this.Lock)
{
this._isAwaiting = true;

// Dequeue postponed message
while (this._queue.Count > 0)
if (this._exception != null)
{
this._taskHelper.SetException(this._exception);
break;
}
else if (this._cancellationToken.IsCancellationRequested)
{
this._taskHelper.SetResult(null);
break;
}
else if (this._isCompleted)
{
var header = this._queue.Dequeue();
var consumedValue = this._source.ConsumeMessage(header, this, out var consumed);

if (consumed)
{
this.Current = consumedValue;
this._isAwaiting = false;
this._taskHelper.SetResult(true);
break;
}
this._taskHelper.SetResult(false);
break;
}
else if (this._queue.Count == 0)
{
this._isAwaiting = true;
break;
}

// Dequeue postponed message
messageHeader = this._queue.Dequeue();
}

return new ValueTask<bool>(this, this._taskHelper.Version);
// Call ConsumeMessage outside the lock to avoid deadlock in the source block (OutgoingLock)
var consumedValue = this._source.ConsumeMessage(messageHeader, this, out var consumed);

if (consumed)
{
this.Current = consumedValue;
this._taskHelper.SetResult(true);
break;
}
}

return new ValueTask<bool>(this, this._taskHelper.Version);
}

public ValueTask DisposeAsync()
Expand Down

0 comments on commit 0938a75

Please sign in to comment.