From bf58a6a823a34a7a31b792c4b445f8656a82e03b Mon Sep 17 00:00:00 2001 From: Kaushal_26 Date: Sat, 17 May 2025 19:12:07 +0530 Subject: [PATCH 1/3] fix: Handle multiple watch commands for same client and minor improvements - Simplified ClientID assignment logic to ensure it is set directly without error checks after the initial loop. - Updated HANDSHAKE command handling to directly assign ClientID and Mode without redundant error checks. - Modified watch command handling to ensure proper notification of watchers without unnecessary conditions. - Enhanced cleanup logic in WatchManager to delete command entries when fingerprints are removed. These changes improve code clarity and maintainability while ensuring correct behavior during command processing. --- internal/server/ironhawk/iothread.go | 24 ++++++++--------------- internal/server/ironhawk/main.go | 1 + internal/server/ironhawk/watch_manager.go | 5 ++++- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index 2c3955817..bab8d061b 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -85,18 +85,16 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa // Also, CLientID is duplicated in command and io-thread. // Also, we shouldn't allow execution/registration incase of invalid commands // like for B.WATCH cmd since it'll err out we shall return and not create subscription - if err == nil { - t.ClientID = _c.ClientID - } + // No error handling after this as we have continued loop above if error found + t.ClientID = _c.ClientID - if c.Cmd == "HANDSHAKE" && err == nil { + if c.Cmd == "HANDSHAKE" { t.ClientID = _c.C.Args[0] t.Mode = _c.C.Args[1] } - isWatchCmd := strings.HasSuffix(c.Cmd, "WATCH") - - if isWatchCmd{ + // NOTE: Do not remove . from here, as UNWATCH will be handled as WATCH + if strings.HasSuffix(c.Cmd, ".WATCH") { watchManager.HandleWatch(_c, t) } else if strings.HasSuffix(c.Cmd, "UNWATCH") { watchManager.HandleUnwatch(_c, t) @@ -104,19 +102,13 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa watchManager.RegisterThread(t) - // Only send the response directly if this is not a watch command - // For watch commands, the response will be sent by NotifyWatchers - if !isWatchCmd{ - if sendErr := t.serverWire.Send(ctx, res.Rs); sendErr != nil { - return sendErr.Unwrap() - } + if sendErr := t.serverWire.Send(ctx, res.Rs); sendErr != nil { + return sendErr.Unwrap() } // TODO: Streamline this because we need ordering of updates // that are being sent to watchers. - if err == nil { - watchManager.NotifyWatchers(_c, shardManager, t) - } + watchManager.NotifyWatchers(_c, shardManager) } } diff --git a/internal/server/ironhawk/main.go b/internal/server/ironhawk/main.go index a2df3ef6f..bd50a6a79 100644 --- a/internal/server/ironhawk/main.go +++ b/internal/server/ironhawk/main.go @@ -170,6 +170,7 @@ func (s *Server) startIOThread(ctx context.Context, wg *sync.WaitGroup, thread * slog.Any("error", err)) } } + thread.Stop() } func (s *Server) Shutdown() { diff --git a/internal/server/ironhawk/watch_manager.go b/internal/server/ironhawk/watch_manager.go index 757d0b4a7..8fa728d83 100644 --- a/internal/server/ironhawk/watch_manager.go +++ b/internal/server/ironhawk/watch_manager.go @@ -125,11 +125,14 @@ func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) { delete(w.fpClientMap[fp], t.ClientID) if len(w.fpClientMap[fp]) == 0 { delete(w.fpClientMap, fp) + + // If we have deleted the fingerprint, delete the command from the map + delete(w.fpCmdMap, fp) } } } -func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager, t *IOThread) { +func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager) { // Use RLock instead as we are not really modifying any shared maps here. w.mu.RLock() defer w.mu.RUnlock() From 60ce03bb73c5b546b607ecda1aea17d5540a5e0b Mon Sep 17 00:00:00 2001 From: Kaushal_26 Date: Sat, 17 May 2025 19:50:08 +0530 Subject: [PATCH 2/3] fix(linter): Improve error handling during IO thread shutdown - Added error handling for the thread.Stop() method to log failures when stopping the IO thread. - Enhanced logging to include client ID and mode for better traceability in case of errors. --- internal/server/ironhawk/main.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/server/ironhawk/main.go b/internal/server/ironhawk/main.go index bd50a6a79..2419fe819 100644 --- a/internal/server/ironhawk/main.go +++ b/internal/server/ironhawk/main.go @@ -170,7 +170,13 @@ func (s *Server) startIOThread(ctx context.Context, wg *sync.WaitGroup, thread * slog.Any("error", err)) } } - thread.Stop() + if err := thread.Stop(); err != nil { + slog.Debug("failed to stop io-thread", + slog.String("client_id", thread.ClientID), + slog.String("mode", thread.Mode), + slog.Any("error", err), + ) + } } func (s *Server) Shutdown() { From 10775f7a75e1fa83851fadcf3237724abd6b1a9d Mon Sep 17 00:00:00 2001 From: Kaushal_26 Date: Sun, 25 May 2025 18:48:37 +0530 Subject: [PATCH 3/3] Refactor IOThread and WatchManager: Simplify WAL logging condition and update NotifyWatchers signature to include IOThread instance. --- internal/server/ironhawk/iothread.go | 6 +++--- internal/server/ironhawk/watch_manager.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index 266554398..8a4b4591a 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -95,7 +95,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa } // Log command to WAL if enabled and not a replay - if err == nil && wal.GetWAL() != nil && !_c.IsReplay { + if wal.GetWAL() != nil && !_c.IsReplay { // Create WAL entry using protobuf message if err := wal.GetWAL().LogCommand(_c.C); err != nil { slog.Error("failed to log command to WAL", slog.Any("error", err)) @@ -116,7 +116,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa res.Rs.Fingerprint64 = _cWatch.Fingerprint() } - if c.Cmd == "HANDSHAKE" && err == nil { + if c.Cmd == "HANDSHAKE" { t.ClientID = _c.C.Args[0] t.Mode = _c.C.Args[1] } @@ -141,7 +141,7 @@ func (t *IOThread) Start(ctx context.Context, shardManager *shardmanager.ShardMa // TODO: Streamline this because we need ordering of updates // that are being sent to watchers. - watchManager.NotifyWatchers(_c, shardManager) + watchManager.NotifyWatchers(_c, shardManager, t) } } diff --git a/internal/server/ironhawk/watch_manager.go b/internal/server/ironhawk/watch_manager.go index f34adf77d..d83099181 100644 --- a/internal/server/ironhawk/watch_manager.go +++ b/internal/server/ironhawk/watch_manager.go @@ -132,7 +132,7 @@ func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) { } } -func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager) { +func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *shardmanager.ShardManager, t *IOThread) { // Use RLock instead as we are not really modifying any shared maps here. w.mu.RLock() defer w.mu.RUnlock()