Skip to content

Commit

Permalink
Merge pull request #157 from Desiki-high/feat-lm
Browse files Browse the repository at this point in the history
feat: use lease to manage local cache for gc
  • Loading branch information
imeoer authored Aug 10, 2023
2 parents 6493dc1 + 5066efc commit f132200
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 15 deletions.
15 changes: 9 additions & 6 deletions pkg/content/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@ import (
"encoding/binary"
"fmt"

"github.com/opencontainers/go-digest"
bolt "go.etcd.io/bbolt"
)

var (
bucketKeyObjectContent = []byte("content")
bucketKeyObjectBlob = []byte("blob")
bucketKeyObjectLeases = []byte("leases")

bucketKeyVersion = []byte("v1")
bucketKeyNamespace = []byte(accelerationServiceNamespace)
bucketKeySize = []byte("size")
bucketKeyUpdatedAt = []byte("updatedat")
)

const accelerationServiceNamespace = "acceleration-service"
const (
accelerationServiceNamespace = "acceleration-service"
usedAtLabel = "usedat"
usedCountLabel = "usedcount"
)

func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
bucket := tx.Bucket(keys[0])
Expand All @@ -52,9 +55,9 @@ func getBlobsBucket(tx *bolt.Tx) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob)
}

// getBlobBucket return the blob bucket by digest
func getBlobBucket(tx *bolt.Tx, digst digest.Digest) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectContent, bucketKeyObjectBlob, []byte(digst.String()))
// get the lease bucket by lease id
func getLeaseBucket(tx *bolt.Tx, lease string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, bucketKeyNamespace, bucketKeyObjectLeases, []byte(lease))
}

// bolbSize return the content blob size in a bucket
Expand Down
99 changes: 90 additions & 9 deletions pkg/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ package content
import (
"context"
"path/filepath"
"sort"
"strconv"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -33,6 +38,8 @@ import (
type Content struct {
// db is the bolt database of content
db *metadata.DB
// lm is lease manager for managing leases using the provided database transaction.
lm leases.Manager
// store is the local content store wrapped inner db
store content.Store
// threshold is the maximum capacity of the local caches storage
Expand All @@ -41,7 +48,7 @@ type Content struct {

// NewContent return content support by content store, bolt database and threshold.
// content store created in contentDir and bolt database created in databaseDir.
// content.db supported by bolt database and content store.
// content.db supported by bolt database and content store, content.lm supported by content.db.
func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) {
store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore())
if err != nil {
Expand All @@ -61,6 +68,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte
}
content := Content{
db: db,
lm: metadata.NewLeaseManager(db),
store: db.ContentStore(),
threshold: int64(t),
}
Expand Down Expand Up @@ -98,8 +106,11 @@ func (content *Content) GC(ctx context.Context) error {
if err != nil {
return err
}
if size > content.threshold {
// TODO: *metadata.DB.GarbageCollect will clear all caches, we need to rewrite gc
// if the local content size over eighty percent of threshold, gc start
if size > (content.threshold*int64(80))/100 {
if err := content.cleanLeases(ctx, size-(content.threshold*int64(80))/100); err != nil {
return err
}
gcStatus, err := content.db.GarbageCollect(ctx)
if err != nil {
return err
Expand All @@ -109,15 +120,85 @@ func (content *Content) GC(ctx context.Context) error {
return nil
}

// updateTime update the latest used time
func (content *Content) updateTime(digest *digest.Digest) error {
// cleanLeases use lease to manage content blob, delete lease of content which should be gc
func (content *Content) cleanLeases(ctx context.Context, size int64) error {
ls, err := content.lm.List(ctx)
if err != nil {
return err
}
// TODO: now the order of gc is LRU, should update to LRFU by usedCountLabel
sort.Slice(ls, func(i, j int) bool {
return ls[i].Labels[usedAtLabel] < ls[j].Labels[usedAtLabel]
})
for _, lease := range ls {
if err := content.db.View(func(tx *bolt.Tx) error {
blobsize, err := blobSize(getBlobsBucket(tx).Bucket([]byte(lease.ID)))
if err != nil {
return err
}
size -= blobsize
return nil
}); err != nil {
return err
}
if err := content.lm.Delete(ctx, lease); err != nil {
return err
}
if size <= 0 {
break
}
}
if err != nil {
return err
}
return nil
}

// updateLease update the latest used time and used counts in lease
func (content *Content) updateLease(digest *digest.Digest) error {
ctx := namespaces.WithNamespace(context.Background(), accelerationServiceNamespace)
contentLeases, err := content.lm.List(ctx, "id=="+digest.String())
if err != nil {
return err
}
if len(contentLeases) == 0 {
l, err := content.lm.Create(ctx, leases.WithID(digest.String()))
if err != nil {
return err
}
if err := content.lm.AddResource(ctx, l, leases.Resource{
ID: digest.String(),
Type: "content",
}); err != nil {
return err
}
}
return content.db.Update(func(tx *bolt.Tx) error {
bucket := getBlobBucket(tx, *digest)
updatedAt, err := time.Now().UTC().MarshalBinary()
bucket := getLeaseBucket(tx, digest.String())
// if can't find lease bucket, it maens content store is empty
if bucket == nil {
return nil
}
// read the labels from lease bucket
labels, err := boltutil.ReadLabels(bucket)
if err != nil {
return err
}
return bucket.Put(bucketKeyUpdatedAt, updatedAt)
// update the usedCountLabel
usedCount := 1
count, ok := labels[usedCountLabel]
if ok {
usedCount, err = strconv.Atoi(count)
if err != nil {
return err
}
usedCount++
}
// write the new labels into lease bucket
return boltutil.WriteLabels(bucket, map[string]string{
usedCountLabel: strconv.Itoa(usedCount),
usedAtLabel: time.Now().UTC().String(),
})
})
}

Expand Down Expand Up @@ -145,7 +226,7 @@ func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (
if err != nil {
return readerAt, err
}
return readerAt, content.updateTime(&desc.Digest)
return readerAt, content.updateLease(&desc.Digest)
}

func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) {
Expand Down

0 comments on commit f132200

Please sign in to comment.