Skip to content

Commit

Permalink
fix: Kafka bootstrap cache wasn't implemented correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
tippmar-nr committed Jul 31, 2024
1 parent d57a875 commit 98b9347
Showing 1 changed file with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections;
using System.Collections.Concurrent;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;
Expand All @@ -11,8 +12,7 @@ namespace NewRelic.Providers.Wrapper.Kafka
{
public class KafkaBuilderWrapper : IWrapper
{
private Func<object, IEnumerable> _producerBuilderConfigGetter;
private Func<object, IEnumerable> _consumerBuilderConfigGetter;
private ConcurrentDictionary<Type, Func<object, IEnumerable>> _builderConfigGetterDictionary = new();

private const string WrapperName = "KafkaBuilderWrapper";
private const string BootstrapServersKey = "bootstrap.servers";
Expand All @@ -27,21 +27,13 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
{
var builder = instrumentedMethodCall.MethodCall.InvocationTarget;

dynamic configuration = null;

if (builder.GetType().Name == "ProducerBuilder`2")
{
var configGetter = _producerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<IEnumerable>(builder.GetType(), "Config");
configuration = configGetter(builder) as dynamic;
}
else if (builder.GetType().Name == "ConsumerBuilder`2")
if (!_builderConfigGetterDictionary.TryGetValue(builder.GetType(), out var configGetter))
{
var configGetter = _consumerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<IEnumerable>(builder.GetType(), "Config");
configuration = configGetter(builder) as dynamic;
configGetter = VisibilityBypasser.Instance.GeneratePropertyAccessor<IEnumerable>(builder.GetType(), "Config");
_builderConfigGetterDictionary[builder.GetType()] = configGetter;
}

if (configuration == null)
return Delegates.NoOp;
dynamic configuration = configGetter(builder);

string bootstrapServers = null;

Expand All @@ -55,9 +47,9 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
}

if (!string.IsNullOrEmpty(bootstrapServers))
return Delegates.GetDelegateFor<object>(onSuccess: (producerOrConsumerAsObject) =>
return Delegates.GetDelegateFor<object>(onSuccess: (builtObject) =>
{
KafkaHelper.AddBootstrapServersToCache(producerOrConsumerAsObject, bootstrapServers);
KafkaHelper.AddBootstrapServersToCache(builtObject, bootstrapServers);
});

return Delegates.NoOp;
Expand Down

0 comments on commit 98b9347

Please sign in to comment.