Skip to content

Commit 1f3e7cf

Browse files
- Resolves some over-use of memory for the r2dbc-mssql and m2dbc-mysql modules
- Resolves a potential token timeout issue with the reactor-3.3.0 module - Implements a new module for r2dbc-mysql 1.1.3+
1 parent 925da72 commit 1f3e7cf

20 files changed

+462
-14
lines changed

instrumentation/r2dbc-mssql/src/main/java/io/r2dbc/mssql/client/R2dbcUtils.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,9 @@ public static InetSocketAddress extractSocketAddress(Client client) {
5353
try {
5454
if(client instanceof ReactorNettyClient_Instrumentation) {
5555
ReactorNettyClient_Instrumentation instrumentedClient = (ReactorNettyClient_Instrumentation) client;
56-
Connection clientConnection = instrumentedClient.clientConnection;
57-
if(clientConnection.channel().remoteAddress() != null && clientConnection.channel().remoteAddress() instanceof InetSocketAddress) {
58-
return (InetSocketAddress) clientConnection.channel().remoteAddress();
59-
}
60-
}
56+
if(instrumentedClient.remoteAddress != null && instrumentedClient.remoteAddress instanceof InetSocketAddress) {
57+
return (InetSocketAddress) instrumentedClient.remoteAddress;
58+
} }
6159
return null;
6260
} catch(Exception exception) {
6361
return null;

instrumentation/r2dbc-mssql/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient_Instrumentation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import com.newrelic.api.agent.weaver.NewField;
55
import com.newrelic.api.agent.weaver.Weave;
66
import reactor.netty.Connection;
7+
import java.net.SocketAddress;
78

89
@Weave(type = MatchType.ExactClass, originalName = "io.r2dbc.mssql.client.ReactorNettyClient")
910
public class ReactorNettyClient_Instrumentation {
1011
@NewField
11-
public final Connection clientConnection;
12+
public final SocketAddress remoteAddress;
1213

1314
private ReactorNettyClient_Instrumentation(Connection connection, TdsEncoder TdsEncoder, ConnectionContext context) {
14-
this.clientConnection = connection;
15+
this.remoteAddress = connection == null ? null :
16+
connection.channel() == null ? null : connection.channel().remoteAddress();
1517
}
1618
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
dependencies {
2+
implementation(project(":agent-bridge"))
3+
implementation(project(":agent-bridge-datastore"))
4+
implementation("io.asyncer:r2dbc-mysql:1.1.3")
5+
testImplementation("ch.vorburger.mariaDB4j:mariaDB4j:2.2.1")
6+
}
7+
8+
jar {
9+
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.r2dbc-mysql-1.1.3' }
10+
}
11+
12+
verifyInstrumentation {
13+
// note the older instrumentation is for the dev.mik: r2dbc-mysql, which only covers 8.2.0
14+
// and this module only covers 1.1.3+, so we currently have a gap from 0.9.0 to 1.1.2
15+
passesOnly 'io.asyncer:r2dbc-mysql:[1.1.3,)'
16+
}
17+
18+
site {
19+
title 'MySQL R2DBC'
20+
type 'Datastore'
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import com.newrelic.api.agent.weaver.MatchType;
4+
import com.newrelic.api.agent.weaver.Weave;
5+
import com.newrelic.api.agent.weaver.Weaver;
6+
import io.asyncer.r2dbc.mysql.client.Client;
7+
8+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.MySqlStatementSupport")
9+
abstract class MySqlStatementSupport_Instrumentation {
10+
protected final Client client = Weaver.callOriginal();
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import com.newrelic.api.agent.weaver.MatchType;
4+
import com.newrelic.api.agent.weaver.Weave;
5+
import com.newrelic.api.agent.weaver.Weaver;
6+
import io.asyncer.r2dbc.mysql.client.Client;
7+
8+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.ParameterizedStatementSupport")
9+
abstract class ParameterizedStatementSupport_Instrumentation extends MySqlStatementSupport_Instrumentation {
10+
protected final Query query = Weaver.callOriginal();
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import io.asyncer.r2dbc.mysql.api.MySqlResult;
4+
import com.newrelic.api.agent.weaver.MatchType;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import com.newrelic.api.agent.weaver.Weaver;
7+
import io.asyncer.r2dbc.mysql.client.R2dbcUtils;
8+
import reactor.core.publisher.Flux;
9+
10+
import java.util.List;
11+
12+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.PrepareParameterizedStatement")
13+
final class PrepareParameterizedStatement_Instrumentation extends ParameterizedStatementSupport_Instrumentation {
14+
public Flux<MySqlResult> execute(List<Binding> bindings) {
15+
Flux<MySqlResult> request = Weaver.callOriginal();
16+
if(request != null && this.query != null && this.client != null) {
17+
return R2dbcUtils.wrapRequest(request, query.getFormattedSql(), client);
18+
}
19+
return request;
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import io.asyncer.r2dbc.mysql.api.MySqlResult;
4+
import com.newrelic.api.agent.weaver.MatchType;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import com.newrelic.api.agent.weaver.Weaver;
7+
import io.asyncer.r2dbc.mysql.client.R2dbcUtils;
8+
import reactor.core.publisher.Flux;
9+
10+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.PrepareSimpleStatement")
11+
final class PrepareSimpleStatement_Instrumentation extends SimpleStatementSupport_Instrumentation {
12+
public Flux<MySqlResult> execute() {
13+
Flux<MySqlResult> request = Weaver.callOriginal();
14+
if(request != null && this.sql != null && this.client != null) {
15+
return R2dbcUtils.wrapRequest(request, this.sql, this.client);
16+
}
17+
return request;
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import com.newrelic.api.agent.weaver.MatchType;
4+
import com.newrelic.api.agent.weaver.Weave;
5+
import com.newrelic.api.agent.weaver.Weaver;
6+
import io.asyncer.r2dbc.mysql.client.Client;
7+
8+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.SimpleStatementSupport")
9+
abstract class SimpleStatementSupport_Instrumentation extends MySqlStatementSupport_Instrumentation {
10+
protected final String sql = Weaver.callOriginal();
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import io.asyncer.r2dbc.mysql.api.MySqlResult;
4+
import com.newrelic.api.agent.weaver.MatchType;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import com.newrelic.api.agent.weaver.Weaver;
7+
import io.asyncer.r2dbc.mysql.client.R2dbcUtils;
8+
import reactor.core.publisher.Flux;
9+
10+
import java.util.List;
11+
12+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.TextParameterizedStatement")
13+
final class TextParameterizedStatement_Instrumentation extends ParameterizedStatementSupport_Instrumentation {
14+
protected Flux<MySqlResult> execute(List<Binding> bindings) {
15+
Flux<MySqlResult> request = Weaver.callOriginal();
16+
if(request != null && this.query != null && this.client != null) {
17+
return R2dbcUtils.wrapRequest(request, String.join("", query.getFormattedSql()), client);
18+
}
19+
return request;
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.asyncer.r2dbc.mysql;
2+
3+
import io.asyncer.r2dbc.mysql.api.MySqlResult;
4+
import com.newrelic.api.agent.weaver.MatchType;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import com.newrelic.api.agent.weaver.Weaver;
7+
import io.asyncer.r2dbc.mysql.client.R2dbcUtils;
8+
import reactor.core.publisher.Flux;
9+
10+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.TextSimpleStatement")
11+
final class TextSimpleStatement_Instrumentation extends SimpleStatementSupport_Instrumentation {
12+
public Flux<MySqlResult> execute() {
13+
Flux<MySqlResult> request = Weaver.callOriginal();
14+
if(request != null && this.sql != null && this.client != null) {
15+
return R2dbcUtils.wrapRequest(request, this.sql, this.client);
16+
}
17+
return request;
18+
}
19+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.asyncer.r2dbc.mysql.client;
2+
3+
import com.newrelic.agent.bridge.NoOpTransaction;
4+
import com.newrelic.agent.bridge.datastore.DatastoreVendor;
5+
import com.newrelic.agent.bridge.datastore.OperationAndTableName;
6+
import com.newrelic.agent.bridge.datastore.R2dbcObfuscator;
7+
import com.newrelic.agent.bridge.datastore.R2dbcOperation;
8+
import com.newrelic.api.agent.DatastoreParameters;
9+
import com.newrelic.api.agent.NewRelic;
10+
import com.newrelic.api.agent.Segment;
11+
import com.newrelic.api.agent.Transaction;
12+
import io.asyncer.r2dbc.mysql.api.MySqlResult;
13+
import org.reactivestreams.Subscription;
14+
import reactor.core.publisher.Flux;
15+
import reactor.netty.Connection;
16+
17+
import java.net.InetSocketAddress;
18+
19+
import java.util.function.Consumer;
20+
21+
public class R2dbcUtils {
22+
public static Flux<MySqlResult> wrapRequest(Flux<MySqlResult> request, String sql, Client client) {
23+
if(request != null) {
24+
Transaction transaction = NewRelic.getAgent().getTransaction();
25+
if(transaction != null && !(transaction instanceof NoOpTransaction)) {
26+
Segment segment = transaction.startSegment("execute");
27+
return request
28+
.doOnSubscribe(reportExecution(sql, client, segment))
29+
.doFinally((type) -> segment.end());
30+
}
31+
}
32+
return request;
33+
}
34+
35+
private static Consumer<Subscription> reportExecution(String sql, Client client, Segment segment) {
36+
return (subscription) -> {
37+
OperationAndTableName sqlOperation = R2dbcOperation.extractFrom(sql);
38+
InetSocketAddress socketAddress = extractSocketAddress(client);
39+
if (sqlOperation != null && socketAddress != null) {
40+
segment.reportAsExternal(DatastoreParameters
41+
.product(DatastoreVendor.MySQL.name())
42+
.collection(sqlOperation.getTableName())
43+
.operation(sqlOperation.getOperation())
44+
.instance(socketAddress.getHostName(), socketAddress.getPort())
45+
.databaseName(null)
46+
.slowQuery(sql, R2dbcObfuscator.MYSQL_QUERY_CONVERTER)
47+
.build());
48+
}
49+
};
50+
}
51+
52+
public static InetSocketAddress extractSocketAddress(Client client) {
53+
try {
54+
if(client instanceof ReactorNettyClient_Instrumentation) {
55+
ReactorNettyClient_Instrumentation instrumentedClient = (ReactorNettyClient_Instrumentation) client;
56+
if(instrumentedClient.remoteAddress != null && instrumentedClient.remoteAddress instanceof InetSocketAddress) {
57+
return (InetSocketAddress) instrumentedClient.remoteAddress;
58+
}
59+
}
60+
return null;
61+
} catch(Exception exception) {
62+
return null;
63+
}
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.asyncer.r2dbc.mysql.client;
2+
3+
import com.newrelic.api.agent.weaver.MatchType;
4+
import com.newrelic.api.agent.weaver.NewField;
5+
import com.newrelic.api.agent.weaver.Weave;
6+
import io.asyncer.r2dbc.mysql.ConnectionContext;
7+
import io.asyncer.r2dbc.mysql.MySqlSslConfiguration;
8+
import reactor.netty.Connection;
9+
10+
import java.net.SocketAddress;
11+
12+
@Weave(type = MatchType.ExactClass, originalName = "io.asyncer.r2dbc.mysql.client.ReactorNettyClient")
13+
class ReactorNettyClient_Instrumentation {
14+
@NewField
15+
public final SocketAddress remoteAddress;
16+
17+
ReactorNettyClient_Instrumentation(Connection connection, MySqlSslConfiguration ssl, ConnectionContext context) {
18+
this.remoteAddress = connection == null ? null :
19+
connection.channel() == null ? null : connection.channel().remoteAddress();
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.nr.agent.instrumentation.r2dbc;
2+
3+
import ch.vorburger.mariadb4j.DB;
4+
import ch.vorburger.mariadb4j.DBConfigurationBuilder;
5+
import com.newrelic.agent.introspec.DatastoreHelper;
6+
import com.newrelic.agent.introspec.InstrumentationTestConfig;
7+
import com.newrelic.agent.introspec.InstrumentationTestRunner;
8+
import com.newrelic.agent.introspec.Introspector;
9+
import io.r2dbc.spi.Connection;
10+
import io.r2dbc.spi.ConnectionFactories;
11+
import io.r2dbc.spi.ConnectionFactory;
12+
import org.junit.AfterClass;
13+
import org.junit.Before;
14+
import org.junit.Test;
15+
import org.junit.runner.RunWith;
16+
import reactor.core.publisher.Mono;
17+
18+
import static org.junit.Assert.assertEquals;
19+
20+
@RunWith(InstrumentationTestRunner.class)
21+
@InstrumentationTestConfig(includePrefixes = "io.asyncer.r2dbc.mysql")
22+
public class MySQLInstrumentedTest {
23+
24+
public static DB mariaDb;
25+
public Connection connection;
26+
27+
@Before
28+
public void setup() throws Exception {
29+
String databaseName = "MySQL" + System.currentTimeMillis();
30+
DBConfigurationBuilder builder = DBConfigurationBuilder.newBuilder().setPort(0);
31+
mariaDb = DB.newEmbeddedDB(builder.build());
32+
mariaDb.start();
33+
mariaDb.createDB(databaseName);
34+
mariaDb.source("users.sql", "user", "password", databaseName);
35+
ConnectionFactory connectionFactory = ConnectionFactories.get(builder.getURL(databaseName).replace("jdbc", "r2dbc").replace("localhost", "user:password@localhost"));
36+
connection = Mono.from(connectionFactory.create()).block();
37+
}
38+
39+
@AfterClass
40+
public static void teardown() throws Exception {
41+
mariaDb.stop();
42+
}
43+
44+
@Test
45+
public void testBasicRequests() {
46+
//Given
47+
Introspector introspector = InstrumentationTestRunner.getIntrospector();
48+
DatastoreHelper helper = new DatastoreHelper("MySQL");
49+
50+
//When
51+
R2dbcTestUtils.basicRequests(connection);
52+
53+
//Then
54+
assertEquals(1, introspector.getFinishedTransactionCount(1000));
55+
assertEquals(1, introspector.getTransactionNames().size());
56+
String transactionName = introspector.getTransactionNames().stream().findFirst().orElse("");
57+
helper.assertScopedStatementMetricCount(transactionName, "INSERT", "USERS", 1);
58+
helper.assertScopedStatementMetricCount(transactionName, "SELECT", "USERS", 3);
59+
helper.assertScopedStatementMetricCount(transactionName, "UPDATE", "USERS", 1);
60+
helper.assertScopedStatementMetricCount(transactionName, "DELETE", "USERS", 1);
61+
helper.assertAggregateMetrics();
62+
helper.assertUnscopedOperationMetricCount("INSERT", 1);
63+
helper.assertUnscopedOperationMetricCount("SELECT", 3);
64+
helper.assertUnscopedOperationMetricCount("UPDATE", 1);
65+
helper.assertUnscopedOperationMetricCount("DELETE", 1);
66+
helper.assertUnscopedStatementMetricCount("INSERT", "USERS", 1);
67+
helper.assertUnscopedStatementMetricCount("SELECT", "USERS", 3);
68+
helper.assertUnscopedStatementMetricCount("UPDATE", "USERS", 1);
69+
helper.assertUnscopedStatementMetricCount("DELETE", "USERS", 1);
70+
}
71+
72+
@Test
73+
public void testParametrizedRequests() {
74+
//Given
75+
Introspector introspector = InstrumentationTestRunner.getIntrospector();
76+
DatastoreHelper helper = new DatastoreHelper("MySQL");
77+
78+
//When
79+
R2dbcTestUtils.parametrizedRequests(connection);
80+
81+
//Then
82+
assertEquals(1, introspector.getFinishedTransactionCount(1000));
83+
assertEquals(1, introspector.getTransactionNames().size());
84+
String transactionName = introspector.getTransactionNames().stream().findFirst().orElse("");
85+
helper.assertScopedStatementMetricCount(transactionName, "INSERT", "USERS", 1);
86+
helper.assertScopedStatementMetricCount(transactionName, "SELECT", "USERS", 3);
87+
helper.assertScopedStatementMetricCount(transactionName, "UPDATE", "USERS", 1);
88+
helper.assertScopedStatementMetricCount(transactionName, "DELETE", "USERS", 1);
89+
helper.assertAggregateMetrics();
90+
helper.assertUnscopedOperationMetricCount("INSERT", 1);
91+
helper.assertUnscopedOperationMetricCount("SELECT", 3);
92+
helper.assertUnscopedOperationMetricCount("UPDATE", 1);
93+
helper.assertUnscopedOperationMetricCount("DELETE", 1);
94+
helper.assertUnscopedStatementMetricCount("INSERT", "USERS", 1);
95+
helper.assertUnscopedStatementMetricCount("SELECT", "USERS", 3);
96+
helper.assertUnscopedStatementMetricCount("UPDATE", "USERS", 1);
97+
helper.assertUnscopedStatementMetricCount("DELETE", "USERS", 1);
98+
}
99+
}

0 commit comments

Comments
 (0)