Skip to content

feat: use redis pubsub, transactions, locks #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions apps/matching-service/databases/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package databases

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/bsm/redislock"
"github.com/redis/go-redis/v9"
)

const MatchmakingQueueRedisKey = "matchmaking_queue"
const matchmakingRedisLock = "matchmaking_lock"

var redisClient *redis.Client
var redisLock *redislock.Client

// SetupRedisClient sets-up the Redis client, and assigns it to a global variable
func SetupRedisClient() *redis.Client {
// Retrieve redis url env variable and setup the redis client
redisAddr := os.Getenv("REDIS_URL")
redisClient = redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Ping the redis server
_, err := redisClient.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
} else {
log.Println("Connected to Redis at the following address: " + redisAddr)
}

// Create a new lock client.
redisLock = redislock.New(redisClient)

return redisClient
}

func GetRedisClient() *redis.Client {
return redisClient
}

func GetRedisLock() *redislock.Client {
return redisLock
}

func ObtainRedisLock(ctx context.Context) (*redislock.Lock, error) {
// Retry every 100ms, for up-to 100x
backoff := redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 100)

// Obtain lock with retry
lock, err := redisLock.Obtain(ctx, matchmakingRedisLock, time.Second, &redislock.Options{
RetryStrategy: backoff,
})
if err == redislock.ErrNotObtained {
fmt.Println("Could not obtain lock!")
return nil, err
} else if err != nil {
log.Fatalln(err)
return nil, err
}
return lock, err
}
36 changes: 36 additions & 0 deletions apps/matching-service/databases/topic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package databases

import (
"context"
"log"
"matching-service/models"
"strings"

"github.com/redis/go-redis/v9"
)

// Add user into each specified topic set based on the topics selected by users
func AddUserToTopicSets(tx *redis.Tx, request models.MatchRequest, ctx context.Context) {
for _, topic := range request.Topics {
err := tx.SAdd(ctx, strings.ToLower(topic), request.Username).Err()
if err != nil {
log.Println("Error adding user to topic set:", err)
}
}
}

// Remove user from each specified topic set based on the topics selected by users
func RemoveUserFromTopicSets(tx *redis.Tx, username string, ctx context.Context) {
request, err := GetUserDetails(tx, username, ctx)
if err != nil {
log.Println("Error retrieving user from hashset:", err)
return
}

for _, topic := range request.Topics {
err := tx.SRem(ctx, strings.ToLower(topic), request.Username).Err()
if err != nil {
log.Println("Error removing user from topic set:", err)
}
}
}
22 changes: 22 additions & 0 deletions apps/matching-service/databases/user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package databases

import (
"context"
"matching-service/models"

"github.com/redis/go-redis/v9"
)

// Clean up queue, sets and hashset in Redis
func CleanUpUser(tx *redis.Tx, username string, ctx context.Context) {
DequeueUser(tx, username, ctx)
RemoveUserFromTopicSets(tx, username, ctx)
RemoveUserDetails(tx, username, ctx)
}

// Adds the user to the queue, sets and hashsets in Redis
func AddUser(tx *redis.Tx, matchRequest models.MatchRequest, ctx context.Context) {
EnqueueUser(tx, matchRequest.Username, ctx)
AddUserToTopicSets(tx, matchRequest, ctx)
StoreUserDetails(tx, matchRequest, ctx)
}
202 changes: 202 additions & 0 deletions apps/matching-service/databases/userqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package databases

import (
"context"
"encoding/json"
"fmt"
"log"
"matching-service/models"
"strings"

"github.com/redis/go-redis/v9"
)

// Print existing users in the matching queue
func PrintMatchingQueue(tx *redis.Tx, status string, ctx context.Context) {
users, err := GetAllQueuedUsers(tx, ctx)
if err != nil {
return
}

var concatenatedUsers strings.Builder
for i, user := range users {
concatenatedUsers.WriteString(user)
if i != len(users)-1 {
concatenatedUsers.WriteString(", ")
}
}

log.Println("Redis Queue (" + status + "): " + concatenatedUsers.String())
}

func IsQueueEmpty(tx *redis.Tx, ctx context.Context) (bool, error) {
queueLength, err := tx.LLen(ctx, MatchmakingQueueRedisKey).Result()
if err != nil {
log.Println("Error checking queue length:", err)
return false, err
}
// No users in the queue, so no need to perform matching
return queueLength == 0, nil
}

// Enqueue a user into the matchmaking queue
func EnqueueUser(tx *redis.Tx, username string, ctx context.Context) {
err := tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
if err != nil {
log.Println("Error enqueuing user:", err)
}
}

// Remove user from the matchmaking queue
func DequeueUser(tx *redis.Tx, username string, ctx context.Context) {
err := tx.LRem(ctx, MatchmakingQueueRedisKey, 1, username).Err()
if err != nil {
log.Println("Error dequeuing user:", err)
return
}
}

// Returns the first user's username from the queue.
func GetFirstUser(tx *redis.Tx, ctx context.Context) (string, error) {
// Peek at the user queue
username, err := tx.LIndex(ctx, MatchmakingQueueRedisKey, 0).Result()
if err != nil {
log.Println("Error peeking user from queue:", err)
return "", err
}
return username, nil
}

func GetAllQueuedUsers(tx *redis.Tx, ctx context.Context) ([]string, error) {
users, err := tx.LRange(ctx, MatchmakingQueueRedisKey, 0, -1).Result()
if err != nil {
log.Println("Error retrieving users from queue:", err)
return nil, err
}
return users, nil
}

// Add user details into hashset in Redis
func StoreUserDetails(tx *redis.Tx, request models.MatchRequest, ctx context.Context) {
topicsJSON, err := json.Marshal(request.Topics)
if err != nil {
log.Println("Error marshalling topics:", err)
return
}

difficultiesJSON, err := json.Marshal(request.Difficulties)
if err != nil {
log.Println("Error marshalling difficulties:", err)
return
}

err = tx.HSet(ctx, request.Username, map[string]interface{}{
"topics": topicsJSON,
"difficulty": difficultiesJSON,
"username": request.Username,
}).Err()
if err != nil {
log.Println("Error storing user details:", err)
}
}

// Retrieve user details from hashset in Redis
func GetUserDetails(tx *redis.Tx, username string, ctx context.Context) (models.MatchRequest, error) {
userDetails, err := tx.HGetAll(ctx, username).Result()
if err != nil {
return models.MatchRequest{}, err
}

if len(userDetails) == 0 {
return models.MatchRequest{}, fmt.Errorf("user not found in hashset: %s", username)
}

topicsJSON, topicsExist := userDetails["topics"]
difficultiesJSON, difficultiesExist := userDetails["difficulty"]

if !topicsExist || !difficultiesExist {
return models.MatchRequest{}, fmt.Errorf("incomplete user details for: %s", username)
}

var topics []string
err = json.Unmarshal([]byte(topicsJSON), &topics)
if err != nil {
return models.MatchRequest{}, fmt.Errorf("error unmarshalling topics: %v", err)
}

var difficulties []string
err = json.Unmarshal([]byte(difficultiesJSON), &difficulties)
if err != nil {
return models.MatchRequest{}, fmt.Errorf("error unmarshalling difficulties: %v", err)
}

matchRequest := models.MatchRequest{
Topics: topics,
Difficulties: difficulties,
Username: username,
}

return matchRequest, nil
}

// Remove user details from HashSet
func RemoveUserDetails(tx *redis.Tx, username string, ctx context.Context) {
err := tx.Del(ctx, username).Err()
if err != nil {
log.Println("Error removing user details:", err)
}
}

// Find the first matching user based on topics
// TODO: match based on available questions
func FindMatchingUser(tx *redis.Tx, username string, ctx context.Context) (*models.MatchFound, error) {
user, err := GetUserDetails(tx, username, ctx)
if err != nil {
return nil, err
}

for _, topic := range user.Topics {
users, err := tx.SMembers(ctx, strings.ToLower(topic)).Result()
if err != nil {
return nil, err
}

for _, potentialMatch := range users {
if potentialMatch == username {
continue
}

matchedUser, err := GetUserDetails(tx, potentialMatch, ctx)
if err != nil {
return nil, err
}

commonDifficulty := models.GetCommonDifficulty(user.Difficulties, matchedUser.Difficulties)

matchFound := models.MatchFound{
Type: "match_found",
MatchedUser: potentialMatch,
Topic: topic,
Difficulty: commonDifficulty,
}

return &matchFound, nil
}
}

return nil, nil
}

func PopAndInsertUser(tx *redis.Tx, username string, ctx context.Context) {
// Pop user
username, err := tx.LPop(ctx, MatchmakingQueueRedisKey).Result()
if err != nil {
log.Println("Error popping user from queue:", err)
}

// Insert back in queue
err = tx.LPush(ctx, MatchmakingQueueRedisKey, username).Err()
if err != nil {
log.Println("Error enqueuing user:", err)
}
}
1 change: 1 addition & 0 deletions apps/matching-service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module matching-service
go 1.23.1

require (
github.com/bsm/redislock v0.9.4
github.com/gorilla/websocket v1.5.3
github.com/joho/godotenv v1.5.1
github.com/redis/go-redis/v9 v9.6.2
Expand Down
2 changes: 2 additions & 0 deletions apps/matching-service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw=
github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
Expand Down
35 changes: 35 additions & 0 deletions apps/matching-service/handlers/responses.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package handlers

import (
"log"
"matching-service/models"

"github.com/gorilla/websocket"
)

// sendTimeoutResponse sends a timeout message to the WebSocket client.
func sendTimeoutResponse(ws *websocket.Conn) {
result := models.Timeout{
Type: "timeout",
Message: "No match found. Please try again later.",
}
if err := ws.WriteJSON(result); err != nil {
log.Printf("write error: %v", err)
}
}

func sendRejectionResponse(ws *websocket.Conn) {
if err := ws.WriteJSON(models.MatchRejected{
Type: "match_rejected",
Message: "You are already in a matchmaking queue. Please disconnect before reconnecting.",
}); err != nil {
log.Printf("write error: %v", err)
}
}

// Send message to matched user
func sendMatchFoundResponse(ws *websocket.Conn, username string, result models.MatchFound) {
if err := ws.WriteJSON(result); err != nil {
log.Printf("Error sending message to user %s: %v\n", username, err)
}
}
Loading