Skip to content

Commit

Permalink
- Removed Reader in NioConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jan 18, 2024
1 parent e1fa2e2 commit 9f06e74
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 40 deletions.
7 changes: 5 additions & 2 deletions src/org/jgroups/blocks/cs/NioBaseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void run() {
while(it.hasNext()) {
SelectionKey key=it.next();
NioConnection conn=(NioConnection)key.attachment();
it.remove();
try {
if(!key.isValid())
continue;
Expand All @@ -152,8 +151,12 @@ else if(key.isAcceptable())
}
}
catch(Throwable ex) {
// key.cancel(); // todo: really???????????????????????????????
closeConnection(conn);
}
finally {
it.remove();
}
}
}
acceptorDone();
Expand All @@ -165,7 +168,7 @@ protected boolean doSelect() {
int num=selector.select();
num_selects++;
checkforPendingRegistrations();
if(num == 0) return true;
return num >= 0;
}
catch(ClosedSelectorException closed_ex) {
log.trace("selector was closed; acceptor terminating");
Expand Down
73 changes: 41 additions & 32 deletions src/org/jgroups/blocks/cs/NioConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

import static java.nio.channels.SelectionKey.*;

/**
* An NIO based impl of {@link Connection}
* @author Bela Ban
Expand All @@ -38,7 +40,7 @@ public class NioConnection extends Connection {
protected final NioBaseServer server;

protected final Buffers send_buf; // send messages via gathering writes
protected boolean write_interest_set; // set when a send() didn't manage to send all data
// protected boolean write_interest_set; // set when a send() didn't manage to send all data
protected boolean copy_on_partial_write=true;
protected int partial_writes; // number of partial writes (write which did not write all bytes)
protected final Lock send_lock=new ReentrantLock(); // serialize send()
Expand Down Expand Up @@ -149,10 +151,10 @@ protected void connect(Address dest, boolean send_local_addr) throws Exception {
try {
if(!server.deferClientBinding())
this.channel.bind(new InetSocketAddress(server.clientBindAddress(), server.clientBindPort()));
this.key=server.register(channel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, this);
this.key=server.register(channel, OP_CONNECT | OP_READ, this);
boolean success=Util.connect(channel, destAddr);
if(success || channel.finishConnect()) {
clearSelectionKey(SelectionKey.OP_CONNECT);
clearSelectionKey(OP_CONNECT);
this.connected=channel.isConnected();
}
if(this.channel.getLocalAddress() != null && this.channel.getLocalAddress().equals(destAddr))
Expand Down Expand Up @@ -187,17 +189,20 @@ public void send(ByteBuffer buf) throws Exception {
send(buf, true);
}


public void send() throws Exception {
send_lock.lock();
try {
boolean success=send_buf.write(channel);
writeInterest(!success);
if(success)
if(success) {
clearSelectionKey(OP_WRITE);
updateLastAccessed();
if(!success) {
if(copy_on_partial_write)
send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
}
else {
registerSelectionKey(OP_WRITE);
if(copy_on_partial_write) {
// copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
send_buf.copy();
}
partial_writes++;
}
}
Expand All @@ -210,7 +215,19 @@ public void send() throws Exception {

/** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */
public void receive() throws Exception {
reader.receive();
for(;;) { // try to receive as many msgs as possible, until no more msgs are ready or the conn is closed
try {
if(!_receive(false))
break;
updateLastAccessed();
}
catch(Exception ex) {
if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException))
server.log.warn("failed handling message", ex);
server.closeConnection(NioConnection.this);
return;
}
}
}

protected void send(ByteBuffer buf, boolean send_length) throws Exception {
Expand All @@ -222,17 +239,17 @@ protected void send(ByteBuffer buf, boolean send_length) throws Exception {
else
send_buf.add(buf);
boolean success=send_buf.write(channel);
writeInterest(!success);
if(success)
updateLastAccessed();
else {
if(!success) {
registerSelectionKey(OP_WRITE);
if(copy_on_partial_write)
send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991)
partial_writes++;
}
updateLastAccessed();
}
catch(Exception ex) {
server.log().error("%s: failed sending message to %s: %s", server.localAddress(), peerAddress(), ex);
if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException))
server.log().error("%s: failed sending message to %s: %s", server.localAddress(), peerAddress(), ex);
throw ex;
}
finally {
Expand Down Expand Up @@ -312,20 +329,12 @@ protected long getTimestamp() {
return server.timeService() != null? server.timeService().timestamp() : System.nanoTime();
}

protected void writeInterest(boolean register) {
if(register) {
if(!write_interest_set) {
write_interest_set=true;
registerSelectionKey(SelectionKey.OP_WRITE);
}
}
else {
if(write_interest_set) {
write_interest_set=false;
clearSelectionKey(SelectionKey.OP_WRITE);
}
}
}
/*protected void writeInterest(boolean register) {
if(register)
registerSelectionKey(SelectionKey.OP_WRITE);
else
clearSelectionKey(SelectionKey.OP_WRITE);
}*/

protected void setSocketParameters(Socket client_sock) throws SocketException {
try {
Expand Down Expand Up @@ -457,7 +466,7 @@ public void receive() {
try {
data_available=true;
// only a single receive() at a time, until OP_READ is registered again (by the reader thread)
clear(SelectionKey.OP_READ);
clear(OP_READ);
switch(state) {
case reading:
break;
Expand All @@ -481,7 +490,7 @@ public void run() {
_run();
}
finally {
register(SelectionKey.OP_READ);
register(OP_READ);
}
}

Expand Down Expand Up @@ -511,7 +520,7 @@ protected void _run() {
// Transition to state waiting_to_terminate and wait for server.readerIdleTime() ms
state(State.waiting_to_terminate);
data_available=false;
register(SelectionKey.OP_READ); // now we might get receive() calls again
register(OP_READ); // now we might get receive() calls again
if(data_available_cond.waitFor(is_data_available, server.readerIdleTime(), TimeUnit.MILLISECONDS))
state(State.reading);
else {
Expand Down
13 changes: 8 additions & 5 deletions src/org/jgroups/blocks/cs/NioServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import org.jgroups.util.*;

import java.net.InetAddress;
import java.nio.channels.*;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import static java.nio.channels.SelectionKey.OP_READ;

/**
* Server for sending and receiving messages via NIO channels. Uses only a single thread to accept, connect, write and
Expand Down Expand Up @@ -75,11 +80,11 @@ public NioServer(ThreadFactory thread_factory, SocketFactory socket_factory, Ine
@Override
protected void handleAccept(SelectionKey key) throws Exception {
SocketChannel client_channel=channel.accept();
NioConnection conn=null;
if(client_channel == null) return; // can happen if no connection is available to accept
NioConnection conn=null;
try {
conn=new NioConnection(client_channel, NioServer.this);
SelectionKey client_key=client_channel.register(selector, SelectionKey.OP_READ, conn);
SelectionKey client_key=client_channel.register(selector, OP_READ, conn);
conn.key(client_key); // we need to set the selection key of the client channel *not* the server channel
Address peer_addr=conn.peerAddress();
if(use_peer_connections)
Expand Down Expand Up @@ -113,8 +118,6 @@ public synchronized void start() throws Exception {
public synchronized void stop() {
super.stop();
if(running.compareAndSet(true, false)) {
// Util.close(selector); // closing the selector also stops the acceptor thread
// socket_factory.close(channel);
selector.wakeup();
// Wait for server channel to close (via acceptorDone())
Util.interruptAndWaitToDie(acceptor);
Expand Down
3 changes: 2 additions & 1 deletion tests/junit-functional/org/jgroups/tests/ServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ protected static void send(String str, BaseServer server, Address dest) {
server.send(dest, data, 0, data.length);
}
catch(Exception e) {
System.err.printf("failed sending a message to %s: %s\n", dest, e);
//System.err.printf("failed sending a message to %s: %s\n", dest, e);
e.printStackTrace();
}
}

Expand Down

0 comments on commit 9f06e74

Please sign in to comment.