Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When updating FLASH values use bulk insert #577

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class IStorage
virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0;

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false);
bool fOverwrite = (rgfOverwrite != nullptr) ? rgfOverwrite[ielem] : false;
insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], fOverwrite);
}
endWriteBatch();
}
Expand Down
5 changes: 3 additions & 2 deletions src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,14 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
}

long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem)
{
std::vector<dictEntry*> vechashes;
if (m_pdict != nullptr) {
vechashes.reserve(celem);

for (size_t ielem = 0; ielem < celem; ++ielem) {
if (rgfOverwrite != nullptr && rgfOverwrite[ielem]) continue;
dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry));
de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]);
de->v.u64 = 1;
Expand Down Expand Up @@ -152,7 +153,7 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si
}
ul.unlock();

m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem);
m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, rgfOverwrite, celem);

bulkInsertsInProgress--;
}
Expand Down
2 changes: 1 addition & 1 deletion src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class StorageCache
void clear(void(callback)(void*));
void clearAsync();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
void emergencyFreeCache();
Expand Down
56 changes: 47 additions & 9 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2908,15 +2908,13 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di);
}

/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate)
/* static */ sds redisDbPersistentData::serializeChange(redisDbPersistentData *db, const char *key)
{
auto itr = db->find_cached_threadsafe(key);
if (itr == nullptr)
return;
return nullptr;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)key, temp, sdslen(temp), fUpdate);
sdsfree(temp);
return serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
}

bool redisDbPersistentData::processChanges(bool fSnapshot)
Expand Down Expand Up @@ -2959,10 +2957,30 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
{
dictIterator *di = dictGetIterator(m_dictChanged);
dictEntry *de;
std::vector<char*> veckeys;
std::vector<size_t> veccbkeys;
std::vector<char*> vecvals;
std::vector<size_t> veccbvals;
std::vector<char> vecoverwrite;
veckeys.reserve(dictSize(m_dictChanged));
veccbkeys.reserve(dictSize(m_dictChanged));
vecvals.reserve(dictSize(m_dictChanged));
veccbvals.reserve(dictSize(m_dictChanged));
vecoverwrite.reserve(dictSize(m_dictChanged));
while ((de = dictNext(di)) != nullptr)
{
serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de));
sds val = serializeChange(this, (const char*)dictGetKey(de));
if (val != nullptr) {
veckeys.push_back((char*)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(val);
veccbvals.push_back(sdslen(val));
vecoverwrite.push_back((bool)dictGetVal(de));
}
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
}
}
Expand Down Expand Up @@ -2996,7 +3014,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
vecvals.push_back(temp);
veccbvals.push_back(sdslen(temp));
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size());
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), nullptr, veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
Expand All @@ -3015,7 +3033,7 @@ void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbK
}
aeReleaseLock();
}
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, nullptr, celem);
}

void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
Expand All @@ -3024,10 +3042,30 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
{
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
dictEntry *de;
std::vector<char*> veckeys;
std::vector<size_t> veccbkeys;
std::vector<char*> vecvals;
std::vector<size_t> veccbvals;
std::vector<char> vecoverwrite;
veckeys.reserve(dictSize(m_dictChanged));
veccbkeys.reserve(dictSize(m_dictChanged));
vecvals.reserve(dictSize(m_dictChanged));
veccbvals.reserve(dictSize(m_dictChanged));
vecoverwrite.resize(dictSize(m_dictChanged));
while ((de = dictNext(di)) != nullptr)
{
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de));
sds val = serializeChange((redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de));
if (val != nullptr) {
veckeys.push_back((char*)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(val);
veccbvals.push_back(sdslen(val));
vecoverwrite.push_back((bool)dictGetVal(de));
}
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
dictRelease(m_dictChangedStorageFlush);
m_dictChangedStorageFlush = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ class redisDbPersistentData
uint64_t m_mvccCheckpoint = 0;

private:
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate);
static sds serializeChange(redisDbPersistentData *db, const char *key);

void ensure(const char *key);
void ensure(const char *key, dictEntry **de);
Expand Down
12 changes: 10 additions & 2 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
++m_count;
}

void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem)
{
size_t coverwrites = 0;
if (celem >= 16384) {
rocksdb::Options options = DefaultRocksDBOptions();
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options, options.comparator);
Expand Down Expand Up @@ -92,8 +93,15 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **
m_spdb->Write(WriteOptions(), spbatch.get());
}

if (rgfOverwrite != nullptr) {
for (size_t ielem = 0; ielem < celem; ++ielem) {
if (rgfOverwrite[ielem])
++coverwrites;
}
}

std::unique_lock<fastlock> l(m_lock);
m_count += celem;
m_count += celem - coverwrites;
}

bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RocksDBStorageProvider : public IStorage
virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) override;

virtual void batch_lock() override;
virtual void batch_unlock() override;
Expand Down