From f808592e3ae082bc93f3b6f31809d06951355664 Mon Sep 17 00:00:00 2001 From: breezeTuT Date: Tue, 22 Aug 2023 16:27:05 +0800 Subject: [PATCH] feat: convert with remote cache Here we can pull remote cache, use remote cache for conversion, and push the updated cache back to the remote. When updating, we follow a upper->lower order. Signed-off-by: breezeTuT --- pkg/content/provider.go | 4 + pkg/converter/converter.go | 3 +- pkg/driver/nydus/nydus.go | 195 +++++++++++++++++++++++++++++++++++-- 3 files changed, 194 insertions(+), 8 deletions(-) diff --git a/pkg/content/provider.go b/pkg/content/provider.go index 9dcf6046..e11d4d57 100644 --- a/pkg/content/provider.go +++ b/pkg/content/provider.go @@ -42,4 +42,8 @@ type Provider interface { Image(ctx context.Context, ref string) (*ocispec.Descriptor, error) // ContentStore gets the content store object of containerd. ContentStore() content.Store + // SetCacheRef sets the cache reference of the source image. + SetCacheRef(ref string) + // GetCacheRef gets the cache reference of the source image. + GetCacheRef() string } diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index fae1d97a..5fe55c15 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error { return nil } -func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) { +func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef string) (*Metric, error) { var metric Metric sourceNamed, err := docker.ParseDockerRef(source) if err != nil { @@ -122,6 +122,7 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr logger.Infof("converting image %s", source) start = time.Now() + cvt.provider.SetCacheRef(cacheRef) desc, err := cvt.driver.Convert(ctx, cvt.provider, source) if err != nil { return nil, errors.Wrap(err, "convert image") diff --git a/pkg/driver/nydus/nydus.go b/pkg/driver/nydus/nydus.go index 2c621c0c..7e8b7de0 100644 --- a/pkg/driver/nydus/nydus.go +++ b/pkg/driver/nydus/nydus.go @@ -17,7 +17,9 @@ package nydus import ( "bytes" "context" + "encoding/json" "fmt" + "io" "os" "os/exec" "regexp" @@ -25,19 +27,20 @@ import ( "strings" "sync" - "github.com/goharbor/acceleration-service/pkg/adapter/annotation" - accelcontent "github.com/goharbor/acceleration-service/pkg/content" - "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" - nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" - "github.com/goharbor/acceleration-service/pkg/errdefs" - "github.com/goharbor/acceleration-service/pkg/utils" - + "github.com/containerd/containerd" "github.com/containerd/containerd/content" + containerdErrDefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/images/converter" "github.com/containerd/containerd/platforms" "github.com/containerd/nydus-snapshotter/pkg/backend" nydusify "github.com/containerd/nydus-snapshotter/pkg/converter" + "github.com/goharbor/acceleration-service/pkg/adapter/annotation" + accelcontent "github.com/goharbor/acceleration-service/pkg/content" + "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" + "github.com/goharbor/acceleration-service/pkg/errdefs" + "github.com/goharbor/acceleration-service/pkg/utils" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -58,6 +61,8 @@ const ( var builderVersion string var builderVersionOnce sync.Once +type CacheRef struct{} + type chunkDictInfo struct { BootstrapPath string } @@ -224,6 +229,20 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if err != nil { return nil, errors.Wrap(err, "get source image") } + + cacheRef := provider.GetCacheRef() + useRemoteCache := cacheRef != "" + if useRemoteCache { + logrus.Infof("remote cache image reference: %s", cacheRef) + if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + if errors.Is(err, errdefs.ErrNotSupport) { + logrus.Warn("Content store does not support remote cache") + } else { + return nil, errors.Wrap(err, "fetch remote cache") + } + } + } + desc, err := d.convert(ctx, provider, *image, sourceRef) if err != nil { return nil, err @@ -231,6 +250,19 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if d.mergeManifest { return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc) } + + if useRemoteCache { + // Fetch the old remote cache before updating and pushing the new one to avoid conflict. + if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "fetch remote cache") + } + if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil { + return nil, errors.Wrap(err, "update remote cache") + } + if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "push remote cache") + } + } return desc, err } @@ -399,3 +431,152 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide return &chunkDict, nil } + +// FetchRemoteCache fetch cache manifest from remote +func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error { + resolver, err := pvd.Resolver(ref) + if err != nil { + return err + } + + rc := &containerd.RemoteContext{ + Resolver: resolver, + } + name, desc, err := rc.Resolver.Resolve(ctx, ref) + if err != nil { + if errors.Is(err, containerdErrDefs.ErrNotFound) { + // Remote cache may do not exist, just return nil + return nil + } + return err + } + fetcher, err := rc.Resolver.Fetcher(ctx, name) + if err != nil { + return err + } + ir, err := fetcher.Fetch(ctx, desc) + if err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + ir, err = fetcher.Fetch(ctx, desc) + if err != nil { + return errors.Wrap(err, "try to pull remote cache") + } + } else { + return errors.Wrap(err, "pull remote cache") + } + } + + bytes, err := io.ReadAll(ir) + if err != nil { + return errors.Wrap(err, "read remote cache to bytes") + } + + // TODO: handle manifest list for multiple platform. + manifest := ocispec.Manifest{} + if err = json.Unmarshal(bytes, &manifest); err != nil { + return err + } + + cs := pvd.ContentStore() + for _, layer := range manifest.Layers { + if _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }); err != nil { + if errors.Is(err, containerdErrDefs.ErrNotFound) { + return errdefs.ErrNotSupport + } + return errors.Wrap(err, "update cache layer") + } + } + return nil +} + +// PushRemoteCache update cache manifest and push to remote +func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error { + imageConfig := ocispec.ImageConfig{} + imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig) + if err != nil { + return errors.Wrap(err, "remote cache image config marshal failed") + } + configReader := bytes.NewReader(imageConfigBytes) + if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil { + return errors.Wrap(err, "remote cache image config write blob failed") + } + + cs := pvd.ContentStore() + layers := []ocispec.Descriptor{} + if err = cs.Walk(ctx, func(info content.Info) error { + if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok { + layers = append(layers, ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + }) + } + return nil + }); err != nil { + return errors.Wrap(err, "get remote cache layers failed") + } + + manifest := ocispec.Manifest{ + Versioned: specs.Versioned{ + SchemaVersion: 2, + }, + MediaType: ocispec.MediaTypeImageManifest, + Config: *imageConfigDesc, + Layers: layers, + } + manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest) + if err != nil { + return errors.Wrap(err, "remote cache manifest marshal failed") + } + manifestReader := bytes.NewReader(manifestBytes) + if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil { + return errors.Wrap(err, "remote cache write blob failed") + } + + if err = pvd.Push(ctx, *manifestDesc, ref); err != nil { + return err + } + return nil +} + +// UpdateRemoteCache update cache layer from upper to lower +func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error { + cs := provider.ContentStore() + + orgManifest := ocispec.Manifest{} + _, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc) + if err != nil { + return errors.Wrap(err, "read original manifest json") + } + + newManifest := ocispec.Manifest{} + _, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc) + if err != nil { + return errors.Wrap(err, "read new manifest json") + } + newLayers := newManifest.Layers[:len(newManifest.Layers)-1] + + // Update label for each layer + for i, layer := range newLayers { + layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String() + } + + // Update cache to lru from upper to lower + for i := len(newLayers) - 1; i >= 0; i-- { + layer := newLayers[i] + if _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }); err != nil { + return errors.Wrap(err, "update cache layer") + } + } + return nil +}