From e89f5e55778eb8cefd67241d9da9a9be7b90ab6d Mon Sep 17 00:00:00 2001 From: bensohh Date: Tue, 15 Oct 2024 11:08:12 +0800 Subject: [PATCH 1/7] Update instructions on running matching-service with redis --- apps/matching-service/README.md | 21 ++- .../tests/websocket-test.html | 123 ++++++++++-------- 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/apps/matching-service/README.md b/apps/matching-service/README.md index 232e65497c..3828f81018 100644 --- a/apps/matching-service/README.md +++ b/apps/matching-service/README.md @@ -27,8 +27,15 @@ go mod tidy - `PORT`: Specifies the port for the WebSocket server. Default is `8081`. - `JWT_SECRET`: The secret key used to verify JWT tokens. - `MATCH_TIMEOUT`: The time in seconds to wait for a match before timing out. +- `REDIS_URL`: The URL for the Redis server. Default is `localhost:6379`. -4. Start the WebSocket server: +4. Start a local redis server: + +```bash +docker run -d -p 6379:6379 redis +``` + +5. Start the WebSocket server: ```bash go run main.go @@ -68,7 +75,9 @@ Client sends matching parameters: { "type": "match_request", "topics": ["Algorithms", "Arrays"], - "difficulties": ["Easy", "Medium"] + "difficulties": ["Easy", "Medium"], + "username": "Jane Doe", + "email": "janedoe@gmail.com" // possible to change to user ID in mongodb } ``` @@ -77,9 +86,9 @@ Server response on successful match: ```json { "type": "match_found", - "matchID": 67890, - "partnerID": 54321, - "partnerName": "John Doe" + "matchID": "aa41c004e589642402215c2c0a3a165a", + "partnerID": 54321, // currently identifying via partner's websocket port --> change to user ID in mongodb + "partnerName": "John Doe" // partner username } ``` @@ -100,6 +109,8 @@ Utilize `./tests/websocket-test.html` for a basic debugging interface of the mat Make sure to open the HTML file in a web browser while the WebSocket server is running to perform your tests. +You can open one instance of the HTML file in multiple tabs to simulate multiple clients connecting to the server. (In the future: ensure that only one connection is allowed per user) + ## Docker Support TODO: Add section for Docker setup and usage instructions. diff --git a/apps/matching-service/tests/websocket-test.html b/apps/matching-service/tests/websocket-test.html index cdbe3446db..4fb9ebfe75 100644 --- a/apps/matching-service/tests/websocket-test.html +++ b/apps/matching-service/tests/websocket-test.html @@ -1,66 +1,83 @@ - - - + + + Matching service: websocket test - - + +

Status: no matching yet

- + From 57c4fd9618922b05d73db4321d20f72e8fa1fe78 Mon Sep 17 00:00:00 2001 From: bensohh Date: Tue, 15 Oct 2024 11:12:41 +0800 Subject: [PATCH 2/7] Update env example and log statements in main --- apps/matching-service/.env.example | 5 +- apps/matching-service/go.mod | 6 ++ apps/matching-service/go.sum | 10 ++ apps/matching-service/handlers/websocket.go | 1 + apps/matching-service/main.go | 22 ++++ apps/matching-service/models/match.go | 5 +- apps/matching-service/processes/match.go | 103 +++++++++++++----- apps/matching-service/processes/queue.go | 112 ++++++++++++++++++++ 8 files changed, 236 insertions(+), 28 deletions(-) create mode 100644 apps/matching-service/processes/queue.go diff --git a/apps/matching-service/.env.example b/apps/matching-service/.env.example index 961e78fd21..fb40560876 100644 --- a/apps/matching-service/.env.example +++ b/apps/matching-service/.env.example @@ -1,3 +1,4 @@ PORT=8081 -MATCH_TIMEOUT=3 -JWT_SECRET=you-can-replace-this-with-your-own-secret \ No newline at end of file +MATCH_TIMEOUT=10 +JWT_SECRET=you-can-replace-this-with-your-own-secret +REDIS_URL=localhost:6379 \ No newline at end of file diff --git a/apps/matching-service/go.mod b/apps/matching-service/go.mod index 4bb3e79d9b..89ad38d6b5 100644 --- a/apps/matching-service/go.mod +++ b/apps/matching-service/go.mod @@ -5,4 +5,10 @@ go 1.23.1 require ( github.com/gorilla/websocket v1.5.3 github.com/joho/godotenv v1.5.1 + github.com/redis/go-redis/v9 v9.6.2 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/apps/matching-service/go.sum b/apps/matching-service/go.sum index 09f4ebfc33..722db5bc36 100644 --- a/apps/matching-service/go.sum +++ b/apps/matching-service/go.sum @@ -1,4 +1,14 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= +github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 6c34aade4b..854bdd54c3 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -69,6 +69,7 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) { if err := ws.ReadJSON(&matchRequest); err != nil { return matchRequest, err } + matchRequest.Port = ws.RemoteAddr().String() log.Printf("Received match request: %v", matchRequest) return matchRequest, nil } diff --git a/apps/matching-service/main.go b/apps/matching-service/main.go index 6a8af7f96b..655ac5d24a 100644 --- a/apps/matching-service/main.go +++ b/apps/matching-service/main.go @@ -1,13 +1,16 @@ package main import ( + "context" "fmt" "log" "matching-service/handlers" + "matching-service/processes" "net/http" "os" "github.com/joho/godotenv" + "github.com/redis/go-redis/v9" ) func main() { @@ -18,6 +21,25 @@ func main() { } port := os.Getenv("PORT") + // Set up link with redis server + redisAddr := os.Getenv("REDIS_URL") + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Password: "", // no password set + DB: 0, // use default DB + }) + + // Ping the redis server + _, err = client.Ping(context.Background()).Result() + if err != nil { + log.Fatalf("Could not connect to Redis: %v", err) + } else { + log.Println("Connected to Redis at the following address: " + redisAddr) + } + + // Pass the connect redis client to processes in match + processes.SetRedisClient(client) + // Routes http.HandleFunc("/match", handlers.HandleWebSocketConnections) diff --git a/apps/matching-service/models/match.go b/apps/matching-service/models/match.go index e7cd2a6ff6..453fecc497 100644 --- a/apps/matching-service/models/match.go +++ b/apps/matching-service/models/match.go @@ -4,11 +4,14 @@ type MatchRequest struct { Type string `json:"type"` Topics []string `json:"topics"` Difficulties []string `json:"difficulties"` + Username string `json:"username"` + Email string `json:"email"` + Port string `json:"port"` } type MatchFound struct { Type string `json:"type"` - MatchID int64 `json:"matchId"` + MatchID string `json:"matchId"` PartnerID int64 `json:"partnerId"` PartnerName string `json:"partnerName"` } diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go index 0565e3555e..a4d627a4fe 100644 --- a/apps/matching-service/processes/match.go +++ b/apps/matching-service/processes/match.go @@ -2,34 +2,87 @@ package processes import ( "context" + "fmt" + "log" "matching-service/models" - "time" + "strconv" + "strings" + "sync" + + "github.com/redis/go-redis/v9" ) -// PerformMatching reads the match request and simulates a matching process. -func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) { - defer close(matchFoundChan) // Safely close the channel after matching completes. - - // TODO: matching algorithm - // for { - // select { - // case <-ctx.Done(): - // // The context has been cancelled, so stop the matching process. - // return - // default: - // // Continue matching logic... - // } - // } - // Simulate the matching process with a sleep (replace with actual logic). - time.Sleep(2 * time.Second) - - // Create a mock result and send it to the channel. - result := models.MatchFound{ - Type: "match_found", - MatchID: 67890, - PartnerID: 54321, - PartnerName: "John Doe", +var ( + redisClient *redis.Client + mu sync.Mutex +) + +// SetRedisClient sets the Redis client to a global variable +func SetRedisClient(client *redis.Client) { + redisClient = client +} + +func getPortNumber(addr string) (int64, error) { + // Split the string by the colon + parts := strings.Split(addr, ":") + if len(parts) < 2 { + return 0, fmt.Errorf("no port number found") + } + + // Convert the port string to an integer + port, err := strconv.ParseInt(parts[len(parts)-1], 10, 64) + if err != nil { + return 0, err // Return an error if conversion fails } - matchFoundChan <- result + return port, nil +} + +func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) { + defer close(matchFoundChan) // Safely close the channel after matching completes + + for { + select { + case <-ctx.Done(): + // Cleaning up, dequeue the user + err := dequeueUser(redisClient, matchRequest) + if err != nil { + log.Println("Failed to dequeue user:", err) + } + + return + + default: + // Continue matching logic... + mu.Lock() + match, err := matchUser(redisClient, matchRequest) + mu.Unlock() + if err != nil { + log.Println("Error occurred during matching:", err) + return + } + + if match != "" { + arr := strings.Split(match, ",") + username := arr[0] + // email := arr[1] + port := arr[2] + matchId := arr[3] + partnerPort, err := getPortNumber(port) + if err != nil { + log.Println("Error occurred while getting PartnerID:", err) + return + } + + result := models.MatchFound{ + Type: "match_found", + MatchID: matchId, + PartnerID: partnerPort, // Use the retrieved PartnerID + PartnerName: username, + } + matchFoundChan <- result + return + } + } + } } diff --git a/apps/matching-service/processes/queue.go b/apps/matching-service/processes/queue.go new file mode 100644 index 0000000000..abe42b5603 --- /dev/null +++ b/apps/matching-service/processes/queue.go @@ -0,0 +1,112 @@ +package processes + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "matching-service/models" + + "github.com/redis/go-redis/v9" +) + +var ctx = context.Background() + +func generateMatchID() (string, error) { + // Create a byte slice to hold random data + b := make([]byte, 16) // 16 bytes = 128 bits + _, err := rand.Read(b) + if err != nil { + return "", err + } + + // Encode the byte slice to a hexadecimal string + matchID := hex.EncodeToString(b) + return matchID, nil +} + +func enqueueUser(client *redis.Client, request models.MatchRequest) error { + // Generate the key for storing in redis + key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) + + // Concatenate the username,email,port + value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) + + // Push the user into the matching queue + return client.LPush(ctx, key, value).Err() +} + +func dequeueUser(client *redis.Client, request models.MatchRequest) error { + // Generate the key for storing in redis + key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) + + value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) + + // Remove user from the matching queue + _, err := client.LRem(ctx, key, 1, value).Result() + return err +} + +func matchUser(client *redis.Client, request models.MatchRequest) (string, error) { + key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) + value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) + + // Check if the user is already matched + exists, err := client.HExists(ctx, value, "username").Result() + if err != nil { + return "", err + } + + if exists { + // User is already matched, retrieve their details + matchedUser, err := client.HGetAll(ctx, value).Result() + if err != nil { + return "", err + } + + // Remove from Redis once retrieved + _, err = client.Del(ctx, value).Result() + if err != nil { + return "", err + } + + return fmt.Sprintf("%s,%s,%s,%s", matchedUser["username"], matchedUser["email"], matchedUser["port"], matchedUser["matchid"]), nil + } + + match, err := client.RPop(ctx, key).Result() + + if err == redis.Nil { + // No match found, enqueue user and return nil + enqueueUser(client, request) + return "", nil + } else if err != nil { + return "", err + } + + // Check if the matched user is the same as the requesting user + if match == value { + // If matched user is the same, you can push it back to the queue and try again + client.LPush(ctx, key, match) // Re-add the matched user back to the queue + return "", nil + } + + // Randomly create a matchid + matchID, err := generateMatchID() + if err != nil { + fmt.Println("Error generating match ID:", err) + return "", err + } + + // Store matched user details in a hash + err = client.HSet(ctx, match, map[string]interface{}{ + "username": request.Username, + "email": request.Email, + "port": request.Port, + "matchid": matchID, + }).Err() + if err != nil { + return "", err + } + + return fmt.Sprintf("%s,%s", match, matchID), nil +} From 6becc5c4fe07e31d396ea2604cbb3de8a9e3c6fe Mon Sep 17 00:00:00 2001 From: bensohh Date: Fri, 18 Oct 2024 15:16:24 +0800 Subject: [PATCH 3/7] Revamp matching service and update README --- apps/matching-service/README.md | 20 +- apps/matching-service/handlers/websocket.go | 100 +++++- apps/matching-service/main.go | 6 +- apps/matching-service/models/match.go | 13 +- apps/matching-service/processes/match.go | 116 ++++-- apps/matching-service/processes/queue.go | 333 +++++++++++++++--- .../tests/websocket-test.html | 2 +- 7 files changed, 470 insertions(+), 120 deletions(-) diff --git a/apps/matching-service/README.md b/apps/matching-service/README.md index 3828f81018..702d3dcdf3 100644 --- a/apps/matching-service/README.md +++ b/apps/matching-service/README.md @@ -76,8 +76,7 @@ Client sends matching parameters: "type": "match_request", "topics": ["Algorithms", "Arrays"], "difficulties": ["Easy", "Medium"], - "username": "Jane Doe", - "email": "janedoe@gmail.com" // possible to change to user ID in mongodb + "username": "Jane Doe" } ``` @@ -86,9 +85,11 @@ Server response on successful match: ```json { "type": "match_found", - "matchID": "aa41c004e589642402215c2c0a3a165a", - "partnerID": 54321, // currently identifying via partner's websocket port --> change to user ID in mongodb - "partnerName": "John Doe" // partner username + "matchId": "1c018916a34c5bee21af0b2670bd6156", + "user": "zkb4px", + "matchedUser": "JohnDoe", + "topic": "Algorithms", + "difficulty": "Medium" } ``` @@ -101,6 +102,15 @@ If no match is found after a set period of time, the server will send a timeout } ``` +If user has an existing websocket connection and wants to initiate another match, the server will reject the request: + +```json +{ + "type": "match_rejected", + "message": "You are already in a matchmaking queue. Please disconnect before reconnecting." +} +``` + If the server encounters an issue during the WebSocket connection or processing, the connection will be closed without any error message. The client should treat the unexpected closing as an error. ## Testing diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 854bdd54c3..c797b9db48 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -7,16 +7,24 @@ import ( "matching-service/processes" "matching-service/utils" "net/http" + "sync" "github.com/gorilla/websocket" ) -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - // Allow all connections by skipping the origin check (set more restrictions in production) - return true - }, -} +var ( + upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // Allow all connections by skipping the origin check (set more restrictions in production) + return true + }, + } + // A map to hold active WebSocket connections per username + activeConnections = make(map[string]*websocket.Conn) + // A map to hold user's match ctx cancel function + matchContexts = make(map[string]context.CancelFunc) + mu sync.Mutex // Mutex for thread-safe access to activeConnections +) // handleConnections manages WebSocket connections and matching logic. func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { @@ -42,10 +50,32 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { return } + // Store WebSocket connection in the activeConnections map. + mu.Lock() + // Checks if user is already an existing websocket connection + if _, exists := activeConnections[matchRequest.Username]; exists { + mu.Unlock() + log.Printf("User %s is already connected, rejecting new connection.", matchRequest.Username) + ws.WriteJSON(models.MatchRejected{ + Type: "match_rejected", + Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.", + }) + ws.Close() + return + } + activeConnections[matchRequest.Username] = ws + matchCtx, matchCancel := context.WithCancel(context.Background()) + matchContexts[matchRequest.Username] = matchCancel + mu.Unlock() + // Create a context for cancellation ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Ensure cancel is called to release resources + processes.EnqueueUser(processes.GetRedisClient(), matchRequest.Username, ctx) + processes.AddUserToTopicSets(processes.GetRedisClient(), matchRequest, ctx) + processes.StoreUserDetails(processes.GetRedisClient(), matchRequest, ctx) + timeoutCtx, timeoutCancel, err := createTimeoutContext() if err != nil { log.Printf("Error creating timeout context: %v", err) @@ -60,7 +90,7 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { go processes.PerformMatching(matchRequest, ctx, matchFoundChan) // Perform matching // Wait for a match, timeout, or cancellation. - waitForResult(ws, ctx, timeoutCtx, matchFoundChan) + waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username) } // readMatchRequest reads the initial match request from the WebSocket connection. @@ -69,7 +99,6 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) { if err := ws.ReadJSON(&matchRequest); err != nil { return matchRequest, err } - matchRequest.Port = ws.RemoteAddr().String() log.Printf("Received match request: %v", matchRequest) return matchRequest, nil } @@ -85,25 +114,36 @@ func createTimeoutContext() (context.Context, context.CancelFunc, error) { } // waitForResult waits for a match result, timeout, or cancellation. -func waitForResult(ws *websocket.Conn, ctx, timeoutCtx context.Context, matchFoundChan chan models.MatchFound) { +func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context, matchFoundChan chan models.MatchFound, username string) { select { case <-ctx.Done(): log.Println("Matching cancelled") + // Cleanup Redis + processes.CleanUpUser(processes.GetRedisClient(), username, ctx) return case <-timeoutCtx.Done(): log.Println("Connection timed out") + // Cleanup Redis + processes.CleanUpUser(processes.GetRedisClient(), username, ctx) sendTimeoutResponse(ws) return + case <-matchCtx.Done(): + log.Println("Match found for user HEREEE: " + username) + return case result, ok := <-matchFoundChan: if !ok { // Channel closed without a match, possibly due to context cancellation log.Println("Match channel closed without finding a match") return } - log.Println("Match found") - if err := ws.WriteJSON(result); err != nil { - log.Printf("write error: %v", err) - } + log.Println("Match found for user: " + result.User) + + // Notify the users about the match + notifyMatch(result.User, result.MatchedUser, result) + + // if err := ws.WriteJSON(result); err != nil { + // log.Printf("write error: %v", err) + // } return } } @@ -118,3 +158,37 @@ func sendTimeoutResponse(ws *websocket.Conn) { log.Printf("write error: %v", err) } } + +func notifyMatch(username, matchedUsername string, result models.MatchFound) { + mu.Lock() + defer mu.Unlock() + + // Send message to the first user + if userConn, userExists := activeConnections[username]; userExists { + if err := userConn.WriteJSON(result); err != nil { + log.Printf("Error sending message to user %s: %v\n", username, err) + } + } + + // Send message to the matched user + if matchedUserConn, matchedUserExists := activeConnections[matchedUsername]; matchedUserExists { + result.User, result.MatchedUser = result.MatchedUser, result.User // Swap User and MatchedUser values + if err := matchedUserConn.WriteJSON(result); err != nil { + log.Printf("Error sending message to user %s: %v\n", username, err) + } + } + + // Remove the match context for both users and cancel for matched user + if _, exists := matchContexts[username]; exists { + delete(matchContexts, username) + } + + if cancelFunc, exists := matchContexts[matchedUsername]; exists { + delete(matchContexts, matchedUsername) + cancelFunc() + } + + // Remove users from the activeConnections map + delete(activeConnections, username) + delete(activeConnections, matchedUsername) +} diff --git a/apps/matching-service/main.go b/apps/matching-service/main.go index 655ac5d24a..6688de6547 100644 --- a/apps/matching-service/main.go +++ b/apps/matching-service/main.go @@ -21,7 +21,7 @@ func main() { } port := os.Getenv("PORT") - // Set up link with redis server + // Retrieve redis url env variable and setup the redis client redisAddr := os.Getenv("REDIS_URL") client := redis.NewClient(&redis.Options{ Addr: redisAddr, @@ -37,9 +37,11 @@ func main() { log.Println("Connected to Redis at the following address: " + redisAddr) } - // Pass the connect redis client to processes in match + // Set redis client processes.SetRedisClient(client) + // Run a goroutine that matches users + // Routes http.HandleFunc("/match", handlers.HandleWebSocketConnections) diff --git a/apps/matching-service/models/match.go b/apps/matching-service/models/match.go index 453fecc497..1f832e5637 100644 --- a/apps/matching-service/models/match.go +++ b/apps/matching-service/models/match.go @@ -5,18 +5,23 @@ type MatchRequest struct { Topics []string `json:"topics"` Difficulties []string `json:"difficulties"` Username string `json:"username"` - Email string `json:"email"` - Port string `json:"port"` } type MatchFound struct { Type string `json:"type"` MatchID string `json:"matchId"` - PartnerID int64 `json:"partnerId"` - PartnerName string `json:"partnerName"` + User string `json:"user"` // username + MatchedUser string `json:"matchedUser"` // matched username + Topic string `json:"topic"` // matched topic + Difficulty string `json:"difficulty"` // matched difficulty } type Timeout struct { Type string `json:"timeout"` Message string `json:"message"` } + +type MatchRejected struct { + Type string `json:"type"` + Message string `json:"message"` +} diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go index a4d627a4fe..54019e80a1 100644 --- a/apps/matching-service/processes/match.go +++ b/apps/matching-service/processes/match.go @@ -8,13 +8,15 @@ import ( "strconv" "strings" "sync" + "time" "github.com/redis/go-redis/v9" ) var ( redisClient *redis.Client - mu sync.Mutex + mu sync.Mutex // Mutex to ensure only one matchmaking goroutine is running + ctx = context.Background() ) // SetRedisClient sets the Redis client to a global variable @@ -22,6 +24,11 @@ func SetRedisClient(client *redis.Client) { redisClient = client } +// Get redisclient +func GetRedisClient() *redis.Client { + return redisClient +} + func getPortNumber(addr string) (int64, error) { // Split the string by the colon parts := strings.Split(addr, ":") @@ -39,50 +46,85 @@ func getPortNumber(addr string) (int64, error) { } func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) { - defer close(matchFoundChan) // Safely close the channel after matching completes + // Acquire mutex + mu.Lock() + // Defer unlocking the mutex + defer mu.Unlock() for { - select { - case <-ctx.Done(): - // Cleaning up, dequeue the user - err := dequeueUser(redisClient, matchRequest) - if err != nil { - log.Println("Failed to dequeue user:", err) - } - return + // Log queue before matchmaking + // PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background()) + + // Check if the queue is empty + queueLength, err := redisClient.LLen(context.Background(), "matchmaking_queue").Result() + if err != nil { + log.Println("Error checking queue length:", err) + time.Sleep(1 * time.Second) + continue + } + + if queueLength == 0 { + // log.Println("No users in the queue") + time.Sleep(1 * time.Second) + continue + } + + // Peek at the user queue + username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result() + if err != nil { + // log.Println("Error peeking user from queue:", err) + time.Sleep(1 * time.Second) + continue + } + + // log.Printf("Performing matching for user: %s", username) + matchedUsername, matchedTopic, matchedDifficulty, err := FindMatchingUser(redisClient, username, ctx) + if err != nil { + // log.Println("Error finding matching user:", err) + time.Sleep(1 * time.Second) + continue + } + + if matchedUsername != "" { + // Log down the state of queue before matchmaking + PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background()) + + // Log down which users got matched + log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", username, matchedUsername, matchedTopic, matchedDifficulty) - default: - // Continue matching logic... - mu.Lock() - match, err := matchUser(redisClient, matchRequest) - mu.Unlock() + // Clean up queue, sets and hashset in Redis + DequeueUser(redisClient, username, ctx) + DequeueUser(redisClient, matchedUsername, ctx) + RemoveUserFromTopicSets(redisClient, username, ctx) + RemoveUserFromTopicSets(redisClient, matchedUsername, ctx) + RemoveUserDetails(redisClient, username, ctx) + RemoveUserDetails(redisClient, matchedUsername, ctx) + + // Log queue after matchmaking + PrintMatchingQueue(redisClient, "After Matchmaking", context.Background()) + + // Generate a random match ID + matchId, err := GenerateMatchID() if err != nil { - log.Println("Error occurred during matching:", err) - return + log.Println("Unable to randomly generate matchID") } - if match != "" { - arr := strings.Split(match, ",") - username := arr[0] - // email := arr[1] - port := arr[2] - matchId := arr[3] - partnerPort, err := getPortNumber(port) - if err != nil { - log.Println("Error occurred while getting PartnerID:", err) - return - } - - result := models.MatchFound{ - Type: "match_found", - MatchID: matchId, - PartnerID: partnerPort, // Use the retrieved PartnerID - PartnerName: username, - } - matchFoundChan <- result - return + // Signal that a match has been found + matchFoundChan <- models.MatchFound{ + Type: "match_found", + MatchID: matchId, + User: username, + MatchedUser: matchedUsername, + Topic: matchedTopic, + Difficulty: matchedDifficulty, } + + } else { + // log.Printf("No match found for user: %s", username) + + // Pop user and add user back into queue + PopAndInsert(redisClient, username, ctx) } } } diff --git a/apps/matching-service/processes/queue.go b/apps/matching-service/processes/queue.go index abe42b5603..254675f9e7 100644 --- a/apps/matching-service/processes/queue.go +++ b/apps/matching-service/processes/queue.go @@ -4,109 +4,326 @@ import ( "context" "crypto/rand" "encoding/hex" + "encoding/json" "fmt" + "log" "matching-service/models" + "strings" + "sync" "github.com/redis/go-redis/v9" ) -var ctx = context.Background() +var mutex sync.Mutex // Mutex for concurrency safety -func generateMatchID() (string, error) { - // Create a byte slice to hold random data +// To simulate generating a random matchID for collaboration service (TODO: Future) +func GenerateMatchID() (string, error) { b := make([]byte, 16) // 16 bytes = 128 bits _, err := rand.Read(b) if err != nil { return "", err } - - // Encode the byte slice to a hexadecimal string matchID := hex.EncodeToString(b) return matchID, nil } -func enqueueUser(client *redis.Client, request models.MatchRequest) error { - // Generate the key for storing in redis - key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) +// Print existing users in the matching queue +func PrintMatchingQueue(client *redis.Client, status string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() + + users, err := client.LRange(ctx, "matchmaking_queue", 0, -1).Result() + if err != nil { + log.Println("Error retrieving users from queue:", err) + return + } - // Concatenate the username,email,port - value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) + var concatenatedUsers strings.Builder + for i, user := range users { + concatenatedUsers.WriteString(user) + if i != len(users)-1 { + concatenatedUsers.WriteString(", ") + } + } - // Push the user into the matching queue - return client.LPush(ctx, key, value).Err() + log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String()) } -func dequeueUser(client *redis.Client, request models.MatchRequest) error { - // Generate the key for storing in redis - key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) +// - value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) +// Enqueue a user into the matchmaking queue +func EnqueueUser(client *redis.Client, username string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() - // Remove user from the matching queue - _, err := client.LRem(ctx, key, 1, value).Result() - return err + key := "matchmaking_queue" + err := client.LPush(ctx, key, username).Err() + if err != nil { + log.Println("Error enqueuing user:", err) + } } -func matchUser(client *redis.Client, request models.MatchRequest) (string, error) { - key := fmt.Sprintf("queue:%s:%s", request.Topics, request.Difficulties) - value := fmt.Sprintf("%s,%s,%s", request.Username, request.Email, request.Port) +// Remove user from the matchmaking queue +func DequeueUser(client *redis.Client, username string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() - // Check if the user is already matched - exists, err := client.HExists(ctx, value, "username").Result() + key := "matchmaking_queue" + err := client.LRem(ctx, key, 1, username).Err() if err != nil { - return "", err + log.Println("Error dequeuing user:", err) } +} + +// Add user into each specified topic set based on the topics selected by users +func AddUserToTopicSets(client *redis.Client, request models.MatchRequest, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() - if exists { - // User is already matched, retrieve their details - matchedUser, err := client.HGetAll(ctx, value).Result() + for _, topic := range request.Topics { + err := client.SAdd(ctx, strings.ToLower(topic), request.Username).Err() if err != nil { - return "", err + log.Println("Error adding user to topic set:", err) } + } +} + +// Remove user from each specified topic set based on the topics selected by users +func RemoveUserFromTopicSets(client *redis.Client, username string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() + + request, err := GetUserDetails(client, username, ctx) + if err != nil { + log.Println("Error retrieving user from hashset:", err) + return + } - // Remove from Redis once retrieved - _, err = client.Del(ctx, value).Result() + for _, topic := range request.Topics { + err := client.SRem(ctx, strings.ToLower(topic), request.Username).Err() if err != nil { - return "", err + log.Println("Error removing user from topic set:", err) } + } +} + +// Add user details into hashset in Redis +func StoreUserDetails(client *redis.Client, request models.MatchRequest, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() - return fmt.Sprintf("%s,%s,%s,%s", matchedUser["username"], matchedUser["email"], matchedUser["port"], matchedUser["matchid"]), nil + topicsJSON, err := json.Marshal(request.Topics) + if err != nil { + log.Println("Error marshalling topics:", err) + return } - match, err := client.RPop(ctx, key).Result() + difficultiesJSON, err := json.Marshal(request.Difficulties) + if err != nil { + log.Println("Error marshalling difficulties:", err) + return + } - if err == redis.Nil { - // No match found, enqueue user and return nil - enqueueUser(client, request) - return "", nil - } else if err != nil { - return "", err + err = client.HSet(ctx, request.Username, map[string]interface{}{ + "topics": topicsJSON, + "difficulty": difficultiesJSON, + "username": request.Username, + }).Err() + if err != nil { + log.Println("Error storing user details:", err) } +} - // Check if the matched user is the same as the requesting user - if match == value { - // If matched user is the same, you can push it back to the queue and try again - client.LPush(ctx, key, match) // Re-add the matched user back to the queue - return "", nil +// Retrieve user details from hashset in Redis +func GetUserDetails(client *redis.Client, username string, ctx context.Context) (models.MatchRequest, error) { + userDetails, err := client.HGetAll(ctx, username).Result() + if err != nil { + return models.MatchRequest{}, err } - // Randomly create a matchid - matchID, err := generateMatchID() + if len(userDetails) == 0 { + return models.MatchRequest{}, fmt.Errorf("user not found in hashset: %s", username) + } + + topicsJSON, topicsExist := userDetails["topics"] + difficultiesJSON, difficultiesExist := userDetails["difficulty"] + + if !topicsExist || !difficultiesExist { + return models.MatchRequest{}, fmt.Errorf("incomplete user details for: %s", username) + } + + var topics []string + err = json.Unmarshal([]byte(topicsJSON), &topics) if err != nil { - fmt.Println("Error generating match ID:", err) - return "", err + return models.MatchRequest{}, fmt.Errorf("error unmarshalling topics: %v", err) } - // Store matched user details in a hash - err = client.HSet(ctx, match, map[string]interface{}{ - "username": request.Username, - "email": request.Email, - "port": request.Port, - "matchid": matchID, - }).Err() + var difficulties []string + err = json.Unmarshal([]byte(difficultiesJSON), &difficulties) if err != nil { - return "", err + return models.MatchRequest{}, fmt.Errorf("error unmarshalling difficulties: %v", err) + } + + matchRequest := models.MatchRequest{ + Topics: topics, + Difficulties: difficulties, + Username: username, + } + + return matchRequest, nil +} + +// Remove user details from HashSet +func RemoveUserDetails(client *redis.Client, username string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() + + err := client.Del(ctx, username).Err() + if err != nil { + log.Println("Error removing user details:", err) + } +} + +// Find the first matching user based on topics +func FindMatchingUser(client *redis.Client, username string, ctx context.Context) (string, string, string, error) { + mutex.Lock() + defer mutex.Unlock() + + user, err := GetUserDetails(client, username, ctx) + if err != nil { + return "", "", "", err + } + + for _, topic := range user.Topics { + users, err := client.SMembers(ctx, strings.ToLower(topic)).Result() + if err != nil { + return "", "", "", err + } + + for _, potentialMatch := range users { + if potentialMatch != username { + matchedUser, err := GetUserDetails(client, potentialMatch, ctx) + if err != nil { + return "", "", "", err + } + + commonDifficulty := GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties) + return potentialMatch, topic, commonDifficulty, nil + } + } } - return fmt.Sprintf("%s,%s", match, matchID), nil + return "", "", "", nil +} + +// Get the highest common difficulty between two users, if no common difficulty found, choose the min of the 2 arrays +func GetCommonDifficulty(userArr []string, matchedUserArr []string) string { + commonDifficulties := make([]int, 3) + for i := range commonDifficulties { + commonDifficulties[i] = 0 + } + + for _, difficulty := range userArr { + formattedDifficulty := strings.ToLower(difficulty) + switch formattedDifficulty { + case "easy": + commonDifficulties[0]++ + case "medium": + commonDifficulties[1]++ + case "hard": + commonDifficulties[2]++ + default: + log.Println("Unknown difficulty specified: " + difficulty) + } + } + + for _, difficulty := range matchedUserArr { + formattedDifficulty := strings.ToLower(difficulty) + switch formattedDifficulty { + case "easy": + commonDifficulties[0]++ + case "medium": + commonDifficulties[1]++ + case "hard": + commonDifficulties[2]++ + default: + log.Println("Unknown difficulty specified: " + difficulty) + } + } + + lowest := "Hard" + for i := 2; i >= 0; i-- { + if commonDifficulties[i] == 2 { + switch i { + case 0: + return "Easy" + case 1: + return "Medium" + case 2: + return "Hard" + } + } else if commonDifficulties[i] > 0 { + switch i { + case 0: + lowest = "Easy" + case 1: + lowest = "Medium" + case 2: + lowest = "Hard" + } + } + } + return lowest +} + +func CleanUpUser(client *redis.Client, username string, ctx context.Context) { + mutex.Lock() + defer mutex.Unlock() + // Dequeue user + key := "matchmaking_queue" + err := client.LRem(ctx, key, 1, username).Err() + if err != nil { + log.Println("Error dequeuing user:", err) + } + + // Remove user from topic sets + request, err := GetUserDetails(client, username, ctx) + if err != nil { + log.Println("Error retrieving user from hashset:", err) + return + } + + for _, topic := range request.Topics { + err := client.SRem(ctx, strings.ToLower(topic), request.Username).Err() + if err != nil { + log.Println("Error removing user from topic set:", err) + } + } + + // Remove user details + err = client.Del(ctx, username).Err() + if err != nil { + log.Println("Error removing user details:", err) + } + return +} + +func PopAndInsert(client *redis.Client, username string, ctx context.Context) { + // Acquire Lock + mutex.Lock() + defer mutex.Unlock() + + // Pop user + username, err := client.LPop(ctx, "matchmaking_queue").Result() + if err != nil { + log.Println("Error popping user from queue:", err) + } + + // Insert back in queue + key := "matchmaking_queue" + err = client.LPush(ctx, key, username).Err() + if err != nil { + log.Println("Error enqueuing user:", err) + } + return } diff --git a/apps/matching-service/tests/websocket-test.html b/apps/matching-service/tests/websocket-test.html index 4fb9ebfe75..86d2be302c 100644 --- a/apps/matching-service/tests/websocket-test.html +++ b/apps/matching-service/tests/websocket-test.html @@ -42,8 +42,8 @@ type: "match_request", topics: ["Algorithms", "Arrays"], difficulties: ["Easy", "Medium"], + // username: "JohnDoe", // Uncomment for same user test for rejection username: arr[0], - email: arr[1], }) ); From 021eade16a4b1d9126f40cc56df5b766fb4fa191 Mon Sep 17 00:00:00 2001 From: bensohh Date: Fri, 18 Oct 2024 15:35:52 +0800 Subject: [PATCH 4/7] Add port into logging --- apps/matching-service/handlers/websocket.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index c797b9db48..af113c1048 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -7,6 +7,7 @@ import ( "matching-service/processes" "matching-service/utils" "net/http" + "strings" "sync" "github.com/gorilla/websocket" @@ -99,7 +100,13 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) { if err := ws.ReadJSON(&matchRequest); err != nil { return matchRequest, err } - log.Printf("Received match request: %v", matchRequest) + // Get the remote address (client's IP and port) + clientAddr := ws.RemoteAddr().String() + + // Extract the port (after the last ':') + clientPort := clientAddr[strings.LastIndex(clientAddr, ":")+1:] + + log.Printf("Received match request: %v from client port: %s", matchRequest, clientPort) return matchRequest, nil } @@ -137,7 +144,6 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context return } log.Println("Match found for user: " + result.User) - // Notify the users about the match notifyMatch(result.User, result.MatchedUser, result) @@ -185,7 +191,7 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) { if cancelFunc, exists := matchContexts[matchedUsername]; exists { delete(matchContexts, matchedUsername) - cancelFunc() + defer cancelFunc() // TODO: CancelFunction here is not causing the matchCtx to be done } // Remove users from the activeConnections map From 58f9b3bea51f63bf82069a2c10535acf7bf4bc71 Mon Sep 17 00:00:00 2001 From: bensohh Date: Fri, 18 Oct 2024 16:01:58 +0800 Subject: [PATCH 5/7] Update context so cleaning up after cancellation works --- apps/matching-service/handlers/websocket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index af113c1048..4d553faff7 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -126,12 +126,12 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context case <-ctx.Done(): log.Println("Matching cancelled") // Cleanup Redis - processes.CleanUpUser(processes.GetRedisClient(), username, ctx) + processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) return case <-timeoutCtx.Done(): log.Println("Connection timed out") // Cleanup Redis - processes.CleanUpUser(processes.GetRedisClient(), username, ctx) + processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) sendTimeoutResponse(ws) return case <-matchCtx.Done(): From 7228473f5ed3a2720915a1017a06e6e2029aefc0 Mon Sep 17 00:00:00 2001 From: bensohh Date: Sat, 19 Oct 2024 00:59:31 +0800 Subject: [PATCH 6/7] Fix error with matchmaking service --- apps/matching-service/handlers/websocket.go | 56 ++++++++++++++++----- apps/matching-service/processes/match.go | 8 +-- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 4d553faff7..066466e9ee 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -24,7 +24,9 @@ var ( activeConnections = make(map[string]*websocket.Conn) // A map to hold user's match ctx cancel function matchContexts = make(map[string]context.CancelFunc) - mu sync.Mutex // Mutex for thread-safe access to activeConnections + // A map to hold user's match channels + matchFoundChannels = make(map[string]chan models.MatchFound) + mu sync.Mutex // Mutex for thread-safe access to activeConnections ) // handleConnections manages WebSocket connections and matching logic. @@ -67,6 +69,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { activeConnections[matchRequest.Username] = ws matchCtx, matchCancel := context.WithCancel(context.Background()) matchContexts[matchRequest.Username] = matchCancel + + matchFoundChan := make(chan models.MatchFound) + matchFoundChannels[matchRequest.Username] = matchFoundChan mu.Unlock() // Create a context for cancellation @@ -84,11 +89,9 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { } defer timeoutCancel() - matchFoundChan := make(chan models.MatchFound) - // Start goroutines for handling messages and performing matching. go processes.ReadMessages(ws, ctx, cancel) - go processes.PerformMatching(matchRequest, ctx, matchFoundChan) // Perform matching + go processes.PerformMatching(matchRequest, context.Background(), matchFoundChannels) // Perform matching // Wait for a match, timeout, or cancellation. waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username) @@ -127,15 +130,37 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context log.Println("Matching cancelled") // Cleanup Redis processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + // Remove the match context and active + if _, exists := matchContexts[username]; exists { + delete(matchContexts, username) + } + if _, exists := activeConnections[username]; exists { + delete(activeConnections, username) + } + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + return case <-timeoutCtx.Done(): log.Println("Connection timed out") // Cleanup Redis processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + // Remove the match context and active + if _, exists := matchContexts[username]; exists { + delete(matchContexts, username) + } + if _, exists := activeConnections[username]; exists { + delete(activeConnections, username) + } + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + sendTimeoutResponse(ws) return case <-matchCtx.Done(): - log.Println("Match found for user HEREEE: " + username) + log.Println("Match found for user: " + username) return case result, ok := <-matchFoundChan: if !ok { @@ -143,13 +168,9 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context log.Println("Match channel closed without finding a match") return } - log.Println("Match found for user: " + result.User) + log.Println("Match found for user: " + username) // Notify the users about the match notifyMatch(result.User, result.MatchedUser, result) - - // if err := ws.WriteJSON(result); err != nil { - // log.Printf("write error: %v", err) - // } return } } @@ -185,13 +206,22 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) { } // Remove the match context for both users and cancel for matched user - if _, exists := matchContexts[username]; exists { + if cancelFunc, exists := matchContexts[username]; exists { + cancelFunc() delete(matchContexts, username) } - if cancelFunc, exists := matchContexts[matchedUsername]; exists { + if cancelFunc2, exists := matchContexts[matchedUsername]; exists { + cancelFunc2() delete(matchContexts, matchedUsername) - defer cancelFunc() // TODO: CancelFunction here is not causing the matchCtx to be done + } + + // Remove the match channels + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } + if _, exists := matchFoundChannels[matchedUsername]; exists { + delete(matchFoundChannels, matchedUsername) } // Remove users from the activeConnections map diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go index 54019e80a1..22a264eb30 100644 --- a/apps/matching-service/processes/match.go +++ b/apps/matching-service/processes/match.go @@ -45,7 +45,7 @@ func getPortNumber(addr string) (int64, error) { return port, nil } -func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChan chan models.MatchFound) { +func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) { // Acquire mutex mu.Lock() // Defer unlocking the mutex @@ -73,7 +73,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc // Peek at the user queue username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result() if err != nil { - // log.Println("Error peeking user from queue:", err) + log.Println("Error peeking user from queue:", err) time.Sleep(1 * time.Second) continue } @@ -81,7 +81,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc // log.Printf("Performing matching for user: %s", username) matchedUsername, matchedTopic, matchedDifficulty, err := FindMatchingUser(redisClient, username, ctx) if err != nil { - // log.Println("Error finding matching user:", err) + log.Println("Error finding matching user:", err) time.Sleep(1 * time.Second) continue } @@ -111,7 +111,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc } // Signal that a match has been found - matchFoundChan <- models.MatchFound{ + matchFoundChannels[username] <- models.MatchFound{ Type: "match_found", MatchID: matchId, User: username, From ad3397d142722eb02d640cc97b5e0ae3e0e0e4a9 Mon Sep 17 00:00:00 2001 From: tituschewxj Date: Sat, 19 Oct 2024 04:49:01 +0800 Subject: [PATCH 7/7] fix: json for timeout --- apps/matching-service/models/match.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/matching-service/models/match.go b/apps/matching-service/models/match.go index 1f832e5637..ad555b555b 100644 --- a/apps/matching-service/models/match.go +++ b/apps/matching-service/models/match.go @@ -17,7 +17,7 @@ type MatchFound struct { } type Timeout struct { - Type string `json:"timeout"` + Type string `json:"type"` Message string `json:"message"` }