Skip to content

Commit

Permalink
refactor: clean up files in matching-service
Browse files Browse the repository at this point in the history
  • Loading branch information
tituschewxj committed Oct 19, 2024
1 parent fc0fdac commit fea8537
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 236 deletions.
2 changes: 1 addition & 1 deletion apps/matching-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ go mod tidy
- `MATCH_TIMEOUT`: The time in seconds to wait for a match before timing out.
- `REDIS_URL`: The URL for the Redis server. Default is `localhost:6379`.

4. Start a local redis server:
4. Start a local Redis server:

```bash
docker run -d -p 6379:6379 redis
Expand Down
80 changes: 29 additions & 51 deletions apps/matching-service/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,44 +123,42 @@ func createTimeoutContext() (context.Context, context.CancelFunc, error) {
return ctx, cancel, nil
}

// Cleans up the data associated with the user before ending the websocket connection.
// If user is already removed, then nothing happens.
func cleanUpUser(username string) {
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())

// Removes the match context and active
if cancelFunc, exists := matchContexts[username]; exists {
cancelFunc()
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
}

// waitForResult waits for a match result, timeout, or cancellation.
func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context, matchFoundChan chan models.MatchFound, username string) {
select {
case <-ctx.Done():
log.Println("Matching cancelled")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}

cleanUpUser(username)
return
case <-timeoutCtx.Done():
log.Println("Connection timed out")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}

sendTimeoutResponse(ws)
cleanUpUser(username)
return
case <-matchCtx.Done():
log.Println("Match found for user: " + username)

// NOTE: user is already cleaned-up in the other process,
// so there is no need to clean up again.
return
case result, ok := <-matchFoundChan:
if !ok {
Expand All @@ -170,7 +168,10 @@ func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context
}
log.Println("Match found for user: " + username)
// Notify the users about the match
notifyMatch(result.User, result.MatchedUser, result)
notifyMatches(result.User, result.MatchedUser, result)

// NOTE: user and other user are already cleaned up in a separate matching algorithm process
// so no clean up is required here.
return
}
}
Expand All @@ -186,7 +187,7 @@ func sendTimeoutResponse(ws *websocket.Conn) {
}
}

func notifyMatch(username, matchedUsername string, result models.MatchFound) {
func notifyMatches(username, matchedUsername string, result models.MatchFound) {
mu.Lock()
defer mu.Unlock()

Expand All @@ -204,27 +205,4 @@ func notifyMatch(username, matchedUsername string, result models.MatchFound) {
log.Printf("Error sending message to user %s: %v\n", username, err)
}
}

// Remove the match context for both users and cancel for matched user
if cancelFunc, exists := matchContexts[username]; exists {
cancelFunc()
delete(matchContexts, username)
}

if cancelFunc2, exists := matchContexts[matchedUsername]; exists {
cancelFunc2()
delete(matchContexts, matchedUsername)
}

// Remove the match channels
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
if _, exists := matchFoundChannels[matchedUsername]; exists {
delete(matchFoundChannels, matchedUsername)
}

// Remove users from the activeConnections map
delete(activeConnections, username)
delete(activeConnections, matchedUsername)
}
30 changes: 6 additions & 24 deletions apps/matching-service/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"log"
"matching-service/handlers"
Expand All @@ -10,7 +9,6 @@ import (
"os"

"github.com/joho/godotenv"
"github.com/redis/go-redis/v9"
)

func main() {
Expand All @@ -19,33 +17,17 @@ func main() {
if err != nil {
log.Fatalf("err loading: %v", err)
}
port := os.Getenv("PORT")

// Retrieve redis url env variable and setup the redis client
redisAddr := os.Getenv("REDIS_URL")
client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Ping the redis server
_, err = client.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
} else {
log.Println("Connected to Redis at the following address: " + redisAddr)
}

// Set redis client
processes.SetRedisClient(client)

// Setup redis client
processes.SetupRedisClient()

// Run a goroutine that matches users

// Routes
http.HandleFunc("/match", handlers.HandleWebSocketConnections)

// Start the server
port := os.Getenv("PORT")
log.Println(fmt.Sprintf("Server starting on :%s", port))
err = http.ListenAndServe(fmt.Sprintf(":%s", port), nil)
if err != nil {
Expand Down
67 changes: 67 additions & 0 deletions apps/matching-service/models/complexity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package models

import (
"log"
"strings"
)

// Get the highest common difficulty (aka complexity) between two users,
// If no common difficulty found, choose the min of the 2 arrays.
func GetCommonDifficulty(userArr []string, matchedUserArr []string) string {
commonDifficulties := make([]int, 3)
for i := range commonDifficulties {
commonDifficulties[i] = 0
}

for _, difficulty := range userArr {
formattedDifficulty := strings.ToLower(difficulty)
switch formattedDifficulty {
case "easy":
commonDifficulties[0]++
case "medium":
commonDifficulties[1]++
case "hard":
commonDifficulties[2]++
default:
log.Println("Unknown difficulty specified: " + difficulty)
}
}

for _, difficulty := range matchedUserArr {
formattedDifficulty := strings.ToLower(difficulty)
switch formattedDifficulty {
case "easy":
commonDifficulties[0]++
case "medium":
commonDifficulties[1]++
case "hard":
commonDifficulties[2]++
default:
log.Println("Unknown difficulty specified: " + difficulty)
}
}

lowest := "Hard"
for i := 2; i >= 0; i-- {
if commonDifficulties[i] == 2 {
switch i {
case 0:
return "Easy"
case 1:
return "Medium"
case 2:
return "Hard"
}
} else if commonDifficulties[i] > 0 {
switch i {
case 0:
lowest = "Easy"
case 1:
lowest = "Medium"
case 2:
lowest = "Hard"
}
}
}
return lowest
}
17 changes: 17 additions & 0 deletions apps/matching-service/processes/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package processes

import (
"context"
"sync"

"github.com/redis/go-redis/v9"
)

const matchmakingQueueRedisKey = "matchmaking_queue"

var (
redisClient *redis.Client
matchingRoutineMutex sync.Mutex // Mutex to ensure only one matchmaking goroutine is running
redisAccessMutex sync.Mutex // Mutex for Redis access for concurrency safety
ctx = context.Background()
)
67 changes: 17 additions & 50 deletions apps/matching-service/processes/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,26 @@ package processes

import (
"context"
"fmt"
"log"
"matching-service/models"
"strconv"
"strings"
"sync"
"matching-service/utils"
"time"

"github.com/redis/go-redis/v9"
)

var (
redisClient *redis.Client
mu sync.Mutex // Mutex to ensure only one matchmaking goroutine is running
ctx = context.Background()
)

// SetRedisClient sets the Redis client to a global variable
func SetRedisClient(client *redis.Client) {
redisClient = client
}

// Get redisclient
func GetRedisClient() *redis.Client {
return redisClient
}

func getPortNumber(addr string) (int64, error) {
// Split the string by the colon
parts := strings.Split(addr, ":")
if len(parts) < 2 {
return 0, fmt.Errorf("no port number found")
}

// Convert the port string to an integer
port, err := strconv.ParseInt(parts[len(parts)-1], 10, 64)
if err != nil {
return 0, err // Return an error if conversion fails
}

return port, nil
}

func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) {
// Acquire mutex
mu.Lock()
matchingRoutineMutex.Lock()
// Defer unlocking the mutex
defer mu.Unlock()
defer matchingRoutineMutex.Unlock()

for {

// Log queue before matchmaking
// PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background())

// Check if the queue is empty
queueLength, err := redisClient.LLen(context.Background(), "matchmaking_queue").Result()
queueLength, err := redisClient.LLen(context.Background(), matchmakingQueueRedisKey).Result()
if err != nil {
log.Println("Error checking queue length:", err)
time.Sleep(1 * time.Second)
Expand All @@ -71,7 +35,7 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc
}

// Peek at the user queue
username, err := redisClient.LIndex(context.Background(), "matchmaking_queue", 0).Result()
username, err := redisClient.LIndex(context.Background(), matchmakingQueueRedisKey, 0).Result()
if err != nil {
log.Println("Error peeking user from queue:", err)
time.Sleep(1 * time.Second)
Expand All @@ -93,19 +57,15 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc
// Log down which users got matched
log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", username, matchedUsername, matchedTopic, matchedDifficulty)

// Clean up queue, sets and hashset in Redis
DequeueUser(redisClient, username, ctx)
DequeueUser(redisClient, matchedUsername, ctx)
RemoveUserFromTopicSets(redisClient, username, ctx)
RemoveUserFromTopicSets(redisClient, matchedUsername, ctx)
RemoveUserDetails(redisClient, username, ctx)
RemoveUserDetails(redisClient, matchedUsername, ctx)

// Log queue after matchmaking
PrintMatchingQueue(redisClient, "After Matchmaking", context.Background())

// Clean up redis for this match
cleanUp(redisClient, username, ctx)
cleanUp(redisClient, matchedUsername, ctx)

// Generate a random match ID
matchId, err := GenerateMatchID()
matchId, err := utils.GenerateMatchID()
if err != nil {
log.Println("Unable to randomly generate matchID")
}
Expand All @@ -128,3 +88,10 @@ func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matc
}
}
}

// Clean up queue, sets and hashset in Redis
func cleanUp(redisClient *redis.Client, username string, ctx context.Context) {
DequeueUser(redisClient, username, ctx)
RemoveUserFromTopicSets(redisClient, username, ctx)
RemoveUserDetails(redisClient, username, ctx)
}
Loading

0 comments on commit fea8537

Please sign in to comment.