Skip to content

Commit

Permalink
Refactor OpenTelemetryData to Action<Activity>
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon committed Nov 20, 2024
1 parent 86a1ab5 commit 5abd768
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings)
_syncCall = (t, r) => t.Request<VirtualResponse>(
path: RootPath,
postData: PostData.Serializable(new { }),
openTelemetryData: default,
null,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor())
);
_asyncCall = async (t, r) =>
Expand All @@ -41,7 +41,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings)
(
path: RootPath,
postData: PostData.Serializable(new { }),
openTelemetryData: default,
null,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
CancellationToken.None
).ConfigureAwait(false);
Expand Down
19 changes: 4 additions & 15 deletions src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System;
using System.Diagnostics;

#pragma warning disable IDE0130 // Namespace does not match folder structure
namespace Elastic.Transport.Diagnostics;
#pragma warning restore IDE0130 // Namespace does not match folder structure

/// <summary>
/// Activity information for OpenTelemetry instrumentation.
Expand Down Expand Up @@ -37,7 +39,7 @@ public static class OpenTelemetry
internal static bool CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested => ElasticTransportActivitySource.HasListeners() &&
((Activity.Current?.Source.Name.Equals(ElasticTransportActivitySourceName, StringComparison.Ordinal) ?? false) && (Activity.Current?.IsAllDataRequested ?? false));

internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData openTelemetryData, ITransportConfiguration settings)
internal static void SetCommonAttributes(Activity? activity, ITransportConfiguration settings)
{
if (activity is null)
return;
Expand All @@ -51,20 +53,7 @@ internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData o
}

var productSchemaVersion = string.Empty;
if (openTelemetryData.SpanAttributes is not null)
{
foreach (var attribute in openTelemetryData.SpanAttributes)
{
activity?.SetTag(attribute.Key, attribute.Value);

if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal))
{
if (attribute.Value is string schemaVersion)
productSchemaVersion = schemaVersion;
}
}
}


// We add the client schema version only when it differs from the product schema version
if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal))
activity?.SetTag(OpenTelemetryAttributes.ElasticTransportSchemaVersion, OpenTelemetrySchemaVersion);
Expand Down

This file was deleted.

36 changes: 16 additions & 20 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
namespace Elastic.Transport;

/// <inheritdoc cref="ITransport{TConfiguration}" />
public sealed class DistributedTransport : DistributedTransport<ITransportConfiguration>
/// <summary>
/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
/// different nodes
/// </summary>
/// <param name="configuration">The configuration to use for this transport</param>
public sealed class DistributedTransport(ITransportConfiguration configuration) : DistributedTransport<ITransportConfiguration>(configuration)
{
/// <summary>
/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
/// different nodes
/// </summary>
/// <param name="configuration">The configuration to use for this transport</param>
public DistributedTransport(ITransportConfiguration configuration)
: base(configuration) { }
}

/// <inheritdoc cref="ITransport{TConfiguration}" />
Expand Down Expand Up @@ -65,36 +63,36 @@ public DistributedTransport(TConfiguration configuration)
public TResponse Request<TResponse>(
in EndpointPath path,
PostData? data,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration
) where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(isAsync: false, path, data, openTelemetryData, localConfiguration)
RequestCoreAsync<TResponse>(isAsync: false, path, data, configureActivity, localConfiguration)
.EnsureCompleted();

/// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
public Task<TResponse> RequestAsync<TResponse>(
in EndpointPath path,
PostData? data,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(isAsync: true, path, data, openTelemetryData, localConfiguration, cancellationToken)
RequestCoreAsync<TResponse>(isAsync: true, path, data, configureActivity, localConfiguration, cancellationToken)
.AsTask();

private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
bool isAsync,
EndpointPath path,
PostData? data,
OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new()
{
Activity activity = null;

if (OpenTelemetry.ElasticTransportActivitySource.HasListeners())
activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(openTelemetryData.SpanName ?? path.Method.GetStringValue(),
activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(path.Method.GetStringValue(),
ActivityKind.Client);

try
Expand Down Expand Up @@ -127,7 +125,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
if (activity is { IsAllDataRequested: true })
{
if (activity.IsAllDataRequested)
OpenTelemetry.SetCommonAttributes(activity, openTelemetryData, Configuration);
OpenTelemetry.SetCommonAttributes(activity, Configuration);

if (Configuration.Authentication is BasicAuthentication basicAuthentication)
activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);
Expand All @@ -136,14 +134,12 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductVersion, Configuration.ProductRegistration.ProductAssemblyVersion);
activity.SetTag(OpenTelemetryAttributes.ElasticTransportVersion, ReflectionVersionInfo.TransportVersion);
activity.SetTag(SemanticConventions.UserAgentOriginal, Configuration.UserAgent.ToString());

if (openTelemetryData.SpanAttributes is not null)
foreach (var attribute in openTelemetryData.SpanAttributes)
activity.SetTag(attribute.Key, attribute.Value);

activity.SetTag(SemanticConventions.HttpRequestMethod, endpoint.Method.GetStringValue());
}

if (configureActivity is not null && activity is not null)
configureActivity.Invoke(activity);

List<PipelineException>? seenExceptions = null;
var attemptedNodes = 0;

Expand Down
11 changes: 6 additions & 5 deletions src/Elastic.Transport/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using System;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Transport.Diagnostics;

namespace Elastic.Transport;

Expand All @@ -20,15 +21,15 @@ public interface ITransport
/// <typeparam name="TResponse">The type to deserialize the response body into.</typeparam>
/// <param name="path">The path of the request.</param>
/// <param name="postData">The data to be included as the body of the HTTP request.</param>
/// <param name="openTelemetryData">Data to be used to control the OpenTelemetry instrumentation.</param>
/// <param name="configureActivity">An optional <see cref="Action"/> used to configure the <see cref="Activity"/>.</param>
/// <param name="localConfiguration">Per request configuration</param>
/// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements <see cref="TransportResponse"/> instance.
/// <para>Expert setting only</para>
/// <returns>The deserialized <typeparamref name="TResponse"/>.</returns>
public TResponse Request<TResponse>(
in EndpointPath path,
PostData? postData,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration
)
where TResponse : TransportResponse, new();
Expand All @@ -40,15 +41,15 @@ public TResponse Request<TResponse>(
/// <param name="path">The path of the request.</param>
/// <param name="postData">The data to be included as the body of the HTTP request.</param>
/// <param name="cancellationToken">The cancellation token to use.</param>
/// <param name="openTelemetryData">Data to be used to control the OpenTelemetry instrumentation.</param>
/// <param name="configureActivity">An optional <see cref="Action"/> used to configure the <see cref="Activity"/>.</param>
/// <param name="localConfiguration">Per request configuration</param>
/// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements <see cref="TransportResponse"/> instance.
/// <para>Expert setting only</para>
/// <returns>The deserialized <typeparamref name="TResponse"/>.</returns>
public Task<TResponse> RequestAsync<TResponse>(
in EndpointPath path,
PostData? postData,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
)
Expand Down
40 changes: 20 additions & 20 deletions src/Elastic.Transport/ITransportHttpMethodExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,98 +19,98 @@ private static EndpointPath ToEndpointPath(HttpMethod method, string path, Reque
/// <summary>Perform a GET request</summary>
public static TResponse Get<TResponse>(this ITransport<ITransportConfiguration> transport, string path, RequestParameters parameters)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null);
transport.Request<TResponse>(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, null, null);

/// <summary>Perform a GET request</summary>
public static Task<TResponse> GetAsync<TResponse>(this ITransport<ITransportConfiguration> transport, string path,
RequestParameters parameters, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(ToEndpointPath(GET, path, parameters, transport.Configuration), postData: null, null, null, cancellationToken);

/// <summary>Perform a GET request</summary>
public static TResponse Get<TResponse>(this ITransport transport, string pathAndQuery)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(new EndpointPath(GET, pathAndQuery), postData: null, openTelemetryData: default, null);
transport.Request<TResponse>(new EndpointPath(GET, pathAndQuery), postData: null, null, null);

/// <summary>Perform a GET request</summary>
public static Task<TResponse> GetAsync<TResponse>(this ITransport transport, string pathAndQuery, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(new EndpointPath(GET, pathAndQuery), postData: null, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(new EndpointPath(GET, pathAndQuery), postData: null, null, null, cancellationToken);

/// <summary>Perform a HEAD request</summary>
public static VoidResponse Head(this ITransport<ITransportConfiguration> transport, string path, RequestParameters parameters)
=> transport.Request<VoidResponse>(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null);
=> transport.Request<VoidResponse>(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, null, null);

/// <summary>Perform a HEAD request</summary>
public static Task<VoidResponse> HeadAsync(this ITransport<ITransportConfiguration> transport, string path, RequestParameters parameters, CancellationToken cancellationToken = default)
=> transport.RequestAsync<VoidResponse>(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, openTelemetryData: default, null, cancellationToken);
=> transport.RequestAsync<VoidResponse>(ToEndpointPath(HEAD, path, parameters, transport.Configuration), postData: null, null, null, cancellationToken);

/// <summary>Perform a HEAD request</summary>
public static VoidResponse Head(this ITransport transport, string pathAndQuery)
=> transport.Request<VoidResponse>(new EndpointPath(HEAD, pathAndQuery), postData: null, openTelemetryData: default, null);
=> transport.Request<VoidResponse>(new EndpointPath(HEAD, pathAndQuery), postData: null, null, null);

/// <summary>Perform a HEAD request</summary>
public static Task<VoidResponse> HeadAsync(this ITransport transport, string pathAndQuery, CancellationToken cancellationToken = default)
=> transport.RequestAsync<VoidResponse>(new EndpointPath(HEAD, pathAndQuery), postData: null, openTelemetryData: default, null, cancellationToken);
=> transport.RequestAsync<VoidResponse>(new EndpointPath(HEAD, pathAndQuery), postData: null, null, null, cancellationToken);

/// <summary>Perform a POST request</summary>
public static TResponse Post<TResponse>(this ITransport<ITransportConfiguration> transport, string path, PostData data, RequestParameters parameters)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(ToEndpointPath(POST, path, parameters, transport.Configuration), data, openTelemetryData: default, null);
transport.Request<TResponse>(ToEndpointPath(POST, path, parameters, transport.Configuration), data, null, null);

/// <summary>Perform a POST request</summary>
public static Task<TResponse> PostAsync<TResponse>(this ITransport<ITransportConfiguration> transport, string path, PostData data,
RequestParameters parameters, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(ToEndpointPath(POST, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(ToEndpointPath(POST, path, parameters, transport.Configuration), data, null, null, cancellationToken);

/// <summary>Perform a POST request</summary>
public static TResponse Post<TResponse>(this ITransport transport, string pathAndQuery, PostData data)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(new EndpointPath(POST, pathAndQuery), data, openTelemetryData: default, null);
transport.Request<TResponse>(new EndpointPath(POST, pathAndQuery), data, null, null);

/// <summary>Perform a POST request</summary>
public static Task<TResponse> PostAsync<TResponse>(this ITransport transport, string pathAndQuery, PostData data, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(new EndpointPath(POST, pathAndQuery), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(new EndpointPath(POST, pathAndQuery), data, null, null, cancellationToken);

/// <summary>Perform a PUT request</summary>
public static TResponse Put<TResponse>(this ITransport<ITransportConfiguration> transport, string path, PostData data, RequestParameters parameters)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, openTelemetryData: default, null);
transport.Request<TResponse>(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, null, null);

/// <summary>Perform a PUT request</summary>
public static Task<TResponse> PutAsync<TResponse>(this ITransport<ITransportConfiguration> transport, string path, PostData data, RequestParameters parameters, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(ToEndpointPath(PUT, path, parameters, transport.Configuration), data, null, null, cancellationToken);

/// <summary>Perform a PUT request</summary>
public static TResponse Put<TResponse>(this ITransport transport, string pathAndQuery, PostData data)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(new EndpointPath(PUT, pathAndQuery), data, openTelemetryData: default, null);
transport.Request<TResponse>(new EndpointPath(PUT, pathAndQuery), data, null, null);

/// <summary>Perform a PUT request</summary>
public static Task<TResponse> PutAsync<TResponse>(this ITransport transport, string pathAndQuery, PostData data, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(new EndpointPath(PUT, pathAndQuery), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(new EndpointPath(PUT, pathAndQuery), data, null, null, cancellationToken);

/// <summary>Perform a DELETE request</summary>
public static TResponse Delete<TResponse>(this ITransport<ITransportConfiguration> transport, string path, RequestParameters parameters, PostData? data = null)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, openTelemetryData: default, null);
transport.Request<TResponse>(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, null, null);

/// <summary>Perform a DELETE request</summary>
public static Task<TResponse> DeleteAsync<TResponse>(this ITransport<ITransportConfiguration> transport, string path, RequestParameters parameters, PostData? data = null, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(ToEndpointPath(DELETE, path, parameters, transport.Configuration), data, null, null, cancellationToken);

/// <summary>Perform a DELETE request</summary>
public static TResponse Delete<TResponse>(this ITransport transport, string pathAndQuery, PostData? data = null)
where TResponse : TransportResponse, new() =>
transport.Request<TResponse>(new EndpointPath(DELETE, pathAndQuery), data, openTelemetryData: default, null);
transport.Request<TResponse>(new EndpointPath(DELETE, pathAndQuery), data, null, null);

/// <summary>Perform a DELETE request</summary>
public static Task<TResponse> DeleteAsync<TResponse>(this ITransport transport, string pathAndQuery, PostData? data = null, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new() =>
transport.RequestAsync<TResponse>(new EndpointPath(DELETE, pathAndQuery), data, openTelemetryData: default, null, cancellationToken);
transport.RequestAsync<TResponse>(new EndpointPath(DELETE, pathAndQuery), data, null, null, cancellationToken);
}
Loading

0 comments on commit 5abd768

Please sign in to comment.