Skip to content

Commit

Permalink
feat: add leaseManager
Browse files Browse the repository at this point in the history
We can use leaseManager to keep the content blob cache. LeaseManager will
create lease of eache content blob and update the time of each blob used time in lease.
We call updatetime in content store ReadAt.

Signed-off-by: Yadong Ding <ding_yadong@foxmail.com>
  • Loading branch information
Desiki-high committed Aug 6, 2023
1 parent 286927a commit e1d5bbc
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 13 deletions.
14 changes: 8 additions & 6 deletions pkg/content/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ import (
"encoding/binary"
"fmt"

"github.com/opencontainers/go-digest"
bolt "go.etcd.io/bbolt"
)

var (
bucketKeyObjectContent = []byte("content")
bucketKeyObjectBlob = []byte("blob")
bucketKeyObjectLeases = []byte("leases")

bucketKeyVersion = []byte("v1")
bucketKeyNamespace = []byte(accelerationServiceNamespace)
bucketKeySize = []byte("size")
bucketKeyUpdatedAt = []byte("updatedat")
)

const accelerationServiceNamespace = "acceleration-service"
const (
accelerationServiceNamespace = "acceleration-service"
usedAtLabel = "usedat"
)

func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
bucket := tx.Bucket(keys[0])
Expand All @@ -52,9 +54,9 @@ func getBlobsBucket(tx *bolt.Tx) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob)
}

// getBlobBucket return the blob bucket by digest
func getBlobBucket(tx *bolt.Tx, digst digest.Digest) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob, []byte(digst.String()))
// get the lease bucket by lease id
func getLeaseBucket(tx *bolt.Tx, lease string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectLeases, []byte(lease))
}

// bolbSize return the content blob size in a bucket
Expand Down
75 changes: 68 additions & 7 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package content
import (
"context"
"path/filepath"
"sort"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -33,6 +37,8 @@ import (
type Content struct {
// db is the bolt database of content
db *metadata.DB
// lm is lease manager for managing leases using the provided database transaction.
lm leases.Manager
// store is the local content store wrapped inner db
store content.Store
// threshold is the maximum capacity of the local caches storage
Expand All @@ -41,7 +47,7 @@ type Content struct {

// NewContent return content support by content store, bolt database and threshold.
// content store created in contentDir and bolt database created in databaseDir.
// content.db supported by bolt database and content store.
// content.db supported by bolt database and content store, content.lm supported by content.db.
func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) {
store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore())
if err != nil {
Expand All @@ -61,6 +67,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte
}
content := Content{
db: db,
lm: metadata.NewLeaseManager(db),
store: db.ContentStore(),
threshold: int64(t),
}
Expand Down Expand Up @@ -99,7 +106,9 @@ func (content *Content) GC(ctx context.Context) error {
return err
}
if size > content.threshold {
// TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc
if err := content.cleanLeases(ctx, size-content.threshold); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
return err
Expand All @@ -109,15 +118,67 @@ func (content *Content) GC(ctx context.Context) error {
return nil
}

// updateTime update the latest used time
// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
ls, err := content.lm.List(ctx)
if err != nil {
return nil
}
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
if err := content.db.View(func(tx *bolt.Tx) error {
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID)))
if err != nil {
return err
}
size -= blobsize
return nil
}); err != nil {
return err
}
if err := content.lm.Delete(ctx, lease); err != nil {
return err
}
if size <= 0 {
break
}
}
if err != nil {
return nil
}
return nil
}

// updateTime update the latest used time in lease
func (content *Content) updateTime(digest *digest.Digest) error {
return content.db.Update(func(tx *bolt.Tx) error {
bucket := getBlobBucket(tx, *digest)
updatedAt, err := time.Now().UTC().MarshalBinary()
ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace)
contentLeases, err := content.lm.List(ctx, "id=="+digest.String())
if err != nil {
return nil
}
if len(contentLeases) == 0 {
l, err := content.lm.Create(ctx, leases.WithID(digest.String()))
if err != nil {
return err
}
return bucket.Put(bucketKeyUpdatedAt, updatedAt)
if err := content.lm.AddResource(ctx, l, leases.Resource{
ID: digest.String(),
Type: "content",
}); err != nil {
return err
}
}
return content.db.Update(func(tx *bolt.Tx) error {
bucket := getLeaseBucket(tx, digest.String())
// if can't find lease bucket, it maens content store is empty
if bucket == nil {
return nil
}
return boltutil.WriteLabels(bucket, map[string]string{
usedAtLabel: time.Now().UTC().String(),
})
})
}

Expand Down

0 comments on commit e1d5bbc

Please sign in to comment.