Skip to content

Commit

Permalink
misc: introduce uv_get|set_threadpool_size
Browse files Browse the repository at this point in the history
This patch adds the posibility to modify the threadpool size at runtime

Fixes: libuv#4401
Signed-off-by: Juan José Arboleda <soyjuanarbol@gmail.com>
  • Loading branch information
juanarbol committed Jan 12, 2025
1 parent e59e2a9 commit 8fb28b0
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 7 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/CI-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
push:
branches:
- v[0-9].*
- juan/tp
- master

jobs:
Expand All @@ -21,13 +22,13 @@ jobs:
fail-fast: false
matrix:
config:
- {toolchain: Visual Studio 16 2019, arch: Win32, server: 2019}
- {toolchain: Visual Studio 16 2019, arch: x64, server: 2019}
- {toolchain: Visual Studio 17 2022, arch: Win32, server: 2022}
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022}
# - {toolchain: Visual Studio 16 2019, arch: Win32, server: 2019}
# - {toolchain: Visual Studio 16 2019, arch: x64, server: 2019}
# - {toolchain: Visual Studio 17 2022, arch: Win32, server: 2022}
# - {toolchain: Visual Studio 17 2022, arch: x64, server: 2022}
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: ASAN}
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: UBSAN}
- {toolchain: Visual Studio 17 2022, arch: arm64, server: 2022}
# - {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: UBSAN}
# - {toolchain: Visual Studio 17 2022, arch: arm64, server: 2022}
steps:
- uses: actions/checkout@v4
- name: Build
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ if(LIBUV_BUILD_TESTS)
test/test-thread-name.c
test/test-thread-priority.c
test/test-threadpool-cancel.c
test/test-threadpool-size.c
test/test-threadpool.c
test/test-timer-again.c
test/test-timer-from-check.c
Expand Down
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-thread-name.c \
test/test-thread-priority.c \
test/test-threadpool-cancel.c \
test/test-threadpool-size.c \
test/test-threadpool.c \
test/test-timer-again.c \
test/test-timer-from-check.c \
Expand Down
3 changes: 3 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,9 @@ enum {
UV_EXTERN int uv_thread_getpriority(uv_thread_t tid, int* priority);
UV_EXTERN int uv_thread_setpriority(uv_thread_t tid, int priority);

UV_EXTERN unsigned int uv_get_threadpool_size(void);
UV_EXTERN unsigned int uv_set_threadpool_size(unsigned int n);

UV_EXTERN unsigned int uv_available_parallelism(void);
UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count);
UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count);
Expand Down
178 changes: 177 additions & 1 deletion src/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@

static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_cond_t cond_kill;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
static uv_thread_t tid_to_join;
static struct uv__queue exit_message;
static struct uv__queue kill_thread_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;
static uv_sem_t sem;

static unsigned int slow_work_thread_threshold(void) {
return (nthreads + 1) / 2;
Expand Down Expand Up @@ -85,6 +89,17 @@ static void worker(void* arg) {
break;
}

/* Allow the main thread to kill targeted thread. */
if (q == &kill_thread_message) {
tid_to_join = uv_thread_self();
printf("tid_to_join = %lu\n", tid_to_join);
uv__queue_remove(q); /* Remove the kill message. */
uv__queue_init(q);
uv_cond_signal(&cond_kill);
uv_mutex_unlock(&mutex);
break;
}

uv__queue_remove(q);
uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */

Expand Down Expand Up @@ -185,6 +200,7 @@ void uv__threadpool_cleanup(void) {

uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);
uv_cond_destroy(&cond_kill);

threads = NULL;
nthreads = 0;
Expand All @@ -195,7 +211,6 @@ static void init_threads(void) {
uv_thread_options_t config;
unsigned int i;
const char* val;
uv_sem_t sem;

nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
Expand All @@ -218,6 +233,9 @@ static void init_threads(void) {
if (uv_cond_init(&cond))
abort();

if (uv_cond_init(&cond_kill))
abort();

if (uv_mutex_init(&mutex))
abort();

Expand Down Expand Up @@ -263,6 +281,164 @@ static void init_once(void) {
}


int uv__threads_spin(unsigned int n) {
uv_thread_options_t config;
int r;
size_t i;
uv_thread_t* threads_tmp;

assert(n > nthreads);

config.flags = UV_THREAD_HAS_STACK_SIZE;
config.stack_size = 8u << 20; /* 8 MB */

threads_tmp = uv__malloc(n * sizeof(threads[0]));
if (threads_tmp == NULL)
return UV_ENOMEM;


/* Copy all threads into the new list. */
for (i = 0; i < nthreads; i++)
threads_tmp[i] = threads[i];

/* Free the old list. And make it points to the new one */
if (threads != default_threads)
uv__free(threads);

threads = threads_tmp;

r = uv_sem_init(&sem, 0);
if (r)
goto out;

while (nthreads < n) {
/* Wire up the error code to the return value. */
r = uv_thread_create_ex(threads + nthreads, &config, worker, &sem);
if (r)
goto out;

uv_sem_wait(&sem);
nthreads++;
}

assert(nthreads == n);

out:
uv_sem_destroy(&sem);
return r;
}


int uv__threads_join(unsigned int n) {
int r;
size_t i;
size_t j;
uv_thread_t* threads_tmp;

assert(nthreads > n);
while (nthreads != n) {
/* Request any thread to kill */
uv__queue_insert_tail(&wq, &kill_thread_message);

if (idle_threads > 0)
uv_cond_signal(&cond);

uv_cond_wait(&cond_kill, &mutex);
r = uv_thread_join(&tid_to_join);
if (r)
return r;

/* Alloc for one thread less */
threads_tmp = uv__malloc((nthreads - 1) * sizeof(threads[0]));
if (threads_tmp == NULL)
return UV_ENOMEM;


int match_count = 0;
for (i = 0; i < nthreads; i++) {
if (threads[i] == tid_to_join) {
match_count++;
}
}

if (match_count != 1) {
fprintf(stderr, "Error: tid_to_join must be present exactly once not %d(times)\n", match_count);
return UV_EINVAL;
}

/* Copy all threads into the new list except the joined one. */
j = 0;
for (i = 0; i < nthreads; i++) {
#ifdef _WIN32
if (CompareObjectHandles(tid_to_join, threads[i]) == 0) {
threads_tmp[j++] = threads[i];
}
#else
if (tid_to_join != threads[i]) {
printf("Copying threads[%d] = %lu to threads_tmp[%d]\n", (int)i, threads[i], (int)j);
threads_tmp[j++] = threads[i];
}
#endif
printf("Iteration i=%d, j=%d\n", (int)i, (int)j);
}

/* Free the old list (if malloc'd) */
if (threads != default_threads)
uv__free(threads);

/* Decrease the number of threads */
nthreads--;
threads = threads_tmp;
}

assert(nthreads == n);

return 0;
}


unsigned int uv_get_threadpool_size(void) {
int r;

uv_once(&once, init_once);

uv_mutex_lock(&mutex);
r = nthreads;
uv_mutex_unlock(&mutex);
return r;
}


unsigned int uv_set_threadpool_size(unsigned int n) {
int r;
uv_once(&once, init_once);

/* For now, NO-threadpool is not supported. */
if (n > MAX_THREADPOOL_SIZE || n < 1)
return UV_EINVAL;

uv_mutex_lock(&mutex);

/* No-op */
if (n == nthreads) {
uv_mutex_unlock(&mutex);
return 0;
}

/* Shrink the threadpool request. */
if (n < nthreads) {
r = uv__threads_join(n);
uv_mutex_unlock(&mutex);
return r;
}

/* Grow the threadpool. */
r = uv__threads_spin(n);
uv_mutex_unlock(&mutex);
return r;
}


void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
Expand Down
2 changes: 2 additions & 0 deletions test/test-list.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ TEST_DECLARE (fs_wtf)
TEST_DECLARE (fs_get_system_error)
TEST_DECLARE (strscpy)
TEST_DECLARE (strtok)
TEST_DECLARE (threadpool_size)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
Expand Down Expand Up @@ -1175,6 +1176,7 @@ TASK_LIST_START
TEST_ENTRY (open_osfhandle_valid_handle)
TEST_ENTRY (strscpy)
TEST_ENTRY (strtok)
TEST_ENTRY (threadpool_size)
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)
Expand Down
45 changes: 45 additions & 0 deletions test/test-threadpool-size.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

#include "uv.h"
#include "task.h"

TEST_IMPL(threadpool_size) {
/* The default thread pool size is 4 */
ASSERT_EQ(uv_get_threadpool_size(), 4);

/* Normal use case (increase the size of the thread pool) */
ASSERT_OK(uv_set_threadpool_size(5));
ASSERT_EQ(uv_get_threadpool_size(), 5);

/* Shrink the thread pool */
ASSERT_OK(uv_set_threadpool_size(4));
ASSERT_EQ(uv_get_threadpool_size(), 4);

/* Grow the thread pool again */
ASSERT_OK(uv_set_threadpool_size(5));
ASSERT_EQ(uv_get_threadpool_size(), 5);

uv_run(uv_default_loop(), UV_RUN_DEFAULT);

MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}

0 comments on commit 8fb28b0

Please sign in to comment.