From 2c8265735a1705cab6c41401a46c84ee8e5a3ec0 Mon Sep 17 00:00:00 2001
From: Nick Pillitteri <56quarters@users.noreply.github.com>
Date: Fri, 11 Oct 2024 09:39:04 -0400
Subject: [PATCH] Add more robust cache invalidation to CachingBucket (#9575)

* Add more robust cache invalidation to CachingBucket

This change makes use of `add` operations for special "lock" cache entries
to ensure that when items in object storage are mutated, stale results are
not immediately stored to cache again.

It does this by `set`ting "lock" cache entries with a short TTL when an item
in object storage is about to be mutated. This prevents reads of the item
from caching the results afterwards. After the item is mutated in object
storage, its cache entries (excluding the lock entries) are deleted. After
the lock entries expire, reads of the item are allowed to store results in
the cache again.

Part of #9386
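
For illustration, the protocol looks roughly like the sketch below. This is
an illustrative sketch, not code from this change: the `cacheClient`
interface and the `invalidate`/`maybeCache` helpers are hypothetical
stand-ins for the dskit cache client and the CachingBucket code, shown only
to make the set/add/delete interplay concrete.

    package sketch

    import (
    	"context"
    	"time"
    )

    // cacheClient is the subset of cache operations the protocol relies on:
    // Set overwrites, Add fails if the key already exists, Delete removes.
    type cacheClient interface {
    	Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
    	Add(ctx context.Context, key string, value []byte, ttl time.Duration) error
    	Delete(ctx context.Context, key string) error
    }

    const lockTTL = 15 * time.Second

    // invalidate wraps a mutation of an object: set the short-lived lock
    // first, mutate the object, then delete the cached content while
    // leaving the lock entry in place to expire on its own.
    func invalidate(ctx context.Context, c cacheClient, mutate func() error, contentKey, lockKey string) error {
    	_ = c.Set(ctx, lockKey, []byte{}, lockTTL) // best-effort; failures are non-fatal
    	if err := mutate(); err != nil {
    		return err
    	}
    	return c.Delete(ctx, contentKey)
    }

    // maybeCache stores a freshly read value only when the lock key can be
    // added, i.e. when no mutation is in progress or recently finished.
    func maybeCache(ctx context.Context, c cacheClient, contentKey, lockKey string, value []byte, ttl time.Duration) {
    	if err := c.Add(ctx, lockKey, []byte{}, lockTTL); err == nil {
    		_ = c.Set(ctx, contentKey, value, ttl)
    	}
    }

In the actual change the same pattern is applied separately to the content,
existence, and attributes keys, with retries around the cache calls.
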
Signed-off-by: Nick Pillitteri

* Code review feedback

Signed-off-by: Nick Pillitteri

---------

Signed-off-by: Nick Pillitteri
---
 go.mod                                        |   4 +-
 go.sum                                        |   4 +-
 .../bucketclient/bucket_client_test.go        |  58 +++--
 pkg/ruler/store_mock_test.go                  |   2 +-
 .../tsdb/bucketcache/caching_bucket.go        | 237 ++++++++++++++----
 .../tsdb/bucketcache/caching_bucket_config.go |  22 +-
 .../tsdb/bucketcache/caching_bucket_test.go   |  76 ++++--
 pkg/storage/tsdb/caching_config.go            |   8 +-
 vendor/github.com/grafana/dskit/cache/mock.go |  30 ++-
 .../dskit/ring/partition_instance_ring.go     |  16 +-
 vendor/modules.txt                            |   2 +-
 11 files changed, 348 insertions(+), 111 deletions(-)

diff --git a/go.mod b/go.mod
index 4132199f1eb..b1e21026878 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
 	github.com/golang/snappy v0.0.4
 	github.com/google/gopacket v1.1.19
 	github.com/gorilla/mux v1.8.1
-	github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818
+	github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa
 	github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
 	github.com/hashicorp/golang-lru v1.0.2 // indirect
 	github.com/json-iterator/go v1.1.12
@@ -205,7 +205,7 @@ require (
 	github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
 	github.com/googleapis/gax-go/v2 v2.13.0 // indirect
 	github.com/gosimple/slug v1.1.1 // indirect
-	github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect
+	github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
 	github.com/hashicorp/consul/api v1.29.4 // indirect
 	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
diff --git a/go.sum b/go.sum
index 1618943aa99..a500f0c2419 100644
--- a/go.sum
+++ b/go.sum
@@ -1256,8 +1256,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
 github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
 github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ=
 github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4=
-github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818 h1:VPMurXtl+ONN13d9ge1TGTJDBsJNr9Vb1pNQHpQ/aro=
-github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
+github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa h1:jfSQq+jfs1q0cZr9n4u9g6Wz3423VJVE9grnLpx+eS4=
+github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
 github.com/grafana/franz-go v0.0.0-20241009101240-fa97d35e871f h1:nsrRsQHfpqs6dWxErIOS3gD6R20H/9XM0ItykNtBFW8=
diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
index a1251892b5e..3219db5c5da 100644
--- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
+++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
@@ -462,7 +462,7 @@ func TestCachingAndInvalidation(t *testing.T) {
 	cacheCfg := bucketcache.NewCachingBucketConfig()
 	cacheCfg.CacheIter("rule-iter", mockCache, matchAll, time.Minute, iterCodec)
-	cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024^2, time.Minute, time.Minute, time.Minute, true)
+	cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024^2, time.Minute, time.Minute, time.Minute)
 	cacheClient, err := bucketcache.NewCachingBucket("rule-store", baseClient, cacheCfg, log.NewNopLogger(), prometheus.NewPedanticRegistry())
 	require.NoError(t, err)
@@ -478,28 +478,37 @@ func TestCachingAndInvalidation(t *testing.T) {
 	t.Run("list users with cache", func(t *testing.T) {
 		mockCache, ruleStore := setup(t)
+		startStores := mockCache.CountStoreCalls()
+		startFetches := mockCache.CountFetchCalls()
+
 		users, err := ruleStore.ListAllUsers(context.Background())
 		require.NoError(t, err)
 		require.Equal(t, []string{"user1", "user2"}, users)
-		require.Equal(t, 1, mockCache.CountStoreCalls())
-		require.Equal(t, 1, mockCache.CountFetchCalls())
+		require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
+		require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
 	})
 
 	t.Run("list users no cache", func(t *testing.T) {
 		mockCache, rs := setup(t)
+		startStores := mockCache.CountStoreCalls()
+		startFetches := mockCache.CountFetchCalls()
+
 		users, err := rs.ListAllUsers(context.Background(), rulestore.WithCacheDisabled())
 		require.NoError(t, err)
 		require.Equal(t, []string{"user1", "user2"}, users)
-		require.Equal(t, 1, mockCache.CountStoreCalls())
-		require.Equal(t, 0, mockCache.CountFetchCalls())
+		require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
+		require.Equal(t, 0, mockCache.CountFetchCalls()-startFetches)
 	})
 
 	t.Run("list rule groups with cache", func(t *testing.T) {
 		mockCache, ruleStore := setup(t)
+		startStores := mockCache.CountStoreCalls()
+		startFetches := mockCache.CountFetchCalls()
+
 		groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "")
 		require.NoError(t, err)
@@ -521,12 +530,15 @@ func TestCachingAndInvalidation(t *testing.T) {
 			},
 		}, groups)
-		require.Equal(t, 1, mockCache.CountStoreCalls())
-		require.Equal(t, 1, mockCache.CountFetchCalls())
+		require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
+		require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
 	})
 
 	t.Run("list rule groups no cache", func(t *testing.T) {
 		mockCache, ruleStore := setup(t)
+		startStores := mockCache.CountStoreCalls()
+		startFetches := mockCache.CountFetchCalls()
+
 		groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "", rulestore.WithCacheDisabled())
 		require.NoError(t, err)
@@ -548,48 +560,54 @@ func TestCachingAndInvalidation(t *testing.T) {
 			},
 		}, groups)
-		require.Equal(t, 1, mockCache.CountStoreCalls())
-		require.Equal(t, 0, mockCache.CountFetchCalls())
+		require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
+
require.Equal(t, 0, mockCache.CountFetchCalls()-startFetches) }) t.Run("get rule group from cache", func(t *testing.T) { mockCache, ruleStore := setup(t) + startStores := mockCache.CountStoreCalls() + startFetches := mockCache.CountFetchCalls() + group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") require.NoError(t, err) require.NotNil(t, group) - require.Equal(t, 2, mockCache.CountStoreCalls()) - require.Equal(t, 1, mockCache.CountFetchCalls()) + require.Equal(t, 2, mockCache.CountStoreCalls()-startStores) // content set + exists set + require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches) }) t.Run("get rule groups after invalidation", func(t *testing.T) { mockCache, ruleStore := setup(t) + startStores := mockCache.CountStoreCalls() + startFetches := mockCache.CountFetchCalls() + startDeletes := mockCache.CountDeleteCalls() + group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") require.NoError(t, err) require.NotNil(t, group) require.Zero(t, group.QueryOffset) - require.Equal(t, 2, mockCache.CountStoreCalls()) - require.Equal(t, 1, mockCache.CountFetchCalls()) + require.Equal(t, 2, mockCache.CountStoreCalls()-startStores) // content set + exists set + require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches) - origDeletes := mockCache.CountDeleteCalls() group.QueryOffset = 42 * time.Second require.NoError(t, ruleStore.SetRuleGroup(context.Background(), group.User, group.Namespace, group)) - require.Equal(t, 2, mockCache.CountStoreCalls()) - require.Equal(t, 1, mockCache.CountFetchCalls()) - require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes) + require.Equal(t, 4, mockCache.CountStoreCalls()-startStores) // += content lock + exists lock + require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches) + require.Equal(t, 2, mockCache.CountDeleteCalls()-startDeletes) modifiedGroup, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup") require.NoError(t, err) require.NotNil(t, modifiedGroup) require.Equal(t, 42*time.Second, modifiedGroup.QueryOffset) - require.Equal(t, 4, mockCache.CountStoreCalls()) - require.Equal(t, 2, mockCache.CountFetchCalls()) - require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes) + require.Equal(t, 6, mockCache.CountStoreCalls()-startStores) // += content set + exists set + require.Equal(t, 2, mockCache.CountFetchCalls()-startFetches) + require.Equal(t, 2, mockCache.CountDeleteCalls()-startDeletes) }) } diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go index 34552ebe669..3402773b284 100644 --- a/pkg/ruler/store_mock_test.go +++ b/pkg/ruler/store_mock_test.go @@ -55,7 +55,7 @@ func newInMemoryRuleStore(t *testing.T) (*cache.InstrumentedMockCache, *bucketcl mockCache := cache.NewInstrumentedMockCache() cfg := bucketcache.NewCachingBucketConfig() cfg.CacheIter("iter", mockCache, isNotTenantsDir, time.Minute, &bucketcache.JSONIterCodec{}) - cfg.CacheGet("rules", mockCache, isRuleGroup, 1024^2, time.Minute, time.Minute, time.Minute, true) + cfg.CacheGet("rules", mockCache, isRuleGroup, 1024^2, time.Minute, time.Minute, time.Minute) cachingBkt, err := bucketcache.NewCachingBucket("rules", bkt, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, err) diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket.go b/pkg/storage/tsdb/bucketcache/caching_bucket.go index 3495f69d5a5..6476285e638 100644 --- 
a/pkg/storage/tsdb/bucketcache/caching_bucket.go +++ b/pkg/storage/tsdb/bucketcache/caching_bucket.go @@ -18,8 +18,11 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/golang/snappy" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/cache" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/runutil" + "github.com/grafana/gomemcache/memcache" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -27,6 +30,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/util/pool" + "github.com/grafana/mimir/pkg/util/spanlogger" ) type contextKey int @@ -37,6 +41,8 @@ const ( memoryPoolContextKey contextKey = 0 cacheLookupEnabledContextKey contextKey = 1 + + invalidationLockTTL = 15 * time.Second ) var errObjNotFound = errors.Errorf("object not found") @@ -91,15 +97,15 @@ func getCacheOptions(slabs *pool.SafeSlabPool[byte]) []cache.Option { type CachingBucket struct { objstore.Bucket - bucketID string - cfg *CachingBucketConfig - logger log.Logger + bucketID string + cfg *CachingBucketConfig + invalidation *cacheInvalidation + logger log.Logger requestedGetRangeBytes *prometheus.CounterVec fetchedGetRangeBytes *prometheus.CounterVec refetchedGetRangeBytes *prometheus.CounterVec - operationConfigs map[string][]*operationConfig operationRequests *prometheus.CounterVec operationHits *prometheus.CounterVec } @@ -112,12 +118,11 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin } cb := &CachingBucket{ - Bucket: bucketClient, - bucketID: bucketID, - cfg: cfg, - logger: logger, - - operationConfigs: map[string][]*operationConfig{}, + Bucket: bucketClient, + bucketID: bucketID, + cfg: cfg, + invalidation: newCacheInvalidation(bucketID, cfg, logger), + logger: logger, requestedGetRangeBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_bucket_cache_getrange_requested_bytes_total", @@ -159,42 +164,21 @@ func NewCachingBucket(bucketID string, bucketClient objstore.Bucket, cfg *Cachin return cb, nil } -// invalidate invalidates content, existence, and attribute caches for the given object. -// Note that this is best-effort and errors invalidating the cache are ignored. -func (cb *CachingBucket) invalidate(ctx context.Context, name string) { - _, getCfg := cb.cfg.findGetConfig(name) - if getCfg != nil && getCfg.invalidateOnMutation { - // Get config includes an embedded Exists config and the Get() method - // caches if an object exists or doesn't. Because of that, we invalidate - // the exists key here with the same configuration and at the same time - // as the object content. 
- contentKey := cachingKeyContent(cb.bucketID, name) - existsKey := cachingKeyExists(cb.bucketID, name) - - _ = getCfg.cache.Delete(ctx, contentKey) - _ = getCfg.cache.Delete(ctx, existsKey) - } - - _, attrCfg := cb.cfg.findAttributesConfig(name) - if attrCfg != nil && attrCfg.invalidateOnMutation { - attrKey := cachingKeyAttributes(cb.bucketID, name) - _ = attrCfg.cache.Delete(ctx, attrKey) - } -} - func (cb *CachingBucket) Upload(ctx context.Context, name string, r io.Reader) error { + cb.invalidation.start(ctx, name) err := cb.Bucket.Upload(ctx, name, r) if err == nil { - cb.invalidate(ctx, name) + cb.invalidation.finish(ctx, name) } return err } func (cb *CachingBucket) Delete(ctx context.Context, name string) error { + cb.invalidation.start(ctx, name) err := cb.Bucket.Delete(ctx, name) if err == nil { - cb.invalidate(ctx, name) + cb.invalidation.finish(ctx, name) } return err @@ -276,6 +260,7 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) } key := cachingKeyExists(cb.bucketID, name) + lockKey := cachingKeyExistsLock(cb.bucketID, name) // Lookup the cache. if isCacheLookupEnabled(ctx) { @@ -296,13 +281,13 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error) existsTime := time.Now() ok, err := cb.Bucket.Exists(ctx, name) if err == nil { - storeExistsCacheEntry(key, ok, existsTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + storeExistsCacheEntry(ctx, key, lockKey, ok, existsTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) } return ok, err } -func storeExistsCacheEntry(cachingKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) { +func storeExistsCacheEntry(ctx context.Context, cachingKey, lockKey string, exists bool, ts time.Time, cache cache.Cache, existsTTL, doesntExistTTL time.Duration) { var ttl time.Duration if exists { ttl = existsTTL - time.Since(ts) @@ -311,7 +296,9 @@ func storeExistsCacheEntry(cachingKey string, exists bool, ts time.Time, cache c } if ttl > 0 { - cache.SetMultiAsync(map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl) + if addErr := cache.Add(ctx, lockKey, []byte{}, invalidationLockTTL); addErr == nil { + cache.SetMultiAsync(map[string][]byte{cachingKey: []byte(strconv.FormatBool(exists))}, ttl) + } } } @@ -321,7 +308,9 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e return cb.Bucket.Get(ctx, name) } + contentLockKey := cachingKeyContentLock(cb.bucketID, name) contentKey := cachingKeyContent(cb.bucketID, name) + existsLockKey := cachingKeyExistsLock(cb.bucketID, name) existsKey := cachingKeyExists(cb.bucketID, name) // Lookup the cache. @@ -371,19 +360,20 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e if err != nil { if cb.Bucket.IsObjNotFoundErr(err) { // Cache that object doesn't exist. 
- storeExistsCacheEntry(existsKey, false, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + storeExistsCacheEntry(ctx, existsKey, existsLockKey, false, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) } return nil, err } - storeExistsCacheEntry(existsKey, true, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) + storeExistsCacheEntry(ctx, existsKey, existsLockKey, true, getTime, cfg.cache, cfg.existsTTL, cfg.doesntExistTTL) return &getReader{ c: cfg.cache, r: reader, buf: new(bytes.Buffer), startTime: getTime, ttl: cfg.contentTTL, + lockKey: contentLockKey, cacheKey: contentKey, maxSize: cfg.maxCacheableSize, }, nil @@ -416,6 +406,7 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore. } func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) { + lockKey := cachingKeyAttributesLock(cb.bucketID, name) key := cachingKeyAttributes(cb.bucketID, name) // Lookup the cache. @@ -441,7 +432,12 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name, cfgName str } if raw, err := json.Marshal(attrs); err == nil { - cache.SetMultiAsync(map[string][]byte{key: raw}, ttl) + // Attempt to add a "lock" key to the cache if it does not already exist. Only cache this + // content when we were able to insert the lock key meaning this object isn't being updated + // by another request. + if addErr := cache.Add(ctx, lockKey, []byte{}, invalidationLockTTL); addErr == nil { + cache.SetMultiAsync(map[string][]byte{key: raw}, ttl) + } } else { level.Warn(cb.logger).Log("msg", "failed to encode cached Attributes result", "key", key, "err", err) } @@ -637,10 +633,153 @@ func mergeRanges(input []rng, limit int64) []rng { return input[:last+1] } +// cacheInvalidation manages cache entries associated with object storage items +// to ensure that stale results are not cached when the items are modified or +// deleted. +type cacheInvalidation struct { + bucketID string + cfg *CachingBucketConfig + logger log.Logger + retryCfg backoff.Config +} + +func newCacheInvalidation(bucketID string, cfg *CachingBucketConfig, logger log.Logger) *cacheInvalidation { + return &cacheInvalidation{ + bucketID: bucketID, + cfg: cfg, + logger: logger, + // Hardcoded retry configuration since it's not really important to be able + // to configure this. If we can't make a call to the cache in three tries, + // another one probably isn't going to help. + retryCfg: backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 200 * time.Millisecond, + MaxRetries: 3, + }, + } +} + +// start inserts "lock" entries with a short TTL in the cache for the given item that +// prevent new cache entries for that item from being stored. This ensures that when the +// cache entries for the item are deleted after it is mutated, reads which try to "add" +// the lock key cannot and will go directly to object storage for a short period of time. 
+func (i *cacheInvalidation) start(ctx context.Context, name string) { + logger := spanlogger.FromContext(ctx, i.logger) + + _, attrCfg := i.cfg.findAttributesConfig(name) + _, existCfg := i.cfg.findExistConfig(name) + _, getCfg := i.cfg.findGetConfig(name) + if existCfg == nil && getCfg != nil { + existCfg = &getCfg.existsConfig + } + + attrLockKey := cachingKeyAttributesLock(i.bucketID, name) + contentLockKey := cachingKeyContentLock(i.bucketID, name) + existsLockKey := cachingKeyExistsLock(i.bucketID, name) + + if attrCfg != nil || getCfg != nil || existCfg != nil { + err := i.runWithRetries(ctx, func() error { + me := multierror.MultiError{} + if attrCfg != nil { + me.Add(attrCfg.cache.Set(ctx, attrLockKey, []byte{}, invalidationLockTTL)) + } + if getCfg != nil { + me.Add(getCfg.cache.Set(ctx, contentLockKey, []byte{}, invalidationLockTTL)) + } + if existCfg != nil { + me.Add(existCfg.cache.Set(ctx, existsLockKey, []byte{}, invalidationLockTTL)) + } + return me.Err() + }) + + if err != nil { + level.Warn(logger).Log("msg", "failed to set lock object storage cache entries", "object", name, "err", err) + } else { + logger.DebugLog("msg", "set lock object storage cache entries", "object", name) + } + } +} + +// finish removes attribute, existence, and content entries in a cache associated with +// a given item. Note that it does not remove the "lock" entries in the cache to ensure +// that other requests must read directly from object storage until the lock expires. +func (i *cacheInvalidation) finish(ctx context.Context, name string) { + logger := spanlogger.FromContext(ctx, i.logger) + + _, attrCfg := i.cfg.findAttributesConfig(name) + _, existCfg := i.cfg.findExistConfig(name) + _, getCfg := i.cfg.findGetConfig(name) + if existCfg == nil && getCfg != nil { + existCfg = &getCfg.existsConfig + } + + attrKey := cachingKeyAttributes(i.bucketID, name) + contentKey := cachingKeyContent(i.bucketID, name) + existsKey := cachingKeyExists(i.bucketID, name) + + if attrCfg != nil || getCfg != nil || existCfg != nil { + err := i.runWithRetries(ctx, func() error { + me := multierror.MultiError{} + // Breaking the cache abstraction here to test for Memcached-specific + // errors to avoid retries when we attempt to invalidate something that + // doesn't exist (which is fine and expected). + if attrCfg != nil { + if err := attrCfg.cache.Delete(ctx, attrKey); err != nil && !errors.Is(err, memcache.ErrCacheMiss) { + me.Add(err) + } + } + if getCfg != nil { + if err := getCfg.cache.Delete(ctx, contentKey); err != nil && !errors.Is(err, memcache.ErrCacheMiss) { + me.Add(err) + } + } + if existCfg != nil { + if err := existCfg.cache.Delete(ctx, existsKey); err != nil && !errors.Is(err, memcache.ErrCacheMiss) { + me.Add(err) + } + } + return me.Err() + }) + + if err != nil { + level.Warn(logger).Log("msg", "failed to delete object storage cache entries", "object", name, "err", err) + } else { + logger.DebugLog("msg", "deleted object storage cache entries", "object", name) + } + } +} + +func (i *cacheInvalidation) runWithRetries(ctx context.Context, f func() error) error { + retry := backoff.New(ctx, i.retryCfg) + var err error + + for retry.Ongoing() { + err = f() + if err == nil { + return nil + } + + retry.Wait() + } + + // If the operation failed, that's the more relevant error for why we weren't able + // to run some cache operation even if the context was canceled before the operation + // could be retried. 
+ if err != nil { + return err + } + + return retry.Err() +} + func cachingKeyAttributes(bucketID, name string) string { return composeCachingKey("attrs", bucketID, name) } +func cachingKeyAttributesLock(bucketID, name string) string { + return composeCachingKey("attrs", bucketID, name, "lock") +} + func cachingKeyObjectSubrange(bucketID, name string, start, end int64) string { return composeCachingKey("subrange", bucketID, name, strconv.FormatInt(start, 10), strconv.FormatInt(end, 10)) } @@ -659,10 +798,18 @@ func cachingKeyExists(bucketID, name string) string { return composeCachingKey("exists", bucketID, name) } +func cachingKeyExistsLock(bucketID, name string) string { + return composeCachingKey("exists", bucketID, name, "lock") +} + func cachingKeyContent(bucketID, name string) string { return composeCachingKey("content", bucketID, name) } +func cachingKeyContentLock(bucketID, name string) string { + return composeCachingKey("content", bucketID, name, "lock") +} + func composeCachingKey(op, bucketID string, values ...string) string { // Estimate size. estimatedSize := len(op) + len(bucketID) + (2 + len(values)) @@ -774,6 +921,7 @@ type getReader struct { startTime time.Time ttl time.Duration cacheKey string + lockKey string maxSize int } @@ -797,7 +945,12 @@ func (g *getReader) Read(p []byte) (n int, err error) { if errors.Is(err, io.EOF) && g.buf != nil { remainingTTL := g.ttl - time.Since(g.startTime) if remainingTTL > 0 { - g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL) + // Attempt to add a "lock" key to the cache if it does not already exist. Only cache this + // content when we were able to insert the lock key meaning this object isn't being updated + // by another request. + if addErr := g.c.Add(context.Background(), g.lockKey, []byte{}, invalidationLockTTL); addErr == nil { + g.c.SetMultiAsync(map[string][]byte{g.cacheKey: g.buf.Bytes()}, remainingTTL) + } } // Clear reference, to avoid doing another Store on next read. g.buf = nil diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket_config.go b/pkg/storage/tsdb/bucketcache/caching_bucket_config.go index 8803e8c2fe6..76779175a69 100644 --- a/pkg/storage/tsdb/bucketcache/caching_bucket_config.go +++ b/pkg/storage/tsdb/bucketcache/caching_bucket_config.go @@ -60,9 +60,8 @@ type existsConfig struct { type getConfig struct { existsConfig - contentTTL time.Duration - maxCacheableSize int - invalidateOnMutation bool + contentTTL time.Duration + maxCacheableSize int } type getRangeConfig struct { @@ -78,8 +77,7 @@ type getRangeConfig struct { type attributesConfig struct { operationConfig - ttl time.Duration - invalidateOnMutation bool + ttl time.Duration } func newOperationConfig(cache cache.Cache, matcher func(string) bool) operationConfig { @@ -106,16 +104,15 @@ func (cfg *CachingBucketConfig) CacheIter(configName string, cache cache.Cache, } // CacheGet configures caching of "Get" operation for matching files. Content of the object is cached, as well as whether object exists or not. 
-func (cfg *CachingBucketConfig) CacheGet(configName string, cache cache.Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration, invalidateOnMutation bool) { +func (cfg *CachingBucketConfig) CacheGet(configName string, cache cache.Cache, matcher func(string) bool, maxCacheableSize int, contentTTL, existsTTL, doesntExistTTL time.Duration) { cfg.get[configName] = &getConfig{ existsConfig: existsConfig{ operationConfig: newOperationConfig(cache, matcher), existsTTL: existsTTL, doesntExistTTL: doesntExistTTL, }, - contentTTL: contentTTL, - maxCacheableSize: maxCacheableSize, - invalidateOnMutation: invalidateOnMutation, + contentTTL: contentTTL, + maxCacheableSize: maxCacheableSize, } } @@ -147,11 +144,10 @@ func (cfg *CachingBucketConfig) CacheGetRange(configName string, cache cache.Cac } // CacheAttributes configures caching of "Attributes" operation for matching files. -func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration, invalidateOnMutation bool) { +func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.Cache, matcher func(name string) bool, ttl time.Duration) { cfg.attributes[configName] = &attributesConfig{ - operationConfig: newOperationConfig(cache, matcher), - ttl: ttl, - invalidateOnMutation: invalidateOnMutation, + operationConfig: newOperationConfig(cache, matcher), + ttl: ttl, } } diff --git a/pkg/storage/tsdb/bucketcache/caching_bucket_test.go b/pkg/storage/tsdb/bucketcache/caching_bucket_test.go index 2e5d5685b5a..eb3b4a76bcb 100644 --- a/pkg/storage/tsdb/bucketcache/caching_bucket_test.go +++ b/pkg/storage/tsdb/bucketcache/caching_bucket_test.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/dskit/runutil" "github.com/grafana/regexp" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -290,7 +291,7 @@ func TestChunksCaching(t *testing.T) { cfg := NewCachingBucketConfig() cfg.CacheGetRange(cfgName, cache, isTSDBChunkFile, subrangeSize, cache, time.Hour, time.Hour, tc.maxGetRangeRequests) - cachingBucket, err := NewCachingBucket(bucketID, inmem, cfg, nil, nil) + cachingBucket, err := NewCachingBucket(bucketID, inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) ctx := context.Background() @@ -366,7 +367,7 @@ func TestInvalidOffsetAndLength(t *testing.T) { cfg := NewCachingBucketConfig() cfg.CacheGetRange("chunks", cache, func(string) bool { return true }, 10000, cache, time.Hour, time.Hour, 3) - c, err := NewCachingBucket("test", b, cfg, nil, nil) + c, err := NewCachingBucket("test", b, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) r, err := c.GetRange(context.Background(), "test", -1, 1000) @@ -411,7 +412,7 @@ func TestCachedIter(t *testing.T) { cfg := NewCachingBucketConfig() cfg.CacheIter(cfgName, cache, func(string) bool { return true }, 5*time.Minute, JSONIterCodec{}) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) t.Run("Iter() should return objects list from the cache on cache hit", func(t *testing.T) { @@ -503,7 +504,7 @@ func TestExists(t *testing.T) { const cfgName = "test" cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 
2*time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) t.Run("Exists() should return cached value on cache hit", func(t *testing.T) { @@ -536,6 +537,9 @@ func TestExists(t *testing.T) { assert.NoError(t, inmem.Upload(context.Background(), filename, strings.NewReader("hej"))) verifyExists(ctx, t, cb, filename, false, true, true, cfgName) // Reused cache result. + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. + cache.Advance(2 * invalidationLockTTL) + // Calling Exists() with cache lookup disabled should lookup the object storage and also update the cached value. verifyExists(WithCacheLookupEnabled(ctx, false), t, cb, filename, true, false, false, cfgName) verifyExists(ctx, t, cb, filename, true, true, true, cfgName) @@ -553,7 +557,7 @@ func TestExistsCachingDisabled(t *testing.T) { const cfgName = "test" cfg.CacheExists(cfgName, cache, func(string) bool { return false }, 10*time.Minute, 2*time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) t.Run("Exists() should not use the cache when caching is disabled for the given object", func(t *testing.T) { @@ -604,10 +608,10 @@ func TestGet(t *testing.T) { cfg := NewCachingBucketConfig() const cfgName = "metafile" - cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute, false) + cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute) cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) t.Run("Get() should cache non-existence of the requested object if it doesn't exist", func(t *testing.T) { @@ -650,6 +654,9 @@ func TestGet(t *testing.T) { data := []byte("content-3") assert.NoError(t, inmem.Upload(ctx, filename, bytes.NewBuffer(data))) + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. + cache.Advance(2 * invalidationLockTTL) + // Calling Get() with cache lookup disabled should lookup the object storage and also update the cached value. verifyGet(WithCacheLookupEnabled(ctx, false), t, cb, filename, data, false, false, cfgName) @@ -663,6 +670,9 @@ func TestGet(t *testing.T) { verifyGet(ctx, t, cb, filename, data, true, true, cfgName) verifyExists(ctx, t, cb, filename, true, true, true, cfgName) + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. + cache.Advance(2 * invalidationLockTTL) + // Calling Get() with cache lookup disabled should lookup the object storage and also update the cached value. verifyGet(WithCacheLookupEnabled(ctx, false), t, cb, filename, nil, false, false, cfgName) @@ -682,10 +692,10 @@ func TestGetTooBigObject(t *testing.T) { const filename = "/object-1" const cfgName = "metafile" // Only allow 5 bytes to be cached. 
- cfg.CacheGet(cfgName, cache, matchAll, 5, 10*time.Minute, 10*time.Minute, 2*time.Minute, false) + cfg.CacheGet(cfgName, cache, matchAll, 5, 10*time.Minute, 10*time.Minute, 2*time.Minute) cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) data := []byte("hello world") @@ -706,10 +716,10 @@ func TestGetPartialRead(t *testing.T) { cfg := NewCachingBucketConfig() const filename = "/object-1" const cfgName = "metafile" - cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute, false) + cfg.CacheGet(cfgName, cache, matchAll, 1024, 10*time.Minute, 10*time.Minute, 2*time.Minute) cfg.CacheExists(cfgName, cache, matchAll, 10*time.Minute, 2*time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) data := []byte("hello world") @@ -770,9 +780,9 @@ func TestAttributes(t *testing.T) { cfg := NewCachingBucketConfig() const cfgName = "test" - cfg.CacheAttributes(cfgName, cache, matchAll, time.Minute, false) + cfg.CacheAttributes(cfgName, cache, matchAll, time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) assert.NoError(t, err) t.Run("Attributes() should not cache non existing objects", func(t *testing.T) { @@ -810,6 +820,9 @@ func TestAttributes(t *testing.T) { verifyObjectAttrs(ctx, t, cb, filename, len(firstData), true, false, cfgName) verifyObjectAttrs(ctx, t, cb, filename, len(firstData), true, true, cfgName) + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. + cache.Advance(2 * invalidationLockTTL) + // Modify the object. secondData := append(firstData, []byte("with additional data")...) require.NotEqual(t, len(firstData), len(secondData)) @@ -918,11 +931,11 @@ func TestMutationInvalidatesCache(t *testing.T) { const cfgName = "test" cfg := NewCachingBucketConfig() - cfg.CacheGet(cfgName, c, matchAll, 1024, time.Minute, time.Minute, time.Minute, true) + cfg.CacheGet(cfgName, c, matchAll, 1024, time.Minute, time.Minute, time.Minute) cfg.CacheExists(cfgName, c, matchAll, time.Minute, time.Minute) - cfg.CacheAttributes(cfgName, c, matchAll, time.Minute, true) + cfg.CacheAttributes(cfgName, c, matchAll, time.Minute) - cb, err := NewCachingBucket("test", inmem, cfg, nil, nil) + cb, err := NewCachingBucket("test", inmem, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry()) require.NoError(t, err) t.Run("invalidated on upload", func(t *testing.T) { @@ -936,6 +949,10 @@ func TestMutationInvalidatesCache(t *testing.T) { // Do an upload via the CachingBucket and ensure the first read after does not come from cache. require.NoError(t, cb.Upload(ctx, "/object-1", strings.NewReader("test content 12"))) + + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. 
+ c.Advance(2 * invalidationLockTTL) + verifyGet(ctx, t, cb, "/object-1", []byte("test content 12"), true, false, cfgName) verifyExists(ctx, t, cb, "/object-1", true, true, true, cfgName) verifyObjectAttrs(ctx, t, cb, "/object-1", 15, true, false, cfgName) @@ -952,10 +969,37 @@ func TestMutationInvalidatesCache(t *testing.T) { // Delete via the CachingBucket and ensure the first read after does not come from cache but non-existence is cached. require.NoError(t, cb.Delete(ctx, "/object-1")) + + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. + c.Advance(2 * invalidationLockTTL) + verifyGet(ctx, t, cb, "/object-1", nil, true, false, cfgName) verifyExists(ctx, t, cb, "/object-1", false, true, true, cfgName) verifyObjectAttrs(ctx, t, cb, "/object-1", -1, true, false, cfgName) }) + + t.Run("item not cached immediately after invalidation", func(t *testing.T) { + c.Flush() + + // Initial upload bypassing the CachingBucket but read the object back to ensure it is in cache. + require.NoError(t, inmem.Upload(ctx, "/object-1", strings.NewReader("test content 1"))) + verifyGet(ctx, t, cb, "/object-1", []byte("test content 1"), true, false, cfgName) + verifyExists(ctx, t, cb, "/object-1", true, true, true, cfgName) + + // Do an upload via the CachingBucket and ensure all reads after do not come from cache until + // locks are allowed to expire. + require.NoError(t, cb.Upload(ctx, "/object-1", strings.NewReader("test content 12"))) + + // Verify that we can read the object but none of the reads come from cache. + verifyGet(ctx, t, cb, "/object-1", []byte("test content 12"), true, false, cfgName) + verifyGet(ctx, t, cb, "/object-1", []byte("test content 12"), true, false, cfgName) + + // Advance "now" for the cache so that lock keys with TTLs used for invalidation expire. 
+ c.Advance(2 * invalidationLockTTL) + + verifyGet(ctx, t, cb, "/object-1", []byte("test content 12"), true, false, cfgName) + verifyGet(ctx, t, cb, "/object-1", []byte("test content 12"), true, true, cfgName) + }) } func BenchmarkCachingKey(b *testing.B) { diff --git a/pkg/storage/tsdb/caching_config.go b/pkg/storage/tsdb/caching_config.go index 1b461cbd685..45e194d0f11 100644 --- a/pkg/storage/tsdb/caching_config.go +++ b/pkg/storage/tsdb/caching_config.go @@ -107,10 +107,10 @@ func CreateCachingBucket(chunksCache cache.Cache, chunksConfig ChunksCacheConfig metadataCache = cache.NewSpanlessTracingCache(metadataCache, logger, tenant.NewMultiResolver()) cfg.CacheExists("metafile", metadataCache, isMetaFile, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) - cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL, false) - cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL, false) - cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL, false) - cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFile, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0, false) + cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL) + cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL) + cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFile, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) codec := bucketcache.SnappyIterCodec{IterCodec: bucketcache.JSONIterCodec{}} cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec) diff --git a/vendor/github.com/grafana/dskit/cache/mock.go b/vendor/github.com/grafana/dskit/cache/mock.go index 4a5dae962d9..15d95419adc 100644 --- a/vendor/github.com/grafana/dskit/cache/mock.go +++ b/vendor/github.com/grafana/dskit/cache/mock.go @@ -19,10 +19,11 @@ var ( type MockCache struct { mu sync.Mutex cache map[string]Item + now time.Time } func NewMockCache() *MockCache { - c := &MockCache{} + c := &MockCache{now: time.Now()} c.Flush() return c } @@ -30,14 +31,14 @@ func NewMockCache() *MockCache { func (m *MockCache) SetAsync(key string, value []byte, ttl time.Duration) { m.mu.Lock() defer m.mu.Unlock() - m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)} + m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)} } func (m *MockCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) { m.mu.Lock() defer m.mu.Unlock() - exp := time.Now().Add(ttl) + exp := m.now.Add(ttl) for key, val := range data { m.cache[key] = Item{Data: val, ExpiresAt: exp} } @@ -46,7 +47,7 @@ func (m *MockCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) { func (m *MockCache) Set(_ context.Context, key string, value []byte, ttl time.Duration) error { m.mu.Lock() defer m.mu.Unlock() - m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)} + m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)} return nil } @@ -54,11 +55,11 @@ func (m 
*MockCache) Add(_ context.Context, key string, value []byte, ttl time.Du m.mu.Lock() defer m.mu.Unlock() - if _, ok := m.cache[key]; ok { + if i, ok := m.cache[key]; ok && i.ExpiresAt.After(m.now) { return ErrNotStored } - m.cache[key] = Item{Data: value, ExpiresAt: time.Now().Add(ttl)} + m.cache[key] = Item{Data: value, ExpiresAt: m.now.Add(ttl)} return nil } @@ -68,7 +69,7 @@ func (m *MockCache) GetMulti(_ context.Context, keys []string, _ ...Option) map[ found := make(map[string][]byte, len(keys)) - now := time.Now() + now := m.now for _, k := range keys { v, ok := m.cache[k] if ok && now.Before(v.ExpiresAt) { @@ -107,6 +108,7 @@ func (m *MockCache) Delete(_ context.Context, key string) error { return nil } +// Flush removes all entries from the cache func (m *MockCache) Flush() { m.mu.Lock() defer m.mu.Unlock() @@ -114,6 +116,14 @@ func (m *MockCache) Flush() { m.cache = map[string]Item{} } +// Advance changes "now" by the given duration +func (m *MockCache) Advance(d time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.now = m.now.Add(d) +} + // InstrumentedMockCache is a mocked cache implementation which also tracks the number // of times its functions are called. type InstrumentedMockCache struct { @@ -172,10 +182,16 @@ func (m *InstrumentedMockCache) GetItems() map[string]Item { return m.cache.GetItems() } +// Flush removes all entries from the cache func (m *InstrumentedMockCache) Flush() { m.cache.Flush() } +// Advance changes "now" by the given duration +func (m *InstrumentedMockCache) Advance(d time.Duration) { + m.cache.Advance(d) +} + func (m *InstrumentedMockCache) CountStoreCalls() int { return int(m.storeCount.Load()) } diff --git a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go index 2fb15d8af98..cffa4b2fcc5 100644 --- a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go +++ b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go @@ -13,15 +13,25 @@ type PartitionRingReader interface { PartitionRing() *PartitionRing } +type InstanceRingReader interface { + // GetInstance return the InstanceDesc for the given instanceID or an error + // if the instance doesn't exist in the ring. The returned InstanceDesc is NOT a + // deep copy, so the caller should never modify it. + GetInstance(string) (InstanceDesc, error) + + // InstancesCount returns the number of instances in the ring. + InstancesCount() int +} + // PartitionInstanceRing holds a partitions ring and a instances ring, and provide functions // to look up the intersection of the two (e.g. healthy instances by partition). 
type PartitionInstanceRing struct { partitionsRingReader PartitionRingReader - instancesRing *Ring + instancesRing InstanceRingReader heartbeatTimeout time.Duration } -func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing *Ring, heartbeatTimeout time.Duration) *PartitionInstanceRing { +func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing InstanceRingReader, heartbeatTimeout time.Duration) *PartitionInstanceRing { return &PartitionInstanceRing{ partitionsRingReader: partitionsRingWatcher, instancesRing: instancesRing, @@ -33,7 +43,7 @@ func (r *PartitionInstanceRing) PartitionRing() *PartitionRing { return r.partitionsRingReader.PartitionRing() } -func (r *PartitionInstanceRing) InstanceRing() *Ring { +func (r *PartitionInstanceRing) InstanceRing() InstanceRingReader { return r.instancesRing } diff --git a/vendor/modules.txt b/vendor/modules.txt index 6fff0e31472..2eda84b5813 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -611,7 +611,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818 +# github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast