Skip to content

Commit 684e95c

Browse files
committed
Fixed NPE with SslMode.TUNNEL Usage
Motivation: A NPE was identified when utilizing `SslMode.TUNNEL`, introduced by PR #204. The issue arises when `ConnectionContext#isMariaDb` is invoked from `SslBridgeHandler#isTls13Enabled`, leading to an NPE due to the ConnectionContext not being initialized at that time. Modification: We have updated ConnectionContext#isMariaDb to return false if the context has not been initialized, preventing the NPE. (Mainly to restore previous behavior) Result: This change addresses the NPE issue, ensuring stability when `SslMode.TUNNEL` is selected. resolves GoogleCloudPlatform/cloud-sql-jdbc-socket-factory#1828
1 parent 6982acc commit 684e95c

File tree

4 files changed

+315
-2
lines changed

4 files changed

+315
-2
lines changed

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
<mbr.version>0.3.0.RELEASE</mbr.version>
8080
<jsr305.version>3.0.2</jsr305.version>
8181
<java-annotations.version>24.1.0</java-annotations.version>
82+
<bouncy-castle.version>1.77</bouncy-castle.version>
8283
</properties>
8384

8485
<dependencyManagement>
@@ -117,6 +118,12 @@
117118
<version>${java-annotations.version}</version>
118119
<scope>provided</scope>
119120
</dependency>
121+
<dependency>
122+
<groupId>org.bouncycastle</groupId>
123+
<artifactId>bcpkix-jdk18on</artifactId>
124+
<version>${bouncy-castle.version}</version>
125+
<scope>test</scope>
126+
</dependency>
120127
</dependencies>
121128
</dependencyManagement>
122129

@@ -240,6 +247,11 @@
240247
<artifactId>jackson-annotations</artifactId>
241248
<scope>test</scope>
242249
</dependency>
250+
<dependency>
251+
<groupId>org.bouncycastle</groupId>
252+
<artifactId>bcpkix-jdk18on</artifactId>
253+
<scope>test</scope>
254+
</dependency>
243255
</dependencies>
244256

245257
<build>

src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public final class ConnectionContext implements CodecContext {
5757
*/
5858
private volatile short serverStatuses = ServerStatuses.AUTO_COMMIT;
5959

60+
@Nullable
6061
private volatile Capability capability = null;
6162

6263
ConnectionContext(ZeroDateOption zeroDateOption, @Nullable Path localInfilePath,

src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,10 @@ static MySqlSslContextSpec forClient(MySqlSslConfiguration ssl, ConnectionContex
220220
.applicationProtocolConfig(null);
221221
String[] tlsProtocols = ssl.getTlsVersion();
222222

223-
if (tlsProtocols.length > 0) {
224-
builder.protocols(tlsProtocols);
223+
if (tlsProtocols.length > 0 || ssl.getSslMode() == SslMode.TUNNEL) {
224+
if (tlsProtocols.length > 0) {
225+
builder.protocols(tlsProtocols);
226+
}
225227
} else if (isTls13Enabled(context)) {
226228
builder.protocols(TLS_PROTOCOLS);
227229
} else {
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
20+
import io.asyncer.r2dbc.mysql.constant.SslMode;
21+
import io.asyncer.r2dbc.mysql.constant.TlsVersions;
22+
import io.netty.bootstrap.Bootstrap;
23+
import io.netty.bootstrap.ServerBootstrap;
24+
import io.netty.buffer.Unpooled;
25+
import io.netty.channel.Channel;
26+
import io.netty.channel.ChannelFuture;
27+
import io.netty.channel.ChannelFutureListener;
28+
import io.netty.channel.ChannelHandlerContext;
29+
import io.netty.channel.ChannelInboundHandlerAdapter;
30+
import io.netty.channel.ChannelInitializer;
31+
import io.netty.channel.ChannelOption;
32+
import io.netty.channel.nio.NioEventLoopGroup;
33+
import io.netty.channel.socket.SocketChannel;
34+
import io.netty.channel.socket.nio.NioServerSocketChannel;
35+
import io.netty.handler.ssl.SslContext;
36+
import io.netty.handler.ssl.SslContextBuilder;
37+
import io.netty.handler.ssl.util.SelfSignedCertificate;
38+
import org.junit.After;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
42+
import javax.net.ssl.SSLException;
43+
import java.net.InetSocketAddress;
44+
import java.security.cert.CertificateException;
45+
import java.time.Duration;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
public class SslTunnelIntegrationTest {
50+
51+
private SelfSignedCertificate server;
52+
53+
private SelfSignedCertificate client;
54+
55+
private SslTunnelServer sslTunnelServer;
56+
57+
@Before
58+
public void setUp() throws CertificateException, SSLException {
59+
server = new SelfSignedCertificate();
60+
client = new SelfSignedCertificate();
61+
final SslContext sslContext = SslContextBuilder.forServer(server.key(), server.cert()).protocols(TlsVersions.TLS1_1, TlsVersions.TLS1, TlsVersions.TLS1_2, TlsVersions.TLS1_3).build();
62+
sslTunnelServer = new SslTunnelServer("localhost", 3306, sslContext);
63+
sslTunnelServer.setUp();
64+
}
65+
66+
@After
67+
public void tearDown() throws InterruptedException {
68+
server.delete();
69+
client.delete();
70+
sslTunnelServer.tearDown();
71+
}
72+
73+
@Test
74+
public void sslTunnelConnectionTest() {
75+
final String password = System.getProperty("test.mysql.password");
76+
assertThat(password).withFailMessage("Property test.mysql.password must exists and not be empty")
77+
.isNotNull()
78+
.isNotEmpty();
79+
80+
final MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration
81+
.builder()
82+
.host("localhost")
83+
.port(sslTunnelServer.getLocalPort())
84+
.connectTimeout(Duration.ofSeconds(3))
85+
.user("root")
86+
.password(password)
87+
.database("r2dbc")
88+
.createDatabaseIfNotExist(true)
89+
.sslMode(SslMode.TUNNEL)
90+
.sslKey(client.privateKey().getAbsolutePath())
91+
.sslCert(client.certificate().getAbsolutePath())
92+
.sslCa(server.certificate().getAbsolutePath())
93+
.build();
94+
95+
final MySqlConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
96+
97+
final MySqlConnection connection = connectionFactory.create().block();
98+
connection.createStatement("SELECT 3").execute()
99+
.flatMap(it -> it.map((row, rowMetadata) -> row.get(0)))
100+
.doOnNext(it -> assertThat(it).isEqualTo(3L))
101+
.blockLast();
102+
103+
connection.close().block();
104+
}
105+
106+
private static class SslTunnelServer {
107+
108+
private final String remoteHost;
109+
110+
private final int remotePort;
111+
112+
private final SslContext sslContext;
113+
114+
private volatile ChannelFuture channelFuture;
115+
116+
117+
private SslTunnelServer(String remoteHost, int remotePort, SslContext sslContext) {
118+
this.remoteHost = remoteHost;
119+
this.remotePort = remotePort;
120+
this.sslContext = sslContext;
121+
}
122+
123+
void setUp() {
124+
// Configure the server.
125+
try {
126+
ServerBootstrap b = new ServerBootstrap();
127+
b.localAddress(0)
128+
.group(new NioEventLoopGroup())
129+
.channel(NioServerSocketChannel.class)
130+
.childHandler(new ProxyInitializer(remoteHost, remotePort, sslContext))
131+
.childOption(ChannelOption.AUTO_READ, false);
132+
133+
134+
// Start the server.
135+
channelFuture = b.bind().sync();
136+
137+
} catch (InterruptedException e) {
138+
e.printStackTrace();
139+
}
140+
}
141+
142+
void tearDown() throws InterruptedException {
143+
channelFuture.channel().close().sync();
144+
}
145+
146+
int getLocalPort() {
147+
return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
148+
}
149+
150+
}
151+
152+
153+
private static class ProxyInitializer extends ChannelInitializer<SocketChannel> {
154+
155+
private final String remoteHost;
156+
157+
private final int remotePort;
158+
159+
private final SslContext sslContext;
160+
161+
ProxyInitializer(String remoteHost, int remotePort, SslContext sslContext) {
162+
this.remoteHost = remoteHost;
163+
this.remotePort = remotePort;
164+
this.sslContext = sslContext;
165+
}
166+
167+
@Override
168+
public void initChannel(SocketChannel ch) {
169+
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
170+
ch.pipeline().addLast(new ProxyFrontendHandler(remoteHost, remotePort));
171+
}
172+
}
173+
174+
private static class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
175+
176+
private final String remoteHost;
177+
private final int remotePort;
178+
179+
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
180+
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
181+
private Channel outboundChannel;
182+
183+
public ProxyFrontendHandler(String remoteHost, int remotePort) {
184+
this.remoteHost = remoteHost;
185+
this.remotePort = remotePort;
186+
}
187+
188+
@Override
189+
public void channelActive(ChannelHandlerContext ctx) {
190+
final Channel inboundChannel = ctx.channel();
191+
192+
// Start the connection attempt.
193+
Bootstrap b = new Bootstrap();
194+
b.group(inboundChannel.eventLoop())
195+
.channel(ctx.channel().getClass())
196+
.handler(new ProxyBackendHandler(inboundChannel))
197+
.option(ChannelOption.AUTO_READ, false);
198+
ChannelFuture f = b.connect(remoteHost, remotePort);
199+
outboundChannel = f.channel();
200+
f.addListener(new ChannelFutureListener() {
201+
@Override
202+
public void operationComplete(ChannelFuture future) {
203+
if (future.isSuccess()) {
204+
// connection complete start to read first data
205+
inboundChannel.read();
206+
} else {
207+
// Close the connection if the connection attempt has failed.
208+
inboundChannel.close();
209+
}
210+
}
211+
});
212+
}
213+
214+
@Override
215+
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
216+
if (outboundChannel.isActive()) {
217+
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
218+
@Override
219+
public void operationComplete(ChannelFuture future) {
220+
if (future.isSuccess()) {
221+
// was able to flush out data, start to read the next chunk
222+
ctx.channel().read();
223+
} else {
224+
future.channel().close();
225+
}
226+
}
227+
});
228+
}
229+
}
230+
231+
@Override
232+
public void channelInactive(ChannelHandlerContext ctx) {
233+
if (outboundChannel != null) {
234+
closeOnFlush(outboundChannel);
235+
}
236+
}
237+
238+
@Override
239+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
240+
cause.printStackTrace();
241+
closeOnFlush(ctx.channel());
242+
}
243+
244+
/**
245+
* Closes the specified channel after all queued write requests are flushed.
246+
*/
247+
static void closeOnFlush(Channel ch) {
248+
if (ch.isActive()) {
249+
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
250+
}
251+
}
252+
}
253+
254+
private static class ProxyBackendHandler extends ChannelInboundHandlerAdapter {
255+
256+
private final Channel inboundChannel;
257+
258+
public ProxyBackendHandler(Channel inboundChannel) {
259+
this.inboundChannel = inboundChannel;
260+
}
261+
262+
@Override
263+
public void channelActive(ChannelHandlerContext ctx) {
264+
if (!inboundChannel.isActive()) {
265+
ProxyFrontendHandler.closeOnFlush(ctx.channel());
266+
} else {
267+
ctx.read();
268+
}
269+
}
270+
271+
@Override
272+
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
273+
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
274+
@Override
275+
public void operationComplete(ChannelFuture future) {
276+
if (future.isSuccess()) {
277+
ctx.channel().read();
278+
} else {
279+
future.channel().close();
280+
}
281+
}
282+
});
283+
}
284+
285+
@Override
286+
public void channelInactive(ChannelHandlerContext ctx) {
287+
ProxyFrontendHandler.closeOnFlush(inboundChannel);
288+
}
289+
290+
@Override
291+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
292+
cause.printStackTrace();
293+
ProxyFrontendHandler.closeOnFlush(ctx.channel());
294+
}
295+
}
296+
297+
298+
}

0 commit comments

Comments
 (0)