Skip to content

Commit

Permalink
Merge branch 'new-cluster12' into 'master' (merge request !328)
Browse files Browse the repository at this point in the history
fix migrate snapshot timeout bug#101
### MR描述
搬迁过程中线程调度分配问题 或者是binlog接收时候会导致没sender线程发送的问题。

发送端线程池需要做完 send snapshot + send binlog,而接收端线程池只做 receive snapshot(binlog 走的是 session 执行命令)。两端线程的执行时间通常有几秒的误差,导致任务调度时 receiver 调度得更快,从而可能触发 snapshot 接收超时的 bug。

### 修改动机和上下文背景
上下文参考 #101

思路1:让接收端线程池在调度时考虑正在接收 binlog 的任务;如果存在正在接收 binlog 的任务,则暂时延迟调度。

思路2:

交给 receiver(接收端)去决定重试和调度:当 receiver 发送 readymigrate 命令给 sender 时,sender 判断自己的线程池是不是满了;如果满了,sender 就不去发数据(改为 waiting 状态,100 毫秒后再次调度)。也就是说,sender 不会处于在线程池排队的状态(这个状态会导致超时的 bug)。

### 此MR如何进行测试 ?
cluster_test 增加了测例migrateChangeThread


### change 类型
<!---你的代码引入了何种类型的change, 在所有关联的复选框前选择"x" -->
- [ ] Bug fix (修复了issue的非侵入式修改)
- [ ] New feature (增加功能的非侵入式修改)
- [ ] Breaking change (修复或者增加特性, 但是会造成现有行为的非预期行为)

### 清单
<!--- 查看下述选项,并进行"x"勾选 -->
<!--- 如果你对所有都不确定, 请随时咨询我们 -->
- [ ] 遵循项目的Code-Style
- [ ] Change 需要文档的修改
- [ ] 我已经进行相关文档的修改
- [ ] 我的MR已经通过的相关流水线测试
  • Loading branch information
TendisDev committed Apr 29, 2021
2 parents 2f259db + 2e6d1a3 commit d542c5b
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/tendisplus/cluster/cluster_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ void ClusterState::setFailAuthSent(uint32_t t) {
// Reports whether the cluster node identified by `nodeid` is considered
// failed.
//
// The lookup runs while holding `_mutex`. When the lookup yields nullptr
// (no node is known under `nodeid`), the node is reported as failed instead
// of being dereferenced.
//
// @param nodeid  name of the node to look up.
// @return true when the node is unknown or its nodeFailed() is true.
bool ClusterState::clusterNodeFailed(const std::string& nodeid) {
  std::lock_guard<myMutex> lk(_mutex);
  auto node = clusterLookupNodeNoLock(nodeid);
  // The lookup result must be null-checked before use; an unconditional
  // `node->nodeFailed()` would dereference nullptr for an unknown id.
  return node == nullptr ? true : node->nodeFailed();
}

bool ClusterState::isDataAgeTooLarge() {
Expand Down
138 changes: 138 additions & 0 deletions src/tendisplus/cluster/cluster_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,144 @@ TEST(Cluster, migrate) {
servers.clear();
}

// Two-node cluster migration test in which each server is configured with
// fewer migrate sender threads (3) than receiver threads (10). Keys are
// written to the source node, slots 8000..9300 are migrated to the
// destination halfway through the writes, and the test then checks slot
// ownership and that the destination holds every key written into the
// migrated slots.
TEST(Cluster, migrateChangeThread) {
  std::vector<std::string> dirs = {"node1", "node2"};
  uint32_t startPort = 12300;

  // On scope exit: remove every node directory, then pause for 5 seconds.
  const auto guard = MakeGuard([dirs] {
    for (auto nodeDir : dirs) {
      destroyEnv(nodeDir);
    }
    std::this_thread::sleep_for(std::chrono::seconds(5));
  });

  std::vector<std::shared_ptr<ServerEntry>> servers;

  // Bring up one server per directory on consecutive ports.
  uint32_t nodeIdx = 0;
  for (auto nodeDir : dirs) {
    uint32_t nodePort = startPort + nodeIdx;
    ++nodeIdx;
    EXPECT_TRUE(setupEnv(nodeDir));

    auto nodeCfg = makeServerParam(nodePort, storeCnt, nodeDir, true);
    nodeCfg->clusterEnabled = true;
    nodeCfg->pauseTimeIndexMgr = 1;
    nodeCfg->rocksBlockcacheMB = 24;
    // this test will migrate back
    nodeCfg->enableGcInMigate = true;
    // keep the sender pool smaller than the receiver pool
    nodeCfg->migrateReceiveThreadnum = 10;
    nodeCfg->migrateSenderThreadnum = 3;

    auto entry = std::make_shared<ServerEntry>(nodeCfg);
    auto startStatus = entry->startup(nodeCfg);
    if (!startStatus.ok()) {
      LOG(ERROR) << "server start fail:" << startStatus.toString();
    }
    INVARIANT(startStatus.ok());
    servers.emplace_back(std::move(entry));
  }

  auto& srcNode = servers[0];
  auto& dstNode = servers[1];

  // One session and workload driver per node.
  auto srcCtx = std::make_shared<asio::io_context>();
  auto srcSess = makeSession(srcNode, srcCtx);
  WorkLoad srcWork(srcNode, srcSess);
  srcWork.init();
  auto dstCtx = std::make_shared<asio::io_context>();
  auto dstSess = makeSession(dstNode, dstCtx);
  WorkLoad dstWork(dstNode, dstSess);
  dstWork.init();

  // Introduce the destination node to the source node, then wait.
  LOG(INFO) << "begin meet";
  srcWork.clusterMeet(dstNode->getParams()->bindIp,
                      dstNode->getParams()->port);
  std::this_thread::sleep_for(std::chrono::seconds(10));

  std::vector<std::string> slots = {"{0..9300}", "{9301..16383}"};

  // Assign the first slot range to the source, the second to the destination.
  LOG(INFO) << "begin addSlots.";
  srcWork.addSlots(slots[0]);
  dstWork.addSlots(slots[1]);
  LOG(INFO) << "add slots sucess";
  std::this_thread::sleep_for(std::chrono::seconds(10));

  const uint32_t numData = 10000;
  // for support MOVED
  string srcAddr = srcNode->getParams()->bindIp + ":" +
    to_string(srcNode->getParams()->port);
  string dstAddr = dstNode->getParams()->bindIp + ":" +
    to_string(dstNode->getParams()->port);
  srcWork.addClusterSession(srcAddr, srcSess);
  srcWork.addClusterSession(dstAddr, dstSess);
  dstWork.addClusterSession(srcAddr, srcSess);
  dstWork.addClusterSession(dstAddr, dstSess);

  std::vector<uint32_t> slotsList;
  uint32_t expectedKeys = 0;
  // slots to migrate: {8000..9300}
  uint32_t startSlot = 8000;
  uint32_t endSlot = 9300;
  for (uint32_t slot = startSlot; slot <= endSlot; slot++) {
    slotsList.push_back(slot);
  }

  auto bitmap = getBitSet(slotsList);

  for (size_t written = 0; written < numData; ++written) {
    string key = getUUid(10);
    string value = getUUid(10);
    auto reply = srcWork.getStringResult({"set", key, value});
    EXPECT_EQ(reply, "+OK\r\n");

    // kick off the migration once half of the data has been written
    if (written == numData / 2) {
      LOG(INFO) << "migrate begin";
      auto migrateStatus = migrate(srcNode, dstNode, bitmap);
      EXPECT_TRUE(migrateStatus.ok());
    }
    // count the keys that land in a slot being migrated
    uint32_t hash =
      static_cast<uint32_t>(redis_port::keyHashSlot(key.c_str(), key.size()));
    auto keySlot = hash % srcNode->getParams()->chunkSize;
    if (bitmap.test(keySlot)) {
      expectedKeys++;
    }
  }

  std::this_thread::sleep_for(std::chrono::seconds(35));

  // the migrated slots must no longer belong to srcNode ...
  ASSERT_EQ(
    checkSlotsBlong(
      bitmap,
      srcNode,
      srcNode->getClusterMgr()->getClusterState()->getMyselfName()),
    false);
  // ... and must belong to dstNode
  ASSERT_EQ(
    checkSlotsBlong(
      bitmap,
      dstNode,
      dstNode->getClusterMgr()->getClusterState()->getMyselfName()),
    true);

  uint32_t migratedKeys = 0;
  for (auto& slot : slotsList) {
    migratedKeys += dstNode->getClusterMgr()->countKeysInSlot(slot);
  }

  // the destination holds exactly the keys written into the migrated slots
  ASSERT_EQ(expectedKeys, migratedKeys);

#ifndef _WIN32
  for (auto svr : servers) {
    svr->stop();
    LOG(INFO) << "stop " << svr->getParams()->port << " success";
  }
#endif
  LOG(INFO) << "stop servers here";
  servers.clear();
}


TEST(Cluster, stopMigrate) {
std::vector<std::string> dirs = {"node1", "node2"};
uint32_t startPort = 15000;
Expand Down
44 changes: 39 additions & 5 deletions src/tendisplus/cluster/migrate_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ Status MigrateManager::stopTasks(const std::string& taskid) {
}

/* stop all tasks which is doing now */
void MigrateManager::stopAllTasks(bool saveSlots) {
Status MigrateManager::stopAllTasks(bool saveSlots) {
std::map<std::string, std::string> pTaskMap;
{
std::lock_guard<myMutex> lk(_mutex);
Expand Down Expand Up @@ -288,10 +288,20 @@ void MigrateManager::stopAllTasks(bool saveSlots) {
for (auto iter = pTaskMap.begin(); iter != pTaskMap.end(); iter++) {
auto node = _cluster->clusterLookupNode(iter->second);
// send stop command to srcNode
if (!node) {
LOG(ERROR) << "can not find srcNode:" << iter->second;
return {ErrorCodes::ERR_CLUSTER, "can not find srcNode"};
}

auto s = stopSrcNode(iter->first, node->getNodeIp(), node->getPort());
if (!s.ok()) {
LOG(ERROR) << "stop srcNode task fail:" << s.toString();
return s;
}
}

LOG(INFO) << "stop all import slots:" << bitsetStrEncode(_stopImportSlots);
return {ErrorCodes::ERR_OK, ""};
}

void MigrateManager::removeRestartSlots(const std::string& nodeid,
Expand Down Expand Up @@ -345,8 +355,21 @@ bool MigrateManager::senderSchedule(const SCLOCK::time_point& now) {
}
++it;
} else if (taskPtr->_state == MigrateSendState::START) {
taskPtr->_isRunning = true;
_migrateSender->schedule([this, iter = taskPtr]() { iter->sendSlots(); });
/* NOTE(wayenchen) send slots task should not be in queue in case of
* timeout */
if (!_migrateSender->isFull()) {
taskPtr->_isRunning = true;
_migrateSender->schedule(
[this, iter = taskPtr]() { iter->sendSlots(); });
} else {
LOG(ERROR) << "sender task threadpool is full on slots:"
<< bitsetStrEncode(taskPtr->_slots)
<< "taskid:" << taskPtr->_taskid;
taskPtr->_sender->stop();
taskPtr->_nextSchedTime =
SCLOCK::now() + std::chrono::milliseconds(100);
taskPtr->_state = MigrateSendState::WAIT;
}
++it;
} else if (taskPtr->_state == MigrateSendState::CLEAR) {
taskPtr->_isRunning = true;
Expand Down Expand Up @@ -693,6 +716,10 @@ bool MigrateManager::checkSlotOK(const SlotsBitmap& bitMap,
std::vector<uint32_t>* taskSlots) {
CNodePtr dstNode = _cluster->clusterLookupNode(nodeid);
CNodePtr myself = _cluster->getMyselfNode();
if (dstNode == nullptr) {
LOG(ERROR) << "can not find dstNode:" << nodeid;
return false;
}
size_t idx = 0;

while (idx < bitMap.size()) {
Expand Down Expand Up @@ -1017,6 +1044,14 @@ void MigrateManager::dstReadyMigrate(asio::ip::tcp::socket sock,
client->writeLine(ss.str());
return;
}
auto dstNode = _cluster->clusterLookupNode(nodeidArg);
if (!dstNode) {
LOG(ERROR) << "import node" << nodeidArg << "not find";
std::stringstream ss;
ss << "-ERR can not find dst node: invalid nodeid";
client->writeLine(ss.str());
return;
}
std::lock_guard<myMutex> lk(_mutex);
/* find the right task by taskid, if not found, return error */
if (_migrateSendTaskMap.find(taskidArg) != _migrateSendTaskMap.end()) {
Expand Down Expand Up @@ -1290,7 +1325,6 @@ void MigrateReceiveTask::fullReceive() {
/*NOTE(wayenchen) if srcNode not Fail
and my receiver snapshot key num is zero,
retry for three times*/

bool srcNodeFail =
_svr->getClusterMgr()->getClusterState()->clusterNodeFailed(
_pTask->_nodeid);
Expand All @@ -1301,7 +1335,7 @@ void MigrateReceiveTask::fullReceive() {
auto delayTime = 1000 + redis_port::random() % 5000;
_nextSchedTime = SCLOCK::now() + std::chrono::milliseconds(delayTime);
_state = MigrateReceiveState::RECEIVE_SNAPSHOT;
_retryTime++;
_retryTime.fetch_add(1, memory_order_relaxed);
LOG(ERROR) << "receiveSnapshot need retry" << bitsetStrEncode(_slots)
<< "taskid:" << _taskid << "error str:" << s.toString();
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/tendisplus/cluster/migrate_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class MigrateManager {
void stop();

Status stopTasks(const std::string& taskid);
void stopAllTasks(bool saveSlots = true);
Status stopAllTasks(bool saveSlots = true);
// sender POV
bool senderSchedule(const SCLOCK::time_point& now);

Expand Down
3 changes: 2 additions & 1 deletion src/tendisplus/cluster/migrate_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Expected<uint64_t> ChunkMigrateSender::sendRange(Transaction* txn,
uint32_t curWriteNum = 0;
uint32_t timeoutSec = 5;
Status s;
uint64_t sendKeyNum = _cfg->migrateSnapshotKeyNum;
while (true) {
Expected<Record> expRcd = cursor->next();
if (expRcd.status().code() == ErrorCodes::ERR_EXHAUST) {
Expand Down Expand Up @@ -247,7 +248,7 @@ Expected<uint64_t> ChunkMigrateSender::sendRange(Transaction* txn,
*/
_svr->getMigrateManager()->requestRateLimit(sendBytes);

if (curWriteNum >= 10000 || curWriteLen > 10 * 1024 * 1024) {
if (curWriteNum >= sendKeyNum || curWriteLen > 10 * 1024 * 1024) {
SyncWriteData("1");
SyncReadData(exptData, _OKSTR.length(), timeoutSec);
if (exptData.value() != _OKSTR) {
Expand Down
3 changes: 3 additions & 0 deletions src/tendisplus/commands/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ class ClusterCommand : public Command {
std::string nodeId = (*it).first;
auto taskMap = (*it).second;
auto srcNode = clusterState->clusterLookupNode(nodeId);
if (srcNode == nullptr) {
return {ErrorCodes::ERR_CLUSTER, "import node not find"};
}
auto exptTaskid = startAllSlotsTasks(
taskMap, svr, nodeId, clusterState, srcNode, myself, true);
if (!exptTaskid.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions src/tendisplus/server/server_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ ServerParams::ServerParams() {
migrateBinlogIter);
REGISTER_VARS_DIFF_NAME_DYNAMIC("cluster-migration-slots-num-per-task",
migrateTaskSlotsLimit);
REGISTER_VARS_DIFF_NAME_DYNAMIC("migrate-snapshot-retry-num",
snapShotRetryCnt);
REGISTER_VARS_DIFF_NAME_DYNAMIC("migrate-snapshot-key-num",
migrateSnapshotKeyNum);
REGISTER_VARS_DIFF_NAME_DYNAMIC("cluster-migration-rate-limit",
migrateRateLimitMB);
REGISTER_VARS_DIFF_NAME_DYNAMIC("migrate-snapshot-retry-num",
Expand Down
1 change: 1 addition & 0 deletions src/tendisplus/server/server_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ class ServerParams {
uint32_t migrateDistance = 10000;
uint32_t migrateBinlogIter = 10;
uint32_t migrateRateLimitMB = 32;
uint32_t migrateSnapshotKeyNum = 100000;
uint32_t clusterNodeTimeout = 15000;
bool clusterRequireFullCoverage = true;
bool clusterSlaveNoFailover = false;
Expand Down

0 comments on commit d542c5b

Please sign in to comment.