diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..844e803 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,27 @@ +cmake_minimum_required(VERSION 3.10) + +project(lsmtree C CXX) + +set(CXX_FLAGS + -g + -DCHECK_PTHREAD_RETURN_VALUE + -D_FILE_OFFSET_BITS=64 + -Wall + -Wextra + -Werror + -Wconversion + -Wno-unused-parameter + -Wold-style-cast + -Woverloaded-virtual + -Wpointer-arith + -Wshadow + -Wwrite-strings + -march=native + -std=c++14 + -rdynamic + ) + +include_directories(../../wal) +include_directories(../../naughty/fio) +aux_source_directory(. DIR_LIB_LSMTREE_SRCS) +add_library(lsmtree STATIC ${DIR_LIB_LSMTREE_SRCS}) diff --git a/src/basetable.h b/src/basetable.h index 5f2adf8..65e67fe 100644 --- a/src/basetable.h +++ b/src/basetable.h @@ -5,19 +5,24 @@ #include #include #include +#include #include "tablecache.h" #include "types.h" +const int PATH_LEN = 64; extern std::string basedir; const int MAX_LEVELS = 8; -const int SST_LIMIT = 1<<18; //default sst size:256KB -const int MAX_ALLOWED_SEEKS = SST_LIMIT / 256; //max seeks before compaction +const int SST_LIMIT = 1<<19; //default sst size:512KB +const int MAX_ALLOWED_SEEKS = SST_LIMIT / 64; //max seeks before compaction + +const int MAX_KEYLEN = 1024; +const int MAX_VALLEN = 1<<16; //64KB class basetable: public cached { public: int level; - char path[64]; + char path[PATH_LEN]; int idxoffset; int datoffset; @@ -29,8 +34,12 @@ class basetable: public cached { std::string smallest; std::string largest; - int key_num; + int keynum; int ref_num; + + std::mutex mutex; + bool isclosed; + bool isloaded; bool incache; basetable(): @@ -42,36 +51,39 @@ class basetable: public cached { allowed_seeks(MAX_ALLOWED_SEEKS), smallest(64, '\xff'), largest(""), - key_num(0), + keynum(0), ref_num(0), - incache(false){ + isclosed(true), + isloaded(false), + incache(false) { } int remove(){ return ::remove(path); } - bool overlap(basetable *other){ - return overlap(other->smallest, other->largest); - } - bool overlap(const std::string &start, const std::string &end){ const std::string &lower = smallest > start? smallest: start; const std::string &upper = largest < end? largest: end; return lower <= upper; } + bool containedby(const std::string &start, const std::string &end){ + return smallest>=start && largest<=end; + } + int ref(){ return ++ref_num; } int unref(){ assert(ref_num>=1); - if(--ref_num==0){ - fprintf(stderr, "unref destroy level-%d sst-%d <%s,%s>\n", level, file_number, smallest.c_str(), largest.c_str()); + int refcnt = --ref_num; + if(refcnt==0){ + fprintf(stderr, "unref zero destroy level-%d sst-%d <%s,%s>\n", level, file_number, smallest.c_str(), largest.c_str()); delete this; } - return ref_num; + return refcnt; } int refnum(){ @@ -91,29 +103,40 @@ class basetable: public cached { } } - void cache(){ - if(incache){ - return ; + bool cache(){ + std::unique_lock lock{mutex}; + if(isclosed){ + open(); } - fprintf(stderr, "CACHEING %s\n", path); - //cache: idxoffset, datoffset, codemap - load(); - incache = true; + if(!isloaded){ + fprintf(stderr, "LOAD IN CACHE %s\n", path); + load(); //cache: idxoffset, datoffset, codemap + } + incache = true; + return true; } void uncache(){ - if(!incache){ - return ; + std::unique_lock lock{mutex}; + if(isloaded){ + fprintf(stderr, "UNCACHEING %s\n", path); + release(); + } + if(!isclosed){ + close(); } - fprintf(stderr, "UNCACHEING %s\n", path); - release(); - this->close(); incache = false; } bool iscached(){ + std::unique_lock lock{mutex}; return incache; } + + void printinfo(){ + fprintf(stderr, "%d %d %s %s %d\n", level, file_number, smallest.c_str(), largest.c_str(), keynum); + } + public: virtual int open() = 0; virtual int close() = 0; @@ -141,6 +164,10 @@ class basetable: public cached { return idxoffset < table->idxoffset; } + basetable *belong(){ + return table; + } + iterator &next(){ idxoffset += sizeof(rowmeta); return *this; diff --git a/src/clock.h b/src/clock.h index dc4c3f2..637b977 100644 --- a/src/clock.h +++ b/src/clock.h @@ -1,4 +1,5 @@ #include +#include #include @@ -8,6 +9,18 @@ inline long get_time_sec(){ return ts.tv_sec; } +inline long get_time_msec(){ + struct timespec ts; + clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts); + return ts.tv_sec*1000 + ts.tv_nsec/1000000; +} + +inline long get_time_usec(){ + struct timespec ts; + clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts); + return ts.tv_sec*1000000 + ts.tv_nsec/1000; +} + inline long get_time_nsec(){ struct timespec ts; /* @@ -21,11 +34,7 @@ inline long get_time_nsec(){ Thread-specific CPU-time clock. */ clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &ts); - return ts.tv_nsec; -} - -inline long get_time_usec(){ - return get_time_sec()*1000+get_time_nsec()/1000; + return ts.tv_sec*1000000000 + ts.tv_nsec; } inline std::string timestamp(){ diff --git a/src/compaction.cpp b/src/compaction.cpp index 63de3e8..506548c 100644 --- a/src/compaction.cpp +++ b/src/compaction.cpp @@ -1,40 +1,32 @@ #include "compaction.h" #include "version.h" -void compaction::settle_inputs(version *ver){ - assert(level_>=1 && level_smallest; - std::string end = inputs_[0][0]->largest; +void compaction::settle(){ + assert(level_>0 && level_ unique; - unique.insert(inputs_[0][0]); - const int src_level= inputs_[0][0]->level; - const int dest_level = level_; + inputs_.push_back(from_); + fprintf(stderr, " |compaction| start:%s, end:%s\n", start_.c_str(), end_.c_str()); - for (int delta=(src_level==dest_level?1:0); ; delta=1-delta) { - int level = dest_level-delta; - int affected = 0; - for (int j=0; jssts[level].size(); ++j) { - basetable *t = ver->ssts[level][j]; - if (unique.count(t)!=0) { - continue; - } - if (!t->overlap(start, end)) { - continue; - } - inputs_[delta].push_back(t); - unique.insert(t); - ++affected; + //TODO optimize by binary search + for (basetable *t : ver_->ssts[level_]) { + if (!t->overlap(start_, end_)) { + continue; + } + + inputs_.push_back(t); + fprintf(stderr, " |compaction| add sst-%d <%s, %s>\n", t->file_number, t->smallest.c_str(), t->largest.c_str()); - if(t->smallest < start){ - start = t->smallest; - } - if(t->largest > end){ - end = t->largest; - } + if (t->smallest < start_) { + start_ = t->smallest; + } + if (t->largest > end_) { + end_ = t->largest; } - if (affected==0) { - break; + } + + for (basetable *t : ver_->ssts[level_-1]) { + if (t!=inputs_[0] && t->containedby(start_, end_)) { + inputs_.push_back(t); } } } diff --git a/src/compaction.h b/src/compaction.h index 3f97838..1844ca7 100644 --- a/src/compaction.h +++ b/src/compaction.h @@ -16,36 +16,40 @@ class version; class compaction{ -public: - std::vector inputs_[2]; + version *ver_; int level_; - - compaction(int lev): - level_(lev){ - } - - basetable *input(int w, int idx){ - return inputs_[w][idx]; + std::string start_; + std::string end_; + basetable *from_; + std::vector inputs_; + void settle(); +public: + compaction(version *ver, basetable *focus): + ver_(ver), + level_(focus->level+1), + start_(focus->smallest), + end_(focus->largest), + from_(focus){ + settle(); } - void settle_inputs(version *ver); - int level(){ return level_; } int size(){ - return inputs_[0].size()+inputs_[1].size(); + return inputs_.size(); } - void print(){ - for(int i=0; i<2; ++i){ - for(basetable *t : inputs_[i]){ - fprintf(stderr, " compaction input %i level-%d sst-%d <%s,%s>\n", i, t->level, t->file_number, t->smallest.c_str(), t->largest.c_str()); - } - } + std::vector &inputs(){ + return inputs_; } + void print(){ + for(basetable *t : inputs_){ + fprintf(stderr, " compaction input level-%d sst-%d <%s,%s>\n", t->level, t->file_number, t->smallest.c_str(), t->largest.c_str()); + } + } }; #endif diff --git a/src/lru.h b/src/lru.h index 6025675..03f96ad 100644 --- a/src/lru.h +++ b/src/lru.h @@ -17,18 +17,21 @@ class Node{ VT val; Node *prev, *next; + bool fixed; friend NodeList; friend LRUCache; public: Node(): prev(nullptr), - next(nullptr){ + next(nullptr), + fixed(true){ }; - Node(const KT &k, const VT &v): + Node(const KT &k, const VT &v, bool pined): key(k), - val(v){ + val(v), + fixed(pined){ prev = nullptr; next = nullptr; } @@ -57,7 +60,7 @@ class NodeList{ if(empty()){ return nullptr; } - Node *res = head.prev;; + Node *res = head.prev; head.prev->next = &head; head.prev = head.prev->prev; res->next = nullptr; @@ -93,7 +96,8 @@ class LRUCache{ NodeList list; std::map*> cache; int size; - const int capacity; + int capacity; + std::mutex mutex; public: LRUCache(int cap): @@ -106,7 +110,16 @@ class LRUCache{ return cache.find(key) != cache.end(); } + void setfixed(const KT &key, bool isfixed){ + auto it= cache.find(key) ; + if(it==cache.end()){ + return; + } + it->second->fixed = isfixed; + } + int get(const KT &key, VT &val){ + std::unique_lock lock{mutex}; auto it = cache.find(key); if(it == cache.end()){ return -1; @@ -117,22 +130,39 @@ class LRUCache{ return 0; } - void put(const KT &key, VT &val){ + void put(const KT &key, VT &val, bool fixed){ + std::unique_lock lock{mutex}; auto it = cache.find(key); if(it != cache.end()){ //update + fprintf(stderr, "WARNING, LRU put %s, but has exist\n", key.c_str()); Node *node = it->second; node->val = val; list.update(node); }else{ //add if(size == capacity){ - Node *node = list.pop(); - cache.erase(node->key); - VT v = node->val; - v->uncache(); - delete node; + Node *node = nullptr; + int n=capacity; + while(--n>=0){ //TODO if all FIXED + node = list.pop(); + if(node->fixed!=true){ + break; + } + list.push(node); + } + if(n<0){ + capacity+=1; + } else { + assert(node!=nullptr); + fprintf(stderr, "LRU IS FULL, pop %s\n", node->key.c_str()); + cache.erase(node->key); + VT v = node->val; + v->uncache(); + delete node; + --size; + } } - Node *node = new Node(key, val); + Node *node = new Node(key, val, fixed); list.push(node); cache[key] = node; ++size; @@ -140,14 +170,18 @@ class LRUCache{ } int del(const std::string &key, VT &val){ + std::unique_lock lock{mutex}; auto it = cache.find(key); if(it == cache.end()){ return -1; } Node *node = it->second; val = node->val; + val->uncache(); cache.erase(key); list.remove(node); + delete node; + --size; return 0; } diff --git a/src/lsmtree.cpp b/src/lsmtree.cpp index 995e259..e8282df 100644 --- a/src/lsmtree.cpp +++ b/src/lsmtree.cpp @@ -1,51 +1,50 @@ #include #include "lsmtree.h" #include "sstable.h" +#include "clock.h" -thread_pool_t backstage(1); std::string basedir; -const int TIER_SST_COUNT(int level){ - return TIER_PRI_COUNT * pow(10, level); -} - int lsmtree::open(const options *opt, const char *dirpath){ basedir = dirpath; char path[64]; sprintf(path, "%s/sst/\0", dirpath); - if(!exist(path)){ - mkdir(path); + if(!fio::fexist(path)){ + fio::mkdir(path); } + versions_.recover(); wal::option walopt = {20971520, 1024}; char logpath[64]; sprintf(logpath, "%s/log/\0", basedir.c_str()); wal_ = new wal::walog(walopt, logpath); + redolog(); - recover(); + flusher_ = new std::thread(std::bind(&lsmtree::schedule_flush, this)); return 0; } -int lsmtree::recover(){ - versions_.recover(); - +int lsmtree::redolog(){ int startidx = versions_.apply_logidx()+1; int endidx = -1; wal_->truncatefront(startidx); wal_->lastindex(endidx); for(int idx=startidx; idx<=endidx; ++idx){ std::string row; - wal_->read(idx, row); - int seqno=0; - char kv[2][523]; - int flag=0; - memset(kv, 0, sizeof(kv)); - sscanf(row.c_str(), "%d %s %s %d", &seqno, kv[0], kv[1], &flag); + if(wal_->read(idx, row)<0){ + fprintf(stderr, "error: failed when redo log, idx=%d\n", idx); + return -1; + } + int seqno = 0; + char k[MAX_KEYLEN] = {}; + char v[MAX_VALLEN] = {}; + int flag = 0; + sscanf(row.c_str(), "%d %s %s %d", &seqno, k, v, &flag); if(flag==FLAG_VAL){ - mutab_->put(idx, seqno, kv[0], kv[1]); + mutab_->put(idx, seqno, k, v); }else{ - mutab_->del(idx, seqno, kv[0]); + mutab_->del(idx, seqno, k); } } return 0; @@ -54,42 +53,46 @@ int lsmtree::recover(){ int lsmtree::get(const roptions &opt, const std::string &key, std::string &val){ int seqno = (opt.snap==nullptr)? versions_.last_sequence() : opt.snap->sequence(); memtable *mtab = nullptr; - memtable *imtab = nullptr; - version *ver = nullptr; + memtable *imtab[MAX_IMMUTAB_SIZE]; + int tabnum = 0; + { std::unique_lock lock{mutex_}; mtab = mutab_; - imtab = immutab_; - ver = versions_.current(); mutab_->ref(); - if(immutab_!=nullptr){ - immutab_->ref(); + tabnum = imsize_; + for(int i=0; iref(); } - ver->ref(); } { int err = mtab->get(seqno, key, val); mtab->unref(); if(err==0){ - if(imtab!=nullptr){ - imtab->unref(); + for(int i=0; iunref(); } - ver->unref(); return 0; } } - if(immutab_!=nullptr){ - int err = immutab_->get(seqno, key, val); - immutab_->unref(); + { + int err = -1; + for(int i=0; iget(seqno, key, val); + } + imtab[i]->unref(); + } if(err==0){ - ver->unref(); return 0; } } { + version *ver = ver = versions_.curversion(); int err = ver->get(seqno, key, val); ver->unref(); schedule_compaction(); @@ -97,119 +100,213 @@ int lsmtree::get(const roptions &opt, const std::string &key, std::string &val){ } } +int lsmtree::put(const woptions &opt, const std::string &key, const std::string &val){ + wbatch wb; + if(wb.put(key, val)<0){ + return -1; + } + return write(opt, wb); +} + +int lsmtree::del(const woptions &opt, const std::string &key){ + wbatch wb; + if(wb.del(key)<0){ + return -1; + } + return write(opt, wb); +} + +int lsmtree::write(const woptions &opt, const wbatch &bat){ + int seqno = versions_.add_sequence(bat.size()); + bat.scan([this, seqno](const char *key, const char *val, const int flag)->int{ + make_space(); //transfer full memtable to immutable + + char log[1024]; + if(flag==FLAG_VAL){ + sprintf(log, "%d %s %s %d\n\0", seqno, key, val, flag); + wal_->write(++logidx_, log); + return mutab_->put(logidx_, seqno, key, val); + }else{ + sprintf(log, "%d %s * %d\n\0", seqno, key, flag); + wal_->write(++logidx_, log); + return mutab_->del(logidx_, seqno, key); + } + }); + return 0; +} + +void lsmtree::schedule_flush(){ + while(running_){ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 2; //tomeout 2 seconds + if(sem_timedwait(&sem_occu_, &ts)!=0){ + continue; + } + int tabnum = 0; + { + std::unique_lock lock{mutex_}; + tabnum = imsize_; + if(tabnum>MAX_FLUSH_SIZE){ + tabnum = MAX_FLUSH_SIZE; + } + } + if(tabnum<2){ + continue; + } + + long start = get_time_usec(); + minor_compact(tabnum); + fprintf(stderr, "minor cost: %d, flush num:%d\n", get_time_usec()-start, tabnum); + for(int i=0; icalculate(); + schedule_compaction(); + } +} + void lsmtree::schedule_compaction(){ - if(compacting){ + if(compacting_){ return; } - compacting = true; - backstage.post([this]{ - { - if(immutab_!=nullptr){ - this->minor_compact(); - } - bool compacted = false; - if(versions_.need_compact()){ - compaction *c = versions_.plan_compact(); - if(c!=nullptr){ //do nothing - this->major_compact(c); - compacted = true; - } + compacting_ = true; + backstage_.post([this]{ + bool compacted = false; + if(versions_.need_compact()){ + compaction *c = nullptr; + { + std::unique_lock lock{sstmutex_}; + c = versions_.plan_compact(versions_.current()); } - compacting = false; - if(compacted){ - schedule_compaction(); + if(c!=nullptr){ //do nothing + long start = get_time_usec(); + this->major_compact(c); + fprintf(stderr, "major cost: %d\n", get_time_usec()-start); + compacted = true; } } + compacting_ = false; + if(compacted){ + versions_.current()->calculate(); + schedule_compaction(); + } }); } -int lsmtree::sweep_space(){ +int lsmtree::make_space(){ if(mutab_->size() < MAX_MEMTAB_SIZE){ return 0; } - std::unique_lock lock{mutex_}; - if (immutab_!=nullptr) { - fprintf(stderr, "level-0 wait\n"); - solid_cv_.wait(lock); - return 0; - } else { - immutab_ = mutab_; + sem_wait(&sem_free_); + { + std::unique_lock lock{mutex_}; + immutab_[imsize_++] = mutab_; mutab_ = new memtable; mutab_->ref(); - return 1; - } -} - -int lsmtree::put(const woptions &opt, const std::string &key, const std::string &val){ - wbatch wb; - if(wb.put(key, val)<0){ - return -1; } - return write(opt, wb); -} - -int lsmtree::del(const woptions &opt, const std::string &key){ - wbatch wb; - if(wb.del(key)<0){ - return -1; - } - return write(opt, wb); + sem_post(&sem_occu_); + return 0; } -int lsmtree::minor_compact(){ - if(immutab_==nullptr){ - return -1; +int lsmtree::minor_compact(const int tabnum){ + assert(tabnum>0); + std::vector vec; + for(int i=0; ibegin()); } + int n = 0; //merge keys + std::string lastkey; int persist_logidx = -1; versionedit edit; + primarysst *sst = new primarysst(versions_.next_fnumber()); sst->open(); - immutab_->scan(versions_.last_sequence(), [=, &edit, &sst, &persist_logidx](const int logidx, const uint64_t seqno, const std::string &key, const std::string &val, int flag) ->int { - persist_logidx = logidx; - if(sst->put(seqno, key, val, flag)==ERROR_SPACE_NOT_ENOUGH){ - fprintf(stderr, "minor compact into sst-%d range:[%s, %s]\n", sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); - edit.add(0, sst); + edit.add(0, sst); + + make_heap(vec.begin(), vec.end(), memtable::compare_gt); + while(!vec.empty()){ + memtable::iterator it = vec.front(); + pop_heap(vec.begin(), vec.end(), memtable::compare_gt); + vec.pop_back(); //remove iterator + if(!it.valid()){ + fprintf(stderr, "error: pop invalid memtable::iterator\n"); + continue; + } + + onval *t = it.val(); + it.next(); + if(it.valid()){ + vec.push_back(it); + push_heap(vec.begin(), vec.end(), memtable::compare_gt); + } + + //merge same key + if(n++>0 && lastkey==t->key && t->seqnologidx){ + persist_logidx = t->logidx; + } + if(sst->put(t->seqno, t->key, t->val, t->flag)==ERROR_SPACE_NOT_ENOUGH){ + fprintf(stderr, " >>>minor compact into sst-%d range:[%s, %s]\n", + sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); + sst = new primarysst(versions_.next_fnumber()); sst->open(); - sst->put(seqno, key, val, flag); - } - return 0; - }); + edit.add(0, sst); - fprintf(stderr, "minor compact into sst-%d range:[%s, %s]\n", sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); - edit.add(0, sst); + sst->put(t->seqno, t->key, t->val, t->flag); + } + lastkey = t->key; + } + fprintf(stderr, " >>>minor compact into sst-%d range:[%s, %s]\n", + sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); + { + std::unique_lock lock{sstmutex_}; + version *neo = versions_.apply(&edit); + versions_.appoint(neo); + versions_.persist(versions_.current()); + } versions_.apply_logidx(persist_logidx); - version *neo = versions_.apply(&edit); + { std::unique_lock lock{mutex_}; - immutab_->unref(); - immutab_ = nullptr; + for(int i=0; iunref(); + } + for(int i=0; icalculate(); + fprintf(stderr, "MINOR COMPACT DONE!!!\n"); return 0; } int lsmtree::major_compact(compaction* c){ - assert(c->inputs_[0].size()>0); + assert(c->size()>0); + versionedit edit; if(c->size()==1){ //directly move to next level - edit.remove(c->inputs_[0][0]); - edit.add(c->level(), c->inputs_[0][0]); + edit.remove(c->inputs()[0]); + edit.add(c->level(), c->inputs()[0]); } else { std::vector vec; - for(int i=0; i<2; ++i){ - for(basetable *t : c->inputs_[i]){ - edit.remove(t); - vec.push_back(t->begin()); - fprintf(stderr, " ...major compact from input-%d level:%d sst-%d <%s, %s>\n", i, t->level, t->file_number, t->smallest.c_str(), t->largest.c_str()); - } + for(basetable *t : c->inputs()){ + fprintf(stderr, "cache sst-%d for major_compact\n", t->file_number); + versions_.cachein(t, true); + edit.remove(t); + vec.push_back(t->begin()); + fprintf(stderr, " ...major compact from level:%d sst-%d <%s, %s>\n", + t->level, t->file_number, t->smallest.c_str(), t->largest.c_str()); } if(vec.empty()){ return 0; @@ -228,8 +325,8 @@ int lsmtree::major_compact(compaction* c){ pop_heap(vec.begin(), vec.end(), basetable::compare_gt); vec.pop_back(); //remove iterator - kvtuple t; - it.parse(t); + kvtuple e; + it.parse(e); it.next(); if(it.valid()){ @@ -237,55 +334,32 @@ int lsmtree::major_compact(compaction* c){ push_heap(vec.begin(), vec.end(), basetable::compare_gt); } - if(n++>0 && lastkey==t.ckey && t.seqno0 && lastkey==e.ckey && e.seqnoput(t.seqno, std::string(t.ckey), std::string(t.cval), t.flag)==ERROR_SPACE_NOT_ENOUGH){ + if(sst->put(e.seqno, std::string(e.ckey), std::string(e.cval), e.flag)==ERROR_SPACE_NOT_ENOUGH){ fprintf(stderr, " >>>major compact into level:%d sst-%d range:[%s, %s]\n", destlevel, sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); sst = new sstable(destlevel, versions_.next_fnumber()); sst->open(); edit.add(destlevel, sst); - sst->put(t.seqno, std::string(t.ckey), std::string(t.cval), t.flag); + sst->put(e.seqno, std::string(e.ckey), std::string(e.cval), e.flag); } + lastkey = e.ckey; } fprintf(stderr, " >>>major compact into level:%d sst-%d range:[%s, %s]\n", destlevel, sst->file_number, sst->smallest.c_str(), sst->largest.c_str()); } - version * neo = versions_.apply(&edit); - { - //std::unique_lock lock{mutex_}; + std::unique_lock lock{sstmutex_}; + version *neo = versions_.apply(&edit); versions_.appoint(neo); + versions_.persist(versions_.current()); } - versions_.current()->calculate(); - fprintf(stderr, "major compact DONE!!!\n"); - return 0; -} - -int lsmtree::write(const woptions &opt, const wbatch &bat){ - int seqno = versions_.add_sequence(bat.size()); - bat.scan([this, seqno](const char *key, const char *val, const int flag)->int{ - if(sweep_space()==1){ - schedule_compaction(); - } - - char log[1024]; - if(flag==FLAG_VAL){ - sprintf(log, "%d %s %s %d\n\0", seqno, key, val, flag); - wal_->write(++logidx_, log); - return mutab_->put(logidx_, seqno, key, val); - }else{ - sprintf(log, "%d %s * %d\n\0", seqno, key, flag); - wal_->write(++logidx_, log); - return mutab_->del(logidx_, seqno, key); - } - }); + fprintf(stderr, "MAJOR COMPACT DONE!!!\n"); return 0; } diff --git a/src/lsmtree.h b/src/lsmtree.h index 4fa00a2..34cdd3c 100644 --- a/src/lsmtree.h +++ b/src/lsmtree.h @@ -5,8 +5,10 @@ #include #include #include +#include #include #include +#include #include #include "memtable.h" #include "sstable.h" @@ -22,49 +24,69 @@ typedef std::pair kvpair; -const int TIER_PRI_COUNT = 8; -const int TIER_SST_COUNT(int level); const int MAX_COMPACT_LEVELS = 2; //everytime compact 2 levels at most +const int MAX_IMMUTAB_SIZE = 8; +const int MAX_FLUSH_SIZE = 4; +const int MIN_FLUSH_SIZE = 1; class lsmtree{ int logidx_; wal::walog *wal_; memtable *mutab_; - memtable *immutab_; + memtable *immutab_[MAX_IMMUTAB_SIZE]; + std::atomic imsize_; std::mutex mutex_; - std::condition_variable solid_cv_; - std::atomic compacting; + thread_pool_t backstage_; + std::thread *flusher_; + sem_t sem_free_; + sem_t sem_occu_; + + std::atomic running_; + std::atomic compacting_; + versionset versions_; + std::mutex sstmutex_; + snapshotlist snapshots_; - int minor_compact(); + int minor_compact(const int tabnum); int major_compact(compaction* c); - int select_overlap(const int ln, std::vector &from, std::vector &to); - int sweep_space(); + int make_space(); + void schedule_flush(); void schedule_compaction(); - int recover(); + int redolog(); public: lsmtree(): logidx_(0), wal_(nullptr), mutab_(nullptr), - immutab_(nullptr), - compacting(false){ + imsize_(0), + running_(true), + compacting_(false), + backstage_(1), + flusher_(nullptr) { mutab_ = new memtable; mutab_->ref(); + sem_init(&sem_occu_, 0, 0); + sem_init(&sem_free_, 0, MAX_IMMUTAB_SIZE); } ~lsmtree(){ - if(mutab_){ - mutab_->unref(); + running_ = false; + if(flusher_->joinable()){ + flusher_->join(); } - if(immutab_){ - immutab_->unref(); + if(mutab_){ + delete mutab_; } + if(flusher_!=nullptr){ + delete flusher_; + flusher_ = nullptr; + } } int open(const options *opt, const char *basedir); diff --git a/src/memtable.cpp b/src/memtable.cpp index 115961e..3515656 100644 --- a/src/memtable.cpp +++ b/src/memtable.cpp @@ -18,7 +18,7 @@ int memtable::get(const uint64_t seqno, const std::string &key, std::string &val int memtable::put(const int logidx, const uint64_t seqno, const std::string &key, const std::string &val, const uint8_t flag){ size_ += key.size() + sizeof(onval); - onval *vobj = new onval{logidx, seqno, val, flag}; + onval *vobj = new onval{logidx, seqno, key, val, flag}; node *neo = table_.insert(key, vobj); assert(neo!=nullptr); return 0; @@ -30,7 +30,7 @@ int memtable::del(const int logidx, const uint64_t seqno, const std::string &key } int memtable::scan(const uint64_t seqno, std::function visit){ - for(skiplist::iterator it = table_.begin(); it!=table_.end(); ++it){ + for(memtable::iterator it =this->begin(); it!=this->end(); ++it){ node *p = *it; onval *v = p->val; visit(v->logidx, v->seqno, p->key, v->val, v->flag); diff --git a/src/memtable.h b/src/memtable.h index cb563a9..e662d22 100644 --- a/src/memtable.h +++ b/src/memtable.h @@ -3,6 +3,7 @@ #include #include +#include #include "types.h" #include "skiplist.h" @@ -13,6 +14,7 @@ const int BRANCH_SIZE = 16; typedef struct{ int logidx; uint64_t seqno; + std::string key; std::string val; uint8_t flag; } onval; @@ -56,6 +58,71 @@ class memtable{ int scan(const uint64_t seqno, std::function visit); + void print(int seqno){ + this->scan(seqno, [seqno](int logidx, uint64_t seq, const std::string &key, const std::string &val, int flag) -> int{ + fprintf(stderr, " logidx:%d seq:%d<%d key:%s val:%s flag:%d\n", logidx, seq, seqno, key.c_str(), val.c_str(), flag); + return 0; + }); + } + +public: + class iterator{ + skiplist *table; + node *ptr; + friend class memtable; + public: + node * operator *(){ + return ptr; + } + + onval* val(){ + assert(ptr!=nullptr); + return ptr->val; + } + + iterator & operator ++(){ + ptr = ptr->next(); + return *this; + } + + bool operator !=(const iterator &other){ + return ptr!=other.ptr; + } + + void next(){ + ptr = ptr->next(); + } + + bool valid(){ + return ptr != table->nil; + } + + }; + + iterator begin(){ + iterator it; + it.table = &table_; + it.ptr = table_.head->next(); + assert(it.ptr!=nullptr); + return it; + } + + iterator end(){ + iterator it; + it.table = &table_; + it.ptr = table_.nil; + return it; + } + + static bool compare_gt(iterator &a, iterator &b){ + node * na = *a; + node * nb = *b; + int delta = strcmp(na->val->key.c_str(), nb->val->key.c_str()); + if(delta!=0){ + return delta > 0; + } + return na->val->seqno < na->val->seqno; + } }; #endif diff --git a/src/primarysst.cpp b/src/primarysst.cpp index 1de029d..c0a476d 100644 --- a/src/primarysst.cpp +++ b/src/primarysst.cpp @@ -14,16 +14,21 @@ primarysst::primarysst(const int fileno, const char*start, const char *end, int if(end!=nullptr){ largest = end; } - key_num = keys; + keynum = keys; } primarysst::~primarysst(){ - ::munmap(mem, SST_LIMIT); + if(isloaded){ + release(); + } + if(!isclosed){ + close(); + } } int primarysst::open(){ int fd= -1; - if(!exist(path)){ + if(!fio::fexist(path)){ fd = ::open(path, O_RDWR | O_CREAT , 0664); if(fd<0) { fprintf(stderr, "open file %s error: %s\n", path, strerror(errno)); @@ -31,7 +36,7 @@ int primarysst::open(){ return -1; } ::ftruncate(fd, SST_LIMIT); - } else { + }else{ fd = ::open(path, O_RDWR, 0664); if(fd<0) { fprintf(stderr, "open file %s error: %s\n", path, strerror(errno)); @@ -43,17 +48,26 @@ int primarysst::open(){ assert(sb.st_size <= SST_LIMIT); } mem = (char*)::mmap(nullptr, SST_LIMIT, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); - if(mem == MAP_FAILED) { + if(mem == MAP_FAILED){ fprintf(stderr, "mmap error: %s\n", strerror(errno)); ::close(fd); return -1; } ::close(fd); + isclosed = false; return 0; } +int primarysst::close(){ + munmap(mem, SST_LIMIT); + mem = nullptr; + isclosed = true; + return 0; +} + int primarysst::load(){ + keynum = 0; idxoffset = 0; datoffset = SST_LIMIT; for(int pos=0; pos _; _.swap(codemap); - return 0; -} - -int primarysst::close(){ - munmap(mem, SST_LIMIT); - mem = nullptr; + isloaded = false; return 0; } @@ -98,28 +110,29 @@ int primarysst::put(const uint64_t seqno, const std::string &key, const std::str memcpy(mem+datoffset+sizeof(int), key.c_str(), keylen); memcpy(mem+datoffset+sizeof(int)+keylen, &vallen, sizeof(int)); memcpy(mem+datoffset+sizeof(int)+keylen+sizeof(int), val.c_str(), vallen); - msync(mem+datoffset, datlen, MS_SYNC); + //msync(mem+datoffset, datlen, MS_SYNC); //TODO: determine by option const int hashcode = hash(key.c_str(), key.size()); rowmeta meta = {seqno, hashcode, datoffset, datlen, flag}; - memcpy(mem+idxoffset, &meta, sizeof(meta)); - msync(mem+idxoffset, sizeof(meta), MS_SYNC); - idxoffset += sizeof(meta); + memcpy(mem+idxoffset, &meta, sizeof(rowmeta)); + //msync(mem+idxoffset, sizeof(rowmeta), MS_SYNC); //TODO: determine by option + idxoffset += sizeof(rowmeta); char *ckey = mem+datoffset+sizeof(int); char *cval = mem+datoffset+sizeof(int)+keylen+sizeof(int); - kvtuple t(seqno, ckey, cval, flag); codemap.insert(std::make_pair(hashcode, t)); + isloaded = true; - ++key_num; - const int rowlen = sizeof(int)+keylen+sizeof(int)+vallen + sizeof(meta); + ++keynum; + const int rowlen = datlen + sizeof(rowmeta); file_size += rowlen; uplimit(key); return 0; } int primarysst::get(const uint64_t seqno, const std::string &key, kvtuple &res){ + assert(codemap.size()>0); const int hashcode = hash(key.c_str(), key.size()); auto pr = codemap.equal_range(hashcode); for (auto iter = pr.first ; iter != pr.second; ++iter){ @@ -159,18 +172,14 @@ int primarysst::scan(const uint64_t seqno, std::functionseqno==0 && meta->hashcode==0){ return -1; } - loadkv(mem+meta.datoffset, &record.ckey, &record.cval); - record.seqno = meta.seqno; - record.flag = meta.flag; + loadkv(mem+meta->datoffset, &record.ckey, &record.cval); + record.seqno = meta->seqno; + record.flag = meta->flag; return 0; } diff --git a/src/skiplist.h b/src/skiplist.h index 0f92d66..adec1ac 100644 --- a/src/skiplist.h +++ b/src/skiplist.h @@ -53,6 +53,7 @@ class node { template class skiplist { + friend class memtable; node *head; node *nil; @@ -153,38 +154,6 @@ class skiplist { } head->forwards[0] = nil; } - -public: - class iterator{ - node *ptr; - friend class skiplist; - public: - node * operator *(){ - return ptr; - } - - iterator & operator ++(){ - ptr = ptr->next(); - return *this; - } - - bool operator !=(const iterator &other){ - return ptr!=other.ptr; - } - }; - - iterator begin(){ - iterator it; - it.ptr = head->next(); - return it; - } - - iterator end(){ - iterator it; - it.ptr = nil; - return it; - } - }; #endif diff --git a/src/sstable.cpp b/src/sstable.cpp index fbd92ac..36ea9d3 100644 --- a/src/sstable.cpp +++ b/src/sstable.cpp @@ -14,26 +14,35 @@ sstable::sstable(const int lev, const int fileno, const char *start, const char if(end!=nullptr){ largest = end; } - key_num = keys; + keynum = keys; +} + +sstable::~sstable(){ + if(isloaded){ + release(); + } + if(!isclosed){ + close(); + } } int sstable::open(){ - if(!exist(path)){ + if(!fio::fexist(path)){ fd = ::open(path, O_RDWR | O_CREAT , 0664); if(fd<0) { fprintf(stderr, "open file %s error: %s\n", path, strerror(errno)); return -1; } ::ftruncate(fd, SST_LIMIT); - return 0; } else { fd = ::open(path, O_RDWR, 0664); if(fd<0) { fprintf(stderr, "open file %s error: %s\n", path, strerror(errno)); return -1; } - return 0; } + isclosed = false; + return 0; } int sstable::close(){ @@ -41,6 +50,7 @@ int sstable::close(){ ::close(fd); } fd = -1; + isclosed = true; return 0; } @@ -48,6 +58,7 @@ int sstable::load(){ if(fd<0){ open(); } + keynum = 0; idxoffset = 0; datoffset = SST_LIMIT; //loop break if meta is {0,0,0,0,0} @@ -61,23 +72,27 @@ int sstable::load(){ if(meta.hashcode==0 && meta.datoffset==0){ break; } + ++keynum; codemap.insert(std::make_pair(meta.hashcode, meta)); datoffset = meta.datoffset; } + isloaded = true; return 0; } int sstable::release(){ + keynum = 0; idxoffset = 0; datoffset = SST_LIMIT; std::multimap _; _.swap(codemap); + isloaded = false; return 0; } int sstable::get(const uint64_t seqno, const std::string &key, std::string &val){ assert(codemap.size()>0); - assert(incache); + assert(isloaded); const int hashcode = hash(key.c_str(), key.size()); auto res = codemap.equal_range(hashcode); for(auto iter = res.first; iter!=res.second; ++iter){ @@ -130,7 +145,10 @@ int sstable::put(const uint64_t seqno, const std::string &key, const std::string pwrite(fd, (void*)&meta, sizeof(rowmeta), idxoffset); idxoffset += sizeof(rowmeta); - ++key_num; + codemap.insert(std::make_pair(meta.hashcode, meta)); + isloaded = true; + + ++keynum; const int rowlen = sizeof(int)+keylen+sizeof(int)+vallen + sizeof(meta); file_size += rowlen; uplimit(key); @@ -139,8 +157,8 @@ int sstable::put(const uint64_t seqno, const std::string &key, const std::string int sstable::scan(const uint64_t seqno, std::function func){ rowmeta meta; - for(int pos=0; posseqno){ continue; } @@ -157,7 +175,8 @@ int sstable::scan(const uint64_t seqno, std::function lru; public: tablecache(): - lru(512){ //default:512 sstables will be cached at most + lru(960){ //such sstables will be cached at most } - int insert(const std::string &k, cached* v){ + int insert(const std::string &k, cached* v, bool fixed){ if(lru.exist(k)){ fprintf(stderr, "warning, ignore cache %s exist\n", k.c_str()); + //lru.display(); return -1; } - fprintf(stderr, "tablecache::insert, start cache %s\n", k.c_str()); - lru.put(k, v); - fprintf(stderr, "tablecache::insert, ok, cacheing %s\n", k.c_str()); - v->cache(); + fprintf(stderr, "LRU put cache %s \n", k.c_str()); + lru.put(k, v, fixed); return 0; } + void setfixed(const std::string &k, bool isfixed){ + lru.setfixed(k, isfixed); + } + int lookup(const std::string &k, cached* &v){ return lru.get(k, v); } @@ -35,7 +38,7 @@ class tablecache{ int evict(const std::string &k){ cached *v; if(lru.del(k, v)==0){ - v->uncache(); + fprintf(stderr, "LRU evict cache %s \n", k.c_str()); return 0; } return -1; diff --git a/src/types.h b/src/types.h index 2f3b9d1..c564cd5 100644 --- a/src/types.h +++ b/src/types.h @@ -28,19 +28,27 @@ class kvtuple{ int flag; kvtuple(): + buffer(nullptr), seqno(0), ckey(nullptr), cval(nullptr), - flag(0), - buffer(nullptr){ + flag(0) { } kvtuple(uint64_t seq, char *k, char *v, int f): + buffer(nullptr), seqno(seq), ckey(k), cval(v), - flag(f), - buffer(nullptr) { + flag(f) { + } + + kvtuple(const kvtuple &kt){ + buffer = kt.buffer; + seqno = kt.seqno; + ckey = kt.ckey; + cval = kt.cval; + flag = kt.flag; } void reserve(const int size){ diff --git a/src/version.cpp b/src/version.cpp index b9cbf9e..c8a1445 100644 --- a/src/version.cpp +++ b/src/version.cpp @@ -2,24 +2,24 @@ #include "clock.h" -const int PATH_LEN = 64; - -inline uint64_t max_level_size(int ln){ - return (ln==0)? 8*SST_LIMIT : uint64_t(pow(10,ln))*SST_LIMIT; +inline int TIER_SST_COUNT(int level){ + static const int PRISST_COUNT = 8; + return PRISST_COUNT * pow(16, level); } version::version(versionset *vs): vset(vs), refnum(0), - hot_sst(nullptr){ + tricky_sst(nullptr){ } version::~version(){ for(int i=0; irefnum()==1){ - vset->cache_.evict(std::string(t->path)); - t->remove(); + fprintf(stderr, "REMOVE CACHE AND FILE, sst-%d\n", t->file_number); + vset->cacheout(t); + t->remove(); //TODO: async } t->unref(); } @@ -30,20 +30,18 @@ int version::get(const uint64_t seqno, const std::string &key, std::string &val) kvtuple res; res.seqno = 0; for(int j=0; jsmallest || key>ssts[0][j]->largest){ + primarysst *t = dynamic_cast(ssts[0][j]); + if(keysmallest || key>t->largest){ continue; } - primarysst *t = dynamic_cast(ssts[0][j]); - t->ref(); if(!t->iscached()){ - vset->cache_.insert(std::string(t->path), t); + vset->cachein(t, false); } - kvtuple tmp; int err = t->get(seqno, key, tmp); - t->unref(); if(err<0){ + fprintf(stderr, "err:%d, try find %s in sst-%d\n", err, key.c_str(), t->file_number); continue; } @@ -67,22 +65,24 @@ int version::get(const uint64_t seqno, const std::string &key, std::string &val) if(it==ssts[i].end()){ continue; } + basetable *t = *it; if(keysmallest || key>t->largest){ continue; } - t->ref(); + if(!t->iscached()){ - vset->cache_.insert(std::string(t->path), t); + vset->cachein(t, false); } int err = t->get(seqno, key, val); - t->unref(); if(err==0 || err==ERROR_KEY_DELETED){ + return 0; + }else{ //miss seek + fprintf(stderr, "missed seek %s in sst-%d\n", key.c_str(), t->file_number); t->allowed_seeks -= 1; - if(t->allowed_seeks==0){ - hot_sst = t; + if(tricky_sst && t->allowed_seeks==0){ + tricky_sst = t; } - return 0; } } return -1; @@ -92,11 +92,7 @@ void version::calculate(){ crownd_score = 0.0; crownd_level = -1; for(int i=0; ifilesize(); - } - const double rate = (double)total/max_level_size(i); + const double rate = (double)ssts[i].size()/TIER_SST_COUNT(i); if(rate > crownd_score){ crownd_score = rate; crownd_level = i; @@ -108,66 +104,76 @@ void version::calculate(){ versionset::versionset(): next_fnumber_(10000), last_sequence_(1), - verhead_(this) { + apply_logidx_(0), + verhead_(this), + current_(nullptr){ verhead_.prev = &verhead_; verhead_.next = &verhead_; this->appoint(new version(this)); } -compaction *versionset::plan_compact(){ - compaction *c; - const bool size_too_big = current_->crownd_score >= 1.0; - const bool seek_too_many = current_->hot_sst != nullptr; +void versionset::appoint(version *ver){ + version *old = current_; + ver->ref(); + //TODO: memory barrier + current_ = ver; + if(old!=nullptr){ + old->unref(); + } + + //append current_ to tail + ver->next = &verhead_; + ver->prev = verhead_.prev; + verhead_.prev->next = ver; + verhead_.prev = ver; +} + +compaction *versionset::plan_compact(version *ver){ + const bool size_too_big = ver->crownd_score >= 1.0; + const bool miss_too_many = ver->tricky_sst != nullptr; + basetable *focus = nullptr; + int level = 0; if(size_too_big){ - int level = current_->crownd_level; + level = ver->crownd_level; fprintf(stderr, "plan to compact because size too big, level:%d\n", level); - c = new compaction(level+1); //compact into next level - for(int i=0; i < current_->ssts[level].size(); ++i){ - basetable *t = current_->ssts[level][i]; - if(roller_key_[i].empty() || t->smallest > roller_key_[i]){ - c->inputs_[0].push_back(t); + for(basetable *t : ver->ssts[level]){ + if(roller_key_[level].empty() || t->smallest > roller_key_[level]){ + focus = t; break; } } - if(c->inputs_[0].empty()){ //roll back - basetable *t = current_->ssts[level][0]; - c->inputs_[0].push_back(t); + if(focus==nullptr){ //roll back + focus = ver->ssts[level][0]; } - }else if(seek_too_many){ - fprintf(stderr, "plan to compact because seek too many, level:%d\n", current_->hot_sst->level); - c = new compaction(current_->hot_sst->level); - c->inputs_[0].push_back(current_->hot_sst); - current_->hot_sst->allowed_seeks = MAX_ALLOWED_SEEKS; - current_->hot_sst = nullptr; - }else{ - return nullptr; - } + compaction *c = new compaction(ver, focus); //compact into next level + roller_key_[level] = focus->largest; + return c; + }else if(miss_too_many){ + fprintf(stderr, "plan to compact because seek too many, level:%d\n", current_->tricky_sst->level); + focus = ver->tricky_sst; + ver->tricky_sst->allowed_seeks = MAX_ALLOWED_SEEKS; + ver->tricky_sst = nullptr; - c->settle_inputs(current_); - if(seek_too_many){ - if(c->inputs_[1].size()==0){ + compaction *c = new compaction(ver, focus); //compact into next level + if(c->size()==1){ delete c; return nullptr; } + return c; }else{ - for(int i=0; i<2; ++i){ //roll compact_key ahead - if(c->inputs_[i].size()>0){ - basetable *t = c->inputs_[i].back(); - roller_key_[t->level] = t->largest; - } - } + return nullptr; } - return c; } version *versionset::apply(versionedit *edit){ + version *cur = current(); version *neo = new version(this); for(int i=0; i &added = edit->addfiles[i]; int k = 0; - for(int j=0; jssts[i].size(); ++j){ - basetable *t = current_->ssts[i][j]; + for(int j=0; jssts[i].size(); ++j){ + basetable *t = cur->ssts[i][j]; if(edit->delfiles.count(t)!=0){ continue; } @@ -175,7 +181,12 @@ version *versionset::apply(versionedit *edit){ if(added[k]->smallest < t->smallest){ neo->ssts[i].push_back(added[k]); added[k]->ref(); - fprintf(stderr, "apply: add added sst-%d to level:%d [%s, %s]\n", added[k]->file_number, i, added[k]->smallest.c_str(), added[k]->largest.c_str()); + if(!added[k]->iscached()){ + cachein(added[k], false); + } + + fprintf(stderr, "apply: add added sst-%d to level:%d [%s, %s]\n", + added[k]->file_number, i, added[k]->smallest.c_str(), added[k]->largest.c_str()); } else { break; } @@ -186,32 +197,28 @@ version *versionset::apply(versionedit *edit){ for(; kssts[i].push_back(added[k]); added[k]->ref(); - fprintf(stderr, "apply: add added sst-%d to level:%d [%s, %s]\n", added[k]->file_number, i, added[k]->smallest.c_str(), added[k]->largest.c_str()); + if(!added[k]->iscached()){ + cachein(added[k], false); + } + + fprintf(stderr, "apply: add added sst-%d to level:%d [%s, %s]\n", + added[k]->file_number, i, added[k]->smallest.c_str(), added[k]->largest.c_str()); } } - - this->persist(neo->ssts); return neo; } -int versionset::persist(const std::vector ssts[MAX_LEVELS]){ - char metapath[PATH_LEN]; - sprintf(metapath, "%s/meta/\0", basedir.c_str()); - if(!exist(metapath)){ - mkdir(metapath); - } - +int versionset::persist(version *ver){ char manifest_path[PATH_LEN]; - sprintf(manifest_path, "%s/meta/MANIFEST-%ld\0", basedir.c_str(), get_time_usec()); + sprintf(manifest_path, "%s/MANIFEST-%ld\0", metapath_, get_time_usec()); { - int fd = open_create(manifest_path); + int fd = fio::fopen(manifest_path, fio::CREATE); fprintf(stderr, "persist MANIFEST: %s\n", manifest_path); for(int level=0; levelfile_number, t->smallest.c_str(), t->largest.c_str(), t->key_num); - write_file(fd, line, strlen(line)); + for(basetable *t : ver->ssts[level]){ + char line[128+t->smallest.size()+t->largest.size()] = {}; + sprintf(line, "%d %d %s %s %d\n\0", level, t->file_number, t->smallest.c_str(), t->largest.c_str(), t->keynum); + fio::fwrite(fd, line, strlen(line)); } } ::close(fd); @@ -222,31 +229,30 @@ int versionset::persist(const std::vector ssts[MAX_LEVELS]){ char data[256]; sprintf(data, "%d %d %s\0", apply_logidx_, last_sequence_, manifest_path); - write_file(temporary, data, strlen(data)); - char current[PATH_LEN]; - sprintf(current, "%s/CURRENT\0", basedir.c_str()); - fprintf(stderr, "MOVE MANIFEST from %s to %s\n", temporary, current); - if(rename_file(temporary, current)<0){ + fio::fwrite(temporary, data, strlen(data)); + + char curpath[PATH_LEN]; + sprintf(curpath, "%s/CURRENT\0", basedir.c_str()); + fprintf(stderr, "MOVE MANIFEST from %s to %s\n", temporary, curpath); + if(fio::frename(temporary, curpath)<0){ fprintf(stderr, "rename file failed\n"); } return 0; } int versionset::recover(){ - char metapath[PATH_LEN]; - sprintf(metapath, "%s/meta/\0", basedir.c_str()); - if(!exist(metapath)){ - mkdir(metapath); - return 0; + sprintf(metapath_, "%s/meta/\0", basedir.c_str()); + if(!fio::fexist(metapath_)){ + fio::mkdir(metapath_); + return -1; } - char current[PATH_LEN]; - sprintf(current, "%s/CURRENT\0", basedir.c_str()); - char manifest_path[PATH_LEN]; - memset(manifest_path, 0, sizeof(manifest_path)); + char curpath[PATH_LEN]; + sprintf(curpath, "%s/CURRENT\0", basedir.c_str()); + char manifest_path[PATH_LEN]; std::string data; - if(read_file(current, data)<0){ + if(fio::fread(curpath, data)<0){ return -1; } sscanf(data.c_str(), "%d %d %s\0", &apply_logidx_, &last_sequence_, manifest_path); @@ -254,7 +260,7 @@ int versionset::recover(){ versionedit edit; data.clear(); - if(read_file(manifest_path, data)<0){ + if(fio::fread(manifest_path, data)<0){ fprintf(stderr, "read manifest error\n"); return -1; } @@ -264,8 +270,6 @@ int versionset::recover(){ while(token!=nullptr){ int level=0, fnumber=0, keynum=0; char limit[2][64]; - memset(limit, 0, sizeof(limit)); - fprintf(stderr, "token:[%s]\n", token); sscanf(token, "%d %d %s %s %s %d", &level, &fnumber, limit[0], limit[1], &keynum); fprintf(stderr, "RECOVER sstable level-%d sst-%d <%s,%s>\n", level, fnumber, limit[0], limit[1]); diff --git a/src/version.h b/src/version.h index 26f6678..3d28799 100644 --- a/src/version.h +++ b/src/version.h @@ -22,21 +22,26 @@ class version { int refnum; versionset *vset; std::vector ssts[MAX_LEVELS]; + std::mutex mutex_; //max compaction score and corresponding level double crownd_score; int crownd_level; //compact caused by allowed_seeks become zero - basetable *hot_sst; + basetable *tricky_sst; public: version(versionset *vs); ~version(); - void ref(){ + version *ref(){ + std::unique_lock lock{mutex_}; + version *ver = this; ++refnum; + return ver; } void unref(){ + std::unique_lock lock{mutex_}; assert(refnum>=1); if(--refnum==0){ this->prev->next = this->next; @@ -56,6 +61,10 @@ class version { void calculate(); + int sst_size(int level){ + return ssts[level].size(); + } + void print(){ for(int i=0; iiscached()){ + cache_.setfixed(std::string(t->path), fixed); + return; + } + t->cache(); + cache_.insert(std::string(t->path), t, fixed); + } + + void cacheout(basetable *t){ + if(!t->iscached()){ + fprintf(stderr, "warning, cacheout sst-%d but isn't in cache\n", t->file_number); + return; + } + //TODO uncache here + cache_.evict(std::string(t->path)); + } + void apply_logidx(int idx){ apply_logidx_ = idx; } int apply_logidx(){ return apply_logidx_; } @@ -127,31 +157,24 @@ class versionset { int add_sequence(int cnt){ last_sequence_ += cnt; return last_sequence_; } version *current(){ + std::unique_lock lock{mutex_}; return current_; } - void appoint(version *ver){ - if(current_!=nullptr){ - current_->unref(); - } - - current_ = ver; - ver->ref(); - - //append current_ to tail - ver->next = &verhead_; - ver->prev = verhead_.prev; - verhead_.prev->next = ver; - verhead_.prev = ver; + version *curversion(){ + std::unique_lock lock{mutex_}; + return current_->ref(); } - bool need_compact(){ return current_->crownd_score>1 || current_->hot_sst!=nullptr; } + void appoint(version *ver); + + bool need_compact(){ return current_->crownd_score>1.0 || current_->tricky_sst!=nullptr; } - compaction *plan_compact(); + compaction *plan_compact(version *ver); version *apply(versionedit *edit); - int persist(const std::vector ssts[MAX_LEVELS]); + int persist(version *ver); int recover(); }; diff --git a/src/wbatch.h b/src/wbatch.h index 1c944d4..50163bb 100644 --- a/src/wbatch.h +++ b/src/wbatch.h @@ -28,7 +28,7 @@ class wbatch{ const int flag = std::get<2>(rows[i]); int err = func(key, val, flag); if(err<0){ - fprintf(stderr, "failed scan writebatch, %s:%s %d\n", key, val, flag); + fprintf(stderr, "error: failed scan writebatch, %s:%s %d\n", key, val, flag); return -1; } } @@ -44,7 +44,7 @@ class wbatch{ const char *key = std::get<0>(r).c_str(); const char *val = std::get<1>(r).c_str(); const int flag = std::get<2>(r); - fprintf(stderr, "row= %s:%s %d\n", key, val, flag); + fprintf(stderr, "batch item= %s:%s %d\n", key, val, flag); } } diff --git a/test/Makefile b/test/Makefile index 438d2e1..3bf159a 100644 --- a/test/Makefile +++ b/test/Makefile @@ -7,11 +7,11 @@ all: ../src/lsmtree.cpp \ ../src/compaction.cpp \ ../src/version.cpp \ - ../../naughty/file/fio.cpp \ + ../../naughty/fio/fio.cpp \ ./test_default.cpp \ -lpthread \ -std=c++14 \ - -I../../naughty/file \ + -I../../naughty/fio \ -I../../wal \ -I../src kvtuple: @@ -37,7 +37,7 @@ sstable: memtable: g++ -g -o run_test_memtable \ test_memtable.cpp \ - ../src/skiplist.cpp \ + ../src/memtable.cpp \ ../../naughty/file/fio.cpp \ -I../../naughty/file \ -I../../wal \ @@ -112,5 +112,5 @@ major: -std=c++14 \ -I../src clean: - rm -rf run* data/ + rm -rf run* data/ out diff --git a/test/autotest.sh b/test/autotest.sh new file mode 100755 index 0000000..55f98c4 --- /dev/null +++ b/test/autotest.sh @@ -0,0 +1,15 @@ +#!/bin/bash +if [ ! -e ./run_test_default ]; then + make +fi + +j=$1 +for ((i=0; iout + if [ $? -ne 0 ]; then + exit -1 + fi +done diff --git a/test/test_codemap.cpp b/test/test_codemap.cpp new file mode 100644 index 0000000..5bcac83 --- /dev/null +++ b/test/test_codemap.cpp @@ -0,0 +1,26 @@ +#include + + +int main(){ + std::multimap codemap; //hashcode => kvtuple(in order to speed up) + for(int i=0; i<2000; ++i){ + codemap.insert(std::make_pair(i, i)); + } + + bool same=false; + for(int i=0; i<2000; ++i){ + auto pr = codemap.equal_range(i); + for (auto iter = pr.first ; iter != pr.second; ++iter){ + const int v = iter->second; + if(v==i){ + same = true; + fprintf(stderr, "ok, i=%d\n", i); + } + } + if(!same){ + fprintf(stderr, "not same, i=%d\n", i); + } + } + + return 0; +} diff --git a/test/test_default.cpp b/test/test_default.cpp index f39b0b7..faf8374 100644 --- a/test/test_default.cpp +++ b/test/test_default.cpp @@ -2,7 +2,7 @@ #include "lsmtree.h" #include "clock.h" -const int COUNT = 500000; +const int COUNT = 5000000; void test_w(lsmtree &db){ woptions wopt; @@ -12,8 +12,10 @@ void test_w(lsmtree &db){ sprintf(k, "key_%d", i); sprintf(v, "val_%d", i); - std::cout << " insert " << k << ", hashcode:" << hash(k, strlen(k)) < #include "memtable.h" +const int COUNT = 500000; +memtable tab; + void test(){ - const int COUNT = 10000; - memtable tab; - for(int i=0; ilogidx, v->seqno, v->key.c_str(), v->val.c_str(), v->flag); + } +} + +void test_r(){ + for(int i=0; i