Skip to content

Commit

Permalink
Auth fixes and better concurrency for game rooms (#38)
Browse files Browse the repository at this point in the history
- fix: Changed Cookie names because __Secure is restrictive
- fix: Add AllowCredentials to cors
- fix: Cleanup idle room connections
- fix: add set-cookie for access cookie after refresh
- fix: include sessionTs when fetching user from db
- fix: prevent panic when closing room by niling channel instead of closing
- chore: Print number of rooms in hub after loading into memory.
- Update docker-image.yml

---------

Co-authored-by: Evans Owamoyo <oovamoyo@ozon.ru>
  • Loading branch information
lordvidex and Evans Owamoyo authored Jan 7, 2025
1 parent 7d1240b commit 7e6b1f6
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ jobs:
with:
subject-name: ${{env.REGISTRY}}/${{env.IMAGE_NAME}}
subject-digest: ${{steps.push.outputs.digest}}
push-to-registry: true
push-to-registry: false
66 changes: 38 additions & 28 deletions game/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ func (r *Room) play(m Payload) {
}
r.sendAll(newPayload(CPlay, result, m.From))

// Check if the game has finished, if so, close the room
// Check if the game has finished, if so, saveAndClose the room
if r.g.HasEnded() {
r.sendAll(newPayload(CFinish, "Game has ended", ""))
r.close()
r.saveAndClose()
}
}

Expand All @@ -244,7 +244,7 @@ func (r *Room) join(m Payload) {
r.g.Join(pconn.player)
}
// Send the player his current state in the game.
// On error, close the player connection since he will have inconsistent data with which he can't play the game.
// On error, saveAndClose the player connection since he will have inconsistent data with which he can't play the game.
err := pconn.write(newPayload(CData, ToInitialData(ptr.ToObj(r.g), pconn.PName()), ""))
if err != nil {
log.Err(err).Caller().Msg("failed to send player data")
Expand Down Expand Up @@ -298,49 +298,59 @@ func (r *Room) leave(m Payload) {
}
}

// close closes the room and all players in the room.
// saveAndClose closes the room and all players in the room.
// This is used when the game is finished.
func (r *Room) close() {
func (r *Room) saveAndClose() {
r.Close()
// Store the game in the database
if r.saver != nil {
err := r.saver.FinishGame(context.Background(), r.g)
if err != nil {
log.Err(err).Caller().Msg("failed to store game")
}
}
}

// Close should be called only by the gc
func (r *Room) Close() {
if r.closed {
return
}
r.closed = true
r.active = false
// Cancel the context to stop the `leave` goroutine and close
// Cancel the context to stop the `leave` goroutine and saveAndClose
// all prevent any new players from sending messages to the room.
r.cancelCtx()
// Close all players connection
for _, p := range r.players {
p.close()
delete(r.players, p.PName())
}
close(r.broadcast)
// Store the game in the database
if r.saver != nil {
err := r.saver.FinishGame(context.Background(), r.g)
if err != nil {
log.Err(err).Caller().Msg("failed to store game")
}
}
r.broadcast = nil // nil channel will prevent send while closing it will cause panics
}

// run processes all messages sent to the room.
// This function is blocking until the room is closed (r.broadcast is closed)
func (r *Room) run() {
for message := range r.broadcast {
switch message.Type {
case SStart:
r.start(message)
case SMessage:
r.message(message)
case SPlay:
r.play(message)
case PJoin:
r.join(message)
case PLeave:
r.leave(message)
default:
message.sender.write(newPayload(CError, "Unknown message type", ""))
for {
select {
case <-r.ctx.Done():
return
case message := <-r.broadcast:
switch message.Type {
case SStart:
r.start(message)
case SMessage:
r.message(message)
case SPlay:
r.play(message)
case PJoin:
r.join(message)
case PLeave:
r.leave(message)
default:
message.sender.write(newPayload(CError, "Unknown message type", ""))
}
}
}
}
Expand Down
9 changes: 2 additions & 7 deletions handler/cookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import (
"net/http"
"time"

"github.com/kodekulture/wordle-server/internal/config"
"github.com/lordvidex/x/auth"
)

func isProd() bool {
return config.Get("ENV") == "prod"
}

const (
accessTokenTTL = 1 * time.Hour // 1 hr
refreshTokenTTL = 365 * 24 * time.Hour // 1 year
Expand All @@ -22,7 +17,7 @@ func newAccessCookie(token auth.Token) http.Cookie {
Name: accessTokenKey,
Value: string(token),
Expires: time.Now().Add(accessTokenTTL),
Secure: isProd(), // enable development usage
Secure: true,
HttpOnly: true,
SameSite: http.SameSiteNoneMode,
}
Expand All @@ -33,7 +28,7 @@ func newRefreshCookie(token auth.Token) http.Cookie {
Name: refreshTokenKey,
Value: string(token),
Expires: time.Now().Add(refreshTokenTTL),
Secure: isProd(), // enable development usage
Secure: true,
HttpOnly: true,
SameSite: http.SameSiteLaxMode,
}
Expand Down
8 changes: 4 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (

var (
kors = cors.New(cors.Options{
AllowedOrigins: []string{"http://localhost:3000"},
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
MaxAge: 300,
AllowedOrigins: []string{"http://localhost:3000"},
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
AllowCredentials: true,
})
// Create upgrade websocket connection
upgrader = websocket.Upgrader{
Expand Down
5 changes: 3 additions & 2 deletions handler/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

const (
accessTokenKey = "__Secure-access-token"
refreshTokenKey = "__Secure-refresh-token"
accessTokenKey = "X-Access-Token"
refreshTokenKey = "X-Refresh-Token"
)

type (
Expand Down Expand Up @@ -62,6 +62,7 @@ func (h *Handler) sessionMiddleware(next http.Handler) http.Handler {
resp.Error(w, err)
return
}
http.SetCookie(w, accessCk) // set recently refreshed cookie
isDBPlayer = true
}

Expand Down
7 changes: 4 additions & 3 deletions repository/postgres/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func (r *PlayerRepo) GetByUsername(ctx context.Context, username string) (*game.
return nil, err
}
return &game.Player{
ID: int(player.ID),
Username: player.Username,
Password: player.Password,
ID: int(player.ID),
Username: player.Username,
Password: player.Password,
SessionTs: player.SessionTs.Int64,
}, nil
}

Expand Down
85 changes: 83 additions & 2 deletions service/hub_service.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
package service

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"github.com/kodekulture/wordle-server/game"
)

const (
// RoomDuration is the maximum time for a game, room is deleted and game data is wiped if
// game is not played during this time.
RoomDuration = time.Hour
// EmptyRoomDuration is the maximum time a game is left without players. It is shorter than RoomDuration
// because Room is probably not in use anymore and contains no data.
EmptyRoomDuration = time.Minute * 15
)

type hub struct {
mu sync.RWMutex
rooms map[uuid.UUID]*game.Room
}

// NewStorage returns a new Storage.
func newHub(r map[uuid.UUID]*game.Room) *hub {
func newHub(ctx context.Context, r map[uuid.UUID]*game.Room) *hub {
if r == nil {
r = make(map[uuid.UUID]*game.Room)
}
return &hub{rooms: r}
h := hub{rooms: r}
go h.gc(ctx)

return &h
}

// GetRoom returns the room with the given id and a bool indicating whether the room was found.
Expand All @@ -41,3 +55,70 @@ func (s *hub) DeleteRoom(id uuid.UUID) {
defer s.mu.Unlock()
delete(s.rooms, id)
}

func (s *hub) gc(ctx context.Context) {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()

isMarkPhase := true
garbage := make([]*game.Room, 0)

mark := func() {
garbage = nil
s.mu.RLock()
defer s.mu.RUnlock()

for _, r := range s.rooms {
g := r.Game()
if g == nil {
garbage = append(garbage, r)
continue
}

if time.Since(g.CreatedAt) >= RoomDuration {
garbage = append(garbage, r)
continue
}

if time.Since(g.CreatedAt) >= EmptyRoomDuration && len(g.Sessions) == 0 {
garbage = append(garbage, r)
continue
}
}
}

sweep := func() {
// 1. fast remove from rooms
s.mu.Lock()
for _, r := range garbage {
v, _ := uuid.Parse(r.ID())
delete(s.rooms, v)
}
s.mu.Unlock()

// 2. cleanup later
for _, r := range garbage {
if r.IsClosed() {
continue
}
r.Close()
}
garbage = nil
}

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if isMarkPhase {
mark()
} else {
// sweep phase
sweep()
}

isMarkPhase = !isMarkPhase
}
}
}
4 changes: 3 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func New(appCtx context.Context, gr repository.Game, pr repository.Player, cr re
if err != nil {
return nil, fmt.Errorf("failed to load hub: %s", err.Error())
}
s.hub = newHub(data)

log.Warn().Msgf("loaded hub with %d rooms", len(data))
s.hub = newHub(appCtx, data)
go s.drop(appCtx)
return s, nil
}

0 comments on commit 7e6b1f6

Please sign in to comment.