diff --git a/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
index c2d1cbf13..143d4bb44 100644
--- a/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
+++ b/src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
@@ -3,10 +3,22 @@
using System.Diagnostics;
using System.Reflection;
+ ///
+ /// ActivitySource properties
+ ///
public static class ActivitySourceAccessor
{
- public const string ActivityString = "otel_activity";
- public static readonly ActivitySource ActivitySource = new("KafkaFlow.OpenTelemetry", Version);
+ ///
+ /// The name of the OpenTelemetry Activity that is used as a key
+ /// in MessageContext.Items dictionary
+ ///
+ public static string ActivityString = "otel_activity";
+
+ ///
+ /// The ActivitySource name that is used in KafkaFlow
+ ///
+ public static ActivitySource ActivitySource = new("KafkaFlow.OpenTelemetry", Version);
+
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
}
}
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
index 8370352be..e9f99aabf 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
@@ -84,7 +84,7 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);
- activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process);
+ activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower());
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
index d2eb3e02d..d83855d11 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
@@ -89,7 +89,7 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex
private static void SetProducerTags(IMessageContext context, Activity activity)
{
- activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish);
+ activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower());
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
diff --git a/src/KafkaFlow/ActivityFactory.cs b/src/KafkaFlow/ActivityFactory.cs
index dd3e83f69..c1c291658 100644
--- a/src/KafkaFlow/ActivityFactory.cs
+++ b/src/KafkaFlow/ActivityFactory.cs
@@ -6,7 +6,7 @@ internal class ActivityFactory : IActivityFactory
{
public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
{
- var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString();
+ var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower();
// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
diff --git a/src/KafkaFlow/IActivityFactory.cs b/src/KafkaFlow/IActivityFactory.cs
index 35dc7faf7..cd3bc6a7c 100644
--- a/src/KafkaFlow/IActivityFactory.cs
+++ b/src/KafkaFlow/IActivityFactory.cs
@@ -1,10 +1,9 @@
-using System.Diagnostics;
-
-namespace KafkaFlow
+namespace KafkaFlow
{
+ using System.Diagnostics;
+
internal interface IActivityFactory
{
-
public Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind);
}
}