Skip to content

Commit

Permalink
refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Nov 13, 2023
1 parent 5c80b3c commit 29ce080
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
16 changes: 14 additions & 2 deletions src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,22 @@
using System.Diagnostics;
using System.Reflection;

/// <summary>
/// ActivitySource properties
/// </summary>
public static class ActivitySourceAccessor
{
public const string ActivityString = "otel_activity";
public static readonly ActivitySource ActivitySource = new("KafkaFlow.OpenTelemetry", Version);
/// <summary>
/// The name of the OpenTelemetry Activity that is used as a key
/// in MessageContext.Items dictionary
/// </summary>
public static string ActivityString = "otel_activity";

Check warning on line 15 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment


/// <summary>
/// The ActivitySource name that is used in KafkaFlow
/// </summary>
public static ActivitySource ActivitySource = new("KafkaFlow.OpenTelemetry", Version);

Check warning on line 20 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment


internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);

activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower());
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex

private static void SetProducerTags(IMessageContext context, Activity activity)
{
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower());
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/ActivityFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ internal class ActivityFactory : IActivityFactory
{
public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
{
var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString();
var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower();

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
Expand Down
7 changes: 3 additions & 4 deletions src/KafkaFlow/IActivityFactory.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
using System.Diagnostics;

namespace KafkaFlow
namespace KafkaFlow
{
using System.Diagnostics;

internal interface IActivityFactory
{

public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind);
}
}

0 comments on commit 29ce080

Please sign in to comment.