Skip to content

Commit

Permalink
Merge pull request #172 from Desiki-high/feat-lrfu
Browse files Browse the repository at this point in the history
feat: add leaseCache to change gc policy
  • Loading branch information
imeoer authored Aug 21, 2023
2 parents f132200 + deed9db commit 1d72948
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 27 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/docker/cli v23.0.3+incompatible
github.com/dustin/go-humanize v1.0.1
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/labstack/echo-contrib v0.14.1
github.com/labstack/echo/v4 v4.10.2
github.com/labstack/gommon v0.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down
124 changes: 124 additions & 0 deletions pkg/content/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package content

import (
"context"
"fmt"
"math"
"sort"
"strconv"

"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
lru "github.com/hashicorp/golang-lru/v2"
)

// This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism.
type leaseCache struct {
caches map[string]*lru.Cache[string, any]
cachesIndex []int
size int
}

// newleaseCache return new empty leaseCache
func newLeaseCache() *leaseCache {
return &leaseCache{
caches: make(map[string]*lru.Cache[string, any]),
cachesIndex: make([]int, 0),
size: 0,
}
}

// Init leaseCache by leases manager from db
func (leaseCache *leaseCache) Init(lm leases.Manager) error {
ls, err := lm.List(namespaces.WithNamespace(context.Background(), accelerationServiceNamespace))
if err != nil {
return err
}
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] > ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
if err := leaseCache.Add(lease.ID, lease.Labels[usedCountLabel]); err != nil {
return err
}
}
return nil
}

// Add the key into cache
func (leaseCache *leaseCache) Add(key string, usedCount string) error {
count, err := strconv.Atoi(usedCount)
if err != nil {
return err
}
if cache, ok := leaseCache.caches[usedCount]; ok {
cache.Add(key, nil)
} else {
cache, err := lru.New[string, any](math.MaxInt)
if err != nil {
return err
}
cache.Add(key, nil)
leaseCache.caches[usedCount] = cache
usedCount, err := strconv.Atoi(usedCount)
if err != nil {
return err
}
leaseCache.cachesIndex = append(leaseCache.cachesIndex, usedCount)
sort.Ints(leaseCache.cachesIndex)
}
// remove old cache
if usedCount != "1" {
if cache, ok := leaseCache.caches[strconv.Itoa(count-1)]; ok {
if cache.Contains(key) {
leaseCache.remove(key, strconv.Itoa(count-1))
leaseCache.size--
}
}
}
leaseCache.size++
return nil
}

// Remove oldest key from cache
func (leaseCache *leaseCache) Remove() (string, error) {
if key, _, ok := leaseCache.caches[strconv.Itoa(leaseCache.cachesIndex[0])].GetOldest(); ok {
leaseCache.remove(key, strconv.Itoa(leaseCache.cachesIndex[0]))
leaseCache.size--
return key, nil
}
return "", fmt.Errorf("leaseCache have empty cache with cachesIndex")
}

func (leaseCache *leaseCache) remove(key string, usedCount string) {
leaseCache.caches[usedCount].Remove(key)
if leaseCache.caches[usedCount].Len() == 0 {
delete(leaseCache.caches, usedCount)
var newCachesIndex []int
for _, index := range leaseCache.cachesIndex {
if usedCount != strconv.Itoa(index) {
newCachesIndex = append(newCachesIndex, index)
}
}
leaseCache.cachesIndex = newCachesIndex
}
}

// Len return the size of leaseCache
func (leaseCache *leaseCache) Len() int {
return leaseCache.size
}
62 changes: 62 additions & 0 deletions pkg/content/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package content

import (
"os"
"testing"

"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
)

func TestLeaseCache(t *testing.T) {
lc := newLeaseCache()
require.NoError(t, lc.Add("test_1", "1"))
require.NoError(t, lc.Add("test_2", "1"))
require.NoError(t, lc.Add("test_3", "2"))
require.NoError(t, lc.Add("test_2", "2"))
require.Error(t, lc.Add("test_3", "test"))
require.Equal(t, 3, lc.Len())
key, err := lc.Remove()
require.NoError(t, err)
require.Equal(t, "test_1", key)
key, err = lc.Remove()
require.NoError(t, err)
require.Equal(t, "test_3", key)
key, err = lc.Remove()
require.NoError(t, err)
require.Equal(t, "test_2", key)
}

func TestLeaseCacheInit(t *testing.T) {
os.MkdirAll("./tmp", 0755)
defer os.RemoveAll("./tmp")
content, err := NewContent("./tmp", "./tmp", "100MB")
require.NoError(t, err)
testDigest := []string{
"sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b",
"sha256:17dc42e40d4af0a9e84c738313109f3a95e598081beef6c18a05abb57337aa5d",
"sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b",
"sha256:17dc42e40d4af0a9e84c738313109f3a95e598081beef6c18a05abb57337aa5d",
"sha256:aeb53f8db8c94d2cd63ca860d635af4307967aa11a2fdead98ae0ab3a329f470",
"sha256:613f4797d2b6653634291a990f3e32378c7cfe3cdd439567b26ca340b8946013",
}
for _, digestString := range testDigest {
require.NoError(t, content.updateLease((*digest.Digest)(&digestString)))
}
lc := newLeaseCache()
require.NoError(t, lc.Init(content.lm))
}
81 changes: 54 additions & 27 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package content

import (
"context"
"fmt"
"path/filepath"
"sort"
"strconv"
"time"

Expand All @@ -33,13 +33,18 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sync/singleflight"
)

type Content struct {
// db is the bolt database of content
db *metadata.DB
// lm is lease manager for managing leases using the provided database transaction.
lm leases.Manager
// gcSingleflight help to resolve concurrent gc
gcSingleflight *singleflight.Group
// lc cache the used count and reference order of lease
lc *leaseCache
// store is the local content store wrapped inner db
store content.Store
// threshold is the maximum capacity of the local caches storage
Expand All @@ -66,11 +71,18 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte
if err != nil {
return nil, err
}
lm := metadata.NewLeaseManager(db)
lc := newLeaseCache()
if err := lc.Init(lm); err != nil {
return nil, err
}
content := Content{
db: db,
lm: metadata.NewLeaseManager(db),
store: db.ContentStore(),
threshold: int64(t),
db: db,
lm: metadata.NewLeaseManager(db),
gcSingleflight: &singleflight.Group{},
lc: lc,
store: db.ContentStore(),
threshold: int64(t),
}
return &content, nil
}
Expand Down Expand Up @@ -108,31 +120,40 @@ func (content *Content) GC(ctx context.Context) error {
}
// if the local content size over eighty percent of threshold, gc start
if size > (content.threshold*int64(80))/100 {
if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
if _, err, _ := content.gcSingleflight.Do(accelerationServiceNamespace, func() (interface{}, error) {
return nil, content.garbageCollect(ctx, size-(content.threshold*int64(80))/100)
}); err != nil {
return err
}
logrus.Infof("garbage collect, elapse %s", gcStatus.Elapsed())
}
return nil
}

// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
ls, err := content.lm.List(ctx)
// garbageCollect clean the local caches by lease
func (content *Content) garbageCollect(ctx context.Context, size int64) error {
if err := content.cleanLeases(ctx, size); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
return err
}
// TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
logrus.Infof("garbage collect, elapse %s", gcStatus.Elapsed())
return nil
}

// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
for size > 0 {
if content.lc.Len() == 0 {
return fmt.Errorf("cleanLeases: leaseCache is empty, error caches")
}
digest, err := content.lc.Remove()
if err != nil {
return err
}
if err := content.db.View(func(tx *bolt.Tx) error {
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID)))
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(digest)))
if err != nil {
return err
}
Expand All @@ -141,15 +162,16 @@ func (content *Content) cleanLeases(ctx context.Context, size int64) error {
}); err != nil {
return err
}
if err := content.lm.Delete(ctx, lease); err != nil {
contentLease, err := content.lm.List(ctx, "id=="+digest)
if err != nil {
return err
}
if size <= 0 {
break
if len(contentLease) != 1 {
return fmt.Errorf("cleanLeases: find lease by digest failed")
}
if err := content.lm.Delete(ctx, contentLease[0]); err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -194,6 +216,9 @@ func (content *Content) updateLease(digest *digest.Digest) error {
}
usedCount++
}
if err := content.lc.Add(digest.String(), strconv.Itoa(usedCount)); err != nil {
return err
}
// write the new labels into lease bucket
return boltutil.WriteLabels(bucket, map[string]string{
usedCountLabel: strconv.Itoa(usedCount),
Expand Down Expand Up @@ -243,15 +268,17 @@ func (content *Content) Abort(ctx context.Context, ref string) error {

func (content *Content) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
writer, err := content.store.Writer(ctx, opts...)
return &localWriter{writer}, err
return &localWriter{writer, content}, err
}

// localWriter wrap the content.Writer
type localWriter struct {
content.Writer
content *Content
}

func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
// we don't write any lables, drop the opts
localWriter.content.updateLease(&expected)
return localWriter.Writer.Commit(ctx, size, expected)
}

0 comments on commit 1d72948

Please sign in to comment.