diff --git a/CHANGELOG.md b/CHANGELOG.md
index c92562357ca..fcd8abb378c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,7 +36,7 @@
 * [FEATURE] Alertmanager: limit added for maximum size of the Grafana configuration (`-alertmanager.max-config-size-bytes`). #9402
 * [FEATURE] Ingester: Experimental support for ingesting out-of-order native histograms. This is disabled by default and can be enabled by setting `-ingester.ooo-native-histograms-ingestion-enabled` to `true`. #7175
 * [FEATURE] Distributor: Added `-api.skip-label-count-validation-header-enabled` option to allow skipping label count validation on the HTTP write path based on `X-Mimir-SkipLabelCountValidation` header being `true` or not. #9576
-* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595
+* [FEATURE] Ruler: Add experimental support for caching the contents of rule groups. This is disabled by default and can be enabled by setting `-ruler-storage.cache.rule-group-enabled`. #9595 #9968
 * [FEATURE] PromQL: Add experimental `info` function. Experimental functions are disabled by default, but can be enabled setting `-querier.promql-experimental-functions-enabled=true` in the query-frontend and querier. #9879
 * [FEATURE] Distributor: Support promotion of OTel resource attributes to labels. #8271
 * [ENHANCEMENT] mimirtool: Adds bearer token support for mimirtool's analyze ruler/prometheus commands. #9587
diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket.go b/pkg/storage/tsdb/bucketcache/caching_bucket.go
index 6476285e638..220de586c67 100644
--- a/pkg/storage/tsdb/bucketcache/caching_bucket.go
+++ b/pkg/storage/tsdb/bucketcache/caching_bucket.go
@@ -8,6 +8,8 @@ package bucketcache
 import (
 	"bytes"
 	"context"
+	"crypto"
+	"encoding/hex"
 	"encoding/json"
 	"io"
 	"strconv"
@@ -165,20 +167,22 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin
 }
 
 func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
-	cb.invalidation.start(ctx, name)
+	hashedName := cachingKeyHash(name)
+	cb.invalidation.start(ctx, name, hashedName)
 	err := cb.Bucket.Upload(ctx, name, r)
 	if err == nil {
-		cb.invalidation.finish(ctx, name)
+		cb.invalidation.finish(ctx, name, hashedName)
 	}
 
 	return err
 }
 
 func (cb *CachingBucket) Delete(ctx context.Context, name string) error {
-	cb.invalidation.start(ctx, name)
+	hashedName := cachingKeyHash(name)
+	cb.invalidation.start(ctx, name, hashedName)
 	err := cb.Bucket.Delete(ctx, name)
 	if err == nil {
-		cb.invalidation.finish(ctx, name)
+		cb.invalidation.finish(ctx, name, hashedName)
 	}
 
 	return err
@@ -259,8 +263,9 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
 		return cb.Bucket.Exists(ctx, name)
 	}
 
-	key := cachingKeyExists(cb.bucketID, name)
-	lockKey := cachingKeyExistsLock(cb.bucketID, name)
+	hashedName := cachingKeyHash(name)
+	key := cachingKeyExists(cb.bucketID, hashedName)
+	lockKey := cachingKeyExistsLock(cb.bucketID, hashedName)
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -308,10 +313,11 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
 		return cb.Bucket.Get(ctx, name)
 	}
 
-	contentLockKey := cachingKeyContentLock(cb.bucketID, name)
-	contentKey := cachingKeyContent(cb.bucketID, name)
-	existsLockKey := cachingKeyExistsLock(cb.bucketID, name)
-	existsKey := cachingKeyExists(cb.bucketID, name)
+	hashedName := cachingKeyHash(name)
+	contentLockKey := cachingKeyContentLock(cb.bucketID, hashedName)
+	contentKey := cachingKeyContent(cb.bucketID, hashedName)
+	existsLockKey := cachingKeyExistsLock(cb.bucketID, hashedName)
+	existsKey := cachingKeyExists(cb.bucketID, hashedName)
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -393,7 +399,8 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
 		return cb.Bucket.GetRange(ctx, name, off, length)
 	}
 
-	return cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
+	hashedName := cachingKeyHash(name)
+	return cb.cachedGetRange(ctx, name, hashedName, off, length, cfgName, cfg)
 }
 
 func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
@@ -402,12 +409,13 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
 		return cb.Bucket.Attributes(ctx, name)
 	}
 
-	return cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.ttl)
+	hashedName := cachingKeyHash(name)
+	return cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.cache, cfg.ttl)
 }
 
-func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
-	lockKey := cachingKeyAttributesLock(cb.bucketID, name)
-	key := cachingKeyAttributes(cb.bucketID, name)
+func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, hashedName, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
+	lockKey := cachingKeyAttributesLock(cb.bucketID, hashedName)
+	key := cachingKeyAttributes(cb.bucketID, hashedName)
 
 	// Lookup the cache.
 	if isCacheLookupEnabled(ctx) {
@@ -445,8 +453,8 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str
 	return attrs, nil
 }
 
-func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
-	attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
+func (cb *CachingBucket) cachedGetRange(ctx context.Context, name, hashedName string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
+	attrs, err := cb.cachedAttributes(ctx, name, hashedName, cfgName, cfg.attributes.cache, cfg.attributes.ttl)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to get object attributes: %s", name)
 	}
@@ -484,7 +492,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
 		}
 		totalRequestedBytes += (end - off)
 
-		k := cachingKeyObjectSubrange(cb.bucketID, name, off, end)
+		k := cachingKeyObjectSubrange(cb.bucketID, hashedName, off, end)
 		keys = append(keys, k)
 		offsetKeys[off] = k
 	}
@@ -663,7 +671,7 @@ func newCacheInvalidation(bucketID string, cfg *CachingBucketConfig, logger log.
 // prevent new cache entries for that item from being stored. This ensures that when the
 // cache entries for the item are deleted after it is mutated, reads which try to "add"
 // the lock key cannot and will go directly to object storage for a short period of time.
-func (i *cacheInvalidation) start(ctx context.Context, name string) {
+func (i *cacheInvalidation) start(ctx context.Context, name, hashedName string) {
 	logger := spanlogger.FromContext(ctx, i.logger)
 
 	_, attrCfg := i.cfg.findAttributesConfig(name)
@@ -673,9 +681,9 @@ func (i *cacheInvalidation) start(ctx context.Context, name string) {
 		existCfg = &getCfg.existsConfig
 	}
 
-	attrLockKey := cachingKeyAttributesLock(i.bucketID, name)
-	contentLockKey := cachingKeyContentLock(i.bucketID, name)
-	existsLockKey := cachingKeyExistsLock(i.bucketID, name)
+	attrLockKey := cachingKeyAttributesLock(i.bucketID, hashedName)
+	contentLockKey := cachingKeyContentLock(i.bucketID, hashedName)
+	existsLockKey := cachingKeyExistsLock(i.bucketID, hashedName)
 
 	if attrCfg != nil || getCfg != nil || existCfg != nil {
 		err := i.runWithRetries(ctx, func() error {
@@ -703,7 +711,7 @@
 // finish removes attribute, existence, and content entries in a cache associated with
 // a given item. Note that it does not remove the "lock" entries in the cache to ensure
 // that other requests must read directly from object storage until the lock expires.
-func (i *cacheInvalidation) finish(ctx context.Context, name string) {
+func (i *cacheInvalidation) finish(ctx context.Context, name, hashedName string) {
 	logger := spanlogger.FromContext(ctx, i.logger)
 
 	_, attrCfg := i.cfg.findAttributesConfig(name)
@@ -713,9 +721,9 @@
 		existCfg = &getCfg.existsConfig
 	}
 
-	attrKey := cachingKeyAttributes(i.bucketID, name)
-	contentKey := cachingKeyContent(i.bucketID, name)
-	existsKey := cachingKeyExists(i.bucketID, name)
+	attrKey := cachingKeyAttributes(i.bucketID, hashedName)
+	contentKey := cachingKeyContent(i.bucketID, hashedName)
+	existsKey := cachingKeyExists(i.bucketID, hashedName)
 
 	if attrCfg != nil || getCfg != nil || existCfg != nil {
 		err := i.runWithRetries(ctx, func() error {
@@ -772,6 +780,13 @@ func (i *cacheInvalidation) runWithRetries(ctx context.Context, f func() error)
 	return retry.Err()
 }
 
+func cachingKeyHash(name string) string {
+	hasher := crypto.SHA3_256.New()
+	_, _ = hasher.Write([]byte(name)) // This will never error.
+	// Hex because memcache keys must be non-whitespace non-control ASCII
+	return hex.EncodeToString(hasher.Sum(nil))
+}
+
 func cachingKeyAttributes(bucketID, name string) string {
 	return composeCachingKey("attrs", bucketID, name)
 }
diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket_test.go b/pkg/storage/tsdb/bucketcache/caching_bucket_test.go
index df2400d8700..2fa1b485daa 100644
--- a/pkg/storage/tsdb/bucketcache/caching_bucket_test.go
+++ b/pkg/storage/tsdb/bucketcache/caching_bucket_test.go
@@ -40,6 +40,7 @@ func TestChunksCaching(t *testing.T) {
 	}
 
 	name := "/test/chunks/000001"
+	hashedName := cachingKeyHash(name)
 	inmem := objstore.NewInMemBucket()
 	assert.NoError(t, inmem.Upload(context.Background(), name, bytes.NewReader(data)))
 
@@ -149,9 +150,9 @@ func TestChunksCaching(t *testing.T) {
 			init: func() {
 				ctx := context.Background()
 				// Delete first 3 subranges.
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 0*subrangeSize, 1*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 1*subrangeSize, 2*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 2*subrangeSize, 3*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 0*subrangeSize, 1*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 1*subrangeSize, 2*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 2*subrangeSize, 3*subrangeSize)))
 			},
 		},
 
@@ -167,9 +168,9 @@ func TestChunksCaching(t *testing.T) {
 			init: func() {
 				ctx := context.Background()
 				// Delete last 3 subranges.
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 7*subrangeSize, 8*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 8*subrangeSize, 9*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 9*subrangeSize, 10*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 7*subrangeSize, 8*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 8*subrangeSize, 9*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 9*subrangeSize, 10*subrangeSize)))
 			},
 		},
 
@@ -185,9 +186,9 @@ func TestChunksCaching(t *testing.T) {
 			init: func() {
 				ctx := context.Background()
 				// Delete 3 subranges in the middle.
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 3*subrangeSize, 4*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 4*subrangeSize, 5*subrangeSize)))
-				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, name, 5*subrangeSize, 6*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 3*subrangeSize, 4*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 4*subrangeSize, 5*subrangeSize)))
+				require.NoError(t, cache.Delete(ctx, cachingKeyObjectSubrange(bucketID, hashedName, 5*subrangeSize, 6*subrangeSize)))
 			},
 		},
 
@@ -206,7 +207,7 @@ func TestChunksCaching(t *testing.T) {
 					if i > 0 && i%3 == 0 {
 						continue
 					}
-					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
 				}
 			},
 		},
@@ -228,7 +229,7 @@ func TestChunksCaching(t *testing.T) {
 					if i == 3 || i == 5 || i == 7 {
 						continue
 					}
-					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
 				}
 			},
 		},
@@ -249,7 +250,7 @@ func TestChunksCaching(t *testing.T) {
 					if i == 5 || i == 6 || i == 7 {
 						continue
 					}
-					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, name, i*subrangeSize, (i+1)*subrangeSize)))
+					require.NoError(t, cache.Delete(context.Background(), cachingKeyObjectSubrange(bucketID, hashedName, i*subrangeSize, (i+1)*subrangeSize)))
 				}
 			},
 		},
@@ -1011,11 +1012,21 @@ func BenchmarkCachingKey(b *testing.B) {
 				cachingKeyAttributes(bucketID, "/object")
 			},
 		},
+		"cachingKeyAttributes() hashed": {
+			run: func(bucketID string) {
+				cachingKeyAttributes(bucketID, cachingKeyHash("/object"))
+			},
+		},
 		"cachingKeyObjectSubrange()": {
 			run: func(bucketID string) {
 				cachingKeyObjectSubrange(bucketID, "/object", 10, 20)
 			},
 		},
+		"cachingKeyObjectSubrange() hashed": {
+			run: func(bucketID string) {
+				cachingKeyObjectSubrange(bucketID, cachingKeyHash("/object"), 10, 20)
+			},
+		},
 		"cachingKeyIter()": {
 			run: func(bucketID string) {
 				cachingKeyIter(bucketID, "/dir")
@@ -1031,11 +1042,21 @@ func BenchmarkCachingKey(b *testing.B) {
 				cachingKeyExists(bucketID, "/object")
 			},
 		},
+		"cachingKeyExists() hashed": {
+			run: func(bucketID string) {
+				cachingKeyExists(bucketID, cachingKeyHash("/object"))
+			},
+		},
 		"cachingKeyContent()": {
 			run: func(bucketID string) {
 				cachingKeyContent(bucketID, "/object")
 			},
 		},
+		"cachingKeyContent() hashed": {
+			run: func(bucketID string) {
+				cachingKeyContent(bucketID, cachingKeyHash("/object"))
+			},
+		},
 	}
 
 	for testName, testData := range tests {
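Reviewer note on the new `cachingKeyHash` helper: every cache key now embeds a fixed-length SHA3-256 digest of the object name instead of the raw name, so composed keys such as `attrs:<bucketID>:<hash>` stay valid memcached keys (non-whitespace, non-control ASCII, well under the 250-character key limit) no matter how long the object path gets. The sketch below exercises the helper standalone; the `main` wrapper and the sample object names are illustrative only, and the blank import of `golang.org/x/crypto/sha3` is an assumption about how the hash gets registered in this build: `crypto.SHA3_256.New()` panics at runtime unless some imported package has called `crypto.RegisterHash` for SHA3-256, which `golang.org/x/crypto/sha3` does in an `init` function.

```go
package main

import (
	"crypto"
	"encoding/hex"
	"fmt"

	// Assumed registration path: this blank import runs sha3's init(),
	// which calls crypto.RegisterHash(crypto.SHA3_256, ...). Without some
	// such registration, crypto.SHA3_256.New() panics at runtime.
	_ "golang.org/x/crypto/sha3"
)

// cachingKeyHash mirrors the helper added in this diff: it maps an
// arbitrary object name to a fixed-length, cache-safe hex string.
func cachingKeyHash(name string) string {
	hasher := crypto.SHA3_256.New()
	_, _ = hasher.Write([]byte(name)) // hash.Hash writes never return an error
	// Hex because memcache keys must be non-whitespace, non-control ASCII.
	return hex.EncodeToString(hasher.Sum(nil))
}

func main() {
	// Illustrative object names; a rule-group path behaves the same way.
	for _, name := range []string{"/test/chunks/000001", "/tenant-1/rules/some/long/nested/path"} {
		// Always 64 hex characters, regardless of the input length.
		fmt.Printf("%s -> %s\n", name, cachingKeyHash(name))
	}
}
```

One trade-off worth noting: because the keys are digests, they are no longer human-readable in the cache, which is presumably why the methods keep passing the raw `name` alongside `hashedName` for config lookups and error messages.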