Skip to content

Commit

Permalink
- GossipRouter: added reaper_interval (https://issues.redhat.com/brow…
Browse files Browse the repository at this point in the history
…se/JGRP-2753)

- Moved methods/fields from Tcp/NioConnection -> Connection

- Removed updating the timestamp of a connection on sending of messages, but only on reception
  • Loading branch information
belaban committed Jan 29, 2024
1 parent 77d967f commit 817bacc
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 93 deletions.
21 changes: 17 additions & 4 deletions src/org/jgroups/blocks/cs/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
* Abstract class for a server handling sending, receiving and connection management.
Expand All @@ -38,7 +40,7 @@ public abstract class BaseServer implements Closeable, ConnectionListener {
protected final ThreadFactory factory;
protected SocketFactory socket_factory=new DefaultSocketFactory();
protected long reaperInterval;
protected Reaper reaper;
protected Reaper reaper;
protected Receiver receiver;
protected final AtomicBoolean running=new AtomicBoolean(false);
protected Log log=LogFactory.getLog(getClass());
Expand Down Expand Up @@ -83,7 +85,6 @@ protected BaseServer(ThreadFactory f, SocketFactory sf, int recv_buf_size) {
}



public Receiver receiver() {return receiver;}
public BaseServer receiver(Receiver r) {this.receiver=r; return this;}
public long reaperInterval() {return reaperInterval;}
Expand Down Expand Up @@ -443,6 +444,10 @@ public synchronized void clearConnections() {
conns.clear();
}

public void forAllConnections(BiConsumer<Address,Connection> c) {
conns.forEach(c);
}

/** Removes all connections which are not in current_mbrs */
public void retainAll(Collection<Address> current_mbrs) {
if(current_mbrs == null)
Expand Down Expand Up @@ -483,8 +488,16 @@ public void notifyConnectionEstablished(Connection conn) {


public String toString() {
return new StringBuilder(getClass().getSimpleName()).append(": local_addr=").append(local_addr).append("\n")
.append("connections (" + conns.size() + "):\n").append(super.toString()).append('\n').toString();
return toString(false);
}

public String toString(boolean details) {
String s=String.format("%s (%s, %d conns)", getClass().getSimpleName(), local_addr, conns.size());
if(details && !conns.isEmpty()) {
String tmp=conns.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue())).collect(Collectors.joining("\n"));
return String.format("%s:\n%s", s, tmp);
}
return s;
}


Expand Down
17 changes: 15 additions & 2 deletions src/org/jgroups/blocks/cs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,32 @@
*/
public abstract class Connection implements Closeable {
public static final byte[] cookie= { 'b', 'e', 'l', 'a' };
protected BaseServer server;
protected Address peer_addr; // address of the 'other end' of the connection
protected long last_access; // timestamp of the last access to this connection (read or write)

abstract public boolean isConnected();
abstract public boolean isConnectionPending();
abstract public boolean isClosed();
abstract public Address localAddress();
abstract public Address peerAddress();
abstract public boolean isExpired(long millis);
public Address peerAddress() {return peer_addr;}
abstract public void flush(); // sends pending data
abstract public void connect(Address dest) throws Exception;
abstract public void start() throws Exception;
abstract public void send(byte[] buf, int offset, int length) throws Exception;
abstract public void send(ByteBuffer buf) throws Exception;
abstract public String status();

protected long getTimestamp() {
return server.timeService() != null? server.timeService().timestamp() : System.nanoTime();
}

protected void updateLastAccessed() {
if(server.connExpireTime() > 0)
last_access=getTimestamp();
}

public boolean isExpired(long now) {
return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime();
}
}
2 changes: 0 additions & 2 deletions src/org/jgroups/blocks/cs/NioBaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public synchronized int numPartialWrites() {
}




/** Prints send and receive buffers for all connections */
@ManagedOperation(description="Prints the send and receive buffers")
public String printBuffers() {
Expand Down
25 changes: 4 additions & 21 deletions src/org/jgroups/blocks/cs/NioConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
public class NioConnection extends Connection {
protected SocketChannel channel; // the channel to the peer
protected SelectionKey key;
protected final NioBaseServer server;
protected final Buffers send_buf; // send messages via gathering writes
protected final ByteBuffer length_buf=ByteBuffer.allocate(Integer.BYTES); // reused: send the length of the next buf
protected boolean copy_on_partial_write=true;
Expand Down Expand Up @@ -79,11 +78,6 @@ public NioConnection(SocketChannel channel, NioBaseServer server) throws Excepti
@Override
public boolean isClosed() {return channel == null || !channel.isOpen();}

@Override
public boolean isExpired(long now) {return server.connExpireTime() > 0 && now - last_access >= server.connExpireTime();}

protected void updateLastAccessed() {if(server.connExpireTime() > 0) last_access=getTimestamp();}

@Override
public Address localAddress() {
InetSocketAddress local_addr=null;
Expand All @@ -93,7 +87,6 @@ public Address localAddress() {
return local_addr != null? new IpAddress(local_addr) : null;
}

public Address peerAddress() {return peer_addr;}
public SelectionKey key() {return key;}
public NioConnection key(SelectionKey k) {this.key=k; return this;}
public NioConnection copyOnPartialWrite(boolean b) {this.copy_on_partial_write=b; return this;}
Expand All @@ -120,7 +113,7 @@ protected void connect(Address dest, boolean send_local_addr) throws Exception {
try {
if(!server.deferClientBinding())
this.channel.bind(new InetSocketAddress(server.clientBindAddress(), server.clientBindPort()));
this.key=server.register(channel, OP_CONNECT | OP_READ, this);
this.key=((NioBaseServer)server).register(channel, OP_CONNECT | OP_READ, this);
boolean success=Util.connect(channel, destAddr);
if(success || channel.finishConnect())
clearSelectionKey(OP_CONNECT);
Expand Down Expand Up @@ -171,7 +164,6 @@ protected void send(ByteBuffer buf, boolean send_length) throws Exception {
send_buf.copy(); // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
partial_writes++;
}
updateLastAccessed();
}
catch(Exception ex) {
if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException))
Expand All @@ -187,16 +179,12 @@ public void send() throws Exception {
send_lock.lock();
try {
boolean success=send_buf.write(channel);
if(success) {
if(success)
clearSelectionKey(OP_WRITE);
updateLastAccessed();
}
else {
// registerSelectionKey(OP_WRITE);
if(copy_on_partial_write) {
// copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
// copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
if(copy_on_partial_write)
send_buf.copy();
}
partial_writes++;
}
}
Expand Down Expand Up @@ -285,10 +273,6 @@ public String status() {
return "open";
}

protected long getTimestamp() {
return server.timeService() != null? server.timeService().timestamp() : System.nanoTime();
}

protected void setSocketParameters(Socket client_sock) throws SocketException {
try {
if(server.sendBufferSize() > 0)
Expand Down Expand Up @@ -324,7 +308,6 @@ protected void sendLocalAddress(Address local_addr) throws Exception {
local_addr.writeTo(out);
ByteBuffer buf=ByteBuffer.wrap(out.buffer(), 0, out.position());
send(buf, false);
updateLastAccessed();
}
catch(Exception ex) {
close();
Expand Down
44 changes: 7 additions & 37 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class TcpConnection extends Connection {
protected OutputStream out;
protected DataInputStream in;
protected volatile Receiver receiver;
protected final TcpBaseServer server;
protected final AtomicInteger writers=new AtomicInteger(0); // to determine the last writer to flush
protected volatile boolean connected;
protected final byte[] length_buf=new byte[Integer.BYTES]; // used to write the length of the data
Expand Down Expand Up @@ -74,14 +73,6 @@ public Address localAddress() {
return local_addr != null? new IpAddress(local_addr) : null;
}

public Address peerAddress() {
return peer_addr;
}

protected long getTimestamp() {
return server.timeService() != null? server.timeService().timestamp() : System.nanoTime();
}

protected String getSockAddress() {
StringBuilder sb=new StringBuilder();
if(sock != null) {
Expand All @@ -91,11 +82,6 @@ protected String getSockAddress() {
return sb.toString();
}

protected void updateLastAccessed() {
if(server.connExpireTime() > 0)
last_access=getTimestamp();
}

public void connect(Address dest) throws Exception {
connect(dest, server.usePeerConnections(), server.useAcks());
}
Expand Down Expand Up @@ -136,29 +122,19 @@ else if(Arrays.equals(BaseServer.FAIL, ack))
}
}


public void start() {
if(receiver != null)
receiver.stop();
receiver=new Receiver(server.factory).start();
}



/**
*
* @param data Guaranteed to be non-null
* @param offset
* @param length
*/
public void send(byte[] data, int offset, int length) throws Exception {
if(out == null)
return;
writers.incrementAndGet();
send_lock.lock();
try {
doSend(data, offset, length);
updateLastAccessed();
}
catch(InterruptedException iex) {
Thread.currentThread().interrupt(); // set interrupt flag again
Expand Down Expand Up @@ -202,12 +178,12 @@ public void flush() {
}

protected OutputStream createDataOutputStream(OutputStream out) {
int size=server.getBufferedOutputStreamSize();
int size=((TcpBaseServer)server).getBufferedOutputStreamSize();
return size == 0? out : new BufferedOutputStream(out, size);
}

protected DataInputStream createDataInputStream(InputStream in) {
int size=server.getBufferedInputStreamSize();
int size=((TcpBaseServer)server).getBufferedInputStreamSize();
return size == 0? new DataInputStream(in) : new DataInputStream(new BufferedInputStream(in, size));
}

Expand Down Expand Up @@ -248,7 +224,6 @@ protected void sendLocalAddress(Address local_addr) throws Exception {
local_addr.writeTo(os);
out.write(os.buffer(), 0, os.position());
out.flush(); // needed ?
updateLastAccessed();
}
catch(Exception ex) {
server.socket_factory.close(this.sock);
Expand All @@ -263,7 +238,7 @@ protected void sendLocalAddress(Address local_addr) throws Exception {
*/
protected Address readPeerAddress(Socket client_sock) throws Exception {
int timeout=client_sock.getSoTimeout();
client_sock.setSoTimeout(server.peerAddressReadTimeout());
client_sock.setSoTimeout(((TcpBaseServer)server).peerAddressReadTimeout());

try {
// read the cookie first
Expand Down Expand Up @@ -369,24 +344,19 @@ public String status() {
return "open";
}

public boolean isExpired(long now) {
return server.conn_expire_time > 0 && now - last_access >= server.conn_expire_time;
}

public boolean isConnected() {
@Override public boolean isConnected() {
return connected;
}

public boolean isConnectionPending() {
@Override public boolean isConnectionPending() {
return false;
}

@Override
public boolean isClosed() {
@Override public boolean isClosed() {
return sock == null || sock.isClosed();
}

public void close() throws IOException {
@Override public void close() throws IOException {
Util.close(sock); // fix for https://issues.redhat.com/browse/JGRP-2350
send_lock.lock();
try {
Expand Down
21 changes: 16 additions & 5 deletions src/org/jgroups/stack/GossipRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener,
@ManagedAttribute(description="server port on which the GossipRouter accepts client connections", writable=true)
protected int port=12001;

@ManagedAttribute(description="time (in msecs) until gossip entry expires. 0 disables expiration.",
@ManagedAttribute(description="Time (in msecs) until idle client connections are closed. 0 disables expiration.",
writable=true,type=AttributeType.TIME)
protected long expiry_time;

@ManagedAttribute(description="Interval (in msecs) to check for expired connections and close them. 0 disables reaping.",
writable=true,type=AttributeType.TIME)
protected long reaper_interval;

@ManagedAttribute(description="Time (in ms) for setting SO_LINGER on sockets returned from accept(). 0 means do not set SO_LINGER"
,type=AttributeType.TIME)
protected int linger_timeout=-1;
Expand Down Expand Up @@ -89,7 +93,6 @@ public class GossipRouter extends ReceiverAdapter implements ConnectionListener,
protected int max_length;
protected BaseServer server;
protected final AtomicBoolean running=new AtomicBoolean(false);
protected Timer timer;
protected final Log log=LogFactory.getLog(this.getClass());

@Component(name="diag",description="DiagnosticsHandler listening for probe requests")
Expand Down Expand Up @@ -126,6 +129,8 @@ public GossipRouter(InetAddress bind_addr, int local_port) throws Exception {
public GossipRouter port(int port) {this.port=port; return this;}
public long expiryTime() {return expiry_time;}
public GossipRouter expiryTime(long t) {this.expiry_time=t; return this;}
public long reaperInterval() {return reaper_interval;}
public GossipRouter reaperInterval(long t) {this.reaper_interval=t; return this;}
public int lingerTimeout() {return linger_timeout;}
public GossipRouter lingerTimeout(int t) {this.linger_timeout=t; return this;}
public int recvBufferSize() {return recv_buf_size;}
Expand Down Expand Up @@ -205,7 +210,7 @@ public GossipRouter start() throws Exception {
recv_buf_size, "jgroups.tcp.gossiprouter");
server.receiver(this).setMaxLength(max_length)
.addConnectionListener(this)
.connExpireTimeout(expiry_time).linger(linger_timeout);
.connExpireTimeout(expiry_time).reaperInterval(reaper_interval).linger(linger_timeout);
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(GossipRouter.this::stop));
return this;
Expand Down Expand Up @@ -748,7 +753,7 @@ public Entry(Address client_addr, PhysicalAddress phys_addr, String logical_name
public static void main(String[] args) throws Exception {
int port=12001;
int recv_buf_size=0, max_length=0;
long expiry_time=0;
long expiry_time=0, reaper_interval=0;
boolean diag_enabled=true, diag_enable_udp=true, diag_enable_tcp=false;
InetAddress diag_mcast_addr=null, diag_bind_addr=null;
int diag_port=7500, diag_port_range=50, diag_ttl=8, soLinger=-1;
Expand Down Expand Up @@ -779,6 +784,10 @@ public static void main(String[] args) throws Exception {
expiry_time=Long.parseLong(args[++i]);
continue;
}
if("-reaper_interval".equals(arg)) {
reaper_interval=Long.parseLong(args[++i]);
continue;
}
if("-jmx".equals(arg)) {
jmx=Boolean.parseBoolean(args[++i]);
continue;
Expand Down Expand Up @@ -898,7 +907,7 @@ public static void main(String[] args) throws Exception {
throw new IllegalArgumentException("Cannot use NIO with TLS");

GossipRouter router=new GossipRouter(bind_addr, port)
.jmx(jmx).expiryTime(expiry_time)
.jmx(jmx).expiryTime(expiry_time).reaperInterval(reaper_interval)
.useNio(nio)
.recvBufferSize(recv_buf_size)
.lingerTimeout(soLinger)
Expand Down Expand Up @@ -963,6 +972,8 @@ static void help() {
System.out.println();
System.out.println(" -expiry <msecs> - Time for closing idle connections. 0 means don't expire.");
System.out.println();
System.out.println(" -reaper_interval <ms> - Time for check for expired connections. 0 means don't check.");
System.out.println();
System.out.println(" -nio <true|false> - Whether or not to use non-blocking connections (NIO)");
System.out.println();
System.out.println(" -max_length <bytes> - The max size (in bytes) of a message");
Expand Down
2 changes: 1 addition & 1 deletion tests/junit-functional/org/jgroups/tests/ServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testConcurrentConnect(BaseServer first, BaseServer second) throws Ex
for(Sender sender: senders)
sender.join();
List<String> ids=Arrays.stream(senders).map(t -> String.valueOf(t.getId())).collect(Collectors.toList());
Util.waitUntil(1000, 100, () -> list.size() == NUM_SENDERS,
Util.waitUntil(3000, 100, () -> list.size() == NUM_SENDERS,
() -> {
list.sort(String::compareTo);
return String.format("list (%d): %s", list.size(), list);
Expand Down
Loading

0 comments on commit 817bacc

Please sign in to comment.