Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 61 additions & 14 deletions cmd/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
Expand All @@ -14,7 +15,6 @@ import (
"cosmossdk.io/store"
"cosmossdk.io/store/metrics"
"cosmossdk.io/store/snapshots"
"cosmossdk.io/store/types"
storetypes "cosmossdk.io/store/types"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -83,9 +83,9 @@ func PruneAppState(params *ApplicationPrunerParams) (bool, error) {
}
}

keys := types.NewKVStoreKeys(storeNames...)
keys := storetypes.NewKVStoreKeys(storeNames...)
for _, value := range keys {
appStore.MountStoreWithDB(value, types.StoreTypeIAVL, nil)
appStore.MountStoreWithDB(value, storetypes.StoreTypeIAVL, nil)
}

err := appStore.LoadLatestVersion()
Expand Down Expand Up @@ -198,36 +198,83 @@ func SnapshotAndRestoreApp(params *ApplicationPrunerParams) (bool, error) {
if err != nil {
return false, fmt.Errorf("failed to create snapshot: %w", err)
}
logger.Info("snapshot created, removing old application.db")
logger.Info("snapshot created, creating new application DB")

_ = params.appDB.Close()
if err := os.RemoveAll(filepath.Join(params.dataDir, "application.db")); err != nil {
return false, fmt.Errorf("failed to remove application.db: %w", err)
}
params.appDB, err = db.NewDB("application", params.dbfmt, params.dataDir)
newAppDB, err := db.NewDB("application_new", params.dbfmt, params.dataDir)
if err != nil {
return false, fmt.Errorf("failed to recreate application DB: %w", err)
return false, fmt.Errorf("failed to create new application DB: %w", err)
}

freshStore := store.NewCommitMultiStore(params.appDB, logger, metrics.NewNoOpMetrics())
freshStore := store.NewCommitMultiStore(newAppDB, logger, metrics.NewNoOpMetrics())
freshStore.SetIAVLDisableFastNode(params.iavlDisableFastNode)
for _, key := range keys {
freshStore.MountStoreWithDB(key, storetypes.StoreTypeIAVL, nil)
}

if err := freshStore.LoadLatestVersion(); err != nil {
_ = newAppDB.Close()
return false, fmt.Errorf("failed to load fresh store: %w", err)
}
snapshotManager = snapshots.NewManager(snapshotStore, opts, freshStore, nil, logger)

newSnapshotManager := snapshots.NewManager(snapshotStore, opts, freshStore, nil, logger)

snapSize, err := dirSize(tmpDir)
if err != nil {
logger.Error("cannot calculate snapshot size")
}

logger.Info("proceeding with snapshot restore", "size", formatSize(snapSize))
if err := snapshotManager.RestoreLocalSnapshot(snapshot.Height, snapshot.Format); err != nil {
return false, fmt.Errorf("failed to restore local snapshot: %w", err)

_, chunkReaders, err := snapshotStore.Load(snapshot.Height, snapshot.Format)
if err != nil {
_ = newAppDB.Close()
return false, fmt.Errorf("failed to load snapshot chunks: %w", err)
}

if err := newSnapshotManager.Restore(*snapshot); err != nil {
_ = newAppDB.Close()
return false, fmt.Errorf("failed to begin restore: %w", err)
}

// i always forget this: range on channels will loop until the channel is closed
for chunkReader := range chunkReaders {
chunkData, err := io.ReadAll(chunkReader)
_ = chunkReader.Close()
if err != nil {
_ = newAppDB.Close()
return false, fmt.Errorf("failed to read chunk: %w", err)
}

done, err := newSnapshotManager.RestoreChunk(chunkData)
if err != nil {
_ = newAppDB.Close()
return false, fmt.Errorf("failed to restore chunk: %w", err)
}
if done {
break
}
}

if err := newAppDB.Close(); err != nil {
logger.Error("error closing new db, continuing", "err", err)
}

_ = params.appDB.Close()

newPath := filepath.Join(params.dataDir, "application_new.db")
oldPath := filepath.Join(params.dataDir, "application.db")

if err := os.RemoveAll(oldPath); err != nil {
return false, fmt.Errorf("failed to remove old application.db: %w", err)
}

if err := os.Rename(newPath, oldPath); err != nil {
return false, fmt.Errorf("failed to swap new DB into place: %w", err)
}

params.appDB, err = db.NewDB("application", params.dbfmt, params.dataDir)
if err != nil {
return false, fmt.Errorf("failed to reopen application DB: %w", err)
}

newAppSize, err := dirSize(appPath)
Expand Down