Skip to content

Commit a4e8bb5

Browse files
committed
Add new metrics for rabbitmq
1 parent 3d59d61 commit a4e8bb5

File tree

15 files changed

+306
-52
lines changed

15 files changed

+306
-52
lines changed

agent-bridge/src/main/java/com/newrelic/agent/bridge/external/MessageConsumeParameters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class MessageConsumeParameters extends com.newrelic.api.agent.MessageCons
2626
@Deprecated
2727
protected MessageConsumeParameters(String library, DestinationType destinationType, String destinationName,
2828
InboundHeaders inboundHeaders) {
29-
super(library, destinationType.toApiDestinationType(), destinationName, inboundHeaders, null, null, null);
29+
super(library, destinationType.toApiDestinationType(), destinationName, inboundHeaders, null, null, null, null, null);
3030
}
3131

3232
}

agent-bridge/src/main/java/com/newrelic/agent/bridge/external/MessageProduceParameters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class MessageProduceParameters extends com.newrelic.api.agent.MessageProd
2626
@Deprecated
2727
protected MessageProduceParameters(String library, DestinationType destinationType, String destinationName,
2828
OutboundHeaders outboundHeaders) {
29-
super(library, destinationType.toApiDestinationType(), destinationName, outboundHeaders, null, null, null);
29+
super(library, destinationType.toApiDestinationType(), destinationName, outboundHeaders, null, null, null, null);
3030
}
3131

3232
}

functional_test/src/test/java/test/newrelic/test/agent/api/ApiTest.java

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2030,6 +2030,56 @@ public void testMessagingAPIWithHostAndPort() throws Exception {
20302030
}
20312031
}
20322032

2033+
@Test
2034+
public void testMessagingAPIWithAmqpQueueEndpoint() throws Exception {
2035+
// override default agent config to disabled distributed tracing and use CAT instead
2036+
EnvironmentHolder holder = setupEnvironmentHolder(CAT_CONFIG_FILE, "cat_enabled_dt_disabled_test");
2037+
MessagingTestServer server = new MessagingTestServer(8088);
2038+
2039+
try {
2040+
server.start();
2041+
runTestMessagingAPIWithAmqpQueueEndpoint();
2042+
String messageBrokerMetric = "MessageBroker/RabbitMQ/Exchange/Consume/Named/SomeExchange";
2043+
String endpointMetric = String.format("MessageBroker/instance/%s/8088/Exchange/Named/SomeExchange", HOSTNAME);
2044+
String amqpEndpointMetric = String.format("MessageBroker/instance/%s/8088/Exchange/Named/SomeExchange/Queue/SomeQueue", HOSTNAME);
2045+
Assert.assertTrue("The following metric should exist: " + messageBrokerMetric, apiTestHelper.tranStats.getScopedStats().getStatsMap().containsKey(messageBrokerMetric));
2046+
Assert.assertTrue("The following metric should exist: " + endpointMetric, apiTestHelper.tranStats.getUnscopedStats().getStatsMap().containsKey(endpointMetric));
2047+
Assert.assertTrue("The following metric should exist: " + amqpEndpointMetric, apiTestHelper.tranStats.getUnscopedStats().getStatsMap().containsKey(amqpEndpointMetric));
2048+
} catch (IOException e) {
2049+
e.printStackTrace();
2050+
Assert.fail();
2051+
} finally {
2052+
Transaction.clearTransaction();
2053+
server.closeAllConnections();
2054+
holder.close();
2055+
}
2056+
}
2057+
2058+
@Test
2059+
public void testMessagingAPIAmqpRoutingKeyOnlyEndpoint() throws Exception {
2060+
// override default agent config to disabled distributed tracing and use CAT instead
2061+
EnvironmentHolder holder = setupEnvironmentHolder(CAT_CONFIG_FILE, "cat_enabled_dt_disabled_test");
2062+
MessagingTestServer server = new MessagingTestServer(8088);
2063+
2064+
try {
2065+
server.start();
2066+
runTestMessagingAPIWithAmqpRoutingKeyOnlyEndpoint();
2067+
String messageBrokerMetric = "MessageBroker/RabbitMQ/Exchange/Consume/Named/SomeExchange";
2068+
String endpointMetric = String.format("MessageBroker/instance/%s/8088/Exchange/Named/SomeExchange", HOSTNAME);
2069+
String amqpEndpointMetric = String.format("MessageBroker/instance/%s/8088/Exchange/Named/SomeExchange/RoutingKey/SomeRoutingKey", HOSTNAME);
2070+
Assert.assertTrue("The following metric should exist: " + messageBrokerMetric, apiTestHelper.tranStats.getScopedStats().getStatsMap().containsKey(messageBrokerMetric));
2071+
Assert.assertTrue("The following metric should exist: " + endpointMetric, apiTestHelper.tranStats.getUnscopedStats().getStatsMap().containsKey(endpointMetric));
2072+
Assert.assertTrue("The following metric should exist: " + amqpEndpointMetric, apiTestHelper.tranStats.getUnscopedStats().getStatsMap().containsKey(amqpEndpointMetric));
2073+
} catch (IOException e) {
2074+
e.printStackTrace();
2075+
Assert.fail();
2076+
} finally {
2077+
Transaction.clearTransaction();
2078+
server.closeAllConnections();
2079+
holder.close();
2080+
}
2081+
}
2082+
20332083
@Trace(dispatcher = true)
20342084
private void runTestMessagingAPI() {
20352085
URL myURL = null;
@@ -2103,7 +2153,97 @@ private void runTestMessagingAPIWithHostAndPort() {
21032153
.destinationType(DestinationType.TEMP_QUEUE)
21042154
.destinationName("Message Destination")
21052155
.inboundHeaders(new ApiTestHelper.InboundWrapper(response, HeaderType.MESSAGE))
2106-
.hostAndPort(myURL.getHost(), myURL.getPort())
2156+
.instance(myURL.getHost(), myURL.getPort())
2157+
.build();
2158+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageResponseParameters);
2159+
2160+
Assert.assertTrue(response.getHeaders("NewRelicAppData").length != 0);
2161+
} catch (Exception e) {
2162+
e.printStackTrace();
2163+
Assert.fail();
2164+
}
2165+
}
2166+
2167+
@Trace(dispatcher = true)
2168+
private void runTestMessagingAPIWithAmqpQueueEndpoint() {
2169+
URL myURL = null;
2170+
try {
2171+
Thread.sleep(600);
2172+
myURL = new URL("http://localhost:8088");
2173+
HttpUriRequest request = RequestBuilder.get().setUri(myURL.toURI()).build();
2174+
2175+
ApiTestHelper.OutboundWrapper outboundRequestWrapper = new ApiTestHelper.OutboundWrapper(request, HeaderType.MESSAGE);
2176+
2177+
// MessageProducer
2178+
ExternalParameters messageProduceParameters = MessageProduceParameters
2179+
.library("RabbitMQ")
2180+
.destinationType(DestinationType.EXCHANGE)
2181+
.destinationName("SomeExchange")
2182+
.outboundHeaders(outboundRequestWrapper)
2183+
.instance(myURL.getHost(), myURL.getPort())
2184+
.amqp("SomeQueue")
2185+
.build();
2186+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
2187+
2188+
Assert.assertTrue(request.getHeaders("NewRelicID").length != 0);
2189+
Assert.assertTrue(request.getHeaders("NewRelicTransaction").length != 0);
2190+
2191+
CloseableHttpClient connection = HttpClientBuilder.create().build();
2192+
CloseableHttpResponse response = connection.execute(request);
2193+
2194+
// MessageConsumer
2195+
ExternalParameters messageResponseParameters = MessageConsumeParameters
2196+
.library("RabbitMQ")
2197+
.destinationType(DestinationType.EXCHANGE)
2198+
.destinationName("SomeExchange")
2199+
.inboundHeaders(new ApiTestHelper.InboundWrapper(response, HeaderType.MESSAGE))
2200+
.instance(myURL.getHost(), myURL.getPort())
2201+
.amqp("SomeQueue", "SomeRoutingkey")
2202+
.build();
2203+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageResponseParameters);
2204+
2205+
Assert.assertTrue(response.getHeaders("NewRelicAppData").length != 0);
2206+
} catch (Exception e) {
2207+
e.printStackTrace();
2208+
Assert.fail();
2209+
}
2210+
}
2211+
2212+
@Trace(dispatcher = true)
2213+
private void runTestMessagingAPIWithAmqpRoutingKeyOnlyEndpoint() {
2214+
URL myURL = null;
2215+
try {
2216+
Thread.sleep(600);
2217+
myURL = new URL("http://localhost:8088");
2218+
HttpUriRequest request = RequestBuilder.get().setUri(myURL.toURI()).build();
2219+
2220+
ApiTestHelper.OutboundWrapper outboundRequestWrapper = new ApiTestHelper.OutboundWrapper(request, HeaderType.MESSAGE);
2221+
2222+
// MessageProducer
2223+
ExternalParameters messageProduceParameters = MessageProduceParameters
2224+
.library("RabbitMQ")
2225+
.destinationType(DestinationType.EXCHANGE)
2226+
.destinationName("SomeExchange")
2227+
.outboundHeaders(outboundRequestWrapper)
2228+
.instance(myURL.getHost(), myURL.getPort())
2229+
.amqp("SomeRoutingKey")
2230+
.build();
2231+
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageProduceParameters);
2232+
2233+
Assert.assertTrue(request.getHeaders("NewRelicID").length != 0);
2234+
Assert.assertTrue(request.getHeaders("NewRelicTransaction").length != 0);
2235+
2236+
CloseableHttpClient connection = HttpClientBuilder.create().build();
2237+
CloseableHttpResponse response = connection.execute(request);
2238+
2239+
// MessageConsumer
2240+
ExternalParameters messageResponseParameters = MessageConsumeParameters
2241+
.library("RabbitMQ")
2242+
.destinationType(DestinationType.EXCHANGE)
2243+
.destinationName("SomeExchange")
2244+
.inboundHeaders(new ApiTestHelper.InboundWrapper(response, HeaderType.MESSAGE))
2245+
.instance(myURL.getHost(), myURL.getPort())
2246+
.amqp(null, "SomeRoutingKey")
21072247
.build();
21082248
NewRelic.getAgent().getTracedMethod().reportAsExternal(messageResponseParameters);
21092249

instrumentation/jms-1.1/src/main/java/com/nr/agent/instrumentation/jms11/JmsMetricUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public static void processConsume(Message message, TracedMethod tracer) {
159159
.inboundHeaders(new InboundWrapper(message));
160160
BrokerInstance brokerInstance = getHostAndPort(message);
161161
if (brokerInstance != null) {
162-
builder = builder.hostAndPort(brokerInstance.getHostName(), brokerInstance.getPort());
162+
builder = builder.instance(brokerInstance.getHostName(), brokerInstance.getPort());
163163
}
164164
tracer.reportAsExternal(builder.build());
165165
} catch (JMSException exception) {

instrumentation/jms-3/src/main/java/com/nr/agent/instrumentation/jms3/JmsMetricUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public static void processConsume(Message message, TracedMethod tracer) {
159159
.inboundHeaders(new InboundWrapper(message));
160160
BrokerInstance brokerInstance = getHostAndPort(message);
161161
if (brokerInstance != null) {
162-
builder = builder.hostAndPort(brokerInstance.getHostName(), brokerInstance.getPort());
162+
builder = builder.instance(brokerInstance.getHostName(), brokerInstance.getPort());
163163
}
164164
tracer.reportAsExternal(builder.build());
165165
} catch (JMSException exception) {

instrumentation/rabbit-amqp-1.7.2/src/main/java/com/nr/agent/instrumentation/rabbitamqp172/RabbitAMQPMetricUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public static void processSendMessage(String exchangeName, String routingKey,
4949
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
5050
.outboundHeaders(new OutboundWrapper(headers))
5151
.instance(getHost(connection), getPort(connection))
52+
.amqp(routingKey)
5253
.build());
5354

5455
addAttributes(routingKey, props);
@@ -61,7 +62,8 @@ public static void processGetMessage(String queueName, String routingKey, String
6162
.destinationType(DestinationType.EXCHANGE)
6263
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
6364
.inboundHeaders(new InboundWrapper(properties.getHeaders()))
64-
.hostAndPort(getHost(connection), getPort(connection))
65+
.instance(getHost(connection), getPort(connection))
66+
.amqp(queueName, routingKey)
6567
.build());
6668

6769
addConsumeAttributes(queueName, routingKey, properties);

instrumentation/rabbit-amqp-2.4.1/src/main/java/com/nr/agent/instrumentation/rabbitamqp241/RabbitAMQPMetricUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public static void processSendMessage(String exchangeName, String routingKey,
5050
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
5151
.outboundHeaders(new OutboundWrapper(headers))
5252
.instance(getHost(connection), getPort(connection))
53+
.amqp(routingKey)
5354
.build());
5455

5556
addAttributes(routingKey, props);
@@ -62,7 +63,8 @@ public static void processGetMessage(String queueName, String routingKey, String
6263
.destinationType(DestinationType.EXCHANGE)
6364
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
6465
.inboundHeaders(new InboundWrapper(properties.getHeaders()))
65-
.hostAndPort(getHost(connection), getPort(connection))
66+
.instance(getHost(connection), getPort(connection))
67+
.amqp(queueName, routingKey)
6668
.build());
6769

6870
addConsumeAttributes(queueName, routingKey, properties);

instrumentation/rabbit-amqp-2.5.0/src/main/java/com/nr/agent/instrumentation/rabbitamqp250/RabbitAMQPMetricUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static void processSendMessage(String exchangeName, String routingKey, Ma
4848
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
4949
.outboundHeaders(new OutboundWrapper(headers))
5050
.instance(getHost(connection), getPort(connection))
51+
.amqp(routingKey)
5152
.build());
5253

5354
addAttributes(routingKey, props);
@@ -60,7 +61,8 @@ public static void processGetMessage(String queueName, String routingKey, String
6061
.destinationType(DestinationType.EXCHANGE)
6162
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
6263
.inboundHeaders(new InboundWrapper(properties.getHeaders()))
63-
.hostAndPort(getHost(connection), getPort(connection))
64+
.instance(getHost(connection), getPort(connection))
65+
.amqp(queueName, routingKey)
6466
.build());
6567

6668
addConsumeAttributes(queueName, routingKey, properties);

instrumentation/rabbit-amqp-2.7.0/src/main/java/com/nr/agent/instrumentation/rabbitamqp270/RabbitAMQPMetricUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static void processSendMessage(String exchangeName, String routingKey, Ma
4848
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
4949
.outboundHeaders(new OutboundWrapper(headers))
5050
.instance(getHost(connection), getPort(connection))
51+
.amqp(routingKey)
5152
.build());
5253

5354
addAttributes(routingKey, props);
@@ -60,7 +61,8 @@ public static void processGetMessage(String queueName, String routingKey, String
6061
.destinationType(DestinationType.EXCHANGE)
6162
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
6263
.inboundHeaders(new InboundWrapper(properties.getHeaders()))
63-
.hostAndPort(getHost(connection), getPort(connection))
64+
.instance(getHost(connection), getPort(connection))
65+
.amqp(queueName, routingKey)
6466
.build());
6567

6668
addConsumeAttributes(queueName, routingKey, properties);

instrumentation/rabbit-amqp-5.0.0/src/main/java/com/nr/agent/instrumentation/rabbitamqp500/RabbitAMQPMetricUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public static void processSendMessage(String exchangeName, String routingKey, Ma
4949
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
5050
.outboundHeaders(new OutboundWrapper(headers))
5151
.instance(getHost(connection), getPort(connection))
52+
.amqp(routingKey)
5253
.build());
5354

5455
addAttributes(routingKey, props);
@@ -61,7 +62,8 @@ public static void processGetMessage(String queueName, String routingKey, String
6162
.destinationType(DestinationType.EXCHANGE)
6263
.destinationName(exchangeName.isEmpty() ? DEFAULT : exchangeName)
6364
.inboundHeaders(new InboundWrapper(properties.getHeaders()))
64-
.hostAndPort(getHost(connection), getPort(connection))
65+
.instance(getHost(connection), getPort(connection))
66+
.amqp(queueName, routingKey)
6567
.build());
6668

6769
addConsumeAttributes(queueName, routingKey, properties);

newrelic-agent/src/main/java/com/newrelic/agent/messaging/MessageMetrics.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,34 @@ public class MessageMetrics {
1717
public static final String SLASH = "/";
1818
public static final String MESSAGE_BROKER_INSTANCE = METRIC_NAMESPACE + "/instance/";
1919

20+
public static final String ROUTING_KEY = "RoutingKey";
21+
2022
public static final String UNKNOWN = "unknown";
2123
public static String HOSTNAME = Hostname.getHostname(ServiceFactory.getConfigService().getDefaultAgentConfig());
2224

2325
public static boolean isAnyEndpointParamsKnown(String host, Integer port) {
2426
return !(isParamUnknown(host) && isParamUnknown(port));
2527
}
2628
public static void collectMessageProducerRollupMetrics(TracedMethod method, String host, Integer port,
27-
DestinationType destinationType, String destinationName) {
28-
reportInstanceIfEnabled(method, host, port, destinationType, destinationName);
29+
DestinationType destinationType, String destinationName, String amqpRoutingKey) {
30+
reportInstanceIfEnabled(method, host, port, destinationType, destinationName, null, amqpRoutingKey);
2931
}
3032

3133
public static void collectMessageConsumerRollupMetrics(TracedMethod method, String host, Integer port,
32-
DestinationType destinationType, String destinationName) {
33-
reportInstanceIfEnabled(method, host, port, destinationType, destinationName);
34+
DestinationType destinationType, String destinationName, String amqpQueueName, String amqpRoutingKey) {
35+
reportInstanceIfEnabled(method, host, port, destinationType, destinationName, amqpQueueName, amqpRoutingKey);
3436
}
3537

3638
public static void reportInstanceIfEnabled(TracedMethod method, String host, Integer port,
37-
DestinationType destinationType, String destinationName) {
39+
DestinationType destinationType, String destinationName, String amqpQueueName, String amqpRoutingKey) {
3840
MessageBrokerConfig messageBrokerConfig = ServiceFactory.getConfigService().getDefaultAgentConfig().getMessageBrokerConfig();
3941
if (messageBrokerConfig.isInstanceReportingEnabled()) {
40-
String metric = buildInstanceMetric(host, port, destinationType, destinationName);
41-
method.addRollupMetricName(metric);
42+
String instanceMetric = buildInstanceMetric(host, port, destinationType, destinationName);
43+
method.addRollupMetricName(instanceMetric);
44+
if (isAmqpCall(destinationType, amqpQueueName, amqpRoutingKey)) {
45+
String amqpInstance = buildAmqpInstanceMetric(instanceMetric, destinationType, amqpQueueName, amqpRoutingKey);
46+
method.addRollupMetricName(amqpInstance);
47+
}
4248
}
4349
}
4450

@@ -57,6 +63,22 @@ public static String buildInstanceIdentifier(String host, Integer port,
5763
return hostname + SLASH + portName + SLASH + destinationType.getTypeName() + SLASH + parsedDestinationName;
5864
}
5965

66+
public static String buildAmqpInstanceMetric(String instanceMetric, DestinationType destinationType, String amqpQueueName, String amqpRoutingKey) {
67+
String amqpSuffix = buildAmqpInstance(destinationType, amqpQueueName, amqpRoutingKey);
68+
return instanceMetric + SLASH + amqpSuffix;
69+
}
70+
71+
public static String buildAmqpInstance(DestinationType destinationType, String amqpQueueName, String amqpRoutingKey) {
72+
if (DestinationType.EXCHANGE.equals(destinationType)) {
73+
if (!isParamUnknown(amqpQueueName)) {
74+
return DestinationType.NAMED_QUEUE.getTypeName() + SLASH + amqpQueueName;
75+
} else if (!isParamUnknown(amqpRoutingKey)) {
76+
return ROUTING_KEY + SLASH + amqpRoutingKey;
77+
}
78+
}
79+
return "";
80+
}
81+
6082
public static String replaceDestinationName(DestinationType destinationType, String destinationName) {
6183
if (destinationType == DestinationType.TEMP_QUEUE || destinationType == DestinationType.TEMP_TOPIC) {
6284
return "Temp";
@@ -88,6 +110,10 @@ public static String replacePort(Integer port) {
88110
return String.valueOf(port);
89111
}
90112

113+
private static boolean isAmqpCall(DestinationType destinationType, String amqpQueueName, String amqpRoutingKey) {
114+
return DestinationType.EXCHANGE.equals(destinationType) && !(isParamUnknown(amqpQueueName) && isParamUnknown(amqpRoutingKey));
115+
}
116+
91117
private static boolean isParamUnknown(String str) {
92118
return str == null || str.isEmpty();
93119
}

0 commit comments

Comments
 (0)