Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@
# Temp files

/tmp/

.env
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ docker-test:
COMMIT=$(GIT_COMMIT) VERSION=$(GIT_VERSION) BUILD_TIME=$(BUILD_TIME) docker compose -f docker-compose.yml run --rm dev make test

instrumented:
gowrap gen -p github.com/brave/go-sync/datastore -i Datastore -t ./.prom-gowrap.tmpl -o ./datastore/instrumented_datastore.go
gowrap gen -p github.com/brave/go-sync/datastore -i DynamoDatastore -t ./.prom-gowrap.tmpl -o ./datastore/instrumented_dynamo_datastore.go
gowrap gen -p github.com/brave/go-sync/datastore -i SQLDatastore -t ./.prom-gowrap.tmpl -o ./datastore/instrumented_sql_datastore.go
gowrap gen -p github.com/brave/go-sync/cache -i RedisClient -t ./.prom-gowrap.tmpl -o ./cache/instrumented_redis.go
32 changes: 23 additions & 9 deletions cache/instrumented_redis.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 29 additions & 47 deletions cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"fmt"
"os"
"strconv"
"strings"
Expand All @@ -18,14 +19,11 @@ type RedisClient interface {
Get(ctx context.Context, key string, delete bool) (string, error)
Del(ctx context.Context, keys ...string) error
FlushAll(ctx context.Context) error
SubscribeAndWait(ctx context.Context, channel string) error
}

type redisSimpleClient struct {
client *redis.Client
}

type redisClusterClient struct {
client *redis.ClusterClient
type redisClientImpl struct {
client redis.UniversalClient
}

// NewRedisClient create a client for standalone redis or redis cluster.
Expand All @@ -50,24 +48,24 @@ func NewRedisClient() RedisClient {
client := redis.NewClient(&redis.Options{
Addr: addrs[0],
})
r = &redisSimpleClient{client}
r = &redisClientImpl{client}
} else {
client := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
PoolSize: poolSize,
ReadOnly: true,
})
r = &redisClusterClient{client}
r = &redisClientImpl{client}
}

return r
}

func (r *redisSimpleClient) Set(ctx context.Context, key string, val string, ttl time.Duration) error {
func (r *redisClientImpl) Set(ctx context.Context, key string, val string, ttl time.Duration) error {
return r.client.Set(ctx, key, val, ttl).Err()
}

func (r *redisSimpleClient) Incr(ctx context.Context, key string, subtract bool) (int, error) {
func (r *redisClientImpl) Incr(ctx context.Context, key string, subtract bool) (int, error) {
var res *redis.IntCmd
if subtract {
res = r.client.Decr(ctx, key)
Expand All @@ -78,7 +76,7 @@ func (r *redisSimpleClient) Incr(ctx context.Context, key string, subtract bool)
return int(val), err
}

func (r *redisSimpleClient) Get(ctx context.Context, key string, delete bool) (string, error) {
func (r *redisClientImpl) Get(ctx context.Context, key string, delete bool) (string, error) {
var res *redis.StringCmd
if delete {
res = r.client.GetDel(ctx, key)
Expand All @@ -92,47 +90,31 @@ func (r *redisSimpleClient) Get(ctx context.Context, key string, delete bool) (s
return val, err
}

func (r *redisSimpleClient) Del(ctx context.Context, keys ...string) error {
func (r *redisClientImpl) Del(ctx context.Context, keys ...string) error {
return r.client.Del(ctx, keys...).Err()
}

func (r *redisSimpleClient) FlushAll(ctx context.Context) error {
func (r *redisClientImpl) FlushAll(ctx context.Context) error {
return r.client.FlushAll(ctx).Err()
}

func (r *redisClusterClient) Set(ctx context.Context, key string, val string, ttl time.Duration) error {
return r.client.Set(ctx, key, val, ttl).Err()
}

func (r *redisClusterClient) Incr(ctx context.Context, key string, subtract bool) (int, error) {
var res *redis.IntCmd
if subtract {
res = r.client.Decr(ctx, key)
} else {
res = r.client.Incr(ctx, key)
func (r *redisClientImpl) SubscribeAndWait(ctx context.Context, channel string) error {
pubsub := r.client.Subscribe(ctx, channel)
defer pubsub.Close()

ch := pubsub.Channel()

for {
select {
case msg, ok := <-ch:
if !ok {
return fmt.Errorf("redis channel unexpectedly closed")
}
if msg != nil {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
val, err := res.Result()
return int(val), err
}

func (r *redisClusterClient) Get(ctx context.Context, key string, delete bool) (string, error) {
var res *redis.StringCmd
if delete {
res = r.client.GetDel(ctx, key)
} else {
res = r.client.Get(ctx, key)
}
val, err := res.Result()
if err == redis.Nil {
return "", nil
}
return val, err
}

func (r *redisClusterClient) Del(ctx context.Context, keys ...string) error {
return r.client.Del(ctx, keys...).Err()
}

func (r *redisClusterClient) FlushAll(ctx context.Context) error {
return r.client.FlushAll(ctx).Err()
}
Loading