Skip to content

Commit de20fd2

Browse files
authored
cache: Add .Set and .Add methods to cache clients (#591)
* cache: Add `.Set` and `.Add` methods to cache clients This change adds a synchronous version of `.Set` to Memcached and Redis clients as well as the various `Cache` wrapper implementations. This allows callers to set a key and be sure it exists in the cache. This change also adds an `.Add` method which conditionally adds an item to the cache only if it does not already exist. This change is a prerequisite for grafana/mimir#9386 Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Changelog Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Code review fixes Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Add test for .Add() method semantics for LRU cache Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> * Assert on cache contents for LRU `.Add()` test Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com> --------- Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
1 parent a711ce3 commit de20fd2

13 files changed

+331
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@
230230
* [ENHANCEMENT] Runtimeconfig: support gzip-compressed files with `.gz` extension. #571
231231
* [ENHANCEMENT] grpcclient: Support custom gRPC compressors. #583
232232
* [ENHANCEMENT] Adapt `metrics.SendSumOfGaugesPerTenant` to use `metrics.MetricOption`. #584
233+
* [ENHANCEMENT] Cache: Add `.Add()` and `.Set()` methods to cache clients. #591
233234
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
234235
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
235236
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85

cache/cache.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ import (
1313
"github.com/prometheus/client_golang/prometheus"
1414
)
1515

16+
var (
	// ErrNotStored is returned by Add when the item was not stored because an
	// entry for the key already exists in the cache.
	ErrNotStored = errors.New("item not stored")

	// ErrInvalidTTL is returned by store operations when the supplied TTL cannot
	// be used by the cache backend.
	ErrInvalidTTL = errors.New("invalid TTL")
)
20+
1621
// Cache is a high level interface to interact with a cache.
1722
type Cache interface {
1823
// GetMulti fetches multiple keys at once from a cache. In case of error,
@@ -28,6 +33,14 @@ type Cache interface {
2833
// any underlying async operations fail, the errors will be tracked/logged.
2934
SetMultiAsync(data map[string][]byte, ttl time.Duration)
3035

36+
// Set stores a key and value into a cache.
37+
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
38+
39+
// Add stores a key and value into a cache only if it does not already exist. If the
40+
// item was not stored because an entry already exists in the cache, ErrNotStored will
41+
// be returned.
42+
Add(ctx context.Context, key string, value []byte, ttl time.Duration) error
43+
3144
// Delete deletes a key from a cache. This is a synchronous operation. If an asynchronous
3245
// set operation for key is still pending to be processed, it will wait for it to complete
3346
// before performing deletion.

cache/client.go

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
// Common functionality shared between the Memcached and Redis Cache implementations
1818

1919
const (
20+
opAdd = "add"
2021
opSet = "set"
2122
opGetMulti = "getmulti"
2223
opDelete = "delete"
@@ -29,6 +30,8 @@ const (
2930
reasonMaxItemSize = "max-item-size"
3031
reasonAsyncBufferFull = "async-buffer-full"
3132
reasonMalformedKey = "malformed-key"
33+
reasonInvalidTTL = "invalid-ttl"
34+
reasonNotStored = "not-stored"
3235
reasonConnectTimeout = "connect-timeout"
3336
reasonTimeout = "request-timeout"
3437
reasonServerError = "server-error"
@@ -74,6 +77,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
7477
Help: "Total number of operations against cache.",
7578
}, []string{"operation"})
7679
cm.operations.WithLabelValues(opGetMulti)
80+
cm.operations.WithLabelValues(opAdd)
7781
cm.operations.WithLabelValues(opSet)
7882
cm.operations.WithLabelValues(opDelete)
7983
cm.operations.WithLabelValues(opIncrement)
@@ -85,10 +89,12 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
8589
Name: "operation_failures_total",
8690
Help: "Total number of operations against cache that failed.",
8791
}, []string{"operation", "reason"})
88-
for _, op := range []string{opGetMulti, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} {
92+
for _, op := range []string{opGetMulti, opAdd, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} {
8993
cm.failures.WithLabelValues(op, reasonConnectTimeout)
9094
cm.failures.WithLabelValues(op, reasonTimeout)
9195
cm.failures.WithLabelValues(op, reasonMalformedKey)
96+
cm.failures.WithLabelValues(op, reasonInvalidTTL)
97+
cm.failures.WithLabelValues(op, reasonNotStored)
9298
cm.failures.WithLabelValues(op, reasonServerError)
9399
cm.failures.WithLabelValues(op, reasonNetworkError)
94100
cm.failures.WithLabelValues(op, reasonOther)
@@ -99,6 +105,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
99105
Help: "Total number of operations against cache that have been skipped.",
100106
}, []string{"operation", "reason"})
101107
cm.skipped.WithLabelValues(opGetMulti, reasonMaxItemSize)
108+
cm.skipped.WithLabelValues(opAdd, reasonMaxItemSize)
102109
cm.skipped.WithLabelValues(opSet, reasonMaxItemSize)
103110
cm.skipped.WithLabelValues(opSet, reasonAsyncBufferFull)
104111

@@ -112,6 +119,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
112119
NativeHistogramMinResetDuration: time.Hour,
113120
}, []string{"operation"})
114121
cm.duration.WithLabelValues(opGetMulti)
122+
cm.duration.WithLabelValues(opAdd)
115123
cm.duration.WithLabelValues(opSet)
116124
cm.duration.WithLabelValues(opDelete)
117125
cm.duration.WithLabelValues(opIncrement)
@@ -129,6 +137,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
129137
[]string{"operation"},
130138
)
131139
cm.dataSize.WithLabelValues(opGetMulti)
140+
cm.dataSize.WithLabelValues(opAdd)
132141
cm.dataSize.WithLabelValues(opSet)
133142
cm.dataSize.WithLabelValues(opCompareAndSwap)
134143

@@ -172,22 +181,12 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun
172181
}
173182

174183
err := c.asyncQueue.submit(func() {
175-
start := time.Now()
176-
c.metrics.operations.WithLabelValues(opSet).Inc()
177-
178-
err := f(key, value, ttl)
179-
if err != nil {
180-
level.Debug(c.logger).Log(
181-
"msg", "failed to store item to cache",
182-
"key", key,
183-
"sizeBytes", len(value),
184-
"err", err,
185-
)
186-
c.trackError(opSet, err)
187-
}
188-
189-
c.metrics.dataSize.WithLabelValues(opSet).Observe(float64(len(value)))
190-
c.metrics.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds())
184+
// Because this operation is executed in a separate goroutine: We run the operation without
185+
// a context (it is expected to keep running no matter what happens) and we don't return the
186+
// error (it will be tracked via metrics instead of being returned to the caller).
187+
_ = c.storeOperation(context.Background(), key, value, ttl, opSet, func(_ context.Context, key string, value []byte, ttl time.Duration) error {
188+
return f(key, value, ttl)
189+
})
191190
})
192191

193192
if err != nil {
@@ -196,6 +195,32 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun
196195
}
197196
}
198197

198+
func (c *baseClient) storeOperation(ctx context.Context, key string, value []byte, ttl time.Duration, operation string, f func(ctx context.Context, key string, value []byte, ttl time.Duration) error) error {
199+
if c.maxItemSize > 0 && uint64(len(value)) > c.maxItemSize {
200+
c.metrics.skipped.WithLabelValues(operation, reasonMaxItemSize).Inc()
201+
return nil
202+
}
203+
204+
start := time.Now()
205+
c.metrics.operations.WithLabelValues(operation).Inc()
206+
207+
err := f(ctx, key, value, ttl)
208+
if err != nil {
209+
level.Debug(c.logger).Log(
210+
"msg", "failed to store item to cache",
211+
"operation", operation,
212+
"key", key,
213+
"sizeBytes", len(value),
214+
"err", err,
215+
)
216+
c.trackError(operation, err)
217+
}
218+
219+
c.metrics.dataSize.WithLabelValues(operation).Observe(float64(len(value)))
220+
c.metrics.duration.WithLabelValues(operation).Observe(time.Since(start).Seconds())
221+
return err
222+
}
223+
199224
// wait submits an async task and blocks until it completes. This can be used during
200225
// tests to ensure that async "sets" have completed before attempting to read them.
201226
func (c *baseClient) wait() error {
@@ -255,6 +280,10 @@ func (c *baseClient) trackError(op string, err error) {
255280
} else {
256281
c.metrics.failures.WithLabelValues(op, reasonNetworkError).Inc()
257282
}
283+
case errors.Is(err, ErrNotStored):
284+
c.metrics.failures.WithLabelValues(op, reasonNotStored).Inc()
285+
case errors.Is(err, ErrInvalidTTL):
286+
c.metrics.failures.WithLabelValues(op, reasonInvalidTTL).Inc()
258287
case errors.Is(err, memcache.ErrMalformedKey):
259288
c.metrics.failures.WithLabelValues(op, reasonMalformedKey).Inc()
260289
case errors.Is(err, memcache.ErrServerError):

cache/compression.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ func (s *SnappyCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
8585
s.next.SetMultiAsync(encoded, ttl)
8686
}
8787

88+
func (s *SnappyCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
89+
return s.next.Set(ctx, key, snappy.Encode(nil, value), ttl)
90+
}
91+
92+
func (s *SnappyCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
93+
return s.next.Add(ctx, key, snappy.Encode(nil, value), ttl)
94+
}
95+
8896
// GetMulti implements Cache.
8997
func (s *SnappyCache) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte {
9098
found := s.next.GetMulti(ctx, keys, opts...)

cache/lru.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,41 @@ func (l *LRUCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
103103
}
104104
}
105105

106+
func (l *LRUCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
107+
err := l.c.Set(ctx, key, value, ttl)
108+
109+
l.mtx.Lock()
110+
defer l.mtx.Unlock()
111+
112+
expires := time.Now().Add(ttl)
113+
l.lru.Add(key, &Item{
114+
Data: value,
115+
ExpiresAt: expires,
116+
})
117+
118+
return err
119+
}
120+
121+
func (l *LRUCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
122+
err := l.c.Add(ctx, key, value, ttl)
123+
124+
// When a caller uses the Add method, the presence of absence of an entry in the cache
125+
// has significance. In order to maintain the semantics of that, we only add an entry to
126+
// the LRU when it was able to be successfully added to the shared cache.
127+
if err == nil {
128+
l.mtx.Lock()
129+
defer l.mtx.Unlock()
130+
131+
expires := time.Now().Add(ttl)
132+
l.lru.Add(key, &Item{
133+
Data: value,
134+
ExpiresAt: expires,
135+
})
136+
}
137+
138+
return err
139+
}
140+
106141
func (l *LRUCache) GetMulti(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte) {
107142
l.requests.Add(float64(len(keys)))
108143
l.mtx.Lock()

cache/lru_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,36 @@ func TestLRUCache_Evictions(t *testing.T) {
9494
cache_memory_items_count{name="test"} 2
9595
`), "cache_memory_items_count"))
9696
}
97+
98+
func TestLRUCache_SetAdd(t *testing.T) {
99+
const maxItems = 10
100+
101+
ctx := context.Background()
102+
reg := prometheus.NewPedanticRegistry()
103+
lru, err := WrapWithLRUCache(NewMockCache(), "test", reg, maxItems, 2*time.Hour)
104+
require.NoError(t, err)
105+
106+
// Trying to .Add() a key that already exists should result in an error
107+
require.NoError(t, lru.Set(ctx, "key_1", []byte("value_1"), time.Minute))
108+
require.NoError(t, lru.Set(ctx, "key_2", []byte("value_2"), time.Minute))
109+
require.NoError(t, lru.Set(ctx, "key_3", []byte("value_3"), time.Minute))
110+
require.ErrorIs(t, lru.Add(ctx, "key_1", []byte("value_1_2"), time.Minute), ErrNotStored)
111+
112+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
113+
# HELP cache_memory_items_count Total number of items currently in the in-memory cache.
114+
# TYPE cache_memory_items_count gauge
115+
cache_memory_items_count{name="test"} 3
116+
`), "cache_memory_items_count"))
117+
118+
result := lru.GetMulti(ctx, []string{"key_1", "key_2", "key_3"})
119+
require.Equal(t, map[string][]byte{
120+
"key_1": []byte("value_1"),
121+
"key_2": []byte("value_2"),
122+
"key_3": []byte("value_3"),
123+
}, result)
124+
125+
// Ensure we cache back entries from the underlying cache.
126+
item, ok := lru.lru.Get("key_1")
127+
require.True(t, ok, "expected to fetch %s from inner LRU cache, got %+v", "key_1", item)
128+
require.Equal(t, []byte("value_1"), item.Data)
129+
}

cache/memcached_client.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
const (
3030
dnsProviderUpdateInterval = 30 * time.Second
31+
maxTTL = 30 * 24 * time.Hour
3132
)
3233

3334
var (
@@ -43,6 +44,7 @@ var (
4344
type memcachedClientBackend interface {
4445
GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error)
4546
Set(item *memcache.Item) error
47+
Add(item *memcache.Item) error
4648
Delete(key string) error
4749
Decrement(key string, delta uint64) (uint64, error)
4850
Increment(key string, delta uint64) (uint64, error)
@@ -322,14 +324,47 @@ func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
322324
c.setAsync(key, value, ttl, c.setSingleItem)
323325
}
324326

327+
func (c *MemcachedClient) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
328+
return c.storeOperation(ctx, key, value, ttl, opSet, func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
329+
select {
330+
case <-ctx.Done():
331+
return ctx.Err()
332+
default:
333+
return c.setSingleItem(key, value, ttl)
334+
}
335+
})
336+
}
337+
338+
func (c *MemcachedClient) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
339+
return c.storeOperation(ctx, key, value, ttl, opAdd, func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
340+
select {
341+
case <-ctx.Done():
342+
return ctx.Err()
343+
default:
344+
ttlSeconds, ok := toSeconds(ttl)
345+
if !ok {
346+
return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl)
347+
}
348+
349+
err := c.client.Add(&memcache.Item{
350+
Key: key,
351+
Value: value,
352+
Expiration: ttlSeconds,
353+
})
354+
355+
if errors.Is(err, memcache.ErrNotStored) {
356+
return fmt.Errorf("%w: for add operation on %s", ErrNotStored, key)
357+
}
358+
359+
return err
360+
}
361+
})
362+
}
363+
325364
func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Duration) error {
326-
ttlSeconds := int32(ttl.Seconds())
327-
// If a TTL of exactly 0 is passed, we honor it and pass it to Memcached which will
328-
// interpret it as an infinite TTL. However, if we get a non-zero TTL that is truncated
329-
// to 0 seconds, we discard the update since the caller didn't intend to set an infinite
330-
// TTL.
331-
if ttl != 0 && ttlSeconds <= 0 {
332-
return nil
365+
ttlSeconds, ok := toSeconds(ttl)
366+
if !ok {
367+
return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl)
333368
}
334369

335370
return c.client.Set(&memcache.Item{
@@ -339,6 +374,24 @@ func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Durat
339374
})
340375
}
341376

377+
// toSeconds converts a time.Duration to seconds as an int32 and returns a boolean
378+
// indicating if the value is valid to be used as a TTL. Durations might not be valid
379+
// to be used for a TTL if they are non-zero but less than a second long (Memcached
380+
// uses seconds for TTL units but "0" to mean infinite TTL) or if they are longer than
381+
// 30 days (Memcached treats TTLs more than 30 days as UNIX timestamps).
382+
func toSeconds(d time.Duration) (int32, bool) {
383+
if d > maxTTL {
384+
return 0, false
385+
}
386+
387+
secs := int32(d.Seconds())
388+
if d != 0 && secs <= 0 {
389+
return 0, false
390+
}
391+
392+
return secs, true
393+
}
394+
342395
func toMemcacheOptions(opts ...Option) []memcache.Option {
343396
if len(opts) == 0 {
344397
return nil

0 commit comments

Comments
 (0)