Skip to content

Commit

Permalink
feat: add leaseManager
Browse files Browse the repository at this point in the history
We can use leaseManager to keep the content blob cache. LeaseManager will
create lease of eache content blob and update the latest used time and
used counts of each blob in lease bucket.
We call updateLease in content store ReadAt and cleanLease in GC.

Signed-off-by: Yadong Ding <ding_yadong@foxmail.com>
  • Loading branch information
Desiki-high committed Aug 9, 2023
1 parent c2a7902 commit 9439819
Showing 1 changed file with 90 additions and 9 deletions.
99 changes: 90 additions & 9 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ package content
import (
"context"
"path/filepath"
"sort"
"strconv"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -33,6 +38,8 @@ import (
type Content struct {
// db is the bolt database of content
db *metadata.DB
// lm is lease manager for managing leases using the provided database transaction.
lm leases.Manager
// store is the local content store wrapped inner db
store content.Store
// threshold is the maximum capacity of the local caches storage
Expand All @@ -41,7 +48,7 @@ type Content struct {

// NewContent return content support by content store, bolt database and threshold.
// content store created in contentDir and bolt database created in databaseDir.
// content.db supported by bolt database and content store.
// content.db supported by bolt database and content store, content.lm supported by content.db.
func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) {
store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore())
if err != nil {
Expand All @@ -61,6 +68,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte
}
content := Content{
db: db,
lm: metadata.NewLeaseManager(db),
store: db.ContentStore(),
threshold: int64(t),
}
Expand Down Expand Up @@ -98,8 +106,11 @@ func (content *Content) GC(ctx context.Context) error {
if err != nil {
return err
}
if size > content.threshold {
// TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc
// if the local content size over eighty percent of threshold, gc start
if size > (content.threshold*int64(80))/100 {
if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
return err
Expand All @@ -109,15 +120,85 @@ func (content *Content) GC(ctx context.Context) error {
return nil
}

// updateTime update the latest used time
func (content *Content) updateTime(digest *digest.Digest) error {
// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
ls, err := content.lm.List(ctx)
if err != nil {
return err
}
// TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
if err := content.db.View(func(tx *bolt.Tx) error {
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID)))
if err != nil {
return err
}
size -= blobsize
return nil
}); err != nil {
return err
}
if err := content.lm.Delete(ctx, lease); err != nil {
return err
}
if size <= 0 {
break
}
}
if err != nil {
return err
}
return nil
}

// updateLease update the latest used time and used counts in lease
func (content *Content) updateLease(digest *digest.Digest) error {
ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace)
contentLeases, err := content.lm.List(ctx, "id=="+digest.String())
if err != nil {
return nil
}
if len(contentLeases) == 0 {
l, err := content.lm.Create(ctx, leases.WithID(digest.String()))
if err != nil {
return err
}
if err := content.lm.AddResource(ctx, l, leases.Resource{
ID: digest.String(),
Type: "content",
}); err != nil {
return err
}
}
return content.db.Update(func(tx *bolt.Tx) error {
bucket := getBlobBucket(tx, *digest)
updatedAt, err := time.Now().UTC().MarshalBinary()
bucket := getLeaseBucket(tx, digest.String())
// if can't find lease bucket, it maens content store is empty
if bucket == nil {
return nil
}
// read the labels from lease bucket
labels, err := boltutil.ReadLabels(bucket)
if err != nil {
return err
}
return bucket.Put(bucketKeyUpdatedAt, updatedAt)
// update the usedCountLabel
usedCount := 1
count, ok := labels[usedCountLabel]
if ok {
usedCount, err = strconv.Atoi(count)
if err != nil {
return err
}
usedCount++
}
// write the new labels into lease bucket
return boltutil.WriteLabels(bucket, map[string]string{
usedCountLabel: strconv.Itoa(usedCount),
usedAtLabel: time.Now().UTC().String(),
})
})
}

Expand Down Expand Up @@ -145,7 +226,7 @@ func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (
if err != nil {
return readerAt, err
}
return readerAt, content.updateTime(&desc.Digest)
return readerAt, content.updateLease(&desc.Digest)
}

func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) {
Expand Down

0 comments on commit 9439819

Please sign in to comment.