Skip to content

Commit 5c89d3e

Browse files
authored
Unify direct and caching RuleStores in ruler (#9434)
Instead of using two different `RuleStore` implementations within the Ruler, use a single caching implementation and selectively disable caching when required. This change removes the "direct" `RuleStore` implementation from the Ruler's gRPC and HTTP API layers. Instead, the caching implementation is used for all calls. In cases where returning stale cached results would not be acceptable, caching is disabled _just_ for that call. This allows rule group contents to be safely cached, with the understanding that it is safe to cache them because they will be correctly invalidated when deleted or modified. Part of #9386 Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
1 parent b54ab80 commit 5c89d3e

File tree

15 files changed

+410
-84
lines changed

15 files changed

+410
-84
lines changed

development/mimir-microservices-mode/docker-compose.jsonnet

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ std.manifestYamlDoc({
294294

295295
memcached:: {
296296
memcached: {
297-
image: 'memcached:1.6.19-alpine',
297+
image: 'memcached:1.6.28-alpine',
298298
ports: [
299299
'11211:11211',
300300
],
@@ -303,7 +303,7 @@ std.manifestYamlDoc({
303303

304304
memcached_exporter:: {
305305
'memcached-exporter': {
306-
image: 'prom/memcached-exporter:v0.6.0',
306+
image: 'prom/memcached-exporter:v0.14.4',
307307
command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'],
308308
},
309309
},

development/mimir-microservices-mode/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,14 +275,14 @@
275275
"ports":
276276
- "9900:9900"
277277
"memcached":
278-
"image": "memcached:1.6.19-alpine"
278+
"image": "memcached:1.6.28-alpine"
279279
"ports":
280280
- "11211:11211"
281281
"memcached-exporter":
282282
"command":
283283
- "--memcached.address=memcached:11211"
284284
- "--web.listen-address=0.0.0.0:9150"
285-
"image": "prom/memcached-exporter:v0.6.0"
285+
"image": "prom/memcached-exporter:v0.14.4"
286286
"minio":
287287
"command":
288288
- "server"

pkg/mimir/mimir.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,7 @@ type Mimir struct {
723723
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
724724
QueryFrontendCodec querymiddleware.Codec
725725
Ruler *ruler.Ruler
726-
RulerDirectStorage rulestore.RuleStore
727-
RulerCachedStorage rulestore.RuleStore
726+
RulerStorage rulestore.RuleStore
728727
Alertmanager *alertmanager.MultitenantAlertmanager
729728
Compactor *compactor.MultitenantCompactor
730729
StoreGateway *storegateway.StoreGateway

pkg/mimir/modules.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) {
830830
// we do accept stale data for about a polling interval (2 intervals in the worst
831831
// case scenario due to the jitter applied).
832832
cacheTTL := t.Cfg.Ruler.PollInterval
833-
834-
t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
833+
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
835834
return
836835
}
837836

838837
func (t *Mimir) initRuler() (serv services.Service, err error) {
839-
if t.RulerDirectStorage == nil {
838+
if t.RulerStorage == nil {
840839
level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.")
841840
return nil, nil
842841
}
@@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
939938
manager,
940939
t.Registerer,
941940
util_log.Logger,
942-
t.RulerDirectStorage,
943-
t.RulerCachedStorage,
941+
t.RulerStorage,
944942
t.Overrides,
945943
)
946944
if err != nil {
@@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
951949
t.API.RegisterRuler(t.Ruler)
952950

953951
// Expose HTTP configuration and prometheus-compatible Ruler APIs
954-
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
952+
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
955953

956954
return t.Ruler, nil
957955
}

pkg/mimir/modules_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) {
159159
require.NoError(t, err)
160160

161161
if testData.expectedInit {
162-
assert.NotNil(t, mimir.RulerDirectStorage)
163-
assert.NotNil(t, mimir.RulerCachedStorage)
162+
assert.NotNil(t, mimir.RulerStorage)
164163
} else {
165-
assert.Nil(t, mimir.RulerDirectStorage)
166-
assert.Nil(t, mimir.RulerCachedStorage)
164+
assert.Nil(t, mimir.RulerStorage)
167165
}
168166
})
169167
}

pkg/ruler/api.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
473473
}
474474

475475
level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
476-
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
476+
// Disable any caching when getting list of all rule groups since listing results
477+
// are cached and not invalidated and this API is expected to be strongly consistent.
478+
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
477479
if err != nil {
478480
http.Error(w, err.Error(), http.StatusBadRequest)
479481
return
@@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
606608

607609
// Only list rule groups when enforcing a max number of groups for this tenant and namespace.
608610
if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) {
609-
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
611+
// Disable any caching when getting list of all rule groups since listing results
612+
// are cached and not invalidated and we need the most up-to-date number.
613+
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
610614
if err != nil {
611615
level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
612616
http.Error(w, err.Error(), http.StatusInternalServerError)

pkg/ruler/api_test.go

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) {
163163
store.setMissingRuleGroups(tc.missingRules)
164164

165165
r := prepareRuler(t, cfg, store, withStart())
166-
a := NewAPI(r, r.directStore, log.NewNopLogger())
166+
a := NewAPI(r, r.store, log.NewNopLogger())
167167

168168
router := mux.NewRouter()
169169
router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules)
@@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) {
936936
return len(rls.Groups)
937937
})
938938

939-
a := NewAPI(r, r.directStore, log.NewNopLogger())
939+
a := NewAPI(r, r.store, log.NewNopLogger())
940940

941941
req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID)
942942
w := httptest.NewRecorder()
@@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) {
993993
return len(rls.Groups)
994994
})
995995

996-
a := NewAPI(r, r.directStore, log.NewNopLogger())
996+
a := NewAPI(r, r.store, log.NewNopLogger())
997997

998998
req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1")
999999
w := httptest.NewRecorder()
@@ -1172,7 +1172,7 @@ rules:
11721172

11731173
reg := prometheus.NewPedanticRegistry()
11741174
r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
1175-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1175+
a := NewAPI(r, r.store, log.NewNopLogger())
11761176

11771177
router := mux.NewRouter()
11781178
router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
@@ -1207,6 +1207,92 @@ rules:
12071207
}
12081208
}
12091209

1210+
func TestAPI_CreateRuleGroupWithCaching(t *testing.T) {
1211+
// Configure the ruler to only sync the rules based on notifications upon API changes.
1212+
cfg := defaultRulerConfig(t)
1213+
cfg.PollInterval = time.Hour
1214+
cfg.OutboundSyncQueuePollInterval = 100 * time.Millisecond
1215+
cfg.InboundSyncQueuePollInterval = 100 * time.Millisecond
1216+
1217+
const successResponse = `{"status":"success","data":null,"errorType":"","error":""}`
1218+
1219+
ruleGroupVersion1 := `name: group1
1220+
interval: 15s
1221+
rules:
1222+
- record: up_rule
1223+
expr: up
1224+
- alert: up_alert
1225+
expr: up < 1
1226+
`
1227+
ruleGroupVersion2 := `name: group1
1228+
interval: 15s
1229+
rules:
1230+
- record: up_rule
1231+
expr: up
1232+
- alert: up_alert
1233+
expr: up <= 1
1234+
`
1235+
1236+
mockCache, store := newInMemoryRuleStore(t)
1237+
1238+
reg := prometheus.NewPedanticRegistry()
1239+
// Set rule group limits since this performs a list call to count the current number of rule groups
1240+
// and we're testing if the API layer is correctly telling the rule store not to serve cached results.
1241+
r := prepareRuler(t, cfg, store, withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg), withLimits(validation.MockOverrides(func(defaults *validation.Limits, _ map[string]*validation.Limits) {
1242+
defaults.RulerMaxRuleGroupsPerTenant = 2
1243+
defaults.RulerMaxRulesPerRuleGroup = 2
1244+
})))
1245+
a := NewAPI(r, r.store, log.NewNopLogger())
1246+
1247+
router := mux.NewRouter()
1248+
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodGet).HandlerFunc(a.GetRuleGroup)
1249+
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodPost).HandlerFunc(a.CreateRuleGroup)
1250+
1251+
// Pre-condition check: the ruler should have run the initial rules sync.
1252+
verifySyncRulesMetric(t, reg, 1, 0)
1253+
1254+
// Store the initial version of the rule group
1255+
req := requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion1), "user1")
1256+
w := httptest.NewRecorder()
1257+
router.ServeHTTP(w, req)
1258+
assert.Equal(t, http.StatusAccepted, w.Code)
1259+
assert.Equal(t, successResponse, w.Body.String())
1260+
// Invalidation of exists and content
1261+
assert.Equal(t, 2, mockCache.CountDeleteCalls())
1262+
1263+
verifySyncRulesMetric(t, reg, 1, 1)
1264+
1265+
// Fetch it back and ensure the content is what we expect even though content can be cached
1266+
req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
1267+
w = httptest.NewRecorder()
1268+
router.ServeHTTP(w, req)
1269+
assert.Equal(t, http.StatusOK, w.Code)
1270+
assert.Equal(t, ruleGroupVersion1, w.Body.String())
1271+
// Iter from initial sync, get, iter from sync
1272+
assert.Equal(t, 3, mockCache.CountFetchCalls())
1273+
1274+
// Store a new version of the group that is slightly different
1275+
req = requestFor(t, http.MethodPost, "https://localhost:8080/prometheus/config/v1/rules/namespace1", strings.NewReader(ruleGroupVersion2), "user1")
1276+
w = httptest.NewRecorder()
1277+
router.ServeHTTP(w, req)
1278+
assert.Equal(t, http.StatusAccepted, w.Code)
1279+
assert.Equal(t, successResponse, w.Body.String())
1280+
// Invalidating exists and content again
1281+
assert.Equal(t, 4, mockCache.CountDeleteCalls())
1282+
1283+
verifySyncRulesMetric(t, reg, 1, 2)
1284+
1285+
// Fetch it back and ensure content is updated to the new version meaning the cache was invalidated
1286+
req = requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/config/v1/rules/namespace1/group1", nil, "user1")
1287+
w = httptest.NewRecorder()
1288+
router.ServeHTTP(w, req)
1289+
assert.Equal(t, http.StatusOK, w.Code)
1290+
assert.Equal(t, ruleGroupVersion2, w.Body.String())
1291+
// Iter from initial sync, get, iter from sync, another get, iter from sync
1292+
assert.Equal(t, 5, mockCache.CountFetchCalls())
1293+
1294+
}
1295+
12101296
func TestAPI_DeleteNamespace(t *testing.T) {
12111297
// Configure the ruler to only sync the rules based on notifications upon API changes.
12121298
cfg := defaultRulerConfig(t)
@@ -1237,7 +1323,7 @@ func TestAPI_DeleteNamespace(t *testing.T) {
12371323

12381324
reg := prometheus.NewPedanticRegistry()
12391325
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
1240-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1326+
a := NewAPI(r, r.store, log.NewNopLogger())
12411327

12421328
router := mux.NewRouter()
12431329
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace)
@@ -1294,7 +1380,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {
12941380

12951381
reg := prometheus.NewPedanticRegistry()
12961382
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
1297-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1383+
a := NewAPI(r, r.store, log.NewNopLogger())
12981384

12991385
router := mux.NewRouter()
13001386
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup)
@@ -1336,7 +1422,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
13361422
defaults.RulerMaxRulesPerRuleGroup = 1
13371423
})))
13381424

1339-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1425+
a := NewAPI(r, r.store, log.NewNopLogger())
13401426

13411427
tc := []struct {
13421428
name string
@@ -1389,7 +1475,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
13891475
defaults.RulerMaxRulesPerRuleGroup = 1
13901476
})))
13911477

1392-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1478+
a := NewAPI(r, r.store, log.NewNopLogger())
13931479

13941480
tc := []struct {
13951481
name string
@@ -1449,7 +1535,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) {
14491535
defaults.RulerMaxRulesPerRuleGroup = 0
14501536
})))
14511537

1452-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1538+
a := NewAPI(r, r.store, log.NewNopLogger())
14531539

14541540
tc := []struct {
14551541
name string
@@ -1551,7 +1637,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) {
15511637

15521638
r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart())
15531639

1554-
a := NewAPI(r, r.directStore, log.NewNopLogger())
1640+
a := NewAPI(r, r.store, log.NewNopLogger())
15551641

15561642
router := mux.NewRouter()
15571643
router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules)

0 commit comments

Comments
 (0)