Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [master, v9, 'v9.*']
pull_request:
branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski-redis/*', 'ce/*']
branches: [master, v9, v9.7, v9.8, 'ndyakov/**', 'ofekshenawa/**', 'ce/**']

permissions:
contents: read
Expand Down
45 changes: 45 additions & 0 deletions internal/maintnotifications/logs/log_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ const (
UnrelaxedTimeoutMessage = "clearing relaxed timeout"
ManagerNotInitializedMessage = "manager not initialized"
FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification"
InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification"
InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification"
SlotMigratingMessage = "slots migrating, applying relaxed timeout"
SlotMigratedMessage = "slots migrated, triggering cluster state reload"

// ========================================
// used in pool/conn
Expand Down Expand Up @@ -623,3 +628,43 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
// If JSON parsing fails, return empty map
return result
}

// Cluster notification functions
func InvalidSeqIDInSMigratingNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": fmt.Sprintf("%v", seqID),
})
}

func InvalidSeqIDInSMigratedNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": fmt.Sprintf("%v", seqID),
})
}

func InvalidHostPortInSMigratedNotification(hostPort interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort)
return appendJSONIfDebug(message, map[string]interface{}{
"hostPort": fmt.Sprintf("%v", hostPort),
})
}

func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string {
message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"seqID": seqID,
"slotRanges": slotRanges,
})
}

func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string {
message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"seqID": seqID,
"hostPort": hostPort,
"slotRanges": slotRanges,
})
}
10 changes: 8 additions & 2 deletions maintnotifications/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@

Seamless Redis connection handoffs during cluster maintenance operations without dropping connections.

## ⚠️ **Important Note**
**Maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality.
## Cluster Support

**Cluster notifications are now supported for ClusterClient!**

- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated
- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes

**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.

## Quick Start

Expand Down
52 changes: 47 additions & 5 deletions maintnotifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (

// Push notification type constants for maintenance
const (
NotificationMoving = "MOVING"
NotificationMigrating = "MIGRATING"
NotificationMigrated = "MIGRATED"
NotificationFailingOver = "FAILING_OVER"
NotificationFailedOver = "FAILED_OVER"
NotificationMoving = "MOVING" // Per-connection handoff notification
NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts
NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts
NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts
NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts
NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts
NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload
)

// maintenanceNotificationTypes contains all notification types that maintenance handles
Expand All @@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}

// NotificationHook is called before and after notification processing
Expand Down Expand Up @@ -65,6 +69,10 @@ type Manager struct {
// MOVING operation tracking - using sync.Map for better concurrent performance
activeMovingOps sync.Map // map[MovingOperationKey]*MovingOperation

// SMIGRATED notification deduplication - tracks processed SeqIDs
// Multiple connections may receive the same SMIGRATED notification
processedSMigratedSeqIDs sync.Map // map[int64]bool

// Atomic state tracking - no locks needed for state queries
activeOperationCount atomic.Int64 // Number of active operations
closed atomic.Bool // Manager closed state
Expand All @@ -73,6 +81,9 @@ type Manager struct {
hooks []NotificationHook
hooksMu sync.RWMutex // Protects hooks slice
poolHooksRef *PoolHook

// Cluster state reload callback for SMIGRATED notifications
clusterStateReloadCallback ClusterStateReloadCallback
}

// MovingOperation tracks an active MOVING operation.
Expand All @@ -83,6 +94,14 @@ type MovingOperation struct {
Deadline time.Time
}

// ClusterStateReloadCallback is a callback function that triggers cluster state reload.
// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379").
// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]).
// Currently, implementations typically reload the entire cluster state, but in the future
// this could be optimized to reload only the specific slots.
type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string)

// NewManager creates a new simplified manager.
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {
if client == nil {
Expand Down Expand Up @@ -223,6 +242,15 @@ func (hm *Manager) GetActiveOperationCount() int64 {
return hm.activeOperationCount.Load()
}

// MarkSMigratedSeqIDProcessed attempts to mark a SMIGRATED SeqID as processed.
// Returns true if this is the first time processing this SeqID (should process),
// false if it was already processed (should skip).
// This prevents duplicate processing when multiple connections receive the same notification.
func (hm *Manager) MarkSMigratedSeqIDProcessed(seqID int64) bool {
_, alreadyProcessed := hm.processedSMigratedSeqIDs.LoadOrStore(seqID, true)
return !alreadyProcessed // Return true if NOT already processed
}

// Close closes the manager.
func (hm *Manager) Close() error {
// Use atomic operation for thread-safe close check
Expand Down Expand Up @@ -318,3 +346,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) {
defer hm.hooksMu.Unlock()
hm.hooks = append(hm.hooks, notificationHook)
}

// SetClusterStateReloadCallback sets the callback function that will be called when a SMIGRATED notification is received.
// This allows node clients to notify their parent ClusterClient to reload cluster state.
func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) {
hm.clusterStateReloadCallback = callback
}

// TriggerClusterStateReload calls the cluster state reload callback if it's set.
// This is called when a SMIGRATED notification is received.
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) {
if hm.clusterStateReloadCallback != nil {
hm.clusterStateReloadCallback(ctx, hostPort, slotRanges)
}
}
2 changes: 2 additions & 0 deletions maintnotifications/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) {
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}

if len(maintenanceNotificationTypes) != len(expectedTypes) {
Expand Down
20 changes: 18 additions & 2 deletions maintnotifications/pool_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,25 @@ func TestConnectionHook(t *testing.T) {
t.Errorf("Connection should be pooled after handoff (shouldPool=%v, shouldRemove=%v)", shouldPool, shouldRemove)
}

// Wait for handoff to complete
time.Sleep(50 * time.Millisecond)
// Wait for handoff to complete with polling instead of fixed sleep
// This avoids flakiness on slow CI runners where 50ms may not be enough
maxWait := 500 * time.Millisecond
pollInterval := 10 * time.Millisecond
deadline := time.Now().Add(maxWait)

handoffCompleted := false
for time.Now().Before(deadline) {
if conn.IsUsable() && !processor.IsHandoffPending(conn) {
handoffCompleted = true
break
}
time.Sleep(pollInterval)
}

if !handoffCompleted {
t.Fatalf("Handoff did not complete within %v (IsUsable=%v, IsHandoffPending=%v)",
maxWait, conn.IsUsable(), processor.IsHandoffPending(conn))
}
// After handoff completion, connection should be usable again
if !conn.IsUsable() {
t.Error("Connection should be usable after handoff completion")
Expand Down
132 changes: 123 additions & 9 deletions maintnotifications/push_notification_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
err = snh.handleFailingOver(ctx, handlerCtx, modifiedNotification)
case NotificationFailedOver:
err = snh.handleFailedOver(ctx, handlerCtx, modifiedNotification)
case NotificationSMigrating:
err = snh.handleSMigrating(ctx, handlerCtx, modifiedNotification)
case NotificationSMigrated:
err = snh.handleSMigrated(ctx, handlerCtx, modifiedNotification)
default:
// Ignore other notification types (e.g., pub/sub messages)
err = nil
Expand All @@ -61,7 +65,9 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
}

// handleMoving processes MOVING notifications.
// ["MOVING", seqNum, timeS, endpoint] - per-connection handoff
// MOVING indicates that a connection should be handed off to a new endpoint.
// This is a per-connection notification that triggers connection handoff.
// Expected format: ["MOVING", seqNum, timeS, endpoint]
func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 3 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MOVING", notification))
Expand Down Expand Up @@ -167,9 +173,10 @@ func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint
}

// handleMigrating processes MIGRATING notifications.
// MIGRATING indicates that a connection migration is starting.
// This is a per-connection notification that applies relaxed timeouts.
// Expected format: ["MIGRATING", ...]
func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
// MIGRATING notifications indicate that a connection is about to be migrated
// Apply relaxed timeouts to the specific connection that received this notification
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATING", notification))
return ErrInvalidNotification
Expand All @@ -195,9 +202,10 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx
}

// handleMigrated processes MIGRATED notifications.
// MIGRATED indicates that a connection migration has completed.
// This is a per-connection notification that clears relaxed timeouts.
// Expected format: ["MIGRATED", ...]
func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
// MIGRATED notifications indicate that a connection migration has completed
// Restore normal timeouts for the specific connection that received this notification
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATED", notification))
return ErrInvalidNotification
Expand All @@ -224,9 +232,10 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p
}

// handleFailingOver processes FAILING_OVER notifications.
// FAILING_OVER indicates that a failover is starting.
// This is a per-connection notification that applies relaxed timeouts.
// Expected format: ["FAILING_OVER", ...]
func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
// FAILING_OVER notifications indicate that a connection is about to failover
// Apply relaxed timeouts to the specific connection that received this notification
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("FAILING_OVER", notification))
return ErrInvalidNotification
Expand All @@ -253,9 +262,10 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt
}

// handleFailedOver processes FAILED_OVER notifications.
// FAILED_OVER indicates that a failover has completed.
// This is a per-connection notification that clears relaxed timeouts.
// Expected format: ["FAILED_OVER", ...]
func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
// FAILED_OVER notifications indicate that a connection failover has completed
// Restore normal timeouts for the specific connection that received this notification
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("FAILED_OVER", notification))
return ErrInvalidNotification
Expand All @@ -280,3 +290,107 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
conn.ClearRelaxedTimeout()
return nil
}

// handleSMigrating processes SMIGRATING notifications.
// SMIGRATING indicates that a cluster slot is in the process of migrating to a different node.
// This is a per-connection notification that applies relaxed timeouts during slot migration.
// Expected format: ["SMIGRATING", SeqID, slot/range1-range2, ...]
func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 3 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification))
return ErrInvalidNotification
}

// Extract SeqID (position 1)
seqID, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratingNotification(notification[1]))
return ErrInvalidNotification
}

// Extract slot ranges (position 2+)
// For now, we just extract them for logging
// Format can be: single slot "1234" or range "100-200"
var slotRanges []string
for i := 2; i < len(notification); i++ {
if slotRange, ok := notification[i].(string); ok {
slotRanges = append(slotRanges, slotRange)
}
}

if handlerCtx.Conn == nil {
internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING"))
return ErrInvalidNotification
}

conn, ok := handlerCtx.Conn.(*pool.Conn)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidConnectionTypeInHandlerContext("SMIGRATING", handlerCtx.Conn, handlerCtx))
return ErrInvalidNotification
}

// Apply relaxed timeout to this specific connection
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), seqID, slotRanges))
}
conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
return nil
}

// handleSMigrated processes SMIGRATED notifications.
// SMIGRATED indicates that a cluster slot has finished migrating to a different node.
// This is a cluster-level notification that triggers cluster state reload.
// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...]
// Note: Multiple connections may receive the same notification, so we deduplicate by SeqID before triggering reload.
// but we still process the notification on each connection to clear the relaxed timeout.
func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 4 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification))
return ErrInvalidNotification
}

// Extract SeqID (position 1)
seqID, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratedNotification(notification[1]))
return ErrInvalidNotification
}

// Deduplicate by SeqID - multiple connections may receive the same notification
if snh.manager.MarkSMigratedSeqIDProcessed(seqID) {
// Extract host:port (position 2)
hostPort, ok := notification[2].(string)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidHostPortInSMigratedNotification(notification[2]))
return ErrInvalidNotification
}

// Extract slot ranges (position 3+)
// For now, we just extract them for logging
// Format can be: single slot "1234" or range "100-200"
var slotRanges []string
for i := 3; i < len(notification); i++ {
if slotRange, ok := notification[i].(string); ok {
slotRanges = append(slotRanges, slotRange)
}
}

if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrated(seqID, hostPort, slotRanges))
}
// Trigger cluster state reload via callback, passing host:port and slot ranges
// For now, implementations just log these and trigger a full reload
// In the future, this could be optimized to reload only the specific slots
snh.manager.TriggerClusterStateReload(ctx, hostPort, slotRanges)
}

// clear relaxed timeout
if handlerCtx.Conn != nil {
conn, ok := handlerCtx.Conn.(*pool.Conn)
if ok {
conn.ClearRelaxedTimeout()
}
}

return nil
}
Loading