Skip to content

Commit

Permalink
feat: convert with remote cache
Browse files Browse the repository at this point in the history
Here we can pull remote cache, use remote cache for conversion, and push the updated cache back to the remote. When updating, we follow a upper->lower order.It is worth noting that we have ported the HTTPReadSeeker here, so that we can pull some layer's bootstrap  according to the bootstrap offset and size when merging layers into a boostrap layer.

Signed-off-by: breezeTuT <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Aug 15, 2023
1 parent 60bdacc commit 490b91e
Show file tree
Hide file tree
Showing 6 changed files with 954 additions and 5 deletions.
5 changes: 3 additions & 2 deletions pkg/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error {
return nil
}

func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) {
func (cvt *Converter) Convert(ctx context.Context, source, target, cache string) (*Metric, error) {
var metric Metric
sourceNamed, err := docker.ParseDockerRef(source)
if err != nil {
Expand Down Expand Up @@ -119,8 +119,9 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr
return nil, errors.Wrap(err, "get source image size")
}
logger.Infof("pulled image %s, elapse %s", source, metric.SourcePullElapsed)

logger.Infof("converting image %s", source)
ctx = context.WithValue(ctx, "remote cache", cache)
start = time.Now()
desc, err := cvt.driver.Convert(ctx, cvt.provider, source)
if err != nil {
Expand Down
283 changes: 280 additions & 3 deletions pkg/driver/nydus/nydus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ package nydus
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"

"path/filepath"
"regexp"
"strconv"
"strings"
Expand All @@ -30,11 +34,14 @@ import (
"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
"github.com/goharbor/acceleration-service/pkg/errdefs"
"github.com/goharbor/acceleration-service/pkg/remote"
"github.com/goharbor/acceleration-service/pkg/utils"

"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
containerdErrDefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/nydus-snapshotter/pkg/backend"
nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
Expand Down Expand Up @@ -224,17 +231,45 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
if err != nil {
return nil, errors.Wrap(err, "get source image")
}
desc, err := d.convert(ctx, provider, *image, sourceRef)

cacheRef := ctx.Value("remote cache").(string)
useRemoteCache := cacheRef != ""
if useRemoteCache {
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
if errors.Is(err, errdefs.ErrNotSupport) {
logrus.Warn("Content store does not support remote cache")
} else {
return nil, errors.Wrap(err, "fetch remote cache")
}
}
} else {
logrus.Warn("Content store does not use remote cache")
}

desc, err := d.convert(ctx, provider, *image, sourceRef, useRemoteCache)
if err != nil {
return nil, err
}
if d.mergeManifest {
return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc)
}

if useRemoteCache {
// Before update cache, we need to pull cache manifest from remote
if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
return nil, errors.Wrap(err, "fetch remote cache")
}
if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
return nil, errors.Wrap(err, "update remote cache")
}
if err := d.PushRemoteCache(ctx, provider, *desc, cacheRef); err != nil {
return nil, errors.Wrap(err, "push remote cache")
}
}
return desc, err
}

func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string) (*ocispec.Descriptor, error) {
func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string, useRemoteCache bool) (*ocispec.Descriptor, error) {
cs := provider.ContentStore()

chunkDictPath := ""
Expand Down Expand Up @@ -275,6 +310,21 @@ func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, so
convertHookFunc := func(
ctx context.Context, cs content.Store, orgDesc ocispec.Descriptor, newDesc *ocispec.Descriptor,
) (*ocispec.Descriptor, error) {
if useRemoteCache {
if newDesc.MediaType == nydusutils.MediaTypeNydusBlob {
if _, err := cs.ReaderAt(ctx, *newDesc); err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
newCtx := namespaces.WithNamespace(context.Background(), "acceleration-service")
cacheRef := ctx.Value("remote cache").(string)
if err := d.FetchRemoteBoot(newCtx, provider, *newDesc, cacheRef); err != nil {
return nil, errors.Wrap(err, "fetch remote bootstrap")
}
}else{
return nil, errors.Wrap(err, "get cache layer reader")
}
}
}
}
// Append the nydus bootstrap layer for image manifest.
desc, err := nydusify.ConvertHookFunc(mergeOpt)(ctx, cs, orgDesc, newDesc)
if err != nil {
Expand Down Expand Up @@ -399,3 +449,230 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide

return &chunkDict, nil
}

// FetchRemoteBoot fetches the bootstrap from remote and unpack it to the work dir.
func (d *Driver) FetchRemoteBoot(ctx context.Context, provider accelcontent.Provider, desc ocispec.Descriptor, cacheRef string) error {
cs := provider.ContentStore()

info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return err
}
if info.Labels == nil || info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize] == "" {
return errors.Errorf("can not get bootstrap offset and size %s", desc.Digest.String())
}
offsetSize := strings.Split(info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize], ":")
if len(offsetSize) != 2 {
return errors.Errorf("invalid bootstrap offset and size %s", offsetSize)
}
offset, err := strconv.ParseInt(offsetSize[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid bootstrap offset %s", offsetSize[0])
}
size, err := strconv.ParseInt(offsetSize[1], 10, 64)
if err != nil {
return errors.Wrapf(err, "invalid bootstrap size %s", offsetSize[1])
}

// Use '<workdir>/<digest>' as boot path
bootDst := filepath.Join(d.workDir, desc.Digest.Encoded())
bootFile, err := os.OpenFile(bootDst, os.O_CREATE|os.O_RDWR, 0655)
if err != nil {
return errors.Wrapf(err, "create boot file %s", bootDst)
}
defer bootFile.Close()

var ra io.ReadCloser
ra, err = remote.Fetch(ctx, cacheRef, desc, offset, size, false, false)
if err != nil {
return err
}
defer ra.Close()

if _, err := io.Copy(bootFile, ra); err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
ra, err = remote.Fetch(ctx, cacheRef, desc, offset, size, true, true)
if err != nil {
return err
}
defer ra.Close()
if _, err := io.Copy(bootFile, ra); err != nil {
return errors.Wrapf(err, "fetch bootstrap %s", cacheRef)
}
}else{
return errors.Wrapf(err, "write boot file %s", bootDst)
}
}

info.Labels[nydusutils.LayerAnnotationNydusReaderAtPath] = bootDst
if _, err = cs.Update(ctx, info); err != nil {
return errors.Wrap(err, "update layer label")
}
return nil
}

// FetchRemoteCache fetch cache manifest from remote
func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
resolver, err := pvd.Resolver(ref)
if err != nil {
return err
}

// TODO: sets max concurrent downloaded layer limit by containerd.WithMaxConcurrentDownloads.
rc := &containerd.RemoteContext{
Resolver: resolver,
}
name, desc, err := rc.Resolver.Resolve(ctx, ref)
if err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
// Remote cache may do not exist, just return nil
return nil
}
return err
}
fetcher, err := rc.Resolver.Fetcher(ctx, name)
if err != nil {
return err
}
ir, err := fetcher.Fetch(ctx, desc)
if err != nil {
if errdefs.NeedsRetryWithHTTP(err) {
pvd.UsePlainHTTP()
ir, err = fetcher.Fetch(ctx, desc)
if err != nil {
return errors.Wrap(err, "try to pull remote cache")
}
} else {
return errors.Wrap(err, "pull remote cache")
}
}

bytes, err := io.ReadAll(ir)
if err != nil {
return errors.Wrap(err, "read remote cache to bytes")
}
manifest := ocispec.Manifest{}
if err = json.Unmarshal(bytes, &manifest); err != nil {
return err
}

cs := pvd.ContentStore()
for _, layer := range manifest.Layers {
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
});err != nil {
if errors.Is(err, containerdErrDefs.ErrNotFound) {
return errdefs.ErrNotSupport
}
}
}
return nil
}

// PushRemoteCache update cache manifest and push to remote
func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, targetDesc ocispec.Descriptor, ref string) error {
imageConfig := ocispec.ImageConfig{}
imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
if err != nil {
return errors.Wrap(err, "remote cache image config marshal failed")
}
configReader := bytes.NewReader(imageConfigBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
return errors.Wrap(err, "remote cache image config write blob failed")
}

cs := pvd.ContentStore()
layers := []ocispec.Descriptor{}
if err = cs.Walk(ctx, func(info content.Info) error {
if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
layers = append(layers, ocispec.Descriptor{
MediaType: nydusutils.MediaTypeNydusBlob,
Digest: info.Digest,
Size: info.Size,
Annotations: info.Labels,
})
}
return nil
}); err != nil {
return errors.Wrap(err, "get remote cache layers failed")
}

manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
MediaType: ocispec.MediaTypeImageManifest,
Config: *imageConfigDesc,
Layers: layers,
}
manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
if err != nil {
return errors.Wrap(err, "remote cache manifest marshal failed")
}
manifestReader := bytes.NewReader(manifestBytes)
if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
return errors.Wrap(err, "remote cache write blob failed")
}

if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
return err
}
return nil
}

// UpdateRemoteCache update cache layer from upper to lower
func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error{
cs := provider.ContentStore()

orgManifest := ocispec.Manifest{}
_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
if err != nil {
return errors.Wrap(err, "read original manifest json")
}

newManifest := ocispec.Manifest{}
_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
if err != nil {
return errors.Wrap(err, "read new manifest json")
}
newLayers := newManifest.Layers[:len(newManifest.Layers) - 1]

// Update <LayerAnnotationNydusSourceDigest> and <LayerAnnotationNydusBootOffsetSize>
// labels for each layer
for i, layer := range newLayers {
layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
info, err := cs.Info(ctx, layer.Digest)
if err != nil{
return errors.Wrap(err, "get layer info")
}
if _, ok := info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize]; !ok {
ra, err := cs.ReaderAt(ctx, layer)
if err != nil {
return errors.Wrap(err, "get layer reader")
}
offset, size, err := nydusutils.SeekBootOffsetAndSize(ra, nydusify.EntryBootstrap, nil)
if err != nil {
return errors.Wrap(err, "seek bootstrap offset and size")
}
layer.Annotations[nydusutils.LayerAnnotationNydusBootOffsetSize] = fmt.Sprintf("%d:%d", offset, size)
}else{
layer.Annotations[nydusutils.LayerAnnotationNydusBootOffsetSize] = info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize]
os.Remove(info.Labels[nydusutils.LayerAnnotationNydusReaderAtPath])
}
}

// Update cache to lru from upper to lower
for i := len(newLayers) - 1; i >= 0; i-- {
layer := newLayers[i]
if _, err := cs.Update(ctx, content.Info{
Digest: layer.Digest,
Size: layer.Size,
Labels: layer.Annotations,
}); err != nil {
return errors.Wrap(err, "update cache layer")
}
}
return nil
}
Loading

0 comments on commit 490b91e

Please sign in to comment.