Skip to content

Commit

Permalink
using map[string]uint64 as cursor parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
mativm02 committed Jan 23, 2024
1 parent 19ebd79 commit 3bdf654
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 47 deletions.
64 changes: 47 additions & 17 deletions temporal/internal/driver/redisv9/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,60 +369,90 @@ func (r *RedisV9) GetKeysAndValuesWithFilter(ctx context.Context,
return result, nil
}

// GetKeysWithOpts retrieves keys with options like filter, cursor, and count
// GetKeysWithOpts performs a paginated scan of keys in a Redis database using the SCAN command.
//
// Parameters:
//
// ctx: Execution context.
// searchStr: Pattern for filtering keys (glob-style patterns allowed).
// cursor: Map of Redis node addresses to cursor positions for pagination.
// Initializes internally if nil.
// count: Approximate number of keys to return per scan.
//
// Returns:
//
// keys: Slice of keys matching the searchStr pattern.
// updatedCursor: Updated cursor map for subsequent pagination.
// continueScan: Indicates if more keys are available for scanning (true if any cursor is non-zero).
// err: Error, if any occurred during execution.
func (r *RedisV9) GetKeysWithOpts(ctx context.Context,
searchStr string,
cursor uint64,
count int,
) ([]string, int, error) {
cursor map[string]uint64,
count int64,
) ([]string, map[string]uint64, bool, error) {
var keys []string
var mutex sync.Mutex
var continueScan bool

if cursor == nil {
cursor = make(map[string]uint64)
}

currentCursor := 0
switch client := r.client.(type) {
case *redis.ClusterClient:
err := client.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
localKeys, cursor, err := fetchKeys(ctx, client, searchStr, cursor, int64(count))
currentCursor, exists := cursor[client.String()]
if exists && currentCursor == 0 {
// Cursor is zero, no more keys to scan
return nil
}

localKeys, fkCursor, err := fetchKeys(ctx, client, searchStr, cursor[client.String()], count)
if err != nil {
return err
}

mutex.Lock()
keys = append(keys, localKeys...)

currentCursor = int(cursor)

cursor[client.String()] = fkCursor
if fkCursor != 0 {
continueScan = true
}
mutex.Unlock()

return nil
})

if errors.Is(err, redis.ErrClosed) {
return keys, 0, temperr.ClosedConnection
return keys, cursor, continueScan, temperr.ClosedConnection
}

if err != nil {
return keys, 0, err
return keys, cursor, continueScan, err
}

case *redis.Client:
localKeys, cursor, err := fetchKeys(ctx, client, searchStr, cursor, int64(count))
localKeys, fkCursor, err := fetchKeys(ctx, client, searchStr, cursor[client.String()], int64(count))
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return localKeys, currentCursor, temperr.ClosedConnection
return localKeys, cursor, continueScan, temperr.ClosedConnection
}

return localKeys, currentCursor, err
return localKeys, cursor, continueScan, err
}

currentCursor = int(cursor)
cursor[client.String()] = fkCursor

if fkCursor != 0 {
continueScan = true
}
keys = localKeys

default:
return nil, currentCursor, temperr.InvalidRedisClient
return nil, cursor, continueScan, temperr.InvalidRedisClient
}

return keys, currentCursor, nil
return keys, cursor, continueScan, nil
}

func fetchKeys(ctx context.Context,
Expand Down
167 changes: 139 additions & 28 deletions temporal/keyvalue/keyvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -952,11 +953,13 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
name string
setup func(model.KeyValue)
searchStr string
cursor uint64
count int
cursor map[string]uint64
count int64
expectedKeysCheck func([]string) bool
expectedCursorCheck func(uint64) bool
expectedCursorCheck func(cursorMap map[string]uint64) bool
continueScanCheck bool
expectedErr error
onlyCluster bool // Only run certain checks on cluster connectors
}{
{
name: "valid_search",
Expand All @@ -966,27 +969,47 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
assert.NoError(t, redis.Set(ctx, "key1", "value1", 0))
},
searchStr: "key*",
cursor: 0,
cursor: nil,
count: 10,
expectedKeysCheck: func(s []string) bool {
return len(s) == 2 && (s[0] == "key1" || s[0] == "key2") && (s[1] == "key1" || s[1] == "key2")
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: nil,
},
{
name: "empty_search",
setup: nil,
searchStr: "",
cursor: 0,
cursor: nil,
count: 10,
expectedKeysCheck: func(s []string) bool {
return len(s) == 0
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: nil,
},
Expand All @@ -998,29 +1021,49 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
assert.NoError(t, kv.Set(ctx, "specific2", "value2", 0))
},
searchStr: "specific*",
cursor: 0,
cursor: nil,
count: 10,
expectedKeysCheck: func(s []string) bool {
return len(s) == 2 &&
(s[0] == "specific1" || s[0] == "specific2") &&
(s[1] == "specific1" || s[1] == "specific2")
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: nil,
},
{
name: "non_matching_pattern",
setup: nil,
searchStr: "nomatch*",
cursor: 0,
cursor: nil,
count: 10,
expectedKeysCheck: func(s []string) bool {
return len(s) == 0
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: nil,
},
Expand All @@ -1033,7 +1076,7 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
}
},
searchStr: "pagekey*",
cursor: 0,
cursor: nil,
count: 5, // Count is 1 but Redis SCAN does not guarantee that it will return 1 keys
expectedKeysCheck: func(s []string) bool {
if len(s) == 0 {
Expand All @@ -1055,11 +1098,21 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {

return true
},
expectedCursorCheck: func(c uint64) bool {
// Cursor should be different than 0 as there are more keys to be fetched
return c != 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c == 0 {
return false
}
}

return true
},
expectedErr: nil,
continueScanCheck: true,
expectedErr: nil,
},
{
name: "count_higher_than_actual_keys",
Expand All @@ -1070,26 +1123,76 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
}
},
searchStr: "cursorkey*",
cursor: 0,
cursor: nil,
count: 100,
expectedKeysCheck: func(s []string) bool {
return len(s) == 15
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: nil,
},
{
name: "mixed_cursors_in_cluster",
setup: func(kv model.KeyValue) {
ctx := context.Background()
for i := 0; i < 15; i++ {
assert.NoError(t, kv.Set(ctx, fmt.Sprintf("mixedkey%d", i), fmt.Sprintf("value%d", i), 0))
}
},
searchStr: "mixed*",
cursor: nil,
count: 2,
expectedKeysCheck: func(s []string) bool {
return len(s) != 0
},
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
// Ensuring some cursors are zero and others are not
var zeroExists, nonZeroExists bool
for _, c := range cursorMap {
if c == 0 {
zeroExists = true
} else {
nonZeroExists = true
}
}
return zeroExists && nonZeroExists
},
continueScanCheck: true,
expectedErr: nil,
onlyCluster: true,
},
{
name: "test_with_error_condition",
searchStr: "keys*",
cursor: 0,
cursor: nil,
count: 10,
expectedKeysCheck: func(s []string) bool {
return false
},
expectedCursorCheck: func(c uint64) bool {
return c == 0
expectedCursorCheck: func(cursorMap map[string]uint64) bool {
if len(cursorMap) == 0 {
return false
}

for _, c := range cursorMap {
if c != 0 {
return false
}
}

return true
},
expectedErr: temperr.ClosedConnection,
},
Expand All @@ -1116,11 +1219,19 @@ func TestKeyValue_GetKeysWithOpts(t *testing.T) {
tc.setup(kv)
}

keys, cursor, err := kv.GetKeysWithOpts(ctx, tc.searchStr, tc.cursor, tc.count)
keys, newCursor, continueScan, err := kv.GetKeysWithOpts(ctx, tc.searchStr, tc.cursor, tc.count)
assert.Equal(t, tc.expectedErr, err)
assert.Equal(t, tc.continueScanCheck, continueScan)
if err == nil {
assert.True(t, tc.expectedKeysCheck(keys))
assert.True(t, tc.expectedCursorCheck(uint64(cursor)))
if tc.onlyCluster {
if os.Getenv("REDIS_CLUSTER") == "true" {
assert.True(t, tc.expectedCursorCheck(newCursor))
}
return
}

assert.True(t, tc.expectedCursorCheck(newCursor))
}
})
}
Expand Down
Loading

0 comments on commit 3bdf654

Please sign in to comment.