Skip to content

Commit

Permalink
- Added TcpConnectionNonBlocking (https://issues.redhat.com/browse/JG…
Browse files Browse the repository at this point in the history
…RP-2759)

- Added non_blocking_sends and max_send_queue to TCP/TcpConnection/TcpBaseServer
  • Loading branch information
belaban committed Feb 8, 2024
1 parent 438e554 commit 9650dfb
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 28 deletions.
15 changes: 11 additions & 4 deletions src/org/jgroups/blocks/cs/TcpBaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
* @since 3.6.5
*/
public abstract class TcpBaseServer extends BaseServer {
protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
protected int buffered_inputstream_size=8192;
protected int buffered_outputstream_size=8192;
protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
protected int buffered_inputstream_size=8192;
protected int buffered_outputstream_size=8192;
protected boolean non_blocking_sends; // https://issues.redhat.com/browse/JGRP-2759
protected int max_send_queue=128; // when non_blocking, how many messages to queue max?

protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {
super(f, sf, recv_buf_size);
}

@Override
protected TcpConnection createConnection(Address dest) throws Exception {
return new TcpConnection(dest, this);
return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue)
: new TcpConnection(dest, this);
}


Expand All @@ -30,5 +33,9 @@ protected TcpConnection createConnection(Address dest) throws Exception {
public TcpBaseServer setBufferedInputStreamSize(int s) {this.buffered_inputstream_size=s; return this;}
public int getBufferedOutputStreamSize() {return buffered_outputstream_size;}
public TcpBaseServer setBufferedOutputStreamSize(int s) {this.buffered_outputstream_size=s; return this;}
public boolean nonBlockingSends() {return non_blocking_sends;}
public TcpBaseServer nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
public int maxSendQueue() {return max_send_queue;}
public TcpBaseServer maxSendQueue(int s) {this.max_send_queue=s; return this;}

}
9 changes: 7 additions & 2 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,20 @@ public void send(ByteBuffer buf) throws Exception {
}
}


@GuardedBy("send_lock")
protected void doSend(byte[] data, int offset, int length) throws Exception {
doSend(data, offset, length, false);
}

@GuardedBy("send_lock")
protected void doSend(byte[] data, int offset, int length, boolean flush) throws Exception {
Bits.writeInt(length, length_buf, 0); // write the length of the data buffer first
out.write(length_buf, 0, length_buf.length);
out.write(data, offset, length);
if(flush)
out.flush();
}


public void flush() {
try {
out.flush();
Expand Down
164 changes: 164 additions & 0 deletions src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package org.jgroups.blocks.cs;

import org.jgroups.Address;
import org.jgroups.util.ByteArray;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

/**
* TCP connection which (despite the fancy name) blocks only a single thread at most. Uses a bounded queue, to which
* senders add their messages, and a single consumer sending the messages. When the queue is full, messages will
* get dropped. Therefore, at most one thread is blocked on TCP write when the send-window is full.<br/>
* Link: https://issues.redhat.com/browse/JGRP-2759
* @author Bela Ban
* @since 5.3.3
*/
public class TcpConnectionNonBlocking extends TcpConnection {
protected BlockingQueue<ByteArray> queue;
protected int max_size=128;
protected volatile Sender sender;
protected final LongAdder dropped_msgs=new LongAdder();


public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception {
super(peer_addr, server);
}

public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception {
super(s, server);
}


public int maxSize() {return max_size;}
public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;}
public long droppedMessages() {return dropped_msgs.sum();}
public int queueSize() {return queue != null? queue.size() : 0;}


@Override public void start() {
super.start();
queue=new ArrayBlockingQueue<>(max_size);
if(sender != null)
sender.stop();
sender=new Sender(server.factory).start();
}

@Override public void close() throws IOException {
super.close();
if(sender != null) {
sender.stop();
sender=null;
}
}

@Override
public void send(byte[] data, int offset, int length) throws Exception {

// to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to
// serialize messages to and - before the data is sent by the sender thread - the buffer might be changed!
// This is similar to what NioConnection does on a partial write. If the send was synchronous (like in
// TcpConnection), we would not have to copy the data
ByteArray buf=new ByteArray(data, offset, length).copy();
boolean added=queue.offer(buf);
if(!added)
dropped_msgs.increment();
}

@Override
public String toString() {
return String.format("%s [%d/%d, %d drops, sender: %b]", super.toString(), queueSize(), maxSize(),
droppedMessages(), senderRunning());
}

protected String name() {
InetAddress local=sock.getLocalAddress(), remote=sock.getInetAddress();
String l=local != null? Util.shortName(local) : "<null>";
String r=remote != null? Util.shortName(remote) : "<null>";
return String.format("Connection.Sender [%s:%s-%s:%s]", l, sock.getLocalPort(), r, sock.getPort());
}



protected boolean senderRunning() {
final Sender tmp=sender;
return tmp != null && tmp.running();
}


protected class Sender implements Runnable {
protected final Thread thread;
protected volatile boolean running=true;

public Sender(ThreadFactory f) {
String name=name();
thread=f != null? f.newThread(this, name) : new Thread(this, name);
}

public Sender start() {
running=true;
thread.start();
return this;
}

public Sender stop() {
running=false;
Thread t=thread;
if(t != null && t.isAlive())
t.interrupt();
return this;
}

public boolean running() {
return running && isConnected();
}

@Override public void run() {
try {
while(running()) {
ByteArray data;
try {
data=queue.take();
}
catch(InterruptedException iex) {
continue;
}

// no synchronization needed as this thread is the only sender
doSend(data.getArray(), data.getOffset(), data.getLength(), true); // flush
}
}
catch(EOFException | SocketException ex) {
; // regular use case when a peer closes its connection - we don't want to log this as exception
}
catch(Exception e) {
//noinspection StatementWithEmptyBody
if (e instanceof SSLException && e.getMessage().contains("Socket closed")) {
; // regular use case when a peer closes its connection - we don't want to log this as exception
}
else if (e instanceof SSLHandshakeException && e.getCause() instanceof EOFException) {
; // Ignore SSL handshakes closed early (usually liveness probes)
}
else {
if(server.logDetails())
server.log.warn("failed sending message", e);
else
server.log.warn("failed sending message: " + e);
}
}
finally {
server.notifyConnectionClosed(TcpConnectionNonBlocking.this);
}
}
}
}
6 changes: 4 additions & 2 deletions src/org/jgroups/blocks/cs/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ public void run() {
protected void handleAccept(final Socket client_sock) throws Exception {
TcpConnection conn=null;
try {
conn=new TcpConnection(client_sock, TcpServer.this);
conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue)
: new TcpConnection(client_sock, TcpServer.this);

Address peer_addr=conn.peerAddress();
synchronized(this) {
synchronized(TcpServer.this) {
boolean conn_exists=hasConnection(peer_addr),
replace=conn_exists && use_peer_connections && local_addr.compareTo(peer_addr) < 0; // bigger conn wins

Expand Down
9 changes: 0 additions & 9 deletions src/org/jgroups/demos/Chat.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ private void start(String props, String name, boolean nohup) throws Exception {
try {
channel=new JChannel(props).name(name);
channel.setReceiver(this);

// todo: remove once https://issues.redhat.com/browse/JGRP-2689 has been implemented
/*
TP tp=channel.getProtocolStack().getTransport();
SSLContext context=TLS.getSSLContext();
DefaultSocketFactory ssl_factory=new DefaultSocketFactory(context);
tp.setSocketFactory(ssl_factory);
*/

channel.connect(CLUSTER);
}
catch(Exception ex) {
Expand Down
32 changes: 22 additions & 10 deletions src/org/jgroups/protocols/TCP.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public TCP() {}
@Component(name="tls",description="Contains the attributes for TLS (SSL sockets) when enabled=true")
protected TLS tls=new TLS();

@Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)")
protected boolean non_blocking_sends;

@Property(description="when sending and non_blocking, how many messages to queue max")
protected int max_send_queue=128;

public int getBufferedInputStreamSize() {
return buffered_input_stream_size;
}
Expand All @@ -69,10 +75,14 @@ public TCP setBufferedOutputStreamSize(int buffered_output_stream_size) {
this.buffered_output_stream_size=buffered_output_stream_size;
return this;
}
public TLS tls() {return tls;}
public TCP tls(TLS t) {this.tls=t; return this;}
public boolean logAcceptError() {return log_accept_error;}
public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
public TLS tls() {return tls;}
public TCP tls(TLS t) {this.tls=t; return this;}
public boolean logAcceptError() {return log_accept_error;}
public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;}
public boolean nonBlockingSends() {return non_blocking_sends;}
public TCP nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
public int maxSendQueue() {return max_send_queue;}
public TCP maxSendQueue(int s) {this.max_send_queue=s; return this;}

@ManagedAttribute
public int getOpenConnections() {
Expand Down Expand Up @@ -110,17 +120,19 @@ public void start() throws Exception {
}
srv=new TcpServer(getThreadFactory(), getSocketFactory(), bind_addr, bind_port, bind_port+port_range,
external_addr, external_port, recv_buf_size).setLogAcceptError(log_accept_error);
srv.receiver(this)

srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
.peerAddressReadTimeout(peer_addr_read_timeout)
.nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue)
.usePeerConnections(true)
.useAcks(this.use_acks)
.socketFactory(getSocketFactory())
.receiver(this)
.timeService(time_service)
.socketConnectionTimeout(sock_conn_timeout)
.tcpNodelay(tcp_nodelay).linger(linger)
.clientBindAddress(client_bind_addr).clientBindPort(client_bind_port).deferClientBinding(defer_client_bind_addr)
.log(this.log).logDetails(this.log_details);
srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size)
.peerAddressReadTimeout(peer_addr_read_timeout)
.usePeerConnections(true)
.useAcks(this.use_acks)
.socketFactory(getSocketFactory());

if(send_buf_size > 0)
srv.sendBufferSize(send_buf_size);
Expand Down
3 changes: 2 additions & 1 deletion src/org/jgroups/stack/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -350,7 +351,7 @@ public Object up(Message msg) {
* (calling {@link #accept(Message)}), and - if true - calls {@link #up(org.jgroups.Event)}
* for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.<p/>
* Subclasses should check if there are any messages destined for them (e.g. using
* {@link MessageBatch#getMatchingMessages(short,boolean)}), then possibly remove and process them and finally pass
* {@link MessageBatch#iterator(Predicate)}), then possibly remove and process them and finally pass
* the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all
* encrypted messages in the batch, not remove them, and pass the batch up when done.
* @param batch The message batch
Expand Down

0 comments on commit 9650dfb

Please sign in to comment.