diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..484a0c6
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,39 @@
+name: Go
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+jobs:
+
+ build:
+ name: Build
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ go-version: [1.18, 1.19, 1.20, 1.21, 1.22]
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ fetch-depth: 10
+
+ - name: Set up Go
+ uses: actions/setup-go@v2
+ with:
+ go-version: "${{ matrix.go-version }}"
+ id: go
+
+ - name: Get dependencies
+ run: go get -v -t -d ./...
+
+ - uses: gwatts/go-coverage-action@v2
+ id: coverage
+ with:
+ # Optional coverage threshold
+ # use fail-coverage to determine what should happen below this threshold
+ coverage-threshold: 100
+
+ # collect coverage for all packages beyond the one under test
+ cover-pkg: ./...
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/kvsync.iml b/.idea/kvsync.iml
new file mode 100644
index 0000000..5e764c4
--- /dev/null
+++ b/.idea/kvsync.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..c62eb75
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6680a6a
--- /dev/null
+++ b/README.md
@@ -0,0 +1,139 @@
+# KVSync
+
+KVSync is a Go package that provides a simple and efficient way to synchronize your GORM models with a key-value store.
+
+There can be multiple key definitions for each model. For example, you can have a key for fetching by ID, a key for fetching by UUID, and a composite key for fetching by both ID and UUID. For each key, the model data is replicated accordingly to the key-value store.
+
+## Installation
+
+To install KVSync, use the `go get` command:
+
+```bash
+go get github.com/ndthuan/kvsync
+```
+
+## Sync Setup
+
+### Define Synced Models
+
+Implement `kvsync.Syncable` and provide sync keys mapped by a name for fetching later. Each key is unique on the key-value store.
+
+```go
+type SyncedUser struct {
+ gorm.Model
+ UUID string
+ Username string
+}
+
+func (u SyncedUser) SyncKeys() map[string]string {
+ return map[string]string{
+ "id": fmt.Sprintf("user:id:%d", u.ID),
+ "uuid": fmt.Sprintf("user:uuid:%s", u.UUID),
+ "composite": fmt.Sprintf("user:composite:%d_%s", u.ID, u.UUID),
+ }
+}
+
+```
+
+### Configure Key-Value Store
+
+With Redis for example, you can use the provided `RedisStore`. Steps:
+- Init GORM DB instance
+- Init Redis client
+- Create a new `RedisStore` instance
+- Create a new `KVSync` instance
+- Register GORM callbacks
+
+```go
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/ndthuan/kvsync"
+ "github.com/redis/go-redis/v9"
+ "gorm.io/gorm"
+ "time"
+)
+
+func main() {
+ db, err := gorm.Open(...)
+ if err != nil {
+ panic(fmt.Sprintf("Failed to connect to database: %v", err))
+ }
+
+ clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: []string{},
+ })
+
+ // Create a new RedisStore
+ store := &kvsync.RedisStore{
+ Client: clusterClient,
+ Expiration: time.Hour * 24 * 365, // Set the expiration time for the keys
+ Prefix: "kvsync:", // Optional, defaults to "kvsync:"
+ Marshaler: &kvsync.BSONMarshalingAdapter{}, // Optional, BSONMarshalingAdapter (using Mongo's BSON) is the default and recommended
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ kvSync := kvsync.NewKVSync(ctx, kvsync.Options{
+ Store: store,
+ Workers: 4,
+ ReportCallback: func(r kvsync.Report) {
+ if r.Err == nil {
+ actualDoneCount++
+ }
+ },
+ })
+
+ // Register the GORM callbacks for automated synchronization
+ db.Callback().Create().After("gorm:create").Register("kvsync:create", kvSync.GormCallback())
+ db.Callback().Update().After("gorm:update").Register("kvsync:update", kvSync.GormCallback())
+
+}
+```
+
+### And create/update your model as usual
+
+```go
+// Create a new SyncedUser
+db.Create(&SyncedUser{
+ UUID: "test-uuid",
+ Username: "test-username",
+})
+// The SyncedUser is automatically synchronized with the key-value store
+```
+
+## Fetching Synced Models
+
+You can fetch the model by any of the keys you defined. You must provide a struct with non-zero values for the keys you want to fetch by.
+
+By ID
+```go
+user := SyncedUser{
+ Model: gorm.Model{ID: 1},
+}
+kvSync.Fetch(&user, "id")
+```
+
+By UUID
+```go
+user := SyncedUser{
+ UUID: "test-uuid",
+}
+kvSync.Fetch(&user, "uuid")
+```
+
+By composite key
+```go
+user := SyncedUser{
+ Model: gorm.Model{ID: 1},
+ UUID: "test-uuid",
+}
+kvSync.Fetch(&user, "composite")
+```
+
+## License
+
+KVSync is licensed under the MIT License. See the [LICENSE](LICENSE) file for more information.
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..3cd6944
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,25 @@
+module github.com/ndthuan/kvsync
+
+go 1.18
+
+require (
+ github.com/alicebob/miniredis/v2 v2.33.0
+ github.com/redis/go-redis/v9 v9.5.3
+ github.com/stretchr/testify v1.9.0
+ go.mongodb.org/mongo-driver v1.15.0
+ gorm.io/driver/sqlite v1.5.5
+ gorm.io/gorm v1.25.10
+)
+
+require (
+ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/jinzhu/inflection v1.0.0 // indirect
+ github.com/jinzhu/now v1.1.5 // indirect
+ github.com/mattn/go-sqlite3 v1.14.17 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/yuin/gopher-lua v1.1.1 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..e624c20
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,37 @@
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
+github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
+github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
+github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
+github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
+github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
+github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
+github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
+github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
+go.mongodb.org/mongo-driver v1.15.0 h1:rJCKC8eEliewXjZGf0ddURtl7tTVy1TK3bfl0gkUSLc=
+go.mongodb.org/mongo-driver v1.15.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/sqlite v1.5.5 h1:7MDMtUZhV065SilG62E0MquljeArQZNfJnjd9i9gx3E=
+gorm.io/driver/sqlite v1.5.5/go.mod h1:6NgQ7sQWAIFsPrJJl1lSNSu2TABh0ZZ/zm5fosATavE=
+gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
+gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
diff --git a/kvsync.go b/kvsync.go
new file mode 100644
index 0000000..db6baca
--- /dev/null
+++ b/kvsync.go
@@ -0,0 +1,199 @@
+package kvsync
+
+import (
+ "context"
+ "errors"
+ "gorm.io/gorm"
+ "reflect"
+)
+
+// KVStore is the interface for a key-value store
+type KVStore interface {
+ Put(key string, value any) error
+ Fetch(key string, dest any) error
+}
+
+// Syncable is the interface for a Gorm model that can be synced with a KVStore
+type Syncable interface {
+ SyncKeys() map[string]string
+}
+
+// Report is a struct that represents a report of a sync operation
+type Report struct {
+ Model any
+ KeyName string
+ Key string
+ Err error
+}
+
+type ReportCallback func(Report)
+
+// KVSync is the interface for a service that syncs Gorm models with a KVStore
+type KVSync interface {
+ Fetch(dest Syncable, keyName string) error
+ GormCallback() func(db *gorm.DB)
+ Sync(entity any) error
+}
+
+// Options is a struct that contains options for creating a KVSync instance
+type Options struct {
+ Store KVStore
+ Workers int
+ ReportCallback ReportCallback
+}
+
+// NewKVSync creates a new KVSync instance
+func NewKVSync(ctx context.Context, options Options) KVSync {
+ workers := options.Workers
+ if workers < 1 {
+ workers = 1
+ }
+
+ k := &kvSync{
+ store: options.Store,
+ ctx: ctx,
+ queue: make(chan queueItem, options.Workers),
+ workers: workers,
+ reports: make(chan Report),
+ reportCallback: options.ReportCallback,
+ }
+
+ k.launchWorkers()
+
+ go func() {
+ for {
+ select {
+ case <-k.ctx.Done():
+ return
+ case r := <-k.reports:
+ if k.reportCallback != nil {
+ k.reportCallback(r)
+ }
+ }
+ }
+ }()
+
+ return k
+}
+
+type queueItem struct {
+ entity any
+ keyName string
+ key string
+}
+
+// kvSync is a struct that syncs a Gorm model with a KVStore
+type kvSync struct {
+ store KVStore
+ queue chan queueItem
+ reports chan Report
+ ctx context.Context
+ workers int
+ reportCallback ReportCallback
+}
+
+func (k *kvSync) launchWorkers() {
+ for i := 0; i < k.workers; i++ {
+ go func() {
+ for {
+ select {
+ case <-k.ctx.Done():
+ return
+ case item := <-k.queue:
+ k.syncByKey(item.entity, item.key, true)
+ }
+ }
+ }()
+ }
+}
+
+// Fetch fetches a Syncable model from a KVStore and populates a new model with the data
+func (k *kvSync) Fetch(dest Syncable, keyName string) error {
+ if reflect.TypeOf(dest).Kind() != reflect.Ptr {
+ return errors.New("destination must be a pointer")
+ }
+
+ return k.store.Fetch(dest.SyncKeys()[keyName], dest)
+}
+
+// GormCallback returns a Gorm callback that syncs a model with a KVStore
+func (k *kvSync) GormCallback() func(db *gorm.DB) {
+ return func(db *gorm.DB) {
+ model := resolvePointer(db.Statement.Dest)
+
+ if reflect.TypeOf(model).Kind() == reflect.Slice {
+ val := reflect.ValueOf(model)
+
+ for i := 0; i < val.Len(); i++ {
+ item := val.Index(i).Interface()
+ go k.enqueue(item)
+ }
+ return
+ } else {
+ go k.enqueue(model)
+ }
+ }
+}
+
+// Sync syncs a model with a KVStore synchronously
+func (k *kvSync) Sync(entity any) error {
+ entity = resolvePointer(entity)
+
+ syncable, ok := entity.(Syncable)
+
+ if !ok {
+ return errors.New("model is not syncable")
+ }
+
+ for _, key := range syncable.SyncKeys() {
+ k.syncByKey(entity, key, false)
+ }
+
+ return nil
+}
+
+func (k *kvSync) syncByKey(entity any, key string, report bool) {
+ entity = resolvePointer(entity)
+
+ err := k.store.Put(key, entity)
+
+ if !report {
+ return
+ }
+
+ k.reports <- Report{
+ Model: entity,
+ Key: key,
+ Err: err,
+ }
+}
+
+func (k *kvSync) enqueue(entity any) {
+ entity = resolvePointer(entity)
+
+ syncable, ok := entity.(Syncable)
+
+ if !ok {
+ return
+ }
+
+ for keyName, key := range syncable.SyncKeys() {
+ k.queue <- queueItem{
+ entity: entity,
+ keyName: keyName,
+ key: key,
+ }
+ }
+}
+
+func resolvePointer(item interface{}) interface{} {
+ for {
+ val := reflect.ValueOf(item)
+
+ if val.Kind() != reflect.Ptr {
+ return item
+ }
+
+ item = reflect.Indirect(val).Interface()
+ }
+}
diff --git a/kvsync_test.go b/kvsync_test.go
new file mode 100644
index 0000000..8613f55
--- /dev/null
+++ b/kvsync_test.go
@@ -0,0 +1,206 @@
+package kvsync_test
+
+import (
+ "context"
+ "fmt"
+ "github.com/ndthuan/kvsync"
+ "github.com/stretchr/testify/assert"
+ "gorm.io/driver/sqlite"
+ "gorm.io/gorm"
+ "testing"
+)
+
+type SyncedUser struct {
+ gorm.Model
+ UUID string
+ Username string
+}
+
+func (u SyncedUser) SyncKeys() map[string]string {
+ return map[string]string{
+ "id": fmt.Sprintf("user:id:%d", u.ID),
+ "uuid": fmt.Sprintf("user:uuid:%s", u.UUID),
+ "composite": fmt.Sprintf("user:composite:%d_%s", u.ID, u.UUID),
+ }
+}
+
+type UnsyncedUser struct {
+ gorm.Model
+ UUID string
+ Username string
+}
+
+func TestAutomatedSync(t *testing.T) {
+ var expectedDoneCount = 9 // 3 keys per SyncedUser
+ var actualDoneCount int
+
+ store := &kvsync.InMemoryStore{
+ Store: make(map[string]any),
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ kvSync := kvsync.NewKVSync(ctx, kvsync.Options{
+ Store: store,
+ Workers: 4,
+ ReportCallback: func(r kvsync.Report) {
+ if r.Err == nil {
+ actualDoneCount++
+ }
+ },
+ })
+
+ db := setUpDB()
+ defer tearDownDB(db)
+
+ if err := db.Callback().Create().After("gorm:create").Register("kvsync:create", kvSync.GormCallback()); err != nil {
+ t.Fatal("failed to register gorm:create callback", err)
+ }
+
+ if err := db.Callback().Update().After("gorm:update").Register("kvsync:update", kvSync.GormCallback()); err != nil {
+ t.Fatal("failed to register gorm:update callback", err)
+ }
+
+ // single model
+ db.Create(&SyncedUser{
+ UUID: "test-uuid",
+ Username: "test-username",
+ })
+
+ // slice of models
+ db.Create(&[]SyncedUser{
+ {
+ UUID: "test-uuid-2",
+ Username: "test-username-2",
+ },
+ {
+ UUID: "test-uuid-3",
+ Username: "test-username-3",
+ },
+ })
+
+ db.Create(&UnsyncedUser{
+ UUID: "test-uuid-4",
+ })
+
+ for {
+ if actualDoneCount >= expectedDoneCount {
+ break
+ }
+ }
+
+ fetchedUser := SyncedUser{
+ UUID: "test-uuid",
+ }
+
+ err := kvSync.Fetch(&fetchedUser, "uuid")
+ assert.NoError(t, err)
+ assert.Equal(t, "test-username", fetchedUser.Username)
+
+ assert.Equal(t, expectedDoneCount, len(store.Store))
+ //assert.Equal(t, expectedErrorCount, actualErrorCount)
+}
+
+func TestManualSync(t *testing.T) {
+ store := &kvsync.InMemoryStore{
+ Store: make(map[string]any),
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ kvSync := kvsync.NewKVSync(ctx, kvsync.Options{
+ Store: store,
+ Workers: 2,
+ })
+
+ db := setUpDB()
+ defer tearDownDB(db)
+
+ // single model
+ kvSync.Sync(&SyncedUser{
+ UUID: "test-uuid-manual",
+ Username: "test-username-manual",
+ })
+ kvSync.Sync(&UnsyncedUser{
+ UUID: "test-uuid-manual-2",
+ Username: "test-username-manual-2",
+ })
+
+ assert.Equal(t, 3, len(store.Store))
+
+ fetchUser := SyncedUser{
+ UUID: "test-uuid-manual",
+ }
+
+ err := kvSync.Fetch(&fetchUser, "uuid")
+ assert.NoError(t, err)
+ assert.Equal(t, "test-username-manual", fetchUser.Username)
+}
+
+func TestFetch_Errors(t *testing.T) {
+ store := &kvsync.InMemoryStore{
+ Store: make(map[string]any),
+ }
+
+ kvSync := kvsync.NewKVSync(context.Background(), kvsync.Options{
+ Store: store,
+ })
+
+ testCases := []struct {
+ name string
+ dest kvsync.Syncable
+ keyName string
+ }{
+ {
+ name: "invalid dest (non-pointer)",
+ dest: SyncedUser{},
+ keyName: "uuid",
+ },
+ {
+ name: "key not found",
+ dest: &SyncedUser{},
+ keyName: "uuid",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ err := kvSync.Fetch(tc.dest, tc.keyName)
+ assert.Error(t, err)
+ })
+ }
+}
+
+func TestFetch_KeyNotFound(t *testing.T) {
+ store := &kvsync.InMemoryStore{
+ Store: make(map[string]any),
+ }
+ kvSync := kvsync.NewKVSync(context.Background(), kvsync.Options{
+ Store: store,
+ })
+
+ err := kvSync.Fetch(&SyncedUser{}, "uuid")
+ assert.Error(t, err)
+}
+
+func setUpDB() *gorm.DB {
+ db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
+ if err != nil {
+ panic(fmt.Sprintf("Failed to connect to database: %v", err))
+ }
+
+ if err = db.AutoMigrate(&SyncedUser{}, &UnsyncedUser{}); err != nil {
+ panic(fmt.Sprintf("Failed to auto migrate: %v", err))
+ }
+
+ return db
+}
+
+func tearDownDB(db *gorm.DB) {
+ _ = db.Migrator().DropTable(&SyncedUser{}, &UnsyncedUser{})
+ conn, err := db.DB()
+ if err == nil {
+ _ = conn.Close()
+ }
+}
diff --git a/memory.go b/memory.go
new file mode 100644
index 0000000..2848788
--- /dev/null
+++ b/memory.go
@@ -0,0 +1,47 @@
+package kvsync
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+)
+
+// InMemoryStore is an in-memory implementation of KVStore
+type InMemoryStore struct {
+ Store map[string]any
+ mutex sync.Mutex
+}
+
+func copyFields(val interface{}, dest interface{}) error {
+ vVal := reflect.ValueOf(val)
+ vDest := reflect.ValueOf(dest)
+
+ vDest = vDest.Elem()
+
+ for i := 0; i < vDest.NumField(); i++ {
+ vDest.Field(i).Set(vVal.Field(i))
+ }
+
+ return nil
+}
+
+func (m *InMemoryStore) Fetch(key string, dest any) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ val, ok := m.Store[key]
+ if !ok {
+ return fmt.Errorf("key %s not found", key)
+ }
+
+ return copyFields(val, dest)
+}
+
+func (m *InMemoryStore) Put(key string, value any) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ m.Store[key] = value
+
+ return nil
+}
diff --git a/redis.go b/redis.go
new file mode 100644
index 0000000..e55bc0d
--- /dev/null
+++ b/redis.go
@@ -0,0 +1,85 @@
+package kvsync
+
+import (
+ "context"
+ "errors"
+ "github.com/redis/go-redis/v9"
+ "go.mongodb.org/mongo-driver/bson"
+ "reflect"
+ "time"
+)
+
+// MarshalingAdapter is an interface for marshaling and unmarshaling data
+type MarshalingAdapter interface {
+ Marshal(v any) ([]byte, error)
+ Unmarshal(data []byte, v any) error
+}
+
+// BSONMarshalingAdapter is a BSON implementation of MarshalingAdapter
+type BSONMarshalingAdapter struct{}
+
+func (b *BSONMarshalingAdapter) Marshal(v any) ([]byte, error) {
+ return bson.Marshal(v)
+}
+
+func (b *BSONMarshalingAdapter) Unmarshal(data []byte, v any) error {
+ return bson.Unmarshal(data, v)
+}
+
+// RedisStore is a Redis implementation of KVStore
+type RedisStore struct {
+ Client *redis.ClusterClient
+ Prefix string
+ Expiration time.Duration
+ Marshaler MarshalingAdapter
+}
+
+func (r *RedisStore) Fetch(key string, dest any) error {
+ if r.Marshaler == nil {
+ r.Marshaler = &BSONMarshalingAdapter{}
+ }
+
+ if reflect.TypeOf(dest).Kind() != reflect.Ptr || !isStruct(dest) {
+ return errors.New("destination must be a pointer to a struct")
+ }
+
+ val, err := r.Client.Get(context.Background(), r.prefixedKey(key)).Result()
+
+ if err != nil {
+ return err
+ }
+
+ return r.Marshaler.Unmarshal([]byte(val), dest)
+}
+
+func (r *RedisStore) Put(key string, value any) error {
+ if r.Marshaler == nil {
+ r.Marshaler = &BSONMarshalingAdapter{}
+ }
+
+ if !isStruct(value) {
+ return errors.New("value must be a struct")
+ }
+
+ b, err := r.Marshaler.Marshal(value)
+ if err != nil {
+ return err
+ }
+
+ return r.Client.Set(context.Background(), r.prefixedKey(key), b, r.Expiration).Err()
+}
+
+func (r *RedisStore) prefixedKey(key string) string {
+ if r.Prefix == "" {
+ r.Prefix = "kvsync:"
+ }
+
+ return r.Prefix + key
+}
+
+func isStruct(value any) bool {
+ val := reflect.ValueOf(value)
+ kind := val.Kind()
+
+ return kind == reflect.Struct || (kind == reflect.Ptr && val.Elem().Kind() == reflect.Struct)
+}
diff --git a/redis_test.go b/redis_test.go
new file mode 100644
index 0000000..0958ec4
--- /dev/null
+++ b/redis_test.go
@@ -0,0 +1,158 @@
+package kvsync_test
+
+import (
+ "errors"
+ "github.com/alicebob/miniredis/v2"
+ "github.com/ndthuan/kvsync"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+ "testing"
+)
+
+type erroneousMarshaler struct{}
+
+func (e erroneousMarshaler) Marshal(v any) ([]byte, error) {
+ return nil, errors.New("marshaling error")
+}
+
+func (e erroneousMarshaler) Unmarshal(data []byte, v any) error {
+ return errors.New("unmarshaling error")
+}
+
+type User struct {
+ ID int
+ Name string
+}
+
+func TestRedisStore_Set(t *testing.T) {
+ redisStore, miniRedis := setUpStore()
+ defer miniRedis.Close()
+ defer func() {
+ redisStore.Marshaler = nil
+ }()
+
+ testCases := []struct {
+ name string
+ marshaler kvsync.MarshalingAdapter
+ key string
+ value any
+ wantErr bool
+ }{
+ {
+ name: "set a struct",
+ key: "user:1",
+ value: &User{ID: 1, Name: "Alice"},
+ wantErr: false,
+ },
+ {
+ name: "set a struct but marshaler returns error",
+ marshaler: &erroneousMarshaler{},
+ key: "user:1",
+ value: &User{ID: 1, Name: "Alice"},
+ wantErr: true,
+ },
+ {
+ name: "set a non-struct",
+ key: "user:2",
+ value: "Alice",
+ wantErr: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ redisStore.Marshaler = tc.marshaler
+ err := redisStore.Put(tc.key, tc.value)
+ if tc.wantErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ }
+ })
+ }
+}
+
+func TestRedisStore_FetchInto(t *testing.T) {
+ redisStore, miniRedis := setUpStore()
+ defer miniRedis.Close()
+ defer func() {
+ redisStore.Marshaler = nil
+ }()
+
+ var validDest User
+
+ validMarshal, err := bson.Marshal(&User{ID: 1, Name: "Alice"})
+ assert.NoError(t, err)
+ _ = miniRedis.Set("kvsync:user:1", string(validMarshal))
+ _ = miniRedis.Set("kvsync:unmarshallable", "unmarshalable")
+
+ testCases := []struct {
+ name string
+ key string
+ dest any
+ wantUsername string
+ wantErr bool
+ }{
+ {
+ name: "valid dest and marshalable",
+ key: "user:1",
+ dest: &validDest,
+ wantUsername: "Alice",
+ wantErr: false,
+ },
+ {
+ name: "valid dest but unmarshalable",
+ key: "unmarshalable",
+ dest: &validDest,
+ wantUsername: "",
+ wantErr: true,
+ },
+ {
+ name: "invalid dest (non-struct)",
+ key: "user:1",
+ dest: "Alice",
+ wantErr: true,
+ },
+ {
+ name: "valid dest but key not found",
+ key: "user:999",
+ dest: &validDest,
+ wantUsername: "",
+ wantErr: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ err = redisStore.Fetch(tc.key, tc.dest)
+ if tc.wantErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tc.wantUsername, tc.dest.(*User).Name)
+ }
+ })
+
+ }
+}
+
+func setUpStore() (*kvsync.RedisStore, *miniredis.Miniredis) {
+ // Create a new miniredis server
+ s, err := miniredis.Run()
+ if err != nil {
+ panic(err)
+ }
+
+ clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: []string{s.Addr()},
+ })
+
+ // Create a new RedisStore
+ store := &kvsync.RedisStore{
+ Client: clusterClient,
+ Expiration: 0,
+ }
+
+ return store, s
+}