Skip to content

Commit

Permalink
fix: swallowed batchExpiry trigger (#4237)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Aug 1, 2023
1 parent d666ed4 commit 9fe73fb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
12 changes: 12 additions & 0 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,18 @@ func (c *Cache) MoveFromReserve(
state.Head = entriesToAdd[0].Address.Clone()
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
} else {
// update the old tail to point to the new entry
tailEntry := &cacheEntry{Address: state.Tail}
err = store.IndexStore().Get(tailEntry)
if err != nil {
return fmt.Errorf("failed getting tail entry %s: %w", tailEntry, err)
}
tailEntry.Next = entriesToAdd[0].Address.Clone()
entriesToAdd[0].Prev = tailEntry.Address.Clone()
err = batch.Put(tailEntry)
if err != nil {
return fmt.Errorf("failed updating tail entry %s: %w", tailEntry, err)
}
state.Tail = entriesToAdd[len(entriesToAdd)-1].Address.Clone()
if state.Count+uint64(len(entriesToAdd)) > c.capacity {
// this means that we need to remove some entries from the cache. The cache
Expand Down
24 changes: 24 additions & 0 deletions pkg/storer/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,30 @@ func TestMoveFromReserve(t *testing.T) {

verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10)
verifyCacheOrder(t, c, st.IndexStore(), chunks...)

chunks2 := chunktest.GenerateTestRandomChunks(5)
chunksToMove2 := make([]swarm.Address, 0, 5)

// add the chunks to chunkstore. This simulates the reserve already populating
// the chunkstore with chunks.
for _, ch := range chunks2 {
err := st.ChunkStore().Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
chunksToMove2 = append(chunksToMove2, ch.Address())
}

// move new chunks
err = c.MoveFromReserve(context.Background(), st, chunksToMove2...)
if err != nil {
t.Fatal(err)
}

cacheChunks := append(chunks[5:], chunks2...)

verifyCacheState(t, st.IndexStore(), c, cacheChunks[0].Address(), cacheChunks[9].Address(), 10)
verifyCacheOrder(t, c, st.IndexStore(), cacheChunks...)
})

t.Run("move from reserve over capacity", func(t *testing.T) {
Expand Down
31 changes: 24 additions & 7 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,10 @@ func (db *DB) evictionWorker(ctx context.Context) {
defer overCapUnsub()

var (
unreserveSem = semaphore.NewWeighted(1)
unreserveCtx context.Context
cancelUnreserve context.CancelFunc
expirySem = semaphore.NewWeighted(1)
expiryWorkers = semaphore.NewWeighted(4)
unreserveSem = semaphore.NewWeighted(1)
unreserveCtx, cancelUnreserve = context.WithCancel(ctx)
expirySem = semaphore.NewWeighted(1)
expiryWorkers = semaphore.NewWeighted(4)
)

stopped := make(chan struct{})
Expand All @@ -180,7 +179,13 @@ func (db *DB) evictionWorker(ctx context.Context) {
}

go func() {
defer expirySem.Release(1)
reTrigger := false
defer func() {
expirySem.Release(1)
if reTrigger {
db.events.Trigger(batchExpiry)
}
}()

batchesToEvict, err := db.getExpiredBatches()
if err != nil {
Expand All @@ -192,6 +197,10 @@ func (db *DB) evictionWorker(ctx context.Context) {
return
}

// After this point we start swallowing signals, so ensure we do one
// more trigger at the end.
reTrigger = true

// we ensure unreserve is not running and if it is we cancel it and wait
// for it to finish, this is to prevent unreserve and expirations running
// at the same time. The expiration will free up space so the unreserve
Expand Down Expand Up @@ -233,6 +242,7 @@ func (db *DB) evictionWorker(ctx context.Context) {
}()
}

// wait for all workers to finish
if err := expiryWorkers.Acquire(ctx, 4); err != nil {
db.logger.Error(err, "wait for expiry workers")
return
Expand Down Expand Up @@ -263,7 +273,14 @@ func (db *DB) evictionWorker(ctx context.Context) {
continue
}
go func() {
defer unreserveSem.Release(1)
defer func() {
unreserveSem.Release(1)
if !db.reserve.IsWithinCapacity() {
// if we are still over capacity, trigger again, as we
// might have swallowed the signal
db.events.Trigger(reserveOverCapacity)
}
}()

unreserveCtx, cancelUnreserve = context.WithCancel(ctx)
err := db.unreserve(unreserveCtx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestUnreserveCap(t *testing.T) {
if storer.ReserveSize() == capacity {
break done
}
case <-time.After(time.Second * 45):
case <-time.After(time.Second * 30):
if storer.ReserveSize() != capacity {
t.Fatal("timeout waiting for reserve to reach capacity")
}
Expand Down

0 comments on commit 9fe73fb

Please sign in to comment.