From 9650dfb8ab93f28085f5e09a990e4176b5d87762 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Sat, 3 Feb 2024 15:18:32 +0100 Subject: [PATCH] - Added TcpConnectionNonBlocking (https://issues.redhat.com/browse/JGRP-2759) - Added non_blocking_sends and max_send_queue to TCP/TcpConnection/TcpBaseServer --- src/org/jgroups/blocks/cs/TcpBaseServer.java | 15 +- src/org/jgroups/blocks/cs/TcpConnection.java | 9 +- .../blocks/cs/TcpConnectionNonBlocking.java | 164 ++++++++++++++++++ src/org/jgroups/blocks/cs/TcpServer.java | 6 +- src/org/jgroups/demos/Chat.java | 9 - src/org/jgroups/protocols/TCP.java | 32 ++-- src/org/jgroups/stack/Protocol.java | 3 +- 7 files changed, 210 insertions(+), 28 deletions(-) create mode 100644 src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java diff --git a/src/org/jgroups/blocks/cs/TcpBaseServer.java b/src/org/jgroups/blocks/cs/TcpBaseServer.java index 94e4e19177a..b8a5245bac9 100644 --- a/src/org/jgroups/blocks/cs/TcpBaseServer.java +++ b/src/org/jgroups/blocks/cs/TcpBaseServer.java @@ -10,9 +10,11 @@ * @since 3.6.5 */ public abstract class TcpBaseServer extends BaseServer { - protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address - protected int buffered_inputstream_size=8192; - protected int buffered_outputstream_size=8192; + protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address + protected int buffered_inputstream_size=8192; + protected int buffered_outputstream_size=8192; + protected boolean non_blocking_sends; // https://issues.redhat.com/browse/JGRP-2759 + protected int max_send_queue=128; // when non_blocking, how many messages to queue max? protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) { super(f, sf, recv_buf_size); @@ -20,7 +22,8 @@ protected TcpBaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) { @Override protected TcpConnection createConnection(Address dest) throws Exception { - return new TcpConnection(dest, this); + return non_blocking_sends? new TcpConnectionNonBlocking(dest, this).maxSize(max_send_queue) + : new TcpConnection(dest, this); } @@ -30,5 +33,9 @@ protected TcpConnection createConnection(Address dest) throws Exception { public TcpBaseServer setBufferedInputStreamSize(int s) {this.buffered_inputstream_size=s; return this;} public int getBufferedOutputStreamSize() {return buffered_outputstream_size;} public TcpBaseServer setBufferedOutputStreamSize(int s) {this.buffered_outputstream_size=s; return this;} + public boolean nonBlockingSends() {return non_blocking_sends;} + public TcpBaseServer nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;} + public int maxSendQueue() {return max_send_queue;} + public TcpBaseServer maxSendQueue(int s) {this.max_send_queue=s; return this;} } diff --git a/src/org/jgroups/blocks/cs/TcpConnection.java b/src/org/jgroups/blocks/cs/TcpConnection.java index 1fd4521c0bc..b84545c91b1 100644 --- a/src/org/jgroups/blocks/cs/TcpConnection.java +++ b/src/org/jgroups/blocks/cs/TcpConnection.java @@ -160,15 +160,20 @@ public void send(ByteBuffer buf) throws Exception { } } - @GuardedBy("send_lock") protected void doSend(byte[] data, int offset, int length) throws Exception { + doSend(data, offset, length, false); + } + + @GuardedBy("send_lock") + protected void doSend(byte[] data, int offset, int length, boolean flush) throws Exception { Bits.writeInt(length, length_buf, 0); // write the length of the data buffer first out.write(length_buf, 0, length_buf.length); out.write(data, offset, length); + if(flush) + out.flush(); } - public void flush() { try { out.flush(); diff --git a/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java new file mode 100644 index 00000000000..f7ebe79d7d7 --- /dev/null +++ b/src/org/jgroups/blocks/cs/TcpConnectionNonBlocking.java @@ -0,0 +1,164 @@ +package org.jgroups.blocks.cs; + +import org.jgroups.Address; +import org.jgroups.util.ByteArray; +import org.jgroups.util.ThreadFactory; +import org.jgroups.util.Util; + +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; +import java.io.EOFException; +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.LongAdder; + +/** + * TCP connection which (despite the fancy name) blocks only a single thread at most. Uses a bounded queue, to which + * senders add their messages, and a single consumer sending the messages. When the queue is full, messages will + * get dropped. Therefore, at most one thread is blocked on TCP write when the send-window is full.
+ * Link: https://issues.redhat.com/browse/JGRP-2759 + * @author Bela Ban + * @since 5.3.3 + */ +public class TcpConnectionNonBlocking extends TcpConnection { + protected BlockingQueue queue; + protected int max_size=128; + protected volatile Sender sender; + protected final LongAdder dropped_msgs=new LongAdder(); + + + public TcpConnectionNonBlocking(Address peer_addr, TcpBaseServer server) throws Exception { + super(peer_addr, server); + } + + public TcpConnectionNonBlocking(Socket s, TcpServer server) throws Exception { + super(s, server); + } + + + public int maxSize() {return max_size;} + public TcpConnectionNonBlocking maxSize(int s) {max_size=s; return this;} + public long droppedMessages() {return dropped_msgs.sum();} + public int queueSize() {return queue != null? queue.size() : 0;} + + + @Override public void start() { + super.start(); + queue=new ArrayBlockingQueue<>(max_size); + if(sender != null) + sender.stop(); + sender=new Sender(server.factory).start(); + } + + @Override public void close() throws IOException { + super.close(); + if(sender != null) { + sender.stop(); + sender=null; + } + } + + @Override + public void send(byte[] data, int offset, int length) throws Exception { + + // to be on the safe side, we copy the data: some bundlers (e.g. TransferQueueBundler) reuse a buffer to + // serialize messages to and - before the data is sent by the sender thread - the buffer might be changed! + // This is similar to what NioConnection does on a partial write. If the send was synchronous (like in + // TcpConnection), we would not have to copy the data + ByteArray buf=new ByteArray(data, offset, length).copy(); + boolean added=queue.offer(buf); + if(!added) + dropped_msgs.increment(); + } + + @Override + public String toString() { + return String.format("%s [%d/%d, %d drops, sender: %b]", super.toString(), queueSize(), maxSize(), + droppedMessages(), senderRunning()); + } + + protected String name() { + InetAddress local=sock.getLocalAddress(), remote=sock.getInetAddress(); + String l=local != null? Util.shortName(local) : ""; + String r=remote != null? Util.shortName(remote) : ""; + return String.format("Connection.Sender [%s:%s-%s:%s]", l, sock.getLocalPort(), r, sock.getPort()); + } + + + + protected boolean senderRunning() { + final Sender tmp=sender; + return tmp != null && tmp.running(); + } + + + protected class Sender implements Runnable { + protected final Thread thread; + protected volatile boolean running=true; + + public Sender(ThreadFactory f) { + String name=name(); + thread=f != null? f.newThread(this, name) : new Thread(this, name); + } + + public Sender start() { + running=true; + thread.start(); + return this; + } + + public Sender stop() { + running=false; + Thread t=thread; + if(t != null && t.isAlive()) + t.interrupt(); + return this; + } + + public boolean running() { + return running && isConnected(); + } + + @Override public void run() { + try { + while(running()) { + ByteArray data; + try { + data=queue.take(); + } + catch(InterruptedException iex) { + continue; + } + + // no synchronization needed as this thread is the only sender + doSend(data.getArray(), data.getOffset(), data.getLength(), true); // flush + } + } + catch(EOFException | SocketException ex) { + ; // regular use case when a peer closes its connection - we don't want to log this as exception + } + catch(Exception e) { + //noinspection StatementWithEmptyBody + if (e instanceof SSLException && e.getMessage().contains("Socket closed")) { + ; // regular use case when a peer closes its connection - we don't want to log this as exception + } + else if (e instanceof SSLHandshakeException && e.getCause() instanceof EOFException) { + ; // Ignore SSL handshakes closed early (usually liveness probes) + } + else { + if(server.logDetails()) + server.log.warn("failed sending message", e); + else + server.log.warn("failed sending message: " + e); + } + } + finally { + server.notifyConnectionClosed(TcpConnectionNonBlocking.this); + } + } + } +} diff --git a/src/org/jgroups/blocks/cs/TcpServer.java b/src/org/jgroups/blocks/cs/TcpServer.java index d11fff66720..4b7060f7419 100644 --- a/src/org/jgroups/blocks/cs/TcpServer.java +++ b/src/org/jgroups/blocks/cs/TcpServer.java @@ -115,9 +115,11 @@ public void run() { protected void handleAccept(final Socket client_sock) throws Exception { TcpConnection conn=null; try { - conn=new TcpConnection(client_sock, TcpServer.this); + conn=non_blocking_sends? new TcpConnectionNonBlocking(client_sock, TcpServer.this).maxSize(max_send_queue) + : new TcpConnection(client_sock, TcpServer.this); + Address peer_addr=conn.peerAddress(); - synchronized(this) { + synchronized(TcpServer.this) { boolean conn_exists=hasConnection(peer_addr), replace=conn_exists && use_peer_connections && local_addr.compareTo(peer_addr) < 0; // bigger conn wins diff --git a/src/org/jgroups/demos/Chat.java b/src/org/jgroups/demos/Chat.java index 2c4acdc66b9..0f2ec51e2ff 100644 --- a/src/org/jgroups/demos/Chat.java +++ b/src/org/jgroups/demos/Chat.java @@ -33,15 +33,6 @@ private void start(String props, String name, boolean nohup) throws Exception { try { channel=new JChannel(props).name(name); channel.setReceiver(this); - - // todo: remove once https://issues.redhat.com/browse/JGRP-2689 has been implemented - /* - TP tp=channel.getProtocolStack().getTransport(); - SSLContext context=TLS.getSSLContext(); - DefaultSocketFactory ssl_factory=new DefaultSocketFactory(context); - tp.setSocketFactory(ssl_factory); - */ - channel.connect(CLUSTER); } catch(Exception ex) { diff --git a/src/org/jgroups/protocols/TCP.java b/src/org/jgroups/protocols/TCP.java index 9413914ef68..223e62d0234 100644 --- a/src/org/jgroups/protocols/TCP.java +++ b/src/org/jgroups/protocols/TCP.java @@ -52,6 +52,12 @@ public TCP() {} @Component(name="tls",description="Contains the attributes for TLS (SSL sockets) when enabled=true") protected TLS tls=new TLS(); + @Property(description="use bounded queues for sending (https://issues.redhat.com/browse/JGRP-2759)") + protected boolean non_blocking_sends; + + @Property(description="when sending and non_blocking, how many messages to queue max") + protected int max_send_queue=128; + public int getBufferedInputStreamSize() { return buffered_input_stream_size; } @@ -69,10 +75,14 @@ public TCP setBufferedOutputStreamSize(int buffered_output_stream_size) { this.buffered_output_stream_size=buffered_output_stream_size; return this; } - public TLS tls() {return tls;} - public TCP tls(TLS t) {this.tls=t; return this;} - public boolean logAcceptError() {return log_accept_error;} - public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;} + public TLS tls() {return tls;} + public TCP tls(TLS t) {this.tls=t; return this;} + public boolean logAcceptError() {return log_accept_error;} + public TCP logAcceptError(boolean l) {this.log_accept_error=l; if(srv != null) srv.setLogAcceptError(l); return this;} + public boolean nonBlockingSends() {return non_blocking_sends;} + public TCP nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;} + public int maxSendQueue() {return max_send_queue;} + public TCP maxSendQueue(int s) {this.max_send_queue=s; return this;} @ManagedAttribute public int getOpenConnections() { @@ -110,17 +120,19 @@ public void start() throws Exception { } srv=new TcpServer(getThreadFactory(), getSocketFactory(), bind_addr, bind_port, bind_port+port_range, external_addr, external_port, recv_buf_size).setLogAcceptError(log_accept_error); - srv.receiver(this) + + srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size) + .peerAddressReadTimeout(peer_addr_read_timeout) + .nonBlockingSends(non_blocking_sends).maxSendQueue(max_send_queue) + .usePeerConnections(true) + .useAcks(this.use_acks) + .socketFactory(getSocketFactory()) + .receiver(this) .timeService(time_service) .socketConnectionTimeout(sock_conn_timeout) .tcpNodelay(tcp_nodelay).linger(linger) .clientBindAddress(client_bind_addr).clientBindPort(client_bind_port).deferClientBinding(defer_client_bind_addr) .log(this.log).logDetails(this.log_details); - srv.setBufferedInputStreamSize(buffered_input_stream_size).setBufferedOutputStreamSize(buffered_output_stream_size) - .peerAddressReadTimeout(peer_addr_read_timeout) - .usePeerConnections(true) - .useAcks(this.use_acks) - .socketFactory(getSocketFactory()); if(send_buf_size > 0) srv.sendBufferSize(send_buf_size); diff --git a/src/org/jgroups/stack/Protocol.java b/src/org/jgroups/stack/Protocol.java index 79a6725a4d9..34ccb1ecbe9 100644 --- a/src/org/jgroups/stack/Protocol.java +++ b/src/org/jgroups/stack/Protocol.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.function.Predicate; import java.util.stream.Collectors; @@ -350,7 +351,7 @@ public Object up(Message msg) { * (calling {@link #accept(Message)}), and - if true - calls {@link #up(org.jgroups.Event)} * for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

* Subclasses should check if there are any messages destined for them (e.g. using - * {@link MessageBatch#getMatchingMessages(short,boolean)}), then possibly remove and process them and finally pass + * {@link MessageBatch#iterator(Predicate)}), then possibly remove and process them and finally pass * the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all * encrypted messages in the batch, not remove them, and pass the batch up when done. * @param batch The message batch