Skip to content

Commit

Permalink
Add support for TCPSocket connect_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
headius committed Mar 25, 2022
1 parent db149a6 commit 70df12f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 27 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/org/jruby/Ruby.java
Original file line number Diff line number Diff line change
Expand Up @@ -3911,6 +3911,10 @@ public RaiseException newErrnoEAFNOSUPPORTError(String message) {
return newRaiseException(getErrno().getClass("EAFNOSUPPORT"), message);
}

public RaiseException newErrnoETIMEDOUTError() {
return newRaiseException(getErrno().getClass("ETIMEDOUT"), "Broken pipe");
}

public RaiseException newErrnoFromLastPOSIXErrno() {
RubyClass errnoClass = getErrno(getPosix().errno());
if (errnoClass == null) errnoClass = systemCallError;
Expand Down
111 changes: 88 additions & 23 deletions core/src/main/java/org/jruby/ext/socket/RubyTCPSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.anno.JRubyMethod;
import org.jruby.ast.util.ArgsUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
Expand All @@ -46,7 +48,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
Expand All @@ -69,13 +71,10 @@ public RubyTCPSocket(Ruby runtime, RubyClass type) {
}


private SocketChannel attemptConnect(ThreadContext context, IRubyObject host, String localHost, int localPort,
String remoteHost, int remotePort) throws IOException {
private SocketChannel attemptConnect(ThreadContext context, String localHost, int localPort,
String remoteHost, int remotePort, RubyHash opts) throws IOException {
for (InetAddress address: InetAddress.getAllByName(remoteHost)) {
// This is a bit convoluted because (1) SocketChannel.bind is only in jdk 7 and
// (2) Socket.getChannel() seems to return null in some cases
SocketChannel channel = SocketChannel.open();
Socket socket = channel.socket();

openFile = null; // Second or later attempts will have non-closeable failed attempt to connect.

Expand All @@ -85,53 +84,119 @@ private SocketChannel attemptConnect(ThreadContext context, IRubyObject host, St
channel.configureBlocking(false);

if (localHost != null) {
socket.setReuseAddress(true);
socket.bind( new InetSocketAddress(InetAddress.getByName(localHost), localPort) );
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.bind( new InetSocketAddress(InetAddress.getByName(localHost), localPort) );
}
try {
channel.connect(new InetSocketAddress(address, remotePort));

// wait for connection
while (!context.getThread().select(channel, this, SelectionKey.OP_CONNECT)) {
context.pollThreadEvents();
long timeout = -1;
if (opts != null) {
IRubyObject timeoutObj = ArgsUtil.extractKeywordArg(context, opts, "connect_timeout");
if (!timeoutObj.isNil()) {
timeout = (long) (timeoutObj.convertToFloat().getDoubleValue() * 1000);
}
}

// complete connection
while (!channel.finishConnect()) {
context.pollThreadEvents();
}
// wait for connection
if (context.getThread().select(channel, this, SelectionKey.OP_CONNECT, timeout)) {
// complete connection
while (!channel.finishConnect()) {
context.pollThreadEvents();
}

channel.configureBlocking(true);

channel.configureBlocking(true);
return channel;
}

return channel;
throw context.runtime.newErrnoETIMEDOUTError();
} catch (ConnectException e) {
// fall through and try next valid address for the host.
}
}

// did not complete and only path out is n repeated ConnectExceptions
throw context.runtime.newErrnoECONNREFUSEDError("connect(2) for " + host.inspect() + " port " + remotePort);
throw context.runtime.newErrnoECONNREFUSEDError("connect(2) for " + localHost + " port " + remotePort);
}

@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context, IRubyObject host, IRubyObject port) {
final String remoteHost = host.isNil() ? "localhost" : host.convertToString().toString();
final int remotePort = SocketUtils.getPortFrom(context, port);

return initialize(context, remoteHost, remotePort, null, 0, null);
}

@JRubyMethod(visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context, IRubyObject host, IRubyObject port, IRubyObject localOrOpts) {
final String remoteHost = host.isNil() ? "localhost" : host.convertToString().toString();
final int remotePort = SocketUtils.getPortFrom(context, port);

IRubyObject opts = ArgsUtil.getOptionsArg(context.runtime, localOrOpts);
if (!opts.isNil()) {
return initialize(context, remoteHost, remotePort, null, 0, (RubyHash) opts);
}

String localHost = localOrOpts.isNil() ? null : localOrOpts.convertToString().toString();

return initialize(context, remoteHost, remotePort, localHost, 0, null);

}

@JRubyMethod(required = 2, optional = 2, visibility = Visibility.PRIVATE)
@JRubyMethod(required = 2, optional = 3, visibility = Visibility.PRIVATE)
public IRubyObject initialize(ThreadContext context, IRubyObject[] args) {
Ruby runtime = context.runtime;
String localHost = null;
int localPort = 0;
IRubyObject maybeOpts;
RubyHash opts = null;

switch (args.length) {
case 2:
return initialize(context, args[0], args[1]);
case 3:
return initialize(context, args[0], args[1], args[2]);
}

// cut switch in half to evaluate early args first
IRubyObject host = args[0];
IRubyObject port = args[1];

final String remoteHost = host.isNil() ? "localhost" : host.convertToString().toString();
final int remotePort = SocketUtils.getPortFrom(context, port);

String localHost = (args.length >= 3 && !args[2].isNil()) ? args[2].convertToString().toString() : null;
int localPort = (args.length == 4 && !args[3].isNil()) ? SocketUtils.getPortFrom(context, args[3]) : 0;
switch (args.length) {
case 4:
if (!args[2].isNil()) localHost = args[2].convertToString().toString();

maybeOpts = ArgsUtil.getOptionsArg(context.runtime, args[3]);
if (!maybeOpts.isNil()) {
opts = (RubyHash) maybeOpts;
} else if (!args[3].isNil()) {
localPort = SocketUtils.getPortFrom(context, args[3]);
}

break;
case 5:
if (!args[4].isNil()) opts = (RubyHash) ArgsUtil.getOptionsArg(context.runtime, args[4], true);
break;
default:
throw context.runtime.newArgumentError(args.length, 2, 4);
}

return initialize(context, remoteHost, remotePort, localHost, localPort, opts);
}

public IRubyObject initialize(ThreadContext context, String remoteHost, int remotePort, String localHost, int localPort, RubyHash opts) {
Ruby runtime = context.runtime;

// try to ensure the socket closes if it doesn't succeed
boolean success = false;
SocketChannel channel = null;

try {
try {
channel = attemptConnect(context, host, localHost, localPort, remoteHost, remotePort);
channel = attemptConnect(context, localHost, localPort, remoteHost, remotePort, opts);
success = true;
} catch (BindException e) {
throw runtime.newErrnoEADDRFromBindException(e, " to: " + remoteHost + ':' + remotePort);
Expand Down
2 changes: 0 additions & 2 deletions spec/tags/ruby/library/socket/tcpsocket/initialize_tags.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
wip:TCPSocket#initialize raises Errno::ETIMEDOUT with :connect_timeout when no server is listening on the given address
wip:TCPSocket#initialize with a running server connects to a server when passed connect_timeout argument
wip:TCPSocket#initialize using IPv4 when a server is listening on the given address raises SocketError when the port number is a non numeric String
wip:TCPSocket#initialize using IPv6 when a server is listening on the given address raises SocketError when the port number is a non numeric String
2 changes: 0 additions & 2 deletions spec/tags/ruby/library/socket/tcpsocket/open_tags.txt

This file was deleted.

0 comments on commit 70df12f

Please sign in to comment.