Skip to content

Commit

Permalink
fix: lightnode migration (#4238)
Browse files Browse the repository at this point in the history
Co-authored-by: alok <aloknerurkar@no-reply.com>
  • Loading branch information
aloknerurkar and alok authored Aug 4, 2023
1 parent 9fe73fb commit 27f2fd1
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
65 changes: 63 additions & 2 deletions pkg/storer/epoch_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,11 @@ func TestEpochMigration(t *testing.T) {
t.Fatal(err)
}

if !strings.ContainsAny(logBytes.String(), "migrating pinning collections done") {
if !strings.Contains(logBytes.String(), "migrating pinning collections done") {
t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
}

if !strings.ContainsAny(logBytes.String(), "migrating reserve contents done") {
if !strings.Contains(logBytes.String(), "migrating reserve contents done") {
t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
}

Expand All @@ -264,6 +264,67 @@ func TestEpochMigration(t *testing.T) {
t.Fatalf("expected 1 pin, got %d", len(pins))
}

if !strings.Contains(logBytes.String(), pins[0].String()) {
t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String())
}
}

func TestEpochMigrationLightNode(t *testing.T) {
t.Parallel()

var (
dataPath = t.TempDir()
baseAddress = swarm.RandAddress(t)
stateStore = mockstatestore.NewStateStore()
reserve storer.ReservePutter
logBytes = bytes.NewBuffer(nil)
logger = log.NewLogger("test", log.WithSink(logBytes))
indexStore = inmemstore.New()
)

createOldDataDir(t, dataPath, baseAddress, stateStore)

r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize)
if err != nil {
t.Fatal(err)
}

sharkyRecovery := &testSharkyRecovery{Recovery: r}

err = storer.EpochMigration(
context.Background(),
dataPath,
stateStore,
indexStore,
reserve,
sharkyRecovery,
logger,
)
if err != nil {
t.Fatal(err)
}

if !strings.Contains(logBytes.String(), "migrating pinning collections done") {
t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
}

if strings.Contains(logBytes.String(), "migrating reserve contents done") {
t.Fatalf("expected log to not contain 'migrating reserve contents done', got %s", logBytes.String())
}

if sharkyRecovery.addCalls != 21 {
t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls)
}

pins, err := pinstore.Pins(indexStore)
if err != nil {
t.Fatal(err)
}

if len(pins) != 1 {
t.Fatalf("expected 1 pin, got %d", len(pins))
}

if !strings.ContainsAny(logBytes.String(), pins[0].String()) {
t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String())
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/storer/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ var (
EpochMigration = epochMigration
)

// ReservePutter re-exports the unexported reservePutter interface so that
// external test packages can refer to it.
type ReservePutter = reservePutter

// Reserve exposes the DB's underlying reserve for tests.
func (db *DB) Reserve() *reserve.Reserve {
	rs := db.reserve
	return rs
}
Expand Down
19 changes: 16 additions & 3 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,19 @@ func (r *Reserve) EvictBatchBin(
}
moveToCache = append(moveToCache, item.Address)
}
if err := r.cacheCb(ctx, store, moveToCache...); err != nil {
if err := batch.Commit(); err != nil {
return err
}
return batch.Commit()
if err := r.cacheCb(ctx, store, moveToCache...); err != nil {
r.logger.Error(err, "evict and move to cache")
for _, rItem := range moveToCache {
err = store.ChunkStore().Delete(ctx, rItem)
if err != nil {
return err
}
}
}
return nil
})
if err != nil {
return evictionCompleted, err
Expand Down Expand Up @@ -375,7 +384,11 @@ func (r *Reserve) DeleteChunk(
if err != nil {
return err
}
return r.cacheCb(ctx, store, item.Address)
if err := r.cacheCb(ctx, store, item.Address); err != nil {
r.logger.Error(err, "delete and move to cache")
return store.ChunkStore().Delete(ctx, item.Address)
}
return nil
}

// CleanupBinIndex removes the bin index entry for the chunk. This is called mainly
Expand Down
1 change: 1 addition & 0 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ func (db *DB) unreserve(ctx context.Context) (err error) {
if target == 0 {
return nil
}
db.logger.Info("unreserve", "target", target, "radius", radius)

totalEvicted := 0
for radius < swarm.MaxBins {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ func performEpochMigration(ctx context.Context, basePath string, opts *Options)

logger := opts.Logger.WithName("epochmigration").Register()

var rs *reserve.Reserve
var rs reservePutter

if opts.ReserveCapacity > 0 {
rs, err = reserve.New(
opts.Address,
Expand Down

0 comments on commit 27f2fd1

Please sign in to comment.