From e1d5bbc160f33d2592e52a0b5d29e14d89b6f919 Mon Sep 17 00:00:00 2001 From: Yadong Ding Date: Sun, 6 Aug 2023 11:36:27 +0800 Subject: [PATCH] feat: add leaseManager We can use leaseManager to keep the content blob cache. LeaseManager will create lease of eache content blob and update the time of each blob used time in lease. We call updatetime in content store ReadAt. Signed-off-by: Yadong Ding --- pkg/content/bucket.go | 14 ++++---- pkg/content/content.go | 75 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 76 insertions(+), 13 deletions(-) diff --git a/pkg/content/bucket.go b/pkg/content/bucket.go index 8b2fe237..40186f64 100644 --- a/pkg/content/bucket.go +++ b/pkg/content/bucket.go @@ -18,21 +18,23 @@ import ( "encoding/binary" "fmt" - "github.com/opencontainers/go-digest" bolt "go.etcd.io/bbolt" ) var ( bucketKeyObjectContent = []byte("content") bucketKeyObjectBlob = []byte("blob") + bucketKeyObjectLeases = []byte("leases") bucketKeyVersion = []byte("v1") bucketKeyNamespace = []byte(accelerationServiceNamespace) bucketKeySize = []byte("size") - bucketKeyUpdatedAt = []byte("updatedat") ) -const accelerationServiceNamespace = "acceleration-service" +const ( + accelerationServiceNamespace = "acceleration-service" + usedAtLabel = "usedat" +) func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket { bucket := tx.Bucket(keys[0]) @@ -52,9 +54,9 @@ func getBlobsBucket(tx *bolt.Tx) *bolt.Bucket { return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob) } -// getBlobBucket return the blob bucket by digest -func getBlobBucket(tx *bolt.Tx, digst digest.Digest) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob, []byte(digst.String())) +// get the lease bucket by lease id +func getLeaseBucket(tx *bolt.Tx, lease string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectLeases, []byte(lease)) } // bolbSize return the content blob size in a bucket diff --git a/pkg/content/content.go b/pkg/content/content.go index f544c57f..e9827241 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -17,11 +17,15 @@ package content import ( "context" "path/filepath" + "sort" "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/metadata/boltutil" + "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -33,6 +37,8 @@ import ( type Content struct { // db is the bolt database of content db *metadata.DB + // lm is lease manager for managing leases using the provided database transaction. + lm leases.Manager // store is the local content store wrapped inner db store content.Store // threshold is the maximum capacity of the local caches storage @@ -41,7 +47,7 @@ type Content struct { // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. -// content.db supported by bolt database and content store. +// content.db supported by bolt database and content store, content.lm supported by content.db. func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { @@ -61,6 +67,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte } content := Content{ db: db, + lm: metadata.NewLeaseManager(db), store: db.ContentStore(), threshold: int64(t), } @@ -99,7 +106,9 @@ func (content *Content) GC(ctx context.Context) error { return err } if size > content.threshold { - // TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc + if err := content.cleanLeases(ctx, size-content.threshold); err != nil { + return err + } gcStatus, err := content.db.GarbageCollect(ctx) if err != nil { return err @@ -109,15 +118,67 @@ func (content *Content) GC(ctx context.Context) error { return nil } -// updateTime update the latest used time +// cleanLeases use lease to manage content blob, delete lease of content which should be gc +func (content *Content) cleanLeases(ctx context.Context, size int64) error { + ls, err := content.lm.List(ctx) + if err != nil { + return nil + } + sort.Slice(ls, func(i, j int) bool { + return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel] + }) + for _, lease := range ls { + if err := content.db.View(func(tx *bolt.Tx) error { + blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID))) + if err != nil { + return err + } + size -= blobsize + return nil + }); err != nil { + return err + } + if err := content.lm.Delete(ctx, lease); err != nil { + return err + } + if size <= 0 { + break + } + } + if err != nil { + return nil + } + return nil +} + +// updateTime update the latest used time in lease func (content *Content) updateTime(digest *digest.Digest) error { - return content.db.Update(func(tx *bolt.Tx) error { - bucket := getBlobBucket(tx, *digest) - updatedAt, err := time.Now().UTC().MarshalBinary() + ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace) + contentLeases, err := content.lm.List(ctx, "id=="+digest.String()) + if err != nil { + return nil + } + if len(contentLeases) == 0 { + l, err := content.lm.Create(ctx, leases.WithID(digest.String())) if err != nil { return err } - return bucket.Put(bucketKeyUpdatedAt, updatedAt) + if err := content.lm.AddResource(ctx, l, leases.Resource{ + ID: digest.String(), + Type: "content", + }); err != nil { + return err + } + } + return content.db.Update(func(tx *bolt.Tx) error { + bucket := getLeaseBucket(tx, digest.String()) + // if can't find lease bucket, it maens content store is empty + if bucket == nil { + return nil + } + return boltutil.WriteLabels(bucket, map[string]string{ + usedAtLabel: time.Now().UTC().String(), + }) }) }