Skip to content

Commit

Permalink
feat: wrap content to support remote cache
Browse files Browse the repository at this point in the history
We can use  LRU in content store  and wrap some methods to provides  remote cache. Here Info, Update, ReaderAt, Walk methods were wraped and they can get, update, and do other operations in LRU cache.
Also, we can configure whether to enable cache or not with cache_tag_suffix.And we can config cache size.

Signed-off-by: YuQiang <y_q_email@163.com>
  • Loading branch information
PerseidMeteor committed Aug 21, 2023
1 parent 1d72948 commit 416c0d3
Show file tree
Hide file tree
Showing 13 changed files with 966 additions and 40 deletions.
4 changes: 4 additions & 0 deletions misc/config/config.nydus.ref.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ provider:
gcpolicy:
# size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size.
threshold: 1000MB
# remote cache record capacity of converted layers, default is 200.
cache_size: 200

converter:
# number of worker for executing conversion task
Expand Down Expand Up @@ -61,3 +63,5 @@ converter:
rules:
# add suffix to tag of source image reference as target image reference
- tag_suffix: -nydus-oci-ref
# add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache.
- cache_tag_suffix: -nydus-cache
4 changes: 4 additions & 0 deletions misc/config/config.nydus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ provider:
gcpolicy:
# size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size.
threshold: 1000MB
# remote cache record capacity of converted layers, default is 200.
cache_size: 200

converter:
# number of worker for executing conversion task
Expand Down Expand Up @@ -100,3 +102,5 @@ converter:
rules:
# add suffix to tag of source image reference as target image reference
- tag_suffix: -nydus
# add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache.
- cache_tag_suffix: -nydus-cache
16 changes: 13 additions & 3 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
return nil, errors.Wrap(err, "invalid platform configuration")
}

provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC)
provider, content, err := content.NewLocalProvider(cfg, platformMC)
if err != nil {
return nil, errors.Wrap(err, "create content provider")
}
Expand Down Expand Up @@ -95,15 +95,25 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
}

func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
target, err := adp.rule.Map(source)
target, err := adp.rule.Map(source, TagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyConverted) {
logrus.Infof("image has been converted: %s", source)
return nil
}
return errors.Wrap(err, "create target reference by rule")
}
if _, err = adp.cvt.Convert(ctx, source, target); err != nil {
cacheRef, err := adp.rule.Map(source, CacheTagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrIsRemoteCache) {
logrus.Infof("image was remote cache: %s", source)
return nil
}
}
if err = adp.content.NewRemoteCache(cacheRef); err != nil {
return err
}
if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil {
return err
}
if err := adp.content.GC(ctx); err != nil {
Expand Down
38 changes: 30 additions & 8 deletions pkg/adapter/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/goharbor/acceleration-service/pkg/errdefs"
)

const (
TagSuffix = "tag_suffix"
CacheTagSuffix = "cache_tag_suffix"
)

// Add suffix to source image reference as the target
// image reference, for example:
// Source: 192.168.1.1/nginx:latest
Expand All @@ -47,16 +52,33 @@ type Rule struct {

// Map maps the source image reference to a new one according to
// a rule, the new one will be used as the reference of target image.
func (rule *Rule) Map(ref string) (string, error) {
for _, item := range rule.items {
if item.TagSuffix != "" {
if strings.HasSuffix(ref, item.TagSuffix) {
// FIXME: To check if an image has been converted, a better solution
// is to use the annotation on image manifest.
return "", errdefs.ErrAlreadyConverted
func (rule *Rule) Map(ref, opt string) (string, error) {
switch opt {
case TagSuffix:
for _, item := range rule.items {
if item.TagSuffix != "" {
if strings.HasSuffix(ref, item.TagSuffix) {
// FIXME: To check if an image has been converted, a better solution
// is to use the annotation on image manifest.
return "", errdefs.ErrAlreadyConverted
}
return addSuffix(ref, item.TagSuffix)
}
}
case CacheTagSuffix:
for _, item := range rule.items {
if item.CacheTagSuffix != "" {
if strings.HasSuffix(ref, item.CacheTagSuffix) {
// FIXME: Ditto.A better way is to use the annotation on image manifest.
return "", errdefs.ErrIsRemoteCache
}
return addSuffix(ref, item.CacheTagSuffix)
}
return addSuffix(ref, item.TagSuffix)
}
// CacheTagSuffix empty means do not provide remote cache, just return empty string.
return "", nil
default:
return "", fmt.Errorf("unsupported map option: %s", opt)
}
return "", errors.New("not found matched conversion rule")
}
19 changes: 15 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type MetricConfig struct {
}

type ProviderConfig struct {
Source map[string]SourceConfig `yaml:"source"`
WorkDir string `yaml:"work_dir"`
GCPolicy GCPolicy `yaml:"gcpolicy"`
Source map[string]SourceConfig `yaml:"source"`
WorkDir string `yaml:"work_dir"`
GCPolicy GCPolicy `yaml:"gcpolicy"`
CacheSize int `yaml:"cache_size"`
}

type GCPolicy struct {
Expand All @@ -66,7 +67,8 @@ type SourceConfig struct {
}

type ConversionRule struct {
TagSuffix string `yaml:"tag_suffix"`
TagSuffix string `yaml:"tag_suffix"`
CacheTagSuffix string `yaml:"cache_tag_suffix"`
}

type ConverterConfig struct {
Expand Down Expand Up @@ -137,3 +139,12 @@ func (cfg *Config) Host(ref string) (remote.CredentialFunc, bool, error) {
return ary[0], ary[1], nil
}, auth.Insecure, nil
}

func (cfg *Config) EnableRemoteCache() bool {
for _, rule := range cfg.Converter.Rules {
if rule.CacheTagSuffix != "" {
return true
}
}
return false
}
60 changes: 60 additions & 0 deletions pkg/content/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (

"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/goharbor/acceleration-service/pkg/remote"
lru "github.com/hashicorp/golang-lru/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism.
Expand Down Expand Up @@ -122,3 +124,61 @@ func (leaseCache *leaseCache) remove(key string, usedCount string) {
func (leaseCache *leaseCache) Len() int {
return leaseCache.size
}

type RemoteCache struct {
// remoteCache is an LRU cache for caching target layer descriptors, the cache key is the source layer digest,
// and the cache value is the target layer descriptor after conversion.
remoteCache *lru.Cache[string, ocispec.Descriptor]
// cacheRef is the remote cache reference.
cacheRef string
// host is a func to provide registry credential by host name.
host remote.HostFunc
// cacheSize is the remote cache record capacity of converted layers.
cacheSize int
}

func NewRemoteCache(cacheSize int, host remote.HostFunc) (*RemoteCache, error) {
remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize)
if err != nil {
return nil, err
}
return &RemoteCache{
remoteCache: remoteCache,
host: host,
cacheSize: cacheSize,
}, nil
}

func (rc *RemoteCache) Values() []ocispec.Descriptor {
return rc.remoteCache.Values()
}

func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) {
return rc.remoteCache.Get(key)
}

func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) {
rc.remoteCache.Add(key, value)
}

func (rc *RemoteCache) Remove(key string) {
rc.remoteCache.Remove(key)
}

// Size returns the number of items in the cache.
func (rc *RemoteCache) Size() int {
return rc.remoteCache.Len()

}

func (rc *RemoteCache) NewLRUCache(cacheSize int, cacheRef string) error {
if rc != nil {
remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize)
if err != nil {
return err
}
rc.remoteCache = remoteCache
rc.cacheRef = cacheRef
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/content/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestLeaseCache(t *testing.T) {
func TestLeaseCacheInit(t *testing.T) {
os.MkdirAll("./tmp", 0755)
defer os.RemoveAll("./tmp")
content, err := NewContent("./tmp", "./tmp", "100MB")
content, err := NewContent("./tmp", "./tmp", "100MB", false, 200, nil)
require.NoError(t, err)
testDigest := []string{
"sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b",
Expand Down
Loading

0 comments on commit 416c0d3

Please sign in to comment.