diff --git a/vertx-core/src/main/java/io/vertx/core/Future.java b/vertx-core/src/main/java/io/vertx/core/Future.java index ff2b7bd0a70..9e5318dc497 100644 --- a/vertx-core/src/main/java/io/vertx/core/Future.java +++ b/vertx-core/src/main/java/io/vertx/core/Future.java @@ -185,11 +185,12 @@ static CompositeFuture join(List> futures) { } /** - * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. + * Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler} + * is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted + * to be failed with this exception. * - * @param handler the handler - * @param the result type - * @return the future. + * @param handler the handler completing the promise + * @return the future of the created promise */ static Future future(Handler> handler) { Promise promise = Promise.promise(); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java index bb7bc7bfd4f..be4f727ab1a 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java @@ -98,11 +98,6 @@ public ChannelHandlerContext channelHandlerContext() { return current.channelHandlerContext(); } - @Override - public Channel channel() { - return current.channel(); - } - @Override public Object metric() { return current.metric(); @@ -795,8 +790,8 @@ public Future createStream(ContextInternal context) { } @Override - public ContextInternal getContext() { - return current.getContext(); + public ContextInternal context() { + return current.context(); } @Override diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java index 1af1f1fea60..2ca4ebfedfe 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientConnectionInternal.java @@ -68,11 +68,6 @@ public interface HttpClientConnectionInternal extends HttpConnection { */ boolean pooled(); - /** - * @return the connection channel - */ - Channel channel(); - /** * @return the {@link ChannelHandlerContext} of the handler managing the connection */ @@ -86,7 +81,10 @@ public interface HttpClientConnectionInternal extends HttpConnection { */ Future createStream(ContextInternal context); - ContextInternal getContext(); + /** + * @return the connection context + */ + ContextInternal context(); boolean isValid(); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index 837cf2957c3..f635c7ce597 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -16,10 +16,12 @@ import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.PromiseInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.http.HttpClientInternal; import io.vertx.core.internal.pool.ConnectionPool; import io.vertx.core.internal.pool.Lease; +import io.vertx.core.net.endpoint.Endpoint; import io.vertx.core.net.endpoint.impl.EndpointResolverImpl; import io.vertx.core.http.*; import io.vertx.core.net.*; @@ -191,6 +193,7 @@ private EndpointProvider httpEndpoi } HttpChannelConnector connector = new HttpChannelConnector(HttpClientImpl.this, netClient, key.sslOptions, proxyOptions, clientMetrics, options.getProtocolVersion(), key.ssl, options.isUseAlpn(), key.authority, key.server, true); return new SharedClientHttpStreamEndpoint( + vertx, HttpClientImpl.this, clientMetrics, poolMetrics, @@ -385,20 +388,19 @@ private Future doRequest( Boolean followRedirects, ClientSSLOptions sslOptions, ProxyOptions proxyConfig) { - ContextInternal ctx = vertx.getOrCreateContext(); - ContextInternal connCtx = ctx.isEventLoopContext() ? ctx : vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader()); - Promise promise = ctx.promise(); + ContextInternal streamCtx = vertx.getOrCreateContext(); Future future; if (endpointResolver != null) { - Future fut = endpointResolver - .lookupEndpoint(ctx, server) - .map(endpoint -> endpoint.selectServer(routingKey)); - future = fut.compose(lookup -> { + PromiseInternal promise = vertx.promise(); + endpointResolver.lookupEndpoint(server, promise); + future = promise.future() + .map(endpoint -> endpoint.selectServer(routingKey)) + .compose(lookup -> { SocketAddress address = lookup.address(); ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address); EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, address, authority != null ? authority : HostAndPort.create(address.host(), address.port())); return httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> { - Future> fut2 = endpoint.requestConnection(connCtx, connectTimeout); + Future> fut2 = endpoint.requestConnection(streamCtx, connectTimeout); if (fut2 == null) { return null; } else { @@ -409,7 +411,7 @@ private Future doRequest( } }).compose(lease -> { HttpClientConnectionInternal conn = lease.get(); - return conn.createStream(ctx).map(stream -> { + return conn.createStream(streamCtx).map(stream -> { HttpClientStream wrapped = new StatisticsGatheringHttpClientStream(stream, endpointRequest); wrapped.closeHandler(v -> lease.recycle()); return new ConnectionObtainedResult(proxyOptions, wrapped); @@ -422,13 +424,13 @@ private Future doRequest( ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, (SocketAddress) server); EndpointKey key = new EndpointKey(useSSL, sslOptions, proxyOptions, (SocketAddress) server, authority); future = httpCM.withEndpointAsync(key, httpEndpointProvider(), (endpoint, created) -> { - Future> fut = endpoint.requestConnection(connCtx, connectTimeout); + Future> fut = endpoint.requestConnection(streamCtx, connectTimeout); if (fut == null) { return null; } else { return fut.compose(lease -> { HttpClientConnectionInternal conn = lease.get(); - return conn.createStream(ctx).map(stream -> { + return conn.createStream(streamCtx).map(stream -> { stream.closeHandler(v -> { lease.recycle(); }); @@ -438,12 +440,12 @@ private Future doRequest( } }); } else { - return ctx.failedFuture("Cannot resolve address " + server); + future = streamCtx.failedFuture("Cannot resolve address " + server); } if (future == null) { - return connCtx.failedFuture("Cannot resolve address " + server); + return streamCtx.failedFuture("Cannot resolve address " + server); } else { - future.map(res -> { + return future.map(res -> { RequestOptions options = new RequestOptions(); options.setMethod(method); options.setHeaders(headers); @@ -454,8 +456,7 @@ private Future doRequest( options.setTraceOperation(traceOperation); HttpClientStream stream = res.stream; return createRequest(stream.connection(), stream, options); - }).onComplete(promise); - return promise.future(); + }); } } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java index b79c93f4786..81fe0987c4e 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientRequestPushPromise.java @@ -35,7 +35,7 @@ public HttpClientRequestPushPromise( HttpMethod method, String uri, MultiMap headers) { - super(connection, stream, stream.connection().getContext().promise(), method, uri); + super(connection, stream, stream.connection().context().promise(), method, uri); this.stream = stream; this.headers = headers; } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnection.java index 32e5af59d96..ff8dcc7923b 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnection.java @@ -22,9 +22,10 @@ */ public interface HttpServerConnection extends HttpConnection { - ContextInternal getContext(); - - Channel channel(); + /** + * @return the connection context + */ + ContextInternal context(); ChannelHandlerContext channelHandlerContext(); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnectionHandler.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnectionHandler.java index 7d1e69149a9..6e741b1fb1f 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnectionHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerConnectionHandler.java @@ -72,7 +72,7 @@ public void handle(HttpServerConnection conn) { conn.invalidRequestHandler(invalidRequestHandler); if (connectionHandler != null) { // We hand roll event-loop execution in case of a worker context - ContextInternal ctx = conn.getContext(); + ContextInternal ctx = conn.context(); ContextInternal prev = ctx.beginDispatch(); try { connectionHandler.handle(conn); diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java index 3fe6e0d077c..4121b586e59 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/HttpServerImpl.java @@ -175,6 +175,7 @@ public synchronized Future listen(SocketAddress address) { } ContextInternal context = vertx.getOrCreateContext(); ContextInternal listenContext; + // Not sure of this if (context.isEventLoopContext()) { listenContext = context; } else { diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java b/vertx-core/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java index 8515fd45ca0..4f6cc16889c 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/SharedClientHttpStreamEndpoint.java @@ -19,6 +19,7 @@ import io.vertx.core.internal.ContextInternal; import io.vertx.core.impl.NoStackTraceTimeoutException; import io.vertx.core.internal.PromiseInternal; +import io.vertx.core.internal.VertxInternal; import io.vertx.core.internal.pool.ConnectResult; import io.vertx.core.internal.pool.ConnectionPool; import io.vertx.core.internal.pool.PoolConnection; @@ -59,12 +60,14 @@ class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase pool; - public SharedClientHttpStreamEndpoint(HttpClientImpl client, + public SharedClientHttpStreamEndpoint(VertxInternal vertx, + HttpClientImpl client, ClientMetrics clientMetrics, PoolMetrics poolMetrics, int queueMaxSize, @@ -77,6 +80,7 @@ public SharedClientHttpStreamEndpoint(HttpClientImpl client, ConnectionPool pool = ConnectionPool.pool(this, new int[]{http1MaxSize, http2MaxSize}, queueMaxSize) .connectionSelector(LIFO_SELECTOR).contextProvider(client.contextProvider()); + this.vertx = vertx; this.client = client; this.clientMetrics = clientMetrics; this.connector = connector; @@ -177,7 +181,9 @@ void acquire() { @Override protected Future> requestConnection2(ContextInternal ctx, long timeout) { PromiseInternal> promise = ctx.promise(); - Request request = new Request(ctx, client.options().getProtocolVersion(), timeout, promise); + // ctx.workerPool() -> not sure we want that in a pool + ContextInternal connCtx = vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader()); + Request request = new Request(connCtx, client.options().getProtocolVersion(), timeout, promise); request.acquire(); return promise.future(); } diff --git a/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java b/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java index 353f93a25b4..a068cf395f4 100644 --- a/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java +++ b/vertx-core/src/main/java/io/vertx/core/http/impl/UnpooledHttpClientConnection.java @@ -247,7 +247,7 @@ private void checkPending(Void v) { @Override public Future request(RequestOptions options) { - ContextInternal ctx = actual.getContext().owner().getOrCreateContext(); + ContextInternal ctx = actual.context().owner().getOrCreateContext(); return request(ctx, options); } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 9651e4d1be8..277eec6ae20 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -434,29 +434,6 @@ public long setTimer(long delay, Handler handler) { return scheduleTimeout(ctx, false, delay, TimeUnit.MILLISECONDS, ctx.isDeployment(), handler); } - @Override - public PromiseInternal promise() { - ContextInternal context = getOrCreateContext(); - return context.promise(); - } - - public PromiseInternal promise(Promise p) { - if (p instanceof PromiseInternal) { - PromiseInternal promise = (PromiseInternal) p; - if (promise.context() != null) { - return promise; - } - } - PromiseInternal promise = promise(); - promise.future().onComplete(p); - return promise; - } - - public void runOnContext(Handler task) { - ContextInternal context = getOrCreateContext(); - context.runOnContext(task); - } - // The background pool is used for making blocking calls to legacy synchronous APIs public WorkerPool getWorkerPool() { return workerPool; diff --git a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java index 806ef017e86..498475bf859 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/ContextInternal.java @@ -85,6 +85,24 @@ default PromiseInternal promise(Promise p) { return promise; } + /** + * Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler} + * is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted + * to be failed with this exception. + * + * @param handler the handler completing the promise + * @return the future of the created promise + */ + default Future future(Handler> handler) { + Promise promise = promise(); + try { + handler.handle(promise); + } catch (Throwable t) { + promise.tryFail(t); + } + return promise.future(); + } + /** * @return an empty succeeded {@link Future} associated with this context */ diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index 5ffc6773e2b..afe6cfd7a22 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -55,16 +55,45 @@ static String version() { return VertxImpl.version(); } + /** + * Create a promise and pass it to the {@code handler}, and then returns this future's promise. The {@code handler} + * is responsible for completing the promise, if the {@code handler} throws an exception, the promise is attempted + * to be failed with this exception. + * + * @param handler the handler completing the promise + * @return the future of the created promise + */ + default Future future(Handler> handler) { + return getOrCreateContext().future(handler); + } + /** * @return a promise associated with the context returned by {@link #getOrCreateContext()}. */ - PromiseInternal promise(); + default PromiseInternal promise() { + return getOrCreateContext().promise(); + } /** * @return a promise associated with the context returned by {@link #getOrCreateContext()} or the {@code handler} * if that handler is already an instance of {@code PromiseInternal} */ - PromiseInternal promise(Promise promise); + default PromiseInternal promise(Promise p) { + if (p instanceof PromiseInternal) { + PromiseInternal promise = (PromiseInternal) p; + if (promise.context() != null) { + return promise; + } + } + PromiseInternal promise = promise(); + promise.future().onComplete(p); + return promise; + } + + default void runOnContext(Handler task) { + ContextInternal context = getOrCreateContext(); + context.runOnContext(task); + } long maxEventLoopExecTime(); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java index aace00bace9..4b2bf70a70a 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxWrapper.java @@ -153,11 +153,6 @@ public boolean cancelTimer(long id) { return delegate.cancelTimer(id); } - @Override - public void runOnContext(Handler action) { - delegate.runOnContext(action); - } - @Override public Future close() { return delegate.close(); @@ -238,16 +233,6 @@ public Handler exceptionHandler() { return delegate.exceptionHandler(); } - @Override - public PromiseInternal promise() { - return delegate.promise(); - } - - @Override - public PromiseInternal promise(Promise promise) { - return delegate.promise(promise); - } - @Override public long maxEventLoopExecTime() { return delegate.maxEventLoopExecTime(); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/net/endpoint/EndpointResolverInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/net/endpoint/EndpointResolverInternal.java index b69a249ac2f..fecab91e208 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/net/endpoint/EndpointResolverInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/net/endpoint/EndpointResolverInternal.java @@ -11,6 +11,7 @@ package io.vertx.core.internal.net.endpoint; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.VertxInternal; import io.vertx.core.net.Address; @@ -28,7 +29,7 @@ static EndpointResolverInternal create(VertxInternal vertx, return new EndpointResolverImpl<>(vertx, endpointResolver, loadBalancer, expirationMillis); } - Future lookupEndpoint(ContextInternal ctx, Address address); + void lookupEndpoint(Address address, Promise promise); /** * Check expired endpoints, this method is called by the client periodically to give the opportunity to trigger eviction diff --git a/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java b/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java index 40f40419440..5ed0ed85f57 100644 --- a/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/net/endpoint/impl/EndpointResolverImpl.java @@ -11,6 +11,7 @@ package io.vertx.core.net.endpoint.impl; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.internal.net.endpoint.EndpointResolverInternal; import io.vertx.core.net.endpoint.EndpointServer; import io.vertx.core.net.endpoint.ServerInteraction; @@ -69,11 +70,17 @@ public void checkExpired() { @Override public Future resolveEndpoint(Address address) { - return lookupEndpoint2(vertx.getOrCreateContext(), address); + return vertx.future(promise -> lookupEndpoint(address, promise)); } - public Future lookupEndpoint(ContextInternal ctx, Address address) { - return lookupEndpoint2(ctx, address); + public void lookupEndpoint(Address address, Promise promise) { + A casted = endpointResolver.tryCast(address); + if (casted == null) { + promise.fail("Cannot resolve address " + address); + return; + } + ManagedEndpoint resolved = resolveAddress(casted); + ((Future) resolved.endpoint).onComplete(promise); } private class EndpointImpl implements io.vertx.core.net.endpoint.Endpoint { @@ -117,15 +124,6 @@ public EndpointServer selectServer(String key) { } } - private Future lookupEndpoint2(ContextInternal ctx, Address address) { - A casted = endpointResolver.tryCast(address); - if (casted == null) { - return ctx.failedFuture("Cannot resolve address " + address); - } - ManagedEndpoint resolved = resolveAddress(casted); - return (Future) resolved.endpoint; - } - private class ManagedEndpoint extends Endpoint { private final Future endpoint; diff --git a/vertx-core/src/test/java/io/vertx/tests/http/Http2ClientTest.java b/vertx-core/src/test/java/io/vertx/tests/http/Http2ClientTest.java index a97076483d2..60240108682 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/Http2ClientTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/Http2ClientTest.java @@ -1696,7 +1696,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers assertEquals(HttpVersion.HTTP_2, resp1.version()); client.request(requestOptions).onComplete(onSuccess(req2 -> { req2.send().onComplete(onSuccess(resp2 -> { - assertSame(((HttpClientConnectionInternal)conn).channel(), ((HttpClientConnectionInternal)resp2.request().connection()).channel()); + assertSame(((HttpClientConnectionInternal)conn).channelHandlerContext().channel(), ((HttpClientConnectionInternal)resp2.request().connection()).channelHandlerContext().channel()); testComplete(); })); })); @@ -1735,8 +1735,8 @@ public void testRejectClearTextUpgrade() throws Exception { client.request(requestOptions).onComplete(onSuccess(req -> { req.send().onComplete(onSuccess(resp -> { Http2UpgradeClientConnection connection = (Http2UpgradeClientConnection) resp.request().connection(); - Channel ch = connection.channel(); - ChannelPipeline pipeline = ch.pipeline(); + ChannelHandlerContext chctx = connection.channelHandlerContext(); + ChannelPipeline pipeline = chctx.pipeline(); for (Map.Entry entry : pipeline) { assertTrue("Was not expecting pipeline handler " + entry.getValue().getClass(), entry.getKey().equals("codec") || entry.getKey().equals("handler")); } diff --git a/vertx-core/src/test/java/io/vertx/tests/http/SharedHttpClientTest.java b/vertx-core/src/test/java/io/vertx/tests/http/SharedHttpClientTest.java index 1382bc4cffc..d46cbd368f9 100644 --- a/vertx-core/src/test/java/io/vertx/tests/http/SharedHttpClientTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/http/SharedHttpClientTest.java @@ -42,7 +42,9 @@ public void testVerticlesUseSamePool() throws Exception { CountDownLatch receivedLatch = new CountDownLatch(TOTAL_REQUESTS); ServerVerticle serverVerticle = new ServerVerticle(); - vertx.deployVerticle(serverVerticle).onComplete(onSuccess(serverId -> { + vertx + .deployVerticle(serverVerticle) + .onComplete(onSuccess(serverId -> { DeploymentOptions deploymentOptions = deploymentOptions( CLIENT_VERTICLE_INSTANCES,