From 08d2227a76536d518be85fd1e268487a06b04281 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 4 Mar 2024 19:35:27 +0000 Subject: [PATCH] Fix race in writes --- src/AsyncWorkQueue.h | 9 +++++++++ src/debug.cpp | 5 +++++ src/replication.cpp | 6 ++++++ src/storage/rocksdb.cpp | 7 ++++--- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h index 1f019324a..c21a96590 100644 --- a/src/AsyncWorkQueue.h +++ b/src/AsyncWorkQueue.h @@ -37,4 +37,13 @@ class AsyncWorkQueue void shutdown(); void abandonThreads(); + + void waitForEmpty() { + for (;;) { + std::unique_lock l(m_mutex); + if (m_workqueue.empty()) + return; + sched_yield(); + } + } }; \ No newline at end of file diff --git a/src/debug.cpp b/src/debug.cpp index f30d8f698..e0ad32988 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -298,6 +298,11 @@ void computeDatasetDigest(unsigned char *final) { memset(final,0,20); /* Start with a clean result */ + // For test reliabilty ensure all writes are applied + if (g_pserver->m_pstorageFactory) { + g_pserver->asyncwriteworkqueue->waitForEmpty(); + } + for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db[j]; diff --git a/src/replication.cpp b/src/replication.cpp index 91b995991..e83b3db2c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1324,6 +1324,12 @@ int startBgsaveForReplication(int mincapa) { serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk"); + // When FLASH is enabled we have a seperate responsibility of ensuring the write queue is flushed otherwise + // the data won't match the repl buffer + if (g_pserver->m_pstorageFactory) { + g_pserver->asyncwriteworkqueue->waitForEmpty(); + } + rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); /* Only do rdbSave* when rsiptr is not NULL, diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index c8e364117..bbf8814fa 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -344,11 +344,12 @@ StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el tok->tspbatch = std::move(m_spbatch); tok->tspdb = m_spdb; m_spbatch = nullptr; - m_lock.unlock(); - (*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{ + // This has a race with future writes so async is disabled + //(*m_pfactory->m_wwqueue)->AddWorkFunction([this, el,callback,tok]{ tok->tspdb->Write(WriteOptions(),tok->tspbatch.get()->GetWriteBatch()); aePostFunction(el,callback,tok); - }); + //}); + m_lock.unlock(); return tok; }