From 387ae7e2aaccd2463c93b4ff9c75867405daaab0 Mon Sep 17 00:00:00 2001 From: RobotMan2412 Date: Fri, 23 Aug 2024 18:01:09 +0200 Subject: [PATCH] Scheduler on two CPUs with load balancing --- files/sbin/init/main.c | 3 +- kernel/cpu/riscv/src/scheduler.c | 16 ++ kernel/include/badgelib/time.h | 1 + kernel/include/process/types.h | 2 + kernel/include/scheduler/scheduler.h | 13 ++ kernel/include/scheduler/types.h | 26 +++- kernel/src/main.c | 2 +- kernel/src/scheduler/scheduler.c | 212 +++++++++++++++++++++++---- 8 files changed, 239 insertions(+), 36 deletions(-) diff --git a/files/sbin/init/main.c b/files/sbin/init/main.c index 684ccca..65d52ca 100644 --- a/files/sbin/init/main.c +++ b/files/sbin/init/main.c @@ -24,6 +24,7 @@ int main() { badge_err_t ec = {0}; print("Hi, Ther.\n"); - syscall_sys_shutdown(true); + syscall_sys_shutdown(false); + return 0; } diff --git a/kernel/cpu/riscv/src/scheduler.c b/kernel/cpu/riscv/src/scheduler.c index f1caccb..e678987 100644 --- a/kernel/cpu/riscv/src/scheduler.c +++ b/kernel/cpu/riscv/src/scheduler.c @@ -44,6 +44,14 @@ void sched_raise_from_isr(sched_thread_t *thread, bool syscall, void *entry_poin thread->kernel_isr_ctx.regs.a7 = thread->user_isr_ctx.regs.a7; } + // Do time accounting. + timestamp_us_t now = time_us(); + sched_cpulocal_t *info = isr_ctx_get()->cpulocal->sched; + timestamp_us_t used = now - info->last_preempt; + thread->timeusage.cycle_time += used; + thread->timeusage.user_time += used; + info->last_preempt = now; + // Set context switch target to kernel thread. isr_ctx_switch_set(&thread->kernel_isr_ctx); } @@ -56,6 +64,14 @@ void sched_lower_from_isr() { assert_dev_drop(!(thread->flags & THREAD_KERNEL) && (thread->flags & THREAD_PRIVILEGED)); atomic_fetch_and(&thread->flags, ~THREAD_PRIVILEGED); + // Do time accounting. + timestamp_us_t now = time_us(); + sched_cpulocal_t *info = isr_ctx_get()->cpulocal->sched; + timestamp_us_t used = now - info->last_preempt; + thread->timeusage.cycle_time += used; + thread->timeusage.kernel_time += used; + info->last_preempt = now; + // Set context switch target to user thread. isr_ctx_switch_set(&thread->user_isr_ctx); assert_dev_drop(!(thread->user_isr_ctx.flags & ISR_CTX_FLAG_KERNEL)); diff --git a/kernel/include/badgelib/time.h b/kernel/include/badgelib/time.h index 747d85b..07b2ed5 100644 --- a/kernel/include/badgelib/time.h +++ b/kernel/include/badgelib/time.h @@ -6,6 +6,7 @@ #include "attributes.h" #include "port/hardware_allocation.h" +#include #include #include #include diff --git a/kernel/include/process/types.h b/kernel/include/process/types.h index e1cc80f..e1bc7f2 100644 --- a/kernel/include/process/types.h +++ b/kernel/include/process/types.h @@ -107,4 +107,6 @@ typedef struct process_t { size_t sighandlers[SIG_COUNT]; // Exit code if applicable. int state_code; + // Total time usage. + timeusage_t timeusage; } process_t; diff --git a/kernel/include/scheduler/scheduler.h b/kernel/include/scheduler/scheduler.h index 923829b..5c4eaf5 100644 --- a/kernel/include/scheduler/scheduler.h +++ b/kernel/include/scheduler/scheduler.h @@ -5,6 +5,7 @@ #include "attributes.h" #include "badge_err.h" +#include "time.h" #include #include @@ -21,6 +22,18 @@ typedef int (*sched_entry_t)(void *arg); // CPU-local scheduler data. typedef struct sched_cpulocal_t sched_cpulocal_t; +// Time usage information. +typedef struct { + // Time spent running in user-space. + timestamp_us_t user_time; + // Time spent running in kernel-space. 
+ timestamp_us_t kernel_time; + // Total time usage since last load measurement. + timestamp_us_t cycle_time; + // Current number of CPUs used in 0.01% units. + atomic_int cpu_usage; +} timeusage_t; + // will be scheduled with smaller time slices than normal #define SCHED_PRIO_LOW 0 // default value diff --git a/kernel/include/scheduler/types.h b/kernel/include/scheduler/types.h index 647a15f..97b6ce6 100644 --- a/kernel/include/scheduler/types.h +++ b/kernel/include/scheduler/types.h @@ -18,9 +18,11 @@ // The minimum time a thread will run. `SCHED_PRIO_LOW` maps to this. -#define SCHED_MIN_US 5000 +#define SCHED_MIN_US 5000 // The time quota increment per increased priority. -#define SCHED_INC_US 500 +#define SCHED_INC_US 500 +// The interval on which schedulers measure CPU load. +#define SCHED_LOAD_INTERVAL 250000 @@ -56,13 +58,15 @@ struct sched_thread_t { dlist_node_t node; // Process to which this thread belongs. - process_t *process; + process_t *process; // Lowest address of the kernel stack. - size_t kernel_stack_bottom; + size_t kernel_stack_bottom; // Highest address of the kernel stack. - size_t kernel_stack_top; + size_t kernel_stack_top; // Priority of this thread. - int priority; + int priority; + // Time usage information. + timeusage_t timeusage; // Thread flags. atomic_int flags; @@ -92,8 +96,14 @@ struct sched_cpulocal_t { dlist_t queue; // CPU-local scheduler state flags. atomic_int flags; - // CPU load estimate in 0.01% increments. - atomic_int load; + // Last preemption time. + timestamp_us_t last_preempt; + // Time until next measurement interval. + timestamp_us_t load_measure_time; + // CPU load average in 0.01% increments. + atomic_int load_average; + // CPU load estimate for load-balancing purposes. + atomic_int load_estimate; // Idle thread. sched_thread_t idle_thread; }; diff --git a/kernel/src/main.c b/kernel/src/main.c index f1f9250..e6d358c 100644 --- a/kernel/src/main.c +++ b/kernel/src/main.c @@ -49,7 +49,7 @@ static void kernel_lifetime_func() { // Start the kernel services. kernel_init(); // Start other CPUs. - // sched_start_altcpus(); + sched_start_altcpus(); // Start userland. userland_init(); diff --git a/kernel/src/scheduler/scheduler.c b/kernel/src/scheduler/scheduler.c index ce81284..21e8693 100644 --- a/kernel/src/scheduler/scheduler.c +++ b/kernel/src/scheduler/scheduler.c @@ -20,6 +20,10 @@ +// Number of CPUs with running schedulers. +static atomic_int running_sched_count; +// Number of CPUs ready to perform a load balance. +static atomic_int loadbalance_ready_count; // CPU-local scheduler structs. static sched_cpulocal_t *cpu_ctx; // Threads list mutex. @@ -52,17 +56,8 @@ static sched_thread_t *find_thread(tid_t tid) { -// Idle function ran when a CPU has no threads. -static void idle_func(void *arg) { - (void)arg; - while (1) { - isr_pause(); - sched_yield(); - } -} - // Set the context switch to a certain thread. -static inline void set_switch(sched_thread_t *thread) { +static inline void set_switch(sched_cpulocal_t *info, sched_thread_t *thread) { int pflags = thread->process ? atomic_load(&thread->process->flags) : 0; int tflags = atomic_load(&thread->flags); @@ -79,42 +74,79 @@ static inline void set_switch(sched_thread_t *thread) { // Set preemption timer. 
     timestamp_us_t timeout = SCHED_MIN_US + SCHED_INC_US * thread->priority;
-    time_set_next_task_switch(time_us() + timeout);
+    timestamp_us_t now = time_us();
+    info->last_preempt  = now;
+    // Don't let the time slice run past the next load measurement.
+    if (now + timeout > info->load_measure_time) {
+        timeout = info->load_measure_time - now;
+    }
+    time_set_next_task_switch(now + timeout);
 }
 
 // Try to hand a thread off to another CPU.
-static bool thread_handoff(sched_thread_t *thread, int cpu, bool force) {
+static bool thread_handoff(sched_thread_t *thread, int cpu, bool force, int max_load) {
     sched_cpulocal_t *info = cpu_ctx + cpu;
     assert_dev_keep(mutex_acquire_shared_from_isr(NULL, &info->run_mtx, TIMESTAMP_US_MAX));
-    int flags = atomic_load(&info->flags);
-    if (force || ((flags & SCHED_RUNNING) && !(flags & SCHED_EXITING))) {
+
+    int  flags      = atomic_load(&info->flags);
+    bool is_running = (flags & SCHED_RUNNING) && !(flags & SCHED_EXITING);
+    if (!force && !is_running) {
+        assert_dev_keep(mutex_release_shared_from_isr(NULL, &info->run_mtx));
+        return false;
+    }
+    int  usage     = atomic_load(&thread->timeusage.cpu_usage);
+    bool has_space = true;
+
+    if (force) {
+        // Force handoff; always add to load estimate.
+        atomic_fetch_add_explicit(&info->load_estimate, usage, memory_order_relaxed);
+    } else {
+        // Normal handoff; try to claim space on this CPU for this thread.
+        int cur = atomic_load_explicit(&info->load_estimate, memory_order_relaxed);
+        int next;
+        do {
+            next = cur + usage;
+            if (next > max_load) {
+                has_space = false;
+                break;
+            }
+        } while (!atomic_compare_exchange_strong_explicit(
+            &info->load_estimate,
+            &cur,
+            next,
+            memory_order_relaxed,
+            memory_order_relaxed
+        ));
+    }
+
+    if (force || has_space) {
+        // Scheduler is running and has capacity for this thread.
         assert_dev_keep(mutex_acquire_from_isr(NULL, &info->incoming_mtx, TIMESTAMP_US_MAX));
         dlist_append(&info->incoming, &thread->node);
         assert_dev_keep(mutex_release_from_isr(NULL, &info->incoming_mtx));
     }
+
     assert_dev_keep(mutex_release_shared_from_isr(NULL, &info->run_mtx));
-    return (flags & SCHED_RUNNING) && !(flags & SCHED_EXITING);
+    // Report whether the thread was actually queued; a normal handoff that ran out
+    // of space must not claim success, or the caller would lose track of the thread.
+    return force ? is_running : has_space;
 }
 
-// Requests the scheduler to prepare a switch from inside an interrupt routine.
-void sched_request_switch_from_isr() {
-    int               cur_cpu = smp_cur_cpu();
-    sched_cpulocal_t *info    = cpu_ctx + cur_cpu;
+// Handle non-normal scheduler flags.
+static void sw_handle_sched_flags(timestamp_us_t now, int cur_cpu, sched_cpulocal_t *info, int sched_fl) {
+    (void)now;
 
-    // Check the exiting flag.
-    int sched_fl = atomic_load(&info->flags);
     if (!(sched_fl & (SCHED_RUNNING | SCHED_STARTING))) {
         // Mark as starting in the first cycle.
         atomic_fetch_or(&info->flags, SCHED_STARTING);
     } else if (sched_fl & SCHED_STARTING) {
         // Mark as running afterwards so CPU0 can free the stack.
-        atomic_fetch_or(&info->flags, SCHED_RUNNING);
-        atomic_fetch_and(&info->flags, ~SCHED_STARTING);
+        atomic_fetch_xor_explicit(&info->flags, SCHED_RUNNING | SCHED_STARTING, memory_order_relaxed);
+        atomic_fetch_add_explicit(&running_sched_count, 1, memory_order_relaxed);
     } else if (sched_fl & SCHED_EXITING) {
         // Exit the scheduler on this CPU.
         assert_dev_keep(mutex_acquire_from_isr(NULL, &info->run_mtx, TIMESTAMP_US_MAX));
+        atomic_store_explicit(&info->load_average, 0, memory_order_relaxed);
+        atomic_store_explicit(&info->load_estimate, 0, memory_order_relaxed);
+        atomic_fetch_sub_explicit(&running_sched_count, 1, memory_order_relaxed);
         atomic_fetch_and(&info->flags, ~(SCHED_RUNNING | SCHED_EXITING));
 
         // Hand all threads over to other CPUs.
@@ -124,13 +156,126 @@ void sched_request_switch_from_isr() { sched_thread_t *thread = (void *)dlist_pop_front(&info->queue); do { cpu = (cpu + 1) % smp_count; - } while (cpu == cur_cpu || !thread_handoff(thread, cpu, false)); + } while (cpu == cur_cpu || !thread_handoff(thread, cpu, false, __INT_MAX__)); } assert_dev_keep(mutex_release_from_isr(NULL, &info->run_mtx)); // Power off this CPU. assert_dev_keep(smp_poweroff()); } +} + +// Measure load on this CPU. +static void sw_measure_load(timestamp_us_t now, int cur_cpu, sched_cpulocal_t *info) { + (void)now; + (void)cur_cpu; + + // Measure time usage. + timestamp_us_t used_time = 0; + sched_thread_t *thread = (sched_thread_t *)info->queue.head; + while (thread) { + used_time += thread->timeusage.cycle_time; + thread = (sched_thread_t *)thread->node.next; + } + + timestamp_us_t idle_time = info->idle_thread.timeusage.cycle_time; + info->idle_thread.timeusage.cycle_time = 0; + timestamp_us_t total_time = used_time + idle_time; + + // Account per-thread CPU usage. + int total_load = 0; + thread = (sched_thread_t *)info->queue.head; + while (thread) { + timestamp_us_t cpu_time = thread->timeusage.cycle_time; + thread->timeusage.cycle_time = 0; + int cpu_permil = cpu_time * 10000 / total_time; + total_load += cpu_permil; + atomic_store(&thread->timeusage.cpu_usage, cpu_permil); + thread = (sched_thread_t *)thread->node.next; + } + + info->load_average = total_load; + info->load_estimate = total_load; +} + +// Perform load balancing. +static void sw_handle_loadbalance(timestamp_us_t now, int cur_cpu, sched_cpulocal_t *info) { + (void)now; + + // Wait for all CPUs to have measured their respective load. + atomic_fetch_add(&loadbalance_ready_count, 1); + int min = atomic_load(&running_sched_count); + while (atomic_load(&loadbalance_ready_count) < min) { + isr_pause(); + } + + // Measure global load average. + int global_load_average = 0; + for (int i = 0; i < smp_count; i++) { + global_load_average += cpu_ctx[i].load_average; + } + global_load_average /= atomic_load(&running_sched_count); + + // If this CPU started with less than the load average, don't hand off any threads. + if (info->load_average <= global_load_average) { + atomic_fetch_sub(&loadbalance_ready_count, 1); + return; + } + + // Hand off threads until either all CPUs expect to meet the load average, or this one dips below. + sched_thread_t *thread; + for (size_t i = 0; i < info->queue.len; i++) { + thread = (sched_thread_t *)dlist_pop_front(&info->queue); + bool handoff_ok = false; + for (int cpu = 0; cpu < smp_count; cpu++) { + if (cpu == cur_cpu) + continue; + if (thread_handoff(thread, cpu, false, global_load_average)) { + handoff_ok = true; + break; + } + } + if (!handoff_ok) { + dlist_append(&info->queue, &thread->node); + } + } + atomic_fetch_sub(&loadbalance_ready_count, 1); +} + +// Requests the scheduler to prepare a switch from inside an interrupt routine. +void sched_request_switch_from_isr() { + timestamp_us_t now = time_us(); + int cur_cpu = smp_cur_cpu(); + sched_cpulocal_t *info = cpu_ctx + cur_cpu; + + // Check the exiting flag. + int sched_fl = atomic_load(&info->flags); + if (sched_fl != SCHED_RUNNING) { + sw_handle_sched_flags(now, cur_cpu, info, sched_fl); + } + + // Account thread time usage. 
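+    // The time elapsed since info->last_preempt is attributed to the thread that was
+    // just interrupted: kernel_time while it ran privileged, user_time otherwise, and
+    // cycle_time in both cases so the next load measurement sees its total share.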
+    sched_thread_t *cur_thread = sched_current_thread_unsafe();
+    if (cur_thread) {
+        timestamp_us_t used               = now - info->last_preempt;
+        cur_thread->timeusage.cycle_time += used;
+        if (cur_thread->flags & THREAD_PRIVILEGED) {
+            cur_thread->timeusage.kernel_time += used;
+        } else {
+            cur_thread->timeusage.user_time += used;
+        }
+    }
+
+    // Check for load measurement timer.
+    if (now >= info->load_measure_time) {
+        // Measure load on this CPU.
+        sw_measure_load(now, cur_cpu, info);
+        // Balance load with other running CPUs.
+        sw_handle_loadbalance(now, cur_cpu, info);
+
+        // Set next timestamp to measure load average.
+        info->load_measure_time = now + SCHED_LOAD_INTERVAL - (now % SCHED_LOAD_INTERVAL);
+    }
 
     // Check for incoming threads.
     assert_dev_keep(mutex_acquire_from_isr(NULL, &info->incoming_mtx, TIMESTAMP_US_MAX));
@@ -173,13 +318,13 @@ void sched_request_switch_from_isr() {
             // Runnable thread found; perform context switch.
             assert_dev_drop(flags & THREAD_RUNNING);
             dlist_append(&info->queue, &thread->node);
-            set_switch(thread);
+            set_switch(info, thread);
             return;
         }
     }
 
     // If nothing is running on this CPU, run the idle thread.
-    set_switch(&info->idle_thread);
+    set_switch(info, &info->idle_thread);
 }
 
 // Scheduler housekeeping.
@@ -222,6 +367,15 @@ static void sched_housekeeping(int taskno, void *arg) {
 
+// Idle function run when a CPU has no threads.
+static void idle_func(void *arg) {
+    (void)arg;
+    while (1) {
+        isr_pause();
+        sched_yield();
+    }
+}
+
 // Global scheduler initialization.
 void sched_init() {
     cpu_ctx = malloc(smp_count * sizeof(sched_cpulocal_t));
@@ -271,11 +425,17 @@ bool sched_start_on(int cpu) {
 
 // Start executing the scheduler on this CPU.
 void sched_exec() {
+    timestamp_us_t now = time_us();
+
     // Allocate CPU-local scheduler data.
     sched_cpulocal_t *info         = cpu_ctx + smp_cur_cpu();
     isr_ctx_get()->cpulocal->sched = info;
     logkf_from_isr(LOG_DEBUG, "Starting scheduler on CPU%{d}", smp_cur_cpu());
 
+    // Set next timestamp to measure load average.
+    info->load_average      = 0;
+    info->load_estimate     = 0;
+    info->load_measure_time = now + SCHED_LOAD_INTERVAL - (now % SCHED_LOAD_INTERVAL);
     atomic_store_explicit(&info->flags, 0, memory_order_release);
 
     // Start handed over threads or idle until one is handed over to this CPU.
@@ -463,7 +623,7 @@ static void thread_resume_impl(badge_err_t *ec, tid_t tid, bool now) {
     if (thread) {
         if (thread_try_mark_running(thread, now)) {
             irq_disable();
-            thread_handoff(thread, smp_cur_cpu(), true, __INT_MAX__);
+            thread_handoff(thread, smp_cur_cpu(), true, __INT_MAX__);
             irq_enable();
         }
         badge_err_set_ok(ec);
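
Review note on the new accounting units: per the comments in this patch, thread CPU usage and the per-CPU load fields are all in 0.01% units (10000 = one fully busy CPU over one SCHED_LOAD_INTERVAL window), and a normal handoff only succeeds if the target CPU can absorb the thread's usage without exceeding max_load. Below is a minimal standalone sketch of that arithmetic for sanity-checking; it is hosted C with simplified stand-ins (cpu_usage_units, claim_load, and the numeric values are illustrative only, not kernel code or names from this patch).

// Standalone sketch (hosted C11), illustrative only -- not kernel code.
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t timestamp_us_t;    // Stand-in for the kernel's microsecond timestamp type.
#define SCHED_LOAD_INTERVAL 250000 // 250 ms measurement window, as in scheduler/types.h.

// Convert a thread's cycle_time over one window into 0.01% units of one CPU.
static int cpu_usage_units(timestamp_us_t cycle_time, timestamp_us_t total_time) {
    if (total_time <= 0) {
        return 0; // Nothing measured yet; avoid dividing by zero.
    }
    return (int)(cycle_time * 10000 / total_time);
}

// Mirrors the claim loop in thread_handoff(): atomically add `usage` to a target
// CPU's load estimate, but only if the result stays at or below `max_load`.
static bool claim_load(atomic_int *load_estimate, int usage, int max_load) {
    int cur = atomic_load_explicit(load_estimate, memory_order_relaxed);
    int next;
    do {
        next = cur + usage;
        if (next > max_load) {
            return false; // Target CPU would go over the cap; leave its estimate untouched.
        }
    } while (!atomic_compare_exchange_strong_explicit(
        load_estimate, &cur, next, memory_order_relaxed, memory_order_relaxed
    ));
    return true;
}

int main(void) {
    // A thread that ran 100 ms of a 250 ms window uses 40% of one CPU -> 4000 units.
    int usage = cpu_usage_units(100000, SCHED_LOAD_INTERVAL);
    printf("usage = %d\n", usage);

    atomic_int estimate = 7000; // Target CPU already estimated at 70%.
    printf("claim at 70%% -> %d\n", claim_load(&estimate, usage, 10000)); // 0: 11000 > 10000
    atomic_store(&estimate, 5000); // Target CPU at 50%.
    printf("claim at 50%% -> %d\n", claim_load(&estimate, usage, 10000)); // 1: 9000 <= 10000
    return 0;
}

In the patch itself, max_load is __INT_MAX__ for forced hand-offs (resume and scheduler exit) and the global load average during the balancing pass, so a CPU only accepts extra threads while it expects to stay at or below the average.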