Skip to content

Commit

Permalink
Improve response handling and add LeaveOpen property
Browse files Browse the repository at this point in the history
Add `LeaveOpen` property to `StreamResponse` and `TransportResponse` classes for flexible stream management.
  • Loading branch information
stevejgordon committed Oct 29, 2024
1 parent c030d63 commit 1e7ba25
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 77 deletions.
95 changes: 47 additions & 48 deletions src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,70 +224,69 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
details.ResponseBodyInBytes = bytes;
}

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;

using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
{
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;

if (details.HttpStatusCode.HasValue &&
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
return null;
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;

var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();

TResponse response;
if (requestData.CustomResponseBuilder != null)
{
var beforeTicks = Stopwatch.GetTimestamp();
if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;

if (isAsync)
response = await requestData.CustomResponseBuilder
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
.ConfigureAwait(false) as TResponse;
else
response = requestData.CustomResponseBuilder
.DeserializeResponse(serializer, details, responseStream) as TResponse;
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}

// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
try
{
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
{
response = new TResponse();
SetErrorOnResponse(response, error);
return response;
}
if (!requestData.ValidateResponseContentType(mimeType))
return default;

if (!requestData.ValidateResponseContentType(mimeType))
return default;
var beforeTicks = Stopwatch.GetTimestamp();

var beforeTicks = Stopwatch.GetTimestamp();
if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);

if (isAsync)
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
else
response = serializer.Deserialize<TResponse>(responseStream);
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);

var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);

if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
if (!response.LeaveOpen)
responseStream.Dispose();

return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
return default;
}
return response;
}
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
{
responseStream.Dispose();
return default;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,43 +154,43 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
ex = e;
}

var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
TResponse response;

using (isStreamResponse ? DiagnosticSources.SingletonDisposable : receivedResponse)
try
{
TResponse response;
if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);

try
{
if (isAsync)
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receivedResponse.Dispose();

// Defer disposal of the response message
if (response is StreamResponse sr)
sr.Finalizer = () => receivedResponse.Dispose();
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);

var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
if (attributes is null) return response;

if (attributes is null) return response;
foreach (var attribute in attributes)
Activity.Current?.SetTag(attribute.Key, attribute.Value);

foreach (var attribute in attributes)
Activity.Current?.SetTag(attribute.Key, attribute.Value);
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection.
if (!response.LeaveOpen)
receivedResponse.Dispose();

return response;
}
catch
{
receivedResponse.Dispose(); // if there's an exception, ensure we always release the response so that the connection is freed.
throw;
}
return response;
}
catch
{
receivedResponse.Dispose(); // if there's an exception, ensure we always release the response so that the connection is freed.
throw;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
}
}

if (!response.LeaveOpen)
receivedResponse.Dispose();

return response;
}
catch
Expand Down
2 changes: 2 additions & 0 deletions src/Elastic.Transport/Responses/Special/StreamResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public StreamResponse(Stream body, string? mimeType)
MimeType = mimeType ?? string.Empty;
}

internal override bool LeaveOpen => true;

/// <summary>
/// Disposes the underlying stream.
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion src/Elastic.Transport/Responses/TransportResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Elastic.Transport;

/// <summary>
/// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/> and <see cref="VoidResponse"/>
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/>, <see cref="StreamResponse"/> and <see cref="VoidResponse"/>
/// </summary>
public abstract class TransportResponse<T> : TransportResponse
{
Expand All @@ -34,5 +34,7 @@ public abstract class TransportResponse
public override string ToString() => ApiCallDetails?.DebugInformation
// ReSharper disable once ConstantNullCoalescingCondition
?? $"{nameof(ApiCallDetails)} not set, likely a bug, reverting to default ToString(): {base.ToString()}";

internal virtual bool LeaveOpen { get; } = false;
}

0 comments on commit 1e7ba25

Please sign in to comment.