Skip to content

Commit

Permalink
Fix race in writes
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed Mar 4, 2024
1 parent e5f8e77 commit 08d2227
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/AsyncWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ class AsyncWorkQueue
void shutdown();

void abandonThreads();

void waitForEmpty() {
for (;;) {
std::unique_lock<std::mutex> l(m_mutex);
if (m_workqueue.empty())
return;
sched_yield();
}
}
};
5 changes: 5 additions & 0 deletions src/debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ void computeDatasetDigest(unsigned char *final) {

memset(final,0,20); /* Start with a clean result */

// For test reliabilty ensure all writes are applied
if (g_pserver->m_pstorageFactory) {
g_pserver->asyncwriteworkqueue->waitForEmpty();
}

for (j = 0; j < cserver.dbnum; j++) {
redisDb *db = g_pserver->db[j];

Expand Down
6 changes: 6 additions & 0 deletions src/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,12 @@ int startBgsaveForReplication(int mincapa) {
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");

// When FLASH is enabled we have a seperate responsibility of ensuring the write queue is flushed otherwise
// the data won't match the repl buffer
if (g_pserver->m_pstorageFactory) {
g_pserver->asyncwriteworkqueue->waitForEmpty();
}

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
Expand Down
7 changes: 4 additions & 3 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,12 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el
tok->tspbatch = std::move(m_spbatch);
tok->tspdb = m_spdb;
m_spbatch = nullptr;
m_lock.unlock();
(*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{
// This has a race with future writes so async is disabled
//(*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{
tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch());
aePostFunction(el,callback,tok);
});
//});
m_lock.unlock();

return tok;
}
Expand Down

0 comments on commit 08d2227

Please sign in to comment.