Skip to content

Commit

Permalink
Removed synchronization in TcpServer.handleAccept() (https://issues.r…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 12, 2024
1 parent b6b036d commit 9663136
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/BaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class BaseServer implements Closeable, ConnectionListener {
protected final ThreadFactory factory;
protected SocketFactory socket_factory=new DefaultSocketFactory();
protected long reaperInterval;
protected Reaper reaper;
protected Reaper reaper;
protected Receiver receiver;
protected final AtomicBoolean running=new AtomicBoolean(false);
protected Log log=LogFactory.getLog(getClass());
Expand Down
35 changes: 16 additions & 19 deletions src/org/jgroups/blocks/cs/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,22 @@ protected void handleAccept(final Socket client_sock) throws Exception {
try {
conn=new TcpConnection(client_sock, TcpServer.this);
Address peer_addr=conn.peerAddress();
synchronized(this) {
boolean conn_exists=hasConnection(peer_addr),
replace=conn_exists && use_peer_connections && local_addr.compareTo(peer_addr) < 0; // bigger conn wins

if(!conn_exists || replace) {
if(use_acks)
conn.send(OK, 0, OK.length); // do this *before* other threads can send messages!!
replaceConnection(peer_addr, conn); // closes old conn
conn.start();
log.trace("%s: accepted connection from %s", local_addr, peer_addr);
}
else {
log.trace("%s: rejected connection from %s %s", local_addr, peer_addr, explanation(conn_exists, replace));
if(use_acks)
conn.send(FAIL, 0, FAIL.length);
conn.flush();
Util.close(conn); // keep our existing conn, reject accept() and close client_sock
}
boolean conn_exists=hasConnection(peer_addr),
replace=conn_exists && use_peer_connections && local_addr.compareTo(peer_addr) < 0; // bigger conn wins

if(!conn_exists || replace) {
if(use_acks)
conn.send(OK, 0, OK.length); // do this *before* other threads can send messages!!
replaceConnection(peer_addr, conn); // closes old conn
conn.start();
log.trace("%s: accepted connection from %s", local_addr, peer_addr);
}
else {
log.trace("%s: rejected connection from %s %s", local_addr, peer_addr, explanation(conn_exists, replace));
if(use_acks)
conn.send(FAIL, 0, FAIL.length);
conn.flush();
Util.close(conn); // keep our existing conn, reject accept() and close client_sock
}
}
catch(Exception ex) {
Expand All @@ -145,6 +143,5 @@ protected void handleAccept(final Socket client_sock) throws Exception {
}



}

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ConcurrentConnectTest {
/**
* Creates {A,B}, then injects view {A} into A and {B} into B, this removes all TCP connections between A and B.
* Then make A send 5 messages to B and vice versa. This causes concurrent connection establishment. Even with
* UNICAST3 being absent, both A and B should receive all of the other's messages.
* UNICAST3 being absent, both A and B should receive all the other's messages.
*/
// @Test(invocationCount=10)
public void testConcurrentConnect() throws Exception {
Expand Down

0 comments on commit 9663136

Please sign in to comment.