From fea8537ec28537cc9308f4f431046a6970c4d03a Mon Sep 17 00:00:00 2001 From: tituschewxj Date: Sat, 19 Oct 2024 08:03:17 +0800 Subject: [PATCH] refactor: clean up files in matching-service --- apps/matching-service/README.md | 2 +- apps/matching-service/handlers/websocket.go | 80 ++++------- apps/matching-service/main.go | 30 +--- apps/matching-service/models/complexity.go | 67 +++++++++ apps/matching-service/processes/constants.go | 17 +++ apps/matching-service/processes/match.go | 67 +++------ apps/matching-service/processes/queue.go | 138 ++++--------------- apps/matching-service/processes/redis.go | 35 +++++ apps/matching-service/utils/match.go | 17 +++ 9 files changed, 217 insertions(+), 236 deletions(-) create mode 100644 apps/matching-service/models/complexity.go create mode 100644 apps/matching-service/processes/constants.go create mode 100644 apps/matching-service/processes/redis.go create mode 100644 apps/matching-service/utils/match.go diff --git a/apps/matching-service/README.md b/apps/matching-service/README.md index 702d3dcdf3..826565ac2b 100644 --- a/apps/matching-service/README.md +++ b/apps/matching-service/README.md @@ -29,7 +29,7 @@ go mod tidy - `MATCH_TIMEOUT`: The time in seconds to wait for a match before timing out. - `REDIS_URL`: The URL for the Redis server. Default is `localhost:6379`. -4. Start a local redis server: +4. Start a local Redis server: ```bash docker run -d -p 6379:6379 redis diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 066466e9ee..0c417c7e1d 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -123,44 +123,42 @@ func createTimeoutContext() (context.Context, context.CancelFunc, error) { return ctx, cancel, nil } +// Cleans up the data associated with the user before ending the websocket connection. +// If user is already removed, then nothing happens. +func cleanUpUser(username string) { + // Cleanup Redis + processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + + // Removes the match context and active + if cancelFunc, exists := matchContexts[username]; exists { + cancelFunc() + delete(matchContexts, username) + } + if _, exists := activeConnections[username]; exists { + delete(activeConnections, username) + } + if _, exists := matchFoundChannels[username]; exists { + delete(matchFoundChannels, username) + } +} + // waitForResult waits for a match result, timeout, or cancellation. func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context, matchFoundChan chan models.MatchFound, username string) { select { case <-ctx.Done(): log.Println("Matching cancelled") - // Cleanup Redis - processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) - // Remove the match context and active - if _, exists := matchContexts[username]; exists { - delete(matchContexts, username) - } - if _, exists := activeConnections[username]; exists { - delete(activeConnections, username) - } - if _, exists := matchFoundChannels[username]; exists { - delete(matchFoundChannels, username) - } - + cleanUpUser(username) return case <-timeoutCtx.Done(): log.Println("Connection timed out") - // Cleanup Redis - processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) - // Remove the match context and active - if _, exists := matchContexts[username]; exists { - delete(matchContexts, username) - } - if _, exists := activeConnections[username]; exists { - delete(activeConnections, username) - } - if _, exists := matchFoundChannels[username]; exists { - delete(matchFoundChannels, username) - } - sendTimeoutResponse(ws) + cleanUpUser(username) return case <-matchCtx.Done(): log.Println("Match found for user: " + username) + + // NOTE: user is already cleaned-up in the other process, + // so there is no need to clean up again. return case result, ok := <-matchFoundChan: if !ok { @@ -170,7 +168,10 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context } log.Println("Match found for user: " + username) // Notify the users about the match - notifyMatch(result.User, result.MatchedUser, result) + notifyMatches(result.User, result.MatchedUser, result) + + // NOTE: user and other user are already cleaned up in a separate matching algorithm process + // so no clean up is required here. return } } @@ -186,7 +187,7 @@ func sendTimeoutResponse(ws *websocket.Conn) { } } -func notifyMatch(username, matchedUsername string, result models.MatchFound) { +func notifyMatches(username, matchedUsername string, result models.MatchFound) { mu.Lock() defer mu.Unlock() @@ -204,27 +205,4 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) { log.Printf("Error sending message to user %s: %v\n", username, err) } } - - // Remove the match context for both users and cancel for matched user - if cancelFunc, exists := matchContexts[username]; exists { - cancelFunc() - delete(matchContexts, username) - } - - if cancelFunc2, exists := matchContexts[matchedUsername]; exists { - cancelFunc2() - delete(matchContexts, matchedUsername) - } - - // Remove the match channels - if _, exists := matchFoundChannels[username]; exists { - delete(matchFoundChannels, username) - } - if _, exists := matchFoundChannels[matchedUsername]; exists { - delete(matchFoundChannels, matchedUsername) - } - - // Remove users from the activeConnections map - delete(activeConnections, username) - delete(activeConnections, matchedUsername) } diff --git a/apps/matching-service/main.go b/apps/matching-service/main.go index 6688de6547..a19a417922 100644 --- a/apps/matching-service/main.go +++ b/apps/matching-service/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "fmt" "log" "matching-service/handlers" @@ -10,7 +9,6 @@ import ( "os" "github.com/joho/godotenv" - "github.com/redis/go-redis/v9" ) func main() { @@ -19,33 +17,17 @@ func main() { if err != nil { log.Fatalf("err loading: %v", err) } - port := os.Getenv("PORT") - - // Retrieve redis url env variable and setup the redis client - redisAddr := os.Getenv("REDIS_URL") - client := redis.NewClient(&redis.Options{ - Addr: redisAddr, - Password: "", // no password set - DB: 0, // use default DB - }) - - // Ping the redis server - _, err = client.Ping(context.Background()).Result() - if err != nil { - log.Fatalf("Could not connect to Redis: %v", err) - } else { - log.Println("Connected to Redis at the following address: " + redisAddr) - } - - // Set redis client - processes.SetRedisClient(client) + + // Setup redis client + processes.SetupRedisClient() // Run a goroutine that matches users - + // Routes http.HandleFunc("/match", handlers.HandleWebSocketConnections) - + // Start the server + port := os.Getenv("PORT") log.Println(fmt.Sprintf("Server starting on :%s", port)) err = http.ListenAndServe(fmt.Sprintf(":%s", port), nil) if err != nil { diff --git a/apps/matching-service/models/complexity.go b/apps/matching-service/models/complexity.go new file mode 100644 index 0000000000..88a5e03ebe --- /dev/null +++ b/apps/matching-service/models/complexity.go @@ -0,0 +1,67 @@ +package models + +import ( + "log" + "strings" +) + +// Get the highest common difficulty (aka complexity) between two users, +// If no common difficulty found, choose the min of the 2 arrays. +func GetCommonDifficulty(userArr []string, matchedUserArr []string) string { + commonDifficulties := make([]int, 3) + for i := range commonDifficulties { + commonDifficulties[i] = 0 + } + + for _, difficulty := range userArr { + formattedDifficulty := strings.ToLower(difficulty) + switch formattedDifficulty { + case "easy": + commonDifficulties[0]++ + case "medium": + commonDifficulties[1]++ + case "hard": + commonDifficulties[2]++ + default: + log.Println("Unknown difficulty specified: " + difficulty) + } + } + + for _, difficulty := range matchedUserArr { + formattedDifficulty := strings.ToLower(difficulty) + switch formattedDifficulty { + case "easy": + commonDifficulties[0]++ + case "medium": + commonDifficulties[1]++ + case "hard": + commonDifficulties[2]++ + default: + log.Println("Unknown difficulty specified: " + difficulty) + } + } + + lowest := "Hard" + for i := 2; i >= 0; i-- { + if commonDifficulties[i] == 2 { + switch i { + case 0: + return "Easy" + case 1: + return "Medium" + case 2: + return "Hard" + } + } else if commonDifficulties[i] > 0 { + switch i { + case 0: + lowest = "Easy" + case 1: + lowest = "Medium" + case 2: + lowest = "Hard" + } + } + } + return lowest +} diff --git a/apps/matching-service/processes/constants.go b/apps/matching-service/processes/constants.go new file mode 100644 index 0000000000..eb53b5879e --- /dev/null +++ b/apps/matching-service/processes/constants.go @@ -0,0 +1,17 @@ +package processes + +import ( + "context" + "sync" + + "github.com/redis/go-redis/v9" +) + +const matchmakingQueueRedisKey = "matchmaking_queue" + +var ( + redisClient *redis.Client + matchingRoutineMutex sync.Mutex // Mutex to ensure only one matchmaking goroutine is running + redisAccessMutex sync.Mutex // Mutex for Redis access for concurrency safety + ctx = context.Background() +) diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go index 22a264eb30..e09f546798 100644 --- a/apps/matching-service/processes/match.go +++ b/apps/matching-service/processes/match.go @@ -2,62 +2,26 @@ package processes import ( "context" - "fmt" "log" "matching-service/models" - "strconv" - "strings" - "sync" + "matching-service/utils" "time" "github.com/redis/go-redis/v9" ) -var ( - redisClient *redis.Client - mu sync.Mutex // Mutex to ensure only one matchmaking goroutine is running - ctx = context.Background() -) - -// SetRedisClient sets the Redis client to a global variable -func SetRedisClient(client *redis.Client) { - redisClient = client -} - -// Get redisclient -func GetRedisClient() *redis.Client { - return redisClient -} - -func getPortNumber(addr string) (int64, error) { - // Split the string by the colon - parts := strings.Split(addr, ":") - if len(parts) < 2 { - return 0, fmt.Errorf("no port number found") - } - - // Convert the port string to an integer - port, err := strconv.ParseInt(parts[len(parts)-1], 10, 64) - if err != nil { - return 0, err // Return an error if conversion fails - } - - return port, nil -} - func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) { // Acquire mutex - mu.Lock() + matchingRoutineMutex.Lock() // Defer unlocking the mutex - defer mu.Unlock() + defer matchingRoutineMutex.Unlock() for { - // Log queue before matchmaking // PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background()) // Check if the queue is empty - queueLength, err := redisClient.LLen(context.Background(), "matchmaking_queue").Result() + queueLength, err := redisClient.LLen(context.Background(), matchmakingQueueRedisKey).Result() if err != nil { log.Println("Error checking queue length:", err) time.Sleep(1 * time.Second) @@ -71,7 +35,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc } // Peek at the user queue - username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result() + username, err := redisClient.LIndex(context.Background(), matchmakingQueueRedisKey, 0).Result() if err != nil { log.Println("Error peeking user from queue:", err) time.Sleep(1 * time.Second) @@ -93,19 +57,15 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc // Log down which users got matched log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", username, matchedUsername, matchedTopic, matchedDifficulty) - // Clean up queue, sets and hashset in Redis - DequeueUser(redisClient, username, ctx) - DequeueUser(redisClient, matchedUsername, ctx) - RemoveUserFromTopicSets(redisClient, username, ctx) - RemoveUserFromTopicSets(redisClient, matchedUsername, ctx) - RemoveUserDetails(redisClient, username, ctx) - RemoveUserDetails(redisClient, matchedUsername, ctx) - // Log queue after matchmaking PrintMatchingQueue(redisClient, "After Matchmaking", context.Background()) + // Clean up redis for this match + cleanUp(redisClient, username, ctx) + cleanUp(redisClient, matchedUsername, ctx) + // Generate a random match ID - matchId, err := GenerateMatchID() + matchId, err := utils.GenerateMatchID() if err != nil { log.Println("Unable to randomly generate matchID") } @@ -128,3 +88,10 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc } } } + +// Clean up queue, sets and hashset in Redis +func cleanUp(redisClient *redis.Client, username string, ctx context.Context) { + DequeueUser(redisClient, username, ctx) + RemoveUserFromTopicSets(redisClient, username, ctx) + RemoveUserDetails(redisClient, username, ctx) +} diff --git a/apps/matching-service/processes/queue.go b/apps/matching-service/processes/queue.go index 254675f9e7..fbf2a865e8 100644 --- a/apps/matching-service/processes/queue.go +++ b/apps/matching-service/processes/queue.go @@ -2,37 +2,21 @@ package processes import ( "context" - "crypto/rand" - "encoding/hex" "encoding/json" "fmt" "log" "matching-service/models" "strings" - "sync" "github.com/redis/go-redis/v9" ) -var mutex sync.Mutex // Mutex for concurrency safety - -// To simulate generating a random matchID for collaboration service (TODO: Future) -func GenerateMatchID() (string, error) { - b := make([]byte, 16) // 16 bytes = 128 bits - _, err := rand.Read(b) - if err != nil { - return "", err - } - matchID := hex.EncodeToString(b) - return matchID, nil -} - // Print existing users in the matching queue func PrintMatchingQueue(client *redis.Client, status string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() - users, err := client.LRange(ctx, "matchmaking_queue", 0, -1).Result() + users, err := client.LRange(ctx, matchmakingQueueRedisKey, 0, -1).Result() if err != nil { log.Println("Error retrieving users from queue:", err) return @@ -49,15 +33,12 @@ func PrintMatchingQueue(client *redis.Client, status string, ctx context.Context log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String()) } -// - // Enqueue a user into the matchmaking queue func EnqueueUser(client *redis.Client, username string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() - key := "matchmaking_queue" - err := client.LPush(ctx, key, username).Err() + err := client.LPush(ctx, matchmakingQueueRedisKey, username).Err() if err != nil { log.Println("Error enqueuing user:", err) } @@ -65,11 +46,10 @@ func EnqueueUser(client *redis.Client, username string, ctx context.Context) { // Remove user from the matchmaking queue func DequeueUser(client *redis.Client, username string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() - key := "matchmaking_queue" - err := client.LRem(ctx, key, 1, username).Err() + err := client.LRem(ctx, matchmakingQueueRedisKey, 1, username).Err() if err != nil { log.Println("Error dequeuing user:", err) } @@ -77,8 +57,8 @@ func DequeueUser(client *redis.Client, username string, ctx context.Context) { // Add user into each specified topic set based on the topics selected by users func AddUserToTopicSets(client *redis.Client, request models.MatchRequest, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() for _, topic := range request.Topics { err := client.SAdd(ctx, strings.ToLower(topic), request.Username).Err() @@ -90,8 +70,8 @@ func AddUserToTopicSets(client *redis.Client, request models.MatchRequest, ctx c // Remove user from each specified topic set based on the topics selected by users func RemoveUserFromTopicSets(client *redis.Client, username string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() request, err := GetUserDetails(client, username, ctx) if err != nil { @@ -109,8 +89,8 @@ func RemoveUserFromTopicSets(client *redis.Client, username string, ctx context. // Add user details into hashset in Redis func StoreUserDetails(client *redis.Client, request models.MatchRequest, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() topicsJSON, err := json.Marshal(request.Topics) if err != nil { @@ -175,8 +155,8 @@ func GetUserDetails(client *redis.Client, username string, ctx context.Context) // Remove user details from HashSet func RemoveUserDetails(client *redis.Client, username string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() err := client.Del(ctx, username).Err() if err != nil { @@ -186,8 +166,8 @@ func RemoveUserDetails(client *redis.Client, username string, ctx context.Contex // Find the first matching user based on topics func FindMatchingUser(client *redis.Client, username string, ctx context.Context) (string, string, string, error) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() user, err := GetUserDetails(client, username, ctx) if err != nil { @@ -207,7 +187,7 @@ func FindMatchingUser(client *redis.Client, username string, ctx context.Context return "", "", "", err } - commonDifficulty := GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties) + commonDifficulty := models.GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties) return potentialMatch, topic, commonDifficulty, nil } } @@ -216,72 +196,12 @@ func FindMatchingUser(client *redis.Client, username string, ctx context.Context return "", "", "", nil } -// Get the highest common difficulty between two users, if no common difficulty found, choose the min of the 2 arrays -func GetCommonDifficulty(userArr []string, matchedUserArr []string) string { - commonDifficulties := make([]int, 3) - for i := range commonDifficulties { - commonDifficulties[i] = 0 - } - - for _, difficulty := range userArr { - formattedDifficulty := strings.ToLower(difficulty) - switch formattedDifficulty { - case "easy": - commonDifficulties[0]++ - case "medium": - commonDifficulties[1]++ - case "hard": - commonDifficulties[2]++ - default: - log.Println("Unknown difficulty specified: " + difficulty) - } - } - - for _, difficulty := range matchedUserArr { - formattedDifficulty := strings.ToLower(difficulty) - switch formattedDifficulty { - case "easy": - commonDifficulties[0]++ - case "medium": - commonDifficulties[1]++ - case "hard": - commonDifficulties[2]++ - default: - log.Println("Unknown difficulty specified: " + difficulty) - } - } - - lowest := "Hard" - for i := 2; i >= 0; i-- { - if commonDifficulties[i] == 2 { - switch i { - case 0: - return "Easy" - case 1: - return "Medium" - case 2: - return "Hard" - } - } else if commonDifficulties[i] > 0 { - switch i { - case 0: - lowest = "Easy" - case 1: - lowest = "Medium" - case 2: - lowest = "Hard" - } - } - } - return lowest -} - func CleanUpUser(client *redis.Client, username string, ctx context.Context) { - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() + // Dequeue user - key := "matchmaking_queue" - err := client.LRem(ctx, key, 1, username).Err() + err := client.LRem(ctx, matchmakingQueueRedisKey, 1, username).Err() if err != nil { log.Println("Error dequeuing user:", err) } @@ -309,19 +229,17 @@ func CleanUpUser(client *redis.Client, username string, ctx context.Context) { } func PopAndInsert(client *redis.Client, username string, ctx context.Context) { - // Acquire Lock - mutex.Lock() - defer mutex.Unlock() + redisAccessMutex.Lock() + defer redisAccessMutex.Unlock() // Pop user - username, err := client.LPop(ctx, "matchmaking_queue").Result() + username, err := client.LPop(ctx, matchmakingQueueRedisKey).Result() if err != nil { log.Println("Error popping user from queue:", err) } // Insert back in queue - key := "matchmaking_queue" - err = client.LPush(ctx, key, username).Err() + err = client.LPush(ctx, matchmakingQueueRedisKey, username).Err() if err != nil { log.Println("Error enqueuing user:", err) } diff --git a/apps/matching-service/processes/redis.go b/apps/matching-service/processes/redis.go new file mode 100644 index 0000000000..6c5240219c --- /dev/null +++ b/apps/matching-service/processes/redis.go @@ -0,0 +1,35 @@ +package processes + +import ( + "context" + "log" + "os" + + "github.com/redis/go-redis/v9" +) + +// SetupRedisClient sets-up the Redis client, and assigns it to a global variable +func SetupRedisClient() { + // Retrieve redis url env variable and setup the redis client + redisAddr := os.Getenv("REDIS_URL") + client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Password: "", // no password set + DB: 0, // use default DB + }) + + // Ping the redis server + _, err := client.Ping(context.Background()).Result() + if err != nil { + log.Fatalf("Could not connect to Redis: %v", err) + } else { + log.Println("Connected to Redis at the following address: " + redisAddr) + } + + redisClient = client +} + +// Get redisclient +func GetRedisClient() *redis.Client { + return redisClient +} \ No newline at end of file diff --git a/apps/matching-service/utils/match.go b/apps/matching-service/utils/match.go new file mode 100644 index 0000000000..e2060008ad --- /dev/null +++ b/apps/matching-service/utils/match.go @@ -0,0 +1,17 @@ +package utils + +import ( + "crypto/rand" + "encoding/hex" +) + +// To simulate generating a random matchID for collaboration service (TODO: Future) +func GenerateMatchID() (string, error) { + b := make([]byte, 16) // 16 bytes = 128 bits + _, err := rand.Read(b) + if err != nil { + return "", err + } + matchID := hex.EncodeToString(b) + return matchID, nil +}