From f07701924fadf5d662fa4b330a32cfbf0d3e2d21 Mon Sep 17 00:00:00 2001 From: Nils Hasenbanck Date: Mon, 30 Dec 2024 15:56:42 +0100 Subject: [PATCH] Properly handle and smooth client ticks After long experimentation it seems that the server is actually predicting the client tick rate for us and the client does not need to estimate the correct client tick rate using the round trip time. This also means, that we as the client only need to adjust our current local tick time gently via adjusting a correction frequency. The current setup feels very smooth and the client seems to be kept properly in sync. --- korangar/src/main.rs | 3 + korangar/src/system/timer.rs | 105 ++++++++------------------------- korangar_networking/src/lib.rs | 86 +++++++++++++++++++++------ 3 files changed, 98 insertions(+), 96 deletions(-) diff --git a/korangar/src/main.rs b/korangar/src/main.rs index fc5511d9..5766dde7 100644 --- a/korangar/src/main.rs +++ b/korangar/src/main.rs @@ -931,6 +931,9 @@ impl Client { let saved_login_data = self.saved_login_data.as_ref().unwrap(); self.networking_system.disconnect_from_character_server(); self.networking_system.connect_to_map_server(saved_login_data, login_data); + // Ask for the client tick right away, so that the player isn't de-synced when + // they spawn on the map. + let _ = self.networking_system.request_client_tick(); let character_information = self .saved_characters diff --git a/korangar/src/system/timer.rs b/korangar/src/system/timer.rs index 8feffc3c..ac0fb251 100644 --- a/korangar/src/system/timer.rs +++ b/korangar/src/system/timer.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::time::Instant; use chrono::prelude::*; @@ -12,25 +11,13 @@ pub struct GameTimer { frames_per_second: usize, animation_timer: f32, day_timer: f32, - last_client_tick: Instant, + last_packet_receive_time: Instant, first_tick_received: bool, - base_client_tick: u32, + base_client_tick: f64, frequency: f64, - last_update: Instant, - last_error: f64, - integral_error: f64, - error_history: VecDeque<(Instant, f64)>, } const TIME_FACTOR: f32 = 1000.0; -// PID constants -const KP: f64 = 0.0005; -const KI: f64 = 0.00005; -const KD: f64 = 0.00005; -// Gaussian filter constants -const GAUSSIAN_SIGMA: f64 = 4.0; -const GAUSSIAN_DENOMINATOR: f64 = 2.0 * GAUSSIAN_SIGMA * GAUSSIAN_SIGMA; -const GAUSSIAN_WINDOW_SIZE: usize = 15; impl GameTimer { pub fn new() -> Self { @@ -45,86 +32,46 @@ impl GameTimer { frames_per_second: Default::default(), animation_timer: Default::default(), day_timer, - last_client_tick: Instant::now(), + last_packet_receive_time: Instant::now(), first_tick_received: false, - base_client_tick: 0, + base_client_tick: 0.0, frequency: 0.0, - last_update: Instant::now(), - last_error: 0.0, - integral_error: 0.0, - error_history: VecDeque::with_capacity(GAUSSIAN_WINDOW_SIZE), } } - fn gaussian_filter(&self, packet_time: Instant) -> f64 { - if self.error_history.is_empty() { - return 0.0; - } - - let mut weighted_sum = 0.0; - let mut weight_sum = 0.0; - - for (time, error) in &self.error_history { - let dt = packet_time.duration_since(*time).as_secs_f64(); - let weight = (-dt.powi(2) / GAUSSIAN_DENOMINATOR).exp(); - - weighted_sum += error * weight; - weight_sum += weight; - } - - if weight_sum > 0.0 { - weighted_sum / weight_sum - } else { - 0.0 - } - } - - /// Uses a simple PID regulator that uses a gaussian filter to be a bit more - /// resistant against network jitter to synchronize the client side tick and - /// the server tick. - pub fn set_client_tick(&mut self, server_tick: ClientTick, packet_receive_time: Instant) { + /// The networking system sends a request for the newest global tick rate + /// every 10 seconds. The server seems to send the estimated tick rate + /// for the client, based on the round trip time. We only need to make + /// sure, that we send the current tick of the client to the server + /// (networking system does that transparently) and then apply gentle, + /// continuous correction on our local client tick. + pub fn set_client_tick(&mut self, client_tick: ClientTick, packet_receive_time: Instant) { if !self.first_tick_received { self.first_tick_received = true; - self.base_client_tick = server_tick.0; - self.last_client_tick = packet_receive_time; - self.last_update = packet_receive_time; - self.last_error = 0.0; - self.integral_error = 0.0; + self.base_client_tick = client_tick.0 as f64; + self.last_packet_receive_time = packet_receive_time; return; } - let elapsed = packet_receive_time.duration_since(self.last_client_tick).as_secs_f64(); - let adjustment = self.frequency * elapsed; - let tick_at_receive = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment; + let local_tick = self.get_client_tick_at(packet_receive_time); + let tick_difference = client_tick.0 as f64 - local_tick; - let error = server_tick.0 as f64 - tick_at_receive; - - self.error_history.push_back((packet_receive_time, error)); - while self.error_history.len() > GAUSSIAN_WINDOW_SIZE { - self.error_history.pop_front(); - } - - let filtered_error = self.gaussian_filter(packet_receive_time); - - let dt = packet_receive_time.duration_since(self.last_update).as_secs_f64(); - - self.integral_error = (self.integral_error + filtered_error * dt).clamp(-10.0, 10.0); - - let derivative = (filtered_error - self.last_error) / dt; - - self.frequency = (KP * filtered_error + KI * self.integral_error + KD * derivative).clamp(-0.1, 0.1); + // Calculate frequency needed to make up the difference over the next 10 + // seconds. + self.frequency = tick_difference / 10000.0; + self.base_client_tick = local_tick; + self.last_packet_receive_time = packet_receive_time; + } - self.last_error = filtered_error; - self.base_client_tick = server_tick.0; - self.last_client_tick = packet_receive_time; - self.last_update = packet_receive_time; + #[cfg_attr(feature = "debug", korangar_debug::profile)] + pub fn get_client_tick_at(&self, time: Instant) -> f64 { + let elapsed = time.duration_since(self.last_packet_receive_time).as_secs_f64(); + self.base_client_tick + (elapsed * 1000.0) + (elapsed * self.frequency * 1000.0) } #[cfg_attr(feature = "debug", korangar_debug::profile)] pub fn get_client_tick(&self) -> ClientTick { - let elapsed = self.last_client_tick.elapsed().as_secs_f64(); - let adjustment = self.frequency * elapsed; - let tick = self.base_client_tick as f64 + (elapsed * 1000.0) + adjustment; + let tick = self.get_client_tick_at(Instant::now()); ClientTick(tick as u32) } diff --git a/korangar_networking/src/lib.rs b/korangar_networking/src/lib.rs index b8ab10da..6de778b1 100644 --- a/korangar_networking/src/lib.rs +++ b/korangar_networking/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(let_chains)] + mod entity; mod event; mod hotkey; @@ -8,6 +10,7 @@ mod server; use std::cell::RefCell; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use event::{ @@ -44,8 +47,34 @@ impl NetworkEventBuffer { } } +struct TimeContext { + request_received: Instant, + client_tick: u32, +} + +impl TimeContext { + pub fn new() -> Self { + let now = Instant::now(); + Self { + request_received: now, + client_tick: 100, + } + } + + fn current_client_tick(&mut self) -> u32 { + let elapsed = Instant::now().duration_since(self.request_received).as_secs_f64(); + (self.client_tick as f64 + (elapsed * 1000.0)) as u32 + } + + fn update_client_tick(&mut self, server_tick: u32, request_received: Instant) { + self.request_received = request_received; + self.client_tick = server_tick; + } +} + pub struct NetworkingSystem { command_sender: UnboundedSender, + time_context: Arc>, login_server_connection: ServerConnection, character_server_connection: ServerConnection, map_server_connection: ServerConnection, @@ -54,9 +83,8 @@ pub struct NetworkingSystem { impl NetworkingSystem { pub fn spawn() -> (Self, NetworkEventBuffer) { - let command_sender = Self::spawn_networking_thread(NoPacketCallback); - - Self::inner_new(command_sender, NoPacketCallback) + let (command_sender, time_context) = Self::spawn_networking_thread(NoPacketCallback); + Self::inner_new(command_sender, time_context, NoPacketCallback) } } @@ -64,9 +92,14 @@ impl NetworkingSystem where Callback: PacketCallback + Send, { - fn inner_new(command_sender: UnboundedSender, packet_callback: Callback) -> (Self, NetworkEventBuffer) { + fn inner_new( + command_sender: UnboundedSender, + time_context: Arc>, + packet_callback: Callback, + ) -> (Self, NetworkEventBuffer) { let networking_system = Self { command_sender, + time_context, login_server_connection: ServerConnection::Disconnected, character_server_connection: ServerConnection::Disconnected, map_server_connection: ServerConnection::Disconnected, @@ -78,13 +111,14 @@ where } pub fn spawn_with_callback(packet_callback: Callback) -> (Self, NetworkEventBuffer) { - let command_sender = Self::spawn_networking_thread(packet_callback.clone()); - - Self::inner_new(command_sender, packet_callback) + let (command_sender, time_context) = Self::spawn_networking_thread(packet_callback.clone()); + Self::inner_new(command_sender, time_context, packet_callback) } - fn spawn_networking_thread(packet_callback: Callback) -> UnboundedSender { + fn spawn_networking_thread(packet_callback: Callback) -> (UnboundedSender, Arc>) { let (command_sender, mut command_receiver) = tokio::sync::mpsc::unbounded_channel::(); + let time_context = Arc::new(Mutex::new(TimeContext::new())); + let thread_time_context = Arc::clone(&time_context); std::thread::spawn(move || { let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); @@ -115,9 +149,10 @@ where action_receiver, event_sender, packet_handler, - LoginServerKeepalivePacket::new, + |_| LoginServerKeepalivePacket::new(), Duration::from_secs(58), false, + thread_time_context.clone(), )); login_server_task_handle = Some(handle); @@ -138,9 +173,10 @@ where action_receiver, event_sender, packet_handler, - CharacterServerKeepalivePacket::new, + |_| CharacterServerKeepalivePacket::new(), Duration::from_secs(10), true, + thread_time_context.clone(), )); character_server_task_handle = Some(handle); @@ -161,11 +197,16 @@ where action_receiver, event_sender, packet_handler, - // Always passing 100 seems to work fine for now, but it might cause - // issues when connecting to something other than rAthena. - || RequestServerTickPacket::new(ClientTick(100)), - Duration::from_secs(4), + |time_context| match time_context.lock() { + Ok(mut context) => { + let client_tick = context.current_client_tick(); + RequestServerTickPacket::new(ClientTick(client_tick)) + } + Err(_) => RequestServerTickPacket::new(ClientTick(100)), + }, + Duration::from_secs(10), false, + thread_time_context.clone(), )); map_server_task_handle = Some(handle); @@ -175,7 +216,7 @@ where }); }); - command_sender + (command_sender, time_context) } fn handle_connection(connection: &mut ServerConnection, event_buffer: &mut NetworkEventBuffer) @@ -219,17 +260,19 @@ where Self::handle_connection::(&mut self.map_server_connection, events); } + #[allow(clippy::too_many_arguments)] async fn handle_server_connection( address: SocketAddr, mut action_receiver: UnboundedReceiver>, event_sender: UnboundedSender, mut packet_handler: PacketHandler, - ping_factory: impl Fn() -> PingPacket, + ping_factory: impl Fn(&Mutex) -> PingPacket, ping_frequency: Duration, // After logging in to the character server, it sends the account id without any packet. // Since our packet handler has no way of working with this, we need to add some special // logic. mut read_account_id: bool, + time_context: Arc>, ) -> Result<(), NetworkTaskError> where PingPacket: Packet + ClientPacket, @@ -307,12 +350,16 @@ where } for event in events.drain(..) { + if let NetworkEvent::UpdateClientTick {client_tick,received_at} = &event && let Ok(mut context) = time_context.lock() { + context.update_client_tick(client_tick.0, *received_at); + } + event_sender.send(event).map_err(|_| NetworkTaskError::ConnectionClosed)?; } } // Send a keep-alive packet to the server. _ = interval.tick() => { - let packet_bytes = ping_factory().packet_to_bytes().unwrap(); + let packet_bytes = ping_factory(&time_context).packet_to_bytes().unwrap(); stream.write_all(&packet_bytes).await.map_err(|_| NetworkTaskError::ConnectionClosed)?; } } @@ -1097,6 +1144,11 @@ where self.send_map_server_packet(&MapLoadedPacket::default()) } + pub fn request_client_tick(&mut self) -> Result<(), NotConnectedError> { + let client_tick = self.time_context.lock().map(|context| context.client_tick).unwrap_or(100); + self.send_map_server_packet(&RequestServerTickPacket::new(ClientTick(client_tick))) + } + pub fn respawn(&mut self) -> Result<(), NotConnectedError> { self.send_map_server_packet(&RestartPacket::new(RestartType::Respawn)) }