From f6e69a2425d09496ec1263fec36409a180b4adfc Mon Sep 17 00:00:00 2001
From: Nick Pillitteri
Date: Tue, 24 Sep 2024 19:55:08 -0400
Subject: [PATCH] Unify direct and caching `RuleStore`s in ruler

Instead of using two different `RuleStore` implementations within the
Ruler, use a single caching implementation and selectively disable
caching when required.

This change removes the "direct" `RuleStore` implementation from the
Ruler's gRPC and HTTP API layers. Instead, the caching implementation
is used for all calls. In cases where returning stale results would not
be acceptable, caching is disabled _just_ for that call. This allows
rule group contents to be safely cached, because cached entries are
correctly invalidated when rule groups are deleted or modified.

Part of #9386

Signed-off-by: Nick Pillitteri
---
 .../docker-compose.jsonnet                    |   4 +-
 .../docker-compose.yml                        |   4 +-
 pkg/mimir/mimir.go                            |   3 +-
 pkg/mimir/modules.go                          |  10 +-
 pkg/mimir/modules_test.go                     |   6 +-
 pkg/ruler/api.go                              |   8 +-
 pkg/ruler/api_test.go                         | 106 ++++++++++--
 pkg/ruler/ruler.go                            |  56 ++++---
 pkg/ruler/ruler_test.go                       |   8 +-
 .../rulestore/bucketclient/bucket_client.go   |  56 +++++--
 .../bucketclient/bucket_client_test.go        | 153 ++++++++++++++++++
 pkg/ruler/rulestore/local/local.go            |   4 +-
 pkg/ruler/rulestore/store.go                  |  30 +++-
 pkg/ruler/storage.go                          |  20 +--
 pkg/ruler/store_mock_test.go                  |  26 ++-
 15 files changed, 410 insertions(+), 84 deletions(-)

diff --git a/development/mimir-microservices-mode/docker-compose.jsonnet b/development/mimir-microservices-mode/docker-compose.jsonnet
index 5a099617ef9..b4f5838bd66 100644
--- a/development/mimir-microservices-mode/docker-compose.jsonnet
+++ b/development/mimir-microservices-mode/docker-compose.jsonnet
@@ -294,7 +294,7 @@ std.manifestYamlDoc({
   memcached:: {
     memcached: {
-      image: 'memcached:1.6.19-alpine',
+      image: 'memcached:1.6.28-alpine',
       ports: [
         '11211:11211',
       ],
@@ -303,7 +303,7 @@ std.manifestYamlDoc({
   memcached_exporter:: {
     'memcached-exporter': {
-      image: 'prom/memcached-exporter:v0.6.0',
+      image: 'prom/memcached-exporter:v0.14.4',
       command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'],
     },
   },
diff --git a/development/mimir-microservices-mode/docker-compose.yml b/development/mimir-microservices-mode/docker-compose.yml
index b548bbc4797..44f9806172b 100644
--- a/development/mimir-microservices-mode/docker-compose.yml
+++ b/development/mimir-microservices-mode/docker-compose.yml
@@ -275,14 +275,14 @@
     "ports":
     - "9900:9900"
   "memcached":
-    "image": "memcached:1.6.19-alpine"
+    "image": "memcached:1.6.28-alpine"
     "ports":
     - "11211:11211"
   "memcached-exporter":
     "command":
     - "--memcached.address=memcached:11211"
    - "--web.listen-address=0.0.0.0:9150"
-    "image": "prom/memcached-exporter:v0.6.0"
+    "image": "prom/memcached-exporter:v0.14.4"
   "minio":
     "command":
     - "server"
diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go
index a778a05ac3a..96556d55fc4 100644
--- a/pkg/mimir/mimir.go
+++ b/pkg/mimir/mimir.go
@@ -723,8 +723,7 @@ type Mimir struct {
 	QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
 	QueryFrontendCodec              querymiddleware.Codec
 	Ruler                           *ruler.Ruler
-	RulerDirectStorage              rulestore.RuleStore
-	RulerCachedStorage              rulestore.RuleStore
+	RulerStorage                    rulestore.RuleStore
 	Alertmanager                    *alertmanager.MultitenantAlertmanager
 	Compactor                       *compactor.MultitenantCompactor
 	StoreGateway                    *storegateway.StoreGateway
diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go
index c372bea1c25..c4ad9010fb4 100644
--- a/pkg/mimir/modules.go
+++ b/pkg/mimir/modules.go
@@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) {
 	// we do accept stale data for about a polling interval (2 intervals in the worst
 	// case scenario due to the jitter applied).
 	cacheTTL := t.Cfg.Ruler.PollInterval
-
-	t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
+	t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
 	return
 }
 
 func (t *Mimir) initRuler() (serv services.Service, err error) {
-	if t.RulerDirectStorage == nil {
+	if t.RulerStorage == nil {
 		level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.")
 		return nil, nil
 	}
@@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
 		manager,
 		t.Registerer,
 		util_log.Logger,
-		t.RulerDirectStorage,
-		t.RulerCachedStorage,
+		t.RulerStorage,
 		t.Overrides,
 	)
 	if err != nil {
@@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
 	t.API.RegisterRuler(t.Ruler)
 
 	// Expose HTTP configuration and prometheus-compatible Ruler APIs
-	t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
+	t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
 
 	return t.Ruler, nil
 }
diff --git a/pkg/mimir/modules_test.go b/pkg/mimir/modules_test.go
index a726a8609f9..c8ecc7b8daf 100644
--- a/pkg/mimir/modules_test.go
+++ b/pkg/mimir/modules_test.go
@@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) {
 			require.NoError(t, err)
 
 			if testData.expectedInit {
-				assert.NotNil(t, mimir.RulerDirectStorage)
-				assert.NotNil(t, mimir.RulerCachedStorage)
+				assert.NotNil(t, mimir.RulerStorage)
 			} else {
-				assert.Nil(t, mimir.RulerDirectStorage)
-				assert.Nil(t, mimir.RulerCachedStorage)
+				assert.Nil(t, mimir.RulerStorage)
 			}
 		})
 	}
diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go
index 05f02a91f97..1d205c421b2 100644
--- a/pkg/ruler/api.go
+++ b/pkg/ruler/api.go
@@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
 	}
 
 	level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
-	rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
+	// Disable any caching when getting the list of all rule groups since listing results
+	// are cached and not invalidated and this API is expected to be strongly consistent.
+	rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusBadRequest)
 		return
@@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
 
 	// Only list rule groups when enforcing a max number of groups for this tenant and namespace.
 	if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) {
-		rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
+		// Disable any caching when getting the list of all rule groups since listing results
+		// are cached and not invalidated and we need the most up-to-date number.
+		rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
 		if err != nil {
 			level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go
index e66d247a71a..e566fd15e7d 100644
--- a/pkg/ruler/api_test.go
+++ b/pkg/ruler/api_test.go
@@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) {
 			store.setMissingRuleGroups(tc.missingRules)
 			r := prepareRuler(t, cfg, store, withStart())
 
-			a := NewAPI(r, r.directStore, log.NewNopLogger())
+			a := NewAPI(r, r.store, log.NewNopLogger())
 
 			router := mux.NewRouter()
 			router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules)
@@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) {
 				return len(rls.Groups)
 			})
 
-			a := NewAPI(r, r.directStore, log.NewNopLogger())
+			a := NewAPI(r, r.store, log.NewNopLogger())
 
 			req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID)
 			w := httptest.NewRecorder()
@@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) {
 		return len(rls.Groups)
 	})
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1")
 	w := httptest.NewRecorder()
@@ -1172,7 +1172,7 @@ rules:
 			reg := prometheus.NewPedanticRegistry()
 			r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
 
-			a := NewAPI(r, r.directStore, log.NewNopLogger())
+			a := NewAPI(r, r.store, log.NewNopLogger())
 
 			router := mux.NewRouter()
 			router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
@@ -1207,6 +1207,92 @@ rules:
 		}
 	}
 }
 
+func TestAPI_CreateRuleGroupWithCaching(t *testing.T) {
+	// Configure the ruler to only sync the rules based on notifications upon API changes.
+	cfg := defaultRulerConfig(t)
+	cfg.PollInterval = time.Hour
+	cfg.OutboundSyncQueuePollInterval = 100 * time.Millisecond
+	cfg.InboundSyncQueuePollInterval = 100 * time.Millisecond
+
+	const successResponse = `{"status":"success","data":null,"errorType":"","error":""}`
+
+	ruleGroupVersion1 := `name: group1
+interval: 15s
+rules:
+  - record: up_rule
+    expr: up
+  - alert: up_alert
+    expr: up < 1
+`
+	ruleGroupVersion2 := `name: group1
+interval: 15s
+rules:
+  - record: up_rule
+    expr: up
+  - alert: up_alert
+    expr: up <= 1
+`
+
+	mockCache, store := newInMemoryRuleStore(t)
+
+	reg := prometheus.NewPedanticRegistry()
+	// Set rule group limits since this performs a list call to count the current number of rule groups
+	// and we're testing if the API layer is correctly telling the rule store not to serve cached results.
+	r := prepareRuler(t, cfg, store, withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg), withLimits(validation.MockOverrides(func(defaults *validation.Limits, _ map[string]*validation.Limits) {
+		defaults.RulerMaxRuleGroupsPerTenant = 2
+		defaults.RulerMaxRulesPerRuleGroup = 2
+	})))
+	a := NewAPI(r, r.store, log.NewNopLogger())
+
+	router := mux.NewRouter()
+	router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodGet).HandlerFunc(a.GetRuleGroup)
+	router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodPost).HandlerFunc(a.CreateRuleGroup)
+
+	// Pre-condition check: the ruler should have run the initial rules sync.
+	verifySyncRulesMetric(t, reg, 1, 0)
+
+	// Store the initial version of the rule group
+	req := requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion1), "user1")
+	w := httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+	assert.Equal(t, 202, w.Code)
+	assert.Equal(t, successResponse, w.Body.String())
+	// Invalidation of exists and content
+	assert.Equal(t, 2, mockCache.CountDeleteCalls())
+
+	verifySyncRulesMetric(t, reg, 1, 1)
+
+	// Fetch it back and ensure the content is what we expect even though content can be cached
+	req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
+	w = httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+	assert.Equal(t, http.StatusOK, w.Code)
+	assert.Equal(t, ruleGroupVersion1, w.Body.String())
+	// Iter from initial sync, get, iter from sync
+	assert.Equal(t, 3, mockCache.CountFetchCalls())
+
+	// Store a new version of the group that is slightly different
+	req = requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion2), "user1")
+	w = httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+	assert.Equal(t, 202, w.Code)
+	assert.Equal(t, successResponse, w.Body.String())
+	// Invalidating exists and content again
+	assert.Equal(t, 4, mockCache.CountDeleteCalls())
+
+	verifySyncRulesMetric(t, reg, 1, 2)
+
+	// Fetch it back and ensure content is updated to the new version meaning the cache was invalidated
+	req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
+	w = httptest.NewRecorder()
+	router.ServeHTTP(w, req)
+	assert.Equal(t, http.StatusOK, w.Code)
+	assert.Equal(t, ruleGroupVersion2, w.Body.String())
+	// Iter from initial sync, get, iter from sync, another get, iter from sync
+	assert.Equal(t, 5, mockCache.CountFetchCalls())
+
+}
+
 func TestAPI_DeleteNamespace(t *testing.T) {
 	// Configure the ruler to only sync the rules based on notifications upon API changes.
 	cfg := defaultRulerConfig(t)
@@ -1237,7 +1323,7 @@ func TestAPI_DeleteNamespace(t *testing.T) {
 	reg := prometheus.NewPedanticRegistry()
 	r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	router := mux.NewRouter()
 	router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace)
@@ -1294,7 +1380,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {
 	reg := prometheus.NewPedanticRegistry()
 	r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	router := mux.NewRouter()
 	router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup)
@@ -1336,7 +1422,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
 		defaults.RulerMaxRulesPerRuleGroup = 1
 	})))
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	tc := []struct {
 		name   string
@@ -1389,7 +1475,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
 		defaults.RulerMaxRulesPerRuleGroup = 1
 	})))
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	tc := []struct {
 		name   string
@@ -1449,7 +1535,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) {
 		defaults.RulerMaxRulesPerRuleGroup = 0
 	})))
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	tc := []struct {
 		name   string
@@ -1551,7 +1637,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) {
 
 	r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart())
 
-	a := NewAPI(r, r.directStore, log.NewNopLogger())
+	a := NewAPI(r, r.store, log.NewNopLogger())
 
 	router := mux.NewRouter()
 	router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules)
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index 9ccc2a00f23..9c8cddf1688 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -38,7 +38,6 @@ import (
 	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/ruler/rulespb"
 	"github.com/grafana/mimir/pkg/ruler/rulestore"
-	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
 	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/grpcencoding/s2"
 	util_log "github.com/grafana/mimir/pkg/util/log"
@@ -314,13 +313,12 @@ type MultiTenantManager interface {
 type Ruler struct {
 	services.Service
 
-	cfg         Config
-	lifecycler  *ring.BasicLifecycler
-	ring        *ring.Ring
-	directStore rulestore.RuleStore
-	cachedStore rulestore.RuleStore
-	manager     MultiTenantManager
-	limits      RulesLimits
+	cfg        Config
+	lifecycler *ring.BasicLifecycler
+	ring       *ring.Ring
+	store      rulestore.RuleStore
+	manager    MultiTenantManager
+	limits     RulesLimits
 
 	metrics *rulerMetrics
 
@@ -346,20 +344,14 @@ type Ruler struct {
 }
 
 // NewRuler creates a new ruler from a distributor and chunk store.
-func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
-	// If the cached store is not configured, just fallback to the direct one.
-	if cachedStore == nil {
-		cachedStore = directStore
-	}
-
-	return newRuler(cfg, manager, reg, logger, directStore, cachedStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
+func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
+	return newRuler(cfg, manager, reg, logger, store, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
 }
 
-func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
+func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
 	ruler := &Ruler{
 		cfg:         cfg,
-		directStore: directStore,
-		cachedStore: cachedStore,
+		store:       store,
 		manager:     manager,
 		registry:    reg,
 		logger:      logger,
@@ -633,7 +625,7 @@ func (r *Ruler) syncRules(ctx context.Context, userIDs []string, reason rulesSyn
 func (r *Ruler) loadRuleGroupsToSync(ctx context.Context, configs map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) {
 	// Load rule groups.
 	start := time.Now()
-	missing, err := r.directStore.LoadRuleGroups(ctx, configs)
+	missing, err := r.store.LoadRuleGroups(ctx, configs)
 	r.metrics.loadRuleGroups.Observe(time.Since(start).Seconds())
 
 	if err != nil {
@@ -660,7 +652,12 @@ func (r *Ruler) listRuleGroupsToSyncForAllUsers(ctx context.Context, reason rule
 	// In order to reduce API calls to the object storage among all ruler replicas,
 	// we support lookup of stale data for a short period.
-	users, err := r.cachedStore.ListAllUsers(bucketcache.WithCacheLookupEnabled(ctx, cacheLookupEnabled))
+	var opts []rulestore.Option
+	if !cacheLookupEnabled {
+		opts = append(opts, rulestore.WithCacheDisabled())
+	}
+
+	users, err := r.store.ListAllUsers(ctx, opts...)
 	if err != nil {
 		return nil, errors.Wrap(err, "unable to list users of ruler")
 	}
@@ -711,11 +708,16 @@ func (r *Ruler) listRuleGroupsToSyncForUsers(ctx context.Context, userIDs []stri
 		concurrency = len(userRings)
 	}
 
+	var opts []rulestore.Option
+	if !cacheLookupEnabled {
+		opts = append(opts, rulestore.WithCacheDisabled())
+	}
+
 	g, gctx := errgroup.WithContext(ctx)
 	for i := 0; i < concurrency; i++ {
 		g.Go(func() error {
 			for userID := range userCh {
-				groups, err := r.cachedStore.ListRuleGroupsForUserAndNamespace(bucketcache.WithCacheLookupEnabled(gctx, cacheLookupEnabled), userID, "")
+				groups, err := r.store.ListRuleGroupsForUserAndNamespace(gctx, userID, "", opts...)
 				if err != nil {
 					return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
 				}
@@ -1224,7 +1226,7 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque
 		return
 	}
 
-	err = r.directStore.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
+	err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
 	if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) {
 		respondServerError(logger, w, err.Error())
 		return
 	}
@@ -1238,8 +1240,8 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
 	logger := util_log.WithContext(req.Context(), r.logger)
-
-	userIDs, err := r.directStore.ListAllUsers(req.Context())
+	// Disable caching when getting a list of users since this API is expected to be strongly consistent.
+	userIDs, err := r.store.ListAllUsers(req.Context(), rulestore.WithCacheDisabled())
 	if err != nil {
 		level.Error(logger).Log("msg", errListAllUser, "err", err)
 		http.Error(w, fmt.Sprintf("%s: %s", errListAllUser, err.Error()), http.StatusInternalServerError)
@@ -1255,12 +1257,14 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
 	}()
 
 	err = concurrency.ForEachUser(req.Context(), userIDs, fetchRulesConcurrency, func(ctx context.Context, userID string) error {
-		rg, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
+		// Disable any caching when getting the list of all rule groups since listing results
+		// are cached and not invalidated and this API is expected to be strongly consistent.
+		rg, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
 		if err != nil {
 			return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID)
 		}
 		userRules := map[string]rulespb.RuleGroupList{userID: rg}
-		if missing, err := r.directStore.LoadRuleGroups(ctx, userRules); err != nil {
+		if missing, err := r.store.LoadRuleGroups(ctx, userRules); err != nil {
 			return errors.Wrapf(err, "failed to load ruler config for user %s", userID)
 		} else if len(missing) > 0 {
 			// This API is expected to be strongly consistent, so it's an error if any rule group was missing.
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index 6a9533ad1bf..a10c4005f7c 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -54,12 +54,12 @@ import (
 	"github.com/grafana/mimir/pkg/storage/bucket"
 	"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
 	"github.com/grafana/mimir/pkg/util"
-	util_test "github.com/grafana/mimir/pkg/util/test"
+	utiltest "github.com/grafana/mimir/pkg/util/test"
 	"github.com/grafana/mimir/pkg/util/validation"
 )
 
 func TestMain(m *testing.M) {
-	util_test.VerifyNoLeakTestMain(m)
+	utiltest.VerifyNoLeakTestMain(m)
 }
 
 func defaultRulerConfig(t testing.TB) Config {
@@ -210,7 +210,7 @@ func prepareRuler(t *testing.T, cfg Config, storage rulestore.RuleStore, opts ..
 	options := applyPrepareOptions(t, cfg.Ring.Common.InstanceID, opts...)
 	manager := prepareRulerManager(t, cfg, opts...)
-	ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
+	ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
 	require.NoError(t, err)
 
 	if options.rulerAddrAutoMap {
@@ -1571,7 +1571,7 @@ func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, r *Ruler, userID strin
 	ctx := context.Background()
 
 	t.Run("ListRuleGroupsForUserAndNamespace()", func(t *testing.T) {
-		list, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
+		list, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
 		require.NoError(t, err)
 
 		if expectedDeleted {
diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go
index dfc87128eff..4a75cb2f489 100644
--- a/pkg/ruler/rulestore/bucketclient/bucket_client.go
+++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go
@@ -24,6 +24,8 @@ import (
 	"github.com/grafana/mimir/pkg/ruler/rulespb"
 	"github.com/grafana/mimir/pkg/ruler/rulestore"
 	"github.com/grafana/mimir/pkg/storage/bucket"
+	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
 const (
@@ -57,13 +59,13 @@ func NewBucketRuleStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProv
 }
 
 // getRuleGroup loads and return a rules group. If existing rule group is supplied, it is Reset and reused. If nil, new RuleGroupDesc is allocated.
-func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) {
+func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc, spanlog *spanlogger.SpanLogger) (*rulespb.RuleGroupDesc, error) {
 	userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
 	objectKey := getRuleGroupObjectKey(namespace, groupName)
 
 	reader, err := userBucket.Get(ctx, objectKey)
 	if userBucket.IsObjNotFoundErr(err) {
-		level.Debug(b.logger).Log("msg", "rule group does not exist", "user", userID, "key", objectKey)
+		spanlog.DebugLog("msg", "rule group does not exist", "user", userID, "key", objectKey)
 		return nil, rulestore.ErrGroupNotFound
 	}
 
@@ -92,7 +94,15 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g
 }
 
 // ListAllUsers implements rules.RuleStore.
-func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
+func (b *BucketRuleStore) ListAllUsers(ctx context.Context, opts ...rulestore.Option) ([]string, error) {
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.ListAllUsers")
+	defer logger.Finish()
+
+	options := rulestore.CollectOptions(opts...)
+	if options.DisableCache {
+		ctx = bucketcache.WithCacheLookupEnabled(ctx, false)
+	}
+
 	var users []string
 	err := b.bucket.Iter(ctx, "", func(user string) error {
 		users = append(users, strings.TrimSuffix(user, objstore.DirDelim))
@@ -106,11 +116,18 @@ func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) {
 }
 
 // ListRuleGroupsForUserAndNamespace implements rules.RuleStore.
-func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) {
-	userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
+func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, opts ...rulestore.Option) (rulespb.RuleGroupList, error) {
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.ListRuleGroupsForUserAndNamespace")
+	defer logger.Finish()
 
+	userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
 	groupList := rulespb.RuleGroupList{}
 
+	options := rulestore.CollectOptions(opts...)
+	if options.DisableCache {
+		ctx = bucketcache.WithCacheLookupEnabled(ctx, false)
+	}
+
 	// The prefix to list objects depends on whether the namespace has been
 	// specified in the request.
 	prefix := ""
@@ -121,7 +138,7 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
 	err := userBucket.Iter(ctx, prefix, func(key string) error {
 		namespace, group, err := parseRuleGroupObjectKey(key)
 		if err != nil {
-			level.Warn(b.logger).Log("msg", "invalid rule group object key found while listing rule groups", "user", userID, "key", key, "err", err)
+			level.Warn(logger).Log("msg", "invalid rule group object key found while listing rule groups", "user", userID, "key", key, "err", err)
 
 			// Do not fail just because of a spurious item in the bucket.
 			return nil
@@ -143,6 +160,9 @@ func (b *BucketRuleStore) ListRuleGroupsForUserAndNamespace(ctx context.Context,
 
 // LoadRuleGroups implements rules.RuleStore.
 func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[string]rulespb.RuleGroupList) (missing rulespb.RuleGroupList, err error) {
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.LoadRuleGroups")
+	defer logger.Finish()
+
 	var (
 		ch        = make(chan *rulespb.RuleGroupDesc)
 		missingMx sync.Mutex
@@ -161,7 +181,7 @@ func (b *BucketRuleStore) LoadRuleGroups(ctx context.Context, groupsToLoad map[s
 			}
 
 			// Reuse group pointer from the map.
-			loadedGroup, err := b.getRuleGroup(gCtx, user, namespace, groupName, inputGroup)
+			loadedGroup, err := b.getRuleGroup(gCtx, user, namespace, groupName, inputGroup, logger)
 
 			switch {
 			case errors.Is(err, rulestore.ErrGroupNotFound):
@@ -202,11 +222,17 @@ outer:
 
 // GetRuleGroup implements rules.RuleStore.
 func (b *BucketRuleStore) GetRuleGroup(ctx context.Context, userID string, namespace string, group string) (*rulespb.RuleGroupDesc, error) {
-	return b.getRuleGroup(ctx, userID, namespace, group, nil)
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.GetRuleGroup")
+	defer logger.Finish()
+
+	return b.getRuleGroup(ctx, userID, namespace, group, nil, logger)
 }
 
 // SetRuleGroup implements rules.RuleStore.
 func (b *BucketRuleStore) SetRuleGroup(ctx context.Context, userID string, namespace string, group *rulespb.RuleGroupDesc) error {
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.SetRuleGroup")
+	defer logger.Finish()
+
 	userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
 	data, err := proto.Marshal(group)
 	if err != nil {
@@ -218,6 +244,9 @@ func (b *BucketRuleStore) SetRuleGroup(ctx context.Context, userID string, names
 
 // DeleteRuleGroup implements rules.RuleStore.
 func (b *BucketRuleStore) DeleteRuleGroup(ctx context.Context, userID string, namespace string, group string) error {
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.DeleteRuleGroup")
+	defer logger.Finish()
+
 	userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider)
 	err := userBucket.Delete(ctx, getRuleGroupObjectKey(namespace, group))
 	if b.bucket.IsObjNotFoundErr(err) {
@@ -228,7 +257,12 @@ func (b *BucketRuleStore) DeleteRuleGroup(ctx context.Context, userID string, na
 
 // DeleteNamespace implements rules.RuleStore.
 func (b *BucketRuleStore) DeleteNamespace(ctx context.Context, userID string, namespace string) error {
-	ruleGroupList, err := b.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
+	logger, ctx := spanlogger.NewWithLogger(ctx, b.logger, "BucketRuleStore.DeleteNamespace")
+	defer logger.Finish()
+
+	// Disable caching when listing all rule groups for a user since listing entries are not
+	// invalidated in the cache when rule groups are modified and we need to delete everything.
+	ruleGroupList, err := b.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
 	if err != nil {
 		return err
 	}
@@ -243,10 +277,10 @@ func (b *BucketRuleStore) DeleteNamespace(ctx context.Context, userID string, na
 			return err
 		}
 		objectKey := getRuleGroupObjectKey(rg.Namespace, rg.Name)
-		level.Debug(b.logger).Log("msg", "deleting rule group", "user", userID, "namespace", namespace, "key", objectKey)
+		logger.DebugLog("msg", "deleting rule group", "user", userID, "namespace", namespace, "key", objectKey)
 		err = userBucket.Delete(ctx, objectKey)
 		if err != nil {
-			level.Error(b.logger).Log("msg", "unable to delete rule group from namespace", "user", userID, "namespace", namespace, "key", objectKey, "err", err)
+			level.Error(logger).Log("msg", "unable to delete rule group from namespace", "user", userID, "namespace", namespace, "key", objectKey, "err", err)
 			return err
 		}
 	}
diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
index b34aae4f3fa..a1251892b5e 100644
--- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
+++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
@@ -13,7 +13,9 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/grafana/dskit/cache"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/rulefmt"
 	"github.com/stretchr/testify/assert"
@@ -24,6 +26,7 @@ import (
 	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/ruler/rulespb"
 	"github.com/grafana/mimir/pkg/ruler/rulestore"
+	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
 )
 
 type testGroup struct {
@@ -443,3 +446,153 @@ func (mb mockBucket) Iter(_ context.Context, _ string, f func(string) error, _ .
 	}
 	return nil
 }
+
+func TestCachingAndInvalidation(t *testing.T) {
+	fixtureGroups := []testGroup{
+		{user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "first testGroup"}},
+		{user: "user1", namespace: "hello", ruleGroup: rulefmt.RuleGroup{Name: "second testGroup"}},
+		{user: "user1", namespace: "world", ruleGroup: rulefmt.RuleGroup{Name: "another namespace testGroup"}},
+		{user: "user2", namespace: "+-!@#$%. ", ruleGroup: rulefmt.RuleGroup{Name: "different user"}},
+	}
+
+	setup := func(t *testing.T) (*cache.InstrumentedMockCache, *BucketRuleStore) {
+		iterCodec := &bucketcache.JSONIterCodec{}
+		baseClient := objstore.NewInMemBucket()
+		mockCache := cache.NewInstrumentedMockCache()
+
+		cacheCfg := bucketcache.NewCachingBucketConfig()
+		cacheCfg.CacheIter("rule-iter", mockCache, matchAll, time.Minute, iterCodec)
+		cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024*1024, time.Minute, time.Minute, time.Minute, true)
+		cacheClient, err := bucketcache.NewCachingBucket("rule-store", baseClient, cacheCfg, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+		require.NoError(t, err)
+
+		ruleStore := NewBucketRuleStore(cacheClient, nil, log.NewNopLogger())
+
+		for _, g := range fixtureGroups {
+			desc := rulespb.ToProto(g.user, g.namespace, g.ruleGroup)
+			require.NoError(t, ruleStore.SetRuleGroup(context.Background(), g.user, g.namespace, desc))
+		}
+
+		return mockCache, ruleStore
+	}
+
+	t.Run("list users with cache", func(t *testing.T) {
+		mockCache, ruleStore := setup(t)
+		users, err := ruleStore.ListAllUsers(context.Background())
+
+		require.NoError(t, err)
+		require.Equal(t, []string{"user1", "user2"}, users)
+
+		require.Equal(t, 1, mockCache.CountStoreCalls())
+		require.Equal(t, 1, mockCache.CountFetchCalls())
+	})
+
+	t.Run("list users no cache", func(t *testing.T) {
+		mockCache, rs := setup(t)
+		users, err := rs.ListAllUsers(context.Background(), rulestore.WithCacheDisabled())
+
+		require.NoError(t, err)
+		require.Equal(t, []string{"user1", "user2"}, users)
+
+		require.Equal(t, 1, mockCache.CountStoreCalls())
+		require.Equal(t, 0, mockCache.CountFetchCalls())
+	})
+
+	t.Run("list rule groups with cache", func(t *testing.T) {
+		mockCache, ruleStore := setup(t)
+		groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "")
+
+		require.NoError(t, err)
+		require.Equal(t, rulespb.RuleGroupList{
+			{
+				Name:      "first testGroup",
+				User:      "user1",
+				Namespace: "hello",
+			},
+			{
+				Name:      "second testGroup",
+				User:      "user1",
+				Namespace: "hello",
+			},
+			{
+				Name:      "another namespace testGroup",
+				User:      "user1",
+				Namespace: "world",
+			},
+		}, groups)
+
+		require.Equal(t, 1, mockCache.CountStoreCalls())
+		require.Equal(t, 1, mockCache.CountFetchCalls())
+	})
+
+	t.Run("list rule groups no cache", func(t *testing.T) {
+		mockCache, ruleStore := setup(t)
+		groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "", rulestore.WithCacheDisabled())
+
+		require.NoError(t, err)
+		require.Equal(t, rulespb.RuleGroupList{
+			{
+				Name:      "first testGroup",
+				User:      "user1",
+				Namespace: "hello",
+			},
+			{
+				Name:      "second testGroup",
+				User:      "user1",
+				Namespace: "hello",
+			},
+			{
+				Name:      "another namespace testGroup",
+				User:      "user1",
+				Namespace: "world",
+			},
+		}, groups)
+
+		require.Equal(t, 1, mockCache.CountStoreCalls())
+		require.Equal(t, 0, mockCache.CountFetchCalls())
+	})
+
+	t.Run("get rule group from cache", func(t *testing.T) {
+		mockCache, ruleStore := setup(t)
+		group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")
+
+		require.NoError(t, err)
+		require.NotNil(t, group)
+
+		require.Equal(t, 2, mockCache.CountStoreCalls())
+		require.Equal(t, 1, mockCache.CountFetchCalls())
+	})
+
+	t.Run("get rule groups after invalidation", func(t *testing.T) {
+		mockCache, ruleStore := setup(t)
+		group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")
+
+		require.NoError(t, err)
+		require.NotNil(t, group)
+		require.Zero(t, group.QueryOffset)
+
+		require.Equal(t, 2, mockCache.CountStoreCalls())
+		require.Equal(t, 1, mockCache.CountFetchCalls())
+
+		origDeletes := mockCache.CountDeleteCalls()
+		group.QueryOffset = 42 * time.Second
+		require.NoError(t, ruleStore.SetRuleGroup(context.Background(), group.User, group.Namespace, group))
+
+		require.Equal(t, 2, mockCache.CountStoreCalls())
+		require.Equal(t, 1, mockCache.CountFetchCalls())
+		require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes)
+
+		modifiedGroup, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")
+		require.NoError(t, err)
+		require.NotNil(t, modifiedGroup)
+		require.Equal(t, 42*time.Second, modifiedGroup.QueryOffset)
+
+		require.Equal(t, 4, mockCache.CountStoreCalls())
+		require.Equal(t, 2, mockCache.CountFetchCalls())
+		require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes)
+	})
+}
+
+func matchAll(string) bool {
+	return true
+}
diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go
index ff849c2d563..acdde110315 100644
--- a/pkg/ruler/rulestore/local/local.go
+++ b/pkg/ruler/rulestore/local/local.go
@@ -37,7 +37,7 @@ func NewLocalRulesClient(cfg rulestore.LocalStoreConfig, loader promRules.GroupL
 	}, nil
 }
 
-func (l *Client) ListAllUsers(_ context.Context) ([]string, error) {
+func (l *Client) ListAllUsers(_ context.Context, _ ...rulestore.Option) ([]string, error) {
 	root := l.cfg.Directory
 
 	infos, err := os.ReadDir(root)
 	if err != nil {
@@ -69,7 +69,7 @@ func (l *Client) ListAllUsers(_ context.Context) ([]string, error) {
 }
 
 // ListRuleGroupsForUserAndNamespace implements rules.RuleStore. This method also loads the rules.
-func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error) {
+func (l *Client) ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, _ ...rulestore.Option) (rulespb.RuleGroupList, error) {
 	if namespace == "" {
 		return l.loadAllRulesGroupsForUser(ctx, userID)
 	}
diff --git a/pkg/ruler/rulestore/store.go b/pkg/ruler/rulestore/store.go
index 50f0ec60d2c..06e753b4539 100644
--- a/pkg/ruler/rulestore/store.go
+++ b/pkg/ruler/rulestore/store.go
@@ -21,18 +21,44 @@ var (
 	ErrUserNotFound = errors.New("no rule groups found for user")
 )
 
+// Options are per-call options that can be used to modify the behavior of RuleStore methods.
+type Options struct {
+	DisableCache bool
+}
+
+// CollectOptions applies one or more Option callbacks to produce an Options struct.
+func CollectOptions(opts ...Option) *Options {
+	o := &Options{}
+	for _, opt := range opts {
+		opt(o)
+	}
+
+	return o
+}
+
+// Option is a callback that modifies per-call options for RuleStore methods.
+type Option func(opts *Options)
+
+// WithCacheDisabled returns an Option callback to disable any caching used
+// by a RuleStore method call.
+func WithCacheDisabled() Option {
+	return func(opts *Options) {
+		opts.DisableCache = true
+	}
+}
+
 // RuleStore is used to store and retrieve rules.
 // Methods starting with "List" prefix may return partially loaded groups: with only group Name, Namespace and User fields set.
 // To make sure that rules within each group are loaded, client must use LoadRuleGroups method.
 type RuleStore interface {
 	// ListAllUsers returns all users with rule groups configured.
-	ListAllUsers(ctx context.Context) ([]string, error)
+	ListAllUsers(ctx context.Context, opts ...Option) ([]string, error)
 
 	// ListRuleGroupsForUserAndNamespace returns all the active rule groups for a user from given namespace.
 	// It *MUST* populate fields User, Namespace, Name of all rule groups.
 	// It *MAY* populate the actual rules.
 	// If namespace is empty, groups from all namespaces are returned.
-	ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string) (rulespb.RuleGroupList, error)
+	ListRuleGroupsForUserAndNamespace(ctx context.Context, userID string, namespace string, opts ...Option) (rulespb.RuleGroupList, error)
 
 	// LoadRuleGroups loads rules for each rule group in the map.
 	//
diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go
index 8a9398beaa4..13dc1a32bed 100644
--- a/pkg/ruler/storage.go
+++ b/pkg/ruler/storage.go
@@ -7,6 +7,7 @@ package ruler
 
 import (
 	"context"
+	"strings"
 	"time"
 
 	"github.com/go-kit/log"
@@ -25,14 +26,14 @@ import (
 )
 
 // NewRuleStore returns a rule store backend client based on the provided cfg.
-func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (directStore, cachedStore rulestore.RuleStore, _ error) {
+func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (store rulestore.RuleStore, _ error) {
 	if cfg.Backend == rulestore.BackendLocal {
 		store, err := local.NewLocalRulesClient(cfg.Local, loader)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 
-		return store, store, nil
+		return store, nil
 	}
 
 	if cfg.Backend == bucket.Filesystem {
@@ -41,18 +42,15 @@
 	directBucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	cachedBucketClient, err := wrapBucketWithCache(directBucketClient, cfg, cacheTTL, logger, reg)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	directStore = bucketclient.NewBucketRuleStore(directBucketClient, cfgProvider, logger)
-	cachedStore = bucketclient.NewBucketRuleStore(cachedBucketClient, cfgProvider, logger)
-
-	return directStore, cachedStore, nil
+	return bucketclient.NewBucketRuleStore(cachedBucketClient, cfgProvider, logger), nil
 }
 
 func wrapBucketWithCache(bkt objstore.Bucket, cfg rulestore.Config, cacheTTL time.Duration, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
@@ -85,3 +83,7 @@ func wrapBucketWithCache(bkt objstore.Bucket, cfg rulestore.Config, cacheTTL tim
 func isNotTenantsDir(name string) bool {
 	return name != ""
 }
+
+func isRuleGroup(name string) bool {
+	return strings.HasPrefix(name, "rules/")
+}
diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go
index 230aaa4410f..34552ebe669 100644
--- a/pkg/ruler/store_mock_test.go
+++ b/pkg/ruler/store_mock_test.go
@@ -10,10 +10,19 @@ import (
 	"encoding/base64"
 	"fmt"
 	"sync"
+	"testing"
 	"time"
 
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/cache"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+
 	"github.com/grafana/mimir/pkg/ruler/rulespb"
 	"github.com/grafana/mimir/pkg/ruler/rulestore"
+	"github.com/grafana/mimir/pkg/ruler/rulestore/bucketclient"
+	"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
 )
 
 var (
@@ -41,6 +50,19 @@
 	}
 )
 
+func newInMemoryRuleStore(t *testing.T) (*cache.InstrumentedMockCache, *bucketclient.BucketRuleStore) {
+	bkt := objstore.NewInMemBucket()
+	mockCache := cache.NewInstrumentedMockCache()
+	cfg := bucketcache.NewCachingBucketConfig()
+	cfg.CacheIter("iter", mockCache, isNotTenantsDir, time.Minute, &bucketcache.JSONIterCodec{})
+	cfg.CacheGet("rules", mockCache, isRuleGroup, 1024*1024, time.Minute, time.Minute, time.Minute, true)
+
+	cachingBkt, err := bucketcache.NewCachingBucket("rules", bkt, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry())
+	require.NoError(t, err)
+
+	return mockCache, bucketclient.NewBucketRuleStore(cachingBkt, nil, log.NewNopLogger())
+}
+
 type mockRuleStore struct {
 	rules        map[string]rulespb.RuleGroupList
 	missingRules rulespb.RuleGroupList
@@ -61,7 +83,7 @@ func (m *mockRuleStore) setMissingRuleGroups(missing rulespb.RuleGroupList) {
 	m.mtx.Unlock()
 }
 
-func (m *mockRuleStore) ListAllUsers(_ context.Context) ([]string, error) {
+func (m *mockRuleStore) ListAllUsers(_ context.Context, _ ...rulestore.Option) ([]string, error) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
@@ -72,7 +94,7 @@
 	return result, nil
 }
 
-func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string) (rulespb.RuleGroupList, error) {
+func (m *mockRuleStore) ListRuleGroupsForUserAndNamespace(_ context.Context, userID, namespace string, _ ...rulestore.Option) (rulespb.RuleGroupList, error) {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
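
A few notes for reviewers on how the pieces fit together.

The heart of this change is the per-call option plumbing added to `rulestore`. The following self-contained Go sketch (not Mimir code: the `userStore` type, its fields, and `main` are illustrative stand-ins) condenses the end-to-end behavior: callers that need strong consistency pass `WithCacheDisabled()` for just that call, the cache lookup is skipped, but the fresh result still refreshes the cache, matching the store/fetch counts asserted in the tests above.

package main

import "fmt"

// Options mirrors the per-call options struct this patch adds to rulestore.
type Options struct {
	DisableCache bool
}

// Option is a callback that modifies per-call Options.
type Option func(*Options)

// WithCacheDisabled disables any caching used by a single RuleStore call.
func WithCacheDisabled() Option {
	return func(o *Options) { o.DisableCache = true }
}

// CollectOptions folds the callbacks into the effective Options.
func CollectOptions(opts ...Option) *Options {
	o := &Options{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

// userStore is a stand-in for a caching RuleStore implementation.
type userStore struct {
	cache   []string // previously cached listing: stale but cheap
	backend []string // authoritative contents of object storage
}

// ListAllUsers skips the cache lookup when the caller disabled caching,
// but still writes the fresh result back, so later cached reads are fresher.
func (s *userStore) ListAllUsers(opts ...Option) []string {
	options := CollectOptions(opts...)
	if !options.DisableCache && s.cache != nil {
		return s.cache
	}
	fresh := append([]string(nil), s.backend...) // "expensive" backend list
	s.cache = fresh                              // write-back keeps the cache warm
	return fresh
}

func main() {
	s := &userStore{cache: []string{"user1"}, backend: []string{"user1", "user2"}}
	fmt.Println(s.ListAllUsers())                    // [user1] - cached, possibly stale
	fmt.Println(s.ListAllUsers(WithCacheDisabled())) // [user1 user2] - strongly consistent
	fmt.Println(s.ListAllUsers())                    // [user1 user2] - cache was refreshed
}

Functional options keep the `RuleStore` interface backward compatible: existing callers and mocks compile with a one-token signature change, and further per-call behaviors can be added later without touching the interface again.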
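Why is it safe to keep caching enabled for rule group content reads (`GetRuleGroup`) while listings need the opt-out? Because writes invalidate the content cache entries, whereas iter/listing entries only expire by TTL. `TestAPI_CreateRuleGroupWithCaching` asserts exactly two cache deletions per `SetRuleGroup`: one for the existence probe and one for the content. A minimal sketch of that invalidation contract, with illustrative cache key names (the real caching bucket derives its keys internally):

package main

import "fmt"

type store struct {
	cache   map[string][]byte // stand-in for memcached
	objects map[string][]byte // stand-in for object storage
	deletes int
}

// SetRuleGroup writes the object and invalidates the cache entries covering
// it, mirroring the two delete calls asserted per write in the tests above.
func (s *store) SetRuleGroup(key string, data []byte) {
	s.objects[key] = data
	for _, k := range []string{"exists:" + key, "content:" + key} {
		delete(s.cache, k)
		s.deletes++
	}
}

// GetRuleGroup serves from cache when possible and repopulates it on a miss.
func (s *store) GetRuleGroup(key string) []byte {
	if v, ok := s.cache["content:"+key]; ok {
		return v
	}
	v := s.objects[key]
	s.cache["content:"+key] = v
	return v
}

func main() {
	s := &store{cache: map[string][]byte{}, objects: map[string][]byte{}}
	s.SetRuleGroup("user1/ns/group1", []byte("v1"))
	fmt.Println(string(s.GetRuleGroup("user1/ns/group1"))) // v1 - miss, then cached
	s.SetRuleGroup("user1/ns/group1", []byte("v2"))        // invalidates the cached v1
	fmt.Println(string(s.GetRuleGroup("user1/ns/group1"))) // v2, never the stale v1
	fmt.Println(s.deletes)                                 // 4 - two per write
}

Listings have no such per-write invalidation, which is exactly why the strongly consistent endpoints (`ListRules`, `CreateRuleGroup` limit checks, `ListAllRules`, `DeleteNamespace`) all pass `rulestore.WithCacheDisabled()`.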
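Finally, the sync path picks between the two modes dynamically. Assuming, consistently with `listRuleGroupsToSyncForAllUsers` above, that `cacheLookupEnabled` is true only for periodic syncs (whose staleness is bounded by the cache TTL that `initRulerStorage` ties to the poll interval), the reason-to-options translation looks like the sketch below; the reason names are illustrative, not the real constants in pkg/ruler.

package main

import "fmt"

// Option is a stand-in for rulestore.Option from this patch.
type Option string

const withCacheDisabled Option = "cache-disabled"

// listOptions mirrors the translation in listRuleGroupsToSyncForAllUsers and
// listRuleGroupsToSyncForUsers: a boolean "may I use the cache?" decision is
// turned into zero or one per-call options for the store.
func listOptions(cacheLookupEnabled bool) []Option {
	var opts []Option
	if !cacheLookupEnabled {
		opts = append(opts, withCacheDisabled)
	}
	return opts
}

func main() {
	for _, reason := range []string{"periodic", "api-change", "ring-change"} {
		// Only the periodic sync tolerates stale listings; any other reason
		// means "something just changed", so the lookup must hit object storage.
		cacheLookupEnabled := reason == "periodic"
		fmt.Printf("reason=%-12s opts=%v\n", reason, listOptions(cacheLookupEnabled))
	}
}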