diff --git a/src/Agent/NewRelic/Agent/Core/Utilities/ExtensionsLoader.cs b/src/Agent/NewRelic/Agent/Core/Utilities/ExtensionsLoader.cs index 64dab83bc1..c481409b3c 100644 --- a/src/Agent/NewRelic/Agent/Core/Utilities/ExtensionsLoader.cs +++ b/src/Agent/NewRelic/Agent/Core/Utilities/ExtensionsLoader.cs @@ -81,6 +81,7 @@ public static void Initialize(string installPathExtensionsDirectory) { "TransportConfigLegacyWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.MassTransitLegacy.dll") }, // Kafka + { "KafkaBuilderWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") }, { "KafkaProducerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") }, { "KafkaSerializerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") }, { "KafkaConsumerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/Instrumentation.xml b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/Instrumentation.xml index e7a14a76c3..c3c8bccf64 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/Instrumentation.xml +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/Instrumentation.xml @@ -6,20 +6,20 @@ SPDX-License-Identifier: Apache-2.0 - + - + - + @@ -44,5 +44,14 @@ SPDX-License-Identifier: Apache-2.0 + + + + + + + + + diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaBuilderWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaBuilderWrapper.cs new file mode 100644 index 0000000000..2dc440fcaf --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaBuilderWrapper.cs @@ -0,0 +1,67 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections; +using NewRelic.Agent.Api; +using NewRelic.Agent.Extensions.Providers.Wrapper; +using NewRelic.Reflection; + +namespace NewRelic.Providers.Wrapper.Kafka +{ + public class KafkaBuilderWrapper : IWrapper + { + private Func _producerBuilderConfigGetter; + private Func _consumerBuilderConfigGetter; + + private const string WrapperName = "KafkaBuilderWrapper"; + private const string BootstrapServersKey = "bootstrap.servers"; + + public bool IsTransactionRequired => false; + public CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) + { + return new CanWrapResponse(WrapperName.Equals(instrumentedMethodInfo.RequestedWrapperName)); + } + + public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) + { + var builder = instrumentedMethodCall.MethodCall.InvocationTarget; + + dynamic configuration = null; + + if (builder.GetType().Name == "ProducerBuilder`2") + { + var configGetter = _producerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(builder.GetType(), "Config"); + configuration = configGetter(builder) as dynamic; + } + else if (builder.GetType().Name == "ConsumerBuilder`2") + { + var configGetter = _consumerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(builder.GetType(), "Config"); + configuration = configGetter(builder) as dynamic; + } + + if (configuration == null) + return Delegates.NoOp; + + string bootstrapServers = null; + + foreach (var kvp in configuration) + { + if (kvp.Key == BootstrapServersKey) + { + bootstrapServers = kvp.Value as string; + break; + } + } + + if (!string.IsNullOrEmpty(bootstrapServers)) + return Delegates.GetDelegateFor(onSuccess: (producerOrConsumerAsObject) => + { + KafkaHelper.AddBootstrapServersToCache(producerOrConsumerAsObject, bootstrapServers); + }); + + return Delegates.NoOp; + + } + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs index 0077b9101a..d20b2eeabc 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs @@ -66,6 +66,11 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins segment.SetMessageBrokerDestination(topic); transaction.SetKafkaMessageBrokerTransactionName(MessageBrokerDestinationType.Topic, BrokerVendorName, topic); + if (KafkaHelper.TryGetBootstrapServersFromCache(instrumentedMethodCall.MethodCall.InvocationTarget, out var bootstrapServers)) + { + KafkaHelper.RecordKafkaNodeMetrics(agent, topic, bootstrapServers, false); + } + // get the Message.Headers property and process distributed trace headers var messageAccessor = MessageAccessorDictionary.GetOrAdd(type, GetMessageAccessorFunc); var messageAsObject = messageAccessor(resultAsObject); diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaHelper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaHelper.cs new file mode 100644 index 0000000000..9662f6ef0a --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaHelper.cs @@ -0,0 +1,47 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Collections.Concurrent; +using System.Collections.Generic; +using NewRelic.Agent.Api; + +namespace NewRelic.Providers.Wrapper.Kafka +{ + internal static class KafkaHelper + { + private static readonly ConcurrentDictionary> _bootstrapServerCache = new(); + + public static void AddBootstrapServersToCache(object producerOrConsumerInstance, string bootStrapServers) + { + if (string.IsNullOrEmpty(bootStrapServers)) + return; + var kafkaBootstrapServers = new List(); + + // parse bootStrapServers - it's a comma separated list of host:port pairs + var servers = bootStrapServers.Split(','); + foreach (var server in servers) + { + kafkaBootstrapServers.Add(server); + } + + _bootstrapServerCache[producerOrConsumerInstance] = kafkaBootstrapServers; + } + + public static bool TryGetBootstrapServersFromCache(object producerOrConsumerInstance, out List kafkaBootstrapServers) + { + return _bootstrapServerCache.TryGetValue(producerOrConsumerInstance, out kafkaBootstrapServers); + } + + public static void RecordKafkaNodeMetrics(IAgent agent, string topicName, List bootstrapServers, bool isProducer) + { + foreach (var server in bootstrapServers) + { + var mode = (isProducer? "Produce" : "Consume"); + + agent.RecordCountMetric($"MessageBroker/Kafka/Nodes/{server}"); + agent.RecordCountMetric($"MessageBroker/Kafka/Nodes/{server}/{mode}/{topicName}"); + } + + } + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs index 59e6696245..d409a67a20 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs @@ -31,6 +31,11 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins transaction.InsertDistributedTraceHeaders(messageMetadata, DistributedTraceHeadersSetter); + if (KafkaHelper.TryGetBootstrapServersFromCache(instrumentedMethodCall.MethodCall.InvocationTarget, out var bootstrapServers)) + { + KafkaHelper.RecordKafkaNodeMetrics(agent, topicPartition.Topic, bootstrapServers, true); + } + return instrumentedMethodCall.MethodCall.Method.MethodName == "Produce" ? Delegates.GetDelegateFor(segment) : Delegates.GetAsyncDelegateFor(agent, segment); } @@ -43,6 +48,5 @@ private static void DistributedTraceHeadersSetter(MessageMetadata carrier, strin carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value)); } } - } } diff --git a/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Controllers/KafkaController.cs b/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Controllers/KafkaController.cs index 5e6dc76de1..729a63e8b4 100644 --- a/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Controllers/KafkaController.cs +++ b/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Controllers/KafkaController.cs @@ -33,5 +33,12 @@ public async Task ProduceAsync() await Program.Producer.ProduceAsync(); return "Complete"; } + + [HttpGet("bootstrap_server")] + public string GetBootstrapServer() + { + return Program.GetBootstrapServer(); + } + } } diff --git a/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Program.cs b/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Program.cs index 5afd2248c6..379d8e6020 100644 --- a/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Program.cs +++ b/tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Program.cs @@ -50,13 +50,18 @@ public static async Task Main(string[] args) await app.WaitForShutdownAsync(); } + public static string GetBootstrapServer() + { + var broker = Environment.GetEnvironmentVariable("NEW_RELIC_KAFKA_BROKER_NAME"); + return $"{broker}:9092"; + } + public static void SetupKafka(ILogger logger) { Thread.Sleep(15 * 1000); // Waiting for Kafka to get ready - var broker = Environment.GetEnvironmentVariable("NEW_RELIC_KAFKA_BROKER_NAME"); var kafkaConfig = new ConfigurationBuilder().AddInMemoryCollection().Build(); - kafkaConfig["bootstrap.servers"] = $"{broker}:9092"; + kafkaConfig["bootstrap.servers"] = GetBootstrapServer(); kafkaConfig["group.id"] = "kafka-dotnet-getting-started"; kafkaConfig["auto.offset.reset"] = "earliest"; kafkaConfig["dotnet.cancellation.delay.max.ms"] = "10000"; diff --git a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Fixtures/KafkaTestFixtures.cs b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Fixtures/KafkaTestFixtures.cs index 59481ee213..2d1477a416 100644 --- a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Fixtures/KafkaTestFixtures.cs +++ b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Fixtures/KafkaTestFixtures.cs @@ -29,6 +29,14 @@ public virtual void ExerciseApplication() GetAndAssertStatusCode(address + "produceasync", System.Net.HttpStatusCode.OK); } + public string GetBootstrapServer() + { + var address = $"http://localhost:{Port}/kafka/bootstrap_server"; + var response = GetString(address); + + return response; + } + public void Delay(int seconds) { Task.Delay(TimeSpan.FromSeconds(seconds)).GetAwaiter().GetResult(); diff --git a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs index 3bb5c429f8..a197060ba8 100644 --- a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs +++ b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs @@ -20,6 +20,7 @@ public abstract class LinuxKafkaTest : NewRelicIntegrationTest where T : K private readonly string _topicName; private readonly T _fixture; + private string _bootstrapServer; protected LinuxKafkaTest(T fixture, ITestOutputHelper output) : base(fixture) { @@ -42,6 +43,8 @@ protected LinuxKafkaTest(T fixture, ITestOutputHelper output) : base(fixture) _fixture.Delay(15); // wait long enough to ensure kafka and app are ready _fixture.ExerciseApplication(); + _bootstrapServer = _fixture.GetBootstrapServer(); + _fixture.Delay(11); // wait long enough to ensure a metric harvest occurs after we exercise the app _fixture.AgentLog.WaitForLogLine(AgentLogBase.HarvestFinishedLogLineRegex, TimeSpan.FromSeconds(11)); @@ -65,6 +68,10 @@ public void Test() var consumeTransactionName = @"OtherTransaction/Message/Kafka/Topic/Consume/Named/" + _topicName; var produceWebTransactionName = @"WebTransaction/MVC/Kafka/Produce"; + var messageBrokerNode = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}"; + var messageBrokerNodeProduceTopic = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}/Produce/{_topicName}"; + var messageBrokerNodeConsumeTopic = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}/Consume/{_topicName}"; + var metrics = _fixture.AgentLog.GetMetrics(); var spans = _fixture.AgentLog.GetSpanEvents(); var produceSpan = spans.FirstOrDefault(s => s.IntrinsicAttributes["name"].Equals(messageBrokerProduce)); @@ -72,19 +79,23 @@ public void Test() var expectedMetrics = new List { - new Assertions.ExpectedMetric { metricName = produceWebTransactionName, callCount = 2 }, // includes sync and async actions - new Assertions.ExpectedMetric { metricName = messageBrokerProduce, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerProduce, metricScope = produceWebTransactionName, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationKey, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationKey, metricScope = produceWebTransactionName, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationValue, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationValue, metricScope = produceWebTransactionName, callCount = 2 }, - - new Assertions.ExpectedMetric { metricName = consumeTransactionName, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerConsume, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 }, - new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 }, - new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 }, + new() { metricName = produceWebTransactionName, callCount = 2 }, // includes sync and async actions + new() { metricName = messageBrokerProduce, callCount = 2 }, + new() { metricName = messageBrokerProduce, metricScope = produceWebTransactionName, callCount = 2 }, + new() { metricName = messageBrokerProduceSerializationKey, callCount = 2 }, + new() { metricName = messageBrokerProduceSerializationKey, metricScope = produceWebTransactionName, callCount = 2 }, + new() { metricName = messageBrokerProduceSerializationValue, callCount = 2 }, + new() { metricName = messageBrokerProduceSerializationValue, metricScope = produceWebTransactionName, callCount = 2 }, + + new() { metricName = consumeTransactionName, callCount = 2 }, + new() { metricName = messageBrokerConsume, callCount = 2 }, + new() { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 }, + new() { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 }, + new() { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 }, + + new() { metricName = messageBrokerNode, callCount = 4 }, + new() { metricName = messageBrokerNodeProduceTopic, callCount = 2 }, + new() { metricName = messageBrokerNodeConsumeTopic, callCount = 2 } }; NrAssert.Multiple(