diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go
index c7d473f6e44..4e64dfef956 100644
--- a/pkg/storer/internal/cache/cache.go
+++ b/pkg/storer/internal/cache/cache.go
@@ -505,6 +505,18 @@ func (c *Cache) MoveFromReserve(
 		state.Head = entriesToAdd[0].Address.Clone()
 		state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
 	} else {
+		// update the old tail to point to the new entry
+		tailEntry := &cacheEntry{Address: state.Tail}
+		err = store.IndexStore().Get(tailEntry)
+		if err != nil {
+			return fmt.Errorf("failed getting tail entry %s: %w", tailEntry, err)
+		}
+		tailEntry.Next = entriesToAdd[0].Address.Clone()
+		entriesToAdd[0].Prev = tailEntry.Address.Clone()
+		err = batch.Put(tailEntry)
+		if err != nil {
+			return fmt.Errorf("failed updating tail entry %s: %w", tailEntry, err)
+		}
 		state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
 		if state.Count+uint64(len(entriesToAdd)) > c.capacity {
 			// this means that we need to remove some entries from the cache. The cache
diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go
index cc03be4021b..8715af88e7a 100644
--- a/pkg/storer/internal/cache/cache_test.go
+++ b/pkg/storer/internal/cache/cache_test.go
@@ -482,6 +482,30 @@ func TestMoveFromReserve(t *testing.T) {
 
 		verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10)
 		verifyCacheOrder(t, c, st.IndexStore(), chunks...)
+
+		chunks2 := chunktest.GenerateTestRandomChunks(5)
+		chunksToMove2 := make([]swarm.Address, 0, 5)
+
+		// add the chunks to chunkstore. This simulates the reserve already populating
+		// the chunkstore with chunks.
+		for _, ch := range chunks2 {
+			err := st.ChunkStore().Put(context.Background(), ch)
+			if err != nil {
+				t.Fatal(err)
+			}
+			chunksToMove2 = append(chunksToMove2, ch.Address())
+		}
+
+		// move new chunks
+		err = c.MoveFromReserve(context.Background(), st, chunksToMove2...)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		cacheChunks := append(chunks[5:], chunks2...)
+
+		verifyCacheState(t, st.IndexStore(), c, cacheChunks[0].Address(), cacheChunks[9].Address(), 10)
+		verifyCacheOrder(t, c, st.IndexStore(), cacheChunks...)
 	})
 
 	t.Run("move from reserve over capacity", func(t *testing.T) {
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index 4813a40ee1d..3af31f78c9c 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -154,11 +154,10 @@ func (db *DB) evictionWorker(ctx context.Context) {
 	defer overCapUnsub()
 
 	var (
-		unreserveSem    = semaphore.NewWeighted(1)
-		unreserveCtx    context.Context
-		cancelUnreserve context.CancelFunc
-		expirySem       = semaphore.NewWeighted(1)
-		expiryWorkers   = semaphore.NewWeighted(4)
+		unreserveSem                  = semaphore.NewWeighted(1)
+		unreserveCtx, cancelUnreserve = context.WithCancel(ctx)
+		expirySem                     = semaphore.NewWeighted(1)
+		expiryWorkers                 = semaphore.NewWeighted(4)
 	)
 
 	stopped := make(chan struct{})
@@ -180,7 +179,13 @@ func (db *DB) evictionWorker(ctx context.Context) {
 			}
 
 			go func() {
-				defer expirySem.Release(1)
+				reTrigger := false
+				defer func() {
+					expirySem.Release(1)
+					if reTrigger {
+						db.events.Trigger(batchExpiry)
+					}
+				}()
 
 				batchesToEvict, err := db.getExpiredBatches()
 				if err != nil {
@@ -192,6 +197,10 @@ func (db *DB) evictionWorker(ctx context.Context) {
 					return
 				}
 
+				// After this point we start swallowing signals, so ensure we do 1 more
+				// trigger at the end.
+				reTrigger = true
+
 				// we ensure unreserve is not running and if it is we cancel it and wait
 				// for it to finish, this is to prevent unreserve and expirations running
 				// at the same time. The expiration will free up space so the unreserve
@@ -233,6 +242,7 @@ func (db *DB) evictionWorker(ctx context.Context) {
 					}()
 				}
 
+				// wait for all workers to finish
 				if err := expiryWorkers.Acquire(ctx, 4); err != nil {
 					db.logger.Error(err, "wait for expiry workers")
 					return
 				}
@@ -263,7 +273,14 @@ func (db *DB) evictionWorker(ctx context.Context) {
 				continue
 			}
 			go func() {
-				defer unreserveSem.Release(1)
+				defer func() {
+					unreserveSem.Release(1)
+					if !db.reserve.IsWithinCapacity() {
+						// if we are still over capacity trigger again as we
+						// might swallow the signal
+						db.events.Trigger(reserveOverCapacity)
+					}
+				}()
 
 				unreserveCtx, cancelUnreserve = context.WithCancel(ctx)
 				err := db.unreserve(unreserveCtx)
diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go
index 85f7aa080eb..f09288956d4 100644
--- a/pkg/storer/reserve_test.go
+++ b/pkg/storer/reserve_test.go
@@ -294,7 +294,7 @@ func TestUnreserveCap(t *testing.T) {
 			if storer.ReserveSize() == capacity {
 				break done
 			}
-		case <-time.After(time.Second * 45):
+		case <-time.After(time.Second * 30):
 			if storer.ReserveSize() != capacity {
 				t.Fatal("timeout waiting for reserve to reach capacity")
 			}