From 60bdaccaaed4a4c1c2ccb0972da1e6055d4388e2 Mon Sep 17 00:00:00 2001 From: breezeTuT Date: Wed, 16 Aug 2023 00:04:38 +0800 Subject: [PATCH] feat: wrap content to support remote cache We can use LRU in content store and wrap some methods to provides remote cache. Here Info, Update, ReaderAt, Walk methods were wraped and they can get, update, and do other operations in LRU cache. Also, we can configure whether to enable cache or not with cache_tag_suffix. Signed-off-by: breezeTuT --- go.mod | 1 + misc/config/config.nydus.ref.yaml | 3 + misc/config/config.nydus.yaml | 3 + pkg/adapter/adapter.go | 24 +++++-- pkg/adapter/rule.go | 38 ++++++++--- pkg/config/config.go | 3 +- pkg/content/cache.go | 97 ++++++++++++++++++++++++++ pkg/content/content.go | 109 +++++++++++++++++++++++++++--- pkg/content/content_test.go | 2 +- pkg/content/local.go | 4 +- 10 files changed, 261 insertions(+), 23 deletions(-) create mode 100644 pkg/content/cache.go diff --git a/go.mod b/go.mod index d1d5c56c..8886a2bf 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/docker/cli v23.0.3+incompatible github.com/dustin/go-humanize v1.0.1 github.com/google/uuid v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.4 github.com/labstack/echo-contrib v0.14.1 github.com/labstack/echo/v4 v4.10.2 github.com/labstack/gommon v0.4.0 diff --git a/misc/config/config.nydus.ref.yaml b/misc/config/config.nydus.ref.yaml index 8610b0d2..458dad10 100644 --- a/misc/config/config.nydus.ref.yaml +++ b/misc/config/config.nydus.ref.yaml @@ -61,3 +61,6 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus-oci-ref + # add suffix to tag of remote cache reference, default cache size is 200, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache + \ No newline at end of file diff --git a/misc/config/config.nydus.yaml b/misc/config/config.nydus.yaml index abb9733e..80f20aa6 100644 --- a/misc/config/config.nydus.yaml +++ b/misc/config/config.nydus.yaml @@ -100,3 +100,6 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus + # add suffix to tag of remote cache reference, default cache size is 200, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache + \ No newline at end of file diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 2f4350ac..c6744beb 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -60,8 +60,14 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { if err != nil { return nil, errors.Wrap(err, "invalid platform configuration") } - - provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC) + useRemoteCache := false + for _, rule := range cfg.Converter.Rules { + if rule.CacheTagSuffix != ""{ + useRemoteCache = true + break + } + } + provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC, useRemoteCache) if err != nil { return nil, errors.Wrap(err, "create content provider") } @@ -95,7 +101,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { } func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { - target, err := adp.rule.Map(source) + target, err := adp.rule.Map(source, TagSuffix) if err != nil { if errors.Is(err, errdefs.ErrAlreadyConverted) { logrus.Infof("image has been converted: %s", source) @@ -103,7 +109,17 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { } return errors.Wrap(err, "create target reference by rule") } - if _, err = adp.cvt.Convert(ctx, source, target); err != nil { + cacheRef, err := adp.rule.Map(source, CacheTagSuffix) + if err != nil { + if errors.Is(err, errdefs.ErrIsRemoteCache) { + logrus.Infof("image was remote cache: %s", source) + return nil + } + } + if err = adp.content.NewRemoteCache(ctx); err != nil { + return err + } + if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil { return err } if err := adp.content.GC(ctx); err != nil { diff --git a/pkg/adapter/rule.go b/pkg/adapter/rule.go index ccbb8e0a..4e765818 100644 --- a/pkg/adapter/rule.go +++ b/pkg/adapter/rule.go @@ -24,6 +24,11 @@ import ( "github.com/goharbor/acceleration-service/pkg/errdefs" ) +const ( + TagSuffix = "tag_suffix" + CacheTagSuffix = "cache_tag_suffix" +) + // Add suffix to source image reference as the target // image reference, for example: // Source: 192.168.1.1/nginx:latest @@ -47,16 +52,33 @@ type Rule struct { // Map maps the source image reference to a new one according to // a rule, the new one will be used as the reference of target image. -func (rule *Rule) Map(ref string) (string, error) { - for _, item := range rule.items { - if item.TagSuffix != "" { - if strings.HasSuffix(ref, item.TagSuffix) { - // FIXME: To check if an image has been converted, a better solution - // is to use the annotation on image manifest. - return "", errdefs.ErrAlreadyConverted +func (rule *Rule) Map(ref, opt string) (string, error) { + switch opt { + case TagSuffix: + for _, item := range rule.items { + if item.TagSuffix != "" { + if strings.HasSuffix(ref, item.TagSuffix) { + // FIXME: To check if an image has been converted, a better solution + // is to use the annotation on image manifest. + return "", errdefs.ErrAlreadyConverted + } + return addSuffix(ref, item.TagSuffix) + } + } + case CacheTagSuffix: + for _, item := range rule.items { + if item.CacheTagSuffix != "" { + if strings.HasSuffix(ref, item.CacheTagSuffix) { + // FIXME: Ditto.A better way is to use the annotation on image manifest. + return "", errdefs.ErrIsRemoteCache + } + return addSuffix(ref, item.CacheTagSuffix) } - return addSuffix(ref, item.TagSuffix) } + // CacheTagSuffix empty means do not provide remote cache, just return empty string. + return "", nil + default: + return "", fmt.Errorf("unsupported map option: %s", opt) } return "", errors.New("not found matched conversion rule") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7d3e2d5e..91d1c10c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -66,7 +66,8 @@ type SourceConfig struct { } type ConversionRule struct { - TagSuffix string `yaml:"tag_suffix"` + TagSuffix string `yaml:"tag_suffix"` + CacheTagSuffix string `yaml:"cache_tag_suffix"` } type ConverterConfig struct { diff --git a/pkg/content/cache.go b/pkg/content/cache.go new file mode 100644 index 00000000..9eb8e181 --- /dev/null +++ b/pkg/content/cache.go @@ -0,0 +1,97 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package content + +import ( + "errors" + "os" + + lru "github.com/hashicorp/golang-lru/v2" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type RemoteCache struct { + // remoteCache is a LRU cache for remote image layers. + remoteCache *lru.Cache[string, ocispec.Descriptor] +} + +func NewRemoteCache(cacheSize int) (*RemoteCache, error) { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return nil, err + } + return &RemoteCache{ + remoteCache: remoteCache, + }, nil +} + +func (rc *RemoteCache) Values() []ocispec.Descriptor { + return rc.remoteCache.Values() +} + +func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) { + return rc.remoteCache.Get(key) +} + +func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) { + rc.remoteCache.Add(key, value) +} + +func (rc *RemoteCache) Remove(key string) { + rc.remoteCache.Remove(key) +} + +// Update the value of the key, if the key exists, delete the old value and add the new value. +func (rc *RemoteCache) Update(key string, value ocispec.Descriptor) error{ + _, ok := rc.remoteCache.Peek(key) + if ok{ + del := rc.remoteCache.Remove(key) + if !del { + return errors.New("can not remove the key") + } + rc.Add(key, value) + }else{ + rc.remoteCache.Add(key, value) + } + return nil +} + +type FileToReaderAt struct { + file *os.File +} + +func NewFileToReaderAt(filePath string) (*FileToReaderAt, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + return &FileToReaderAt{file: file}, nil +} + +func (f *FileToReaderAt) ReadAt(p []byte, off int64) (int, error) { + return f.file.ReadAt(p, off) +} + +func (f *FileToReaderAt) Close() error { + return f.file.Close() +} + +func (f *FileToReaderAt) Size() int64 { + fi, err := f.file.Stat() + if err != nil { + return 0 + } + return fi.Size() +} diff --git a/pkg/content/content.go b/pkg/content/content.go index 0c38fab9..4c8dc97f 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -23,16 +23,19 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + "github.com/containerd/containerd/filters" ) type Content struct { @@ -44,12 +47,14 @@ type Content struct { store content.Store // threshold is the maximum capacity of the local caches storage threshold int64 + // remoteCache is the cache of remote layers + remoteCache *RemoteCache } // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. // content.db supported by bolt database and content store, content.lm supported by content.db. -func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { +func NewContent(contentDir string, databaseDir string, threshold string, useRemoteCache bool) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { return nil, errors.Wrap(err, "create local provider content store") @@ -66,11 +71,21 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte if err != nil { return nil, err } + var remoteCache *RemoteCache + if useRemoteCache { + remoteCache, err = NewRemoteCache(200) + if err != nil { + return nil, errors.Wrap(err, "create remote cache") + } + }else{ + remoteCache = nil + } content := Content{ - db: db, + db: db, lm: metadata.NewLeaseManager(db), - store: db.ContentStore(), - threshold: int64(t), + store: db.ContentStore(), + threshold: int64(t), + remoteCache: remoteCache, } return &content, nil } @@ -202,19 +217,77 @@ func (content *Content) updateLease(digest *digest.Digest) error { }) } -func (content *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { - return content.store.Info(ctx, dgst) +func (c *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + info, err := c.store.Info(ctx, dgst) + if c.remoteCache != nil { + if err != nil && errors.Is(err, errdefs.ErrNotFound) { + layers := c.remoteCache.Values() + for _, layer := range layers { + if layer.Digest == dgst { + return content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }, nil + } + } + } else { + if desc, ok := c.remoteCache.Get(dgst.String()); ok { + if info.Labels == nil{ + info.Labels = map[string]string{} + } + info.Labels[nydusutils.LayerAnnotationNydusTargetDigest] = desc.Digest.String() + } + } + } + return info, err } func (content *Content) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { + if content.remoteCache != nil { + sourceDesc, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest] + if ok { + l := ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + } + _, get := content.remoteCache.Get(sourceDesc) + if get { + content.remoteCache.Update(sourceDesc, l) + }else{ + content.remoteCache.Add(sourceDesc, l) + } + return info, nil + } + } if info.Labels != nil { info.Labels = nil } return content.store.Update(ctx, info, fieldpaths...) } -func (content *Content) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - return content.store.Walk(ctx, fn, filters...) +func (c *Content) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error { + if c.remoteCache != nil { + filter, err := filters.ParseAll(fs...) + if err != nil { + return err + } + for _, layer := range c.remoteCache.Values() { + info := content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + } + if filter.Match(content.AdaptInfo(info)) { + if err := fn(info); err != nil { + return err + } + } + } + } + return c.store.Walk(ctx, fn, fs...) } func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { @@ -224,6 +297,16 @@ func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { readerAt, err := content.store.ReaderAt(ctx, desc) if err != nil { + if content.remoteCache != nil && errors.Is(err, errdefs.ErrNotFound) { + for _, layer := range content.remoteCache.Values() { + if layer.Digest == desc.Digest { + raFile, get := layer.Annotations[nydusutils.LayerAnnotationNydusReaderAtPath] + if get { + return NewFileToReaderAt(raFile) + } + } + } + } return readerAt, err } return readerAt, content.updateLease(&desc.Digest) @@ -255,3 +338,13 @@ func (localWriter localWriter) Commit(ctx context.Context, size int64, expected // we don't write any lables, drop the opts return localWriter.Writer.Commit(ctx, size, expected) } + +func (content *Content) NewRemoteCache(ctx context.Context) error { + if content.remoteCache != nil { + var err error + content.remoteCache, err = NewRemoteCache(200) + return err + } + // If remote cache is not enabled, return nil + return nil +} diff --git a/pkg/content/content_test.go b/pkg/content/content_test.go index ab4eef36..a0b184d8 100644 --- a/pkg/content/content_test.go +++ b/pkg/content/content_test.go @@ -24,7 +24,7 @@ import ( func TestContenSize(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "1000MB") + content, err := NewContent("./tmp", "./tmp", "1000MB", false) require.NoError(t, err) size, err := content.Size() require.NoError(t, err) diff --git a/pkg/content/local.go b/pkg/content/local.go index c1b2e19f..f1d2e468 100644 --- a/pkg/content/local.go +++ b/pkg/content/local.go @@ -44,12 +44,14 @@ func NewLocalProvider( threshold string, hosts remote.HostFunc, platformMC platforms.MatchComparer, + useRemoteCache bool, ) (Provider, *Content, error) { contentDir := filepath.Join(workDir, "content") if err := os.MkdirAll(contentDir, 0755); err != nil { return nil, nil, errors.Wrap(err, "create local provider work directory") } - content, err := NewContent(contentDir, workDir, threshold) + content, err := NewContent(contentDir, workDir, threshold, useRemoteCache) + if err != nil { return nil, nil, errors.Wrap(err, "create local provider content") }