From 80cb82e482ec7226ec1019f2720ddc792be96260 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 31 Jul 2019 08:38:34 +1200 Subject: [PATCH] =?UTF-8?q?Flush=20request=20body=20stream=20at=20start=20?= =?UTF-8?q?of=20client=20and=20duplex=20streamin=E2=80=A6=20(#418)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build_and_test.sh | 17 +++++++--- src/Grpc.Net.Client/Internal/GrpcCall.cs | 11 +++++-- .../Client/EventSourceTests.cs | 18 +++++------ test/FunctionalTests/Client/StreamingTests.cs | 32 +++++++++++++++++-- .../Services/StreamService.cs | 18 +++++++---- testassets/Proto/streaming.proto | 1 - 6 files changed, 71 insertions(+), 26 deletions(-) diff --git a/build_and_test.sh b/build_and_test.sh index ee7377510..371f1af51 100755 --- a/build_and_test.sh +++ b/build_and_test.sh @@ -19,7 +19,7 @@ source activate.sh echo "Building solution" -dotnet build +dotnet build -c Release echo "Testing solution" @@ -27,9 +27,18 @@ test_projects=( $( ls test/**/*Tests.csproj ) ) for test_project in "${test_projects[@]}" do - # Capturing test diagnostic logs because of hanging build - # https://github.com/grpc/grpc-dotnet/pull/363 - dotnet test $test_project --no-build --diag:artifacts/${test_project##*/}.log.txt + # "dotnet test" is hanging when it writes to console for an unknown reason + # Tracking issue at https://github.com/microsoft/vstest/issues/2080 + # Write test output to a text file and then write the text file to console as a workaround + { + dotnet test $test_project -c Release -v n --no-build &> ${test_project##*/}.log.txt && + echo "Success" && + cat ${test_project##*/}.log.txt + } || { + echo "Failure" && + cat ${test_project##*/}.log.txt && + exit 1 + } done echo "Finished" \ No newline at end of file diff --git a/src/Grpc.Net.Client/Internal/GrpcCall.cs b/src/Grpc.Net.Client/Internal/GrpcCall.cs index a9a8d3e5f..c29db4107 100644 --- a/src/Grpc.Net.Client/Internal/GrpcCall.cs +++ b/src/Grpc.Net.Client/Internal/GrpcCall.cs @@ -609,10 +609,17 @@ private HttpContentClientStreamWriter CreateWriter(HttpRequ _writeCompleteTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); message.Content = new PushStreamContent( - (stream) => + async stream => { + // Immediately flush request stream to send headers + // https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081 + await stream.FlushAsync().ConfigureAwait(false); + + // Pass request stream to writer _writeStreamTcs.TrySetResult(stream); - return _writeCompleteTcs.Task; + + // Wait for the writer to report it is complete + await _writeCompleteTcs.Task.ConfigureAwait(false); }, GrpcProtocolConstants.GrpcContentTypeHeaderValue); diff --git a/test/FunctionalTests/Client/EventSourceTests.cs b/test/FunctionalTests/Client/EventSourceTests.cs index 25476056b..61e4b4171 100644 --- a/test/FunctionalTests/Client/EventSourceTests.cs +++ b/test/FunctionalTests/Client/EventSourceTests.cs @@ -52,7 +52,7 @@ public async Task UnaryMethod_SuccessfulCall_PollingCountersUpdatedCorrectly() async Task UnarySuccess(HelloRequest request, ServerCallContext context) { - await tcs.Task; + await tcs.Task.DefaultTimeout(); return new HelloReply(); } @@ -119,7 +119,7 @@ public async Task UnaryMethod_ErrorCall_PollingCountersUpdatedCorrectly() async Task UnaryError(HelloRequest request, ServerCallContext context) { - await tcs.Task; + await tcs.Task.DefaultTimeout(); throw new Exception("Error!"); } @@ -233,13 +233,13 @@ public async Task DuplexStreamingMethod_Success_PollingCountersUpdatedCorrectly( { async Task DuplexStreamingMethod(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) { - while (await requestStream.MoveNext()) + while (await requestStream.MoveNext().DefaultTimeout()) { } - await responseStream.WriteAsync(new HelloReply { Message = "Message 1" }); - await responseStream.WriteAsync(new HelloReply { Message = "Message 2" }); + await responseStream.WriteAsync(new HelloReply { Message = "Message 1" }).DefaultTimeout(); + await responseStream.WriteAsync(new HelloReply { Message = "Message 2" }).DefaultTimeout(); } // Arrange @@ -267,11 +267,11 @@ async Task DuplexStreamingMethod(IAsyncStreamReader requestStream, ["messages-received"] = 0, }).DefaultTimeout(); - await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 1" }); - await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 2" }); - await call.RequestStream.CompleteAsync(); + await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 1" }).DefaultTimeout(); + await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 2" }).DefaultTimeout(); + await call.RequestStream.CompleteAsync().DefaultTimeout(); - while (await call.ResponseStream.MoveNext()) + while (await call.ResponseStream.MoveNext().DefaultTimeout()) { } diff --git a/test/FunctionalTests/Client/StreamingTests.cs b/test/FunctionalTests/Client/StreamingTests.cs index dc90b9d46..c7e6fcf7c 100644 --- a/test/FunctionalTests/Client/StreamingTests.cs +++ b/test/FunctionalTests/Client/StreamingTests.cs @@ -55,20 +55,25 @@ public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Su const int BatchSize = 1024 * 64; // 64 KB var writeCount = Math.Min(data.Length - sent, BatchSize); - var finalWrite = sent + writeCount == data.Length; + await call.RequestStream.WriteAsync(new DataMessage { - Data = ByteString.CopyFrom(data, sent, writeCount), - FinalSegment = finalWrite + Data = ByteString.CopyFrom(data, sent, writeCount) }).DefaultTimeout(); sent += writeCount; + + Logger.LogInformation($"Sent {sent} bytes"); } + await call.RequestStream.CompleteAsync().DefaultTimeout(); + var ms = new MemoryStream(); while (await call.ResponseStream.MoveNext(CancellationToken.None).DefaultTimeout()) { ms.Write(call.ResponseStream.Current.Data.Span); + + Logger.LogInformation($"Received {ms.Length} bytes"); } // Assert @@ -121,6 +126,7 @@ public async Task ClientStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Su } [Test] + [Ignore("Waiting on fix from https://github.com/dotnet/corefx/issues/39586")] public async Task DuplexStream_SendToUnimplementedMethod_ThrowError() { SetExpectedErrorsFilter(writeContext => @@ -157,5 +163,25 @@ await call.RequestStream.WriteAsync(new UnimplementeDataMessage // Assert Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode); } + + [Test] + [Ignore("Waiting on fix from https://github.com/dotnet/corefx/issues/39586")] + public async Task DuplexStream_SendToUnimplementedMethodAfterResponseReceived_Hang() + { + // Arrange + var client = GrpcClient.Create(Fixture.Client, LoggerFactory); + + for (int i = 0; i < 1000; i++) + { + Logger.LogInformation($"ITERATION {i}"); + + // Act + var call = client.DuplexData(); + + // Response will only be headers so the call is "done" on the server side + await call.ResponseHeadersAsync.DefaultTimeout(); + await call.RequestStream.CompleteAsync(); + } + } } } diff --git a/testassets/FunctionalTestsWebsite/Services/StreamService.cs b/testassets/FunctionalTestsWebsite/Services/StreamService.cs index e7e070a51..eb8d8beba 100644 --- a/testassets/FunctionalTestsWebsite/Services/StreamService.cs +++ b/testassets/FunctionalTestsWebsite/Services/StreamService.cs @@ -22,12 +22,20 @@ using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; +using Microsoft.Extensions.Logging; using Streaming; namespace FunctionalTestsWebsite.Services { public class StreamService : Streaming.StreamService.StreamServiceBase { + private readonly ILogger _logger; + + public StreamService(ILoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger(); + } + public override async Task DuplexData( IAsyncStreamReader requestStream, IServerStreamWriter responseStream, @@ -38,10 +46,7 @@ public override async Task DuplexData( while (await requestStream.MoveNext(CancellationToken.None)) { ms.Write(requestStream.Current.Data.Span); - if (requestStream.Current.FinalSegment) - { - break; - } + _logger.LogInformation($"Received {ms.Length} bytes"); } // Write back to client in batches @@ -52,14 +57,13 @@ public override async Task DuplexData( const int BatchSize = 1024 * 64; // 64 KB var writeCount = Math.Min(data.Length - sent, BatchSize); - var finalWrite = sent + writeCount == data.Length; await responseStream.WriteAsync(new DataMessage { - Data = ByteString.CopyFrom(data, sent, writeCount), - FinalSegment = finalWrite + Data = ByteString.CopyFrom(data, sent, writeCount) }); sent += writeCount; + _logger.LogInformation($"Sent {sent} bytes"); } } diff --git a/testassets/Proto/streaming.proto b/testassets/Proto/streaming.proto index 90a9a3527..ed7c02bbd 100644 --- a/testassets/Proto/streaming.proto +++ b/testassets/Proto/streaming.proto @@ -24,7 +24,6 @@ service StreamService { message DataMessage { bytes data = 1; int32 serverDelayMilliseconds = 2; - bool finalSegment = 3; } message DataComplete {