Hash object names used as cache keys for CachingBucket
When caching rule groups, the base64-encoded name of the rule group is
used as part of the key for caching its contents. For long rule group
names, this can exceed the maximum key length of Memcached. To solve
this, we use a cryptographic hash of the object name whenever the hashed
version of the name is shorter than the full name. This is the same
approach taken for postings in the store-gateway `indexcache`.
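
Concretely, hashing only kicks in once a name is at least as long as a
base64-encoded BLAKE2b-256 digest (43 bytes). Below is a minimal,
self-contained sketch of the helper this commit adds; the object names in
`main` are hypothetical, for illustration only:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"

	"golang.org/x/crypto/blake2b"
)

// maybeHashName mirrors the helper added in this commit: names shorter than a
// base64-encoded 32-byte BLAKE2b digest (43 bytes) pass through unchanged, so
// their existing cache keys stay valid after the rollout.
func maybeHashName(name string) string {
	if len(name) < base64.RawURLEncoding.EncodedLen(blake2b.Size256) {
		return name
	}
	sum := blake2b.Sum256([]byte(name))
	return base64.RawURLEncoding.EncodeToString(sum[:])
}

func main() {
	// Hypothetical rule group paths, for illustration only.
	short := "rules/tenant-1/group"
	long := "rules/tenant-1/" + strings.Repeat("a-very-long-rule-group-name", 3)

	fmt.Println(maybeHashName(short)) // unchanged: shorter than a digest
	fmt.Println(maybeHashName(long))  // 43-character base64 BLAKE2b digest
}
```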

This has the added benefit of not invalidating most existing cache
entries when rolling out this change, since short names keep their
original keys. With the chosen hash function, key generation is between
5 and 10 times slower than _not_ hashing the key, but still dramatically
faster than a network operation.
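
The 5-10x figure can be sanity-checked with a benchmark along these lines
(a sketch, not the benchmark from this PR; the key layout is approximated
with plain concatenation):

```go
package bucketcache

import (
	"encoding/base64"
	"strings"
	"testing"

	"golang.org/x/crypto/blake2b"
)

// A name long enough to trigger hashing (>= 43 bytes).
var longName = strings.Repeat("rules/tenant-1/long-group-name/", 4)

// Key generation without hashing the object name.
func BenchmarkKeyPlain(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_ = "content:bucket:" + longName
	}
}

// Key generation with a BLAKE2b-256 digest of the object name.
func BenchmarkKeyHashed(b *testing.B) {
	for i := 0; i < b.N; i++ {
		sum := blake2b.Sum256([]byte(longName))
		_ = "content:bucket:" + base64.RawURLEncoding.EncodeToString(sum[:])
	}
}
```

Run with `go test -bench=BenchmarkKey -benchmem` to compare the two.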

Related #9386
56quarters committed Nov 25, 2024
1 parent acda41c commit e4de319
Showing 3 changed files with 211 additions and 128 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -36,7 +36,7 @@
 * [FEATURE] Alertmanager: limit added for maximum size of the Grafana configuration (`-alertmanager.max-config-size-bytes`). #9402
 * [FEATURE] Ingester: Experimental support for ingesting out-of-order native histograms. This is disabled by default and can be enabled by setting `-ingester.ooo-native-histograms-ingestion-enabled` to `true`. #7175
 * [FEATURE] Distributor: Added `-api.skip-label-count-validation-header-enabled` option to allow skipping label count validation on the HTTP write path based on `X-Mimir-SkipLabelCountValidation` header being `true` or not. #9576
-* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595
+* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595 #9968
 * [FEATURE] PromQL: Add experimental `info` function. Experimental functions are disabled by default, but can be enabled setting `-querier.promql-experimental-functions-enabled=true` in the query-frontend and querier. #9879
 * [FEATURE] Distributor: Support promotion of OTel resource attributes to labels. #8271
 * [ENHANCEMENT] Query Frontend: Return server-side `bytes_processed` statistics following Server-Timing format. #9645 #9985
124 changes: 80 additions & 44 deletions pkg/storage/tsdb/bucketcache/caching_bucket.go
@@ -8,6 +8,7 @@ package bucketcache
 import (
 	"bytes"
 	"context"
+	"encoding/base64"
 	"encoding/json"
 	"io"
 	"strconv"
@@ -27,6 +28,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/thanos-io/objstore"
+	"golang.org/x/crypto/blake2b"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/mimir/pkg/util/pool"
@@ -165,20 +167,22 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin
 }
 
 func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
-	cb.invalidation.start(ctx, name)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	cb.invalidation.start(ctx, name, keyGen)
 	err := cb.Bucket.Upload(ctx, name, r)
 	if err == nil {
-		cb.invalidation.finish(ctx, name)
+		cb.invalidation.finish(ctx, name, keyGen)
 	}
 
 	return err
 }
 
 func (cb *CachingBucket) Delete(ctx context.Context, name string) error {
-	cb.invalidation.start(ctx, name)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	cb.invalidation.start(ctx, name, keyGen)
 	err := cb.Bucket.Delete(ctx, name)
 	if err == nil {
-		cb.invalidation.finish(ctx, name)
+		cb.invalidation.finish(ctx, name, keyGen)
 	}
 
 	return err
@@ -210,7 +214,8 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er
 		return cb.Bucket.Iter(ctx, dir, f, options...)
 	}
 
-	key := cachingKeyIter(cb.bucketID, dir, options...)
+	keyGen := newCacheKeyBuilder(cb.bucketID, dir)
+	key := keyGen.iter(options...)
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -259,8 +264,9 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
 		return cb.Bucket.Exists(ctx, name)
 	}
 
-	key := cachingKeyExists(cb.bucketID, name)
-	lockKey := cachingKeyExistsLock(cb.bucketID, name)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	key := keyGen.exists()
+	lockKey := keyGen.existsLock()
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -308,10 +314,11 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
 		return cb.Bucket.Get(ctx, name)
 	}
 
-	contentLockKey := cachingKeyContentLock(cb.bucketID, name)
-	contentKey := cachingKeyContent(cb.bucketID, name)
-	existsLockKey := cachingKeyExistsLock(cb.bucketID, name)
-	existsKey := cachingKeyExists(cb.bucketID, name)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	contentLockKey := keyGen.contentLock()
+	contentKey := keyGen.content()
+	existsLockKey := keyGen.existsLock()
+	existsKey := keyGen.exists()
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -393,7 +400,8 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
 		return cb.Bucket.GetRange(ctx, name, off, length)
 	}
 
-	return cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	return cb.cachedGetRange(ctx, name, keyGen, off, length, cfgName, cfg)
 }
 
 func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
@@ -402,12 +410,13 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
 		return cb.Bucket.Attributes(ctx, name)
 	}
 
-	return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl)
+	keyGen := newCacheKeyBuilder(cb.bucketID, name)
+	return cb.cachedAttributes(ctx, name, keyGen, cfgName, cfg.cache, cfg.ttl)
 }
 
-func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
-	lockKey := cachingKeyAttributesLock(cb.bucketID, name)
-	key := cachingKeyAttributes(cb.bucketID, name)
+func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, keyGen cacheKeyBuilder, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
+	lockKey := keyGen.attributesLock()
+	key := keyGen.attributes()
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -445,8 +454,8 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
 	return attrs, nil
 }
 
-func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
-	attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
+func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, keyGen cacheKeyBuilder, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
+	attrs, err := cb.cachedAttributes(ctx, name, keyGen, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to get object attributes: %s", name)
 	}
@@ -484,7 +493,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
 		}
 		totalRequestedBytes += (end - off)
 
-		k := cachingKeyObjectSubrange(cb.bucketID, name, off, end)
+		k := keyGen.objectSubrange(off, end)
 		keys = append(keys, k)
 		offsetKeys[off] = k
 	}
@@ -663,7 +672,7 @@ func newCacheInvalidation(bucketID string, cfg *CachingBucketConfig, logger log.
 // prevent new cache entries for that item from being stored. This ensures that when the
 // cache entries for the item are deleted after it is mutated, reads which try to "add"
 // the lock key cannot and will go directly to object storage for a short period of time.
-func (i *cacheInvalidation) start(ctx context.Context, name string) {
+func (i *cacheInvalidation) start(ctx context.Context, name string, keyGen cacheKeyBuilder) {
 	logger := spanlogger.FromContext(ctx, i.logger)
 
 	_, attrCfg := i.cfg.findAttributesConfig(name)
_, attrCfg := i.cfg.findAttributesConfig(name)
Expand All @@ -673,9 +682,9 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

attrLockKey := cachingKeyAttributesLock(i.bucketID, name)
contentLockKey := cachingKeyContentLock(i.bucketID, name)
existsLockKey := cachingKeyExistsLock(i.bucketID, name)
attrLockKey := keyGen.attributesLock()
contentLockKey := keyGen.contentLock()
existsLockKey := keyGen.existsLock()

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
@@ -703,7 +712,7 @@
 // finish removes attribute, existence, and content entries in a cache associated with
 // a given item. Note that it does not remove the "lock" entries in the cache to ensure
 // that other requests must read directly from object storage until the lock expires.
-func (i *cacheInvalidation) finish(ctx context.Context, name string) {
+func (i *cacheInvalidation) finish(ctx context.Context, name string, keyGen cacheKeyBuilder) {
 	logger := spanlogger.FromContext(ctx, i.logger)
 
 	_, attrCfg := i.cfg.findAttributesConfig(name)
_, attrCfg := i.cfg.findAttributesConfig(name)
Expand All @@ -713,9 +722,9 @@ func (i *cacheInvalidation) finish(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

attrKey := cachingKeyAttributes(i.bucketID, name)
contentKey := cachingKeyContent(i.bucketID, name)
existsKey := cachingKeyExists(i.bucketID, name)
attrKey := keyGen.attributes()
contentKey := keyGen.content()
existsKey := keyGen.exists()

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
@@ -772,42 +781,69 @@ func (i *cacheInvalidation) runWithRetries(ctx context.Context, f func() error)
 	return retry.Err()
 }
 
-func cachingKeyAttributes(bucketID, name string) string {
-	return composeCachingKey("attrs", bucketID, name)
+// cacheKeyBuilder generates cache keys for the results of different operations that
+// can be performed on a particular object in object storage, hashing the name if needed
+// to ensure we don't exceed key length limits of the underlying cache.
+type cacheKeyBuilder struct {
+	bucketID string
+	name     string
+}
+
+func newCacheKeyBuilder(bucketID, name string) cacheKeyBuilder {
+	return cacheKeyBuilder{
+		bucketID: bucketID,
+		name:     maybeHashName(name),
+	}
+}
+
+func maybeHashName(name string) string {
+	// We hash object names to avoid hitting cache key length limits. If the object name
+	// is shorter than the hashed version would be, skip hashing it since it provides no
+	// value and the original name is more useful when debugging.
+	if len(name) < base64.RawURLEncoding.EncodedLen(blake2b.Size256) {
+		return name
+	}
+
+	sum := blake2b.Sum256([]byte(name))
+	return base64.RawURLEncoding.EncodeToString(sum[:blake2b.Size256])
+}
+
+func (b cacheKeyBuilder) attributes() string {
+	return composeCachingKey("attrs", b.bucketID, b.name)
 }
 
-func cachingKeyAttributesLock(bucketID, name string) string {
-	return composeCachingKey("attrs", bucketID, name, "lock")
+func (b cacheKeyBuilder) attributesLock() string {
+	return composeCachingKey("attrs", b.bucketID, b.name, "lock")
 }
 
-func cachingKeyObjectSubrange(bucketID, name string, start, end int64) string {
-	return composeCachingKey("subrange", bucketID, name, strconv.FormatInt(start, 10), strconv.FormatInt(end, 10))
+func (b cacheKeyBuilder) objectSubrange(start, end int64) string {
+	return composeCachingKey("subrange", b.bucketID, b.name, strconv.FormatInt(start, 10), strconv.FormatInt(end, 10))
 }
 
-func cachingKeyIter(bucketID, name string, options ...objstore.IterOption) string {
+func (b cacheKeyBuilder) iter(options ...objstore.IterOption) string {
 	// Ensure the caching key is different for the same request but different
 	// recursive config.
 	if params := objstore.ApplyIterOptions(options...); params.Recursive {
-		return composeCachingKey("iter", bucketID, name, "recursive")
+		return composeCachingKey("iter", b.bucketID, b.name, "recursive")
 	}
 
-	return composeCachingKey("iter", bucketID, name)
+	return composeCachingKey("iter", b.bucketID, b.name)
 }
 
-func cachingKeyExists(bucketID, name string) string {
-	return composeCachingKey("exists", bucketID, name)
+func (b cacheKeyBuilder) exists() string {
+	return composeCachingKey("exists", b.bucketID, b.name)
 }
 
-func cachingKeyExistsLock(bucketID, name string) string {
-	return composeCachingKey("exists", bucketID, name, "lock")
+func (b cacheKeyBuilder) existsLock() string {
+	return composeCachingKey("exists", b.bucketID, b.name, "lock")
}
 
-func cachingKeyContent(bucketID, name string) string {
-	return composeCachingKey("content", bucketID, name)
+func (b cacheKeyBuilder) content() string {
+	return composeCachingKey("content", b.bucketID, b.name)
 }
 
-func cachingKeyContentLock(bucketID, name string) string {
-	return composeCachingKey("content", bucketID, name, "lock")
+func (b cacheKeyBuilder) contentLock() string {
+	return composeCachingKey("content", b.bucketID, b.name, "lock")
 }
 
 func composeCachingKey(op, bucketID string, values ...string) string {
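
One design note worth calling out from the diff above: `Get` and the
invalidation paths derive several keys for the same object (content,
exists, and their lock variants), so building a single `cacheKeyBuilder`
per call means a long name is hashed once rather than once per key. A
condensed, self-contained sketch of that pattern follows; the body of
`composeCachingKey` is outside this diff, so the `":"` separator used here
is an assumption for illustration only:

```go
package main

import (
	"fmt"
	"strings"
)

// Condensed from the diff: one builder per object; every key for that object
// is derived from it, so a long name is hashed exactly once.
type cacheKeyBuilder struct {
	bucketID string
	name     string // output of maybeHashName; hashing already done
}

// Stand-in for Mimir's composeCachingKey, whose body is not shown in this
// diff; the ":" separator is an assumption for illustration only.
func composeCachingKey(op, bucketID string, values ...string) string {
	return strings.Join(append([]string{op, bucketID}, values...), ":")
}

func (b cacheKeyBuilder) content() string     { return composeCachingKey("content", b.bucketID, b.name) }
func (b cacheKeyBuilder) contentLock() string { return composeCachingKey("content", b.bucketID, b.name, "lock") }
func (b cacheKeyBuilder) exists() string      { return composeCachingKey("exists", b.bucketID, b.name) }

func main() {
	keyGen := cacheKeyBuilder{bucketID: "ruler-storage", name: "rules/tenant-1/group"}
	fmt.Println(keyGen.content())     // content:ruler-storage:rules/tenant-1/group
	fmt.Println(keyGen.contentLock()) // content:ruler-storage:rules/tenant-1/group:lock
	fmt.Println(keyGen.exists())      // exists:ruler-storage:rules/tenant-1/group
}
```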