diff --git a/go.mod b/go.mod index 01155ec..ef6cb13 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23 require ( github.com/chenyanchen/sync v0.5.1 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.10.0 ) require ( diff --git a/go.sum b/go.sum index 5e200d3..61cdec8 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/layerkv/batch.go b/layerkv/batch.go new file mode 100644 index 0000000..5fb8f01 --- /dev/null +++ b/layerkv/batch.go @@ -0,0 +1,73 @@ +package layerkv + +import ( + "context" + "errors" + "maps" + "slices" + + "github.com/chenyanchen/db" +) + +type batch[K comparable, V any] struct { + cache db.BatchKV[K, V] + store db.BatchKV[K, V] +} + +func NewBatch[K comparable, V any](cache, store db.BatchKV[K, V]) (*batch[K, V], error) { + if cache == nil { + return nil, errors.New("cache is nil") + } + if store == nil { + return nil, errors.New("store is nil") + } + return &batch[K, V]{ + cache: cache, + store: store, + }, nil +} + +func (l batch[K, V]) Get(ctx context.Context, keys []K) (map[K]V, error) { + cache, err := l.cache.Get(ctx, keys) + if err != nil { + return nil, err + } + + if len(cache) == len(keys) { + return cache, nil + } + + miss := make([]K, 0, len(keys)-len(cache)) + for _, key := range keys { + if _, ok := cache[key]; !ok { + miss = append(miss, key) + } + } + + store, err := l.store.Get(ctx, miss) + if err != nil { + return nil, err + } + + for k, v := range store { + cache[k] = v + } + + return cache, l.cache.Set(ctx, store) +} + +func (l batch[K, V]) Set(ctx context.Context, kvs map[K]V) error { + if err := l.store.Set(ctx, kvs); err != nil { + return err + } + + return l.cache.Del(ctx, slices.Collect(maps.Keys(kvs))) +} + +func (l batch[K, V]) Del(ctx context.Context, keys []K) error { + if err := l.store.Del(ctx, keys); err != nil { + return err + } + + return l.cache.Del(ctx, keys) +} diff --git a/layerkv/batch_test.go b/layerkv/batch_test.go new file mode 100644 index 0000000..c1c4b8c --- /dev/null +++ b/layerkv/batch_test.go @@ -0,0 +1,205 @@ +package layerkv + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chenyanchen/db/mocks" +) + +func Test_batch_Get(t *testing.T) { + type args[K comparable] struct { + ctx context.Context + keys []K + } + type testCase[K comparable, V any] struct { + name string + l batch[K, V] + args args[K] + want map[K]V + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "cache error", + l: batch[string, string]{ + cache: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return nil, assert.AnError + }, + }, + }, + args: args[string]{context.Background(), []string{"key1", "key2"}}, + want: nil, + wantErr: assert.Error, + }, { + name: "all from cache", + l: batch[string, string]{ + cache: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return map[string]string{ + "key1": "value1", + "key2": "value2", + }, nil + }, + }, + }, + args: args[string]{context.Background(), []string{"key1", "key2"}}, + want: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + wantErr: assert.NoError, + }, { + name: "store error", + l: batch[string, string]{ + cache: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return map[string]string{ + "key1": "value1", + }, nil + }, + }, + store: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return nil, assert.AnError + }, + }, + }, + args: args[string]{context.Background(), []string{"key1", "key2"}}, + want: nil, + wantErr: assert.Error, + }, { + name: "mixed", + l: batch[string, string]{ + cache: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return map[string]string{"key1": "value1"}, nil + }, + SetFunc: func(ctx context.Context, m map[string]string) error { + return nil + }, + }, + store: &mocks.MockBatchKVStore[string, string]{ + GetFunc: func(ctx context.Context, keys []string) (map[string]string, error) { + return map[string]string{"key2": "value2"}, nil + }, + }, + }, + args: args[string]{context.Background(), []string{"key1", "key2"}}, + want: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.l.Get(tt.args.ctx, tt.args.keys) + if !tt.wantErr(t, err, fmt.Sprintf("Get(%v, %v)", tt.args.ctx, tt.args.keys)) { + return + } + assert.Equalf(t, tt.want, got, "Get(%v, %v)", tt.args.ctx, tt.args.keys) + }) + } +} + +func Test_batch_Set(t *testing.T) { + type args[K comparable, V any] struct { + ctx context.Context + kvs map[K]V + } + type testCase[K comparable, V any] struct { + name string + l batch[K, V] + args args[K, V] + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "store error", + l: batch[string, string]{ + store: &mocks.MockBatchKVStore[string, string]{ + SetFunc: func(ctx context.Context, m map[string]string) error { + return assert.AnError + }, + }, + }, + args: args[string, string]{context.Background(), map[string]string{"key1": "value1"}}, + wantErr: assert.Error, + }, { + name: "cache error", + l: batch[string, string]{ + store: &mocks.MockBatchKVStore[string, string]{ + SetFunc: func(ctx context.Context, m map[string]string) error { + return nil + }, + }, + cache: &mocks.MockBatchKVStore[string, string]{ + DelFunc: func(ctx context.Context, keys []string) error { + return assert.AnError + }, + }, + }, + args: args[string, string]{context.Background(), map[string]string{"key1": "value1"}}, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.wantErr(t, tt.l.Set(tt.args.ctx, tt.args.kvs), fmt.Sprintf("Set(%v, %v)", tt.args.ctx, tt.args.kvs)) + }) + } +} + +func Test_batch_Del(t *testing.T) { + type args[K comparable] struct { + ctx context.Context + keys []K + } + type testCase[K comparable, V any] struct { + name string + l batch[K, V] + args args[K] + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "store error", + l: batch[string, string]{ + store: &mocks.MockBatchKVStore[string, string]{ + DelFunc: func(ctx context.Context, keys []string) error { + return assert.AnError + }, + }, + }, + args: args[string]{}, + wantErr: assert.Error, + }, { + name: "cache error", + l: batch[string, string]{ + store: &mocks.MockBatchKVStore[string, string]{ + DelFunc: func(ctx context.Context, keys []string) error { + return nil + }, + }, + cache: &mocks.MockBatchKVStore[string, string]{ + DelFunc: func(ctx context.Context, keys []string) error { + return assert.AnError + }, + }, + }, + args: args[string]{}, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.wantErr(t, tt.l.Del(tt.args.ctx, tt.args.keys), fmt.Sprintf("Del(%v, %v)", tt.args.ctx, tt.args.keys)) + }) + } +} diff --git a/layerkv/kv.go b/layerkv/kv.go new file mode 100644 index 0000000..f5e1b64 --- /dev/null +++ b/layerkv/kv.go @@ -0,0 +1,57 @@ +package layerkv + +import ( + "context" + "errors" + + "github.com/chenyanchen/db" +) + +type layerKV[K comparable, V any] struct { + cache db.KV[K, V] + store db.KV[K, V] +} + +func New[K comparable, V any](cache, store db.KV[K, V]) (*layerKV[K, V], error) { + if cache == nil { + return nil, errors.New("cache is nil") + } + if store == nil { + return nil, errors.New("store is nil") + } + return &layerKV[K, V]{cache: cache, store: store}, nil +} + +func (l *layerKV[K, V]) Get(ctx context.Context, k K) (V, error) { + v, err := l.cache.Get(ctx, k) + if err == nil { + return v, nil + } + + if !errors.Is(err, db.ErrNotFound) { + return v, err + } + + v, err = l.store.Get(ctx, k) + if err != nil { + return v, err + } + + return v, l.cache.Set(ctx, k, v) +} + +func (l *layerKV[K, V]) Set(ctx context.Context, k K, v V) error { + if err := l.store.Set(ctx, k, v); err != nil { + return err + } + + return l.cache.Del(ctx, k) +} + +func (l *layerKV[K, V]) Del(ctx context.Context, k K) error { + if err := l.store.Del(ctx, k); err != nil { + return err + } + + return l.cache.Del(ctx, k) +} diff --git a/layerkv/kv_test.go b/layerkv/kv_test.go new file mode 100644 index 0000000..3224cf3 --- /dev/null +++ b/layerkv/kv_test.go @@ -0,0 +1,248 @@ +package layerkv + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chenyanchen/db" + "github.com/chenyanchen/db/mocks" +) + +func Test_layerKV_Get(t *testing.T) { + type args[K comparable] struct { + ctx context.Context + k K + } + type testCase[K comparable, V any] struct { + name string + l layerKV[K, V] + args args[K] + want V + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "from cache", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "value", nil + }, + }, + }, + args: args[string]{context.Background(), "key"}, + want: "value", + wantErr: assert.NoError, + }, { + name: "cache error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "", assert.AnError + }, + }, + }, + args: args[string]{context.Background(), "key"}, + want: "", + wantErr: assert.Error, + }, { + name: "store error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "", db.ErrNotFound + }, + }, + store: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "", assert.AnError + }, + }, + }, + args: args[string]{context.Background(), "key"}, + want: "", + wantErr: assert.Error, + }, { + name: "cache set error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "", db.ErrNotFound + }, + SetFunc: func(ctx context.Context, k string, v string) error { + return assert.AnError + }, + }, + store: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "value", nil + }, + }, + }, + args: args[string]{context.Background(), "key"}, + want: "value", + wantErr: assert.Error, + }, { + name: "no error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "", db.ErrNotFound + }, + SetFunc: func(ctx context.Context, k string, v string) error { + return nil + }, + }, + store: &mocks.MockKVStore[string, string]{ + GetFunc: func(ctx context.Context, k string) (string, error) { + return "value", nil + }, + }, + }, + args: args[string]{context.Background(), "key"}, + want: "value", + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.l.Get(tt.args.ctx, tt.args.k) + if !tt.wantErr(t, err, fmt.Sprintf("Get(%v, %v)", tt.args.ctx, tt.args.k)) { + return + } + assert.Equalf(t, tt.want, got, "Get(%v, %v)", tt.args.ctx, tt.args.k) + }) + } +} + +func Test_layerKV_Set(t *testing.T) { + type args[K comparable, V any] struct { + ctx context.Context + k K + v V + } + type testCase[K comparable, V any] struct { + name string + l layerKV[K, V] + args args[K, V] + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "store error", + l: layerKV[string, string]{ + store: &mocks.MockKVStore[string, string]{ + SetFunc: func(ctx context.Context, k string, v string) error { + return assert.AnError + }, + }, + }, + args: args[string, string]{context.Background(), "key", "value"}, + wantErr: assert.Error, + }, { + name: "cache error", + l: layerKV[string, string]{ + store: &mocks.MockKVStore[string, string]{ + SetFunc: func(ctx context.Context, k string, v string) error { + return nil + }, + }, + cache: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return assert.AnError + }, + }, + }, + args: args[string, string]{context.Background(), "key", "value"}, + wantErr: assert.Error, + }, { + name: "no error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return nil + }, + }, + store: &mocks.MockKVStore[string, string]{ + SetFunc: func(ctx context.Context, k string, v string) error { + return nil + }, + }, + }, + args: args[string, string]{context.Background(), "key", "value"}, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.wantErr(t, tt.l.Set(tt.args.ctx, tt.args.k, tt.args.v), fmt.Sprintf("Set(%v, %v, %v)", tt.args.ctx, tt.args.k, tt.args.v)) + }) + } +} + +func Test_layerKV_Del(t *testing.T) { + type args[K comparable] struct { + ctx context.Context + k K + } + type testCase[K comparable, V any] struct { + name string + l layerKV[K, V] + args args[K] + wantErr assert.ErrorAssertionFunc + } + tests := []testCase[string, string]{ + { + name: "store error", + l: layerKV[string, string]{ + store: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return assert.AnError + }, + }, + }, + args: args[string]{context.Background(), "key"}, + wantErr: assert.Error, + }, { + name: "cache error", + l: layerKV[string, string]{ + store: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return nil + }, + }, + cache: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return assert.AnError + }, + }, + }, + args: args[string]{context.Background(), "key"}, + wantErr: assert.Error, + }, { + name: "no error", + l: layerKV[string, string]{ + cache: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return nil + }, + }, + store: &mocks.MockKVStore[string, string]{ + DelFunc: func(ctx context.Context, k string) error { + return nil + }, + }, + }, + args: args[string]{context.Background(), "key"}, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.wantErr(t, tt.l.Del(tt.args.ctx, tt.args.k), fmt.Sprintf("Del(%v, %v)", tt.args.ctx, tt.args.k)) + }) + } +} diff --git a/layerkv/layer.go b/layerkv/layer.go deleted file mode 100644 index ae240b3..0000000 --- a/layerkv/layer.go +++ /dev/null @@ -1,126 +0,0 @@ -package layerkv - -import ( - "context" - "errors" - "time" - - "github.com/chenyanchen/db" -) - -type State int8 - -const ( - StateHit = iota - StateMiss - StateError -) - -func (s State) String() string { - switch s { - case StateHit: - return "hit" - case StateMiss: - return "miss" - case StateError: - return "error" - } - return "unknown" -} - -type telemetryFn[K comparable, V any] func(key K, state State, layer int) - -type layerKV[K comparable, V any] struct { - layers []db.KV[K, V] - - // telFn is a telemetry function to record the state of the key. - telFn telemetryFn[K, V] - - // writebackTimeout is the timeout for write back operation. - // If it is zero, the write back operation will be synchronous, - // Otherwise, it will be asynchronous with the given timeout. - writebackTimeout time.Duration -} - -func NewLayerKV[K comparable, V any]( - telFn telemetryFn[K, V], - writebackTimeout time.Duration, - layers ...db.KV[K, V], -) *layerKV[K, V] { - return &layerKV[K, V]{ - layers: layers, - telFn: telFn, - writebackTimeout: writebackTimeout, - } -} - -func (l *layerKV[K, V]) Get(ctx context.Context, k K) (V, error) { - misses := make([]db.KV[K, V], 0, len(l.layers)-1) - for i, layer := range l.layers { - got, err := layer.Get(ctx, k) - if err != nil { - if errors.Is(err, db.ErrNotFound) { - misses = append(misses, layer) - l.telemetry(k, StateMiss, i) - continue - } - - l.telemetry(k, StateError, i) - return got, err - } - - // write back into missed layers. - l.writeback(ctx, misses, k, got) - - l.telemetry(k, StateHit, i) - return got, nil - } - - var v V - return v, db.ErrNotFound -} - -func (l *layerKV[K, V]) telemetry(k K, state State, layer int) { - if l.telFn != nil { - // Record the state of the key. - // layer+1 for human-readable. - l.telFn(k, state, layer+1) - } -} - -func (l *layerKV[K, V]) writeback(ctx context.Context, layers []db.KV[K, V], k K, got V) { - if l.writebackTimeout <= 0 { - l._writeback(ctx, layers, k, got) - return - } - - go func() { - asyncCtx, cancel := context.WithTimeout(ctx, l.writebackTimeout) - defer cancel() - l._writeback(asyncCtx, layers, k, got) - }() -} - -func (l *layerKV[K, V]) _writeback(ctx context.Context, layers []db.KV[K, V], k K, got V) { - for _, layer := range layers { - _ = layer.Set(ctx, k, got) - } -} - -func (l *layerKV[K, V]) Set(ctx context.Context, k K, v V) error { - for _, layer := range l.layers { - if err := layer.Set(ctx, k, v); err != nil { - return err - } - } - return nil -} - -func (l *layerKV[K, V]) Del(ctx context.Context, k K) error { - for _, layer := range l.layers { - if err := layer.Del(ctx, k); err != nil { - return err - } - } - return nil -}