diff --git a/internal/thread.h b/internal/thread.h index 47273436e3bdbd..a07709a1bd0b7d 100644 --- a/internal/thread.h +++ b/internal/thread.h @@ -76,4 +76,25 @@ RUBY_SYMBOL_EXPORT_END int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing); bool rb_thread_mn_schedulable(VALUE thread); +// interrupt exec + +typedef VALUE (rb_interrupt_exec_func_t)(void *data); + +enum rb_interrupt_exec_flag { + rb_interrupt_exec_flag_none = 0x00, + rb_interrupt_exec_flag_value_data = 0x01, +}; + +// interrupt the target_th and run func. +struct rb_ractor_struct; + +void rb_threadptr_interrupt_exec(struct rb_thread_struct *target_th, + rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags); + +// create a thread in the target_r and run func on the created thread. +void rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r, + rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags); + +void rb_threadptr_interrupt_exec_task_mark(struct rb_thread_struct *th); + #endif /* INTERNAL_THREAD_H */ diff --git a/thread.c b/thread.c index 45aa0565ca897b..b8f10e756e9435 100644 --- a/thread.c +++ b/thread.c @@ -342,25 +342,33 @@ unblock_function_clear(rb_thread_t *th) } static void -rb_threadptr_interrupt_common(rb_thread_t *th, int trap) +threadptr_interrupt_locked(rb_thread_t *th, bool trap) { + // th->interrupt_lock should be acquired here + RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap); + if (trap) { + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + } + else { + RUBY_VM_SET_INTERRUPT(th->ec); + } + + if (th->unblock.func != NULL) { + (th->unblock.func)(th->unblock.arg); + } + else { + /* none */ + } +} + +static void +threadptr_interrupt(rb_thread_t *th, int trap) +{ rb_native_mutex_lock(&th->interrupt_lock); { - if (trap) { - RUBY_VM_SET_TRAP_INTERRUPT(th->ec); - } - else { - RUBY_VM_SET_INTERRUPT(th->ec); - } - - if (th->unblock.func != NULL) { - (th->unblock.func)(th->unblock.arg); - } - else { - /* none */ - } + threadptr_interrupt_locked(th, trap); } rb_native_mutex_unlock(&th->interrupt_lock); } @@ -369,13 +377,13 @@ void rb_threadptr_interrupt(rb_thread_t *th) { RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); - rb_threadptr_interrupt_common(th, 0); + threadptr_interrupt(th, false); } static void threadptr_trap_interrupt(rb_thread_t *th) { - rb_threadptr_interrupt_common(th, 1); + threadptr_interrupt(th, true); } static void @@ -490,6 +498,7 @@ rb_thread_terminate_all(rb_thread_t *th) } void rb_threadptr_root_fiber_terminate(rb_thread_t *th); +static void threadptr_interrupt_exec_cleanup(rb_thread_t *th); static void thread_cleanup_func_before_exec(void *th_ptr) @@ -500,6 +509,7 @@ thread_cleanup_func_before_exec(void *th_ptr) // The thread stack doesn't exist in the forked process: th->ec->machine.stack_start = th->ec->machine.stack_end = NULL; + threadptr_interrupt_exec_cleanup(th); rb_threadptr_root_fiber_terminate(th); } @@ -2442,6 +2452,8 @@ threadptr_get_interrupts(rb_thread_t *th) return interrupt & (rb_atomic_t)~ec->interrupt_mask; } +static void threadptr_interrupt_exec_exec(rb_thread_t *th); + int rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { @@ -2473,17 +2485,29 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) rb_postponed_job_flush(th->vm); } - /* signal handling */ - if (trap_interrupt && (th == th->vm->ractor.main_thread)) { - enum rb_thread_status prev_status = th->status; + if (trap_interrupt) { + /* signal handling */ + if (th == th->vm->ractor.main_thread) { + enum rb_thread_status prev_status = th->status; - th->status = THREAD_RUNNABLE; - { - while ((sig = rb_get_next_signal()) != 0) { - ret |= rb_signal_exec(th, sig); + th->status = THREAD_RUNNABLE; + { + while ((sig = rb_get_next_signal()) != 0) { + ret |= rb_signal_exec(th, sig); + } } + th->status = prev_status; + } + + if (!ccan_list_empty(&th->interrupt_exec_tasks)) { + enum rb_thread_status prev_status = th->status; + + th->status = THREAD_RUNNABLE; + { + threadptr_interrupt_exec_exec(th); + } + th->status = prev_status; } - th->status = prev_status; } /* exception from another thread */ @@ -4718,6 +4742,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r /* may be held by any thread in parent */ rb_native_mutex_initialize(&th->interrupt_lock); + ccan_list_head_init(&th->interrupt_exec_tasks); vm->fork_gen++; rb_ractor_sleeper_threads_clear(th->ractor); @@ -5938,3 +5963,120 @@ rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_ke th->specific_storage[key] = data; } + +// interrupt_exec + +struct rb_interrupt_exec_task { + struct ccan_list_node node; + + rb_interrupt_exec_func_t *func; + void *data; + enum rb_interrupt_exec_flag flags; +}; + +void +rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th) +{ + struct rb_interrupt_exec_task *task; + + ccan_list_for_each(&th->interrupt_exec_tasks, task, node) { + if (task->flags & rb_interrupt_exec_flag_value_data) { + rb_gc_mark((VALUE)task->data); + } + } +} + +// native thread safe +// th should be available +void +rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags) +{ + // should not use ALLOC + struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task); + *task = (struct rb_interrupt_exec_task) { + .flags = flags, + .func = func, + .data = data, + }; + + rb_native_mutex_lock(&th->interrupt_lock); + { + ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node); + threadptr_interrupt_locked(th, true); + } + rb_native_mutex_unlock(&th->interrupt_lock); +} + +static void +threadptr_interrupt_exec_exec(rb_thread_t *th) +{ + while (1) { + struct rb_interrupt_exec_task *task; + + rb_native_mutex_lock(&th->interrupt_lock); + { + task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node); + } + rb_native_mutex_unlock(&th->interrupt_lock); + + if (task) { + (*task->func)(task->data); + ruby_xfree(task); + } + else { + break; + } + } +} + +static void +threadptr_interrupt_exec_cleanup(rb_thread_t *th) +{ + rb_native_mutex_lock(&th->interrupt_lock); + { + struct rb_interrupt_exec_task *task; + + while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) { + ruby_xfree(task); + } + } + rb_native_mutex_unlock(&th->interrupt_lock); +} + +struct interrupt_ractor_new_thread_data { + rb_interrupt_exec_func_t *func; + void *data; +}; + +static VALUE +interrupt_ractor_new_thread_func(void *data) +{ + struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data; + ruby_xfree(data); + + d.func(d.data); + return Qnil; +} + +static VALUE +interrupt_ractor_func(void *data) +{ + rb_thread_create(interrupt_ractor_new_thread_func, data); + return Qnil; +} + +// native thread safe +// func/data should be native thread safe +void +rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r, + rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags) +{ + struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data); + + d->func = func; + d->data = data; + rb_thread_t *main_th = target_r->threads.main; + rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags); + + // TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now. +} diff --git a/vm.c b/vm.c index c7ccbc9550da8f..5644ddd1732568 100644 --- a/vm.c +++ b/vm.c @@ -3484,6 +3484,8 @@ thread_mark(void *ptr) rb_gc_mark(th->scheduler); + rb_threadptr_interrupt_exec_task_mark(th); + RUBY_MARK_LEAVE("thread"); } @@ -3637,6 +3639,8 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm) th->report_on_exception = vm->thread_report_on_exception; th->ext_config.ractor_safe = true; + ccan_list_head_init(&th->interrupt_exec_tasks); + #if USE_RUBY_DEBUG_LOG static rb_atomic_t thread_serial = 1; th->serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1); diff --git a/vm_core.h b/vm_core.h index 2a75332820ec66..2e90fd1b4d4e5d 100644 --- a/vm_core.h +++ b/vm_core.h @@ -1134,6 +1134,7 @@ typedef struct rb_thread_struct { struct rb_unblock_callback unblock; VALUE locking_mutex; struct rb_mutex_struct *keeping_mutexes; + struct ccan_list_head interrupt_exec_tasks; struct rb_waiting_list *join_list;