From 1cc327ce9a20ae0304b6e9170637ad3609610ea1 Mon Sep 17 00:00:00 2001 From: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com> Date: Tue, 12 Nov 2024 14:40:27 -0600 Subject: [PATCH 1/5] WIP - service bus processor instrumentation --- .../AzureServiceBusProcessorWrapper.cs | 69 +++++++++ .../AzureServiceBusReceiveWrapper.cs | 123 +++++----------- .../AzureServiceBusReceiverManagerWrapper.cs | 43 ++++++ .../AzureServiceBusWrapperBase.cs | 91 +++++++++++- .../AzureServiceBus/Instrumentation.xml | 20 +++ .../AzureServiceBusExerciser.cs | 77 +++++++++- .../AzureServiceBusProcessorTests.cs | 136 ++++++++++++++++++ 7 files changed, 468 insertions(+), 91 deletions(-) create mode 100644 src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs create mode 100644 src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs create mode 100644 tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs new file mode 100644 index 000000000..7b3c1f7b5 --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs @@ -0,0 +1,69 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Threading.Tasks; +using NewRelic.Agent.Api; +using NewRelic.Agent.Extensions.Providers.Wrapper; +using NewRelic.Agent.Extensions.SystemExtensions; + +namespace NewRelic.Providers.Wrapper.AzureServiceBus; + +public class AzureServiceBusProcessorWrapper : AzureServiceBusWrapperBase +{ + public override bool IsTransactionRequired => false; + + public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) + { + var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusProcessorWrapper)); + return new CanWrapResponse(canWrap); + } + + public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) + { + dynamic serviceBusProcessor = instrumentedMethodCall.MethodCall.InvocationTarget; + string queueName = serviceBusProcessor.EntityPath; // some-queue-name + string fqns = serviceBusProcessor.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + + transaction = agent.CreateTransaction( + destinationType: MessageBrokerDestinationType.Queue, + BrokerVendorName, + destination: queueName); + + // ??? + var segment = transaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Azure.Messaging.ServiceBus.ServiceBusProcessor", "ProcessMessageAsync"); + + //// start a message broker segment ??? + //var segment = transaction.StartMessageBrokerSegment( + // instrumentedMethodCall.MethodCall, + // MessageBrokerDestinationType.Queue, + // MessageBrokerAction.Consume, + // BrokerVendorName, + // queueName, + // serverAddress: fqns); + + + return instrumentedMethodCall.IsAsync ? + Delegates.GetAsyncDelegateFor( + agent, + segment, + true, // TODO Is this correct?? + t => + { + if (t.IsFaulted) + { + transaction.NoticeError(t.Exception); + } + + segment.End(); + transaction.End(); + }, TaskContinuationOptions.ExecuteSynchronously) + : + Delegates.GetDelegateFor( + onFailure: transaction.NoticeError, + onComplete: () => + { + segment.End(); + transaction.End(); + }); + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index 01e58c33f..ac0783084 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -2,11 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.ObjectModel; using System.Threading.Tasks; using NewRelic.Agent.Api; +using NewRelic.Agent.Api.Experimental; using NewRelic.Agent.Extensions.Providers.Wrapper; using NewRelic.Reflection; @@ -14,7 +12,10 @@ namespace NewRelic.Providers.Wrapper.AzureServiceBus; public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase { - private static readonly ConcurrentDictionary> _getResultFromGenericTask = new(); + private Func _innerReceiverAccessor; + private Func _innerReceiverIsProcessorAccessor; + + public override bool IsTransactionRequired => false; // only partially true. See the code below... public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) { @@ -26,7 +27,14 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho { dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget; string queueName = serviceBusReceiver.EntityPath; // some-queue-name - string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + + _innerReceiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(serviceBusReceiver.GetType(), "InnerReceiver"); + object innerReceiver = _innerReceiverAccessor.Invoke(serviceBusReceiver); + + // use reflection to access the _isProcessor field of the inner receiver + _innerReceiverIsProcessorAccessor ??= VisibilityBypasser.Instance.GenerateFieldReadAccessor(innerReceiver.GetType(), "_isProcessor"); + var isProcessor = _innerReceiverIsProcessorAccessor.Invoke(innerReceiver); MessageBrokerAction action = instrumentedMethodCall.MethodCall.Method.MethodName switch @@ -42,6 +50,16 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; + // if the inner receiver is configured as a processor, start a transaction + // the transaction will end at the conclusion of ReceiverManager.ProcessOneMessage() + if (isProcessor) + { + transaction = agent.CreateTransaction( + destinationType: MessageBrokerDestinationType.Queue, + BrokerVendorName, + destination: queueName); + } + // start a message broker segment var segment = transaction.StartMessageBrokerSegment( instrumentedMethodCall.MethodCall, @@ -49,7 +67,9 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho action, BrokerVendorName, queueName, - serverAddress: fqns ); + serverAddress: fqns); + + var instrumentedMethodName = instrumentedMethodCall.MethodCall.Method.MethodName; return instrumentedMethodCall.IsAsync ? @@ -57,87 +77,22 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho Delegates.GetAsyncDelegateFor( agent, segment, - false, - HandleResponse, + true, + (responseTask) => + { + try + { + HandleReceiveResponse(responseTask, instrumentedMethodCall.MethodCall.Method.MethodName, transaction); + } + finally + { + segment.End(); + } + }, TaskContinuationOptions.ExecuteSynchronously) : Delegates.GetDelegateFor( onFailure: transaction.NoticeError, onComplete: segment.End, - onSuccess: ExtractDTHeadersIfAvailable); - - void HandleResponse(Task responseTask) - { - try - { - if (responseTask.IsFaulted) - { - transaction.NoticeError(responseTask.Exception); - return; - } - - var resultObj = GetTaskResultFromObject(responseTask); - ExtractDTHeadersIfAvailable(resultObj); - } - finally - { - segment.End(); - } - } - - - - void ExtractDTHeadersIfAvailable(object resultObj) - { - if (resultObj != null) - { - switch (instrumentedMethodCall.MethodCall.Method.MethodName) - { - case "ReceiveMessagesAsync": - case "ReceiveDeferredMessagesAsync": - case "PeekMessagesInternalAsync": - // the response contains a list of messages. - // get the first message from the response and extract DT headers - dynamic messages = resultObj; - if (messages.Count > 0) - { - var msg = messages[0]; - if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) - { - transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); - } - } - break; - } - } - IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) - { - var headerValues = new List(); - foreach (var item in applicationProperties) - { - if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) - { - headerValues.Add(item.Value as string); - } - } - - return headerValues; - } - } - } - - private static object GetTaskResultFromObject(object taskObj) - { - var task = taskObj as Task; - if (task == null) - { - return null; - } - if (task.IsFaulted) - { - return null; - } - - var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Result")); - return getResponse(task); + onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName)); } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs new file mode 100644 index 000000000..a302baf23 --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -0,0 +1,43 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Threading.Tasks; +using NewRelic.Agent.Api; +using NewRelic.Agent.Extensions.Providers.Wrapper; + +namespace NewRelic.Providers.Wrapper.AzureServiceBus +{ + public class AzureServiceBusReceiverManagerWrapper : AzureServiceBusWrapperBase + { + public override bool IsTransactionRequired => false; + + public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) + { + var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiverManagerWrapper)); + return new CanWrapResponse(canWrap); + } + + public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, + ITransaction transaction) + { + + // TODO not working at present -- transaction is always NoOpTransaction here but shouldn't be + // make sure the transaction ends when the receiver manager is done processing messages + if (instrumentedMethodCall.IsAsync) + { + return Delegates.GetAsyncDelegateFor( + agent, + transaction.CurrentSegment, + true, + onComplete: _ => + { + transaction.End(); + }); + } + return Delegates.GetDelegateFor(onComplete: () => + { + transaction.End(); + }); + } + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs index 97d9332c9..b08f97e74 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs @@ -1,20 +1,99 @@ // Copyright 2020 New Relic, Inc. All rights reserved. // SPDX-License-Identifier: Apache-2.0 +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Threading.Tasks; using NewRelic.Agent.Api; using NewRelic.Agent.Extensions.Providers.Wrapper; +using NewRelic.Reflection; -namespace NewRelic.Providers.Wrapper.AzureServiceBus +namespace NewRelic.Providers.Wrapper.AzureServiceBus; + +public abstract class AzureServiceBusWrapperBase : IWrapper { - public abstract class AzureServiceBusWrapperBase : IWrapper + private static readonly ConcurrentDictionary> _getResultFromGenericTask = new(); + + protected const string BrokerVendorName = "AzureServiceBus"; + + public virtual bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction + + public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); + + public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction); + + protected static object GetTaskResultFromObject(object taskObj) + { + var task = taskObj as Task; + if (task == null) + { + return null; + } + if (task.IsFaulted) + { + return null; + } + if (task.IsCanceled) + { + return null; + } + + var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Result")); + return getResponse(task); + } + + protected void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction) { - protected const string BrokerVendorName = "AzureServiceBus"; + if (responseTask.IsCanceled) + return; - public bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction + if (responseTask.IsFaulted) + { + transaction.NoticeError(responseTask.Exception); + return; + } - public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); + var resultObj = GetTaskResultFromObject(responseTask); + ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName); + } + protected void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName) + { + if (resultObj != null) + { + switch (instrumentedMethodName) + { + case "ReceiveMessagesAsync": + case "ReceiveDeferredMessagesAsync": + case "PeekMessagesInternalAsync": + // the response contains a list of messages. + // get the first message from the response and extract DT headers + dynamic messages = resultObj; + if (messages.Count > 0) + { + var msg = messages[0]; + if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) + { + transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); + } + } + break; + } + } + } - public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,ITransaction transaction); + protected IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) + { + var headerValues = new List(); + foreach (var item in applicationProperties) + { + if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) + { + headerValues.Add(item.Value as string); + } + } + return headerValues; } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml index c1ea3db42..534d2c675 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml @@ -107,5 +107,25 @@ SPDX-License-Identifier: Apache-2.0 + + + + + + + + + + + + diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs index 397e2f095..63797795e 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs @@ -3,6 +3,7 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; @@ -121,7 +122,8 @@ public static async Task ReceiveAndAbandonAMessage(string queueName) // receive the message again and complete it to remove it from the queue var receivedMessage2 = await receiver.ReceiveMessageAsync(); - await receiver.CompleteMessageAsync(receivedMessage2); } + await receiver.CompleteMessageAsync(receivedMessage2); + } private static async Task SendAMessage(ServiceBusClient client, string queueName, string messageBody) @@ -143,4 +145,77 @@ public static async Task SendAndReceiveAMessage(string queueName) await using var receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); await receiver.ReceiveMessageAsync(); } + + [LibraryMethod] + // [Transaction] no transaction on this one; we're testing that the ServiceBusProcessor is creating transactions + [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)] + public static async Task ExerciseServiceBusProcessor(string queueName) + { + await using var client = new ServiceBusClient(AzureServiceBusConfiguration.ConnectionString); + + // create the sender + await using ServiceBusSender sender = client.CreateSender(queueName); + + // create a set of messages that we can send + ServiceBusMessage[] messages = new ServiceBusMessage[] + { + new ServiceBusMessage("First"), + new ServiceBusMessage("Second") + }; + + // send the message batch + await sender.SendMessagesAsync(messages); + + // create the options to use for configuring the processor + ServiceBusProcessorOptions options = new() + { + MaxConcurrentCalls = 2 // multi-threading. Yay! + }; + + // create a processor that we can use to process the messages + await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options); + + var receivedMessages = 0; + + // configure the message handler to use + processor.ProcessMessageAsync += MessageHandler; + processor.ProcessErrorAsync += ErrorHandler; // ErrorHandler is required, but we won't exercise it + + Task MessageHandler(ProcessMessageEventArgs args) + { + string body = args.Message.Body.ToString(); + Console.WriteLine(body); + + Interlocked.Increment(ref receivedMessages); + + return Task.CompletedTask; + } + Task ErrorHandler(ProcessErrorEventArgs args) + { + // the error source tells me at what point in the processing an error occurred + Console.WriteLine(args.ErrorSource); + // the fully qualified namespace is available + Console.WriteLine(args.FullyQualifiedNamespace); + // as well as the entity path + Console.WriteLine(args.EntityPath); + Console.WriteLine(args.Exception.ToString()); + return Task.CompletedTask; + } + + // start processing + await processor.StartProcessingAsync(); + + // wait up to 30 seconds or until receivedMessages has a count of 2 + var timeout = DateTime.UtcNow.AddSeconds(30); + while (receivedMessages < 2 && DateTime.UtcNow < timeout) + { + await Task.Delay(1000); + } + + // chill for a bit + await Task.Delay(TimeSpan.FromSeconds(5)); + + // stop processing + await processor.StopProcessingAsync(); + } } diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs new file mode 100644 index 000000000..f714db9e7 --- /dev/null +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs @@ -0,0 +1,136 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Drawing.Imaging; +using System.Linq; +using NewRelic.Agent.IntegrationTestHelpers; +using NewRelic.Agent.IntegrationTestHelpers.RemoteServiceFixtures; +using NewRelic.Testing.Assertions; +using Xunit; +using Xunit.Abstractions; + +namespace NewRelic.Agent.UnboundedIntegrationTests.AzureServiceBus; + +public abstract class AzureServiceBusProcessorTestsBase : NewRelicIntegrationTest + where TFixture : ConsoleDynamicMethodFixture +{ + private readonly TFixture _fixture; + private readonly string _queueName; + + protected AzureServiceBusProcessorTestsBase(TFixture fixture, ITestOutputHelper output) : base(fixture) + { + _fixture = fixture; + _fixture.SetTimeout(TimeSpan.FromMinutes(1)); + _fixture.TestLogger = output; + + _queueName = $"test-queue-{Guid.NewGuid()}"; + + _fixture.AddCommand($"AzureServiceBusExerciser InitializeQueue {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser ExerciseServiceBusProcessor {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser DeleteQueue {_queueName}"); + + _fixture.AddActions + ( + setupConfiguration: () => + { + var configModifier = new NewRelicConfigModifier(fixture.DestinationNewRelicConfigFilePath); + + configModifier + .SetLogLevel("finest") + .EnableDistributedTrace() + .ForceTransactionTraces() + .ConfigureFasterMetricsHarvestCycle(20) + .ConfigureFasterSpanEventsHarvestCycle(20) + .ConfigureFasterTransactionTracesHarvestCycle(25) + ; + }, + exerciseApplication: () => + { + _fixture.AgentLog.WaitForLogLine(AgentLogBase.TransactionTransformCompletedLogLineRegex, TimeSpan.FromMinutes(1)); + } + ); + + _fixture.Initialize(); + } + + private readonly string _metricNameBase = "MessageBroker/AzureServiceBus/Queue/Consume/Named"; + private readonly string _transactionNameBase = "OtherTransaction/Message/AzureServiceBus/Queue/Named"; + + [Fact] + public void Test() + { + var metrics = _fixture.AgentLog.GetMetrics().ToList(); + + var expectedMetrics = new List + { + new() { metricName = $"{_metricNameBase}/{_queueName}", callCount = 2}, + new() { metricName = $"{_metricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + }; + + var expectedTransactinEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_transactionNameBase}/{_queueName}"); + + var expectedTransactionTraceSegments = new List + { + $"{_metricNameBase}/{_queueName}" + }; + + var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_transactionNameBase}/{_queueName}"); + + var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"{_metricNameBase} /{_queueName}"); + + var expectedConsumeAgentAttributes = new List + { + "server.address", + "messaging.destination.name", + }; + + var expectedIntrinsicAttributes = new List { "span.kind", }; + + Assertions.MetricsExist(expectedMetrics, metrics); + + NrAssert.Multiple( + () => Assert.True(expectedTransactinEvent != null, "expectedTransactionEvent should not be null"), + () => Assert.True(transactionSample != null, "transactionSample should not be null"), + () => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample), + + () => Assertions.SpanEventHasAttributes(expectedConsumeAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvents), + () => Assertions.SpanEventHasAttributes(expectedIntrinsicAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvents) + ); + } +} + +[NetFrameworkTest] +public class AzureServiceBusProcessorTestsFWLatest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsFWLatest(ConsoleDynamicMethodFixtureFWLatest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetFrameworkTest] +public class AzureServiceBusProcessorTestsFW462 : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsFW462(ConsoleDynamicMethodFixtureFW462 fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetCoreTest] +public class AzureServiceBusProcessorTestsCoreOldest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsCoreOldest(ConsoleDynamicMethodFixtureCoreOldest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetCoreTest] +public class AzureServiceBusProcessorTestsCoreLatest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsCoreLatest(ConsoleDynamicMethodFixtureCoreLatest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} From 2418bd4af6db4096532ca0f192119771578b4ba0 Mon Sep 17 00:00:00 2001 From: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com> Date: Wed, 13 Nov 2024 15:33:35 -0600 Subject: [PATCH 2/5] More guesses. Still not working --- .../AzureServiceBusProcessorWrapper.cs | 14 +++++++------- .../AzureServiceBusReceiveWrapper.cs | 4 ++-- .../AzureServiceBusReceiverManagerWrapper.cs | 9 ++++----- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs index 7b3c1f7b5..d5c91c6bf 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs @@ -24,13 +24,13 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho string queueName = serviceBusProcessor.EntityPath; // some-queue-name string fqns = serviceBusProcessor.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net - transaction = agent.CreateTransaction( - destinationType: MessageBrokerDestinationType.Queue, - BrokerVendorName, - destination: queueName); + //transaction = agent.CreateTransaction( + // destinationType: MessageBrokerDestinationType.Queue, + // BrokerVendorName, + // destination: queueName); // ??? - var segment = transaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Azure.Messaging.ServiceBus.ServiceBusProcessor", "ProcessMessageAsync"); + var segment = agent.CurrentTransaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Azure.Messaging.ServiceBus.ServiceBusProcessor", "ProcessMessageAsync"); //// start a message broker segment ??? //var segment = transaction.StartMessageBrokerSegment( @@ -55,7 +55,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho } segment.End(); - transaction.End(); +// transaction.End(); }, TaskContinuationOptions.ExecuteSynchronously) : Delegates.GetDelegateFor( @@ -63,7 +63,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho onComplete: () => { segment.End(); - transaction.End(); +// transaction.End(); }); } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index ac0783084..371da492e 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -50,9 +50,9 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; - // if the inner receiver is configured as a processor, start a transaction + // if the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction // the transaction will end at the conclusion of ReceiverManager.ProcessOneMessage() - if (isProcessor) + if (isProcessor && instrumentedMethodCall.MethodCall.Method.MethodName == "ReceiveMessagesAsync") { transaction = agent.CreateTransaction( destinationType: MessageBrokerDestinationType.Queue, diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs index a302baf23..62250a199 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -20,23 +20,22 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { - // TODO not working at present -- transaction is always NoOpTransaction here but shouldn't be // make sure the transaction ends when the receiver manager is done processing messages if (instrumentedMethodCall.IsAsync) { return Delegates.GetAsyncDelegateFor( agent, - transaction.CurrentSegment, - true, + agent.CurrentTransaction.CurrentSegment, + false, onComplete: _ => { - transaction.End(); + agent.CurrentTransaction.End(); }); } return Delegates.GetDelegateFor(onComplete: () => { - transaction.End(); + agent.CurrentTransaction.End(); }); } } From cbc8de2b8d31f09f6fb3bd15faf7ac6cd73addbb Mon Sep 17 00:00:00 2001 From: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:53:49 -0600 Subject: [PATCH 3/5] ServiceBusProcessor instrumentation is working --- .../Agent/Core/Metrics/MetricNames.cs | 1 + .../Agent/Core/Transactions/Transaction.cs | 2 + .../Providers/Wrapper/Constants.cs | 1 + .../AzureServiceBusProcessorWrapper.cs | 54 +++------- .../AzureServiceBusReceiveWrapper.cs | 100 ++++++++++++++++-- .../AzureServiceBusReceiverManagerWrapper.cs | 36 +++++-- .../AzureServiceBusSendWrapper.cs | 5 + .../AzureServiceBusWrapperBase.cs | 81 -------------- .../AzureServiceBus/Instrumentation.xml | 10 +- 9 files changed, 152 insertions(+), 138 deletions(-) diff --git a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs index 2f399603a..8df32a64c 100644 --- a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs +++ b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs @@ -353,6 +353,7 @@ public enum MessageBrokerAction Consume, Peek, Purge, + Process } public const string MessageBrokerPrefix = "MessageBroker"; diff --git a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs index 9cbe29390..74034df13 100644 --- a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs +++ b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs @@ -414,6 +414,8 @@ private static MetricNames.MessageBrokerAction AgentWrapperApiEnumToMetricNamesE return MetricNames.MessageBrokerAction.Produce; case MessageBrokerAction.Purge: return MetricNames.MessageBrokerAction.Purge; + case MessageBrokerAction.Process: + return MetricNames.MessageBrokerAction.Process; default: throw new InvalidOperationException("Unexpected enum value: " + wrapper); } diff --git a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs index 116c412f8..fb1f3e4cd 100644 --- a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs +++ b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs @@ -59,6 +59,7 @@ public enum MessageBrokerAction Consume, Peek, Purge, + Process } ///This enum must be a sequence of values starting with 0 and incrementing by 1. See MetricNames.GetEnumerationFunc diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs index d5c91c6bf..a5cfd1a75 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs @@ -4,13 +4,12 @@ using System.Threading.Tasks; using NewRelic.Agent.Api; using NewRelic.Agent.Extensions.Providers.Wrapper; -using NewRelic.Agent.Extensions.SystemExtensions; namespace NewRelic.Providers.Wrapper.AzureServiceBus; public class AzureServiceBusProcessorWrapper : AzureServiceBusWrapperBase { - public override bool IsTransactionRequired => false; + public override bool IsTransactionRequired => true; public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) { @@ -20,50 +19,29 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { - dynamic serviceBusProcessor = instrumentedMethodCall.MethodCall.InvocationTarget; - string queueName = serviceBusProcessor.EntityPath; // some-queue-name - string fqns = serviceBusProcessor.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net - - //transaction = agent.CreateTransaction( - // destinationType: MessageBrokerDestinationType.Queue, - // BrokerVendorName, - // destination: queueName); - - // ??? - var segment = agent.CurrentTransaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Azure.Messaging.ServiceBus.ServiceBusProcessor", "ProcessMessageAsync"); - - //// start a message broker segment ??? - //var segment = transaction.StartMessageBrokerSegment( - // instrumentedMethodCall.MethodCall, - // MessageBrokerDestinationType.Queue, - // MessageBrokerAction.Consume, - // BrokerVendorName, - // queueName, - // serverAddress: fqns); + if (instrumentedMethodCall.IsAsync) + transaction.AttachToAsync(); + // this call wraps the client event handler callback, so start a method segment that will time the callback + var segment = transaction.StartMethodSegment( + instrumentedMethodCall.MethodCall, + instrumentedMethodCall.MethodCall.Method.Type.Name, + instrumentedMethodCall.MethodCall.Method.MethodName); return instrumentedMethodCall.IsAsync ? Delegates.GetAsyncDelegateFor( agent, segment, - true, // TODO Is this correct?? - t => - { - if (t.IsFaulted) + false, // TODO Is this correct? + onComplete: t => { - transaction.NoticeError(t.Exception); - } + if (t.Status == TaskStatus.Faulted) + transaction.NoticeError(t.Exception); - segment.End(); -// transaction.End(); - }, TaskContinuationOptions.ExecuteSynchronously) - : - Delegates.GetDelegateFor( - onFailure: transaction.NoticeError, - onComplete: () => - { segment.End(); -// transaction.End(); - }); + } + ) + : + Delegates.GetDelegateFor(segment); } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index 371da492e..bad331b37 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -2,9 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Threading.Tasks; using NewRelic.Agent.Api; -using NewRelic.Agent.Api.Experimental; using NewRelic.Agent.Extensions.Providers.Wrapper; using NewRelic.Reflection; @@ -12,6 +14,8 @@ namespace NewRelic.Providers.Wrapper.AzureServiceBus; public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase { + private static readonly ConcurrentDictionary> _getResultFromGenericTask = new(); + private Func _innerReceiverAccessor; private Func _innerReceiverIsProcessorAccessor; @@ -50,17 +54,26 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; - // if the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction - // the transaction will end at the conclusion of ReceiverManager.ProcessOneMessage() + // If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction. + // The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync() if (isProcessor && instrumentedMethodCall.MethodCall.Method.MethodName == "ReceiveMessagesAsync") { transaction = agent.CreateTransaction( destinationType: MessageBrokerDestinationType.Queue, BrokerVendorName, destination: queueName); + + if (instrumentedMethodCall.IsAsync) + transaction.DetachFromPrimary(); + } + + if (instrumentedMethodCall.IsAsync) + { + transaction.AttachToAsync(); } - // start a message broker segment + + // start a message broker segment (only happens if transaction is not NoOpTransaction) var segment = transaction.StartMessageBrokerSegment( instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Queue, @@ -77,7 +90,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho Delegates.GetAsyncDelegateFor( agent, segment, - true, + true, // TODO Is this correct?? (responseTask) => { try @@ -90,9 +103,84 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho } }, TaskContinuationOptions.ExecuteSynchronously) - : Delegates.GetDelegateFor( + : + Delegates.GetDelegateFor( onFailure: transaction.NoticeError, onComplete: segment.End, onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName)); } + + private static object GetTaskResultFromObject(object taskObj) + { + var task = taskObj as Task; + if (task == null) + { + return null; + } + if (task.IsFaulted) + { + return null; + } + if (task.IsCanceled) + { + return null; + } + + var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Result")); + return getResponse(task); + } + + private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction) + { + if (responseTask.IsCanceled) + return; + + if (responseTask.IsFaulted) + { + transaction.NoticeError(responseTask.Exception); + return; + } + + var resultObj = GetTaskResultFromObject(responseTask); + ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName); + } + private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName) + { + if (resultObj != null) + { + switch (instrumentedMethodName) + { + case "ReceiveMessagesAsync": + case "ReceiveDeferredMessagesAsync": + case "PeekMessagesInternalAsync": + // the response contains a list of messages. + // get the first message from the response and extract DT headers + dynamic messages = resultObj; + if (messages.Count > 0) + { + var msg = messages[0]; + if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) + { + transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); + } + } + break; + } + } + } + + private static IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) + { + var headerValues = new List(); + foreach (var item in applicationProperties) + { + if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) + { + headerValues.Add(item.Value as string); + } + } + + return headerValues; + } + } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs index 62250a199..aa4cccaa1 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -1,15 +1,18 @@ // Copyright 2020 New Relic, Inc. All rights reserved. // SPDX-License-Identifier: Apache-2.0 +using System; using System.Threading.Tasks; using NewRelic.Agent.Api; using NewRelic.Agent.Extensions.Providers.Wrapper; +using NewRelic.Reflection; namespace NewRelic.Providers.Wrapper.AzureServiceBus { public class AzureServiceBusReceiverManagerWrapper : AzureServiceBusWrapperBase { - public override bool IsTransactionRequired => false; + private Func _receiverAccessor; + public override bool IsTransactionRequired => true; public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) { @@ -20,22 +23,41 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { - // TODO not working at present -- transaction is always NoOpTransaction here but shouldn't be - // make sure the transaction ends when the receiver manager is done processing messages + var receiverManager = instrumentedMethodCall.MethodCall.InvocationTarget; + _receiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(receiverManager.GetType(), "Receiver"); + dynamic receiver = _receiverAccessor(receiverManager); + + string queueName = receiver.EntityPath; // some-queue-name + string fqns = receiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + + if (instrumentedMethodCall.IsAsync) + transaction.AttachToAsync(); + + // start a new MessageBroker segment that wraps ProcessOneMessageWithinScopeAsync + var segment = transaction.StartMessageBrokerSegment( + instrumentedMethodCall.MethodCall, + MessageBrokerDestinationType.Queue, + MessageBrokerAction.Process, // TODO: This is a new action, added for this instrumentation + BrokerVendorName, + queueName, + serverAddress: fqns); + if (instrumentedMethodCall.IsAsync) { return Delegates.GetAsyncDelegateFor( agent, - agent.CurrentTransaction.CurrentSegment, - false, + segment, + true, onComplete: _ => { - agent.CurrentTransaction.End(); + segment.End(); + transaction.End(); }); } return Delegates.GetDelegateFor(onComplete: () => { - agent.CurrentTransaction.End(); + segment.End(); + transaction.End(); }); } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs index e8d8b4a9a..e9fdeae5f 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs @@ -33,6 +33,11 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; + if (instrumentedMethodCall.IsAsync) + { + transaction.AttachToAsync(); + } + // start a message broker segment var segment = transaction.StartMessageBrokerSegment( instrumentedMethodCall.MethodCall, diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs index b08f97e74..05d3f6d22 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs @@ -1,21 +1,13 @@ // Copyright 2020 New Relic, Inc. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Threading.Tasks; using NewRelic.Agent.Api; using NewRelic.Agent.Extensions.Providers.Wrapper; -using NewRelic.Reflection; namespace NewRelic.Providers.Wrapper.AzureServiceBus; public abstract class AzureServiceBusWrapperBase : IWrapper { - private static readonly ConcurrentDictionary> _getResultFromGenericTask = new(); - protected const string BrokerVendorName = "AzureServiceBus"; public virtual bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction @@ -23,77 +15,4 @@ public abstract class AzureServiceBusWrapperBase : IWrapper public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction); - - protected static object GetTaskResultFromObject(object taskObj) - { - var task = taskObj as Task; - if (task == null) - { - return null; - } - if (task.IsFaulted) - { - return null; - } - if (task.IsCanceled) - { - return null; - } - - var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Result")); - return getResponse(task); - } - - protected void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction) - { - if (responseTask.IsCanceled) - return; - - if (responseTask.IsFaulted) - { - transaction.NoticeError(responseTask.Exception); - return; - } - - var resultObj = GetTaskResultFromObject(responseTask); - ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName); - } - protected void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName) - { - if (resultObj != null) - { - switch (instrumentedMethodName) - { - case "ReceiveMessagesAsync": - case "ReceiveDeferredMessagesAsync": - case "PeekMessagesInternalAsync": - // the response contains a list of messages. - // get the first message from the response and extract DT headers - dynamic messages = resultObj; - if (messages.Count > 0) - { - var msg = messages[0]; - if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) - { - transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); - } - } - break; - } - } - } - - protected IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) - { - var headerValues = new List(); - foreach (var item in applicationProperties) - { - if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) - { - headerValues.Add(item.Value as string); - } - } - - return headerValues; - } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml index 534d2c675..d0143c683 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml @@ -118,13 +118,11 @@ SPDX-License-Identifier: Apache-2.0 + https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs#L149 + protected async Task ProcessOneMessageWithinScopeAsync(ServiceBusReceivedMessage message, string activityName, CancellationToken cancellationToken) + --> - + From 9d0f21cd6f89c67a9bdcf987dccec26b79b15482 Mon Sep 17 00:00:00 2001 From: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:10:07 -0600 Subject: [PATCH 4/5] WIP. Still getting "transaction has already ended" --- .../Agent/Core/Metrics/MetricNames.cs | 4 +- .../Agent/Core/Transactions/Transaction.cs | 4 ++ .../Providers/Wrapper/Constants.cs | 4 +- .../AzureServiceBusProcessorWrapper.cs | 13 +--- .../AzureServiceBusReceiveWrapper.cs | 58 +++++++++------ .../AzureServiceBusReceiverManagerWrapper.cs | 4 +- .../AzureServiceBusSendWrapper.cs | 2 +- .../AzureServiceBusWrapperBase.cs | 2 +- .../IntegrationTestHelpers/AgentLogBase.cs | 1 + .../AzureServiceBusExerciser.cs | 20 +++--- .../AzureServiceBusProcessorTests.cs | 42 +++++++---- .../AzureServiceBus/AzureServiceBusTests.cs | 71 +++++++++++-------- 12 files changed, 133 insertions(+), 92 deletions(-) diff --git a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs index 8df32a64c..81c08c4bd 100644 --- a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs +++ b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs @@ -353,7 +353,9 @@ public enum MessageBrokerAction Consume, Peek, Purge, - Process + Process, + Settle, + Cancel } public const string MessageBrokerPrefix = "MessageBroker"; diff --git a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs index 74034df13..a6c93f9d6 100644 --- a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs +++ b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs @@ -416,6 +416,10 @@ private static MetricNames.MessageBrokerAction AgentWrapperApiEnumToMetricNamesE return MetricNames.MessageBrokerAction.Purge; case MessageBrokerAction.Process: return MetricNames.MessageBrokerAction.Process; + case MessageBrokerAction.Settle: + return MetricNames.MessageBrokerAction.Settle; + case MessageBrokerAction.Cancel: + return MetricNames.MessageBrokerAction.Cancel; default: throw new InvalidOperationException("Unexpected enum value: " + wrapper); } diff --git a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs index fb1f3e4cd..6457accbf 100644 --- a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs +++ b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs @@ -59,7 +59,9 @@ public enum MessageBrokerAction Consume, Peek, Purge, - Process + Process, + Settle, + Cancel } ///This enum must be a sequence of values starting with 0 and incrementing by 1. See MetricNames.GetEnumerationFunc diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs index a5cfd1a75..f46154639 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs @@ -29,18 +29,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho instrumentedMethodCall.MethodCall.Method.MethodName); return instrumentedMethodCall.IsAsync ? - Delegates.GetAsyncDelegateFor( - agent, - segment, - false, // TODO Is this correct? - onComplete: t => - { - if (t.Status == TaskStatus.Faulted) - transaction.NoticeError(t.Exception); - - segment.End(); - } - ) + Delegates.GetAsyncDelegateFor(agent, segment) : Delegates.GetDelegateFor(segment); } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index bad331b37..2653b0057 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -40,23 +40,27 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _innerReceiverIsProcessorAccessor ??= VisibilityBypasser.Instance.GenerateFieldReadAccessor(innerReceiver.GetType(), "_isProcessor"); var isProcessor = _innerReceiverIsProcessorAccessor.Invoke(innerReceiver); + var instrumentedMethodName = instrumentedMethodCall.MethodCall.Method.MethodName; + MessageBrokerAction action = - instrumentedMethodCall.MethodCall.Method.MethodName switch + instrumentedMethodName switch { "ReceiveMessagesAsync" => MessageBrokerAction.Consume, "ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume, "PeekMessagesInternalAsync" => MessageBrokerAction.Peek, - "AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? Abandon sends the message back to the queue for re-delivery - "CompleteMessageAsync" => MessageBrokerAction.Consume, - "DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? - "DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? - "RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? - _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") + "AbandonMessageAsync" => MessageBrokerAction.Settle, + "CompleteMessageAsync" => MessageBrokerAction.Settle, + "DeadLetterInternalAsync" => MessageBrokerAction.Settle, + "DeferMessageAsync" => MessageBrokerAction.Settle, + "RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO This doesn't quite fit. OTEL uses a default action with no name for this + _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodName}") }; + agent.Logger.Finest("AzureServiceBusReceiveWrapper - BeforeWrappedMethod: instrumentedMethodName: {instrumentedMethodName} - action: {action} - isProcessor: {isProcessor}", instrumentedMethodName, action, isProcessor); + // If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction. // The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync() - if (isProcessor && instrumentedMethodCall.MethodCall.Method.MethodName == "ReceiveMessagesAsync") + if (isProcessor && instrumentedMethodName == "ReceiveMessagesAsync") { transaction = agent.CreateTransaction( destinationType: MessageBrokerDestinationType.Queue, @@ -65,6 +69,14 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho if (instrumentedMethodCall.IsAsync) transaction.DetachFromPrimary(); + + transaction.LogFinest("Created transaction for ReceiveMessagesAsync in processor mode."); + } + + if (!isProcessor && instrumentedMethodName == "ReceiveMessagesAsync") + { + transaction = agent.CurrentTransaction; + transaction.LogFinest("Using existing transaction for ReceiveMessagesAsync, not in processor mode."); } if (instrumentedMethodCall.IsAsync) @@ -82,23 +94,25 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho queueName, serverAddress: fqns); - var instrumentedMethodName = instrumentedMethodCall.MethodCall.Method.MethodName; - return instrumentedMethodCall.IsAsync ? // return an async delegate Delegates.GetAsyncDelegateFor( agent, segment, - true, // TODO Is this correct?? + isProcessor, // TODO Is this correct?? (responseTask) => { try { - HandleReceiveResponse(responseTask, instrumentedMethodCall.MethodCall.Method.MethodName, transaction); + if (responseTask.IsFaulted) + transaction.NoticeError(responseTask.Exception); + + HandleReceiveResponse(responseTask, instrumentedMethodName, transaction, isProcessor); } finally { + transaction.LogFinest($"Ending segment for {instrumentedMethodName}."); segment.End(); } }, @@ -107,7 +121,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho Delegates.GetDelegateFor( onFailure: transaction.NoticeError, onComplete: segment.End, - onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName)); + onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor)); } private static object GetTaskResultFromObject(object taskObj) @@ -130,21 +144,15 @@ private static object GetTaskResultFromObject(object taskObj) return getResponse(task); } - private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction) + private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction, bool isProcessor) { if (responseTask.IsCanceled) return; - if (responseTask.IsFaulted) - { - transaction.NoticeError(responseTask.Exception); - return; - } - var resultObj = GetTaskResultFromObject(responseTask); - ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName); + ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor); } - private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName) + private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName, bool isProcessor) { if (resultObj != null) { @@ -158,12 +166,18 @@ private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction t dynamic messages = resultObj; if (messages.Count > 0) { + transaction.LogFinest($"Received {messages.Count} message(s). Accepting DT headers."); var msg = messages[0]; if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) { transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); } } + else if (messages.Count == 0 && isProcessor) // if there are no messages and the receiver is a processor, ignore the transaction we created + { + transaction.LogFinest("No messages received. Ignoring transaction."); + transaction.Ignore(); + } break; } } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs index aa4cccaa1..20fb02d38 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -47,12 +47,12 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho return Delegates.GetAsyncDelegateFor( agent, segment, - true, + false, onComplete: _ => { segment.End(); transaction.End(); - }); + }, TaskContinuationOptions.ExecuteSynchronously); } return Delegates.GetDelegateFor(onComplete: () => { diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs index e9fdeae5f..85d91db59 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs @@ -29,7 +29,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho { "SendMessagesAsync" => MessageBrokerAction.Produce, "ScheduleMessagesAsync" => MessageBrokerAction.Produce, - "CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? + "CancelScheduledMessagesAsync" => MessageBrokerAction.Cancel, _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs index 05d3f6d22..ff6b24add 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs @@ -8,7 +8,7 @@ namespace NewRelic.Providers.Wrapper.AzureServiceBus; public abstract class AzureServiceBusWrapperBase : IWrapper { - protected const string BrokerVendorName = "AzureServiceBus"; + protected const string BrokerVendorName = "ServiceBus"; public virtual bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction diff --git a/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs b/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs index 538a62081..adb614b30 100644 --- a/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs +++ b/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs @@ -72,6 +72,7 @@ public abstract class AgentLogBase // Transactions (either with an ID or "noop") public const string TransactionLinePrefix = FinestLogLinePrefixRegex + @"Trx ([a-fA-F0-9]*|Noop): "; + public const string TransactionAlreadyEndedLogLineRegex = TransactionLinePrefix + "Transaction has already ended(.*)"; // Serverless payloads public const string ServerlessPayloadLogLineRegex = FinestLogLinePrefixRegex + @"Serverless payload: (.*)"; diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs index 63797795e..59e1b775d 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs @@ -71,18 +71,16 @@ public static async Task ExerciseMultipleReceiveOperationsOnAMessage(string queu [LibraryMethod] [Transaction] [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)] - public static async Task ScheduleAndReceiveAMessage(string queueName) + public static async Task ScheduleAndCancelAMessage(string queueName) { await using var client = new ServiceBusClient(AzureServiceBusConfiguration.ConnectionString); await using var sender = client.CreateSender(queueName); var message = new ServiceBusMessage("Hello world!"); - await sender.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(5)); + var messageSequenceId = await sender.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(90)); - await Task.Delay(TimeSpan.FromSeconds(10)); - - await using var receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); - await receiver.ReceiveMessageAsync(); + // cancel the scheduled message + await sender.CancelScheduledMessageAsync(messageSequenceId); } [LibraryMethod] @@ -169,7 +167,7 @@ public static async Task ExerciseServiceBusProcessor(string queueName) // create the options to use for configuring the processor ServiceBusProcessorOptions options = new() { - MaxConcurrentCalls = 2 // multi-threading. Yay! + MaxConcurrentCalls = 1 // multi-threading. Yay! }; // create a processor that we can use to process the messages @@ -181,15 +179,17 @@ public static async Task ExerciseServiceBusProcessor(string queueName) processor.ProcessMessageAsync += MessageHandler; processor.ProcessErrorAsync += ErrorHandler; // ErrorHandler is required, but we won't exercise it - Task MessageHandler(ProcessMessageEventArgs args) + async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); - Console.WriteLine(body); + var threadId = Thread.CurrentThread.ManagedThreadId; + Console.WriteLine($"ThreadId: {threadId} - body: {body}"); Interlocked.Increment(ref receivedMessages); - return Task.CompletedTask; + await Task.Delay(1000); // simulate processing the message } + Task ErrorHandler(ProcessErrorEventArgs args) { // the error source tells me at what point in the processing an error occurred diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs index f714db9e7..9a8306a18 100644 --- a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Drawing.Imaging; using System.Linq; using NewRelic.Agent.IntegrationTestHelpers; using NewRelic.Agent.IntegrationTestHelpers.RemoteServiceFixtures; @@ -55,30 +54,39 @@ protected AzureServiceBusProcessorTestsBase(TFixture fixture, ITestOutputHelper _fixture.Initialize(); } - private readonly string _metricNameBase = "MessageBroker/AzureServiceBus/Queue/Consume/Named"; - private readonly string _transactionNameBase = "OtherTransaction/Message/AzureServiceBus/Queue/Named"; + private readonly string _consumeMetricNameBase = "MessageBroker/ServiceBus/Queue/Consume/Named"; + private readonly string _processMetricNameBase = "MessageBroker/ServiceBus/Queue/Process/Named"; + private readonly string _settleMetricNameBase = "MessageBroker/ServiceBus/Queue/Settle/Named"; + private readonly string _transactionNameBase = "OtherTransaction/Message/ServiceBus/Queue/Named"; [Fact] public void Test() { var metrics = _fixture.AgentLog.GetMetrics().ToList(); + // 2 messages, 1 consume segment, 1 process segment, 1 settle segment per message var expectedMetrics = new List { - new() { metricName = $"{_metricNameBase}/{_queueName}", callCount = 2}, - new() { metricName = $"{_metricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 4}, + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 4, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_processMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_settleMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, }; - var expectedTransactinEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_transactionNameBase}/{_queueName}"); + var expectedTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_transactionNameBase}/{_queueName}"); var expectedTransactionTraceSegments = new List { - $"{_metricNameBase}/{_queueName}" + $"{_consumeMetricNameBase}/{_queueName}", + $"{_processMetricNameBase}/{_queueName}", + "DotNet/ServiceBusProcessor/OnProcessMessageAsync", + $"{_settleMetricNameBase}/{_queueName}", }; var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_transactionNameBase}/{_queueName}"); - var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"{_metricNameBase} /{_queueName}"); + var queueConsumeSpanEvent = _fixture.AgentLog.TryGetSpanEvent($"{_consumeMetricNameBase}/{_queueName}"); + var queueProcessSpanEvent = _fixture.AgentLog.TryGetSpanEvent($"{_processMetricNameBase}/{_queueName}"); var expectedConsumeAgentAttributes = new List { @@ -88,17 +96,27 @@ public void Test() var expectedIntrinsicAttributes = new List { "span.kind", }; + // make sure log does not contain "Transaction has already ended" + var transactionAlreadyEndedErrors = _fixture.AgentLog.TryGetLogLines(AgentLogBase.TransactionAlreadyEndedLogLineRegex); + Assert.Empty(transactionAlreadyEndedErrors); + Assertions.MetricsExist(expectedMetrics, metrics); NrAssert.Multiple( - () => Assert.True(expectedTransactinEvent != null, "expectedTransactionEvent should not be null"), - () => Assert.True(transactionSample != null, "transactionSample should not be null"), + () => Assert.NotNull(expectedTransactionEvent), + () => Assert.NotNull(transactionSample), + () => Assert.NotNull(queueConsumeSpanEvent), + () => Assert.NotNull(queueProcessSpanEvent), () => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample), () => Assertions.SpanEventHasAttributes(expectedConsumeAgentAttributes, - Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvents), + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvent), + () => Assertions.SpanEventHasAttributes(expectedIntrinsicAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvent), + () => Assertions.SpanEventHasAttributes(expectedConsumeAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvent), () => Assertions.SpanEventHasAttributes(expectedIntrinsicAttributes, - Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvents) + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvent) ); } } diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs index 9b8f41c56..8a112113f 100644 --- a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs @@ -28,7 +28,7 @@ protected AzureServiceBusTestsBase(TFixture fixture, ITestOutputHelper output) : _fixture.AddCommand($"AzureServiceBusExerciser InitializeQueue {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ExerciseMultipleReceiveOperationsOnAMessage {_queueName}"); - _fixture.AddCommand($"AzureServiceBusExerciser ScheduleAndReceiveAMessage {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser ScheduleAndCancelAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ReceiveAndDeadLetterAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ReceiveAndAbandonAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser DeleteQueue {_queueName}"); @@ -62,40 +62,43 @@ public void Test() var expectedMetrics = new List { - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 4}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndReceiveAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 10}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 5, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndReceiveAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 3, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1 }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage" }, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 2 }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage" }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage" }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 4}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndCancelAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 6}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 3, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage" }, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}", callCount = 1 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndCancelAMessage" }, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 5 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage" }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage" }, }; var exerciseMultipleReceiveOperationsOnAMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"); - var scheduleAndReceiveAMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_metricScopeBase}/ScheduleAndReceiveAMessage"); var expectedTransactionTraceSegments = new List { - $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}" + $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}" }; - var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_metricScopeBase}/ScheduleAndReceiveAMessage"); // this will always be the slowest transaction + var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"); - var queueProduceSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}"); - var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}"); - var queuePeekSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}"); - var queuePurgeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}"); + var queueProduceSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}"); + var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}"); + var queuePeekSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}"); + var queueSettleSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}"); + var queueCancelSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}"); var expectedProduceAgentAttributes = new List { @@ -116,7 +119,13 @@ public void Test() "messaging.destination.name", }; - var expectedPurgeAgentAttributes = new List + var expectedSettleAgentAttributes = new List + { + "server.address", + "messaging.destination.name", + }; + + var expectedCancelAgentAttributes = new List { "server.address", "messaging.destination.name", @@ -128,7 +137,6 @@ public void Test() NrAssert.Multiple( () => Assert.True(exerciseMultipleReceiveOperationsOnAMessageTransactionEvent != null, "ExerciseMultipleReceiveOperationsOnAMessageTransactionEvent should not be null"), - () => Assert.True(scheduleAndReceiveAMessageTransactionEvent != null, "ScheduleAndReceiveAMessageTransactionEvent should not be null"), () => Assert.True(transactionSample != null, "transactionSample should not be null"), () => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample), @@ -145,8 +153,11 @@ public void Test() () => Assertions.SpanEventHasAttributes(expectedPeekAgentAttributes, Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queuePeekSpanEvents), - () => Assertions.SpanEventHasAttributes(expectedPurgeAgentAttributes, - Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queuePurgeSpanEvents) + () => Assertions.SpanEventHasAttributes(expectedSettleAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueSettleSpanEvents), + + () => Assertions.SpanEventHasAttributes(expectedCancelAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueCancelSpanEvents) ); } } From 9c66fb0c055c432b44849f195b059c782c8f5205 Mon Sep 17 00:00:00 2001 From: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:01:35 -0600 Subject: [PATCH 5/5] Updates, integration tests --- .../AzureServiceBusReceiveWrapper.cs | 28 +++++++++++++------ .../AzureServiceBusReceiverManagerWrapper.cs | 23 ++++++++------- .../MFALatestPackages.csproj | 10 +++---- .../MultiFunctionApplicationHelpers.csproj | 2 +- .../AzureServiceBusProcessorTests.cs | 8 ++---- 5 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index 2653b0057..8ca485237 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -29,6 +29,8 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { + transaction.LogFinest("AzureServiceBusReceiveWrapper.BeforeWrappedMethod() is starting."); + dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget; string queueName = serviceBusReceiver.EntityPath; // some-queue-name string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net @@ -56,8 +58,6 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodName}") }; - agent.Logger.Finest("AzureServiceBusReceiveWrapper - BeforeWrappedMethod: instrumentedMethodName: {instrumentedMethodName} - action: {action} - isProcessor: {isProcessor}", instrumentedMethodName, action, isProcessor); - // If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction. // The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync() if (isProcessor && instrumentedMethodName == "ReceiveMessagesAsync") @@ -68,15 +68,16 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho destination: queueName); if (instrumentedMethodCall.IsAsync) + { transaction.DetachFromPrimary(); + } transaction.LogFinest("Created transaction for ReceiveMessagesAsync in processor mode."); } - - if (!isProcessor && instrumentedMethodName == "ReceiveMessagesAsync") + else { transaction = agent.CurrentTransaction; - transaction.LogFinest("Using existing transaction for ReceiveMessagesAsync, not in processor mode."); + transaction.LogFinest($"Using existing transaction for {instrumentedMethodName}."); } if (instrumentedMethodCall.IsAsync) @@ -100,20 +101,32 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho Delegates.GetAsyncDelegateFor( agent, segment, - isProcessor, // TODO Is this correct?? + false, (responseTask) => { try { if (responseTask.IsFaulted) + { transaction.NoticeError(responseTask.Exception); + } HandleReceiveResponse(responseTask, instrumentedMethodName, transaction, isProcessor); } + catch (Exception ex) + { + transaction.LogFinest($"Unexpected exception: {ex.Message}"); + } finally { transaction.LogFinest($"Ending segment for {instrumentedMethodName}."); segment.End(); + + if (isProcessor && responseTask.IsCanceled) + { + transaction.LogFinest("ReceiveMessagesAsync task was canceled in processor mode. Ignoring transaction."); + transaction.Ignore(); + } } }, TaskContinuationOptions.ExecuteSynchronously) @@ -146,9 +159,6 @@ private static object GetTaskResultFromObject(object taskObj) private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction, bool isProcessor) { - if (responseTask.IsCanceled) - return; - var resultObj = GetTaskResultFromObject(responseTask); ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor); } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs index 20fb02d38..b18a7e984 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -20,8 +20,7 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho return new CanWrapResponse(canWrap); } - public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, - ITransaction transaction) + public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { var receiverManager = instrumentedMethodCall.MethodCall.InvocationTarget; _receiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(receiverManager.GetType(), "Receiver"); @@ -42,9 +41,9 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho queueName, serverAddress: fqns); - if (instrumentedMethodCall.IsAsync) - { - return Delegates.GetAsyncDelegateFor( + return instrumentedMethodCall.IsAsync + ? + Delegates.GetAsyncDelegateFor( agent, segment, false, @@ -52,13 +51,13 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho { segment.End(); transaction.End(); - }, TaskContinuationOptions.ExecuteSynchronously); - } - return Delegates.GetDelegateFor(onComplete: () => - { - segment.End(); - transaction.End(); - }); + }, TaskContinuationOptions.ExecuteSynchronously) + : + Delegates.GetDelegateFor(onComplete: () => + { + segment.End(); + transaction.End(); + }); } } } diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj b/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj index 77d359b9c..9ab6f5a60 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj @@ -9,10 +9,7 @@ - - - - + @@ -34,7 +31,10 @@ - + + + + diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj index f301901e8..993654c7a 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj @@ -275,7 +275,7 @@ - + diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs index 9a8306a18..a76b5b6e8 100644 --- a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs @@ -67,8 +67,8 @@ public void Test() // 2 messages, 1 consume segment, 1 process segment, 1 settle segment per message var expectedMetrics = new List { - new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 4}, - new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 4, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 2}, + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, new() { metricName = $"{_processMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, new() { metricName = $"{_settleMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, }; @@ -96,10 +96,6 @@ public void Test() var expectedIntrinsicAttributes = new List { "span.kind", }; - // make sure log does not contain "Transaction has already ended" - var transactionAlreadyEndedErrors = _fixture.AgentLog.TryGetLogLines(AgentLogBase.TransactionAlreadyEndedLogLineRegex); - Assert.Empty(transactionAlreadyEndedErrors); - Assertions.MetricsExist(expectedMetrics, metrics); NrAssert.Multiple(