Skip to content

Commit

Permalink
support dump database (OpenAtomFoundation#83)
Browse files Browse the repository at this point in the history
* add RedisZsets::ZScan() interface

* create root directory before open db

* support dump database
  • Loading branch information
Axlgrep authored Jun 22, 2018
1 parent 2091dfd commit 8a1137d
Show file tree
Hide file tree
Showing 16 changed files with 816 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ifndef DISABLE_WARNING_AS_ERROR
WARNING_FLAGS += -Werror
endif

CXXFLAGS += $(WARNING_FLAGS) -I. -I./include -I$(ROCKSDB_INCLUDE_DIR) -I$(SLASH_INCLUDE_DIR) $(OPT)
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include -I$(ROCKSDB_INCLUDE_DIR) -I$(ROCKSDB_PATH) -I$(SLASH_INCLUDE_DIR) $(OPT)

date := $(shell date +%F)
git_sha := $(shell git rev-parse HEAD 2>/dev/null)
Expand Down
72 changes: 72 additions & 0 deletions include/blackwidow/backupable.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2017-present The blackwidow Authors. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef SRC_BACKUPABLE_H_
#define SRC_BACKUPABLE_H_

#include "rocksdb/db.h"

#include "util.h"
#include "blackwidow.h"
#include "db_checkpoint.h"

namespace blackwidow {

const std::string DEFAULT_BK_PATH = "dump"; //Default backup root dir
const std::string DEFAULT_RS_PATH = "db"; //Default restore root dir

// Arguments which will used by BackupSave Thread
// p_engine for BackupEngine handler
// backup_dir
// key_type kv, hash, list, set or zset
struct BackupSaveArgs {
void *p_engine;
const std::string backup_dir;
const std::string key_type;
Status res;

BackupSaveArgs(void *_p_engine,
const std::string &_backup_dir,
const std::string &_key_type)
: p_engine(_p_engine), backup_dir(_backup_dir), key_type(_key_type) {}
};

struct BackupContent {
std::vector<std::string> live_files;
rocksdb::VectorLogPtr live_wal_files;
uint64_t manifest_file_size = 0;
uint64_t sequence_number = 0;
};

class BackupEngine {
public:
~BackupEngine();
static Status Open(BlackWidow *db, BackupEngine** backup_engine_ptr);

Status SetBackupContent();

Status CreateNewBackup(const std::string &dir);

void StopBackup();

Status CreateNewBackupSpecify(const std::string &dir, const std::string &type);

private:
BackupEngine() {}

std::map<std::string, rocksdb::DBCheckpoint*> engines_;
std::map<std::string, BackupContent> backup_content_;
std::map<std::string, pthread_t> backup_pthread_ts_;

Status NewCheckpoint(rocksdb::DB* rocksdb_db, const std::string& type);
std::string GetSaveDirByType(const std::string _dir, const std::string& _type) const {
std::string backup_dir = _dir.empty() ? DEFAULT_BK_PATH : _dir;
return backup_dir + ((backup_dir.back() != '/') ? "/" : "") + _type;
}
Status WaitBackupPthread();
};

} // namespace blackwidow
#endif // SRC_BACKUPABLE_H_
13 changes: 13 additions & 0 deletions include/blackwidow/blackwidow.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ const std::string USAGE_TYPE_ROCKSDB_MEMTABLE = "rocksdb.memtable";
//const std::string USAGE_TYPE_ROCKSDB_BLOCK_CACHE = "rocksdb.block_cache";
const std::string USAGE_TYPE_ROCKSDB_TABLE_READER = "rocksdb.table_reader";

const std::string ALL_DB = "all";
const std::string STRINGS_DB = "strings";
const std::string HASHES_DB = "hashes";
const std::string LISTS_DB = "lists";
const std::string ZSETS_DB = "zsets";
const std::string SETS_DB = "sets";

using Options = rocksdb::Options;
using Status = rocksdb::Status;
using Slice = rocksdb::Slice;
Expand Down Expand Up @@ -867,6 +874,10 @@ class BlackWidow {
bool right_close,
int32_t* ret);

// See SCAN for ZSCAN documentation.
Status ZScan(const Slice& key, int64_t cursor, const std::string& pattern,
int64_t count, std::vector<ScoreMember>* score_members, int64_t* next_cursor);

// Keys Commands

// Note:
Expand Down Expand Up @@ -970,6 +981,8 @@ class BlackWidow {
Status GetKeyNum(std::vector<uint64_t>* nums);
Status StopScanKeyNum();

rocksdb::DB* GetDBByType(const std::string& type);

private:
RedisStrings* strings_db_;
RedisHashes* hashes_db_;
Expand Down
44 changes: 44 additions & 0 deletions include/blackwidow/db_checkpoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// A checkpoint is an openable snapshot of a database at a point in time.

#ifndef ROCKSDB_LITE

#include <vector>
#include "rocksdb/status.h"
#include "rocksdb/transaction_log.h"

namespace rocksdb {

class DB;

class DBCheckpoint {
public:
// Creates a Checkpoint object to be used for creating openable sbapshots
static Status Create(DB* db, DBCheckpoint** checkpoint_ptr);

// Builds an openable snapshot of RocksDB on the same disk, which
// accepts an output directory on the same disk, and under the directory
// (1) hard-linked SST files pointing to existing live SST files
// SST files will be copied if output directory is on a different filesystem
// (2) a copied manifest files and other files
// The directory should not already exist and will be created by this API.
// The directory will be an absolute path
virtual Status CreateCheckpoint(const std::string& checkpoint_dir) = 0;

virtual Status GetCheckpointFiles(std::vector<std::string> &live_files,
VectorLogPtr &live_wal_files, uint64_t &manifest_file_size,
uint64_t &sequence_number) = 0;

virtual Status CreateCheckpointWithFiles(const std::string& checkpoint_dir,
std::vector<std::string> &live_files, VectorLogPtr &live_wal_files,
uint64_t manifest_file_size, uint64_t sequence_number) = 0;

virtual ~DBCheckpoint() {}
};

} // namespace rocksdb
#endif // !ROCKSDB_LITE
7 changes: 7 additions & 0 deletions src/util.h → include/blackwidow/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <string>
#include <cstring>
#include <cmath>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>

namespace blackwidow {

Expand All @@ -19,6 +22,10 @@ namespace blackwidow {
int string_len, int nocase);
int StrToLongDouble(const char* s, size_t slen, long double* ldval);
int LongDoubleToStr(long double ldval, std::string* value);
int do_mkdir(const char *path, mode_t mode);
int mkpath(const char *path, mode_t mode);
int delete_dir(const char* dirname);
int is_dir(const char* filename);
}

#endif // SRC_UTIL_H_
164 changes: 164 additions & 0 deletions src/backupable.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include <sys/types.h>
#include <dirent.h>
#include <utility>

#include "blackwidow/backupable.h"

namespace blackwidow {

BackupEngine::~BackupEngine() {
// Wait all children threads
StopBackup();
WaitBackupPthread();
// Delete engines
for (auto& engine : engines_) {
delete engine.second;
}
engines_.clear();
}

Status BackupEngine::NewCheckpoint(rocksdb::DB* rocksdb_db, const std::string& type) {
rocksdb::DBCheckpoint* checkpoint;
Status s = rocksdb::DBCheckpoint::Create(rocksdb_db, &checkpoint);
if (!s.ok()) {
//log_warn("create checkpoint failed, error %s", s.ToString().c_str());
return s;
}
engines_.insert(std::make_pair(type, checkpoint));
return s;
}

Status BackupEngine::Open(blackwidow::BlackWidow *blackwidow,
BackupEngine** backup_engine_ptr) {
*backup_engine_ptr = new BackupEngine();
if (!*backup_engine_ptr){
return Status::Corruption("New BackupEngine failed!");
}

// Create BackupEngine for each db type
rocksdb::Status s;
rocksdb::DB *rocksdb_db;
std::string types[] = {STRINGS_DB, HASHES_DB, LISTS_DB, ZSETS_DB, SETS_DB};
for (const auto& type : types) {
if ((rocksdb_db = blackwidow->GetDBByType(type)) == NULL) {
s = Status::Corruption("Error db type");
}

if (s.ok()) {
s = (*backup_engine_ptr)->NewCheckpoint(rocksdb_db, type);
}

if (!s.ok()) {
delete *backup_engine_ptr;
break;
}
}
return s;
}

Status BackupEngine::SetBackupContent() {
Status s;
for (const auto& engine : engines_) {
//Get backup content
BackupContent bcontent;
s = engine.second->GetCheckpointFiles(bcontent.live_files,
bcontent.live_wal_files,
bcontent.manifest_file_size, bcontent.sequence_number);
if (!s.ok()) {
//log_warn("get backup files faild for type: %s", engine.first.c_str());
return s;
}
backup_content_[engine.first] = std::move(bcontent);
}
return s;
}

Status BackupEngine::CreateNewBackupSpecify(const std::string &backup_dir, const std::string &type) {
std::map<std::string, rocksdb::DBCheckpoint*>::iterator it_engine = engines_.find(type);
std::map<std::string, BackupContent>::iterator it_content = backup_content_.find(type);
std::string dir = GetSaveDirByType(backup_dir, type);
delete_dir(dir.c_str());

if (it_content != backup_content_.end() &&
it_engine != engines_.end()) {
Status s = it_engine->second->CreateCheckpointWithFiles(
dir,
it_content->second.live_files,
it_content->second.live_wal_files,
it_content->second.manifest_file_size,
it_content->second.sequence_number);
if (!s.ok()) {
//log_warn("backup engine create new failed, type: %s, error %s", type.c_str(), s.ToString().c_str());
return s;
}

} else {
//log_warn("invalid db type: %s", type.c_str());
return Status::Corruption("invalid db type");
}
return Status::OK();
}

void* ThreadFuncSaveSpecify(void *arg) {
BackupSaveArgs* arg_ptr = static_cast<BackupSaveArgs*>(arg);
BackupEngine* p = static_cast<BackupEngine*>(arg_ptr->p_engine);

arg_ptr->res = p->CreateNewBackupSpecify(arg_ptr->backup_dir, arg_ptr->key_type);

pthread_exit(&(arg_ptr->res));
}

Status BackupEngine::WaitBackupPthread() {
int ret;
Status s = Status::OK();
for (auto& pthread : backup_pthread_ts_) {
void *res;
if ((ret = pthread_join(pthread.second, &res)) != 0) {
//log_warn("pthread_join failed with backup thread for key_type: %s, error %d", pthread.first.c_str(), ret);
}
Status cur_s = *(static_cast<Status*>(res));
if (!cur_s.ok()) {
//log_warn("pthread executed failed with key_type: %s, error %s", pthread.first.c_str(), cur_s.ToString().c_str());
StopBackup(); //stop others when someone failed
s = cur_s;
}
}
backup_pthread_ts_.clear();
return s;
}


Status BackupEngine::CreateNewBackup(const std::string &dir) {
Status s = Status::OK();
std::vector<BackupSaveArgs*> args;
for (const auto& engine : engines_) {
pthread_t tid;
BackupSaveArgs *arg = new BackupSaveArgs((void*)this, dir, engine.first);
args.push_back(arg);
if (pthread_create(&tid, NULL, &ThreadFuncSaveSpecify, arg) != 0) {
s = Status::Corruption("pthead_create failed.");
break;
}
if (!(backup_pthread_ts_.insert(std::make_pair(engine.first, tid)).second)) {
//log_warn("thread open dupilicated, type: %s", engine.first.c_str());
backup_pthread_ts_[engine.first] = tid;
}
}

// Wait threads stop
if (!s.ok()) {
StopBackup();
}
s = WaitBackupPthread();

for (auto& a : args) {
delete a;
}
return s;
}

void BackupEngine::StopBackup() {
// DEPRECATED
}

}
23 changes: 23 additions & 0 deletions src/blackwidow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "blackwidow/blackwidow.h"
#include "blackwidow/util.h"

#include "src/mutex_impl.h"
#include "src/redis_strings.h"
Expand Down Expand Up @@ -65,6 +66,8 @@ static std::string AppendSubDirectory(const std::string& db_path,

Status BlackWidow::Open(const rocksdb::Options& options,
const std::string& db_path) {
mkpath(db_path.c_str(), 0755);

strings_db_ = new RedisStrings();
Status s = strings_db_->Open(options, AppendSubDirectory(db_path, "strings"));
if (!s.ok()) {
Expand Down Expand Up @@ -608,6 +611,10 @@ Status BlackWidow::ZRemrangebylex(const Slice& key,
return zsets_db_->ZRemrangebylex(key, min, max, left_close, right_close, ret);
}

Status BlackWidow::ZScan(const Slice& key, int64_t cursor, const std::string& pattern,
int64_t count, std::vector<ScoreMember>* score_members, int64_t* next_cursor) {
return zsets_db_->ZScan(key, cursor, pattern, count, score_members, next_cursor);
}


// Keys Commands
Expand Down Expand Up @@ -1420,4 +1427,20 @@ Status BlackWidow::StopScanKeyNum() {
return Status::OK();
}

rocksdb::DB* BlackWidow::GetDBByType(const std::string& type) {
if (type == STRINGS_DB) {
return strings_db_->get_db();
} else if (type == HASHES_DB) {
return hashes_db_->get_db();
} else if (type == LISTS_DB) {
return lists_db_->get_db();
} else if (type == SETS_DB) {
return sets_db_->get_db();
} else if (type == ZSETS_DB) {
return zsets_db_->get_db();
} else {
return NULL;
}
}

} // namespace blackwidow
Loading

0 comments on commit 8a1137d

Please sign in to comment.