Skip to content

Commit

Permalink
Return set to vector for clients to be processed ensuring we run them…
Browse files Browse the repository at this point in the history
… in order
  • Loading branch information
JohnSully committed Mar 1, 2024
1 parent 490ee84 commit c08683e
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/AsyncWorkQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void AsyncWorkQueue::WorkerThreadMain()
listRelease(vars.clients_pending_asyncwrite);

std::unique_lock<fastlock> lockf(serverTL->lockPendingWrite);
serverTL->setclientsProcess.clear();
serverTL->vecclientsProcess.clear();
serverTL->clients_pending_write.clear();
std::atomic_thread_fence(std::memory_order_seq_cst);
}
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void unblockClient(client *c) {
listDelNode(g_pserver->paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
} else if (c->btype == BLOCKED_STORAGE) {
serverTL->setclientsProcess.insert(c);
serverTL->vecclientsProcess.push_back(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand Down
10 changes: 7 additions & 3 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3142,8 +3142,11 @@ void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
if (fBulk)
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);

if (sizeHint > 0 && aeThreadOwnsLock())
if (sizeHint > 0) {
aeAcquireLock();
dictExpand(m_dictChanged, sizeHint, false);
aeReleaseLock();
}
}

void redisDbPersistentData::removeAllCachedValues()
Expand Down Expand Up @@ -3362,8 +3365,9 @@ void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, veckeys.data(), veckeys.size());
if (tok != nullptr) {
for (client *c : setcBlocked) {
if (!(c->flags & CLIENT_BLOCKED))
if (!(c->flags & CLIENT_BLOCKED)) {
blockClient(c, BLOCKED_STORAGE);
}
}
tok->setc = std::move(setcBlocked);
tok->type = StorageToken::TokenType::SingleRead;
Expand Down Expand Up @@ -3429,6 +3433,6 @@ void redisDbPersistentData::processStorageToken(StorageToken *tok) {
if (c->flags & CLIENT_BLOCKED)
unblockClient(c);
else
serverTL->setclientsProcess.insert(c);
serverTL->vecclientsProcess.push_back(c);
}
}
12 changes: 6 additions & 6 deletions src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ void unlinkClient(client *c) {
c->fPendingAsyncWrite = FALSE;
}

serverTL->setclientsProcess.erase(c);
serverTL->vecclientsProcess.erase(std::remove(serverTL->vecclientsProcess.begin(), serverTL->vecclientsProcess.end(), c), serverTL->vecclientsProcess.end());
serverTL->setclientsPrefetch.erase(c);

/* Clear the tracking status. */
Expand Down Expand Up @@ -2822,7 +2822,7 @@ void readQueryFromClient(connection *conn) {
}
c->vecqueuedcmd.clear();
} else {
serverTL->setclientsProcess.insert(c);
serverTL->vecclientsProcess.push_back(c);
}
}
}
Expand All @@ -2839,10 +2839,10 @@ void processClients()
{
serverAssert(GlobalLocksAcquired());

// Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
while (!serverTL->setclientsProcess.empty()) {
client *c = *serverTL->setclientsProcess.begin();
serverTL->setclientsProcess.erase(serverTL->setclientsProcess.begin());
// Note that this function is reentrant and vecclientsProcess may be modified by code called from processInputBuffer
while (!serverTL->vecclientsProcess.empty()) {
client *c = *serverTL->vecclientsProcess.begin();
serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());

/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
Expand Down
2 changes: 1 addition & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2847,7 +2847,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
g_pserver->db[0]->prefetchKeysFlash(serverTL->setclientsPrefetch);
for (client *c : serverTL->setclientsPrefetch) {
if (!(c->flags & CLIENT_BLOCKED))
serverTL->setclientsProcess.insert(c);
serverTL->vecclientsProcess.push_back(c);
}
serverTL->setclientsPrefetch.clear();
}
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2230,7 +2230,7 @@ struct redisServerThreadVars {

int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
std::unordered_set<client*> setclientsProcess;
std::vector<client*> vecclientsProcess;
std::unordered_set<client*> setclientsPrefetch;
std::unordered_set<StorageToken*> setStorageTokensProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;
Expand Down

0 comments on commit c08683e

Please sign in to comment.