feat: change thread scheduling method in ThreadPool class (ospp 2024) #2648
Conversation
…rting pika in centos
src/net/src/thread_pool.cc
Outdated
// 1. loop for short time
for (uint32_t tries = 0; tries < 200; ++tries) {
if (newest_node_.load(std::memory_order_acquire) != nullptr) {
last = newest_node_.exchange(nullptr);
Here the thread that arrives first detaches the entire linked list, takes it for itself, and then consumes it linearly, which may cause large latency fluctuations. I suggest distributing tasks as evenly as possible across the workers in the thread pool. After all, on Pika's read/write path these are Pika's own threads, which is quite different from RocksDB's threading model (in RocksDB it is application threads concurrently acting on each writer), so this part may need more consideration.
I thought about it; there are roughly two ways to take only a limited number of tasks at a time:
1. Give each worker its own lock-free list, and add new tasks to these lists randomly or in round-robin order (a rough sketch follows below);
2. Keep a single lock-free list but cap its capacity at a small number, say 10, so that a worker takes at most 10 tasks at a time.
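A rough, self-contained sketch of option 1, round-robin distribution over per-worker lock-free lists (illustration only; the names PerWorkerQueues, Push, and TakeAll are made up here, and the branch linked below is the authoritative implementation):

#include <atomic>
#include <cstddef>
#include <vector>

// One lock-free list head per worker; producers push round-robin so a single
// worker can never grab the whole global backlog at once.
struct Node {
  Node* link_older = nullptr;
};

class PerWorkerQueues {
 public:
  explicit PerWorkerQueues(size_t worker_num) : heads_(worker_num) {}

  // Producer: pick a list round-robin and prepend the node with a CAS loop.
  void Push(Node* node) {
    size_t idx = next_.fetch_add(1, std::memory_order_relaxed) % heads_.size();
    std::atomic<Node*>& head = heads_[idx].head;
    Node* old_head = head.load(std::memory_order_relaxed);
    do {
      node->link_older = old_head;
    } while (!head.compare_exchange_weak(old_head, node, std::memory_order_release,
                                         std::memory_order_relaxed));
  }

  // Worker i: detach only its own list, so one grab is bounded by what was
  // routed to that worker instead of everything that is pending.
  Node* TakeAll(size_t i) { return heads_[i].head.exchange(nullptr, std::memory_order_acquire); }

 private:
  struct alignas(64) Slot {  // padded to avoid false sharing between workers
    std::atomic<Node*> head{nullptr};
  };
  std::vector<Slot> heads_;
  std::atomic<size_t> next_{0};
};

The per-slot padding and the exchange in TakeAll mirror the single-list version; only the routing step is new.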
The second approach can be tested directly; for the first approach, see my new branch: https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list
In my tests the two approaches are roughly equal in speed; with suitable parameter tuning there should of course be a larger gap.
@CodiumAI-Agent /review
PR Review 🔍
Code feedback:
@CodiumAI-Agent /improve
PR Code Suggestions ✨
Walkthrough
The recent updates to the …
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant ThreadPool
participant Worker
Client->>ThreadPool: Schedule(task)
ThreadPool->>Worker: Notify worker
Worker-->>ThreadPool: Fetch task
Worker->>Worker: Execute task
Worker-->>ThreadPool: Task complete
Actionable comments posted: 3
Outside diff range and nitpick comments (2)

tests/integration/start_codis.sh (1)

Line range hint 38-38: Add error handling after the cd command to ensure script robustness.

- cd ../codis
+ cd ../codis || exit

src/net/src/thread_pool.cc (1)

Line range hint 102-283: Replace busy waiting with more efficient waiting mechanisms and ensure robust memory management to prevent leaks and ensure thread safety.

- while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
-   std::this_thread::yield();
- }
+ std::unique_lock<std::mutex> lock(mu_);
+ rsignal_.wait(lock, [this] { return node_cnt_.load() < max_queue_size_; });
- auto node = new Node(func, arg);
+ auto node = std::make_unique<Node>(func, arg);
- LinkOne(node, &newest_node_);
+ LinkOne(node.get(), &newest_node_);
+ node.release(); // Transfer ownership to the linked list
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (6)
- src/net/include/likely.h (1 hunks)
- src/net/include/net_define.h (1 hunks)
- src/net/include/random.h (1 hunks)
- src/net/include/thread_pool.h (3 hunks)
- src/net/src/thread_pool.cc (5 hunks)
- tests/integration/start_codis.sh (2 hunks)
Files skipped from review due to trivial changes (2)
- src/net/include/likely.h
- src/net/include/net_define.h
Additional context used
Shellcheck
tests/integration/start_codis.sh
[warning] 38-38: Use 'cd ... || exit' or 'cd ... || return' in case cd fails. (SC2164)
#rm -rf /tmp/codis
#rm -rf codis_data_1
#rm -rf codis_data_2
# sugguest clear these before starting codis
Correct the typo in the comment.
- # sugguest clear these before starting codis
+ # suggest clear these before starting codis
src/net/include/thread_pool.h
Outdated
private:
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;

// true if task is TimeTask
bool is_time_task;
TimeTask task;

Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}

inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
#elif defined(__aarch64__)
asm volatile("wfe");
#elif defined(__powerpc64__)
asm volatile("or 27,27,27");
#endif
// it's okay for other platforms to be no-ops
}

Node* CreateMissingNewerLinks(Node* head);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);

std::atomic<Node*> newest_node_;
std::atomic<int> node_cnt_;  // for task
std::atomic<Node*> time_newest_node_;
std::atomic<int> time_node_cnt_;  // for time task

const int queue_slow_size_;  // default value: min(worker_num_ * 10, max_queue_size_)
size_t max_queue_size_;

const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;

AdaptationContext adp_ctx;

size_t worker_num_;
Ensure proper memory management for Node objects and correct ordering of atomic operations to prevent race conditions.
- Node* link_older = nullptr;
- Node* link_newer = nullptr;
+ std::atomic<Node*> link_older{nullptr};
+ std::atomic<Node*> link_newer{nullptr};
- std::atomic<Node*> newest_node_;
- std::atomic<int> node_cnt_;
+ std::atomic<Node*> newest_node_{nullptr};
+ std::atomic<int> node_cnt_{0};
- if (newest_node->compare_exchange_weak(nodes, node)) {
+ if (newest_node->compare_exchange_weak(nodes, node, std::memory_order_release, std::memory_order_relaxed)) {
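To make the suggested ordering concrete, here is a small self-contained sketch of the release/relaxed publish pattern for a Treiber-style list head (the simplified Node and the free function are stand-ins for illustration, not the PR's exact types):

#include <atomic>

struct Node {
  Node* link_older = nullptr;
};

// Prepend `node` to the list. The release order on success publishes
// node->link_older before other threads can observe `node`; relaxed on
// failure is enough because `expected` is simply reloaded and retried.
bool LinkOne(Node* node, std::atomic<Node*>* newest_node) {
  Node* expected = newest_node->load(std::memory_order_relaxed);
  do {
    node->link_older = expected;
  } while (!newest_node->compare_exchange_weak(expected, node, std::memory_order_release,
                                               std::memory_order_relaxed));
  return expected == nullptr;  // true if the list was empty before this push
}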
These are all tests of the one-lock-free-list-per-worker approach (described in the discussion above). Code: https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list. binlog and cache were disabled.
Parameters adjusted: kMaxSlowYieldsWhileSpinning, slow_yield_usec_, max_yield_usec_, and the pause-instruction type.
With tuning, GET p99 and SET QPS can be lowered somewhat, while GET QPS and SET p99 go up. These parameters depend on machine performance; my local test machine is mediocre, and raising the second-level wait time improves the result moderately. Alternatively, SET can be sacrificed for GET performance: if yield_credit is kept always positive (i.e. the second-level wait is always entered), GET QPS rises and its p99 drops. Comparison below:
Because the second-level wait is always entered, the GET test almost never hits cv.wait, so time spent drops; but it also makes the already slower SET keep waiting for tasks, eating into the time that should go to processing the data itself, and since SET executes slowly, cv.wait may still be triggered after the second-level wait, which makes it slow.
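As a rough illustration of the yield_credit gate being tuned here, a minimal sketch (the helper names and the kSamplingBase constant are assumptions; the update formula mirrors the one shown in runInThread later in this thread):

#include <atomic>
#include <cstdint>

// Gate for the second-level (yield) wait: positive credit means yielding has
// been paying off recently; otherwise still sample occasionally so the
// estimate can recover when conditions change.
bool EnterYieldLoop(std::atomic<int32_t>& yield_credit, uint32_t rnd) {
  constexpr uint32_t kSamplingBase = 256;  // enter with probability >= 1/256
  return yield_credit.load(std::memory_order_relaxed) > 0 || (rnd % kSamplingBase) == 0;
}

// After the yield loop, fold the outcome back into the credit with the same
// exponential moving update used in runInThread.
void UpdateYieldCredit(std::atomic<int32_t>& yield_credit, bool would_spin_again) {
  auto v = yield_credit.load(std::memory_order_relaxed);
  v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
  yield_credit.store(v, std::memory_order_relaxed);
}

Keeping yield_credit always positive, as tested above, amounts to EnterYieldLoop returning true unconditionally.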
Actionable comments posted: 4
Outside diff range and nitpick comments (1)

tests/integration/start_codis.sh (1)

Line range hint 38-38: Add error handling for the cd command. The script should handle the possibility of the cd command failing, which would prevent errors in subsequent commands if the directory does not exist or is not accessible.

- cd ../codis
+ cd ../codis || exit
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (6)
- src/net/include/likely.h (1 hunks)
- src/net/include/net_define.h (1 hunks)
- src/net/include/random.h (1 hunks)
- src/net/include/thread_pool.h (4 hunks)
- src/net/src/thread_pool.cc (5 hunks)
- tests/integration/start_codis.sh (2 hunks)
Files skipped from review due to trivial changes (2)
- src/net/include/likely.h
- src/net/include/net_define.h
Additional context used
Shellcheck
tests/integration/start_codis.sh
[warning] 38-38: Use 'cd ... || exit' or 'cd ... || return' in case cd fails. (SC2164)
Additional comments not posted (1)
tests/integration/start_codis.sh (1)
3-3: Correct the typo in the comment. The word "sugguest" should be corrected to "suggest".

- # sugguest clear these before starting codis
+ # suggest clear these before starting codis

Likely invalid or redundant comment.
static __thread Random* tls_instance;
static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;

auto rv = tls_instance;
if (UNLIKELY(rv == nullptr)) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
rv = new (&tls_instance_bytes) Random((uint32_t)seed);
tls_instance = rv;
}
Use smart pointers for thread-local storage.
To manage memory more safely and efficiently, consider using smart pointers instead of raw pointers for the thread-local instance of the Random class.
- static __thread Random* tls_instance;
- static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;
+ static thread_local std::unique_ptr<Random> tls_instance;
- auto rv = tls_instance;
- if (UNLIKELY(rv == nullptr)) {
- size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
- rv = new (&tls_instance_bytes) Random((uint32_t)seed);
- tls_instance = rv;
- }
+ if (!tls_instance) {
+ size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
+ tls_instance = std::make_unique<Random>((uint32_t)seed);
+ }
+ return tls_instance.get();
@@ -0,0 +1,88 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
Add a license declaration at the start of the file.
As suggested in previous comments, a license declaration should be consistently present at the start of each file.
+ // Copyright (c) 2024-present, OpenAtom Foundation. All rights reserved.
+ // This source code is licensed under the BSD-style license found in the
+ // LICENSE file in the root directory of this source tree. An additional grant
+ // of patent rights can be found in the PATENTS file in the same directory.
src/net/include/thread_pool.h
Outdated
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;

size_t worker_num_;
// true if task is TimeTask
bool is_time_task;
TimeTask task;

Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}

inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
#elif defined(__aarch64__)
asm volatile("wfe");
#elif defined(__powerpc64__)
asm volatile("or 27,27,27");
#endif
// it's okay for other platforms to be no-ops
}

Node* CreateMissingNewerLinks(Node* head, int* cnt);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);
Ensure proper memory management for Node objects and correct ordering of atomic operations.
To prevent memory leaks and race conditions, consider using smart pointers for Node objects and ensure correct memory order tags for atomic operations.
- Node* link_older = nullptr;
- Node* link_newer = nullptr;
+ std::atomic<Node*> link_older{nullptr};
+ std::atomic<Node*> link_newer{nullptr};
- std::atomic<Node*> newest_node_;
- std::atomic<int> node_cnt_;
+ std::atomic<Node*> newest_node_{nullptr};
+ std::atomic<int> node_cnt_{0};
- if (newest_node->compare_exchange_weak(nodes, node)) {
+ if (newest_node->compare_exchange_weak(nodes, node, std::memory_order_release, std::memory_order_relaxed)) {
Committable suggestion was skipped due to low confidence.
lock.lock();
// if task is coming now, do task immediately
auto res = rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow), [this, &newest_node]() {
return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop());
});
lock.unlock();
if (res) {
// re-push the timer tasks
ReDelaySchedule(time_first);
goto retry;
} |
Actionable comments posted: 7
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/net/include/thread_pool.h (4 hunks)
- src/net/src/thread_pool.cc (5 hunks)
Additional comments not posted (10)
src/net/include/thread_pool.h (7)

12-12: Consider sorting imports for better readability. Imports like <vector> are typically grouped and sorted with other standard library includes. This can improve readability and maintainability of the code.

20-20: Modernize function pointer declaration. Using `using` for type aliasing improves readability and is more in line with modern C++ standards.

34-38: Introduce a constructor for the Arg struct. The explicit constructor initializes arg and idx, ensuring that objects of this struct are always in a valid state when created.

51-52: Initialization of Worker class members. The members idx_ and arg_ are correctly initialized in the constructor, ensuring that each worker has a unique index and associated argument.

72-73: Method to run tasks in a thread is well encapsulated. The runInThread method allows for tasks to be executed in a specific thread based on the index, which is a good design for load balancing across workers.

75-79: Introduction of the AdaptationContext struct. This struct is used to manage adaptive thresholds for task execution, which is crucial for optimizing performance under different load conditions.

100-109: Platform-specific pause operations are correctly implemented. The use of assembly instructions for pausing the CPU is appropriate for reducing CPU usage during spin-wait loops.
src/net/src/thread_pool.cc (3)

17-20: Correct implementation of WorkerMain. The method correctly casts the argument and calls runInThread with the appropriate worker index, which is essential for correct task execution.

93-95: Proper cleanup in stop_thread_pool. Signaling all condition variables before joining and deleting worker threads is a good practice to ensure clean shutdown.

77-78: Worker initialization in the thread pool. Workers are dynamically allocated and started correctly. However, ensure that there is a corresponding deallocation to prevent memory leaks.

Verification successful

Worker initialization in the thread pool. The Worker objects are properly deallocated, as evidenced by the `delete worker` statements found in the codebase.

- src/net/src/thread_pool.cc: `delete worker;`

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify memory management for workers.
# Test: Search for worker deallocation.
rg --type py 'delete worker'

Length of output: 30

Script:

#!/bin/bash
# Description: Verify memory management for workers.
# Test: Search for worker deallocation in C++ files.
rg --type cpp 'delete worker'

Length of output: 227
int idx;
};

explicit Worker(ThreadPool* tp, int idx = 0) : start_(false), thread_pool_(tp), idx_(idx), arg_(tp, idx){};
Ensure thread safety during worker initialization.
The initialization of Worker objects includes setting start_ to false, which is correct. However, consider using std::atomic for start_ to ensure thread safety during checks and updates.
};

explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
ThreadPool constructor lacks explicit member initializations.
The constructor should explicitly initialize all its members to ensure that all objects are in a predictable state. This is particularly important for synchronization primitives like mutexes and condition variables.
- explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
+ explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool") : worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {}
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;

size_t worker_num_;
// true if task is TimeTask
bool is_time_task;
TimeTask task;

Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}

inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};
Memory management for Node objects needs attention.
The Node struct handles tasks but does not use smart pointers for managing dynamic memory, which can lead to memory leaks.
- Node* link_older = nullptr;
- Node* link_newer = nullptr;
+ std::unique_ptr<Node> link_older;
+ std::unique_ptr<Node> link_newer;
Node* CreateMissingNewerLinks(Node* head, int* cnt);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);
Tip: Codebase Verification
Link management methods need thorough testing.
The methods CreateMissingNewerLinks and LinkOne are critical for maintaining the integrity of the task list. Ensure these methods are covered by unit tests to prevent issues in task scheduling.
- No unit tests found for CreateMissingNewerLinks.
- No unit tests found for LinkOne.
(A minimal test sketch is given after the verification scripts below.)

Analysis chain
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify unit tests for link management methods.
# Test: Search for tests related to `CreateMissingNewerLinks` and `LinkOne`.
rg --type py 'test_CreateMissingNewerLinks|test_LinkOne'
Length of output: 58
Script:
#!/bin/bash
# Description: Verify unit tests for link management methods.
# Test: Search for tests related to `CreateMissingNewerLinks` and `LinkOne` in all test files.
rg 'CreateMissingNewerLinks|LinkOne' --type-add 'test:*.{cpp,h,py}' --type test
Length of output: 257
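Following up on the missing-unit-test note above, a minimal test along these lines could exercise LinkOne and CreateMissingNewerLinks indirectly through the public interface (a sketch only; it assumes GoogleTest plus the net::ThreadPool constructor, Schedule(TaskFunc, void*), and start_thread_pool()/stop_thread_pool() shown in this PR):

#include <atomic>
#include <thread>

#include <gtest/gtest.h>

#include "net/include/thread_pool.h"

// Every scheduled task must be linked, later re-linked newest-to-oldest by a
// worker, and executed exactly once.
TEST(ThreadPoolTest, AllScheduledTasksRun) {
  net::ThreadPool pool(4 /* workers */, 100000 /* max queue size */);
  pool.start_thread_pool();

  std::atomic<int> counter{0};
  constexpr int kTasks = 10000;
  for (int i = 0; i < kTasks; ++i) {
    pool.Schedule([](void* arg) { static_cast<std::atomic<int>*>(arg)->fetch_add(1); }, &counter);
  }

  // Crude completion wait; a latch or condition variable would be cleaner.
  while (counter.load() < kTasks) {
    std::this_thread::yield();
  }
  EXPECT_EQ(counter.load(), kTasks);
  pool.stop_thread_pool();
}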
src/net/src/thread_pool.cc
Outdated
if (newest_node.load(std::memory_order_acquire) != nullptr) {
last = newest_node.exchange(nullptr);
if (last != nullptr) {
would_spin_again = true;
// success
break;
}
}
if (UNLIKELY(time_newest_node.load(std::memory_order_acquire) != nullptr)) {
time_last = time_newest_node.exchange(nullptr);
if (time_last != nullptr) {
would_spin_again = true;
// success
break;
}
}

auto now = std::chrono::steady_clock::now();
if (now == iter_begin || now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
++slow_yield_count;
if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
update_ctx = true;
break;
}
}
iter_begin = now;
}
}

// update percentage of next loop 2
if (update_ctx) {
auto v = yield_credit.load(std::memory_order_relaxed);
v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
yield_credit.store(v, std::memory_order_relaxed);
}

if (!would_spin_again) {
// 3. wait for new task
continue;
}
}

if (!queue_.empty()) {
auto [func, arg] = queue_.front();
queue_.pop();
wsignal_.notify_one();
lock.unlock();
(*func)(arg);
exec:
// do all normal tasks older than this task pointed last
if (LIKELY(last != nullptr)) {
int cnt = 1;
auto first = CreateMissingNewerLinks(last, &cnt);
// node_cnt_ -= cnt;
assert(!first->is_time_task);
do {
first->Exec();
tmp = first;
first = first->Next();
node_cnt_--;
delete tmp;
} while (first != nullptr);
}

// do all time tasks older than this task pointed time_last
if (UNLIKELY(time_last != nullptr)) {
int cnt = 1;
auto time_first = CreateMissingNewerLinks(time_last, &cnt);
do {
// time task may block normal task
auto now = std::chrono::system_clock::now();
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();

auto [exec_time, func, arg] = time_first->task;
assert(time_first->is_time_task);
if (unow >= exec_time) {
time_first->Exec();
} else {
lock.lock();
// if task is coming now, do task immediately
auto res = rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow), [this, &newest_node]() {
return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop());
});
lock.unlock();
if (res) {
// re-push the timer tasks
ReDelaySchedule(time_first);
goto retry;
}
time_first->Exec();
}
tmp = time_first;
time_first = time_first->Next();
time_node_cnt_--;
delete tmp;
} while (time_first != nullptr);
}
goto retry;
}
Complex task handling logic in runInThread.
The method handles both normal and time-based tasks with appropriate checks and synchronization. However, the complexity of this method could benefit from further comments or refactoring to improve readability.
src/net/src/thread_pool.cc
Outdated
// stop until the size of tasks queue is not greater than max_queue_size_
while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
std::this_thread::yield();
}
// slow like above
if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
std::this_thread::yield();
}

if (!should_stop()) {
queue_.emplace(func, arg);
rsignal_.notify_one();
if (LIKELY(!should_stop())) {
auto node = new Node(func, arg);
auto idx = ++task_idx_;
LinkOne(node, &newest_node_[idx % nlinks_]);
node_cnt_++;
rsignal_[idx % nlinks_].notify_one();
Task scheduling needs optimization to reduce busy-waiting.
The use of std::this_thread::yield() in a loop can lead to high CPU usage. Consider using condition variables or other synchronization mechanisms to reduce busy-waiting.
- while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
- std::this_thread::yield();
+ std::unique_lock lock(mu_);
+ rsignal_.wait(lock, [this]{ return node_cnt_.load(std::memory_order_relaxed) < max_queue_size_; });
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/net/include/thread_pool.h (4 hunks)
- src/net/src/thread_pool.cc (5 hunks)
Additional comments not posted (20)
src/net/include/thread_pool.h (10)

34-38: Ensure correct initialization of the Arg struct. The Arg struct is correctly initialized with the provided arguments.

40-40: Ensure thread safety during worker initialization. The initialization of Worker objects includes setting start_ to false, which is correct. However, consider using std::atomic for start_ to ensure thread safety during checks and updates.

55-55: ThreadPool constructor lacks explicit member initializations. The constructor should explicitly initialize all its members to ensure that all objects are in a predictable state. This is particularly important for synchronization primitives like mutexes and condition variables.

72-72: Ensure proper memory management for Node objects and correct ordering of atomic operations. To prevent memory leaks and race conditions, consider using smart pointers for Node objects and ensure correct memory order tags for atomic operations.

111-112: Link management methods need thorough testing. The methods CreateMissingNewerLinks and LinkOne are critical for maintaining the integrity of the task list. Ensure these methods are covered by unit tests to prevent issues in task scheduling.

72-73: Complex task handling logic in runInThread. The method handles both normal and time-based tasks with appropriate checks and synchronization. However, the complexity of this method could benefit from further comments or refactoring to improve readability.

97-99: LGTM! The ReDelaySchedule method correctly handles re-scheduling of timer tasks.

100-109: LGTM! The AsmVolatilePause method correctly uses architecture-specific instructions for pausing execution.

111-112: LGTM! The CreateMissingNewerLinks method correctly handles link creation in the task list.

112-113: LGTM! The LinkOne method correctly handles node linking in the task list.

src/net/src/thread_pool.cc (10)

17-20: LGTM! The WorkerMain method correctly handles the worker's main execution loop with the updated argument.

26-26: Error handling in thread creation needs improvement. When pthread_create fails, consider logging the error or throwing an exception to help with debugging and fault tolerance.

Line range hint 36-46: LGTM! The stop method correctly handles stopping the worker thread.

48-70: ThreadPool constructor lacks explicit member initializations. The constructor should explicitly initialize all its members to ensure that all objects are in a predictable state. This is particularly important for synchronization primitives like mutexes and condition variables.

Line range hint 76-85: LGTM! The start_thread_pool method correctly handles starting the worker threads.

Line range hint 92-108: LGTM! The stop_thread_pool method correctly handles stopping the worker threads and clearing the worker list.

115-129: Task scheduling needs optimization to reduce busy-waiting. The use of std::this_thread::yield() in a loop can lead to high CPU usage. Consider using condition variables or other synchronization mechanisms to reduce busy-waiting.

140-146: LGTM! The DelaySchedule method correctly handles scheduling delayed tasks in the thread pool.

160-313: Complex task handling logic in runInThread. The method handles both normal and time-based tasks with appropriate checks and synchronization. However, the complexity of this method could benefit from further comments or refactoring to improve readability.

316-350: LGTM! The ReDelaySchedule, CreateMissingNewerLinks, and LinkOne methods correctly handle their respective tasks.
My test results are as follows; the command was:
Conclusion
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- conf/pika.conf (1 hunks)
- src/net/src/thread_pool.cc (5 hunks)
Additional comments not posted (16)
src/net/src/thread_pool.cc (7)

19-22: LGTM! The change enhances the function's flexibility. The function now accepts a structured argument, improving control over task execution.

50-72: LGTM! The constructor refactoring enhances adaptability. The changes introduce a more complex internal state, improving the pool's adaptability to varying workloads.

81-81: LGTM! The dynamic calculation of the maximum queue size is a good enhancement. This change improves the pool's adaptability to varying workloads.

96-98: LGTM! The notification mechanism for stopping worker threads is a good enhancement. This change ensures that all worker threads are correctly stopped.

145-150: LGTM! The notification mechanism for delayed tasks is a good enhancement. This change ensures that worker threads are correctly notified of new tasks.

164-202: LGTM! The retry mechanism ensures all tasks are executed. This change improves the thread pool's responsiveness and throughput.

205-215: LGTM! The notification mechanism for re-queued tasks is a good enhancement. This change ensures that worker threads are correctly notified of re-queued tasks.

conf/pika.conf (9)

24-24: LGTM! Increasing the number of threads enhances concurrency. This change allows for greater concurrency in handling requests.

28-28: LGTM! Increasing the thread pool size enhances concurrency. This change indicates a substantial increase in the number of threads available for user requests.

567-567: Verify the decrease in cache max memory. Decreasing the cache max memory may affect performance. Ensure that this change is intentional and based on performance testing.

718-718: LGTM! Changing the compaction strategy may improve performance. This change may enhance performance during data writes and reads.

180-180: LGTM! Increasing the number of log files enhances log retention. This change may improve log retention and analysis.

375-375: LGTM! Increasing the number of background jobs enhances concurrency. This change allows for more concurrent background operations.

381-381: LGTM! Changing the number of background flushes may improve performance. This change may enhance performance during data writes.

390-390: LGTM! Changing the number of background compactions may improve performance. This change may enhance performance during data writes.

676-676: Verify the increase in write buffer size. Increasing the write buffer size may improve write performance but could generate heavier IO load when flushing from buffer to disk. Ensure that this change is intentional and based on performance testing.
return nullptr;
}

int ThreadPool::Worker::start() {
if (!start_.load()) {
- if (pthread_create(&thread_id_, nullptr, &WorkerMain, thread_pool_) != 0) {
+ if (pthread_create(&thread_id_, nullptr, &WorkerMain, &arg_) != 0) {
Consider improving error handling in thread creation.
When pthread_create fails, consider logging the error or throwing an exception to help with debugging and fault tolerance.
- if (pthread_create(&thread_id_, nullptr, &WorkerMain, &arg_) != 0) {
+ if (int err = pthread_create(&thread_id_, nullptr, &WorkerMain, &arg_); err != 0) {
+ throw std::runtime_error(std::string("Failed to create thread: ") + std::strerror(err));
}
node_cnt_++;
// stop until the size of tasks queue is not greater than max_queue_size_
while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
std::this_thread::yield();
// pstd::SleepForMicroseconds(1);
}
// slow like above
if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
std::this_thread::yield();
}
Consider using condition variables to reduce busy-waiting.
The use of std::this_thread::yield() in a loop can lead to high CPU usage. Consider using condition variables or other synchronization mechanisms to reduce busy-waiting.
- while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
- std::this_thread::yield();
+ std::unique_lock lock(mu_);
+ rsignal_.wait(lock, [this]{ return node_cnt_.load(std::memory_order_relaxed) < max_queue_size_; });
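For illustration, a standalone sketch of the two-condition-variable bounded-queue pattern behind this suggestion (not the PR's code; in the PR's lock-free path the consumer would additionally have to signal after decrementing node_cnt_ so blocked producers wake up):

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Producers block on a condition variable instead of spinning with yield();
// consumers notify once they free a slot.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t cap) : cap_(cap) {}

  void Push(T v) {
    std::unique_lock<std::mutex> lk(mu_);
    not_full_.wait(lk, [this] { return q_.size() < cap_; });  // backpressure
    q_.push(std::move(v));
    not_empty_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lk(mu_);
    not_empty_.wait(lk, [this] { return !q_.empty(); });
    T v = std::move(q_.front());
    q_.pop();
    not_full_.notify_one();  // wake a producer blocked on the cap
    return v;
  }

 private:
  const std::size_t cap_;
  std::mutex mu_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::queue<T> q_;
};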
OS: Ubuntu 22.04.3
The test commands and results are as follows:
@cheniujh review
This PR's p99 issue is resolved together with #2816.
Merged commit e4e8c1d into OpenAtomFoundation:OSPP-ThreadPool-class
The logic is based on the function WriteThread::AwaitState in RocksDB (link).

Before:
1. Wait for new tasks with the function await. Under intense competition this makes the worker go to sleep with high probability, and sleeping and waking up cost a lot of time.

After (a condensed sketch of the resulting wait loop follows the parameter list):
2.1. Level 1: wait in a short spin loop.
2.2. Level 2: wait in a longer loop; the worker may yield the CPU when certain conditions are reached. A piece of data stores the probability of entering the level-2 loop.
2.3. Level 3: wait for new tasks with the function await.

Params:
- default: 200. Too large a value may cause high CPU load; too small a value may make the spinning pointless.
- default: std::min(worker_num, 100). When the number of tasks in the queue exceeds it, the main thread that calls Schedule calls std::this_thread::yield().
- default: max_queue_size. When the number of tasks in the queue exceeds it, the main thread that calls Schedule calls std::this_thread::yield() until the number of tasks in the queue is below this threshold.
- default: 100. The maximum duration of the level-2 loop.
- default: 3. If the time spent in std::this_thread::yield() exceeds this threshold, the stored data may be updated.
- default: 3. If the condition above (5) is reached this many times, the stored data will be updated.
- default: 256. The probability of entering the level-2 loop is not lower than 1/sampling_base.
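A condensed, standalone model of the three-level wait described above (a simplification, not the PR's exact code: ready_ stands in for the newest_node_ check, the level-2 credit is never updated here, and the values 200 tries / 100 microseconds follow the defaults listed above):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

class ThreeLevelWaiter {
 public:
  void Signal() {  // producer: publish work, then wake a possibly blocked worker
    ready_.store(true, std::memory_order_release);
    std::lock_guard<std::mutex> lk(mu_);
    cv_.notify_one();
  }

  void Wait() {
    // Level 1: short spin (default 200 tries).
    for (uint32_t tries = 0; tries < 200; ++tries) {
      if (TryConsume()) return;
    }
    // Level 2: yield loop, entered only while the credit is positive.
    if (yield_credit_.load(std::memory_order_relaxed) > 0) {
      auto begin = std::chrono::steady_clock::now();
      while (std::chrono::steady_clock::now() - begin < std::chrono::microseconds(100)) {
        std::this_thread::yield();
        if (TryConsume()) return;
      }
    }
    // Level 3: block on the condition variable until work arrives.
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return ready_.load(std::memory_order_acquire); });
    ready_.store(false, std::memory_order_relaxed);
  }

 private:
  bool TryConsume() { return ready_.exchange(false, std::memory_order_acquire); }

  std::atomic<bool> ready_{false};
  std::atomic<int32_t> yield_credit_{1};
  std::mutex mu_;
  std::condition_variable cv_;
};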
Summary by CodeRabbit

Refactor: ThreadPool and Worker classes for better performance and scalability.

New Features

Improvements