Skip to content

Commit

Permalink
Add more robust cache invalidation to CachingBucket
Browse files Browse the repository at this point in the history
This change makes use of `add` operations for special "lock" cache
entries to ensure that when items in object storage are mutated, stale
results are not immediately stored to cache again.

It does this by `set`ing "lock" cache entries with a short TTL when an
item in object storage is about to be mutated. This prevents reads of
the item from caching the results afterwards. After the item is mutated
in object storage, its cache entries (excluding the lock entries) are
deleted. After the lock entries expire, reads of the item are allowed
to store results in the cache again.

Part of #9386

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
  • Loading branch information
56quarters committed Oct 9, 2024
1 parent 9907ea8 commit d2cfbb9
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 111 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818
github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down Expand Up @@ -205,7 +205,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
github.com/gosimple/slug v1.1.1 // indirect
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 // indirect
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/hashicorp/consul/api v1.29.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b h1:UO4mv94pG1kzKCgBKh20TXdACBCAK2vYjV3Q2MlcpEQ=
github.com/grafana/alerting v0.0.0-20240926144415-27f4e81b4b6b/go.mod h1:GMLi6d09Xqo96fCVUjNk//rcjP5NKEdjOzfWIffD5r4=
github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818 h1:VPMurXtl+ONN13d9ge1TGTJDBsJNr9Vb1pNQHpQ/aro=
github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa h1:jfSQq+jfs1q0cZr9n4u9g6Wz3423VJVE9grnLpx+eS4=
github.com/grafana/dskit v0.0.0-20241009141103-2e104a8053fa/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/franz-go v0.0.0-20241009101240-fa97d35e871f h1:nsrRsQHfpqs6dWxErIOS3gD6R20H/9XM0ItykNtBFW8=
Expand Down
58 changes: 38 additions & 20 deletions pkg/ruler/rulestore/bucketclient/bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestCachingAndInvalidation(t *testing.T) {

cacheCfg := bucketcache.NewCachingBucketConfig()
cacheCfg.CacheIter("rule-iter", mockCache, matchAll, time.Minute, iterCodec)
cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024^2, time.Minute, time.Minute, time.Minute, true)
cacheCfg.CacheGet("rule-groups", mockCache, matchAll, 1024^2, time.Minute, time.Minute, time.Minute)
cacheClient, err := bucketcache.NewCachingBucket("rule-store", baseClient, cacheCfg, log.NewNopLogger(), prometheus.NewPedanticRegistry())
require.NoError(t, err)

Expand All @@ -478,28 +478,37 @@ func TestCachingAndInvalidation(t *testing.T) {

t.Run("list users with cache", func(t *testing.T) {
mockCache, ruleStore := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()

users, err := ruleStore.ListAllUsers(context.Background())

require.NoError(t, err)
require.Equal(t, []string{"user1", "user2"}, users)

require.Equal(t, 1, mockCache.CountStoreCalls())
require.Equal(t, 1, mockCache.CountFetchCalls())
require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
})

t.Run("list users no cache", func(t *testing.T) {
mockCache, rs := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()

users, err := rs.ListAllUsers(context.Background(), rulestore.WithCacheDisabled())

require.NoError(t, err)
require.Equal(t, []string{"user1", "user2"}, users)

require.Equal(t, 1, mockCache.CountStoreCalls())
require.Equal(t, 0, mockCache.CountFetchCalls())
require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
require.Equal(t, 0, mockCache.CountFetchCalls()-startFetches)
})

t.Run("list rule groups with cache", func(t *testing.T) {
mockCache, ruleStore := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()

groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "")

require.NoError(t, err)
Expand All @@ -521,12 +530,15 @@ func TestCachingAndInvalidation(t *testing.T) {
},
}, groups)

require.Equal(t, 1, mockCache.CountStoreCalls())
require.Equal(t, 1, mockCache.CountFetchCalls())
require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
})

t.Run("list rule groups no cache", func(t *testing.T) {
mockCache, ruleStore := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()

groups, err := ruleStore.ListRuleGroupsForUserAndNamespace(context.Background(), "user1", "", rulestore.WithCacheDisabled())

require.NoError(t, err)
Expand All @@ -548,48 +560,54 @@ func TestCachingAndInvalidation(t *testing.T) {
},
}, groups)

require.Equal(t, 1, mockCache.CountStoreCalls())
require.Equal(t, 0, mockCache.CountFetchCalls())
require.Equal(t, 1, mockCache.CountStoreCalls()-startStores)
require.Equal(t, 0, mockCache.CountFetchCalls()-startFetches)
})

t.Run("get rule group from cache", func(t *testing.T) {
mockCache, ruleStore := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()

group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")

require.NoError(t, err)
require.NotNil(t, group)

require.Equal(t, 2, mockCache.CountStoreCalls())
require.Equal(t, 1, mockCache.CountFetchCalls())
require.Equal(t, 2, mockCache.CountStoreCalls()-startStores) // content set + exists set
require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
})

t.Run("get rule groups after invalidation", func(t *testing.T) {
mockCache, ruleStore := setup(t)
startStores := mockCache.CountStoreCalls()
startFetches := mockCache.CountFetchCalls()
startDeletes := mockCache.CountDeleteCalls()

group, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")

require.NoError(t, err)
require.NotNil(t, group)
require.Zero(t, group.QueryOffset)

require.Equal(t, 2, mockCache.CountStoreCalls())
require.Equal(t, 1, mockCache.CountFetchCalls())
require.Equal(t, 2, mockCache.CountStoreCalls()-startStores) // content set + exists set
require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)

origDeletes := mockCache.CountDeleteCalls()
group.QueryOffset = 42 * time.Second
require.NoError(t, ruleStore.SetRuleGroup(context.Background(), group.User, group.Namespace, group))

require.Equal(t, 2, mockCache.CountStoreCalls())
require.Equal(t, 1, mockCache.CountFetchCalls())
require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes)
require.Equal(t, 4, mockCache.CountStoreCalls()-startStores) // += content lock + exists lock
require.Equal(t, 1, mockCache.CountFetchCalls()-startFetches)
require.Equal(t, 2, mockCache.CountDeleteCalls()-startDeletes)

modifiedGroup, err := ruleStore.GetRuleGroup(context.Background(), "user1", "world", "another namespace testGroup")
require.NoError(t, err)
require.NotNil(t, modifiedGroup)
require.Equal(t, 42*time.Second, modifiedGroup.QueryOffset)

require.Equal(t, 4, mockCache.CountStoreCalls())
require.Equal(t, 2, mockCache.CountFetchCalls())
require.Equal(t, 2, mockCache.CountDeleteCalls()-origDeletes)
require.Equal(t, 6, mockCache.CountStoreCalls()-startStores) // += content set + exists set
require.Equal(t, 2, mockCache.CountFetchCalls()-startFetches)
require.Equal(t, 2, mockCache.CountDeleteCalls()-startDeletes)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newInMemoryRuleStore(t *testing.T) (*cache.InstrumentedMockCache, *bucketcl
mockCache := cache.NewInstrumentedMockCache()
cfg := bucketcache.NewCachingBucketConfig()
cfg.CacheIter("iter", mockCache, isNotTenantsDir, time.Minute, &bucketcache.JSONIterCodec{})
cfg.CacheGet("rules", mockCache, isRuleGroup, 1024^2, time.Minute, time.Minute, time.Minute, true)
cfg.CacheGet("rules", mockCache, isRuleGroup, 1024^2, time.Minute, time.Minute, time.Minute)

cachingBkt, err := bucketcache.NewCachingBucket("rules", bkt, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry())
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d2cfbb9

Please sign in to comment.