diff --git a/thread_pthread.c b/thread_pthread.c
index 86daa716f46817..ac80f6fe1061bb 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -281,8 +281,8 @@ static void threadptr_trap_interrupt(rb_thread_t *);
 #define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000)
 #define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
 
-static void native_thread_dedicated_inc(struct rb_native_thread *nt);
-static void native_thread_dedicated_dec(struct rb_native_thread *nt);
+static void native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt);
+static void native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt);
 static void native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th);
 
 static void ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r);
@@ -376,6 +376,8 @@ thread_sched_set_lock_owner(struct rb_thread_sched *sched, rb_thread_t *th)
 static void
 ASSERT_thread_sched_locked(struct rb_thread_sched *sched, rb_thread_t *th)
 {
+    VM_ASSERT(rb_native_mutex_trylock(&sched->lock_) == EBUSY);
+
 #if VM_CHECK_MODE
     if (th) {
         VM_ASSERT(sched->lock_owner == th);
@@ -386,6 +388,74 @@ ASSERT_thread_sched_locked(struct rb_thread_sched *sched, rb_thread_t *th)
 #endif
 }
 
+#define ractor_sched_lock(a, b) ractor_sched_lock_(a, b, __FILE__, __LINE__)
+#define ractor_sched_unlock(a, b) ractor_sched_unlock_(a, b, __FILE__, __LINE__)
+
+static unsigned int
+rb_ractor_serial(const rb_ractor_t *r) {
+    if (r) {
+        return rb_ractor_id(r);
+    }
+    else {
+        return 0;
+    }
+}
+
+static void
+ractor_sched_set_locked(rb_vm_t *vm, rb_ractor_t *cr)
+{
+#if VM_CHECK_MODE > 0
+    VM_ASSERT(vm->ractor.sched.lock_owner == NULL);
+    VM_ASSERT(vm->ractor.sched.locked == false);
+
+    vm->ractor.sched.lock_owner = cr;
+    vm->ractor.sched.locked = true;
+#endif
+}
+
+static void
+ractor_sched_set_unlocked(rb_vm_t *vm, rb_ractor_t *cr)
+{
+#if VM_CHECK_MODE > 0
+    VM_ASSERT(vm->ractor.sched.locked);
+    VM_ASSERT(vm->ractor.sched.lock_owner == cr);
+
+    vm->ractor.sched.locked = false;
+    vm->ractor.sched.lock_owner = NULL;
+#endif
+}
+
+static void
+ractor_sched_lock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
+{
+    rb_native_mutex_lock(&vm->ractor.sched.lock);
+
+#if VM_CHECK_MODE
+    RUBY_DEBUG_LOG2(file, line, "cr:%u prev_owner:%u", rb_ractor_serial(cr), rb_ractor_serial(vm->ractor.sched.lock_owner));
+#else
+    RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr));
+#endif
+
+    ractor_sched_set_locked(vm, cr);
+}
+
+static void
+ractor_sched_unlock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
+{
+    RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr));
+
+    ractor_sched_set_unlocked(vm, cr);
+    rb_native_mutex_unlock(&vm->ractor.sched.lock);
+}
+
+static void
+ASSERT_ractor_sched_locked(rb_vm_t *vm, rb_ractor_t *cr)
+{
+    VM_ASSERT(rb_native_mutex_trylock(&vm->ractor.sched.lock) == EBUSY);
+    VM_ASSERT(vm->ractor.sched.locked);
+    VM_ASSERT(cr == NULL || vm->ractor.sched.lock_owner == cr);
+}
+
 RBIMPL_ATTR_MAYBE_UNUSED()
 static bool
 ractor_sched_running_threads_contain_p(rb_vm_t *vm, rb_thread_t *th)
@@ -437,7 +507,8 @@ static void ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th);
 
 // setup timeslice signals by the timer thread.
 static void
-thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_vm_t *vm, rb_thread_t *add_th, rb_thread_t *del_th, rb_thread_t *add_timeslice_th)
+thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *cr, rb_vm_t *vm,
+                                   rb_thread_t *add_th, rb_thread_t *del_th, rb_thread_t *add_timeslice_th)
 {
 #if USE_RUBY_DEBUG_LOG
     unsigned int prev_running_cnt = vm->ractor.sched.running_cnt;
@@ -457,7 +528,7 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_vm_t *vm, r
                    rb_th_serial(add_th), rb_th_serial(del_th),
                    rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th));
 
-    rb_native_mutex_lock(&vm->ractor.sched.lock);
+    ractor_sched_lock(vm, cr);
     {
         // update running_threads
         if (del_th) {
@@ -509,7 +580,7 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_vm_t *vm, r
         VM_ASSERT(ractor_sched_running_threads_size(vm) == vm->ractor.sched.running_cnt);
         VM_ASSERT(ractor_sched_timeslice_threads_size(vm) <= vm->ractor.sched.running_cnt);
     }
-    rb_native_mutex_unlock(&vm->ractor.sched.lock);
+    ractor_sched_unlock(vm, cr);
 
     //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u",
     //               rb_th_serial(add_th), rb_th_serial(del_th),
@@ -524,7 +595,7 @@ thread_sched_add_running_thread(struct rb_thread_sched *sched, rb_thread_t *th)
     VM_ASSERT(sched->running == th);
 
     rb_vm_t *vm = th->vm;
-    thread_sched_setup_running_threads(sched, vm, th, NULL, ccan_list_empty(&sched->readyq) ? NULL : th);
+    thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL, ccan_list_empty(&sched->readyq) ? NULL : th);
 }
 
 static void
@@ -533,7 +604,7 @@ thread_sched_del_running_thread(struct rb_thread_sched *sched, rb_thread_t *th)
     ASSERT_thread_sched_locked(sched, th);
 
     rb_vm_t *vm = th->vm;
-    thread_sched_setup_running_threads(sched, vm, NULL, th, NULL);
+    thread_sched_setup_running_threads(sched, th->ractor, vm, NULL, th, NULL);
 }
 
 void
@@ -623,7 +694,7 @@ thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th)
     if (sched->is_running) {
         if (ccan_list_empty(&sched->readyq)) {
             // add sched->running to timeslice
-            thread_sched_setup_running_threads(sched, ready_th->vm, NULL, NULL, sched->running);
+            thread_sched_setup_running_threads(sched, ready_th->ractor, ready_th->vm, NULL, NULL, sched->running);
         }
     }
     else {
@@ -793,7 +864,7 @@ thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th)
     VM_ASSERT(th_has_dedicated_nt(th));
     VM_ASSERT(GET_THREAD() == th);
 
-    native_thread_dedicated_dec(th->nt);
+    native_thread_dedicated_dec(th->vm, th->ractor, th->nt);
 
     // waiting -> ready
     thread_sched_to_ready_common(sched, th, false, false);
@@ -866,7 +937,7 @@ thread_sched_to_waiting_common0(struct rb_thread_sched *sched, rb_thread_t *th,
         rb_thread_execute_hooks(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED);
     }
 
-    if (!to_dead) native_thread_dedicated_inc(th->nt);
+    if (!to_dead) native_thread_dedicated_inc(th->vm, th->ractor, th->nt);
 
     RUBY_DEBUG_LOG("%sth:%u", to_dead ? "to_dead " : "", rb_th_serial(th));
"to_dead " : "", rb_th_serial(th)); @@ -1034,35 +1105,54 @@ thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th) thread_sched_switch0(&cth->sched.context, next_th, nt); } +RBIMPL_ATTR_MAYBE_UNUSED() +static unsigned int +grq_size(rb_vm_t *vm, rb_ractor_t *cr) +{ + ASSERT_ractor_sched_locked(vm, cr); + + rb_ractor_t *r, *prev_r = NULL; + unsigned int i = 0; + + ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) { + i++; + + VM_ASSERT(r != prev_r); + prev_r = r; + } + return i; +} + static void ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r) { struct rb_thread_sched *sched = &r->threads.sched; + rb_ractor_t *cr = GET_RACTOR(); + VM_ASSERT(sched->running != NULL); VM_ASSERT(sched->running->nt == NULL); - rb_native_mutex_lock(&vm->ractor.sched.lock); + ractor_sched_lock(vm, cr); { +#if VM_CHECK_MODE > 0 + // check if grq contains r + rb_ractor_t *tr; + ccan_list_for_each(&vm->ractor.sched.grq, tr, threads.sched.grq_node) { + VM_ASSERT(r != tr); + } +#endif + ccan_list_add_tail(&vm->ractor.sched.grq, &sched->grq_node); vm->ractor.sched.grq_cnt++; + VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt); + RUBY_DEBUG_LOG("r:%u th:%u grq_cnt:%u", rb_ractor_id(r), rb_th_serial(sched->running), vm->ractor.sched.grq_cnt); -#if 0 - if (vm->ractor.sched.grq_cnt > 1) { - rb_native_cond_signal(&vm->ractor.sched.cond); - } - else { - // lazy deq - timer_thread_wakeup_locked(vm); - } -#else rb_native_cond_signal(&vm->ractor.sched.cond); -#endif - } - - // ractor_sched_dump(vm); - rb_native_mutex_unlock(&vm->ractor.sched.lock); + // ractor_sched_dump(vm); + } + ractor_sched_unlock(vm, cr); } @@ -1075,30 +1165,18 @@ ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r) #define MINIMUM_SNT 0 #endif -RBIMPL_ATTR_MAYBE_UNUSED() -static unsigned int -grq_size(const rb_vm_t *vm) -{ - rb_ractor_t *r; - unsigned int i = 0; - ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) { - i++; - } - return i; -} - static rb_ractor_t * -ractor_sched_deq(rb_vm_t *vm) +ractor_sched_deq(rb_vm_t *vm, rb_ractor_t *cr) { rb_ractor_t *r; - rb_native_mutex_lock(&vm->ractor.sched.lock); + ractor_sched_lock(vm, cr); { RUBY_DEBUG_LOG("empty? 
%d", ccan_list_empty(&vm->ractor.sched.grq)); // ractor_sched_dump(vm); VM_ASSERT(rb_current_execution_context(false) == NULL); - VM_ASSERT(grq_size(vm) == vm->ractor.sched.grq_cnt); + VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt); while ((r = ccan_list_pop(&vm->ractor.sched.grq, rb_ractor_t, threads.sched.grq_node)) == NULL) { RUBY_DEBUG_LOG("wait grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); @@ -1116,7 +1194,10 @@ ractor_sched_deq(rb_vm_t *vm) RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); } #else + ractor_sched_set_unlocked(vm, cr); rb_native_cond_wait(&vm->ractor.sched.cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); #endif } @@ -1133,7 +1214,7 @@ ractor_sched_deq(rb_vm_t *vm) // timeout } } - rb_native_mutex_unlock(&vm->ractor.sched.lock); + ractor_sched_unlock(vm, cr); return r; } @@ -1221,7 +1302,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) unsigned int lock_rec; - rb_native_mutex_lock(&vm->ractor.sched.lock); + ractor_sched_lock(vm, cr); { vm->ractor.sched.barrier_waiting = true; @@ -1242,12 +1323,13 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) // wait for other ractors while (!ractor_sched_barrier_completed_p(vm)) { - rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, - &vm->ractor.sched.lock); + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); } } } - rb_native_mutex_unlock(&vm->ractor.sched.lock); + ractor_sched_unlock(vm, cr); // acquire VM lock rb_native_mutex_lock(&vm->ractor.sync.lock); @@ -1256,14 +1338,14 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); - rb_native_mutex_lock(&vm->ractor.sched.lock); + ractor_sched_lock(vm, cr); { vm->ractor.sched.barrier_waiting = false; vm->ractor.sched.barrier_serial++; vm->ractor.sched.barrier_waiting_cnt = 0; rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond); } - rb_native_mutex_unlock(&vm->ractor.sched.lock); + ractor_sched_unlock(vm, cr); } static void @@ -1284,7 +1366,11 @@ ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th) while (vm->ractor.sched.barrier_serial == barrier_serial) { RUBY_DEBUG_LOG("sleep serial:%u", barrier_serial); RB_VM_SAVE_MACHINE_CONTEXT(th); + + rb_ractor_t *cr = th->ractor; + ractor_sched_set_unlocked(vm, cr); rb_native_cond_wait(&vm->ractor.sched.barrier_release_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); RUBY_DEBUG_LOG("wakeup serial:%u", barrier_serial); } @@ -1309,7 +1395,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) VM_ASSERT(vm->ractor.sched.barrier_waiting); // VM needs barrier sync VM_ASSERT(vm->ractor.sched.barrier_serial == barrier_serial); - rb_native_mutex_lock(&vm->ractor.sched.lock); + ractor_sched_lock(vm, cr); { // running_cnt vm->ractor.sched.barrier_waiting_cnt++; @@ -1318,7 +1404,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) ractor_sched_barrier_join_signal_locked(vm); ractor_sched_barrier_join_wait_locked(vm, cr->threads.sched.running); } - rb_native_mutex_unlock(&vm->ractor.sched.lock); + ractor_sched_unlock(vm, cr); } rb_native_mutex_lock(&vm->ractor.sync.lock); @@ -1377,7 +1463,7 @@ thread_sched_atfork(struct rb_thread_sched *sched) thread_sched_to_running(sched, th); } else { - thread_sched_setup_running_threads(sched, vm, th, NULL, NULL); + 
+        thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL, NULL);
     }
 }
 
@@ -1508,7 +1594,7 @@ Init_native_thread(rb_thread_t *main_th)
 
     TH_SCHED(main_th)->running = main_th;
     main_th->has_dedicated_nt = 1;
-    thread_sched_setup_running_threads(TH_SCHED(main_th), vm, main_th, NULL, NULL);
+    thread_sched_setup_running_threads(TH_SCHED(main_th), main_th->ractor, vm, main_th, NULL, NULL);
 
     // setup main NT
     main_th->nt->dedicated = 1;
@@ -1520,34 +1606,36 @@ Init_native_thread(rb_thread_t *main_th)
 #endif
 
 static void
-native_thread_dedicated_inc(struct rb_native_thread *nt)
+native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt)
 {
     RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated + 1);
 
     if (nt->dedicated == 0) {
-        rb_native_mutex_lock(&nt->vm->ractor.sched.lock);
+        ractor_sched_lock(vm, cr);
        {
-            nt->vm->ractor.sched.snt_cnt--;
-            nt->vm->ractor.sched.dnt_cnt++;
+            vm->ractor.sched.snt_cnt--;
+            vm->ractor.sched.dnt_cnt++;
         }
-        rb_native_mutex_unlock(&nt->vm->ractor.sched.lock);
+        ractor_sched_unlock(vm, cr);
     }
+
     nt->dedicated++;
 }
 
 static void
-native_thread_dedicated_dec(struct rb_native_thread *nt)
+native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt)
 {
     RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated - 1);
     VM_ASSERT(nt->dedicated > 0);
     nt->dedicated--;
+
     if (nt->dedicated == 0) {
-        rb_native_mutex_lock(&nt->vm->ractor.sched.lock);
+        ractor_sched_lock(vm, cr);
         {
             nt->vm->ractor.sched.snt_cnt++;
             nt->vm->ractor.sched.dnt_cnt--;
         }
-        rb_native_mutex_unlock(&nt->vm->ractor.sched.lock);
+        ractor_sched_unlock(vm, cr);
     }
 }
@@ -2053,7 +2141,7 @@ nt_start(void *ptr)
         }
         else {
             RUBY_DEBUG_LOG("check next");
-            rb_ractor_t *r = ractor_sched_deq(vm);
+            rb_ractor_t *r = ractor_sched_deq(vm, NULL);
 
             if (r) {
                 struct rb_thread_sched *sched = &r->threads.sched;
@@ -2573,7 +2661,7 @@ timer_thread_set_timeout(rb_vm_t *vm)
 #else
     int timeout = -1;
 
-    rb_native_mutex_lock(&vm->ractor.sched.lock);
+    ractor_sched_lock(vm, NULL);
     {
         if (   !ccan_list_empty(&vm->ractor.sched.timeslice_threads) // (1-1) Provide time slice for active NTs
             || !ubf_threads_empty()                                  // (1-3) Periodic UBF
@@ -2592,7 +2680,7 @@ timer_thread_set_timeout(rb_vm_t *vm)
             vm->ractor.sched.timeslice_wait_inf = true;
         }
     }
-    rb_native_mutex_unlock(&vm->ractor.sched.lock);
+    ractor_sched_unlock(vm, NULL);
 
     if (vm->ractor.sched.timeslice_wait_inf) {
         rb_native_mutex_lock(&timer_th.waiting_lock);
@@ -2792,7 +2880,7 @@ static void
 timer_thread_wakeup_locked(rb_vm_t *vm)
 {
     // should be locked before.
-    VM_ASSERT(rb_native_mutex_trylock(&vm->ractor.sched.lock) == EBUSY);
+    ASSERT_ractor_sched_locked(vm, NULL);
 
     if (timer_th.created_fork_gen == current_fork_gen) {
         if (vm->ractor.sched.timeslice_wait_inf) {
@@ -2810,11 +2898,11 @@ timer_thread_wakeup(void)
 {
     rb_vm_t *vm = GET_VM();
 
-    rb_native_mutex_lock(&vm->ractor.sched.lock);
+    ractor_sched_lock(vm, NULL);
     {
         timer_thread_wakeup_locked(vm);
     }
-    rb_native_mutex_unlock(&vm->ractor.sched.lock);
+    ractor_sched_unlock(vm, NULL);
 }
 
 static void
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c
index 63b352742406d6..b7fbea11103879 100644
--- a/thread_pthread_mn.c
+++ b/thread_pthread_mn.c
@@ -414,8 +414,11 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
 
         VM_ASSERT(!th_has_dedicated_nt(th));
 
+        rb_vm_t *vm = th->vm;
+        bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
+
         rb_thread_t *next_th = sched->running;
-        if (next_th && !next_th->nt) {
+        if (!has_ready_ractor && next_th && !next_th->nt) {
             // switch to the next thread
             thread_sched_set_lock_owner(sched, NULL);
             rb_ractor_set_current_ec(th->ractor, NULL);
@@ -701,7 +704,7 @@ timer_thread_polling(rb_vm_t *vm)
           case 0: // timeout
            RUBY_DEBUG_LOG("timeout%s", "");

-            rb_native_mutex_lock(&vm->ractor.sched.lock);
+            ractor_sched_lock(vm, NULL);
            {
                // (1-1) timeslice
                timer_thread_check_timeslice(vm);
@@ -712,7 +715,7 @@ timer_thread_polling(rb_vm_t *vm)
                    rb_native_cond_signal(&vm->ractor.sched.cond);
                }
            }
-            rb_native_mutex_unlock(&vm->ractor.sched.lock);
+            ractor_sched_unlock(vm, NULL);

            // (1-2)
            native_thread_check_and_create_shared(vm);
diff --git a/vm_core.h b/vm_core.h
index 5e733e7374e16b..add0bfc897d9f6 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -635,6 +635,9 @@ typedef struct rb_vm_struct {
         // ractor scheduling
         struct {
             rb_nativethread_lock_t lock;
+            struct rb_ractor_struct *lock_owner;
+            bool locked;
+
             rb_nativethread_cond_t cond; // GRQ
             unsigned int snt_cnt; // count of shared NTs
             unsigned int dnt_cnt; // count of dedicated NTs
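The locking discipline is the same in every hunk above: take vm->ractor.sched.lock, record the owning ractor in the new lock_owner/locked fields (VM_CHECK_MODE builds only), and clear/restore that record around rb_native_cond_wait(), since a condition-variable wait releases the mutex internally. A minimal standalone sketch of that pattern — hypothetical names, plain pthreads and assert() in place of Ruby's rb_native_* wrappers and VM_ASSERT — might look like this:

// Illustrative sketch only; not part of the patch or the Ruby source tree.
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

struct sched_lock {
    pthread_mutex_t mtx;   // e.g. PTHREAD_MUTEX_INITIALIZER
    pthread_cond_t cond;   // e.g. PTHREAD_COND_INITIALIZER
    bool locked;           // debug-only bookkeeping in the real patch
    const void *owner;     // current owner (a ractor in the patch), NULL if unowned
};

static void
sched_lock_acquire(struct sched_lock *l, const void *owner)
{
    pthread_mutex_lock(&l->mtx);
    // mirrors ractor_sched_set_locked(): nobody may already own the lock
    assert(!l->locked && l->owner == NULL);
    l->locked = true;
    l->owner = owner;
}

static void
sched_lock_release(struct sched_lock *l, const void *owner)
{
    // mirrors ractor_sched_set_unlocked(): only the recorded owner may unlock
    assert(l->locked && l->owner == owner);
    l->locked = false;
    l->owner = NULL;
    pthread_mutex_unlock(&l->mtx);
}

static void
sched_lock_wait(struct sched_lock *l, const void *owner)
{
    // cond_wait drops the mutex internally, so the owner record must be
    // cleared before the wait and restored afterwards -- the same dance the
    // patch performs around rb_native_cond_wait().
    assert(l->locked && l->owner == owner);
    l->locked = false;
    l->owner = NULL;
    pthread_cond_wait(&l->cond, &l->mtx);
    l->locked = true;
    l->owner = owner;
}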