From ca1afc33a07f263091bb360201ab64d7de709b24 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Wed, 16 Aug 2023 10:58:55 +0800 Subject: [PATCH 1/4] chore: use singleflight for gc Signed-off-by: Yadong Ding --- pkg/content/content.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/pkg/content/content.go b/pkg/content/content.go index 0c38fab9..ac4b8122 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + "golang.org/x/sync/singleflight" ) type Content struct { @@ -40,6 +41,8 @@ type Content struct { db *metadata.DB // lm is lease manager for managing leases using the provided database transaction. lm leases.Manager + // gcSingleflight help to resolve concurrent gc + gcSingleflight *singleflight.Group // store is the local content store wrapped inner db store content.Store // threshold is the maximum capacity of the local caches storage @@ -67,10 +70,11 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte return nil, err } content := Content{ - db: db, - lm: metadata.NewLeaseManager(db), - store: db.ContentStore(), - threshold: int64(t), + db: db, + lm: metadata.NewLeaseManager(db), + gcSingleflight: &singleflight.Group{}, + store: db.ContentStore(), + threshold: int64(t), } return &content, nil } @@ -108,18 +112,28 @@ func (content *Content) GC(ctx context.Context) error { } // if the local content size over eighty percent of threshold, gc start if size > (content.threshold*int64(80))/100 { - if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil { - return err - } - gcStatus, err := content.db.GarbageCollect(ctx) - if err != nil { + if _, err, _ := content.gcSingleflight.Do(accelerationServiceNamespace, func() (interface{}, error) { + return nil, content.garbageCollect(ctx, size-(content.threshold*int64(80))/100) + }); err != nil { return err } - logrus.Infof("garbage collect, elapse %s", gcStatus.Elapsed()) } return nil } +// garbageCollect clean the local caches by lease +func (content *Content) garbageCollect(ctx context.Context, size int64) error { + if err := content.cleanLeases(ctx, size); err != nil { + return err + } + gcStatus, err := content.db.GarbageCollect(ctx) + if err != nil { + return err + } + logrus.Infof("garbage collect, elapse %s", gcStatus.Elapsed()) + return nil +} + // cleanLeases use lease to manage content blob, delete lease of content which should be gc func (content *Content) cleanLeases(ctx context.Context, size int64) error { ls, err := content.lm.List(ctx) From a6111db030bccd583e6ca63a00a5a7584dbe0865 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Wed, 16 Aug 2023 11:01:32 +0800 Subject: [PATCH 2/4] feat: add leaseCache LeaseCache will cache the used count and last used order of each blob. Gc can clean the lease by leaseCache. Signed-off-by: Yadong Ding --- go.mod | 1 + go.sum | 2 + pkg/content/cache.go | 124 +++++++++++++++++++++++++++++++++++++++++ pkg/content/content.go | 45 +++++++++------ 4 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 pkg/content/cache.go diff --git a/go.mod b/go.mod index d1d5c56c..5f7f5e99 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/docker/cli v23.0.3+incompatible github.com/dustin/go-humanize v1.0.1 github.com/google/uuid v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/labstack/echo-contrib v0.14.1 github.com/labstack/echo/v4 v4.10.2 github.com/labstack/gommon v0.4.0 diff --git a/go.sum b/go.sum index 3c5622cc..e400b3cc 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= diff --git a/pkg/content/cache.go b/pkg/content/cache.go new file mode 100644 index 00000000..34a2bf0a --- /dev/null +++ b/pkg/content/cache.go @@ -0,0 +1,124 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package content + +import ( + "context" + "fmt" + "math" + "sort" + "strconv" + + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/namespaces" + lru "github.com/hashicorp/golang-lru/v2" +) + +// This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism. +type leaseCache struct { + caches map[string]*lru.Cache[string, any] + cachesIndex []int + size int +} + +// newleaseCache return new empty leaseCache +func newLeaseCache() *leaseCache { + return &leaseCache{ + caches: make(map[string]*lru.Cache[string, any]), + cachesIndex: make([]int, 0), + size: 0, + } +} + +// Init leaseCache by leases manager from db +func (leaseCache *leaseCache) Init(lm leases.Manager) error { + ls, err := lm.List(namespaces.WithNamespace(context.Background(), accelerationServiceNamespace)) + if err != nil { + return err + } + sort.Slice(ls, func(i, j int) bool { + return ls[i].Labels[usedAtLabel] > ls[j].Labels[usedAtLabel] + }) + for _, lease := range ls { + if err := leaseCache.Add(lease.ID, lease.Labels[usedCountLabel]); err != nil { + return err + } + } + return nil +} + +// Add the key into cache +func (leaseCache *leaseCache) Add(key string, usedCount string) error { + count, err := strconv.Atoi(usedCount) + if err != nil { + return err + } + if cache, ok := leaseCache.caches[usedCount]; ok { + cache.Add(key, nil) + } else { + cache, err := lru.New[string, any](math.MaxInt) + if err != nil { + return err + } + cache.Add(key, nil) + leaseCache.caches[usedCount] = cache + usedCount, err := strconv.Atoi(usedCount) + if err != nil { + return err + } + leaseCache.cachesIndex = append(leaseCache.cachesIndex, usedCount) + sort.Ints(leaseCache.cachesIndex) + } + // remove old cache + if usedCount != "1" { + if cache, ok := leaseCache.caches[strconv.Itoa(count-1)]; ok { + if cache.Contains(key) { + leaseCache.remove(key, strconv.Itoa(count-1)) + leaseCache.size-- + } + } + } + leaseCache.size++ + return nil +} + +// Remove oldest key from cache +func (leaseCache *leaseCache) Remove() (string, error) { + if key, _, ok := leaseCache.caches[strconv.Itoa(leaseCache.cachesIndex[0])].GetOldest(); ok { + leaseCache.remove(key, strconv.Itoa(leaseCache.cachesIndex[0])) + leaseCache.size-- + return key, nil + } + return "", fmt.Errorf("leaseCache have empty cache with cachesIndex") +} + +func (leaseCache *leaseCache) remove(key string, usedCount string) { + leaseCache.caches[usedCount].Remove(key) + if leaseCache.caches[usedCount].Len() == 0 { + delete(leaseCache.caches, usedCount) + var newCachesIndex []int + for _, index := range leaseCache.cachesIndex { + if usedCount != strconv.Itoa(index) { + newCachesIndex = append(newCachesIndex, index) + } + } + leaseCache.cachesIndex = newCachesIndex + } +} + +// Len return the size of leaseCache +func (leaseCache *leaseCache) Len() int { + return leaseCache.size +} diff --git a/pkg/content/content.go b/pkg/content/content.go index ac4b8122..5759133b 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -16,8 +16,8 @@ package content import ( "context" + "fmt" "path/filepath" - "sort" "strconv" "time" @@ -43,6 +43,8 @@ type Content struct { lm leases.Manager // gcSingleflight help to resolve concurrent gc gcSingleflight *singleflight.Group + // lc cache the used count and reference order of lease + lc *leaseCache // store is the local content store wrapped inner db store content.Store // threshold is the maximum capacity of the local caches storage @@ -69,10 +71,16 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte if err != nil { return nil, err } + lm := metadata.NewLeaseManager(db) + lc := newLeaseCache() + if err := lc.Init(lm); err != nil { + return nil, err + } content := Content{ db: db, lm: metadata.NewLeaseManager(db), gcSingleflight: &singleflight.Group{}, + lc: lc, store: db.ContentStore(), threshold: int64(t), } @@ -136,17 +144,16 @@ func (content *Content) garbageCollect(ctx context.Context, size int64) error { // cleanLeases use lease to manage content blob, delete lease of content which should be gc func (content *Content) cleanLeases(ctx context.Context, size int64) error { - ls, err := content.lm.List(ctx) - if err != nil { - return err - } - // TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel - sort.Slice(ls, func(i, j int) bool { - return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel] - }) - for _, lease := range ls { + for size > 0 { + if content.lc.Len() == 0 { + return fmt.Errorf("cleanLeases: leaseCache is empty, error caches") + } + digest, err := content.lc.Remove() + if err != nil { + return err + } if err := content.db.View(func(tx *bolt.Tx) error { - blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID))) + blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(digest))) if err != nil { return err } @@ -155,15 +162,16 @@ func (content *Content) cleanLeases(ctx context.Context, size int64) error { }); err != nil { return err } - if err := content.lm.Delete(ctx, lease); err != nil { + contentLease, err := content.lm.List(ctx, "id=="+digest) + if err != nil { return err } - if size <= 0 { - break + if len(contentLease) != 1 { + return fmt.Errorf("cleanLeases: find lease by digest failed") + } + if err := content.lm.Delete(ctx, contentLease[0]); err != nil { + return err } - } - if err != nil { - return err } return nil } @@ -208,6 +216,9 @@ func (content *Content) updateLease(digest *digest.Digest) error { } usedCount++ } + if err := content.lc.Add(digest.String(), strconv.Itoa(usedCount)); err != nil { + return err + } // write the new labels into lease bucket return boltutil.WriteLabels(bucket, map[string]string{ usedCountLabel: strconv.Itoa(usedCount), From a6bf3b331099c0df8983006b9c84e6d83c0f5a88 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Sat, 19 Aug 2023 00:38:29 +0800 Subject: [PATCH 3/4] chore: add test for leaseCache Signed-off-by: Yadong Ding --- pkg/content/cache_test.go | 62 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 pkg/content/cache_test.go diff --git a/pkg/content/cache_test.go b/pkg/content/cache_test.go new file mode 100644 index 00000000..148d04f5 --- /dev/null +++ b/pkg/content/cache_test.go @@ -0,0 +1,62 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package content + +import ( + "os" + "testing" + + "github.com/opencontainers/go-digest" + "github.com/stretchr/testify/require" +) + +func TestLeaseCache(t *testing.T) { + lc := newLeaseCache() + require.NoError(t, lc.Add("test_1", "1")) + require.NoError(t, lc.Add("test_2", "1")) + require.NoError(t, lc.Add("test_3", "2")) + require.NoError(t, lc.Add("test_2", "2")) + require.Error(t, lc.Add("test_3", "test")) + require.Equal(t, 3, lc.Len()) + key, err := lc.Remove() + require.NoError(t, err) + require.Equal(t, "test_1", key) + key, err = lc.Remove() + require.NoError(t, err) + require.Equal(t, "test_3", key) + key, err = lc.Remove() + require.NoError(t, err) + require.Equal(t, "test_2", key) +} + +func TestLeaseCacheInit(t *testing.T) { + os.MkdirAll("./tmp", 0755) + defer os.RemoveAll("./tmp") + content, err := NewContent("./tmp", "./tmp", "100MB") + require.NoError(t, err) + testDigest := []string{ + "sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b", + "sha256:17dc42e40d4af0a9e84c738313109f3a95e598081beef6c18a05abb57337aa5d", + "sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b", + "sha256:17dc42e40d4af0a9e84c738313109f3a95e598081beef6c18a05abb57337aa5d", + "sha256:aeb53f8db8c94d2cd63ca860d635af4307967aa11a2fdead98ae0ab3a329f470", + "sha256:613f4797d2b6653634291a990f3e32378c7cfe3cdd439567b26ca340b8946013", + } + for _, digestString := range testDigest { + require.NoError(t, content.updateLease((*digest.Digest)(&digestString))) + } + lc := newLeaseCache() + require.NoError(t, lc.Init(content.lm)) +} From deed9db694d8bf06cbd80a10a2debc3ab6452518 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Sat, 19 Aug 2023 22:24:38 +0800 Subject: [PATCH 4/4] fix: update lease in commit To make leaseCache is synchronized with local storage before read each blob. Signed-off-by: Yadong Ding --- pkg/content/content.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/content/content.go b/pkg/content/content.go index 5759133b..7f0d3126 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -268,15 +268,17 @@ func (content *Content) Abort(ctx context.Context, ref string) error { func (content *Content) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { writer, err := content.store.Writer(ctx, opts...) - return &localWriter{writer}, err + return &localWriter{writer, content}, err } // localWriter wrap the content.Writer type localWriter struct { content.Writer + content *Content } func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { // we don't write any lables, drop the opts + localWriter.content.updateLease(&expected) return localWriter.Writer.Commit(ctx, size, expected) }