Skip to content

Commit

Permalink
Improve dealing with context internally.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 2, 2024
1 parent aeff192 commit 7e785e9
Show file tree
Hide file tree
Showing 18 changed files with 111 additions and 98 deletions.
9 changes: 5 additions & 4 deletions vertx-core/src/main/java/io/vertx/core/Future.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ static CompositeFuture join(List<? extends Future<?>> futures) {
}

/**
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
* Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler}
* is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted
* to be failed with this exception.
*
* @param handler the handler
* @param <T> the result type
* @return the future.
* @param handler the handler completing the promise
* @return the future of the created promise
*/
static <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = Promise.promise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ public ChannelHandlerContext channelHandlerContext() {
return current.channelHandlerContext();
}

@Override
public Channel channel() {
return current.channel();
}

@Override
public Object metric() {
return current.metric();
Expand Down Expand Up @@ -795,8 +790,8 @@ public Future<HttpClientStream> createStream(ContextInternal context) {
}

@Override
public ContextInternal getContext() {
return current.getContext();
public ContextInternal context() {
return current.context();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ public interface HttpClientConnectionInternal extends HttpConnection {
*/
boolean pooled();

/**
* @return the connection channel
*/
Channel channel();

/**
* @return the {@link ChannelHandlerContext} of the handler managing the connection
*/
Expand All @@ -86,7 +81,10 @@ public interface HttpClientConnectionInternal extends HttpConnection {
*/
Future<HttpClientStream> createStream(ContextInternal context);

ContextInternal getContext();
/**
* @return the connection context
*/
ContextInternal context();

boolean isValid();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.http.HttpClientInternal;
import io.vertx.core.internal.pool.ConnectionPool;
import io.vertx.core.internal.pool.Lease;
import io.vertx.core.net.endpoint.Endpoint;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.http.*;
import io.vertx.core.net.*;
Expand Down Expand Up @@ -191,6 +193,7 @@ private EndpointProvider<EndpointKey, SharedClientHttpStreamEndpoint> httpEndpoi
}
HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, clientMetrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, true);
return new SharedClientHttpStreamEndpoint(
vertx,
HttpClientImpl.this,
clientMetrics,
poolMetrics,
Expand Down Expand Up @@ -385,20 +388,19 @@ private Future<HttpClientRequest> doRequest(
Boolean followRedirects,
ClientSSLOptions sslOptions,
ProxyOptions proxyConfig) {
ContextInternal ctx = vertx.getOrCreateContext();
ContextInternal connCtx = ctx.isEventLoopContext() ? ctx : vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
Promise<HttpClientRequest> promise = ctx.promise();
ContextInternal streamCtx = vertx.getOrCreateContext();
Future<ConnectionObtainedResult> future;
if (endpointResolver != null) {
Future<EndpointServer> fut = endpointResolver
.lookupEndpoint(ctx, server)
.map(endpoint -> endpoint.selectServer(routingKey));
future = fut.compose(lookup -> {
PromiseInternal<Endpoint> promise = vertx.promise();
endpointResolver.lookupEndpoint(server, promise);
future = promise.future()
.map(endpoint -> endpoint.selectServer(routingKey))
.compose(lookup -> {
SocketAddress address = lookup.address();
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address);
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, address, authority != null ? authority : HostAndPort.create(address.host(), address.port()));
return httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnectionInternal>> fut2 = endpoint.requestConnection(connCtx, connectTimeout);
Future<Lease<HttpClientConnectionInternal>> fut2 = endpoint.requestConnection(streamCtx, connectTimeout);
if (fut2 == null) {
return null;
} else {
Expand All @@ -409,7 +411,7 @@ private Future<HttpClientRequest> doRequest(
}
}).compose(lease -> {
HttpClientConnectionInternal conn = lease.get();
return conn.createStream(ctx).map(stream -> {
return conn.createStream(streamCtx).map(stream -> {
HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest);
wrapped.closeHandler(v -> lease.recycle());
return new ConnectionObtainedResult(proxyOptions, wrapped);
Expand All @@ -422,13 +424,13 @@ private Future<HttpClientRequest> doRequest(
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server);
EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, (SocketAddress) server, authority);
future = httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> {
Future<Lease<HttpClientConnectionInternal>> fut = endpoint.requestConnection(connCtx, connectTimeout);
Future<Lease<HttpClientConnectionInternal>> fut = endpoint.requestConnection(streamCtx, connectTimeout);
if (fut == null) {
return null;
} else {
return fut.compose(lease -> {
HttpClientConnectionInternal conn = lease.get();
return conn.createStream(ctx).map(stream -> {
return conn.createStream(streamCtx).map(stream -> {
stream.closeHandler(v -> {
lease.recycle();
});
Expand All @@ -438,12 +440,12 @@ private Future<HttpClientRequest> doRequest(
}
});
} else {
return ctx.failedFuture("Cannot resolve address " + server);
future = streamCtx.failedFuture("Cannot resolve address " + server);
}
if (future == null) {
return connCtx.failedFuture("Cannot resolve address " + server);
return streamCtx.failedFuture("Cannot resolve address " + server);
} else {
future.map(res -> {
return future.map(res -> {
RequestOptions options = new RequestOptions();
options.setMethod(method);
options.setHeaders(headers);
Expand All @@ -454,8 +456,7 @@ private Future<HttpClientRequest> doRequest(
options.setTraceOperation(traceOperation);
HttpClientStream stream = res.stream;
return createRequest(stream.connection(), stream, options);
}).onComplete(promise);
return promise.future();
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public HttpClientRequestPushPromise(
HttpMethod method,
String uri,
MultiMap headers) {
super(connection, stream, stream.connection().getContext().promise(), method, uri);
super(connection, stream, stream.connection().context().promise(), method, uri);
this.stream = stream;
this.headers = headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
*/
public interface HttpServerConnection extends HttpConnection {

ContextInternal getContext();

Channel channel();
/**
* @return the connection context
*/
ContextInternal context();

ChannelHandlerContext channelHandlerContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void handle(HttpServerConnection conn) {
conn.invalidRequestHandler(invalidRequestHandler);
if (connectionHandler != null) {
// We hand roll event-loop execution in case of a worker context
ContextInternal ctx = conn.getContext();
ContextInternal ctx = conn.context();
ContextInternal prev = ctx.beginDispatch();
try {
connectionHandler.handle(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public synchronized Future<HttpServer> listen(SocketAddress address) {
}
ContextInternal context = vertx.getOrCreateContext();
ContextInternal listenContext;
// Not sure of this
if (context.isEventLoopContext()) {
listenContext = context;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.impl.NoStackTraceTimeoutException;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.pool.ConnectResult;
import io.vertx.core.internal.pool.ConnectionPool;
import io.vertx.core.internal.pool.PoolConnection;
Expand Down Expand Up @@ -59,12 +60,14 @@ class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase<Lease<HttpCl
return selected;
};

private final VertxInternal vertx;
private final HttpClientImpl client;
private final ClientMetrics clientMetrics;
private final HttpChannelConnector connector;
private final ConnectionPool<HttpClientConnectionInternal> pool;

public SharedClientHttpStreamEndpoint(HttpClientImpl client,
public SharedClientHttpStreamEndpoint(VertxInternal vertx,
HttpClientImpl client,
ClientMetrics clientMetrics,
PoolMetrics poolMetrics,
int queueMaxSize,
Expand All @@ -77,6 +80,7 @@ public SharedClientHttpStreamEndpoint(HttpClientImpl client,
ConnectionPool<HttpClientConnectionInternal> pool = ConnectionPool.pool(this, new int[]{http1MaxSize, http2MaxSize}, queueMaxSize)
.connectionSelector(LIFO_SELECTOR).contextProvider(client.contextProvider());

this.vertx = vertx;
this.client = client;
this.clientMetrics = clientMetrics;
this.connector = connector;
Expand Down Expand Up @@ -177,7 +181,9 @@ void acquire() {
@Override
protected Future<Lease<HttpClientConnectionInternal>> requestConnection2(ContextInternal ctx, long timeout) {
PromiseInternal<Lease<HttpClientConnectionInternal>> promise = ctx.promise();
Request request = new Request(ctx, client.options().getProtocolVersion(), timeout, promise);
// ctx.workerPool() -> not sure we want that in a pool
ContextInternal connCtx = vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
Request request = new Request(connCtx, client.options().getProtocolVersion(), timeout, promise);
request.acquire();
return promise.future();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ private void checkPending(Void v) {

@Override
public Future<HttpClientRequest> request(RequestOptions options) {
ContextInternal ctx = actual.getContext().owner().getOrCreateContext();
ContextInternal ctx = actual.context().owner().getOrCreateContext();
return request(ctx, options);
}
}
23 changes: 0 additions & 23 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,29 +434,6 @@ public long setTimer(long delay, Handler<Long> handler) {
return scheduleTimeout(ctx, false, delay, TimeUnit.MILLISECONDS, ctx.isDeployment(), handler);
}

@Override
public <T> PromiseInternal<T> promise() {
ContextInternal context = getOrCreateContext();
return context.promise();
}

public <T> PromiseInternal<T> promise(Promise<T> p) {
if (p instanceof PromiseInternal) {
PromiseInternal<T> promise = (PromiseInternal<T>) p;
if (promise.context() != null) {
return promise;
}
}
PromiseInternal<T> promise = promise();
promise.future().onComplete(p);
return promise;
}

public void runOnContext(Handler<Void> task) {
ContextInternal context = getOrCreateContext();
context.runOnContext(task);
}

// The background pool is used for making blocking calls to legacy synchronous APIs
public WorkerPool getWorkerPool() {
return workerPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ default <T> PromiseInternal<T> promise(Promise<T> p) {
return promise;
}

/**
* Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler}
* is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted
* to be failed with this exception.
*
* @param handler the handler completing the promise
* @return the future of the created promise
*/
default <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = promise();
try {
handler.handle(promise);
} catch (Throwable t) {
promise.tryFail(t);
}
return promise.future();
}

/**
* @return an empty succeeded {@link Future} associated with this context
*/
Expand Down
33 changes: 31 additions & 2 deletions vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,45 @@ static String version() {
return VertxImpl.version();
}

/**
* Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler}
* is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted
* to be failed with this exception.
*
* @param handler the handler completing the promise
* @return the future of the created promise
*/
default <T> Future<T> future(Handler<Promise<T>> handler) {
return getOrCreateContext().future(handler);
}

/**
* @return a promise associated with the context returned by {@link #getOrCreateContext()}.
*/
<T> PromiseInternal<T> promise();
default <T> PromiseInternal<T> promise() {
return getOrCreateContext().promise();
}

/**
* @return a promise associated with the context returned by {@link #getOrCreateContext()} or the {@code handler}
* if that handler is already an instance of {@code PromiseInternal}
*/
<T> PromiseInternal<T> promise(Promise<T> promise);
default <T> PromiseInternal<T> promise(Promise<T> p) {
if (p instanceof PromiseInternal) {
PromiseInternal<T> promise = (PromiseInternal<T>) p;
if (promise.context() != null) {
return promise;
}
}
PromiseInternal<T> promise = promise();
promise.future().onComplete(p);
return promise;
}

default void runOnContext(Handler<Void> task) {
ContextInternal context = getOrCreateContext();
context.runOnContext(task);
}

long maxEventLoopExecTime();

Expand Down
15 changes: 0 additions & 15 deletions vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ public boolean cancelTimer(long id) {
return delegate.cancelTimer(id);
}

@Override
public void runOnContext(Handler<Void> action) {
delegate.runOnContext(action);
}

@Override
public Future<Void> close() {
return delegate.close();
Expand Down Expand Up @@ -238,16 +233,6 @@ public Handler<Throwable> exceptionHandler() {
return delegate.exceptionHandler();
}

@Override
public <T> PromiseInternal<T> promise() {
return delegate.promise();
}

@Override
public <T> PromiseInternal<T> promise(Promise<T> promise) {
return delegate.promise(promise);
}

@Override
public long maxEventLoopExecTime() {
return delegate.maxEventLoopExecTime();
Expand Down
Loading

0 comments on commit 7e785e9

Please sign in to comment.