Skip to content

Commit

Permalink
Merge pull request #5295 from eclipse-vertx/internal-changes
Browse files Browse the repository at this point in the history
Internal changes
  • Loading branch information
vietj committed Sep 2, 2024
2 parents f42b5d2 + 7e785e9 commit 528f54a
Show file tree
Hide file tree
Showing 23 changed files with 123 additions and 118 deletions.
9 changes: 5 additions & 4 deletions vertx-core/src/main/java/io/vertx/core/Future.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ static CompositeFuture join(List<? extends Future<?>> futures) {
}

/**
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
* Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler}
* is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted
* to be failed with this exception.
*
* @param handler the handler
* @param <T> the result type
* @return the future.
* @param handler the handler completing the promise
* @return the future of the created promise
*/
static <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = Promise.promise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ public ChannelHandlerContext channelHandlerContext() {
return current.channelHandlerContext();
}

@Override
public Channel channel() {
return current.channel();
}

@Override
public Object metric() {
return current.metric();
Expand Down Expand Up @@ -358,7 +353,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {

// Now we need to upgrade this to an HTTP2
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.getContext(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority(), upgradingConnection.pooled());
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(upgradedConnection.client, upgradingConnection.metrics, upgradingConnection.context(), true, upgradedConnection.current.metric(), upgradedConnection.current.authority(), upgradingConnection.pooled());
upgradingConnection.channel().pipeline().addLast(handler);
handler.connectFuture().addListener(future -> {
if (!future.isSuccess()) {
Expand Down Expand Up @@ -795,8 +790,8 @@ public Future<HttpClientStream> createStream(ContextInternal context) {
}

@Override
public ContextInternal getContext() {
return current.getContext();
public ContextInternal context() {
return current.context();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void http1xConnected(HttpVersion version,
conn2.concurrencyChangeHandler(concurrency -> {
// Ignore
});
conn2.createStream(conn.getContext()).onComplete(ar -> {
conn2.createStream(conn.context()).onComplete(ar -> {
if (ar.succeeded()) {
HttpClientStream stream = ar.result();
stream.headHandler(resp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ public interface HttpClientConnectionInternal extends HttpConnection {
*/
boolean pooled();

/**
* @return the connection channel
*/
Channel channel();

/**
* @return the {@link ChannelHandlerContext} of the handler managing the connection
*/
Expand All @@ -86,7 +81,10 @@ public interface HttpClientConnectionInternal extends HttpConnection {
*/
Future<HttpClientStream> createStream(ContextInternal context);

ContextInternal getContext();
/**
* @return the connection context
*/
ContextInternal context();

boolean isValid();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.internal.pool.ConnectionPool;
import io.vertx.core.internal.pool.Lease;
import io.vertx.core.net.endpoint.Endpoint;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.http.*;
import io.vertx.core.net.*;
Expand Down Expand Up @@ -191,6 +193,7 @@ private EndpointProvider<EndpointKey, SharedClientHttpStreamEndpoint> httpEndpoi
}
HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, clientMetrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, true);
return new SharedClientHttpStreamEndpoint(
vertx,
HttpClientImpl.this,
clientMetrics,
poolMetrics,
Expand Down Expand Up @@ -385,20 +388,19 @@ private Future<HttpClientRequest> doRequest(
Boolean followRedirects,
ClientSSLOptions sslOptions,
ProxyOptions proxyConfig) {
ContextInternal ctx = vertx.getOrCreateContext();
ContextInternal connCtx = ctx.isEventLoopContext() ? ctx : vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
Promise<HttpClientRequest> promise = ctx.promise();
ContextInternal streamCtx = vertx.getOrCreateContext();
Future<ConnectionObtainedResult> future;
if (endpointResolver != null) {
Future<EndpointServer> fut = endpointResolver
.lookupEndpoint(ctx, server)
.map(endpoint -> endpoint.selectServer(routingKey));
future = fut.compose(lookup -> {
PromiseInternal<Endpoint> promise = vertx.promise();
endpointResolver.lookupEndpoint(server, promise);
future = promise.future()
.map(endpoint -> endpoint.selectServer(routingKey))
.compose(lookup -> {
SocketAddress address = lookup.address();
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address);
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
return httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnectionInternal>> fut2 = endpoint.requestConnection(connCtx, connectTimeout);
Future<Lease<HttpClientConnectionInternal>> fut2 = endpoint.requestConnection(streamCtx, connectTimeout);
if (fut2 == null) {
return null;
} else {
Expand All @@ -409,7 +411,7 @@ private Future<HttpClientRequest> doRequest(
}
}).compose(lease -> {
HttpClientConnectionInternal conn = lease.get();
return conn.createStream(ctx).map(stream -> {
return conn.createStream(streamCtx).map(stream -> {
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
wrapped.closeHandler(v -> lease.recycle());
return new ConnectionObtainedResult(proxyOptions, wrapped);
Expand All @@ -422,13 +424,13 @@ private Future<HttpClientRequest> doRequest(
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server);
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, (SocketAddress) server, authority);
future = httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnectionInternal>> fut = endpoint.requestConnection(connCtx, connectTimeout);
Future<Lease<HttpClientConnectionInternal>> fut = endpoint.requestConnection(streamCtx, connectTimeout);
if (fut == null) {
return null;
} else {
return fut.compose(lease -> {
HttpClientConnectionInternal conn = lease.get();
return conn.createStream(ctx).map(stream -> {
return conn.createStream(streamCtx).map(stream -> {
stream.closeHandler(v -> {
lease.recycle();
});
Expand All @@ -438,12 +440,12 @@ private Future<HttpClientRequest> doRequest(
}
});
} else {
return ctx.failedFuture("Cannot resolve address " + server);
future = streamCtx.failedFuture("Cannot resolve address " + server);
}
if (future == null) {
return connCtx.failedFuture("Cannot resolve address " + server);
return streamCtx.failedFuture("Cannot resolve address " + server);
} else {
future.map(res -> {
return future.map(res -> {
RequestOptions options = new RequestOptions();
options.setMethod(method);
options.setHeaders(headers);
Expand All @@ -454,8 +456,7 @@ private Future<HttpClientRequest> doRequest(
options.setTraceOperation(traceOperation);
HttpClientStream stream = res.stream;
return createRequest(stream.connection(), stream, options);
}).onComplete(promise);
return promise.future();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public HttpClientRequestPushPromise(
HttpMethod method,
String uri,
MultiMap headers) {
super(connection, stream, stream.connection().getContext().promise(), method, uri);
super(connection, stream, stream.connection().context().promise(), method, uri);
this.stream = stream;
this.headers = headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public Future<Void> end() {

@Override
public Future<Void> sendFile(String filename, long offset, long length) {
return HttpUtils.resolveFile(conn.getContext(), filename, offset, length)
return HttpUtils.resolveFile(conn.context(), filename, offset, length)
.compose(file -> file
.pipe()
.endOnComplete(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
*/
public interface HttpServerConnection extends HttpConnection {

ContextInternal getContext();

Channel channel();
/**
* @return the connection context
*/
ContextInternal context();

ChannelHandlerContext channelHandlerContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void handle(HttpServerConnection conn) {
conn.invalidRequestHandler(invalidRequestHandler);
if (connectionHandler != null) {
// We hand roll event-loop execution in case of a worker context
ContextInternal ctx = conn.getContext();
ContextInternal ctx = conn.context();
ContextInternal prev = ctx.beginDispatch();
try {
connectionHandler.handle(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
}
ContextInternal context = vertx.getOrCreateContext();
ContextInternal listenContext;
// Not sure of this
if (context.isEventLoopContext()) {
listenContext = context;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ private ServerWebSocket acceptHandshake() {
return webSocketConn;
});
CompletableFuture<Void> latch = new CompletableFuture<>();
httpConn.getContext().execute(() -> {
httpConn.context().execute(() -> {
// Must be done on event-loop
pipeline.replace(VertxHandler.class, "handler", handler);
latch.complete(null);
Expand All @@ -520,7 +520,7 @@ private ServerWebSocket acceptHandshake() {
if (METRICS_ENABLED && httpConn.metrics != null) {
webSocket.setMetric(httpConn.metrics.connected(httpConn.metric(), requestMetric, this));
}
webSocket.registerHandler(httpConn.getContext().owner().eventBus());
webSocket.registerHandler(httpConn.context().owner().eventBus());
return webSocket;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.NoStackTraceTimeoutException;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.pool.ConnectResult;
import io.vertx.core.internal.pool.ConnectionPool;
import io.vertx.core.internal.pool.PoolConnection;
Expand Down Expand Up @@ -59,12 +60,14 @@ class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase<Lease<HttpCl
return selected;
};

private final VertxInternal vertx;
private final HttpClientImpl client;
private final ClientMetrics clientMetrics;
private final HttpChannelConnector connector;
private final ConnectionPool<HttpClientConnectionInternal> pool;

public SharedClientHttpStreamEndpoint(HttpClientImpl client,
public SharedClientHttpStreamEndpoint(VertxInternal vertx,
HttpClientImpl client,
ClientMetrics clientMetrics,
PoolMetrics poolMetrics,
int queueMaxSize,
Expand All @@ -77,6 +80,7 @@ public SharedClientHttpStreamEndpoint(HttpClientImpl client,
ConnectionPool<HttpClientConnectionInternal> pool = ConnectionPool.pool(this, new int[]{http1MaxSize, http2MaxSize}, queueMaxSize)
.connectionSelector(LIFO_SELECTOR).contextProvider(client.contextProvider());

this.vertx = vertx;
this.client = client;
this.clientMetrics = clientMetrics;
this.connector = connector;
Expand Down Expand Up @@ -177,7 +181,9 @@ void acquire() {
@Override
protected Future<Lease<HttpClientConnectionInternal>> requestConnection2(ContextInternal ctx, long timeout) {
PromiseInternal<Lease<HttpClientConnectionInternal>> promise = ctx.promise();
Request request = new Request(ctx, client.options().getProtocolVersion(), timeout, promise);
// ctx.workerPool() -> not sure we want that in a pool
ContextInternal connCtx = vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
Request request = new Request(connCtx, client.options().getProtocolVersion(), timeout, promise);
request.acquire();
return promise.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private void checkPending(Void v) {

@Override
public Future<HttpClientRequest> request(RequestOptions options) {
ContextInternal ctx = actual.getContext().owner().getOrCreateContext();
ContextInternal ctx = actual.context().owner().getOrCreateContext();
return request(ctx, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected void handleMessage(Object item) {
} else {
Buffer data = (Buffer) item;
int len = data.length();
conn.getContext().emit(null, v -> {
conn.context().emit(null, v -> {
if (stream.state().remoteSideOpen()) {
// Handle the HTTP upgrade case
// buffers are received by HTTP/1 and not accounted by HTTP/2
Expand All @@ -79,7 +79,7 @@ protected void handleMessage(Object item) {
this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
this.isConnect = false;
this.writable = true;
this.outboundQueue = new OutboundMessageQueue<>(conn.getContext().nettyEventLoop()) {
this.outboundQueue = new OutboundMessageQueue<>(conn.context().nettyEventLoop()) {
// TODO implement stop drain to optimize flushes ?
@Override
public boolean test(MessageWrite msg) {
Expand Down Expand Up @@ -200,7 +200,7 @@ public boolean isNotWritable() {

public final Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
Promise<Void> promise = context.promise();
EventLoop eventLoop = conn.getContext().nettyEventLoop();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
doWriteFrame(type, flags, payload, promise);
} else {
Expand All @@ -210,7 +210,7 @@ public final Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
}

public final void writeFrame(int type, int flags, ByteBuf payload, Promise<Void> promise) {
EventLoop eventLoop = conn.getContext().nettyEventLoop();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
doWriteFrame(type, flags, payload, promise);
} else {
Expand All @@ -224,7 +224,7 @@ private void doWriteFrame(int type, int flags, ByteBuf payload, Promise<Void> pr

final void writeHeaders(Http2Headers headers, boolean first, boolean end, boolean checkFlush, Promise<Void> promise) {
if (first) {
EventLoop eventLoop = conn.getContext().nettyEventLoop();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
doWriteHeaders(headers, end, checkFlush, promise);
} else {
Expand Down Expand Up @@ -288,7 +288,7 @@ void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
}

final void writeReset(long code) {
EventLoop eventLoop = conn.getContext().nettyEventLoop();
EventLoop eventLoop = conn.context().nettyEventLoop();
if (eventLoop.inEventLoop()) {
doWriteReset(code);
} else {
Expand Down
23 changes: 0 additions & 23 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,29 +434,6 @@ public long setTimer(long delay, Handler<Long> handler) {
return scheduleTimeout(ctx, false, delay, TimeUnit.MILLISECONDS, ctx.isDeployment(), handler);
}

@Override
public <T> PromiseInternal<T> promise() {
ContextInternal context = getOrCreateContext();
return context.promise();
}

public <T> PromiseInternal<T> promise(Promise<T> p) {
if (p instanceof PromiseInternal) {
PromiseInternal<T> promise = (PromiseInternal<T>) p;
if (promise.context() != null) {
return promise;
}
}
PromiseInternal<T> promise = promise();
promise.future().onComplete(p);
return promise;
}

public void runOnContext(Handler<Void> task) {
ContextInternal context = getOrCreateContext();
context.runOnContext(task);
}

// The background pool is used for making blocking calls to legacy synchronous APIs
public WorkerPool getWorkerPool() {
return workerPool;
Expand Down
Loading

0 comments on commit 528f54a

Please sign in to comment.