From 71afc27ba7c3ee5c3e2045ecbe3be32f03fdfffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Garapich?= Date: Sat, 21 Dec 2024 10:03:49 +0000 Subject: [PATCH] feat: store ws client's current url --- public/js/main.js | 1 + public/js/navigation.js | 33 +++ .../plugins/redirect-player-to-new-game.ts | 4 +- src/games/plugins/sync-clients.ts | 54 +++-- .../views/html/go-to-game.tsx | 5 +- src/players/index.ts | 4 - src/queue/plugins/gateway-listeners.ts | 26 +-- src/queue/plugins/sync-clients.ts | 54 ++--- src/websocket/gateway.ts | 206 +++++++++++------- src/websocket/index.ts | 30 +-- 10 files changed, 259 insertions(+), 158 deletions(-) create mode 100644 public/js/navigation.js rename src/{players => games}/plugins/redirect-player-to-new-game.ts (78%) rename src/{players => games}/views/html/go-to-game.tsx (78%) diff --git a/public/js/main.js b/public/js/main.js index ed9306d3..b9a60aaf 100644 --- a/public/js/main.js +++ b/public/js/main.js @@ -8,3 +8,4 @@ import './fade-scroll.js' import './flash-message.js' import './map-thumbnail.js' import './notification.js' +import './navigation.js' diff --git a/public/js/navigation.js b/public/js/navigation.js new file mode 100644 index 00000000..612fb7d4 --- /dev/null +++ b/public/js/navigation.js @@ -0,0 +1,33 @@ +import htmx from './htmx.js' + +/** + * @typedef WsSend + * @type {function} + * @param {string} message + */ + +/** + * @typedef SocketWrapper + * @type {object} + * @property {WsSend} send + */ + +/** @type {SocketWrapper} */ +let socket + +export function reportNavigation(/** @type {string} */ path) { + const msg = JSON.stringify({ navigated: path }) + socket?.send(msg) +} + +/** + * @param {{detail: {socketWrapper: SocketWrapper}}} event + */ +htmx.on('htmx:wsOpen', event => { + socket = event.detail.socketWrapper + reportNavigation(window.location.pathname) +}) + +htmx.on('htmx:pushedIntoHistory', ({ detail }) => { + reportNavigation(detail.path) +}) diff --git a/src/players/plugins/redirect-player-to-new-game.ts b/src/games/plugins/redirect-player-to-new-game.ts similarity index 78% rename from src/players/plugins/redirect-player-to-new-game.ts rename to src/games/plugins/redirect-player-to-new-game.ts index 8d8017f5..39b4c80f 100644 --- a/src/players/plugins/redirect-player-to-new-game.ts +++ b/src/games/plugins/redirect-player-to-new-game.ts @@ -7,8 +7,8 @@ export default fp( events.on('player:updated', ({ before, after }) => { if (before.activeGame === undefined && after.activeGame !== undefined) { app.gateway - .toPlayers(after.steamId) - .broadcast(async () => await GoToGame(after.activeGame!)) + .to({ player: after.steamId }) + .send(async () => await GoToGame(after.activeGame!)) } }) }, diff --git a/src/games/plugins/sync-clients.ts b/src/games/plugins/sync-clients.ts index b4e5774c..44553293 100644 --- a/src/games/plugins/sync-clients.ts +++ b/src/games/plugins/sync-clients.ts @@ -18,38 +18,54 @@ import { GameScore } from '../views/html/game-score' export default fp(async app => { events.on('game:updated', async ({ before, after }) => { if (before.state !== after.state) { - app.gateway.broadcast(async () => await GameStateIndicator({ game: after })) - app.gateway.broadcast(async actor => await ConnectInfo({ game: after, actor })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async () => await GameStateIndicator({ game: after })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async actor => await ConnectInfo({ game: after, actor })) if ([GameState.launching, GameState.ended, GameState.interrupted].includes(after.state)) { - app.gateway.broadcast(async actor => await GameSlotList({ game: after, actor })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async actor => await GameSlotList({ game: after, actor })) } } if (before.score?.blu !== after.score?.blu || before.score?.red !== after.score?.red) { - app.gateway.broadcast(async () => await GameScore({ game: after })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async () => await GameScore({ game: after })) } if ( before.connectString !== after.connectString || before.stvConnectString !== after.stvConnectString ) { - app.gateway.broadcast(async actor => await ConnectInfo({ game: after, actor })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async actor => await ConnectInfo({ game: after, actor })) } if (before.logsUrl !== after.logsUrl) { - app.gateway.broadcast(async () => await LogsLink({ game: after })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async () => await LogsLink({ game: after })) } if (before.demoUrl !== after.demoUrl) { - app.gateway.broadcast(async () => await DemoLink({ game: after })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async () => await DemoLink({ game: after })) } if (before.events.length < after.events.length) { const n = before.events.length - after.events.length const newEvents = after.events.slice(n) for (const event of newEvents) { - app.gateway.broadcast(async () => await GameEventList.append({ game: after, event })) + app.gateway + .to({ url: `/games/${after.number}` }) + .send(async () => await GameEventList.append({ game: after, event })) } } @@ -62,8 +78,9 @@ export default fp(async app => { if (beforeSlot.shouldJoinBy !== slot.shouldJoinBy) { app.gateway - .toPlayers(slot.player) - .broadcast(async actor => await ConnectInfo({ game: after, actor })) + .to({ url: `/games/${after.number}` }) + .to({ player: slot.player }) + .send(async actor => await ConnectInfo({ game: after, actor })) } }), ) @@ -76,9 +93,9 @@ export default fp(async app => { events.on( 'game:updated', - whenGameEnds(async () => { + whenGameEnds(async ({ after }) => { const cmp = await GamesLink() - app.gateway.broadcast(() => cmp) + app.gateway.to({ url: `/games/${after.number}` }).send(() => cmp) }), ) @@ -92,7 +109,10 @@ export default fp(async app => { connectionStatus: playerConnectionStatus, }), ) - app.gateway.toPlayers(player).broadcast(async actor => await ConnectInfo({ game, actor })) + app.gateway + .to({ url: `/games/${game.number}` }) + .to({ player }) + .send(async actor => await ConnectInfo({ game, actor })) }), ) @@ -102,11 +122,15 @@ export default fp(async app => { throw new Error(`no such game slot: ${game.number} ${replacee}`) } - app.gateway.broadcast(async actor => await GameSlot({ game, slot, actor })) + app.gateway + .to({ url: `/games/${game.number}` }) + .send(async actor => await GameSlot({ game, slot, actor })) }) events.on('game:playerReplaced', ({ game }) => { // fixme refresh only one slot - app.gateway.broadcast(async actor => await GameSlotList({ game, actor })) + app.gateway + .to({ url: `/games/${game.number}` }) + .send(async actor => await GameSlotList({ game, actor })) }) }) diff --git a/src/players/views/html/go-to-game.tsx b/src/games/views/html/go-to-game.tsx similarity index 78% rename from src/players/views/html/go-to-game.tsx rename to src/games/views/html/go-to-game.tsx index f7fb8ad8..8203c2b9 100644 --- a/src/players/views/html/go-to-game.tsx +++ b/src/games/views/html/go-to-game.tsx @@ -1,6 +1,5 @@ import { nanoid } from 'nanoid' import type { GameNumber } from '../../../database/models/game.model' -import { environment } from '../../../environment' export function GoToGame(number: GameNumber) { const id = nanoid() @@ -8,9 +7,11 @@ export function GoToGame(number: GameNumber) {
diff --git a/src/players/index.ts b/src/players/index.ts index 14283164..8969c28a 100644 --- a/src/players/index.ts +++ b/src/players/index.ts @@ -2,7 +2,6 @@ import fp from 'fastify-plugin' import { bySteamId } from './by-steam-id' import { getPlayerGameCountOnClasses } from './get-player-game-count-on-classes' import { update } from './update' -import { resolve } from 'node:path' export const players = { bySteamId, @@ -12,9 +11,6 @@ export const players = { export default fp( async app => { - await app.register((await import('@fastify/autoload')).default, { - dir: resolve(import.meta.dirname, 'plugins'), - }) await app.register((await import('./routes')).default) }, { name: 'players' }, diff --git a/src/queue/plugins/gateway-listeners.ts b/src/queue/plugins/gateway-listeners.ts index 925c09fc..d9975a10 100644 --- a/src/queue/plugins/gateway-listeners.ts +++ b/src/queue/plugins/gateway-listeners.ts @@ -21,7 +21,7 @@ export default fp( async function refreshTakenSlots(actor: SteamId64) { const slots = await collections.queueSlots.find({ player: { $ne: null } }).toArray() const cmps = await Promise.all(slots.map(async slot => await QueueSlot({ slot, actor }))) - app.gateway.toPlayers(actor).broadcast(() => cmps) + app.gateway.to({ player: actor }).send(() => cmps) } app.gateway.on('queue:join', async (socket, slotId) => { @@ -31,10 +31,10 @@ export default fp( try { const slots = await join(slotId, socket.player.steamId) - app.gateway.toPlayers(socket.player.steamId).broadcast(async () => await MapVote.enable()) + app.gateway.to({ player: socket.player.steamId }).send(async () => await MapVote.enable()) app.gateway - .toPlayers(socket.player.steamId) - .broadcast(async () => await PreReadyUpButton.enable()) + .to({ player: socket.player.steamId }) + .send(async () => await PreReadyUpButton.enable()) if (slots.find(s => s.canMakeFriendsWith?.length)) { await refreshTakenSlots(socket.player.steamId) @@ -51,14 +51,10 @@ export default fp( try { const slot = await leave(socket.player.steamId) - app.gateway.toPlayers(socket.player.steamId).broadcast(async () => await MapVote.disable()) + app.gateway.to({ player: socket.player.steamId }).send(async () => await MapVote.disable()) app.gateway - .toPlayers(socket.player.steamId) - .broadcast(async () => await PreReadyUpButton.disable()) - - app.gateway - .toPlayers(socket.player.steamId) - .broadcast(async actor => await MapVote({ actor })) + .to({ player: socket.player.steamId }) + .send(async () => await PreReadyUpButton.disable()) if (slot.canMakeFriendsWith?.length) { await refreshTakenSlots(socket.player.steamId) @@ -67,7 +63,7 @@ export default fp( const queueState = await getState() if (queueState === QueueState.ready) { const close = await ReadyUpDialog.close() - app.gateway.toPlayers(socket.player.steamId).broadcast(() => close) + app.gateway.to({ player: socket.player.steamId }).send(() => close) } } catch (error) { logger.error(error) @@ -81,7 +77,7 @@ export default fp( try { const [, close] = await Promise.all([readyUp(socket.player.steamId), ReadyUpDialog.close()]) - app.gateway.toPlayers(socket.player.steamId).broadcast(() => close) + app.gateway.to({ player: socket.player.steamId }).send(() => close) } catch (error) { logger.error(error) } @@ -95,8 +91,8 @@ export default fp( try { await voteMap(socket.player.steamId, map) app.gateway - .toPlayers(socket.player.steamId) - .broadcast(async actor => await MapVote({ actor })) + .to({ player: socket.player.steamId }) + .send(async actor => await MapVote({ actor })) } catch (error) { logger.error(error) } diff --git a/src/queue/plugins/sync-clients.ts b/src/queue/plugins/sync-clients.ts index f9c6456f..84697671 100644 --- a/src/queue/plugins/sync-clients.ts +++ b/src/queue/plugins/sync-clients.ts @@ -23,11 +23,15 @@ export default fp( async function syncAllSlots(...players: SteamId64[]) { const slots = await collections.queueSlots.find().toArray() slots.forEach(async slot => { - app.gateway.toPlayers(...players).broadcast(async actor => await QueueSlot({ slot, actor })) + app.gateway.to({ players }).send(async actor => await QueueSlot({ slot, actor })) }) } - app.gateway.on('connected', async socket => { + app.gateway.on('ready', async socket => { + if (socket.currentUrl !== '/') { + return + } + const slots = await collections.queueSlots.find().toArray() slots.forEach(async slot => { socket.send(await QueueSlot({ slot, actor: socket.player?.steamId })) @@ -62,14 +66,14 @@ export default fp( safe(async ({ before, after }) => { if (before.activeGame !== after.activeGame) { const cmp = await RunningGameSnackbar({ gameNumber: after.activeGame }) - app.gateway.toPlayers(after.steamId).broadcast(() => cmp) + app.gateway.to({ players: [after.steamId] }).send(() => cmp) await syncAllSlots(after.steamId) } if (before.preReadyUntil !== after.preReadyUntil) { app.gateway - .toPlayers(after.steamId) - .broadcast(async actor => await PreReadyUpButton({ actor })) + .to({ players: [after.steamId] }) + .send(async actor => await PreReadyUpButton({ actor })) } }), ) @@ -107,7 +111,7 @@ export default fp( .filter(Boolean) as SteamId64[] const show = await ReadyUpDialog.show() - app.gateway.toPlayers(...players).broadcast(() => show) + app.gateway.to({ players }).send(() => show) } }), ) @@ -133,29 +137,29 @@ export default fp( if (!slot) { return } - const actors = await collections.queueSlots - .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) - .toArray() - app.gateway - .toPlayers(...actors.map(a => a.player!)) - .broadcast(async actor => await QueueSlot({ slot, actor })) + const players = ( + await collections.queueSlots + .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) + .toArray() + ).map(({ player }) => player!) + app.gateway.to({ players }).send(async actor => await QueueSlot({ slot, actor })) }), ) events.on( 'queue/friendship:updated', safe(async ({ target }) => { - const actors = await collections.queueSlots - .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) - .toArray() + const players = ( + await collections.queueSlots + .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) + .toArray() + ).map(({ player }) => player!) const slots = await collections.queueSlots .find({ player: { $in: [target.before, target.after] } }) .toArray() slots.forEach(slot => { - app.gateway - .toPlayers(...actors.map(a => a.player!)) - .broadcast(async actor => await QueueSlot({ slot, actor })) + app.gateway.to({ players }).send(async actor => await QueueSlot({ slot, actor })) }) }), ) @@ -167,12 +171,12 @@ export default fp( if (!slot) { return } - const actors = await collections.queueSlots - .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) - .toArray() - app.gateway - .toPlayers(...actors.map(a => a.player!)) - .broadcast(async actor => await QueueSlot({ slot, actor })) + const players = ( + await collections.queueSlots + .find({ 'canMakeFriendsWith.0': { $exists: true }, player: { $ne: null } }) + .toArray() + ).map(({ player }) => player!) + app.gateway.to({ players }).send(async actor => await QueueSlot({ slot, actor })) }), ) @@ -193,7 +197,7 @@ export default fp( const refreshBanAlerts = async (player: SteamId64) => { const cmp = await BanAlerts({ actor: player }) - app.gateway.toPlayers(player).broadcast(() => cmp) + app.gateway.to({ players: [player] }).send(() => cmp) setImmediate(async () => { await syncAllSlots(player) diff --git a/src/websocket/gateway.ts b/src/websocket/gateway.ts index f5735caa..6bf38df3 100644 --- a/src/websocket/gateway.ts +++ b/src/websocket/gateway.ts @@ -8,6 +8,8 @@ import { logger } from '../logger' export interface ClientToServerEvents { connected: (ipAddress: string, userAgent?: string) => void + ready: () => void + navigated: (url: string) => void 'queue:join': (slotId: number) => void 'queue:leave': () => void 'queue:votemap': (mapName: string) => void @@ -57,6 +59,11 @@ const preReadyToggle = z.object({ HEADERS: htmxHeaders, }) +const navigated = z.object({ + navigated: z.string(), + HEADERS: htmxHeaders.optional(), +}) + const clientMessage = z.union([ joinQueue, leaveQueue, @@ -64,6 +71,7 @@ const clientMessage = z.union([ voteMap, markAsFriend, preReadyToggle, + navigated, ]) type MessageFn = ( @@ -73,6 +81,102 @@ interface Broadcaster { broadcast: (messageFn: MessageFn) => void } +async function sendSafe(client: WebSocket, msg: string) { + return new Promise((resolve, reject) => { + if (client.readyState !== WebSocket.OPEN) { + resolve() + return + } + + client.send(msg, err => { + if (err) { + if ('code' in err && err.code === 'EPIPE') { + client.terminate() + resolve() + return + } + + reject(err) + } else { + resolve() + } + }) + }) +} + +async function send(client: WebSocket, message: MessageFn) { + try { + const m = await message(client.player?.steamId) + if (Array.isArray(m)) { + for (const msg of m) { + await sendSafe(client, msg) + } + } else { + await sendSafe(client, m) + } + } catch (error) { + assertIsError(error) + logger.error(error) + } +} + +type Filters = { players?: Set; urls?: Set } +type UserFilters = + | { player: SteamId64 } + | { players: SteamId64[] } + | { url: string } + | { urls: string[] } + +function mergeFilters(base: Filters, additional: UserFilters) { + if ('players' in additional || 'player' in additional) { + base.players ??= new Set() + } + + if ('url' in additional || 'urls' in additional) { + base.urls ??= new Set() + } + + if ('player' in additional) { + base.players!.add(additional.player) + } else if ('players' in additional) { + additional.players.forEach(player => base.players!.add(player)) + } else if ('url' in additional) { + base.urls!.add(additional.url) + } else if ('urls' in additional) { + additional.urls.forEach(url => base.urls!.add(url)) + } + return base +} + +class BroadcastOperator { + constructor( + public readonly app: FastifyInstance, + private readonly filters: Filters, + ) {} + + to(filter: UserFilters) { + return new BroadcastOperator(this.app, mergeFilters(this.filters, filter)) + } + + send(message: MessageFn) { + this.app.websocketServer.clients.forEach(async client => { + if (this.filters.players) { + if (!client.player || !this.filters.players.has(client.player.steamId)) { + return + } + } + + if (this.filters.urls) { + if (!this.filters.urls.has(client.currentUrl)) { + return + } + } + + await send(client, message) + }) + } +} + export class Gateway extends EventEmitter implements Broadcaster { constructor(public readonly app: FastifyInstance) { super() @@ -86,95 +190,33 @@ export class Gateway extends EventEmitter implements Broadcaster { return this } - broadcast(messageFn: MessageFn) { - this.app.websocketServer.clients.forEach(async client => { - const send = async (msg: string) => - new Promise((resolve, reject) => { - if (client.readyState !== WebSocket.OPEN) { - resolve() - return - } - - client.send(msg, err => { - if (err) { - if ('code' in err && err.code === 'EPIPE') { - client.terminate() - resolve() - return - } - - reject(err) - } else { - resolve() - } - }) - }) - - try { - const message = await messageFn(client.player?.steamId) - if (Array.isArray(message)) { - for (const msg of message) { - await send(msg) - } - } else { - await send(message) - } - } catch (error) { - assertIsError(error) - logger.error(error) - } - }) + broadcast(message: MessageFn) { + this.app.websocketServer.clients.forEach(async client => await send(client, message)) } - toPlayers(...players: SteamId64[]): Broadcaster { - return { - broadcast: (messageFn: MessageFn) => { - this.app.websocketServer.clients.forEach(async client => { - if (client.player && players.includes(client.player.steamId)) { - const send = async (msg: string) => - new Promise((resolve, reject) => { - if (client.readyState !== WebSocket.OPEN) { - resolve() - return - } - - client.send(msg, err => { - if (err) { - if ('code' in err && err.code === 'EPIPE') { - client.terminate() - resolve() - return - } - - reject(err) - } else { - resolve() - } - }) - }) - - try { - const message = await messageFn(client.player.steamId) - if (Array.isArray(message)) { - for (const msg of message) { - await send(msg) - } - } else { - await send(message) - } - } catch (error) { - assertIsError(error) - logger.error(error) - } - } - }) - }, - } + to(filter: UserFilters): BroadcastOperator { + return new BroadcastOperator(this.app, mergeFilters({}, filter)) } parse(socket: WebSocket, message: string) { try { const parsed = clientMessage.parse(JSON.parse(message)) + if ('navigated' in parsed) { + if (!socket.currentUrl) { + socket.currentUrl = parsed.navigated + this.emit('ready', socket) + } else { + socket.currentUrl = parsed.navigated + this.emit('navigated', socket, parsed.navigated) + } + return + } + + if (!socket.player) { + return + } + + // all the other calls are for authenticated clients only if ('join' in parsed) { this.emit('queue:join', socket, parsed.join) } else if ('leave' in parsed) { diff --git a/src/websocket/index.ts b/src/websocket/index.ts index bd99254b..39744a2c 100644 --- a/src/websocket/index.ts +++ b/src/websocket/index.ts @@ -5,6 +5,7 @@ import { extractClientIp } from './extract-client-ip' import websocket from '@fastify/websocket' import { secondsToMilliseconds } from 'date-fns' import type { SteamId64 } from '../shared/types/steam-id-64' +import { nanoid } from 'nanoid' declare module 'fastify' { interface FastifyInstance { @@ -14,7 +15,9 @@ declare module 'fastify' { declare module 'ws' { export default interface WebSocket { + id: string isAlive: boolean + currentUrl: string player?: { steamId: SteamId64 } @@ -55,25 +58,26 @@ export default fp( app.decorate('gateway', gateway) app.get('/ws', { websocket: true }, (socket, req) => { + socket.id = nanoid() + if (req.user) { socket.player = { steamId: req.user.player.steamId, } - - socket.on('message', message => { - let messageString: string - if (Array.isArray(message)) { - messageString = Buffer.concat(message).toString() - } else if (message instanceof ArrayBuffer) { - messageString = Buffer.from(message).toString() - } else { - messageString = message.toString() - } - logger.trace(`${req.user!.player.name}: ${messageString}`) - gateway.parse(socket, messageString) - }) } + socket.on('message', message => { + let messageString: string + if (Array.isArray(message)) { + messageString = Buffer.concat(message).toString() + } else if (message instanceof ArrayBuffer) { + messageString = Buffer.from(message).toString() + } else { + messageString = message.toString() + } + gateway.parse(socket, messageString) + }) + const ipAddress = extractClientIp(req.headers) ?? req.socket.remoteAddress const userAgent = req.headers['user-agent'] gateway.emit('connected', socket, ipAddress, userAgent)