Skip to content

Commit

Permalink
refactor(general): Increase restore progress granularity (kopia#3655)
Browse files Browse the repository at this point in the history
When restoring huge files, progress is reported in a somewhat
odd way:

```
kopia_test % kopia snapshot restore ka2084d263182164b6cf3456668e6b6da /Users/eugen.sumin/kopia_test/2
Restoring to local filesystem (/Users/eugen.sumin/kopia_test/2) with parallelism=8...
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.6 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Processed 6 (5.4 GB) of 5 (5.4 GB) 1.5 MB/s (100.0%) remaining 0s.
Restored 5 files, 1 directories and 0 symbolic links (5.4 GB).
```
In fact, the amount of restored data is only updated once a particular
file has been completely restored.

This PR contains the least invasive change that allows us to see
progress updates while a file is being downloaded from object storage.
```
Restoring to local filesystem (/Users/eugen.sumin/kopia_test/55) with parallelism=8...
Processed 2 (3.1 MB) of 5 (1.8 GB).
Processed 4 (459.6 MB) of 5 (1.8 GB) 270.3 MB/s (25.2%) remaining 4s.
Processed 4 (468.7 MB) of 5 (1.8 GB) 269 MB/s (25.7%) remaining 4s.
Processed 4 (741.6 MB) of 5 (1.8 GB) 269 MB/s (40.6%) remaining 3s.
Processed 4 (1.1 GB) of 5 (1.8 GB) 280 MB/s (57.6%) remaining 2s.
Processed 5 (1.4 GB) of 5 (1.8 GB) 291.1 MB/s (75.2%) remaining 1s.
Processed 5 (1.4 GB) of 5 (1.8 GB) 289.8 MB/s (75.6%) remaining 1s.
Processed 5 (1.6 GB) of 5 (1.8 GB) 270.2 MB/s (85.3%) remaining 0s.
Processed 5 (1.7 GB) of 5 (1.8 GB) 256.3 MB/s (95.0%) remaining 0s.
Processed 6 (1.8 GB) of 5 (1.8 GB) 251 MB/s (100.0%) remaining 0s.
Processed 6 (1.8 GB) of 5 (1.8 GB) 251 MB/s (100.0%) remaining 0s.
Restored 5 files, 1 directories and 0 symbolic links (1.8 GB).
```

---------

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
  • Loading branch information
e-sumin and Shrekster authored May 10, 2024
1 parent e5790e3 commit 2b92388
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 61 deletions.
20 changes: 19 additions & 1 deletion cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/restore"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
)

Expand Down Expand Up @@ -86,6 +87,8 @@ type appServices interface {
advancedCommand(ctx context.Context)
repositoryConfigFileName() string
getProgress() *cliProgress
getRestoreProgress() restore.Progress

stdout() io.Writer
Stderr() io.Writer
stdin() io.Reader
Expand Down Expand Up @@ -117,6 +120,7 @@ type App struct {
enableAutomaticMaintenance bool
pf profileFlags
progress *cliProgress
restoreProgress restore.Progress
initialUpdateCheckDelay time.Duration
updateCheckInterval time.Duration
updateAvailableNotifyInterval time.Duration
Expand Down Expand Up @@ -181,6 +185,15 @@ func (c *App) getProgress() *cliProgress {
return c.progress
}

// SetRestoreProgress replaces the restore progress reporter that
// getRestoreProgress returns. It is intended for use in tests.
func (c *App) SetRestoreProgress(p restore.Progress) {
c.restoreProgress = p
}

// getRestoreProgress returns the restore progress reporter currently stored on the App.
func (c *App) getRestoreProgress() restore.Progress {
return c.restoreProgress
}

// stdin returns the reader held in c.stdinReader.
func (c *App) stdin() io.Reader {
return c.stdinReader
}
Expand Down Expand Up @@ -280,6 +293,10 @@ func (c *App) setup(app *kingpin.Application) {
c.pf.setup(app)
c.progress.setup(c, app)

if rp, ok := c.restoreProgress.(*cliRestoreProgress); ok {
rp.setup(c, app)
}

c.blob.setup(c, app)
c.benchmark.setup(c, app)
c.cache.setup(c, app)
Expand Down Expand Up @@ -308,7 +325,8 @@ type commandParent interface {
// NewApp creates a new instance of App.
func NewApp() *App {
return &App{
progress: &cliProgress{},
progress: &cliProgress{},
restoreProgress: &cliRestoreProgress{},
cliStorageProviders: []StorageProvider{
{"from-config", "the provided configuration file", func() StorageFlags { return &storageFromConfigFlags{} }},

Expand Down
120 changes: 120 additions & 0 deletions cli/cli_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,124 @@ func (p *cliProgress) Finish() {
}
}

// cliRestoreProgress reports restore progress on the command line.
// SetCounters stores the latest counters atomically; output reads them back
// and prints a single line through out.printStderr, prefixed with "\r" so
// that each update overwrites the previous one.
type cliRestoreProgress struct {
// Item counters, overwritten on every SetCounters call.
restoredCount atomic.Int32
enqueuedCount atomic.Int32
skippedCount atomic.Int32
ignoredErrorsCount atomic.Int32

// Byte counters, overwritten on every SetCounters call.
restoredTotalFileSize atomic.Int64
enqueuedTotalFileSize atomic.Int64
skippedTotalFileSize atomic.Int64

// Copied from the shared cliProgress in setup.
// NOTE(review): maybeOutput and output read these settings through
// svc.getProgress() rather than from these copies — confirm whether the
// copies are still needed.
progressUpdateInterval time.Duration
enableProgress bool

// svc gives access to the shared cliProgress settings; set in setup.
svc appServices
// outputThrottle limits how often maybeOutput prints; Flush resets it.
outputThrottle timetrack.Throttle
// outputMutex serializes printing and guards lastLineLength.
outputMutex sync.Mutex
// out is the text output copied from the shared cliProgress in setup.
out textOutput
// eta is started in setup and used by output to estimate speed and remaining time.
eta timetrack.Estimator

// lastLineLength is the length of the previously printed line, used to
// pad a shorter line so it fully overwrites the longer one.
// +checklocks:outputMutex
lastLineLength int
}

// setup binds the reporter to the application services and inherits the
// output settings of the shared cliProgress. When svc has no cliProgress
// the reporter is left untouched.
func (p *cliRestoreProgress) setup(svc appServices, _ *kingpin.Application) {
	shared := svc.getProgress()
	if shared == nil {
		// nothing to inherit from; leave the reporter unconfigured
		return
	}

	p.svc = svc
	p.out = shared.out
	p.enableProgress = shared.enableProgress
	p.progressUpdateInterval = shared.progressUpdateInterval

	// the estimator measures elapsed time starting from setup
	p.eta = timetrack.Start()
}

// SetCounters overwrites all stored counters with the supplied values and
// then prints a progress line if the output throttle allows it.
//
// Counts are numbers of items, the *Bytes values are the matching total
// sizes, and ignoredErrors is the number of errors ignored so far.
func (p *cliRestoreProgress) SetCounters(
	enqueuedCount, restoredCount, skippedCount, ignoredErrors int32,
	enqueuedBytes, restoredBytes, skippedBytes int64,
) {
	// item counts
	p.enqueuedCount.Store(enqueuedCount)
	p.restoredCount.Store(restoredCount)
	p.skippedCount.Store(skippedCount)
	p.ignoredErrorsCount.Store(ignoredErrors)

	// byte totals
	p.enqueuedTotalFileSize.Store(enqueuedBytes)
	p.restoredTotalFileSize.Store(restoredBytes)
	p.skippedTotalFileSize.Store(skippedBytes)

	p.maybeOutput()
}

// Flush resets the output throttle and prints the current progress line
// followed by a newline. As with any call to output, nothing is printed when
// progress output is disabled or no bytes have been restored yet.
func (p *cliRestoreProgress) Flush() {
p.outputThrottle.Reset()
p.output("\n")
}

// maybeOutput prints a progress line unless the throttle says the previous
// one was printed too recently, based on the shared progress update interval.
func (p *cliRestoreProgress) maybeOutput() {
	interval := p.svc.getProgress().progressUpdateInterval
	if !p.outputThrottle.ShouldOutput(interval) {
		return
	}

	p.output("")
}

// output prints the current counters as one progress line, followed by suffix.
//
// The line starts with "\r" so it overwrites the previous one; if it is
// shorter than the previous line it is padded with spaces to erase the
// leftovers. Nothing is printed when progress output is disabled or when no
// bytes have been restored yet.
func (p *cliRestoreProgress) output(suffix string) {
	if !p.svc.getProgress().enableProgress {
		return
	}

	p.outputMutex.Lock()
	defer p.outputMutex.Unlock()

	// take a snapshot of the counters
	itemsDone := p.restoredCount.Load()
	itemsTotal := p.enqueuedCount.Load()
	itemsSkipped := p.skippedCount.Load()
	errorsIgnored := p.ignoredErrorsCount.Load()

	bytesDone := p.restoredTotalFileSize.Load()
	bytesTotal := p.enqueuedTotalFileSize.Load()
	bytesSkipped := p.skippedTotalFileSize.Load()

	if bytesDone == 0 {
		return
	}

	// optional fragments stay empty when they do not apply
	etaPart := ""
	skippedPart := ""
	errorsPart := ""

	est, haveEstimate := p.eta.Estimate(float64(bytesDone), float64(bytesTotal))
	if haveEstimate {
		etaPart = fmt.Sprintf(" %v (%.1f%%) remaining %v",
			units.BytesPerSecondsString(est.SpeedPerSecond),
			est.PercentComplete,
			est.Remaining)
	}

	if itemsSkipped > 0 {
		skippedPart = fmt.Sprintf(", skipped %v (%v)", itemsSkipped, units.BytesString(bytesSkipped))
	}

	if errorsIgnored > 0 {
		errorsPart = fmt.Sprintf(", ignored %v errors", errorsIgnored)
	}

	line := fmt.Sprintf("Processed %v (%v) of %v (%v)%v%v%v.",
		itemsDone+itemsSkipped, units.BytesString(bytesDone),
		itemsTotal, units.BytesString(bytesTotal),
		skippedPart, errorsPart, etaPart,
	)

	// pad with spaces so a shorter line fully wipes the previous, longer one
	padding := ""
	if shrunkBy := p.lastLineLength - len(line); shrunkBy > 0 {
		padding = strings.Repeat(" ", shrunkBy)
	}

	p.lastLineLength = len(line)
	p.out.printStderr("\r%v%v%v", line, padding, suffix)
}

var _ snapshotfs.UploadProgress = (*cliProgress)(nil)
52 changes: 18 additions & 34 deletions cli/command_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/timetrack"
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/object"
Expand Down Expand Up @@ -123,10 +122,13 @@ type commandRestore struct {
snapshotTime string

restores []restoreSourceTarget

svc appServices
}

func (c *commandRestore) setup(svc appServices, parent commandParent) {
c.restoreShallowAtDepth = unlimitedDepth
c.svc = svc

cmd := parent.Command("restore", restoreCommandHelp)
cmd.Arg("sources", restoreCommandSourcePathHelp).Required().StringsVar(&c.restoreTargetPaths)
Expand Down Expand Up @@ -394,51 +396,33 @@ func (c *commandRestore) run(ctx context.Context, rep repo.Repository) error {
rootEntry = re
}

eta := timetrack.Start()
restoreProgress := c.svc.getRestoreProgress()
progressCallback := func(ctx context.Context, stats restore.Stats) {
restoreProgress.SetCounters(
stats.EnqueuedFileCount+stats.EnqueuedDirCount+stats.EnqueuedSymlinkCount,
stats.RestoredFileCount+stats.RestoredDirCount+stats.RestoredSymlinkCount,
stats.SkippedCount,
stats.IgnoredErrorCount,
stats.EnqueuedTotalFileSize,
stats.RestoredTotalFileSize,
stats.SkippedTotalFileSize,
)
}

st, err := restore.Entry(ctx, rep, output, rootEntry, restore.Options{
Parallel: c.restoreParallel,
Incremental: c.restoreIncremental,
IgnoreErrors: c.restoreIgnoreErrors,
RestoreDirEntryAtDepth: c.restoreShallowAtDepth,
MinSizeForPlaceholder: c.minSizeForPlaceholder,
ProgressCallback: func(ctx context.Context, stats restore.Stats) {
restoredCount := stats.RestoredFileCount + stats.RestoredDirCount + stats.RestoredSymlinkCount + stats.SkippedCount
enqueuedCount := stats.EnqueuedFileCount + stats.EnqueuedDirCount + stats.EnqueuedSymlinkCount

if restoredCount == 0 {
return
}

var maybeRemaining, maybeSkipped, maybeErrors string

if est, ok := eta.Estimate(float64(stats.RestoredTotalFileSize), float64(stats.EnqueuedTotalFileSize)); ok {
maybeRemaining = fmt.Sprintf(" %v (%.1f%%) remaining %v",
units.BytesPerSecondsString(est.SpeedPerSecond),
est.PercentComplete,
est.Remaining)
}

if stats.SkippedCount > 0 {
maybeSkipped = fmt.Sprintf(", skipped %v (%v)", stats.SkippedCount, units.BytesString(stats.SkippedTotalFileSize))
}

if stats.IgnoredErrorCount > 0 {
maybeErrors = fmt.Sprintf(", ignored %v errors", stats.IgnoredErrorCount)
}

log(ctx).Infof("Processed %v (%v) of %v (%v)%v%v%v.",
restoredCount, units.BytesString(stats.RestoredTotalFileSize),
enqueuedCount, units.BytesString(stats.EnqueuedTotalFileSize),
maybeSkipped,
maybeErrors,
maybeRemaining)
},
ProgressCallback: progressCallback,
})
if err != nil {
return errors.Wrap(err, "error restoring")
}

progressCallback(ctx, st)
restoreProgress.Flush() // Force last progress values to be printed
printRestoreStats(ctx, &st)
}

Expand Down
43 changes: 38 additions & 5 deletions snapshot/restore/local_fs_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,34 @@ func getStreamCopier(ctx context.Context, targetpath string, sparse bool) (strea
}, nil
}

// progressReportingReader wraps an fs.Reader and passes the number of bytes
// obtained by Read to cb. Entry, Seek and Close are forwarded to the wrapped
// reader unchanged.
type progressReportingReader struct {
// r is the wrapped reader that all calls are delegated to.
r fs.Reader

// cb receives the byte count of a Read; it may be nil, in which case nothing is reported.
cb FileWriteProgress
}

// Entry forwards to the wrapped reader's Entry.
func (r *progressReportingReader) Entry() (fs.Entry, error) {
	entry, err := r.r.Entry()

	return entry, err //nolint:wrapcheck
}

// Seek forwards to the wrapped reader's Seek; no progress is reported for seeks.
func (r *progressReportingReader) Seek(offset int64, whence int) (int64, error) {
	pos, err := r.r.Seek(offset, whence)

	return pos, err //nolint:wrapcheck
}

// Close forwards to the wrapped reader's Close.
func (r *progressReportingReader) Close() error {
	err := r.r.Close()

	return err //nolint:wrapcheck
}

// Read reads from the wrapped reader into p and reports the number of bytes
// obtained to cb, if one is set. The wrapped reader's result is returned
// unchanged.
//
// The io.Reader contract allows a call to return n > 0 together with a
// non-nil error (typically io.EOF on the last chunk). Progress is therefore
// reported whenever bytes were produced, not only when err is nil, so the
// tail of a file is not left uncounted. Reads that produce no bytes are not
// reported.
func (r *progressReportingReader) Read(p []byte) (int, error) {
	bytesRead, err := r.r.Read(p)
	if bytesRead > 0 && r.cb != nil {
		r.cb(int64(bytesRead))
	}

	return bytesRead, err //nolint:wrapcheck
}

// FilesystemOutput contains the options for outputting a file system tree.
type FilesystemOutput struct {
// TargetPath for restore.
Expand Down Expand Up @@ -147,11 +175,11 @@ func (o *FilesystemOutput) Close(ctx context.Context) error {
}

// WriteFile implements restore.Output interface.
func (o *FilesystemOutput) WriteFile(ctx context.Context, relativePath string, f fs.File) error {
func (o *FilesystemOutput) WriteFile(ctx context.Context, relativePath string, f fs.File, progressCb FileWriteProgress) error {
log(ctx).Debugf("WriteFile %v (%v bytes) %v, %v", filepath.Join(o.TargetPath, relativePath), f.Size(), f.Mode(), f.ModTime())
path := filepath.Join(o.TargetPath, filepath.FromSlash(relativePath))

if err := o.copyFileContent(ctx, path, f); err != nil {
if err := o.copyFileContent(ctx, path, f, progressCb); err != nil {
return errors.Wrap(err, "error creating file")
}

Expand Down Expand Up @@ -384,7 +412,7 @@ func write(targetPath string, r fs.Reader, size int64, c streamCopier) error {
return nil
}

func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath string, f fs.File) error {
func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath string, f fs.File, progressCb FileWriteProgress) error {
switch _, err := os.Stat(targetPath); {
case os.IsNotExist(err): // copy file below
case err == nil:
Expand All @@ -403,15 +431,20 @@ func (o *FilesystemOutput) copyFileContent(ctx context.Context, targetPath strin
}
defer r.Close() //nolint:errcheck

wr := &progressReportingReader{
r: r,
cb: progressCb,
}

log(ctx).Debugf("copying file contents to: %v", targetPath)
targetPath = atomicfile.MaybePrefixLongFilenameOnWindows(targetPath)

if o.WriteFilesAtomically {
//nolint:wrapcheck
return atomicfile.Write(targetPath, r)
return atomicfile.Write(targetPath, wr)
}

return write(targetPath, r, f.Size(), o.copier)
return write(targetPath, wr, f.Size(), o.copier)
}

func isEmptyDirectory(name string) (bool, error) {
Expand Down
Loading

0 comments on commit 2b92388

Please sign in to comment.