Skip to content

Commit

Permalink
[Instrumentation.GrpcCore] Use shared semantic conventions (#1917)
Browse files Browse the repository at this point in the history
  • Loading branch information
IliaBrahinets authored Jun 24, 2024
1 parent 222818f commit 02a7ddf
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Grpc.Core;
Expand Down Expand Up @@ -311,16 +310,6 @@ public ClientRpcScope(ClientInterceptorContext<TRequest, TResponse> context, Cli
return;
}

// This if block is for unit testing only.
IEnumerable<KeyValuePair<string, object>> customTags = null;
if (options.ActivityIdentifierValue != default)
{
customTags = new List<KeyValuePair<string, object>>
{
new KeyValuePair<string, object>(SemanticConventions.AttributeActivityIdentifier, options.ActivityIdentifierValue),
};
}

// We want to start an activity but don't activate it.
// After calling StartActivity, Activity.Current will be the new Activity.
// This scope is created synchronously before the RPC invocation starts and so this new Activity will overwrite
Expand All @@ -331,7 +320,7 @@ public ClientRpcScope(ClientInterceptorContext<TRequest, TResponse> context, Cli
this.FullServiceName,
ActivityKind.Client,
this.parentActivity == default ? default : this.parentActivity.Context,
tags: customTags);
tags: options.AdditionalTags);

if (rpcActivity == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Instrumentation.GrpcCore;
Expand All @@ -27,7 +27,7 @@ public class ClientTracingInterceptorOptions
public TextMapPropagator Propagator { get; internal set; } = Propagators.DefaultTextMapPropagator;

/// <summary>
/// Gets or sets a custom identifier used during unit testing.
/// Gets or sets additional activity tags used during unit testing.
/// </summary>
internal Guid ActivityIdentifierValue { get; set; }
internal IReadOnlyDictionary<string, object> AdditionalTags { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
<ItemGroup>
<Compile Include="$(RepoRoot)\src\Shared\AssemblyVersionExtensions.cs" Link="Includes\AssemblyVersionExtensions.cs" />
<Compile Include="$(RepoRoot)\src\Shared\Guard.cs" Link="Includes\Guard.cs" />
<Compile Include="$(RepoRoot)\src\Shared\SemanticConventions.cs" Link="Includes\SemanticConventions.cs" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/OpenTelemetry.Instrumentation.GrpcCore/RpcScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void AddMessageEvent(string eventName, IMessage message, bool request)
{
new KeyValuePair<string, object>("name", "message"),
new KeyValuePair<string, object>(SemanticConventions.AttributeMessageType, request ? "SENT" : "RECEIVED"),
new KeyValuePair<string, object>(SemanticConventions.AttributeMessageID, request ? this.requestMessageCounter : this.responseMessageCounter),
new KeyValuePair<string, object>(SemanticConventions.AttributeMessageId, request ? this.requestMessageCounter : this.responseMessageCounter),

// TODO how to get the real compressed or uncompressed sizes
new KeyValuePair<string, object>(SemanticConventions.AttributeMessageCompressedSize, messageSize),
Expand Down
30 changes: 0 additions & 30 deletions src/OpenTelemetry.Instrumentation.GrpcCore/SemanticConventions.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,11 @@ public ServerRpcScope(ServerCallContext context, ServerTracingInterceptorOptions
}
}

// This if block is for unit testing only.
IEnumerable<KeyValuePair<string, object>> customTags = null;
if (options.ActivityIdentifierValue != default)
{
customTags = new List<KeyValuePair<string, object>>
{
new KeyValuePair<string, object>(SemanticConventions.AttributeActivityIdentifier, options.ActivityIdentifierValue),
};
}

var activity = GrpcCoreInstrumentation.ActivitySource.StartActivity(
this.FullServiceName,
ActivityKind.Server,
currentContext ?? default,
tags: customTags);
tags: options.AdditionalTags);

this.SetActivity(activity);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.Instrumentation.GrpcCore;
Expand All @@ -27,7 +27,7 @@ public class ServerTracingInterceptorOptions
public TextMapPropagator Propagator { get; internal set; } = Propagators.DefaultTextMapPropagator;

/// <summary>
/// Gets or sets a custom identfier used during unit testing.
/// Gets or sets additional activity tags used during unit testing.
/// </summary>
internal Guid ActivityIdentifierValue { get; set; }
internal IReadOnlyDictionary<string, object> AdditionalTags { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
using Grpc.Core.Interceptors;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Tests;
using OpenTelemetry.Trace;
using Xunit;
using StatusCode = Grpc.Core.StatusCode;

namespace OpenTelemetry.Instrumentation.GrpcCore.Tests;

Expand Down Expand Up @@ -237,7 +239,8 @@ public async Task DownstreamInterceptorActivityAccess()
return metadata;
});

var interceptorOptions = new ClientTracingInterceptorOptions { ActivityIdentifierValue = Guid.NewGuid() };
var testTags = new TestActivityTags();
var interceptorOptions = new ClientTracingInterceptorOptions { AdditionalTags = testTags.Tags };
callInvoker = callInvoker.Intercept(new ClientTracingInterceptor(interceptorOptions));
var client = new Foobar.FoobarClient(callInvoker);

Expand All @@ -248,7 +251,7 @@ static void ValidateNewTagOnActivity(InterceptorActivityListener listener)
}

// Check the blocking async call
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
Assert.Equal(parentActivity, Activity.Current);
var response = client.Unary(FoobarService.DefaultRequestMessage);
Expand All @@ -259,7 +262,7 @@ static void ValidateNewTagOnActivity(InterceptorActivityListener listener)
}

// Check unary async
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
Assert.Equal(parentActivity, Activity.Current);
using var call = client.UnaryAsync(FoobarService.DefaultRequestMessage);
Expand All @@ -274,7 +277,7 @@ static void ValidateNewTagOnActivity(InterceptorActivityListener listener)
}

// Check a streaming async call
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
Assert.Equal(parentActivity, Activity.Current);
using var call = client.DuplexStreaming();
Expand Down Expand Up @@ -343,7 +346,7 @@ static void ValidateCommonEventAttributes(ActivityEvent activityEvent)
{
Assert.NotNull(activityEvent.Tags);
Assert.Contains(activityEvent.Tags, t => t.Key == "name" && (string)t.Value == "message");
Assert.Contains(activityEvent.Tags, t => t.Key == SemanticConventions.AttributeMessageID && (int)t.Value == 1);
Assert.Contains(activityEvent.Tags, t => t.Key == SemanticConventions.AttributeMessageId && (int)t.Value == 1);
}

Assert.NotEqual(default, requestMessage);
Expand Down Expand Up @@ -403,15 +406,16 @@ private static async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata,
};

using var server = FoobarService.Start();
var testTags = new TestActivityTags();
var interceptorOptions = new ClientTracingInterceptorOptions
{
Propagator = propagator,
RecordMessageEvents = true,
ActivityIdentifierValue = Guid.NewGuid(),
AdditionalTags = testTags.Tags,
};

// No Activity parent
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(interceptorOptions));
await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false);
Expand Down Expand Up @@ -446,7 +450,7 @@ private static async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata,
capturedPropagationContext = default;

// Activity has a parent
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
using var parentActivity = new Activity("foo");
parentActivity.SetIdFormat(ActivityIdFormat.W3C);
Expand Down Expand Up @@ -486,7 +490,8 @@ private static async Task TestHandlerFailure(
string serverUriString = null)
{
using var server = FoobarService.Start();
var interceptorOptions = new ClientTracingInterceptorOptions { Propagator = new TraceContextPropagator(), ActivityIdentifierValue = Guid.NewGuid(), RecordException = true };
var testTags = new TestActivityTags();
var interceptorOptions = new ClientTracingInterceptorOptions { Propagator = new TraceContextPropagator(), AdditionalTags = testTags.Tags, RecordException = true };
var client = FoobarService.ConstructRpcClient(
serverUriString ?? server.UriString,
new ClientTracingInterceptor(interceptorOptions),
Expand All @@ -496,7 +501,7 @@ private static async Task TestHandlerFailure(
new(FoobarService.RequestHeaderErrorDescription, "fubar"),
});

using var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue);
using var activityListener = new InterceptorActivityListener(testTags);
await Assert.ThrowsAsync<RpcException>(async () => await clientRequestFunc(client, null).ConfigureAwait(false));

var activity = activityListener.Activity;
Expand All @@ -515,8 +520,9 @@ private static async Task TestHandlerFailure(
private void TestActivityIsCancelledWhenHandlerDisposed(Action<Foobar.FoobarClient> clientRequestAction)
{
using var server = FoobarService.Start();
var clientInterceptorOptions = new ClientTracingInterceptorOptions { Propagator = new TraceContextPropagator(), ActivityIdentifierValue = Guid.NewGuid() };
using var activityListener = new InterceptorActivityListener(clientInterceptorOptions.ActivityIdentifierValue);
var testTags = new TestActivityTags();
var clientInterceptorOptions = new ClientTracingInterceptorOptions { Propagator = new TraceContextPropagator(), AdditionalTags = testTags.Tags };
using var activityListener = new InterceptorActivityListener(testTags);
var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(clientInterceptorOptions));
clientRequestAction(client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ public async Task DuplexStreamingServerHandlerFail()
private static async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc, Metadata additionalMetadata = null)
{
// starts the server with the server interceptor
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), RecordMessageEvents = true, ActivityIdentifierValue = Guid.NewGuid() };
var testTags = new TestActivityTags();
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), RecordMessageEvents = true, AdditionalTags = testTags.Tags };
using var server = FoobarService.Start(new ServerTracingInterceptor(interceptorOptions));

// No parent Activity, no context from header
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
var client = FoobarService.ConstructRpcClient(server.UriString);
await clientRequestFunc(client, additionalMetadata);
Expand All @@ -119,7 +120,7 @@ private static async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata,
}

// No parent Activity, context from header
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
using (var activityListener = new InterceptorActivityListener(testTags))
{
var client = FoobarService.ConstructRpcClient(
server.UriString,
Expand All @@ -145,10 +146,11 @@ private static async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata,
private static async Task TestHandlerFailure(Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc, Metadata additionalMetadata = null)
{
// starts the server with the server interceptor
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), ActivityIdentifierValue = Guid.NewGuid(), RecordException = true };
var testTags = new TestActivityTags();
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), AdditionalTags = testTags.Tags, RecordException = true };
using var server = FoobarService.Start(new ServerTracingInterceptor(interceptorOptions));

using var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue);
using var activityListener = new InterceptorActivityListener(testTags);
var client = FoobarService.ConstructRpcClient(
server.UriString,
additionalMetadata: new List<Metadata.Entry>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Diagnostics;
using System.Linq;

namespace OpenTelemetry.Instrumentation.GrpcCore.Tests;

Expand All @@ -20,15 +19,15 @@ internal sealed class InterceptorActivityListener : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="InterceptorActivityListener" /> class.
/// </summary>
/// <param name="activityIdentifier">The activity identifier.</param>
public InterceptorActivityListener(Guid activityIdentifier)
/// <param name="testTags">The test activity tags.</param>
public InterceptorActivityListener(TestActivityTags testTags)
{
this.activityListener = new ActivityListener
{
ShouldListenTo = source => source.Name == GrpcCoreInstrumentation.ActivitySourceName,
ActivityStarted = activity =>
{
if (activity.TagObjects.Any(t => t.Key == SemanticConventions.AttributeActivityIdentifier && (Guid)t.Value == activityIdentifier))
if (testTags.HasTestTags(activity))
{
this.Activity = activity;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Instrumentation.GrpcCore.Tests;

internal class TestActivityTags
{
public const string ActivityIdentifierTag = "activityidentifier";

public TestActivityTags()
{
this.Tags = new Dictionary<string, object>()
{
[ActivityIdentifierTag] = Guid.NewGuid(),
};
}

internal IReadOnlyDictionary<string, object> Tags { get; }

/// <summary>
/// Checks whether the activity has test tags.
/// </summary>
/// <param name="activity">The activity.</param>
/// <returns>Returns true if the activty has test tags, false otherwise.</returns>
internal bool HasTestTags(Activity activity)
{
Guard.ThrowIfNull(activity);

return this.Tags
.Select(tag => activity.TagObjects.Any(t => t.Key == tag.Key && t.Value == tag.Value))
.All(v => v);
}
}

0 comments on commit 02a7ddf

Please sign in to comment.