Skip to content

Commit

Permalink
refactor: use progress bar pool
Browse files Browse the repository at this point in the history
Signed-off-by: nikpivkin <nikita.pivkin@smartforce.io>
  • Loading branch information
nikpivkin committed Dec 27, 2024
1 parent 729e89a commit 486cb9a
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 18 deletions.
76 changes: 58 additions & 18 deletions pkg/fanal/artifact/image/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cheggaaa/pb/v3"
"github.com/docker/go-units"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/samber/lo"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -222,7 +223,7 @@ func (a Artifact) checkImageSize(ctx context.Context) error {
return nil
}

imageSize, err := a.imageSize(ctx)
imageSize, err := a.downloadImage(ctx)
if err != nil {
return xerrors.Errorf("failed to calculate image size: %w", err)
}
Expand All @@ -239,14 +240,49 @@ func (a Artifact) checkImageSize(ctx context.Context) error {
return nil
}

func (a Artifact) imageSize(ctx context.Context) (int64, error) {
// progressLayer wraps a v1.Layer to add progress bar functionality
type progressLayer struct {
v1.Layer
bar *pb.ProgressBar
}

func newProgressLayer(layer v1.Layer, bar *pb.ProgressBar) (v1.Layer, error) {
return partial.CompressedToLayer(&progressLayer{
Layer: layer, bar: bar,
})
}

func (l *progressLayer) Compressed() (io.ReadCloser, error) {
rc, err := l.Layer.Compressed()
if err != nil {
return nil, err
}

return l.bar.NewProxyReader(rc), nil
}

func (a Artifact) downloadImage(ctx context.Context) (int64, error) {
layers, err := a.image.Layers()
if err != nil {
return -1, xerrors.Errorf("failed to get image layers: %w", err)
}

var imageSize int64
if !a.artifactOption.NoProgress {
progressPool := pb.NewPool()
wrappedLayers, err := a.wrapLayers(layers, progressPool)
if err != nil {
return -1, xerrors.Errorf("failed to wrap")
}
if err := progressPool.Start(); err != nil {
log.Error("Failed to start progress bar pool", log.Err(err))
} else {
defer progressPool.Stop()
layers = wrappedLayers
}

}

var imageSize int64
p := parallel.NewPipeline(a.artifactOption.Parallel, false, layers,
func(ctx context.Context, layer v1.Layer) (int64, error) {
layerSize, err := a.downloadLayer(layer)
Expand All @@ -268,6 +304,24 @@ func (a Artifact) imageSize(ctx context.Context) (int64, error) {
return imageSize, nil
}

func (a Artifact) wrapLayers(layers []v1.Layer, progressPool *pb.Pool) ([]v1.Layer, error) {
wrappedLayers := make([]v1.Layer, 0, len(layers))
for _, l := range layers {
size, err := l.Size()
if err != nil {
return nil, err
}
bar := pb.New64(size).SetTemplate(pb.Full)
progressPool.Add(bar)
pl, err := newProgressLayer(l, bar)
if err != nil {
return nil, xerrors.Errorf("failed to create progress layer: %w", err)
}
wrappedLayers = append(wrappedLayers, pl)
}
return wrappedLayers, nil
}

func (a Artifact) downloadLayer(layer v1.Layer) (int64, error) {
rc, err := layer.Compressed()
if err != nil {
Expand All @@ -287,21 +341,7 @@ func (a Artifact) downloadLayer(layer v1.Layer) (int64, error) {
}
defer f.Close()

size, err := layer.Size()
if err != nil {
return -1, xerrors.Errorf("size error: %w", err)
}

bar := pb.Simple.Start64(size)
if a.artifactOption.NoProgress {
bar.SetWriter(io.Discard)
}
pr := bar.NewProxyReader(rc)
defer bar.Finish()

bar.Set("prefix", shortenHash(h.Hex, 12))

dr, err := uncompressed(pr)
dr, err := uncompressed(rc)
if err != nil {
return -1, xerrors.Errorf("failed to init decompressor: %w", err)
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/fanal/image/daemon/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func (img *image) configFile() (*v1.ConfigFile, error) {
return img.Image.ConfigFile()
}

func (img *image) Layers() ([]v1.Layer, error) {
if err := img.populateImage(); err != nil {
return nil, xerrors.Errorf("unable to populate: %w", err)
}
return img.Image.Layers()
}

func (img *image) LayerByDiffID(h v1.Hash) (v1.Layer, error) {
if err := img.populateImage(); err != nil {
return nil, xerrors.Errorf("unable to populate: %w", err)
Expand All @@ -173,6 +180,20 @@ func (img *image) RepoDigests() []string {
return img.inspect.RepoDigests
}

// func (img *image) Digest() (v1.Hash, error) {
// return v1.Hash{}, nil
// }

// func (img *image) LayerByDigest(v1.Hash) (v1.Layer, error) {
// return nil, nil
// }

// func (img *image) Manifest() (*v1.Manifest, error) {
// return nil, nil
// }

// func (img *image) MediaType() types.Med

func (img *image) diffIDs() ([]v1.Hash, error) {
var diffIDs []v1.Hash
for _, l := range img.inspect.RootFS.Layers {
Expand Down

0 comments on commit 486cb9a

Please sign in to comment.