Skip to content

Commit

Permalink
Expose some of the implementation to internals required by MQTT.
Browse files Browse the repository at this point in the history
Also update the tests jar for more reusability in projects relying on it for their testing.
  • Loading branch information
vietj committed Sep 18, 2024
1 parent ecd187c commit 49d2754
Show file tree
Hide file tree
Showing 18 changed files with 140 additions and 70 deletions.
2 changes: 1 addition & 1 deletion vertx-core/src/main/java/io/vertx/core/MultiMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface MultiMap extends Iterable<Map.Entry<String, String>> {
* @return the multi-map
*/
static MultiMap caseInsensitiveMultiMap() {
return HeadersMultiMap.headers();
return HeadersMultiMap.caseInsensitive();
}

@GenIgnore(GenIgnore.PERMITTED_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,8 @@ public void handleMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.isDirect()) {
content = VertxHandler.safeBuffer(content);
}
handlePacket(new DatagramPacketImpl(packet.sender(), BufferInternal.buffer(content)));
Buffer buffer = BufferInternal.safeBuffer(content);
handlePacket(new DatagramPacketImpl(packet.sender(), buffer));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ private void removeChannelHandlers() {
}

private void handleResponseChunk(Stream stream, ByteBuf chunk) {
Buffer buff = BufferInternal.buffer(VertxHandler.safeBuffer(chunk));
Buffer buff = BufferInternal.safeBuffer(chunk);
int len = buff.length();
stream.bytesRead += len;
stream.handleChunk(buff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void onContent(Object msg) {
handleError(content);
return;
}
Buffer buffer = BufferInternal.buffer(VertxHandler.safeBuffer(content.content()));
Buffer buffer = BufferInternal.safeBuffer(content.content());
Http1xServerRequest request = requestInProgress;
request.handleContent(buffer);
//TODO chunk trailers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ private static CharSequence toValidCharSequence(Object value) {
}

/**
* @return a case insensitive multi-map suited for HTTP header validation
* @return a case-insensitive multimap suited for HTTP header validation
*/
public static HeadersMultiMap httpHeaders() {
return new HeadersMultiMap(HTTP_VALIDATOR);
}

/**
* @return a all-purpose case insensitive multi-map that does not perform validation
* @return a all-purpose case-insensitive multimap that does not perform validation
*/
public static HeadersMultiMap headers() {
public static HeadersMultiMap caseInsensitive() {
return new HeadersMultiMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@
import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.impl.VertxHandler;

import java.nio.ByteBuffer;
import java.util.Objects;

public interface BufferInternal extends Buffer {

/**
* Create a new Vert.x buffer from a Netty {@code ByteBuf}. Pooled {@code byteBuf} are copied and released,
* otherwise it is wrapped.
*
* @param byteBuf the buffer
* @return a Vert.x buffer to use
*/
static BufferInternal safeBuffer(ByteBuf byteBuf) {
return buffer(VertxHandler.safeBuffer(byteBuf));
}

/**
* <p>
* Create a new buffer from a Netty {@code ByteBuf}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.internal.http;

import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;

/**
* HTTP multimap implementations.
*/
public interface HttpHeadersInternal extends HttpHeaders {

/**
* @return a multimap wrapping Netty HTTP {code header} instance
*/
static MultiMap headers(io.netty.handler.codec.http.HttpHeaders headers) {
return new HeadersAdaptor(headers);
}

/**
* @return a multimap wrapping Netty HTTP/2 {code header} instance
*/
static MultiMap headers(Http2Headers headers) {
return new Http2HeadersAdaptor(headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,7 @@ private class DataMessageHandler implements Handler<Object> {
@Override
public void handle(Object msg) {
if (msg instanceof ByteBuf) {
msg = VertxHandler.safeBuffer((ByteBuf) msg);
ByteBuf byteBuf = (ByteBuf) msg;
Buffer buffer = BufferInternal.buffer(byteBuf);
Buffer buffer = BufferInternal.safeBuffer((ByteBuf) msg);
pending.write(buffer);
} else {
handleInvalid(msg);
Expand Down
26 changes: 10 additions & 16 deletions vertx-core/src/main/java/io/vertx/core/net/impl/VertxHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,26 @@ private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
}

/**
* Copy and release the {@code buf} when necessary.
* Pooled {@code byteBuf} are copied and released, otherwise it is returned as is.
*
* <p> This methods assuming the has full ownership of the buffer.
*
* <p> This method assumes that pooled buffers are allocated by {@code PooledByteBufAllocator}
*
* <p> The returned buffer will not need to be released and can be wrapped by a {@link io.vertx.core.buffer.Buffer}.
*
* @param buf the buffer
* @return a safe buffer to use
* @param byteBuf the buffer
* @return a buffer safe
*/
public static ByteBuf safeBuffer(ByteBuf buf) {
if (buf != Unpooled.EMPTY_BUFFER && (buf.alloc() instanceof PooledByteBufAllocator || buf instanceof CompositeByteBuf)) {
public static ByteBuf safeBuffer(ByteBuf byteBuf) {
if (byteBuf != Unpooled.EMPTY_BUFFER && (byteBuf.alloc() instanceof PooledByteBufAllocator || byteBuf instanceof CompositeByteBuf)) {
try {
if (buf.isReadable()) {
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(buf.readableBytes());
buffer.writeBytes(buf, buf.readerIndex(), buf.readableBytes());
if (byteBuf.isReadable()) {
ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(byteBuf.readableBytes());
buffer.writeBytes(byteBuf, byteBuf.readerIndex(), byteBuf.readableBytes());
return buffer;
} else {
return Unpooled.EMPTY_BUFFER;
}
} finally {
buf.release();
byteBuf.release();
}
}
return buf;
return byteBuf;
}

/**
Expand Down
42 changes: 21 additions & 21 deletions vertx-core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@

// Testing

exports io.vertx.core.impl to io.vertx.tests;
exports io.vertx.core.impl.cpu to io.vertx.tests;
exports io.vertx.core.impl.future to io.vertx.tests;
exports io.vertx.core.impl.utils to io.vertx.tests;
exports io.vertx.core.net.impl to io.vertx.tests;
exports io.vertx.core.shareddata.impl to io.vertx.tests;
exports io.vertx.core.buffer.impl to io.vertx.tests;
exports io.vertx.core.streams.impl to io.vertx.tests;
exports io.vertx.core.eventbus.impl to io.vertx.tests;
exports io.vertx.core.eventbus.impl.clustered to io.vertx.tests;
exports io.vertx.core.spi.cluster.impl to io.vertx.tests;
exports io.vertx.core.file.impl to io.vertx.tests;
exports io.vertx.core.http.impl to io.vertx.tests;
exports io.vertx.core.http.impl.headers to io.vertx.tests;
exports io.vertx.core.http.impl.ws to io.vertx.tests;
exports io.vertx.core.json.pointer.impl to io.vertx.tests;
exports io.vertx.core.impl.transports to io.vertx.tests;
exports io.vertx.core.net.impl.pkcs1 to io.vertx.tests;
exports io.vertx.core.spi.cluster.impl.selector to io.vertx.tests;
exports io.vertx.core.impl.verticle to io.vertx.tests;
exports io.vertx.core.impl.deployment to io.vertx.tests;
exports io.vertx.core.impl to io.vertx.core.tests;
exports io.vertx.core.impl.cpu to io.vertx.core.tests;
exports io.vertx.core.impl.future to io.vertx.core.tests;
exports io.vertx.core.impl.utils to io.vertx.core.tests;
exports io.vertx.core.net.impl to io.vertx.core.tests;
exports io.vertx.core.shareddata.impl to io.vertx.core.tests;
exports io.vertx.core.buffer.impl to io.vertx.core.tests;
exports io.vertx.core.streams.impl to io.vertx.core.tests;
exports io.vertx.core.eventbus.impl to io.vertx.core.tests;
exports io.vertx.core.eventbus.impl.clustered to io.vertx.core.tests;
exports io.vertx.core.spi.cluster.impl to io.vertx.core.tests;
exports io.vertx.core.file.impl to io.vertx.core.tests;
exports io.vertx.core.http.impl to io.vertx.core.tests;
exports io.vertx.core.http.impl.headers to io.vertx.core.tests;
exports io.vertx.core.http.impl.ws to io.vertx.core.tests;
exports io.vertx.core.json.pointer.impl to io.vertx.core.tests;
exports io.vertx.core.impl.transports to io.vertx.core.tests;
exports io.vertx.core.net.impl.pkcs1 to io.vertx.core.tests;
exports io.vertx.core.spi.cluster.impl.selector to io.vertx.core.tests;
exports io.vertx.core.impl.verticle to io.vertx.core.tests;
exports io.vertx.core.impl.deployment to io.vertx.core.tests;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;

public class FileDescriptorLeakDetectorRule implements TestRule {

Expand Down Expand Up @@ -93,7 +93,7 @@ public void evaluate() throws Throwable {

long averageEvaluations = getAverage(iterations);
System.out.println("*** Open file descriptor open file descriptors average " + averageEvaluations);
assertThat(averageEvaluations).isLessThanOrEqualTo(maxBaseLine);
assertTrue(averageEvaluations <= maxBaseLine);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ protected Future<Vertx> clusteredVertx(VertxOptions options, ClusterManager clus
if (created == null) {
created = Collections.synchronizedList(new ArrayList<>());
}
if (clusterManager == null) {
clusterManager = new FakeClusterManager();
}
return createVertxBuilder(options)
.withClusterManager(clusterManager)
.buildClustered().andThen(event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.impl.NodeSelector;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Test;

import java.util.*;
Expand All @@ -45,7 +46,9 @@ public void test() throws Exception {
return vertxOptions;
})
.map(options -> {
VertxBootstrap factory = ((VertxBootstrapImpl)VertxBootstrap.create().options(options).init()).clusterNodeSelector(new CustomNodeSelector());
VertxBootstrap factory = ((VertxBootstrapImpl)VertxBootstrap.create().options(options).init())
.clusterManager(new FakeClusterManager())
.clusterNodeSelector(new CustomNodeSelector());
return factory.clusteredVertx();
})
.collect(collectingAndThen(toList(), Future::all));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.impl.NodeSelector;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Test;

import java.util.Collections;
Expand All @@ -37,7 +38,7 @@ public class MessageQueueOnWorkerThreadTest extends VertxTestBase {
public void setUp() throws Exception {
super.setUp();
CustomNodeSelector selector = new CustomNodeSelector();
VertxBootstrapImpl factory = new VertxBootstrapImpl().init().clusterNodeSelector(selector);
VertxBootstrapImpl factory = new VertxBootstrapImpl().init().clusterManager(new FakeClusterManager()).clusterNodeSelector(selector);
Future<Vertx> fut = factory.clusteredVertx();
vertx = fut.await();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.vertx.core.spi.cluster.impl.NodeSelector;
import io.vertx.core.spi.cluster.impl.DefaultNodeSelector;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import org.junit.Test;

import java.util.Collections;
Expand Down Expand Up @@ -49,7 +50,10 @@ public void selectForPublish(String address, Promise<Iterable<String>> promise)
promise.fail("Not implemented");
}
};
((VertxBootstrapImpl)VertxBootstrap.create().options(options).init()).clusterNodeSelector(nodeSelector).clusteredVertx().onComplete(onSuccess(node -> {
((VertxBootstrapImpl)VertxBootstrap.create().options(options).init())
.clusterManager(new FakeClusterManager())
.clusterNodeSelector(nodeSelector)
.clusteredVertx().onComplete(onSuccess(node -> {
vertx = node;
MessageProducer<String> sender = vertx.eventBus().sender("foo");
sender.write("the_string").onComplete(onFailure(err -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.spi.observability.HttpResponse;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import io.vertx.test.http.HttpTestBase;
import org.junit.Ignore;
import org.junit.Test;
Expand Down Expand Up @@ -77,6 +78,7 @@ public void testFactoryInCluster() throws Exception {
.withClusterManager(getClusterManager())
.withMetrics(factory);
builder
.withClusterManager(new FakeClusterManager())
.buildClustered()
.onComplete(onSuccess(vertx -> {
assertSame(testThread, metricsThread.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testCreateClustered() throws Exception {
VertxBootstrap factory = VertxBootstrap.create().init();
CompletableFuture<Vertx> fut = new CompletableFuture<>();
factory.init();
factory.clusterManager(new FakeClusterManager());
factory.clusteredVertx().onComplete(ar -> {
if (ar.succeeded()) {
fut.complete(ar.result());
Expand Down
Loading

0 comments on commit 49d2754

Please sign in to comment.