Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Revamp matching service #39

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions apps/matching-service/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ var (
activeConnections = make(map[string]*websocket.Conn)
// A map to hold user's match ctx cancel function
matchContexts = make(map[string]context.CancelFunc)
mu sync.Mutex // Mutex for thread-safe access to activeConnections
// A map to hold user's match channels
matchFoundChannels = make(map[string]chan models.MatchFound)
mu sync.Mutex // Mutex for thread-safe access to activeConnections
)

// handleConnections manages WebSocket connections and matching logic.
Expand Down Expand Up @@ -67,6 +69,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
activeConnections[matchRequest.Username] = ws
matchCtx, matchCancel := context.WithCancel(context.Background())
matchContexts[matchRequest.Username] = matchCancel

matchFoundChan := make(chan models.MatchFound)
matchFoundChannels[matchRequest.Username] = matchFoundChan
mu.Unlock()

// Create a context for cancellation
Expand All @@ -84,11 +89,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
}
defer timeoutCancel()

matchFoundChan := make(chan models.MatchFound)

// Start goroutines for handling messages and performing matching.
go processes.ReadMessages(ws, ctx, cancel)
go processes.PerformMatching(matchRequest, ctx, matchFoundChan) // Perform matching
go processes.PerformMatching(matchRequest, context.Background(), matchFoundChannels) // Perform matching

// Wait for a match, timeout, or cancellation.
waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username)
Expand Down Expand Up @@ -127,29 +130,47 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context
log.Println("Matching cancelled")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}

return
case <-timeoutCtx.Done():
log.Println("Connection timed out")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
Comment on lines +149 to +158

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should abstract out the match context and active connections, and the redis cleanup, to a function. We don't have to call it here, but we can call the function after waitForResult, since it is blocking, which should clean up the code.


sendTimeoutResponse(ws)
return
case <-matchCtx.Done():
log.Println("Match found for user HEREEE: " + username)
log.Println("Match found for user: " + username)
return
case result, ok := <-matchFoundChan:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably do not need the case for <-matchCtx.Done if we have the <-matchFoundChan that checks if matching is done

if !ok {
// Channel closed without a match, possibly due to context cancellation
log.Println("Match channel closed without finding a match")
return
}
log.Println("Match found for user: " + result.User)
log.Println("Match found for user: " + username)
// Notify the users about the match
notifyMatch(result.User, result.MatchedUser, result)

// if err := ws.WriteJSON(result); err != nil {
// log.Printf("write error: %v", err)
// }
return
}
}
Expand Down Expand Up @@ -185,13 +206,22 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) {
}

// Remove the match context for both users and cancel for matched user
if _, exists := matchContexts[username]; exists {
if cancelFunc, exists := matchContexts[username]; exists {
cancelFunc()
delete(matchContexts, username)
}

if cancelFunc, exists := matchContexts[matchedUsername]; exists {
if cancelFunc2, exists := matchContexts[matchedUsername]; exists {
cancelFunc2()
delete(matchContexts, matchedUsername)
defer cancelFunc() // TODO: CancelFunction here is not causing the matchCtx to be done
}

// Remove the match channels
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
if _, exists := matchFoundChannels[matchedUsername]; exists {
delete(matchFoundChannels, matchedUsername)
}

// Remove users from the activeConnections map
Expand Down
8 changes: 4 additions & 4 deletions apps/matching-service/processes/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func getPortNumber(addr string) (int64, error) {
return port, nil
}

func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) {
func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) {
// Acquire mutex
mu.Lock()
// Defer unlocking the mutex
Expand Down Expand Up @@ -73,15 +73,15 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc
// Peek at the user queue
username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result()
if err != nil {
// log.Println("Error peeking user from queue:", err)
log.Println("Error peeking user from queue:", err)
time.Sleep(1 * time.Second)
continue
}

// log.Printf("Performing matching for user: %s", username)
matchedUsername, matchedTopic, matchedDifficulty, err := FindMatchingUser(redisClient, username, ctx)
if err != nil {
// log.Println("Error finding matching user:", err)
log.Println("Error finding matching user:", err)
time.Sleep(1 * time.Second)
continue
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc
}

// Signal that a match has been found
matchFoundChan <- models.MatchFound{
matchFoundChannels[username] <- models.MatchFound{
Type: "match_found",
MatchID: matchId,
User: username,
Expand Down