Skip to content

Commit

Permalink
fix: blpop/brpop don't update cache (#2858)
Browse files Browse the repository at this point in the history
* fix the problem of blpop don't update cache
  • Loading branch information
cheniujh authored Aug 12, 2024
1 parent af72f17 commit 6991190
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class BlockingBaseCmd : public Cmd {
void BlockThisClientToWaitLRPush(BlockKeyType block_pop_type, std::vector<std::string>& keys, int64_t expire_time);
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<DB> db);
static void ServeAndUnblockConns(void* args);
static void WriteBinlogOfPop(std::vector<WriteBinlogOfPopArgs>& pop_args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args);
void removeDuplicates(std::vector<std::string>& keys_);
// blpop/brpop used functions end
};
Expand Down
7 changes: 4 additions & 3 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ void* WorkerThread::ThreadMain() {
}

if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) {
//check if this conn disconnected from being blocked by blpop/brpop
dynamic_cast<net::DispatchThread*>(server_thread_)->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(in_conn));
net_multiplexer_->NetDelEvent(pfe->fd, 0);
CloseFd(in_conn);
in_conn = nullptr;
Expand Down Expand Up @@ -235,7 +233,6 @@ void WorkerThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
return;
}

auto iter = conns_.begin();
Expand Down Expand Up @@ -274,9 +271,11 @@ void WorkerThread::DoCronTask() {
}
}
for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
Expand Down Expand Up @@ -304,6 +303,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
close(conn->fd());
if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
//check if this conn disconnected from being blocked by blpop/brpop
dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
dispatcher->RemoveWatchKeys(conn);
}
server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
Expand Down
11 changes: 6 additions & 5 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
net::BlockKey blrPop_key{db->GetDBName(), key};

pstd::lock::ScopeRecordLock record_lock(db->LockMgr(), key);//It's a RAII Lock
std::unique_lock map_lock(dispatchThread->GetBlockMtx());// do not change the sequence of these two lock, or deadlock will happen
std::unique_lock map_lock(dispatchThread->GetBlockMtx());// do not change the sequence of these 3 locks, or deadlock will happen
auto it = key_to_conns_.find(blrPop_key);
if (it == key_to_conns_.end()) {
return;
Expand Down Expand Up @@ -223,10 +223,10 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
}
dispatchThread->CleanKeysAfterWaitNodeCleaned();
map_lock.unlock();
WriteBinlogOfPop(pop_binlog_args);
WriteBinlogOfPopAndUpdateCache(pop_binlog_args);
}

void BlockingBaseCmd::WriteBinlogOfPop(std::vector<WriteBinlogOfPopArgs>& pop_args) {
void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args) {
// write binlog of l/rpop
for (auto& pop_arg : pop_args) {
std::shared_ptr<Cmd> pop_cmd;
Expand All @@ -246,6 +246,7 @@ void BlockingBaseCmd::WriteBinlogOfPop(std::vector<WriteBinlogOfPopArgs>& pop_ar
pop_cmd->SetConn(pop_arg.conn);
auto resp_ptr = std::make_shared<std::string>("this resp won't be used for current code(consensus-level always be 0)");
pop_cmd->SetResp(resp_ptr);
pop_cmd->DoUpdateCache();
pop_cmd->DoBinlog();
}
}
Expand Down Expand Up @@ -395,7 +396,7 @@ void BLPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPop(args);
WriteBinlogOfPopAndUpdateCache(args);
}

void LPopCmd::DoInitial() {
Expand Down Expand Up @@ -729,7 +730,7 @@ void BRPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPop(args);
WriteBinlogOfPopAndUpdateCache(args);
}


Expand Down

0 comments on commit 6991190

Please sign in to comment.