diff --git a/pom.xml b/pom.xml index 9059632b..0e5a94ee 100644 --- a/pom.xml +++ b/pom.xml @@ -222,6 +222,11 @@ mule-sdk-compatibility-api ${muleSdkCompatibilityApiVersion} + + org.mule.sdk + mule-sdk-api + ${muleSdkApiVersion} + diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/JmsConsume.java b/src/main/java/org/mule/extensions/jms/internal/operation/JmsConsume.java index 27f913ba..cde28e26 100644 --- a/src/main/java/org/mule/extensions/jms/internal/operation/JmsConsume.java +++ b/src/main/java/org/mule/extensions/jms/internal/operation/JmsConsume.java @@ -8,6 +8,8 @@ import static org.mule.extensions.jms.internal.common.JmsCommons.EXAMPLE_CONTENT_TYPE; import static org.mule.extensions.jms.internal.common.JmsCommons.EXAMPLE_ENCODING; +import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsConsumeSpanCustomizer.getJmsConsumeSpanCustomizer; + import static org.slf4j.LoggerFactory.getLogger; import org.mule.extensions.jms.api.config.ConsumerAckMode; @@ -20,7 +22,6 @@ import org.mule.extensions.jms.internal.connection.session.JmsSessionManager; import org.mule.extensions.jms.internal.metadata.JmsOutputResolver; import org.mule.jms.commons.api.AttributesOutputResolver; -import org.mule.jms.commons.api.message.JmsAttributes; import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; import org.mule.runtime.api.connection.ConnectionException; import org.mule.runtime.api.lifecycle.Disposable; @@ -35,14 +36,14 @@ import org.mule.runtime.extension.api.annotation.param.display.Example; import org.mule.runtime.extension.api.annotation.param.display.Summary; import org.mule.runtime.extension.api.runtime.operation.Result; -import org.mule.runtime.extension.api.runtime.process.CompletionCallback; +import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo; import org.mule.runtime.extension.api.tx.OperationTransactionalAction; +import org.mule.sdk.compatibility.api.utils.ForwardCompatibilityHelper; import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -65,6 +66,9 @@ public final class JmsConsume implements Initialisable, Disposable { private org.mule.jms.commons.internal.operation.JmsConsume jmsConsume; + @Inject + private java.util.Optional forwardCompatibilityHelper; + /** * Operation that allows the user to consume a single {@link Message} from a given {@link Destination}. * @@ -77,8 +81,8 @@ public final class JmsConsume implements Initialisable, Disposable { * @param selector a custom JMS selector for filtering the messages * @param contentType the {@link Message}'s content content type * @param encoding the {@link Message}'s content encoding - * @param maximumWait maximum time to wait for a message before timing out - * @param maximumWaitUnit Time unit to be used in the maximumWaitTime configurations + * @param maximumWait maximum time to wait for a message before timing out + * @param maximumWaitUnit Time unit to be used in the maximumWaitTime configurations * @return a {@link Result} with the {@link Message} content as {@link Result#getOutput} and its properties * and headers as {@link Result#getAttributes} * @throws JmsConsumeException if an error occurs @@ -97,8 +101,10 @@ public Result consume(@Config JmsConfig config, defaultValue = "10000") @Summary("Maximum time to wait for a message to arrive before timeout") Long maximumWait, @Optional( defaultValue = "MILLISECONDS") @Example("MILLISECONDS") @Summary("Time unit to be used in the maximumWaitTime configuration") TimeUnit maximumWaitUnit, - OperationTransactionalAction transactionalAction) + OperationTransactionalAction transactionalAction, + CorrelationInfo correlationInfo) throws JmsExtensionException, ConnectionException { + customizeCurrentSpan(connection, destination, consumerType, correlationInfo); return (Result) jmsConsume.consume(config, connection, destination, consumerType, ackMode, selector, contentType, encoding, maximumWait, maximumWaitUnit, transactionalAction); @@ -107,11 +113,20 @@ public Result consume(@Config JmsConfig config, @Override public void initialise() { - jmsConsume = new org.mule.jms.commons.internal.operation.JmsConsume(sessionManager, schedulerService); + jmsConsume = + new org.mule.jms.commons.internal.operation.JmsConsume(sessionManager, schedulerService); } @Override public void dispose() { jmsConsume.dispose(); } + + private void customizeCurrentSpan(JmsTransactionalConnection connection, String destination, + org.mule.jms.commons.api.destination.ConsumerType consumerType, + org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo correlationInfo) { + forwardCompatibilityHelper + .ifPresent(fch -> getJmsConsumeSpanCustomizer().customizeSpan(fch.getDistributedTraceContextManager(correlationInfo), + connection, destination, consumerType)); + } } diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/JmsPublish.java b/src/main/java/org/mule/extensions/jms/internal/operation/JmsPublish.java index 981d4979..553e0285 100644 --- a/src/main/java/org/mule/extensions/jms/internal/operation/JmsPublish.java +++ b/src/main/java/org/mule/extensions/jms/internal/operation/JmsPublish.java @@ -7,6 +7,8 @@ package org.mule.extensions.jms.internal.operation; import static org.mule.extensions.jms.internal.common.JmsCommons.QUEUE; +import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsPublishSpanCustomizer.getJmsPublishSpanCustomizer; + import static org.slf4j.LoggerFactory.getLogger; import org.mule.extensions.jms.api.config.JmsProducerConfig; @@ -33,6 +35,7 @@ import org.mule.runtime.extension.api.runtime.parameter.OutboundCorrelationStrategy; import org.mule.runtime.extension.api.runtime.process.CompletionCallback; import org.mule.runtime.extension.api.tx.OperationTransactionalAction; +import org.mule.sdk.compatibility.api.utils.ForwardCompatibilityHelper; import javax.inject.Inject; import javax.jms.Destination; @@ -55,6 +58,9 @@ public final class JmsPublish implements Initialisable, Disposable { @Inject private SchedulerService schedulerService; + @Inject + private java.util.Optional forwardCompatibilityHelper; + private org.mule.jms.commons.internal.operation.JmsPublish jmsPublish; /** @@ -85,10 +91,18 @@ public void publish(@Config JmsConfig config, @Connection JmsTransactionalConnec CompletionCallback completionCallback) throws JmsExtensionException { + customizeCurrentSpan(connection, destination, destinationType, correlationInfo); jmsPublish.publish(config, connection, destination, destinationType, messageBuilder, overrides, transactionalAction, sendCorrelationId, correlationInfo, completionCallback); } + private void customizeCurrentSpan(JmsTransactionalConnection connection, String destination, DestinationType destinationType, + CorrelationInfo correlationInfo) { + forwardCompatibilityHelper + .ifPresent(fch -> getJmsPublishSpanCustomizer().customizeSpan(fch.getDistributedTraceContextManager(correlationInfo), + connection, destination, destinationType)); + } + @Override public void dispose() { diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsConsumeSpanCustomizer.java b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsConsumeSpanCustomizer.java new file mode 100644 index 00000000..554c9d52 --- /dev/null +++ b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsConsumeSpanCustomizer.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extensions.jms.internal.operation.profiling.tracing; + +import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute; +import static org.mule.jms.commons.internal.common.JmsCommons.getDestinationType; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.jms.commons.api.destination.ConsumerType; +import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; +import org.mule.sdk.api.runtime.source.DistributedTraceContextManager; + +import java.util.Locale; + +import org.slf4j.Logger; + +public class JmsConsumeSpanCustomizer extends JmsSpanCustomizer { + + private static final Logger LOGGER = getLogger(JmsConsumeSpanCustomizer.class); + + private static final String SPAN_OPERATION_NAME = "receive"; + private static final String SPAN_KIND_NAME = "CONSUMER"; + public static final String MESSAGING_DESTINATION_KIND = "messaging.destination_kind"; + + /** + * @return a new instance of a {@link JmsConsumeSpanCustomizer}. + */ + public static JmsConsumeSpanCustomizer getJmsConsumeSpanCustomizer() { + return new JmsConsumeSpanCustomizer(); + } + + public void customizeSpan(DistributedTraceContextManager distributedTraceContextManager, JmsTransactionalConnection connection, + String destination, ConsumerType consumerType) { + super.customizeSpan(distributedTraceContextManager, connection, destination); + safeExecute(() -> distributedTraceContextManager + .addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, getDestinationType(consumerType).toLowerCase(Locale.ROOT)), + "Messaging destination kind data could not be added to span", LOGGER); + } + + @Override + protected String getSpanOperation() { + return SPAN_OPERATION_NAME; + } + + @Override + protected String getSpanKind() { + return SPAN_KIND_NAME; + } +} diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsPublishSpanCustomizer.java b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsPublishSpanCustomizer.java new file mode 100644 index 00000000..2e18b345 --- /dev/null +++ b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsPublishSpanCustomizer.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extensions.jms.internal.operation.profiling.tracing; + +import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.extensions.jms.api.destination.DestinationType; +import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; +import org.mule.sdk.api.runtime.source.DistributedTraceContextManager; + +import java.util.Locale; + +import org.slf4j.Logger; + +public class JmsPublishSpanCustomizer extends JmsSpanCustomizer { + + private static final Logger LOGGER = getLogger(JmsPublishSpanCustomizer.class); + + private static final String SPAN_OPERATION_NAME = "send"; + private static final String SPAN_KIND_NAME = "PRODUCER"; + public static final String MESSAGING_DESTINATION_KIND = "messaging.destination_kind"; + + /** + * @return a new instance of a {@link JmsPublishSpanCustomizer}. + */ + public static JmsPublishSpanCustomizer getJmsPublishSpanCustomizer() { + return new JmsPublishSpanCustomizer(); + } + + public void customizeSpan(DistributedTraceContextManager distributedTraceContextManager, JmsTransactionalConnection connection, + String destination, + DestinationType destinationType) { + super.customizeSpan(distributedTraceContextManager, connection, destination); + safeExecute(() -> distributedTraceContextManager + .addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, destinationType.toString().toLowerCase(Locale.ROOT)), + "Messaging destination kind data could not be added to span", LOGGER); + } + + @Override + protected String getSpanOperation() { + return SPAN_OPERATION_NAME; + } + + @Override + protected String getSpanKind() { + return SPAN_KIND_NAME; + } +} diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsSpanCustomizer.java b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsSpanCustomizer.java new file mode 100644 index 00000000..b09670f3 --- /dev/null +++ b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/JmsSpanCustomizer.java @@ -0,0 +1,55 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extensions.jms.internal.operation.profiling.tracing; + +import static org.mule.extensions.jms.internal.operation.profiling.tracing.SpanCustomizerUtils.safeExecute; +import static org.mule.jms.commons.internal.common.JmsCommons.getDestinationType; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; +import org.mule.sdk.api.runtime.source.DistributedTraceContextManager; + +import java.util.Locale; + +import javax.jms.JMSException; + +import org.slf4j.Logger; + +public abstract class JmsSpanCustomizer { + + private static final Logger LOGGER = getLogger(JmsSpanCustomizer.class); + public static final String MESSAGING_SYSTEM = "messaging.system"; + public static final String MESSAGING_DESTINATION = "messaging.destination"; + public static final String SPAN_KIND = "span.kind.override"; + + protected void customizeSpan(DistributedTraceContextManager distributedTraceContextManager, + JmsTransactionalConnection connection, String destination) { + safeExecute(() -> distributedTraceContextManager.setCurrentSpanName(destination + " " + getSpanOperation()), + "Span name according to semantic conventions could not be added to span", LOGGER); + safeExecute(() -> distributedTraceContextManager + .addCurrentSpanAttribute(MESSAGING_SYSTEM, getMessagingSystem(connection)), + "Messaging system data could not be added to span", LOGGER); + safeExecute(() -> distributedTraceContextManager.addCurrentSpanAttribute(MESSAGING_DESTINATION, destination), + "Messaging destination data could not be added to span", LOGGER); + safeExecute(() -> distributedTraceContextManager.addCurrentSpanAttribute(SPAN_KIND, getSpanKind()), + "Span kind could not be added to span", LOGGER); + } + + protected abstract String getSpanOperation(); + + protected abstract String getSpanKind(); + + private String getMessagingSystem(JmsTransactionalConnection connection) { + try { + return connection.get().getMetaData().getJMSProviderName().toLowerCase(Locale.ROOT); + } catch (JMSException e) { + LOGGER.info("Span connection metadata could not be fetched"); + } + return null; + } +} diff --git a/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/SpanCustomizerUtils.java b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/SpanCustomizerUtils.java new file mode 100644 index 00000000..40af13a0 --- /dev/null +++ b/src/main/java/org/mule/extensions/jms/internal/operation/profiling/tracing/SpanCustomizerUtils.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extensions.jms.internal.operation.profiling.tracing; + +import org.slf4j.Logger; + +public class SpanCustomizerUtils { + + /** + * Safely executes a piece of logic. + * + * @param toExecute the piece of logic to execute. + * @param loggingMessage the logging message if a throwable + * @param logger logger used for informing tracing errors. + */ + public static void safeExecute(Runnable toExecute, String loggingMessage, Logger logger) { + try { + toExecute.run(); + } catch (Throwable e) { + if (logger.isWarnEnabled()) { + logger.warn(loggingMessage, e); + } + } + } +} diff --git a/src/test/java/org/mule/extensions/jms/test/AllureConstants.java b/src/test/java/org/mule/extensions/jms/test/AllureConstants.java index 0ae23c07..10cdee18 100644 --- a/src/test/java/org/mule/extensions/jms/test/AllureConstants.java +++ b/src/test/java/org/mule/extensions/jms/test/AllureConstants.java @@ -29,6 +29,8 @@ interface JmsStory { String MESSAGE_TYPES = "Durable Subscriber"; String MESSAGE_FILTERING = "Message Filtering"; + + String TRACING = "Tracing"; } } diff --git a/src/test/java/org/mule/extensions/jms/test/internal/operation/profiling/tracing/JmsSpanCustomizerTestCase.java b/src/test/java/org/mule/extensions/jms/test/internal/operation/profiling/tracing/JmsSpanCustomizerTestCase.java new file mode 100644 index 00000000..9557d357 --- /dev/null +++ b/src/test/java/org/mule/extensions/jms/test/internal/operation/profiling/tracing/JmsSpanCustomizerTestCase.java @@ -0,0 +1,136 @@ +/* + * Copyright 2023 Salesforce, Inc. All rights reserved. + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.extensions.jms.test.internal.operation.profiling.tracing; + +import static org.mule.extensions.jms.api.destination.DestinationType.TOPIC; +import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsConsumeSpanCustomizer.getJmsConsumeSpanCustomizer; +import static org.mule.extensions.jms.internal.operation.profiling.tracing.JmsPublishSpanCustomizer.getJmsPublishSpanCustomizer; +import static org.mule.extensions.jms.test.AllureConstants.JmsFeature.JmsStory.TRACING; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.mule.extensions.jms.api.destination.DestinationType; +import org.mule.extensions.jms.api.message.JmsMessageBuilder; +import org.mule.extensions.jms.internal.operation.profiling.tracing.JmsConsumeSpanCustomizer; +import org.mule.extensions.jms.internal.operation.profiling.tracing.JmsPublishSpanCustomizer; +import org.mule.jms.commons.api.destination.ConsumerType; +import org.mule.jms.commons.internal.connection.JmsTransactionalConnection; +import org.mule.runtime.api.metadata.DataType; +import org.mule.runtime.api.metadata.TypedValue; +import org.mule.sdk.api.runtime.source.DistributedTraceContextManager; + +import java.util.Locale; +import java.util.OptionalLong; + +import javax.jms.Connection; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; + +import io.qameta.allure.Description; +import io.qameta.allure.Issue; +import io.qameta.allure.Story; +import org.junit.Test; + +@Story(TRACING) +@Issue("W-11859222") +public class JmsSpanCustomizerTestCase { + + public static final String MESSAGING_SYSTEM = "messaging.system"; + public static final String MESSAGING_DESTINATION = "messaging.destination"; + public static final String MESSAGING_DESTINATION_KIND = "messaging.destination_kind"; + + @Test + @Description("The consume span customizer informs the distributed trace context manager the correct attributes/name") + public void jmsConsumeSpanCustomizerShouldSetCorrespondingAttributes() throws JMSException { + String messagingSystem = "testActiveMq"; + String destination = "queueName"; + String expectedSpanName = destination + " " + "receive"; + + DistributedTraceContextManager distributedTraceContextManager = mock(DistributedTraceContextManager.class); + JmsTransactionalConnection jmsTransactionalConnection = mock(JmsTransactionalConnection.class); + Connection connection = mock(Connection.class); + ConnectionMetaData connectionMetaData = mock(ConnectionMetaData.class); + ConsumerType consumerType = mock(ConsumerType.class); + + when(jmsTransactionalConnection.get()).thenReturn(connection); + when(connection.getMetaData()).thenReturn(connectionMetaData); + when(connectionMetaData.getJMSProviderName()).thenReturn(messagingSystem); + when(consumerType.topic()).thenReturn(false); + + JmsConsumeSpanCustomizer jmsConsumeSpanCustomizer = getJmsConsumeSpanCustomizer(); + jmsConsumeSpanCustomizer.customizeSpan(distributedTraceContextManager, jmsTransactionalConnection, destination, consumerType); + + verify(distributedTraceContextManager).setCurrentSpanName(expectedSpanName); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_SYSTEM, messagingSystem.toLowerCase(Locale.ROOT)); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION, destination); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, "queue"); + } + + @Test + @Description("The publish span customizer informs the distributed trace context manager the correct attributes/name") + public void jmsPublishSpanCustomizerShouldSetCorrespondingAttributes() throws Exception { + String messagingSystem = "testActiveMq"; + String destination = "topicName"; + String expectedSpanName = destination + " " + "send"; + String correlationId = "correlationIdTest1"; + DestinationType destinationType = TOPIC; + TypedValue typedValue = new TypedValue(Object.class, DataType.OBJECT, OptionalLong.of(39L)); + + DistributedTraceContextManager distributedTraceContextManager = mock(DistributedTraceContextManager.class); + JmsTransactionalConnection jmsTransactionalConnection = mock(JmsTransactionalConnection.class); + Connection connection = mock(Connection.class); + ConnectionMetaData connectionMetaData = mock(ConnectionMetaData.class); + JmsMessageBuilder jmsMessageBuilder = mock(JmsMessageBuilder.class); + + when(jmsTransactionalConnection.get()).thenReturn(connection); + when(connection.getMetaData()).thenReturn(connectionMetaData); + when(connectionMetaData.getJMSProviderName()).thenReturn(messagingSystem); + when(jmsMessageBuilder.getCorrelationId()).thenReturn(correlationId); + when(jmsMessageBuilder.getBody()).thenReturn(typedValue); + + JmsPublishSpanCustomizer jmsPublishSpanCustomizer = getJmsPublishSpanCustomizer(); + jmsPublishSpanCustomizer.customizeSpan(distributedTraceContextManager, jmsTransactionalConnection, destination, + destinationType); + + verify(distributedTraceContextManager).setCurrentSpanName(expectedSpanName); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_SYSTEM, messagingSystem.toLowerCase(Locale.ROOT)); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION, destination); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, "topic"); + } + + @Test + @Description("The correct functioning of the connector should not be hindered by tracing") + public void jmsSpanCustomizerShouldSetCorrespondingAttributesEvenIfThereIsAnException() throws JMSException { + String messagingSystem = "testActiveMq"; + String destination = "queueName"; + String expectedSpanName = destination + " " + "receive"; + + DistributedTraceContextManager distributedTraceContextManager = mock(DistributedTraceContextManager.class); + JmsTransactionalConnection jmsTransactionalConnection = mock(JmsTransactionalConnection.class); + Connection connection = mock(Connection.class); + ConnectionMetaData connectionMetaData = mock(ConnectionMetaData.class); + ConsumerType consumerType = mock(ConsumerType.class); + + when(jmsTransactionalConnection.get()).thenReturn(connection); + // This will cause a NullPointerException + when(connection.getMetaData()).thenReturn(null); + when(connectionMetaData.getJMSProviderName()).thenReturn(messagingSystem); + when(consumerType.topic()).thenReturn(false); + + JmsConsumeSpanCustomizer jmsConsumeSpanCustomizer = getJmsConsumeSpanCustomizer(); + jmsConsumeSpanCustomizer.customizeSpan(distributedTraceContextManager, jmsTransactionalConnection, destination, consumerType); + + verify(distributedTraceContextManager).setCurrentSpanName(expectedSpanName); + verify(distributedTraceContextManager, never()).addCurrentSpanAttribute(MESSAGING_SYSTEM, + messagingSystem.toLowerCase(Locale.ROOT)); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION, destination); + verify(distributedTraceContextManager).addCurrentSpanAttribute(MESSAGING_DESTINATION_KIND, "queue"); + } +}