diff --git a/src/main/java/io/vertx/core/eventbus/EventBus.java b/src/main/java/io/vertx/core/eventbus/EventBus.java index 140808e5c11..3d718093149 100644 --- a/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -15,7 +15,6 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.eventbus.impl.DefaultSerializableChecker; @@ -199,6 +198,10 @@ default Future> request(String address, @Nullable Object message) */ MessageProducer publisher(String address, DeliveryOptions options); + Future bindStream(String address, Handler handler); + + Future connectStream(String address); + /** * Register a message codec. *

diff --git a/src/main/java/io/vertx/core/eventbus/MessageStream.java b/src/main/java/io/vertx/core/eventbus/MessageStream.java new file mode 100644 index 00000000000..c1d2952fd9c --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/MessageStream.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; + +@VertxGen +public interface MessageStream { + + void handler(Handler> handler); + + void endHandler(Handler handler); + + void write(String msg); + + void end(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java new file mode 100644 index 00000000000..872a9089800 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Promise; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class ClientStream extends StreamBase { + + private final Promise promise2; + + public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise promise2) { + super(sourceAddress, ctx, eventBus, sourceAddress, true); + this.promise2 = promise2; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + remoteAddress = syn.src; + promise2.complete(this); + return true; + } else { + return super.doReceive(frame); + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 7f299d480bb..2f4155570b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -155,6 +155,31 @@ public MessageProducer publisher(String address, DeliveryOptions options) return new MessageProducerImpl<>(vertx, address, false, options); } + @Override + public Future bindStream(String address, Handler handler) { + ContextInternal ctx = vertx.getOrCreateContext(); + HandlerRegistration reg = new StreamServer(this, ctx, address, handler); + Promise promise = ctx.promise(); + reg.register(true, false, promise); + return promise.future(); + } + + @Override + public Future connectStream(String address) { + ContextInternal ctx = vertx.getOrCreateContext(); + String sourceAddress = generateReplyAddress(); + Promise promise2 = ctx.promise(); + StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2); + Promise promise = ctx.promise(); + reg.register(false, false, promise); + promise.future().onComplete(ar -> { + if (ar.succeeded()) { + sendLocally(new SynFrame(sourceAddress, address), ctx.promise()); + } + }); + return promise2.future(); + } + @Override public EventBus publish(String address, Object message) { return publish(address, message, new DeliveryOptions()); diff --git a/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java new file mode 100644 index 00000000000..70309c74ff7 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +class FinFrame implements Frame { + + final String addr; + + public FinFrame(String addr) { + this.addr = addr; + } + + @Override + public String address() { + return addr; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java new file mode 100644 index 00000000000..3b663b1566f --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java @@ -0,0 +1,75 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class StreamBase extends HandlerRegistration implements MessageStream { + + private Handler> handler; + private Handler endHandler; + final String localAddress; + String remoteAddress; + private boolean halfClosed; + + StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) { + super(context, bus, address, src); + this.localAddress = localAddress; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof MessageImpl) { + MessageImpl msg = (MessageImpl) frame; + Handler> h = handler; + if (h != null) { + h.handle(msg); + } + } else if (frame instanceof FinFrame) { + Handler h = endHandler; + if (h != null) { + h.handle(null); + } + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + + } + + @Override + public void handler(Handler> handler) { + this.handler = handler; + } + + @Override + public void endHandler(Handler handler) { + this.endHandler = handler; + } + + @Override + public void write(String body) { + MessageImpl msg = new MessageImpl(remoteAddress, MultiMap.caseInsensitiveMultiMap(), body, CodecManager.STRING_MESSAGE_CODEC, true, bus); + bus.sendLocally(msg, context.promise()); + } + + @Override + public void end() { + FinFrame fin = new FinFrame(remoteAddress); + bus.sendLocally(fin, context.promise()); + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java new file mode 100644 index 00000000000..f60a5d87109 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java @@ -0,0 +1,42 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; + +class StreamServer extends HandlerRegistration { + private final EventBusImpl eventBus; + private final Handler handler; + + public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler handler) { + super(ctx, eventBus, address, false); + this.eventBus = eventBus; + this.handler = handler; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + String localAddress = eventBus.generateReplyAddress(); + StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false); + ss.remoteAddress = syn.src; + PromiseInternal p = context.promise(); + ss.register(false, true, p); + p.onComplete(ar -> { + if (ar.succeeded()) { + SynFrame reply = new SynFrame(localAddress, syn.src); + eventBus.sendLocally(reply, context.promise()); + handler.handle(ss); + } + }); + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java new file mode 100644 index 00000000000..d55d6250a7e --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java @@ -0,0 +1,29 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +public class SynFrame implements Frame { + + final String src; + final String dst; + + public SynFrame(String src, String dst) { + this.src = src; + this.dst = dst; + } + + @Override + public String address() { + return dst; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 43107da8963..f8784d2e369 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1531,5 +1531,34 @@ public void testEarlyTimeoutOnHandlerUnregistration() { }); await(); } + + @Test + public void testStream() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + vertx.eventBus().bindStream(ADDRESS1, stream -> { + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.write(msg.body()); + }); + stream.endHandler(v -> { + stream.end(); + }); + }).onComplete(onSuccess(v -> { + latch.countDown(); + })); + awaitLatch(latch); + vertx.eventBus().connectStream(ADDRESS1).onComplete(onSuccess(stream -> { + stream.write("ping"); + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.end(); + }); + stream.endHandler(v -> { + testComplete(); + }); + })); + await(); + } + }