diff --git a/internal/pitsnapshot/README.md b/internal/pitsnapshot/README.md
new file mode 100644
index 000000000..367288a16
--- /dev/null
+++ b/internal/pitsnapshot/README.md
@@ -0,0 +1,84 @@
+# Point-in-time Snapshots
+
+A point-in-time snapshot is a copy of the existing data that represents the in-memory state
+at that specific moment.
+
+## Goals
+- Don't affect the throughput of the current request processing layer.
+- Ability to take multiple snapshot instances simultaneously.
+- Ability to snapshot and restore on systems with different shard counts.
+- Shouldn't depend on existing data files, only on the in-memory data structures.
+
+## Design
+
+### Recap
+DiceDB runs multiple `ShardThreads` based on the number of CPUs in the machine, and there is
+one `Store` object for every thread. Since DiceDB follows a shared-nothing architecture, it
+should be possible to snapshot and restore data with varying CPU counts.
+
+The `Store` object keeps references to `Object` instances in a map; the `Object` is where the
+data is actually stored.
+
+### Implementing copy-on-write
+The snapshotting technique is similar to a copy-on-write mechanism, i.e., no additional data
+has to be stored until the data is modified. This means additional memory is only required if
+the underlying data changes.
+
+### Impact on current latency benchmarks
+- For reads, there should be minimal latency change since the `get` path is not touched even
+while snapshotting is running. One thing which may impact read latency is that the snapshot has
+to iterate through all the keys, so an implicit lock inside the data structure may be required.
+- For writes, if a snapshot is in progress, every write has to go to two places and perform an
+additional read from a map.
+
+### Flow
+
+The initiation flow:
+```bash
+ShardThread::CallSnapshotter -> Snapshotter::Start -> Store::StartSnapshot -> SnapshotMap::Buffer
+-> PITFlusher::Flush
+```
+
+When the iteration is over:
+```bash
+Store::StopSnapshot -> SnapshotMap::FlushAllData -> PITFlusher::FlushAllData -> Snapshotter::Close
+```
+
+### Changes for ShardThread and Store
+The snapshot would start on every `ShardThread` and fetch the `Store` object. Every `Store` object
+needs to implement the `SnapshotStore` interface, which contains the `StartSnapshot` and `StopSnapshot`
+methods.
+The `StartSnapshot` and `StopSnapshot` methods are called on the store from the snapshotter object.
+
+#### StartSnapshot
+When the `StartSnapshot` method is called, the `Store` should keep note of the `SnapshotID` in a map.
+There can be multiple snapshot instances per store as well.
+For any read or write operation, the `Store` object should check whether a snapshot is running at
+that instant. If no snapshot is running, it continues as usual.
+If a snapshot is running, then for any subsequent write operation, the previous data is stored in the
+snapshot's object, a map we'll call the `SnapshotMap`. If there are multiple write operations to the
+same object and the data already exists in the `SnapshotMap`, nothing more needs to be done for the
+snapshot.
+Similarly for reads: if a snapshot is running and the incoming request is from the snapshot layer,
+check whether there is anything in the `SnapshotMap` for the key. If not, return the current value
+from the `Store`.
+
+The `Store` should also fetch the list of keys in its store attribute and iterate through them. The
+write-side contract is sketched below.
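+
+A minimal, illustrative sketch of that contract (the `Snapshotter` interface mirrors the one added to
+`internal/store/store.go` in this change; `simpleStore` is a hypothetical stand-in, not the real `Store`):
+
+```go
+package snapshotsketch
+
+import "sync"
+
+// Snapshotter is the view a store has of one running snapshot.
+type Snapshotter interface {
+    TempAdd(key string, val interface{}) error // preserve a value the first time it changes
+    TempGet(key string) (interface{}, error)   // read a preserved value back
+    Store(key string, val interface{}) error   // emit a key/value pair into the snapshot
+    Close() error                              // signal that no more updates will follow
+}
+
+// simpleStore is a hypothetical, simplified per-shard store.
+type simpleStore struct {
+    mu               sync.RWMutex
+    data             map[string]string
+    snapLock         sync.RWMutex
+    ongoingSnapshots map[uint64]Snapshotter
+}
+
+// Set shows the copy-on-write write path: before the key is mutated, the previous
+// value is offered to every ongoing snapshot. TempAdd keeps only the first value it
+// sees for a key, so repeated writes cost nothing extra for the snapshot.
+func (s *simpleStore) Set(key, value string) {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    s.snapLock.RLock()
+    for _, snap := range s.ongoingSnapshots {
+        snap.TempAdd(key, s.data[key]) // a brand-new key is preserved as its zero value here
+    }
+    s.snapLock.RUnlock()
+
+    s.data[key] = value
+}
+```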
+
+#### StopSnapshot
+When the `Store` object has finished iterating through all the keys, it calls the `StopSnapshot`
+method. `StopSnapshot` lets the `SnapshotMap` know that there are no more updates coming. The
+`SnapshotMap` then talks to the `PITFlusher` to finish syncing all the chunks to disk and then
+closes the main snapshot process.
+
+### Point-in-time Flusher
+The `PITFlusher` serializes the store updates coming from the `SnapshotMap` into a binary format
+(currently `gob`) and appends them to a file. A serialization round-trip sketch follows below.
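+
+Illustrative only: a round-trip of one flushed chunk through `StoreMapUpdate.Serialize` and
+`StoreMapUpdate.Deserialize` (defined in `snapshot_map.go`). The `roundTripExample` function and its
+keys/values are made up for this sketch. String values encode fine, but concrete types held behind
+`interface{}` (for example `*object.Obj` from the real store) would likely need a `gob.Register`
+call before they can be serialized this way.
+
+```go
+package pitsnapshot
+
+import "log"
+
+// roundTripExample serializes a chunk the same way PITFlusher.Flush does and
+// decodes it back with StoreMapUpdate.Deserialize.
+func roundTripExample() {
+    updates := []StoreMapUpdate{
+        {Key: "key1", Value: "value1"},
+        {Key: "key2", Value: "value2"},
+    }
+
+    blob, err := StoreMapUpdate{}.Serialize(updates)
+    if err != nil {
+        log.Fatalln("serialize failed:", err)
+    }
+
+    decoded, err := StoreMapUpdate{}.Deserialize(blob)
+    if err != nil {
+        log.Fatalln("deserialize failed:", err)
+    }
+    log.Println("decoded", len(decoded), "updates, first key:", decoded[0].Key)
+}
+```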
+
+## Test cases and benchmarks
+- Snapshot data less than the buffer size without any subsequent writes
+- Snapshot data less than the buffer size with localized subsequent writes
+- Snapshot data less than the buffer size with spread out subsequent writes
+- Snapshot data more than the buffer size without any subsequent writes
+- Snapshot data more than the buffer size with localized subsequent writes
+- Snapshot data more than the buffer size with spread out subsequent writes
+- Ensure current `get` path is not affected
\ No newline at end of file
diff --git a/internal/pitsnapshot/dummy_store.go b/internal/pitsnapshot/dummy_store.go
new file mode 100644
index 000000000..51360a4c8
--- /dev/null
+++ b/internal/pitsnapshot/dummy_store.go
@@ -0,0 +1,94 @@
+package pitsnapshot
+
+import (
+    "log"
+    "sync"
+
+    "github.com/dicedb/dice/internal/store"
+)
+
+type (
+    DummyStore struct {
+        store            map[string]string
+        ongoingSnapshots map[uint64]store.Snapshotter
+        snapLock         *sync.RWMutex
+        stLock           *sync.RWMutex
+    }
+)
+
+func NewDummyStore() *DummyStore {
+    return &DummyStore{
+        store:            make(map[string]string),
+        ongoingSnapshots: make(map[uint64]store.Snapshotter),
+        snapLock:         &sync.RWMutex{},
+        stLock:           &sync.RWMutex{},
+    }
+}
+
+func (store *DummyStore) Set(key, value string) {
+
+    // Record the write for any in-flight snapshots before the key is overridden
+    store.snapLock.RLock()
+    for _, snapshot := range store.ongoingSnapshots {
+        snapshot.TempAdd(key, value)
+    }
+    store.snapLock.RUnlock()
+
+    store.stLock.Lock()
+    defer store.stLock.Unlock()
+
+    store.store[key] = value
+}
+
+func (store *DummyStore) Get(key string) string {
+    store.stLock.RLock()
+    defer store.stLock.RUnlock()
+
+    return store.store[key]
+}
+
+func (store *DummyStore) StartSnapshot(snapshotID uint64, snapshot store.Snapshotter) (err error) {
+    log.Println("Starting snapshot for snapshotID", snapshotID)
+
+    store.snapLock.Lock()
+    store.ongoingSnapshots[snapshotID] = snapshot
+    store.snapLock.Unlock()
+
+    store.stLock.RLock()
+    keys := make([]string, 0, len(store.store))
+    for k := range store.store {
+        keys = append(keys, k)
+    }
+    store.stLock.RUnlock()
+
+    for _, k := range keys {
+
+        store.stLock.RLock()
+        v := store.store[k]
+        store.stLock.RUnlock()
+
+        // Check if the data is overridden
+        tempVal, _ := snapshot.TempGet(k)
+        if tempVal != nil {
+            v = tempVal.(string)
+        }
+        err = snapshot.Store(k, v)
+        if err != nil {
+            log.Println("Error storing data in snapshot", "error", err)
+        }
+    }
+    store.StopSnapshot(snapshotID)
+    return
+}
+
+func (store *DummyStore) StopSnapshot(snapshotID uint64) (err error) {
+    store.snapLock.Lock()
+    if snapshot, isPresent := store.ongoingSnapshots[snapshotID]; isPresent {
+        if err = snapshot.Close(); err != nil {
+            log.Println("Error closing snapshot", "error", err)
+        }
+        delete(store.ongoingSnapshots, snapshotID)
+    }
+    store.snapLock.Unlock()
+    return
+}
diff --git a/internal/pitsnapshot/flusher.go b/internal/pitsnapshot/flusher.go
new file mode 100644
index 000000000..ead5623ba
--- /dev/null
+++ b/internal/pitsnapshot/flusher.go
@@ -0,0 +1,128 @@
+package pitsnapshot
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "os"
+)
+
+const (
+    FlushFilePath = "/tmp/flusher-%d.ddb"
+    Delimiter     = "\n"
+)
+
+type (
+    PITFlusher struct {
+        ctx        context.Context
+        snapshotID uint64
+        updatesCh  chan []StoreMapUpdate
+        exitCh     chan bool
+        dlq        [][]StoreMapUpdate
+
+        totalKeys uint64
+        flushFile *os.File
+    }
+)
+
+func NewPITFlusher(ctx context.Context, snapshotID uint64, exitCh chan bool) (pf *PITFlusher, err error) {
+    pf = &PITFlusher{
+        ctx:        ctx,
+        snapshotID: snapshotID,
+        exitCh:     exitCh,
+        updatesCh:  make(chan []StoreMapUpdate, 10*BufferSize),
+    }
+    if err = pf.setup(); err != nil {
+        return
+    }
+    return
+}
+
+func (pf *PITFlusher) setup() (err error) {
+    filePath := fmt.Sprintf(FlushFilePath, pf.snapshotID)
+    // A leftover file for this snapshot ID means something is off; bail out
+    // instead of appending to it.
+    if _, statErr := os.Stat(filePath); statErr == nil {
+        return fmt.Errorf("snapshot file %s already exists", filePath)
+    }
+    if pf.flushFile, err = os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644); err != nil {
+        return
+    }
+    return
+}
+
+func (pf *PITFlusher) Start() (err error) {
+    for {
+        select {
+        case <-pf.ctx.Done():
+            return
+        case updates := <-pf.updatesCh:
+            // TODO: Store the failed updates somewhere
+            if err = pf.Flush(updates); err != nil {
+                //log.Println("error in flushing updates, pushing to DLQ", err)
+                pf.dlq = append(pf.dlq, updates)
+                continue
+            }
+            continue
+        }
+    }
+    return
+}
+
+// Flush serializes one chunk of updates and appends it to the snapshot file,
+// separated from the previous chunk by Delimiter.
+func (pf *PITFlusher) Flush(updates []StoreMapUpdate) (err error) {
+    var (
+        serializedUpdates []byte
+        storeUpdate       StoreMapUpdate
+    )
+    pf.totalKeys += uint64(len(updates))
+    if serializedUpdates, err = storeUpdate.Serialize(updates); err != nil {
+        return
+    }
+    serializedUpdates = append(serializedUpdates, Delimiter...)
+    if _, err = pf.flushFile.Write(serializedUpdates); err != nil {
+        return
+    }
+    return
+}
+
+func (pf *PITFlusher) clearDLQ() (err error) {
+    for {
+        select {
+        case updates := <-pf.updatesCh:
+            if err := pf.Flush(updates); err != nil {
+                log.Println("Error in flushing updates while draining", err)
+                pf.dlq = append(pf.dlq, updates)
+            }
+        default:
+            // Channel is empty
+            goto Done
+        }
+    }
+Done:
+    totalKeysInDLQ := 0
+    for _, updates := range pf.dlq {
+        var (
+            update []StoreMapUpdate
+        )
+        update = updates
+        totalKeysInDLQ += len(update)
+        if err = pf.Flush(update); err != nil {
+            return
+        }
+    }
+    if totalKeysInDLQ > 0 {
+        log.Println("Total keys in DLQ", totalKeysInDLQ, len(pf.updatesCh))
+    }
+    return
+}
+
+// Close is called when the overlying SnapshotMap's channel is closed.
+// After the SnapshotMap is closed, there is one final push to update all the pending
+// data to the flusher
+func (pf *PITFlusher) Close() (err error) {
+    if err = pf.clearDLQ(); err != nil {
+        log.Println("error in clearing DLQ", err)
+    }
+    //log.Println("Closing the flusher for snapshot", pf.snapshotID, ", total keys flushed", pf.totalKeys)
+    pf.flushFile.Close()
+    pf.exitCh <- true
+    return
+}
diff --git a/internal/pitsnapshot/snapshot.go b/internal/pitsnapshot/snapshot.go
new file mode 100644
index 000000000..3e9c4e8ea
--- /dev/null
+++ b/internal/pitsnapshot/snapshot.go
@@ -0,0 +1,88 @@
+package pitsnapshot
+
+import (
+    "context"
+    "log"
+    "time"
+
+    "github.com/dicedb/dice/internal/store"
+)
+
+const (
+    BufferSize = 1000
+)
+
+type (
+    SnapshotStore interface {
+        StartSnapshot(uint64, store.Snapshotter) error
+        StopSnapshot(uint64) error
+    }
+    PointInTimeSnapshot struct {
+        ctx        context.Context
+        cancelFunc context.CancelFunc
+
+        ID uint64
+
+        store          SnapshotStore
+        totalStoreShot uint8
+
+        SnapshotMap *SnapshotMap
+
+        flusher *PITFlusher
+
+        StartedAt time.Time
+        EndedAt   time.Time
+
+        exitCh chan bool
+    }
+)
+
+func NewPointInTimeSnapshot(ctx context.Context, store SnapshotStore) (pit *PointInTimeSnapshot, err error) {
+    pit = &PointInTimeSnapshot{
+        ctx:       ctx,
+        ID:        uint64(time.Now().Nanosecond()),
+        StartedAt: time.Now(),
+
+        store:  store,
+        exitCh: make(chan bool, 5),
+    }
+    pit.ctx, pit.cancelFunc = context.WithCancel(ctx)
+    if pit.flusher, err = NewPITFlusher(pit.ctx, pit.ID, pit.exitCh); err != nil {
+        return
+    }
+    if pit.SnapshotMap, err = NewSnapshotMap(pit.ctx, pit.flusher); err != nil {
+        return
+    }
+    return
+}
+
+func (pit *PointInTimeSnapshot) processStoreUpdates() (err error) {
+    for {
+        select {
+        case <-pit.ctx.Done():
+            pit.Close()
+            return
+        case <-pit.exitCh:
+            pit.Close()
+            return
+        }
+    }
+    return
+}
+
+func (pit *PointInTimeSnapshot) Run() (err error) {
+    go pit.flusher.Start()
+    if err = pit.store.StartSnapshot(pit.ID, pit.SnapshotMap); err != nil {
+        return
+    }
+    go pit.processStoreUpdates()
+    return
+}
+
+func (pit *PointInTimeSnapshot) Close() (err error) {
+    pit.EndedAt = time.Now()
+    pit.cancelFunc()
+    log.Println("Closing snapshot", pit.ID, ". Total time taken",
+        pit.EndedAt.Sub(pit.StartedAt), "for total keys", pit.SnapshotMap.totalKeys)
+    return
+}
diff --git a/internal/pitsnapshot/snapshot_map.go b/internal/pitsnapshot/snapshot_map.go
new file mode 100644
index 000000000..5c58d3707
--- /dev/null
+++ b/internal/pitsnapshot/snapshot_map.go
@@ -0,0 +1,103 @@
+package pitsnapshot
+
+import (
+    "bytes"
+    "context"
+    "encoding/gob"
+    "log"
+    "sync"
+)
+
+type (
+    SnapshotMap struct {
+        tempRepr  map[string]interface{}
+        buffer    []StoreMapUpdate
+        flusher   *PITFlusher
+        closing   bool
+        mLock     *sync.RWMutex
+        totalKeys uint64
+    }
+    StoreMapUpdate struct {
+        Key   string
+        Value interface{}
+    }
+)
+
+func (s StoreMapUpdate) Serialize(updates []StoreMapUpdate) (updateBytes []byte, err error) {
+    var buf bytes.Buffer
+    enc := gob.NewEncoder(&buf)
+    err = enc.Encode(updates)
+    if err != nil {
+        return nil, err
+    }
+    updateBytes = buf.Bytes()
+    return
+}
+
+func (s StoreMapUpdate) Deserialize(data []byte) (result []StoreMapUpdate, err error) {
+    buf := bytes.NewBuffer(data)
+    dec := gob.NewDecoder(buf)
+    if err = dec.Decode(&result); err != nil {
+        return
+    }
+    return
+}
+
+func NewSnapshotMap(ctx context.Context, flusher *PITFlusher) (sm *SnapshotMap, err error) {
+    sm = &SnapshotMap{
+        tempRepr: make(map[string]interface{}, 1000),
+        flusher:  flusher,
+        mLock:    &sync.RWMutex{},
+    }
+    return
+
+}
+
+func (sm *SnapshotMap) TempAdd(key string, val interface{}) (err error) {
+    sm.mLock.Lock()
+    defer sm.mLock.Unlock()
+
+    if _, ok := sm.tempRepr[key]; ok {
+        return
+    }
+    sm.tempRepr[key] = val
+    return nil
+}
+
+func (sm *SnapshotMap) TempGet(key string) (interface{}, error) {
+    sm.mLock.RLock()
+    defer sm.mLock.RUnlock()
+
+    return sm.tempRepr[key], nil
+}
+
+func (sm *SnapshotMap) Store(key string, val interface{}) (err error) {
+    //log.Println("Storing data in snapshot", "key", key, "value", val)
+    if sm.closing {
+        log.Println("rejecting writes to the update channel since the snapshot map is closing")
+        return
+    }
+    sm.buffer = append(sm.buffer, StoreMapUpdate{Key: key, Value: val})
+    if len(sm.buffer) >= BufferSize {
+        bufferCopy := make([]StoreMapUpdate, len(sm.buffer))
+        copy(bufferCopy, sm.buffer)
+        sm.totalKeys += uint64(len(bufferCopy))
+        sm.flusher.updatesCh <- bufferCopy
+        sm.buffer = []StoreMapUpdate{}
+    }
+    return
+}
+
+func (sm *SnapshotMap) Close() (err error) {
+    sm.closing = true
+    // Send the remaining updates
+    //log.Println("Closing the snapshot map, sending the remaining updates to the flusher. Total keys processed", sm.totalKeys)
+    // Count the final partial buffer too so the totals below can be compared
+    sm.totalKeys += uint64(len(sm.buffer))
+    if err = sm.flusher.Flush(sm.buffer); err != nil {
+        return
+    }
+    sm.flusher.Close()
+    if sm.totalKeys != sm.flusher.totalKeys {
+        log.Println("[error] Total keys processed in the snapshot map and the flusher don't match", sm.totalKeys, sm.flusher.totalKeys)
+    }
+    return
+}
diff --git a/internal/pitsnapshot/snapshot_test.go b/internal/pitsnapshot/snapshot_test.go
new file mode 100644
index 000000000..212fb56cf
--- /dev/null
+++ b/internal/pitsnapshot/snapshot_test.go
@@ -0,0 +1,189 @@
+package pitsnapshot
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "math/rand"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/assert"
+)
+
+const (
+    testBufferSize = 1000000
+)
+
+func addDataToDummyStore(store *DummyStore, count int) {
+    for i := 0; i < count; i++ {
+        store.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
+    }
+    //log.Println("Total size of store", len(store.store))
+}
+
+func fetchOrWriteInStore(ctx context.Context, store *DummyStore, count int, rangeOfKeys int, duration time.Duration, shouldWrite bool) {
+    randomKeys, _ := pickRandomNumbers(0, count-1, rangeOfKeys)
+    readsCount := 0
+    writesCount := 0
+
+    for {
+        select {
+        case <-time.After(duration):
+            log.Println("Duration over", "Reads count:", readsCount, "Writes count:", writesCount)
+            return
+        case <-ctx.Done():
+            log.Println("Context done", "Reads count:", readsCount, "Writes count:", writesCount)
+            return
+        default:
+            for _, key := range randomKeys {
+                if shouldWrite {
+                    writesCount += 1
+                    store.Set(fmt.Sprintf("key%d", key), fmt.Sprintf("value%d", key))
+                }
+                readsCount += 1
+                store.Get(fmt.Sprintf("key%d", key))
+            }
+        }
+    }
+}
+
+func pickRandomNumbers(min, max, x int) ([]int, error) {
+    if min > max {
+        return nil, fmt.Errorf("min should be less than or equal to max")
+    }
+    if x < 0 {
+        return nil, fmt.Errorf("number of random numbers to pick should be non-negative")
+    }
+    if x > (max - min + 1) {
+        return nil, fmt.Errorf("number of random numbers to pick is greater than the range size")
+    }
+
+    // Seed the random number generator
+    rand.Seed(time.Now().UnixNano())
+
+    // Create a map to ensure unique random numbers
+    randomNumbers := make(map[int]struct{})
+    for len(randomNumbers) < x {
+        num := rand.Intn(max-min+1) + min
+        randomNumbers[num] = struct{}{}
+    }
+
+    // Convert the map keys to a slice
+    result := make([]int, 0, x)
+    for num := range randomNumbers {
+        result = append(result, num)
+    }
+
+    return result, nil
+}
+
+// Total key size - testBufferSize (1,000,000)
+// Concurrent access - none
+// Allow writes - No
+func TestSnapshotWithoutChangesWithNoRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
+    snapshot.Run()
+
+    assert.Nil(t, err)
+    <-snapshot.ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
+
+// Total key size - testBufferSize (1,000,000)
+// Range of keys to access - 100 (10 random keys)
+// Duration - 10ms
+// Allow writes - No
+func TestSnapshotWithoutChangesWithLowRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
+    go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, false)
+    snapshot.Run()
+
+    assert.Nil(t, err)
+    <-snapshot.ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
+
+// Total key size - testBufferSize (1,000,000)
+// Range of keys to access - 100 (10 random keys)
+// Duration - 10ms
+// Allow writes - Yes
+func TestSnapshotWithChangesWithLowRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
+    go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, true)
+    snapshot.Run()
+
+    assert.Nil(t, err)
+    <-snapshot.ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
+
+// Test write speed without any snapshots
+// Total key size - testBufferSize (1,000,000)
+// Range of keys to access - 100 (10 random keys)
+// Duration - 10ms
+// Allow writes - Yes
+func TestNoSnapshotWithChangesWithLowRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    go fetchOrWriteInStore(ctx, dummyStore, storeSize/10000, storeSize/100000, 10*time.Millisecond, true)
+
+    time.Sleep(750 * time.Millisecond)
+    cancel()
+    <-ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
+
+func TestSnapshotWithChangesWithWideRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
+    go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize/10, storeSize/100, 10*time.Millisecond, true)
+    snapshot.Run()
+
+    assert.Nil(t, err)
+
+    time.Sleep(750 * time.Millisecond)
+    <-snapshot.ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
+
+func TestSnapshotWithChangesWithAllRangeAccess(t *testing.T) {
+    storeSize := testBufferSize
+    // Create a new dummy store
+    dummyStore := NewDummyStore()
+    addDataToDummyStore(dummyStore, storeSize)
+
+    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
+    go fetchOrWriteInStore(snapshot.ctx, dummyStore, storeSize, storeSize, 10*time.Millisecond, true)
+    snapshot.Run()
+
+    assert.Nil(t, err)
+
+    time.Sleep(750 * time.Millisecond)
+    <-snapshot.ctx.Done()
+    time.Sleep(10 * time.Millisecond)
+}
diff --git a/internal/store/store.go b/internal/store/store.go
index 110c253c1..75e129d65 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -4,7 +4,9 @@
 package store
 
 import (
+    "log"
     "path"
+    "sync"
 
     "github.com/dicedb/dice/internal/common"
     "github.com/dicedb/dice/internal/object"
@@ -54,6 +56,17 @@ type Store struct {
     cmdWatchChan     chan CmdWatchEvent
     evictionStrategy EvictionStrategy
     ShardID          int
+
+    // Snapshot related fields
+    ongoingSnapshots map[uint64]Snapshotter
+    snapLock         *sync.RWMutex
+}
+
+type Snapshotter interface {
+    TempAdd(string, interface{}) error
+    TempGet(string) (interface{}, error)
+    Store(string, interface{}) error
+    Close() error
 }
 
 func NewStore(cmdWatchChan chan CmdWatchEvent, evictionStrategy EvictionStrategy, shardID int) *Store {
@@ -62,6 +75,8 @@ func NewStore(cmdWatchChan chan CmdWatchEvent, evictionStrategy EvictionStrategy
         expires:          NewExpireRegMap(),
         cmdWatchChan:     cmdWatchChan,
         evictionStrategy: evictionStrategy,
+        ongoingSnapshots: make(map[uint64]Snapshotter),
+        snapLock:         &sync.RWMutex{},
         ShardID:          shardID,
     }
     if evictionStrategy == nil {
@@ -97,6 +112,56 @@ func (store *Store) ResetStore() {
     store.expires = NewExpireMap()
 }
 
+func (store *Store) StartSnapshot(snapshotID uint64, snapshot Snapshotter) (err error) {
+    store.snapLock.Lock()
+    store.ongoingSnapshots[snapshotID] = snapshot
+    store.snapLock.Unlock()
+
+    // Iterate and store all the keys in the snapshot
+    return store.IterateAllKeysForSnapshot(snapshotID)
+}
+
+func (store *Store) StopSnapshot(snapshotID uint64) (err error) {
+    store.snapLock.Lock()
+    if snapshot, isPresent := store.ongoingSnapshots[snapshotID]; isPresent {
+        if err = snapshot.Close(); err != nil {
+            log.Println("Error closing snapshot", "error", err)
+        }
+        delete(store.ongoingSnapshots, snapshotID)
+    }
+    store.snapLock.Unlock()
+    return
+}
+
+func (store *Store) IterateAllKeysForSnapshot(snapshotID uint64) (err error) {
+    var (
+        snapshot  Snapshotter
+        isPresent bool
+    )
+    // If this snapshot is not registered, there is nothing to iterate over
+    store.snapLock.RLock()
+    snapshot, isPresent = store.ongoingSnapshots[snapshotID]
+    store.snapLock.RUnlock()
+    if !isPresent {
+        return
+    }
+
+    store.store.All(func(k string, v *object.Obj) bool {
+        // Check if the data is overridden
+        tempVal, _ := snapshot.TempGet(k)
+        if tempVal != nil {
+            v = tempVal.(*object.Obj)
+        }
+        err = snapshot.Store(k, v)
+        if err != nil {
+            log.Println("Error storing data in snapshot", "error", err)
+        }
+        return true
+    })
+    return store.StopSnapshot(snapshotID)
+}
+
 func (store *Store) Put(k string, obj *object.Obj, opts ...PutOption) {
     store.putHelper(k, obj, opts...)
 }
@@ -147,6 +212,10 @@ func (store *Store) putHelper(k string, obj *object.Obj, opts ...PutOption) {
         store.numKeys++
     }
+    // Record the write for any in-flight snapshots before the key is overwritten
+    store.snapLock.RLock()
+    for _, snapshot := range store.ongoingSnapshots {
+        snapshot.TempAdd(k, obj)
+    }
+    store.snapLock.RUnlock()
     store.store.Put(k, obj)
 
     store.evictionStrategy.OnAccess(k, obj, AccessSet)
@@ -295,6 +364,10 @@ func (store *Store) deleteKey(k string, obj *object.Obj, opts ...DelOption) bool
     }
 
     if obj != nil {
+        // Preserve the existing object for any in-flight snapshots before it is deleted
+        store.snapLock.RLock()
+        for _, snapshot := range store.ongoingSnapshots {
+            snapshot.TempAdd(k, obj)
+        }
+        store.snapLock.RUnlock()
         store.store.Delete(k)
         store.expires.Delete(obj)
         store.numKeys--
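
To see how these pieces fit together end to end, here is a minimal sketch that mirrors the flow exercised in `snapshot_test.go`. It assumes it lives in package `pitsnapshot` (it waits on the unexported `ctx` field), uses the `DummyStore` from this change, and the eventual `ShardThread` trigger from the flow diagram is not included here:

```go
package pitsnapshot

import (
    "context"
    "fmt"
    "log"
)

// runSnapshotExample seeds a DummyStore, runs a point-in-time snapshot against
// it, and waits until the flusher has written every chunk to /tmp/flusher-<id>.ddb.
func runSnapshotExample() {
    dummyStore := NewDummyStore()
    for i := 0; i < 10; i++ {
        dummyStore.Set(fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i))
    }

    snapshot, err := NewPointInTimeSnapshot(context.Background(), dummyStore)
    if err != nil {
        log.Fatalln("failed to create snapshot:", err)
    }
    if err := snapshot.Run(); err != nil {
        log.Fatalln("failed to run snapshot:", err)
    }

    // Run drives DummyStore.StartSnapshot synchronously; once the SnapshotMap
    // and PITFlusher have closed, the snapshot cancels its own context.
    <-snapshot.ctx.Done()
}
```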