Skip to content

Commit fc039e5

Browse files
committed
misc: introduce uv_get|set_threadpool_size
This patch adds the posibility to modify the threadpool size at runtime Fixes: libuv#4401 Signed-off-by: Juan José Arboleda <soyjuanarbol@gmail.com>
1 parent e59e2a9 commit fc039e5

File tree

7 files changed

+230
-7
lines changed

7 files changed

+230
-7
lines changed

.github/workflows/CI-win.yml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ on:
1111
push:
1212
branches:
1313
- v[0-9].*
14+
- juan/tp
1415
- master
1516

1617
jobs:
@@ -21,13 +22,13 @@ jobs:
2122
fail-fast: false
2223
matrix:
2324
config:
24-
- {toolchain: Visual Studio 16 2019, arch: Win32, server: 2019}
25-
- {toolchain: Visual Studio 16 2019, arch: x64, server: 2019}
26-
- {toolchain: Visual Studio 17 2022, arch: Win32, server: 2022}
27-
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022}
25+
# - {toolchain: Visual Studio 16 2019, arch: Win32, server: 2019}
26+
# - {toolchain: Visual Studio 16 2019, arch: x64, server: 2019}
27+
# - {toolchain: Visual Studio 17 2022, arch: Win32, server: 2022}
28+
# - {toolchain: Visual Studio 17 2022, arch: x64, server: 2022}
2829
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: ASAN}
29-
- {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: UBSAN}
30-
- {toolchain: Visual Studio 17 2022, arch: arm64, server: 2022}
30+
# - {toolchain: Visual Studio 17 2022, arch: x64, server: 2022, config: UBSAN}
31+
# - {toolchain: Visual Studio 17 2022, arch: arm64, server: 2022}
3132
steps:
3233
- uses: actions/checkout@v4
3334
- name: Build

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ if(LIBUV_BUILD_TESTS)
670670
test/test-thread-name.c
671671
test/test-thread-priority.c
672672
test/test-threadpool-cancel.c
673+
test/test-threadpool-size.c
673674
test/test-threadpool.c
674675
test/test-timer-again.c
675676
test/test-timer-from-check.c

Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
297297
test/test-thread-name.c \
298298
test/test-thread-priority.c \
299299
test/test-threadpool-cancel.c \
300+
test/test-threadpool-size.c \
300301
test/test-threadpool.c \
301302
test/test-timer-again.c \
302303
test/test-timer-from-check.c \

include/uv.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,9 @@ enum {
13371337
UV_EXTERN int uv_thread_getpriority(uv_thread_t tid, int* priority);
13381338
UV_EXTERN int uv_thread_setpriority(uv_thread_t tid, int priority);
13391339

1340+
UV_EXTERN unsigned int uv_get_threadpool_size(void);
1341+
UV_EXTERN unsigned int uv_set_threadpool_size(unsigned int n);
1342+
13401343
UV_EXTERN unsigned int uv_available_parallelism(void);
13411344
UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count);
13421345
UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count);

src/threadpool.c

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@
3131

3232
static uv_once_t once = UV_ONCE_INIT;
3333
static uv_cond_t cond;
34+
static uv_cond_t cond_kill;
3435
static uv_mutex_t mutex;
3536
static unsigned int idle_threads;
3637
static unsigned int slow_io_work_running;
3738
static unsigned int nthreads;
3839
static uv_thread_t* threads;
3940
static uv_thread_t default_threads[4];
41+
static uv_thread_t tid_to_join;
4042
static struct uv__queue exit_message;
43+
static struct uv__queue kill_thread_message;
4144
static struct uv__queue wq;
4245
static struct uv__queue run_slow_work_message;
4346
static struct uv__queue slow_io_pending_wq;
47+
static uv_sem_t sem;
4448

4549
static unsigned int slow_work_thread_threshold(void) {
4650
return (nthreads + 1) / 2;
@@ -85,6 +89,17 @@ static void worker(void* arg) {
8589
break;
8690
}
8791

92+
/* Allow the main thread to kill targeted thread. */
93+
if (q == &kill_thread_message) {
94+
tid_to_join = uv_thread_self();
95+
printf("tid_to_join = %lu\n", tid_to_join);
96+
uv__queue_remove(q); /* Remove the kill message. */
97+
uv__queue_init(q);
98+
uv_cond_signal(&cond_kill);
99+
uv_mutex_unlock(&mutex);
100+
break;
101+
}
102+
88103
uv__queue_remove(q);
89104
uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */
90105

@@ -185,6 +200,7 @@ void uv__threadpool_cleanup(void) {
185200

186201
uv_mutex_destroy(&mutex);
187202
uv_cond_destroy(&cond);
203+
uv_cond_destroy(&cond_kill);
188204

189205
threads = NULL;
190206
nthreads = 0;
@@ -195,7 +211,6 @@ static void init_threads(void) {
195211
uv_thread_options_t config;
196212
unsigned int i;
197213
const char* val;
198-
uv_sem_t sem;
199214

200215
nthreads = ARRAY_SIZE(default_threads);
201216
val = getenv("UV_THREADPOOL_SIZE");
@@ -218,6 +233,9 @@ static void init_threads(void) {
218233
if (uv_cond_init(&cond))
219234
abort();
220235

236+
if (uv_cond_init(&cond_kill))
237+
abort();
238+
221239
if (uv_mutex_init(&mutex))
222240
abort();
223241

@@ -263,6 +281,158 @@ static void init_once(void) {
263281
}
264282

265283

284+
int uv__threads_spin(unsigned int n) {
285+
uv_thread_options_t config;
286+
int r;
287+
size_t i;
288+
uv_thread_t* threads_tmp;
289+
290+
assert(n > nthreads);
291+
292+
config.flags = UV_THREAD_HAS_STACK_SIZE;
293+
config.stack_size = 8u << 20; /* 8 MB */
294+
295+
threads_tmp = uv__malloc(n * sizeof(threads[0]));
296+
if (threads_tmp == NULL)
297+
return UV_ENOMEM;
298+
299+
300+
/* Copy all threads into the new list. */
301+
for (i = 0; i < nthreads; i++)
302+
threads_tmp[i] = threads[i];
303+
304+
/* Free the old list. And make it points to the new one */
305+
if (threads != default_threads)
306+
uv__free(threads);
307+
308+
threads = threads_tmp;
309+
310+
r = uv_sem_init(&sem, 0);
311+
if (r)
312+
goto out;
313+
314+
while (nthreads < n) {
315+
/* Wire up the error code to the return value. */
316+
r = uv_thread_create_ex(threads + nthreads, &config, worker, &sem);
317+
if (r)
318+
goto out;
319+
320+
uv_sem_wait(&sem);
321+
nthreads++;
322+
}
323+
324+
assert(nthreads == n);
325+
326+
out:
327+
uv_sem_destroy(&sem);
328+
return r;
329+
}
330+
331+
332+
int uv__threads_join(unsigned int n) {
333+
int r;
334+
size_t i;
335+
size_t j;
336+
uv_thread_t* threads_tmp;
337+
338+
assert(nthreads > n);
339+
while (nthreads != n) {
340+
/* Request any thread to kill */
341+
uv__queue_insert_tail(&wq, &kill_thread_message);
342+
343+
if (idle_threads > 0)
344+
uv_cond_signal(&cond);
345+
346+
uv_cond_wait(&cond_kill, &mutex);
347+
r = uv_thread_join(&tid_to_join);
348+
if (r)
349+
return r;
350+
351+
/* Alloc for one thread less */
352+
threads_tmp = uv__malloc((nthreads - 1) * sizeof(threads[0]));
353+
if (threads_tmp == NULL)
354+
return UV_ENOMEM;
355+
356+
357+
int match_count = 0;
358+
for (i = 0; i < nthreads; i++) {
359+
if (uv_thread_equal(&threads[i], &tid_to_join)) {
360+
match_count++;
361+
}
362+
}
363+
364+
if (match_count != 1) {
365+
fprintf(stderr, "Error: tid_to_join must be present exactly once not %d(times)\n", match_count);
366+
return UV_EINVAL;
367+
}
368+
369+
/* Copy all threads into the new list except the joined one. */
370+
j = 0;
371+
for (i = 0; i < nthreads; i++) {
372+
if (!uv_thread_equal(&threads[i], &tid_to_join)) {
373+
printf("Copying threads[%d] = %lu to threads_tmp[%d]\n", (int)i, (unsigned long)threads[i], (int)j);
374+
threads_tmp[j++] = threads[i];
375+
}
376+
printf("Iteration i=%d, j=%d\n", (int)i, (int)j);
377+
}
378+
379+
/* Free the old list (if malloc'd) */
380+
if (threads != default_threads)
381+
uv__free(threads);
382+
383+
/* Decrease the number of threads */
384+
nthreads--;
385+
threads = threads_tmp;
386+
}
387+
388+
assert(nthreads == n);
389+
390+
return 0;
391+
}
392+
393+
394+
unsigned int uv_get_threadpool_size(void) {
395+
int r;
396+
397+
uv_once(&once, init_once);
398+
399+
uv_mutex_lock(&mutex);
400+
r = nthreads;
401+
uv_mutex_unlock(&mutex);
402+
return r;
403+
}
404+
405+
406+
unsigned int uv_set_threadpool_size(unsigned int n) {
407+
int r;
408+
uv_once(&once, init_once);
409+
410+
/* For now, NO-threadpool is not supported. */
411+
if (n > MAX_THREADPOOL_SIZE || n < 1)
412+
return UV_EINVAL;
413+
414+
uv_mutex_lock(&mutex);
415+
416+
/* No-op */
417+
if (n == nthreads) {
418+
uv_mutex_unlock(&mutex);
419+
return 0;
420+
}
421+
422+
/* Shrink the threadpool request. */
423+
if (n < nthreads) {
424+
r = uv__threads_join(n);
425+
uv_mutex_unlock(&mutex);
426+
return r;
427+
}
428+
429+
/* Grow the threadpool. */
430+
r = uv__threads_spin(n);
431+
uv_mutex_unlock(&mutex);
432+
return r;
433+
}
434+
435+
266436
void uv__work_submit(uv_loop_t* loop,
267437
struct uv__work* w,
268438
enum uv__work_kind kind,

test/test-list.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ TEST_DECLARE (fs_wtf)
456456
TEST_DECLARE (fs_get_system_error)
457457
TEST_DECLARE (strscpy)
458458
TEST_DECLARE (strtok)
459+
TEST_DECLARE (threadpool_size)
459460
TEST_DECLARE (threadpool_queue_work_simple)
460461
TEST_DECLARE (threadpool_queue_work_einval)
461462
TEST_DECLARE (threadpool_multiple_event_loops)
@@ -1175,6 +1176,7 @@ TASK_LIST_START
11751176
TEST_ENTRY (open_osfhandle_valid_handle)
11761177
TEST_ENTRY (strscpy)
11771178
TEST_ENTRY (strtok)
1179+
TEST_ENTRY (threadpool_size)
11781180
TEST_ENTRY (threadpool_queue_work_simple)
11791181
TEST_ENTRY (threadpool_queue_work_einval)
11801182
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)

test/test-threadpool-size.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/* Copyright libuv project contributors. All rights reserved.
2+
*
3+
* Permission is hereby granted, free of charge, to any person obtaining a copy
4+
* of this software and associated documentation files (the "Software"), to
5+
* deal in the Software without restriction, including without limitation the
6+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7+
* sell copies of the Software, and to permit persons to whom the Software is
8+
* furnished to do so, subject to the following conditions:
9+
*
10+
* The above copyright notice and this permission notice shall be included in
11+
* all copies or substantial portions of the Software.
12+
*
13+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19+
* IN THE SOFTWARE.
20+
*/
21+
22+
#include "uv.h"
23+
#include "task.h"
24+
25+
TEST_IMPL(threadpool_size) {
26+
/* The default thread pool size is 4 */
27+
ASSERT_EQ(uv_get_threadpool_size(), 4);
28+
29+
/* Normal use case (increase the size of the thread pool) */
30+
ASSERT_OK(uv_set_threadpool_size(5));
31+
ASSERT_EQ(uv_get_threadpool_size(), 5);
32+
33+
/* Shrink the thread pool */
34+
ASSERT_OK(uv_set_threadpool_size(4));
35+
ASSERT_EQ(uv_get_threadpool_size(), 4);
36+
37+
/* Grow the thread pool again */
38+
ASSERT_OK(uv_set_threadpool_size(5));
39+
ASSERT_EQ(uv_get_threadpool_size(), 5);
40+
41+
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
42+
43+
MAKE_VALGRIND_HAPPY(uv_default_loop());
44+
return 0;
45+
}

0 commit comments

Comments
 (0)