diff --git a/pkg/content/bucket.go b/pkg/content/bucket.go index 8b2fe237..eeff5b7c 100644 --- a/pkg/content/bucket.go +++ b/pkg/content/bucket.go @@ -18,21 +18,24 @@ import ( "encoding/binary" "fmt" - "github.com/opencontainers/go-digest" bolt "go.etcd.io/bbolt" ) var ( bucketKeyObjectContent = []byte("content") bucketKeyObjectBlob = []byte("blob") + bucketKeyObjectLeases = []byte("leases") bucketKeyVersion = []byte("v1") bucketKeyNamespace = []byte(accelerationServiceNamespace) bucketKeySize = []byte("size") - bucketKeyUpdatedAt = []byte("updatedat") ) -const accelerationServiceNamespace = "acceleration-service" +const ( + accelerationServiceNamespace = "acceleration-service" + usedAtLabel = "usedat" + usedCountLabel = "usedcount" +) func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket { bucket := tx.Bucket(keys[0]) @@ -52,9 +55,9 @@ func getBlobsBucket(tx *bolt.Tx) *bolt.Bucket { return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob) } -// getBlobBucket return the blob bucket by digest -func getBlobBucket(tx *bolt.Tx, digst digest.Digest) *bolt.Bucket { - return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob, []byte(digst.String())) +// get the lease bucket by lease id +func getLeaseBucket(tx *bolt.Tx, lease string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectLeases, []byte(lease)) } // bolbSize return the content blob size in a bucket diff --git a/pkg/content/content.go b/pkg/content/content.go index fbe69da5..0c38fab9 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -17,11 +17,16 @@ package content import ( "context" "path/filepath" + "sort" + "strconv" "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/metadata/boltutil" + "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -33,6 +38,8 @@ import ( type Content struct { // db is the bolt database of content db *metadata.DB + // lm is lease manager for managing leases using the provided database transaction. + lm leases.Manager // store is the local content store wrapped inner db store content.Store // threshold is the maximum capacity of the local caches storage @@ -41,7 +48,7 @@ type Content struct { // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. -// content.db supported by bolt database and content store. +// content.db supported by bolt database and content store, content.lm supported by content.db. func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { @@ -61,6 +68,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte } content := Content{ db: db, + lm: metadata.NewLeaseManager(db), store: db.ContentStore(), threshold: int64(t), } @@ -98,8 +106,11 @@ func (content *Content) GC(ctx context.Context) error { if err != nil { return err } - if size > content.threshold { - // TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc + // if the local content size over eighty percent of threshold, gc start + if size > (content.threshold*int64(80))/100 { + if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil { + return err + } gcStatus, err := content.db.GarbageCollect(ctx) if err != nil { return err @@ -109,15 +120,85 @@ func (content *Content) GC(ctx context.Context) error { return nil } -// updateTime update the latest used time -func (content *Content) updateTime(digest *digest.Digest) error { +// cleanLeases use lease to manage content blob, delete lease of content which should be gc +func (content *Content) cleanLeases(ctx context.Context, size int64) error { + ls, err := content.lm.List(ctx) + if err != nil { + return err + } + // TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel + sort.Slice(ls, func(i, j int) bool { + return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel] + }) + for _, lease := range ls { + if err := content.db.View(func(tx *bolt.Tx) error { + blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID))) + if err != nil { + return err + } + size -= blobsize + return nil + }); err != nil { + return err + } + if err := content.lm.Delete(ctx, lease); err != nil { + return err + } + if size <= 0 { + break + } + } + if err != nil { + return err + } + return nil +} + +// updateLease update the latest used time and used counts in lease +func (content *Content) updateLease(digest *digest.Digest) error { + ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace) + contentLeases, err := content.lm.List(ctx, "id=="+digest.String()) + if err != nil { + return err + } + if len(contentLeases) == 0 { + l, err := content.lm.Create(ctx, leases.WithID(digest.String())) + if err != nil { + return err + } + if err := content.lm.AddResource(ctx, l, leases.Resource{ + ID: digest.String(), + Type: "content", + }); err != nil { + return err + } + } return content.db.Update(func(tx *bolt.Tx) error { - bucket := getBlobBucket(tx, *digest) - updatedAt, err := time.Now().UTC().MarshalBinary() + bucket := getLeaseBucket(tx, digest.String()) + // if can't find lease bucket, it maens content store is empty + if bucket == nil { + return nil + } + // read the labels from lease bucket + labels, err := boltutil.ReadLabels(bucket) if err != nil { return err } - return bucket.Put(bucketKeyUpdatedAt, updatedAt) + // update the usedCountLabel + usedCount := 1 + count, ok := labels[usedCountLabel] + if ok { + usedCount, err = strconv.Atoi(count) + if err != nil { + return err + } + usedCount++ + } + // write the new labels into lease bucket + return boltutil.WriteLabels(bucket, map[string]string{ + usedCountLabel: strconv.Itoa(usedCount), + usedAtLabel: time.Now().UTC().String(), + }) }) } @@ -145,7 +226,7 @@ func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) ( if err != nil { return readerAt, err } - return readerAt, content.updateTime(&desc.Digest) + return readerAt, content.updateLease(&desc.Digest) } func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) {