Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different approach for fixing cursors on GetKeysWithOpts #104

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions temporal/internal/driver/redisv9/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,80 @@ func (r *RedisV9) GetKeysAndValuesWithFilter(ctx context.Context,
}

// GetKeysWithOpts retrieves keys with options like filter, cursor, and count
// It properly handles cursors for Redis Clusters by aggregating cursors from all masters.
func (r *RedisV9) GetKeysWithOpts(ctx context.Context,
searchStr string,
cursor uint64,
count int,
) ([]string, uint64, error) {
var keys []string
var finalCursor uint64
var err error

switch client := r.client.(type) {
case *redis.ClusterClient:
clusterSize, err := client.ClusterNodes(ctx).Result()
if err != nil {
return nil, 0, err
}

// Calculate the number of keys to fetch per master based on count and cluster size
perNodeCount := count / len(clusterSize)
if perNodeCount == 0 {
perNodeCount = 1 // Ensure at least one key is fetched per node
}

err = client.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
localKeys, localCursor, err := fetchKeys(ctx, client, searchStr, cursor, int64(perNodeCount))
if err != nil {
return err
}

keys = append(keys, localKeys...)
// For cluster, always set the final cursor to 0 when any master reports it's done
if localCursor == 0 {
finalCursor = 0
}
return nil
})
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return keys, finalCursor, temperr.ClosedConnection
}

return keys, finalCursor, err
}

// If finalCursor is still the initial value, it means no master has reported it's done
if finalCursor != 0 {
finalCursor = cursor // Use the initial cursor as the next cursor
}

case *redis.Client:
keys, finalCursor, err = fetchKeys(ctx, client, searchStr, cursor, int64(count))
if err != nil {
if errors.Is(err, redis.ErrClosed) {
return keys, finalCursor, temperr.ClosedConnection
}

return keys, finalCursor, err
}

default:
return nil, 0, temperr.InvalidRedisClient
}

return keys, finalCursor, nil
}

// GetKeysWithOpts retrieves keys with options like filter, cursor, and count
func (r *RedisV9) GetKeysWithOpts2(ctx context.Context,
searchStr string,
cursor uint64,
count int,
) ([]string, uint64, error) {
var keys []string
var finalCursor uint64

switch client := r.client.(type) {
case *redis.ClusterClient:
Expand Down
Loading