From dbe0c2a53e54ef05cc3eeb2d64442a5785dfc0ed Mon Sep 17 00:00:00 2001
From: Charles Oliver Nutter
Date: Fri, 29 Sep 2023 17:53:15 -0500
Subject: [PATCH] IO::Buffer read and write logic

---
Review notes (text in this section is not part of the commit):

Changes made to the patch during review:
  * FiberScheduler.ioWrite (both the existing int overload and the new
    RubyInteger overload) invoked the scheduler hook "io_read"; both now
    invoke "io_write".
  * FiberScheduler.result returned `error` from both branches; it now
    returns the negated error when result == -1 and the result itself
    otherwise.
  * The ByteBuffer overloads of OpenFile.readInternal/writeInternal called
    FiberScheduler.resultApply but discarded its value and then fell through
    to perform the I/O again; they now return the applied result.

Open questions, not changed here:
  * NOTE(review): PosixShim.read/write(ChannelFD, ByteBuffer, offset, length, ...)
    never apply offset/length to the buffer's position/limit, so the channel
    operation uses whatever position/limit the caller left on the buffer.
    Confirm RubyIOBuffer positions the buffer before calling.
  * NOTE(review): RubyIOBuffer.write(context, io, int, int) obtains its buffer
    through getBufferForWriting even though it only reads from the buffer;
    confirm whether a read accessor is intended.
  * NOTE(review): ioReadMemory/ioWriteMemory unlock and free the temporary
    buffer outside a finally block, so neither runs if the scheduler call
    throws.
  * NOTE(review): the java.lang.invoke.MethodHandles/VarHandle imports added
    to RubyIOBuffer are not used by any hunk in this patch.
  * NOTE(review): this copy of the patch had lost its line breaks and
    indentation, and generic type arguments appear to be missing (e.g.
    "ReadWrite" and "Data data" with no type parameter). Whitespace was
    reconstructed to match the hunk line counts; verify against the upstream
    commit before applying. Index hashes are those of the original patch.

 .../main/java/org/jruby/FiberScheduler.java   |  50 +++++++
 .../src/main/java/org/jruby/RubyIOBuffer.java |  88 ++++++++++++-
 core/src/main/java/org/jruby/RubyThread.java  |  42 +++++--
 .../main/java/org/jruby/util/io/OpenFile.java | 119 +++++++++++++++---
 .../java/org/jruby/util/io/PosixShim.java     | 109 ++++++++++------
 5 files changed, 334 insertions(+), 74 deletions(-)

diff --git a/core/src/main/java/org/jruby/FiberScheduler.java b/core/src/main/java/org/jruby/FiberScheduler.java
index 3d2ad2f63aaa..a1e239fa878e 100644
--- a/core/src/main/java/org/jruby/FiberScheduler.java
+++ b/core/src/main/java/org/jruby/FiberScheduler.java
@@ -5,6 +5,8 @@
 import org.jruby.runtime.builtin.IRubyObject;
 import org.jruby.util.io.OpenFile;
 
+import java.nio.ByteBuffer;
+
 public class FiberScheduler {
     // MRI: rb_fiber_scheduler_kernel_sleep
     public static IRubyObject kernelSleep(ThreadContext context, IRubyObject scheduler, IRubyObject timeout) {
@@ -61,23 +63,50 @@ public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, I
         return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
     }
 
+    public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
+        return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, length, offset);
+    }
+
     // MRI: rb_fiber_scheduler_io_pread
     public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
         return Helpers.invokeChecked(context, scheduler, "io_pread", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
     }
 
+    public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, IRubyObject length, IRubyObject offset) {
+        return Helpers.invokeChecked(context, scheduler, "io_pread", io, buffer, length, offset);
+    }
+
     // MRI: rb_fiber_scheduler_io_write
     public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
-        return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
+        return Helpers.invokeChecked(context, scheduler, "io_write", io, buffer, context.runtime.newFixnum(length));
     }
 
+    public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
+        return Helpers.invokeChecked(context, scheduler, "io_write", io, buffer, length, offset);
+    }
+
     // MRI: rb_fiber_scheduler_io_pwrite
     public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
         return Helpers.invokeChecked(context, scheduler, "io_pwrite", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
     }
 
+    public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, IRubyObject length, IRubyObject offset) {
+        return Helpers.invokeChecked(context, scheduler, "io_pwrite", io, buffer, length, offset);
+    }
+
     // MRI: rb_fiber_scheduler_io_read_memory
     public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
+        RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, ByteBuffer.wrap(base), size, RubyIOBuffer.LOCKED);
+
+        IRubyObject result = ioRead(context, scheduler, io, buffer, length);
+
+        buffer.unlock(context);
+        buffer.free(context);
+
+        return result;
+    }
+
+    public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
         RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED);
 
         IRubyObject result = ioRead(context, scheduler, io, buffer, length);
@@ -90,6 +119,17 @@ public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject schedu
 
     // MRI: rb_fiber_scheduler_io_write_memory
     public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
+        RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, ByteBuffer.wrap(base), size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);
+
+        IRubyObject result = ioWrite(context, scheduler, io, buffer, length);
+
+        buffer.unlock(context);
+        buffer.free(context);
+
+        return result;
+    }
+
+    public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
         RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);
 
         IRubyObject result = ioWrite(context, scheduler, io, buffer, length);
@@ -154,4 +194,12 @@ public static int resultApply(ThreadContext context, IRubyObject result) {
             return RubyNumeric.num2int(result);
         }
     }
+
+    public static IRubyObject result(Ruby runtime, int result, int error) {
+        if (result == -1) {
+            return RubyFixnum.newFixnum(runtime, -error);
+        } else {
+            return RubyFixnum.newFixnum(runtime, result);
+        }
+    }
 }
diff --git a/core/src/main/java/org/jruby/RubyIOBuffer.java b/core/src/main/java/org/jruby/RubyIOBuffer.java
index 6016e7101db6..3e30fc517ef7 100644
--- a/core/src/main/java/org/jruby/RubyIOBuffer.java
+++ b/core/src/main/java/org/jruby/RubyIOBuffer.java
@@ -15,7 +15,10 @@
 import org.jruby.runtime.builtin.IRubyObject;
 import org.jruby.util.ByteList;
 import org.jruby.util.func.ObjectLongFunction;
+import org.jruby.util.io.OpenFile;
 
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -129,7 +132,7 @@ public void initialize(ThreadContext context, byte[] baseBytes, int size, int fl
             // If we are provided a pointer, we use it.
             base = ByteBuffer.wrap(baseBytes);
         } else if (size != 0) {
-            base = newBufferBase(context.runtime, size, flags, base);
+            base = newBufferBase(context.runtime, size, flags);
         } else {
             // Otherwise we don't do anything.
             return;
@@ -1258,12 +1261,48 @@ public IRubyObject op_not(ThreadContext context) {
 
     @JRubyMethod(name = "read")
     public IRubyObject read(ThreadContext context, IRubyObject io, IRubyObject length) {
-        return context.nil;
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        RubyInteger lengthInteger = length.convertToInteger();
+
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioRead(context, scheduler, io, this, lengthInteger, RubyFixnum.zero(context.runtime));
+
+            if (result != UNDEF) {
+                return result;
+            }
+        }
+
+        return read(context, io, lengthInteger.getIntValue(), 0);
     }
 
     @JRubyMethod(name = "read")
     public IRubyObject read(ThreadContext context, IRubyObject io, IRubyObject length, IRubyObject offset) {
-        return context.nil;
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        RubyInteger lengthInteger = length.convertToInteger();
+        RubyInteger offsetInteger = offset.convertToInteger();
+
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioRead(context, scheduler, io, this, lengthInteger, offsetInteger);
+
+            if (result != UNDEF) {
+                return result;
+            }
+        }
+
+        return read(context, io, lengthInteger.getIntValue(), offsetInteger.getIntValue());
+    }
+
+    public IRubyObject read(ThreadContext context, IRubyObject io, int length, int offset) {
+        validateRange(context, offset, length);
+
+        ByteBuffer buffer = getBufferForWriting(context);
+
+        return readInternal(context, RubyIO.convertToIO(context, io), buffer, offset, length);
+    }
+
+    private static IRubyObject readInternal(ThreadContext context, RubyIO io, ByteBuffer base, int offset, int size) {
+        int result = OpenFile.readInternal(context, io.openFile, io.openFile.fd(), base, offset, size);
+        return FiberScheduler.result(context.runtime, result, io.openFile.posix.getErrno().value());
     }
 
     @JRubyMethod(name = "pread")
@@ -1286,17 +1325,54 @@ public IRubyObject pread(ThreadContext context, IRubyObject[] args) {
     }
 
     public IRubyObject pread(ThreadContext context, IRubyObject io, IRubyObject from, IRubyObject length, IRubyObject offset) {
-        return context.nil;
+        throw context.runtime.newNotImplementedError("pread");
     }
 
     @JRubyMethod(name = "write")
     public IRubyObject write(ThreadContext context, IRubyObject io, IRubyObject length) {
-        return context.nil;
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        RubyInteger lengthInteger = length.convertToInteger();
+
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioWrite(context, scheduler, io, this, lengthInteger, RubyFixnum.zero(context.runtime));
+
+            if (result != UNDEF) {
+                return result;
+            }
+        }
+
+        return write(context, io, lengthInteger.getIntValue(), 0);
     }
 
     @JRubyMethod(name = "write")
     public IRubyObject write(ThreadContext context, IRubyObject io, IRubyObject length, IRubyObject offset) {
-        return context.nil;
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        RubyInteger lengthInteger = length.convertToInteger();
+        RubyInteger offsetInteger = offset.convertToInteger();
+
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioWrite(context, scheduler, io, this, lengthInteger, offsetInteger);
+
+            if (result != UNDEF) {
+                return result;
+            }
+        }
+
+        return write(context, io, lengthInteger.getIntValue(), offsetInteger.getIntValue());
+    }
+
+    public IRubyObject write(ThreadContext context, IRubyObject io, int length, int offset) {
+        validateRange(context, offset, length);
+
+
+        ByteBuffer buffer = getBufferForWriting(context);
+
+        return writeInternal(context, RubyIO.convertToIO(context, io), buffer, offset, length);
+    }
+
+    private static IRubyObject writeInternal(ThreadContext context, RubyIO io, ByteBuffer base, int offset, int size) {
+        int result = OpenFile.writeInternal(context, io.openFile, base, offset, size);
+        return FiberScheduler.result(context.runtime, result, io.openFile.posix.getErrno().value());
     }
 
     @JRubyMethod(name = "pwrite")
diff --git a/core/src/main/java/org/jruby/RubyThread.java b/core/src/main/java/org/jruby/RubyThread.java
index 5c4a61df3db1..b207d5a054dc 100644
--- a/core/src/main/java/org/jruby/RubyThread.java
+++ b/core/src/main/java/org/jruby/RubyThread.java
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
@@ -1788,25 +1789,48 @@ public int executeReadWrite(
             ReadWrite task) throws InterruptedException {
         Status oldStatus = STATUS.get(this);
         try {
-            this.unblockArg = data;
-            this.unblockFunc = task;
+            preReadWrite(context, data, task);
 
-            // check for interrupt before going into blocking call
-            blockingThreadPoll(context);
+            return task.run(context, data, bytes, start, length);
+        } finally {
+            postReadWrite(context, oldStatus);
+        }
+    }
 
-            STATUS.set(this, Status.SLEEP);
+    public int executeReadWrite(
+            ThreadContext context,
+            Data data, ByteBuffer bytes, int start, int length,
+            ReadWrite task) throws InterruptedException {
+        Status oldStatus = STATUS.get(this);
+        try {
+            preReadWrite(context, data, task);
 
             return task.run(context, data, bytes, start, length);
         } finally {
-            STATUS.set(this, oldStatus);
-            this.unblockFunc = null;
-            this.unblockArg = null;
-            pollThreadEvents(context);
+            postReadWrite(context, oldStatus);
         }
     }
 
+    private void postReadWrite(ThreadContext context, Status oldStatus) {
+        STATUS.set(this, oldStatus);
+        this.unblockFunc = null;
+        this.unblockArg = null;
+        pollThreadEvents(context);
+    }
+
+    private void preReadWrite(ThreadContext context, Data data, ReadWrite task) {
+        this.unblockArg = data;
+        this.unblockFunc = task;
+
+        // check for interrupt before going into blocking call
+        blockingThreadPoll(context);
+
+        STATUS.set(this, Status.SLEEP);
+    }
+
     public interface ReadWrite extends Unblocker {
         public int run(ThreadContext context, Data data, byte[] bytes, int start, int length) throws InterruptedException;
+        public int run(ThreadContext context, Data data, ByteBuffer bytes, int start, int length) throws InterruptedException;
         public void wakeup(RubyThread thread, Data data);
     }
 
diff --git a/core/src/main/java/org/jruby/util/io/OpenFile.java b/core/src/main/java/org/jruby/util/io/OpenFile.java
index 38143443d72c..b5cac8f67124 100644
--- a/core/src/main/java/org/jruby/util/io/OpenFile.java
+++ b/core/src/main/java/org/jruby/util/io/OpenFile.java
@@ -2,6 +2,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
@@ -1332,6 +1333,25 @@ public int fillbuf(ThreadContext context) {
     final static RubyThread.ReadWrite READ_TASK = new RubyThread.ReadWrite() {
         @Override
         public int run(ThreadContext context, OpenFile fptr, byte[] buffer, int start, int length) throws InterruptedException {
+            ChannelFD fd = preRead(context, fptr);
+            try {
+                return fptr.posix.read(fd, buffer, start, length, fptr.nonblock);
+            } finally {
+                fptr.lock();
+            }
+        }
+
+        @Override
+        public int run(ThreadContext context, OpenFile fptr, ByteBuffer buffer, int start, int length) throws InterruptedException {
+            ChannelFD fd = preRead(context, fptr);
+            try {
+                return fptr.posix.read(fd, buffer, start, length, fptr.nonblock);
+            } finally {
+                fptr.lock();
+            }
+        }
+
+        private ChannelFD preRead(ThreadContext context, OpenFile fptr) {
             ChannelFD fd = fptr.fd;
 
             if (fd == null) {
@@ -1342,11 +1362,7 @@ public int run(ThreadContext context, OpenFile fptr, byte[] buffer, int start, i
             assert fptr.lockedByMe();
 
             fptr.unlock();
-            try {
-                return fptr.posix.read(fd, buffer, start, length, fptr.nonblock);
-            } finally {
-                fptr.lock();
-            }
+            return fd;
         }
 
         @Override
@@ -1358,6 +1374,25 @@ public void wakeup(RubyThread thread, OpenFile data) {
     final static RubyThread.ReadWrite WRITE_TASK = new RubyThread.ReadWrite() {
         @Override
         public int run(ThreadContext context, OpenFile fptr, byte[] bytes, int start, int length) throws InterruptedException {
+            ChannelFD fd = preWrite(context, fptr);
+            try {
+                return fptr.posix.write(fd, bytes, start, length, fptr.nonblock);
+            } finally {
+                fptr.lock();
+            }
+        }
+
+        @Override
+        public int run(ThreadContext context, OpenFile fptr, ByteBuffer bytes, int start, int length) throws InterruptedException {
+            ChannelFD fd = preWrite(context, fptr);
+            try {
+                return fptr.posix.write(fd, bytes, start, length, fptr.nonblock);
+            } finally {
+                fptr.lock();
+            }
+        }
+
+        private ChannelFD preWrite(ThreadContext context, OpenFile fptr) {
             ChannelFD fd = fptr.fd;
 
             if (fd == null) {
@@ -1368,11 +1403,7 @@ public int run(ThreadContext context, OpenFile fptr, byte[] bytes, int start, in
             assert fptr.lockedByMe();
 
             fptr.unlock();
-            try {
-                return fptr.posix.write(fd, bytes, start, length, fptr.nonblock);
-            } finally {
-                fptr.lock();
-            }
+            return fd;
         }
 
         @Override
@@ -1382,7 +1413,7 @@ public void wakeup(RubyThread thread, OpenFile data) {
         }
     };
 
-    // rb_read_internal, rb_io_read_memory
+    // rb_read_internal, rb_io_read_memory, rb_io_buffer_read_internal
     public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, byte[] bufBytes, int buf, int count) {
         IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
         if (!scheduler.isNil()) {
@@ -1405,6 +1436,48 @@ simple read(2) because EINTR does not damage the descriptor.
             working with any native descriptor.
          */
 
+        preRead(context, fptr, fd);
+
+        try {
+            return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, READ_TASK);
+        } catch (InterruptedException ie) {
+            throw context.runtime.newConcurrencyError("IO operation interrupted");
+        }
+    }
+
+    // rb_io_buffer_read_internal
+    public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer bufBytes, int buf, int count) {
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);
+
+            if (result != RubyBasicObject.UNDEF) {
+                return FiberScheduler.resultApply(context, result);
+            }
+        }
+        // if we can do selection and this is not a non-blocking call, do selection
+
+        /*
+            NOTE CON: We only do this selection because on the JDK, blocking calls to NIO channels can't be
+            interrupted, and we need to be able to interrupt blocking reads. In MRI, this logic is always just a
+            simple read(2) because EINTR does not damage the descriptor.
+
+            Doing selects here on ENXIO native channels caused FIFOs to block for read all the time, even when no
+            writers are connected. This may or may not be correct behavior for selects against FIFOs, but in any
+            case since MRI does not do a select here at all I believe correct logic is to skip the select when
+            working with any native descriptor.
+         */
+
+        preRead(context, fptr, fd);
+
+        try {
+            return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, READ_TASK);
+        } catch (InterruptedException ie) {
+            throw context.runtime.newConcurrencyError("IO operation interrupted");
+        }
+    }
+
+    private static void preRead(ThreadContext context, OpenFile fptr, ChannelFD fd) {
         if (fd == null) {
             // stream was closed on its way in, raise appropriate error
             throw context.runtime.newErrnoEBADFError();
@@ -1420,12 +1493,6 @@ simple read(2) because EINTR does not damage the descriptor.
         } finally {
             fptr.lock();
         }
-
-        try {
-            return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, READ_TASK);
-        } catch (InterruptedException ie) {
-            throw context.runtime.newConcurrencyError("IO operation interrupted");
-        }
     }
 
     /**
@@ -2349,6 +2416,24 @@ public static int writeInternal(ThreadContext context, OpenFile fptr, byte[] buf
         }
     }
 
+    // rb_io_buffer_write_internal
+    public static int writeInternal(ThreadContext context, OpenFile fptr, ByteBuffer bufBytes, int buf, int count) {
+        IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
+        if (!scheduler.isNil()) {
+            IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.tiedIOForWriting, bufBytes, buf, count);
+
+            if (result != RubyBasicObject.UNDEF) {
+                return FiberScheduler.resultApply(context, result);
+            }
+        }
+
+        try {
+            return context.getThread().executeReadWrite(context, fptr, bufBytes, buf, count, WRITE_TASK);
+        } catch (InterruptedException ie) {
+            throw context.runtime.newConcurrencyError("IO operation interrupted");
+        }
+    }
+
     // rb_write_internal2 (no GVL version...we just don't use executeTask as above.
     int writeInternal2(ChannelFD fd, byte[] bufBytes, int buf, int count) {
         return posix.write(fd, bufBytes, buf, count, nonblock);
diff --git a/core/src/main/java/org/jruby/util/io/PosixShim.java b/core/src/main/java/org/jruby/util/io/PosixShim.java
index ae0c72e6ee49..ed3c36cba3a4 100644
--- a/core/src/main/java/org/jruby/util/io/PosixShim.java
+++ b/core/src/main/java/org/jruby/util/io/PosixShim.java
@@ -89,10 +89,15 @@ public long lseek(ChannelFD fd, long offset, int type) {
     }
 
     public int write(ChannelFD fd, byte[] bytes, int offset, int length, boolean nonblock) {
-        clear();
-
         // FIXME: don't allocate every time
         ByteBuffer tmp = ByteBuffer.wrap(bytes, offset, length);
+
+        return write(fd, tmp, offset, length, nonblock);
+    }
+
+    public int write(ChannelFD fd, ByteBuffer buffer, int offset, int length, boolean nonblock) {
+        clear();
+
         try {
             if (nonblock) {
                 // TODO: figure out what nonblocking writes against atypical streams (files?) actually do
@@ -104,7 +109,7 @@ public int write(ChannelFD fd, byte[] bytes, int offset, int length, boolean non
                 setErrno(Errno.EACCES);
                 return -1;
             }
-            int written = fd.chWrite.write(tmp);
+            int written = fd.chWrite.write(buffer);
 
             if (written == 0 && length > 0) {
                 // if it's a nonblocking write against a file and we've hit EOF, do EAGAIN
@@ -129,55 +134,77 @@ public int read(ChannelFD fd, byte[] target, int offset, int length, boolean non
         clear();
 
         try {
-            if (nonblock) {
-                // need to ensure channels that don't support nonblocking IO at least
-                // appear to be nonblocking
-                if (fd.chSelect != null) {
-                    // ok...we should have set it nonblocking already in setNonblock
-                } else {
-                    if (fd.chFile != null) {
-                        long position = fd.chFile.position();
-                        long size = fd.chFile.size();
-                        if (position != -1 && size != -1 && position < size) {
-                            // there should be bytes available...proceed
-                        } else {
-                            setErrno(Errno.EAGAIN);
-                            return -1;
-                        }
-                    } else if (fd.chNative != null && fd.isNativeFile) {
-                        // it's a native file, so we don't do selection or nonblock
-                    } else {
-                        setErrno(Errno.EAGAIN);
-                        return -1;
-                    }
-                }
-            }
+            if (checkForBlocking(fd, nonblock)) return -1;
 
             // FIXME: inefficient to recreate ByteBuffer every time
             ByteBuffer buffer = ByteBuffer.wrap(target, offset, length);
-            int read = fd.chRead.read(buffer);
+            return performRead(fd, buffer, nonblock);
+        } catch (IOException ioe) {
+            setErrno(Helpers.errnoFromException(ioe));
+            return -1;
+        }
+    }
 
-            if (nonblock) {
-                if (read == JAVA_EOF) {
-                    read = NATIVE_EOF; // still treat EOF as EOF
-                } else if (read == 0) {
-                    setErrno(Errno.EAGAIN);
-                    return -1;
-                } else {
-                    return read;
-                }
-            } else {
-                // NIO channels will always raise for errors, so -1 only means EOF.
-                if (read == JAVA_EOF) read = NATIVE_EOF;
-            }
+    public int read(ChannelFD fd, ByteBuffer buffer, int offset, int length, boolean nonblock) {
+        clear();
+
+        try {
+            if (checkForBlocking(fd, nonblock)) return -1;
 
-            return read;
+            return performRead(fd, buffer, nonblock);
         } catch (IOException ioe) {
             setErrno(Helpers.errnoFromException(ioe));
             return -1;
         }
     }
 
+    private int performRead(ChannelFD fd, ByteBuffer buffer, boolean nonblock) throws IOException {
+        int read = fd.chRead.read(buffer);
+
+        if (nonblock) {
+            if (read == JAVA_EOF) {
+                read = NATIVE_EOF; // still treat EOF as EOF
+            } else if (read == 0) {
+                setErrno(Errno.EAGAIN);
+                return -1;
+            } else {
+                return read;
+            }
+        } else {
+            // NIO channels will always raise for errors, so -1 only means EOF.
+            if (read == JAVA_EOF) read = NATIVE_EOF;
+        }
+
+        return read;
+    }
+
+    private boolean checkForBlocking(ChannelFD fd, boolean nonblock) throws IOException {
+        if (nonblock) {
+            // need to ensure channels that don't support nonblocking IO at least
+            // appear to be nonblocking
+            if (fd.chSelect != null) {
+                // ok...we should have set it nonblocking already in setNonblock
+            } else {
+                if (fd.chFile != null) {
+                    long position = fd.chFile.position();
+                    long size = fd.chFile.size();
+                    if (position != -1 && size != -1 && position < size) {
+                        // there should be bytes available...proceed
+                    } else {
+                        setErrno(Errno.EAGAIN);
+                        return true;
+                    }
+                } else if (fd.chNative != null && fd.isNativeFile) {
+                    // it's a native file, so we don't do selection or nonblock
+                } else {
+                    setErrno(Errno.EAGAIN);
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     // rb_thread_flock
     public int flock(ChannelFD fd, int lockMode) {
         // TODO: null channel always succeeds for all locking operations