diff --git a/src/org/jgroups/blocks/cs/NioBaseServer.java b/src/org/jgroups/blocks/cs/NioBaseServer.java index 4346f98519..66158064c7 100644 --- a/src/org/jgroups/blocks/cs/NioBaseServer.java +++ b/src/org/jgroups/blocks/cs/NioBaseServer.java @@ -129,7 +129,6 @@ public void run() { while(it.hasNext()) { SelectionKey key=it.next(); NioConnection conn=(NioConnection)key.attachment(); - it.remove(); try { if(!key.isValid()) continue; @@ -152,8 +151,12 @@ else if(key.isAcceptable()) } } catch(Throwable ex) { + // key.cancel(); // todo: really??????????????????????????????? closeConnection(conn); } + finally { + it.remove(); + } } } acceptorDone(); @@ -165,7 +168,7 @@ protected boolean doSelect() { int num=selector.select(); num_selects++; checkforPendingRegistrations(); - if(num == 0) return true; + return num >= 0; } catch(ClosedSelectorException closed_ex) { log.trace("selector was closed; acceptor terminating"); diff --git a/src/org/jgroups/blocks/cs/NioConnection.java b/src/org/jgroups/blocks/cs/NioConnection.java index 3ace867df1..01d0ebea0d 100644 --- a/src/org/jgroups/blocks/cs/NioConnection.java +++ b/src/org/jgroups/blocks/cs/NioConnection.java @@ -27,6 +27,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; +import static java.nio.channels.SelectionKey.*; + /** * An NIO based impl of {@link Connection} * @author Bela Ban @@ -38,7 +40,7 @@ public class NioConnection extends Connection { protected final NioBaseServer server; protected final Buffers send_buf; // send messages via gathering writes - protected boolean write_interest_set; // set when a send() didn't manage to send all data + // protected boolean write_interest_set; // set when a send() didn't manage to send all data protected boolean copy_on_partial_write=true; protected int partial_writes; // number of partial writes (write which did not write all bytes) protected final Lock send_lock=new ReentrantLock(); // serialize send() @@ -149,10 +151,10 @@ protected void connect(Address dest, boolean send_local_addr) throws Exception { try { if(!server.deferClientBinding()) this.channel.bind(new InetSocketAddress(server.clientBindAddress(), server.clientBindPort())); - this.key=server.register(channel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ, this); + this.key=server.register(channel, OP_CONNECT | OP_READ, this); boolean success=Util.connect(channel, destAddr); if(success || channel.finishConnect()) { - clearSelectionKey(SelectionKey.OP_CONNECT); + clearSelectionKey(OP_CONNECT); this.connected=channel.isConnected(); } if(this.channel.getLocalAddress() != null && this.channel.getLocalAddress().equals(destAddr)) @@ -187,17 +189,20 @@ public void send(ByteBuffer buf) throws Exception { send(buf, true); } - public void send() throws Exception { send_lock.lock(); try { boolean success=send_buf.write(channel); - writeInterest(!success); - if(success) + if(success) { + clearSelectionKey(OP_WRITE); updateLastAccessed(); - if(!success) { - if(copy_on_partial_write) - send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + } + else { + registerSelectionKey(OP_WRITE); + if(copy_on_partial_write) { + // copy data on partial write as further writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) + send_buf.copy(); + } partial_writes++; } } @@ -210,7 +215,19 @@ public void send() throws Exception { /** Read the length first, then the actual data. This method is not reentrant and access must be synchronized */ public void receive() throws Exception { - reader.receive(); + for(;;) { // try to receive as many msgs as possible, until no more msgs are ready or the conn is closed + try { + if(!_receive(false)) + break; + updateLastAccessed(); + } + catch(Exception ex) { + if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException)) + server.log.warn("failed handling message", ex); + server.closeConnection(NioConnection.this); + return; + } + } } protected void send(ByteBuffer buf, boolean send_length) throws Exception { @@ -222,17 +239,17 @@ protected void send(ByteBuffer buf, boolean send_length) throws Exception { else send_buf.add(buf); boolean success=send_buf.write(channel); - writeInterest(!success); - if(success) - updateLastAccessed(); - else { + if(!success) { + registerSelectionKey(OP_WRITE); if(copy_on_partial_write) send_buf.copy(); // copy data on partial write as subsequent writes might corrupt data (https://issues.redhat.com/browse/JGRP-1991) partial_writes++; } + updateLastAccessed(); } catch(Exception ex) { - server.log().error("%s: failed sending message to %s: %s", server.localAddress(), peerAddress(), ex); + if(!(ex instanceof SocketException || ex instanceof EOFException || ex instanceof ClosedChannelException)) + server.log().error("%s: failed sending message to %s: %s", server.localAddress(), peerAddress(), ex); throw ex; } finally { @@ -312,20 +329,12 @@ protected long getTimestamp() { return server.timeService() != null? server.timeService().timestamp() : System.nanoTime(); } - protected void writeInterest(boolean register) { - if(register) { - if(!write_interest_set) { - write_interest_set=true; - registerSelectionKey(SelectionKey.OP_WRITE); - } - } - else { - if(write_interest_set) { - write_interest_set=false; - clearSelectionKey(SelectionKey.OP_WRITE); - } - } - } + /*protected void writeInterest(boolean register) { + if(register) + registerSelectionKey(SelectionKey.OP_WRITE); + else + clearSelectionKey(SelectionKey.OP_WRITE); + }*/ protected void setSocketParameters(Socket client_sock) throws SocketException { try { @@ -457,7 +466,7 @@ public void receive() { try { data_available=true; // only a single receive() at a time, until OP_READ is registered again (by the reader thread) - clear(SelectionKey.OP_READ); + clear(OP_READ); switch(state) { case reading: break; @@ -481,7 +490,7 @@ public void run() { _run(); } finally { - register(SelectionKey.OP_READ); + register(OP_READ); } } @@ -511,7 +520,7 @@ protected void _run() { // Transition to state waiting_to_terminate and wait for server.readerIdleTime() ms state(State.waiting_to_terminate); data_available=false; - register(SelectionKey.OP_READ); // now we might get receive() calls again + register(OP_READ); // now we might get receive() calls again if(data_available_cond.waitFor(is_data_available, server.readerIdleTime(), TimeUnit.MILLISECONDS)) state(State.reading); else { diff --git a/src/org/jgroups/blocks/cs/NioServer.java b/src/org/jgroups/blocks/cs/NioServer.java index e81128a1f6..1374c34c7a 100644 --- a/src/org/jgroups/blocks/cs/NioServer.java +++ b/src/org/jgroups/blocks/cs/NioServer.java @@ -5,7 +5,12 @@ import org.jgroups.util.*; import java.net.InetAddress; -import java.nio.channels.*; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import static java.nio.channels.SelectionKey.OP_READ; /** * Server for sending and receiving messages via NIO channels. Uses only a single thread to accept, connect, write and @@ -75,11 +80,11 @@ public NioServer(ThreadFactory thread_factory, SocketFactory socket_factory, Ine @Override protected void handleAccept(SelectionKey key) throws Exception { SocketChannel client_channel=channel.accept(); - NioConnection conn=null; if(client_channel == null) return; // can happen if no connection is available to accept + NioConnection conn=null; try { conn=new NioConnection(client_channel, NioServer.this); - SelectionKey client_key=client_channel.register(selector, SelectionKey.OP_READ, conn); + SelectionKey client_key=client_channel.register(selector, OP_READ, conn); conn.key(client_key); // we need to set the selection key of the client channel *not* the server channel Address peer_addr=conn.peerAddress(); if(use_peer_connections) @@ -113,8 +118,6 @@ public synchronized void start() throws Exception { public synchronized void stop() { super.stop(); if(running.compareAndSet(true, false)) { - // Util.close(selector); // closing the selector also stops the acceptor thread - // socket_factory.close(channel); selector.wakeup(); // Wait for server channel to close (via acceptorDone()) Util.interruptAndWaitToDie(acceptor); diff --git a/tests/junit-functional/org/jgroups/tests/ServerTests.java b/tests/junit-functional/org/jgroups/tests/ServerTests.java index 0d251de55f..09d3fc44d1 100644 --- a/tests/junit-functional/org/jgroups/tests/ServerTests.java +++ b/tests/junit-functional/org/jgroups/tests/ServerTests.java @@ -181,7 +181,8 @@ protected static void send(String str, BaseServer server, Address dest) { server.send(dest, data, 0, data.length); } catch(Exception e) { - System.err.printf("failed sending a message to %s: %s\n", dest, e); + //System.err.printf("failed sending a message to %s: %s\n", dest, e); + e.printStackTrace(); } }