From ba83fc280e21ab1515f57d5cc47fec8195440014 Mon Sep 17 00:00:00 2001 From: konsumlamm Date: Thu, 23 May 2024 15:20:52 +0200 Subject: [PATCH] Cleanup Extract parts to functions Rename Add some comments `ws_message::ServerMessage` to `ClientResponse` --- liberica/src/lib/bindings.ts | 4 +- liberica/src/lib/websockets.ts | 18 +++--- robusta/src/main.rs | 106 +++++++++++++++++++-------------- robusta/src/ws_message.rs | 2 +- 4 files changed, 73 insertions(+), 57 deletions(-) diff --git a/liberica/src/lib/bindings.ts b/liberica/src/lib/bindings.ts index 857a2ad..6b8a150 100644 --- a/liberica/src/lib/bindings.ts +++ b/liberica/src/lib/bindings.ts @@ -1,7 +1,5 @@ // This file has been generated by Specta. DO NOT EDIT. -export type ServerMessage = { GameState: GameState } - /** * Information about a tram station. */ @@ -21,5 +19,7 @@ export type TeamState = { team: Team; long: number; lat: number; on_train: strin export type CreateTeam = { name: string; color: string; kind: TeamKind } +export type ClientResponse = { GameState: GameState } + export type GameState = { teams: TeamState[]; trains: Train[] } diff --git a/liberica/src/lib/websockets.ts b/liberica/src/lib/websockets.ts index a12713a..0530d6a 100644 --- a/liberica/src/lib/websockets.ts +++ b/liberica/src/lib/websockets.ts @@ -1,4 +1,4 @@ -import { ClientMessage, ServerMessage } from "lib/bindings"; +import { ClientMessage, ClientResponse } from "lib/bindings"; export type Keys = T extends T ? keyof T : never; export type Concrete> = T extends { [P in K]: infer V } @@ -36,7 +36,7 @@ export class WebSocketApi { private connection!: WebSocket; private endpoint!: string; - private handlers: WSHandlerMap = {}; + private handlers: WSHandlerMap = {}; private metaHandlers: WSHandlerMap = {}; constructor(endpoint: string) { @@ -66,9 +66,9 @@ export class WebSocketApi { }; } - private parseMsg(msg: string): ServerMessage | undefined { + private parseMsg(msg: string): ClientResponse | undefined { try { - const json = JSON.parse(msg) as ServerMessage; + const json = JSON.parse(msg) as ClientResponse; return json; } catch (e) { this.metaHandlers.Error?.(e as Event); @@ -76,21 +76,21 @@ export class WebSocketApi { } } - private handleMessage(msg: ServerMessage) { + private handleMessage(msg: ClientResponse) { this.lastMessage = new Date(); for (const key in msg) { - const handler = key as Keys; + const handler = key as Keys; if (!this.handlers[handler]) console.warn( "No message handler found for message type " + handler, ); - this.handlers[handler]?.(msg[key as keyof ServerMessage]); + this.handlers[handler]?.(msg[key as keyof ClientResponse]); } } - public register>( + public register>( type: T, - handler: WSHandlerMap[T], + handler: WSHandlerMap[T], ): this { this.handlers[type] = handler; return this; diff --git a/robusta/src/main.rs b/robusta/src/main.rs index 9b06fbd..c514e4a 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::fs; use std::io::Write; +use std::sync::Arc; use std::time::Duration; use axum::{ @@ -15,7 +16,6 @@ use axum::{ Json, Router, }; use futures_util::SinkExt; -use kvv::LineDepartures; use reqwest::StatusCode; use tokio::sync::mpsc::{Receiver, Sender}; use tower::util::ServiceExt; @@ -25,10 +25,10 @@ use tower_http::{ }; use tracing::{error, info, warn, Level}; use tracing_appender::rolling::{self, Rotation}; -use unique_id::UniqueIdGen; -use ws_message::{ClientMessage, GameState, Team, TeamState}; -use crate::ws_message::TeamKind; +use crate::kvv::LineDepartures; +use crate::unique_id::UniqueIdGen; +use crate::ws_message::{ClientMessage, ClientResponse, GameState, Team, TeamKind, TeamState}; mod kvv; mod point; @@ -54,7 +54,7 @@ enum ServerMessage { #[derive(Debug)] struct Client { - recv: Receiver, + recv: Receiver, send: Sender, id: u32, } @@ -63,7 +63,7 @@ struct Client { struct ClientConnection { id: u32, team_id: u32, - send: Sender, + send: Sender, } #[derive(Debug)] @@ -89,9 +89,11 @@ impl AppState { fn client(&self, id: u32) -> Option<&ClientConnection> { self.connections.iter().find(|x| x.id == id) } + fn client_mut(&mut self, id: u32) -> Option<&mut ClientConnection> { self.connections.iter_mut().find(|x| x.id == id) } + fn team_mut_by_client_id(&mut self, id: u32) -> Option<&mut TeamState> { self.client(id) .map(|x| x.team_id) @@ -99,7 +101,7 @@ impl AppState { } } -type SharedState = std::sync::Arc>; +type SharedState = Arc>; async fn handler(ws: WebSocketUpgrade, State(state): State) -> Response { let (send, rec) = tokio::sync::mpsc::channel(100); @@ -148,7 +150,7 @@ async fn handle_socket(socket: WebSocket, mut client: Client) { }; if let Some(msg) = opt_msg { - if let Ok(client_msg) = serde_json::from_str::(&msg) { + if let Ok(client_msg) = serde_json::from_str::(&msg) { client .send .send(InputMessage::Client(client_msg, client.id)) @@ -256,47 +258,13 @@ async fn main() { .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) .init(); - const BINDINGS: &str = "../liberica/src/lib/bindings.ts"; - const TEMP_BINDINGS: &str = "../target/bindings.ts.tmp"; - - specta::export::ts(TEMP_BINDINGS).unwrap(); - let old = fs::read_to_string(BINDINGS).unwrap_or_default(); - let new = fs::read_to_string(TEMP_BINDINGS).unwrap(); - - // Only update bindings if they changed to avoid triggering a recompile of the frontend - if old != new { - info!("Updating bindings"); - fs::write(BINDINGS, new).unwrap(); - } + update_bindings(); info!("Starting server"); kvv::init().await; let (send, recv) = tokio::sync::mpsc::channel(100); - - let mut teams = fs::read_to_string(TEAMS_FILE) - .ok() - .and_then(|x| serde_json::from_str::>(&x).ok()) - .unwrap_or_default(); - - let mut state = AppState::new(send.clone()); - let max_id = teams.iter().map(|ts| ts.team.id).max().unwrap_or(0); - state.team_id_gen.set_min(max_id + 1); - if !teams.iter().any(|ts| ts.team.kind == TeamKind::MrX) { - // no Mr. X present - teams.push(TeamState { - team: Team { - id: state.team_id_gen.next(), - name: MRX.to_owned(), - color: "#000000".to_owned(), - kind: TeamKind::MrX, - }, - ..Default::default() - }); - } - state.teams = teams; - - let state = std::sync::Arc::new(tokio::sync::Mutex::new(state)); + let state = load_state(send.clone()); // fetch departures every 60 seconds and send them to the game logic queue tokio::spawn(async move { @@ -347,6 +315,47 @@ async fn main() { .unwrap(); } +fn update_bindings() { + const BINDINGS: &str = "../liberica/src/lib/bindings.ts"; + const TEMP_BINDINGS: &str = "../target/bindings.ts.tmp"; + + specta::export::ts(TEMP_BINDINGS).unwrap(); + let old = fs::read_to_string(BINDINGS).unwrap_or_default(); + let new = fs::read_to_string(TEMP_BINDINGS).unwrap(); + + // Only update bindings if they changed to avoid triggering a recompile of the frontend + if old != new { + info!("Updating bindings"); + fs::write(BINDINGS, new).unwrap(); + } +} + +fn load_state(send: Sender) -> SharedState { + let mut teams = fs::read_to_string(TEAMS_FILE) + .ok() + .and_then(|x| serde_json::from_str::>(&x).ok()) + .unwrap_or_default(); + + let mut state = AppState::new(send.clone()); + let max_id = teams.iter().map(|ts| ts.team.id).max().unwrap_or(0); + state.team_id_gen.set_min(max_id + 1); + if !teams.iter().any(|ts| ts.team.kind == TeamKind::MrX) { + // no Mr. X present + teams.push(TeamState { + team: Team { + id: state.team_id_gen.next(), + name: MRX.to_owned(), + color: "#000000".to_owned(), + kind: TeamKind::MrX, + }, + ..Default::default() + }); + } + state.teams = teams; + + Arc::new(tokio::sync::Mutex::new(state)) +} + async fn run_game_loop(mut recv: Receiver, state: SharedState) { let mut departures = HashMap::new(); let mut log_file = rolling::Builder::new() @@ -356,10 +365,14 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .max_log_files(1) .build("logs") .expect("failed to initialize rolling file appender"); + + // the time for a single frame let mut interval = tokio::time::interval(Duration::from_millis(500)); + loop { interval.tick().await; + // handle messages let mut state = state.lock().await; while let Ok(msg) = recv.try_recv() { match msg { @@ -410,6 +423,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { } } + // compute train positions let time = chrono::Utc::now(); let mut trains = kvv::train_positions(&departures, time); trains.retain(|x| !x.line_id.contains("bus")); @@ -424,6 +438,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { } } + // log game state let game_state = GameState { teams: state.teams.clone(), trains, @@ -437,6 +452,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .unwrap(); fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap(); + // send game state to clients for connection in state.connections.iter_mut() { let game_state = GameState { teams: game_state @@ -449,7 +465,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { }; if let Err(err) = connection .send - .send(ws_message::ServerMessage::GameState(game_state.clone())) + .send(ClientResponse::GameState(game_state.clone())) .await { error!("failed to send game state to client {}: {}", connection.id, err); diff --git a/robusta/src/ws_message.rs b/robusta/src/ws_message.rs index 75af5a6..a3e4d70 100644 --- a/robusta/src/ws_message.rs +++ b/robusta/src/ws_message.rs @@ -11,7 +11,7 @@ pub enum ClientMessage { } #[derive(specta::Type, Clone, Serialize, Deserialize, Debug)] -pub enum ServerMessage { +pub enum ClientResponse { GameState(GameState), }