Skip to content

Commit

Permalink
Guard Fiber::Scheduler support behind a property
Browse files Browse the repository at this point in the history
This commit adds the property "jruby.experimental.fiber.scheduler"
to control whether the Fiber::Scheduler subsystem will be used at
runtime. It is currently off by default, as this is an
experimental feature in all versions of Ruby that support it. Set
this property at the JVM level, or pass
-Xexperimental.fiber.scheduler to enable it.

None of the classes or constants associated with the fiber
scheduler will be defined if the property is not enabled.

In the future, this property may disappear when this feature is
finalized, or may remain under a different name to allow disabling
all scheduler checks.
  • Loading branch information
headius committed Oct 31, 2023
1 parent 2779806 commit 54a8d4b
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 74 deletions.
13 changes: 8 additions & 5 deletions core/src/main/java/org/jruby/Ruby.java
Original file line number Diff line number Diff line change
Expand Up @@ -1718,11 +1718,14 @@ private void initExceptions() {

RubyClass runtimeError = this.runtimeError;
ObjectAllocator runtimeErrorAllocator = runtimeError.getAllocator();
bufferLockedError = ioBufferClass.defineClassUnder("LockedError", runtimeError, runtimeErrorAllocator);
bufferAllocationError = ioBufferClass.defineClassUnder("AllocationError", runtimeError, runtimeErrorAllocator);
bufferAccessError = ioBufferClass.defineClassUnder("AccessError", runtimeError, runtimeErrorAllocator);
bufferInvalidatedError = ioBufferClass.defineClassUnder("InvalidatedError", runtimeError, runtimeErrorAllocator);
bufferMaskError = ioBufferClass.defineClassUnder("MaskError", runtimeError, runtimeErrorAllocator);

if (Options.FIBER_SCHEDULER.load()) {
bufferLockedError = ioBufferClass.defineClassUnder("LockedError", runtimeError, runtimeErrorAllocator);
bufferAllocationError = ioBufferClass.defineClassUnder("AllocationError", runtimeError, runtimeErrorAllocator);
bufferAccessError = ioBufferClass.defineClassUnder("AccessError", runtimeError, runtimeErrorAllocator);
bufferInvalidatedError = ioBufferClass.defineClassUnder("InvalidatedError", runtimeError, runtimeErrorAllocator);
bufferMaskError = ioBufferClass.defineClassUnder("MaskError", runtimeError, runtimeErrorAllocator);
}

initErrno();

Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/org/jruby/RubyIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.jruby.runtime.encoding.EncodingService;
import org.jruby.util.ShellLauncher.POpenProcess;
import org.jruby.util.*;
import org.jruby.util.cli.Options;
import org.jruby.util.io.ChannelFD;
import org.jruby.util.io.EncodingUtils;
import org.jruby.util.io.FilenoUtil;
Expand Down Expand Up @@ -3795,10 +3796,12 @@ public static RubyIO convertToIO(ThreadContext context, IRubyObject obj) {

@JRubyMethod(name = "select", required = 1, optional = 3, checkArity = false, meta = true)
public static IRubyObject select(ThreadContext context, IRubyObject recv, IRubyObject[] argv) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioSelectv(context, scheduler, argv);
if (result != UNDEF) return result;
if (Options.FIBER_SCHEDULER.load()) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioSelectv(context, scheduler, argv);
if (result != UNDEF) return result;
}
}

int argc = Arity.checkArgumentCount(context, argv, 1, 4);
Expand Down
19 changes: 11 additions & 8 deletions core/src/main/java/org/jruby/RubyIOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.ByteList;
import org.jruby.util.cli.Options;
import org.jruby.util.io.ChannelFD;
import org.jruby.util.io.OpenFile;

Expand All @@ -31,19 +32,21 @@ public class RubyIOBuffer extends RubyObject {
public static final Runtime FFI_RUNTIME = Runtime.getSystemRuntime();

public static RubyClass createIOBufferClass(Ruby runtime) {
RubyClass IOBuffer = runtime.getIO().defineClassUnder("Buffer", runtime.getObject(), RubyIOBuffer::new);
if (Options.FIBER_SCHEDULER.load()) {
RubyClass IOBuffer = runtime.getIO().defineClassUnder("Buffer", runtime.getObject(), RubyIOBuffer::new);

IOBuffer.includeModule(runtime.getComparable());
IOBuffer.includeModule(runtime.getComparable());

IOBuffer.defineAnnotatedMethods(RubyIOBuffer.class);
IOBuffer.defineAnnotatedConstants(RubyIOBuffer.class);
IOBuffer.defineAnnotatedMethods(RubyIOBuffer.class);
IOBuffer.defineAnnotatedConstants(RubyIOBuffer.class);

RubyClass IO = runtime.getIO();
RubyClass IO = runtime.getIO();

IO.setConstant("READABLE", runtime.newFixnum(OpenFile.READABLE));
IO.setConstant("WRITABLE", runtime.newFixnum(OpenFile.WRITABLE));
IO.setConstant("READABLE", runtime.newFixnum(OpenFile.READABLE));
IO.setConstant("WRITABLE", runtime.newFixnum(OpenFile.WRITABLE));

return IOBuffer;
return IOBuffer;
}
}

@JRubyConstant
Expand Down
58 changes: 30 additions & 28 deletions core/src/main/java/org/jruby/ext/fiber/ThreadFiber.java
Original file line number Diff line number Diff line change
Expand Up @@ -691,38 +691,40 @@ public IRubyObject backtrace_locations(ThreadContext context, IRubyObject level,
return threadFiber.thread.backtrace_locations(context, level, length);
}

// MRI: rb_fiber_s_schedule_kw and rb_fiber_s_schedule, kw passes on context
@JRubyMethod(name = "schedule", meta = true, rest = true, keywords = true)
public static IRubyObject schedule(ThreadContext context, IRubyObject self, IRubyObject[] args, Block block) {
RubyThread thread = context.getThread();
IRubyObject scheduler = thread.getScheduler();
IRubyObject fiber = context.nil;

if (!scheduler.isNil()) {
fiber = scheduler.callMethod(context, "fiber", args, block);
} else {
throw context.runtime.newRuntimeError("No scheduler is available!");
}
public static class FiberSchedulerSupport {
// MRI: rb_fiber_s_schedule_kw and rb_fiber_s_schedule, kw passes on context
@JRubyMethod(name = "schedule", meta = true, rest = true, keywords = true)
public static IRubyObject schedule(ThreadContext context, IRubyObject self, IRubyObject[] args, Block block) {
RubyThread thread = context.getThread();
IRubyObject scheduler = thread.getScheduler();
IRubyObject fiber = context.nil;

if (!scheduler.isNil()) {
fiber = scheduler.callMethod(context, "fiber", args, block);
} else {
throw context.runtime.newRuntimeError("No scheduler is available!");
}

return fiber;
}
return fiber;
}

// MRI: rb_fiber_s_scheduler
@JRubyMethod(name = "scheduler", meta = true)
public static IRubyObject get_scheduler(ThreadContext context, IRubyObject self) {
return context.getFiberCurrentThread().getScheduler();
}
// MRI: rb_fiber_s_scheduler
@JRubyMethod(name = "scheduler", meta = true)
public static IRubyObject get_scheduler(ThreadContext context, IRubyObject self) {
return context.getFiberCurrentThread().getScheduler();
}

// MRI: rb_fiber_current_scheduler
@JRubyMethod(name = "current_scheduler", meta = true)
public static IRubyObject current_scheduler(ThreadContext context, IRubyObject self) {
return context.getFiberCurrentThread().getSchedulerCurrent();
}
// MRI: rb_fiber_current_scheduler
@JRubyMethod(name = "current_scheduler", meta = true)
public static IRubyObject current_scheduler(ThreadContext context, IRubyObject self) {
return context.getFiberCurrentThread().getSchedulerCurrent();
}

// MRI: rb_fiber_set_scheduler
@JRubyMethod(name = "set_scheduler", meta = true)
public static IRubyObject set_scheduler(ThreadContext context, IRubyObject self, IRubyObject scheduler) {
return context.getFiberCurrentThread().setFiberScheduler(scheduler);
// MRI: rb_fiber_set_scheduler
@JRubyMethod(name = "set_scheduler", meta = true)
public static IRubyObject set_scheduler(ThreadContext context, IRubyObject self, IRubyObject scheduler) {
return context.getFiberCurrentThread().setFiberScheduler(scheduler);
}
}

public FiberData getData() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.load.Library;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.util.cli.Options;

/**
* A thread-based implementation of Ruby 1.9 Fiber library.
Expand All @@ -43,6 +44,11 @@ public RubyClass createFiberClass(final Ruby runtime) {

cFiber.defineAnnotatedMethods(ThreadFiber.class);

if (Options.FIBER_SCHEDULER.load()) {
// define additional methods for Fiber::Scheduler support
cFiber.defineAnnotatedMethods(ThreadFiber.FiberSchedulerSupport.class);
}

return cFiber;
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/org/jruby/util/cli/Category.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public enum Category {
JAVA_INTEGRATION("java integration"),
PROFILING("profiling"),
CLI("command line options"),
COMPLIANCE("compliance options");
COMPLIANCE("compliance options"),
EXPERIMENTAL("experimental features");

Category(String desc) {
this.desc = desc;
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/jruby/util/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public class Options {

public static final Option<Integer> PROFILE_MAX_METHODS = integer(PROFILING, "profile.max.methods", 100000, "Maximum number of methods to consider for profiling.");

public static final Option<Boolean> FIBER_SCHEDULER = bool(EXPERIMENTAL, "fiber.scheduler", false, "Enable experimental Fiber::Scheduler support.");

public static final Option<Boolean> CLI_AUTOSPLIT = bool(CLI, "cli.autosplit", false, "Split $_ into $F for -p or -n. Same as -a.");
public static final Option<Boolean> CLI_DEBUG = bool(CLI, "cli.debug", false, "Enable debug mode logging. Same as -d.");
public static final Option<Boolean> CLI_PROCESS_LINE_ENDS = bool(CLI, "cli.process.line.ends", false, "Enable line ending processing. Same as -l.");
Expand Down
61 changes: 36 additions & 25 deletions core/src/main/java/org/jruby/util/io/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.jruby.Finalizable;
import org.jruby.Ruby;
import org.jruby.RubyArgsFile;
import org.jruby.RubyBasicObject;
import org.jruby.RubyBignum;
import org.jruby.RubyEncoding;
import org.jruby.RubyException;
Expand All @@ -48,6 +47,7 @@
import org.jruby.util.ByteList;
import org.jruby.util.ShellLauncher;
import org.jruby.util.StringSupport;
import org.jruby.util.cli.Options;

import static org.jruby.util.StringSupport.*;

Expand All @@ -61,6 +61,7 @@ public OpenFile(RubyIO io, IRubyObject nil) {
writeconvPreEcopts = nil;
encs.ecopts = nil;
posix = new PosixShim(runtime);
fiberScheduler = Options.FIBER_SCHEDULER.load();
}

// IO Mode flags
Expand Down Expand Up @@ -162,6 +163,8 @@ public static class Buffer {
private final Ptr spPtr = new Ptr();
private final Ptr dpPtr = new Ptr();

private final boolean fiberScheduler;

public void clearStdio() {
stdio_file = null;
}
Expand Down Expand Up @@ -462,7 +465,7 @@ public int io_fflush(ThreadContext context) {

// rb_io_wait_writable
public boolean waitWritable(ThreadContext context, long timeout) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
IRubyObject scheduler = fiberScheduler ? context.getFiberCurrentThread().getSchedulerCurrent() : null;

boolean locked = lock();
try {
Expand All @@ -477,7 +480,7 @@ public boolean waitWritable(ThreadContext context, long timeout) {
return true;
case EAGAIN:
case EWOULDBLOCK:
if (!scheduler.isNil()) {
if (fiberScheduler && !scheduler.isNil()) {
return FiberScheduler.ioWaitWritable(context, scheduler, RubyIO.newIO(context.runtime, channel())).isTrue();
}

Expand All @@ -498,7 +501,7 @@ public boolean waitWritable(ThreadContext context) {

// rb_io_wait_readable
public boolean waitReadable(ThreadContext context, long timeout) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
IRubyObject scheduler = fiberScheduler ? context.getFiberCurrentThread().getSchedulerCurrent() : null;

boolean locked = lock();
try {
Expand All @@ -513,7 +516,7 @@ public boolean waitReadable(ThreadContext context, long timeout) {
return true;
case EAGAIN:
case EWOULDBLOCK:
if (!scheduler.isNil()) {
if (fiberScheduler && !scheduler.isNil()) {
return FiberScheduler.ioWaitReadable(context, scheduler, RubyIO.newIO(context.runtime, channel())).isTrue();
}

Expand Down Expand Up @@ -1478,12 +1481,14 @@ public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD f
// rb_io_buffer_read_internal
public static int readInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int buf, int count) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.io, buffer, buf, count);
if (fptr.fiberScheduler) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioReadMemory(context, scheduler, fptr.io, buffer, buf, count);

if (result != null) {
return FiberScheduler.resultApply(context, result);
if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}
}

Expand Down Expand Up @@ -1513,12 +1518,14 @@ simple read(2) because EINTR does not damage the descriptor.

public static int preadInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPReadMemory(context, scheduler, fptr.io, buffer, from, length, 0);
if (fptr.fiberScheduler) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPReadMemory(context, scheduler, fptr.io, buffer, from, length, 0);

if (result != null) {
return FiberScheduler.resultApply(context, result);
if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}
}

Expand All @@ -1532,12 +1539,14 @@ public static int preadInternal(ThreadContext context, OpenFile fptr, ChannelFD

public static int pwriteInternal(ThreadContext context, OpenFile fptr, ChannelFD fd, ByteBuffer buffer, int from, int length) {
// try scheduler first
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPWriteMemory(context, scheduler, fptr.io, buffer, from, length, 0);
if (fptr.fiberScheduler) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioPWriteMemory(context, scheduler, fptr.io, buffer, from, length, 0);

if (result != null) {
return FiberScheduler.resultApply(context, result);
if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}
}

Expand Down Expand Up @@ -2479,12 +2488,14 @@ public static int writeInternal(ThreadContext context, OpenFile fptr, byte[] buf

// rb_io_buffer_write_internal
public static int writeInternal(ThreadContext context, OpenFile fptr, ByteBuffer bufBytes, int buf, int count) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.io, bufBytes, buf, count);
if (fptr.fiberScheduler) {
IRubyObject scheduler = context.getFiberCurrentThread().getSchedulerCurrent();
if (!scheduler.isNil()) {
IRubyObject result = FiberScheduler.ioWriteMemory(context, scheduler, fptr.io, bufBytes, buf, count);

if (result != null) {
return FiberScheduler.resultApply(context, result);
if (result != null) {
return FiberScheduler.resultApply(context, result);
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions rakelib/test.rake
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ namespace :test do

mri_suites = [:core, :extra, :stdlib]
mri_suites = {
core: "-Xbacktrace.style=mri -Xdebug.fullTrace",
extra: "--disable-gems -Xbacktrace.style=mri -Xdebug.fullTrace",
stdlib: "-Xbacktrace.style=mri -Xdebug.fullTrace",
core: "-Xbacktrace.style=mri -Xdebug.fullTrace -Xexperimental.fiber.scheduler",
extra: "--disable-gems -Xbacktrace.style=mri -Xdebug.fullTrace -Xexperimental.fiber.scheduler",
stdlib: "-Xbacktrace.style=mri -Xdebug.fullTrace -Xexperimental.fiber.scheduler",
}

mri_suites.each do |suite, extra_jruby_opts|
Expand Down

0 comments on commit 54a8d4b

Please sign in to comment.