diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b3fc813611..14e01860ef 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,7 +4,7 @@ on: push: branches: [master, v9, 'v9.*'] pull_request: - branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski-redis/*', 'ce/*'] + branches: [master, v9, v9.7, v9.8, 'ndyakov/**', 'ofekshenawa/**', 'ce/**'] permissions: contents: read diff --git a/internal/maintnotifications/logs/log_messages.go b/internal/maintnotifications/logs/log_messages.go index 34cb1692d9..7418d9f652 100644 --- a/internal/maintnotifications/logs/log_messages.go +++ b/internal/maintnotifications/logs/log_messages.go @@ -121,6 +121,11 @@ const ( UnrelaxedTimeoutMessage = "clearing relaxed timeout" ManagerNotInitializedMessage = "manager not initialized" FailedToMarkForHandoffMessage = "failed to mark connection for handoff" + InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification" + InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification" + InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification" + SlotMigratingMessage = "slots migrating, applying relaxed timeout" + SlotMigratedMessage = "slots migrated, triggering cluster state reload" // ======================================== // used in pool/conn @@ -623,3 +628,43 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} { // If JSON parsing fails, return empty map return result } + +// Cluster notification functions +func InvalidSeqIDInSMigratingNotification(seqID interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID) + return appendJSONIfDebug(message, map[string]interface{}{ + "seqID": fmt.Sprintf("%v", seqID), + }) +} + +func InvalidSeqIDInSMigratedNotification(seqID interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID) + return appendJSONIfDebug(message, map[string]interface{}{ + "seqID": fmt.Sprintf("%v", seqID), + }) +} + +func InvalidHostPortInSMigratedNotification(hostPort interface{}) string { + message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort) + return appendJSONIfDebug(message, map[string]interface{}{ + "hostPort": fmt.Sprintf("%v", hostPort), + }) +} + +func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string { + message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges) + return appendJSONIfDebug(message, map[string]interface{}{ + "connID": connID, + "seqID": seqID, + "slotRanges": slotRanges, + }) +} + +func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string { + message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges) + return appendJSONIfDebug(message, map[string]interface{}{ + "seqID": seqID, + "hostPort": hostPort, + "slotRanges": slotRanges, + }) +} diff --git a/maintnotifications/README.md b/maintnotifications/README.md index 2ac6b9cb1d..c931e61f8f 100644 --- a/maintnotifications/README.md +++ b/maintnotifications/README.md @@ -2,8 +2,14 @@ Seamless Redis connection handoffs during cluster maintenance operations without dropping connections. -## ⚠️ **Important Note** -**Maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality. +## Cluster Support + +**Cluster notifications are now supported for ClusterClient!** + +- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated +- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes + +**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling. ## Quick Start diff --git a/maintnotifications/manager.go b/maintnotifications/manager.go index 775c163e14..cf35b9605a 100644 --- a/maintnotifications/manager.go +++ b/maintnotifications/manager.go @@ -18,11 +18,13 @@ import ( // Push notification type constants for maintenance const ( - NotificationMoving = "MOVING" - NotificationMigrating = "MIGRATING" - NotificationMigrated = "MIGRATED" - NotificationFailingOver = "FAILING_OVER" - NotificationFailedOver = "FAILED_OVER" + NotificationMoving = "MOVING" // Per-connection handoff notification + NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts + NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts + NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts + NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts + NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts + NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload ) // maintenanceNotificationTypes contains all notification types that maintenance handles @@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{ NotificationMigrated, NotificationFailingOver, NotificationFailedOver, + NotificationSMigrating, + NotificationSMigrated, } // NotificationHook is called before and after notification processing @@ -65,6 +69,10 @@ type Manager struct { // MOVING operation tracking - using sync.Map for better concurrent performance activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation + // SMIGRATED notification deduplication - tracks processed SeqIDs + // Multiple connections may receive the same SMIGRATED notification + processedSMigratedSeqIDs sync.Map // map[int64]bool + // Atomic state tracking - no locks needed for state queries activeOperationCount atomic.Int64 // Number of active operations closed atomic.Bool // Manager closed state @@ -73,6 +81,9 @@ type Manager struct { hooks []NotificationHook hooksMu sync.RWMutex // Protects hooks slice poolHooksRef *PoolHook + + // Cluster state reload callback for SMIGRATED notifications + clusterStateReloadCallback ClusterStateReloadCallback } // MovingOperation tracks an active MOVING operation. @@ -83,6 +94,14 @@ type MovingOperation struct { Deadline time.Time } +// ClusterStateReloadCallback is a callback function that triggers cluster state reload. +// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications. +// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379"). +// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]). +// Currently, implementations typically reload the entire cluster state, but in the future +// this could be optimized to reload only the specific slots. +type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string) + // NewManager creates a new simplified manager. func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) { if client == nil { @@ -223,6 +242,15 @@ func (hm *Manager) GetActiveOperationCount() int64 { return hm.activeOperationCount.Load() } +// MarkSMigratedSeqIDProcessed attempts to mark a SMIGRATED SeqID as processed. +// Returns true if this is the first time processing this SeqID (should process), +// false if it was already processed (should skip). +// This prevents duplicate processing when multiple connections receive the same notification. +func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool { + _, alreadyProcessed := hm.processedSMigratedSeqIDs.LoadOrStore(seqID, true) + return !alreadyProcessed // Return true if NOT already processed +} + // Close closes the manager. func (hm *Manager) Close() error { // Use atomic operation for thread-safe close check @@ -318,3 +346,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) { defer hm.hooksMu.Unlock() hm.hooks = append(hm.hooks, notificationHook) } + +// SetClusterStateReloadCallback sets the callback function that will be called when a SMIGRATED notification is received. +// This allows node clients to notify their parent ClusterClient to reload cluster state. +func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) { + hm.clusterStateReloadCallback = callback +} + +// TriggerClusterStateReload calls the cluster state reload callback if it's set. +// This is called when a SMIGRATED notification is received. +func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) { + if hm.clusterStateReloadCallback != nil { + hm.clusterStateReloadCallback(ctx, hostPort, slotRanges) + } +} diff --git a/maintnotifications/manager_test.go b/maintnotifications/manager_test.go index 35dc4a32ab..aed0c9f7d5 100644 --- a/maintnotifications/manager_test.go +++ b/maintnotifications/manager_test.go @@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) { NotificationMigrated, NotificationFailingOver, NotificationFailedOver, + NotificationSMigrating, + NotificationSMigrated, } if len(maintenanceNotificationTypes) != len(expectedTypes) { diff --git a/maintnotifications/pool_hook_test.go b/maintnotifications/pool_hook_test.go index 6ec61eeda0..972605949d 100644 --- a/maintnotifications/pool_hook_test.go +++ b/maintnotifications/pool_hook_test.go @@ -700,9 +700,25 @@ func TestConnectionHook(t *testing.T) { t.Errorf("Connection should be pooled after handoff (shouldPool=%v, shouldRemove=%v)", shouldPool, shouldRemove) } - // Wait for handoff to complete - time.Sleep(50 * time.Millisecond) + // Wait for handoff to complete with polling instead of fixed sleep + // This avoids flakiness on slow CI runners where 50ms may not be enough + maxWait := 500 * time.Millisecond + pollInterval := 10 * time.Millisecond + deadline := time.Now().Add(maxWait) + handoffCompleted := false + for time.Now().Before(deadline) { + if conn.IsUsable() && !processor.IsHandoffPending(conn) { + handoffCompleted = true + break + } + time.Sleep(pollInterval) + } + + if !handoffCompleted { + t.Fatalf("Handoff did not complete within %v (IsUsable=%v, IsHandoffPending=%v)", + maxWait, conn.IsUsable(), processor.IsHandoffPending(conn)) + } // After handoff completion, connection should be usable again if !conn.IsUsable() { t.Error("Connection should be usable after handoff completion") diff --git a/maintnotifications/push_notification_handler.go b/maintnotifications/push_notification_handler.go index 937b4ae82e..020400208e 100644 --- a/maintnotifications/push_notification_handler.go +++ b/maintnotifications/push_notification_handler.go @@ -49,6 +49,10 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand err = snh.handleFailingOver(ctx, handlerCtx, modifiedNotification) case NotificationFailedOver: err = snh.handleFailedOver(ctx, handlerCtx, modifiedNotification) + case NotificationSMigrating: + err = snh.handleSMigrating(ctx, handlerCtx, modifiedNotification) + case NotificationSMigrated: + err = snh.handleSMigrated(ctx, handlerCtx, modifiedNotification) default: // Ignore other notification types (e.g., pub/sub messages) err = nil @@ -61,7 +65,9 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand } // handleMoving processes MOVING notifications. -// ["MOVING", seqNum, timeS, endpoint] - per-connection handoff +// MOVING indicates that a connection should be handed off to a new endpoint. +// This is a per-connection notification that triggers connection handoff. +// Expected format: ["MOVING", seqNum, timeS, endpoint] func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { if len(notification) < 3 { internal.Logger.Printf(ctx, logs.InvalidNotification("MOVING", notification)) @@ -167,9 +173,10 @@ func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint } // handleMigrating processes MIGRATING notifications. +// MIGRATING indicates that a connection migration is starting. +// This is a per-connection notification that applies relaxed timeouts. +// Expected format: ["MIGRATING", ...] func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // MIGRATING notifications indicate that a connection is about to be migrated - // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATING", notification)) return ErrInvalidNotification @@ -195,9 +202,10 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx } // handleMigrated processes MIGRATED notifications. +// MIGRATED indicates that a connection migration has completed. +// This is a per-connection notification that clears relaxed timeouts. +// Expected format: ["MIGRATED", ...] func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // MIGRATED notifications indicate that a connection migration has completed - // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATED", notification)) return ErrInvalidNotification @@ -224,9 +232,10 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p } // handleFailingOver processes FAILING_OVER notifications. +// FAILING_OVER indicates that a failover is starting. +// This is a per-connection notification that applies relaxed timeouts. +// Expected format: ["FAILING_OVER", ...] func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // FAILING_OVER notifications indicate that a connection is about to failover - // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("FAILING_OVER", notification)) return ErrInvalidNotification @@ -253,9 +262,10 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt } // handleFailedOver processes FAILED_OVER notifications. +// FAILED_OVER indicates that a failover has completed. +// This is a per-connection notification that clears relaxed timeouts. +// Expected format: ["FAILED_OVER", ...] func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { - // FAILED_OVER notifications indicate that a connection failover has completed - // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { internal.Logger.Printf(ctx, logs.InvalidNotification("FAILED_OVER", notification)) return ErrInvalidNotification @@ -280,3 +290,107 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx conn.ClearRelaxedTimeout() return nil } + +// handleSMigrating processes SMIGRATING notifications. +// SMIGRATING indicates that a cluster slot is in the process of migrating to a different node. +// This is a per-connection notification that applies relaxed timeouts during slot migration. +// Expected format: ["SMIGRATING", SeqID, slot/range1-range2, ...] +func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { + if len(notification) < 3 { + internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification)) + return ErrInvalidNotification + } + + // Extract SeqID (position 1) + seqID, ok := notification[1].(int64) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratingNotification(notification[1])) + return ErrInvalidNotification + } + + // Extract slot ranges (position 2+) + // For now, we just extract them for logging + // Format can be: single slot "1234" or range "100-200" + var slotRanges []string + for i := 2; i < len(notification); i++ { + if slotRange, ok := notification[i].(string); ok { + slotRanges = append(slotRanges, slotRange) + } + } + + if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING")) + return ErrInvalidNotification + } + + conn, ok := handlerCtx.Conn.(*pool.Conn) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidConnectionTypeInHandlerContext("SMIGRATING", handlerCtx.Conn, handlerCtx)) + return ErrInvalidNotification + } + + // Apply relaxed timeout to this specific connection + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), seqID, slotRanges)) + } + conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout) + return nil +} + +// handleSMigrated processes SMIGRATED notifications. +// SMIGRATED indicates that a cluster slot has finished migrating to a different node. +// This is a cluster-level notification that triggers cluster state reload. +// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...] +// Note: Multiple connections may receive the same notification, so we deduplicate by SeqID before triggering reload. +// but we still process the notification on each connection to clear the relaxed timeout. +func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { + if len(notification) < 4 { + internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification)) + return ErrInvalidNotification + } + + // Extract SeqID (position 1) + seqID, ok := notification[1].(int64) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratedNotification(notification[1])) + return ErrInvalidNotification + } + + // Deduplicate by SeqID - multiple connections may receive the same notification + if snh.manager.MarkSMigratedSeqIDProcessed(seqID) { + // Extract host:port (position 2) + hostPort, ok := notification[2].(string) + if !ok { + internal.Logger.Printf(ctx, logs.InvalidHostPortInSMigratedNotification(notification[2])) + return ErrInvalidNotification + } + + // Extract slot ranges (position 3+) + // For now, we just extract them for logging + // Format can be: single slot "1234" or range "100-200" + var slotRanges []string + for i := 3; i < len(notification); i++ { + if slotRange, ok := notification[i].(string); ok { + slotRanges = append(slotRanges, slotRange) + } + } + + if internal.LogLevel.InfoOrAbove() { + internal.Logger.Printf(ctx, logs.SlotMigrated(seqID, hostPort, slotRanges)) + } + // Trigger cluster state reload via callback, passing host:port and slot ranges + // For now, implementations just log these and trigger a full reload + // In the future, this could be optimized to reload only the specific slots + snh.manager.TriggerClusterStateReload(ctx, hostPort, slotRanges) + } + + // clear relaxed timeout + if handlerCtx.Conn != nil { + conn, ok := handlerCtx.Conn.(*pool.Conn) + if ok { + conn.ClearRelaxedTimeout() + } + } + + return nil +}