Skip to content

Commit 2054a76

Browse files
committed
Fixed NPE with SslMode.TUNNEL Usage
Motivation: A NPE was identified when utilizing `SslMode.TUNNEL`, introduced by PR #204. The issue arises when `ConnectionContext#isMariaDb` is invoked from `SslBridgeHandler#isTls13Enabled`, leading to an NPE due to the ConnectionContext not being initialized at that time. Modification: We have updated ConnectionContext#isMariaDb to return false if the context has not been initialized, preventing the NPE. (Mainly to restore previous behavior) Result: This change addresses the NPE issue, ensuring stability when `SslMode.TUNNEL` is selected. It resolves the problem reported in GoogleCloudPlatform/cloud-sql-jdbc-socket-factory#1828
1 parent 56eb874 commit 2054a76

File tree

2 files changed

+311
-0
lines changed

2 files changed

+311
-0
lines changed

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
<mbr.version>0.3.0.RELEASE</mbr.version>
8080
<jsr305.version>3.0.2</jsr305.version>
8181
<java-annotations.version>24.1.0</java-annotations.version>
82+
<bouncy-castle.version>1.77</bouncy-castle.version>
8283
</properties>
8384

8485
<dependencyManagement>
@@ -117,6 +118,12 @@
117118
<version>${java-annotations.version}</version>
118119
<scope>provided</scope>
119120
</dependency>
121+
<dependency>
122+
<groupId>org.bouncycastle</groupId>
123+
<artifactId>bcpkix-jdk18on</artifactId>
124+
<version>${bouncy-castle.version}</version>
125+
<scope>test</scope>
126+
</dependency>
120127
</dependencies>
121128
</dependencyManagement>
122129

@@ -240,6 +247,11 @@
240247
<artifactId>jackson-annotations</artifactId>
241248
<scope>test</scope>
242249
</dependency>
250+
<dependency>
251+
<groupId>org.bouncycastle</groupId>
252+
<artifactId>bcpkix-jdk18on</artifactId>
253+
<scope>test</scope>
254+
</dependency>
243255
</dependencies>
244256

245257
<build>
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
20+
import io.asyncer.r2dbc.mysql.constant.SslMode;
21+
import io.asyncer.r2dbc.mysql.constant.TlsVersions;
22+
import io.netty.bootstrap.Bootstrap;
23+
import io.netty.bootstrap.ServerBootstrap;
24+
import io.netty.buffer.Unpooled;
25+
import io.netty.channel.Channel;
26+
import io.netty.channel.ChannelFuture;
27+
import io.netty.channel.ChannelFutureListener;
28+
import io.netty.channel.ChannelHandlerContext;
29+
import io.netty.channel.ChannelInboundHandlerAdapter;
30+
import io.netty.channel.ChannelInitializer;
31+
import io.netty.channel.ChannelOption;
32+
import io.netty.channel.nio.NioEventLoopGroup;
33+
import io.netty.channel.socket.SocketChannel;
34+
import io.netty.channel.socket.nio.NioServerSocketChannel;
35+
import io.netty.handler.ssl.SslContext;
36+
import io.netty.handler.ssl.SslContextBuilder;
37+
import io.netty.handler.ssl.util.SelfSignedCertificate;
38+
import org.junit.After;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
42+
import javax.net.ssl.SSLException;
43+
import java.net.InetSocketAddress;
44+
import java.security.cert.CertificateException;
45+
import java.time.Duration;
46+
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
public class SslTunnelIntegrationTest {
50+
51+
private SelfSignedCertificate server;
52+
53+
private SelfSignedCertificate client;
54+
55+
private SslTunnelServer sslTunnelServer;
56+
57+
@Before
58+
public void setUp() throws CertificateException, SSLException {
59+
server = new SelfSignedCertificate();
60+
client = new SelfSignedCertificate();
61+
final SslContext sslContext = SslContextBuilder.forServer(server.key(), server.cert()).build();
62+
sslTunnelServer = new SslTunnelServer("localhost", 3306, sslContext);
63+
sslTunnelServer.setUp();
64+
}
65+
66+
@After
67+
public void tearDown() throws InterruptedException {
68+
server.delete();
69+
client.delete();
70+
sslTunnelServer.tearDown();
71+
}
72+
73+
@Test
74+
public void sslTunnelConnectionTest() {
75+
final String password = System.getProperty("test.mysql.password");
76+
assertThat(password).withFailMessage("Property test.mysql.password must exists and not be empty")
77+
.isNotNull()
78+
.isNotEmpty();
79+
80+
final MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration
81+
.builder()
82+
.host("localhost")
83+
.port(sslTunnelServer.getLocalPort())
84+
.connectTimeout(Duration.ofSeconds(3))
85+
.user("root")
86+
.password(password)
87+
.database("r2dbc")
88+
.createDatabaseIfNotExist(true)
89+
.sslMode(SslMode.TUNNEL)
90+
.tlsVersion(TlsVersions.TLS1_3, TlsVersions.TLS1_2)
91+
.sslKey(client.privateKey().getAbsolutePath())
92+
.sslCert(client.certificate().getAbsolutePath())
93+
.sslCa(server.certificate().getAbsolutePath())
94+
.build();
95+
96+
final MySqlConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
97+
98+
final MySqlConnection connection = connectionFactory.create().block();
99+
connection.createStatement("SELECT 3").execute()
100+
.flatMap(it -> it.map((row, rowMetadata) -> row.get(0)))
101+
.doOnNext(it -> assertThat(it).isEqualTo(3L))
102+
.blockLast();
103+
104+
connection.close().block();
105+
}
106+
107+
private static class SslTunnelServer {
108+
109+
private final String remoteHost;
110+
111+
private final int remotePort;
112+
113+
private final SslContext sslContext;
114+
115+
private volatile ChannelFuture channelFuture;
116+
117+
118+
private SslTunnelServer(String remoteHost, int remotePort, SslContext sslContext) {
119+
this.remoteHost = remoteHost;
120+
this.remotePort = remotePort;
121+
this.sslContext = sslContext;
122+
}
123+
124+
void setUp() {
125+
// Configure the server.
126+
try {
127+
ServerBootstrap b = new ServerBootstrap();
128+
b.localAddress(0)
129+
.group(new NioEventLoopGroup())
130+
.channel(NioServerSocketChannel.class)
131+
.childHandler(new ProxyInitializer(remoteHost, remotePort, sslContext))
132+
.childOption(ChannelOption.AUTO_READ, false);
133+
134+
135+
// Start the server.
136+
channelFuture = b.bind().sync();
137+
138+
} catch (InterruptedException e) {
139+
e.printStackTrace();
140+
}
141+
}
142+
143+
void tearDown() throws InterruptedException {
144+
channelFuture.channel().close().sync();
145+
}
146+
147+
int getLocalPort() {
148+
return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
149+
}
150+
151+
}
152+
153+
154+
private static class ProxyInitializer extends ChannelInitializer<SocketChannel> {
155+
156+
private final String remoteHost;
157+
158+
private final int remotePort;
159+
160+
private final SslContext sslContext;
161+
162+
ProxyInitializer(String remoteHost, int remotePort, SslContext sslContext) {
163+
this.remoteHost = remoteHost;
164+
this.remotePort = remotePort;
165+
this.sslContext = sslContext;
166+
}
167+
168+
@Override
169+
public void initChannel(SocketChannel ch) {
170+
ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
171+
ch.pipeline().addLast(new ProxyFrontendHandler(remoteHost, remotePort));
172+
}
173+
}
174+
175+
private static class ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
176+
177+
private final String remoteHost;
178+
private final int remotePort;
179+
180+
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
181+
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
182+
private Channel outboundChannel;
183+
184+
public ProxyFrontendHandler(String remoteHost, int remotePort) {
185+
this.remoteHost = remoteHost;
186+
this.remotePort = remotePort;
187+
}
188+
189+
@Override
190+
public void channelActive(ChannelHandlerContext ctx) {
191+
final Channel inboundChannel = ctx.channel();
192+
193+
// Start the connection attempt.
194+
Bootstrap b = new Bootstrap();
195+
b.group(inboundChannel.eventLoop())
196+
.channel(ctx.channel().getClass())
197+
.handler(new ProxyBackendHandler(inboundChannel))
198+
.option(ChannelOption.AUTO_READ, false);
199+
ChannelFuture f = b.connect(remoteHost, remotePort);
200+
outboundChannel = f.channel();
201+
f.addListener(new ChannelFutureListener() {
202+
@Override
203+
public void operationComplete(ChannelFuture future) {
204+
if (future.isSuccess()) {
205+
// connection complete start to read first data
206+
inboundChannel.read();
207+
} else {
208+
// Close the connection if the connection attempt has failed.
209+
inboundChannel.close();
210+
}
211+
}
212+
});
213+
}
214+
215+
@Override
216+
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
217+
if (outboundChannel.isActive()) {
218+
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
219+
@Override
220+
public void operationComplete(ChannelFuture future) {
221+
if (future.isSuccess()) {
222+
// was able to flush out data, start to read the next chunk
223+
ctx.channel().read();
224+
} else {
225+
future.channel().close();
226+
}
227+
}
228+
});
229+
}
230+
}
231+
232+
@Override
233+
public void channelInactive(ChannelHandlerContext ctx) {
234+
if (outboundChannel != null) {
235+
closeOnFlush(outboundChannel);
236+
}
237+
}
238+
239+
@Override
240+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
241+
cause.printStackTrace();
242+
closeOnFlush(ctx.channel());
243+
}
244+
245+
/**
246+
* Closes the specified channel after all queued write requests are flushed.
247+
*/
248+
static void closeOnFlush(Channel ch) {
249+
if (ch.isActive()) {
250+
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
251+
}
252+
}
253+
}
254+
255+
private static class ProxyBackendHandler extends ChannelInboundHandlerAdapter {
256+
257+
private final Channel inboundChannel;
258+
259+
public ProxyBackendHandler(Channel inboundChannel) {
260+
this.inboundChannel = inboundChannel;
261+
}
262+
263+
@Override
264+
public void channelActive(ChannelHandlerContext ctx) {
265+
if (!inboundChannel.isActive()) {
266+
ProxyFrontendHandler.closeOnFlush(ctx.channel());
267+
} else {
268+
ctx.read();
269+
}
270+
}
271+
272+
@Override
273+
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
274+
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
275+
@Override
276+
public void operationComplete(ChannelFuture future) {
277+
if (future.isSuccess()) {
278+
ctx.channel().read();
279+
} else {
280+
future.channel().close();
281+
}
282+
}
283+
});
284+
}
285+
286+
@Override
287+
public void channelInactive(ChannelHandlerContext ctx) {
288+
ProxyFrontendHandler.closeOnFlush(inboundChannel);
289+
}
290+
291+
@Override
292+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
293+
cause.printStackTrace();
294+
ProxyFrontendHandler.closeOnFlush(ctx.channel());
295+
}
296+
}
297+
298+
299+
}

0 commit comments

Comments
 (0)