From 95423456f16d7f26f14d6a982d13e17906586f1f Mon Sep 17 00:00:00 2001 From: breezeTuT Date: Tue, 22 Aug 2023 16:26:07 +0800 Subject: [PATCH 1/2] feat: wrap content to support remote cache We can use LRU in content store and wrap some methods to provide remote cache. Here Info, Update, ReaderAt, Walk methods were wrapped and they can get, update, and do other operations in LRU cache. Also, we can configure whether to enable cache or not with cache_tag_suffix. And we can config cache size. Signed-off-by: breezeTuT --- misc/config/config.nydus.ref.yaml | 4 + misc/config/config.nydus.yaml | 4 + pkg/adapter/adapter.go | 16 +- pkg/adapter/rule.go | 38 +- pkg/config/config.go | 19 +- pkg/content/cache.go | 60 +++ pkg/content/cache_test.go | 2 +- pkg/content/content.go | 114 +++++- pkg/content/content_test.go | 96 ++++- pkg/content/local.go | 29 +- pkg/driver/nydus/utils/constant.go | 2 + pkg/errdefs/errors.go | 2 + pkg/remote/ported.go | 617 +++++++++++++++++++++++++++++ 13 files changed, 963 insertions(+), 40 deletions(-) create mode 100644 pkg/remote/ported.go diff --git a/misc/config/config.nydus.ref.yaml b/misc/config/config.nydus.ref.yaml index 8610b0d2..63dc0304 100644 --- a/misc/config/config.nydus.ref.yaml +++ b/misc/config/config.nydus.ref.yaml @@ -31,6 +31,8 @@ provider: gcpolicy: # size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size. threshold: 1000MB + # remote cache record capacity of converted layers, default is 200. + cache_size: 200 converter: # number of worker for executing conversion task @@ -61,3 +63,5 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus-oci-ref + # add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache. 
+ - cache_tag_suffix: -nydus-cache diff --git a/misc/config/config.nydus.yaml b/misc/config/config.nydus.yaml index abb9733e..35758e58 100644 --- a/misc/config/config.nydus.yaml +++ b/misc/config/config.nydus.yaml @@ -31,6 +31,8 @@ provider: gcpolicy: # size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size. threshold: 1000MB + # remote cache record capacity of converted layers, default is 200. + cache_size: 200 converter: # number of worker for executing conversion task @@ -100,3 +102,5 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus + # add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 2f4350ac..37318091 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -61,7 +61,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { return nil, errors.Wrap(err, "invalid platform configuration") } - provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC) + provider, content, err := content.NewLocalProvider(cfg, platformMC) if err != nil { return nil, errors.Wrap(err, "create content provider") } @@ -95,7 +95,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { } func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { - target, err := adp.rule.Map(source) + target, err := adp.rule.Map(source, TagSuffix) if err != nil { if errors.Is(err, errdefs.ErrAlreadyConverted) { logrus.Infof("image has been converted: %s", source) @@ -103,7 +103,17 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { } return errors.Wrap(err, "create target reference by rule") } - if _, err = adp.cvt.Convert(ctx, source, target); err != nil { + cacheRef, err := adp.rule.Map(source, 
CacheTagSuffix) + if err != nil { + if errors.Is(err, errdefs.ErrIsRemoteCache) { + logrus.Infof("image was remote cache: %s", source) + return nil + } + } + if err = adp.content.NewRemoteCache(cacheRef); err != nil { + return err + } + if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil { return err } if err := adp.content.GC(ctx); err != nil { diff --git a/pkg/adapter/rule.go b/pkg/adapter/rule.go index ccbb8e0a..4e765818 100644 --- a/pkg/adapter/rule.go +++ b/pkg/adapter/rule.go @@ -24,6 +24,11 @@ import ( "github.com/goharbor/acceleration-service/pkg/errdefs" ) +const ( + TagSuffix = "tag_suffix" + CacheTagSuffix = "cache_tag_suffix" +) + // Add suffix to source image reference as the target // image reference, for example: // Source: 192.168.1.1/nginx:latest @@ -47,16 +52,33 @@ type Rule struct { // Map maps the source image reference to a new one according to // a rule, the new one will be used as the reference of target image. -func (rule *Rule) Map(ref string) (string, error) { - for _, item := range rule.items { - if item.TagSuffix != "" { - if strings.HasSuffix(ref, item.TagSuffix) { - // FIXME: To check if an image has been converted, a better solution - // is to use the annotation on image manifest. - return "", errdefs.ErrAlreadyConverted +func (rule *Rule) Map(ref, opt string) (string, error) { + switch opt { + case TagSuffix: + for _, item := range rule.items { + if item.TagSuffix != "" { + if strings.HasSuffix(ref, item.TagSuffix) { + // FIXME: To check if an image has been converted, a better solution + // is to use the annotation on image manifest. + return "", errdefs.ErrAlreadyConverted + } + return addSuffix(ref, item.TagSuffix) + } + } + case CacheTagSuffix: + for _, item := range rule.items { + if item.CacheTagSuffix != "" { + if strings.HasSuffix(ref, item.CacheTagSuffix) { + // FIXME: Ditto.A better way is to use the annotation on image manifest. 
+ return "", errdefs.ErrIsRemoteCache + } + return addSuffix(ref, item.CacheTagSuffix) } - return addSuffix(ref, item.TagSuffix) } + // CacheTagSuffix empty means do not provide remote cache, just return empty string. + return "", nil + default: + return "", fmt.Errorf("unsupported map option: %s", opt) } return "", errors.New("not found matched conversion rule") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7d3e2d5e..9bb6e881 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -46,9 +46,10 @@ type MetricConfig struct { } type ProviderConfig struct { - Source map[string]SourceConfig `yaml:"source"` - WorkDir string `yaml:"work_dir"` - GCPolicy GCPolicy `yaml:"gcpolicy"` + Source map[string]SourceConfig `yaml:"source"` + WorkDir string `yaml:"work_dir"` + GCPolicy GCPolicy `yaml:"gcpolicy"` + CacheSize int `yaml:"cache_size"` } type GCPolicy struct { @@ -66,7 +67,8 @@ type SourceConfig struct { } type ConversionRule struct { - TagSuffix string `yaml:"tag_suffix"` + TagSuffix string `yaml:"tag_suffix"` + CacheTagSuffix string `yaml:"cache_tag_suffix"` } type ConverterConfig struct { @@ -137,3 +139,12 @@ func (cfg *Config) Host(ref string) (remote.CredentialFunc, bool, error) { return ary[0], ary[1], nil }, auth.Insecure, nil } + +func (cfg *Config) EnableRemoteCache() bool { + for _, rule := range cfg.Converter.Rules { + if rule.CacheTagSuffix != "" { + return true + } + } + return false +} diff --git a/pkg/content/cache.go b/pkg/content/cache.go index 34a2bf0a..c98282c9 100644 --- a/pkg/content/cache.go +++ b/pkg/content/cache.go @@ -23,7 +23,9 @@ import ( "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" + "github.com/goharbor/acceleration-service/pkg/remote" lru "github.com/hashicorp/golang-lru/v2" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism. 
@@ -122,3 +124,61 @@ func (leaseCache *leaseCache) remove(key string, usedCount string) { func (leaseCache *leaseCache) Len() int { return leaseCache.size } + +type RemoteCache struct { + // remoteCache is an LRU cache for caching target layer descriptors, the cache key is the source layer digest, + // and the cache value is the target layer descriptor after conversion. + remoteCache *lru.Cache[string, ocispec.Descriptor] + // cacheRef is the remote cache reference. + cacheRef string + // host is a func to provide registry credential by host name. + host remote.HostFunc + // cacheSize is the remote cache record capacity of converted layers. + cacheSize int +} + +func NewRemoteCache(cacheSize int, host remote.HostFunc) (*RemoteCache, error) { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return nil, err + } + return &RemoteCache{ + remoteCache: remoteCache, + host: host, + cacheSize: cacheSize, + }, nil +} + +func (rc *RemoteCache) Values() []ocispec.Descriptor { + return rc.remoteCache.Values() +} + +func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) { + return rc.remoteCache.Get(key) +} + +func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) { + rc.remoteCache.Add(key, value) +} + +func (rc *RemoteCache) Remove(key string) { + rc.remoteCache.Remove(key) +} + +// Size returns the number of items in the cache. 
+func (rc *RemoteCache) Size() int { + return rc.remoteCache.Len() + +} + +func (rc *RemoteCache) NewLRUCache(cacheSize int, cacheRef string) error { + if rc != nil { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return err + } + rc.remoteCache = remoteCache + rc.cacheRef = cacheRef + } + return nil +} diff --git a/pkg/content/cache_test.go b/pkg/content/cache_test.go index 148d04f5..52603705 100644 --- a/pkg/content/cache_test.go +++ b/pkg/content/cache_test.go @@ -44,7 +44,7 @@ func TestLeaseCache(t *testing.T) { func TestLeaseCacheInit(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "100MB") + content, err := NewContent("./tmp", "./tmp", "100MB", false, 200, nil) require.NoError(t, err) testDigest := []string{ "sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b", diff --git a/pkg/content/content.go b/pkg/content/content.go index 7f0d3126..6612aae6 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -21,13 +21,17 @@ import ( "strconv" "time" - "github.com/containerd/containerd/content" + ctrcontent "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" + "github.com/goharbor/acceleration-service/pkg/remote" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -46,15 +50,17 @@ type Content struct { // lc cache the used count and reference order of lease lc *leaseCache // store is the local content store wrapped inner db - store 
content.Store + store ctrcontent.Store // threshold is the maximum capacity of the local caches storage threshold int64 + // remoteCache is the cache of remote layers + remoteCache *RemoteCache } // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. // content.db supported by bolt database and content store, content.lm supported by content.db. -func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { +func NewContent(contentDir string, databaseDir string, threshold string, useRemoteCache bool, cacheSize int, host remote.HostFunc) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { return nil, errors.Wrap(err, "create local provider content store") @@ -76,6 +82,15 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte if err := lc.Init(lm); err != nil { return nil, err } + var remoteCache *RemoteCache + if useRemoteCache { + remoteCache, err = NewRemoteCache(cacheSize, host) + if err != nil { + return nil, err + } + } else { + remoteCache = nil + } content := Content{ db: db, lm: metadata.NewLeaseManager(db), @@ -83,6 +98,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte lc: lc, store: db.ContentStore(), threshold: int64(t), + remoteCache: remoteCache, } return &content, nil } @@ -227,38 +243,100 @@ func (content *Content) updateLease(digest *digest.Digest) error { }) } -func (content *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { - return content.store.Info(ctx, dgst) +func (content *Content) Info(ctx context.Context, dgst digest.Digest) (ctrcontent.Info, error) { + info, err := content.store.Info(ctx, dgst) + if content.remoteCache != nil { + if err != nil { + if errors.Is(err, errdefs.ErrNotFound) { + layers := content.remoteCache.Values() + for _, layer := range 
layers { + if layer.Digest == dgst { + return ctrcontent.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }, nil + } + } + } + return info, err + } + if desc, ok := content.remoteCache.Get(dgst.String()); ok { + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels[nydusutils.LayerAnnotationNydusTargetDigest] = desc.Digest.String() + } + } + return info, err } -func (content *Content) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { +func (content *Content) Update(ctx context.Context, info ctrcontent.Info, fieldpaths ...string) (ctrcontent.Info, error) { + if content.remoteCache != nil { + sourceDesc, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest] + if ok { + l := ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + } + content.remoteCache.Add(sourceDesc, l) + return info, nil + } + } if info.Labels != nil { info.Labels = nil } return content.store.Update(ctx, info, fieldpaths...) } -func (content *Content) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - return content.store.Walk(ctx, fn, filters...) +func (content *Content) Walk(ctx context.Context, fn ctrcontent.WalkFunc, fs ...string) error { + if content.remoteCache != nil { + filter, err := filters.ParseAll(fs...) + if err != nil { + return err + } + for _, layer := range content.remoteCache.Values() { + info := ctrcontent.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + } + if filter.Match(ctrcontent.AdaptInfo(info)) { + if err := fn(info); err != nil { + return err + } + } + } + } + return content.store.Walk(ctx, fn, fs...) 
} func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { return content.store.Delete(ctx, dgst) } -func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { +func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (ctrcontent.ReaderAt, error) { readerAt, err := content.store.ReaderAt(ctx, desc) if err != nil { + if content.remoteCache != nil && errors.Is(err, errdefs.ErrNotFound) { + for _, layer := range content.remoteCache.Values() { + if layer.Digest == desc.Digest { + return remote.Fetch(ctx, content.remoteCache.cacheRef, desc, content.remoteCache.host, false) + } + } + } return readerAt, err } return readerAt, content.updateLease(&desc.Digest) } -func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) { +func (content *Content) Status(ctx context.Context, ref string) (ctrcontent.Status, error) { return content.store.Status(ctx, ref) } -func (content *Content) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { +func (content *Content) ListStatuses(ctx context.Context, filters ...string) ([]ctrcontent.Status, error) { return content.store.ListStatuses(ctx, filters...) } @@ -266,18 +344,26 @@ func (content *Content) Abort(ctx context.Context, ref string) error { return content.store.Abort(ctx, ref) } -func (content *Content) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { +func (content *Content) Writer(ctx context.Context, opts ...ctrcontent.WriterOpt) (ctrcontent.Writer, error) { writer, err := content.store.Writer(ctx, opts...) 
return &localWriter{writer, content}, err } +func (content *Content) NewRemoteCache(cacheRef string) error { + if content.remoteCache != nil { + cacheSize := content.remoteCache.cacheSize + return content.remoteCache.NewLRUCache(cacheSize, cacheRef) + } + return nil +} + // localWriter wrap the content.Writer type localWriter struct { - content.Writer + ctrcontent.Writer content *Content } -func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { +func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...ctrcontent.Opt) error { // we don't write any lables, drop the opts localWriter.content.updateLease(&expected) return localWriter.Writer.Commit(ctx, size, expected) diff --git a/pkg/content/content_test.go b/pkg/content/content_test.go index ab4eef36..bfe07ddc 100644 --- a/pkg/content/content_test.go +++ b/pkg/content/content_test.go @@ -15,18 +15,112 @@ package content import ( + "context" "os" "testing" + "github.com/containerd/containerd/content" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/stretchr/testify/require" ) func TestContenSize(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "1000MB") + content, err := NewContent("./tmp", "./tmp", "1000MB", false, 200, nil) require.NoError(t, err) size, err := content.Size() require.NoError(t, err) require.Equal(t, size, int64(0)) } + +func TestRemoteCacheFIFO(t *testing.T) { + os.MkdirAll("./tmp", 0755) + defer os.RemoveAll("./tmp") + cs, err := NewContent("./tmp", "./tmp", "1000MB", true, 5, nil) + require.NoError(t, err) + + // Assume the following layers were already cached in remote cache + cs.remoteCache.Add("sha256:3ea3641165a4082d1feb7f933b11589a98f6b44906b3ab7224fd57afdb81bd22", ocispec.Descriptor{ + Digest: 
digest.Digest("sha256:937705f5a7ed2202ef5efab32f37940bcdc3f3bd8da5d472a605f92a1ee2abb8"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 28104, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:3ea3641165a4082d1feb7f933b11589a98f6b44906b3ab7224fd57afdb81bd22", + }, + }) + // This layer will be update by testTargetDesc, with different size. + cs.remoteCache.Add("sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", ocispec.Descriptor{ + Digest: digest.Digest("sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 30000000, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", + }, + }) + cs.remoteCache.Add("sha256:61c5a878f9b478eabfb7d805f8b28037bc75fc2723a8ed34a25ed926bb235630", ocispec.Descriptor{ + Digest: digest.Digest("sha256:5068ad3783f63b82ecf9434161923dec024275c2c09f79f517a8eaeba9765488"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 4629756, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:61c5a878f9b478eabfb7d805f8b28037bc75fc2723a8ed34a25ed926bb235630", + }, + }) + + // The testTargetDesc is from a manifest, will be update to the remote cache. + // More closer to the front, more lower. More closer to the back, more upper. 
+ testTargetDesc := []ocispec.Descriptor{ + { + Digest: digest.Digest("sha256:281acccc8425676d6cb1840e2656409f58da7e0e8d4c07f9092d35f9c9810e20"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 39784900, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:ccc37bca66e7c29e0d65a4279511fe9a93932a4bb80e79e95144f3812632b61a", + }, + }, + { + Digest: digest.Digest("sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 38238606, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", + }, + }, + { + Digest: digest.Digest("sha256:a3a8cb24ca266f5d7513f2c8be9a140c9f5abe223e0d80c68024b2ad5b6c928a"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 26308, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:163d9761f77314ae2beb0cfdb0f86245bef6071233fece6a3e4a3d1d4db23c5f", + }, + }, + { + Digest: digest.Digest("sha256:a9453f674413979bd6cdaaeb4399cd8d9c5449cf7625860e6d93eb2f916b4e50"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 26315, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:c767afe512614d4adf05b9136a92c89b6151804d41ca92f005ce3af0032c36de", + }, + }, + } + + // Apdate cache to lru from upper to lower + ctx := context.Background() + for i := len(testTargetDesc) - 1; i >= 0; i-- { + layer := testTargetDesc[i] + _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }) + require.NoError(t, err) + } + require.Equal(t, cs.remoteCache.Size(), 5) + require.Equal(t, "sha256:5068ad3783f63b82ecf9434161923dec024275c2c09f79f517a8eaeba9765488", cs.remoteCache.Values()[0].Digest.String()) + require.Equal(t, 
"sha256:a9453f674413979bd6cdaaeb4399cd8d9c5449cf7625860e6d93eb2f916b4e50", cs.remoteCache.Values()[1].Digest.String()) + require.Equal(t, "sha256:a3a8cb24ca266f5d7513f2c8be9a140c9f5abe223e0d80c68024b2ad5b6c928a", cs.remoteCache.Values()[2].Digest.String()) + require.Equal(t, "sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf", cs.remoteCache.Values()[3].Digest.String()) + require.Equal(t, int64(38238606), cs.remoteCache.Values()[3].Size) + require.Equal(t, "sha256:281acccc8425676d6cb1840e2656409f58da7e0e8d4c07f9092d35f9c9810e20", cs.remoteCache.Values()[4].Digest.String()) +} diff --git a/pkg/content/local.go b/pkg/content/local.go index c1b2e19f..2582ba86 100644 --- a/pkg/content/local.go +++ b/pkg/content/local.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" + "github.com/goharbor/acceleration-service/pkg/config" "github.com/goharbor/acceleration-service/pkg/remote" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -37,26 +38,24 @@ type LocalProvider struct { content *Content hosts remote.HostFunc platformMC platforms.MatchComparer + cacheRef string } -func NewLocalProvider( - workDir string, - threshold string, - hosts remote.HostFunc, - platformMC platforms.MatchComparer, -) (Provider, *Content, error) { - contentDir := filepath.Join(workDir, "content") +func NewLocalProvider(cfg *config.Config, platformMC platforms.MatchComparer) (Provider, *Content, error) { + contentDir := filepath.Join(cfg.Provider.WorkDir, "content") if err := os.MkdirAll(contentDir, 0755); err != nil { return nil, nil, errors.Wrap(err, "create local provider work directory") } - content, err := NewContent(contentDir, workDir, threshold) + content, err := NewContent(contentDir, cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, + cfg.EnableRemoteCache(), cfg.Provider.CacheSize, cfg.Host) + if err != nil { return nil, 
nil, errors.Wrap(err, "create local provider content") } return &LocalProvider{ content: content, images: make(map[string]*ocispec.Descriptor), - hosts: hosts, + hosts: cfg.Host, platformMC: platformMC, }, content, nil } @@ -130,3 +129,15 @@ func (pvd *LocalProvider) getImage(ref string) (*ocispec.Descriptor, error) { } return nil, errdefs.ErrNotFound } + +func (pvd *LocalProvider) SetCacheRef(ref string) { + pvd.mutex.Lock() + defer pvd.mutex.Unlock() + pvd.cacheRef = ref +} + +func (pvd *LocalProvider) GetCacheRef() string { + pvd.mutex.Lock() + defer pvd.mutex.Unlock() + return pvd.cacheRef +} diff --git a/pkg/driver/nydus/utils/constant.go b/pkg/driver/nydus/utils/constant.go index 9053233b..7c552762 100644 --- a/pkg/driver/nydus/utils/constant.go +++ b/pkg/driver/nydus/utils/constant.go @@ -26,6 +26,8 @@ const ( LayerAnnotationNydusBootstrapDigest = "containerd.io/snapshot/nydus-bootstrap-digest" LayerAnnotationNydusFsVersion = "containerd.io/snapshot/nydus-fs-version" LayerAnnotationUncompressed = "containerd.io/uncompressed" + LayerAnnotationNydusSourceDigest = "containerd.io/snapshot/nydus-source-digest" + LayerAnnotationNydusTargetDigest = "containerd.io/snapshot/nydus-target-digest" ) var NydusAnnotations = []string{LayerAnnotationNydusBlob, LayerAnnotationNydusBootstrap, LayerAnnotationNydusRAFSVersion} diff --git a/pkg/errdefs/errors.go b/pkg/errdefs/errors.go index 39effa84..bf3d06e9 100644 --- a/pkg/errdefs/errors.go +++ b/pkg/errdefs/errors.go @@ -17,6 +17,8 @@ var ( ErrConvertFailed = errors.New("ERR_CONVERT_FAILED") ErrAlreadyConverted = errors.New("ERR_ALREADY_CONVERTED") ErrUnhealthy = errors.New("ERR_UNHEALTHY") + ErrIsRemoteCache = errors.New("ERR_IS_REMOTE_CACHE") + ErrNotSupport = errors.New("ERR_NOT_SUPPORT") ) // IsErrHTTPResponseToHTTPSClient returns whether err is diff --git a/pkg/remote/ported.go b/pkg/remote/ported.go new file mode 100644 index 00000000..70c48414 --- /dev/null +++ b/pkg/remote/ported.go @@ -0,0 +1,617 @@ +// Copyright 
Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + docker "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/tracing" + acceldErrdefs "github.com/goharbor/acceleration-service/pkg/errdefs" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +const maxRetry = 3 + +// Modified from containerd project, copyright The containerd Authors. 
+// https://github.com/containerd/containerd/remotes/docker/fetcher.go +func Fetch(ctx context.Context, cacheRef string, desc ocispec.Descriptor, insecureHost HostFunc, plainHTTP bool) (content.ReaderAt, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest)) + + refspec, err := reference.Parse(cacheRef) + if err != nil { + return nil, err + } + cred, insecure, err := insecureHost(cacheRef) + if err != nil { + return nil, err + } + + registryHosts := docker.ConfigureDefaultRegistries( + docker.WithAuthorizer( + docker.NewDockerAuthorizer( + docker.WithAuthClient(newDefaultClient(insecure)), + docker.WithAuthCreds(cred), + ), + ), + docker.WithClient(newDefaultClient(insecure)), + docker.WithPlainHTTP(func(host string) (bool, error) { + return plainHTTP, nil + }), + ) + hosts, err := registryHosts(refspec.Hostname()) + if err != nil { + return nil, err + } + + hosts = filterHosts(hosts, docker.HostCapabilityPull) + if len(hosts) == 0 { + return nil, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound) + } + + parts := strings.SplitN(refspec.Locator, "/", 2) + if len(parts) < 2 { + return nil, fmt.Errorf("invalid cache ref: %w", errdefs.ErrInvalidArgument) + } + repository := parts[1] + + ctx, err = docker.ContextWithRepositoryScope(ctx, refspec, false) + if err != nil { + return nil, err + } + + hrs, _ := newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { + // firstly try fetch via external urls + for _, us := range desc.URLs { + u, err := url.Parse(us) + if err != nil { + log.G(ctx).WithError(err).Debugf("failed to parse %q", us) + continue + } + if u.Scheme != "http" && u.Scheme != "https" { + log.G(ctx).Debug("non-http(s) alternative url is unsupported") + continue + } + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) + log.G(ctx).Info("request") + + // Try this first, parse it + host := docker.RegistryHost{ + Client: http.DefaultClient, + Host: u.Host, + Scheme: u.Scheme, + Path: u.Path, + Capabilities: 
docker.HostCapabilityPull, + } + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository) + // Strip namespace from base + req.path = u.Path + if u.RawQuery != "" { + req.path = req.path + "?" + u.RawQuery + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + if errdefs.IsNotFound(err) { + continue // try one of the other urls. + } + + return nil, err + } + return rc, nil + } + + // Try manifests endpoints for manifests types + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + images.MediaTypeDockerSchema1Manifest, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + + var firstErr error + for _, host := range hosts { + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository, "manifests", desc.Digest.String()) + if err := req.addNamespace(refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + return nil, firstErr + } + + // Finally use blobs endpoints + var firstErr error + for _, host := range hosts { + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository, "blobs", desc.Digest.String()) + if err := req.addNamespace(refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + if errdefs.IsNotFound(firstErr) { + firstErr = fmt.Errorf("could not fetch content descriptor %v (%v) from remote: %w", + desc.Digest, desc.MediaType, errdefs.ErrNotFound, + ) + } + + return nil, firstErr + + }) + // try to use open to trigger http request + if _, err = 
hrs.open(0); err != nil { + if acceldErrdefs.NeedsRetryWithHTTP(err) { + return Fetch(ctx, cacheRef, desc, insecureHost, true) + } + } + return hrs, nil +} + +// Modified from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/httpreadseeker.go +type httpReadSeeker struct { + size int64 + offset int64 + rc io.ReadCloser + open func(offset int64) (io.ReadCloser, error) + closed bool + + errsWithNoProgress int +} + +func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (*httpReadSeeker, error) { + return &httpReadSeeker{ + size: size, + open: open, + }, nil +} + +func (hrs *httpReadSeeker) ReadAt(p []byte, offset int64) (n int, err error) { + if _, err = hrs.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + return hrs.Read(p) +} + +func (hrs *httpReadSeeker) Size() int64 { + return hrs.size +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.closed { + return 0, io.EOF + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + if n > 0 || err == nil { + hrs.errsWithNoProgress = 0 + } + if err == io.ErrUnexpectedEOF { + // connection closed unexpectedly. try reconnecting. 
+ if n == 0 { + hrs.errsWithNoProgress++ + if hrs.errsWithNoProgress > maxRetry { + return // too many retries for this offset with no progress + } + } + if hrs.rc != nil { + if clsErr := hrs.rc.Close(); clsErr != nil { + log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser") + } + hrs.rc = nil + } + if _, err2 := hrs.reader(); err2 == nil { + return n, nil + } + } + return +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.closed { + return nil + } + hrs.closed = true + if hrs.rc != nil { + return hrs.rc.Close() + } + + return nil +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.closed { + return 0, fmt.Errorf("Fetcher.Seek: closed: %w", errdefs.ErrUnavailable) + } + + abs := hrs.offset + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + abs += offset + case io.SeekEnd: + if hrs.size == -1 { + return 0, fmt.Errorf("Fetcher.Seek: unknown size, cannot seek from end: %w", errdefs.ErrUnavailable) + } + abs = hrs.size + offset + default: + return 0, fmt.Errorf("Fetcher.Seek: invalid whence: %w", errdefs.ErrInvalidArgument) + } + + if abs < 0 { + return 0, fmt.Errorf("Fetcher.Seek: negative offset: %w", errdefs.ErrInvalidArgument) + } + + if abs != hrs.offset { + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Error("Fetcher.Seek: failed to close ReadCloser") + } + + hrs.rc = nil + } + + hrs.offset = abs + } + + return hrs.offset, nil +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.rc != nil { + return hrs.rc, nil + } + + if hrs.size == -1 || hrs.offset < hrs.size { + // only try to reopen the body request if we are seeking to a value + // less than the actual size. 
+ if hrs.open == nil { + return nil, fmt.Errorf("cannot open: %w", errdefs.ErrNotImplemented) + } + + rc, err := hrs.open(hrs.offset) + if err != nil { + return nil, fmt.Errorf("httpReadSeeker: failed open: %w", err) + } + + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Error("httpReadSeeker: failed to close ReadCloser") + } + } + hrs.rc = rc + } else { + // There is an edge case here where offset == size of the content. If + // we seek, we will probably get an error for content that cannot be + // sought (?). In that case, we should err on committing the content, + // as the length is already satisfied but we just return the empty + // reader instead. + + hrs.rc = io.NopCloser(bytes.NewReader([]byte{})) + } + + return hrs.rc, nil +} + +// Ported from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/fetcher.go +func open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) { + if mediatype == "" { + req.header.Set("Accept", "*/*") + } else { + req.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", ")) + } + + if offset > 0 { + // Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints + // will return the header without supporting the range. The content + // range must always be checked. + req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := req.doWithRetries(ctx, nil) + if err != nil { + return nil, err + } + defer func() { + if retErr != nil { + resp.Body.Close() + } + }() + + if resp.StatusCode > 299 { + // TODO(stevvooe): When doing a offset specific request, we should + // really distinguish between a 206 and a 200. In the case of 200, we + // can discard the bytes, hiding the seek behavior from the + // implementation. 
+ + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("content at %v not found: %w", req.String(), errdefs.ErrNotFound) + } + var registryErr docker.Errors + if err := json.NewDecoder(resp.Body).Decode(®istryErr); err != nil || registryErr.Len() < 1 { + return nil, fmt.Errorf("unexpected status code %v: %v", req.String(), resp.Status) + } + return nil, fmt.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error()) + } + if offset > 0 { + cr := resp.Header.Get("content-range") + if cr != "" { + if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) { + return nil, fmt.Errorf("unhandled content range in response: %v", cr) + + } + } else { + // TODO: Should any cases where use of content range + // without the proper header be considered? + // 206 responses? + + // Discard up to offset + // Could use buffer pool here but this case should be rare + n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset)) + if err != nil { + return nil, fmt.Errorf("failed to discard to offset: %w", err) + } + if n != offset { + return nil, errors.New("unable to discard to offset") + } + + } + } + + return resp.Body, nil +} + +// Ported from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/resolver.go +type request struct { + method string + path string + header http.Header + host docker.RegistryHost + body func() (io.ReadCloser, error) + size int64 +} + +func newRequest(dockerHeader http.Header, host docker.RegistryHost, method string, + repository string, ps ...string) *request { + header := dockerHeader.Clone() + if header == nil { + header = http.Header{} + } + + for key, value := range host.Header { + header[key] = append(header[key], value...) + } + parts := append([]string{"/", host.Path, repository}, ps...) + p := path.Join(parts...) 
+ // Join strips trailing slash, re-add ending "/" if included + if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") { + p = p + "/" + } + return &request{ + method: method, + path: p, + header: header, + host: host, + } +} + +func (r *request) do(ctx context.Context) (*http.Response, error) { + u := r.host.Scheme + "://" + r.host.Host + r.path + req, err := http.NewRequestWithContext(ctx, r.method, u, nil) + if err != nil { + return nil, err + } + req.Header = http.Header{} // headers need to be copied to avoid concurrent map access + for k, v := range r.header { + req.Header[k] = v + } + if r.body != nil { + body, err := r.body() + if err != nil { + return nil, err + } + req.Body = body + req.GetBody = r.body + if r.size > 0 { + req.ContentLength = r.size + } + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) + if err := r.authorize(ctx, req); err != nil { + return nil, fmt.Errorf("failed to authorize: %w", err) + } + + client := &http.Client{} + if r.host.Client != nil { + *client = *r.host.Client + } + if client.CheckRedirect == nil { + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return errors.New("stopped after 10 redirects") + } + if err := r.authorize(ctx, req); err != nil { + return fmt.Errorf("failed to authorize redirect: %w", err) + } + return nil + } + } + _, httpSpan := tracing.StartSpan( + ctx, + tracing.Name("remotes.docker.resolver", "HTTPRequest"), + tracing.WithHTTPRequest(req), + ) + defer httpSpan.End() + resp, err := client.Do(req) + if err != nil { + httpSpan.SetStatus(err) + return nil, fmt.Errorf("failed to do request: %w", err) + } + httpSpan.SetAttributes(tracing.HTTPStatusCodeAttributes(resp.StatusCode)...) 
+ return resp, nil +} + +func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) { + resp, err := r.do(ctx) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + retry, err := r.retryRequest(ctx, responses) + if err != nil { + resp.Body.Close() + return nil, err + } + if retry { + resp.Body.Close() + return r.doWithRetries(ctx, responses) + } + return resp, err +} + +func (r *request) authorize(ctx context.Context, req *http.Request) error { + // Check if has header for host + if r.host.Authorizer != nil { + if err := r.host.Authorizer.Authorize(ctx, req); err != nil { + return err + } + } + return nil +} + +func (r *request) addNamespace(ns string) (err error) { + if !isProxy(r.host.Host, ns) { + return nil + } + var q url.Values + // Parse query + if i := strings.IndexByte(r.path, '?'); i > 0 { + r.path = r.path[:i+1] + q, err = url.ParseQuery(r.path[i+1:]) + if err != nil { + return + } + } else { + r.path = r.path + "?" 
+ q = url.Values{} + } + q.Add("ns", ns) + + r.path = r.path + q.Encode() + + return +} + +func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) { + if len(responses) > 5 { + return false, nil + } + last := responses[len(responses)-1] + switch last.StatusCode { + case http.StatusUnauthorized: + log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") + if r.host.Authorizer != nil { + if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil { + return true, nil + } else if !errdefs.IsNotImplemented(err) { + return false, err + } + } + + return false, nil + case http.StatusMethodNotAllowed: + // Support registries which have not properly implemented the HEAD method for + // manifests endpoint + if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") { + r.method = http.MethodGet + return true, nil + } + case http.StatusRequestTimeout, http.StatusTooManyRequests: + return true, nil + } + + // TODO: Handle 50x errors accounting for attempt history + return false, nil +} + +func (r *request) String() string { + return r.host.Scheme + "://" + r.host.Host + r.path +} + +func isProxy(host, refhost string) bool { + if refhost != host { + if refhost != "docker.io" || host != "registry-1.docker.io" { + return true + } + } + return false +} + +func filterHosts(hosts []docker.RegistryHost, caps docker.HostCapabilities) []docker.RegistryHost { + for _, host := range hosts { + if host.Capabilities.Has(caps) { + hosts = append(hosts, host) + } + } + return hosts +} From f808592e3ae082bc93f3b6f31809d06951355664 Mon Sep 17 00:00:00 2001 From: breezeTuT Date: Tue, 22 Aug 2023 16:27:05 +0800 Subject: [PATCH 2/2] feat: convert with remote cache Here we can pull remote cache, use remote cache for conversion, and push the updated cache back to the remote. When updating, we follow a upper->lower order. 
Signed-off-by: breezeTuT --- pkg/content/provider.go | 4 + pkg/converter/converter.go | 3 +- pkg/driver/nydus/nydus.go | 195 +++++++++++++++++++++++++++++++++++-- 3 files changed, 194 insertions(+), 8 deletions(-) diff --git a/pkg/content/provider.go b/pkg/content/provider.go index 9dcf6046..e11d4d57 100644 --- a/pkg/content/provider.go +++ b/pkg/content/provider.go @@ -42,4 +42,8 @@ type Provider interface { Image(ctx context.Context, ref string) (*ocispec.Descriptor, error) // ContentStore gets the content store object of containerd. ContentStore() content.Store + // SetCacheRef sets the cache reference of the source image. + SetCacheRef(ref string) + // GetCacheRef gets the cache reference of the source image. + GetCacheRef() string } diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index fae1d97a..5fe55c15 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error { return nil } -func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) { +func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef string) (*Metric, error) { var metric Metric sourceNamed, err := docker.ParseDockerRef(source) if err != nil { @@ -122,6 +122,7 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr logger.Infof("converting image %s", source) start = time.Now() + cvt.provider.SetCacheRef(cacheRef) desc, err := cvt.driver.Convert(ctx, cvt.provider, source) if err != nil { return nil, errors.Wrap(err, "convert image") diff --git a/pkg/driver/nydus/nydus.go b/pkg/driver/nydus/nydus.go index 2c621c0c..7e8b7de0 100644 --- a/pkg/driver/nydus/nydus.go +++ b/pkg/driver/nydus/nydus.go @@ -17,7 +17,9 @@ package nydus import ( "bytes" "context" + "encoding/json" "fmt" + "io" "os" "os/exec" "regexp" @@ -25,19 +27,20 @@ import ( "strings" "sync" - 
"github.com/goharbor/acceleration-service/pkg/adapter/annotation" - accelcontent "github.com/goharbor/acceleration-service/pkg/content" - "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" - nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" - "github.com/goharbor/acceleration-service/pkg/errdefs" - "github.com/goharbor/acceleration-service/pkg/utils" - + "github.com/containerd/containerd" "github.com/containerd/containerd/content" + containerdErrDefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/images/converter" "github.com/containerd/containerd/platforms" "github.com/containerd/nydus-snapshotter/pkg/backend" nydusify "github.com/containerd/nydus-snapshotter/pkg/converter" + "github.com/goharbor/acceleration-service/pkg/adapter/annotation" + accelcontent "github.com/goharbor/acceleration-service/pkg/content" + "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" + "github.com/goharbor/acceleration-service/pkg/errdefs" + "github.com/goharbor/acceleration-service/pkg/utils" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -58,6 +61,8 @@ const ( var builderVersion string var builderVersionOnce sync.Once +type CacheRef struct{} + type chunkDictInfo struct { BootstrapPath string } @@ -224,6 +229,20 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if err != nil { return nil, errors.Wrap(err, "get source image") } + + cacheRef := provider.GetCacheRef() + useRemoteCache := cacheRef != "" + if useRemoteCache { + logrus.Infof("remote cache image reference: %s", cacheRef) + if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + if errors.Is(err, errdefs.ErrNotSupport) { + logrus.Warn("Content store does not support remote cache") + } 
else { + return nil, errors.Wrap(err, "fetch remote cache") + } + } + } + desc, err := d.convert(ctx, provider, *image, sourceRef) if err != nil { return nil, err @@ -231,6 +250,19 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if d.mergeManifest { return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc) } + + if useRemoteCache { + // Fetch the old remote cache before updating and pushing the new one to avoid conflict. + if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "fetch remote cache") + } + if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil { + return nil, errors.Wrap(err, "update remote cache") + } + if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "push remote cache") + } + } return desc, err } @@ -399,3 +431,152 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide return &chunkDict, nil } + +// FetchRemoteCache fetch cache manifest from remote +func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error { + resolver, err := pvd.Resolver(ref) + if err != nil { + return err + } + + rc := &containerd.RemoteContext{ + Resolver: resolver, + } + name, desc, err := rc.Resolver.Resolve(ctx, ref) + if err != nil { + if errors.Is(err, containerdErrDefs.ErrNotFound) { + // Remote cache may do not exist, just return nil + return nil + } + return err + } + fetcher, err := rc.Resolver.Fetcher(ctx, name) + if err != nil { + return err + } + ir, err := fetcher.Fetch(ctx, desc) + if err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + ir, err = fetcher.Fetch(ctx, desc) + if err != nil { + return errors.Wrap(err, "try to pull remote cache") + } + } else { + return errors.Wrap(err, "pull remote cache") + } + } + + bytes, err := io.ReadAll(ir) + if err != nil { + return errors.Wrap(err, "read remote cache to bytes") 
+ } + + // TODO: handle manifest list for multiple platform. + manifest := ocispec.Manifest{} + if err = json.Unmarshal(bytes, &manifest); err != nil { + return err + } + + cs := pvd.ContentStore() + for _, layer := range manifest.Layers { + if _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }); err != nil { + if errors.Is(err, containerdErrDefs.ErrNotFound) { + return errdefs.ErrNotSupport + } + return errors.Wrap(err, "update cache layer") + } + } + return nil +} + +// PushRemoteCache update cache manifest and push to remote +func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error { + imageConfig := ocispec.ImageConfig{} + imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig) + if err != nil { + return errors.Wrap(err, "remote cache image config marshal failed") + } + configReader := bytes.NewReader(imageConfigBytes) + if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil { + return errors.Wrap(err, "remote cache image config write blob failed") + } + + cs := pvd.ContentStore() + layers := []ocispec.Descriptor{} + if err = cs.Walk(ctx, func(info content.Info) error { + if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok { + layers = append(layers, ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + }) + } + return nil + }); err != nil { + return errors.Wrap(err, "get remote cache layers failed") + } + + manifest := ocispec.Manifest{ + Versioned: specs.Versioned{ + SchemaVersion: 2, + }, + MediaType: ocispec.MediaTypeImageManifest, + Config: *imageConfigDesc, + Layers: layers, + } + manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest) + if err != nil { + return errors.Wrap(err, "remote cache manifest 
marshal failed") + } + manifestReader := bytes.NewReader(manifestBytes) + if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil { + return errors.Wrap(err, "remote cache write blob failed") + } + + if err = pvd.Push(ctx, *manifestDesc, ref); err != nil { + return err + } + return nil +} + +// UpdateRemoteCache update cache layer from upper to lower +func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error { + cs := provider.ContentStore() + + orgManifest := ocispec.Manifest{} + _, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc) + if err != nil { + return errors.Wrap(err, "read original manifest json") + } + + newManifest := ocispec.Manifest{} + _, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc) + if err != nil { + return errors.Wrap(err, "read new manifest json") + } + newLayers := newManifest.Layers[:len(newManifest.Layers)-1] + + // Update label for each layer + for i, layer := range newLayers { + layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String() + } + + // Update cache to lru from upper to lower + for i := len(newLayers) - 1; i >= 0; i-- { + layer := newLayers[i] + if _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }); err != nil { + return errors.Wrap(err, "update cache layer") + } + } + return nil +}