Skip to content

Commit

Permalink
interrupt_exec
Browse files Browse the repository at this point in the history
introduce
- rb_threadptr_interrupt_exec
- rb_ractor_interrupt_exec
  • Loading branch information
ko1 committed Jul 11, 2024
1 parent 6dc0086 commit 747d112
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 24 deletions.
21 changes: 21 additions & 0 deletions internal/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,25 @@ RUBY_SYMBOL_EXPORT_END
int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
bool rb_thread_mn_schedulable(VALUE thread);

// interrupt exec

typedef VALUE (rb_interrupt_exec_func_t)(void *data);

enum rb_interrupt_exec_flag {
rb_interrupt_exec_flag_none = 0x00,
rb_interrupt_exec_flag_value_data = 0x01,
};

// interrupt the target_th and run func.
struct rb_ractor_struct;

void rb_threadptr_interrupt_exec(struct rb_thread_struct *target_th,
rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags);

// create a thread in the target_r and run func on the created thread.
void rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags);

void rb_threadptr_interrupt_exec_task_mark(struct rb_thread_struct *th);

#endif /* INTERNAL_THREAD_H */
190 changes: 166 additions & 24 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,25 +342,33 @@ unblock_function_clear(rb_thread_t *th)
}

static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
threadptr_interrupt_locked(rb_thread_t *th, bool trap)
{
// th->interrupt_lock should be acquired here

RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);

if (trap) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
RUBY_VM_SET_INTERRUPT(th->ec);
}

if (th->unblock.func != NULL) {
(th->unblock.func)(th->unblock.arg);
}
else {
/* none */
}
}

static void
threadptr_interrupt(rb_thread_t *th, int trap)
{
rb_native_mutex_lock(&th->interrupt_lock);
{
if (trap) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
RUBY_VM_SET_INTERRUPT(th->ec);
}

if (th->unblock.func != NULL) {
(th->unblock.func)(th->unblock.arg);
}
else {
/* none */
}
threadptr_interrupt_locked(th, trap);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
Expand All @@ -369,13 +377,13 @@ void
rb_threadptr_interrupt(rb_thread_t *th)
{
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
rb_threadptr_interrupt_common(th, 0);
threadptr_interrupt(th, false);
}

static void
threadptr_trap_interrupt(rb_thread_t *th)
{
rb_threadptr_interrupt_common(th, 1);
threadptr_interrupt(th, true);
}

static void
Expand Down Expand Up @@ -490,6 +498,7 @@ rb_thread_terminate_all(rb_thread_t *th)
}

void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
static void threadptr_interrupt_exec_cleanup(rb_thread_t *th);

static void
thread_cleanup_func_before_exec(void *th_ptr)
Expand All @@ -500,6 +509,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;

threadptr_interrupt_exec_cleanup(th);
rb_threadptr_root_fiber_terminate(th);
}

Expand Down Expand Up @@ -2442,6 +2452,8 @@ threadptr_get_interrupts(rb_thread_t *th)
return interrupt & (rb_atomic_t)~ec->interrupt_mask;
}

static void threadptr_interrupt_exec_exec(rb_thread_t *th);

int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
Expand Down Expand Up @@ -2473,17 +2485,29 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
rb_postponed_job_flush(th->vm);
}

/* signal handling */
if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
enum rb_thread_status prev_status = th->status;
if (trap_interrupt) {
/* signal handling */
if (th == th->vm->ractor.main_thread) {
enum rb_thread_status prev_status = th->status;

th->status = THREAD_RUNNABLE;
{
while ((sig = rb_get_next_signal()) != 0) {
ret |= rb_signal_exec(th, sig);
th->status = THREAD_RUNNABLE;
{
while ((sig = rb_get_next_signal()) != 0) {
ret |= rb_signal_exec(th, sig);
}
}
th->status = prev_status;
}

if (!ccan_list_empty(&th->interrupt_exec_tasks)) {
enum rb_thread_status prev_status = th->status;

th->status = THREAD_RUNNABLE;
{
threadptr_interrupt_exec_exec(th);
}
th->status = prev_status;
}
th->status = prev_status;
}

/* exception from another thread */
Expand Down Expand Up @@ -4718,6 +4742,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r

/* may be held by any thread in parent */
rb_native_mutex_initialize(&th->interrupt_lock);
ccan_list_head_init(&th->interrupt_exec_tasks);

vm->fork_gen++;
rb_ractor_sleeper_threads_clear(th->ractor);
Expand Down Expand Up @@ -5938,3 +5963,120 @@ rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_ke

th->specific_storage[key] = data;
}

// interrupt_exec

struct rb_interrupt_exec_task {
struct ccan_list_node node;

rb_interrupt_exec_func_t *func;
void *data;
enum rb_interrupt_exec_flag flags;
};

void
rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th)
{
struct rb_interrupt_exec_task *task;

ccan_list_for_each(&th->interrupt_exec_tasks, task, node) {
if (task->flags & rb_interrupt_exec_flag_value_data) {
rb_gc_mark((VALUE)task->data);
}
}
}

// native thread safe
// th should be available
void
rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
{
// should not use ALLOC
struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task);
*task = (struct rb_interrupt_exec_task) {
.flags = flags,
.func = func,
.data = data,
};

rb_native_mutex_lock(&th->interrupt_lock);
{
ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
threadptr_interrupt_locked(th, true);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}

static void
threadptr_interrupt_exec_exec(rb_thread_t *th)
{
while (1) {
struct rb_interrupt_exec_task *task;

rb_native_mutex_lock(&th->interrupt_lock);
{
task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node);
}
rb_native_mutex_unlock(&th->interrupt_lock);

if (task) {
(*task->func)(task->data);
ruby_xfree(task);
}
else {
break;
}
}
}

static void
threadptr_interrupt_exec_cleanup(rb_thread_t *th)
{
rb_native_mutex_lock(&th->interrupt_lock);
{
struct rb_interrupt_exec_task *task;

while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
ruby_xfree(task);
}
}
rb_native_mutex_unlock(&th->interrupt_lock);
}

struct interrupt_ractor_new_thread_data {
rb_interrupt_exec_func_t *func;
void *data;
};

static VALUE
interrupt_ractor_new_thread_func(void *data)
{
struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data;
ruby_xfree(data);

d.func(d.data);
return Qnil;
}

static VALUE
interrupt_ractor_func(void *data)
{
rb_thread_create(interrupt_ractor_new_thread_func, data);
return Qnil;
}

// native thread safe
// func/data should be native thread safe
void
rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
{
struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data);

d->func = func;
d->data = data;
rb_thread_t *main_th = target_r->threads.main;
rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags);

// TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now.
}
4 changes: 4 additions & 0 deletions vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -3484,6 +3484,8 @@ thread_mark(void *ptr)

rb_gc_mark(th->scheduler);

rb_threadptr_interrupt_exec_task_mark(th);

RUBY_MARK_LEAVE("thread");
}

Expand Down Expand Up @@ -3637,6 +3639,8 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm)
th->report_on_exception = vm->thread_report_on_exception;
th->ext_config.ractor_safe = true;

ccan_list_head_init(&th->interrupt_exec_tasks);

#if USE_RUBY_DEBUG_LOG
static rb_atomic_t thread_serial = 1;
th->serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1);
Expand Down
1 change: 1 addition & 0 deletions vm_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ typedef struct rb_thread_struct {
struct rb_unblock_callback unblock;
VALUE locking_mutex;
struct rb_mutex_struct *keeping_mutexes;
struct ccan_list_head interrupt_exec_tasks;

struct rb_waiting_list *join_list;

Expand Down

0 comments on commit 747d112

Please sign in to comment.