From 8fb61ce48bafa85d9dda6285b09e99318685fc7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Tue, 11 Nov 2025 13:05:42 +0100 Subject: [PATCH 1/2] CommandQueueMT: Fix race conditions --- core/templates/command_queue_mt.h | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 8154796eeead..bda9bfedd96f 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -39,6 +39,8 @@ #include "core/typedefs.h" class CommandQueueMT { + static const size_t MAX_COMMAND_SIZE = 1024; + struct CommandBase { bool sync = false; virtual void call() = 0; @@ -54,6 +56,8 @@ class CommandQueueMT { M method; Tuple...> args; + static_assert(sizeof(*this) <= MAX_COMMAND_SIZE); + template _FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) : CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward(p_args)...) {} @@ -82,6 +86,8 @@ class CommandQueueMT { R *ret; Tuple...> args; + static_assert(sizeof(*this) <= MAX_COMMAND_SIZE); + _FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT... p_args) : CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {} @@ -154,19 +160,27 @@ class CommandQueueMT { } void _flush() { + MutexLock lock(mutex); + if (unlikely(flush_read_ptr)) { // Re-entrant call. return; } - MutexLock lock(mutex); + char *cmd_backup = (char *)alloca(MAX_COMMAND_SIZE); while (flush_read_ptr < command_mem.size()) { uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); + + // Protect against a race condition between this thread + // during the call to the command and other threads potentially + // invalidating the pointer due to reallocs. 
+ memcpy(cmd_backup, (char *)cmd, size); + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); - cmd->call(); + ((CommandBase *)cmd_backup)->call(); WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); // Handle potential realloc due to the command and unlock allowance. @@ -174,9 +188,9 @@ class CommandQueueMT { if (unlikely(cmd->sync)) { sync_head++; - lock.~MutexLock(); // Give an opportunity to awaiters right away. + lock.temp_unlock(); // Give an opportunity to awaiters right away. sync_cond_var.notify_all(); - new (&lock) MutexLock(mutex); + lock.temp_relock(); // Handle potential realloc happened during unlock. cmd = reinterpret_cast(&command_mem[flush_read_ptr]); } From bd615b42d4d986526cc8d5cd462ed6ffb60b4473 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 7 Nov 2025 12:38:29 +0100 Subject: [PATCH 2/2] CommandQueueMT: Reduce lock contention in cases of single flusher --- core/templates/command_queue_mt.cpp | 3 ++- core/templates/command_queue_mt.h | 17 +++++++++++++---- modules/betsy/image_compress_betsy.h | 2 +- servers/rendering/rendering_server_default.h | 2 +- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp index 08b903d92ebb..11efe20de8f3 100644 --- a/core/templates/command_queue_mt.cpp +++ b/core/templates/command_queue_mt.cpp @@ -30,7 +30,8 @@ #include "command_queue_mt.h" -CommandQueueMT::CommandQueueMT() { +CommandQueueMT::CommandQueueMT(bool p_unique_flusher) : + unique_flusher(p_unique_flusher) { command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024); } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index bda9bfedd96f..c251bfcba8b8 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -111,6 +111,7 @@ class CommandQueueMT { static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64; + bool unique_flusher = 
false; BinaryMutex mutex; LocalVector command_mem; ConditionVariable sync_cond_var; @@ -179,9 +180,17 @@ class CommandQueueMT { // invalidating the pointer due to reallocs. memcpy(cmd_backup, (char *)cmd, size); - uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); - ((CommandBase *)cmd_backup)->call(); - WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); + if (unique_flusher) { + // A single thread will pump; the lock is only needed for the command queue itself. + lock.temp_unlock(); + ((CommandBase *)cmd_backup)->call(); + lock.temp_relock(); + } else { + // At least we can unlock during WTP operations. + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); + ((CommandBase *)cmd_backup)->call(); + WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); + } // Handle potential realloc due to the command and unlock allowance. cmd = reinterpret_cast(&command_mem[flush_read_ptr]); @@ -266,6 +275,6 @@ class CommandQueueMT { pump_task_id = p_task_id; } - CommandQueueMT(); + CommandQueueMT(bool p_unique_flusher = false); ~CommandQueueMT(); }; diff --git a/modules/betsy/image_compress_betsy.h b/modules/betsy/image_compress_betsy.h index 15b588949703..78352a12947f 100644 --- a/modules/betsy/image_compress_betsy.h +++ b/modules/betsy/image_compress_betsy.h @@ -103,7 +103,7 @@ Error _betsy_compress_s3tc(Image *r_img, Image::UsedChannels p_channels); class BetsyCompressor : public Object { GDSOFTCLASS(BetsyCompressor, Object); - mutable CommandQueueMT command_queue; + mutable CommandQueueMT command_queue = CommandQueueMT(true); bool exit = false; WorkerThreadPool::TaskID task_id = WorkerThreadPool::INVALID_TASK_ID; diff --git a/servers/rendering/rendering_server_default.h b/servers/rendering/rendering_server_default.h index 34faef662c41..e9ac6098ec68 100644 --- a/servers/rendering/rendering_server_default.h +++ b/servers/rendering/rendering_server_default.h @@ -74,7 +74,7 @@ class 
RenderingServerDefault : public RenderingServer { uint64_t print_frame_profile_ticks_from = 0; uint32_t print_frame_profile_frame_count = 0; - mutable CommandQueueMT command_queue; + mutable CommandQueueMT command_queue = CommandQueueMT(true); Thread::ID server_thread = Thread::MAIN_ID; WorkerThreadPool::TaskID server_task_id = WorkerThreadPool::INVALID_TASK_ID;