diff --git a/go.mod b/go.mod index d1d5c56c..8886a2bf 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/docker/cli v23.0.3+incompatible github.com/dustin/go-humanize v1.0.1 github.com/google/uuid v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.4 github.com/labstack/echo-contrib v0.14.1 github.com/labstack/echo/v4 v4.10.2 github.com/labstack/gommon v0.4.0 diff --git a/misc/config/config.nydus.ref.yaml b/misc/config/config.nydus.ref.yaml index 8610b0d2..458dad10 100644 --- a/misc/config/config.nydus.ref.yaml +++ b/misc/config/config.nydus.ref.yaml @@ -61,3 +61,6 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus-oci-ref + # add suffix to tag of remote cache reference, default cache size is 200, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache + \ No newline at end of file diff --git a/misc/config/config.nydus.yaml b/misc/config/config.nydus.yaml index abb9733e..80f20aa6 100644 --- a/misc/config/config.nydus.yaml +++ b/misc/config/config.nydus.yaml @@ -100,3 +100,6 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus + # add suffix to tag of remote cache reference, default cache size is 200, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache + \ No newline at end of file diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 2f4350ac..c6744beb 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -60,8 +60,14 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { if err != nil { return nil, errors.Wrap(err, "invalid platform configuration") } - - provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC) + useRemoteCache := false + for _, rule := range cfg.Converter.Rules { + if rule.CacheTagSuffix != ""{ + useRemoteCache = true + break + } + } + provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC, useRemoteCache) if err != nil { return nil, errors.Wrap(err, "create content provider") } @@ -95,7 +101,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { } func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { - target, err := adp.rule.Map(source) + target, err := adp.rule.Map(source, TagSuffix) if err != nil { if errors.Is(err, errdefs.ErrAlreadyConverted) { logrus.Infof("image has been converted: %s", source) @@ -103,7 +109,17 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { } return errors.Wrap(err, "create target reference by rule") } - if _, err = adp.cvt.Convert(ctx, source, target); err != nil { + cacheRef, err := adp.rule.Map(source, CacheTagSuffix) + if err != nil { + if errors.Is(err, errdefs.ErrIsRemoteCache) { + logrus.Infof("image was remote cache: %s", source) + return nil + } + } + if err = adp.content.NewRemoteCache(ctx); err != nil { + return err + } + if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil { return err } if err := adp.content.GC(ctx); err != nil { diff --git a/pkg/adapter/rule.go b/pkg/adapter/rule.go index ccbb8e0a..4e765818 100644 --- a/pkg/adapter/rule.go +++ b/pkg/adapter/rule.go @@ -24,6 +24,11 @@ import ( "github.com/goharbor/acceleration-service/pkg/errdefs" ) +const ( + TagSuffix = "tag_suffix" + CacheTagSuffix = "cache_tag_suffix" +) + // Add suffix to source image reference as the target // image reference, for example: // Source: 192.168.1.1/nginx:latest @@ -47,16 +52,33 @@ type Rule struct { // Map maps the source image reference to a new one according to // a rule, the new one will be used as the reference of target image. -func (rule *Rule) Map(ref string) (string, error) { - for _, item := range rule.items { - if item.TagSuffix != "" { - if strings.HasSuffix(ref, item.TagSuffix) { - // FIXME: To check if an image has been converted, a better solution - // is to use the annotation on image manifest. - return "", errdefs.ErrAlreadyConverted +func (rule *Rule) Map(ref, opt string) (string, error) { + switch opt { + case TagSuffix: + for _, item := range rule.items { + if item.TagSuffix != "" { + if strings.HasSuffix(ref, item.TagSuffix) { + // FIXME: To check if an image has been converted, a better solution + // is to use the annotation on image manifest. + return "", errdefs.ErrAlreadyConverted + } + return addSuffix(ref, item.TagSuffix) + } + } + case CacheTagSuffix: + for _, item := range rule.items { + if item.CacheTagSuffix != "" { + if strings.HasSuffix(ref, item.CacheTagSuffix) { + // FIXME: Ditto.A better way is to use the annotation on image manifest. + return "", errdefs.ErrIsRemoteCache + } + return addSuffix(ref, item.CacheTagSuffix) } - return addSuffix(ref, item.TagSuffix) } + // CacheTagSuffix empty means do not provide remote cache, just return empty string. + return "", nil + default: + return "", fmt.Errorf("unsupported map option: %s", opt) } return "", errors.New("not found matched conversion rule") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7d3e2d5e..91d1c10c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -66,7 +66,8 @@ type SourceConfig struct { } type ConversionRule struct { - TagSuffix string `yaml:"tag_suffix"` + TagSuffix string `yaml:"tag_suffix"` + CacheTagSuffix string `yaml:"cache_tag_suffix"` } type ConverterConfig struct { diff --git a/pkg/content/cache.go b/pkg/content/cache.go new file mode 100644 index 00000000..9eb8e181 --- /dev/null +++ b/pkg/content/cache.go @@ -0,0 +1,97 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package content + +import ( + "errors" + "os" + + lru "github.com/hashicorp/golang-lru/v2" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type RemoteCache struct { + // remoteCache is a LRU cache for remote image layers. + remoteCache *lru.Cache[string, ocispec.Descriptor] +} + +func NewRemoteCache(cacheSize int) (*RemoteCache, error) { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return nil, err + } + return &RemoteCache{ + remoteCache: remoteCache, + }, nil +} + +func (rc *RemoteCache) Values() []ocispec.Descriptor { + return rc.remoteCache.Values() +} + +func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) { + return rc.remoteCache.Get(key) +} + +func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) { + rc.remoteCache.Add(key, value) +} + +func (rc *RemoteCache) Remove(key string) { + rc.remoteCache.Remove(key) +} + +// Update the value of the key, if the key exists, delete the old value and add the new value. +func (rc *RemoteCache) Update(key string, value ocispec.Descriptor) error{ + _, ok := rc.remoteCache.Peek(key) + if ok{ + del := rc.remoteCache.Remove(key) + if !del { + return errors.New("can not remove the key") + } + rc.Add(key, value) + }else{ + rc.remoteCache.Add(key, value) + } + return nil +} + +type FileToReaderAt struct { + file *os.File +} + +func NewFileToReaderAt(filePath string) (*FileToReaderAt, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + return &FileToReaderAt{file: file}, nil +} + +func (f *FileToReaderAt) ReadAt(p []byte, off int64) (int, error) { + return f.file.ReadAt(p, off) +} + +func (f *FileToReaderAt) Close() error { + return f.file.Close() +} + +func (f *FileToReaderAt) Size() int64 { + fi, err := f.file.Stat() + if err != nil { + return 0 + } + return fi.Size() +} diff --git a/pkg/content/content.go b/pkg/content/content.go index 0c38fab9..4c8dc97f 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -23,16 +23,19 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + "github.com/containerd/containerd/filters" ) type Content struct { @@ -44,12 +47,14 @@ type Content struct { store content.Store // threshold is the maximum capacity of the local caches storage threshold int64 + // remoteCache is the cache of remote layers + remoteCache *RemoteCache } // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. // content.db supported by bolt database and content store, content.lm supported by content.db. -func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { +func NewContent(contentDir string, databaseDir string, threshold string, useRemoteCache bool) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { return nil, errors.Wrap(err, "create local provider content store") @@ -66,11 +71,21 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte if err != nil { return nil, err } + var remoteCache *RemoteCache + if useRemoteCache { + remoteCache, err = NewRemoteCache(200) + if err != nil { + return nil, errors.Wrap(err, "create remote cache") + } + }else{ + remoteCache = nil + } content := Content{ - db: db, + db: db, lm: metadata.NewLeaseManager(db), - store: db.ContentStore(), - threshold: int64(t), + store: db.ContentStore(), + threshold: int64(t), + remoteCache: remoteCache, } return &content, nil } @@ -202,19 +217,77 @@ func (content *Content) updateLease(digest *digest.Digest) error { }) } -func (content *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { - return content.store.Info(ctx, dgst) +func (c *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + info, err := c.store.Info(ctx, dgst) + if c.remoteCache != nil { + if err != nil && errors.Is(err, errdefs.ErrNotFound) { + layers := c.remoteCache.Values() + for _, layer := range layers { + if layer.Digest == dgst { + return content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }, nil + } + } + } else { + if desc, ok := c.remoteCache.Get(dgst.String()); ok { + if info.Labels == nil{ + info.Labels = map[string]string{} + } + info.Labels[nydusutils.LayerAnnotationNydusTargetDigest] = desc.Digest.String() + } + } + } + return info, err } func (content *Content) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { + if content.remoteCache != nil { + sourceDesc, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest] + if ok { + l := ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + } + _, get := content.remoteCache.Get(sourceDesc) + if get { + content.remoteCache.Update(sourceDesc, l) + }else{ + content.remoteCache.Add(sourceDesc, l) + } + return info, nil + } + } if info.Labels != nil { info.Labels = nil } return content.store.Update(ctx, info, fieldpaths...) } -func (content *Content) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - return content.store.Walk(ctx, fn, filters...) +func (c *Content) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error { + if c.remoteCache != nil { + filter, err := filters.ParseAll(fs...) + if err != nil { + return err + } + for _, layer := range c.remoteCache.Values() { + info := content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + } + if filter.Match(content.AdaptInfo(info)) { + if err := fn(info); err != nil { + return err + } + } + } + } + return c.store.Walk(ctx, fn, fs...) } func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { @@ -224,6 +297,16 @@ func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { readerAt, err := content.store.ReaderAt(ctx, desc) if err != nil { + if content.remoteCache != nil && errors.Is(err, errdefs.ErrNotFound) { + for _, layer := range content.remoteCache.Values() { + if layer.Digest == desc.Digest { + raFile, get := layer.Annotations[nydusutils.LayerAnnotationNydusReaderAtPath] + if get { + return NewFileToReaderAt(raFile) + } + } + } + } return readerAt, err } return readerAt, content.updateLease(&desc.Digest) @@ -255,3 +338,13 @@ func (localWriter localWriter) Commit(ctx context.Context, size int64, expected // we don't write any lables, drop the opts return localWriter.Writer.Commit(ctx, size, expected) } + +func (content *Content) NewRemoteCache(ctx context.Context) error { + if content.remoteCache != nil { + var err error + content.remoteCache, err = NewRemoteCache(200) + return err + } + // If remote cache is not enabled, return nil + return nil +} diff --git a/pkg/content/content_test.go b/pkg/content/content_test.go index ab4eef36..a0b184d8 100644 --- a/pkg/content/content_test.go +++ b/pkg/content/content_test.go @@ -24,7 +24,7 @@ import ( func TestContenSize(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "1000MB") + content, err := NewContent("./tmp", "./tmp", "1000MB", false) require.NoError(t, err) size, err := content.Size() require.NoError(t, err) diff --git a/pkg/content/local.go b/pkg/content/local.go index c1b2e19f..f1d2e468 100644 --- a/pkg/content/local.go +++ b/pkg/content/local.go @@ -44,12 +44,14 @@ func NewLocalProvider( threshold string, hosts remote.HostFunc, platformMC platforms.MatchComparer, + useRemoteCache bool, ) (Provider, *Content, error) { contentDir := filepath.Join(workDir, "content") if err := os.MkdirAll(contentDir, 0755); err != nil { return nil, nil, errors.Wrap(err, "create local provider work directory") } - content, err := NewContent(contentDir, workDir, threshold) + content, err := NewContent(contentDir, workDir, threshold, useRemoteCache) + if err != nil { return nil, nil, errors.Wrap(err, "create local provider content") }