Skip to content

Commit

Permalink
Merge pull request #371 from matyasberry/feature/transport-tweaks
Browse files Browse the repository at this point in the history
Alternative transport implementations
  • Loading branch information
artem-v authored Oct 29, 2021
2 parents 730ce0d + e324722 commit 23d1e4d
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.scalecube.cluster.transport.api;

import io.scalecube.net.Address;
import java.util.StringJoiner;
import java.util.function.Function;
import reactor.core.Exceptions;

public final class TransportConfig implements Cloneable {
Expand All @@ -20,6 +22,7 @@ public final class TransportConfig implements Cloneable {
private MessageCodec messageCodec = MessageCodec.INSTANCE;
private int maxFrameLength = 2 * 1024 * 1024; // 2 MB
private TransportFactory transportFactory;
private Function<Address, Address> addressMapper = Function.identity();

public TransportConfig() {}

Expand Down Expand Up @@ -134,6 +137,22 @@ public TransportConfig maxFrameLength(int maxFrameLength) {
return t;
}

/**
* Setter for {@code addressMapper}.
*
* @param addressMapper address mapper
* @return new {@code TransportConfig} instance
*/
public TransportConfig addressMapper(Function<Address, Address> addressMapper) {
TransportConfig t = clone();
t.addressMapper = addressMapper;
return t;
}

public Function<Address, Address> addressMapper() {
return addressMapper;
}

public TransportFactory transportFactory() {
return transportFactory;
}
Expand Down Expand Up @@ -168,6 +187,7 @@ public String toString() {
.add("messageCodec=" + messageCodec)
.add("maxFrameLength=" + maxFrameLength)
.add("transportFactory=" + transportFactory)
.add("addressMapper=" + addressMapper)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,27 @@ public final class TransportImpl implements Transport {
// Transport factory
private final Receiver receiver;
private final Sender sender;
private final Function<Address, Address> addressMapper;

/**
* Constructor with cofig as parameter.
* Constructor with config as parameter.
*
* @param messageCodec message codec
* @param receiver transport receiver part
* @param sender transport sender part
* @param addressMapper function to map addresses. Useful when running against NAT-ed
* environments. Used during connection setup so that the actual connection is established
* against <code>addressMapper.apply(origAddress) destination</code>
*/
public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender) {
public TransportImpl(
MessageCodec messageCodec,
Receiver receiver,
Sender sender,
Function<Address, Address> addressMapper) {
this.messageCodec = messageCodec;
this.receiver = receiver;
this.sender = sender;
this.addressMapper = addressMapper;
}

private static Address prepareAddress(DisposableServer server) {
Expand Down Expand Up @@ -216,8 +225,9 @@ private ByteBuf encodeMessage(Message message) {
}

private Mono<? extends Connection> connect(Address remoteAddress) {
final Address mappedAddr = addressMapper.apply(remoteAddress);
return sender
.connect(remoteAddress)
.connect(mappedAddr)
.doOnSuccess(
connection -> {
connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

final class TcpReceiver implements Receiver {
public final class TcpReceiver implements Receiver {

private final TransportConfig config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

final class TcpSender implements Sender {
public final class TcpSender implements Sender {

private final TransportConfig config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ public final class TcpTransportFactory implements TransportFactory {

@Override
public Transport createTransport(TransportConfig config) {
return new TransportImpl(config.messageCodec(), new TcpReceiver(config), new TcpSender(config));
return new TransportImpl(
config.messageCodec(),
new TcpReceiver(config),
new TcpSender(config),
config.addressMapper());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

final class WebsocketReceiver implements Receiver {
public final class WebsocketReceiver implements Receiver {

private final TransportConfig config;

WebsocketReceiver(TransportConfig config) {
public WebsocketReceiver(TransportConfig config) {
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;

final class WebsocketSender implements Sender {
public final class WebsocketSender implements Sender {

private final TransportConfig config;

WebsocketSender(TransportConfig config) {
public WebsocketSender(TransportConfig config) {
this.config = config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ public final class WebsocketTransportFactory implements TransportFactory {
@Override
public Transport createTransport(TransportConfig config) {
return new TransportImpl(
config.messageCodec(), new WebsocketReceiver(config), new WebsocketSender(config));
config.messageCodec(),
new WebsocketReceiver(config),
new WebsocketSender(config),
config.addressMapper());
}
}

0 comments on commit 23d1e4d

Please sign in to comment.