-
Notifications
You must be signed in to change notification settings - Fork 0
/
fillcache.go
executable file
·166 lines (142 loc) · 3.81 KB
/
fillcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Package fillcache is an in-process cache with single-flight filling
// semantics.
//
// In short: Given a function that computes the value a cache key, it will
// ensure that the function is called only once per key no matter how many
// concurrent cache gets are issued for a key.
package fillcache
import (
"context"
"sync"
"time"
)
// Config configures a Cache.
type Config struct {
TTL time.Duration
ServeStale bool
}
// Cache is a cache whose entries are calculated and filled on-demand.
type Cache[T any] struct {
// a function that knows how to compute the value for a cache key
filler Filler[T]
// an optional TTL for cache entries; if unset, cache entries never
// expire
ttl time.Duration
// if a cache value has expired, should the stale value be used if an error
// is encountered during update?
serveStale bool
cache map[string]*cacheEntry[T]
inflight map[string]*fillRequest[T]
mu sync.RWMutex
}
// New creates a Cache whose entries will be computed by the given Filler.
func New[T any](filler Filler[T], cfg *Config) *Cache[T] {
c := &Cache[T]{
filler: filler,
cache: make(map[string]*cacheEntry[T]),
inflight: make(map[string]*fillRequest[T]),
}
if cfg != nil {
c.ttl = cfg.TTL
c.serveStale = cfg.ServeStale
}
return c
}
// Filler is a function that computes the value to cache for a given key.
type Filler[T any] func(ctx context.Context, key string) (val T, err error)
// Get returns the cache value for the given key, computing it as necessary.
func (c *Cache[T]) Get(ctx context.Context, key string) (T, error) {
c.mu.Lock()
entry, found := c.cache[key]
c.mu.Unlock()
if found && !entry.expired() {
return entry.val, nil
}
val, err := c.Update(ctx, key)
if err != nil {
if c.serveStale && found {
// TODO: should this return something like ErrStaleResults so that
// consumers know an error occurred?
return entry.val, nil
}
var zero T
return zero, err
}
return val, err
}
// Update recomputes, stores, and returns the value for the given key. If an
// error occurs, the cache is not updated.
//
// Update can be used to proactively update cache entries without waiting for a
// Get.
func (c *Cache[T]) Update(ctx context.Context, key string) (T, error) {
c.mu.Lock()
// Another goroutine is updating this entry, just wait for it to finish
if w, waiting := c.inflight[key]; waiting {
c.mu.Unlock()
return w.wait(ctx)
}
// Otherwise, we'll update this entry ourselves
r := newFillRequest[T]()
c.inflight[key] = r
c.mu.Unlock()
val, err := c.filler(ctx, key)
c.mu.Lock()
defer c.mu.Unlock()
r.finish(val, err)
delete(c.inflight, key)
if err == nil {
c.cache[key] = newCacheEntry(val, c.ttl)
}
return val, err
}
// Size returns the number of entries in the cache
func (c *Cache[T]) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.cache)
}
// cacheEntry captures a cached value and its optional expiration time
type cacheEntry[T any] struct {
val T
expiresAt time.Time
}
func newCacheEntry[T any](val T, ttl time.Duration) *cacheEntry[T] {
var expiresAt time.Time
if ttl > 0 {
expiresAt = time.Now().Add(ttl)
}
return &cacheEntry[T]{
val: val,
expiresAt: expiresAt,
}
}
func (e *cacheEntry[T]) expired() bool {
return !e.expiresAt.IsZero() && e.expiresAt.Before(time.Now())
}
// fillRequest represents an outstanding computation of the value for a cache
// key
type fillRequest[T any] struct {
val T
err error
done chan struct{}
}
func newFillRequest[T any]() *fillRequest[T] {
return &fillRequest[T]{
done: make(chan struct{}),
}
}
func (r *fillRequest[T]) wait(ctx context.Context) (T, error) {
select {
case <-ctx.Done():
var zero T
return zero, ctx.Err()
case <-r.done:
return r.val, r.err
}
}
func (r *fillRequest[T]) finish(val T, err error) {
r.val = val
r.err = err
close(r.done)
}