Skip to content

Commit

Permalink
Re-implement bars when fetching remote OCI images via ggcr
Browse files Browse the repository at this point in the history
Co-authored-by: David Trudgian <david.trudgian@sylabs.io>
Signed-off-by: jason yang <jasonyangshadow@gmail.com>
  • Loading branch information
JasonYangShadow and dtrudg committed Sep 13, 2024
1 parent 19f57a2 commit 6088342
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 31 deletions.
3 changes: 3 additions & 0 deletions internal/pkg/build/sources/conveyorPacker_oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type OCIConveyorPacker struct {

// Get downloads container information from the specified source
func (cp *OCIConveyorPacker) Get(ctx context.Context, b *sytypes.Bundle) (err error) {
sylog.Infof("Fetching OCI image...")
cp.b = b

cp.topts = &ociimage.TransportOptions{
Expand Down Expand Up @@ -190,11 +191,13 @@ func (cp *OCIConveyorPacker) Get(ctx context.Context, b *sytypes.Bundle) (err er

// Pack puts relevant objects in a Bundle.
func (cp *OCIConveyorPacker) Pack(ctx context.Context) (*sytypes.Bundle, error) {
sylog.Infof("Extracting OCI image...")
err := cp.unpackRootfs(ctx)
if err != nil {
return nil, fmt.Errorf("while unpacking rootfs: %v", err)
}

sylog.Infof("Inserting Apptainer configuration...")
err = cp.insertBaseEnv()
if err != nil {
return nil, fmt.Errorf("while inserting base environment: %v", err)
Expand Down
29 changes: 11 additions & 18 deletions internal/pkg/client/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,22 @@ var defaultOption = []mpb.BarOption{
),
}

var unknownSizeOption = []mpb.BarOption{
mpb.PrependDecorators(
decor.Current(decor.SizeB1024(0), "%.1f / ???"),
),
mpb.AppendDecorators(
decor.AverageSpeed(decor.SizeB1024(0), " % .1f "),
),
}

func initProgressBar(totalSize int64) (*mpb.Progress, *mpb.Bar) {
p := mpb.New()

if totalSize > 0 {
return p, p.AddBar(totalSize,
mpb.PrependDecorators(
decor.Counters(decor.SizeB1024(0), "%.1f / %.1f"),
),
mpb.AppendDecorators(
decor.Percentage(),
decor.AverageSpeed(decor.SizeB1024(0), " % .1f "),
decor.AverageETA(decor.ET_STYLE_GO),
),
)
return p, p.AddBar(totalSize, defaultOption...)
}
return p, p.AddBar(totalSize,
mpb.PrependDecorators(
decor.Current(decor.SizeB1024(0), "%.1f / ???"),
),
mpb.AppendDecorators(
decor.AverageSpeed(decor.SizeB1024(0), " % .1f "),
),
)
return p, p.AddBar(totalSize, unknownSizeOption...)
}

// See: https://ixday.github.io/post/golang-cancel-copy/
Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/client/progress_roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type RoundTripper struct {
sizes []int64
}

// NewRoundTripper wraps inner (or http.DefaultTransport if inner is nil) with
// progress bar functionality. A separate bar will be displayed for every GET
// request that returns a body >64KiB, updated as the response body is read. The
// caller is responsible for calling rt.ProgressWait / rt.ProgressShutdown when
// all requests are completed, so that the mpb progress container exits
// correctly. Note that if requests are made, but the response body is not
// read, the progress bar will remain 'stuck', preventing rt.ProgressWait
// from returning. rt.ProgressComplete is provided to override all bars to be
// 100% complete, to satisfy rt.ProgressWait where appropriate.
func NewRoundTripper(ctx context.Context, inner http.RoundTripper) *RoundTripper {
if inner == nil {
inner = http.DefaultTransport
Expand Down
33 changes: 24 additions & 9 deletions internal/pkg/ociimage/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import (
"strings"

"github.com/apptainer/apptainer/internal/pkg/cache"
progressClient "github.com/apptainer/apptainer/internal/pkg/client"
"github.com/apptainer/apptainer/pkg/sylog"
ggcrv1 "github.com/google/go-containerregistry/pkg/v1"
v1 "github.com/google/go-containerregistry/pkg/v1"
)

// CachedImage will ensure that the provided v1.Image is present in the Apptainer
// cachedImage will ensure that the provided v1.Image is present in the Apptainer
// OCI cache layout dir, and return a new v1.Image pointing to the cached copy.
func CachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) (v1.Image, error) {
func cachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) (v1.Image, error) {
if imgCache == nil || imgCache.IsDisabled() {
return nil, fmt.Errorf("undefined image cache")
}
Expand All @@ -46,10 +47,11 @@ func CachedImage(ctx context.Context, imgCache *cache.Handle, srcImg v1.Image) (

cachedRef := layoutDir + "@" + digest.String()
sylog.Debugf("Caching image to %s", cachedRef)
if err := OCISourceSink.WriteImage(srcImg, layoutDir, nil); err != nil {
return nil, err
}

OCISourceSink.WriteImage(srcImg, layoutDir)

return OCISourceSink.Image(ctx, cachedRef, nil)
return OCISourceSink.Image(ctx, cachedRef, nil, nil)
}

// FetchToLayout will fetch the OCI image specified by imageRef to an OCI layout
Expand Down Expand Up @@ -87,14 +89,24 @@ func FetchToLayout(ctx context.Context, tOpts *TransportOptions, imgCache *cache
return nil, err
}

srcImg, err := srcType.Image(ctx, srcRef, tOpts)
rt := progressClient.NewRoundTripper(ctx, nil)

srcImg, err := srcType.Image(ctx, srcRef, tOpts, rt)
if err != nil {
rt.ProgressShutdown()
return nil, err
}

if imgCache != nil && !imgCache.IsDisabled() {
// Ensure the image is cached, and return reference to the cached image.
return CachedImage(ctx, imgCache, srcImg)
cachedImg, err := cachedImage(ctx, imgCache, srcImg)
if err != nil {
rt.ProgressShutdown()
return nil, err
}
rt.ProgressComplete()
rt.ProgressWait()
return cachedImg, nil
}

// No cache - write to layout directory provided
Expand All @@ -103,11 +115,14 @@ func FetchToLayout(ctx context.Context, tOpts *TransportOptions, imgCache *cache
return nil, err
}
sylog.Debugf("Copying %q to temporary layout at %q", srcRef, tmpLayout)
if err = OCISourceSink.WriteImage(srcImg, tmpLayout); err != nil {
if err = OCISourceSink.WriteImage(srcImg, tmpLayout, nil); err != nil {
rt.ProgressShutdown()
return nil, err
}
rt.ProgressComplete()
rt.ProgressWait()

return OCISourceSink.Image(ctx, tmpLayout, tOpts)
return OCISourceSink.Image(ctx, tmpLayout, tOpts, nil)
}

// Perform a dumb tar(gz) extraction with no chown, id remapping etc.
Expand Down
37 changes: 33 additions & 4 deletions internal/pkg/ociimage/sourcesink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"strings"

progressClient "github.com/apptainer/apptainer/internal/pkg/client"
"github.com/apptainer/apptainer/internal/pkg/util/ociauth"
"github.com/apptainer/apptainer/pkg/sylog"
"github.com/docker/docker/client"
Expand All @@ -36,7 +37,7 @@ const (
DaemonSourceSink
)

func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions) (v1.Image, error) {
func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions, rt *progressClient.RoundTripper) (v1.Image, error) {
var nameOpts []name.Option
if tOpts != nil && tOpts.Insecure {
nameOpts = append(nameOpts, name.Insecure)
Expand All @@ -57,6 +58,10 @@ func getDockerImage(ctx context.Context, src string, tOpts *TransportOptions) (v
ociauth.AuthOptn(tOpts.AuthConfig, tOpts.AuthFilePath))
}

if rt != nil {
pullOpts = append(pullOpts, remote.WithTransport(rt))
}

return remote.Image(srcRef, pullOpts...)
}

Expand Down Expand Up @@ -145,10 +150,10 @@ func (ss SourceSink) Reference(s string, tOpts *TransportOptions) (name.Referenc
}
}

func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOptions) (v1.Image, error) {
func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOptions, rt *progressClient.RoundTripper) (v1.Image, error) {
switch ss {
case RegistrySourceSink:
return getDockerImage(ctx, ref, tOpts)
return getDockerImage(ctx, ref, tOpts, rt)
case TarballSourceSink:
return tarball.ImageFromPath(ref, nil)
case OCISourceSink:
Expand All @@ -162,7 +167,7 @@ func (ss SourceSink) Image(ctx context.Context, ref string, tOpts *TransportOpti
}
}

func (ss SourceSink) WriteImage(img v1.Image, dstName string) error {
func (ss SourceSink) WriteImage(img v1.Image, dstName string, tOpts *TransportOptions) error {
switch ss {
case OCISourceSink:
lp, err := layout.FromPath(dstName)
Expand All @@ -173,8 +178,32 @@ func (ss SourceSink) WriteImage(img v1.Image, dstName string) error {
}
}
return lp.AppendImage(img)

case RegistrySourceSink:
var nameOpts []name.Option
if tOpts != nil && tOpts.Insecure {
nameOpts = append(nameOpts, name.Insecure)
}
dstRef, err := name.ParseReference(dstName, nameOpts...)
if err != nil {
return err
}
remoteOpts := []remote.Option{}
if tOpts != nil {
remoteOpts = append(remoteOpts,
remote.WithPlatform(tOpts.Platform),
ociauth.AuthOptn(tOpts.AuthConfig, tOpts.AuthFilePath))
}
return remote.Write(dstRef, img, remoteOpts...)

case TarballSourceSink:
// Only supports writing a single image per tarball.
dstRef := name.MustParseReference("image")
return tarball.WriteToFile(dstName, dstRef, img)

case UnknownSourceSink:
return errUnsupportedTransport

default:
return errUnsupportedTransport
}
Expand Down

0 comments on commit 6088342

Please sign in to comment.