Skip to content

Commit

Permalink
cache: Add .Set and .Add methods to cache clients
Browse files Browse the repository at this point in the history
This change adds a synchronous version of `.Set` to Memcached and Redis
clients as well as the various `Cache` wrapper implementations. This
allows callers to set a key and be sure it exists in the cache. This
change also adds an `.Add` method which conditionally adds an item to
the cache only if it does not already exist.

This change is a prerequisite for grafana/mimir#9386

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
  • Loading branch information
56quarters committed Oct 3, 2024
1 parent f52de24 commit 2f51483
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 28 deletions.
13 changes: 13 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var (
ErrNotStored = errors.New("item not stored")
ErrInvalidTTL = errors.New("invalid TTL")
)

// Cache is a high level interface to interact with a cache.
type Cache interface {
// GetMulti fetches multiple keys at once from a cache. In case of error,
Expand All @@ -28,6 +33,14 @@ type Cache interface {
// any underlying async operations fail, the errors will be tracked/logged.
SetMultiAsync(data map[string][]byte, ttl time.Duration)

// Set stores a key and value into a cache.
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error

// Add stores a key and value into a cache only if it does not already exist. If the
// item was not stored because an entry already exists in the cache, ErrNotStored will
// be returned.
Add(ctx context.Context, key string, value []byte, ttl time.Duration) error

// Delete deletes a key from a cache. This is a synchronous operation. If an asynchronous
// set operation for key is still pending to be processed, it will wait for it to complete
// before performing deletion.
Expand Down
62 changes: 45 additions & 17 deletions cache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// Common functionality shared between the Memcached and Redis Cache implementations

const (
opAdd = "add"
opSet = "set"
opGetMulti = "getmulti"
opDelete = "delete"
Expand All @@ -29,6 +30,8 @@ const (
reasonMaxItemSize = "max-item-size"
reasonAsyncBufferFull = "async-buffer-full"
reasonMalformedKey = "malformed-key"
reasonInvalidTTL = "invalid-ttl"
reasonNotStored = "not-stored"
reasonConnectTimeout = "connect-timeout"
reasonTimeout = "request-timeout"
reasonServerError = "server-error"
Expand Down Expand Up @@ -85,10 +88,12 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
Name: "operation_failures_total",
Help: "Total number of operations against cache that failed.",
}, []string{"operation", "reason"})
for _, op := range []string{opGetMulti, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} {
for _, op := range []string{opGetMulti, opAdd, opSet, opDelete, opIncrement, opFlush, opTouch, opCompareAndSwap} {
cm.failures.WithLabelValues(op, reasonConnectTimeout)
cm.failures.WithLabelValues(op, reasonTimeout)
cm.failures.WithLabelValues(op, reasonMalformedKey)
cm.failures.WithLabelValues(op, reasonInvalidTTL)
cm.failures.WithLabelValues(op, reasonNotStored)
cm.failures.WithLabelValues(op, reasonServerError)
cm.failures.WithLabelValues(op, reasonNetworkError)
cm.failures.WithLabelValues(op, reasonOther)
Expand All @@ -99,6 +104,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
Help: "Total number of operations against cache that have been skipped.",
}, []string{"operation", "reason"})
cm.skipped.WithLabelValues(opGetMulti, reasonMaxItemSize)
cm.skipped.WithLabelValues(opAdd, reasonMaxItemSize)
cm.skipped.WithLabelValues(opSet, reasonMaxItemSize)
cm.skipped.WithLabelValues(opSet, reasonAsyncBufferFull)

Expand All @@ -112,6 +118,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
NativeHistogramMinResetDuration: time.Hour,
}, []string{"operation"})
cm.duration.WithLabelValues(opGetMulti)
cm.duration.WithLabelValues(opAdd)
cm.duration.WithLabelValues(opSet)
cm.duration.WithLabelValues(opDelete)
cm.duration.WithLabelValues(opIncrement)
Expand All @@ -129,6 +136,7 @@ func newClientMetrics(reg prometheus.Registerer) *clientMetrics {
[]string{"operation"},
)
cm.dataSize.WithLabelValues(opGetMulti)
cm.dataSize.WithLabelValues(opAdd)
cm.dataSize.WithLabelValues(opSet)
cm.dataSize.WithLabelValues(opCompareAndSwap)

Expand Down Expand Up @@ -172,22 +180,12 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun
}

err := c.asyncQueue.submit(func() {
start := time.Now()
c.metrics.operations.WithLabelValues(opSet).Inc()

err := f(key, value, ttl)
if err != nil {
level.Debug(c.logger).Log(
"msg", "failed to store item to cache",
"key", key,
"sizeBytes", len(value),
"err", err,
)
c.trackError(opSet, err)
}

c.metrics.dataSize.WithLabelValues(opSet).Observe(float64(len(value)))
c.metrics.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds())
// Because this operation is executed in a separate goroutine: We run the operation without
// a context (it is expected to keep running no matter what happens) and we don't return the
// error (it will be tracked via metrics instead of being returned to the caller).
_ = c.storeOperation(context.Background(), key, value, ttl, opSet, func(_ context.Context, key string, value []byte, ttl time.Duration) error {
return f(key, value, ttl)
})
})

if err != nil {
Expand All @@ -196,6 +194,32 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun
}
}

func (c *baseClient) storeOperation(ctx context.Context, key string, value []byte, ttl time.Duration, operation string, f func(ctx context.Context, key string, value []byte, ttl time.Duration) error) error {
if c.maxItemSize > 0 && uint64(len(value)) > c.maxItemSize {
c.metrics.skipped.WithLabelValues(operation, reasonMaxItemSize).Inc()
return nil
}

start := time.Now()
c.metrics.operations.WithLabelValues(operation).Inc()

err := f(ctx, key, value, ttl)
if err != nil {
level.Debug(c.logger).Log(
"msg", "failed to store item to cache",
"operation", operation,
"key", key,
"sizeBytes", len(value),
"err", err,
)
c.trackError(operation, err)
}

c.metrics.dataSize.WithLabelValues(operation).Observe(float64(len(value)))
c.metrics.duration.WithLabelValues(operation).Observe(time.Since(start).Seconds())
return err
}

// wait submits an async task and blocks until it completes. This can be used during
// tests to ensure that async "sets" have completed before attempting to read them.
func (c *baseClient) wait() error {
Expand Down Expand Up @@ -255,6 +279,10 @@ func (c *baseClient) trackError(op string, err error) {
} else {
c.metrics.failures.WithLabelValues(op, reasonNetworkError).Inc()
}
case errors.Is(err, ErrNotStored):
c.metrics.failures.WithLabelValues(op, reasonNotStored).Inc()
case errors.Is(err, ErrInvalidTTL):
c.metrics.failures.WithLabelValues(op, reasonInvalidTTL).Inc()
case errors.Is(err, memcache.ErrMalformedKey):
c.metrics.failures.WithLabelValues(op, reasonMalformedKey).Inc()
case errors.Is(err, memcache.ErrServerError):
Expand Down
8 changes: 8 additions & 0 deletions cache/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (s *SnappyCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
s.next.SetMultiAsync(encoded, ttl)
}

func (s *SnappyCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return s.next.Set(ctx, key, snappy.Encode(nil, value), ttl)
}

func (s *SnappyCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return s.next.Add(ctx, key, snappy.Encode(nil, value), ttl)
}

// GetMulti implements Cache.
func (s *SnappyCache) GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte {
found := s.next.GetMulti(ctx, keys, opts...)
Expand Down
35 changes: 35 additions & 0 deletions cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,41 @@ func (l *LRUCache) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
}
}

func (l *LRUCache) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
err := l.c.Set(ctx, key, value, ttl)

l.mtx.Lock()
defer l.mtx.Unlock()

expires := time.Now().Add(ttl)
l.lru.Add(key, &Item{
Data: value,
ExpiresAt: expires,
})

return err
}

func (l *LRUCache) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
err := l.c.Add(ctx, key, value, ttl)

// When a caller uses the Add method, the presence of absence of an entry in the cache
// has significance. In order to maintain the semantics of that, we only add an entry to
// the LRU when it was able to be successfully added to the shared cache.
if err == nil {
l.mtx.Lock()
defer l.mtx.Unlock()

expires := time.Now().Add(ttl)
l.lru.Add(key, &Item{
Data: value,
ExpiresAt: expires,
})
}

return err
}

func (l *LRUCache) GetMulti(ctx context.Context, keys []string, opts ...Option) (result map[string][]byte) {
l.requests.Add(float64(len(keys)))
l.mtx.Lock()
Expand Down
63 changes: 56 additions & 7 deletions cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

const (
dnsProviderUpdateInterval = 30 * time.Second
maxTTL = 30 * 24 * time.Hour
)

var (
Expand All @@ -43,6 +44,7 @@ var (
type memcachedClientBackend interface {
GetMulti(keys []string, opts ...memcache.Option) (map[string]*memcache.Item, error)
Set(item *memcache.Item) error
Add(item *memcache.Item) error
Delete(key string) error
Decrement(key string, delta uint64) (uint64, error)
Increment(key string, delta uint64) (uint64, error)
Expand Down Expand Up @@ -322,14 +324,47 @@ func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration)
c.setAsync(key, value, ttl, c.setSingleItem)
}

func (c *MemcachedClient) Set(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return c.storeOperation(ctx, key, value, ttl, opSet, func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return c.setSingleItem(key, value, ttl)
}
})
}

func (c *MemcachedClient) Add(ctx context.Context, key string, value []byte, ttl time.Duration) error {
return c.storeOperation(ctx, key, value, ttl, opAdd, func(ctx context.Context, key string, value []byte, ttl time.Duration) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
ttlSeconds, ok := toSeconds(ttl)
if !ok {
return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl)
}

err := c.client.Add(&memcache.Item{
Key: key,
Value: value,
Expiration: ttlSeconds,
})

if errors.Is(err, memcache.ErrNotStored) {
return fmt.Errorf("%w: for add operation on %s", ErrNotStored, key)
}

return err
}
})
}

func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Duration) error {
ttlSeconds := int32(ttl.Seconds())
// If a TTL of exactly 0 is passed, we honor it and pass it to Memcached which will
// interpret it as an infinite TTL. However, if we get a non-zero TTL that is truncated
// to 0 seconds, we discard the update since the caller didn't intend to set an infinite
// TTL.
if ttl != 0 && ttlSeconds <= 0 {
return nil
ttlSeconds, ok := toSeconds(ttl)
if !ok {
return fmt.Errorf("%w: for set operation on %s %s", ErrInvalidTTL, key, ttl)
}

return c.client.Set(&memcache.Item{
Expand All @@ -339,6 +374,20 @@ func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Durat
})
}

// TODO: Docs
func toSeconds(d time.Duration) (int32, bool) {
if d > maxTTL {
return 0, false
}

secs := int32(d.Seconds())
if d != 0 && secs <= 0 {
return 0, false
}

return secs, true
}

func toMemcacheOptions(opts ...Option) []memcache.Option {
if len(opts) == 0 {
return nil
Expand Down
63 changes: 63 additions & 0 deletions cache/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,60 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
})
}

func TestMemcachedClient_Set(t *testing.T) {
t.Run("with non-zero TTL", func(t *testing.T) {
ctx := context.Background()
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
require.NoError(t, client.Set(ctx, "foo", []byte("bar"), time.Minute))

res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
})

t.Run("with truncated TTL", func(t *testing.T) {
ctx := context.Background()
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
err = client.Set(ctx, "foo", []byte("bar"), 100*time.Millisecond)
require.ErrorIs(t, err, ErrInvalidTTL)
})

t.Run("with zero TTL", func(t *testing.T) {
ctx := context.Background()
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
require.NoError(t, client.Set(ctx, "foo", []byte("bar"), 0))

res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
})
}

func TestMemcachedClient_Add(t *testing.T) {
t.Run("item does not exist", func(t *testing.T) {
ctx := context.Background()
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
require.NoError(t, client.Add(ctx, "foo", []byte("bar"), time.Minute))
})

t.Run("item already exists", func(t *testing.T) {
ctx := context.Background()
client, mock, err := setupDefaultMemcachedClient()
require.NoError(t, err)

require.NoError(t, mock.Set(&memcache.Item{
Key: "foo",
Value: []byte("baz"),
Expiration: 60,
}))

err = client.Add(ctx, "foo", []byte("bar"), time.Minute)
require.ErrorIs(t, err, ErrNotStored)
})
}

func TestMemcachedClient_GetMulti(t *testing.T) {
t.Run("no allocator", func(t *testing.T) {
client, backend, err := setupDefaultMemcachedClient()
Expand Down Expand Up @@ -334,6 +388,15 @@ func (m *mockMemcachedClientBackend) Set(item *memcache.Item) error {
return nil
}

func (m *mockMemcachedClientBackend) Add(item *memcache.Item) error {
if _, ok := m.values[item.Key]; ok {
return memcache.ErrNotStored
}

m.values[item.Key] = item
return nil
}

func (m *mockMemcachedClientBackend) Delete(key string) error {
delete(m.values, key)
return nil
Expand Down
Loading

0 comments on commit 2f51483

Please sign in to comment.