Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use lease to manage local cache for gc #157

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions pkg/content/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@ import (
"encoding/binary"
"fmt"

"github.com/opencontainers/go-digest"
bolt "go.etcd.io/bbolt"
)

var (
bucketKeyObjectContent = []byte("content")
bucketKeyObjectBlob = []byte("blob")
bucketKeyObjectLeases = []byte("leases")

bucketKeyVersion = []byte("v1")
bucketKeyNamespace = []byte(accelerationServiceNamespace)
bucketKeySize = []byte("size")
bucketKeyUpdatedAt = []byte("updatedat")
)

const accelerationServiceNamespace = "acceleration-service"
const (
accelerationServiceNamespace = "acceleration-service"
usedAtLabel = "usedat"
usedCountLabel = "usedcount"
)

func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
bucket := tx.Bucket(keys[0])
Expand All @@ -52,9 +55,9 @@ func getBlobsBucket(tx *bolt.Tx) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob)
}

// getBlobBucket return the blob bucket by digest
func getBlobBucket(tx *bolt.Tx, digst digest.Digest) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob, []byte(digst.String()))
// get the lease bucket by lease id
func getLeaseBucket(tx *bolt.Tx, lease string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectLeases, []byte(lease))
}

// bolbSize return the content blob size in a bucket
Expand Down
99 changes: 90 additions & 9 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ package content
import (
"context"
"path/filepath"
"sort"
"strconv"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -33,6 +38,8 @@ import (
type Content struct {
// db is the bolt database of content
db *metadata.DB
// lm is lease manager for managing leases using the provided database transaction.
lm leases.Manager
// store is the local content store wrapped inner db
store content.Store
// threshold is the maximum capacity of the local caches storage
Expand All @@ -41,7 +48,7 @@ type Content struct {

// NewContent return content support by content store, bolt database and threshold.
// content store created in contentDir and bolt database created in databaseDir.
// content.db supported by bolt database and content store.
// content.db supported by bolt database and content store, content.lm supported by content.db.
func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) {
store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore())
if err != nil {
Expand All @@ -61,6 +68,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte
}
content := Content{
db: db,
lm: metadata.NewLeaseManager(db),
store: db.ContentStore(),
threshold: int64(t),
}
Expand Down Expand Up @@ -98,8 +106,11 @@ func (content *Content) GC(ctx context.Context) error {
if err != nil {
return err
}
if size > content.threshold {
// TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc
// if the local content size over eighty percent of threshold, gc start
if size > (content.threshold*int64(80))/100 {
if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
return err
Expand All @@ -109,15 +120,85 @@ func (content *Content) GC(ctx context.Context) error {
return nil
}

// updateTime update the latest used time
func (content *Content) updateTime(digest *digest.Digest) error {
// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
ls, err := content.lm.List(ctx)
if err != nil {
return err
}
// TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
if err := content.db.View(func(tx *bolt.Tx) error {
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID)))
if err != nil {
return err
}
size -= blobsize
return nil
}); err != nil {
return err
}
if err := content.lm.Delete(ctx, lease); err != nil {
return err
}
if size <= 0 {
break
}
}
if err != nil {
return err
}
return nil
}

// updateLease update the latest used time and used counts in lease
func (content *Content) updateLease(digest *digest.Digest) error {
ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace)
contentLeases, err := content.lm.List(ctx, "id=="+digest.String())
if err != nil {
return err
}
if len(contentLeases) == 0 {
l, err := content.lm.Create(ctx, leases.WithID(digest.String()))
if err != nil {
return err
}
if err := content.lm.AddResource(ctx, l, leases.Resource{
ID: digest.String(),
Type: "content",
}); err != nil {
return err
}
}
return content.db.Update(func(tx *bolt.Tx) error {
bucket := getBlobBucket(tx, *digest)
updatedAt, err := time.Now().UTC().MarshalBinary()
bucket := getLeaseBucket(tx, digest.String())
// if can't find lease bucket, it maens content store is empty
if bucket == nil {
return nil
}
// read the labels from lease bucket
labels, err := boltutil.ReadLabels(bucket)
if err != nil {
return err
}
return bucket.Put(bucketKeyUpdatedAt, updatedAt)
// update the usedCountLabel
usedCount := 1
count, ok := labels[usedCountLabel]
if ok {
usedCount, err = strconv.Atoi(count)
if err != nil {
return err
}
usedCount++
}
// write the new labels into lease bucket
return boltutil.WriteLabels(bucket, map[string]string{
usedCountLabel: strconv.Itoa(usedCount),
usedAtLabel: time.Now().UTC().String(),
})
})
}

Expand Down Expand Up @@ -145,7 +226,7 @@ func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (
if err != nil {
return readerAt, err
}
return readerAt, content.updateTime(&desc.Digest)
return readerAt, content.updateLease(&desc.Digest)
}

func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) {
Expand Down
Loading