Skip to content

Commit

Permalink
feat: handle duplicate connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tituschewxj committed Oct 22, 2024
1 parent a483202 commit 9834600
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 44 deletions.
24 changes: 17 additions & 7 deletions apps/matching-service/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"context"
"encoding/json"
"errors"
"log"
"matching-service/databases"
"matching-service/models"
Expand Down Expand Up @@ -50,8 +51,6 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
return
}

// TODO: Checks if user is already an existing websocket connection

// Subscribes to a channel that returns a message if a match is found
matchFoundPubsub := rdb.Subscribe(ctx, matchRequest.Username)
defer matchFoundPubsub.Close()
Expand All @@ -60,6 +59,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
userCtx, userCancel := context.WithCancel(ctx)
defer userCancel() // Ensure cancel is called to release resources

// Create channel for handling errors
errorChan := make(chan error)

// Create a context for matching timeout
timeoutCtx, timeoutCancel, err := utils.CreateTimeoutContext()
if err != nil {
Expand All @@ -70,10 +72,10 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {

// Start goroutines for handling messages and performing matching.
go processes.ReadMessages(ws, userCancel)
go processes.PerformMatching(matchRequest, ctx) // Perform matching
go processes.PerformMatching(rdb, matchRequest, ctx, errorChan)

// Wait for a match, timeout, or cancellation.
waitForResult(ws, userCtx, timeoutCtx, matchFoundPubsub, matchRequest.Username)
waitForResult(ws, userCtx, timeoutCtx, matchFoundPubsub, errorChan, matchRequest.Username)
}

// readMatchRequest reads the initial match request from the WebSocket connection.
Expand Down Expand Up @@ -105,7 +107,7 @@ func cleanUpUser(username string) {
}

// waitForResult waits for a match result, timeout, or cancellation.
func waitForResult(ws *websocket.Conn, userCtx, timeoutCtx context.Context, matchFoundPubsub *redis.PubSub, username string) {
func waitForResult(ws *websocket.Conn, userCtx, timeoutCtx context.Context, matchFoundPubsub *redis.PubSub, errorChan chan error, username string) {
select {
case <-userCtx.Done():
log.Printf("Matching cancelled for port %v", utils.ExtractWebsocketPort(ws))
Expand All @@ -116,10 +118,18 @@ func waitForResult(ws *websocket.Conn, userCtx, timeoutCtx context.Context, matc
sendTimeoutResponse(ws)
cleanUpUser(username)
return
case err, ok := <-errorChan:
if !ok {
return
}
if errors.Is(err, models.ExistingUserError) {
sendRejectionResponse(ws)
} else {
cleanUpUser(username)
}
return
case msg, ok := <-matchFoundPubsub.Channel():
if !ok {
// Channel closed without a match, possibly due to context cancellation
log.Println("Match found channel closed without finding a match")
return
}
var result models.MatchFound
Expand Down
5 changes: 5 additions & 0 deletions apps/matching-service/models/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package models

import "errors"

var ExistingUserError = errors.New("already has an existing user in matchmaking")
72 changes: 35 additions & 37 deletions apps/matching-service/processes/performmatches.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processes
import (
"context"
"encoding/json"
"errors"
"log"
"matching-service/databases"
"matching-service/models"
Expand All @@ -13,68 +14,65 @@ import (

// Performs the matching algorithm at most once starting from the front of the queue of users,
// until a match is found or no match and the user is enqueued to the queue.
func PerformMatching(matchRequest models.MatchRequest, ctx context.Context) {
redisClient := databases.GetRedisClient()

err := redisClient.Watch(ctx, func(tx *redis.Tx) error {
func PerformMatching(rdb *redis.Client, matchRequest models.MatchRequest, ctx context.Context, errorChan chan error) {
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
// Log queue before and after matchmaking
databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx)
defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx)

// Iterate through the users in the queue from the front of the queue
currentUsername := matchRequest.Username
queuedUsernames, err := databases.GetAllQueuedUsers(tx, ctx)
if err != nil {
return err
}

currentUsername := matchRequest.Username
databases.AddUser(tx, matchRequest, ctx)

// Check that user is not part of the existing queue
for _, username := range queuedUsernames {
// Skip same user
if username == currentUsername {
// WARN: same user should not appear, since user is added after the queue users is accessed
// so this means that the user has another active websocket connection
continue
return models.ExistingUserError
}
}

// Find a matching user if any
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
if err != nil {
log.Println("Error finding matching user:", err)
return err
}
databases.AddUser(tx, matchRequest, ctx)

if matchFound != nil {
matchedUsername := matchFound.MatchedUser
matchedTopic := matchFound.Topic
matchedDifficulty := matchFound.Difficulty
// Find a matching user if any
matchFound, err := databases.FindMatchingUser(tx, currentUsername, ctx)
if err != nil {
log.Println("Error finding matching user:", err)
return err
}

if matchFound != nil {
matchedUsername := matchFound.MatchedUser
matchedTopic := matchFound.Topic
matchedDifficulty := matchFound.Difficulty

// Generate a random match ID
matchId, err := utils.GenerateMatchID()
if err != nil {
log.Println("Unable to randomly generate matchID")
}
// Generate a random match ID
matchId, err := utils.GenerateMatchID()
if err != nil {
log.Println("Unable to randomly generate matchID")
}

// Log down which users got matched
matchFound.MatchID = matchId
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)
// Log down which users got matched
matchFound.MatchID = matchId
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", currentUsername, matchedUsername, matchedTopic, matchedDifficulty)

// Clean up redis for this match
databases.CleanUpUser(tx, currentUsername, ctx)
databases.CleanUpUser(tx, matchedUsername, ctx)
// Clean up redis for this match
databases.CleanUpUser(tx, currentUsername, ctx)
databases.CleanUpUser(tx, matchedUsername, ctx)

publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
}
publishMatch(tx, ctx, currentUsername, matchedUsername, matchFound)
publishMatch(tx, ctx, matchedUsername, currentUsername, matchFound)
}

return nil
})
if err != nil {
// Handle error (like retry logic could be added here)
// return fmt.Errorf("transaction execution failed: %v", err)
return
if errors.Is(err, models.ExistingUserError) {
errorChan <- err
}
}
}

Expand Down

0 comments on commit 9834600

Please sign in to comment.