From 95aec27cab008ea0b903f93c386c63def411f00f Mon Sep 17 00:00:00 2001
From: Gaurav Sarma
Date: Sun, 9 Feb 2025 10:45:01 +0800
Subject: [PATCH 1/4] Point in time snapshots

---
 internal/pitsnapshot/README.md        | 66 ++++++++++++++++++++
 internal/pitsnapshot/dummy_store.go   | 62 +++++++++++++++++++
 internal/pitsnapshot/flusher.go       | 84 +++++++++++++++++++++++++
 internal/pitsnapshot/snapshot.go      | 89 +++++++++++++++++++++++++++
 internal/pitsnapshot/snapshot_map.go  | 86 ++++++++++++++++++++++++++
 internal/pitsnapshot/snapshot_test.go | 25 ++++++++
 internal/store/store.go               | 70 +++++++++++++++++++++
 7 files changed, 482 insertions(+)
 create mode 100644 internal/pitsnapshot/README.md
 create mode 100644 internal/pitsnapshot/dummy_store.go
 create mode 100644 internal/pitsnapshot/flusher.go
 create mode 100644 internal/pitsnapshot/snapshot.go
 create mode 100644 internal/pitsnapshot/snapshot_map.go
 create mode 100644 internal/pitsnapshot/snapshot_test.go

diff --git a/internal/pitsnapshot/README.md b/internal/pitsnapshot/README.md
new file mode 100644
index 000000000..036a978a7
--- /dev/null
+++ b/internal/pitsnapshot/README.md
@@ -0,0 +1,66 @@
+# Point in time Snapshots
+
+A point in time snapshot is a copy of the existing data that is representative of
+the data in memory at that specific time.
+
+## Goals
+- Don't affect the throughput of the current request-processing layer.
+- Ability to take multiple snapshot instances simultaneously.
+- Ability to snapshot and restore on systems with a different number of shards.
+- Shouldn't depend on existing data files; only the in-memory data structures are needed.
+
+## Design
+
+### Recap
+DiceDB runs multiple `ShardThreads` based on the number of CPUs in the machine, and there is
+one `Store` object for every thread. Since DiceDB follows a shared-nothing architecture, it
+should be possible to snapshot and restore data with varying CPU counts.
+
+The `Store` object keeps a reference to the `Object` object in a map. The `Object` object is
+where the data is being stored.
+
+### Implementing copy-on-write
+The snapshotting technique would be similar to the copy-on-write mechanism, i.e., additional data
+wouldn't have to be stored till the data has to be modified. This means additional memory would
+only be required if there are changes to the underlying data.
+
+### Flow
+
+The initiation flow:
+```bash
+ShardThread::CallSnapshotter -> Snapshotter::Start -> Store::StartSnapshot -> SnapshotMap::Buffer
+-> PITFlusher::Flush
+```
+
+When the iteration is over:
+```bash
+Store::StopSnapshot -> SnapshotMap::FlushAllData -> PITFlusher::FlushAllData -> Snapshotter::Close
+```
+
+### Changes for ShardThread and Store
+The snapshot would start on every `ShardThread` and fetch the `Store` object. Every `Store` object
+needs to implement the `SnapshotStore` interface, which contains the `StartSnapshot` and `StopSnapshot`
+methods.
+The `StartSnapshot` and `StopSnapshot` methods would be called on the store from the snapshotter object.
+
+#### StartSnapshot
+When the `StartSnapshot` method is called, the `Store` should keep note of the `SnapshotID` in a map.
+There can be multiple snapshot instances for every store as well.
+For any read or write operation that is performed, the `Store` object should check if a snapshot is being
+run at that instant. If no snapshot is being run, then continue as usual.
+If a snapshot is being run, then for any subsequent write operation, store the previous data in the snapshot's
+object, maybe a map.
Let's call this the `SnapshotMap`. If there are multiple write operations to the same object +and the data already exists in the `SnapshotMap`, then skip doing anything for the snapshot. +Similarly, for reads, if a snapshot is being run, if the incoming request is from a snapshot layer, then check +if there is anything in the `SnapshotMap` for the key. If no, then return the current value from the `Store`. + +It should fetch the list of keys in its store attribute and iterate through them. + +#### StopSnapshot +When the iteration through all the keys by the `Store` object is done, the `StopSnapshot` method is called by the +`Store`. The `StopSnapshot` lets the `SnapshotMap` know that there are no more updates coming. The `SnapshotMap` +then talks to the `PITFLusher` to finish syncing all the chunks to disk and then closes the main snapshot +process. + +### Point-in-time Flusher +TBD \ No newline at end of file diff --git a/internal/pitsnapshot/dummy_store.go b/internal/pitsnapshot/dummy_store.go new file mode 100644 index 000000000..6f50d90e1 --- /dev/null +++ b/internal/pitsnapshot/dummy_store.go @@ -0,0 +1,62 @@ +package pitsnapshot + +import ( + "log" + + "github.com/dicedb/dice/internal/store" +) + +type ( + DummyStore struct { + store map[string]string + ongoingSnapshots map[uint64]store.Snapshotter + } +) + +func NewDummyStore() *DummyStore { + return &DummyStore{ + store: make(map[string]string), + ongoingSnapshots: make(map[uint64]store.Snapshotter), + } +} + +func (store *DummyStore) Set(key, value string) { + + // Take a snapshot of the existing data since it's going to be overridden + for _, snapshot := range store.ongoingSnapshots { + snapshot.TempAdd(key, value) + } + store.store[key] = value +} + +func (store *DummyStore) Get(key string) string { + return store.store[key] +} + +func (store *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapshotter) (err error) { + log.Println("Starting snapshot for snapshotID", snapshotID) + store.ongoingSnapshots[snapshotID] = snapshot + + for k, v := range store.store { + // Check if the data is overridden + tempVal, _ := snapshot.TempGet(k) + if tempVal != nil { + v = tempVal.(string) + } + err = snapshot.Store(k, v) + if err != nil { + log.Println("Error storing data in snapshot", "error", err) + } + } + store.StopSnapshot(snapshotID) + return +} + +func (store *DummyStore) StopSnapshot(snapshotID uint64) (err error) { + if snapshot, isPresent := store.ongoingSnapshots[snapshotID]; isPresent { + snapshot.Close() + delete(store.ongoingSnapshots, snapshotID) + } + delete(store.ongoingSnapshots, snapshotID) + return +} diff --git a/internal/pitsnapshot/flusher.go b/internal/pitsnapshot/flusher.go new file mode 100644 index 000000000..e1523cad4 --- /dev/null +++ b/internal/pitsnapshot/flusher.go @@ -0,0 +1,84 @@ +package pitsnapshot + +import ( + "context" + "fmt" + "log" + "os" +) + +const ( + FlushFilePath = "/tmp/flusher-%d.ddb" + Delimiter = "\n" +) + +type ( + PITFlusher struct { + ctx context.Context + snapshotID uint64 + updatesCh chan []StoreMapUpdate + exitCh chan bool + + flushFile *os.File + } +) + +func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf *PITFlusher, err error) { + pf = &PITFlusher{ + ctx: ctx, + snapshotID: snapshotID, + exitCh: exitCh, + updatesCh: make(chan []StoreMapUpdate, BufferSize), + } + if err = pf.setup(); err != nil { + return + } + return +} + +func (pf *PITFlusher) setup() (err error) { + if pf.flushFile, err = os.Create(fmt.Sprintf(FlushFilePath, 
pf.snapshotID)); err != nil { + return + } + return +} + +func (pf *PITFlusher) Start() (err error) { + for { + select { + case <-pf.ctx.Done(): + return + case updates := <-pf.updatesCh: + // TODO: Store the failed updates somewhere + if err = pf.Flush(updates); err != nil { + log.Println("error in flushing updates", err) + continue + } + continue + } + } + return +} + +func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) { + var ( + serializedUpdates []byte + storeUpdate StoreMapUpdate + ) + if serializedUpdates, err = storeUpdate.Serialize(updates); err != nil { + return + } + serializedUpdates = append(serializedUpdates, Delimiter...) + if _, err = pf.flushFile.Write(serializedUpdates); err != nil { + } + return +} + +// Close is called when the overlying SnapshotMap's channel is closed. +// After the SnapshotMap is closed, there is one final push to update all the pending +// data to the flusher +func (pf *PITFlusher) Close() (err error) { + log.Println("Closing the flusher for snapshot", pf.snapshotID) + pf.exitCh <- true + return +} diff --git a/internal/pitsnapshot/snapshot.go b/internal/pitsnapshot/snapshot.go new file mode 100644 index 000000000..d629f2070 --- /dev/null +++ b/internal/pitsnapshot/snapshot.go @@ -0,0 +1,89 @@ +package pitsnapshot + +import ( + "context" + "log" + "time" + + "github.com/dicedb/dice/internal/store" +) + +const ( + BufferSize = 1000 +) + +type ( + SnapshotStore interface { + StartSnapshot(uint64, store.Snapshotter) error + StopSnapshot(uint64) error + } + PointInTimeSnapshot struct { + ctx context.Context + cancelFunc context.CancelFunc + + ID uint64 + + store SnapshotStore + totalStoreShot uint8 + + SnapshotMap *SnapshotMap + + flusher *PITFlusher + + StartedAt time.Time + EndedAt time.Time + + exitCh chan bool + } +) + +func NewPointInTimeSnapshot(ctx context.Context, store SnapshotStore) (pit *PointInTimeSnapshot, err error) { + pit = &PointInTimeSnapshot{ + ctx: ctx, + ID: uint64(time.Now().Nanosecond()), + StartedAt: time.Now(), + + store: store, + exitCh: make(chan bool, 5), + } + pit.ctx, pit.cancelFunc = context.WithCancel(ctx) + if pit.flusher, err = NewPITFlusher(ctx, pit.ID, pit.exitCh); err != nil { + return + } + if pit.SnapshotMap, err = NewSnapshotMap(ctx, pit.flusher); err != nil { + return + } + if err = pit.Run(); err != nil { + return + } + return +} + +func (pit *PointInTimeSnapshot) processStoreUpdates() (err error) { + for { + select { + case <-pit.ctx.Done(): + pit.Close() + return + case <-pit.exitCh: + pit.Close() + return + } + } + return +} + +func (pit *PointInTimeSnapshot) Run() (err error) { + if err = pit.store.StartSnapshot(pit.ID, pit.SnapshotMap); err != nil { + return + } + go pit.processStoreUpdates() + return +} + +func (pit *PointInTimeSnapshot) Close() (err error) { + pit.EndedAt = time.Now() + pit.cancelFunc() + log.Println("Closing snapshot", pit.ID, ". 
Total time taken", pit.EndedAt.Sub(pit.StartedAt)) + return +} diff --git a/internal/pitsnapshot/snapshot_map.go b/internal/pitsnapshot/snapshot_map.go new file mode 100644 index 000000000..d8cc43628 --- /dev/null +++ b/internal/pitsnapshot/snapshot_map.go @@ -0,0 +1,86 @@ +package pitsnapshot + +import ( + "bytes" + "context" + "encoding/gob" + "log" +) + +type ( + SnapshotMap struct { + tempRepr map[string]interface{} + buffer []StoreMapUpdate + flusher *PITFlusher + closing bool + } + StoreMapUpdate struct { + Key string + Value interface{} + } +) + +func (s StoreMapUpdate) Serialize(updates []StoreMapUpdate) (updateBytes []byte, err error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err = enc.Encode(updates) + if err != nil { + return nil, err + } + updateBytes = buf.Bytes() + return +} + +func (s StoreMapUpdate) Deserialize(data []byte) (result []StoreMapUpdate, err error) { + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + if err = dec.Decode(&result); err != nil { + return + } + return +} + +func NewSnapshotMap(ctx context.Context, flusher *PITFlusher) (sm *SnapshotMap, err error) { + sm = &SnapshotMap{ + tempRepr: make(map[string]interface{}, 1000), + flusher: flusher, + } + return + +} + +func (sm *SnapshotMap) TempAdd(key string, val interface{}) (err error) { + if _, ok := sm.tempRepr[key]; ok { + return + } + sm.tempRepr[key] = val + return nil +} + +func (sm *SnapshotMap) TempGet(key string) (interface{}, error) { + return sm.tempRepr[key], nil +} + +func (sm *SnapshotMap) Store(key string, val interface{}) (err error) { + log.Println("Storing data in snapshot", "key", key, "value", val) + if sm.closing { + log.Println("rejecting writes to the update channel since the snapshot map is closing") + return + } + sm.buffer = append(sm.buffer, StoreMapUpdate{Key: key, Value: val}) + if len(sm.buffer) == BufferSize { + sm.flusher.updatesCh <- sm.buffer + sm.buffer = []StoreMapUpdate{} + } + return +} + +func (sm *SnapshotMap) Close() (err error) { + sm.closing = true + // Send the remaining updates + if err = sm.flusher.Flush(sm.buffer); err != nil { + return + } + sm.flusher.Close() + return +} diff --git a/internal/pitsnapshot/snapshot_test.go b/internal/pitsnapshot/snapshot_test.go new file mode 100644 index 000000000..fea87af57 --- /dev/null +++ b/internal/pitsnapshot/snapshot_test.go @@ -0,0 +1,25 @@ +package pitsnapshot + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func addDataToDummyStore(store *DummyStore) { + store.Set("key1", "value1") + store.Set("key2", "value2") + store.Set("key3", "value3") +} + +func TestSnapshot(t *testing.T) { + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore) + + snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + + assert.Nil(t, err) + <-snapshot.ctx.Done() +} diff --git a/internal/store/store.go b/internal/store/store.go index 110c253c1..4974310e9 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -4,7 +4,9 @@ package store import ( + "log" "path" + "sync" "github.com/dicedb/dice/internal/common" "github.com/dicedb/dice/internal/object" @@ -54,6 +56,17 @@ type Store struct { cmdWatchChan chan CmdWatchEvent evictionStrategy EvictionStrategy ShardID int + + // Snapshot related fields + ongoingSnapshots map[uint64]Snapshotter + snapLock *sync.Mutex +} + +type Snapshotter interface { + TempAdd(string, interface{}) error + TempGet(string) (interface{}, error) + Store(string, interface{}) error + Close() 
error } func NewStore(cmdWatchChan chan CmdWatchEvent, evictionStrategy EvictionStrategy, shardID int) *Store { @@ -62,6 +75,8 @@ func NewStore(cmdWatchChan chan CmdWatchEvent, evictionStrategy EvictionStrategy expires: NewExpireRegMap(), cmdWatchChan: cmdWatchChan, evictionStrategy: evictionStrategy, + ongoingSnapshots: make(map[uint64]Snapshotter), + snapLock: &sync.Mutex{}, ShardID: shardID, } if evictionStrategy == nil { @@ -97,6 +112,53 @@ func (store *Store) ResetStore() { store.expires = NewExpireMap() } +func (store *Store) StartSnapshot(snapshotID uint64, snapshot Snapshotter) { + store.snapLock.Lock() + store.ongoingSnapshots[snapshotID] = snapshot + store.snapLock.Unlock() + + // Iterate and store all the keys in the snapshot + store.IterateAllKeysForSnapshot(snapshotID) + +} + +func (store *Store) StopSnapshot(snapshotID uint64) { + store.snapLock.Lock() + if snapshot, isPresent := store.ongoingSnapshots[snapshotID]; isPresent { + snapshot.Close() + delete(store.ongoingSnapshots, snapshotID) + } + store.snapLock.Unlock() +} + +func (store *Store) IterateAllKeysForSnapshot(snapshotID uint64) (err error) { + var ( + snapshot Snapshotter + isPresent bool + ) + // If there are no ongoing snapshots, then there won't be any temp data to store + if len(store.ongoingSnapshots) == 0 { + return + } + if snapshot, isPresent = store.ongoingSnapshots[snapshotID]; !isPresent { + return + } + store.store.All(func(k string, v *object.Obj) bool { + // Check if the data is overridden + tempVal, _ := snapshot.TempGet(k) + if tempVal != nil { + v = tempVal.(*object.Obj) + } + err = snapshot.Store(k, v) + if err != nil { + log.Println("Error storing data in snapshot", "error", err) + } + return true + }) + store.StopSnapshot(snapshotID) + return +} + func (store *Store) Put(k string, obj *object.Obj, opts ...PutOption) { store.putHelper(k, obj, opts...) } @@ -147,6 +209,10 @@ func (store *Store) putHelper(k string, obj *object.Obj, opts ...PutOption) { store.numKeys++ } + // Take a snapshot of the existing data since it's going to be overridden + for _, snapshot := range store.ongoingSnapshots { + snapshot.TempAdd(k, obj) + } store.store.Put(k, obj) store.evictionStrategy.OnAccess(k, obj, AccessSet) @@ -295,6 +361,10 @@ func (store *Store) deleteKey(k string, obj *object.Obj, opts ...DelOption) bool } if obj != nil { + // Take a snapshot of the existing data since it's going to be overridden + for _, snapshot := range store.ongoingSnapshots { + snapshot.TempAdd(k, obj) + } store.store.Delete(k) store.expires.Delete(obj) store.numKeys-- From 802173b506d421211c58f8a4e825becffd714441 Mon Sep 17 00:00:00 2001 From: Gaurav Sarma Date: Mon, 10 Feb 2025 07:14:46 +0800 Subject: [PATCH 2/4] Add tests --- internal/pitsnapshot/README.md | 12 ++- internal/pitsnapshot/dummy_store.go | 38 +++++++- internal/pitsnapshot/flusher.go | 52 ++++++++++- internal/pitsnapshot/snapshot.go | 8 +- internal/pitsnapshot/snapshot_map.go | 31 +++++-- internal/pitsnapshot/snapshot_test.go | 124 ++++++++++++++++++++++++-- internal/store/store.go | 7 +- 7 files changed, 244 insertions(+), 28 deletions(-) diff --git a/internal/pitsnapshot/README.md b/internal/pitsnapshot/README.md index 036a978a7..0810de910 100644 --- a/internal/pitsnapshot/README.md +++ b/internal/pitsnapshot/README.md @@ -63,4 +63,14 @@ then talks to the `PITFLusher` to finish syncing all the chunks to disk and then process. 
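+
+Putting the two together, a condensed sketch of the loop a store runs between `StartSnapshot`
+and `StopSnapshot` (based on `DummyStore.StartSnapshot` in this series; the helper name and the
+`string` value type are illustrative, and `Snapshotter` is the interface from `internal/store`):
+
+```go
+func (ds *DummyStore) snapshotAllKeys(snapshot store.Snapshotter) error {
+	for key, live := range ds.store {
+		value := live
+		// Prefer the copy-on-write value captured by the SnapshotMap if the
+		// key was overwritten after the snapshot started.
+		if preserved, _ := snapshot.TempGet(key); preserved != nil {
+			if prev, ok := preserved.(string); ok {
+				value = prev
+			}
+		}
+		// Hand the point-in-time value to the SnapshotMap, which buffers it
+		// and forwards full batches to the PITFlusher.
+		if err := snapshot.Store(key, value); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+```
+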
### Point-in-time Flusher -TBD \ No newline at end of file +The `PITFlusher` serializes the store updates from the `SnapshotMap` to binary format, currently `gob`. +It serializes and appends to a file. + +## Test cases and benchmarks +- Snapshot data less than the buffer size without any subsequent writes +- Snapshot data less than the buffer size with localized subsequent writes +- Snapshot data less than the buffer size with spread out subsequent writes +- Snapshot data more than the buffer size without any subsequent writes +- Snapshot data more than the buffer size with localized subsequent writes +- Snapshot data more than the buffer size with spread out subsequent writes +- Ensure current `get` path is not affected \ No newline at end of file diff --git a/internal/pitsnapshot/dummy_store.go b/internal/pitsnapshot/dummy_store.go index 6f50d90e1..51360a4c8 100644 --- a/internal/pitsnapshot/dummy_store.go +++ b/internal/pitsnapshot/dummy_store.go @@ -2,6 +2,7 @@ package pitsnapshot import ( "log" + "sync" "github.com/dicedb/dice/internal/store" ) @@ -10,6 +11,8 @@ type ( DummyStore struct { store map[string]string ongoingSnapshots map[uint64]store.Snapshotter + snapLock *sync.RWMutex + stLock *sync.RWMutex } ) @@ -17,27 +20,53 @@ func NewDummyStore() *DummyStore { return &DummyStore{ store: make(map[string]string), ongoingSnapshots: make(map[uint64]store.Snapshotter), + snapLock: &sync.RWMutex{}, + stLock: &sync.RWMutex{}, } } func (store *DummyStore) Set(key, value string) { // Take a snapshot of the existing data since it's going to be overridden + store.snapLock.RLock() for _, snapshot := range store.ongoingSnapshots { snapshot.TempAdd(key, value) } + store.snapLock.RUnlock() + + store.stLock.Lock() + defer store.stLock.Unlock() + store.store[key] = value } func (store *DummyStore) Get(key string) string { + store.stLock.RLock() + defer store.stLock.RUnlock() + return store.store[key] } func (store *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapshotter) (err error) { log.Println("Starting snapshot for snapshotID", snapshotID) + + store.snapLock.Lock() store.ongoingSnapshots[snapshotID] = snapshot + store.snapLock.Unlock() + + store.stLock.RLock() + keys := make([]string, 0, len(store.store)) + for k := range store.store { + keys = append(keys, k) + } + store.stLock.RUnlock() + + for _, k := range keys { + + store.stLock.RLock() + v := store.store[k] + store.stLock.RUnlock() - for k, v := range store.store { // Check if the data is overridden tempVal, _ := snapshot.TempGet(k) if tempVal != nil { @@ -53,10 +82,13 @@ func (store *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapsho } func (store *DummyStore) StopSnapshot(snapshotID uint64) (err error) { + store.snapLock.Lock() if snapshot, isPresent := store.ongoingSnapshots[snapshotID]; isPresent { - snapshot.Close() + if err = snapshot.Close(); err != nil { + log.Println("Error closing snapshot", "error", err) + } delete(store.ongoingSnapshots, snapshotID) } - delete(store.ongoingSnapshots, snapshotID) + store.snapLock.Unlock() return } diff --git a/internal/pitsnapshot/flusher.go b/internal/pitsnapshot/flusher.go index e1523cad4..dfa4948db 100644 --- a/internal/pitsnapshot/flusher.go +++ b/internal/pitsnapshot/flusher.go @@ -18,7 +18,9 @@ type ( snapshotID uint64 updatesCh chan []StoreMapUpdate exitCh chan bool + dlq [][]StoreMapUpdate + totalKeys uint64 flushFile *os.File } ) @@ -28,7 +30,7 @@ func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf ctx: ctx, snapshotID: 
snapshotID, exitCh: exitCh, - updatesCh: make(chan []StoreMapUpdate, BufferSize), + updatesCh: make(chan []StoreMapUpdate, 10*BufferSize), } if err = pf.setup(); err != nil { return @@ -37,7 +39,11 @@ func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf } func (pf *PITFlusher) setup() (err error) { - if pf.flushFile, err = os.Create(fmt.Sprintf(FlushFilePath, pf.snapshotID)); err != nil { + filePath := fmt.Sprintf(FlushFilePath, pf.snapshotID) + if _, err = os.Stat(filePath); os.IsExist(err) { + return + } + if pf.flushFile, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err != nil { return } return @@ -51,7 +57,8 @@ func (pf *PITFlusher) Start() (err error) { case updates := <-pf.updatesCh: // TODO: Store the failed updates somewhere if err = pf.Flush(updates); err != nil { - log.Println("error in flushing updates", err) + //log.Println("error in flushing updates, pushing to DLQ", err) + pf.dlq = append(pf.dlq, updates) continue } continue @@ -65,11 +72,44 @@ func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) { serializedUpdates []byte storeUpdate StoreMapUpdate ) + pf.totalKeys += uint64(len(updates)) if serializedUpdates, err = storeUpdate.Serialize(updates); err != nil { return } serializedUpdates = append(serializedUpdates, Delimiter...) if _, err = pf.flushFile.Write(serializedUpdates); err != nil { + return + } + return +} + +func (pf *PITFlusher) clearDLQ() (err error) { + for { + select { + case updates := <-pf.updatesCh: + if err := pf.Flush(updates); err != nil { + log.Println("Error in flushing updates while draining", err) + pf.dlq = append(pf.dlq, updates) + } + default: + // Channel is empty + goto Done + } + } +Done: + totalKeysInDLQ := 0 + for _, updates := range pf.dlq { + var ( + update []StoreMapUpdate + ) + update = updates + totalKeysInDLQ += len(update) + if err = pf.Flush(update); err != nil { + return + } + } + if totalKeysInDLQ > 0 { + log.Println("Total keys in DLQ", totalKeysInDLQ, len(pf.updatesCh)) } return } @@ -78,7 +118,11 @@ func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) { // After the SnapshotMap is closed, there is one final push to update all the pending // data to the flusher func (pf *PITFlusher) Close() (err error) { - log.Println("Closing the flusher for snapshot", pf.snapshotID) + if err = pf.clearDLQ(); err != nil { + log.Println("error in clearing DLQ", err) + } + log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys) + pf.flushFile.Close() pf.exitCh <- true return } diff --git a/internal/pitsnapshot/snapshot.go b/internal/pitsnapshot/snapshot.go index d629f2070..e79faa704 100644 --- a/internal/pitsnapshot/snapshot.go +++ b/internal/pitsnapshot/snapshot.go @@ -47,13 +47,10 @@ func NewPointInTimeSnapshot(ctx context.Context, store SnapshotStore) (pit *Poin exitCh: make(chan bool, 5), } pit.ctx, pit.cancelFunc = context.WithCancel(ctx) - if pit.flusher, err = NewPITFlusher(ctx, pit.ID, pit.exitCh); err != nil { + if pit.flusher, err = NewPITFlusher(pit.ctx, pit.ID, pit.exitCh); err != nil { return } - if pit.SnapshotMap, err = NewSnapshotMap(ctx, pit.flusher); err != nil { - return - } - if err = pit.Run(); err != nil { + if pit.SnapshotMap, err = NewSnapshotMap(pit.ctx, pit.flusher); err != nil { return } return @@ -74,6 +71,7 @@ func (pit *PointInTimeSnapshot) processStoreUpdates() (err error) { } func (pit *PointInTimeSnapshot) Run() (err error) { + go pit.flusher.Start() if err = pit.store.StartSnapshot(pit.ID, 
pit.SnapshotMap); err != nil { return } diff --git a/internal/pitsnapshot/snapshot_map.go b/internal/pitsnapshot/snapshot_map.go index d8cc43628..5c58d3707 100644 --- a/internal/pitsnapshot/snapshot_map.go +++ b/internal/pitsnapshot/snapshot_map.go @@ -5,14 +5,17 @@ import ( "context" "encoding/gob" "log" + "sync" ) type ( SnapshotMap struct { - tempRepr map[string]interface{} - buffer []StoreMapUpdate - flusher *PITFlusher - closing bool + tempRepr map[string]interface{} + buffer []StoreMapUpdate + flusher *PITFlusher + closing bool + mLock *sync.RWMutex + totalKeys uint64 } StoreMapUpdate struct { Key string @@ -44,12 +47,16 @@ func NewSnapshotMap(ctx context.Context, flusher *PITFlusher) (sm *SnapshotMap, sm = &SnapshotMap{ tempRepr: make(map[string]interface{}, 1000), flusher: flusher, + mLock: &sync.RWMutex{}, } return } func (sm *SnapshotMap) TempAdd(key string, val interface{}) (err error) { + sm.mLock.Lock() + defer sm.mLock.Unlock() + if _, ok := sm.tempRepr[key]; ok { return } @@ -58,18 +65,24 @@ func (sm *SnapshotMap) TempAdd(key string, val interface{}) (err error) { } func (sm *SnapshotMap) TempGet(key string) (interface{}, error) { + sm.mLock.RLock() + defer sm.mLock.RUnlock() + return sm.tempRepr[key], nil } func (sm *SnapshotMap) Store(key string, val interface{}) (err error) { - log.Println("Storing data in snapshot", "key", key, "value", val) + //log.Println("Storing data in snapshot", "key", key, "value", val) if sm.closing { log.Println("rejecting writes to the update channel since the snapshot map is closing") return } sm.buffer = append(sm.buffer, StoreMapUpdate{Key: key, Value: val}) - if len(sm.buffer) == BufferSize { - sm.flusher.updatesCh <- sm.buffer + if len(sm.buffer) >= BufferSize { + bufferCopy := make([]StoreMapUpdate, len(sm.buffer)) + copy(bufferCopy, sm.buffer) + sm.totalKeys += uint64(len(bufferCopy)) + sm.flusher.updatesCh <- bufferCopy sm.buffer = []StoreMapUpdate{} } return @@ -78,9 +91,13 @@ func (sm *SnapshotMap) Store(key string, val interface{}) (err error) { func (sm *SnapshotMap) Close() (err error) { sm.closing = true // Send the remaining updates + //log.Println("Closing the snapshot map, sending the remaining updates to the flusher. 
Total keys processed", sm.totalKeys) if err = sm.flusher.Flush(sm.buffer); err != nil { return } sm.flusher.Close() + if sm.totalKeys != sm.flusher.totalKeys { + log.Println("[error] Total keys processed in the snapshot map and the flusher don't match", sm.totalKeys, sm.flusher.totalKeys) + } return } diff --git a/internal/pitsnapshot/snapshot_test.go b/internal/pitsnapshot/snapshot_test.go index fea87af57..9fa8cf488 100644 --- a/internal/pitsnapshot/snapshot_test.go +++ b/internal/pitsnapshot/snapshot_test.go @@ -2,24 +2,136 @@ package pitsnapshot import ( "context" + "fmt" + "log" + "math/rand" "testing" + "time" "github.com/stretchr/testify/assert" ) -func addDataToDummyStore(store *DummyStore) { - store.Set("key1", "value1") - store.Set("key2", "value2") - store.Set("key3", "value3") +const ( + testBufferSize = 1000000 +) + +func addDataToDummyStore(store *DummyStore, count int) { + for i := 0; i < count; i++ { + store.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i)) + } + log.Println("Total size of store", len(store.store)) +} + +func fetchOrWriteInStore(ctx context.Context, store *DummyStore, count int, rangeOfKeys int, duration time.Duration, shouldWrite bool) { + randomKeys, _ := pickRandomNumbers(0, count-1, rangeOfKeys) + readsCount := 0 + writesCount := 0 + + for { + select { + case <-time.After(duration): + log.Println("Duration over", "Reads count:", readsCount, "Writes count:", writesCount) + return + case <-ctx.Done(): + log.Println("Context done", "Reads count:", readsCount, "Writes count:", writesCount) + return + default: + for _, key := range randomKeys { + if shouldWrite { + writesCount += 1 + store.Set(fmt.Sprintf("key%d", key), fmt.Sprintf("value%d", key)) + } + readsCount += 1 + store.Get(fmt.Sprintf("key%d", key)) + } + } + } +} + +func pickRandomNumbers(min, max, x int) ([]int, error) { + if min > max { + return nil, fmt.Errorf("min should be less than or equal to max") + } + if x < 0 { + return nil, fmt.Errorf("number of random numbers to pick should be non-negative") + } + if x > (max - min + 1) { + return nil, fmt.Errorf("number of random numbers to pick is greater than the range size") + } + + // Seed the random number generator + rand.Seed(time.Now().UnixNano()) + + // Create a map to ensure unique random numbers + randomNumbers := make(map[int]struct{}) + for len(randomNumbers) < x { + num := rand.Intn(max-min+1) + min + randomNumbers[num] = struct{}{} + } + + // Convert the map keys to a slice + result := make([]int, 0, x) + for num := range randomNumbers { + result = append(result, num) + } + + return result, nil +} + +// Total key size - 1000 +// Range of keys to access - 100 +// Duration - 10ms +// Allow writes - No +func TestSnapshotWithoutChangesWithLowRangeAccess(t *testing.T) { + storeSize := testBufferSize + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore, storeSize) + + snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, false) + snapshot.Run() + + assert.Nil(t, err) + <-snapshot.ctx.Done() + time.Sleep(10 * time.Millisecond) } -func TestSnapshot(t *testing.T) { +// Total key size - 1000 +// Range of keys to access - 100 +// Duration - 10ms +// Allow writes - No +func TestSnapshotWithChangesWithLowRangeAccess(t *testing.T) { + storeSize := testBufferSize // Create a new dummy store dummyStore := NewDummyStore() - addDataToDummyStore(dummyStore) + 
addDataToDummyStore(dummyStore, storeSize) snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, true) + snapshot.Run() assert.Nil(t, err) <-snapshot.ctx.Done() + time.Sleep(10 * time.Millisecond) +} + +// Test write speed without any snapshots +// Total key size - 1000 +// Range of keys to access - 100 +// Duration - 10ms +// Allow writes - No +func TestNoSnapshotWithChangesWithLowRangeAccess(t *testing.T) { + storeSize := testBufferSize + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore, storeSize) + + ctx, cancel := context.WithCancel(context.Background()) + go fetchOrWriteInStore(ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, true) + + time.Sleep(750 * time.Millisecond) + cancel() + <-ctx.Done() + time.Sleep(10 * time.Millisecond) } diff --git a/internal/store/store.go b/internal/store/store.go index 4974310e9..75e129d65 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -59,7 +59,7 @@ type Store struct { // Snapshot related fields ongoingSnapshots map[uint64]Snapshotter - snapLock *sync.Mutex + snapLock *sync.RWMutex } type Snapshotter interface { @@ -76,7 +76,7 @@ func NewStore(cmdWatchChan chan CmdWatchEvent, evictionStrategy EvictionStrategy cmdWatchChan: cmdWatchChan, evictionStrategy: evictionStrategy, ongoingSnapshots: make(map[uint64]Snapshotter), - snapLock: &sync.Mutex{}, + snapLock: &sync.RWMutex{}, ShardID: shardID, } if evictionStrategy == nil { @@ -140,9 +140,12 @@ func (store *Store) IterateAllKeysForSnapshot(snapshotID uint64) (err error) { if len(store.ongoingSnapshots) == 0 { return } + store.snapLock.RLock() if snapshot, isPresent = store.ongoingSnapshots[snapshotID]; !isPresent { return } + store.snapLock.RUnlock() + store.store.All(func(k string, v *object.Obj) bool { // Check if the data is overridden tempVal, _ := snapshot.TempGet(k) From 9dd1482c73a989b34794c86aead79a9ea12b0e90 Mon Sep 17 00:00:00 2001 From: Gaurav Sarma Date: Mon, 10 Feb 2025 07:27:28 +0800 Subject: [PATCH 3/4] Removed logs --- internal/pitsnapshot/flusher.go | 2 +- internal/pitsnapshot/snapshot.go | 3 ++- internal/pitsnapshot/snapshot_test.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/pitsnapshot/flusher.go b/internal/pitsnapshot/flusher.go index dfa4948db..ead5623ba 100644 --- a/internal/pitsnapshot/flusher.go +++ b/internal/pitsnapshot/flusher.go @@ -121,7 +121,7 @@ func (pf *PITFlusher) Close() (err error) { if err = pf.clearDLQ(); err != nil { log.Println("error in clearing DLQ", err) } - log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys) + //log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys) pf.flushFile.Close() pf.exitCh <- true return diff --git a/internal/pitsnapshot/snapshot.go b/internal/pitsnapshot/snapshot.go index e79faa704..3e9c4e8ea 100644 --- a/internal/pitsnapshot/snapshot.go +++ b/internal/pitsnapshot/snapshot.go @@ -82,6 +82,7 @@ func (pit *PointInTimeSnapshot) Run() (err error) { func (pit *PointInTimeSnapshot) Close() (err error) { pit.EndedAt = time.Now() pit.cancelFunc() - log.Println("Closing snapshot", pit.ID, ". Total time taken", pit.EndedAt.Sub(pit.StartedAt)) + log.Println("Closing snapshot", pit.ID, ". 
Total time taken",
+		pit.EndedAt.Sub(pit.StartedAt), "for total keys", pit.SnapshotMap.totalKeys)
 	return
 }
diff --git a/internal/pitsnapshot/snapshot_test.go b/internal/pitsnapshot/snapshot_test.go
index 9fa8cf488..4b0e48194 100644
--- a/internal/pitsnapshot/snapshot_test.go
+++ b/internal/pitsnapshot/snapshot_test.go
@@ -19,7 +19,7 @@ func addDataToDummyStore(store *DummyStore, count int) {
 	for i := 0; i < count; i++ {
 		store.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
 	}
-	log.Println("Total size of store", len(store.store))
+	//log.Println("Total size of store", len(store.store))
 }
 
 func fetchOrWriteInStore(ctx context.Context, store *DummyStore, count int, rangeOfKeys int, duration time.Duration, shouldWrite bool) {

From 8b687923e9c207cda456b82ff41c13ebc98270a3 Mon Sep 17 00:00:00 2001
From: Gaurav Sarma
Date: Mon, 10 Feb 2025 09:26:30 +0800
Subject: [PATCH 4/4] Updated README

---
 internal/pitsnapshot/README.md        |  8 +++++
 internal/pitsnapshot/snapshot_test.go | 52 +++++++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/internal/pitsnapshot/README.md b/internal/pitsnapshot/README.md
index 0810de910..367288a16 100644
--- a/internal/pitsnapshot/README.md
+++ b/internal/pitsnapshot/README.md
@@ -24,6 +24,14 @@ The snapshotting technique would be similar to the copy-on-write mechanism, i.e.,
 wouldn't have to be stored till the data has to be modified. This means additional memory would
 only be required if there are changes to the underlying data.
 
+### Impact on current latency benchmarks
+- For reads, the latency impact should be minimal, since the snapshot path holds no references to
+the `get` methods even while a snapshot is running. The one thing that may affect read latency is
+that the snapshot has to iterate through all the keys, so an implicit lock inside the underlying
+data structure may be required.
+- For writes, while a snapshot is in progress, each write lands in two places (the live store and
+the snapshot's copy-on-write map) and performs one additional map read (see the sketch below).
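+
+A condensed sketch of that write path, based on `DummyStore.Set` in this series (the lock and
+field names mirror the dummy store; note that, per the copy-on-write description under
+`StartSnapshot`, the value handed to `TempAdd` should be the previous value that is about to be
+overwritten):
+
+```go
+func (ds *DummyStore) Set(key, value string) {
+	// The additional read: look up the value that is about to be overwritten.
+	ds.stLock.RLock()
+	prev, exists := ds.store[key]
+	ds.stLock.RUnlock()
+
+	// The additional write: preserve the previous value for every ongoing
+	// snapshot so each snapshot stays consistent with its start time.
+	if exists {
+		ds.snapLock.RLock()
+		for _, snapshot := range ds.ongoingSnapshots {
+			snapshot.TempAdd(key, prev)
+		}
+		ds.snapLock.RUnlock()
+	}
+
+	// The regular write into the live store.
+	ds.stLock.Lock()
+	ds.store[key] = value
+	ds.stLock.Unlock()
+}
+```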
+ ### Flow The initiation flow: diff --git a/internal/pitsnapshot/snapshot_test.go b/internal/pitsnapshot/snapshot_test.go index 4b0e48194..212fb56cf 100644 --- a/internal/pitsnapshot/snapshot_test.go +++ b/internal/pitsnapshot/snapshot_test.go @@ -78,6 +78,24 @@ func pickRandomNumbers(min, max, x int) ([]int, error) { return result, nil } +// Total key size - 1000 +// Range of keys to access - 100 +// Duration - 10ms +// Allow writes - No +func TestSnapshotWithoutChangesWithNoRangeAccess(t *testing.T) { + storeSize := testBufferSize + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore, storeSize) + + snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + snapshot.Run() + + assert.Nil(t, err) + <-snapshot.ctx.Done() + time.Sleep(10 * time.Millisecond) +} + // Total key size - 1000 // Range of keys to access - 100 // Duration - 10ms @@ -135,3 +153,37 @@ func TestNoSnapshotWithChangesWithLowRangeAccess(t *testing.T) { <-ctx.Done() time.Sleep(10 * time.Millisecond) } + +func TestNoSnapshotWithChangesWithWideRangeAccess(t *testing.T) { + storeSize := testBufferSize + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore, storeSize) + + snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10, storeSize/100, 10*time.Millisecond, true) + snapshot.Run() + + assert.Nil(t, err) + + time.Sleep(750 * time.Millisecond) + <-snapshot.ctx.Done() + time.Sleep(10 * time.Millisecond) +} + +func TestNoSnapshotWithChangesWithAllRangeAccess(t *testing.T) { + storeSize := testBufferSize + // Create a new dummy store + dummyStore := NewDummyStore() + addDataToDummyStore(dummyStore, storeSize) + + snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore) + go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize, storeSize, 10*time.Millisecond, true) + snapshot.Run() + + assert.Nil(t, err) + + time.Sleep(750 * time.Millisecond) + <-snapshot.ctx.Done() + time.Sleep(10 * time.Millisecond) +}