diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 4d553faff7..066466e9ee 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -24,7 +24,9 @@ var ( activeConnections = make(map[string]*websocket.Conn) // A map to hold user's match ctx cancel function matchContexts = make(map[string]context.CancelFunc) - mu sync.Mutex // Mutex for thread-safe access to activeConnections + // A map to hold user's match channels + matchFoundChannels = make(map[string]chan models.MatchFound) + mu sync.Mutex // Mutex for thread-safe access to activeConnections ) // handleConnections manages WebSocket connections and matching logic. @@ -67,6 +69,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { activeConnections[matchRequest.Username] = ws matchCtx, matchCancel := context.WithCancel(context.Background()) matchContexts[matchRequest.Username] = matchCancel + + matchFoundChan := make(chan models.MatchFound) + matchFoundChannels[matchRequest.Username] = matchFoundChan mu.Unlock() // Create a context for cancellation @@ -84,11 +89,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { } defer timeoutCancel() - matchFoundChan := make(chan models.MatchFound) - // Start goroutines for handling messages and performing matching. go processes.ReadMessages(ws, ctx, cancel) - go processes.PerformMatching(matchRequest, ctx, matchFoundChan) // Perform matching + go processes.PerformMatching(matchRequest, context.Background(), matchFoundChannels) // Perform matching // Wait for a match, timeout, or cancellation. waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username) @@ -127,15 +130,37 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context log.Println("Matching cancelled") // Cleanup Redis processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + // Remove the match context and active + if _, exists := matchContexts[username]; exists { + delete(matchContexts, username) + } + if _, exists := activeConnections[username]; exists { + delete(activeConnections, username) + } + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + return case <-timeoutCtx.Done(): log.Println("Connection timed out") // Cleanup Redis processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + // Remove the match context and active + if _, exists := matchContexts[username]; exists { + delete(matchContexts, username) + } + if _, exists := activeConnections[username]; exists { + delete(activeConnections, username) + } + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + sendTimeoutResponse(ws) return case <-matchCtx.Done(): - log.Println("Match found for user HEREEE: " + username) + log.Println("Match found for user: " + username) return case result, ok := <-matchFoundChan: if !ok { @@ -143,13 +168,9 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context log.Println("Match channel closed without finding a match") return } - log.Println("Match found for user: " + result.User) + log.Println("Match found for user: " + username) // Notify the users about the match notifyMatch(result.User, result.MatchedUser, result) - - // if err := ws.WriteJSON(result); err != nil { - // log.Printf("write error: %v", err) - // } return } } @@ -185,13 +206,22 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) { } // Remove the match context for both users and cancel for matched user - if _, exists := matchContexts[username]; exists { + if cancelFunc, exists := matchContexts[username]; exists { + cancelFunc() delete(matchContexts, username) } - if cancelFunc, exists := matchContexts[matchedUsername]; exists { + if cancelFunc2, exists := matchContexts[matchedUsername]; exists { + cancelFunc2() delete(matchContexts, matchedUsername) - defer cancelFunc() // TODO: CancelFunction here is not causing the matchCtx to be done + } + + // Remove the match channels + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + if _, exists := matchFoundChannels[matchedUsername]; exists { + delete(matchFoundChannels, matchedUsername) } // Remove users from the activeConnections map diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go index 54019e80a1..22a264eb30 100644 --- a/apps/matching-service/processes/match.go +++ b/apps/matching-service/processes/match.go @@ -45,7 +45,7 @@ func getPortNumber(addr string) (int64, error) { return port, nil } -func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) { +func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) { // Acquire mutex mu.Lock() // Defer unlocking the mutex @@ -73,7 +73,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc // Peek at the user queue username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result() if err != nil { - // log.Println("Error peeking user from queue:", err) + log.Println("Error peeking user from queue:", err) time.Sleep(1 * time.Second) continue } @@ -81,7 +81,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc // log.Printf("Performing matching for user: %s", username) matchedUsername, matchedTopic, matchedDifficulty, err := FindMatchingUser(redisClient, username, ctx) if err != nil { - // log.Println("Error finding matching user:", err) + log.Println("Error finding matching user:", err) time.Sleep(1 * time.Second) continue } @@ -111,7 +111,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc } // Signal that a match has been found - matchFoundChan <- models.MatchFound{ + matchFoundChannels[username] <- models.MatchFound{ Type: "match_found", MatchID: matchId, User: username,