Skip to content

Commit

Permalink
M:N thread scheduler for Ractors
Browse files Browse the repository at this point in the history
This patch introduce M:N thread scheduler for Ractor system.

In general, M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
On the Ruby interpreter, 1 native thread is provided for 1 Ractor
and all Ruby threads are managed by the native thread.

From Ruby 1.9, the interpreter uses 1:1 thread scheduler which means
1 Ruby thread has 1 native thread. M:N scheduler change this strategy.

Because of compatibility issue (and stableness issue of the implementation)
main Ractor doesn't use M:N scheduler on default. On the other words,
threads on the main Ractor will be managed with 1:1 thread scheduler.

There are additional settings by environment variables:

`RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor.
Note that non-main ractors use the M:N scheduler without this
configuration. With this configuration, single ractor applications
run threads on M:1 thread scheduler (green threads, user-level threads).

`RUBY_MAX_CPU=n` specifies maximum number of native threads for
M:N scheduler (default: 8).

This patch will be reverted soon if non-easy issues are found.

[Bug #19842]
  • Loading branch information
ko1 committed Oct 11, 2023
1 parent 802ca3a commit a3a371c
Show file tree
Hide file tree
Showing 26 changed files with 3,619 additions and 1,566 deletions.
14 changes: 14 additions & 0 deletions bootstraptest/test_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,20 @@
end
}

assert_equal 'true', %{
Thread.new{}.join
begin
Process.waitpid2 fork{
Thread.new{
sleep 0.1
}.join
}
true
rescue NotImplementedError
true
end
}

assert_equal 'ok', %{
open("zzz_t1.rb", "w") do |f|
f.puts <<-END
Expand Down
3 changes: 3 additions & 0 deletions common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -15323,6 +15323,7 @@ ruby.$(OBJEXT): $(top_srcdir)/internal/ruby_parser.h
ruby.$(OBJEXT): $(top_srcdir)/internal/serial.h
ruby.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
ruby.$(OBJEXT): $(top_srcdir)/internal/string.h
ruby.$(OBJEXT): $(top_srcdir)/internal/thread.h
ruby.$(OBJEXT): $(top_srcdir)/internal/variable.h
ruby.$(OBJEXT): $(top_srcdir)/internal/vm.h
ruby.$(OBJEXT): $(top_srcdir)/internal/warnings.h
Expand Down Expand Up @@ -17536,6 +17537,7 @@ thread.$(OBJEXT): $(top_srcdir)/internal/time.h
thread.$(OBJEXT): $(top_srcdir)/internal/variable.h
thread.$(OBJEXT): $(top_srcdir)/internal/vm.h
thread.$(OBJEXT): $(top_srcdir)/internal/warnings.h
thread.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
thread.$(OBJEXT): {$(VPATH)}assert.h
thread.$(OBJEXT): {$(VPATH)}atomic.h
thread.$(OBJEXT): {$(VPATH)}backward/2/assume.h
Expand Down Expand Up @@ -17730,6 +17732,7 @@ thread.$(OBJEXT): {$(VPATH)}thread.h
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_pthread_mn.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc
thread.$(OBJEXT): {$(VPATH)}timev.h
Expand Down
2 changes: 2 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,8 @@ AC_CHECK_HEADERS(syscall.h)
AC_CHECK_HEADERS(time.h)
AC_CHECK_HEADERS(ucontext.h)
AC_CHECK_HEADERS(utime.h)
AC_CHECK_HEADERS(sys/epoll.h)

AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
AC_CHECK_HEADERS(x86intrin.h)
])
Expand Down
9 changes: 7 additions & 2 deletions debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ setup_debug_log(void)
(ruby_debug_log_mode & ruby_debug_log_memory) ? "[mem]" : "",
(ruby_debug_log_mode & ruby_debug_log_stderr) ? "[stderr]" : "",
(ruby_debug_log_mode & ruby_debug_log_file) ? "[file]" : "");
if (debug_log.output_file[0]) {
fprintf(stderr, "RUBY_DEBUG_LOG filename=%s\n", debug_log.output_file);
}

rb_nativethread_lock_initialize(&debug_log.lock);

setup_debug_log_filter();
Expand Down Expand Up @@ -609,10 +613,11 @@ ruby_debug_log(const char *file, int line, const char *func_name, const char *fm
// ractor information
if (ruby_single_main_ractor == NULL) {
rb_ractor_t *cr = th ? th->ractor : NULL;
rb_vm_t *vm = GET_VM();

if (r && len < MAX_DEBUG_LOG_MESSAGE_LEN) {
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u",
cr ? (int)rb_ractor_id(cr) : -1, GET_VM()->ractor.cnt);
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u (%u)",
cr ? (int)rb_ractor_id(cr) : -1, vm->ractor.cnt, vm->ractor.sched.running_cnt);

if (r < 0) rb_bug("ruby_debug_log returns %d", r);
len += r;
Expand Down
4 changes: 2 additions & 2 deletions dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ dir_read(VALUE dir)
struct dirent *dp;

GetDIR(dir, dirp);
errno = 0;
rb_errno_set(0);
if ((dp = READDIR(dirp->dir, dirp->enc)) != NULL) {
return rb_external_str_new_with_enc(dp->d_name, NAMLEN(dp), dirp->enc);
}
Expand Down Expand Up @@ -1723,7 +1723,7 @@ nogvl_opendir_at(void *ptr)
/* fallthrough*/
case 0:
if (fd >= 0) close(fd);
errno = e;
rb_errno_set(e);
}
}
#else /* !USE_OPENDIR_AT */
Expand Down
18 changes: 18 additions & 0 deletions eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -2110,3 +2110,21 @@ Init_eval(void)
id_signo = rb_intern_const("signo");
id_status = rb_intern_const("status");
}

int
rb_errno(void)
{
return *rb_orig_errno_ptr();
}

void
rb_errno_set(int e)
{
*rb_orig_errno_ptr() = e;
}

int *
rb_errno_ptr(void)
{
return rb_orig_errno_ptr();
}
18 changes: 18 additions & 0 deletions include/ruby/ruby.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,24 @@ RBIMPL_ATTR_FORMAT(RBIMPL_PRINTF_FORMAT, 3, 0)
*/
int ruby_vsnprintf(char *str, size_t n, char const *fmt, va_list ap);

// TODO: doc

#include <errno.h>

int rb_errno(void);
void rb_errno_set(int);
int *rb_errno_ptr(void);

static inline int *
rb_orig_errno_ptr(void)
{
return &errno;
}

#define rb_orig_errno errno
#undef errno
#define errno (*rb_errno_ptr())

/** @cond INTERNAL_MACRO */
#if RBIMPL_HAS_WARNING("-Wgnu-zero-variadic-macro-arguments")
# /* Skip it; clang -pedantic doesn't like the following */
Expand Down
1 change: 1 addition & 0 deletions internal/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ void rb_mutex_allow_trap(VALUE self, int val);
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);
VALUE rb_mutex_owned_p(VALUE self);
VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g, VALUE h, ID mid);
void ruby_mn_threads_params(void);

int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);

Expand Down
64 changes: 35 additions & 29 deletions process.c
Original file line number Diff line number Diff line change
Expand Up @@ -685,10 +685,16 @@ rb_last_status_set(int status, rb_pid_t pid)
GET_THREAD()->last_status = rb_process_status_new(pid, status, 0);
}

static void
last_status_clear(rb_thread_t *th)
{
th->last_status = Qnil;
}

void
rb_last_status_clear(void)
{
GET_THREAD()->last_status = Qnil;
last_status_clear(GET_THREAD());
}

static rb_pid_t
Expand Down Expand Up @@ -1654,26 +1660,13 @@ before_exec(void)
before_exec_async_signal_safe();
}

/* This function should be async-signal-safe. Actually it is. */
static void
after_exec_async_signal_safe(void)
{
}

static void
after_exec_non_async_signal_safe(void)
after_exec(void)
{
rb_thread_reset_timer_thread();
rb_thread_start_timer_thread();
}

static void
after_exec(void)
{
after_exec_async_signal_safe();
after_exec_non_async_signal_safe();
}

#if defined HAVE_WORKING_FORK || defined HAVE_DAEMON
static void
before_fork_ruby(void)
Expand All @@ -1686,10 +1679,14 @@ after_fork_ruby(rb_pid_t pid)
{
rb_threadptr_pending_interrupt_clear(GET_THREAD());
if (pid == 0) {
// child
clear_pid_cache();
rb_thread_atfork();
}
after_exec();
else {
// parent
after_exec();
}
}
#endif

Expand Down Expand Up @@ -4210,16 +4207,19 @@ rb_fork_ruby2(struct rb_process_status *status)

while (1) {
prefork();
disable_child_handler_before_fork(&old);

before_fork_ruby();
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
disable_child_handler_before_fork(&old);
{
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
}
}
after_fork_ruby(pid);
disable_child_handler_fork_parent(&old); /* yes, bad name */
after_fork_ruby(pid);

if (pid >= 0) { /* fork succeed */
return pid;
Expand Down Expand Up @@ -4663,11 +4663,16 @@ static VALUE
do_spawn_process(VALUE arg)
{
struct spawn_args *argp = (struct spawn_args *)arg;

rb_execarg_parent_start1(argp->execarg);

return (VALUE)rb_spawn_process(DATA_PTR(argp->execarg),
argp->errmsg.ptr, argp->errmsg.buflen);
}

NOINLINE(static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen));

static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
{
Expand All @@ -4676,8 +4681,10 @@ rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
args.execarg = execarg_obj;
args.errmsg.ptr = errmsg;
args.errmsg.buflen = errmsg_buflen;
return (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);

rb_pid_t r = (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);
return r;
}

static rb_pid_t
Expand Down Expand Up @@ -4820,26 +4827,25 @@ rb_spawn(int argc, const VALUE *argv)
static VALUE
rb_f_system(int argc, VALUE *argv, VALUE _)
{
rb_thread_t *th = GET_THREAD();
VALUE execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE);
struct rb_execarg *eargp = rb_execarg_get(execarg_obj);

struct rb_process_status status = {0};
eargp->status = &status;

rb_last_status_clear();
last_status_clear(th);

// This function can set the thread's last status.
// May be different from waitpid_state.pid on exec failure.
rb_pid_t pid = rb_execarg_spawn(execarg_obj, 0, 0);

if (pid > 0) {
VALUE status = rb_process_status_wait(pid, 0);

struct rb_process_status *data = rb_check_typeddata(status, &rb_process_status_type);

// Set the last status:
rb_obj_freeze(status);
GET_THREAD()->last_status = status;
th->last_status = status;

if (data->status == EXIT_SUCCESS) {
return Qtrue;
Expand Down
Loading

0 comments on commit a3a371c

Please sign in to comment.