diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java index 4e3dfa2c4c..a73ef2a4a4 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java @@ -26,6 +26,7 @@ import org.apache.reef.wake.remote.impl.DefaultTransportEStage; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolType; /** * Configuration options and helper methods for Wake remoting. @@ -132,4 +133,13 @@ public static final class RemoteClientStage implements Name> { // Intentionally empty } + + /** + * Option for use http. + * Default value must be ProtocolType.TCP.name(). + */ + @NamedParameter(doc = "Option for use http.", default_value = "TCP") + public static final class Protocol implements Name { + // Intentionally empty + } } diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java index a58da0e274..0ea562f44e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java @@ -31,6 +31,13 @@ @DefaultImplementation(MessagingTransportFactory.class) public interface TransportFactory { + /** + * Types of protocol used in Transport. + */ + enum ProtocolType { + TCP, HTTP + } + /** * Creates a transport. * diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java index 2f0d84d78d..b638612d82 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java @@ -20,6 +20,7 @@ import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; @@ -39,10 +40,14 @@ public final class MessagingTransportFactory implements TransportFactory { private final String localAddress; + private final ProtocolType protocol; + private static final Tang TANG = Tang.Factory.getTang(); @Inject - private MessagingTransportFactory(final LocalAddressProvider localAddressProvider) { + private MessagingTransportFactory(final LocalAddressProvider localAddressProvider, + @Parameter(RemoteConfiguration.Protocol.class) final ProtocolType protocolType) { this.localAddress = localAddressProvider.getLocalAddress(); + this.protocol = protocolType; } /** @@ -59,11 +64,12 @@ public Transport newInstance(final int port, final EventHandler serverHandler, final EventHandler exHandler) { - final Injector injector = Tang.Factory.getTang().newInjector(); + final Injector injector = TANG.newInjector(); injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress); injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler)); injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(serverHandler)); + injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, this.protocol); final Transport transport; try { @@ -93,7 +99,7 @@ public Transport newInstance(final String hostAddress, final int numberOfTries, final int retryTimeout) { try { - TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); + final TcpPortProvider tcpPortProvider = TANG.newInjector().getInstance(TcpPortProvider.class); return newInstance(hostAddress, port, clientStage, serverStage, numberOfTries, retryTimeout, tcpPortProvider); } catch (final InjectionException e) { @@ -121,7 +127,7 @@ public Transport newInstance(final String hostAddress, final int retryTimeout, final TcpPortProvider tcpPortProvider) { - final Injector injector = Tang.Factory.getTang().newInjector(); + final Injector injector = TANG.newInjector(); injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage); @@ -129,6 +135,7 @@ public Transport newInstance(final String hostAddress, injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); + injector.bindVolatileParameter(RemoteConfiguration.Protocol.class, this.protocol); try { return injector.getInstance(NettyMessagingTransport.class); } catch (final InjectionException e) { diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelFutureListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelFutureListener.java new file mode 100644 index 0000000000..1fba96f26b --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelFutureListener.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.reef.wake.remote.transport.LinkListener; + +/** + * Future Listener used in NettyLink. + */ +public final class NettyChannelFutureListener implements ChannelFutureListener { + + private final T message; + private final LinkListener listener; + + NettyChannelFutureListener(final T message, final LinkListener listener) { + this.message = message; + this.listener = listener; + } + + @Override + public void operationComplete(final ChannelFuture channelFuture) throws Exception { + if (channelFuture.isSuccess()) { + listener.onSuccess(message); + } else { + listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); + } + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java index 14fd0af17e..0de7b082d1 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java @@ -19,34 +19,70 @@ package org.apache.reef.wake.remote.transport.netty; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; +import io.netty.handler.codec.http.*; +import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolType; + /** * Netty channel initializer for Transport. */ class NettyChannelInitializer extends ChannelInitializer { + /** * the buffer size of the frame decoder. */ public static final int MAXFRAMELENGTH = 10 * 1024 * 1024; + private static final int MAX_HTTP_MESSAGE_LENGTH = 10 * 1024 * 1024; private final NettyChannelHandlerFactory handlerFactory; - NettyChannelInitializer(final NettyChannelHandlerFactory handlerFactory) { + /** + * Type of protocol channel use. + */ + private final ProtocolType protocolType; + private final boolean isServer; + + NettyChannelInitializer( + final NettyChannelHandlerFactory handlerFactory, + final ProtocolType protocol) { + this(handlerFactory, protocol, false); + } + + NettyChannelInitializer( + final NettyChannelHandlerFactory handlerFactory, + final ProtocolType protocol, + final boolean isServer) { this.handlerFactory = handlerFactory; + this.protocolType = protocol; + this.isServer = isServer; } @Override protected void initChannel(final SocketChannel ch) throws Exception { - ch.pipeline() - .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) - .addLast("bytesDecoder", new ByteArrayDecoder()) - .addLast("frameEncoder", new LengthFieldPrepender(4)) - .addLast("bytesEncoder", new ByteArrayEncoder()) - .addLast("chunker", new ChunkedReadWriteHandler()) - .addLast("handler", handlerFactory.createChannelInboundHandler()); + final ChannelPipeline pipeline = ch.pipeline(); + switch (this.protocolType) { + case TCP: + pipeline + .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) + .addLast("bytesDecoder", new ByteArrayDecoder()) + .addLast("frameEncoder", new LengthFieldPrepender(4)) + .addLast("bytesEncoder", new ByteArrayEncoder()) + .addLast("chunker", new ChunkedReadWriteHandler()); + break; + case HTTP: + pipeline + .addLast("codec", isServer ? new HttpServerCodec() : new HttpClientCodec()) + .addLast("aggregator", new HttpObjectAggregator(MAX_HTTP_MESSAGE_LENGTH)); + break; + default: + throw new IllegalArgumentException("Invalid type of channel"); + } + // every channel pipeline has the same inbound handler. + pipeline.addLast("handler", handlerFactory.createChannelInboundHandler()); } } diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyDefaultLinkFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyDefaultLinkFactory.java new file mode 100644 index 0000000000..370fa5b104 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyDefaultLinkFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.channel.Channel; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; + +/** + * Factory that creates a NettyLink. + */ +public final class NettyDefaultLinkFactory implements NettyLinkFactory { + + NettyDefaultLinkFactory() {} + + @Override + public Link newInstance(final Channel channel, final Encoder encoder) { + return new NettyLink(channel, encoder); + } + + @Override + public Link newInstance(final Channel channel, + final Encoder encoder, + final LinkListener listener) { + return new NettyLink(channel, encoder, listener); + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpClientEventListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpClientEventListener.java new file mode 100644 index 0000000000..df72cde6ab --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpClientEventListener.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.util.CharsetUtil; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.remote.exception.RemoteRuntimeException; +import org.apache.reef.wake.remote.impl.TransportEvent; + +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A Netty event listener for client. + */ +final class NettyHttpClientEventListener extends AbstractNettyEventListener { + + private static final Logger LOG = Logger.getLogger(NettyHttpClientEventListener.class.getName()); + + NettyHttpClientEventListener( + final ConcurrentMap addrToLinkRefMap, + final EStage stage) { + super(addrToLinkRefMap, stage); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof FullHttpResponse) { + if(LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "HttpResponse Received: {0}", msg); + } + final HttpContent httpContent = (HttpContent) msg; + final ByteBuf byteBuf = httpContent.content(); + final Channel channel = ctx.channel(); + final byte[] content; + + if (byteBuf.hasArray()) { + content = byteBuf.array(); + } else { + content = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(content); + } + + if (LOG.isLoggable(Level.FINEST)) { + final StringBuilder buf = new StringBuilder(); + buf.append("CONTENT: ").append(byteBuf.toString(CharsetUtil.UTF_8)).append("\r\n"); + LOG.log(Level.FINEST, "MessageEvent: local: {0} remote: {1} :: {2}", new Object[]{ + channel.localAddress(), channel.remoteAddress(), buf}); + } + + // send to the dispatch stage + this.stage.onNext(this.getTransportEvent(content, channel)); + } else { + LOG.log(Level.SEVERE, "Unknown type of message received: {0}", msg); + throw new RemoteRuntimeException("Unknown type of message received " + msg); + } + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) { + // noop + } + + @Override + protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) { + return new TransportEvent(message, channel.localAddress(), channel.remoteAddress()); + } + + @Override + protected void exceptionCleanup(final ChannelHandlerContext ctx, final Throwable cause) { + this.closeChannel(ctx.channel()); + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLink.java new file mode 100644 index 0000000000..b96d770975 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLink.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.handler.codec.http.*; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.net.URI; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Link implementation with Netty. + * + * If you set a {@code LinkListener}, it keeps message until writeAndFlush operation completes + * and notifies whether the sent message transferred successfully through the listener. + */ +public final class NettyHttpLink implements Link { + + public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; + + private static final Logger LOG = Logger.getLogger(NettyHttpLink.class.getName()); + + private final Channel channel; + private final Encoder encoder; + private final LinkListener listener; + private final URI uri; + + /** + * Constructs a link. + * + * @param channel the channel + * @param encoder the encoder + * @param uri the URI + */ + public NettyHttpLink(final Channel channel, + final Encoder encoder, + final URI uri) { + this(channel, encoder, null, uri); + } + + /** + * Constructs a link. + * + * @param channel the channel + * @param encoder the encoder + * @param listener the link listener + * @param uri the URI + */ + public NettyHttpLink( + final Channel channel, + final Encoder encoder, + final LinkListener listener, + final URI uri) { + this.channel = channel; + this.encoder = encoder; + this.listener = listener; + this.uri = uri; + } + /** + * Writes the message to this link. + * + * @param message the message + */ + @Override + public void write(final T message) { + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message}); + } + final FullHttpRequest request = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath()); + final ByteBuf buf = Unpooled.wrappedBuffer(encoder.encode(message)); + request.headers() + .set(HttpHeaders.Names.HOST, uri.getHost()) + .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) + .set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP) + .set(HttpHeaders.Names.CONTENT_TYPE, "application/wake-transport") + .set(HttpHeaders.Names.CONTENT_LENGTH, buf.readableBytes()); + request.content().clear().writeBytes(buf); + final ChannelFuture future = channel.writeAndFlush(request); + if (listener != null) { + future.addListener(new NettyChannelFutureListener<>(message, listener)); + } + } + + /** + * Gets a local address of the link. + * + * @return a local socket address + */ + @Override + public SocketAddress getLocalAddress() { + return channel.localAddress(); + } + + /** + * Gets a remote address of the link. + * + * @return a remote socket address + */ + @Override + public SocketAddress getRemoteAddress() { + return channel.remoteAddress(); + } + + @Override + public String toString() { + return new StringBuilder() + .append("NettyLink: ") + .append(channel).toString(); // Channel has good .toString() implementation + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLinkFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLinkFactory.java new file mode 100644 index 0000000000..3491b96b8b --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpLinkFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.channel.Channel; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.URI; + +/** + * Factory that creates a NettyHttpLink. + */ +public final class NettyHttpLinkFactory implements NettyLinkFactory { + + private final URI uri; + + NettyHttpLinkFactory(final URI uri){ + this.uri = uri; + } + + @Override + public Link newInstance(final Channel channel, final Encoder encoder) { + return new NettyHttpLink(channel, encoder, uri); + } + + @Override + public Link newInstance(final Channel channel, + final Encoder encoder, + final LinkListener listener) { + return new NettyHttpLink(channel, encoder, listener, uri); + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpServerEventListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpServerEventListener.java new file mode 100644 index 0000000000..abc539b43b --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyHttpServerEventListener.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.*; +import io.netty.util.CharsetUtil; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.remote.exception.RemoteRuntimeException; +import org.apache.reef.wake.remote.impl.ByteCodec; +import org.apache.reef.wake.remote.impl.TransportEvent; + +import java.net.SocketAddress; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Level; + +/** + * A Netty event listener for server side. + */ +final class NettyHttpServerEventListener extends AbstractNettyEventListener { + + private final URI uri; + private final NettyLinkFactory linkFactory; + + NettyHttpServerEventListener( + final ConcurrentMap addrToLinkRefMap, + final EStage stage, + final URI uri) { + super(addrToLinkRefMap, stage); + this.uri = uri; + this.linkFactory = new NettyHttpLinkFactory<>(uri); + } + + + @Override + public void channelActive(final ChannelHandlerContext ctx) { + final Channel channel = ctx.channel(); + + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "Channel active. key: {0}", channel.remoteAddress()); + } + + this.addrToLinkRefMap.putIfAbsent( + channel.remoteAddress(), new LinkReference(linkFactory.newInstance( + channel, new ByteCodec(), new LoggingLinkListener()))); + + LOG.log(Level.FINER, "Add connected channel ref: {0}", this.addrToLinkRefMap.get(channel.remoteAddress())); + + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if(msg instanceof FullHttpRequest) { + final FullHttpRequest request = (FullHttpRequest) msg; + final HttpHeaders headers = request.headers(); + final ByteBuf byteBuf = request.content(); + final Channel channel = ctx.channel(); + final byte[] content; + + if (byteBuf.hasArray()) { + content = byteBuf.array(); + } else { + content = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(content); + } + + if (LOG.isLoggable(Level.FINEST)) { + // log header to trailing header contents. + final StringBuilder buf = new StringBuilder(); + if (!headers.isEmpty()) { + for (final Map.Entry h : headers) { + final CharSequence key = h.getKey(); + final CharSequence value = h.getValue(); + buf.append("HEADER: ").append(key).append(" = ").append(value).append("\r\n"); + } + buf.append("\r\n"); + } + appendDecoderResult(buf, request); + if (byteBuf.isReadable()) { + buf.append("CONTENT: "); + buf.append(byteBuf.toString(CharsetUtil.UTF_8)); + buf.append("\r\n"); + appendDecoderResult(buf, request); + } + + buf.append("END OF CONTENT\r\n"); + if (!request.trailingHeaders().isEmpty()) { + buf.append("\r\n"); + for (CharSequence name : request.trailingHeaders().names()) { + for (CharSequence value : request.trailingHeaders().getAll(name)) { + buf.append("TRAILING HEADER: "); + buf.append(name).append(" = ").append(value).append("\r\n"); + } + } + buf.append("\r\n"); + } + LOG.log(Level.FINEST, "Received Message:\n{0}", buf.toString()); + LOG.log(Level.FINEST, "MessageEvent: local: {0} remote: {1} :: {2}", new Object[]{ + channel.localAddress(), channel.remoteAddress(), byteBuf}); + } + + // send to the dispatch stage + this.stage.onNext(this.getTransportEvent(content, channel)); + } else { + LOG.log(Level.SEVERE, "Unknown type of message received: {0}", msg); + throw new RemoteRuntimeException("Unknown type of message received " + msg); + } + } + + private static void appendDecoderResult(final StringBuilder buf, final HttpObject o) { + final DecoderResult result = o.getDecoderResult(); + if (!result.isSuccess()) { + buf.append(".. WITH DECODER FAILURE: ").append(result.cause()).append("\r\n"); + } + } + + + + @Override + protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) { + return new TransportEvent(message, linkFactory.newInstance(channel, new ByteEncoder())); + } + + @Override + protected void exceptionCleanup(final ChannelHandlerContext ctx, final Throwable cause) { + // noop + } +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java index 3c51f2a1c2..61c30c8d11 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java @@ -21,7 +21,6 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.apache.reef.wake.remote.Encoder; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.LinkListener; @@ -78,7 +77,7 @@ public NettyLink(final Channel channel, final Encoder encoder, final public void write(final T message) { LOG.log(Level.FINEST, "write {0} :: {1}", new Object[] {channel, message}); final ChannelFuture future = channel.writeAndFlush(Unpooled.wrappedBuffer(encoder.encode(message))); - if (listener != null) { + if (listener != null) { future.addListener(new NettyChannelFutureListener<>(message, listener)); } } @@ -108,23 +107,3 @@ public String toString() { return "NettyLink: " + channel; // Channel has good .toString() implementation } } - -class NettyChannelFutureListener implements ChannelFutureListener { - - private final T message; - private LinkListener listener; - - NettyChannelFutureListener(final T message, final LinkListener listener) { - this.message = message; - this.listener = listener; - } - - @Override - public void operationComplete(final ChannelFuture channelFuture) throws Exception { - if (channelFuture.isSuccess()) { - listener.onSuccess(message); - } else { - listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); - } - } -} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLinkFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLinkFactory.java new file mode 100644 index 0000000000..94c62a5891 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLinkFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import io.netty.channel.Channel; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; + +/** + * Factory that creates a NettyLink. + */ +@DefaultImplementation(NettyDefaultLinkFactory.class) +public interface NettyLinkFactory { + + /** + * Creates a NettyLink. + * @param channel the channel + * @param encoder the encoder + */ + Link newInstance(final Channel channel, final Encoder encoder); + + /** + * Creates a NettyLink. + * @param channel the channel + * @param encoder the encoder + * @param listener the listener + */ + Link newInstance(final Channel channel, + final Encoder encoder, + final LinkListener listener); +} diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index 2643030116..9dbd3c4882 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -45,14 +45,12 @@ import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.LinkListener; import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory.ProtocolType; import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; import javax.inject.Inject; import java.io.IOException; -import java.net.BindException; -import java.net.ConnectException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; +import java.net.*; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -62,7 +60,7 @@ import java.util.logging.Logger; /** - * Messaging transport implementation with Netty. + * Messaging transport implementation with Netty and Http. */ public final class NettyMessagingTransport implements Transport { @@ -95,8 +93,10 @@ public final class NettyMessagingTransport implements Transport { private final int serverPort; private final SocketAddress localAddress; - private final NettyClientEventListener clientEventListener; - private final NettyServerEventListener serverEventListener; + private final AbstractNettyEventListener clientEventListener; + private final AbstractNettyEventListener serverEventListener; + + private final URI uri; private final int numberOfTries; private final int retryTimeout; @@ -111,6 +111,7 @@ public final class NettyMessagingTransport implements Transport { * @param numberOfTries the number of tries of connection * @param retryTimeout the timeout of reconnection * @param tcpPortProvider gives an iterator that produces random tcp ports in a range + * @param protocolType the protocol to use for transport */ @Inject private NettyMessagingTransport( @@ -121,7 +122,8 @@ private NettyMessagingTransport( @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, final TcpPortProvider tcpPortProvider, - final LocalAddressProvider localAddressProvider) { + final LocalAddressProvider localAddressProvider, + @Parameter(RemoteConfiguration.Protocol.class) final ProtocolType protocolType) { int p = port; if (p < 0) { @@ -130,10 +132,29 @@ private NettyMessagingTransport( final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; + //TODO[JIRA REEF-1871] Implement HTTPS with sslContext. + + // for HTTP and default Netty + if (protocolType == ProtocolType.HTTP) { + try{ + this.uri = URI.create("http://" + host); + } catch (final IllegalArgumentException e){ + throw new RemoteRuntimeException("Invalid host address: " + host, e); + } + } else { + this.uri = null; + } + this.numberOfTries = numberOfTries; this.retryTimeout = retryTimeout; - this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage); - this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage); + if (protocolType == ProtocolType.TCP) { + this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage); + this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage); + } else { + this.clientEventListener = new NettyHttpClientEventListener(this.addrToLinkRefMap, clientStage); + this.serverEventListener = new NettyHttpServerEventListener(this.addrToLinkRefMap, serverStage, this.uri); + } + this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + ":ServerBoss")); @@ -146,7 +167,7 @@ private NettyMessagingTransport( this.clientBootstrap.group(this.clientWorkerGroup) .channel(NioSocketChannel.class) .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client", - this.clientChannelGroup, this.clientEventListener))) + this.clientChannelGroup, this.clientEventListener), protocolType)) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, true); @@ -154,7 +175,7 @@ private NettyMessagingTransport( this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup) .channel(NioServerSocketChannel.class) .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server", - this.serverChannelGroup, this.serverEventListener))) + this.serverChannelGroup, this.serverEventListener), protocolType, true)) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true); @@ -311,7 +332,10 @@ public Link open(final SocketAddress remoteAddr, final Encoder connectFuture = this.clientBootstrap.connect(remoteAddr); connectFuture.syncUninterruptibly(); - link = new NettyLink<>(connectFuture.channel(), encoder, listener); + final NettyLinkFactory linkFactory = + uri == null ? new NettyDefaultLinkFactory<>() : new NettyHttpLinkFactory(uri); + + link = linkFactory.newInstance(connectFuture.channel(), encoder, listener); linkRef.setLink(link); synchronized (flag) { diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyServerEventListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyServerEventListener.java index d4d8f888bc..9657f0dcd9 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyServerEventListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyServerEventListener.java @@ -33,6 +33,8 @@ */ final class NettyServerEventListener extends AbstractNettyEventListener { + private NettyLinkFactory linkFactory = new NettyDefaultLinkFactory(); + NettyServerEventListener( final ConcurrentMap addrToLinkRefMap, final EStage stage) { @@ -49,7 +51,7 @@ public void channelActive(final ChannelHandlerContext ctx) { } this.addrToLinkRefMap.putIfAbsent( - channel.remoteAddress(), new LinkReference(new NettyLink<>( + channel.remoteAddress(), new LinkReference(linkFactory.newInstance( channel, new ByteCodec(), new LoggingLinkListener()))); LOG.log(Level.FINER, "Add connected channel ref: {0}", this.addrToLinkRefMap.get(channel.remoteAddress())); @@ -58,7 +60,7 @@ public void channelActive(final ChannelHandlerContext ctx) { @Override protected TransportEvent getTransportEvent(final byte[] message, final Channel channel) { - return new TransportEvent(message, new NettyLink<>(channel, new ByteEncoder())); + return new TransportEvent(message, linkFactory.newInstance(channel, new ByteEncoder())); } @Override diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index 08426c0796..ee2115f91f 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -19,6 +19,7 @@ package org.apache.reef.wake.test.remote; import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; @@ -31,6 +32,8 @@ import org.apache.reef.wake.remote.impl.MultiCodec; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -55,12 +58,19 @@ public class RemoteManagerTest { private final LocalAddressProvider localAddressProvider; - private final RemoteManagerFactory remoteManagerFactory; + private final RemoteManagerFactory remoteManagerFactoryTcp; + private final RemoteManagerFactory remoteManagerFactoryHttp; public RemoteManagerTest() throws InjectionException { - final Injector injector = Tang.Factory.getTang().newInjector(); + final Tang tang = Tang.Factory.getTang(); + final Injector injector = tang.newInjector(); this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); - this.remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class); + this.remoteManagerFactoryTcp = injector.getInstance(RemoteManagerFactory.class); + + final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder(); + builder.bindImplementation(TransportFactory.class, MessagingTransportFactory.class); + builder.bindNamedParameter(RemoteConfiguration.Protocol.class, "HTTP"); + this.remoteManagerFactoryHttp = tang.newInjector(builder.build()).getInstance(RemoteManagerFactory.class); } @Rule @@ -70,6 +80,65 @@ public RemoteManagerTest() throws InjectionException { @Test public void testRemoteManagerTest() throws Exception { + remoteManagerTest(remoteManagerFactoryTcp); + } + + @Test + public void testRemoteManagerConnectionRetryTest() throws Exception { + remoteManagerConnectionRetryTest(remoteManagerFactoryTcp); + } + + @Test + public void testRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exception { + remoteManagerConnectionRetryWithMultipleSenderTest(remoteManagerFactoryTcp); + } + + @Test + public void testRemoteManagerOrderingGuaranteeTest() throws Exception { + remoteManagerOrderingGuaranteeTest(remoteManagerFactoryTcp); + } + + @Test + public void testRemoteManagerPBufTest() throws Exception { + remoteManagerPBufTest(remoteManagerFactoryTcp); + } + + @Test + public void testRemoteManagerExceptionTest() { + remoteManagerExceptionTest(remoteManagerFactoryTcp); + } + + @Test + public void testHttpRemoteManagerTest() throws Exception { + remoteManagerTest(remoteManagerFactoryHttp); + } + + @Test + public void testHttpRemoteManagerConnectionRetryTest() throws Exception { + remoteManagerConnectionRetryTest(remoteManagerFactoryHttp); + } + + @Test + public void testHttpRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exception { + remoteManagerConnectionRetryWithMultipleSenderTest(remoteManagerFactoryHttp); + } + + @Test + public void testHttpRemoteManagerOrderingGuaranteeTest() throws Exception { + remoteManagerOrderingGuaranteeTest(remoteManagerFactoryHttp); + } + + @Test + public void testHttpRemoteManagerPBufTest() throws Exception { + remoteManagerPBufTest(remoteManagerFactoryHttp); + } + + @Test + public void testHttpRemoteManagerExceptionTest() { + remoteManagerExceptionTest(remoteManagerFactoryHttp); + } + + private void remoteManagerTest(final RemoteManagerFactory remoteManagerFactory) throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); @@ -85,7 +154,7 @@ public void testRemoteManagerTest() throws Exception { final String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = this.remoteManagerFactory.getInstance( + final RemoteManager rm = remoteManagerFactory.getInstance( "name", hostAddress, 0, codec, new LoggingEventHandler(), false, 3, 10000, localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); @@ -114,17 +183,18 @@ public void testRemoteManagerTest() throws Exception { timer.close(); } - @Test - public void testRemoteManagerConnectionRetryTest() throws Exception { + private void remoteManagerConnectionRetryTest(final RemoteManagerFactory remoteManagerFactory) throws Exception { final ExecutorService smExecutor = Executors.newFixedThreadPool(1); final ExecutorService rmExecutor = Executors.newFixedThreadPool(1); - final RemoteManager sendingManager = getTestRemoteManager("sender", 9020, 3, 2000); + final RemoteManager sendingManager = + getTestRemoteManager(remoteManagerFactory, "sender", 9020, 3, 2000); final Future smFuture = smExecutor.submit(new SendingRemoteManagerThread(sendingManager, 9010, 20000)); Thread.sleep(1000); - final RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); + final RemoteManager receivingManager = + getTestRemoteManager(remoteManagerFactory, "receiver", 9010, 1, 2000); final Future rmFuture = rmExecutor.submit(new ReceivingRemoteManagerThread(receivingManager, 20000, 1, 2)); final int smCnt = smFuture.get(); @@ -137,14 +207,15 @@ public void testRemoteManagerConnectionRetryTest() throws Exception { Assert.assertEquals(2, rmCnt); } - @Test - public void testRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exception { + private void remoteManagerConnectionRetryWithMultipleSenderTest( + final RemoteManagerFactory remoteManagerFactory) throws Exception { final int numOfSenderThreads = 5; final ExecutorService smExecutor = Executors.newFixedThreadPool(numOfSenderThreads); final ExecutorService rmExecutor = Executors.newFixedThreadPool(1); final ArrayList> smFutures = new ArrayList<>(numOfSenderThreads); - final RemoteManager sendingManager = getTestRemoteManager("sender", 9030, 3, 5000); + final RemoteManager sendingManager = + getTestRemoteManager(remoteManagerFactory, "sender", 9030, 3, 5000); for (int i = 0; i < numOfSenderThreads; i++) { smFutures.add(smExecutor.submit(new SendingRemoteManagerThread(sendingManager, 9010, 20000))); @@ -152,7 +223,8 @@ public void testRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exce Thread.sleep(2000); - final RemoteManager receivingManager = getTestRemoteManager("receiver", 9010, 1, 2000); + final RemoteManager receivingManager = + getTestRemoteManager(remoteManagerFactory, "receiver", 9010, 1, 2000); final Future receivingFuture = rmExecutor.submit(new ReceivingRemoteManagerThread(receivingManager, 20000, numOfSenderThreads, 2)); @@ -171,8 +243,7 @@ public void testRemoteManagerConnectionRetryWithMultipleSenderTest() throws Exce Assert.assertEquals(2 * numOfSenderThreads, rmCnt); } - @Test - public void testRemoteManagerOrderingGuaranteeTest() throws Exception { + private void remoteManagerOrderingGuaranteeTest(final RemoteManagerFactory remoteManagerFactory) throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); @@ -188,7 +259,7 @@ public void testRemoteManagerOrderingGuaranteeTest() throws Exception { final String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = this.remoteManagerFactory.getInstance( + final RemoteManager rm = remoteManagerFactory.getInstance( "name", hostAddress, 0, codec, new LoggingEventHandler(), true, 3, 10000, localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); @@ -217,8 +288,7 @@ public void testRemoteManagerOrderingGuaranteeTest() throws Exception { timer.close(); } - @Test - public void testRemoteManagerPBufTest() throws Exception { + private void remoteManagerPBufTest(final RemoteManagerFactory remoteManagerFactory) throws Exception { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); @@ -231,7 +301,7 @@ public void testRemoteManagerPBufTest() throws Exception { final String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = this.remoteManagerFactory.getInstance( + final RemoteManager rm = remoteManagerFactory.getInstance( "name", hostAddress, 0, codec, new LoggingEventHandler(), false, 3, 10000, localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); @@ -253,8 +323,7 @@ public void testRemoteManagerPBufTest() throws Exception { timer.close(); } - @Test - public void testRemoteManagerExceptionTest() { + private void remoteManagerExceptionTest(final RemoteManagerFactory remoteManagerFactory) { System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); @@ -285,7 +354,8 @@ public void testRemoteManagerExceptionTest() { } } - private RemoteManager getTestRemoteManager(final String rmName, final int localPort, + private RemoteManager getTestRemoteManager(final RemoteManagerFactory remoteManagerFactory, + final String rmName, final int localPort, final int retry, final int retryTimeout) { final Map, Codec> clazzToCodecMap = new HashMap<>(); clazzToCodecMap.put(StartEvent.class, new ObjectSerializableCodec()); diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java index 5d5d4d7cde..2bdeb595ea 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java @@ -19,12 +19,14 @@ package org.apache.reef.wake.test.remote; import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.RemoteConfiguration; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; @@ -32,6 +34,7 @@ import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; import org.apache.reef.wake.remote.transport.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -49,12 +52,20 @@ */ public class TransportTest { private final LocalAddressProvider localAddressProvider; - private final TransportFactory tpFactory; + private final TransportFactory tpTcpFactory; + private final TransportFactory tpHttpFactory; public TransportTest() throws InjectionException { - final Injector injector = Tang.Factory.getTang().newInjector(); + final Tang tang = Tang.Factory.getTang(); + + final Injector injector = tang.newInjector(); this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); - this.tpFactory = injector.getInstance(TransportFactory.class); + this.tpTcpFactory = injector.getInstance(TransportFactory.class); + + //for set protocol to HTTP + final JavaConfigurationBuilder jcb = tang.newConfigurationBuilder(); + jcb.bindNamedParameter(RemoteConfiguration.Protocol.class, TransportFactory.ProtocolType.HTTP.name()); + this.tpHttpFactory = tang.newInjector(jcb.build()).getInstance(MessagingTransportFactory.class); } private static final String LOG_PREFIX = "TEST "; @@ -63,6 +74,25 @@ public TransportTest() throws InjectionException { @Test public void testTransportString() throws Exception { + transportString(tpTcpFactory); + } + + @Test + public void testTransportTestEvent() throws Exception { + transportTestEvent(tpTcpFactory); + } + + @Test + public void testHttpTransportString() throws Exception { + transportString(tpHttpFactory); + } + + @Test + public void testHttpTransportTestEvent() throws Exception { + transportTestEvent(tpHttpFactory); + } + + private void transportString(final TransportFactory tpFactory) throws Exception{ System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO); @@ -93,8 +123,7 @@ public void testTransportString() throws Exception { Assert.assertEquals(expected, stage.getCount()); } - @Test - public void testTransportTestEvent() throws Exception { + private void transportTestEvent(final TransportFactory tpFactory) throws Exception{ System.out.println(LOG_PREFIX + name.getMethodName()); LoggingUtils.setLoggingLevel(Level.INFO);