Skip to content

Commit

Permalink
fix: Fix duplicate key error when using Kafka with DT enabled (#2433)
Browse files Browse the repository at this point in the history
  • Loading branch information
chynesNR authored Apr 24, 2024
1 parent 49c65bf commit 6a85c03
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
if (messageAsObject is MessageMetadata messageMetaData)
{
headersSize = GetHeadersSize(messageMetaData.Headers);

transaction.InsertDistributedTraceHeaders(messageMetaData.Headers, DistributedTraceHeadersSetter);
transaction.InsertDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersSetter);
}

ReportSizeMetrics(agent, transaction, topic, headersSize, messageAsObject);
Expand Down Expand Up @@ -134,10 +133,14 @@ private static Func<object, object> GetKeyAccessorFunc(Type t) =>
private static Func<object, object> GetValueAccessorFunc(Type t) =>
VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Value");

private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value)
private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value)
{
carrier ??= new Headers();
carrier.Add(key, Encoding.ASCII.GetBytes(value));
carrier.Headers ??= new Headers();
if (!string.IsNullOrEmpty(key))
{
carrier.Headers.Remove(key);
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
}
}

private static long TryGetSize(object obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins

var segment = transaction.StartMessageBrokerSegment(instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Topic, MessageBrokerAction.Produce, BrokerVendorName, topicPartition.Topic);

transaction.InsertDistributedTraceHeaders(messageMetadata.Headers, DistributedTraceHeadersSetter);
transaction.InsertDistributedTraceHeaders(messageMetadata, DistributedTraceHeadersSetter);

return instrumentedMethodCall.MethodCall.Method.MethodName == "Produce" ? Delegates.GetDelegateFor(segment) : Delegates.GetAsyncDelegateFor<Task>(agent, segment);
}

private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value)
private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value)
{
carrier ??= new Headers();
carrier.Add(key, Encoding.ASCII.GetBytes(value));
carrier.Headers ??= new Headers();
if (!string.IsNullOrEmpty(key))
{
carrier.Headers.Remove(key);
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void Test()
() => Assert.True(produceSpan.IntrinsicAttributes.ContainsKey("parentId")),
() => Assert.NotNull(consumeTxnSpan),
() => Assert.True(consumeTxnSpan.UserAttributes.ContainsKey("kafka.consume.byteCount")),
() => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 20, 30), // usually is 24 - 26
() => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 460, 470), // includes headers
() => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("traceId")),
() => Assert.False(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId"))
);
Expand Down

0 comments on commit 6a85c03

Please sign in to comment.