From f8bf1d4a8bd50da83ad55c1eefbb9b0e651892e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ksawery=20Wr=C3=B3bel?= Date: Tue, 19 Aug 2025 11:06:47 +0200 Subject: [PATCH] stream and restore snapshot, fix imports --- cmd/pruner.go | 75 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 14 deletions(-) diff --git a/cmd/pruner.go b/cmd/pruner.go index adb5f44..69c0b00 100644 --- a/cmd/pruner.go +++ b/cmd/pruner.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "io/fs" "os" "path/filepath" @@ -14,7 +15,6 @@ import ( "cosmossdk.io/store" "cosmossdk.io/store/metrics" "cosmossdk.io/store/snapshots" - "cosmossdk.io/store/types" storetypes "cosmossdk.io/store/types" "golang.org/x/sync/errgroup" @@ -83,9 +83,9 @@ func PruneAppState(params *ApplicationPrunerParams) (bool, error) { } } - keys := types.NewKVStoreKeys(storeNames...) + keys := storetypes.NewKVStoreKeys(storeNames...) for _, value := range keys { - appStore.MountStoreWithDB(value, types.StoreTypeIAVL, nil) + appStore.MountStoreWithDB(value, storetypes.StoreTypeIAVL, nil) } err := appStore.LoadLatestVersion() @@ -198,27 +198,25 @@ func SnapshotAndRestoreApp(params *ApplicationPrunerParams) (bool, error) { if err != nil { return false, fmt.Errorf("failed to create snapshot: %w", err) } - logger.Info("snapshot created, removing old application.db") + logger.Info("snapshot created, creating new application DB") - _ = params.appDB.Close() - if err := os.RemoveAll(filepath.Join(params.dataDir, "application.db")); err != nil { - return false, fmt.Errorf("failed to remove application.db: %w", err) - } - params.appDB, err = db.NewDB("application", params.dbfmt, params.dataDir) + newAppDB, err := db.NewDB("application_new", params.dbfmt, params.dataDir) if err != nil { - return false, fmt.Errorf("failed to recreate application DB: %w", err) + return false, fmt.Errorf("failed to create new application DB: %w", err) } - freshStore := store.NewCommitMultiStore(params.appDB, logger, metrics.NewNoOpMetrics()) + freshStore := store.NewCommitMultiStore(newAppDB, logger, metrics.NewNoOpMetrics()) freshStore.SetIAVLDisableFastNode(params.iavlDisableFastNode) for _, key := range keys { freshStore.MountStoreWithDB(key, storetypes.StoreTypeIAVL, nil) } if err := freshStore.LoadLatestVersion(); err != nil { + _ = newAppDB.Close() return false, fmt.Errorf("failed to load fresh store: %w", err) } - snapshotManager = snapshots.NewManager(snapshotStore, opts, freshStore, nil, logger) + + newSnapshotManager := snapshots.NewManager(snapshotStore, opts, freshStore, nil, logger) snapSize, err := dirSize(tmpDir) if err != nil { @@ -226,8 +224,57 @@ func SnapshotAndRestoreApp(params *ApplicationPrunerParams) (bool, error) { } logger.Info("proceeding with snapshot restore", "size", formatSize(snapSize)) - if err := snapshotManager.RestoreLocalSnapshot(snapshot.Height, snapshot.Format); err != nil { - return false, fmt.Errorf("failed to restore local snapshot: %w", err) + + _, chunkReaders, err := snapshotStore.Load(snapshot.Height, snapshot.Format) + if err != nil { + _ = newAppDB.Close() + return false, fmt.Errorf("failed to load snapshot chunks: %w", err) + } + + if err := newSnapshotManager.Restore(*snapshot); err != nil { + _ = newAppDB.Close() + return false, fmt.Errorf("failed to begin restore: %w", err) + } + + // i always forget this: range on channels will loop until the channel is closed + for chunkReader := range chunkReaders { + chunkData, err := io.ReadAll(chunkReader) + _ = chunkReader.Close() + if err != nil { + _ = newAppDB.Close() + return false, fmt.Errorf("failed to read chunk: %w", err) + } + + done, err := newSnapshotManager.RestoreChunk(chunkData) + if err != nil { + _ = newAppDB.Close() + return false, fmt.Errorf("failed to restore chunk: %w", err) + } + if done { + break + } + } + + if err := newAppDB.Close(); err != nil { + logger.Error("error closing new db, continuing", "err", err) + } + + _ = params.appDB.Close() + + newPath := filepath.Join(params.dataDir, "application_new.db") + oldPath := filepath.Join(params.dataDir, "application.db") + + if err := os.RemoveAll(oldPath); err != nil { + return false, fmt.Errorf("failed to remove old application.db: %w", err) + } + + if err := os.Rename(newPath, oldPath); err != nil { + return false, fmt.Errorf("failed to swap new DB into place: %w", err) + } + + params.appDB, err = db.NewDB("application", params.dbfmt, params.dataDir) + if err != nil { + return false, fmt.Errorf("failed to reopen application DB: %w", err) } newAppSize, err := dirSize(appPath)