diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0d2a1ad306..877de8eaf8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,6 +36,7 @@ jobs: MATCHING_SERVICE_PORT: ${{ vars.MATCHING_SERVICE_PORT }} MATCHING_SERVICE_TIMEOUT: ${{ vars.MATCHING_SERVICE_TIMEOUT }} REDIS_URL: ${{ vars.REDIS_URL }} + QUESTION_SERVICE_GRPC_URL: ${{ vars.QUESTION_SERVICE_GPRC_URL }} run: | cd ./apps/frontend echo "NEXT_PUBLIC_QUESTION_SERVICE_URL=$QUESTION_SERVICE_URL" >> .env @@ -56,6 +57,7 @@ jobs: echo "MATCH_TIMEOUT=$MATCHING_SERVICE_TIMEOUT" >> .env echo "JWT_SECRET=$JWT_SECRET" >> .env echo "REDIS_URL=$REDIS_URL" >> .env + echo "QUESTION_SERVICE_GRPC_URL=$QUESTION_SERVICE_GRPC_URL" >> .env - name: Create Database Credential Files env: diff --git a/apps/README.md b/apps/README.md index 18525dd4f6..ae12a36fc1 100644 --- a/apps/README.md +++ b/apps/README.md @@ -56,7 +56,7 @@ Once running, you can access: - The **frontend** at http://localhost:3000 - The **user service** at http://localhost:3001 -- The **question service** at http://localhost:8080 +- The **question service** at http://localhost:8080 (REST) and http://localhost:50051 (gRPC) - The **matching service** at http://localhost:8081 - The **redis service** at http://localhost:6379 diff --git a/apps/docker-compose.yml b/apps/docker-compose.yml index e69e2bcc65..a8ee1986f9 100644 --- a/apps/docker-compose.yml +++ b/apps/docker-compose.yml @@ -31,6 +31,7 @@ services: dockerfile: Dockerfile ports: - 8080:8080 + - 50051:50051 env_file: - ./question-service/.env networks: diff --git a/apps/frontend/src/app/matching/modalContent/FindMatchContent.tsx b/apps/frontend/src/app/matching/modalContent/FindMatchContent.tsx index ac65cbe033..383f049385 100644 --- a/apps/frontend/src/app/matching/modalContent/FindMatchContent.tsx +++ b/apps/frontend/src/app/matching/modalContent/FindMatchContent.tsx @@ -94,8 +94,8 @@ const DifficultySelector: React.FC = ({ selectedDifficu handleChange(difficultyOption.label)} + checked={selectedDifficulties.includes(difficultyOption.value)} + onChange={() => handleChange(difficultyOption.value)} > {difficultyOption.label} diff --git a/apps/frontend/src/app/services/use-matching.ts b/apps/frontend/src/app/services/use-matching.ts index ce9d463b53..4a37c9f153 100644 --- a/apps/frontend/src/app/services/use-matching.ts +++ b/apps/frontend/src/app/services/use-matching.ts @@ -17,17 +17,15 @@ export type MatchRequestParams = { } export type MatchFoundResponse = { - type: "match_found", - matchId: number, - partnerId: number, - partnerName: string, -} | { - type: "match_found", - matchId: string, + type: "match_question_found", + match_id: string, user: string, - matchedUser: string, - topic: string | string[], - difficulty: string + matched_user: string, + matched_topics: string[], + question_doc_ref_id: string, + question_name: string, + question_difficulty: string, + question_topics: string[], } export type MatchTimeoutResponse = { @@ -61,7 +59,7 @@ export default function useMatching(): MatchState { return; } - if (responseJson.type == "match_found") { + if (responseJson.type == "match_question_found") { setIsSocket(false); const info: MatchInfo = parseInfoFromResponse(responseJson); @@ -136,20 +134,9 @@ export default function useMatching(): MatchState { } function parseInfoFromResponse(responseJson: MatchFoundResponse): MatchInfo { - // test whether old or new - if ("partnerId" in responseJson) { - return { - matchId: responseJson.matchId?.toString() ?? "unknown", - partnerId: responseJson.partnerId?.toString() ?? "unknown", - partnerName: responseJson.partnerName ?? "unknown", - myName: "unknown", - }; - } else { - return { - matchId: responseJson.matchId?.toString() ?? "unknown", - partnerId: "unknown", - partnerName: responseJson.matchedUser ?? "unknown", - myName: responseJson.user ?? "unknown", - }; - } + return { + matchId: responseJson.match_id?.toString() ?? "unknown", + partnerName: responseJson.matched_user ?? "unknown", + myName: responseJson.user ?? "unknown", + }; } diff --git a/apps/frontend/src/contexts/websocketcontext.tsx b/apps/frontend/src/contexts/websocketcontext.tsx index fe0c95624d..84893c15ed 100644 --- a/apps/frontend/src/contexts/websocketcontext.tsx +++ b/apps/frontend/src/contexts/websocketcontext.tsx @@ -15,7 +15,6 @@ export type SocketState = { }; export type MatchInfo = { matchId: string; - partnerId: string; myName: string; partnerName: string; } diff --git a/apps/matching-service/.env.example b/apps/matching-service/.env.example index 8fc9a4032a..50dc113f63 100644 --- a/apps/matching-service/.env.example +++ b/apps/matching-service/.env.example @@ -2,8 +2,10 @@ PORT=8081 MATCH_TIMEOUT=30 JWT_SECRET=you-can-replace-this-with-your-own-secret -# if you are NOT USING docker, use the below url +# If you are NOT USING docker, use the below variables REDIS_URL=localhost:6379 +QUESTION_SERVICE_GRPC_URL=localhost:50051 -# if you are USING docker, use the below url +# If you are USING docker, use the below variables # REDIS_URL=redis-container:6379 +# QUESTION_SERVICE_GRPC_URL=question-service:50051 diff --git a/apps/matching-service/README.md b/apps/matching-service/README.md index 641a890fe5..92b957c7b0 100644 --- a/apps/matching-service/README.md +++ b/apps/matching-service/README.md @@ -75,8 +75,8 @@ Client sends matching parameters: { "type": "match_request", "topics": ["Algorithms", "Arrays"], - "difficulties": ["Easy", "Medium"], - "username": "Jane Doe" + "difficulties": ["easy", "medium"], + "username": "1f0myn" } ``` @@ -84,12 +84,15 @@ Server response on successful match: ```json { - "type": "match_found", - "matchId": "1c018916a34c5bee21af0b2670bd6156", - "user": "zkb4px", - "matchedUser": "JohnDoe", - "topic": "Algorithms", - "difficulty": "Medium" + "type": "match_question_found", + "match_id": "c377f463d380a9bd1dd03242892ef32e", + "user": "1f0myn", + "matched_user": "jrsznp", + "matched_topics": ["Graphs", "Bit Manipulation", "Databases"], + "question_doc_ref_id": "5lObMfyyKPgNXSuLcGEm", + "question_name": "Repeated DNA Sequences", + "question_difficulty": "medium", + "question_topics": ["Algorithms", "Bit Manipulation"] } ``` @@ -128,21 +131,27 @@ Before running the following commands, ensure that the URL for the Redis server To run the application via Docker, run the following command: 1. Set up the Go Docker container for the matching service + ```bash docker build -f Dockerfile -t match-go-app . ``` 2. Create the Docker network for Redis and Go + ```bash docker network create redis-go-network ``` 3. Start a new Redis container in detached mode using the Redis image from Docker Hub + ```bash docker run -d --name redis-container --network redis-go-network redis ``` 4. Run the Go Docker container for the matching-service + ```bash docker run -d -p 8081:8081 --name go-app-container --network redis-go-network match-go-app ``` + +**NOTE:** As there is a dependency on the question-service to return the found questions, the matching-service does not work fully unless the question-service is present. diff --git a/apps/matching-service/databases/topic.go b/apps/matching-service/databases/topic.go new file mode 100644 index 0000000000..144a1a47ce --- /dev/null +++ b/apps/matching-service/databases/topic.go @@ -0,0 +1,36 @@ +package databases + +import ( + "context" + "log" + "matching-service/models" + "strings" + + "github.com/redis/go-redis/v9" +) + +// Add user into each specified topic set based on the topics selected by users +func AddUserToTopicSets(tx *redis.Tx, request models.MatchRequest, ctx context.Context) { + for _, topic := range request.Topics { + err := tx.SAdd(ctx, strings.ToLower(topic), request.Username).Err() + if err != nil { + log.Println("Error adding user to topic set:", err) + } + } +} + +// Remove user from each specified topic set based on the topics selected by users +func RemoveUserFromTopicSets(tx *redis.Tx, username string, ctx context.Context) { + request, err := GetUserDetails(tx, username, ctx) + if err != nil { + log.Println("Error retrieving user from hashset:", err) + return + } + + for _, topic := range request.Topics { + err := tx.SRem(ctx, strings.ToLower(topic), request.Username).Err() + if err != nil { + log.Println("Error removing user from topic set:", err) + } + } +} diff --git a/apps/matching-service/databases/user.go b/apps/matching-service/databases/user.go new file mode 100644 index 0000000000..459447b96a --- /dev/null +++ b/apps/matching-service/databases/user.go @@ -0,0 +1,22 @@ +package databases + +import ( + "context" + "matching-service/models" + + "github.com/redis/go-redis/v9" +) + +// Clean up queue, sets and hashset in Redis +func CleanUpUser(tx *redis.Tx, username string, ctx context.Context) { + DequeueUser(tx, username, ctx) + RemoveUserFromTopicSets(tx, username, ctx) + RemoveUserDetails(tx, username, ctx) +} + +// Adds the user to the queue, sets and hashsets in Redis +func AddUser(tx *redis.Tx, matchRequest models.MatchRequest, ctx context.Context) { + EnqueueUser(tx, matchRequest.Username, ctx) + AddUserToTopicSets(tx, matchRequest, ctx) + StoreUserDetails(tx, matchRequest, ctx) +} diff --git a/apps/matching-service/databases/userqueue.go b/apps/matching-service/databases/userqueue.go new file mode 100644 index 0000000000..9c71533972 --- /dev/null +++ b/apps/matching-service/databases/userqueue.go @@ -0,0 +1,179 @@ +package databases + +import ( + "context" + "encoding/json" + "fmt" + "log" + "matching-service/models" + "matching-service/servers" + "strings" + + "github.com/redis/go-redis/v9" +) + +// Print existing users in the matching queue +func PrintMatchingQueue(tx *redis.Tx, status string, ctx context.Context) { + users, err := GetAllQueuedUsers(tx, ctx) + if err != nil { + return + } + + var concatenatedUsers strings.Builder + for i, user := range users { + concatenatedUsers.WriteString(user) + if i != len(users)-1 { + concatenatedUsers.WriteString(", ") + } + } + + log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String()) +} + +func IsQueueEmpty(tx *redis.Tx, ctx context.Context) (bool, error) { + queueLength, err := tx.LLen(ctx, servers.MatchmakingQueueRedisKey).Result() + if err != nil { + log.Println("Error checking queue length:", err) + return false, err + } + // No users in the queue, so no need to perform matching + return queueLength == 0, nil +} + +// Enqueue a user into the matchmaking queue +func EnqueueUser(tx *redis.Tx, username string, ctx context.Context) { + err := tx.LPush(ctx, servers.MatchmakingQueueRedisKey, username).Err() + if err != nil { + log.Println("Error enqueuing user:", err) + } +} + +// Remove user from the matchmaking queue +func DequeueUser(tx *redis.Tx, username string, ctx context.Context) { + err := tx.LRem(ctx, servers.MatchmakingQueueRedisKey, 1, username).Err() + if err != nil { + log.Println("Error dequeuing user:", err) + return + } +} + +// Returns the first user's username from the queue. +func GetFirstUser(tx *redis.Tx, ctx context.Context) (string, error) { + // Peek at the user queue + username, err := tx.LIndex(ctx, servers.MatchmakingQueueRedisKey, 0).Result() + if err != nil { + log.Println("Error peeking user from queue:", err) + return "", err + } + return username, nil +} + +// Return the usernames of all the queued users. +func GetAllQueuedUsers(tx *redis.Tx, ctx context.Context) ([]string, error) { + users, err := tx.LRange(ctx, servers.MatchmakingQueueRedisKey, 0, -1).Result() + if err != nil { + log.Println("Error retrieving users from queue:", err) + return nil, err + } + return users, nil +} + +func ValidateNotDuplicateUser(tx *redis.Tx, ctx context.Context, currentUsername string) error { + queuedUsernames, err := GetAllQueuedUsers(tx, ctx) + if err != nil { + return err + } + + // Check that user is not part of the existing queue + for _, username := range queuedUsernames { + if username == currentUsername { + return models.ExistingUserError + } + } + return nil +} + +// Add user details into hashset in Redis +func StoreUserDetails(tx *redis.Tx, request models.MatchRequest, ctx context.Context) { + topicsJSON, err := json.Marshal(request.Topics) + if err != nil { + log.Println("Error marshalling topics:", err) + return + } + + difficultiesJSON, err := json.Marshal(request.Difficulties) + if err != nil { + log.Println("Error marshalling difficulties:", err) + return + } + + err = tx.HSet(ctx, request.Username, map[string]interface{}{ + "topics": topicsJSON, + "difficulty": difficultiesJSON, + "username": request.Username, + }).Err() + if err != nil { + log.Println("Error storing user details:", err) + } +} + +// Retrieve user details from hashset in Redis +func GetUserDetails(tx *redis.Tx, username string, ctx context.Context) (models.MatchRequest, error) { + userDetails, err := tx.HGetAll(ctx, username).Result() + if err != nil { + return models.MatchRequest{}, err + } + + if len(userDetails) == 0 { + return models.MatchRequest{}, fmt.Errorf("user not found in hashset: %s", username) + } + + topicsJSON, topicsExist := userDetails["topics"] + difficultiesJSON, difficultiesExist := userDetails["difficulty"] + + if !topicsExist || !difficultiesExist { + return models.MatchRequest{}, fmt.Errorf("incomplete user details for: %s", username) + } + + var topics []string + err = json.Unmarshal([]byte(topicsJSON), &topics) + if err != nil { + return models.MatchRequest{}, fmt.Errorf("error unmarshalling topics: %v", err) + } + + var difficulties []string + err = json.Unmarshal([]byte(difficultiesJSON), &difficulties) + if err != nil { + return models.MatchRequest{}, fmt.Errorf("error unmarshalling difficulties: %v", err) + } + + matchRequest := models.MatchRequest{ + Topics: topics, + Difficulties: difficulties, + Username: username, + } + + return matchRequest, nil +} + +// Remove user details from HashSet +func RemoveUserDetails(tx *redis.Tx, username string, ctx context.Context) { + err := tx.Del(ctx, username).Err() + if err != nil { + log.Println("Error removing user details:", err) + } +} + +func PopAndInsertUser(tx *redis.Tx, username string, ctx context.Context) { + // Pop user + username, err := tx.LPop(ctx, servers.MatchmakingQueueRedisKey).Result() + if err != nil { + log.Println("Error popping user from queue:", err) + } + + // Insert back in queue + err = tx.LPush(ctx, servers.MatchmakingQueueRedisKey, username).Err() + if err != nil { + log.Println("Error enqueuing user:", err) + } +} diff --git a/apps/matching-service/go.mod b/apps/matching-service/go.mod index 89ad38d6b5..e4ee0c799d 100644 --- a/apps/matching-service/go.mod +++ b/apps/matching-service/go.mod @@ -3,12 +3,19 @@ module matching-service go 1.23.1 require ( + github.com/bsm/redislock v0.9.4 github.com/gorilla/websocket v1.5.3 github.com/joho/godotenv v1.5.1 github.com/redis/go-redis/v9 v9.6.2 + google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.35.1 ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect ) diff --git a/apps/matching-service/go.sum b/apps/matching-service/go.sum index 722db5bc36..aff4c057a6 100644 --- a/apps/matching-service/go.sum +++ b/apps/matching-service/go.sum @@ -2,13 +2,29 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw= +github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk= github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/apps/matching-service/handlers/responses.go b/apps/matching-service/handlers/responses.go new file mode 100644 index 0000000000..1baa208473 --- /dev/null +++ b/apps/matching-service/handlers/responses.go @@ -0,0 +1,44 @@ +package handlers + +import ( + "log" + "matching-service/models" + + "github.com/gorilla/websocket" +) + +// sendTimeoutResponse sends a timeout message to the WebSocket client. +func sendTimeoutResponse(ws *websocket.Conn) { + result := models.Timeout{ + Type: "timeout", + Message: "No match found. Please try again later.", + } + if err := ws.WriteJSON(result); err != nil { + log.Printf("write error: %v", err) + } +} + +func sendDuplicateUserRejectionResponse(ws *websocket.Conn) { + if err := ws.WriteJSON(models.MatchRejected{ + Type: "match_rejected", + Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.", + }); err != nil { + log.Printf("write error: %v", err) + } +} + +func sendDefaultRejectionResponse(ws *websocket.Conn) { + if err := ws.WriteJSON(models.MatchRejected{ + Type: "match_rejected", + Message: "An unexpected error occurred. Please try again later.", + }); err != nil { + log.Printf("write error: %v", err) + } +} + +// Send message to matched user +func sendMatchFoundResponse(ws *websocket.Conn, username string, result models.MatchQuestionFound) { + if err := ws.WriteJSON(result); err != nil { + log.Printf("Error sending message to user %s: %v\n", username, err) + } +} diff --git a/apps/matching-service/handlers/websocket.go b/apps/matching-service/handlers/websocket.go index 7432ab3a8e..6b28340c1d 100644 --- a/apps/matching-service/handlers/websocket.go +++ b/apps/matching-service/handlers/websocket.go @@ -2,15 +2,18 @@ package handlers import ( "context" + "encoding/json" + "errors" "log" + "matching-service/databases" "matching-service/models" "matching-service/processes" + "matching-service/servers" "matching-service/utils" "net/http" - "strings" - "sync" "github.com/gorilla/websocket" + "github.com/redis/go-redis/v9" ) var ( @@ -20,17 +23,13 @@ var ( return true }, } - // A map to hold active WebSocket connections per username - activeConnections = make(map[string]*websocket.Conn) - // A map to hold user's match ctx cancel function - matchContexts = make(map[string]context.CancelFunc) - // A map to hold user's match channels - matchFoundChannels = make(map[string]chan models.MatchFound) - mu sync.Mutex // Mutex for thread-safe access to activeConnections ) // handleConnections manages WebSocket connections and matching logic. func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { + rdb := servers.GetRedisClient() + ctx := context.Background() + // TODO: Parse the authorization header to validate the JWT token and get the user ID claim. ws, err := upgrader.Upgrade(w, r, nil) @@ -45,7 +44,7 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { } }() - log.Println("WebSocket connection established") + log.Printf("WebSocket connection established for port %s", utils.ExtractWebsocketPort(ws)) matchRequest, err := readMatchRequest(ws) if err != nil { @@ -53,36 +52,19 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { return } - // Store WebSocket connection in the activeConnections map. - mu.Lock() - // Checks if user is already an existing websocket connection - if _, exists := activeConnections[matchRequest.Username]; exists { - mu.Unlock() - log.Printf("User %s is already connected, rejecting new connection.", matchRequest.Username) - ws.WriteJSON(models.MatchRejected{ - Type: "match_rejected", - Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.", - }) - ws.Close() - return - } - activeConnections[matchRequest.Username] = ws - matchCtx, matchCancel := context.WithCancel(context.Background()) - matchContexts[matchRequest.Username] = matchCancel + // Subscribes to a channel that returns a message if a match is found + matchFoundPubsub := rdb.Subscribe(ctx, matchRequest.Username) + defer matchFoundPubsub.Close() - matchFoundChan := make(chan models.MatchFound) - matchFoundChannels[matchRequest.Username] = matchFoundChan - mu.Unlock() + // Create a context for user cancellation + userCtx, userCancel := context.WithCancel(ctx) + defer userCancel() // Ensure cancel is called to release resources - // Create a context for cancellation - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Ensure cancel is called to release resources + // Create channel for handling errors + errorChan := make(chan error) - processes.EnqueueUser(processes.GetRedisClient(), matchRequest.Username, ctx) - processes.AddUserToTopicSets(processes.GetRedisClient(), matchRequest, ctx) - processes.StoreUserDetails(processes.GetRedisClient(), matchRequest, ctx) - - timeoutCtx, timeoutCancel, err := createTimeoutContext() + // Create a context for matching timeout + timeoutCtx, timeoutCancel, err := utils.CreateTimeoutContext() if err != nil { log.Printf("Error creating timeout context: %v", err) return @@ -90,11 +72,11 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) { defer timeoutCancel() // Start goroutines for handling messages and performing matching. - go processes.ReadMessages(ws, ctx, cancel) - go processes.PerformMatching(matchRequest, context.Background(), matchFoundChannels) // Perform matching + go processes.ReadMessages(ws, userCancel) + go processes.PerformMatching(rdb, matchRequest, ctx, errorChan) // Wait for a match, timeout, or cancellation. - waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username) + waitForResult(ws, userCtx, timeoutCtx, matchFoundPubsub, errorChan, matchRequest.Username) } // readMatchRequest reads the initial match request from the WebSocket connection. @@ -103,108 +85,67 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) { if err := ws.ReadJSON(&matchRequest); err != nil { return matchRequest, err } - // Get the remote address (client's IP and port) - clientAddr := ws.RemoteAddr().String() - - // Extract the port (after the last ':') - clientPort := clientAddr[strings.LastIndex(clientAddr, ":")+1:] - log.Printf("Received match request: %v from client port: %s", matchRequest, clientPort) + log.Printf("Received match request: %v from client port: %s", matchRequest, utils.ExtractWebsocketPort(ws)) return matchRequest, nil } -// createTimeoutContext sets up a timeout context based on configuration. -func createTimeoutContext() (context.Context, context.CancelFunc, error) { - timeoutDuration, err := utils.GetTimeoutDuration() - if err != nil { - return nil, nil, err - } - ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) - return ctx, cancel, nil -} - // Cleans up the data associated with the user before ending the websocket connection. // If user is already removed, then nothing happens. +// This function is unaffected by the external context. func cleanUpUser(username string) { - // Cleanup Redis - processes.CleanUpUser(processes.GetRedisClient(), username, context.Background()) + rdb := servers.GetRedisClient() + ctx := context.Background() - // Removes the match context and active - if cancelFunc, exists := matchContexts[username]; exists { - cancelFunc() - delete(matchContexts, username) - } - if _, exists := activeConnections[username]; exists { - delete(activeConnections, username) - } - if _, exists := matchFoundChannels[username]; exists { - delete(matchFoundChannels, username) + // Obtain lock with retry + lock, err := servers.ObtainRedisLock(ctx) + if err != nil { + return } + defer lock.Release(ctx) + + // Cleanup Redis + databases.CleanUpUser((*redis.Tx)(rdb.Conn()), username, ctx) } // waitForResult waits for a match result, timeout, or cancellation. -func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context, matchFoundChan chan models.MatchFound, username string) { +func waitForResult(ws *websocket.Conn, userCtx, timeoutCtx context.Context, matchFoundPubsub *redis.PubSub, errorChan chan error, username string) { select { - case <-ctx.Done(): - log.Println("Matching cancelled") + case <-userCtx.Done(): + log.Printf("Matching cancelled for port %v", utils.ExtractWebsocketPort(ws)) cleanUpUser(username) return case <-timeoutCtx.Done(): - log.Println("Connection timed out") + log.Printf("Connection timed out for port %v", utils.ExtractWebsocketPort(ws)) sendTimeoutResponse(ws) cleanUpUser(username) return - case <-matchCtx.Done(): - log.Println("Match found for user: " + username) - - // NOTE: user is already cleaned-up in the other process, - // so there is no need to clean up again. - return - case result, ok := <-matchFoundChan: + case err, ok := <-errorChan: if !ok { - // Channel closed without a match, possibly due to context cancellation - log.Println("Match channel closed without finding a match") return } - log.Println("Match found for user: " + username) - // Notify the user about the match - notifyMatches(result.User, result) - - // cleaning up from the global maps used still required - if _, exists := matchContexts[username]; exists { - delete(matchContexts, username) + log.Printf("Error occured performing matching: %v", err) + if errors.Is(err, models.ExistingUserError) { + sendDuplicateUserRejectionResponse(ws) + } else { + sendDefaultRejectionResponse(ws) + cleanUpUser(username) } - if _, exists := activeConnections[username]; exists { - delete(activeConnections, username) + return + case msg, ok := <-matchFoundPubsub.Channel(): + if !ok { + return } - if _, exists := matchFoundChannels[username]; exists { - delete(matchFoundChannels, username) + var result models.MatchQuestionFound + // Unmarshal the JSON message into the struct + err := json.Unmarshal([]byte(msg.Payload), &result) + if err != nil { + log.Printf("Error unmarshaling JSON: %v", err) + return } + // Notify the user about the match + sendMatchFoundResponse(ws, result.User, result) return } } - -// sendTimeoutResponse sends a timeout message to the WebSocket client. -func sendTimeoutResponse(ws *websocket.Conn) { - result := models.Timeout{ - Type: "timeout", - Message: "No match found. Please try again later.", - } - if err := ws.WriteJSON(result); err != nil { - log.Printf("write error: %v", err) - } -} - -// Notify matches -func notifyMatches(username string, result models.MatchFound) { - mu.Lock() - defer mu.Unlock() - - // Send message to matched user - if userConn, userExists := activeConnections[username]; userExists { - if err := userConn.WriteJSON(result); err != nil { - log.Printf("Error sending message to user %s: %v\n", username, err) - } - } -} diff --git a/apps/matching-service/main.go b/apps/matching-service/main.go index 678ec5312c..7c612f1522 100644 --- a/apps/matching-service/main.go +++ b/apps/matching-service/main.go @@ -4,7 +4,7 @@ import ( "fmt" "log" "matching-service/handlers" - "matching-service/processes" + "matching-service/servers" "net/http" "os" @@ -12,24 +12,30 @@ import ( ) func main() { - // Set up environment + setUpEnvironment() + client := servers.SetupRedisClient() + defer client.Close() + grpcClient := servers.InitGrpcServer() + defer grpcClient.Close() + setupRoutes() + startServer() +} + +func setUpEnvironment() { err := godotenv.Load() if err != nil { log.Fatalf("err loading: %v", err) } +} - // Setup redis client - processes.SetupRedisClient() - - // Run a goroutine that matches users - - // Routes +func setupRoutes() { http.HandleFunc("/match", handlers.HandleWebSocketConnections) +} - // Start the server +func startServer() { port := os.Getenv("PORT") log.Println(fmt.Sprintf("Server starting on :%s", port)) - err = http.ListenAndServe(fmt.Sprintf(":%s", port), nil) + err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil) if err != nil { log.Fatal("ListenAndServe: ", err) } diff --git a/apps/matching-service/models/complexity.go b/apps/matching-service/models/difficulty.go similarity index 100% rename from apps/matching-service/models/complexity.go rename to apps/matching-service/models/difficulty.go diff --git a/apps/matching-service/models/error.go b/apps/matching-service/models/error.go new file mode 100644 index 0000000000..76f5dca975 --- /dev/null +++ b/apps/matching-service/models/error.go @@ -0,0 +1,6 @@ +package models + +import "errors" + +var ExistingUserError = errors.New("already has an existing user in matchmaking") +var NoMatchFound = errors.New("no matches found for current user") diff --git a/apps/matching-service/models/match.go b/apps/matching-service/models/match.go index ad555b555b..220febb713 100644 --- a/apps/matching-service/models/match.go +++ b/apps/matching-service/models/match.go @@ -8,12 +8,24 @@ type MatchRequest struct { } type MatchFound struct { - Type string `json:"type"` - MatchID string `json:"matchId"` - User string `json:"user"` // username - MatchedUser string `json:"matchedUser"` // matched username - Topic string `json:"topic"` // matched topic - Difficulty string `json:"difficulty"` // matched difficulty + Type string `json:"type"` + MatchID string `json:"match_id"` + User string `json:"user"` + MatchedUser string `json:"matched_user"` + MatchedTopics []string `json:"matched_topics"` + MatchedDifficulties []string `json:"matched_difficulties"` +} + +type MatchQuestionFound struct { + Type string `json:"type"` + MatchID string `json:"match_id"` + User string `json:"user"` + MatchedUser string `json:"matched_user"` + MatchedTopics []string `json:"matched_topics"` + QuestionDocRefID string `json:"question_doc_ref_id"` + QuestionName string `json:"question_name"` + QuestionDifficulty string `json:"question_difficulty"` + QuestionTopics []string `json:"question_topics"` } type Timeout struct { diff --git a/apps/matching-service/models/question.go b/apps/matching-service/models/question.go new file mode 100644 index 0000000000..c5bd7de721 --- /dev/null +++ b/apps/matching-service/models/question.go @@ -0,0 +1,13 @@ +package models + +type MatchQuestionRequest struct { + MatchedTopics []string `json:"matched_topics"` + MatchedDifficulties []string `json:"matched_difficulties"` +} + +type QuestionFound struct { + QuestionDocRefID string `json:"question_doc_ref_id"` + QuestionName string `json:"question_name"` + QuestionDifficulty string `json:"question_difficulty"` + QuestionTopics []string `json:"question_topics"` +} diff --git a/apps/matching-service/processes/constants.go b/apps/matching-service/processes/constants.go deleted file mode 100644 index eb53b5879e..0000000000 --- a/apps/matching-service/processes/constants.go +++ /dev/null @@ -1,17 +0,0 @@ -package processes - -import ( - "context" - "sync" - - "github.com/redis/go-redis/v9" -) - -const matchmakingQueueRedisKey = "matchmaking_queue" - -var ( - redisClient *redis.Client - matchingRoutineMutex sync.Mutex // Mutex to ensure only one matchmaking goroutine is running - redisAccessMutex sync.Mutex // Mutex for Redis access for concurrency safety - ctx = context.Background() -) diff --git a/apps/matching-service/processes/findmatches.go b/apps/matching-service/processes/findmatches.go new file mode 100644 index 0000000000..338f82006b --- /dev/null +++ b/apps/matching-service/processes/findmatches.go @@ -0,0 +1,221 @@ +package processes + +import ( + "context" + "log" + "matching-service/databases" + "matching-service/models" + "matching-service/utils" + "strings" + + "github.com/redis/go-redis/v9" +) + +// Find the first matching user from the front on the queue, based on topics, then difficulty. +func findMatchingUser(tx *redis.Tx, currentUsername string, ctx context.Context) (*models.MatchFound, error) { + currentUser, err := databases.GetUserDetails(tx, currentUsername, ctx) + if err != nil { + return nil, err + } + + // If currentUser has no topics, it is treated as user accepts any topic. + shouldDoTopicMatching := len(currentUser.Topics) != 0 + + // If currentUser has no difficulty, it is treated as user accepts any difficulty + shouldDoDifficultyMatching := len(currentUser.Difficulties) != 0 + + queuedUsers, err := databases.GetAllQueuedUsers(tx, ctx) + if err != nil { + return nil, err + } + + // For each step, filter the number of potential matches + potentialMatches := append([]string(nil), queuedUsers...) + + if shouldDoTopicMatching { + potentialMatches, err = doTopicMatching(tx, ctx, ¤tUser, potentialMatches) + if err != nil { + return nil, err + } + } + + if shouldDoDifficultyMatching { + potentialMatches, err = doDifficultyMatching(tx, ctx, ¤tUser, potentialMatches) + if err != nil { + return nil, err + } + } + + // Pick the first user from potential matches, which is earlier in the queue. + var foundUser *string + for _, otherUser := range potentialMatches { + if otherUser == currentUsername { + continue + } + foundUser = &otherUser + } + + if foundUser == nil { + return nil, models.NoMatchFound + } + return foundMatch(tx, ctx, ¤tUser, foundUser) +} + +func foundMatch(tx *redis.Tx, ctx context.Context, currentUser *models.MatchRequest, matchedUsername *string) (*models.MatchFound, error) { + // Generate a random match ID + matchId, err := utils.GenerateMatchID() + if err != nil { + log.Println("Unable to randomly generate matchID") + } + + matchedUser, err := databases.GetUserDetails(tx, *matchedUsername, ctx) + if err != nil { + return nil, err + } + + var matchedTopics []string + for _, topic := range currentUser.Topics { + for _, otherTopic := range matchedUser.Topics { + if topic == otherTopic { + matchedTopics = append(matchedTopics, topic) + } + } + } + var matchedDifficulties []string + for _, topic := range currentUser.Difficulties { + for _, otherTopic := range matchedUser.Difficulties { + if topic == otherTopic { + matchedDifficulties = append(matchedDifficulties, topic) + } + } + } + + matchFound := models.MatchFound{ + Type: "match_found", + MatchID: matchId, + User: currentUser.Username, + MatchedUser: *matchedUsername, + MatchedTopics: matchedTopics, + MatchedDifficulties: matchedDifficulties, + } + + return &matchFound, nil +} + +func findSameTopicsUsers(tx *redis.Tx, ctx context.Context, currentUser *models.MatchRequest) (map[string]struct{}, error) { + sameTopicUsersSet := make(map[string]struct{}) + for _, topic := range currentUser.Topics { + topicUsers, err := tx.SMembers(ctx, strings.ToLower(topic)).Result() + if err != nil { + return nil, err + } + + for _, potentialMatchUser := range topicUsers { + sameTopicUsersSet[potentialMatchUser] = struct{}{} + } + } + return sameTopicUsersSet, nil +} + +func findSameDifficultiesUsers(tx *redis.Tx, ctx context.Context, currentUser *models.MatchRequest) (map[string]struct{}, error) { + sameDifficultyUsersSet := make(map[string]struct{}) + queuedUsers, err := databases.GetAllQueuedUsers(tx, ctx) + if err != nil { + return nil, err + } + + for _, potentialMatch := range queuedUsers { + if potentialMatch == currentUser.Username { + continue + } + + potentialMatchUser, err := databases.GetUserDetails(tx, potentialMatch, ctx) + if err != nil { + return nil, err + } + + for _, a := range potentialMatchUser.Difficulties { + for _, b := range currentUser.Difficulties { + if a == b { + sameDifficultyUsersSet[potentialMatch] = struct{}{} + } + } + } + } + return sameDifficultyUsersSet, nil +} + +func doTopicMatching(tx *redis.Tx, ctx context.Context, currentUser *models.MatchRequest, potentialMatches []string) ([]string, error) { + sameTopicUsers, err := findSameTopicsUsers(tx, ctx, currentUser) + if err != nil { + return nil, err + } + + // Iterate through the queue to find a match, so a user in the queue the longest is more likely to be matched. + var foundUsers []string + for _, otherUsername := range potentialMatches { + if otherUsername == currentUser.Username { + continue + } + + // Include users without any difficulty selected + otherUser, err := databases.GetUserDetails(tx, otherUsername, ctx) + if err != nil { + return nil, err + } + if len(otherUser.Topics) == 0 { + foundUsers = append(foundUsers, otherUsername) + } + + // other user has matching topic + if _, ok := sameTopicUsers[otherUsername]; ok { + foundUsers = append(foundUsers, otherUsername) + break + } + } + + // If no other user with same topics, then is not valid match + if len(foundUsers) == 0 { + return nil, models.NoMatchFound + } + + return foundUsers, nil +} + +func doDifficultyMatching(tx *redis.Tx, ctx context.Context, currentUser *models.MatchRequest, potentialMatches []string) ([]string, error) { + sameDifficultyUsers, err := findSameDifficultiesUsers(tx, ctx, currentUser) + if err != nil { + return nil, err + } + + // Iterate through the queue to find a match, so a user in the queue the longest is more likely to be matched. + var foundUsers []string + for _, otherUsername := range potentialMatches { + if otherUsername == currentUser.Username { + continue + } + + // Include users without any difficulty selected + otherUser, err := databases.GetUserDetails(tx, otherUsername, ctx) + if err != nil { + return nil, err + } + if len(otherUser.Topics) == 0 { + foundUsers = append(foundUsers, otherUsername) + } + + // other user has matching difficulty or has no difficulty + if _, ok := sameDifficultyUsers[otherUsername]; ok { + foundUsers = append(foundUsers, otherUsername) + break + } + } + + // If no other user with same difficulties, then skip this criteria + // so that at least a match is found + if len(foundUsers) == 0 { + return nil, models.NoMatchFound + } + + return foundUsers, nil +} diff --git a/apps/matching-service/processes/match.go b/apps/matching-service/processes/match.go deleted file mode 100644 index 7864a75253..0000000000 --- a/apps/matching-service/processes/match.go +++ /dev/null @@ -1,107 +0,0 @@ -package processes - -import ( - "context" - "log" - "matching-service/models" - "matching-service/utils" - "time" - - "github.com/redis/go-redis/v9" -) - -func PerformMatching(matchRequest models.MatchRequest, ctx context.Context, matchFoundChannels map[string]chan models.MatchFound) { - // Acquire mutex - matchingRoutineMutex.Lock() - // Defer unlocking the mutex - defer matchingRoutineMutex.Unlock() - - for { - // Log queue before matchmaking - // PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background()) - - // Check if the queue is empty - queueLength, err := redisClient.LLen(context.Background(), matchmakingQueueRedisKey).Result() - if err != nil { - log.Println("Error checking queue length:", err) - time.Sleep(1 * time.Second) - continue - } - - if queueLength == 0 { - // log.Println("No users in the queue") - time.Sleep(1 * time.Second) - continue - } - - // Peek at the user queue - username, err := redisClient.LIndex(context.Background(), matchmakingQueueRedisKey, 0).Result() - if err != nil { - log.Println("Error peeking user from queue:", err) - time.Sleep(1 * time.Second) - continue - } - - // log.Printf("Performing matching for user: %s", username) - matchedUsername, matchedTopic, matchedDifficulty, err := FindMatchingUser(redisClient, username, ctx) - if err != nil { - log.Println("Error finding matching user:", err) - time.Sleep(1 * time.Second) - continue - } - - if matchedUsername != "" { - // Log down the state of queue before matchmaking - PrintMatchingQueue(redisClient, "Before Matchmaking", context.Background()) - - // Log down which users got matched - log.Printf("Users %s and %s matched on the topic: %s with difficulty: %s", username, matchedUsername, matchedTopic, matchedDifficulty) - - // Clean up redis for this match - cleanUp(redisClient, username, ctx) - cleanUp(redisClient, matchedUsername, ctx) - - // Log queue after matchmaking - PrintMatchingQueue(redisClient, "After Matchmaking", context.Background()) - - // Generate a random match ID - matchId, err := utils.GenerateMatchID() - if err != nil { - log.Println("Unable to randomly generate matchID") - } - - // Signal that a match has been found for user - matchFoundChannels[username] <- models.MatchFound{ - Type: "match_found", - MatchID: matchId, - User: username, - MatchedUser: matchedUsername, - Topic: matchedTopic, - Difficulty: matchedDifficulty, - } - - // Signal that a match has been found for matchedUser - matchFoundChannels[matchedUsername] <- models.MatchFound{ - Type: "match_found", - MatchID: matchId, - User: matchedUsername, - MatchedUser: username, - Topic: matchedTopic, - Difficulty: matchedDifficulty, - } - - } else { - // log.Printf("No match found for user: %s", username) - - // Pop user and add user back into queue - PopAndInsert(redisClient, username, ctx) - } - } -} - -// Clean up queue, sets and hashset in Redis -func cleanUp(redisClient *redis.Client, username string, ctx context.Context) { - DequeueUser(redisClient, username, ctx) - RemoveUserFromTopicSets(redisClient, username, ctx) - RemoveUserDetails(redisClient, username, ctx) -} diff --git a/apps/matching-service/processes/performmatches.go b/apps/matching-service/processes/performmatches.go new file mode 100644 index 0000000000..0c9740338b --- /dev/null +++ b/apps/matching-service/processes/performmatches.go @@ -0,0 +1,129 @@ +package processes + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "matching-service/databases" + "matching-service/models" + pb "matching-service/proto" + + "matching-service/servers" + + "github.com/redis/go-redis/v9" +) + +// Performs the matching algorithm at most once starting from the front of the queue of users, +// until a match is found or no match and the user is enqueued to the queue. +func PerformMatching(rdb *redis.Client, matchRequest models.MatchRequest, ctx context.Context, errorChan chan error) { + currentUsername := matchRequest.Username + var matchFound *models.MatchFound + + // Obtain lock with retry + lock, err := servers.ObtainRedisLock(ctx) + if err != nil { + errorChan <- err + return + } + defer lock.Release(ctx) + + if err := rdb.Watch(ctx, func(tx *redis.Tx) error { + if err := databases.ValidateNotDuplicateUser(tx, ctx, currentUsername); err != nil { + return err + } + + databases.AddUser(tx, matchRequest, ctx) + + // Log queue before and after matchmaking + databases.PrintMatchingQueue(tx, "Before Matchmaking", ctx) + defer databases.PrintMatchingQueue(tx, "After Matchmaking", ctx) + + // Find a matching user if any + matchFound, err = findMatchingUser(tx, currentUsername, ctx) + if err != nil { + if errors.Is(err, models.NoMatchFound) { + return nil + } + log.Println("Error finding matching user:", err) + return err + } else if matchFound != nil && err != models.NoMatchFound { + matchedUsername := matchFound.MatchedUser + + // Log down which users got matched + log.Printf("Match %v: Users %s and %s matched on the topics: %v; with difficulties: %v", + matchFound.MatchID, currentUsername, matchedUsername, + matchFound.MatchedTopics, matchFound.MatchedDifficulties) + + // Clean up redis for this match + databases.CleanUpUser(tx, currentUsername, ctx) + databases.CleanUpUser(tx, matchedUsername, ctx) + } + + return nil + }); err != nil { + if errors.Is(err, models.ExistingUserError) { + errorChan <- err + } else { + // transaction failed, no retry + println(fmt.Errorf("Transaction execution failed: %v", err)) + } + } else if matchFound != nil { + completeMatch((*redis.Tx)(rdb.Conn()), ctx, matchFound) + } +} + +// Finds the question and publishes it to complete the matching process. +func completeMatch(tx *redis.Tx, ctx context.Context, matchFound *models.MatchFound) { + matchQuestionFound := queryQuestionService(ctx, matchFound) + + log.Printf("Match %v: Question %v found with topics: %v and difficulty %v", + matchFound.MatchID, matchQuestionFound.QuestionDocRefID, + matchQuestionFound.QuestionTopics, matchQuestionFound.QuestionDifficulty) + + currentUsername := matchFound.User + matchedUsername := matchFound.MatchedUser + + publishMatch(tx, ctx, currentUsername, matchedUsername, matchQuestionFound) + publishMatch(tx, ctx, matchedUsername, currentUsername, matchQuestionFound) +} + +// Publish a match to the target user's pub/sub channel +func publishMatch(tx *redis.Tx, ctx context.Context, targetUser string, otherMatchedUser string, matchFound *models.MatchQuestionFound) error { + matchFound.User = targetUser + matchFound.MatchedUser = otherMatchedUser + + msg, err := json.Marshal(matchFound) + if err != nil { + log.Fatalf("Could not marshal message: %v", err) + return err + } + + tx.Publish(ctx, targetUser, msg) + + return nil +} + +// Query question service to find a question for the match +func queryQuestionService(ctx context.Context, matchFound *models.MatchFound) *models.MatchQuestionFound { + question, err := servers.GetGrpcClient().FindMatchingQuestion(ctx, &pb.MatchQuestionRequest{ + MatchedTopics: matchFound.MatchedTopics, + MatchedDifficulties: matchFound.MatchedDifficulties, + }) + if err != nil { + log.Fatalf("Could not retrieve question from question-service: %v", err) + } + + return &models.MatchQuestionFound{ + Type: "match_question_found", + MatchID: matchFound.MatchID, + User: matchFound.User, + MatchedUser: matchFound.MatchedUser, + MatchedTopics: matchFound.MatchedTopics, + QuestionDocRefID: question.QuestionDocRefId, + QuestionName: question.QuestionName, + QuestionDifficulty: question.QuestionDifficulty, + QuestionTopics: question.QuestionTopics, + } +} diff --git a/apps/matching-service/processes/queue.go b/apps/matching-service/processes/queue.go deleted file mode 100644 index fbf2a865e8..0000000000 --- a/apps/matching-service/processes/queue.go +++ /dev/null @@ -1,247 +0,0 @@ -package processes - -import ( - "context" - "encoding/json" - "fmt" - "log" - "matching-service/models" - "strings" - - "github.com/redis/go-redis/v9" -) - -// Print existing users in the matching queue -func PrintMatchingQueue(client *redis.Client, status string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - users, err := client.LRange(ctx, matchmakingQueueRedisKey, 0, -1).Result() - if err != nil { - log.Println("Error retrieving users from queue:", err) - return - } - - var concatenatedUsers strings.Builder - for i, user := range users { - concatenatedUsers.WriteString(user) - if i != len(users)-1 { - concatenatedUsers.WriteString(", ") - } - } - - log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String()) -} - -// Enqueue a user into the matchmaking queue -func EnqueueUser(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - err := client.LPush(ctx, matchmakingQueueRedisKey, username).Err() - if err != nil { - log.Println("Error enqueuing user:", err) - } -} - -// Remove user from the matchmaking queue -func DequeueUser(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - err := client.LRem(ctx, matchmakingQueueRedisKey, 1, username).Err() - if err != nil { - log.Println("Error dequeuing user:", err) - } -} - -// Add user into each specified topic set based on the topics selected by users -func AddUserToTopicSets(client *redis.Client, request models.MatchRequest, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - for _, topic := range request.Topics { - err := client.SAdd(ctx, strings.ToLower(topic), request.Username).Err() - if err != nil { - log.Println("Error adding user to topic set:", err) - } - } -} - -// Remove user from each specified topic set based on the topics selected by users -func RemoveUserFromTopicSets(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - request, err := GetUserDetails(client, username, ctx) - if err != nil { - log.Println("Error retrieving user from hashset:", err) - return - } - - for _, topic := range request.Topics { - err := client.SRem(ctx, strings.ToLower(topic), request.Username).Err() - if err != nil { - log.Println("Error removing user from topic set:", err) - } - } -} - -// Add user details into hashset in Redis -func StoreUserDetails(client *redis.Client, request models.MatchRequest, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - topicsJSON, err := json.Marshal(request.Topics) - if err != nil { - log.Println("Error marshalling topics:", err) - return - } - - difficultiesJSON, err := json.Marshal(request.Difficulties) - if err != nil { - log.Println("Error marshalling difficulties:", err) - return - } - - err = client.HSet(ctx, request.Username, map[string]interface{}{ - "topics": topicsJSON, - "difficulty": difficultiesJSON, - "username": request.Username, - }).Err() - if err != nil { - log.Println("Error storing user details:", err) - } -} - -// Retrieve user details from hashset in Redis -func GetUserDetails(client *redis.Client, username string, ctx context.Context) (models.MatchRequest, error) { - userDetails, err := client.HGetAll(ctx, username).Result() - if err != nil { - return models.MatchRequest{}, err - } - - if len(userDetails) == 0 { - return models.MatchRequest{}, fmt.Errorf("user not found in hashset: %s", username) - } - - topicsJSON, topicsExist := userDetails["topics"] - difficultiesJSON, difficultiesExist := userDetails["difficulty"] - - if !topicsExist || !difficultiesExist { - return models.MatchRequest{}, fmt.Errorf("incomplete user details for: %s", username) - } - - var topics []string - err = json.Unmarshal([]byte(topicsJSON), &topics) - if err != nil { - return models.MatchRequest{}, fmt.Errorf("error unmarshalling topics: %v", err) - } - - var difficulties []string - err = json.Unmarshal([]byte(difficultiesJSON), &difficulties) - if err != nil { - return models.MatchRequest{}, fmt.Errorf("error unmarshalling difficulties: %v", err) - } - - matchRequest := models.MatchRequest{ - Topics: topics, - Difficulties: difficulties, - Username: username, - } - - return matchRequest, nil -} - -// Remove user details from HashSet -func RemoveUserDetails(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - err := client.Del(ctx, username).Err() - if err != nil { - log.Println("Error removing user details:", err) - } -} - -// Find the first matching user based on topics -func FindMatchingUser(client *redis.Client, username string, ctx context.Context) (string, string, string, error) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - user, err := GetUserDetails(client, username, ctx) - if err != nil { - return "", "", "", err - } - - for _, topic := range user.Topics { - users, err := client.SMembers(ctx, strings.ToLower(topic)).Result() - if err != nil { - return "", "", "", err - } - - for _, potentialMatch := range users { - if potentialMatch != username { - matchedUser, err := GetUserDetails(client, potentialMatch, ctx) - if err != nil { - return "", "", "", err - } - - commonDifficulty := models.GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties) - return potentialMatch, topic, commonDifficulty, nil - } - } - } - - return "", "", "", nil -} - -func CleanUpUser(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - // Dequeue user - err := client.LRem(ctx, matchmakingQueueRedisKey, 1, username).Err() - if err != nil { - log.Println("Error dequeuing user:", err) - } - - // Remove user from topic sets - request, err := GetUserDetails(client, username, ctx) - if err != nil { - log.Println("Error retrieving user from hashset:", err) - return - } - - for _, topic := range request.Topics { - err := client.SRem(ctx, strings.ToLower(topic), request.Username).Err() - if err != nil { - log.Println("Error removing user from topic set:", err) - } - } - - // Remove user details - err = client.Del(ctx, username).Err() - if err != nil { - log.Println("Error removing user details:", err) - } - return -} - -func PopAndInsert(client *redis.Client, username string, ctx context.Context) { - redisAccessMutex.Lock() - defer redisAccessMutex.Unlock() - - // Pop user - username, err := client.LPop(ctx, matchmakingQueueRedisKey).Result() - if err != nil { - log.Println("Error popping user from queue:", err) - } - - // Insert back in queue - err = client.LPush(ctx, matchmakingQueueRedisKey, username).Err() - if err != nil { - log.Println("Error enqueuing user:", err) - } - return -} diff --git a/apps/matching-service/processes/read.go b/apps/matching-service/processes/readmessages.go similarity index 54% rename from apps/matching-service/processes/read.go rename to apps/matching-service/processes/readmessages.go index 27e241b96c..ac939263ea 100644 --- a/apps/matching-service/processes/read.go +++ b/apps/matching-service/processes/readmessages.go @@ -1,20 +1,21 @@ package processes import ( - "context" "log" + "matching-service/utils" "github.com/gorilla/websocket" ) // ReadMessages reads messages from the WebSocket and cancels on error. // This is primarily meant for detecting if the client cancels the matching -func ReadMessages(ws *websocket.Conn, ctx context.Context, cancel func()) { +// userCancel() should only be called in this function +func ReadMessages(ws *websocket.Conn, userCancel func()) { for { _, _, err := ws.ReadMessage() if err != nil { - log.Println("Connection closed or error:", err) - cancel() //Cancel the context to terminate other goroutines + log.Printf("Connection closed at port %v", utils.ExtractWebsocketPort(ws)) + userCancel() //Cancel the context to terminate other goroutines return } // Optional: Reset any timeout here if needed. diff --git a/apps/matching-service/processes/redis.go b/apps/matching-service/processes/redis.go deleted file mode 100644 index 6c5240219c..0000000000 --- a/apps/matching-service/processes/redis.go +++ /dev/null @@ -1,35 +0,0 @@ -package processes - -import ( - "context" - "log" - "os" - - "github.com/redis/go-redis/v9" -) - -// SetupRedisClient sets-up the Redis client, and assigns it to a global variable -func SetupRedisClient() { - // Retrieve redis url env variable and setup the redis client - redisAddr := os.Getenv("REDIS_URL") - client := redis.NewClient(&redis.Options{ - Addr: redisAddr, - Password: "", // no password set - DB: 0, // use default DB - }) - - // Ping the redis server - _, err := client.Ping(context.Background()).Result() - if err != nil { - log.Fatalf("Could not connect to Redis: %v", err) - } else { - log.Println("Connected to Redis at the following address: " + redisAddr) - } - - redisClient = client -} - -// Get redisclient -func GetRedisClient() *redis.Client { - return redisClient -} \ No newline at end of file diff --git a/apps/matching-service/proto/questionmatching.pb.go b/apps/matching-service/proto/questionmatching.pb.go new file mode 100644 index 0000000000..f5081758fa --- /dev/null +++ b/apps/matching-service/proto/questionmatching.pb.go @@ -0,0 +1,231 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.1 +// protoc v3.21.12 +// source: questionmatching.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MatchQuestionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MatchedTopics []string `protobuf:"bytes,1,rep,name=matched_topics,json=matchedTopics,proto3" json:"matched_topics,omitempty"` + MatchedDifficulties []string `protobuf:"bytes,2,rep,name=matched_difficulties,json=matchedDifficulties,proto3" json:"matched_difficulties,omitempty"` +} + +func (x *MatchQuestionRequest) Reset() { + *x = MatchQuestionRequest{} + mi := &file_questionmatching_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MatchQuestionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MatchQuestionRequest) ProtoMessage() {} + +func (x *MatchQuestionRequest) ProtoReflect() protoreflect.Message { + mi := &file_questionmatching_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MatchQuestionRequest.ProtoReflect.Descriptor instead. +func (*MatchQuestionRequest) Descriptor() ([]byte, []int) { + return file_questionmatching_proto_rawDescGZIP(), []int{0} +} + +func (x *MatchQuestionRequest) GetMatchedTopics() []string { + if x != nil { + return x.MatchedTopics + } + return nil +} + +func (x *MatchQuestionRequest) GetMatchedDifficulties() []string { + if x != nil { + return x.MatchedDifficulties + } + return nil +} + +type QuestionFound struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + QuestionDocRefId string `protobuf:"bytes,1,opt,name=question_doc_ref_id,json=questionDocRefId,proto3" json:"question_doc_ref_id,omitempty"` + QuestionName string `protobuf:"bytes,2,opt,name=question_name,json=questionName,proto3" json:"question_name,omitempty"` + QuestionDifficulty string `protobuf:"bytes,3,opt,name=question_difficulty,json=questionDifficulty,proto3" json:"question_difficulty,omitempty"` + QuestionTopics []string `protobuf:"bytes,4,rep,name=question_topics,json=questionTopics,proto3" json:"question_topics,omitempty"` +} + +func (x *QuestionFound) Reset() { + *x = QuestionFound{} + mi := &file_questionmatching_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *QuestionFound) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QuestionFound) ProtoMessage() {} + +func (x *QuestionFound) ProtoReflect() protoreflect.Message { + mi := &file_questionmatching_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QuestionFound.ProtoReflect.Descriptor instead. +func (*QuestionFound) Descriptor() ([]byte, []int) { + return file_questionmatching_proto_rawDescGZIP(), []int{1} +} + +func (x *QuestionFound) GetQuestionDocRefId() string { + if x != nil { + return x.QuestionDocRefId + } + return "" +} + +func (x *QuestionFound) GetQuestionName() string { + if x != nil { + return x.QuestionName + } + return "" +} + +func (x *QuestionFound) GetQuestionDifficulty() string { + if x != nil { + return x.QuestionDifficulty + } + return "" +} + +func (x *QuestionFound) GetQuestionTopics() []string { + if x != nil { + return x.QuestionTopics + } + return nil +} + +var File_questionmatching_proto protoreflect.FileDescriptor + +var file_questionmatching_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, + 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x22, 0x70, 0x0a, 0x14, 0x4d, 0x61, + 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x6d, 0x61, 0x74, 0x63, + 0x68, 0x65, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x31, 0x0a, 0x14, 0x6d, 0x61, 0x74, + 0x63, 0x68, 0x65, 0x64, 0x5f, 0x64, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x69, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x13, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, + 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x69, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, + 0x0d, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x2d, + 0x0a, 0x13, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x64, 0x6f, 0x63, 0x5f, 0x72, + 0x65, 0x66, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x6f, 0x63, 0x52, 0x65, 0x66, 0x49, 0x64, 0x12, 0x23, 0x0a, + 0x0d, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x2f, 0x0a, 0x13, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x64, + 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x12, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, + 0x6c, 0x74, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x32, 0x7a, 0x0a, 0x17, + 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5f, 0x0a, 0x14, 0x46, 0x69, 0x6e, 0x64, 0x4d, + 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x26, 0x2e, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, + 0x6e, 0x67, 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x2e, 0x51, 0x75, 0x65, 0x73, 0x74, + 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_questionmatching_proto_rawDescOnce sync.Once + file_questionmatching_proto_rawDescData = file_questionmatching_proto_rawDesc +) + +func file_questionmatching_proto_rawDescGZIP() []byte { + file_questionmatching_proto_rawDescOnce.Do(func() { + file_questionmatching_proto_rawDescData = protoimpl.X.CompressGZIP(file_questionmatching_proto_rawDescData) + }) + return file_questionmatching_proto_rawDescData +} + +var file_questionmatching_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_questionmatching_proto_goTypes = []any{ + (*MatchQuestionRequest)(nil), // 0: questionmatching.MatchQuestionRequest + (*QuestionFound)(nil), // 1: questionmatching.QuestionFound +} +var file_questionmatching_proto_depIdxs = []int32{ + 0, // 0: questionmatching.QuestionMatchingService.FindMatchingQuestion:input_type -> questionmatching.MatchQuestionRequest + 1, // 1: questionmatching.QuestionMatchingService.FindMatchingQuestion:output_type -> questionmatching.QuestionFound + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_questionmatching_proto_init() } +func file_questionmatching_proto_init() { + if File_questionmatching_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_questionmatching_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_questionmatching_proto_goTypes, + DependencyIndexes: file_questionmatching_proto_depIdxs, + MessageInfos: file_questionmatching_proto_msgTypes, + }.Build() + File_questionmatching_proto = out.File + file_questionmatching_proto_rawDesc = nil + file_questionmatching_proto_goTypes = nil + file_questionmatching_proto_depIdxs = nil +} diff --git a/apps/matching-service/proto/questionmatching_grpc.pb.go b/apps/matching-service/proto/questionmatching_grpc.pb.go new file mode 100644 index 0000000000..7d65ce155e --- /dev/null +++ b/apps/matching-service/proto/questionmatching_grpc.pb.go @@ -0,0 +1,122 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v3.21.12 +// source: questionmatching.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + QuestionMatchingService_FindMatchingQuestion_FullMethodName = "/questionmatching.QuestionMatchingService/FindMatchingQuestion" +) + +// QuestionMatchingServiceClient is the client API for QuestionMatchingService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type QuestionMatchingServiceClient interface { + FindMatchingQuestion(ctx context.Context, in *MatchQuestionRequest, opts ...grpc.CallOption) (*QuestionFound, error) +} + +type questionMatchingServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewQuestionMatchingServiceClient(cc grpc.ClientConnInterface) QuestionMatchingServiceClient { + return &questionMatchingServiceClient{cc} +} + +func (c *questionMatchingServiceClient) FindMatchingQuestion(ctx context.Context, in *MatchQuestionRequest, opts ...grpc.CallOption) (*QuestionFound, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(QuestionFound) + err := c.cc.Invoke(ctx, QuestionMatchingService_FindMatchingQuestion_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// QuestionMatchingServiceServer is the server API for QuestionMatchingService service. +// All implementations must embed UnimplementedQuestionMatchingServiceServer +// for forward compatibility. +type QuestionMatchingServiceServer interface { + FindMatchingQuestion(context.Context, *MatchQuestionRequest) (*QuestionFound, error) + mustEmbedUnimplementedQuestionMatchingServiceServer() +} + +// UnimplementedQuestionMatchingServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedQuestionMatchingServiceServer struct{} + +func (UnimplementedQuestionMatchingServiceServer) FindMatchingQuestion(context.Context, *MatchQuestionRequest) (*QuestionFound, error) { + return nil, status.Errorf(codes.Unimplemented, "method FindMatchingQuestion not implemented") +} +func (UnimplementedQuestionMatchingServiceServer) mustEmbedUnimplementedQuestionMatchingServiceServer() { +} +func (UnimplementedQuestionMatchingServiceServer) testEmbeddedByValue() {} + +// UnsafeQuestionMatchingServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to QuestionMatchingServiceServer will +// result in compilation errors. +type UnsafeQuestionMatchingServiceServer interface { + mustEmbedUnimplementedQuestionMatchingServiceServer() +} + +func RegisterQuestionMatchingServiceServer(s grpc.ServiceRegistrar, srv QuestionMatchingServiceServer) { + // If the following call pancis, it indicates UnimplementedQuestionMatchingServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&QuestionMatchingService_ServiceDesc, srv) +} + +func _QuestionMatchingService_FindMatchingQuestion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MatchQuestionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuestionMatchingServiceServer).FindMatchingQuestion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: QuestionMatchingService_FindMatchingQuestion_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuestionMatchingServiceServer).FindMatchingQuestion(ctx, req.(*MatchQuestionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// QuestionMatchingService_ServiceDesc is the grpc.ServiceDesc for QuestionMatchingService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var QuestionMatchingService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "questionmatching.QuestionMatchingService", + HandlerType: (*QuestionMatchingServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "FindMatchingQuestion", + Handler: _QuestionMatchingService_FindMatchingQuestion_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "questionmatching.proto", +} diff --git a/apps/matching-service/servers/grpc.go b/apps/matching-service/servers/grpc.go new file mode 100644 index 0000000000..a6a2aac507 --- /dev/null +++ b/apps/matching-service/servers/grpc.go @@ -0,0 +1,34 @@ +package servers + +import ( + "log" + pb "matching-service/proto" + "os" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + questionMatchingClient pb.QuestionMatchingServiceClient +) + +func InitGrpcServer() *grpc.ClientConn { + questionServiceAddr := os.Getenv("QUESTION_SERVICE_GRPC_URL") + // Dial the server + conn, err := grpc.NewClient(questionServiceAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Did not connect to %v: %v", questionServiceAddr, err) + } else { + log.Println("Connected to Grpc server at %v", questionServiceAddr) + } + + // Create a new client for the ExampleService + questionMatchingClient = pb.NewQuestionMatchingServiceClient(conn) + + return conn +} + +func GetGrpcClient() pb.QuestionMatchingServiceClient { + return questionMatchingClient +} diff --git a/apps/matching-service/servers/redis.go b/apps/matching-service/servers/redis.go new file mode 100644 index 0000000000..395cbde51f --- /dev/null +++ b/apps/matching-service/servers/redis.go @@ -0,0 +1,68 @@ +package servers + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/bsm/redislock" + "github.com/redis/go-redis/v9" +) + +const MatchmakingQueueRedisKey = "matchmaking_queue" +const matchmakingRedisLock = "matchmaking_lock" + +var redisClient *redis.Client +var redisLock *redislock.Client + +// SetupRedisClient sets-up the Redis client, and assigns it to a global variable +func SetupRedisClient() *redis.Client { + // Retrieve redis url env variable and setup the redis client + redisAddr := os.Getenv("REDIS_URL") + redisClient = redis.NewClient(&redis.Options{ + Addr: redisAddr, + Password: "", // no password set + DB: 0, // use default DB + }) + + // Ping the redis server + _, err := redisClient.Ping(context.Background()).Result() + if err != nil { + log.Fatalf("Could not connect to Redis: %v", err) + } else { + log.Println("Connected to Redis at the following address: " + redisAddr) + } + + // Create a new lock client. + redisLock = redislock.New(redisClient) + + return redisClient +} + +func GetRedisClient() *redis.Client { + return redisClient +} + +func GetRedisLock() *redislock.Client { + return redisLock +} + +func ObtainRedisLock(ctx context.Context) (*redislock.Lock, error) { + // Retry every 100ms, for up-to 100x + backoff := redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 100) + + // Obtain lock with retry + lock, err := redisLock.Obtain(ctx, matchmakingRedisLock, time.Second, &redislock.Options{ + RetryStrategy: backoff, + }) + if err == redislock.ErrNotObtained { + fmt.Println("Could not obtain lock!") + return nil, err + } else if err != nil { + log.Fatalln(err) + return nil, err + } + return lock, err +} diff --git a/apps/matching-service/tests/websocket-test.html b/apps/matching-service/tests/websocket-test.html index 4cba225c44..cd0717e099 100644 --- a/apps/matching-service/tests/websocket-test.html +++ b/apps/matching-service/tests/websocket-test.html @@ -1,120 +1,512 @@ + - Matching service: websocket test + Matching Service: WebSocket Test + -

Status (Conn 1): no matching yet

- - -

- -

Status (Conn 2): no matching yet

- - -

+
+
+ + + + +
+
+

Summary of User Match Requests

+
+
+
+
diff --git a/apps/matching-service/utils/timeout.go b/apps/matching-service/utils/timeout.go index 1876518f07..08d6b8c340 100644 --- a/apps/matching-service/utils/timeout.go +++ b/apps/matching-service/utils/timeout.go @@ -1,6 +1,7 @@ package utils import ( + "context" "os" "strconv" "time" @@ -15,3 +16,13 @@ func GetTimeoutDuration() (time.Duration, error) { } return time.Duration(timeout) * time.Second, nil } + +// createTimeoutContext sets up a timeout context based on configuration. +func CreateTimeoutContext() (context.Context, context.CancelFunc, error) { + timeoutDuration, err := GetTimeoutDuration() + if err != nil { + return nil, nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + return ctx, cancel, nil +} diff --git a/apps/matching-service/utils/websocket.go b/apps/matching-service/utils/websocket.go new file mode 100644 index 0000000000..f342ba1af6 --- /dev/null +++ b/apps/matching-service/utils/websocket.go @@ -0,0 +1,16 @@ +package utils + +import ( + "strings" + + "github.com/gorilla/websocket" +) + +func ExtractWebsocketPort(ws *websocket.Conn) string { + + // Get the remote address (client's IP and port) + clientAddr := ws.RemoteAddr().String() + + // Extract the port (after the last ':') + return clientAddr[strings.LastIndex(clientAddr, ":")+1:] +} diff --git a/apps/proto/README.md b/apps/proto/README.md new file mode 100644 index 0000000000..ca87aca110 --- /dev/null +++ b/apps/proto/README.md @@ -0,0 +1,46 @@ +This directory contains the protocol buffers used for gRPC protocols + +## Developer set up + +1. Install Protocol Buffers (protoc) + +- For macOS (using Homebrew): + +```bash +brew install protobuf +``` + +- For Ubuntu/Debian: + +```bash +sudo apt-get install -y protobuf-compiler +``` + +2. Install gRPC and Protocol Buffers Go Plugin + +Next, install the Go-specific protoc-gen-go and protoc-gen-go-grpc plugins: + +```bash +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest +go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +``` + +Make sure to add the GOPATH/bin directory to your PATH so that protoc can find the plugins: + +```bash +export PATH="$PATH:$(go env GOPATH)/bin" +``` + +3. Generate Go Code from `.proto` Files + +Now use the protoc compiler to generate the Go code for the service and messages. + +```bash +protoc --go_out=../matching-service --go-grpc_out=../matching-service ./questionmatching.proto +protoc --go_out=../question-service --go-grpc_out=../question-service ./questionmatching.proto +``` + +This command will generate two files: + +- `questionmatching.pb.go`: Contains the message structures for requests and responses. +- `questionmatching_grpc.pb.go`: Contains the interface and client/server code for the service. diff --git a/apps/proto/questionmatching.proto b/apps/proto/questionmatching.proto new file mode 100644 index 0000000000..46d4d24c49 --- /dev/null +++ b/apps/proto/questionmatching.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package questionmatching; + +option go_package = "./proto"; + +message MatchQuestionRequest { + repeated string matched_topics = 1; + repeated string matched_difficulties = 2; +} + +message QuestionFound { + string question_doc_ref_id = 1; + string question_name = 2; + string question_difficulty = 3; + repeated string question_topics = 4; +} + +service QuestionMatchingService { + rpc FindMatchingQuestion (MatchQuestionRequest) returns (QuestionFound); +} \ No newline at end of file diff --git a/apps/question-service/Dockerfile b/apps/question-service/Dockerfile index c2a9e0836b..0f7b9b698a 100644 --- a/apps/question-service/Dockerfile +++ b/apps/question-service/Dockerfile @@ -11,6 +11,6 @@ COPY . . RUN go build -v -o /usr/local/bin/app ./main.go -EXPOSE 8080 +EXPOSE 8080 50051 CMD ["app"] diff --git a/apps/question-service/README.md b/apps/question-service/README.md index 99ed749159..3045231030 100644 --- a/apps/question-service/README.md +++ b/apps/question-service/README.md @@ -58,7 +58,7 @@ docker build -t question-service . ``` ```bash -docker run -p 8080:8080 --env-file .env -d question-service +docker run -p 8080:8080 -p 50051:50051 --env-file .env -d question-service ``` The server will be available at http://localhost:8080. diff --git a/apps/question-service/go.mod b/apps/question-service/go.mod index 4ac0fa8e1e..481f8efbb7 100644 --- a/apps/question-service/go.mod +++ b/apps/question-service/go.mod @@ -7,7 +7,7 @@ require ( firebase.google.com/go/v4 v4.14.1 github.com/go-chi/chi/v5 v5.1.0 google.golang.org/api v0.198.0 - google.golang.org/grpc v1.67.0 + google.golang.org/grpc v1.67.1 ) require github.com/joho/godotenv v1.5.1 @@ -49,5 +49,5 @@ require ( google.golang.org/genproto v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/protobuf v1.34.2 // indirect + google.golang.org/protobuf v1.35.1 ) diff --git a/apps/question-service/go.sum b/apps/question-service/go.sum index 1b4b765942..02ba516f91 100644 --- a/apps/question-service/go.sum +++ b/apps/question-service/go.sum @@ -188,8 +188,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= -google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -201,8 +201,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/apps/question-service/handlers/findmatchingquestion.go b/apps/question-service/handlers/findmatchingquestion.go new file mode 100644 index 0000000000..44f830add2 --- /dev/null +++ b/apps/question-service/handlers/findmatchingquestion.go @@ -0,0 +1,113 @@ +package handlers + +import ( + "context" + "errors" + "log" + "math/rand" + "question-service/models" + pb "question-service/proto" + + "cloud.google.com/go/firestore" +) + +func (s *GrpcServer) FindMatchingQuestion(ctx context.Context, req *pb.MatchQuestionRequest) (*pb.QuestionFound, error) { + log.Printf("Received matching question request: %v", req) + + var docs []*firestore.DocumentSnapshot + + matchedDifficulties := make([]models.ComplexityType, len(req.MatchedDifficulties)) + for i := range req.MatchedDifficulties { + d, err := models.ParseComplexity(req.MatchedDifficulties[i]) + if err != nil { + return nil, err + } + matchedDifficulties[i] = d + } + + // 1. Match by both topic and difficulty + if len(docs) == 0 && len(req.MatchedTopics) > 0 && len(matchedDifficulties) > 0 { + d, err := queryTopicAndDifficultyQuestion(s.Client, ctx, req.MatchedTopics, matchedDifficulties) + if err != nil { + return nil, err + } + docs = d + } + + // 2. Match by just topic + if len(docs) == 0 && len(req.MatchedTopics) > 0 { + d, err := queryTopicQuestion(s.Client, ctx, req.MatchedTopics) + if err != nil { + return nil, err + } + docs = d + } + + // 3. Match by difficulty + if len(docs) == 0 && len(matchedDifficulties) > 0 { + d, err := queryDifficultyQuestion(s.Client, ctx, matchedDifficulties) + if err != nil { + return nil, err + } + docs = d + } + + // 4. No matches, so return random question + if len(docs) == 0 { + d, err := queryAllQuestions(s.Client, ctx) + if err != nil { + return nil, err + } + docs = d + } + + // 5a. No matches, return error + if len(docs) == 0 { + return nil, errors.New("No questions found") + } + + // 5b. Retrieve random question from potential questions + question, err := retrieveRandomQuestion(docs) + if err != nil { + return nil, err + } + + return &pb.QuestionFound{ + QuestionDocRefId: question.DocRefID, + QuestionName: question.Title, + QuestionDifficulty: question.Complexity.String(), + QuestionTopics: question.Categories, + }, nil +} + +// Retrieve the document at the random offset +func retrieveRandomQuestion(docs []*firestore.DocumentSnapshot) (*models.Question, error) { + // Generate a random offset + randomOffset := rand.Intn(len(docs)) + + doc := docs[randomOffset] + var question models.Question + if err := doc.DataTo(&question); err != nil { + return nil, err + + } + question.DocRefID = doc.Ref.ID + return &question, nil +} + +func queryTopicAndDifficultyQuestion(client *firestore.Client, ctx context.Context, topics []string, difficulties []models.ComplexityType) ([]*firestore.DocumentSnapshot, error) { + return client.Collection("questions").Where("complexity", "in", difficulties).Where("categories", "array-contains-any", topics).Documents(ctx).GetAll() +} + +func queryTopicQuestion(client *firestore.Client, ctx context.Context, topics []string) ([]*firestore.DocumentSnapshot, error) { + return client.Collection("questions").Where("categories", "array-contains-any", topics).Documents(ctx).GetAll() +} + +func queryDifficultyQuestion(client *firestore.Client, ctx context.Context, difficulties []models.ComplexityType) ([]*firestore.DocumentSnapshot, error) { + return client.Collection("questions").Where("complexity", "in", difficulties).Documents(ctx).GetAll() + +} + +func queryAllQuestions(client *firestore.Client, ctx context.Context) ([]*firestore.DocumentSnapshot, error) { + return client.Collection("questions").Documents(ctx).GetAll() +} diff --git a/apps/question-service/handlers/list.go b/apps/question-service/handlers/list.go index 8f2463cdfa..47c00800ec 100644 --- a/apps/question-service/handlers/list.go +++ b/apps/question-service/handlers/list.go @@ -1,12 +1,13 @@ package handlers import ( - "cloud.google.com/go/firestore" "encoding/json" "net/http" "question-service/models" "strconv" "strings" + + "cloud.google.com/go/firestore" ) var isValidSortField = map[string]bool{ diff --git a/apps/question-service/handlers/server.go b/apps/question-service/handlers/server.go new file mode 100644 index 0000000000..27c1b20bfb --- /dev/null +++ b/apps/question-service/handlers/server.go @@ -0,0 +1,16 @@ +package handlers + +import ( + pb "question-service/proto" + + "cloud.google.com/go/firestore" +) + +type Service struct { + Client *firestore.Client +} + +type GrpcServer struct { + pb.UnimplementedQuestionMatchingServiceServer // Embed the unimplemented service + Client *firestore.Client +} diff --git a/apps/question-service/handlers/types.go b/apps/question-service/handlers/types.go deleted file mode 100644 index 5829c0ad39..0000000000 --- a/apps/question-service/handlers/types.go +++ /dev/null @@ -1,7 +0,0 @@ -package handlers - -import "cloud.google.com/go/firestore" - -type Service struct { - Client *firestore.Client -} diff --git a/apps/question-service/main.go b/apps/question-service/main.go index 9f9c11d2c7..349a66402e 100644 --- a/apps/question-service/main.go +++ b/apps/question-service/main.go @@ -5,10 +5,12 @@ import ( "flag" "fmt" "log" + "net" "net/http" "os" "question-service/handlers" mymiddleware "question-service/middleware" + pb "question-service/proto" "question-service/utils" "time" @@ -19,23 +21,9 @@ import ( "github.com/go-chi/cors" "github.com/joho/godotenv" "google.golang.org/api/option" + "google.golang.org/grpc" ) -// initFirestore initializes the Firestore client -func initFirestore(ctx context.Context, credentialsPath string) (*firestore.Client, error) { - opt := option.WithCredentialsFile(credentialsPath) - app, err := firebase.NewApp(ctx, nil, opt) - if err != nil { - return nil, fmt.Errorf("failed to initialize Firebase App: %v", err) - } - - client, err := app.Firestore(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get Firestore client: %v", err) - } - return client, nil -} - func main() { // Load .env file err := godotenv.Load() @@ -45,8 +33,7 @@ func main() { // Initialize Firestore client ctx := context.Background() - firebaseCredentialPath := os.Getenv("FIREBASE_CREDENTIAL_PATH") - client, err := initFirestore(ctx, firebaseCredentialPath) + client, err := initFirestore(ctx) if err != nil { log.Fatalf("Failed to initialize Firestore client: %v", err) } @@ -62,7 +49,29 @@ func main() { return } - // Set up chi router + go initGrpcServer(service) + + r := initChiRouter(service) + initRestServer(r) +} + +// initFirestore initializes the Firestore client +func initFirestore(ctx context.Context) (*firestore.Client, error) { + credentialsPath := os.Getenv("FIREBASE_CREDENTIAL_PATH") + opt := option.WithCredentialsFile(credentialsPath) + app, err := firebase.NewApp(ctx, nil, opt) + if err != nil { + return nil, fmt.Errorf("failed to initialize Firebase App: %v", err) + } + + client, err := app.Firestore(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get Firestore client: %v", err) + } + return client, nil +} + +func initChiRouter(service *handlers.Service) *chi.Mux { r := chi.NewRouter() r.Use(middleware.Logger) r.Use(middleware.Timeout(60 * time.Second)) @@ -79,7 +88,12 @@ func main() { MaxAge: 300, // Maximum value not ignored by any of major browsers })) - // Register routes + registerRoutes(r, service) + + return r +} + +func registerRoutes(r *chi.Mux, service *handlers.Service) { r.Route("/questions", func(r chi.Router) { r.Get("/", service.ListQuestions) r.Post("/", service.CreateQuestion) @@ -90,7 +104,9 @@ func main() { r.Delete("/", service.DeleteQuestion) }) }) +} +func initRestServer(r *chi.Mux) { // Serve on port 8080 port := os.Getenv("PORT") if port == "" { @@ -98,9 +114,25 @@ func main() { } // Start the server - log.Printf("Starting server on http://localhost:%s", port) - err = http.ListenAndServe(fmt.Sprintf(":%s", port), r) + log.Printf("Starting REST server on http://localhost:%s", port) + err := http.ListenAndServe(fmt.Sprintf(":%s", port), r) if err != nil { log.Fatalf("Failed to start server: %v", err) } } + +func initGrpcServer(service *handlers.Service) { + lis, err := net.Listen("tcp", ":50051") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + pb.RegisterQuestionMatchingServiceServer(s, &handlers.GrpcServer{ + Client: service.Client, + }) + + log.Printf("gRPC Server is listening on port 50051...") + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/apps/question-service/proto/questionmatching.pb.go b/apps/question-service/proto/questionmatching.pb.go new file mode 100644 index 0000000000..f5081758fa --- /dev/null +++ b/apps/question-service/proto/questionmatching.pb.go @@ -0,0 +1,231 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.1 +// protoc v3.21.12 +// source: questionmatching.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MatchQuestionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MatchedTopics []string `protobuf:"bytes,1,rep,name=matched_topics,json=matchedTopics,proto3" json:"matched_topics,omitempty"` + MatchedDifficulties []string `protobuf:"bytes,2,rep,name=matched_difficulties,json=matchedDifficulties,proto3" json:"matched_difficulties,omitempty"` +} + +func (x *MatchQuestionRequest) Reset() { + *x = MatchQuestionRequest{} + mi := &file_questionmatching_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MatchQuestionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MatchQuestionRequest) ProtoMessage() {} + +func (x *MatchQuestionRequest) ProtoReflect() protoreflect.Message { + mi := &file_questionmatching_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MatchQuestionRequest.ProtoReflect.Descriptor instead. +func (*MatchQuestionRequest) Descriptor() ([]byte, []int) { + return file_questionmatching_proto_rawDescGZIP(), []int{0} +} + +func (x *MatchQuestionRequest) GetMatchedTopics() []string { + if x != nil { + return x.MatchedTopics + } + return nil +} + +func (x *MatchQuestionRequest) GetMatchedDifficulties() []string { + if x != nil { + return x.MatchedDifficulties + } + return nil +} + +type QuestionFound struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + QuestionDocRefId string `protobuf:"bytes,1,opt,name=question_doc_ref_id,json=questionDocRefId,proto3" json:"question_doc_ref_id,omitempty"` + QuestionName string `protobuf:"bytes,2,opt,name=question_name,json=questionName,proto3" json:"question_name,omitempty"` + QuestionDifficulty string `protobuf:"bytes,3,opt,name=question_difficulty,json=questionDifficulty,proto3" json:"question_difficulty,omitempty"` + QuestionTopics []string `protobuf:"bytes,4,rep,name=question_topics,json=questionTopics,proto3" json:"question_topics,omitempty"` +} + +func (x *QuestionFound) Reset() { + *x = QuestionFound{} + mi := &file_questionmatching_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *QuestionFound) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QuestionFound) ProtoMessage() {} + +func (x *QuestionFound) ProtoReflect() protoreflect.Message { + mi := &file_questionmatching_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QuestionFound.ProtoReflect.Descriptor instead. +func (*QuestionFound) Descriptor() ([]byte, []int) { + return file_questionmatching_proto_rawDescGZIP(), []int{1} +} + +func (x *QuestionFound) GetQuestionDocRefId() string { + if x != nil { + return x.QuestionDocRefId + } + return "" +} + +func (x *QuestionFound) GetQuestionName() string { + if x != nil { + return x.QuestionName + } + return "" +} + +func (x *QuestionFound) GetQuestionDifficulty() string { + if x != nil { + return x.QuestionDifficulty + } + return "" +} + +func (x *QuestionFound) GetQuestionTopics() []string { + if x != nil { + return x.QuestionTopics + } + return nil +} + +var File_questionmatching_proto protoreflect.FileDescriptor + +var file_questionmatching_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, + 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x22, 0x70, 0x0a, 0x14, 0x4d, 0x61, + 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x6d, 0x61, 0x74, 0x63, + 0x68, 0x65, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x31, 0x0a, 0x14, 0x6d, 0x61, 0x74, + 0x63, 0x68, 0x65, 0x64, 0x5f, 0x64, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x69, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x13, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, + 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x69, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, + 0x0d, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x2d, + 0x0a, 0x13, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x64, 0x6f, 0x63, 0x5f, 0x72, + 0x65, 0x66, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x6f, 0x63, 0x52, 0x65, 0x66, 0x49, 0x64, 0x12, 0x23, 0x0a, + 0x0d, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x2f, 0x0a, 0x13, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x64, + 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, 0x6c, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x12, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x69, 0x66, 0x66, 0x69, 0x63, 0x75, + 0x6c, 0x74, 0x79, 0x12, 0x27, 0x0a, 0x0f, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x5f, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x32, 0x7a, 0x0a, 0x17, + 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5f, 0x0a, 0x14, 0x46, 0x69, 0x6e, 0x64, 0x4d, + 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x26, 0x2e, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, + 0x6e, 0x67, 0x2e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x51, 0x75, 0x65, 0x73, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x71, 0x75, 0x65, 0x73, 0x74, 0x69, + 0x6f, 0x6e, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x2e, 0x51, 0x75, 0x65, 0x73, 0x74, + 0x69, 0x6f, 0x6e, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_questionmatching_proto_rawDescOnce sync.Once + file_questionmatching_proto_rawDescData = file_questionmatching_proto_rawDesc +) + +func file_questionmatching_proto_rawDescGZIP() []byte { + file_questionmatching_proto_rawDescOnce.Do(func() { + file_questionmatching_proto_rawDescData = protoimpl.X.CompressGZIP(file_questionmatching_proto_rawDescData) + }) + return file_questionmatching_proto_rawDescData +} + +var file_questionmatching_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_questionmatching_proto_goTypes = []any{ + (*MatchQuestionRequest)(nil), // 0: questionmatching.MatchQuestionRequest + (*QuestionFound)(nil), // 1: questionmatching.QuestionFound +} +var file_questionmatching_proto_depIdxs = []int32{ + 0, // 0: questionmatching.QuestionMatchingService.FindMatchingQuestion:input_type -> questionmatching.MatchQuestionRequest + 1, // 1: questionmatching.QuestionMatchingService.FindMatchingQuestion:output_type -> questionmatching.QuestionFound + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_questionmatching_proto_init() } +func file_questionmatching_proto_init() { + if File_questionmatching_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_questionmatching_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_questionmatching_proto_goTypes, + DependencyIndexes: file_questionmatching_proto_depIdxs, + MessageInfos: file_questionmatching_proto_msgTypes, + }.Build() + File_questionmatching_proto = out.File + file_questionmatching_proto_rawDesc = nil + file_questionmatching_proto_goTypes = nil + file_questionmatching_proto_depIdxs = nil +} diff --git a/apps/question-service/proto/questionmatching_grpc.pb.go b/apps/question-service/proto/questionmatching_grpc.pb.go new file mode 100644 index 0000000000..7d65ce155e --- /dev/null +++ b/apps/question-service/proto/questionmatching_grpc.pb.go @@ -0,0 +1,122 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v3.21.12 +// source: questionmatching.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + QuestionMatchingService_FindMatchingQuestion_FullMethodName = "/questionmatching.QuestionMatchingService/FindMatchingQuestion" +) + +// QuestionMatchingServiceClient is the client API for QuestionMatchingService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type QuestionMatchingServiceClient interface { + FindMatchingQuestion(ctx context.Context, in *MatchQuestionRequest, opts ...grpc.CallOption) (*QuestionFound, error) +} + +type questionMatchingServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewQuestionMatchingServiceClient(cc grpc.ClientConnInterface) QuestionMatchingServiceClient { + return &questionMatchingServiceClient{cc} +} + +func (c *questionMatchingServiceClient) FindMatchingQuestion(ctx context.Context, in *MatchQuestionRequest, opts ...grpc.CallOption) (*QuestionFound, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(QuestionFound) + err := c.cc.Invoke(ctx, QuestionMatchingService_FindMatchingQuestion_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// QuestionMatchingServiceServer is the server API for QuestionMatchingService service. +// All implementations must embed UnimplementedQuestionMatchingServiceServer +// for forward compatibility. +type QuestionMatchingServiceServer interface { + FindMatchingQuestion(context.Context, *MatchQuestionRequest) (*QuestionFound, error) + mustEmbedUnimplementedQuestionMatchingServiceServer() +} + +// UnimplementedQuestionMatchingServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedQuestionMatchingServiceServer struct{} + +func (UnimplementedQuestionMatchingServiceServer) FindMatchingQuestion(context.Context, *MatchQuestionRequest) (*QuestionFound, error) { + return nil, status.Errorf(codes.Unimplemented, "method FindMatchingQuestion not implemented") +} +func (UnimplementedQuestionMatchingServiceServer) mustEmbedUnimplementedQuestionMatchingServiceServer() { +} +func (UnimplementedQuestionMatchingServiceServer) testEmbeddedByValue() {} + +// UnsafeQuestionMatchingServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to QuestionMatchingServiceServer will +// result in compilation errors. +type UnsafeQuestionMatchingServiceServer interface { + mustEmbedUnimplementedQuestionMatchingServiceServer() +} + +func RegisterQuestionMatchingServiceServer(s grpc.ServiceRegistrar, srv QuestionMatchingServiceServer) { + // If the following call pancis, it indicates UnimplementedQuestionMatchingServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&QuestionMatchingService_ServiceDesc, srv) +} + +func _QuestionMatchingService_FindMatchingQuestion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MatchQuestionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuestionMatchingServiceServer).FindMatchingQuestion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: QuestionMatchingService_FindMatchingQuestion_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuestionMatchingServiceServer).FindMatchingQuestion(ctx, req.(*MatchQuestionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// QuestionMatchingService_ServiceDesc is the grpc.ServiceDesc for QuestionMatchingService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var QuestionMatchingService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "questionmatching.QuestionMatchingService", + HandlerType: (*QuestionMatchingServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "FindMatchingQuestion", + Handler: _QuestionMatchingService_FindMatchingQuestion_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "questionmatching.proto", +}