From 516f035bcbf9ad23c073c655ebb6dba5f18c511c Mon Sep 17 00:00:00 2001 From: Thuan Nguyen Date: Mon, 10 Jun 2024 12:05:32 +0700 Subject: [PATCH] Initial version --- .github/workflows/main.yml | 43 ++++++++ .idea/.gitignore | 8 ++ .idea/kvsync.iml | 9 ++ .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 ++ README.md | 139 +++++++++++++++++++++++++ go.mod | 25 +++++ go.sum | 37 +++++++ kvsync.go | 199 +++++++++++++++++++++++++++++++++++ kvsync_test.go | 206 +++++++++++++++++++++++++++++++++++++ memory.go | 47 +++++++++ redis.go | 85 +++++++++++++++ redis_test.go | 158 ++++++++++++++++++++++++++++ 13 files changed, 970 insertions(+) create mode 100644 .github/workflows/main.yml create mode 100644 .idea/.gitignore create mode 100644 .idea/kvsync.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 kvsync.go create mode 100644 kvsync_test.go create mode 100644 memory.go create mode 100644 redis.go create mode 100644 redis_test.go diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..df400aa --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,43 @@ +name: Go + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +permissions: + contents: write + +jobs: + + build: + name: Build + runs-on: ubuntu-latest + strategy: + max-parallel: 1 + matrix: + go-version: ['1.18', '1.19', '1.20', '1.21', '1.22'] + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 10 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + id: go + + - name: Get dependencies + run: go get -v -t -d ./... + + - uses: gwatts/go-coverage-action@v2 + id: coverage + with: + # Optional coverage threshold + # use fail-coverage to determine what should happen below this threshold + coverage-threshold: 99.999999 + + # collect coverage for all packages beyond the one under test + cover-pkg: ./... diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/kvsync.iml b/.idea/kvsync.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/kvsync.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..c62eb75 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6680a6a --- /dev/null +++ b/README.md @@ -0,0 +1,139 @@ +# KVSync + +KVSync is a Go package that provides a simple and efficient way to synchronize your GORM models with a key-value store. + +There can be multiple key definitions for each model. For example, you can have a key for fetching by ID, a key for fetching by UUID, and a composite key for fetching by both ID and UUID. For each key, the model data is replicated accordingly to the key-value store. + +## Installation + +To install KVSync, use the `go get` command: + +```bash +go get github.com/ndthuan/kvsync +``` + +## Sync Setup + +### Define Synced Models + +Implement `kvsync.Syncable` and provide sync keys mapped by a name for fetching later. Each key is unique on the key-value store. + +```go +type SyncedUser struct { + gorm.Model + UUID string + Username string +} + +func (u SyncedUser) SyncKeys() map[string]string { + return map[string]string{ + "id": fmt.Sprintf("user:id:%d", u.ID), + "uuid": fmt.Sprintf("user:uuid:%s", u.UUID), + "composite": fmt.Sprintf("user:composite:%d_%s", u.ID, u.UUID), + } +} + +``` + +### Configure Key-Value Store + +With Redis for example, you can use the provided `RedisStore`. Steps: +- Init GORM DB instance +- Init Redis client +- Create a new `RedisStore` instance +- Create a new `KVSync` instance +- Register GORM callbacks + +```go +package main + +import ( + "context" + "fmt" + "github.com/ndthuan/kvsync" + "github.com/redis/go-redis/v9" + "gorm.io/gorm" + "time" +) + +func main() { + db, err := gorm.Open(...) + if err != nil { + panic(fmt.Sprintf("Failed to connect to database: %v", err)) + } + + clusterClient := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{}, + }) + + // Create a new RedisStore + store := &kvsync.RedisStore{ + Client: clusterClient, + Expiration: time.Hour * 24 * 365, // Set the expiration time for the keys + Prefix: "kvsync:", // Optional, defaults to "kvsync:" + Marshaler: &kvsync.BSONMarshalingAdapter{}, // Optional, BSONMarshalingAdapter (using Mongo's BSON) is the default and recommended + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kvSync := kvsync.NewKVSync(ctx, kvsync.Options{ + Store: store, + Workers: 4, + ReportCallback: func(r kvsync.Report) { + if r.Err == nil { + actualDoneCount++ + } + }, + }) + + // Register the GORM callbacks for automated synchronization + db.Callback().Create().After("gorm:create").Register("kvsync:create", kvSync.GormCallback()) + db.Callback().Update().After("gorm:update").Register("kvsync:update", kvSync.GormCallback()) + +} +``` + +### And create/update your model as usual + +```go +// Create a new SyncedUser +db.Create(&SyncedUser{ + UUID: "test-uuid", + Username: "test-username", +}) +// The SyncedUser is automatically synchronized with the key-value store +``` + +## Fetching Synced Models + +You can fetch the model by any of the keys you defined. You must provide a struct with non-zero values for the keys you want to fetch by. + +By ID +```go +user := SyncedUser{ + Model: gorm.Model{ID: 1}, +} +kvSync.Fetch(&user, "id") +``` + +By UUID +```go +user := SyncedUser{ + UUID: "test-uuid", +} +kvSync.Fetch(&user, "uuid") +``` + +By composite key +```go +user := SyncedUser{ + Model: gorm.Model{ID: 1}, + UUID: "test-uuid", +} +kvSync.Fetch(&user, "composite") +``` + +## License + +KVSync is licensed under the MIT License. See the [LICENSE](LICENSE) file for more information. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3cd6944 --- /dev/null +++ b/go.mod @@ -0,0 +1,25 @@ +module github.com/ndthuan/kvsync + +go 1.18 + +require ( + github.com/alicebob/miniredis/v2 v2.33.0 + github.com/redis/go-redis/v9 v9.5.3 + github.com/stretchr/testify v1.9.0 + go.mongodb.org/mongo-driver v1.15.0 + gorm.io/driver/sqlite v1.5.5 + gorm.io/gorm v1.25.10 +) + +require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/mattn/go-sqlite3 v1.14.17 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e624c20 --- /dev/null +++ b/go.sum @@ -0,0 +1,37 @@ +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA= +github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM= +github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc= +go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E= +gorm.io/driver/sqlite v1.5.5/go.mod h1:6NgQ7sQWAIFsPrJJl1lSNSu2TABh0ZZ/zm5fosATavE= +gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= +gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= diff --git a/kvsync.go b/kvsync.go new file mode 100644 index 0000000..db6baca --- /dev/null +++ b/kvsync.go @@ -0,0 +1,199 @@ +package kvsync + +import ( + "context" + "errors" + "gorm.io/gorm" + "reflect" +) + +// KVStore is the interface for a key-value store +type KVStore interface { + Put(key string, value any) error + Fetch(key string, dest any) error +} + +// Syncable is the interface for a Gorm model that can be synced with a KVStore +type Syncable interface { + SyncKeys() map[string]string +} + +// Report is a struct that represents a report of a sync operation +type Report struct { + Model any + KeyName string + Key string + Err error +} + +type ReportCallback func(Report) + +// KVSync is the interface for a service that syncs Gorm models with a KVStore +type KVSync interface { + Fetch(dest Syncable, keyName string) error + GormCallback() func(db *gorm.DB) + Sync(entity any) error +} + +// Options is a struct that contains options for creating a KVSync instance +type Options struct { + Store KVStore + Workers int + ReportCallback ReportCallback +} + +// NewKVSync creates a new KVSync instance +func NewKVSync(ctx context.Context, options Options) KVSync { + workers := options.Workers + if workers < 1 { + workers = 1 + } + + k := &kvSync{ + store: options.Store, + ctx: ctx, + queue: make(chan queueItem, options.Workers), + workers: workers, + reports: make(chan Report), + reportCallback: options.ReportCallback, + } + + k.launchWorkers() + + go func() { + for { + select { + case <-k.ctx.Done(): + return + case r := <-k.reports: + if k.reportCallback != nil { + k.reportCallback(r) + } + } + } + }() + + return k +} + +type queueItem struct { + entity any + keyName string + key string +} + +// kvSync is a struct that syncs a Gorm model with a KVStore +type kvSync struct { + store KVStore + queue chan queueItem + reports chan Report + ctx context.Context + workers int + reportCallback ReportCallback +} + +func (k *kvSync) launchWorkers() { + for i := 0; i < k.workers; i++ { + go func() { + for { + select { + case <-k.ctx.Done(): + return + case item := <-k.queue: + k.syncByKey(item.entity, item.key, true) + } + } + }() + } +} + +// Fetch fetches a Syncable model from a KVStore and populates a new model with the data +func (k *kvSync) Fetch(dest Syncable, keyName string) error { + if reflect.TypeOf(dest).Kind() != reflect.Ptr { + return errors.New("destination must be a pointer") + } + + return k.store.Fetch(dest.SyncKeys()[keyName], dest) +} + +// GormCallback returns a Gorm callback that syncs a model with a KVStore +func (k *kvSync) GormCallback() func(db *gorm.DB) { + return func(db *gorm.DB) { + model := resolvePointer(db.Statement.Dest) + + if reflect.TypeOf(model).Kind() == reflect.Slice { + val := reflect.ValueOf(model) + + for i := 0; i < val.Len(); i++ { + item := val.Index(i).Interface() + go k.enqueue(item) + } + return + } else { + go k.enqueue(model) + } + } +} + +// Sync syncs a model with a KVStore synchronously +func (k *kvSync) Sync(entity any) error { + entity = resolvePointer(entity) + + syncable, ok := entity.(Syncable) + + if !ok { + return errors.New("model is not syncable") + } + + for _, key := range syncable.SyncKeys() { + k.syncByKey(entity, key, false) + } + + return nil +} + +func (k *kvSync) syncByKey(entity any, key string, report bool) { + entity = resolvePointer(entity) + + err := k.store.Put(key, entity) + + if !report { + return + } + + k.reports <- Report{ + Model: entity, + Key: key, + Err: err, + } +} + +func (k *kvSync) enqueue(entity any) { + entity = resolvePointer(entity) + + syncable, ok := entity.(Syncable) + + if !ok { + return + } + + for keyName, key := range syncable.SyncKeys() { + k.queue <- queueItem{ + entity: entity, + keyName: keyName, + key: key, + } + } +} + +func resolvePointer(item interface{}) interface{} { + for { + val := reflect.ValueOf(item) + + if val.Kind() != reflect.Ptr { + return item + } + + item = reflect.Indirect(val).Interface() + } +} diff --git a/kvsync_test.go b/kvsync_test.go new file mode 100644 index 0000000..8613f55 --- /dev/null +++ b/kvsync_test.go @@ -0,0 +1,206 @@ +package kvsync_test + +import ( + "context" + "fmt" + "github.com/ndthuan/kvsync" + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "testing" +) + +type SyncedUser struct { + gorm.Model + UUID string + Username string +} + +func (u SyncedUser) SyncKeys() map[string]string { + return map[string]string{ + "id": fmt.Sprintf("user:id:%d", u.ID), + "uuid": fmt.Sprintf("user:uuid:%s", u.UUID), + "composite": fmt.Sprintf("user:composite:%d_%s", u.ID, u.UUID), + } +} + +type UnsyncedUser struct { + gorm.Model + UUID string + Username string +} + +func TestAutomatedSync(t *testing.T) { + var expectedDoneCount = 9 // 3 keys per SyncedUser + var actualDoneCount int + + store := &kvsync.InMemoryStore{ + Store: make(map[string]any), + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kvSync := kvsync.NewKVSync(ctx, kvsync.Options{ + Store: store, + Workers: 4, + ReportCallback: func(r kvsync.Report) { + if r.Err == nil { + actualDoneCount++ + } + }, + }) + + db := setUpDB() + defer tearDownDB(db) + + if err := db.Callback().Create().After("gorm:create").Register("kvsync:create", kvSync.GormCallback()); err != nil { + t.Fatal("failed to register gorm:create callback", err) + } + + if err := db.Callback().Update().After("gorm:update").Register("kvsync:update", kvSync.GormCallback()); err != nil { + t.Fatal("failed to register gorm:update callback", err) + } + + // single model + db.Create(&SyncedUser{ + UUID: "test-uuid", + Username: "test-username", + }) + + // slice of models + db.Create(&[]SyncedUser{ + { + UUID: "test-uuid-2", + Username: "test-username-2", + }, + { + UUID: "test-uuid-3", + Username: "test-username-3", + }, + }) + + db.Create(&UnsyncedUser{ + UUID: "test-uuid-4", + }) + + for { + if actualDoneCount >= expectedDoneCount { + break + } + } + + fetchedUser := SyncedUser{ + UUID: "test-uuid", + } + + err := kvSync.Fetch(&fetchedUser, "uuid") + assert.NoError(t, err) + assert.Equal(t, "test-username", fetchedUser.Username) + + assert.Equal(t, expectedDoneCount, len(store.Store)) + //assert.Equal(t, expectedErrorCount, actualErrorCount) +} + +func TestManualSync(t *testing.T) { + store := &kvsync.InMemoryStore{ + Store: make(map[string]any), + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + kvSync := kvsync.NewKVSync(ctx, kvsync.Options{ + Store: store, + Workers: 2, + }) + + db := setUpDB() + defer tearDownDB(db) + + // single model + kvSync.Sync(&SyncedUser{ + UUID: "test-uuid-manual", + Username: "test-username-manual", + }) + kvSync.Sync(&UnsyncedUser{ + UUID: "test-uuid-manual-2", + Username: "test-username-manual-2", + }) + + assert.Equal(t, 3, len(store.Store)) + + fetchUser := SyncedUser{ + UUID: "test-uuid-manual", + } + + err := kvSync.Fetch(&fetchUser, "uuid") + assert.NoError(t, err) + assert.Equal(t, "test-username-manual", fetchUser.Username) +} + +func TestFetch_Errors(t *testing.T) { + store := &kvsync.InMemoryStore{ + Store: make(map[string]any), + } + + kvSync := kvsync.NewKVSync(context.Background(), kvsync.Options{ + Store: store, + }) + + testCases := []struct { + name string + dest kvsync.Syncable + keyName string + }{ + { + name: "invalid dest (non-pointer)", + dest: SyncedUser{}, + keyName: "uuid", + }, + { + name: "key not found", + dest: &SyncedUser{}, + keyName: "uuid", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := kvSync.Fetch(tc.dest, tc.keyName) + assert.Error(t, err) + }) + } +} + +func TestFetch_KeyNotFound(t *testing.T) { + store := &kvsync.InMemoryStore{ + Store: make(map[string]any), + } + kvSync := kvsync.NewKVSync(context.Background(), kvsync.Options{ + Store: store, + }) + + err := kvSync.Fetch(&SyncedUser{}, "uuid") + assert.Error(t, err) +} + +func setUpDB() *gorm.DB { + db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{}) + if err != nil { + panic(fmt.Sprintf("Failed to connect to database: %v", err)) + } + + if err = db.AutoMigrate(&SyncedUser{}, &UnsyncedUser{}); err != nil { + panic(fmt.Sprintf("Failed to auto migrate: %v", err)) + } + + return db +} + +func tearDownDB(db *gorm.DB) { + _ = db.Migrator().DropTable(&SyncedUser{}, &UnsyncedUser{}) + conn, err := db.DB() + if err == nil { + _ = conn.Close() + } +} diff --git a/memory.go b/memory.go new file mode 100644 index 0000000..2848788 --- /dev/null +++ b/memory.go @@ -0,0 +1,47 @@ +package kvsync + +import ( + "fmt" + "reflect" + "sync" +) + +// InMemoryStore is an in-memory implementation of KVStore +type InMemoryStore struct { + Store map[string]any + mutex sync.Mutex +} + +func copyFields(val interface{}, dest interface{}) error { + vVal := reflect.ValueOf(val) + vDest := reflect.ValueOf(dest) + + vDest = vDest.Elem() + + for i := 0; i < vDest.NumField(); i++ { + vDest.Field(i).Set(vVal.Field(i)) + } + + return nil +} + +func (m *InMemoryStore) Fetch(key string, dest any) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + val, ok := m.Store[key] + if !ok { + return fmt.Errorf("key %s not found", key) + } + + return copyFields(val, dest) +} + +func (m *InMemoryStore) Put(key string, value any) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.Store[key] = value + + return nil +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..e55bc0d --- /dev/null +++ b/redis.go @@ -0,0 +1,85 @@ +package kvsync + +import ( + "context" + "errors" + "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/bson" + "reflect" + "time" +) + +// MarshalingAdapter is an interface for marshaling and unmarshaling data +type MarshalingAdapter interface { + Marshal(v any) ([]byte, error) + Unmarshal(data []byte, v any) error +} + +// BSONMarshalingAdapter is a BSON implementation of MarshalingAdapter +type BSONMarshalingAdapter struct{} + +func (b *BSONMarshalingAdapter) Marshal(v any) ([]byte, error) { + return bson.Marshal(v) +} + +func (b *BSONMarshalingAdapter) Unmarshal(data []byte, v any) error { + return bson.Unmarshal(data, v) +} + +// RedisStore is a Redis implementation of KVStore +type RedisStore struct { + Client *redis.ClusterClient + Prefix string + Expiration time.Duration + Marshaler MarshalingAdapter +} + +func (r *RedisStore) Fetch(key string, dest any) error { + if r.Marshaler == nil { + r.Marshaler = &BSONMarshalingAdapter{} + } + + if reflect.TypeOf(dest).Kind() != reflect.Ptr || !isStruct(dest) { + return errors.New("destination must be a pointer to a struct") + } + + val, err := r.Client.Get(context.Background(), r.prefixedKey(key)).Result() + + if err != nil { + return err + } + + return r.Marshaler.Unmarshal([]byte(val), dest) +} + +func (r *RedisStore) Put(key string, value any) error { + if r.Marshaler == nil { + r.Marshaler = &BSONMarshalingAdapter{} + } + + if !isStruct(value) { + return errors.New("value must be a struct") + } + + b, err := r.Marshaler.Marshal(value) + if err != nil { + return err + } + + return r.Client.Set(context.Background(), r.prefixedKey(key), b, r.Expiration).Err() +} + +func (r *RedisStore) prefixedKey(key string) string { + if r.Prefix == "" { + r.Prefix = "kvsync:" + } + + return r.Prefix + key +} + +func isStruct(value any) bool { + val := reflect.ValueOf(value) + kind := val.Kind() + + return kind == reflect.Struct || (kind == reflect.Ptr && val.Elem().Kind() == reflect.Struct) +} diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..0958ec4 --- /dev/null +++ b/redis_test.go @@ -0,0 +1,158 @@ +package kvsync_test + +import ( + "errors" + "github.com/alicebob/miniredis/v2" + "github.com/ndthuan/kvsync" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "testing" +) + +type erroneousMarshaler struct{} + +func (e erroneousMarshaler) Marshal(v any) ([]byte, error) { + return nil, errors.New("marshaling error") +} + +func (e erroneousMarshaler) Unmarshal(data []byte, v any) error { + return errors.New("unmarshaling error") +} + +type User struct { + ID int + Name string +} + +func TestRedisStore_Set(t *testing.T) { + redisStore, miniRedis := setUpStore() + defer miniRedis.Close() + defer func() { + redisStore.Marshaler = nil + }() + + testCases := []struct { + name string + marshaler kvsync.MarshalingAdapter + key string + value any + wantErr bool + }{ + { + name: "set a struct", + key: "user:1", + value: &User{ID: 1, Name: "Alice"}, + wantErr: false, + }, + { + name: "set a struct but marshaler returns error", + marshaler: &erroneousMarshaler{}, + key: "user:1", + value: &User{ID: 1, Name: "Alice"}, + wantErr: true, + }, + { + name: "set a non-struct", + key: "user:2", + value: "Alice", + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + redisStore.Marshaler = tc.marshaler + err := redisStore.Put(tc.key, tc.value) + if tc.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestRedisStore_FetchInto(t *testing.T) { + redisStore, miniRedis := setUpStore() + defer miniRedis.Close() + defer func() { + redisStore.Marshaler = nil + }() + + var validDest User + + validMarshal, err := bson.Marshal(&User{ID: 1, Name: "Alice"}) + assert.NoError(t, err) + _ = miniRedis.Set("kvsync:user:1", string(validMarshal)) + _ = miniRedis.Set("kvsync:unmarshallable", "unmarshalable") + + testCases := []struct { + name string + key string + dest any + wantUsername string + wantErr bool + }{ + { + name: "valid dest and marshalable", + key: "user:1", + dest: &validDest, + wantUsername: "Alice", + wantErr: false, + }, + { + name: "valid dest but unmarshalable", + key: "unmarshalable", + dest: &validDest, + wantUsername: "", + wantErr: true, + }, + { + name: "invalid dest (non-struct)", + key: "user:1", + dest: "Alice", + wantErr: true, + }, + { + name: "valid dest but key not found", + key: "user:999", + dest: &validDest, + wantUsername: "", + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err = redisStore.Fetch(tc.key, tc.dest) + if tc.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.wantUsername, tc.dest.(*User).Name) + } + }) + + } +} + +func setUpStore() (*kvsync.RedisStore, *miniredis.Miniredis) { + // Create a new miniredis server + s, err := miniredis.Run() + if err != nil { + panic(err) + } + + clusterClient := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{s.Addr()}, + }) + + // Create a new RedisStore + store := &kvsync.RedisStore{ + Client: clusterClient, + Expiration: 0, + } + + return store, s +}