20 changes: 15 additions & 5 deletions commands_test.go
@@ -8922,27 +8922,37 @@ var _ = Describe("Commands", func() {
const key = "latency-monitor-threshold"

old := client.ConfigGet(ctx, key).Val()
client.ConfigSet(ctx, key, "1")
// Use a higher threshold (100ms) to avoid capturing normal operations
// that could cause flakiness due to timing variations
client.ConfigSet(ctx, key, "100")
defer client.ConfigSet(ctx, key, old[key])

result, err := client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(0))

err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
// Use a longer sleep (150ms) to ensure it exceeds the 100ms threshold
err = client.Do(ctx, "DEBUG", "SLEEP", 0.15).Err()
Expect(err).NotTo(HaveOccurred())

result, err = client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(1))
Expect(len(result)).Should(BeNumerically(">=", 1))

// reset latency by event name
err = client.LatencyReset(ctx, result[0].Name).Err()
eventName := result[0].Name
err = client.LatencyReset(ctx, eventName).Err()
Expect(err).NotTo(HaveOccurred())

// Verify the specific event was reset (not that all events are gone)
// This avoids flakiness from other operations triggering latency events
result, err = client.Latency(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).Should(Equal(0))
for _, event := range result {
if event.Name == eventName {
Fail("Event " + eventName + " should have been reset")
}
}
})
})
})
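For reference, a minimal non-Ginkgo sketch of the same de-flaking approach (raise the threshold, trigger one slow command, reset only the recorded event). The address is illustrative; the Latency/LatencyReset helpers are the ones exercised by the test above.

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	old := rdb.ConfigGet(ctx, "latency-monitor-threshold").Val()
	rdb.ConfigSet(ctx, "latency-monitor-threshold", "100") // 100ms, well above normal operations
	defer rdb.ConfigSet(ctx, "latency-monitor-threshold", old["latency-monitor-threshold"])

	_ = rdb.Do(ctx, "DEBUG", "SLEEP", 0.15).Err() // 150ms, exceeds the threshold

	events, err := rdb.Latency(ctx).Result()
	if err != nil || len(events) == 0 {
		return
	}
	name := events[0].Name
	_ = rdb.LatencyReset(ctx, name).Err()

	// Assert only that this event is gone; unrelated events recorded in the
	// meantime are tolerated, which is what removes the flakiness.
	events, _ = rdb.Latency(ctx).Result()
	for _, e := range events {
		if e.Name == name {
			fmt.Println("event", name, "was not reset")
		}
	}
}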
135 changes: 108 additions & 27 deletions osscluster.go
@@ -91,13 +91,29 @@ type ClusterOptions struct {
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration

DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration

// DialerRetries is the maximum number of retry attempts when dialing fails.
//
// default: 5
DialerRetries int

// DialerRetryTimeout is the backoff duration between retry attempts.
//
// default: 100 milliseconds
DialerRetryTimeout time.Duration

ContextTimeoutEnabled bool

PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster

// MaxConcurrentDials is the maximum number of concurrent connection creation goroutines.
// If <= 0, defaults to PoolSize. If > PoolSize, it will be capped at PoolSize.
MaxConcurrentDials int

PoolTimeout time.Duration
MinIdleConns int
MaxIdleConns int
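A hedged sketch of how the new knobs might be set when constructing a cluster client; the addresses and values are illustrative, and the defaults mentioned in the comments come from init() below.

package main

import (
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"},

		// New dial-retry knobs; leaving them zero falls back to the
		// defaults applied in init() (5 retries, 100ms backoff).
		DialerRetries:      3,
		DialerRetryTimeout: 200 * time.Millisecond,

		// Per-node pool; MaxConcurrentDials defaults to PoolSize and is
		// capped at PoolSize if set higher.
		PoolSize:           20,
		MaxConcurrentDials: 10,
	})
	defer rdb.Close()
}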
Expand Down Expand Up @@ -157,7 +173,8 @@ type ClusterOptions struct {
// cluster upgrade notifications gracefully and manage connection/pool state
// transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications.
// If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it.
// The ClusterClient does not directly work with maintnotifications, it is up to the clients in the Nodes map to work with maintnotifications.
// The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management.
// Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.).
MaintNotificationsConfig *maintnotifications.Config
// ShardPicker is used to pick a shard when the request_policy is
// ReqDefault and the command has no keys.
@@ -176,9 +193,24 @@ func (opt *ClusterOptions) init() {
opt.ReadOnly = true
}

if opt.DialTimeout == 0 {
opt.DialTimeout = 5 * time.Second
}
if opt.DialerRetries == 0 {
opt.DialerRetries = 5
}
if opt.DialerRetryTimeout == 0 {
opt.DialerRetryTimeout = 100 * time.Millisecond
}

if opt.PoolSize == 0 {
opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
}
if opt.MaxConcurrentDials <= 0 {
opt.MaxConcurrentDials = opt.PoolSize
} else if opt.MaxConcurrentDials > opt.PoolSize {
opt.MaxConcurrentDials = opt.PoolSize
}
if opt.ReadBufferSize == 0 {
opt.ReadBufferSize = proto.DefaultBufferSize
}
@@ -320,10 +352,13 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff")
o.DialTimeout = q.duration("dial_timeout")
o.DialerRetries = q.int("dialer_retries")
o.DialerRetryTimeout = q.duration("dialer_retry_timeout")
o.ReadTimeout = q.duration("read_timeout")
o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size")
o.MaxConcurrentDials = q.int("max_concurrent_dials")
o.MinIdleConns = q.int("min_idle_conns")
o.MaxIdleConns = q.int("max_idle_conns")
o.MaxActiveConns = q.int("max_active_conns")
Expand Down Expand Up @@ -379,21 +414,25 @@ func (opt *ClusterOptions) clientOptions() *Options {
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,

DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
DialTimeout: opt.DialTimeout,
DialerRetries: opt.DialerRetries,
DialerRetryTimeout: opt.DialerRetryTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,

ContextTimeoutEnabled: opt.ContextTimeoutEnabled,

PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
MaxConcurrentDials: opt.MaxConcurrentDials,
PoolTimeout: opt.PoolTimeout,
MinIdleConns: opt.MinIdleConns,
MaxIdleConns: opt.MaxIdleConns,
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
ReadBufferSize: opt.ReadBufferSize,
WriteBufferSize: opt.WriteBufferSize,
DisableIdentity: opt.DisableIdentity,
DisableIndentity: opt.DisableIdentity,
IdentitySuffix: opt.IdentitySuffix,
@@ -984,9 +1023,11 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
//------------------------------------------------------------------------------

type clusterStateHolder struct {
load func(ctx context.Context) (*clusterState, error)
state atomic.Value
reloading uint32 // atomic
load func(ctx context.Context) (*clusterState, error)

state atomic.Value
reloading uint32 // atomic
reloadPending uint32 // atomic - set to 1 when reload is requested during active reload
}

func newClusterStateHolder(load func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
Expand All @@ -1005,17 +1046,37 @@ func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error)
}

func (c *clusterStateHolder) LazyReload() {
// If already reloading, mark that another reload is pending
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
atomic.StoreUint32(&c.reloadPending, 1)
return
}

go func() {
defer atomic.StoreUint32(&c.reloading, 0)
for {
_, err := c.Reload(context.Background())
if err != nil {
atomic.StoreUint32(&c.reloadPending, 0)
atomic.StoreUint32(&c.reloading, 0)
return
}

_, err := c.Reload(context.Background())
if err != nil {
return
// Clear pending flag after reload completes, before cooldown
// This captures notifications that arrived during the reload
atomic.StoreUint32(&c.reloadPending, 0)

// Wait cooldown period
time.Sleep(200 * time.Millisecond)

// Check if another reload was requested during cooldown
if atomic.LoadUint32(&c.reloadPending) == 0 {
// No pending reload, we're done
atomic.StoreUint32(&c.reloading, 0)
return
}

// Pending reload requested, loop to reload again
}
time.Sleep(200 * time.Millisecond)
}()
}

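For clarity, a standalone sketch of the coalesce-and-cooldown pattern the new LazyReload implements; the coalescer type and names are illustrative, not part of the library.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// coalescer runs doWork in a single background goroutine; Trigger calls made
// while work is in flight are coalesced into at most one follow-up run.
type coalescer struct {
	running uint32
	pending uint32
	doWork  func() error
}

func (c *coalescer) Trigger() {
	if !atomic.CompareAndSwapUint32(&c.running, 0, 1) {
		// Someone is already working; remember that one more run was requested.
		atomic.StoreUint32(&c.pending, 1)
		return
	}
	go func() {
		for {
			if err := c.doWork(); err != nil {
				atomic.StoreUint32(&c.pending, 0)
				atomic.StoreUint32(&c.running, 0)
				return
			}
			// Requests that arrived while we were working are covered by this run.
			atomic.StoreUint32(&c.pending, 0)
			time.Sleep(200 * time.Millisecond) // cooldown
			if atomic.LoadUint32(&c.pending) == 0 {
				atomic.StoreUint32(&c.running, 0)
				return
			}
			// A new request came in during the cooldown; loop and run again.
		}
	}()
}

func main() {
	c := &coalescer{doWork: func() error { fmt.Println("reload"); return nil }}
	for i := 0; i < 5; i++ {
		c.Trigger() // a burst of triggers collapses into at most two reloads
	}
	time.Sleep(500 * time.Millisecond)
}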
@@ -1079,6 +1140,26 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
txPipeline: c.processTxPipeline,
})

// Set up SMIGRATED notification handling for cluster state reload
// When a node client receives a SMIGRATED notification, it should trigger
// cluster state reload on the parent ClusterClient
if opt.MaintNotificationsConfig != nil {
c.nodes.OnNewNode(func(nodeClient *Client) {
manager := nodeClient.GetMaintNotificationsManager()
if manager != nil {
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
// Log the migration details for now
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort)
}
// Currently we reload the entire cluster state
// In the future, this could be optimized to reload only the specific slots
c.state.LazyReload()
})
}
})
}
Comment on lines +1146 to +1161
Copilot AI Dec 2, 2025

The cluster state reload callback is only registered for new nodes via OnNewNode(). This means that any nodes that already exist when the ClusterClient is created will not have this callback registered, and SMIGRATED notifications from these existing nodes will not trigger cluster state reloads.

Consider also registering the callback for existing nodes, similar to how it's done in maintnotifications/e2e/notiftracker_test.go:

if opt.MaintNotificationsConfig != nil {
	// Register callback for existing nodes
	ctx := context.Background()
	_ = c.ForEachShard(ctx, func(ctx context.Context, nodeClient *Client) error {
		manager := nodeClient.GetMaintNotificationsManager()
		if manager != nil {
			manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
				// ... callback implementation
			})
		}
		return nil
	})
	
	// Register callback for new nodes
	c.nodes.OnNewNode(func(nodeClient *Client) {
		// ... existing implementation
	})
}


return c
}

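A hedged end-to-end sketch of opting into the new SMIGRATED handling; the maintnotifications import path and the empty Config literal are assumptions, since the config's fields are not shown in this diff.

package main

import (
	"github.com/redis/go-redis/v9"
	"github.com/redis/go-redis/v9/maintnotifications"
)

func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:    []string{":7000", ":7001", ":7002"},
		Protocol: 3, // push notifications require RESP3
		// A non-nil config makes NewClusterClient register the cluster-state
		// reload callback on every node client created via OnNewNode, so
		// SMIGRATED notifications trigger state.LazyReload() as shown above.
		// The empty literal is an assumption; consult the package for fields.
		MaintNotificationsConfig: &maintnotifications.Config{},
	})
	defer rdb.Close()
}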