Skip to content

Commit

Permalink
- Added TcpConnectionNonBlocking (https://issues.redhat.com/browse/JG…
Browse files Browse the repository at this point in the history
…RP-2759)

- Added non_blocking_sends and max_send_queue to TCP/TcpConnection/TcpBaseServer
- Added non_blocking_sends and max_send_queue to TUNNEL, RouterStubManager, GossipRouter and RouterStub/TcpClient
  • Loading branch information
belaban committed Feb 8, 2024
1 parent 438e554 commit a505290
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 59 deletions.
15 changes: 11 additions & 4 deletions src/org/jgroups/blocks/cs/TcpBaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
* @since 3.6.5
*/
public abstract class TcpBaseServer extends BaseServer {
protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
protected int buffered_inputstream_size=8192;
protected int buffered_outputstream_size=8192;
protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
protected int buffered_inputstream_size=8192;
protected int buffered_outputstream_size=8192;
protected boolean non_blocking_sends; // https://issues.redhat.com/browse/JGRP-2759
protected int max_send_queue=128; // when non_blocking, how many messages to queue max?

protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {
super(f, sf, recv_buf_size);
}

@Override
protected TcpConnection createConnection(Address dest) throws Exception {
return new TcpConnection(dest, this);
return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue)
: new TcpConnection(dest, this);
}


Expand All @@ -30,5 +33,9 @@ protected TcpConnection createConnection(Address dest) throws Exception {
public TcpBaseServer setBufferedInputStreamSize(int s) {this.buffered_inputstream_size=s; return this;}
public int getBufferedOutputStreamSize() {return buffered_outputstream_size;}
public TcpBaseServer setBufferedOutputStreamSize(int s) {this.buffered_outputstream_size=s; return this;}
public boolean nonBlockingSends() {return non_blocking_sends;}
public TcpBaseServer nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
public int maxSendQueue() {return max_send_queue;}
public TcpBaseServer maxSendQueue(int s) {this.max_send_queue=s; return this;}

}
5 changes: 0 additions & 5 deletions src/org/jgroups/blocks/cs/TcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ public void connectionClosed(Connection conn) {
stop();
}

@Override
public void connectionEstablished(Connection conn) {

}

public String toString() {
if(conn == null || !conn.isConnected())
return String.format("%s -> %s [not connected]", localAddress(), remoteAddress());
Expand Down
9 changes: 7 additions & 2 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,20 @@ public void send(ByteBuffer buf) throws Exception {
}
}


@GuardedBy("send_lock")
protected void doSend(byte[] data, int offset, int length) throws Exception {
doSend(data, offset, length, false);
}

@GuardedBy("send_lock")
protected void doSend(byte[] data, int offset, int length, boolean flush) throws Exception {
Bits.writeInt(length, length_buf, 0); // write the length of the data buffer first
out.write(length_buf, 0, length_buf.length);
out.write(data, offset, length);
if(flush)
out.flush();
}


public void flush() {
try {
out.flush();
Expand Down
164 changes: 164 additions & 0 deletions src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package org.jgroups.blocks.cs;

import org.jgroups.Address;
import org.jgroups.util.ByteArray;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

/**
* TCP connection which (despite the fancy name) blocks only a single thread at most. Uses a bounded queue, to which
* senders add their messages, and a single consumer sending the messages. When the queue is full, messages will
* get dropped. Therefore, at most one thread is blocked on TCP write when the send-window is full.<br/>
* Link: https://issues.redhat.com/browse/JGRP-2759
* @author Bela Ban
* @since 5.3.3
*/
public class TcpConnectionNonBlocking extends TcpConnection {
protected BlockingQueue<ByteArray> queue;
protected int max_size=128;
protected volatile Sender sender;
protected final LongAdder dropped_msgs=new LongAdder();


public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception {
super(peer_addr, server);
}

public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception {
super(s, server);
}


public int maxSize() {return max_size;}
public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;}
public long droppedMessages() {return dropped_msgs.sum();}
public int queueSize() {return queue != null? queue.size() : 0;}


@Override public void start() {
super.start();
queue=new ArrayBlockingQueue<>(max_size);
if(sender != null)
sender.stop();
sender=new Sender(server.factory).start();
}

@Override public void close() throws IOException {
super.close();
if(sender != null) {
sender.stop();
sender=null;
}
}

@Override
public void send(byte[] data, int offset, int length) throws Exception {

// to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to
// serialize messages to and - before the data is sent by the sender thread - the buffer might be changed!
// This is similar to what NioConnection does on a partial write. If the send was synchronous (like in
// TcpConnection), we would not have to copy the data
ByteArray buf=new ByteArray(data, offset, length).copy();
boolean added=queue.offer(buf);
if(!added)
dropped_msgs.increment();
}

@Override
public String toString() {
return String.format("%s [%d/%d, %d drops, sender: %b]", super.toString(), queueSize(), maxSize(),
droppedMessages(), senderRunning());
}

protected String name() {
InetAddress local=sock.getLocalAddress(), remote=sock.getInetAddress();
String l=local != null? Util.shortName(local) : "<null>";
String r=remote != null? Util.shortName(remote) : "<null>";
return String.format("Connection.Sender [%s:%s-%s:%s]", l, sock.getLocalPort(), r, sock.getPort());
}



protected boolean senderRunning() {
final Sender tmp=sender;
return tmp != null && tmp.running();
}


protected class Sender implements Runnable {
protected final Thread thread;
protected volatile boolean running=true;

public Sender(ThreadFactory f) {
String name=name();
thread=f != null? f.newThread(this, name) : new Thread(this, name);
}

public Sender start() {
running=true;
thread.start();
return this;
}

public Sender stop() {
running=false;
Thread t=thread;
if(t != null && t.isAlive())
t.interrupt();
return this;
}

public boolean running() {
return running && isConnected();
}

@Override public void run() {
try {
while(running()) {
ByteArray data;
try {
data=queue.take();
}
catch(InterruptedException iex) {
continue;
}

// no synchronization needed as this thread is the only sender
doSend(data.getArray(), data.getOffset(), data.getLength(), true); // flush
}
}
catch(EOFException | SocketException ex) {
; // regular use case when a peer closes its connection - we don't want to log this as exception
}
catch(Exception e) {
//noinspection StatementWithEmptyBody
if (e instanceof SSLException && e.getMessage().contains("Socket closed")) {
; // regular use case when a peer closes its connection - we don't want to log this as exception
}
else if (e instanceof SSLHandshakeException && e.getCause() instanceof EOFException) {
; // Ignore SSL handshakes closed early (usually liveness probes)
}
else {
if(server.logDetails())
server.log.warn("failed sending message", e);
else
server.log.warn("failed sending message: " + e);
}
}
finally {
server.notifyConnectionClosed(TcpConnectionNonBlocking.this);
}
}
}
}
4 changes: 3 additions & 1 deletion src/org/jgroups/blocks/cs/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ public void run() {
protected void handleAccept(final Socket client_sock) throws Exception {
TcpConnection conn=null;
try {
conn=new TcpConnection(client_sock, TcpServer.this);
conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue)
: new TcpConnection(client_sock, TcpServer.this);

Address peer_addr=conn.peerAddress();
synchronized(this) {
boolean conn_exists=hasConnection(peer_addr),
Expand Down
9 changes: 0 additions & 9 deletions src/org/jgroups/demos/Chat.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ private void start(String props, String name, boolean nohup) throws Exception {
try {
channel=new JChannel(props).name(name);
channel.setReceiver(this);

// todo: remove once https://issues.redhat.com/browse/JGRP-2689 has been implemented
/*
TP tp=channel.getProtocolStack().getTransport();
SSLContext context=TLS.getSSLContext();
DefaultSocketFactory ssl_factory=new DefaultSocketFactory(context);
tp.setSocketFactory(ssl_factory);
*/

channel.connect(CLUSTER);
}
catch(Exception ex) {
Expand Down
3 changes: 0 additions & 3 deletions src/org/jgroups/protocols/BasicTCP.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ public abstract class BasicTCP extends TP implements Receiver {
"(https://issues.redhat.com/browse/JGRP-2684)")
protected boolean use_acks;

// @Property(description="Sets socket option SO_REUSEADDR (https://issues.redhat.com/browse/JGRP-2009)")
// protected boolean reuse_addr;

@LocalAddress
@Property(name="client_bind_addr",
description="The address of a local network interface which should be used by client sockets to bind to. " +
Expand Down
32 changes: 22 additions & 10 deletions src/org/jgroups/protocols/TCP.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public TCP() {}
@Component(name="tls",description="Contains the attributes for TLS (SSL sockets) when enabled=true")
protected TLS tls=new TLS();

@Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)")
protected boolean non_blocking_sends;

@Property(description="when sending and non_blocking, how many messages to queue max")
protected int max_send_queue=128;

public int getBufferedInputStreamSize() {
return buffered_input_stream_size;
}
Expand All @@ -69,10 +75,14 @@ public TCP setBufferedOutputStreamSize(int buffered_output_stream_size) {
this.buffered_output_stream_size=buffered_output_stream_size;
return this;
}
public TLS tls() {return tls;}
public TCP tls(TLS t) {this.tls=t; return this;}
public boolean logAcceptError() {return log_accept_error;}
public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
public TLS tls() {return tls;}
public TCP tls(TLS t) {this.tls=t; return this;}
public boolean logAcceptError() {return log_accept_error;}
public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
public boolean nonBlockingSends() {return non_blocking_sends;}
public TCP nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
public int maxSendQueue() {return max_send_queue;}
public TCP maxSendQueue(int s) {this.max_send_queue=s; return this;}

@ManagedAttribute
public int getOpenConnections() {
Expand Down Expand Up @@ -110,17 +120,19 @@ public void start() throws Exception {
}
srv=new TcpServer(getThreadFactory(), getSocketFactory(), bind_addr, bind_port, bind_port+port_range,
external_addr, external_port, recv_buf_size).setLogAcceptError(log_accept_error);
srv.receiver(this)

srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
.peerAddressReadTimeout(peer_addr_read_timeout)
.nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue)
.usePeerConnections(true)
.useAcks(this.use_acks)
.socketFactory(getSocketFactory())
.receiver(this)
.timeService(time_service)
.socketConnectionTimeout(sock_conn_timeout)
.tcpNodelay(tcp_nodelay).linger(linger)
.clientBindAddress(client_bind_addr).clientBindPort(client_bind_port).deferClientBinding(defer_client_bind_addr)
.log(this.log).logDetails(this.log_details);
srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
.peerAddressReadTimeout(peer_addr_read_timeout)
.usePeerConnections(true)
.useAcks(this.use_acks)
.socketFactory(getSocketFactory());

if(send_buf_size > 0)
srv.sendBufferSize(send_buf_size);
Expand Down
17 changes: 14 additions & 3 deletions src/org/jgroups/protocols/TUNNEL.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public interface TUNNELPolicy {
@Property(description="SO_LINGER in seconds. Default of -1 disables it")
protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)

@Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)")
protected boolean non_blocking_sends;

@Property(description="when sending and non_blocking, how many messages to queue max")
protected int max_send_queue=128;

/* ------------------------------------------ Fields ----------------------------------------------------- */

protected final List<InetSocketAddress> gossip_routers=new ArrayList<>();
Expand All @@ -89,7 +95,6 @@ public interface TUNNELPolicy {
public TUNNEL() {
}


public long getReconnectInterval() {return reconnect_interval;}
public TUNNEL setReconnectInterval(long r) {this.reconnect_interval=r; return this;}
public boolean isTcpNodelay() {return tcp_nodelay;}
Expand All @@ -100,6 +105,10 @@ public TUNNEL() {
public TUNNEL tls(TLS t) {this.tls=t; return this;}
public int getLinger() {return linger;}
public TUNNEL setLinger(int l) {this.linger=l; return this;}
public boolean nonBlockingSends() {return non_blocking_sends;}
public TUNNEL nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
public int maxSendQueue() {return max_send_queue;}
public TUNNEL maxSendQueue(int s) {this.max_send_queue=s; return this;}

/** We can simply send a message with dest == null and the GossipRouter will take care of routing it to all
* members in the cluster */
Expand Down Expand Up @@ -176,7 +185,8 @@ public void init() throws Exception {
if(gossip_routers.isEmpty())
throw new IllegalStateException("gossip_router_hosts needs to contain at least one address of a GossipRouter");
log.debug("gossip routers are %s", gossip_routers);
stubManager=RouterStubManager.emptyGossipClientStubManager(log, timer).useNio(this.use_nio);
stubManager=RouterStubManager.emptyGossipClientStubManager(log, timer).useNio(this.use_nio)
.nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
sock=getSocketFactory().createDatagramSocket("jgroups.tunnel.ucast_sock", 0, bind_addr);
}

Expand Down Expand Up @@ -215,7 +225,8 @@ public Object down(Event evt) {
PhysicalAddress physical_addr=getPhysicalAddressFromCache(local);
String logical_name=org.jgroups.util.NameCache.get(local);
stubManager=new RouterStubManager(log,timer,group,local, logical_name, physical_addr, reconnect_interval)
.useNio(this.use_nio).socketFactory(getSocketFactory()).heartbeat(heartbeat_interval, heartbeat_timeout);
.useNio(this.use_nio).socketFactory(getSocketFactory()).heartbeat(heartbeat_interval, heartbeat_timeout)
.nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue);
for(InetSocketAddress gr: gossip_routers) {
try {
InetSocketAddress target=gr.isUnresolved()? new InetSocketAddress(gr.getHostString(), gr.getPort())
Expand Down
Loading

0 comments on commit a505290

Please sign in to comment.