Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

**DRAFT** feat: RabbitMQ version 7 instrumentation #2890

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,7 @@
"packageName": "nservicebus"
},
{
"packageName": "rabbitmq.client",
"ignorePatch": true,
"ignoreMinor": true,
"ignoreMajor": true,
"ignoreReason": "Breaking major update. See https://github.com/newrelic/newrelic-dotnet-agent/issues/2885"
"packageName": "rabbitmq.client"
},
{
"packageName": "restsharp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Agent.Extensions.SystemExtensions;
Expand Down Expand Up @@ -34,7 +35,20 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
serverPort: RabbitMqHelper.GetServerPort(instrumentedMethodCall, agent),
routingKey: queue); // no way to get routing key from BasicGet

return Delegates.GetDelegateFor(
return instrumentedMethodCall.IsAsync ?
Delegates.GetAsyncDelegateFor<Task>(
agent, segment, false,
onComplete: (t) =>
{
if (t.IsFaulted)
{
transaction.NoticeError(t.Exception);
}

segment.End();
})
:
Delegates.GetDelegateFor(
onFailure: transaction.NoticeError,
onComplete: segment.End
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;

Expand All @@ -23,12 +24,30 @@ public CanWrapResponse CanWrap(InstrumentedMethodInfo methodInfo)
public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
// 3.6.0+ (5.1.0+) (IModel)void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)

var segment = (RabbitMqHelper.GetRabbitMQVersion(instrumentedMethodCall) >= 6) ?
RabbitMqHelper.CreateSegmentForPublishWrappers6Plus(instrumentedMethodCall, transaction, BasicPropertiesIndex, agent) :
RabbitMqHelper.CreateSegmentForPublishWrappers(instrumentedMethodCall, transaction, BasicPropertiesIndex, agent);

return Delegates.GetDelegateFor(segment);
// v7+:
// public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
// bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
// CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader
var rabbitMqVersion = RabbitMqHelper.GetRabbitMQVersion(instrumentedMethodCall);

var segment = (rabbitMqVersion >= 6) ?
RabbitMqHelper.CreateSegmentForPublishWrappers6Plus(instrumentedMethodCall, transaction, agent)
:
RabbitMqHelper.CreateSegmentForPublishWrappers(instrumentedMethodCall, transaction, agent);

if (rabbitMqVersion >= 6)
RabbitMqHelper.InsertDTHeaders6Plus(instrumentedMethodCall, transaction, BasicPropertiesIndex);
else
RabbitMqHelper.InsertDTHeaders(instrumentedMethodCall, transaction, BasicPropertiesIndex);


// TODO: Can we handle ValueTask<T> return type somehow? Without it, we can't properly manage a message broker segment that wraps the publish call
return instrumentedMethodCall.IsAsync ?
Delegates.GetAsyncDelegateFor<Task>(agent, segment, false, (_) =>
{
segment.End(); // TODO: this never gets called because the delegate is never invoked
})
: Delegates.GetDelegateFor(segment);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public CanWrapResponse CanWrap(InstrumentedMethodInfo methodInfo)
public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
// 3.5.X (IModel)void BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate, IBasicProperties basicProperties, byte[] body)
var segment = RabbitMqHelper.CreateSegmentForPublishWrappers(instrumentedMethodCall, transaction, BasicPropertiesIndex, agent);
var segment = RabbitMqHelper.CreateSegmentForPublishWrappers(instrumentedMethodCall, transaction, agent);
return Delegates.GetDelegateFor(segment);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Agent.Extensions.SystemExtensions;
Expand Down Expand Up @@ -40,6 +41,7 @@ public CanWrapResponse CanWrap(InstrumentedMethodInfo methodInfo)
public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
// (IBasicConsumer) void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
// (V7 IAsyncBasicConsumer) Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
var routingKey = instrumentedMethodCall.MethodCall.MethodArguments.ExtractNotNullAs<string>(4);
var destType = RabbitMqHelper.GetBrokerDestinationType(routingKey);
var destName = RabbitMqHelper.ResolveDestinationName(destType, routingKey);
Expand Down Expand Up @@ -69,13 +71,28 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
serverPort: port,
routingKey: routingKey);

return Delegates.GetDelegateFor(
onFailure: transaction.NoticeError,
onComplete: () =>
{
segment.End();
transaction.End();
});
return instrumentedMethodCall.IsAsync
? Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
false,
onComplete: (t) =>
{
if (t.IsFaulted)
{
transaction.NoticeError(t.Exception);
}

segment.End();
transaction.End();
})
: Delegates.GetDelegateFor(
onFailure: transaction.NoticeError,
onComplete: () =>
{
segment.End();
transaction.End();
});

IEnumerable<string> GetHeaderValue(IDictionary<string, object> carrier, string key)
{
Expand Down Expand Up @@ -107,14 +124,22 @@ private void GetServerDetails(InstrumentedMethodCall instrumentedMethodCall, out

try
{
_modelGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(instrumentedMethodCall.MethodCall.InvocationTarget.GetType(), "Model");
// v7 renamed "model" to "channel"
if (RabbitMqHelper.GetRabbitMQVersion(instrumentedMethodCall.MethodCall.InvocationTarget.GetType()) >= 7)
{
_modelGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(instrumentedMethodCall.MethodCall.InvocationTarget.GetType(), "Channel");
}
else
{
_modelGetter ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(instrumentedMethodCall.MethodCall.InvocationTarget.GetType(), "Model");
}
var model = _modelGetter(instrumentedMethodCall.MethodCall.InvocationTarget);

object connection = null;
var modelType = model.GetType();
var connectionGetter = _connectionGetter.GetOrAdd(modelType, GetConnectionForType);
if (connectionGetter != null)
{
{
connection = connectionGetter(modelType, model);
}

Expand All @@ -138,20 +163,17 @@ private void GetServerDetails(InstrumentedMethodCall instrumentedMethodCall, out
static Func<Type, object, object> GetConnectionForType(Type modelType)
{
var version = RabbitMqHelper.GetRabbitMQVersion(modelType); // caches version in RabbitMqHelper.
if (modelType.ToString() == "RabbitMQ.Client.Framing.Impl.Model")
{
return GetConnectionFromFramingModel;
}
else if (modelType.ToString() == "RabbitMQ.Client.Impl.AutorecoveringModel" && version <= 5)
return modelType.ToString() switch
{
return GetConnectionFromAutorecoveryModel5OrOlder;
}
else if (modelType.ToString() == "RabbitMQ.Client.Impl.AutorecoveringModel" && version >= 6)
{
return GetConnectionFromAutorecoveryModel6OrNewer;
}

return null;
"RabbitMQ.Client.Framing.Impl.Model" => GetConnectionFromFramingModel,
"RabbitMQ.Client.Impl.AutorecoveringModel" when version <= 5 =>
GetConnectionFromAutorecoveryModel5OrOlder,
"RabbitMQ.Client.Impl.AutorecoveringModel" when version == 6 =>
GetConnectionFromAutorecoveryModel6OrNewer,
"RabbitMQ.Client.Impl.AutorecoveringChannel" when version >= 7 =>
GetConnectionFromAutorecoveryModel6OrNewer,
_ => null
};
}

static object GetConnectionFromFramingModel(Type modelType, object model)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,72 @@ SPDX-License-Identifier: Apache-2.0
-->
<extension xmlns="urn:newrelic-extension">

<instrumentation>
<instrumentation>
<!-- Consume Pull -->
<tracerFactory name="BasicGetWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicGet" />
</match>
</tracerFactory>
<tracerFactory name="BasicGetWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicGet" />
</match>
<!--
RabbitMQ v7+
public async Task<BasicGetResult?> BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
-->
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Impl.Channel" >
<exactMethodMatcher methodName="BasicGetAsync" parameters="System.String,System.Boolean,System.Threading.CancellationToken"/>
</match>
</tracerFactory>

<!-- Produce -->
<tracerFactory name="BasicPublishWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,RabbitMQ.Client.IBasicProperties,System.Byte[]"/>
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,RabbitMQ.Client.IBasicProperties,System.ReadOnlyMemory`1[System.Byte]"/>
</match>
</tracerFactory>
<tracerFactory name="BasicPublishWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,RabbitMQ.Client.IBasicProperties,System.Byte[]"/>
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,RabbitMQ.Client.IBasicProperties,System.ReadOnlyMemory`1[System.Byte]"/>
</match>
<!--
RabbitMQ v7+
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
-->
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Impl.Channel" >
<exactMethodMatcher methodName="BasicPublishAsync" parameters="System.String,System.String,System.Boolean,!!0,System.ReadOnlyMemory`1[System.Byte],System.Threading.CancellationToken"/>
</match>
</tracerFactory>

<!-- Produce -->
<tracerFactory name="BasicPublishWrapperLegacy">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,System.Boolean,RabbitMQ.Client.IBasicProperties,System.Byte[]"/>
</match>
</tracerFactory>
<tracerFactory name="BasicPublishWrapperLegacy">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_BasicPublish" parameters="System.String,System.String,System.Boolean,System.Boolean,RabbitMQ.Client.IBasicProperties,System.Byte[]"/>
</match>
</tracerFactory>

<!-- Consume Push / Event / Subscribe -->
<tracerFactory name="HandleBasicDeliverWrapper">
<tracerFactory name="HandleBasicDeliverWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Events.EventingBasicConsumer">
<exactMethodMatcher methodName="HandleBasicDeliver" />
</match>
</tracerFactory>

<tracerFactory name="QueuePurgeWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_QueuePurge" />
</match>
</tracerFactory>
<!--
RabbitMQ v7+
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
-->
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Events.AsyncEventingBasicConsumer">
<exactMethodMatcher methodName="HandleBasicDeliverAsync" parameters="System.String,System.UInt64,System.Boolean,System.String,System.String,RabbitMQ.Client.IReadOnlyBasicProperties,System.ReadOnlyMemory`1[System.Byte],System.Threading.CancellationToken"/>
</match>
</tracerFactory>

</instrumentation>
<tracerFactory name="QueuePurgeWrapper">
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Framing.Impl.Model">
<exactMethodMatcher methodName="_Private_QueuePurge" />
</match>
<!--
RabbitMQ 7+
public async Task<uint> QueuePurgeAsync(string queue, CancellationToken cancellationToken)
-->
<match assemblyName="RabbitMQ.Client" className="RabbitMQ.Client.Impl.Channel">
<exactMethodMatcher methodName="QueuePurgeAsync" parameters="System.String,System.Threading.CancellationToken"/>
</match>
</tracerFactory>
</instrumentation>
</extension>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Agent.Extensions.SystemExtensions;
Expand All @@ -21,6 +22,7 @@ public CanWrapResponse CanWrap(InstrumentedMethodInfo methodInfo)
public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
// (IModel) uint QueuePurge(string queue)
// Task<uint> QueuePurgeAsync(string queue, CancellationToken cancellationToken)
var queue = instrumentedMethodCall.MethodCall.MethodArguments.ExtractNotNullAs<string>(0);
var destType = RabbitMqHelper.GetBrokerDestinationType(queue);
var destName = RabbitMqHelper.ResolveDestinationName(destType, queue);
Expand All @@ -37,7 +39,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
// Routing key is not available for this method.
// It only returns uint and invocationTarget does not have the value.

return Delegates.GetDelegateFor(segment);
return instrumentedMethodCall.IsAsync ? Delegates.GetAsyncDelegateFor<Task>(agent, segment) : Delegates.GetDelegateFor(segment);
}
}
}
Loading
Loading