Skip to content

Commit

Permalink
feat: convert with remote cache
Browse files Browse the repository at this point in the history
Here we can pull remote cache, use remote cache for conversion, and push the updated cache back to the remote. When updating, we follow a upper->lower order.

Signed-off-by: YuQiang <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Aug 19, 2023
1 parent d9eb53a commit 9ce659d
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 8 deletions.
3 changes: 3 additions & 0 deletions pkg/content/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ type Provider interface {
Image(ctx context.Context, ref string) (*ocispec.Descriptor, error)
// ContentStore gets the content store object of containerd.
ContentStore() content.Store
// SetCacheRef gets the cache reference of the source image,
// if ref is not empty, set cacheRef as ref.
SetCacheRef(ref string) string
}
3 changes: 2 additions & 1 deletion pkg/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error {
return nil
}

func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) {
func (cvt *Converter) Convert(ctx context.Context, source, target, cache string) (*Metric, error) {
var metric Metric
sourceNamed, err := docker.ParseDockerRef(source)
if err != nil {
Expand Down Expand Up @@ -122,6 +122,7 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr

logger.Infof("converting image %s", source)
start = time.Now()
cvt.provider.SetCacheRef(cache)
desc, err := cvt.driver.Convert(ctx, cvt.provider, source)
if err != nil {
return nil, errors.Wrap(err, "convert image")
Expand Down
195 changes: 188 additions & 7 deletions pkg/driver/nydus/nydus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@ package nydus
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"

"github.com/goharbor/acceleration-service/pkg/adapter/annotation"
accelcontent "github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
"github.com/goharbor/acceleration-service/pkg/errdefs"
"github.com/goharbor/acceleration-service/pkg/utils"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
containerdErrDefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/platforms"
"github.com/containerd/nydus-snapshotter/pkg/backend"
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
"github.com/goharbor/acceleration-service/pkg/adapter/annotation"
accelcontent "github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
"github.com/goharbor/acceleration-service/pkg/errdefs"
"github.com/goharbor/acceleration-service/pkg/utils"
"github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand All @@ -58,6 +61,8 @@ const (
var builderVersion string
var builderVersionOnce sync.Once

type CacheRef struct{}

type chunkDictInfo struct {
BootstrapPath string
}
Expand Down Expand Up @@ -224,13 +229,40 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
if err != nil {
return nil, errors.Wrap(err, "get source image")
}

cacheRef := provider.SetCacheRef("")
useRemoteCache := cacheRef != ""
if useRemoteCache {
logrus.Infof("remote cache image reference: %s", cacheRef)
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if errors.Is(err, errdefs.ErrNotSupport) {
logrus.Warn("Content store does not support remote cache")
} else {
return nil, errors.Wrap(err, "fetch remote cache")
}
}
}

desc, err := d.convert(ctx, provider, *image, sourceRef)
if err != nil {
return nil, err
}
if d.mergeManifest {
return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc)
}

if useRemoteCache {
// Before update cache, we need to pull cache manifest from remote
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
return nil, errors.Wrap(err, "update remote cache")
}
if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil {
return nil, errors.Wrap(err, "push remote cache")
}
}
return desc, err
}

Expand Down Expand Up @@ -399,3 +431,152 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide

return &chunkDict, nil
}

// FetchRemoteCache fetch cache manifest from remote
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
resolver, err := pvd.Resolver(ref)
if err != nil {
return err
}

rc := &containerd.RemoteContext{
Resolver: resolver,
}
name, desc, err := rc.Resolver.Resolve(ctx, ref)
if err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
// Remote cache may do not exist, just return nil
return nil
}
return err
}
fetcher, err := rc.Resolver.Fetcher(ctx, name)
if err != nil {
return err
}
ir, err := fetcher.Fetch(ctx, desc)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
ir, err = fetcher.Fetch(ctx, desc)
if err != nil {
return errors.Wrap(err, "try to pull remote cache")
}
} else {
return errors.Wrap(err, "pull remote cache")
}
}

bytes, err := io.ReadAll(ir)
if err != nil {
return errors.Wrap(err, "read remote cache to bytes")
}

// TODO: handle manifest list for multiple platform.
manifest := ocispec.Manifest{}
if err = json.Unmarshal(bytes, &manifest); err != nil {
return err
}

cs := pvd.ContentStore()
for _, layer := range manifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return errdefs.ErrNotSupport
}
return errors.Wrap(err, "update cache layer")
}
}
return nil
}

// PushRemoteCache update cache manifest and push to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "remote cache image config marshal failed")
}
configReader := bytes.NewReader(imageConfigBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
return errors.Wrap(err, "remote cache image config write blob failed")
}

cs := pvd.ContentStore()
layers := []ocispec.Descriptor{}
if err = cs.Walk(ctx, func(info content.Info) error {
if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
layers = append(layers, ocispec.Descriptor{
MediaType: nydusutils.MediaTypeNydusBlob,
Digest: info.Digest,
Size: info.Size,
Annotations: info.Labels,
})
}
return nil
}); err != nil {
return errors.Wrap(err, "get remote cache layers failed")
}

manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageManifest,
Config: *imageConfigDesc,
Layers: layers,
}
manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
manifestReader := bytes.NewReader(manifestBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}

if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
return err
}
return nil
}

// UpdateRemoteCache update cache layer from upper to lower
func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error {
cs := provider.ContentStore()

orgManifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
if err != nil {
return errors.Wrap(err, "read original manifest json")
}

newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
if err != nil {
return errors.Wrap(err, "read new manifest json")
}
newLayers := newManifest.Layers[:len(newManifest.Layers)-1]

// Update <LayerAnnotationNydusSourceDigest> label for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
}

// Update cache to lru from upper to lower
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
return errors.Wrap(err, "update cache layer")
}
}
return nil
}

0 comments on commit 9ce659d

Please sign in to comment.