Skip to content

Commit

Permalink
Hash object names used as cache keys for CachingBucket
Browse files Browse the repository at this point in the history
When caching rule groups, the base64 encoded name of the rule group is
used as part of the key for caching its contents. For long rule group
names, this can exceed the max key length of Memcached. To solve this
we use the sha256 of the object name in the cache key instead of the
object name itself.

Related #9386
  • Loading branch information
56quarters committed Nov 20, 2024
1 parent 92ebc23 commit 29c831a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 39 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* [FEATURE] Alertmanager: limit added for maximum size of the Grafana configuration (`-alertmanager.max-config-size-bytes`). #9402
* [FEATURE] Ingester: Experimental support for ingesting out-of-order native histograms. This is disabled by default and can be enabled by setting `-ingester.ooo-native-histograms-ingestion-enabled` to `true`. #7175
* [FEATURE] Distributor: Added `-api.skip-label-count-validation-header-enabled` option to allow skipping label count validation on the HTTP write path based on `X-Mimir-SkipLabelCountValidation` header being `true` or not. #9576
* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595
* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595 #9968
* [FEATURE] PromQL: Add experimental `info` function. Experimental functions are disabled by default, but can be enabled setting `-querier.promql-experimental-functions-enabled=true` in the query-frontend and querier. #9879
* [ENHANCEMENT] mimirtool: Adds bearer token support for mimirtool's analyze ruler/prometheus commands. #9587
* [ENHANCEMENT] Ruler: Support `exclude_alerts` parameter in `<prometheus-http-prefix>/api/v1/rules` endpoint. #9300
Expand Down
67 changes: 41 additions & 26 deletions pkg/storage/tsdb/bucketcache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package bucketcache
import (
	"bytes"
	"context"
	"crypto"
	_ "crypto/sha256" // Registers SHA-256 so crypto.SHA256.New() works.
	"encoding/hex"
	"encoding/json"
	"io"
	"strconv"
Expand Down Expand Up @@ -165,20 +167,22 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin
}

// Upload writes the object under name and invalidates any cached entries
// for it, so that readers do not serve stale attributes, existence, or
// content information across the mutation.
func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	// Hash once and reuse: start/finish derive several cache keys from it.
	hashedName := cachingKeyHash(name)
	// Set "lock" entries first so concurrent reads go to object storage
	// instead of re-populating the cache while the object is changing.
	cb.invalidation.start(ctx, name, hashedName)
	err := cb.Bucket.Upload(ctx, name, r)
	if err == nil {
		// Only remove the cached entries once the write actually succeeded;
		// on failure the old cached state still matches object storage.
		cb.invalidation.finish(ctx, name, hashedName)
	}

	return err
}

func (cb *CachingBucket) Delete(ctx context.Context, name string) error {
cb.invalidation.start(ctx, name)
hashedName := cachingKeyHash(name)
cb.invalidation.start(ctx, name, hashedName)
err := cb.Bucket.Delete(ctx, name)
if err == nil {
cb.invalidation.finish(ctx, name)
cb.invalidation.finish(ctx, name, hashedName)
}

return err
Expand Down Expand Up @@ -259,8 +263,9 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
return cb.Bucket.Exists(ctx, name)
}

key := cachingKeyExists(cb.bucketID, name)
lockKey := cachingKeyExistsLock(cb.bucketID, name)
hashedName := cachingKeyHash(name)
key := cachingKeyExists(cb.bucketID, hashedName)
lockKey := cachingKeyExistsLock(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
Expand Down Expand Up @@ -308,10 +313,11 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return cb.Bucket.Get(ctx, name)
}

contentLockKey := cachingKeyContentLock(cb.bucketID, name)
contentKey := cachingKeyContent(cb.bucketID, name)
existsLockKey := cachingKeyExistsLock(cb.bucketID, name)
existsKey := cachingKeyExists(cb.bucketID, name)
hashedName := cachingKeyHash(name)
contentLockKey := cachingKeyContentLock(cb.bucketID, hashedName)
contentKey := cachingKeyContent(cb.bucketID, hashedName)
existsLockKey := cachingKeyExistsLock(cb.bucketID, hashedName)
existsKey := cachingKeyExists(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
Expand Down Expand Up @@ -393,7 +399,8 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
return cb.Bucket.GetRange(ctx, name, off, length)
}

return cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
hashedName := cachingKeyHash(name)
return cb.cachedGetRange(ctx, name, hashedName, off, length, cfgName, cfg)
}

func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
Expand All @@ -402,12 +409,13 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
return cb.Bucket.Attributes(ctx, name)
}

return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl)
hashedName := cachingKeyHash(name)
return cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.cache, cfg.ttl)
}

func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
lockKey := cachingKeyAttributesLock(cb.bucketID, name)
key := cachingKeyAttributes(cb.bucketID, name)
func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, hashedName, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
lockKey := cachingKeyAttributesLock(cb.bucketID, hashedName)
key := cachingKeyAttributes(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
Expand Down Expand Up @@ -445,8 +453,8 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
return attrs, nil
}

func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
func (cb *CachingBucket) cachedGetRange(ctx context.Context, name, hashedName string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
attrs, err := cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
if err != nil {
return nil, errors.Wrapf(err, "failed to get object attributes: %s", name)
}
Expand Down Expand Up @@ -484,7 +492,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
}
totalRequestedBytes += (end - off)

k := cachingKeyObjectSubrange(cb.bucketID, name, off, end)
k := cachingKeyObjectSubrange(cb.bucketID, hashedName, off, end)
keys = append(keys, k)
offsetKeys[off] = k
}
Expand Down Expand Up @@ -663,7 +671,7 @@ func newCacheInvalidation(bucketID string, cfg *CachingBucketConfig, logger log.
// prevent new cache entries for that item from being stored. This ensures that when the
// cache entries for the item are deleted after it is mutated, reads which try to "add"
// the lock key cannot and will go directly to object storage for a short period of time.
func (i *cacheInvalidation) start(ctx context.Context, name string) {
func (i *cacheInvalidation) start(ctx context.Context, name, hashedName string) {
logger := spanlogger.FromContext(ctx, i.logger)

_, attrCfg := i.cfg.findAttributesConfig(name)
Expand All @@ -673,9 +681,9 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

attrLockKey := cachingKeyAttributesLock(i.bucketID, name)
contentLockKey := cachingKeyContentLock(i.bucketID, name)
existsLockKey := cachingKeyExistsLock(i.bucketID, name)
attrLockKey := cachingKeyAttributesLock(i.bucketID, hashedName)
contentLockKey := cachingKeyContentLock(i.bucketID, hashedName)
existsLockKey := cachingKeyExistsLock(i.bucketID, hashedName)

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
Expand Down Expand Up @@ -703,7 +711,7 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
// finish removes attribute, existence, and content entries in a cache associated with
// a given item. Note that it does not remove the "lock" entries in the cache to ensure
// that other requests must read directly from object storage until the lock expires.
func (i *cacheInvalidation) finish(ctx context.Context, name string) {
func (i *cacheInvalidation) finish(ctx context.Context, name, hashedName string) {
logger := spanlogger.FromContext(ctx, i.logger)

_, attrCfg := i.cfg.findAttributesConfig(name)
Expand All @@ -713,9 +721,9 @@ func (i *cacheInvalidation) finish(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

attrKey := cachingKeyAttributes(i.bucketID, name)
contentKey := cachingKeyContent(i.bucketID, name)
existsKey := cachingKeyExists(i.bucketID, name)
attrKey := cachingKeyAttributes(i.bucketID, hashedName)
contentKey := cachingKeyContent(i.bucketID, hashedName)
existsKey := cachingKeyExists(i.bucketID, hashedName)

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
Expand Down Expand Up @@ -772,6 +780,13 @@ func (i *cacheInvalidation) runWithRetries(ctx context.Context, f func() error)
return retry.Err()
}

func cachingKeyHash(name string) string {
hasher := crypto.SHA3_256.New()
_, _ = hasher.Write([]byte(name)) // This will never error.
// Hex because memcache keys must be non-whitespace non-control ASCII
return hex.EncodeToString(hasher.Sum(nil))
}

// cachingKeyAttributes builds the cache key under which the attributes of
// the named object are stored for the given bucket.
func cachingKeyAttributes(bucketID, name string) string {
	const keyKind = "attrs"
	return composeCachingKey(keyKind, bucketID, name)
}
Expand Down
25 changes: 13 additions & 12 deletions pkg/storage/tsdb/bucketcache/caching_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestChunksCaching(t *testing.T) {
}

name := "/test/chunks/000001"
hashedName := cachingKeyHash(name)

inmem := objstore.NewInMemBucket()
assert.NoError(t, inmem.Upload(context.Background(), name, bytes.NewReader(data)))
Expand Down Expand Up @@ -149,9 +150,9 @@ func TestChunksCaching(t *testing.T) {
init: func() {
ctx := context.Background()
// Delete first 3 subranges.
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 0*subrangeSize, 1*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 1*subrangeSize, 2*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 2*subrangeSize, 3*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 0*subrangeSize, 1*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 1*subrangeSize, 2*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 2*subrangeSize, 3*subrangeSize)))
},
},

Expand All @@ -167,9 +168,9 @@ func TestChunksCaching(t *testing.T) {
init: func() {
ctx := context.Background()
// Delete last 3 subranges.
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 7*subrangeSize, 8*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 8*subrangeSize, 9*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 9*subrangeSize, 10*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 7*subrangeSize, 8*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 8*subrangeSize, 9*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 9*subrangeSize, 10*subrangeSize)))
},
},

Expand All @@ -185,9 +186,9 @@ func TestChunksCaching(t *testing.T) {
init: func() {
ctx := context.Background()
// Delete 3 subranges in the middle.
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 3*subrangeSize, 4*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 4*subrangeSize, 5*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 5*subrangeSize, 6*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 3*subrangeSize, 4*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 4*subrangeSize, 5*subrangeSize)))
require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 5*subrangeSize, 6*subrangeSize)))
},
},

Expand All @@ -206,7 +207,7 @@ func TestChunksCaching(t *testing.T) {
if i > 0 && i%3 == 0 {
continue
}
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
Expand All @@ -228,7 +229,7 @@ func TestChunksCaching(t *testing.T) {
if i == 3 || i == 5 || i == 7 {
continue
}
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
Expand All @@ -249,7 +250,7 @@ func TestChunksCaching(t *testing.T) {
if i == 5 || i == 6 || i == 7 {
continue
}
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
Expand Down

0 comments on commit 29c831a

Please sign in to comment.