Skip to content

Commit

Permalink
Fix swapdb bugs when multiple clients involved.
Browse files Browse the repository at this point in the history
  • Loading branch information
k00809413 authored and JohnSully committed Jan 10, 2024
1 parent 2abe708 commit 2ad14de
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 27 deletions.
53 changes: 31 additions & 22 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1723,16 +1723,17 @@ int dbSwapDatabases(int id1, int id2) {
id2 < 0 || id2 >= cserver.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
std::swap(g_pserver->db[id1], g_pserver->db[id2]);

//swap db's id too, otherwise db does not match its id
std::swap(g_pserver->db[id1]->id, g_pserver->db[id2]->id);

/* Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. so put them back */
std::swap(g_pserver->db[id1]->blocking_keys, g_pserver->db[id2]->blocking_keys);
std::swap(g_pserver->db[id2]->ready_keys, g_pserver->db[id2]->ready_keys);
std::swap(g_pserver->db[id2]->watched_keys, g_pserver->db[id2]->watched_keys);
std::swap(g_pserver->db[id1]->ready_keys, g_pserver->db[id2]->ready_keys);
std::swap(g_pserver->db[id1]->watched_keys, g_pserver->db[id2]->watched_keys);

/* Don't swap the redisdb id, as it is expected to be same as database index everywhere else.
For example, flushdb, master/slave replication, etc. */
std::swap(g_pserver->db[id1]->id, g_pserver->db[id2]->id);

/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
Expand All @@ -1755,7 +1756,11 @@ int dbSwapDatabases(int id1, int id2) {

/* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
int id1, id2, oriIdx;
int id1, id2;

listNode *ln;
listIter li;
client *cl;

/* Not allowed in cluster mode: we have just DB 0 there. */
if (g_pserver->cluster_enabled) {
Expand All @@ -1772,14 +1777,6 @@ void swapdbCommand(client *c) {
"invalid second DB index") != C_OK)
return;

// get client's original db's index
for (int idb=0; idb < cserver.dbnum; ++idb) {
if (g_pserver->db[idb]->id == c->db->id) {
oriIdx = idb;
break;
}
}

/* Swap... */
if (dbSwapDatabases(id1,id2) == C_ERR) {
addReplyError(c,"DB index is out of range");
Expand All @@ -1789,18 +1786,29 @@ void swapdbCommand(client *c) {
moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
g_pserver->dirty++;

// set client's db to original db
c->db=g_pserver->db[oriIdx];

// Persist the databse index to dbid mapping into FLASH for later recovery.
// Persist the databse index to storage dbid mapping into FLASH for later recovery.
if (g_pserver->m_pstorageFactory != nullptr && g_pserver->metadataDb != nullptr) {
std::string dbid_key = "db-" + std::to_string(id1);
g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id1]->id, sizeof(g_pserver->db[id1]->id), true);
g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id1]->storage_id, sizeof(g_pserver->db[id1]->storage_id), true);

dbid_key = "db-" + std::to_string(id2);
g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id2]->id, sizeof(g_pserver->db[id2]->id), true);
g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id2]->storage_id, sizeof(g_pserver->db[id2]->storage_id), true);
}
addReply(c,shared.ok);

listRewind(g_pserver->clients,&li);
while ((ln = listNext(&li)) != NULL) {
cl = reinterpret_cast<struct client*>(listNodeValue(ln));
std::unique_lock<decltype(cl->lock)> lock(cl->lock);
if (cl->db->id == g_pserver->db[id1]->id) {
cl->db = g_pserver->db[id2];
updateDBWatchedKey(id1, cl);
}
else if (cl->db->id == g_pserver->db[id2]->id) {
cl->db = g_pserver->db[id1];
updateDBWatchedKey(id2, cl);
}
}
}
}

Expand Down Expand Up @@ -2631,13 +2639,14 @@ void moduleClusterLoadCallback(const char * rgchKey, size_t cchKey, void *data)
moduleLoadCallback(rgchKey, cchKey, data);
}

void redisDb::initialize(int id)
void redisDb::initialize(int id, int storage_id /* default no storage */)
{
redisDbPersistentData::initialize();
this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL);
this->id = id;
this->storage_id = storage_id;
this->avg_ttl = 0;
this->last_expire_set = 0;
this->defrag_later = listCreate();
Expand All @@ -2649,7 +2658,7 @@ void redisDb::storageProviderInitialize()
if (g_pserver->m_pstorageFactory != nullptr)
{
IStorageFactory::key_load_iterator itr = g_pserver->cluster_enabled ? moduleClusterLoadCallback : moduleLoadCallback;
this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, &id));
this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, storage_id, itr, &id));
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,24 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
dictReleaseIterator(di);
}

/* Update the DB of the watchedKey structure incase if the c->db (currently selected DB) is updated.
* For example, it may happen during the SWAPDB.
* watchForKey() sets the original DB of the watchedKey structure with the c->db
* but the c->db can be updated incase of SWAPDB command. */
void updateDBWatchedKey(int dbid, client *c) {
listIter li;
listNode *ln;
watchedKey *wk;

listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = (watchedKey*)listNodeValue(ln);
if (wk->db->id == dbid) {
wk->db = c->db;
}
}
}

void watchCommand(client *c) {
int j;

Expand Down
8 changes: 4 additions & 4 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3925,18 +3925,18 @@ void initServer(void) {
g_pserver->db[j]->initialize(j);
}
} else {
// Read FLASH metadata and load the appropriate dbid into each databse index, as each DB index can have different dbid mapped due to the swapdb command.
// Read FLASH metadata and load the appropriate storage dbid into each databse index, as each DB index can have different storage dbid mapped due to the swapdb command.
g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb();
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
int dbid = idb;
int storage_dbid = idb;
std::string dbid_key = "db-" + std::to_string(idb);
g_pserver->metadataDb->retrieve(dbid_key.c_str(), dbid_key.length(), [&](const char *, size_t, const void *data, size_t){
dbid = *(int*)data;
storage_dbid = *(int*)data;
});

g_pserver->db[idb] = new (MALLOC_LOCAL) redisDb();
g_pserver->db[idb]->initialize(dbid);
g_pserver->db[idb]->initialize(idb, storage_dbid);
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ struct redisDb : public redisDbPersistentDataSnapshot

redisDb() = default;

void initialize(int id);
void initialize(int id, int storage_id=-1 /* default no storage */);
void storageProviderInitialize();
void storageProviderDelete();
virtual ~redisDb();
Expand Down Expand Up @@ -1389,6 +1389,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
int storage_id; /* Mapped storage provider DB id which is same as the redisdb id above. But, when the database is swapped, the redisdb id above might be swapped to be consistent with the database index (id <-> g_pserver->db[index]) however the storage_id remains unchanged in order to maintain correct mapping to the underlying storage provider DB. This is valid only if there is a storage provider set.*/
long long last_expire_set; /* when the last expire was set */
double avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
Expand Down Expand Up @@ -3114,6 +3115,7 @@ void queueMultiCommand(client *c);
void touchWatchedKey(redisDb *db, robj *key);
int isWatchedKeyExpired(client *c);
void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with);
void updateDBWatchedKey(int dbid, client *c);
void discardTransaction(client *c);
void flagTransaction(client *c);
void execCommandAbort(client *c, sds error);
Expand Down

0 comments on commit 2ad14de

Please sign in to comment.