Skip to content

Commit

Permalink
IO::Buffer read and write logic
Browse files Browse the repository at this point in the history
  • Loading branch information
headius committed Sep 29, 2023
1 parent dd70981 commit dbe0c2a
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 73 deletions.
48 changes: 48 additions & 0 deletions core/src/main/java/org/jruby/FiberScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.io.OpenFile;

import java.nio.ByteBuffer;

public class FiberScheduler {
// MRI: rb_fiber_scheduler_kernel_sleep
public static IRubyObject kernelSleep(ThreadContext context, IRubyObject scheduler, IRubyObject timeout) {
Expand Down Expand Up @@ -61,23 +63,50 @@ public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, I
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
}

public static IRubyObject ioRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, length, offset);
}

// MRI: rb_fiber_scheduler_io_pread
public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
return Helpers.invokeChecked(context, scheduler, "io_pread", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
}

public static IRubyObject ioPRead(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, IRubyObject length, IRubyObject offset) {
return Helpers.invokeChecked(context, scheduler, "io_pread", io, buffer, length, offset);
}

// MRI: rb_fiber_scheduler_io_write
public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, context.runtime.newFixnum(length));
}

public static IRubyObject ioWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, RubyInteger length, RubyInteger offset) {
return Helpers.invokeChecked(context, scheduler, "io_read", io, buffer, length, offset);
}

// MRI: rb_fiber_scheduler_io_pwrite
public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, int length, int offset) {
return Helpers.invokeChecked(context, scheduler, "io_pwrite", io, buffer, context.runtime.newFixnum(length), context.runtime.newFixnum(offset));
}

public static IRubyObject ioPWrite(ThreadContext context, IRubyObject scheduler, IRubyObject io, IRubyObject buffer, IRubyObject length, IRubyObject offset) {
return Helpers.invokeChecked(context, scheduler, "io_pwrite", io, buffer, length, offset);
}

// MRI: rb_fiber_scheduler_io_read_memory
public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, ByteBuffer.wrap(base), size, RubyIOBuffer.LOCKED);

IRubyObject result = ioRead(context, scheduler, io, buffer, length);

buffer.unlock(context);
buffer.free(context);

return result;
}

public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED);

IRubyObject result = ioRead(context, scheduler, io, buffer, length);
Expand All @@ -90,6 +119,17 @@ public static IRubyObject ioReadMemory(ThreadContext context, IRubyObject schedu

// MRI: rb_fiber_scheduler_io_write_memory
public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, byte[] base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, ByteBuffer.wrap(base), size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);

IRubyObject result = ioWrite(context, scheduler, io, buffer, length);

buffer.unlock(context);
buffer.free(context);

return result;
}

public static IRubyObject ioWriteMemory(ThreadContext context, IRubyObject scheduler, IRubyObject io, ByteBuffer base, int size, int length) {
RubyIOBuffer buffer = RubyIOBuffer.newBuffer(context.runtime, base, size, RubyIOBuffer.LOCKED | RubyIOBuffer.READONLY);

IRubyObject result = ioWrite(context, scheduler, io, buffer, length);
Expand Down Expand Up @@ -154,4 +194,12 @@ public static int resultApply(ThreadContext context, IRubyObject result) {
return RubyNumeric.num2int(result);
}
}

public static IRubyObject result(Ruby runtime, int result, int error) {
if (result == -1) {
return RubyFixnum.newFixnum(runtime, error);
} else {
return RubyFixnum.newFixnum(runtime, error);
}
}
}
88 changes: 82 additions & 6 deletions core/src/main/java/org/jruby/RubyIOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.jruby.util.func.ObjectLongFunction;
import org.jruby.util.io.OpenFile;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -129,7 +132,7 @@ public void initialize(ThreadContext context, byte[] baseBytes, int size, int fl
// If we are provided a pointer, we use it.
base = ByteBuffer.wrap(baseBytes);
} else if (size != 0) {
base = newBufferBase(context.runtime, size, flags, base);
base = newBufferBase(context.runtime, size, flags);
} else {
// Otherwise we don't do anything.
return;
Expand Down Expand Up @@ -1258,12 +1261,48 @@ public IRubyObject op_not(ThreadContext context) {

@JRubyMethod(name = "read")
public IRubyObject read(ThreadContext context, IRubyObject io, IRubyObject length) {
return context.nil;
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
RubyInteger lengthInteger = length.convertToInteger();

if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioRead(context, scheduler, io, this, lengthInteger, RubyFixnum.zero(context.runtime));

if (result != UNDEF) {
return result;
}
}

return read(context, io, lengthInteger.getIntValue(), 0);
}

@JRubyMethod(name = "read")
public IRubyObject read(ThreadContext context, IRubyObject io, IRubyObject length, IRubyObject offset) {
return context.nil;
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
RubyInteger lengthInteger = length.convertToInteger();
RubyInteger offsetInteger = offset.convertToInteger();

if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioRead(context, scheduler, io, this, lengthInteger, offsetInteger);

if (result != UNDEF) {
return result;
}
}

return read(context, io, lengthInteger.getIntValue(), offsetInteger.getIntValue());
}

public IRubyObject read(ThreadContext context, IRubyObject io, int length, int offset) {
validateRange(context, offset, length);

ByteBuffer buffer = getBufferForWriting(context);

return readInternal(context, RubyIO.convertToIO(context, io), buffer, offset, length);
}

private static IRubyObject readInternal(ThreadContext context, RubyIO io, ByteBuffer base, int offset, int size) {
int result = OpenFile.readInternal(context, io.openFile, io.openFile.fd(), base, offset, size);
return FiberScheduler.result(context.runtime, result, io.openFile.posix.getErrno().value());
}

@JRubyMethod(name = "pread")
Expand All @@ -1286,17 +1325,54 @@ public IRubyObject pread(ThreadContext context, IRubyObject[] args) {
}

public IRubyObject pread(ThreadContext context, IRubyObject io, IRubyObject from, IRubyObject length, IRubyObject offset) {
return context.nil;
throw context.runtime.newNotImplementedError("pread");
}

@JRubyMethod(name = "write")
public IRubyObject write(ThreadContext context, IRubyObject io, IRubyObject length) {
return context.nil;
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
RubyInteger lengthInteger = length.convertToInteger();

if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWrite(context, scheduler, io, this, lengthInteger, RubyFixnum.zero(context.runtime));

if (result != UNDEF) {
return result;
}
}

return write(context, io, lengthInteger.getIntValue(), 0);
}

@JRubyMethod(name = "write")
public IRubyObject write(ThreadContext context, IRubyObject io, IRubyObject length, IRubyObject offset) {
return context.nil;
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
RubyInteger lengthInteger = length.convertToInteger();
RubyInteger offsetInteger = offset.convertToInteger();

if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWrite(context, scheduler, io, this, lengthInteger, offsetInteger);

if (result != UNDEF) {
return result;
}
}

return write(context, io, lengthInteger.getIntValue(), offsetInteger.getIntValue());
}

public IRubyObject write(ThreadContext context, IRubyObject io, int length, int offset) {
validateRange(context, offset, length);


ByteBuffer buffer = getBufferForWriting(context);

return writeInternal(context, RubyIO.convertToIO(context, io), buffer, offset, length);
}

private static IRubyObject writeInternal(ThreadContext context, RubyIO io, ByteBuffer base, int offset, int size) {
int result = OpenFile.writeInternal(context, io.openFile, base, offset, size);
return FiberScheduler.result(context.runtime, result, io.openFile.posix.getErrno().value());
}

@JRubyMethod(name = "pwrite")
Expand Down
42 changes: 33 additions & 9 deletions core/src/main/java/org/jruby/RubyThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
Expand Down Expand Up @@ -1788,25 +1789,48 @@ public <Data> int executeReadWrite(
ReadWrite<Data> task) throws InterruptedException {
Status oldStatus = STATUS.get(this);
try {
this.unblockArg = data;
this.unblockFunc = task;
preReadWrite(context, data, task);

// check for interrupt before going into blocking call
blockingThreadPoll(context);
return task.run(context, data, bytes, start, length);
} finally {
postReadWrite(context, oldStatus);
}
}

STATUS.set(this, Status.SLEEP);
public <Data> int executeReadWrite(
ThreadContext context,
Data data, ByteBuffer bytes, int start, int length,
ReadWrite<Data> task) throws InterruptedException {
Status oldStatus = STATUS.get(this);
try {
preReadWrite(context, data, task);

return task.run(context, data, bytes, start, length);
} finally {
STATUS.set(this, oldStatus);
this.unblockFunc = null;
this.unblockArg = null;
pollThreadEvents(context);
postReadWrite(context, oldStatus);
}
}

private void postReadWrite(ThreadContext context, Status oldStatus) {
STATUS.set(this, oldStatus);
this.unblockFunc = null;
this.unblockArg = null;
pollThreadEvents(context);
}

private <Data> void preReadWrite(ThreadContext context, Data data, ReadWrite<Data> task) {
this.unblockArg = data;
this.unblockFunc = task;

// check for interrupt before going into blocking call
blockingThreadPoll(context);

STATUS.set(this, Status.SLEEP);
}

public interface ReadWrite<Data> extends Unblocker<Data> {
public int run(ThreadContext context, Data data, byte[] bytes, int start, int length) throws InterruptedException;
public int run(ThreadContext context, Data data, ByteBuffer bytes, int start, int length) throws InterruptedException;
public void wakeup(RubyThread thread, Data data);
}

Expand Down
Loading

0 comments on commit dbe0c2a

Please sign in to comment.