Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Build script for the lsmtree static library.
cmake_minimum_required(VERSION 3.10)

project(lsmtree C CXX)

# Debug, warning and code-generation options collected into a CMake list variable.
# NOTE(review): in the lines shown here CXX_FLAGS is never referenced again, and
# CMake itself reads CMAKE_CXX_FLAGS, so these options appear to have no effect
# as written. Confirm whether they were meant to be applied, e.g. with
#   string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}")
# (be aware that turning on -Werror/-Wconversion may surface new build errors).
set(CXX_FLAGS
-g
-DCHECK_PTHREAD_RETURN_VALUE
-D_FILE_OFFSET_BITS=64
-Wall
-Wextra
-Werror
-Wconversion
-Wno-unused-parameter
-Wold-style-cast
-Woverloaded-virtual
-Wpointer-arith
-Wshadow
-Wwrite-strings
-march=native
-std=c++14
-rdynamic
)

# Extra header search paths, given relative to this directory.
include_directories(../../wal)
include_directories(../../naughty/fio)
# Collect every source file in this directory and build them into libslmtree's
# static archive target `lsmtree`.
aux_source_directory(. DIR_LIB_LSMTREE_SRCS)
add_library(lsmtree STATIC ${DIR_LIB_LSMTREE_SRCS})
77 changes: 52 additions & 25 deletions src/basetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
#include <assert.h>
#include <functional>
#include <string>
#include <mutex>
#include "tablecache.h"
#include "types.h"

const int PATH_LEN = 64;
extern std::string basedir;

const int MAX_LEVELS = 8;
const int SST_LIMIT = 1<<18; //default sst size:256KB
const int MAX_ALLOWED_SEEKS = SST_LIMIT / 256; //max seeks before compaction
const int SST_LIMIT = 1<<19; //default sst size:512KB
const int MAX_ALLOWED_SEEKS = SST_LIMIT / 64; //max seeks before compaction

const int MAX_KEYLEN = 1024;
const int MAX_VALLEN = 1<<16; //64KB

class basetable: public cached {
public:
int level;
char path[64];
char path[PATH_LEN];

int idxoffset;
int datoffset;
Expand All @@ -29,8 +34,12 @@ class basetable: public cached {
std::string smallest;
std::string largest;

int key_num;
int keynum;
int ref_num;

std::mutex mutex;
bool isclosed;
bool isloaded;
bool incache;

basetable():
Expand All @@ -42,36 +51,39 @@ class basetable: public cached {
allowed_seeks(MAX_ALLOWED_SEEKS),
smallest(64, '\xff'),
largest(""),
key_num(0),
keynum(0),
ref_num(0),
incache(false){
isclosed(true),
isloaded(false),
incache(false) {
}

// Deletes the file named by `path` using the global C remove() and
// hands back that call's return code unchanged.
int remove(){
    const int rc = ::remove(path);
    return rc;
}

// Overlap test against another table: forwards that table's
// smallest/largest keys to the (start, end) overload.
bool overlap(basetable *other){
    const std::string &lo = other->smallest;
    const std::string &hi = other->largest;
    return overlap(lo, hi);
}

// Returns true when this table's [smallest, largest] key range intersects
// [start, end]: the greater of the two lower bounds must not exceed the
// lesser of the two upper bounds.
bool overlap(const std::string &start, const std::string &end){
    // Greater of the two lower bounds.
    const std::string *lo = &start;
    if(smallest > start){
        lo = &smallest;
    }
    // Lesser of the two upper bounds.
    const std::string *hi = &end;
    if(largest < end){
        hi = &largest;
    }
    return !(*hi < *lo);
}

// Returns true when this table's key range lies entirely inside
// [start, end], i.e. start <= smallest and largest <= end.
bool containedby(const std::string &start, const std::string &end){
    if(smallest < start){
        return false;
    }
    return !(end < largest);
}

// Takes one more reference on this table and returns the updated count.
int ref(){
    ref_num += 1;
    return ref_num;
}

int unref(){
assert(ref_num>=1);
if(--ref_num==0){
fprintf(stderr, "unref destroy level-%d sst-%d <%s,%s>\n", level, file_number, smallest.c_str(), largest.c_str());
int refcnt = --ref_num;
if(refcnt==0){
fprintf(stderr, "unref zero destroy level-%d sst-%d <%s,%s>\n", level, file_number, smallest.c_str(), largest.c_str());
delete this;
}
return ref_num;
return refcnt;
}

int refnum(){
Expand All @@ -91,29 +103,40 @@ class basetable: public cached {
}
}

void cache(){
if(incache){
return ;
bool cache(){
std::unique_lock<std::mutex> lock{mutex};
if(isclosed){
open();
}
fprintf(stderr, "CACHEING %s\n", path);
//cache: idxoffset, datoffset, codemap
load();
incache = true;
if(!isloaded){
fprintf(stderr, "LOAD IN CACHE %s\n", path);
load(); //cache: idxoffset, datoffset, codemap
}
incache = true;
return true;
}

void uncache(){
if(!incache){
return ;
std::unique_lock<std::mutex> lock{mutex};
if(isloaded){
fprintf(stderr, "UNCACHEING %s\n", path);
release();
}
if(!isclosed){
close();
}
fprintf(stderr, "UNCACHEING %s\n", path);
release();
this->close();
incache = false;
}

// Reads the `incache` flag while holding the table mutex.
bool iscached(){
    std::lock_guard<std::mutex> guard(mutex);
    const bool cached = incache;
    return cached;
}

// Writes one summary line to stderr:
// "<level> <file_number> <smallest> <largest> <keynum>".
void printinfo(){
    const char *lo = smallest.c_str();
    const char *hi = largest.c_str();
    fprintf(stderr, "%d %d %s %s %d\n", level, file_number, lo, hi, keynum);
}

public:
virtual int open() = 0;
virtual int close() = 0;
Expand Down Expand Up @@ -141,6 +164,10 @@ class basetable: public cached {
return idxoffset < table->idxoffset;
}

// Returns the `table` pointer held by this object.
basetable *belong(){
    basetable *owner = this->table;
    return owner;
}

iterator &next(){
idxoffset += sizeof(rowmeta);
return *this;
Expand Down
19 changes: 14 additions & 5 deletions src/clock.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <time.h>
#include <sys/time.h>
#include <string>


Expand All @@ -8,6 +9,18 @@ inline long get_time_sec(){
return ts.tv_sec;
}

// Reads CLOCK_PROCESS_CPUTIME_ID and returns the value in milliseconds
// (whole seconds scaled up plus the nanosecond part scaled down).
inline long get_time_msec(){
    const long MSEC_PER_SEC = 1000;
    const long NSEC_PER_MSEC = 1000000;
    struct timespec cpu;
    clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu);
    return cpu.tv_sec*MSEC_PER_SEC + cpu.tv_nsec/NSEC_PER_MSEC;
}

// Reads CLOCK_PROCESS_CPUTIME_ID and returns the value in microseconds
// (whole seconds scaled up plus the nanosecond part scaled down).
inline long get_time_usec(){
    const long USEC_PER_SEC = 1000000;
    const long NSEC_PER_USEC = 1000;
    struct timespec cpu;
    clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu);
    return cpu.tv_sec*USEC_PER_SEC + cpu.tv_nsec/NSEC_PER_USEC;
}

inline long get_time_nsec(){
struct timespec ts;
/*
Expand All @@ -21,11 +34,7 @@ inline long get_time_nsec(){
Thread-specific CPU-time clock.
*/
clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts);
return ts.tv_nsec;
}

inline long get_time_usec(){
return get_time_sec()*1000+get_time_nsec()/1000;
return ts.tv_sec*1000000000 + ts.tv_nsec;
}

inline std::string timestamp(){
Expand Down
52 changes: 22 additions & 30 deletions src/compaction.cpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,32 @@
#include "compaction.h"
#include "version.h"

void compaction::settle_inputs(version *ver){
assert(level_>=1 && level_<MAX_LEVELS);
std::string start = inputs_[0][0]->smallest;
std::string end = inputs_[0][0]->largest;
void compaction::settle(){
assert(level_>0 && level_<MAX_LEVELS);

std::set<basetable *> unique;
unique.insert(inputs_[0][0]);
const int src_level= inputs_[0][0]->level;
const int dest_level = level_;
inputs_.push_back(from_);
fprintf(stderr, " |compaction| start:%s, end:%s\n", start_.c_str(), end_.c_str());

for (int delta=(src_level==dest_level?1:0); ; delta=1-delta) {
int level = dest_level-delta;
int affected = 0;
for (int j=0; j<ver->ssts[level].size(); ++j) {
basetable *t = ver->ssts[level][j];
if (unique.count(t)!=0) {
continue;
}
if (!t->overlap(start, end)) {
continue;
}
inputs_[delta].push_back(t);
unique.insert(t);
++affected;
//TODO optimize by binary search
for (basetable *t : ver_->ssts[level_]) {
if (!t->overlap(start_, end_)) {
continue;
}

inputs_.push_back(t);
fprintf(stderr, " |compaction| add sst-%d <%s, %s>\n", t->file_number, t->smallest.c_str(), t->largest.c_str());

if(t->smallest < start){
start = t->smallest;
}
if(t->largest > end){
end = t->largest;
}
if (t->smallest < start_) {
start_ = t->smallest;
}
if (t->largest > end_) {
end_ = t->largest;
}
if (affected==0) {
break;
}

for (basetable *t : ver_->ssts[level_-1]) {
if (t!=inputs_[0] && t->containedby(start_, end_)) {
inputs_.push_back(t);
}
}
}
40 changes: 22 additions & 18 deletions src/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,40 @@
class version;

class compaction{
public:
std::vector<basetable *> inputs_[2];
version *ver_;
int level_;

compaction(int lev):
level_(lev){
}

basetable *input(int w, int idx){
return inputs_[w][idx];
std::string start_;
std::string end_;
basetable *from_;
std::vector<basetable *> inputs_;
void settle();
public:
// Builds a compaction around the table `focus` within version `ver`.
// The target level is one below focus (focus->level+1), the key range starts
// out as focus's [smallest, largest], and `focus` is remembered in from_.
// settle() is invoked last, after all of those members are initialised.
compaction(version *ver, basetable *focus):
ver_(ver),
level_(focus->level+1),
start_(focus->smallest),
end_(focus->largest),
from_(focus){
settle();
}

void settle_inputs(version *ver);

int level(){
return level_;
}

int size(){
return inputs_[0].size()+inputs_[1].size();
return inputs_.size();
}

void print(){
for(int i=0; i<2; ++i){
for(basetable *t : inputs_[i]){
fprintf(stderr, " compaction input %i level-%d sst-%d <%s,%s>\n", i, t->level, t->file_number, t->smallest.c_str(), t->largest.c_str());
}
}
std::vector<basetable *> &inputs(){
return inputs_;
}

void print(){
for(basetable *t : inputs_){
fprintf(stderr, " compaction input level-%d sst-%d <%s,%s>\n", t->level, t->file_number, t->smallest.c_str(), t->largest.c_str());
}
}
};

#endif
Loading