Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor OpenTelemetryData to Action<Activity> #143

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings)
_syncCall = (t, r) => t.Request<VirtualResponse>(
path: RootPath,
postData: PostData.Serializable(new { }),
openTelemetryData: default,
null,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor())
);
_asyncCall = async (t, r) =>
Expand All @@ -41,7 +41,7 @@ internal VirtualizedCluster(TransportConfigurationDescriptor settings)
(
path: RootPath,
postData: PostData.Serializable(new { }),
openTelemetryData: default,
null,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
CancellationToken.None
).ConfigureAwait(false);
Expand Down
19 changes: 4 additions & 15 deletions src/Elastic.Transport/Diagnostics/OpenTelemetry/OpenTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System;
using System.Diagnostics;

#pragma warning disable IDE0130 // Namespace does not match folder structure
namespace Elastic.Transport.Diagnostics;
#pragma warning restore IDE0130 // Namespace does not match folder structure

/// <summary>
/// Activity information for OpenTelemetry instrumentation.
Expand Down Expand Up @@ -37,7 +39,7 @@ public static class OpenTelemetry
internal static bool CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested => ElasticTransportActivitySource.HasListeners() &&
((Activity.Current?.Source.Name.Equals(ElasticTransportActivitySourceName, StringComparison.Ordinal) ?? false) && (Activity.Current?.IsAllDataRequested ?? false));

internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData openTelemetryData, ITransportConfiguration settings)
internal static void SetCommonAttributes(Activity? activity, ITransportConfiguration settings)
{
if (activity is null)
return;
Expand All @@ -51,20 +53,7 @@ internal static void SetCommonAttributes(Activity? activity, OpenTelemetryData o
}

var productSchemaVersion = string.Empty;
if (openTelemetryData.SpanAttributes is not null)
{
foreach (var attribute in openTelemetryData.SpanAttributes)
{
activity?.SetTag(attribute.Key, attribute.Value);

if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal))
{
if (attribute.Value is string schemaVersion)
productSchemaVersion = schemaVersion;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevejgordon Removing this causes the condition in line 69 to be always true.

}
}
}


// We add the client schema version only when it differs from the product schema version
if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal))
activity?.SetTag(OpenTelemetryAttributes.ElasticTransportSchemaVersion, OpenTelemetrySchemaVersion);
Expand Down

This file was deleted.

36 changes: 16 additions & 20 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@
namespace Elastic.Transport;

/// <inheritdoc cref="ITransport{TConfiguration}" />
public sealed class DistributedTransport : DistributedTransport<ITransportConfiguration>
/// <summary>
/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
/// different nodes
/// </summary>
/// <param name="configuration">The configuration to use for this transport</param>
public sealed class DistributedTransport(ITransportConfiguration configuration) : DistributedTransport<ITransportConfiguration>(configuration)
{
/// <summary>
/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
/// different nodes
/// </summary>
/// <param name="configuration">The configuration to use for this transport</param>
public DistributedTransport(ITransportConfiguration configuration)
: base(configuration) { }
}

/// <inheritdoc cref="ITransport{TConfiguration}" />
Expand Down Expand Up @@ -65,36 +63,36 @@ public DistributedTransport(TConfiguration configuration)
public TResponse Request<TResponse>(
in EndpointPath path,
PostData? data,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration
) where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(isAsync: false, path, data, openTelemetryData, localConfiguration)
RequestCoreAsync<TResponse>(isAsync: false, path, data, configureActivity, localConfiguration)
.EnsureCompleted();

/// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
public Task<TResponse> RequestAsync<TResponse>(
in EndpointPath path,
PostData? data,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(isAsync: true, path, data, openTelemetryData, localConfiguration, cancellationToken)
RequestCoreAsync<TResponse>(isAsync: true, path, data, configureActivity, localConfiguration, cancellationToken)
.AsTask();

private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
bool isAsync,
EndpointPath path,
PostData? data,
OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new()
{
Activity activity = null;

if (OpenTelemetry.ElasticTransportActivitySource.HasListeners())
activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(openTelemetryData.SpanName ?? path.Method.GetStringValue(),
activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(path.Method.GetStringValue(),
ActivityKind.Client);

try
Expand Down Expand Up @@ -127,7 +125,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
if (activity is { IsAllDataRequested: true })
{
if (activity.IsAllDataRequested)
OpenTelemetry.SetCommonAttributes(activity, openTelemetryData, Configuration);
OpenTelemetry.SetCommonAttributes(activity, Configuration);

if (Configuration.Authentication is BasicAuthentication basicAuthentication)
activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);
Expand All @@ -136,11 +134,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductVersion, Configuration.ProductRegistration.ProductAssemblyVersion);
activity.SetTag(OpenTelemetryAttributes.ElasticTransportVersion, ReflectionVersionInfo.TransportVersion);
activity.SetTag(SemanticConventions.UserAgentOriginal, Configuration.UserAgent.ToString());

if (openTelemetryData.SpanAttributes is not null)
foreach (var attribute in openTelemetryData.SpanAttributes)
activity.SetTag(attribute.Key, attribute.Value);

activity.SetTag(SemanticConventions.HttpRequestMethod, endpoint.Method.GetStringValue());
}

Expand Down Expand Up @@ -268,6 +261,9 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode);
activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes);

if (configureActivity is not null && activity is not null)
configureActivity.Invoke(activity);

return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response);
}
finally
Expand Down
11 changes: 6 additions & 5 deletions src/Elastic.Transport/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using System;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Transport.Diagnostics;

namespace Elastic.Transport;

Expand All @@ -20,15 +21,15 @@ public interface ITransport
/// <typeparam name="TResponse">The type to deserialize the response body into.</typeparam>
/// <param name="path">The path of the request.</param>
/// <param name="postData">The data to be included as the body of the HTTP request.</param>
/// <param name="openTelemetryData">Data to be used to control the OpenTelemetry instrumentation.</param>
/// <param name="configureActivity">An optional <see cref="Action"/> used to configure the <see cref="Activity"/>.</param>
/// <param name="localConfiguration">Per request configuration</param>
/// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements <see cref="TransportResponse"/> instance.
/// <para>Expert setting only</para>
/// <returns>The deserialized <typeparamref name="TResponse"/>.</returns>
public TResponse Request<TResponse>(
in EndpointPath path,
PostData? postData,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration
)
where TResponse : TransportResponse, new();
Expand All @@ -40,15 +41,15 @@ public TResponse Request<TResponse>(
/// <param name="path">The path of the request.</param>
/// <param name="postData">The data to be included as the body of the HTTP request.</param>
/// <param name="cancellationToken">The cancellation token to use.</param>
/// <param name="openTelemetryData">Data to be used to control the OpenTelemetry instrumentation.</param>
/// <param name="configureActivity">An optional <see cref="Action"/> used to configure the <see cref="Activity"/>.</param>
/// <param name="localConfiguration">Per request configuration</param>
/// Allows callers to override completely how `TResponse` should be deserialized to a `TResponse` that implements <see cref="TransportResponse"/> instance.
/// <para>Expert setting only</para>
/// <returns>The deserialized <typeparamref name="TResponse"/>.</returns>
public Task<TResponse> RequestAsync<TResponse>(
in EndpointPath path,
PostData? postData,
in OpenTelemetryData openTelemetryData,
Action<Activity>? configureActivity,
IRequestConfiguration? localConfiguration,
CancellationToken cancellationToken = default
)
Expand Down
Loading
Loading