From 4ff727b6eef298a08f3553e7fa600050814d1799 Mon Sep 17 00:00:00 2001 From: jason yang Date: Thu, 12 Sep 2024 01:41:31 +0000 Subject: [PATCH] Re-implement bars when fetching remote OCI images via ggcr Signed-off-by: jason yang --- .../pkg/build/sources/conveyorPacker_oci.go | 3 ++ internal/pkg/client/progress.go | 29 ++++++--------- internal/pkg/client/progress_roundtrip.go | 9 +++++ internal/pkg/ociimage/fetch.go | 33 ++++++++++++----- internal/pkg/ociimage/sourcesink.go | 37 +++++++++++++++++-- 5 files changed, 80 insertions(+), 31 deletions(-) diff --git a/internal/pkg/build/sources/conveyorPacker_oci.go b/internal/pkg/build/sources/conveyorPacker_oci.go index 585e4b74a..4e384b122 100644 --- a/internal/pkg/build/sources/conveyorPacker_oci.go +++ b/internal/pkg/build/sources/conveyorPacker_oci.go @@ -128,6 +128,7 @@ type OCIConveyorPacker struct { // Get downloads container information from the specified source func (cp *OCIConveyorPacker) Get(ctx context.Context, b *sytypes.Bundle) (err error) { + sylog.Infof("Fetching OCI image...") cp.b = b cp.topts = &ociimage.TransportOptions{ @@ -190,11 +191,13 @@ func (cp *OCIConveyorPacker) Get(ctx context.Context, b *sytypes.Bundle) (err er // Pack puts relevant objects in a Bundle. func (cp *OCIConveyorPacker) Pack(ctx context.Context) (*sytypes.Bundle, error) { + sylog.Infof("Extracting OCI image...") err := cp.unpackRootfs(ctx) if err != nil { return nil, fmt.Errorf("while unpacking rootfs: %v", err) } + sylog.Infof("Inserting Apptainer configuration...") err = cp.insertBaseEnv() if err != nil { return nil, fmt.Errorf("while inserting base environment: %v", err) diff --git a/internal/pkg/client/progress.go b/internal/pkg/client/progress.go index a634a8ef8..f4fb3115d 100644 --- a/internal/pkg/client/progress.go +++ b/internal/pkg/client/progress.go @@ -29,29 +29,22 @@ var defaultOption = []mpb.BarOption{ ), } +var unknownSizeOption = []mpb.BarOption{ + mpb.PrependDecorators( + decor.Current(decor.SizeB1024(0), "%.1f / ???"), + ), + mpb.AppendDecorators( + decor.AverageSpeed(decor.SizeB1024(0), " % .1f "), + ), +} + func initProgressBar(totalSize int64) (*mpb.Progress, *mpb.Bar) { p := mpb.New() if totalSize > 0 { - return p, p.AddBar(totalSize, - mpb.PrependDecorators( - decor.Counters(decor.SizeB1024(0), "%.1f / %.1f"), - ), - mpb.AppendDecorators( - decor.Percentage(), - decor.AverageSpeed(decor.SizeB1024(0), " % .1f "), - decor.AverageETA(decor.ET_STYLE_GO), - ), - ) + return p, p.AddBar(totalSize, defaultOption...) } - return p, p.AddBar(totalSize, - mpb.PrependDecorators( - decor.Current(decor.SizeB1024(0), "%.1f / ???"), - ), - mpb.AppendDecorators( - decor.AverageSpeed(decor.SizeB1024(0), " % .1f "), - ), - ) + return p, p.AddBar(totalSize, unknownSizeOption...) } // See: https://ixday.github.io/post/golang-cancel-copy/ diff --git a/internal/pkg/client/progress_roundtrip.go b/internal/pkg/client/progress_roundtrip.go index aa292d6c0..a6612b9d1 100644 --- a/internal/pkg/client/progress_roundtrip.go +++ b/internal/pkg/client/progress_roundtrip.go @@ -28,6 +28,15 @@ type RoundTripper struct { sizes []int64 } +// NewRoundTripper wraps inner (or http.DefaultTransport if inner is nil) with +// progress bar functionality. A separate bar will be displayed for every GET +// request that returns a body >64KiB, updated as the response body is read. The +// caller is responsible for calling rt.ProgressWait / rt.ProgressShutdown when +// all requests are completed, so that the mpb progress container exits +// correctly. Note that if requests are made, but the response body is not +// read, the progress bar will remain 'stuck', preventing rt.ProgressWait +// from returning. rt.ProgressComplete is provided to override all bars to be +// 100% complete, to satisfy rt.ProgressWait where appropriate. func NewRoundTripper(ctx context.Context, inner http.RoundTripper) *RoundTripper { if inner == nil { inner = http.DefaultTransport diff --git a/internal/pkg/ociimage/fetch.go b/internal/pkg/ociimage/fetch.go index ac3f7acaf..e5438b592 100644 --- a/internal/pkg/ociimage/fetch.go +++ b/internal/pkg/ociimage/fetch.go @@ -22,14 +22,15 @@ import ( "strings" "github.com/apptainer/apptainer/internal/pkg/cache" + progressClient "github.com/apptainer/apptainer/internal/pkg/client" "github.com/apptainer/apptainer/pkg/sylog" ggcrv1 "github.com/google/go-containerregistry/pkg/v1" v1 "github.com/google/go-containerregistry/pkg/v1" ) -// CachedImage will ensure that the provided v1.Image is present in the Apptainer +// cachedImage will ensure that the provided v1.Image is present in the Apptainer // OCI cache layout dir, and return a new v1.Image pointing to the cached copy. -func CachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) (v1.Image, error) { +func cachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) (v1.Image, error) { if imgCache == nil || imgCache.IsDisabled() { return nil, fmt.Errorf("undefined image cache") } @@ -46,10 +47,11 @@ func CachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) ( cachedRef := layoutDir + "@" + digest.String() sylog.Debugf("Caching image to %s", cachedRef) + if err := OCISourceSink.WriteImage(srcImg, layoutDir, nil); err != nil { + return nil, err + } - OCISourceSink.WriteImage(srcImg, layoutDir) - - return OCISourceSink.Image(ctx, cachedRef, nil) + return OCISourceSink.Image(ctx, cachedRef, nil, nil) } // FetchToLayout will fetch the OCI image specified by imageRef to an OCI layout @@ -87,14 +89,24 @@ func FetchToLayout(ctx context.Context, tOpts *TransportOptions, imgCache *cache return nil, err } - srcImg, err := srcType.Image(ctx, srcRef, tOpts) + rt := progressClient.NewRoundTripper(ctx, nil) + + srcImg, err := srcType.Image(ctx, srcRef, tOpts, rt) if err != nil { + rt.ProgressShutdown() return nil, err } if imgCache != nil && !imgCache.IsDisabled() { // Ensure the image is cached, and return reference to the cached image. - return CachedImage(ctx, imgCache, srcImg) + cachedImg, err := cachedImage(ctx, imgCache, srcImg) + if err != nil { + rt.ProgressShutdown() + return nil, err + } + rt.ProgressComplete() + rt.ProgressWait() + return cachedImg, nil } // No cache - write to layout directory provided @@ -103,11 +115,14 @@ func FetchToLayout(ctx context.Context, tOpts *TransportOptions, imgCache *cache return nil, err } sylog.Debugf("Copying %q to temporary layout at %q", srcRef, tmpLayout) - if err = OCISourceSink.WriteImage(srcImg, tmpLayout); err != nil { + if err = OCISourceSink.WriteImage(srcImg, tmpLayout, nil); err != nil { + rt.ProgressShutdown() return nil, err } + rt.ProgressComplete() + rt.ProgressWait() - return OCISourceSink.Image(ctx, tmpLayout, tOpts) + return OCISourceSink.Image(ctx, tmpLayout, tOpts, nil) } // Perform a dumb tar(gz) extraction with no chown, id remapping etc. diff --git a/internal/pkg/ociimage/sourcesink.go b/internal/pkg/ociimage/sourcesink.go index e751b2e72..284cf57c1 100644 --- a/internal/pkg/ociimage/sourcesink.go +++ b/internal/pkg/ociimage/sourcesink.go @@ -14,6 +14,7 @@ import ( "fmt" "strings" + progressClient "github.com/apptainer/apptainer/internal/pkg/client" "github.com/apptainer/apptainer/internal/pkg/util/ociauth" "github.com/apptainer/apptainer/pkg/sylog" "github.com/docker/docker/client" @@ -36,7 +37,7 @@ const ( DaemonSourceSink ) -func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions) (v1.Image, error) { +func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions, rt *progressClient.RoundTripper) (v1.Image, error) { var nameOpts []name.Option if tOpts != nil && tOpts.Insecure { nameOpts = append(nameOpts, name.Insecure) @@ -57,6 +58,10 @@ func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions) (v ociauth.AuthOptn(tOpts.AuthConfig, tOpts.AuthFilePath)) } + if rt != nil { + pullOpts = append(pullOpts, remote.WithTransport(rt)) + } + return remote.Image(srcRef, pullOpts...) } @@ -145,10 +150,10 @@ func (ss SourceSink) Reference(s string, tOpts *TransportOptions) (name.Referenc } } -func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOptions) (v1.Image, error) { +func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOptions, rt *progressClient.RoundTripper) (v1.Image, error) { switch ss { case RegistrySourceSink: - return getDockerImage(ctx, ref, tOpts) + return getDockerImage(ctx, ref, tOpts, rt) case TarballSourceSink: return tarball.ImageFromPath(ref, nil) case OCISourceSink: @@ -162,7 +167,7 @@ func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOpti } } -func (ss SourceSink) WriteImage(img v1.Image, dstName string) error { +func (ss SourceSink) WriteImage(img v1.Image, dstName string, tOpts *TransportOptions) error { switch ss { case OCISourceSink: lp, err := layout.FromPath(dstName) @@ -173,8 +178,32 @@ func (ss SourceSink) WriteImage(img v1.Image, dstName string) error { } } return lp.AppendImage(img) + + case RegistrySourceSink: + var nameOpts []name.Option + if tOpts != nil && tOpts.Insecure { + nameOpts = append(nameOpts, name.Insecure) + } + dstRef, err := name.ParseReference(dstName, nameOpts...) + if err != nil { + return err + } + remoteOpts := []remote.Option{} + if tOpts != nil { + remoteOpts = append(remoteOpts, + remote.WithPlatform(tOpts.Platform), + ociauth.AuthOptn(tOpts.AuthConfig, tOpts.AuthFilePath)) + } + return remote.Write(dstRef, img, remoteOpts...) + + case TarballSourceSink: + // Only supports writing a single image per tarball. + dstRef := name.MustParseReference("image") + return tarball.WriteToFile(dstName, dstRef, img) + case UnknownSourceSink: return errUnsupportedTransport + default: return errUnsupportedTransport }