diff --git a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs index 2f399603a7..81c08c4bd1 100644 --- a/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs +++ b/src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs @@ -353,6 +353,9 @@ public enum MessageBrokerAction Consume, Peek, Purge, + Process, + Settle, + Cancel } public const string MessageBrokerPrefix = "MessageBroker"; diff --git a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs index 9cbe293901..a6c93f9d65 100644 --- a/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs +++ b/src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs @@ -414,6 +414,12 @@ private static MetricNames.MessageBrokerAction AgentWrapperApiEnumToMetricNamesE return MetricNames.MessageBrokerAction.Produce; case MessageBrokerAction.Purge: return MetricNames.MessageBrokerAction.Purge; + case MessageBrokerAction.Process: + return MetricNames.MessageBrokerAction.Process; + case MessageBrokerAction.Settle: + return MetricNames.MessageBrokerAction.Settle; + case MessageBrokerAction.Cancel: + return MetricNames.MessageBrokerAction.Cancel; default: throw new InvalidOperationException("Unexpected enum value: " + wrapper); } diff --git a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs index 116c412f80..6457accbf9 100644 --- a/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs +++ b/src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Providers/Wrapper/Constants.cs @@ -59,6 +59,9 @@ public enum MessageBrokerAction Consume, Peek, Purge, + Process, + Settle, + Cancel } ///This enum must be a sequence of values starting with 0 and incrementing by 1. See MetricNames.GetEnumerationFunc diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs new file mode 100644 index 0000000000..f46154639b --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusProcessorWrapper.cs @@ -0,0 +1,36 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Threading.Tasks; +using NewRelic.Agent.Api; +using NewRelic.Agent.Extensions.Providers.Wrapper; + +namespace NewRelic.Providers.Wrapper.AzureServiceBus; + +public class AzureServiceBusProcessorWrapper : AzureServiceBusWrapperBase +{ + public override bool IsTransactionRequired => true; + + public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) + { + var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusProcessorWrapper)); + return new CanWrapResponse(canWrap); + } + + public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) + { + if (instrumentedMethodCall.IsAsync) + transaction.AttachToAsync(); + + // this call wraps the client event handler callback, so start a method segment that will time the callback + var segment = transaction.StartMethodSegment( + instrumentedMethodCall.MethodCall, + instrumentedMethodCall.MethodCall.Method.Type.Name, + instrumentedMethodCall.MethodCall.Method.MethodName); + + return instrumentedMethodCall.IsAsync ? + Delegates.GetAsyncDelegateFor(agent, segment) + : + Delegates.GetDelegateFor(segment); + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs index 01e58c33f8..8ca4852379 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiveWrapper.cs @@ -16,6 +16,11 @@ public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase { private static readonly ConcurrentDictionary> _getResultFromGenericTask = new(); + private Func _innerReceiverAccessor; + private Func _innerReceiverIsProcessorAccessor; + + public override bool IsTransactionRequired => false; // only partially true. See the code below... + public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) { var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiveWrapper)); @@ -24,32 +29,71 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) { + transaction.LogFinest("AzureServiceBusReceiveWrapper.BeforeWrappedMethod() is starting."); + dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget; string queueName = serviceBusReceiver.EntityPath; // some-queue-name - string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + + _innerReceiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(serviceBusReceiver.GetType(), "InnerReceiver"); + object innerReceiver = _innerReceiverAccessor.Invoke(serviceBusReceiver); + + // use reflection to access the _isProcessor field of the inner receiver + _innerReceiverIsProcessorAccessor ??= VisibilityBypasser.Instance.GenerateFieldReadAccessor(innerReceiver.GetType(), "_isProcessor"); + var isProcessor = _innerReceiverIsProcessorAccessor.Invoke(innerReceiver); + + var instrumentedMethodName = instrumentedMethodCall.MethodCall.Method.MethodName; MessageBrokerAction action = - instrumentedMethodCall.MethodCall.Method.MethodName switch + instrumentedMethodName switch { "ReceiveMessagesAsync" => MessageBrokerAction.Consume, "ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume, "PeekMessagesInternalAsync" => MessageBrokerAction.Peek, - "AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? Abandon sends the message back to the queue for re-delivery - "CompleteMessageAsync" => MessageBrokerAction.Consume, - "DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? - "DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? - "RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values??? - _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") + "AbandonMessageAsync" => MessageBrokerAction.Settle, + "CompleteMessageAsync" => MessageBrokerAction.Settle, + "DeadLetterInternalAsync" => MessageBrokerAction.Settle, + "DeferMessageAsync" => MessageBrokerAction.Settle, + "RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO This doesn't quite fit. OTEL uses a default action with no name for this + _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodName}") }; - // start a message broker segment + // If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction. + // The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync() + if (isProcessor && instrumentedMethodName == "ReceiveMessagesAsync") + { + transaction = agent.CreateTransaction( + destinationType: MessageBrokerDestinationType.Queue, + BrokerVendorName, + destination: queueName); + + if (instrumentedMethodCall.IsAsync) + { + transaction.DetachFromPrimary(); + } + + transaction.LogFinest("Created transaction for ReceiveMessagesAsync in processor mode."); + } + else + { + transaction = agent.CurrentTransaction; + transaction.LogFinest($"Using existing transaction for {instrumentedMethodName}."); + } + + if (instrumentedMethodCall.IsAsync) + { + transaction.AttachToAsync(); + } + + + // start a message broker segment (only happens if transaction is not NoOpTransaction) var segment = transaction.StartMessageBrokerSegment( instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Queue, action, BrokerVendorName, queueName, - serverAddress: fqns ); + serverAddress: fqns); return instrumentedMethodCall.IsAsync ? @@ -58,71 +102,39 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho agent, segment, false, - HandleResponse, - TaskContinuationOptions.ExecuteSynchronously) - : Delegates.GetDelegateFor( - onFailure: transaction.NoticeError, - onComplete: segment.End, - onSuccess: ExtractDTHeadersIfAvailable); - - void HandleResponse(Task responseTask) - { - try - { - if (responseTask.IsFaulted) + (responseTask) => { - transaction.NoticeError(responseTask.Exception); - return; - } - - var resultObj = GetTaskResultFromObject(responseTask); - ExtractDTHeadersIfAvailable(resultObj); - } - finally - { - segment.End(); - } - } - - - - void ExtractDTHeadersIfAvailable(object resultObj) - { - if (resultObj != null) - { - switch (instrumentedMethodCall.MethodCall.Method.MethodName) - { - case "ReceiveMessagesAsync": - case "ReceiveDeferredMessagesAsync": - case "PeekMessagesInternalAsync": - // the response contains a list of messages. - // get the first message from the response and extract DT headers - dynamic messages = resultObj; - if (messages.Count > 0) + try + { + if (responseTask.IsFaulted) { - var msg = messages[0]; - if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) - { - transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); - } + transaction.NoticeError(responseTask.Exception); } - break; - } - } - IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) - { - var headerValues = new List(); - foreach (var item in applicationProperties) - { - if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) + + HandleReceiveResponse(responseTask, instrumentedMethodName, transaction, isProcessor); + } + catch (Exception ex) { - headerValues.Add(item.Value as string); + transaction.LogFinest($"Unexpected exception: {ex.Message}"); } - } + finally + { + transaction.LogFinest($"Ending segment for {instrumentedMethodName}."); + segment.End(); - return headerValues; - } - } + if (isProcessor && responseTask.IsCanceled) + { + transaction.LogFinest("ReceiveMessagesAsync task was canceled in processor mode. Ignoring transaction."); + transaction.Ignore(); + } + } + }, + TaskContinuationOptions.ExecuteSynchronously) + : + Delegates.GetDelegateFor( + onFailure: transaction.NoticeError, + onComplete: segment.End, + onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor)); } private static object GetTaskResultFromObject(object taskObj) @@ -136,8 +148,63 @@ private static object GetTaskResultFromObject(object taskObj) { return null; } + if (task.IsCanceled) + { + return null; + } var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Result")); return getResponse(task); } + + private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction, bool isProcessor) + { + var resultObj = GetTaskResultFromObject(responseTask); + ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor); + } + private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName, bool isProcessor) + { + if (resultObj != null) + { + switch (instrumentedMethodName) + { + case "ReceiveMessagesAsync": + case "ReceiveDeferredMessagesAsync": + case "PeekMessagesInternalAsync": + // the response contains a list of messages. + // get the first message from the response and extract DT headers + dynamic messages = resultObj; + if (messages.Count > 0) + { + transaction.LogFinest($"Received {messages.Count} message(s). Accepting DT headers."); + var msg = messages[0]; + if (msg.ApplicationProperties is ReadOnlyDictionary applicationProperties) + { + transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue); + } + } + else if (messages.Count == 0 && isProcessor) // if there are no messages and the receiver is a processor, ignore the transaction we created + { + transaction.LogFinest("No messages received. Ignoring transaction."); + transaction.Ignore(); + } + break; + } + } + } + + private static IEnumerable ProcessHeaders(ReadOnlyDictionary applicationProperties, string key) + { + var headerValues = new List(); + foreach (var item in applicationProperties) + { + if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase)) + { + headerValues.Add(item.Value as string); + } + } + + return headerValues; + } + } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs new file mode 100644 index 0000000000..b18a7e9841 --- /dev/null +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusReceiverManagerWrapper.cs @@ -0,0 +1,63 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Threading.Tasks; +using NewRelic.Agent.Api; +using NewRelic.Agent.Extensions.Providers.Wrapper; +using NewRelic.Reflection; + +namespace NewRelic.Providers.Wrapper.AzureServiceBus +{ + public class AzureServiceBusReceiverManagerWrapper : AzureServiceBusWrapperBase + { + private Func _receiverAccessor; + public override bool IsTransactionRequired => true; + + public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo) + { + var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiverManagerWrapper)); + return new CanWrapResponse(canWrap); + } + + public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction) + { + var receiverManager = instrumentedMethodCall.MethodCall.InvocationTarget; + _receiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor(receiverManager.GetType(), "Receiver"); + dynamic receiver = _receiverAccessor(receiverManager); + + string queueName = receiver.EntityPath; // some-queue-name + string fqns = receiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net + + if (instrumentedMethodCall.IsAsync) + transaction.AttachToAsync(); + + // start a new MessageBroker segment that wraps ProcessOneMessageWithinScopeAsync + var segment = transaction.StartMessageBrokerSegment( + instrumentedMethodCall.MethodCall, + MessageBrokerDestinationType.Queue, + MessageBrokerAction.Process, // TODO: This is a new action, added for this instrumentation + BrokerVendorName, + queueName, + serverAddress: fqns); + + return instrumentedMethodCall.IsAsync + ? + Delegates.GetAsyncDelegateFor( + agent, + segment, + false, + onComplete: _ => + { + segment.End(); + transaction.End(); + }, TaskContinuationOptions.ExecuteSynchronously) + : + Delegates.GetDelegateFor(onComplete: () => + { + segment.End(); + transaction.End(); + }); + } + } +} diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs index e8d8b4a9a7..85d91db59b 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusSendWrapper.cs @@ -29,10 +29,15 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho { "SendMessagesAsync" => MessageBrokerAction.Produce, "ScheduleMessagesAsync" => MessageBrokerAction.Produce, - "CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? + "CancelScheduledMessagesAsync" => MessageBrokerAction.Cancel, _ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}") }; + if (instrumentedMethodCall.IsAsync) + { + transaction.AttachToAsync(); + } + // start a message broker segment var segment = transaction.StartMessageBrokerSegment( instrumentedMethodCall.MethodCall, diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs index 97d9332c93..ff6b24adde 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/AzureServiceBusWrapperBase.cs @@ -4,17 +4,15 @@ using NewRelic.Agent.Api; using NewRelic.Agent.Extensions.Providers.Wrapper; -namespace NewRelic.Providers.Wrapper.AzureServiceBus -{ - public abstract class AzureServiceBusWrapperBase : IWrapper - { - protected const string BrokerVendorName = "AzureServiceBus"; +namespace NewRelic.Providers.Wrapper.AzureServiceBus; - public bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction +public abstract class AzureServiceBusWrapperBase : IWrapper +{ + protected const string BrokerVendorName = "ServiceBus"; - public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); + public virtual bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction - public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,ITransaction transaction); + public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo); - } + public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction); } diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml index c1ea3db422..d0143c6830 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/AzureServiceBus/Instrumentation.xml @@ -107,5 +107,23 @@ SPDX-License-Identifier: Apache-2.0 + + + + + + + + + + + + diff --git a/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs b/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs index 538a620818..adb614b304 100644 --- a/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs +++ b/tests/Agent/IntegrationTests/IntegrationTestHelpers/AgentLogBase.cs @@ -72,6 +72,7 @@ public abstract class AgentLogBase // Transactions (either with an ID or "noop") public const string TransactionLinePrefix = FinestLogLinePrefixRegex + @"Trx ([a-fA-F0-9]*|Noop): "; + public const string TransactionAlreadyEndedLogLineRegex = TransactionLinePrefix + "Transaction has already ended(.*)"; // Serverless payloads public const string ServerlessPayloadLogLineRegex = FinestLogLinePrefixRegex + @"Serverless payload: (.*)"; diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj b/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj index 77d359b9c1..9ab6f5a60b 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MFALatestPackages/MFALatestPackages.csproj @@ -9,10 +9,7 @@ - - - - + @@ -34,7 +31,10 @@ - + + + + diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj index f301901e8d..993654c7ac 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/MultiFunctionApplicationHelpers.csproj @@ -275,7 +275,7 @@ - + diff --git a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs index 397e2f0953..59e1b775d1 100644 --- a/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs +++ b/tests/Agent/IntegrationTests/SharedApplications/Common/MultiFunctionApplicationHelpers/NetStandardLibraries/AzureServiceBus/AzureServiceBusExerciser.cs @@ -3,6 +3,7 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; @@ -70,18 +71,16 @@ public static async Task ExerciseMultipleReceiveOperationsOnAMessage(string queu [LibraryMethod] [Transaction] [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)] - public static async Task ScheduleAndReceiveAMessage(string queueName) + public static async Task ScheduleAndCancelAMessage(string queueName) { await using var client = new ServiceBusClient(AzureServiceBusConfiguration.ConnectionString); await using var sender = client.CreateSender(queueName); var message = new ServiceBusMessage("Hello world!"); - await sender.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(5)); + var messageSequenceId = await sender.ScheduleMessageAsync(message, DateTime.UtcNow.AddSeconds(90)); - await Task.Delay(TimeSpan.FromSeconds(10)); - - await using var receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); - await receiver.ReceiveMessageAsync(); + // cancel the scheduled message + await sender.CancelScheduledMessageAsync(messageSequenceId); } [LibraryMethod] @@ -121,7 +120,8 @@ public static async Task ReceiveAndAbandonAMessage(string queueName) // receive the message again and complete it to remove it from the queue var receivedMessage2 = await receiver.ReceiveMessageAsync(); - await receiver.CompleteMessageAsync(receivedMessage2); } + await receiver.CompleteMessageAsync(receivedMessage2); + } private static async Task SendAMessage(ServiceBusClient client, string queueName, string messageBody) @@ -143,4 +143,79 @@ public static async Task SendAndReceiveAMessage(string queueName) await using var receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete }); await receiver.ReceiveMessageAsync(); } + + [LibraryMethod] + // [Transaction] no transaction on this one; we're testing that the ServiceBusProcessor is creating transactions + [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)] + public static async Task ExerciseServiceBusProcessor(string queueName) + { + await using var client = new ServiceBusClient(AzureServiceBusConfiguration.ConnectionString); + + // create the sender + await using ServiceBusSender sender = client.CreateSender(queueName); + + // create a set of messages that we can send + ServiceBusMessage[] messages = new ServiceBusMessage[] + { + new ServiceBusMessage("First"), + new ServiceBusMessage("Second") + }; + + // send the message batch + await sender.SendMessagesAsync(messages); + + // create the options to use for configuring the processor + ServiceBusProcessorOptions options = new() + { + MaxConcurrentCalls = 1 // multi-threading. Yay! + }; + + // create a processor that we can use to process the messages + await using ServiceBusProcessor processor = client.CreateProcessor(queueName, options); + + var receivedMessages = 0; + + // configure the message handler to use + processor.ProcessMessageAsync += MessageHandler; + processor.ProcessErrorAsync += ErrorHandler; // ErrorHandler is required, but we won't exercise it + + async Task MessageHandler(ProcessMessageEventArgs args) + { + string body = args.Message.Body.ToString(); + var threadId = Thread.CurrentThread.ManagedThreadId; + Console.WriteLine($"ThreadId: {threadId} - body: {body}"); + + Interlocked.Increment(ref receivedMessages); + + await Task.Delay(1000); // simulate processing the message + } + + Task ErrorHandler(ProcessErrorEventArgs args) + { + // the error source tells me at what point in the processing an error occurred + Console.WriteLine(args.ErrorSource); + // the fully qualified namespace is available + Console.WriteLine(args.FullyQualifiedNamespace); + // as well as the entity path + Console.WriteLine(args.EntityPath); + Console.WriteLine(args.Exception.ToString()); + return Task.CompletedTask; + } + + // start processing + await processor.StartProcessingAsync(); + + // wait up to 30 seconds or until receivedMessages has a count of 2 + var timeout = DateTime.UtcNow.AddSeconds(30); + while (receivedMessages < 2 && DateTime.UtcNow < timeout) + { + await Task.Delay(1000); + } + + // chill for a bit + await Task.Delay(TimeSpan.FromSeconds(5)); + + // stop processing + await processor.StopProcessingAsync(); + } } diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs new file mode 100644 index 0000000000..a76b5b6e8d --- /dev/null +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusProcessorTests.cs @@ -0,0 +1,150 @@ +// Copyright 2020 New Relic, Inc. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Linq; +using NewRelic.Agent.IntegrationTestHelpers; +using NewRelic.Agent.IntegrationTestHelpers.RemoteServiceFixtures; +using NewRelic.Testing.Assertions; +using Xunit; +using Xunit.Abstractions; + +namespace NewRelic.Agent.UnboundedIntegrationTests.AzureServiceBus; + +public abstract class AzureServiceBusProcessorTestsBase : NewRelicIntegrationTest + where TFixture : ConsoleDynamicMethodFixture +{ + private readonly TFixture _fixture; + private readonly string _queueName; + + protected AzureServiceBusProcessorTestsBase(TFixture fixture, ITestOutputHelper output) : base(fixture) + { + _fixture = fixture; + _fixture.SetTimeout(TimeSpan.FromMinutes(1)); + _fixture.TestLogger = output; + + _queueName = $"test-queue-{Guid.NewGuid()}"; + + _fixture.AddCommand($"AzureServiceBusExerciser InitializeQueue {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser ExerciseServiceBusProcessor {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser DeleteQueue {_queueName}"); + + _fixture.AddActions + ( + setupConfiguration: () => + { + var configModifier = new NewRelicConfigModifier(fixture.DestinationNewRelicConfigFilePath); + + configModifier + .SetLogLevel("finest") + .EnableDistributedTrace() + .ForceTransactionTraces() + .ConfigureFasterMetricsHarvestCycle(20) + .ConfigureFasterSpanEventsHarvestCycle(20) + .ConfigureFasterTransactionTracesHarvestCycle(25) + ; + }, + exerciseApplication: () => + { + _fixture.AgentLog.WaitForLogLine(AgentLogBase.TransactionTransformCompletedLogLineRegex, TimeSpan.FromMinutes(1)); + } + ); + + _fixture.Initialize(); + } + + private readonly string _consumeMetricNameBase = "MessageBroker/ServiceBus/Queue/Consume/Named"; + private readonly string _processMetricNameBase = "MessageBroker/ServiceBus/Queue/Process/Named"; + private readonly string _settleMetricNameBase = "MessageBroker/ServiceBus/Queue/Settle/Named"; + private readonly string _transactionNameBase = "OtherTransaction/Message/ServiceBus/Queue/Named"; + + [Fact] + public void Test() + { + var metrics = _fixture.AgentLog.GetMetrics().ToList(); + + // 2 messages, 1 consume segment, 1 process segment, 1 settle segment per message + var expectedMetrics = new List + { + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 2}, + new() { metricName = $"{_consumeMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_processMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + new() { metricName = $"{_settleMetricNameBase}/{_queueName}", callCount = 2, metricScope = $"{_transactionNameBase}/{_queueName}"}, + }; + + var expectedTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_transactionNameBase}/{_queueName}"); + + var expectedTransactionTraceSegments = new List + { + $"{_consumeMetricNameBase}/{_queueName}", + $"{_processMetricNameBase}/{_queueName}", + "DotNet/ServiceBusProcessor/OnProcessMessageAsync", + $"{_settleMetricNameBase}/{_queueName}", + }; + + var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_transactionNameBase}/{_queueName}"); + + var queueConsumeSpanEvent = _fixture.AgentLog.TryGetSpanEvent($"{_consumeMetricNameBase}/{_queueName}"); + var queueProcessSpanEvent = _fixture.AgentLog.TryGetSpanEvent($"{_processMetricNameBase}/{_queueName}"); + + var expectedConsumeAgentAttributes = new List + { + "server.address", + "messaging.destination.name", + }; + + var expectedIntrinsicAttributes = new List { "span.kind", }; + + Assertions.MetricsExist(expectedMetrics, metrics); + + NrAssert.Multiple( + () => Assert.NotNull(expectedTransactionEvent), + () => Assert.NotNull(transactionSample), + () => Assert.NotNull(queueConsumeSpanEvent), + () => Assert.NotNull(queueProcessSpanEvent), + () => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample), + + () => Assertions.SpanEventHasAttributes(expectedConsumeAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvent), + () => Assertions.SpanEventHasAttributes(expectedIntrinsicAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvent), + () => Assertions.SpanEventHasAttributes(expectedConsumeAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueConsumeSpanEvent), + () => Assertions.SpanEventHasAttributes(expectedIntrinsicAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Intrinsic, queueConsumeSpanEvent) + ); + } +} + +[NetFrameworkTest] +public class AzureServiceBusProcessorTestsFWLatest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsFWLatest(ConsoleDynamicMethodFixtureFWLatest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetFrameworkTest] +public class AzureServiceBusProcessorTestsFW462 : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsFW462(ConsoleDynamicMethodFixtureFW462 fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetCoreTest] +public class AzureServiceBusProcessorTestsCoreOldest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsCoreOldest(ConsoleDynamicMethodFixtureCoreOldest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} + +[NetCoreTest] +public class AzureServiceBusProcessorTestsCoreLatest : AzureServiceBusProcessorTestsBase +{ + public AzureServiceBusProcessorTestsCoreLatest(ConsoleDynamicMethodFixtureCoreLatest fixture, ITestOutputHelper output) : base(fixture, output) + { + } +} diff --git a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs index 9b8f41c568..8a112113f0 100644 --- a/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs +++ b/tests/Agent/IntegrationTests/UnboundedIntegrationTests/AzureServiceBus/AzureServiceBusTests.cs @@ -28,7 +28,7 @@ protected AzureServiceBusTestsBase(TFixture fixture, ITestOutputHelper output) : _fixture.AddCommand($"AzureServiceBusExerciser InitializeQueue {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ExerciseMultipleReceiveOperationsOnAMessage {_queueName}"); - _fixture.AddCommand($"AzureServiceBusExerciser ScheduleAndReceiveAMessage {_queueName}"); + _fixture.AddCommand($"AzureServiceBusExerciser ScheduleAndCancelAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ReceiveAndDeadLetterAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser ReceiveAndAbandonAMessage {_queueName}"); _fixture.AddCommand($"AzureServiceBusExerciser DeleteQueue {_queueName}"); @@ -62,40 +62,43 @@ public void Test() var expectedMetrics = new List { - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 4}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndReceiveAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 10}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 5, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndReceiveAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}", callCount = 3, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1 }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage" }, - - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 2 }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage" }, - new() { metricName = $"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage" }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 4}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndCancelAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 6}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 3, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage"}, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage" }, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}", callCount = 1 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ScheduleAndCancelAMessage" }, + + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 5 }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"}, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 1, metricScope = $"{_metricScopeBase}/ReceiveAndDeadLetterAMessage" }, + new() { metricName = $"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}", callCount = 2, metricScope = $"{_metricScopeBase}/ReceiveAndAbandonAMessage" }, }; var exerciseMultipleReceiveOperationsOnAMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"); - var scheduleAndReceiveAMessageTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent($"{_metricScopeBase}/ScheduleAndReceiveAMessage"); var expectedTransactionTraceSegments = new List { - $"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}" + $"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}" }; - var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_metricScopeBase}/ScheduleAndReceiveAMessage"); // this will always be the slowest transaction + var transactionSample = _fixture.AgentLog.TryGetTransactionSample($"{_metricScopeBase}/ExerciseMultipleReceiveOperationsOnAMessage"); - var queueProduceSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Produce/Named/{_queueName}"); - var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Consume/Named/{_queueName}"); - var queuePeekSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Peek/Named/{_queueName}"); - var queuePurgeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/AzureServiceBus/Queue/Purge/Named/{_queueName}"); + var queueProduceSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Produce/Named/{_queueName}"); + var queueConsumeSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Consume/Named/{_queueName}"); + var queuePeekSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Peek/Named/{_queueName}"); + var queueSettleSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Settle/Named/{_queueName}"); + var queueCancelSpanEvents = _fixture.AgentLog.TryGetSpanEvent($"MessageBroker/ServiceBus/Queue/Cancel/Named/{_queueName}"); var expectedProduceAgentAttributes = new List { @@ -116,7 +119,13 @@ public void Test() "messaging.destination.name", }; - var expectedPurgeAgentAttributes = new List + var expectedSettleAgentAttributes = new List + { + "server.address", + "messaging.destination.name", + }; + + var expectedCancelAgentAttributes = new List { "server.address", "messaging.destination.name", @@ -128,7 +137,6 @@ public void Test() NrAssert.Multiple( () => Assert.True(exerciseMultipleReceiveOperationsOnAMessageTransactionEvent != null, "ExerciseMultipleReceiveOperationsOnAMessageTransactionEvent should not be null"), - () => Assert.True(scheduleAndReceiveAMessageTransactionEvent != null, "ScheduleAndReceiveAMessageTransactionEvent should not be null"), () => Assert.True(transactionSample != null, "transactionSample should not be null"), () => Assertions.TransactionTraceSegmentsExist(expectedTransactionTraceSegments, transactionSample), @@ -145,8 +153,11 @@ public void Test() () => Assertions.SpanEventHasAttributes(expectedPeekAgentAttributes, Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queuePeekSpanEvents), - () => Assertions.SpanEventHasAttributes(expectedPurgeAgentAttributes, - Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queuePurgeSpanEvents) + () => Assertions.SpanEventHasAttributes(expectedSettleAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueSettleSpanEvents), + + () => Assertions.SpanEventHasAttributes(expectedCancelAgentAttributes, + Tests.TestSerializationHelpers.Models.SpanEventAttributeType.Agent, queueCancelSpanEvents) ); } }