Commit 07d8067

fix: avoid cq burn out
1 parent caba2ec commit 07d8067

5 files changed: +39 / -11 lines
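
The commit addresses completion-queue overrun: each CQ is created with room for max_cqe entries, and if endpoints keep posting signaled work requests faster than the worker pool drains completions, the queue can overflow, the "cq burn out" of the commit title. The fix pairs every CQ with an outstanding counter that RdmaEndPoint::submitPostSend increments when work requests are posted and WorkerPool::performPollCq decrements after ibv_poll_cq reaps completions, and it caps each post batch at the remaining CQ headroom. Below is a minimal, self-contained sketch of that admission rule, not code from the repository: it uses std::atomic where the commit uses volatile int with __sync builtins, and the names (CqBudget, try_post, on_polled) are invented for illustration.

#include <algorithm>
#include <atomic>
#include <cstdio>

struct CqBudget {
    explicit CqBudget(int cap) : max_cqe(cap) {}

    int max_cqe;                      // capacity passed to ibv_create_cq()
    std::atomic<int> outstanding{0};  // signaled WRs posted but not yet reaped

    // Clamp a requested batch so in-flight WRs never exceed the CQ capacity.
    int try_post(int wanted) {
        int room = max_cqe - outstanding.load(std::memory_order_relaxed);
        int granted = std::min(wanted, std::max(room, 0));
        outstanding.fetch_add(granted, std::memory_order_relaxed);
        return granted;  // the caller posts exactly this many WRs
    }

    // Called after ibv_poll_cq() drained nr_poll completions.
    void on_polled(int nr_poll) {
        outstanding.fetch_sub(nr_poll, std::memory_order_relaxed);
    }
};

int main() {
    CqBudget cq(4);
    std::printf("%d\n", cq.try_post(3));  // 3: plenty of room
    std::printf("%d\n", cq.try_post(3));  // 1: only one CQE slot left
    cq.on_polled(4);                      // the poller reaps everything
    std::printf("%d\n", cq.try_post(3));  // 3 again
    return 0;
}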

mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h

Lines changed: 12 additions & 1 deletion

@@ -39,6 +39,12 @@ class RdmaTransport;
 class WorkerPool;
 class EndpointStore;
 
+struct RdmaCq {
+    RdmaCq() : native(nullptr), outstanding(0) {}
+    ibv_cq *native;
+    volatile int outstanding;
+};
+
 // RdmaContext represents the set of resources controlled by each local NIC,
 // including Memory Region, CQ, EndPoint (QPs), etc.
 class RdmaContext {
@@ -111,6 +117,10 @@ class RdmaContext {
 
     ibv_cq *cq();
 
+    volatile int *cqOutstandingCount(int cq_index) {
+        return &cq_list_[cq_index].outstanding;
+    }
+
     int cqCount() const { return cq_list_.size(); }
 
     int poll(int num_entries, ibv_wc *wc, int cq_index = 0);
@@ -150,7 +160,8 @@ class RdmaContext {
 
     RWSpinlock memory_regions_lock_;
     std::vector<ibv_mr *> memory_region_list_;
-    std::vector<ibv_cq *> cq_list_;
+    // std::vector<ibv_cq *> cq_list_;
+    std::vector<RdmaCq> cq_list_;
 
     std::shared_ptr<EndpointStore> endpoint_store_;
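
A note on the header change above: RdmaCq keeps the native ibv_cq * and its outstanding counter side by side in one cq_list_ element, so &cq_list_[i].outstanding can be handed both to ibv_create_cq (as the CQ context) and, through cqOutstandingCount(), to the worker pool. That raw pointer stays valid only because cq_list_ is resized once in construct() and never grows afterwards. A small hypothetical sketch of that constraint (FakeCq is a stand-in, not a type from the repository):

#include <cstdio>
#include <vector>

struct FakeCq {
    void *native = nullptr;        // stands in for ibv_cq *
    volatile int outstanding = 0;
};

int main() {
    std::vector<FakeCq> cq_list;
    cq_list.resize(2);                          // sized once, up front
    volatile int *p = &cq_list[1].outstanding;  // pointer handed out to users

    *p += 8;  // the posting side bumps the counter through the raw pointer
    std::printf("outstanding = %d\n", (int)cq_list[1].outstanding);  // prints 8

    // If the vector ever grew after pointers were taken, p could dangle:
    // cq_list.push_back(FakeCq{});  // reallocation would invalidate p
    return 0;
}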

mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h

Lines changed: 1 addition & 0 deletions

@@ -125,6 +125,7 @@ class RdmaEndPoint {
     int max_wr_depth_;
 
     volatile bool active_;
+    volatile int *cq_outstanding_;
 };
 
 }  // namespace mooncake

mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp

Lines changed: 9 additions & 6 deletions

@@ -108,13 +108,16 @@ int RdmaContext::construct(size_t num_cq_list, size_t num_comp_channels,
 
     cq_list_.resize(num_cq_list);
     for (size_t i = 0; i < num_cq_list; ++i) {
-        cq_list_[i] = ibv_create_cq(context_, max_cqe, this /* CQ context */,
-                                    compChannel(), compVector());
-        if (!cq_list_[i]) {
+        auto cq =
+            ibv_create_cq(context_, max_cqe,
+                          (void *)&cq_list_[i].outstanding /* CQ context */,
+                          compChannel(), compVector());
+        if (!cq) {
             PLOG(ERROR) << "Failed to create completion queue";
             close(event_fd_);
             return ERR_CONTEXT;
         }
+        cq_list_[i].native = cq;
     }
 
     worker_pool_ = std::make_shared<WorkerPool>(*this, socketId());
@@ -153,7 +156,7 @@ int RdmaContext::deconstruct() {
     memory_region_list_.clear();
 
     for (size_t i = 0; i < cq_list_.size(); ++i) {
-        int ret = ibv_destroy_cq(cq_list_[i]);
+        int ret = ibv_destroy_cq(cq_list_[i].native);
         if (ret) {
             PLOG(ERROR) << "Failed to destroy completion queue";
         }
@@ -310,7 +313,7 @@ std::string RdmaContext::gid() const {
 
 ibv_cq *RdmaContext::cq() {
     int index = (next_cq_list_index_++) % cq_list_.size();
-    return cq_list_[index];
+    return cq_list_[index].native;
 }
 
 ibv_comp_channel *RdmaContext::compChannel() {
@@ -498,7 +501,7 @@ int RdmaContext::joinNonblockingPollList(int event_fd, int data_fd) {
 }
 
 int RdmaContext::poll(int num_entries, ibv_wc *wc, int cq_index) {
-    int nr_poll = ibv_poll_cq(cq_list_[cq_index], num_entries, wc);
+    int nr_poll = ibv_poll_cq(cq_list_[cq_index].native, num_entries, wc);
     if (nr_poll < 0) {
         LOG(ERROR) << "Failed to poll CQ " << cq_index << " of device "
                    << device_name_;
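
The notable part of construct() above is that the address of the per-CQ counter, rather than this, is now stored as the CQ's user context, so any code that only holds the ibv_cq * can reach the counter via cq->cq_context. Below is a hedged, standalone sketch of that round-trip, not taken from the repository: it only exercises ibv_create_cq/ibv_destroy_cq, needs libibverbs (link with -libverbs), and exits quietly when no RDMA device is present.

#include <infiniband/verbs.h>
#include <cstdio>

int main() {
    int num = 0;
    ibv_device **devs = ibv_get_device_list(&num);
    if (!devs || num == 0) {
        std::puts("no RDMA device found, skipping");
        return 0;
    }

    ibv_context *ctx = ibv_open_device(devs[0]);
    if (!ctx) {
        ibv_free_device_list(devs);
        return 1;
    }

    static volatile int outstanding = 0;  // counter owned by the caller
    // Pass the counter's address as the opaque CQ context...
    ibv_cq *cq = ibv_create_cq(ctx, /*cqe=*/16, (void *)&outstanding,
                               /*channel=*/nullptr, /*comp_vector=*/0);
    if (cq) {
        // ...and recover it later from the CQ handle alone, the same way
        // RdmaEndPoint::construct() reads cq->cq_context.
        auto *counter = (volatile int *)cq->cq_context;
        std::printf("outstanding via cq_context: %d\n", (int)*counter);
        ibv_destroy_cq(cq);
    }
    ibv_close_device(ctx);
    ibv_free_device_list(devs);
    return 0;
}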

mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp

Lines changed: 10 additions & 2 deletions

@@ -27,7 +27,10 @@ const static uint8_t TIMEOUT = 14;
 const static uint8_t RETRY_CNT = 7;
 
 RdmaEndPoint::RdmaEndPoint(RdmaContext &context)
-    : context_(context), status_(INITIALIZING), active_(true) {}
+    : context_(context),
+      status_(INITIALIZING),
+      active_(true),
+      cq_outstanding_(nullptr) {}
 
 RdmaEndPoint::~RdmaEndPoint() {
     if (!qp_list_.empty()) deconstruct();
@@ -42,6 +45,7 @@ int RdmaEndPoint::construct(ibv_cq *cq, size_t num_qp_list,
     }
 
     qp_list_.resize(num_qp_list);
+    cq_outstanding_ = (volatile int *)cq->cq_context;
 
     max_wr_depth_ = (int)max_wr_depth;
     wr_depth_list_ = new volatile int[num_qp_list];
@@ -224,7 +228,9 @@ int RdmaEndPoint::submitPostSend(
     int qp_index = SimpleRandom::Get().next(qp_list_.size());
     int wr_count = std::min(max_wr_depth_ - wr_depth_list_[qp_index],
                             (int)slice_list.size());
-    if (wr_count == 0) return 0;
+    wr_count =
+        std::min(int(globalConfig().max_cqe) - *cq_outstanding_, wr_count);
+    if (wr_count <= 0) return 0;
 
     ibv_send_wr wr_list[wr_count], *bad_wr = nullptr;
     ibv_sge sge_list[wr_count];
@@ -258,13 +264,15 @@ int RdmaEndPoint::submitPostSend(
         // }
     }
     __sync_fetch_and_add(&wr_depth_list_[qp_index], wr_count);
+    __sync_fetch_and_add(cq_outstanding_, wr_count);
    int rc = ibv_post_send(qp_list_[qp_index], wr_list, &bad_wr);
    if (rc) {
        PLOG(ERROR) << "Failed to ibv_post_send";
        while (bad_wr) {
            int i = bad_wr - wr_list;
            failed_slice_list.push_back(slice_list[i]);
            __sync_fetch_and_sub(&wr_depth_list_[qp_index], 1);
+            __sync_fetch_and_sub(cq_outstanding_, 1);
            bad_wr = bad_wr->next;
        }
    }
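
In submitPostSend above, each batch is now clamped twice: by the per-QP send-queue headroom (as before) and by the shared CQ headroom (globalConfig().max_cqe minus the current outstanding count), and the guard changes from == 0 to <= 0 since the CQ term may not be positive. A simplified, self-contained sketch of that arithmetic follows; the names (Budgets, admissible) are invented, and plain ints stand in for the shared volatile counters the endpoint actually uses.

#include <algorithm>
#include <cstdio>

struct Budgets {
    int max_wr_depth;    // per-QP send queue depth
    int qp_in_flight;    // WRs posted on this QP, not yet completed
    int max_cqe;         // CQ capacity (globalConfig().max_cqe in the repo)
    int cq_outstanding;  // WRs posted against the shared CQ, not yet reaped
};

// How many of `requested` slices may be posted right now.
int admissible(const Budgets &b, int requested) {
    int n = std::min(b.max_wr_depth - b.qp_in_flight, requested);
    n = std::min(b.max_cqe - b.cq_outstanding, n);
    return std::max(n, 0);  // mirrors the diff's `if (wr_count <= 0) return 0;`
}

int main() {
    Budgets b{/*max_wr_depth=*/128, /*qp_in_flight=*/120,
              /*max_cqe=*/4096, /*cq_outstanding=*/4090};
    int granted = admissible(b, /*requested=*/64);
    std::printf("granted %d (QP room %d, CQ room %d)\n", granted,
                b.max_wr_depth - b.qp_in_flight,
                b.max_cqe - b.cq_outstanding);  // granted 6 (QP room 8, CQ room 6)
    return 0;
}

When ibv_post_send() then fails partway, bad_wr points at the first unposted work request; the loop in the diff walks that tail and returns one unit per WR to both counters, so failed slices do not leak budget.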

mooncake-transfer-engine/src/transport/rdma_transport/worker_pool.cpp

Lines changed: 7 additions & 2 deletions

@@ -305,6 +305,9 @@ void WorkerPool::performPollCq(int thread_id) {
                 processed_slice_count++;
             }
         }
+        if (nr_poll)
+            __sync_fetch_and_sub(context_.cqOutstandingCount(cq_index),
+                                 nr_poll);
     }
 
     for (auto &entry : qp_depth_set)
@@ -393,10 +396,12 @@ int WorkerPool::doProcessContextEvents() {
         event.event_type == IBV_EVENT_LID_CHANGE) {
         context_.set_active(false);
         context_.disconnectAllEndpoints();
-        LOG(INFO) << "Worker: Context " << context_.deviceName() << " is now inactive";
+        LOG(INFO) << "Worker: Context " << context_.deviceName()
+                  << " is now inactive";
     } else if (event.event_type == IBV_EVENT_PORT_ACTIVE) {
         context_.set_active(true);
-        LOG(INFO) << "Worker: Context " << context_.deviceName() << " is now active";
+        LOG(INFO) << "Worker: Context " << context_.deviceName()
+                  << " is now active";
     }
     ibv_ack_async_event(&event);
     return 0;
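
The performPollCq change above is the consumer side of the counter: after a poll pass, the outstanding count for that CQ drops by however many completions ibv_poll_cq actually returned, and the `if (nr_poll)` guard skips the atomic subtraction entirely when the CQ was empty. A toy sketch of that bookkeeping, with no verbs calls and an invented fake_poll_cq standing in for ibv_poll_cq:

#include <cstdio>

volatile int cq_outstanding = 10;  // pretend 10 WRs are currently in flight

// Stand-in for ibv_poll_cq(): pretends only 3 completions were ready.
int fake_poll_cq(int num_entries) { return num_entries < 3 ? num_entries : 3; }

int main() {
    int nr_poll = fake_poll_cq(/*num_entries=*/16);
    if (nr_poll)  // decrement by what was reaped, never by the request size
        __sync_fetch_and_sub(&cq_outstanding, nr_poll);
    std::printf("outstanding after poll: %d\n", (int)cq_outstanding);  // prints 7
    return 0;
}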
