Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APM-AWS Entity Relationships: Kafka #2636

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public static void Initialize(string installPathExtensionsDirectory)
{ "TransportConfigLegacyWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.MassTransitLegacy.dll") },

// Kafka
{ "KafkaBuilderWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") },
{ "KafkaProducerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") },
{ "KafkaSerializerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") },
{ "KafkaConsumerWrapper", Path.Combine(_installPathExtensionsDirectory, "NewRelic.Providers.Wrapper.Kafka.dll") }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ SPDX-License-Identifier: Apache-2.0
<extension xmlns="urn:newrelic-extension">

<instrumentation>

<tracerFactory name="KafkaProducerWrapper">
<match assemblyName="Confluent.Kafka" className="Confluent.Kafka.Producer`2">
<exactMethodMatcher methodName="Produce" parameters="Confluent.Kafka.TopicPartition,Confluent.Kafka.Message`2[!0,!1],System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"/>
<exactMethodMatcher methodName="ProduceAsync" parameters="Confluent.Kafka.TopicPartition,Confluent.Kafka.Message`2[!0,!1],System.Threading.CancellationToken"/>
</match>
</tracerFactory>

<tracerFactory name="KafkaConsumerWrapper">
<match assemblyName="Confluent.Kafka" className="Confluent.Kafka.Consumer`2">
<exactMethodMatcher methodName="Consume" parameters="System.Int32"/>
</match>
</tracerFactory>

<tracerFactory name="KafkaSerializerWrapper">
<match assemblyName="Confluent.Kafka" className="Confluent.Kafka.Serializers+Utf8Serializer">
<exactMethodMatcher methodName="Serialize" />
Expand All @@ -44,5 +44,14 @@ SPDX-License-Identifier: Apache-2.0
</match>
</tracerFactory>

<tracerFactory name="KafkaBuilderWrapper">
<match assemblyName="Confluent.Kafka" className="Confluent.Kafka.ProducerBuilder`2">
<exactMethodMatcher methodName="Build" />
</match>
<match assemblyName="Confluent.Kafka" className="Confluent.Kafka.ConsumerBuilder`2">
<exactMethodMatcher methodName="Build" />
</match>
</tracerFactory>

</instrumentation>
</extension>
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;

namespace NewRelic.Providers.Wrapper.Kafka
{
public class KafkaBuilderWrapper : IWrapper
{
private Func<object, IEnumerable> _producerBuilderConfigGetter;
private Func<object, IEnumerable> _consumerBuilderConfigGetter;

private const string WrapperName = "KafkaBuilderWrapper";
private const string BootstrapServersKey = "bootstrap.servers";

public bool IsTransactionRequired => false;
public CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
return new CanWrapResponse(WrapperName.Equals(instrumentedMethodInfo.RequestedWrapperName));
}

public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
var builder = instrumentedMethodCall.MethodCall.InvocationTarget;

dynamic configuration = null;

if (builder.GetType().Name == "ProducerBuilder`2")
{
var configGetter = _producerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<IEnumerable>(builder.GetType(), "Config");
configuration = configGetter(builder) as dynamic;
}
else if (builder.GetType().Name == "ConsumerBuilder`2")
{
var configGetter = _consumerBuilderConfigGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<IEnumerable>(builder.GetType(), "Config");
configuration = configGetter(builder) as dynamic;
}

if (configuration == null)
return Delegates.NoOp;

string bootstrapServers = null;

foreach (var kvp in configuration)
{
if (kvp.Key == BootstrapServersKey)
{
bootstrapServers = kvp.Value as string;
break;
}
}

if (!string.IsNullOrEmpty(bootstrapServers))
return Delegates.GetDelegateFor<object>(onSuccess: (producerOrConsumerAsObject) =>
{
KafkaHelper.AddBootstrapServersToCache(producerOrConsumerAsObject, bootstrapServers);
});

return Delegates.NoOp;

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
segment.SetMessageBrokerDestination(topic);
transaction.SetKafkaMessageBrokerTransactionName(MessageBrokerDestinationType.Topic, BrokerVendorName, topic);

if (KafkaHelper.TryGetBootstrapServersFromCache(instrumentedMethodCall.MethodCall.InvocationTarget, out var bootstrapServers))
{
KafkaHelper.RecordKafkaNodeMetrics(agent, topic, bootstrapServers, false);
}

// get the Message.Headers property and process distributed trace headers
var messageAccessor = MessageAccessorDictionary.GetOrAdd(type, GetMessageAccessorFunc);
var messageAsObject = messageAccessor(resultAsObject);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using System.Collections.Generic;
using NewRelic.Agent.Api;

namespace NewRelic.Providers.Wrapper.Kafka
{
internal static class KafkaHelper
{
private static readonly ConcurrentDictionary<object, List<string>> _bootstrapServerCache = new();

public static void AddBootstrapServersToCache(object producerOrConsumerInstance, string bootStrapServers)
{
if (string.IsNullOrEmpty(bootStrapServers))
return;
var kafkaBootstrapServers = new List<string>();

// parse bootStrapServers - it's a comma separated list of host:port pairs
var servers = bootStrapServers.Split(',');
foreach (var server in servers)
{
kafkaBootstrapServers.Add(server);
}

_bootstrapServerCache[producerOrConsumerInstance] = kafkaBootstrapServers;
}

public static bool TryGetBootstrapServersFromCache(object producerOrConsumerInstance, out List<string> kafkaBootstrapServers)
{
return _bootstrapServerCache.TryGetValue(producerOrConsumerInstance, out kafkaBootstrapServers);
}

public static void RecordKafkaNodeMetrics(IAgent agent, string topicName, List<string> bootstrapServers, bool isProducer)
{
foreach (var server in bootstrapServers)
{
var mode = (isProducer? "Produce" : "Consume");

agent.RecordCountMetric($"MessageBroker/Kafka/Nodes/{server}");
agent.RecordCountMetric($"MessageBroker/Kafka/Nodes/{server}/{mode}/{topicName}");
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins

transaction.InsertDistributedTraceHeaders(messageMetadata, DistributedTraceHeadersSetter);

if (KafkaHelper.TryGetBootstrapServersFromCache(instrumentedMethodCall.MethodCall.InvocationTarget, out var bootstrapServers))
{
KafkaHelper.RecordKafkaNodeMetrics(agent, topicPartition.Topic, bootstrapServers, true);
}

return instrumentedMethodCall.MethodCall.Method.MethodName == "Produce" ? Delegates.GetDelegateFor(segment) : Delegates.GetAsyncDelegateFor<Task>(agent, segment);
}

Expand All @@ -43,6 +48,5 @@ private static void DistributedTraceHeadersSetter(MessageMetadata carrier, strin
carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value));
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,12 @@ public async Task<string> ProduceAsync()
await Program.Producer.ProduceAsync();
return "Complete";
}

[HttpGet("bootstrap_server")]
public string GetBootstrapServer()
{
return Program.GetBootstrapServer();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ public static async Task Main(string[] args)
await app.WaitForShutdownAsync();
}

public static string GetBootstrapServer()
{
var broker = Environment.GetEnvironmentVariable("NEW_RELIC_KAFKA_BROKER_NAME");
return $"{broker}:9092";
}

public static void SetupKafka(ILogger logger)
{
Thread.Sleep(15 * 1000); // Waiting for Kafka to get ready

var broker = Environment.GetEnvironmentVariable("NEW_RELIC_KAFKA_BROKER_NAME");
var kafkaConfig = new ConfigurationBuilder().AddInMemoryCollection().Build();
kafkaConfig["bootstrap.servers"] = $"{broker}:9092";
kafkaConfig["bootstrap.servers"] = GetBootstrapServer();
kafkaConfig["group.id"] = "kafka-dotnet-getting-started";
kafkaConfig["auto.offset.reset"] = "earliest";
kafkaConfig["dotnet.cancellation.delay.max.ms"] = "10000";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public virtual void ExerciseApplication()
GetAndAssertStatusCode(address + "produceasync", System.Net.HttpStatusCode.OK);
}

public string GetBootstrapServer()
{
var address = $"http://localhost:{Port}/kafka/bootstrap_server";
var response = GetString(address);

return response;
}

public void Delay(int seconds)
{
Task.Delay(TimeSpan.FromSeconds(seconds)).GetAwaiter().GetResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public abstract class LinuxKafkaTest<T> : NewRelicIntegrationTest<T> where T : K

private readonly string _topicName;
private readonly T _fixture;
private string _bootstrapServer;

protected LinuxKafkaTest(T fixture, ITestOutputHelper output) : base(fixture)
{
Expand All @@ -42,6 +43,8 @@ protected LinuxKafkaTest(T fixture, ITestOutputHelper output) : base(fixture)
_fixture.Delay(15); // wait long enough to ensure kafka and app are ready
_fixture.ExerciseApplication();

_bootstrapServer = _fixture.GetBootstrapServer();

_fixture.Delay(11); // wait long enough to ensure a metric harvest occurs after we exercise the app
_fixture.AgentLog.WaitForLogLine(AgentLogBase.HarvestFinishedLogLineRegex, TimeSpan.FromSeconds(11));

Expand All @@ -65,26 +68,34 @@ public void Test()
var consumeTransactionName = @"OtherTransaction/Message/Kafka/Topic/Consume/Named/" + _topicName;
var produceWebTransactionName = @"WebTransaction/MVC/Kafka/Produce";

var messageBrokerNode = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}";
var messageBrokerNodeProduceTopic = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}/Produce/{_topicName}";
var messageBrokerNodeConsumeTopic = $"MessageBroker/Kafka/Nodes/{_bootstrapServer}/Consume/{_topicName}";

var metrics = _fixture.AgentLog.GetMetrics();
var spans = _fixture.AgentLog.GetSpanEvents();
var produceSpan = spans.FirstOrDefault(s => s.IntrinsicAttributes["name"].Equals(messageBrokerProduce));
var consumeTxnSpan = spans.FirstOrDefault(s => s.IntrinsicAttributes["name"].Equals(consumeTransactionName));

var expectedMetrics = new List<Assertions.ExpectedMetric>
{
new Assertions.ExpectedMetric { metricName = produceWebTransactionName, callCount = 2 }, // includes sync and async actions
new Assertions.ExpectedMetric { metricName = messageBrokerProduce, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerProduce, metricScope = produceWebTransactionName, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationKey, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationKey, metricScope = produceWebTransactionName, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationValue, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerProduceSerializationValue, metricScope = produceWebTransactionName, callCount = 2 },

new Assertions.ExpectedMetric { metricName = consumeTransactionName, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerConsume, callCount = 2 },
new Assertions.ExpectedMetric { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 },
new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 },
new Assertions.ExpectedMetric { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 },
new() { metricName = produceWebTransactionName, callCount = 2 }, // includes sync and async actions
new() { metricName = messageBrokerProduce, callCount = 2 },
new() { metricName = messageBrokerProduce, metricScope = produceWebTransactionName, callCount = 2 },
new() { metricName = messageBrokerProduceSerializationKey, callCount = 2 },
new() { metricName = messageBrokerProduceSerializationKey, metricScope = produceWebTransactionName, callCount = 2 },
new() { metricName = messageBrokerProduceSerializationValue, callCount = 2 },
new() { metricName = messageBrokerProduceSerializationValue, metricScope = produceWebTransactionName, callCount = 2 },

new() { metricName = consumeTransactionName, callCount = 2 },
new() { metricName = messageBrokerConsume, callCount = 2 },
new() { metricName = messageBrokerConsume, metricScope = consumeTransactionName, callCount = 2 },
new() { metricName = "Supportability/TraceContext/Create/Success", callCount = 2 },
new() { metricName = "Supportability/TraceContext/Accept/Success", callCount = 2 },

new() { metricName = messageBrokerNode, callCount = 4 },
new() { metricName = messageBrokerNodeProduceTopic, callCount = 2 },
new() { metricName = messageBrokerNodeConsumeTopic, callCount = 2 }
};

NrAssert.Multiple(
Expand Down
Loading