Skip to content

Commit

Permalink
SerialPortStream: Fix deadlock on ReadAsync
Browse files Browse the repository at this point in the history
Issue: DOTNET-326, #116
  • Loading branch information
jcurl committed Apr 19, 2021
1 parent 0fbf35c commit 25a621b
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 41 deletions.
5 changes: 3 additions & 2 deletions code/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("2.3.0.0")]
[assembly: AssemblyFileVersion("2.3.0.0")]
[assembly: AssemblyVersion("2.3.1.0")]
[assembly: AssemblyFileVersion("2.3.1.0")]
[assembly: AssemblyInformationalVersion("2.3.1")]
2 changes: 1 addition & 1 deletion code/SerialPortStream-net40.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
<ReleaseVersion>2.3.0.0</ReleaseVersion>
<ReleaseVersion>2.3.1.0</ReleaseVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand Down
2 changes: 1 addition & 1 deletion code/SerialPortStream-net45.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<TargetFrameworkProfile />
<ReleaseVersion>2.3.0.0</ReleaseVersion>
<ReleaseVersion>2.3.1.0</ReleaseVersion>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
Expand Down
2 changes: 1 addition & 1 deletion code/SerialPortStream-netstandard15.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<Description>An independent implementation of System.IO.Ports.SerialPort and SerialStream for better reliability and maintainability.</Description>
<AssemblyTitle>SerialPortStream</AssemblyTitle>
<VersionPrefix>2.3.0.0</VersionPrefix>
<VersionPrefix>2.3.1.0</VersionPrefix>
<TargetFramework>netstandard1.5</TargetFramework>
<DefineConstants>$(DefineConstants);NETSTANDARD15</DefineConstants>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
Expand Down
150 changes: 115 additions & 35 deletions code/SerialPortStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ namespace RJCP.IO.Ports
using Native;
using Trace;

#if !NETSTANDARD15
using System.Runtime.Remoting.Messaging;
#if NETSTANDARD15
using System.Runtime.ExceptionServices;
#else
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
#endif

#if NETSTANDARD15 || NET45
using System.Threading.Tasks;
#endif
Expand Down Expand Up @@ -704,24 +705,37 @@ private int InternalRead(byte[] buffer, int offset, int count)

#if NETSTANDARD15 || NET45
/// <summary>
///
/// Asynchronously reads a sequence of bytes from the current stream and advances the
/// position within the stream by the number of bytes read.
/// </summary>
/// <param name="buffer">The buffer to read the data into.</param>
/// <param name="offset">The byte offset in <paramref name="buffer"/> at which to begin writing data read from the stream.</param>
/// <param name="offset">
/// The byte offset in <paramref name="buffer"/> at which to begin writing data read from
/// the stream.
/// </param>
/// <param name="count">The maximum number of bytes to read.</param>
/// <param name="cancellationToken">Using a cancellation token is not supported and will be ignored.</param>
/// <returns>A <see cref="Task{Int32}"/> representing the asynchronous operation.</returns>
/// <param name="cancellationToken">
/// Using a cancellation token is not supported and will be ignored.
/// </param>
/// <returns>A <see cref="Task{T}"/> representing the asynchronous operation.</returns>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return Task<int>.Factory.FromAsync(
InternalBeginRead,
InternalEndRead,
buffer,
offset,
count,
null);
ReadCheck(buffer, offset, count);

TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
InternalBeginRead(buffer, offset, count, iar => {
try {
tcs.TrySetResult(InternalEndRead(iar));
} catch (OperationCanceledException) {
tcs.TrySetCanceled();
} catch (Exception ex) {
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
#endif

#if NET40 || NET45
/// <summary>
/// Begins an asynchronous read operation.
Expand All @@ -734,6 +748,8 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
/// <returns>An <see cref="IAsyncResult"/> object to be used with <see cref="EndRead"/>.</returns>
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
ReadCheck(buffer, offset, count);

return InternalBeginRead(buffer, offset, count, callback, state);
}

Expand All @@ -747,16 +763,18 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy
/// Streams return zero (0) only at the end of the stream, otherwise, they should block until at least one byte is available.</returns>
public override int EndRead(IAsyncResult asyncResult)
{
if (asyncResult == null) throw new ArgumentNullException(nameof(asyncResult));

return InternalEndRead(asyncResult);
}
#endif

#if !NETSTANDARD15
private delegate int ReadDelegate(byte[] buffer, int offset, int count);
#endif

private IAsyncResult InternalBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
ReadCheck(buffer, offset, count);

if (m_Buffer == null || count == 0 || m_Buffer.Stream.WaitForRead(0)) {
// Data in the buffer, we can return immediately
LocalAsync<int> ar = new LocalAsync<int>(state);
Expand All @@ -770,12 +788,33 @@ private IAsyncResult InternalBeginRead(byte[] buffer, int offset, int count, Asy
if (callback != null) callback(ar);
return ar;
} else {
ReadDelegate read = InternalBlockingRead;
#if NETSTANDARD15
// No data in buffer, so we create a thread in the background
return Task.Run(() => read(buffer, offset, count));

#if NETSTANDARD15
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>(state);
Task<int> task = new Task<int>(() => {
int r = InternalBlockingRead(buffer, offset, count);
return r;
});

task.ContinueWith(t => {
// Copy the task result into the returned task.
if (t.IsFaulted)
tcs.TrySetException(t.Exception.InnerExceptions);
else if (t.IsCanceled)
tcs.TrySetCanceled();
else
tcs.TrySetResult(t.Result);
// Invoke the user callback if necessary.
if (callback != null)
callback(tcs.Task);
});

task.Start();
return tcs.Task;
#else
// No data in buffer, so we create a thread in the background
ReadDelegate read = InternalBlockingRead;
return read.BeginInvoke(buffer, offset, count, callback, state);
#endif
}
Expand All @@ -790,8 +829,12 @@ private int InternalEndRead(IAsyncResult asyncResult)
return localAsync.Result;
} else {
#if NETSTANDARD15
var ar = (Task<int>)asyncResult;
return ar.Result;
try {
return ((Task<int>)asyncResult).Result;
} catch (AggregateException ex) {
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
#else
AsyncResult ar = (AsyncResult)asyncResult;
ReadDelegate caller = (ReadDelegate)ar.AsyncDelegate;
Expand Down Expand Up @@ -1215,15 +1258,24 @@ private void InternalWrite(byte[] buffer, int offset, int count)
/// <exception cref="System.InvalidOperationException">Serial port not open.</exception>
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return Task.Factory.FromAsync(
InternalBeginWrite,
InternalEndWrite,
buffer,
offset,
count,
null);
if (!WriteCheck(buffer, offset, count))
return Task.FromResult((object)null);

TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
InternalBeginWrite(buffer, offset, count, iar => {
try {
InternalEndWrite(iar);
tcs.TrySetResult(null);
} catch (OperationCanceledException) {
tcs.TrySetCanceled();
} catch (Exception ex) {
tcs.TrySetException(ex);
}
}, null);
return tcs.Task;
}
#endif

#if NET40 || NET45
/// <summary>
/// Begins an asynchronous write operation.
Expand All @@ -1240,6 +1292,8 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
/// <exception cref="System.InvalidOperationException">Serial port not open.</exception>
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
WriteCheck(buffer, offset, count);

return InternalBeginWrite(buffer, offset, count, callback, state);
}

Expand All @@ -1257,16 +1311,18 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As
/// </remarks>
public override void EndWrite(IAsyncResult asyncResult)
{
if (asyncResult == null) throw new ArgumentNullException(nameof(asyncResult));

InternalEndWrite(asyncResult);
}
#endif

#if !NETSTANDARD15
private delegate void WriteDelegate(byte[] buffer, int offset, int count);
#endif

private IAsyncResult InternalBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
WriteCheck(buffer, offset, count);

if (count == 0 || m_Buffer.Stream.WaitForWrite(count, 0)) {
LocalAsync ar = new LocalAsync(state);
if (count > 0) {
Expand All @@ -1277,10 +1333,30 @@ private IAsyncResult InternalBeginWrite(byte[] buffer, int offset, int count, As
if (callback != null) callback(ar);
return ar;
} else {
WriteDelegate write = InternalBlockingWrite;
#if NETSTANDARD15
return Task.Run(() => write(buffer, offset, count));
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(state);
Task task = new Task(() => {
InternalBlockingWrite(buffer, offset, count);
});

task.ContinueWith(t => {
// Copy the task result into the returned task.
if (t.IsFaulted)
tcs.TrySetException(t.Exception.InnerExceptions);
else if (t.IsCanceled)
tcs.TrySetCanceled();
else
tcs.TrySetResult(null);
// Invoke the user callback if necessary.
if (callback != null)
callback(tcs.Task);
});

task.Start();
return tcs.Task;
#else
WriteDelegate write = InternalBlockingWrite;
return write.BeginInvoke(buffer, offset, count, callback, state);
#endif
}
Expand All @@ -1293,8 +1369,12 @@ private void InternalEndWrite(IAsyncResult asyncResult)
localAsync.Dispose();
} else {
#if NETSTANDARD15
var ar = (Task)asyncResult;
ar.Wait();
try {
((Task)asyncResult).Wait();
} catch (AggregateException ex) {
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw;
}
#else
AsyncResult ar = (AsyncResult)asyncResult;
WriteDelegate caller = (WriteDelegate)ar.AsyncDelegate;
Expand Down
2 changes: 1 addition & 1 deletion code/SerialPortStream.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2011/08/nuspec.xsd">
<metadata>
<id>SerialPortStream</id>
<version>2.3.0</version>
<version>2.3.1</version>
<title>SerialPortStream</title>
<authors>Jason Curl</authors>
<owners>Jason Curl</owners>
Expand Down
31 changes: 31 additions & 0 deletions test/SerialPortStreamTest/SerialPortStreamRxTxTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,41 @@ await Task.Run(async () => {
await serial.WriteAsync(bytes, 0, bytes.Length);
await serial.FlushAsync();
});
await Task.Delay(100);
} finally {
if (serial != null) serial.Dispose();
}
}

[Test]
[Timeout(2000)] // We abort the test after timeout. This tests the blocking behavior in ReadAsync and the test will fail if ReadAsync blocks.
public async Task ReadAndWriteAsync()
{
using (var serialPortStreamWrite = new SerialPortStream(SourcePort, 9600, 8, Parity.None, StopBits.One))
using (var serialPortStreamRead = new SerialPortStream(DestPort, 9600, 8, Parity.None, StopBits.One))
{
serialPortStreamWrite.Open();
serialPortStreamRead.Open();

var buffer = new byte[1024];
var readTask = Task.Run(async () => {
int expected = 3;
while (expected > 0) {
int r = await serialPortStreamRead.ReadAsync(buffer, 0, buffer.Length);
expected -= r;
}
});

var bytes = new byte[] { 0x01, 0x02, 0x03 };
await serialPortStreamWrite.WriteAsync(bytes, 0, bytes.Length);
await serialPortStreamWrite.FlushAsync();
await readTask;

Assert.That(buffer[0], Is.EqualTo(0x01));
Assert.That(buffer[1], Is.EqualTo(0x02));
Assert.That(buffer[2], Is.EqualTo(0x03));
}
}
#endif

private byte[] ReceiveData(SerialPortStream sp, int size)
Expand Down

0 comments on commit 25a621b

Please sign in to comment.