Hash object names used as cache keys for CachingBucket #9968

Closed
wants to merge 2 commits
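Editor's summary (not part of the original PR page): this change stops embedding raw object names in cache keys and instead keys every entry on a SHA3-256 hex digest of the name, so keys stay fixed-length and memcached-safe no matter what the object is called. A minimal standalone sketch of what the new cachingKeyHash helper computes, using golang.org/x/crypto/sha3 directly (the PR itself goes through crypto.SHA3_256, which relies on a SHA-3 implementation being registered in the binary):

package main

import (
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/sha3"
)

// hashName mirrors the PR's cachingKeyHash: a hex-encoded SHA3-256 digest of
// the object name. The result is always 64 hex characters, regardless of the
// name's length or contents.
func hashName(name string) string {
	sum := sha3.Sum256([]byte(name))
	return hex.EncodeToString(sum[:])
}

func main() {
	// Hypothetical object names, for illustration only.
	for _, name := range []string{"/test/chunks/000001", "tenant-1/rule groups/with spaces"} {
		fmt.Printf("%q -> %s\n", name, hashName(name))
	}
}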
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -36,7 +36,7 @@
* [FEATURE] Alertmanager: limit added for maximum size of the Grafana configuration (`-alertmanager.max-config-size-bytes`). #9402
* [FEATURE] Ingester: Experimental support for ingesting out-of-order native histograms. This is disabled by default and can be enabled by setting `-ingester.ooo-native-histograms-ingestion-enabled` to `true`. #7175
* [FEATURE] Distributor: Added `-api.skip-label-count-validation-header-enabled` option to allow skipping label count validation on the HTTP write path based on `X-Mimir-SkipLabelCountValidation` header being `true` or not. #9576
- * [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595
+ * [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595 #9968
* [FEATURE] PromQL: Add experimental `info` function. Experimental functions are disabled by default, but can be enabled setting `-querier.promql-experimental-functions-enabled=true` in the query-frontend and querier. #9879
* [FEATURE] Distributor: Support promotion of OTel resource attributes to labels. #8271
* [ENHANCEMENT] mimirtool: Adds bearer token support for mimirtool's analyze ruler/prometheus commands. #9587
67 changes: 41 additions & 26 deletions pkg/storage/tsdb/bucketcache/caching_bucket.go
@@ -8,6 +8,8 @@ package bucketcache
import (
"bytes"
"context"
"crypto"
"encoding/hex"
"encoding/json"
"io"
"strconv"
@@ -165,20 +167,22 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin
}

func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
- cb.invalidation.start(ctx, name)
+ hashedName := cachingKeyHash(name)
+ cb.invalidation.start(ctx, name, hashedName)
err := cb.Bucket.Upload(ctx, name, r)
if err == nil {
- cb.invalidation.finish(ctx, name)
+ cb.invalidation.finish(ctx, name, hashedName)
}

return err
}

func (cb *CachingBucket) Delete(ctx context.Context, name string) error {
- cb.invalidation.start(ctx, name)
+ hashedName := cachingKeyHash(name)
+ cb.invalidation.start(ctx, name, hashedName)
err := cb.Bucket.Delete(ctx, name)
if err == nil {
- cb.invalidation.finish(ctx, name)
+ cb.invalidation.finish(ctx, name, hashedName)
}

return err
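Note the pattern used in Upload and Delete above and repeated through the rest of this diff: each operation hashes once up front and passes both name and hashedName down. The raw name is still needed because config matching (findAttributesConfig and friends) works on real object names, while every cache key is built from the digest; hashing at the top also means one SHA3-256 computation per call instead of one per derived key. Schematically:

hashedName := cachingKeyHash(name)                       // one hash...
key := cachingKeyExists(cb.bucketID, hashedName)         // ...reused for
lockKey := cachingKeyExistsLock(cb.bucketID, hashedName) // every key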
@@ -259,8 +263,9 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
return cb.Bucket.Exists(ctx, name)
}

- key := cachingKeyExists(cb.bucketID, name)
- lockKey := cachingKeyExistsLock(cb.bucketID, name)
+ hashedName := cachingKeyHash(name)
+ key := cachingKeyExists(cb.bucketID, hashedName)
+ lockKey := cachingKeyExistsLock(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
@@ -308,10 +313,11 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return cb.Bucket.Get(ctx, name)
}

- contentLockKey := cachingKeyContentLock(cb.bucketID, name)
- contentKey := cachingKeyContent(cb.bucketID, name)
- existsLockKey := cachingKeyExistsLock(cb.bucketID, name)
- existsKey := cachingKeyExists(cb.bucketID, name)
+ hashedName := cachingKeyHash(name)
+ contentLockKey := cachingKeyContentLock(cb.bucketID, hashedName)
+ contentKey := cachingKeyContent(cb.bucketID, hashedName)
+ existsLockKey := cachingKeyExistsLock(cb.bucketID, hashedName)
+ existsKey := cachingKeyExists(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
@@ -393,7 +399,8 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
return cb.Bucket.GetRange(ctx, name, off, length)
}

- return cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
+ hashedName := cachingKeyHash(name)
+ return cb.cachedGetRange(ctx, name, hashedName, off, length, cfgName, cfg)
}

func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
@@ -402,12 +409,13 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
return cb.Bucket.Attributes(ctx, name)
}

- return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl)
+ hashedName := cachingKeyHash(name)
+ return cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.cache, cfg.ttl)
}

- func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
- lockKey := cachingKeyAttributesLock(cb.bucketID, name)
- key := cachingKeyAttributes(cb.bucketID, name)
+ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, hashedName, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
+ lockKey := cachingKeyAttributesLock(cb.bucketID, hashedName)
+ key := cachingKeyAttributes(cb.bucketID, hashedName)

// Lookup the cache.
if isCacheLookupEnabled(ctx) {
@@ -445,8 +453,8 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
return attrs, nil
}

- func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
- attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
+ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name, hashedName string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
+ attrs, err := cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
if err != nil {
return nil, errors.Wrapf(err, "failed to get object attributes: %s", name)
}
@@ -484,7 +492,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
}
totalRequestedBytes += (end - off)

- k := cachingKeyObjectSubrange(cb.bucketID, name, off, end)
+ k := cachingKeyObjectSubrange(cb.bucketID, hashedName, off, end)
keys = append(keys, k)
offsetKeys[off] = k
}
@@ -663,7 +671,7 @@ func newCacheInvalidation(bucketID string, cfg *CachingBucketConfig, logger log.
// prevent new cache entries for that item from being stored. This ensures that when the
// cache entries for the item are deleted after it is mutated, reads which try to "add"
// the lock key cannot and will go directly to object storage for a short period of time.
- func (i *cacheInvalidation) start(ctx context.Context, name string) {
+ func (i *cacheInvalidation) start(ctx context.Context, name, hashedName string) {
logger := spanlogger.FromContext(ctx, i.logger)

_, attrCfg := i.cfg.findAttributesConfig(name)
@@ -673,9 +681,9 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

- attrLockKey := cachingKeyAttributesLock(i.bucketID, name)
- contentLockKey := cachingKeyContentLock(i.bucketID, name)
- existsLockKey := cachingKeyExistsLock(i.bucketID, name)
+ attrLockKey := cachingKeyAttributesLock(i.bucketID, hashedName)
+ contentLockKey := cachingKeyContentLock(i.bucketID, hashedName)
+ existsLockKey := cachingKeyExistsLock(i.bucketID, hashedName)

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
@@ -703,7 +711,7 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
// finish removes attribute, existence, and content entries in a cache associated with
// a given item. Note that it does not remove the "lock" entries in the cache to ensure
// that other requests must read directly from object storage until the lock expires.
- func (i *cacheInvalidation) finish(ctx context.Context, name string) {
+ func (i *cacheInvalidation) finish(ctx context.Context, name, hashedName string) {
logger := spanlogger.FromContext(ctx, i.logger)

_, attrCfg := i.cfg.findAttributesConfig(name)
@@ -713,9 +721,9 @@ func (i *cacheInvalidation) finish(ctx context.Context, name string) {
existCfg = &getCfg.existsConfig
}

- attrKey := cachingKeyAttributes(i.bucketID, name)
- contentKey := cachingKeyContent(i.bucketID, name)
- existsKey := cachingKeyExists(i.bucketID, name)
+ attrKey := cachingKeyAttributes(i.bucketID, hashedName)
+ contentKey := cachingKeyContent(i.bucketID, hashedName)
+ existsKey := cachingKeyExists(i.bucketID, hashedName)

if attrCfg != nil || getCfg != nil || existCfg != nil {
err := i.runWithRetries(ctx, func() error {
@@ -772,6 +780,13 @@ func (i *cacheInvalidation) runWithRetries(ctx context.Context, f func() error)
return retry.Err()
}

+ func cachingKeyHash(name string) string {
+ hasher := crypto.SHA3_256.New()
+ _, _ = hasher.Write([]byte(name)) // This will never error.
+ // Hex because memcache keys must be non-whitespace non-control ASCII
+ return hex.EncodeToString(hasher.Sum(nil))
+ }
+
func cachingKeyAttributes(bucketID, name string) string {
return composeCachingKey("attrs", bucketID, name)
}
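To make the resulting key shape concrete: a composed key is now a short prefix (such as "attrs", visible in cachingKeyAttributes above) plus the bucket ID plus a 64-character digest. The ':' separator and the "blocks" bucket ID in this sketch are assumptions for illustration, since composeCachingKey itself sits outside this diff:

package main

import (
	"encoding/hex"
	"fmt"

	"golang.org/x/crypto/sha3"
)

func main() {
	name := "/test/chunks/000001" // the object name used by the tests below
	sum := sha3.Sum256([]byte(name))
	hashedName := hex.EncodeToString(sum[:])

	// Assumed layout of composeCachingKey("attrs", bucketID, hashedName):
	// a fixed prefix plus a 64-char digest stays far under memcached's
	// 250-byte key limit no matter how long the original name was.
	fmt.Println("attrs:blocks:" + hashedName)
}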
45 changes: 33 additions & 12 deletions pkg/storage/tsdb/bucketcache/caching_bucket_test.go
@@ -40,6 +40,7 @@ func TestChunksCaching(t *testing.T) {
}

name := "/test/chunks/000001"
+ hashedName := cachingKeyHash(name)

inmem := objstore.NewInMemBucket()
assert.NoError(t, inmem.Upload(context.Background(), name, bytes.NewReader(data)))
@@ -149,9 +150,9 @@
init: func() {
ctx := context.Background()
// Delete first 3 subranges.
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 0*subrangeSize, 1*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 1*subrangeSize, 2*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 2*subrangeSize, 3*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 0*subrangeSize, 1*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 1*subrangeSize, 2*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 2*subrangeSize, 3*subrangeSize)))
},
},

@@ -167,9 +168,9 @@
init: func() {
ctx := context.Background()
// Delete last 3 subranges.
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 7*subrangeSize, 8*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 8*subrangeSize, 9*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 9*subrangeSize, 10*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 7*subrangeSize, 8*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 8*subrangeSize, 9*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 9*subrangeSize, 10*subrangeSize)))
},
},

@@ -185,9 +186,9 @@
init: func() {
ctx := context.Background()
// Delete 3 subranges in the middle.
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 3*subrangeSize, 4*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 4*subrangeSize, 5*subrangeSize)))
- require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 5*subrangeSize, 6*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 3*subrangeSize, 4*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 4*subrangeSize, 5*subrangeSize)))
+ require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 5*subrangeSize, 6*subrangeSize)))
},
},

@@ -206,7 +207,7 @@
if i > 0 && i%3 == 0 {
continue
}
- require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+ require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
@@ -228,7 +229,7 @@
if i == 3 || i == 5 || i == 7 {
continue
}
- require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+ require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
@@ -249,7 +250,7 @@
if i == 5 || i == 6 || i == 7 {
continue
}
- require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+ require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
}
},
},
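Reviewer-style observation (mine, not from the PR thread): the test changes above are load-bearing. cache.Delete is keyed, so had the tests kept raw names, every Delete would have targeted a key that no longer exists, nothing would have been evicted, and the partial-cache cases would have silently stopped exercising the object-storage fetch path.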
@@ -1011,11 +1012,21 @@ func BenchmarkCachingKey(b *testing.B) {
cachingKeyAttributes(bucketID, "/object")
},
},
"cachingKeyAttributes() hashed": {
run: func(bucketID string) {
cachingKeyAttributes(bucketID, cachingKeyHash("/object"))
},
},
"cachingKeyObjectSubrange()": {
run: func(bucketID string) {
cachingKeyObjectSubrange(bucketID, "/object", 10, 20)
},
},
"cachingKeyObjectSubrange() hashed": {
run: func(bucketID string) {
cachingKeyObjectSubrange(bucketID, cachingKeyHash("/object"), 10, 20)
},
},
"cachingKeyIter()": {
run: func(bucketID string) {
cachingKeyIter(bucketID, "/dir")
@@ -1031,11 +1042,21 @@
cachingKeyExists(bucketID, "/object")
},
},
"cachingKeyExists() hashed": {
run: func(bucketID string) {
cachingKeyExists(bucketID, cachingKeyHash("/object"))
},
},
"cachingKeyContent()": {
run: func(bucketID string) {
cachingKeyContent(bucketID, "/object")
},
},
"cachingKeyContent() hashed": {
run: func(bucketID string) {
cachingKeyContent(bucketID, cachingKeyHash("/object"))
},
},
}

for testName, testData := range tests {
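The new "hashed" benchmark variants measure key construction with the SHA3-256-plus-hex step included, so the cost this PR adds can be read off directly against the pre-existing cases. With a standard Go toolchain they can be run from the repository root with:

go test -run='^$' -bench=BenchmarkCachingKey -benchmem ./pkg/storage/tsdb/bucketcache/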