Skip to content

Commit

Permalink
Initial async impl
Browse files (browse the repository at this point in the history)
  • Loading branch information
JohnSully committed Sep 25, 2023
1 parent 1e67982 commit e95c8b4
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 71 deletions.
2 changes: 2 additions & 0 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"

// Base type for the handle that tracks one in-flight asynchronous storage
// operation. Storage providers derive from it to carry their own state, which
// is why the destructor is virtual (tokens are deleted through the derived
// provider type, but may be held as StorageToken* by callers).
struct StorageToken {
    // Client associated with the operation. Defaults to nullptr so a token
    // that has not been bound to a client yet never exposes a garbage pointer.
    struct client *c = nullptr;
    // Database the operation was issued against; nullptr until the issuer sets it.
    struct redisDbPersistentData *db = nullptr;
    virtual ~StorageToken() {}
};

Expand Down
18 changes: 18 additions & 0 deletions src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,24 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
m_spstorage->retrieve(key, sdslen(key), fn);
}

// Begin an asynchronous fetch of `key` from the backing storage provider.
//
// When the key cache (m_pdict) is enabled it is consulted first, under m_lock:
// if the key's hash is not present the key cannot be in storage, so nullptr is
// returned and no request is issued. Otherwise the request is forwarded to the
// provider with the lock already released, and the provider's token is returned.
StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key) {
    {
        // Scope the lock to the key-cache probe only.
        std::unique_lock<fastlock> guard(m_lock);
        if (m_pdict != nullptr)
        {
            const uint64_t keyHash = dictSdsHash(key);
            if (dictFind(m_pdict, reinterpret_cast<void*>(keyHash)) == nullptr)
                return nullptr; // Not found
        }
    }
    return m_spstorage->begin_retrieve(el, proc, key, sdslen(key));
}

// Finish a retrieval previously started with begin_retrieve(). The token and
// the result callback are forwarded unchanged to the underlying storage
// provider; no key-cache lookup or locking is done at this layer.
void StorageCache::complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn) {
m_spstorage->complete_retrieve(tok, fn);
}

size_t StorageCache::count() const
{
std::unique_lock<fastlock> ul(m_lock, std::defer_lock);
Expand Down
2 changes: 2 additions & 0 deletions src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class StorageCache
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
// Start an asynchronous retrieval of `key`; `el` and `proc` are passed through
// to the storage provider. Returns the provider's token, or nullptr when the
// key cache is enabled and does not contain the key.
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key);
// Finish a retrieval started by begin_retrieve(); forwards `tok` and `fn` to
// the storage provider.
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
Expand Down
2 changes: 2 additions & 0 deletions src/blocked.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_PAUSE) {
listDelNode(g_pserver->paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
} else if (c->btype == BLOCKED_STORAGE) {
serverTL->vecclientsProcess.push_back(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand Down
2 changes: 1 addition & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ bool initializeStorageProvider(const char **err)
// Create The Storage Factory (if necessary)
serverLog(LL_NOTICE, "Initializing FLASH storage provider (this may take a long time)");
adjustOpenFilesLimit();
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0);
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(g_sdsArgs, cserver.dbnum, cserver.storage_conf, cserver.storage_conf ? strlen(cserver.storage_conf) : 0, &g_pserver->asyncworkqueue);
#else
serverLog(LL_WARNING, "To use the flash storage provider please compile KeyDB with ENABLE_FLASH=yes");
serverLog(LL_WARNING, "Exiting due to the use of an unsupported storage provider");
Expand Down
94 changes: 38 additions & 56 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3383,76 +3383,58 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
veckeys.push_back(objKey);
}
lock.disarm();

getKeysFreeResult(&result);

std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
for (robj *objKey : veckeys)
{
sds sharedKey = sdsdupshared((sds)szFromObj(objKey));
std::unique_ptr<expireEntry> spexpire;
robj *o = nullptr;
m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0;
spexpire = deserializeExpire(sharedKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, sharedKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr);
});

if (o != nullptr) {
vecInserts.emplace_back(sharedKey, o, std::move(spexpire));
} else if (sharedKey != nullptr) {
sdsfree(sharedKey);
auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, (sds)szFromObj(objKey));
if (tok != nullptr) {
tok->c = c;
tok->db = this;
blockClient(c, BLOCKED_STORAGE);
}
}

if (!vecInserts.empty()) {
lock.arm(c);
for (auto &tuple : vecInserts)
{
sds sharedKey = std::get<0>(tuple);
robj *o = std::get<1>(tuple);
std::unique_ptr<expireEntry> spexpire = std::move(std::get<2>(tuple));
return;
}

if (o != nullptr)
{
if (this->find_cached_threadsafe(sharedKey) != nullptr)
{
// While unlocked this was already ensured
decrRefCount(o);
sdsfree(sharedKey);
}
else
{
if (spexpire != nullptr) {
if (spexpire->when() < mstime()) {
break;
}
}
dictAdd(m_pdict, sharedKey, o);
o->SetFExpires(spexpire != nullptr);
/* Completion handler for an asynchronous storage read started by
 * prefetchKeysAsync(). It is handed to begin_retrieve() as the post-function,
 * so it receives the token once the provider has finished the lookup.
 *
 *   el  - event loop the callback was posted to (unused here).
 *   tok - token for the finished read; tok->c is the blocked client and
 *         tok->db the database the read was issued for.
 *
 * If the provider found the key, the stored value (and its expire entry, if
 * any) is deserialized and inserted into the database's in-memory dict unless
 * the key is already cached or has already expired. The client is unblocked
 * in every case. */
/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *el, StorageToken *tok) {
    // complete_retrieve() takes ownership of the token (the RocksDB provider
    // deletes it before returning), so copy out everything needed afterwards
    // instead of dereferencing `tok` once the call has returned.
    client *c = tok->c;
    redisDbPersistentData *db = tok->db;

    db->m_spstorage->complete_retrieve(tok, [db](const char *szKey, size_t cbKey, const void *data, size_t cb) {
        size_t offset = 0;
        sds key = sdsnewlen(szKey, -((ssize_t)cbKey));
        auto spexpire = deserializeExpire(key, (const char*)data, cb, &offset);
        robj *o = deserializeStoredObject(db, key, reinterpret_cast<const char*>(data) + offset, cb - offset);
        serverAssert(o != nullptr);

        // The loaded value is dropped when the key showed up in memory while
        // the read was in flight, or when its expire time has already passed.
        const bool fUnneeded = db->find_cached_threadsafe(key) != nullptr
            || (spexpire != nullptr && spexpire->when() < mstime());

        if (fUnneeded) {
            decrRefCount(o);
            sdsfree(key);
            return;
        }

        dictAdd(db->m_pdict, key, o);
        o->SetFExpires(spexpire != nullptr);

        std::unique_lock<fastlock> ul(g_expireLock);
        if (spexpire != nullptr) {
            // Replace any existing expire entry for this key with the loaded one.
            auto itr = db->m_setexpire->find(key);
            if (itr != db->m_setexpire->end())
                db->m_setexpire->erase(itr);
            db->m_setexpire->insert(std::move(*spexpire));
            serverAssert(db->m_setexpire->find(key) != db->m_setexpire->end());
        }
        serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end()));
    });

    std::unique_lock<fastlock> ul(c->lock);
    unblockClient(c);
}
2 changes: 1 addition & 1 deletion src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2774,7 +2774,7 @@ void readQueryFromClient(connection *conn) {
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
}
}
if (!c->vecqueuedcmd.empty())
if (!c->vecqueuedcmd.empty() && !(c->flags & CLIENT_BLOCKED))
serverTL->vecclientsProcess.push_back(c);
} else {
// If we're single threaded its actually better to just process the command here while the query is hot in the cache
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define BLOCKED_ZSET 5 /* BZPOP et al. */
#define BLOCKED_PAUSE 6 /* Blocked by CLIENT PAUSE */
#define BLOCKED_ASYNC 7
#define BLOCKED_STORAGE 8 /* Blocked while an asynchronous storage retrieval is pending */
#define BLOCKED_NUM 9 /* Number of blocked states. */

/* Client request types */
#define PROTO_REQ_INLINE 1
Expand Down Expand Up @@ -1218,6 +1219,8 @@ class redisDbPersistentData

dict_iter find_cached_threadsafe(const char *key) const;

// Post-function passed to the storage provider's begin_retrieve() by
// prefetchKeysAsync(); called with the token of the finished retrieval.
static void storageLoadCallback(struct aeEventLoop *el, struct StorageToken *token);

protected:
uint64_t m_mvccCheckpoint = 0;

Expand Down
22 changes: 16 additions & 6 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,19 +280,29 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const

// Token for one asynchronous RocksDB read: carries the request (key) into the
// worker and the result (data / fFound) back to complete_retrieve().
struct RetrievalStorageToken : public StorageToken {
std::string key; // key as passed to begin_retrieve(), without the storage prefix
std::vector<char> data; // copy of the value bytes; filled only when the lookup succeeded
bool fFound = false; // set to true when the RocksDB Get() returned an ok status
};

// Start an asynchronous lookup of `key` (cchKey bytes).
//
// A token is allocated and the actual RocksDB Get() is queued on the factory's
// work queue. When the read has finished, successfully or not, the worker
// posts `callback` to `el` with the token; that is the only point at which the
// callback is posted, so each token is completed exactly once. The callback is
// expected to pass the token to complete_retrieve(), which frees it.
//
// Returns the token so the caller can attach its own context to it.
StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, const char *key, size_t cchKey) {
    RetrievalStorageToken *tok = new RetrievalStorageToken();
    tok->key.assign(key, cchKey);
    (*m_pfactory->m_wqueue)->AddWorkFunction([this, el, callback, tok]{
        rocksdb::PinnableSlice slice;
        auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixKey(tok->key.data(), tok->key.size())), &slice);
        if (status.ok()) {
            // Copy the value out of the pinned slice: the slice goes out of
            // scope with this lambda, the token outlives it. assign() is also
            // well-defined for an empty value.
            tok->data.assign(slice.data(), slice.data() + slice.size());
            tok->fFound = true;
        }
        aePostFunction(el, callback, tok);
    });
    return tok;
}

// Finish a retrieval started by begin_retrieve().
//
// `tok` must be a token returned by begin_retrieve(). If the worker found the
// key, `fn` is invoked once with the original (un-prefixed) key and the value
// bytes buffered in the token; otherwise `fn` is not called. No RocksDB access
// happens here. The token is deleted before returning, so the caller must not
// touch it afterwards.
void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle fn) {
    // Base-to-derived conversion: static_cast is the correct named cast.
    RetrievalStorageToken *rtok = static_cast<RetrievalStorageToken*>(tok);
    if (rtok->fFound)
        fn(rtok->key.data(), rtok->key.size(), rtok->data.data(), rtok->data.size());
    delete rtok;
}
5 changes: 4 additions & 1 deletion src/storage/rocksdbfactor_internal.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include "rocksdb.h"
#include "../AsyncWorkQueue.h"

class RocksDBStorageFactory : public IStorageFactory
{
Expand All @@ -10,7 +11,9 @@ class RocksDBStorageFactory : public IStorageFactory
bool m_fCreatedTempFolder = false;

public:
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig);
AsyncWorkQueue **m_wqueue;

RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue);
~RocksDBStorageFactory();

virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/rocksdbfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ rocksdb::Options DefaultRocksDBOptions() {
return options;
}

// Factory entry point for the RocksDB storage provider. All arguments are
// forwarded unchanged to the RocksDBStorageFactory constructor; `wqueue` is
// the work queue handle the factory stores for asynchronous reads. The
// returned factory is heap-allocated and owned by the caller.
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
{
    return new RocksDBStorageFactory(path, dbnum, rgchConfig, cchConfig, wqueue);
}

rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
Expand All @@ -52,8 +52,8 @@ rocksdb::Options RocksDBStorageFactory::RocksDbOptions()
return options;
}

RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig)
: m_path(dbfile)
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig, AsyncWorkQueue **wqueue)
: m_path(dbfile), m_wqueue(wqueue)
{
dbnum++; // create an extra db for metadata
// Get the count of column families in the actual database
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdbfactory.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once

// Creates the RocksDB-backed storage factory. `wqueue` is the work queue
// handle used for asynchronous reads. Both class names are introduced with an
// elaborated type specifier so this header compiles without any prior include.
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig, class AsyncWorkQueue **wqueue);

0 comments on commit e95c8b4

Please sign in to comment.