diff --git a/src/org/jgroups/blocks/cs/BaseServer.java b/src/org/jgroups/blocks/cs/BaseServer.java index 89061eab89..b5a39bac1a 100644 --- a/src/org/jgroups/blocks/cs/BaseServer.java +++ b/src/org/jgroups/blocks/cs/BaseServer.java @@ -25,6 +25,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; /** * Abstract class for a server handling sending, receiving and connection management. @@ -38,7 +40,7 @@ public abstract class BaseServer implements Closeable, ConnectionListener { protected final ThreadFactory factory; protected SocketFactory socket_factory=new DefaultSocketFactory(); protected long reaperInterval; - protected Reaper reaper; + protected Reaper reaper; protected Receiver receiver; protected final AtomicBoolean running=new AtomicBoolean(false); protected Log log=LogFactory.getLog(getClass()); @@ -83,7 +85,6 @@ protected BaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) { } - public Receiver receiver() {return receiver;} public BaseServer receiver(Receiver r) {this.receiver=r; return this;} public long reaperInterval() {return reaperInterval;} @@ -443,6 +444,10 @@ public synchronized void clearConnections() { conns.clear(); } + public void forAllConnections(BiConsumer
c) { + conns.forEach(c); + } + /** Removes all connections which are not in current_mbrs */ public void retainAll(Collection current_mbrs) { if(current_mbrs == null) @@ -483,8 +488,16 @@ public void notifyConnectionEstablished(Connection conn) { public String toString() { - return new StringBuilder(getClass().getSimpleName()).append(": local_addr=").append(local_addr).append("\n") - .append("connections (" + conns.size() + "):\n").append(super.toString()).append('\n').toString(); + return toString(false); + } + + public String toString(boolean details) { + String s=String.format("%s (%s, %d conns)", getClass().getSimpleName(), local_addr, conns.size()); + if(details && !conns.isEmpty()) { + String tmp=conns.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n")); + return String.format("%s:\n%s", s, tmp); + } + return s; } diff --git a/src/org/jgroups/blocks/cs/Connection.java b/src/org/jgroups/blocks/cs/Connection.java index d4a7f11b25..05296dd4f4 100644 --- a/src/org/jgroups/blocks/cs/Connection.java +++ b/src/org/jgroups/blocks/cs/Connection.java @@ -11,6 +11,7 @@ */ public abstract class Connection implements Closeable { public static final byte[] cookie= { 'b', 'e', 'l', 'a' }; + protected BaseServer server; protected Address peer_addr; // address of the 'other end' of the connection protected long last_access; // timestamp of the last access to this connection (read or write) @@ -18,12 +19,24 @@ public abstract class Connection implements Closeable { abstract public boolean isConnectionPending(); abstract public boolean isClosed(); abstract public Address localAddress(); - abstract public Address peerAddress(); - abstract public boolean isExpired(long millis); + public Address peerAddress() {return peer_addr;} abstract public void flush(); // sends pending data abstract public void connect(Address dest) throws Exception; abstract public void start() throws Exception; abstract public void send(byte[] buf, int offset, int length) throws Exception; abstract public void send(ByteBuffer buf) throws Exception; abstract public String status(); + + protected long getTimestamp() { + return server.timeService() != null? server.timeService().timestamp() : System.nanoTime(); + } + + protected void updateLastAccessed() { + if(server.connExpireTime() > 0) + last_access=getTimestamp(); + } + + public boolean isExpired(long now) { + return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime(); + } } diff --git a/src/org/jgroups/blocks/cs/NioBaseServer.java b/src/org/jgroups/blocks/cs/NioBaseServer.java index 0802bca286..77a21b27c7 100644 --- a/src/org/jgroups/blocks/cs/NioBaseServer.java +++ b/src/org/jgroups/blocks/cs/NioBaseServer.java @@ -70,8 +70,6 @@ public synchronized int numPartialWrites() { } - - /** Prints send and receive buffers for all connections */ @ManagedOperation(description="Prints the send and receive buffers") public String printBuffers() { diff --git a/src/org/jgroups/blocks/cs/NioConnection.java b/src/org/jgroups/blocks/cs/NioConnection.java index 7d87d523d1..906049173d 100644 --- a/src/org/jgroups/blocks/cs/NioConnection.java +++ b/src/org/jgroups/blocks/cs/NioConnection.java @@ -34,7 +34,6 @@ public class NioConnection extends Connection { protected SocketChannel channel; // the channel to the peer protected SelectionKey key; - protected final NioBaseServer server; protected final Buffers send_buf; // send messages via gathering writes protected final ByteBuffer length_buf=ByteBuffer.allocate(Integer.BYTES); // reused: send the length of the next buf protected boolean copy_on_partial_write=true; @@ -79,11 +78,6 @@ public NioConnection(SocketChannel channel, NioBaseServer server) throws Excepti @Override public boolean isClosed() {return channel == null || !channel.isOpen();} - @Override - public boolean isExpired(long now) {return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime();} - - protected void updateLastAccessed() {if(server.connExpireTime() > 0) last_access=getTimestamp();} - @Override public Address localAddress() { InetSocketAddress local_addr=null; @@ -93,7 +87,6 @@ public Address localAddress() { return local_addr != null? new IpAddress(local_addr) : null; } - public Address peerAddress() {return peer_addr;} public SelectionKey key() {return key;} public NioConnection key(SelectionKey k) {this.key=k; return this;} public NioConnection copyOnPartialWrite(boolean b) {this.copy_on_partial_write=b; return this;} @@ -120,7 +113,7 @@ protected void connect(Address dest, boolean send_local_addr) throws Exception { try { if(!server.deferClientBinding()) this.channel.bind(new InetSocketAddress(server.clientBindAddress(), server.clientBindPort())); - this.key=server.register(channel, OP_CONNECT | OP_READ, this); + this.key=((NioBaseServer)server).register(channel, OP_CONNECT | OP_READ, this); boolean success=Util.connect(channel, destAddr); if(success || channel.finishConnect()) clearSelectionKey(OP_CONNECT); @@ -171,7 +164,6 @@ protected void send(ByteBuffer buf, boolean send_length) throws Exception { send_buf.copy(); // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) partial_writes++; } - updateLastAccessed(); } catch(Exception ex) { if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException)) @@ -187,16 +179,12 @@ public void send() throws Exception { send_lock.lock(); try { boolean success=send_buf.write(channel); - if(success) { + if(success) clearSelectionKey(OP_WRITE); - updateLastAccessed(); - } else { - // registerSelectionKey(OP_WRITE); - if(copy_on_partial_write) { - // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + if(copy_on_partial_write) send_buf.copy(); - } partial_writes++; } } @@ -285,10 +273,6 @@ public String status() { return "open"; } - protected long getTimestamp() { - return server.timeService() != null? server.timeService().timestamp() : System.nanoTime(); - } - protected void setSocketParameters(Socket client_sock) throws SocketException { try { if(server.sendBufferSize() > 0) @@ -324,7 +308,6 @@ protected void sendLocalAddress(Address local_addr) throws Exception { local_addr.writeTo(out); ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position()); send(buf, false); - updateLastAccessed(); } catch(Exception ex) { close(); diff --git a/src/org/jgroups/blocks/cs/TcpConnection.java b/src/org/jgroups/blocks/cs/TcpConnection.java index f6801d9d18..1fd4521c0b 100644 --- a/src/org/jgroups/blocks/cs/TcpConnection.java +++ b/src/org/jgroups/blocks/cs/TcpConnection.java @@ -34,7 +34,6 @@ public class TcpConnection extends Connection { protected OutputStream out; protected DataInputStream in; protected volatile Receiver receiver; - protected final TcpBaseServer server; protected final AtomicInteger writers=new AtomicInteger(0); // to determine the last writer to flush protected volatile boolean connected; protected final byte[] length_buf=new byte[Integer.BYTES]; // used to write the length of the data @@ -74,14 +73,6 @@ public Address localAddress() { return local_addr != null? new IpAddress(local_addr) : null; } - public Address peerAddress() { - return peer_addr; - } - - protected long getTimestamp() { - return server.timeService() != null? server.timeService().timestamp() : System.nanoTime(); - } - protected String getSockAddress() { StringBuilder sb=new StringBuilder(); if(sock != null) { @@ -91,11 +82,6 @@ protected String getSockAddress() { return sb.toString(); } - protected void updateLastAccessed() { - if(server.connExpireTime() > 0) - last_access=getTimestamp(); - } - public void connect(Address dest) throws Exception { connect(dest, server.usePeerConnections(), server.useAcks()); } @@ -136,21 +122,12 @@ else if(Arrays.equals(BaseServer.FAIL, ack)) } } - public void start() { if(receiver != null) receiver.stop(); receiver=new Receiver(server.factory).start(); } - - - /** - * - * @param data Guaranteed to be non-null - * @param offset - * @param length - */ public void send(byte[] data, int offset, int length) throws Exception { if(out == null) return; @@ -158,7 +135,6 @@ public void send(byte[] data, int offset, int length) throws Exception { send_lock.lock(); try { doSend(data, offset, length); - updateLastAccessed(); } catch(InterruptedException iex) { Thread.currentThread().interrupt(); // set interrupt flag again @@ -202,12 +178,12 @@ public void flush() { } protected OutputStream createDataOutputStream(OutputStream out) { - int size=server.getBufferedOutputStreamSize(); + int size=((TcpBaseServer)server).getBufferedOutputStreamSize(); return size == 0? out : new BufferedOutputStream(out, size); } protected DataInputStream createDataInputStream(InputStream in) { - int size=server.getBufferedInputStreamSize(); + int size=((TcpBaseServer)server).getBufferedInputStreamSize(); return size == 0? new DataInputStream(in) : new DataInputStream(new BufferedInputStream(in, size)); } @@ -248,7 +224,6 @@ protected void sendLocalAddress(Address local_addr) throws Exception { local_addr.writeTo(os); out.write(os.buffer(), 0, os.position()); out.flush(); // needed ? - updateLastAccessed(); } catch(Exception ex) { server.socket_factory.close(this.sock); @@ -263,7 +238,7 @@ protected void sendLocalAddress(Address local_addr) throws Exception { */ protected Address readPeerAddress(Socket client_sock) throws Exception { int timeout=client_sock.getSoTimeout(); - client_sock.setSoTimeout(server.peerAddressReadTimeout()); + client_sock.setSoTimeout(((TcpBaseServer)server).peerAddressReadTimeout()); try { // read the cookie first @@ -369,24 +344,19 @@ public String status() { return "open"; } - public boolean isExpired(long now) { - return server.conn_expire_time > 0 && now - last_access >= server.conn_expire_time; - } - - public boolean isConnected() { + @Override public boolean isConnected() { return connected; } - public boolean isConnectionPending() { + @Override public boolean isConnectionPending() { return false; } - @Override - public boolean isClosed() { + @Override public boolean isClosed() { return sock == null || sock.isClosed(); } - public void close() throws IOException { + @Override public void close() throws IOException { Util.close(sock); // fix for https://issues.redhat.com/browse/JGRP-2350 send_lock.lock(); try { diff --git a/src/org/jgroups/stack/GossipRouter.java b/src/org/jgroups/stack/GossipRouter.java index 8389d1489f..c3945b00a3 100644 --- a/src/org/jgroups/stack/GossipRouter.java +++ b/src/org/jgroups/stack/GossipRouter.java @@ -57,10 +57,14 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener, @ManagedAttribute(description="server port on which the GossipRouter accepts client connections", writable=true) protected int port=12001; - @ManagedAttribute(description="time (in msecs) until gossip entry expires. 0 disables expiration.", + @ManagedAttribute(description="Time (in msecs) until idle client connections are closed. 0 disables expiration.", writable=true,type=AttributeType.TIME) protected long expiry_time; + @ManagedAttribute(description="Interval (in msecs) to check for expired connections and close them. 0 disables reaping.", + writable=true,type=AttributeType.TIME) + protected long reaper_interval; + @ManagedAttribute(description="Time (in ms) for setting SO_LINGER on sockets returned from accept(). 0 means do not set SO_LINGER" ,type=AttributeType.TIME) protected int linger_timeout=-1; @@ -89,7 +93,6 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener, protected int max_length; protected BaseServer server; protected final AtomicBoolean running=new AtomicBoolean(false); - protected Timer timer; protected final Log log=LogFactory.getLog(this.getClass()); @Component(name="diag",description="DiagnosticsHandler listening for probe requests") @@ -126,6 +129,8 @@ public GossipRouter(InetAddress bind_addr, int local_port) throws Exception { public GossipRouter port(int port) {this.port=port; return this;} public long expiryTime() {return expiry_time;} public GossipRouter expiryTime(long t) {this.expiry_time=t; return this;} + public long reaperInterval() {return reaper_interval;} + public GossipRouter reaperInterval(long t) {this.reaper_interval=t; return this;} public int lingerTimeout() {return linger_timeout;} public GossipRouter lingerTimeout(int t) {this.linger_timeout=t; return this;} public int recvBufferSize() {return recv_buf_size;} @@ -205,7 +210,7 @@ public GossipRouter start() throws Exception { recv_buf_size, "jgroups.tcp.gossiprouter"); server.receiver(this).setMaxLength(max_length) .addConnectionListener(this) - .connExpireTimeout(expiry_time).linger(linger_timeout); + .connExpireTimeout(expiry_time).reaperInterval(reaper_interval).linger(linger_timeout); server.start(); Runtime.getRuntime().addShutdownHook(new Thread(GossipRouter.this::stop)); return this; @@ -748,7 +753,7 @@ public Entry(Address client_addr, PhysicalAddress phys_addr, String logical_name public static void main(String[] args) throws Exception { int port=12001; int recv_buf_size=0, max_length=0; - long expiry_time=0; + long expiry_time=0, reaper_interval=0; boolean diag_enabled=true, diag_enable_udp=true, diag_enable_tcp=false; InetAddress diag_mcast_addr=null, diag_bind_addr=null; int diag_port=7500, diag_port_range=50, diag_ttl=8, soLinger=-1; @@ -779,6 +784,10 @@ public static void main(String[] args) throws Exception { expiry_time=Long.parseLong(args[++i]); continue; } + if("-reaper_interval".equals(arg)) { + reaper_interval=Long.parseLong(args[++i]); + continue; + } if("-jmx".equals(arg)) { jmx=Boolean.parseBoolean(args[++i]); continue; @@ -898,7 +907,7 @@ public static void main(String[] args) throws Exception { throw new IllegalArgumentException("Cannot use NIO with TLS"); GossipRouter router=new GossipRouter(bind_addr, port) - .jmx(jmx).expiryTime(expiry_time) + .jmx(jmx).expiryTime(expiry_time).reaperInterval(reaper_interval) .useNio(nio) .recvBufferSize(recv_buf_size) .lingerTimeout(soLinger) @@ -963,6 +972,8 @@ static void help() { System.out.println(); System.out.println(" -expiry