Skip to content

Commit

Permalink
Flush request body stream at start of client and duplex streamin… (#418)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Jul 30, 2019
1 parent 6e9cead commit 80cb82e
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 26 deletions.
17 changes: 13 additions & 4 deletions build_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@ source activate.sh

echo "Building solution"

dotnet build
dotnet build -c Release

echo "Testing solution"

test_projects=( $( ls test/**/*Tests.csproj ) )

for test_project in "${test_projects[@]}"
do
# Capturing test diagnostic logs because of hanging build
# https://github.com/grpc/grpc-dotnet/pull/363
dotnet test $test_project --no-build --diag:artifacts/${test_project##*/}.log.txt
# "dotnet test" is hanging when it writes to console for an unknown reason
# Tracking issue at https://github.com/microsoft/vstest/issues/2080
# Write test output to a text file and then write the text file to console as a workaround
{
dotnet test $test_project -c Release -v n --no-build &> ${test_project##*/}.log.txt &&
echo "Success" &&
cat ${test_project##*/}.log.txt
} || {
echo "Failure" &&
cat ${test_project##*/}.log.txt &&
exit 1
}
done

echo "Finished"
11 changes: 9 additions & 2 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,17 @@ private HttpContentClientStreamWriter<TRequest, TResponse> CreateWriter(HttpRequ
_writeCompleteTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

message.Content = new PushStreamContent(
(stream) =>
async stream =>
{
// Immediately flush request stream to send headers
// https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081
await stream.FlushAsync().ConfigureAwait(false);
// Pass request stream to writer
_writeStreamTcs.TrySetResult(stream);
return _writeCompleteTcs.Task;
// Wait for the writer to report it is complete
await _writeCompleteTcs.Task.ConfigureAwait(false);
},
GrpcProtocolConstants.GrpcContentTypeHeaderValue);

Expand Down
18 changes: 9 additions & 9 deletions test/FunctionalTests/Client/EventSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task UnaryMethod_SuccessfulCall_PollingCountersUpdatedCorrectly()

async Task<HelloReply> UnarySuccess(HelloRequest request, ServerCallContext context)
{
await tcs.Task;
await tcs.Task.DefaultTimeout();

return new HelloReply();
}
Expand Down Expand Up @@ -119,7 +119,7 @@ public async Task UnaryMethod_ErrorCall_PollingCountersUpdatedCorrectly()

async Task<HelloReply> UnaryError(HelloRequest request, ServerCallContext context)
{
await tcs.Task;
await tcs.Task.DefaultTimeout();

throw new Exception("Error!");
}
Expand Down Expand Up @@ -233,13 +233,13 @@ public async Task DuplexStreamingMethod_Success_PollingCountersUpdatedCorrectly(
{
async Task DuplexStreamingMethod(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext())
while (await requestStream.MoveNext().DefaultTimeout())
{

}

await responseStream.WriteAsync(new HelloReply { Message = "Message 1" });
await responseStream.WriteAsync(new HelloReply { Message = "Message 2" });
await responseStream.WriteAsync(new HelloReply { Message = "Message 1" }).DefaultTimeout();
await responseStream.WriteAsync(new HelloReply { Message = "Message 2" }).DefaultTimeout();
}

// Arrange
Expand Down Expand Up @@ -267,11 +267,11 @@ async Task DuplexStreamingMethod(IAsyncStreamReader<HelloRequest> requestStream,
["messages-received"] = 0,
}).DefaultTimeout();

await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 1" });
await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 2" });
await call.RequestStream.CompleteAsync();
await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 1" }).DefaultTimeout();
await call.RequestStream.WriteAsync(new HelloRequest { Name = "Name 2" }).DefaultTimeout();
await call.RequestStream.CompleteAsync().DefaultTimeout();

while (await call.ResponseStream.MoveNext())
while (await call.ResponseStream.MoveNext().DefaultTimeout())
{
}

Expand Down
32 changes: 29 additions & 3 deletions test/FunctionalTests/Client/StreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,25 @@ public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Su
const int BatchSize = 1024 * 64; // 64 KB

var writeCount = Math.Min(data.Length - sent, BatchSize);
var finalWrite = sent + writeCount == data.Length;

await call.RequestStream.WriteAsync(new DataMessage
{
Data = ByteString.CopyFrom(data, sent, writeCount),
FinalSegment = finalWrite
Data = ByteString.CopyFrom(data, sent, writeCount)
}).DefaultTimeout();

sent += writeCount;

Logger.LogInformation($"Sent {sent} bytes");
}

await call.RequestStream.CompleteAsync().DefaultTimeout();

var ms = new MemoryStream();
while (await call.ResponseStream.MoveNext(CancellationToken.None).DefaultTimeout())
{
ms.Write(call.ResponseStream.Current.Data.Span);

Logger.LogInformation($"Received {ms.Length} bytes");
}

// Assert
Expand Down Expand Up @@ -121,6 +126,7 @@ public async Task ClientStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Su
}

[Test]
[Ignore("Waiting on fix from https://github.com/dotnet/corefx/issues/39586")]
public async Task DuplexStream_SendToUnimplementedMethod_ThrowError()
{
SetExpectedErrorsFilter(writeContext =>
Expand Down Expand Up @@ -157,5 +163,25 @@ await call.RequestStream.WriteAsync(new UnimplementeDataMessage
// Assert
Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode);
}

[Test]
[Ignore("Waiting on fix from https://github.com/dotnet/corefx/issues/39586")]
public async Task DuplexStream_SendToUnimplementedMethodAfterResponseReceived_Hang()
{
// Arrange
var client = GrpcClient.Create<UnimplementedService.UnimplementedServiceClient>(Fixture.Client, LoggerFactory);

for (int i = 0; i < 1000; i++)
{
Logger.LogInformation($"ITERATION {i}");

// Act
var call = client.DuplexData();

// Response will only be headers so the call is "done" on the server side
await call.ResponseHeadersAsync.DefaultTimeout();
await call.RequestStream.CompleteAsync();
}
}
}
}
18 changes: 11 additions & 7 deletions testassets/FunctionalTestsWebsite/Services/StreamService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Streaming;

namespace FunctionalTestsWebsite.Services
{
public class StreamService : Streaming.StreamService.StreamServiceBase
{
private readonly ILogger _logger;

public StreamService(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<StreamService>();
}

public override async Task DuplexData(
IAsyncStreamReader<DataMessage> requestStream,
IServerStreamWriter<DataMessage> responseStream,
Expand All @@ -38,10 +46,7 @@ public override async Task DuplexData(
while (await requestStream.MoveNext(CancellationToken.None))
{
ms.Write(requestStream.Current.Data.Span);
if (requestStream.Current.FinalSegment)
{
break;
}
_logger.LogInformation($"Received {ms.Length} bytes");
}

// Write back to client in batches
Expand All @@ -52,14 +57,13 @@ public override async Task DuplexData(
const int BatchSize = 1024 * 64; // 64 KB

var writeCount = Math.Min(data.Length - sent, BatchSize);
var finalWrite = sent + writeCount == data.Length;
await responseStream.WriteAsync(new DataMessage
{
Data = ByteString.CopyFrom(data, sent, writeCount),
FinalSegment = finalWrite
Data = ByteString.CopyFrom(data, sent, writeCount)
});

sent += writeCount;
_logger.LogInformation($"Sent {sent} bytes");
}
}

Expand Down
1 change: 0 additions & 1 deletion testassets/Proto/streaming.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ service StreamService {
message DataMessage {
bytes data = 1;
int32 serverDelayMilliseconds = 2;
bool finalSegment = 3;
}

message DataComplete {
Expand Down

0 comments on commit 80cb82e

Please sign in to comment.