Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Revamp matching service #39

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/matching-service/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
PORT=8081
MATCH_TIMEOUT=3
JWT_SECRET=you-can-replace-this-with-your-own-secret
MATCH_TIMEOUT=10
JWT_SECRET=you-can-replace-this-with-your-own-secret
REDIS_URL=localhost:6379
31 changes: 26 additions & 5 deletions apps/matching-service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ go mod tidy
- `PORT`: Specifies the port for the WebSocket server. Default is `8081`.
- `JWT_SECRET`: The secret key used to verify JWT tokens.
- `MATCH_TIMEOUT`: The time in seconds to wait for a match before timing out.
- `REDIS_URL`: The URL for the Redis server. Default is `localhost:6379`.

4. Start the WebSocket server:
4. Start a local redis server:

```bash
docker run -d -p 6379:6379 redis
```

5. Start the WebSocket server:

```bash
go run main.go
Expand Down Expand Up @@ -68,7 +75,8 @@ Client sends matching parameters:
{
"type": "match_request",
"topics": ["Algorithms", "Arrays"],
"difficulties": ["Easy", "Medium"]
"difficulties": ["Easy", "Medium"],
"username": "Jane Doe"
}
```

Expand All @@ -77,9 +85,11 @@ Server response on successful match:
```json
{
"type": "match_found",
"matchID": 67890,
"partnerID": 54321,
"partnerName": "John Doe"
"matchId": "1c018916a34c5bee21af0b2670bd6156",
"user": "zkb4px",
"matchedUser": "JohnDoe",
"topic": "Algorithms",
"difficulty": "Medium"
}
```

Expand All @@ -92,6 +102,15 @@ If no match is found after a set period of time, the server will send a timeout
}
```

If user has an existing websocket connection and wants to initiate another match, the server will reject the request:

```json
{
"type": "match_rejected",
"message": "You are already in a matchmaking queue. Please disconnect before reconnecting."
}
```

If the server encounters an issue during the WebSocket connection or processing, the connection will be closed without any error message. The client should treat the unexpected closing as an error.

## Testing
Expand All @@ -100,6 +119,8 @@ Utilize `./tests/websocket-test.html` for a basic debugging interface of the mat

Make sure to open the HTML file in a web browser while the WebSocket server is running to perform your tests.

You can open one instance of the HTML file in multiple tabs to simulate multiple clients connecting to the server. (In the future: ensure that only one connection is allowed per user)

## Docker Support

TODO: Add section for Docker setup and usage instructions.
6 changes: 6 additions & 0 deletions apps/matching-service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,10 @@ go 1.23.1
require (
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/redis/go-redis/v9 v9.6.2
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
10 changes: 10 additions & 0 deletions apps/matching-service/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk=
github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
143 changes: 127 additions & 16 deletions apps/matching-service/handlers/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,27 @@ import (
"matching-service/processes"
"matching-service/utils"
"net/http"
"strings"
"sync"

"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// Allow all connections by skipping the origin check (set more restrictions in production)
return true
},
}
var (
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// Allow all connections by skipping the origin check (set more restrictions in production)
return true
},
}
// A map to hold active WebSocket connections per username
activeConnections = make(map[string]*websocket.Conn)
// A map to hold user's match ctx cancel function
matchContexts = make(map[string]context.CancelFunc)
// A map to hold user's match channels
matchFoundChannels = make(map[string]chan models.MatchFound)
mu sync.Mutex // Mutex for thread-safe access to activeConnections
)

// handleConnections manages WebSocket connections and matching logic.
func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
Expand All @@ -42,25 +53,48 @@ func HandleWebSocketConnections(w http.ResponseWriter, r *http.Request) {
return
}

// Store WebSocket connection in the activeConnections map.
mu.Lock()
// Checks if user is already an existing websocket connection
if _, exists := activeConnections[matchRequest.Username]; exists {
mu.Unlock()
log.Printf("User %s is already connected, rejecting new connection.", matchRequest.Username)
ws.WriteJSON(models.MatchRejected{
Type: "match_rejected",
Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.",
})
ws.Close()
return
}
activeConnections[matchRequest.Username] = ws
matchCtx, matchCancel := context.WithCancel(context.Background())
matchContexts[matchRequest.Username] = matchCancel

matchFoundChan := make(chan models.MatchFound)
matchFoundChannels[matchRequest.Username] = matchFoundChan
mu.Unlock()

// Create a context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Ensure cancel is called to release resources

processes.EnqueueUser(processes.GetRedisClient(), matchRequest.Username, ctx)
processes.AddUserToTopicSets(processes.GetRedisClient(), matchRequest, ctx)
processes.StoreUserDetails(processes.GetRedisClient(), matchRequest, ctx)

timeoutCtx, timeoutCancel, err := createTimeoutContext()
if err != nil {
log.Printf("Error creating timeout context: %v", err)
return
}
defer timeoutCancel()

matchFoundChan := make(chan models.MatchFound)

// Start goroutines for handling messages and performing matching.
go processes.ReadMessages(ws, ctx, cancel)
go processes.PerformMatching(matchRequest, ctx, matchFoundChan) // Perform matching
go processes.PerformMatching(matchRequest, context.Background(), matchFoundChannels) // Perform matching

// Wait for a match, timeout, or cancellation.
waitForResult(ws, ctx, timeoutCtx, matchFoundChan)
waitForResult(ws, ctx, timeoutCtx, matchCtx, matchFoundChan, matchRequest.Username)
}

// readMatchRequest reads the initial match request from the WebSocket connection.
Expand All @@ -69,7 +103,13 @@ func readMatchRequest(ws *websocket.Conn) (models.MatchRequest, error) {
if err := ws.ReadJSON(&matchRequest); err != nil {
return matchRequest, err
}
log.Printf("Received match request: %v", matchRequest)
// Get the remote address (client's IP and port)
clientAddr := ws.RemoteAddr().String()

// Extract the port (after the last ':')
clientPort := clientAddr[strings.LastIndex(clientAddr, ":")+1:]

log.Printf("Received match request: %v from client port: %s", matchRequest, clientPort)
return matchRequest, nil
}

Expand All @@ -84,25 +124,53 @@ func createTimeoutContext() (context.Context, context.CancelFunc, error) {
}

// waitForResult waits for a match result, timeout, or cancellation.
func waitForResult(ws *websocket.Conn, ctx, timeoutCtx context.Context, matchFoundChan chan models.MatchFound) {
func waitForResult(ws *websocket.Conn, ctx, timeoutCtx, matchCtx context.Context, matchFoundChan chan models.MatchFound, username string) {
select {
case <-ctx.Done():
log.Println("Matching cancelled")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}

return
case <-timeoutCtx.Done():
log.Println("Connection timed out")
// Cleanup Redis
processes.CleanUpUser(processes.GetRedisClient(), username, context.Background())
// Remove the match context and active
if _, exists := matchContexts[username]; exists {
delete(matchContexts, username)
}
if _, exists := activeConnections[username]; exists {
delete(activeConnections, username)
}
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
Comment on lines +149 to +158

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should abstract out the match context and active connections, and the redis cleanup, to a function. We don't have to call it here, but we can call the function after waitForResult, since it is blocking, which should clean up the code.


sendTimeoutResponse(ws)
return
case <-matchCtx.Done():
log.Println("Match found for user: " + username)
return
case result, ok := <-matchFoundChan:
if !ok {
// Channel closed without a match, possibly due to context cancellation
log.Println("Match channel closed without finding a match")
return
}
log.Println("Match found")
if err := ws.WriteJSON(result); err != nil {
log.Printf("write error: %v", err)
}
log.Println("Match found for user: " + username)
// Notify the users about the match
notifyMatch(result.User, result.MatchedUser, result)
return
}
}
Expand All @@ -117,3 +185,46 @@ func sendTimeoutResponse(ws *websocket.Conn) {
log.Printf("write error: %v", err)
}
}

func notifyMatch(username, matchedUsername string, result models.MatchFound) {
mu.Lock()
defer mu.Unlock()

// Send message to the first user
if userConn, userExists := activeConnections[username]; userExists {
if err := userConn.WriteJSON(result); err != nil {
log.Printf("Error sending message to user %s: %v\n", username, err)
}
}

// Send message to the matched user
if matchedUserConn, matchedUserExists := activeConnections[matchedUsername]; matchedUserExists {
result.User, result.MatchedUser = result.MatchedUser, result.User // Swap User and MatchedUser values
if err := matchedUserConn.WriteJSON(result); err != nil {
log.Printf("Error sending message to user %s: %v\n", username, err)
}
}

// Remove the match context for both users and cancel for matched user
if cancelFunc, exists := matchContexts[username]; exists {
cancelFunc()
delete(matchContexts, username)
}

if cancelFunc2, exists := matchContexts[matchedUsername]; exists {
cancelFunc2()
delete(matchContexts, matchedUsername)
}

// Remove the match channels
if _, exists := matchFoundChannels[username]; exists {
delete(matchFoundChannels, username)
}
if _, exists := matchFoundChannels[matchedUsername]; exists {
delete(matchFoundChannels, matchedUsername)
}

// Remove users from the activeConnections map
delete(activeConnections, username)
delete(activeConnections, matchedUsername)
}
24 changes: 24 additions & 0 deletions apps/matching-service/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package main

import (
"context"
"fmt"
"log"
"matching-service/handlers"
"matching-service/processes"
"net/http"
"os"

"github.com/joho/godotenv"
"github.com/redis/go-redis/v9"
)

func main() {
Expand All @@ -18,6 +21,27 @@ func main() {
}
port := os.Getenv("PORT")

// Retrieve redis url env variable and setup the redis client
redisAddr := os.Getenv("REDIS_URL")
client := redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Ping the redis server
_, err = client.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
} else {
log.Println("Connected to Redis at the following address: " + redisAddr)
}

// Set redis client
processes.SetRedisClient(client)

// Run a goroutine that matches users

// Routes
http.HandleFunc("/match", handlers.HandleWebSocketConnections)

Expand Down
16 changes: 12 additions & 4 deletions apps/matching-service/models/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@ type MatchRequest struct {
Type string `json:"type"`
Topics []string `json:"topics"`
Difficulties []string `json:"difficulties"`
Username string `json:"username"`
}

type MatchFound struct {
Type string `json:"type"`
MatchID int64 `json:"matchId"`
PartnerID int64 `json:"partnerId"`
PartnerName string `json:"partnerName"`
MatchID string `json:"matchId"`
User string `json:"user"` // username
MatchedUser string `json:"matchedUser"` // matched username
Topic string `json:"topic"` // matched topic
Difficulty string `json:"difficulty"` // matched difficulty
}

type Timeout struct {
Type string `json:"timeout"`
Type string `json:"type"`
Message string `json:"message"`
}

type MatchRejected struct {
Type string `json:"type"`
Message string `json:"message"`
}
Loading