diff --git a/common.mk b/common.mk
index 0b348bf52d2bbd..7a53990769735b 100644
--- a/common.mk
+++ b/common.mk
@@ -16561,6 +16561,7 @@ thread.$(OBJEXT): {$(VPATH)}thread.c
 thread.$(OBJEXT): {$(VPATH)}thread.h
 thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
 thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
+thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL)_mn.c
 thread.$(OBJEXT): {$(VPATH)}thread_native.h
 thread.$(OBJEXT): {$(VPATH)}thread_sync.c
 thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc
diff --git a/thread_pthread.c b/thread_pthread.c
index 31f2333cc0a0ae..3672962f924f9e 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -60,6 +60,17 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
 static const void *const condattr_monotonic = NULL;
 #endif
 
+#ifndef HAVE_EPOLL
+#define HAVE_EPOLL 0
+#endif
+
+#if HAVE_EPOLL
+  #include <sys/epoll.h>
+  #define USE_MN_THREADS 1
+#else
+  #define USE_MN_THREADS 0
+#endif
+
 // native thread wrappers
 
 #define NATIVE_MUTEX_LOCK_DEBUG 0
@@ -677,6 +688,7 @@ thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th, boo
 //
 // `th` had became "waiting" state by `thread_sched_to_waiting`
 // and `thread_sched_to_ready` enqueue `th` to the thread ready queue.
+RBIMPL_ATTR_MAYBE_UNUSED()
 static void
 thread_sched_to_ready(struct rb_thread_sched *sched, rb_thread_t *th)
 {
@@ -916,95 +928,6 @@ setup_ubf(rb_thread_t *th, rb_unblock_function_t *func, void *arg)
     rb_native_mutex_unlock(&th->interrupt_lock);
 }
 
-static bool timer_thread_cancel_waiting(rb_thread_t *th);
-
-static void
-ubf_event_waiting(void *ptr)
-{
-    rb_thread_t *th = (rb_thread_t *)ptr;
-    struct rb_thread_sched *sched = TH_SCHED(th);
-
-    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
-
-    VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));
-
-    // only once. it is safe because th->interrupt_lock is already acquired.
-    th->unblock.func = NULL;
-    th->unblock.arg = NULL;
-
-    bool canceled = timer_thread_cancel_waiting(th);
-
-    thread_sched_lock(sched, th);
-    {
-        if (sched->running == th) {
-            RUBY_DEBUG_LOG("not waiting yet");
-        }
-        else if (canceled) {
-            thread_sched_to_ready_common(sched, th, true, false);
-        }
-        else {
-            RUBY_DEBUG_LOG("already not waiting");
-        }
-    }
-    thread_sched_unlock(sched, th);
-}
-
-static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
-
-// return true if timed out
-static bool
-thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
-{
-    VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT
-
-    volatile bool timedout = false, need_cancel = false;
-
-    if (timer_thread_register_waiting(th, fd, events, rel)) {
-        RUBY_DEBUG_LOG("wait fd:%d", fd);
-
-        RB_VM_SAVE_MACHINE_CONTEXT(th);
-        setup_ubf(th, ubf_event_waiting, (void *)th);
-
-        thread_sched_lock(sched, th);
-        {
-            if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
-                // already awaken
-            }
-            else if (RUBY_VM_INTERRUPTED(th->ec)) {
-                need_cancel = true;
-            }
-            else {
-                RUBY_DEBUG_LOG("sleep");
-
-                th->status = THREAD_STOPPED_FOREVER;
-                thread_sched_wakeup_next_thread(sched, th, true);
-                thread_sched_wait_running_turn(sched, th, true);
-
-                RUBY_DEBUG_LOG("wakeup");
-            }
-
-            timedout = th->sched.waiting_reason.data.result == 0;
-        }
-        thread_sched_unlock(sched, th);
-
-        if (need_cancel) {
-            timer_thread_cancel_waiting(th);
-        }
-
-        setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?
-
-        th->status = THREAD_RUNNABLE;
-    }
-    else {
-        RUBY_DEBUG_LOG("can not wait fd:%d", fd);
-        return false;
-    }
-
-    VM_ASSERT(sched->running == th);
-
-    return timedout;
-}
-
 static void
 ubf_waiting(void *ptr)
 {
@@ -2086,47 +2009,6 @@ call_thread_start_func_2(rb_thread_t *th)
 #endif
 }
 
-static COROUTINE
-co_start(struct coroutine_context *from, struct coroutine_context *self)
-{
-    rb_thread_t *th = (rb_thread_t *)self->argument;
-    struct rb_thread_sched *sched = TH_SCHED(th);
-    VM_ASSERT(th->nt != NULL);
-    VM_ASSERT(th == sched->running);
-
-    // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
-
-    thread_sched_add_running_thread(TH_SCHED(th), th);
-
-    thread_sched_unlock(sched, NULL);
-    {
-        call_thread_start_func_2(th);
-    }
-    thread_sched_lock(sched, NULL);
-
-    RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);
-
-    // Thread is terminated
-
-    VM_ASSERT(!th_has_dedicated_nt(th));
-
-    rb_thread_t *next_th = sched->running;
-    if (next_th && !next_th->nt) {
-        // switch to the next thread
-        thread_sched_set_lock_owner(sched, NULL);
-        rb_ractor_set_current_ec(th->ractor, NULL);
-        thread_sched_switch(th, next_th);
-    }
-    else {
-        struct rb_native_thread *nt = th->nt;
-        // switch to the next Ractor
-        rb_ractor_set_current_ec(th->ractor, NULL);
-        native_thread_assign(NULL, th);
-        coroutine_transfer(self, &nt->nt_context);
-    }
-    rb_bug("unreachable");
-}
-
 static void *
 nt_start(void *ptr)
 {
@@ -2197,282 +2079,12 @@ nt_start(void *ptr)
     return NULL;
 }
 
-static int
-native_thread_check_and_create_shared(rb_vm_t *vm)
-{
-    bool need_to_make = false;
-
-    rb_native_mutex_lock(&vm->ractor.sched.lock);
-    {
-        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
-        if (((int)snt_cnt < MINIMUM_SNT) ||
-            (snt_cnt < vm->ractor.cnt &&
-             snt_cnt < vm->ractor.sched.max_proc)) {
-
-            RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
-                           vm->ractor.sched.snt_cnt,
-                           vm->ractor.sched.dnt_cnt,
-                           vm->ractor.cnt,
-                           vm->ractor.sched.grq_cnt);
-
-            vm->ractor.sched.snt_cnt++;
-            need_to_make = true;
-        }
-        else {
-            RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
-        }
-    }
-    rb_native_mutex_unlock(&vm->ractor.sched.lock);
-
-    if (need_to_make) {
-        struct rb_native_thread *nt = native_thread_alloc();
-        nt->vm = vm;
-        return native_thread_create0(nt);
-    }
-    else {
-        return 0;
-    }
-}
-
-#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
-#define MSTACK_PAGE_SIZE 4096
-#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone
-
-// 512MB chunk
-// 131,072 pages (> 65,536)
-// 0th page is Redzone. Start from 1st page.
-
-/*
- * <--> machine stack + vm stack
- * ----------------------------------
- * |HD...|RZ| ... |RZ| ... ... |RZ|
|RZ| - * <------------- 512MB -------------> - */ - -static struct nt_stack_chunk_header { - struct nt_stack_chunk_header *prev_chunk; - struct nt_stack_chunk_header *prev_free_chunk; - - uint16_t start_page; - uint16_t stack_count; - uint16_t uninitialized_stack_count; - - uint16_t free_stack_pos; - uint16_t free_stack[]; -} *nt_stack_chunks = NULL, - *nt_free_stack_chunks = NULL; - -struct nt_machine_stack_footer { - struct nt_stack_chunk_header *ch; - size_t index; -}; - -static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT; - -#include - -// vm_stack_size + machine_stack_size + 1 * (guard page size) -static inline size_t -nt_therad_stack_size(void) -{ - static size_t msz; - if (LIKELY(msz > 0)) return msz; - - rb_vm_t *vm = GET_VM(); - int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE); - int page_num = (sz + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE; - msz = page_num * MSTACK_PAGE_SIZE; - return msz; -} - -static struct nt_stack_chunk_header * -nt_alloc_thread_stack_chunk(void) -{ - const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0); - if (m == MAP_FAILED) { - return NULL; - } - - size_t msz = nt_therad_stack_size(); - int header_page_cnt = 1; - int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz; - int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count; - - if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) { - header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE; - stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz; - } - - VM_ASSERT(stack_count <= UINT16_MAX); - - struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m; - - ch->start_page = header_page_cnt; - ch->prev_chunk = nt_stack_chunks; - ch->prev_free_chunk = nt_free_stack_chunks; - ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count; - ch->free_stack_pos = 0; - - RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz); - - return ch; -} - -static void * -nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx) -{ - const char *m = (char *)ch; - return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_therad_stack_size()); -} - -static struct nt_machine_stack_footer * -nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack) -{ - // TODO: stack direction - const size_t msz = vm->default_params.thread_machine_stack_size; - return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)]; -} - -static void * -nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack) -{ - // TODO: only support stack going down - // [VM ... machine stack ...] 
-
-    const char *vstack, *mstack;
-    const char *guard_page;
-    vstack = nt_stack_chunk_get_stack_start(ch, idx);
-    guard_page = vstack + vm->default_params.thread_vm_stack_size;
-    mstack = guard_page + MSTACK_PAGE_SIZE;
-
-    struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
-    msf->ch = ch;
-    msf->index = idx;
-
-#if 0
-    RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
-                   vstack, (void *)(guard_page-1),
-                   guard_page, (void *)(mstack-1),
-                   mstack, (void *)(msf));
-#endif
-
-    *vm_stack = (void *)vstack;
-    *machine_stack = (void *)mstack;
-
-    return (void *)guard_page;
-}
-
-RBIMPL_ATTR_MAYBE_UNUSED()
-static void
-nt_stack_chunk_dump(void)
-{
-    struct nt_stack_chunk_header *ch;
-    int i;
-
-    fprintf(stderr, "** nt_stack_chunks\n");
-    ch = nt_stack_chunks;
-    for (i=0; ch; i++, ch = ch->prev_chunk) {
-        fprintf(stderr, "%d %p free_pos:%d\n", i, ch, (int)ch->free_stack_pos);
-    }
-
-    fprintf(stderr, "** nt_free_stack_chunks\n");
-    ch = nt_free_stack_chunks;
-    for (i=0; ch; i++, ch = ch->prev_free_chunk) {
-        fprintf(stderr, "%d %p free_pos:%d\n", i, ch, (int)ch->free_stack_pos);
-    }
-}
-
-static int
-nt_guard_page(const char *p, size_t len)
-{
-    if (mprotect((void *)p, len, PROT_NONE) != -1) {
-        return 0;
-    }
-    else {
-        return errno;
-    }
-}
-
-static int
-nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
-{
-    int err = 0;
-
-    rb_native_mutex_lock(&nt_machine_stack_lock);
-    {
-      retry:
-        if (nt_free_stack_chunks) {
-            struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
-            if (ch->free_stack_pos > 0) {
-                RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
-                nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
-            }
-            else if (ch->uninitialized_stack_count > 0) {
-                RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);
-
-                size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
-                void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
-                err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
-            }
-            else {
-                nt_free_stack_chunks = ch->prev_free_chunk;
-                ch->prev_free_chunk = NULL;
-                goto retry;
-            }
-        }
-        else {
-            struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
-            if (p == NULL) {
-                err = errno;
-            }
-            else {
-                nt_free_stack_chunks = nt_stack_chunks = p;
-                goto retry;
-            }
-        }
-    }
-    rb_native_mutex_unlock(&nt_machine_stack_lock);
-
-    return err;
-}
-
-static void
-nt_free_stack(void *mstack)
-{
-    if (!mstack) return;
-
-    rb_native_mutex_lock(&nt_machine_stack_lock);
-    {
-        struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
-        struct nt_stack_chunk_header *ch = msf->ch;
-        int idx = (int)msf->index;
-        void *stack = nt_stack_chunk_get_stack_start(ch, idx);
-
-        RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);
-
-        if (ch->prev_free_chunk == NULL) {
-            ch->prev_free_chunk = nt_free_stack_chunks;
-            nt_free_stack_chunks = ch;
-        }
-        ch->free_stack[ch->free_stack_pos++] = idx;
-
-        // clear the stack pages
-#if defined(MADV_FREE)
-        int r = madvise(stack, nt_therad_stack_size(), MADV_FREE);
-#elif defined(MADV_DONTNEED)
-        int r = madvise(stack, nt_therad_stack_size(), MADV_DONTNEED);
-#else
-        int r = 0;
-#endif
-
-        if (r != 0) rb_bug("madvise errno:%d", errno);
-    }
-    rb_native_mutex_unlock(&nt_machine_stack_lock);
-}
+static int native_thread_create_shared(rb_thread_t *th);
 
 void
 rb_threadptr_sched_free(rb_thread_t *th)
 {
+#if USE_MN_THREADS
     if (th->nt && th->ec && th_has_dedicated_nt(th)) {
         VM_ASSERT(th_has_dedicated_nt(th));
         ruby_xfree(th->sched.context_stack);
@@ -2480,37 +2092,9 @@ rb_threadptr_sched_free(rb_thread_t *th)
     else {
         nt_free_stack(th->sched.context_stack);
     }
-}
-
-static int
-native_thread_create_shared(rb_thread_t *th)
-{
-    // setup coroutine
-    rb_vm_t *vm = th->vm;
-    void *vm_stack = NULL, *machine_stack = NULL;
-    int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
-    if (err) return err;
-
-    VM_ASSERT(vm_stack < machine_stack);
-
-    // setup vm stack
-    size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
-    rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);
-
-    // setup machine stack
-    size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
-    coroutine_initialize(&th->sched.context, co_start, machine_stack, machine_stack_size);
-    th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
-    th->ec->machine.stack_maxsize = machine_stack_size; // TODO
-
-    th->sched.context_stack = machine_stack;
-    th->sched.context.argument = th;
-
-    RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
-    thread_sched_to_ready(TH_SCHED(th), th);
-
-    // setup nt
-    return native_thread_check_and_create_shared(th->vm);
+#else
+    ruby_xfree(th->sched.context_stack);
+#endif
 }
 
 static int
@@ -2951,7 +2535,12 @@
 static struct {
     pthread_t pthread_id;
     int comm_fds[2]; // r, w
+
+#if HAVE_EPOLL
+#define EPOLL_EVENTS_MAX 0x10
     int epoll_fd;
+    struct epoll_event finished_events[EPOLL_EVENTS_MAX];
+#endif
 
     // waiting threads list
     struct ccan_list_head waiting; // waiting threads in ractors
@@ -2962,6 +2551,12 @@
 
 #define TIMER_THREAD_CREATED_P() (timer_th.created_fork_gen == current_fork_gen)
 
+static void timer_thread_check_timeslice(rb_vm_t *vm);
+static int timer_thread_set_timeout(rb_vm_t *vm);
+static void timer_thread_wakeup_thread(rb_thread_t *th);
+
+#include "thread_pthread_mn.c"
+
 static int
 timer_thread_set_timeout(rb_vm_t *vm)
 {
@@ -2988,9 +2583,6 @@ timer_thread_set_timeout(rb_vm_t *vm)
         else {
             vm->ractor.sched.timeslice_wait_inf = true;
         }
-
-        // TODO: check NT shortage
-        // TODO: timeout events
     }
     rb_native_mutex_unlock(&vm->ractor.sched.lock);
 
@@ -3018,33 +2610,6 @@ timer_thread_set_timeout(rb_vm_t *vm)
 #endif
 }
 
-static void timer_thread_unregister_waiting(rb_thread_t *th, int fd);
-
-static bool
-timer_thread_cancel_waiting(rb_thread_t *th)
-{
-    bool canceled = false;
-
-    if (th->sched.waiting_reason.flags) {
-        rb_native_mutex_lock(&timer_th.waiting_lock);
-        {
-            if (th->sched.waiting_reason.flags) {
-                canceled = true;
-                ccan_list_del_init(&th->sched.waiting_reason.node);
-                if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
-                    timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
-                }
-                th->sched.waiting_reason.flags = thread_sched_waiting_none;
-            }
-        }
-        rb_native_mutex_unlock(&timer_th.waiting_lock);
-    }
-
-    return canceled;
-}
-
-#include <sys/epoll.h>
-
 static void
 timer_thread_check_signal(rb_vm_t *vm)
 {
@@ -3128,6 +2693,17 @@ timer_thread_check_timeout(rb_vm_t *vm)
     rb_native_mutex_unlock(&timer_th.waiting_lock);
 }
 
+static void
+timer_thread_check_timeslice(rb_vm_t *vm)
+{
+    // TODO: check time
+    rb_thread_t *th;
+    ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) {
+        RUBY_DEBUG_LOG("timeslice th:%u", rb_th_serial(th));
+        RUBY_VM_SET_TIMER_INTERRUPT(th->ec);
+    }
+}
+
 void
 rb_assert_sig(void)
 {
@@ -3141,20 +2717,6 @@ rb_assert_sig(void)
     }
 }
 
-/*
- * The purpose of the timer thread:
- *
- * (1) Periodic checking
- *   (1-1) Provide time slice for active NTs
- *   (1-2) Check NT shortage
- *   (1-3) Periodic UBF
- *   (1-4) Lazy GRQ deq start
- * (2) Receive notification
- *   (2-1) async I/O termination
- *   (2-2) timeout
- *     (2-2-1) sleep(n)
- *     (2-2-2) timeout(n), I/O, ...
- */
 static void *
 timer_thread_func(void *ptr)
 {
@@ -3166,102 +2728,11 @@ timer_thread_func(void *ptr)
     RUBY_DEBUG_LOG("started%s", "");
 
     while (system_working) {
-#define EPOLL_EVENTS_MAX 0x10
-        int r;
-        struct epoll_event finished_events[EPOLL_EVENTS_MAX];
-
         timer_thread_check_signal(vm);
-        timer_thread_check_timeout(vm); // (2-2)
-
-        r = epoll_wait(timer_th.epoll_fd, finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
-        RUBY_DEBUG_LOG("r:%d", r);
-
-        switch (r) {
-          case 0: // timeout
-            RUBY_DEBUG_LOG("timeout%s", "");
-
-            rb_native_mutex_lock(&vm->ractor.sched.lock);
-            {
-                // (1-1) timeslice
-
-                rb_thread_t *th;
-                ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) {
-                    RUBY_DEBUG_LOG("timeslice th:%u", rb_th_serial(th));
-                    RUBY_VM_SET_TIMER_INTERRUPT(th->ec);
-                }
-
-                // (1-4) lazy grq deq
-                if (vm->ractor.sched.grq_cnt > 0) {
-                    RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
-                    rb_native_cond_signal(&vm->ractor.sched.cond);
-                }
-            }
-            rb_native_mutex_unlock(&vm->ractor.sched.lock);
-
-            // (1-2)
-            native_thread_check_and_create_shared(vm);
+        timer_thread_check_timeout(vm);
+        ubf_wakeup_all_threads();
 
-            // (1-3)
-            ubf_wakeup_all_threads();
-
-            break;
-
-          case -1:
-            switch (errno) {
-              case EINTR:
-                // simply retry
-                break;
-              default:
-                perror("epoll_wait");
-                rb_bug("epoll_wait errno:%d", errno);
-            }
-            break;
-
-          default:
-            RUBY_DEBUG_LOG("%d event(s)", r);
-
-            for (int i=0; i<r; i++) {
-                rb_thread_t *th = (rb_thread_t *)finished_events[i].data.ptr;
-
-                if (th == NULL) {
-                    // wakeup timerthread
-                    RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
-                    consume_communication_pipe(timer_th.comm_fds[0]);
-                }
-                else {
-                    // wakeup specific thread by IO (pre-registered)
-                    uint32_t events = finished_events[i].events;
-
-                    RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
-                                   rb_th_serial(th),
-                                   (events & EPOLLIN)    ? "in/" : "",
-                                   (events & EPOLLOUT)   ? "out/" : "",
-                                   (events & EPOLLRDHUP) ? "RDHUP/" : "",
-                                   (events & EPOLLPRI)   ? "pri/" : "",
-                                   (events & EPOLLERR)   ? "err/" : "",
-                                   (events & EPOLLHUP)   ? "hup/" : "");
-
-                    rb_native_mutex_lock(&timer_th.waiting_lock);
-                    {
-                        if (th->sched.waiting_reason.flags) {
-                            // delete from chain
-                            ccan_list_del_init(&th->sched.waiting_reason.node);
-                            timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
-
-                            th->sched.waiting_reason.flags = thread_sched_waiting_none;
-                            th->sched.waiting_reason.data.fd = -1;
-                            th->sched.waiting_reason.data.result = (int)events;
-
-                            timer_thread_wakeup_thread(th);
-                        }
-                        else {
-                            // already released
-                        }
-                    }
-                    rb_native_mutex_unlock(&timer_th.waiting_lock);
-                }
-            }
-        }
+        timer_thread_polling(vm);
     }
 
     RUBY_DEBUG_LOG("terminated");
@@ -3338,196 +2809,6 @@ timer_thread_wakeup(void)
     rb_native_mutex_unlock(&vm->ractor.sched.lock);
 }
 
-static bool
-fd_readable_nonblock(int fd)
-{
-    struct pollfd pfd = {
-        .fd = fd,
-        .events = POLLIN,
-    };
-    return poll(&pfd, 1, 0) != 0;
-}
-
-static bool
-fd_writable_nonblock(int fd)
-{
-    struct pollfd pfd = {
-        .fd = fd,
-        .events = POLLOUT,
-    };
-    return poll(&pfd, 1, 0) != 0;
-}
-
-static void
-verify_waiting_list(void)
-{
-#if VM_CHECK_MODE > 0
-    rb_thread_t *wth, *prev_wth = NULL;
-    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
-        // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
-        if (prev_wth) {
-            rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
-            rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
-            VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
-        }
-        prev_wth = wth;
-    }
-#endif
-}
-
-// return false if the fd is not waitable or not need to wait.
-static bool
-timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
-{
-    RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
-
-    VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
-    VM_ASSERT(flags != 0);
-
-    rb_hrtime_t abs = 0; // 0 means no timeout
-
-    // epoll case
-
-    if (rel) {
-        if (*rel > 0) {
-            flags |= thread_sched_waiting_timeout;
-        }
-        else {
-            return false;
-        }
-    }
-
-    if (rel && *rel > 0) {
-        flags |= thread_sched_waiting_timeout;
-    }
-
-    __uint32_t epoll_events = 0;
-    if (flags & thread_sched_waiting_timeout) {
-        VM_ASSERT(rel != NULL);
-        abs = rb_hrtime_add(rb_hrtime_now(), *rel);
-    }
-
-    if (flags & thread_sched_waiting_io_read) {
-        if (fd_readable_nonblock(fd)) {
-            return false;
-        }
-        else {
-            VM_ASSERT(fd >= 0);
-            epoll_events |= EPOLLIN;
-        }
-    }
-
-    if (flags & thread_sched_waiting_io_write) {
-        if (fd_writable_nonblock(fd)) {
-            return false;
-        }
-        else {
-            VM_ASSERT(fd >= 0);
-            epoll_events |= EPOLLOUT;
-        }
-    }
-
-    rb_native_mutex_lock(&timer_th.waiting_lock);
-    {
-        if (epoll_events) {
-            struct epoll_event event = {
-                .events = epoll_events,
-                .data = {
-                    .ptr = (void *)th,
-                },
-            };
-            if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
-                RUBY_DEBUG_LOG("failed (%d)", errno);
-
-                switch (errno) {
-                  case EBADF:
-                    // the fd is closed?
-                  case EPERM:
-                    // the fd doesn't support epoll
-                  case EEXIST:
-                    // the fd is already registerred by another thread
-                    rb_native_mutex_unlock(&timer_th.waiting_lock);
-                    return false;
-                  default:
-                    perror("epoll_ctl");
-                    rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
-                }
-            }
-            RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
-        }
-
-        if (th) {
-            VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
-
-            // setup waiting information
-            {
-                th->sched.waiting_reason.flags = flags;
-                th->sched.waiting_reason.data.timeout = abs;
-                th->sched.waiting_reason.data.fd = fd;
-                th->sched.waiting_reason.data.result = 0;
-            }
-
-            if (abs == 0) { // no timeout
-                VM_ASSERT(!(flags & thread_sched_waiting_timeout));
-                ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
-            }
-            else {
-                RUBY_DEBUG_LOG("abs:%lu", abs);
-                VM_ASSERT(flags & thread_sched_waiting_timeout);
-
-                // insert th to sorted list (TODO: O(n))
-                rb_thread_t *wth, *prev_wth = NULL;
-
-                ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
-                    if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
-                        wth->sched.waiting_reason.data.timeout < abs) {
-                        prev_wth = wth;
-                    }
-                    else {
-                        break;
-                    }
-                }
-
-                if (prev_wth) {
-                    ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
-                }
-                else {
-                    ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
-                }
-
-                verify_waiting_list();
-
-                // update timeout seconds
-                timer_thread_wakeup();
-            }
-        }
-        else {
-            VM_ASSERT(abs == 0);
-        }
-    }
-    rb_native_mutex_unlock(&timer_th.waiting_lock);
-
-    return true;
-}
-
-static void
-timer_thread_unregister_waiting(rb_thread_t *th, int fd)
-{
-    RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
-
-    // Linux 2.6.9 or later is needed to pass NULL as data.
-    if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
-        switch (errno) {
-          case EBADF:
-            // just ignore. maybe fd is closed.
-            break;
-          default:
-            perror("epoll_ctl");
-            rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
-        }
-    }
-}
-
 static void
 rb_thread_create_timer_thread(void)
 {
@@ -3542,7 +2823,9 @@ rb_thread_create_timer_thread(void)
             RUBY_DEBUG_LOG("forked child process");
 
             CLOSE_INVALIDATE_PAIR(timer_th.comm_fds);
+#if HAVE_EPOLL
             close_invalidate(&timer_th.epoll_fd, "close epoll_fd");
+#endif
             rb_native_mutex_destroy(&timer_th.waiting_lock);
         }
 
@@ -3553,8 +2836,7 @@ rb_thread_create_timer_thread(void)
         setup_communication_pipe_internal(timer_th.comm_fds);
 
         // open epoll fd
-        if ((timer_th.epoll_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
-        timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read, NULL);
+        timer_thread_setup_nm();
     }
 
     pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM());
@@ -3632,8 +2914,11 @@ rb_reserved_fd_p(int fd)
     if (fd < 0) return 0;
 
     if (fd == timer_th.comm_fds[0] ||
-        fd == timer_th.comm_fds[1] ||
-        fd == timer_th.epoll_fd) {
+        fd == timer_th.comm_fds[1]
+#if HAVE_EPOLL
+        || fd == timer_th.epoll_fd
+#endif
+        ) {
         goto check_fork_gen;
     }
     return 0;