diff --git a/src/org/jgroups/blocks/cs/TcpBaseServer.java b/src/org/jgroups/blocks/cs/TcpBaseServer.java
index 94e4e19177..b8a5245bac 100644
--- a/src/org/jgroups/blocks/cs/TcpBaseServer.java
+++ b/src/org/jgroups/blocks/cs/TcpBaseServer.java
@@ -10,9 +10,11 @@
* @since 3.6.5
*/
public abstract class TcpBaseServer extends BaseServer {
- protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
- protected int buffered_inputstream_size=8192;
- protected int buffered_outputstream_size=8192;
+ protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
+ protected int buffered_inputstream_size=8192;
+ protected int buffered_outputstream_size=8192;
+ protected boolean non_blocking_sends; // https://issues.redhat.com/browse/JGRP-2759
+ protected int max_send_queue=128; // when non_blocking, how many messages to queue max?
protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {
super(f, sf, recv_buf_size);
@@ -20,7 +22,8 @@ protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {
@Override
protected TcpConnection createConnection(Address dest) throws Exception {
- return new TcpConnection(dest, this);
+ return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue)
+ : new TcpConnection(dest, this);
}
@@ -30,5 +33,9 @@ protected TcpConnection createConnection(Address dest) throws Exception {
public TcpBaseServer setBufferedInputStreamSize(int s) {this.buffered_inputstream_size=s; return this;}
public int getBufferedOutputStreamSize() {return buffered_outputstream_size;}
public TcpBaseServer setBufferedOutputStreamSize(int s) {this.buffered_outputstream_size=s; return this;}
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public TcpBaseServer nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public TcpBaseServer maxSendQueue(int s) {this.max_send_queue=s; return this;}
}
diff --git a/src/org/jgroups/blocks/cs/TcpClient.java b/src/org/jgroups/blocks/cs/TcpClient.java
index 0fadde9212..9bef85a8e5 100644
--- a/src/org/jgroups/blocks/cs/TcpClient.java
+++ b/src/org/jgroups/blocks/cs/TcpClient.java
@@ -107,11 +107,6 @@ public void connectionClosed(Connection conn) {
stop();
}
- @Override
- public void connectionEstablished(Connection conn) {
-
- }
-
public String toString() {
if(conn == null || !conn.isConnected())
return String.format("%s -> %s [not connected]", localAddress(), remoteAddress());
diff --git a/src/org/jgroups/blocks/cs/TcpConnection.java b/src/org/jgroups/blocks/cs/TcpConnection.java
index 1fd4521c0b..b84545c91b 100644
--- a/src/org/jgroups/blocks/cs/TcpConnection.java
+++ b/src/org/jgroups/blocks/cs/TcpConnection.java
@@ -160,15 +160,20 @@ public void send(ByteBuffer buf) throws Exception {
}
}
-
@GuardedBy("send_lock")
protected void doSend(byte[] data, int offset, int length) throws Exception {
+ doSend(data, offset, length, false);
+ }
+
+ @GuardedBy("send_lock")
+ protected void doSend(byte[] data, int offset, int length, boolean flush) throws Exception {
Bits.writeInt(length, length_buf, 0); // write the length of the data buffer first
out.write(length_buf, 0, length_buf.length);
out.write(data, offset, length);
+ if(flush)
+ out.flush();
}
-
public void flush() {
try {
out.flush();
diff --git a/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java
new file mode 100644
index 0000000000..f7ebe79d7d
--- /dev/null
+++ b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java
@@ -0,0 +1,164 @@
+package org.jgroups.blocks.cs;
+
+import org.jgroups.Address;
+import org.jgroups.util.ByteArray;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.Util;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * TCP connection which (despite the fancy name) blocks only a single thread at most. Uses a bounded queue, to which
+ * senders add their messages, and a single consumer sending the messages. When the queue is full, messages will
+ * get dropped. Therefore, at most one thread is blocked on TCP write when the send-window is full.
+ * Link: https://issues.redhat.com/browse/JGRP-2759
+ * @author Bela Ban
+ * @since 5.3.3
+ */
+public class TcpConnectionNonBlocking extends TcpConnection {
+ protected BlockingQueue queue;
+ protected int max_size=128;
+ protected volatile Sender sender;
+ protected final LongAdder dropped_msgs=new LongAdder();
+
+
+ public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception {
+ super(peer_addr, server);
+ }
+
+ public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception {
+ super(s, server);
+ }
+
+
+ public int maxSize() {return max_size;}
+ public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;}
+ public long droppedMessages() {return dropped_msgs.sum();}
+ public int queueSize() {return queue != null? queue.size() : 0;}
+
+
+ @Override public void start() {
+ super.start();
+ queue=new ArrayBlockingQueue<>(max_size);
+ if(sender != null)
+ sender.stop();
+ sender=new Sender(server.factory).start();
+ }
+
+ @Override public void close() throws IOException {
+ super.close();
+ if(sender != null) {
+ sender.stop();
+ sender=null;
+ }
+ }
+
+ @Override
+ public void send(byte[] data, int offset, int length) throws Exception {
+
+ // to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to
+ // serialize messages to and - before the data is sent by the sender thread - the buffer might be changed!
+ // This is similar to what NioConnection does on a partial write. If the send was synchronous (like in
+ // TcpConnection), we would not have to copy the data
+ ByteArray buf=new ByteArray(data, offset, length).copy();
+ boolean added=queue.offer(buf);
+ if(!added)
+ dropped_msgs.increment();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s [%d/%d, %d drops, sender: %b]", super.toString(), queueSize(), maxSize(),
+ droppedMessages(), senderRunning());
+ }
+
+ protected String name() {
+ InetAddress local=sock.getLocalAddress(), remote=sock.getInetAddress();
+ String l=local != null? Util.shortName(local) : "";
+ String r=remote != null? Util.shortName(remote) : "";
+ return String.format("Connection.Sender [%s:%s-%s:%s]", l, sock.getLocalPort(), r, sock.getPort());
+ }
+
+
+
+ protected boolean senderRunning() {
+ final Sender tmp=sender;
+ return tmp != null && tmp.running();
+ }
+
+
+ protected class Sender implements Runnable {
+ protected final Thread thread;
+ protected volatile boolean running=true;
+
+ public Sender(ThreadFactory f) {
+ String name=name();
+ thread=f != null? f.newThread(this, name) : new Thread(this, name);
+ }
+
+ public Sender start() {
+ running=true;
+ thread.start();
+ return this;
+ }
+
+ public Sender stop() {
+ running=false;
+ Thread t=thread;
+ if(t != null && t.isAlive())
+ t.interrupt();
+ return this;
+ }
+
+ public boolean running() {
+ return running && isConnected();
+ }
+
+ @Override public void run() {
+ try {
+ while(running()) {
+ ByteArray data;
+ try {
+ data=queue.take();
+ }
+ catch(InterruptedException iex) {
+ continue;
+ }
+
+ // no synchronization needed as this thread is the only sender
+ doSend(data.getArray(), data.getOffset(), data.getLength(), true); // flush
+ }
+ }
+ catch(EOFException | SocketException ex) {
+ ; // regular use case when a peer closes its connection - we don't want to log this as exception
+ }
+ catch(Exception e) {
+ //noinspection StatementWithEmptyBody
+ if (e instanceof SSLException && e.getMessage().contains("Socket closed")) {
+ ; // regular use case when a peer closes its connection - we don't want to log this as exception
+ }
+ else if (e instanceof SSLHandshakeException && e.getCause() instanceof EOFException) {
+ ; // Ignore SSL handshakes closed early (usually liveness probes)
+ }
+ else {
+ if(server.logDetails())
+ server.log.warn("failed sending message", e);
+ else
+ server.log.warn("failed sending message: " + e);
+ }
+ }
+ finally {
+ server.notifyConnectionClosed(TcpConnectionNonBlocking.this);
+ }
+ }
+ }
+}
diff --git a/src/org/jgroups/blocks/cs/TcpServer.java b/src/org/jgroups/blocks/cs/TcpServer.java
index d11fff6672..5dfe15da6e 100644
--- a/src/org/jgroups/blocks/cs/TcpServer.java
+++ b/src/org/jgroups/blocks/cs/TcpServer.java
@@ -115,7 +115,9 @@ public void run() {
protected void handleAccept(final Socket client_sock) throws Exception {
TcpConnection conn=null;
try {
- conn=new TcpConnection(client_sock, TcpServer.this);
+ conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue)
+ : new TcpConnection(client_sock, TcpServer.this);
+
Address peer_addr=conn.peerAddress();
synchronized(this) {
boolean conn_exists=hasConnection(peer_addr),
diff --git a/src/org/jgroups/demos/Chat.java b/src/org/jgroups/demos/Chat.java
index 2c4acdc66b..0f2ec51e2f 100644
--- a/src/org/jgroups/demos/Chat.java
+++ b/src/org/jgroups/demos/Chat.java
@@ -33,15 +33,6 @@ private void start(String props, String name, boolean nohup) throws Exception {
try {
channel=new JChannel(props).name(name);
channel.setReceiver(this);
-
- // todo: remove once https://issues.redhat.com/browse/JGRP-2689 has been implemented
- /*
- TP tp=channel.getProtocolStack().getTransport();
- SSLContext context=TLS.getSSLContext();
- DefaultSocketFactory ssl_factory=new DefaultSocketFactory(context);
- tp.setSocketFactory(ssl_factory);
- */
-
channel.connect(CLUSTER);
}
catch(Exception ex) {
diff --git a/src/org/jgroups/protocols/BasicTCP.java b/src/org/jgroups/protocols/BasicTCP.java
index 61945b2d14..56b7929877 100644
--- a/src/org/jgroups/protocols/BasicTCP.java
+++ b/src/org/jgroups/protocols/BasicTCP.java
@@ -55,9 +55,6 @@ public abstract class BasicTCP extends TP implements Receiver {
"(https://issues.redhat.com/browse/JGRP-2684)")
protected boolean use_acks;
- // @Property(description="Sets socket option SO_REUSEADDR (https://issues.redhat.com/browse/JGRP-2009)")
- // protected boolean reuse_addr;
-
@LocalAddress
@Property(name="client_bind_addr",
description="The address of a local network interface which should be used by client sockets to bind to. " +
diff --git a/src/org/jgroups/protocols/TCP.java b/src/org/jgroups/protocols/TCP.java
index 9413914ef6..223e62d023 100644
--- a/src/org/jgroups/protocols/TCP.java
+++ b/src/org/jgroups/protocols/TCP.java
@@ -52,6 +52,12 @@ public TCP() {}
@Component(name="tls",description="Contains the attributes for TLS (SSL sockets) when enabled=true")
protected TLS tls=new TLS();
+ @Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)")
+ protected boolean non_blocking_sends;
+
+ @Property(description="when sending and non_blocking, how many messages to queue max")
+ protected int max_send_queue=128;
+
public int getBufferedInputStreamSize() {
return buffered_input_stream_size;
}
@@ -69,10 +75,14 @@ public TCP setBufferedOutputStreamSize(int buffered_output_stream_size) {
this.buffered_output_stream_size=buffered_output_stream_size;
return this;
}
- public TLS tls() {return tls;}
- public TCP tls(TLS t) {this.tls=t; return this;}
- public boolean logAcceptError() {return log_accept_error;}
- public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
+ public TLS tls() {return tls;}
+ public TCP tls(TLS t) {this.tls=t; return this;}
+ public boolean logAcceptError() {return log_accept_error;}
+ public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public TCP nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public TCP maxSendQueue(int s) {this.max_send_queue=s; return this;}
@ManagedAttribute
public int getOpenConnections() {
@@ -110,17 +120,19 @@ public void start() throws Exception {
}
srv=new TcpServer(getThreadFactory(), getSocketFactory(), bind_addr, bind_port, bind_port+port_range,
external_addr, external_port, recv_buf_size).setLogAcceptError(log_accept_error);
- srv.receiver(this)
+
+ srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
+ .peerAddressReadTimeout(peer_addr_read_timeout)
+ .nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue)
+ .usePeerConnections(true)
+ .useAcks(this.use_acks)
+ .socketFactory(getSocketFactory())
+ .receiver(this)
.timeService(time_service)
.socketConnectionTimeout(sock_conn_timeout)
.tcpNodelay(tcp_nodelay).linger(linger)
.clientBindAddress(client_bind_addr).clientBindPort(client_bind_port).deferClientBinding(defer_client_bind_addr)
.log(this.log).logDetails(this.log_details);
- srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
- .peerAddressReadTimeout(peer_addr_read_timeout)
- .usePeerConnections(true)
- .useAcks(this.use_acks)
- .socketFactory(getSocketFactory());
if(send_buf_size > 0)
srv.sendBufferSize(send_buf_size);
diff --git a/src/org/jgroups/protocols/TUNNEL.java b/src/org/jgroups/protocols/TUNNEL.java
index 09bb63a568..5f4473711a 100644
--- a/src/org/jgroups/protocols/TUNNEL.java
+++ b/src/org/jgroups/protocols/TUNNEL.java
@@ -75,6 +75,12 @@ public interface TUNNELPolicy {
@Property(description="SO_LINGER in seconds. Default of -1 disables it")
protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)
+ @Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)")
+ protected boolean non_blocking_sends;
+
+ @Property(description="when sending and non_blocking, how many messages to queue max")
+ protected int max_send_queue=128;
+
/* ------------------------------------------ Fields ----------------------------------------------------- */
protected final List gossip_routers=new ArrayList<>();
@@ -89,7 +95,6 @@ public interface TUNNELPolicy {
public TUNNEL() {
}
-
public long getReconnectInterval() {return reconnect_interval;}
public TUNNEL setReconnectInterval(long r) {this.reconnect_interval=r; return this;}
public boolean isTcpNodelay() {return tcp_nodelay;}
@@ -100,6 +105,10 @@ public TUNNEL() {
public TUNNEL tls(TLS t) {this.tls=t; return this;}
public int getLinger() {return linger;}
public TUNNEL setLinger(int l) {this.linger=l; return this;}
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public TUNNEL nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public TUNNEL maxSendQueue(int s) {this.max_send_queue=s; return this;}
/** We can simply send a message with dest == null and the GossipRouter will take care of routing it to all
* members in the cluster */
@@ -176,7 +185,8 @@ public void init() throws Exception {
if(gossip_routers.isEmpty())
throw new IllegalStateException("gossip_router_hosts needs to contain at least one address of a GossipRouter");
log.debug("gossip routers are %s", gossip_routers);
- stubManager=RouterStubManager.emptyGossipClientStubManager(log, timer).useNio(this.use_nio);
+ stubManager=RouterStubManager.emptyGossipClientStubManager(log, timer).useNio(this.use_nio)
+ .nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
sock=getSocketFactory().createDatagramSocket("jgroups.tunnel.ucast_sock", 0, bind_addr);
}
@@ -215,7 +225,8 @@ public Object down(Event evt) {
PhysicalAddress physical_addr=getPhysicalAddressFromCache(local);
String logical_name=org.jgroups.util.NameCache.get(local);
stubManager=new RouterStubManager(log,timer,group,local, logical_name, physical_addr, reconnect_interval)
- .useNio(this.use_nio).socketFactory(getSocketFactory()).heartbeat(heartbeat_interval, heartbeat_timeout);
+ .useNio(this.use_nio).socketFactory(getSocketFactory()).heartbeat(heartbeat_interval, heartbeat_timeout)
+ .nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
for(InetSocketAddress gr: gossip_routers) {
try {
InetSocketAddress target=gr.isUnresolved()? new InetSocketAddress(gr.getHostString(), gr.getPort())
diff --git a/src/org/jgroups/stack/GossipRouter.java b/src/org/jgroups/stack/GossipRouter.java
index c3945b00a3..01976acd1a 100644
--- a/src/org/jgroups/stack/GossipRouter.java
+++ b/src/org/jgroups/stack/GossipRouter.java
@@ -100,6 +100,12 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener,
@Component(name="tls",description="Contains the attributes for TLS (SSL sockets) when enabled=true")
protected TLS tls=new TLS();
+ @ManagedAttribute(description="Use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)) in TCP (use_nio=false)")
+ protected boolean non_blocking_sends;
+
+ @ManagedAttribute(description="When sending and non_blocking, how many messages to queue max")
+ protected int max_send_queue=128;
+
// mapping between groups and - pairs
protected final Map> address_mappings=new ConcurrentHashMap<>();
@@ -154,6 +160,10 @@ public GossipRouter(InetAddress bind_addr, int local_port) throws Exception {
public DiagnosticsHandler diagHandler() {return diag;}
public TLS tls() {return tls;}
public GossipRouter tls(TLS t) {this.tls=t; return this;}
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public GossipRouter nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public GossipRouter maxSendQueue(int s) {this.max_send_queue=s; return this;}
@ManagedAttribute(description="operational status", name="running")
@@ -206,8 +216,8 @@ public GossipRouter start() throws Exception {
server=use_nio? new NioServer(thread_factory, socket_factory, bind_addr, port, port, null, 0,
recv_buf_size, "jgroups.nio.gossiprouter")
- : new TcpServer(thread_factory, socket_factory, bind_addr, port, port, null, 0,
- recv_buf_size, "jgroups.tcp.gossiprouter");
+ : new TcpServer(thread_factory, socket_factory, bind_addr, port, port, null, 0, recv_buf_size,
+ "jgroups.tcp.gossiprouter").nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
server.receiver(this).setMaxLength(max_length)
.addConnectionListener(this)
.connExpireTimeout(expiry_time).reaperInterval(reaper_interval).linger(linger_timeout);
@@ -760,6 +770,12 @@ public static void main(String[] args) throws Exception {
List diag_bind_interfaces=null;
String diag_passcode=null;
+ // Use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)") in TCP (use_nio=false)
+ boolean non_blocking_sends=false;
+
+ // When sending and non_blocking, how many messages to queue max
+ int max_send_queue=128;
+
TLS tls=new TLS();
long start=System.currentTimeMillis();
String bind_addr=null;
@@ -800,6 +816,14 @@ public static void main(String[] args) throws Exception {
nio=Boolean.parseBoolean(args[++i]);
continue;
}
+ if("-non_blocking_sends".equals(arg)) {
+ non_blocking_sends=Boolean.parseBoolean(args[++i]);
+ continue;
+ }
+ if("max_send_queue".equals(arg)) {
+ max_send_queue=Integer.parseInt(args[++i]);
+ continue;
+ }
if("-suspect".equals(arg)) {
suspects=Boolean.parseBoolean(args[++i]);
continue;
@@ -914,7 +938,7 @@ public static void main(String[] args) throws Exception {
.emitSuspectEvents(suspects)
.dumpMessages(dump_msgs)
.maxLength(max_length)
- .tls(tls);
+ .tls(tls).nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
router.diagHandler().setEnabled(diag_enabled)
.enableUdp(diag_enable_udp)
.enableTcp(diag_enable_tcp)
@@ -976,6 +1000,10 @@ static void help() {
System.out.println();
System.out.println(" -nio - Whether or not to use non-blocking connections (NIO)");
System.out.println();
+ System.out.println(" -non_blocking_sends - Use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759))");
+ System.out.println();
+ System.out.println(" -max_send_queue - When sending and non_blocking, how many messages to queue max");
+ System.out.println();
System.out.println(" -max_length - The max size (in bytes) of a message");
System.out.println();
System.out.println(" -suspect - Whether or not to use send SUSPECT events when a conn is closed");
diff --git a/src/org/jgroups/stack/Protocol.java b/src/org/jgroups/stack/Protocol.java
index 79a6725a4d..34ccb1ecbe 100644
--- a/src/org/jgroups/stack/Protocol.java
+++ b/src/org/jgroups/stack/Protocol.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -350,7 +351,7 @@ public Object up(Message msg) {
* (calling {@link #accept(Message)}), and - if true - calls {@link #up(org.jgroups.Event)}
* for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
* Subclasses should check if there are any messages destined for them (e.g. using
- * {@link MessageBatch#getMatchingMessages(short,boolean)}), then possibly remove and process them and finally pass
+ * {@link MessageBatch#iterator(Predicate)}), then possibly remove and process them and finally pass
* the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all
* encrypted messages in the batch, not remove them, and pass the batch up when done.
* @param batch The message batch
diff --git a/src/org/jgroups/stack/RouterStub.java b/src/org/jgroups/stack/RouterStub.java
index cada2e1b42..92522693d4 100644
--- a/src/org/jgroups/stack/RouterStub.java
+++ b/src/org/jgroups/stack/RouterStub.java
@@ -28,23 +28,29 @@ public interface StubReceiver {void receive(GossipData data);}
public interface MembersNotification {void members(List mbrs);}
public interface CloseListener {void closed(RouterStub stub);}
- protected BaseServer client;
- protected IpAddress local; // bind address
- protected IpAddress remote; // address of remote GossipRouter
- protected InetSocketAddress remote_sa; // address of remote GossipRouter, not resolved yet
- protected final boolean use_nio;
- protected StubReceiver receiver; // external consumer of data, e.g. TUNNEL
- protected CloseListener close_listener;
- protected SocketFactory socket_factory;
- protected static final Log log=LogFactory.getLog(RouterStub.class);
+ protected BaseServer client;
+ protected IpAddress local; // bind address
+ protected IpAddress remote; // address of remote GossipRouter
+ protected InetSocketAddress remote_sa; // address of remote GossipRouter, not resolved yet
+ protected final boolean use_nio;
+ protected StubReceiver receiver; // external consumer of data, e.g. TUNNEL
+ protected CloseListener close_listener;
+ protected SocketFactory socket_factory;
+ protected static final Log log=LogFactory.getLog(RouterStub.class);
// max number of ms to wait for socket establishment to GossipRouter
- protected int sock_conn_timeout=3000;
- protected boolean tcp_nodelay=true;
- protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)
- protected boolean handle_heartbeats;
+ protected int sock_conn_timeout=3000;
+ protected boolean tcp_nodelay=true;
+ protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)
+ protected boolean handle_heartbeats;
// timestamp of last heartbeat (or message from GossipRouter)
- protected volatile long last_heartbeat;
+ protected volatile long last_heartbeat;
+
+ // Use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)") in TCP
+ protected boolean non_blocking_sends;
+
+ // When sending and non_blocking, how many messages to queue max
+ protected int max_send_queue=128;
// map to correlate GET_MBRS requests and responses
protected final Map> get_members_map=new HashMap<>();
@@ -54,6 +60,11 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
this(local_sa, remote_sa, use_nio, l, sf, -1);
}
+ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l,
+ SocketFactory sf, int linger) {
+ this(local_sa, remote_sa, use_nio, l, sf, linger, false, 0);
+ }
+
/**
* Creates a stub to a remote_sa {@link GossipRouter}.
* @param local_sa The local_sa bind address and port
@@ -62,8 +73,12 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
* @param l The {@link CloseListener}
* @param sf The {@link SocketFactory} to use to create the client socket
* @param linger SO_LINGER timeout
+ * @param non_blocking_sends When true and a TcpClient is used, non-blocking sends are enabled
+ * (https://issues.redhat.com/browse/JGRP-2759)
+ * @param max_send_queue The max size of the send queue for non-blocking sends
*/
- public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf, int linger) {
+ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l,
+ SocketFactory sf, int linger, boolean non_blocking_sends, int max_send_queue) {
this.local=local_sa != null? new IpAddress(local_sa.getAddress(), local_sa.getPort())
: new IpAddress((InetAddress)null,0);
this.remote_sa=Objects.requireNonNull(remote_sa);
@@ -71,6 +86,8 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
this.close_listener=l;
this.socket_factory=sf;
this.linger=linger;
+ this.non_blocking_sends=non_blocking_sends;
+ this.max_send_queue=max_send_queue;
if(resolveRemoteAddress()) // sets remote
client=createClient(sf);
}
@@ -94,7 +111,10 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
public long lastHeartbeat() {return last_heartbeat;}
public int getLinger() {return linger;}
public RouterStub setLinger(int l) {this.linger=l; return this;}
-
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public RouterStub nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public RouterStub maxSendQueue(int s) {this.max_send_queue=s; return this;}
@@ -259,7 +279,8 @@ protected boolean resolveRemoteAddress() {
}
protected BaseServer createClient(SocketFactory sf) {
- BaseServer cl=use_nio? new NioClient(local, remote) : new TcpClient(local, remote);
+ BaseServer cl=use_nio? new NioClient(local, remote)
+ : new TcpClient(local, remote).nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
if(sf != null) cl.socketFactory(sf);
cl.receiver(this);
cl.addConnectionListener(this);
diff --git a/src/org/jgroups/stack/RouterStubManager.java b/src/org/jgroups/stack/RouterStubManager.java
index 4d19fb53e7..5220f16913 100644
--- a/src/org/jgroups/stack/RouterStubManager.java
+++ b/src/org/jgroups/stack/RouterStubManager.java
@@ -43,6 +43,12 @@ public class RouterStubManager implements Runnable, RouterStub.CloseListener {
protected final Runnable send_heartbeat=this::sendHeartbeat;
protected final Runnable check_timeouts=this::checkTimeouts;
+ // Use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)") in TCP
+ protected boolean non_blocking_sends;
+
+ // When sending and non_blocking, how many messages to queue max
+ protected int max_send_queue=128;
+
public RouterStubManager(Log log, TimeScheduler timer, String cluster_name, Address local_addr,
String logical_name, PhysicalAddress phys_addr, long reconnect_interval) {
@@ -64,6 +70,10 @@ public static RouterStubManager emptyGossipClientStubManager(Log log, TimeSchedu
public boolean reconnectorRunning() {return reconnector_task != null && !reconnector_task.isDone();}
public boolean heartbeaterRunning() {return heartbeat_task != null && !heartbeat_task.isDone();}
public boolean timeouterRunning() {return timeout_checker_task != null && !timeout_checker_task.isDone();}
+ public boolean nonBlockingSends() {return non_blocking_sends;}
+ public RouterStubManager nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
+ public int maxSendQueue() {return max_send_queue;}
+ public RouterStubManager maxSendQueue(int s) {this.max_send_queue=s; return this;}
public RouterStubManager socketFactory(SocketFactory socket_factory) {
@@ -116,7 +126,7 @@ public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddre
}
public RouterStub createAndRegisterStub(InetSocketAddress local, InetSocketAddress router_addr, int linger) {
- RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory, linger)
+ RouterStub stub=new RouterStub(local, router_addr, use_nio, this, socket_factory, linger, non_blocking_sends, max_send_queue)
.handleHeartbeats(heartbeat_interval > 0);
this.stubs.add(stub);
return stub;