From a388a651d54d9d239d1cab250861ba9e717c49c2 Mon Sep 17 00:00:00 2001 From: tbuchaillot Date: Tue, 23 Jan 2024 12:45:09 +0100 Subject: [PATCH] different approach for fixing cursors on GetKeysWithOpts --- temporal/internal/driver/redisv9/keyvalue.go | 67 ++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/temporal/internal/driver/redisv9/keyvalue.go b/temporal/internal/driver/redisv9/keyvalue.go index e93adf60..cd807cf4 100644 --- a/temporal/internal/driver/redisv9/keyvalue.go +++ b/temporal/internal/driver/redisv9/keyvalue.go @@ -335,6 +335,7 @@ func (r *RedisV9) GetKeysAndValuesWithFilter(ctx context.Context, } // GetKeysWithOpts retrieves keys with options like filter, cursor, and count +// It properly handles cursors for Redis Clusters by aggregating cursors from all masters. func (r *RedisV9) GetKeysWithOpts(ctx context.Context, searchStr string, cursor uint64, @@ -342,6 +343,72 @@ func (r *RedisV9) GetKeysWithOpts(ctx context.Context, ) ([]string, uint64, error) { var keys []string var finalCursor uint64 + var err error + + switch client := r.client.(type) { + case *redis.ClusterClient: + clusterSize, err := client.ClusterNodes(ctx).Result() + if err != nil { + return nil, 0, err + } + + // Calculate the number of keys to fetch per master based on count and cluster size + perNodeCount := count / len(clusterSize) + if perNodeCount == 0 { + perNodeCount = 1 // Ensure at least one key is fetched per node + } + + err = client.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error { + localKeys, localCursor, err := fetchKeys(ctx, client, searchStr, cursor, int64(perNodeCount)) + if err != nil { + return err + } + + keys = append(keys, localKeys...) + // For cluster, always set the final cursor to 0 when any master reports it's done + if localCursor == 0 { + finalCursor = 0 + } + return nil + }) + if err != nil { + if errors.Is(err, redis.ErrClosed) { + return keys, finalCursor, temperr.ClosedConnection + } + + return keys, finalCursor, err + } + + // If finalCursor is still the initial value, it means no master has reported it's done + if finalCursor != 0 { + finalCursor = cursor // Use the initial cursor as the next cursor + } + + case *redis.Client: + keys, finalCursor, err = fetchKeys(ctx, client, searchStr, cursor, int64(count)) + if err != nil { + if errors.Is(err, redis.ErrClosed) { + return keys, finalCursor, temperr.ClosedConnection + } + + return keys, finalCursor, err + } + + default: + return nil, 0, temperr.InvalidRedisClient + } + + return keys, finalCursor, nil +} + +// GetKeysWithOpts retrieves keys with options like filter, cursor, and count +func (r *RedisV9) GetKeysWithOpts2(ctx context.Context, + searchStr string, + cursor uint64, + count int, +) ([]string, uint64, error) { + var keys []string + var finalCursor uint64 switch client := r.client.(type) { case *redis.ClusterClient: