Skip to content

Commit

Permalink
feat: convert with remote cache
Browse files Browse the repository at this point in the history
Here we can pull remote cache, use remote cache for conversion, and push the updated cache back to the remote. When updating, we follow a upper->lower order.

Signed-off-by: YuQiang <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Aug 18, 2023
1 parent 5b3f0cf commit 387c86f
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 10 deletions.
3 changes: 3 additions & 0 deletions pkg/content/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ type Provider interface {
Image(ctx context.Context, ref string) (*ocispec.Descriptor, error)
// ContentStore gets the content store object of containerd.
ContentStore() content.Store
// CacheReference gets the cache reference of the source image,
// if ref is not empty, set cache reference as ref.
CacheReference(ref string) string
}
3 changes: 2 additions & 1 deletion pkg/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error {
return nil
}

func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) {
func (cvt *Converter) Convert(ctx context.Context, source, target, cache string) (*Metric, error) {
var metric Metric
sourceNamed, err := docker.ParseDockerRef(source)
if err != nil {
Expand Down Expand Up @@ -122,6 +122,7 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr

logger.Infof("converting image %s", source)
start = time.Now()
cvt.provider.CacheReference(cache)
desc, err := cvt.driver.Convert(ctx, cvt.provider, source)
if err != nil {
return nil, errors.Wrap(err, "convert image")
Expand Down
198 changes: 189 additions & 9 deletions pkg/driver/nydus/nydus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@ package nydus
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"

"github.com/goharbor/acceleration-service/pkg/adapter/annotation"
accelcontent "github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
"github.com/goharbor/acceleration-service/pkg/errdefs"
"github.com/goharbor/acceleration-service/pkg/utils"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
containerdErrDefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/platforms"
"github.com/containerd/nydus-snapshotter/pkg/backend"
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
"github.com/goharbor/acceleration-service/pkg/adapter/annotation"
accelcontent "github.com/goharbor/acceleration-service/pkg/content"
"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
"github.com/goharbor/acceleration-service/pkg/errdefs"
"github.com/goharbor/acceleration-service/pkg/utils"
"github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand All @@ -58,6 +61,8 @@ const (
var builderVersion string
var builderVersionOnce sync.Once

type CacheRef struct{}

type chunkDictInfo struct {
BootstrapPath string
}
Expand Down Expand Up @@ -224,17 +229,45 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
if err != nil {
return nil, errors.Wrap(err, "get source image")
}
desc, err := d.convert(ctx, provider, *image, sourceRef)

cacheRef := provider.CacheReference("")
useRemoteCache := cacheRef != ""
if useRemoteCache {
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if errors.Is(err, errdefs.ErrNotSupport) {
logrus.Warn("Content store does not support remote cache")
} else {
return nil, errors.Wrap(err, "fetch remote cache")
}
}
} else {
logrus.Warn("Content store does not use remote cache")
}

desc, err := d.convert(ctx, provider, *image, sourceRef, useRemoteCache)
if err != nil {
return nil, err
}
if d.mergeManifest {
return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc)
}

if useRemoteCache {
// Before update cache, we need to pull cache manifest from remote
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
return nil, errors.Wrap(err, "update remote cache")
}
if err := d.PushRemoteCache(ctx, provider, *desc, cacheRef); err != nil {
return nil, errors.Wrap(err, "push remote cache")
}
}
return desc, err
}

func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string) (*ocispec.Descriptor, error) {
func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string, useRemoteCache bool) (*ocispec.Descriptor, error) {
cs := provider.ContentStore()

chunkDictPath := ""
Expand Down Expand Up @@ -399,3 +432,150 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide

return &chunkDict, nil
}

// FetchRemoteCache fetch cache manifest from remote
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
resolver, err := pvd.Resolver(ref)
if err != nil {
return err
}

// TODO: sets max concurrent downloaded layer limit by containerd.WithMaxConcurrentDownloads.
rc := &containerd.RemoteContext{
Resolver: resolver,
}
name, desc, err := rc.Resolver.Resolve(ctx, ref)
if err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
// Remote cache may do not exist, just return nil
return nil
}
return err
}
fetcher, err := rc.Resolver.Fetcher(ctx, name)
if err != nil {
return err
}
ir, err := fetcher.Fetch(ctx, desc)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
ir, err = fetcher.Fetch(ctx, desc)
if err != nil {
return errors.Wrap(err, "try to pull remote cache")
}
} else {
return errors.Wrap(err, "pull remote cache")
}
}

bytes, err := io.ReadAll(ir)
if err != nil {
return errors.Wrap(err, "read remote cache to bytes")
}
manifest := ocispec.Manifest{}
if err = json.Unmarshal(bytes, &manifest); err != nil {
return err
}

cs := pvd.ContentStore()
for _, layer := range manifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return errdefs.ErrNotSupport
}
}
}
return nil
}

// PushRemoteCache update cache manifest and push to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, targetDesc ocispec.Descriptor, ref string) error {
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "remote cache image config marshal failed")
}
configReader := bytes.NewReader(imageConfigBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
return errors.Wrap(err, "remote cache image config write blob failed")
}

cs := pvd.ContentStore()
layers := []ocispec.Descriptor{}
if err = cs.Walk(ctx, func(info content.Info) error {
if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
layers = append(layers, ocispec.Descriptor{
MediaType: nydusutils.MediaTypeNydusBlob,
Digest: info.Digest,
Size: info.Size,
Annotations: info.Labels,
})
}
return nil
}); err != nil {
return errors.Wrap(err, "get remote cache layers failed")
}

manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageManifest,
Config: *imageConfigDesc,
Layers: layers,
}
manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
manifestReader := bytes.NewReader(manifestBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}

if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
return err
}
return nil
}

// UpdateRemoteCache update cache layer from upper to lower
func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error {
cs := provider.ContentStore()

orgManifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
if err != nil {
return errors.Wrap(err, "read original manifest json")
}

newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
if err != nil {
return errors.Wrap(err, "read new manifest json")
}
newLayers := newManifest.Layers[:len(newManifest.Layers)-1]

// Update <LayerAnnotationNydusSourceDigest> label for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
}

// Update cache to lru from upper to lower
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
return errors.Wrap(err, "update cache layer")
}
}
return nil
}

0 comments on commit 387c86f

Please sign in to comment.