Skip to content

Commit

Permalink
NetSocketInternal should provide a way to handle events flowing throu…
Browse files Browse the repository at this point in the history
…gh the channel pipeline. This is useful in some cases like exposing the HTTP headers and request URI after a WebSocket handshake when MQTT uses a WebSocket transport.
  • Loading branch information
vietj committed Oct 18, 2021
1 parent dd21282 commit 6540324
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 11 deletions.
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/core/net/impl/ConnectionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,17 @@ private void checkCloseHandler(AsyncResult<Void> ar) {
}
}

protected void handleEvent(Object evt) {
// Will release the event if needed
ReferenceCountUtil.release(evt);
}

/**
* Called by the Netty handler when the connection becomes idle. The default implementation closes the
* connection.
* <p/>
* Subclasses can override it to prevent the idle event to happen (e.g when the connection is pooled) or
* perform extra work when the idle event happens.
* @param event
*/
protected void handleIdle(IdleStateEvent event) {
chctx.close();
Expand Down
27 changes: 22 additions & 5 deletions src/main/java/io/vertx/core/net/impl/NetSocketImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class NetSocketImpl extends ConnectionBase implements NetSocketInternal {
private MessageConsumer registration;
private Handler<Buffer> handler;
private Handler<Object> messageHandler;
private Handler<Object> eventHandler;

public NetSocketImpl(ContextInternal context, ChannelHandlerContext channel, SSLHelper helper, TCPMetrics metrics) {
this(context, channel, null, helper, metrics, null);
Expand Down Expand Up @@ -207,11 +208,13 @@ private synchronized Handler<Object> messageHandler() {

@Override
public synchronized NetSocketInternal messageHandler(Handler<Object> handler) {
if (handler == null) {
messageHandler = new DataMessageHandler();
} else {
messageHandler = handler;
}
messageHandler = handler == null ? new DataMessageHandler() : handler;
return this;
}

@Override
public synchronized NetSocketInternal eventHandler(Handler<Object> handler) {
eventHandler = handler;
return this;
}

Expand Down Expand Up @@ -386,10 +389,24 @@ protected void handleClosed() {
super.handleClosed();
}

@Override
public void handleMessage(Object msg) {
context.emit(msg, messageHandler());
}

@Override
protected void handleEvent(Object evt) {
Handler<Object> handler;
synchronized (this) {
handler = eventHandler;
}
if (handler != null) {
context.emit(evt, handler);
} else {
super.handleEvent(evt);
}
}

private class DataMessageHandler implements Handler<Object> {

@Override
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/vertx/core/net/impl/NetSocketInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,14 @@ public interface NetSocketInternal extends NetSocket {
*/
NetSocketInternal messageHandler(Handler<Object> handler);

/**
* Set an handler to process pipeline user events.
*
* The handler should take care of releasing event, e.g calling {@code ReferenceCountUtil.release(evt)}.
*
* @param handler the handler to set
* @return a reference to this, so the API can be used fluently
*/
NetSocketInternal eventHandler(Handler<Object> handler);

}
6 changes: 1 addition & 5 deletions src/main/java/io/vertx/core/net/impl/VertxHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@

package io.vertx.core.net.impl;

import io.netty.buffer.AbstractReferenceCountedByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.Handler;
import io.vertx.core.buffer.impl.VertxByteBufAllocator;
Expand Down Expand Up @@ -165,8 +162,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
conn.handleIdle((IdleStateEvent) evt);
} else {
ctx.fireUserEventTriggered(evt);
}
conn.handleEvent(evt);
}
}
28 changes: 28 additions & 0 deletions src/test/java/io/vertx/core/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ConnectTimeoutException;
Expand Down Expand Up @@ -3612,6 +3613,33 @@ public void testServerNetSocketShouldBeClosedWhenTheClosedHandlerIsCalled() thro
await();
}

@Test
public void testNetSocketInternalEvent() throws Exception {
server.connectHandler(so -> {
NetSocketInternal soi = (NetSocketInternal) so;
Object expectedEvent = new Object();
soi.eventHandler(event -> {
assertSame(expectedEvent, event);
soi.close();
});
ChannelPipeline pipeline = soi.channelHandlerContext().pipeline();
pipeline.addFirst(new ChannelHandlerAdapter() {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
ctx.executor().schedule(() -> {
ctx.fireUserEventTriggered(expectedEvent);
}, 10, TimeUnit.MILLISECONDS);
}
});
});
startServer();
client.connect(testAddress, onSuccess(so -> {
so.closeHandler(v -> testComplete());
}));
await();
}

@Test
public void testServerWithIdleTimeoutSendChunkedFile() throws Exception {
testIdleTimeoutSendChunkedFile(true);
Expand Down

0 comments on commit 6540324

Please sign in to comment.