diff --git a/Cargo.lock b/Cargo.lock index 449785b..54a632d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,24 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "anstream" version = "0.6.13" @@ -65,6 +83,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" + [[package]] name = "async-channel" version = "2.2.0" @@ -302,6 +326,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctrlc" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b467862cc8610ca6fc9a1532d7777cee0804e678ab45410897b9396495994a0b" +dependencies = [ + "nix", + "windows-sys 0.52.0", +] + [[package]] name = "data-encoding" version = "2.5.0" @@ -354,6 +388,21 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "f1-packet-recorder" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "ctrlc", + "f1-telemetry", + "f1-telemetry-common", + "log", + "rusqlite", + "simplelog", + "time", +] + [[package]] name = "f1-telemetry" version = "0.3.0" @@ -411,6 +460,18 @@ dependencies = [ "tokio-tungstenite", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "field-offset" version = "0.3.6" @@ -825,6 +886,19 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "hashlink" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692eaaf7f7607518dd3cef090f1474b61edc5301d8012f09579920df68b725ee" +dependencies = [ + "hashbrown", +] [[package]] name = "heck" @@ -899,6 +973,17 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "lock_api" version = "0.4.11" @@ -961,6 +1046,17 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "libc", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1200,6 +1296,20 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "rusqlite" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +dependencies = [ + "bitflags 2.4.2", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1619,6 +1729,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version-compare" version = "0.1.1" @@ -1808,3 +1924,23 @@ checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" dependencies = [ "memchr", ] + +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] diff --git a/Cargo.toml b/Cargo.toml index 8df316f..67f5a14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "f1-packet-recorder", "f1-telemetry", "f1-telemetry-common", "f1-telemetry-display", diff --git a/f1-packet-recorder/Cargo.toml b/f1-packet-recorder/Cargo.toml new file mode 100644 index 0000000..57973d0 --- /dev/null +++ b/f1-packet-recorder/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "f1-packet-recorder" +version = "0.1.0" +description = "Utility to record and replay sessions." +authors = ["Mathieu Lemay "] +edition = "2021" + +[dependencies] +f1-telemetry = { path = "../f1-telemetry" } +f1-telemetry-common = { path = "../f1-telemetry-common" } +anyhow = "1.0.80" +clap = { version = "4.0.4", features = ["derive", "env"] } +ctrlc = "3.4.2" +log = "0.4.17" +rusqlite = { version = "0.31.0", features = ["bundled"] } +simplelog = "0.12.0" +time = "0.3.11" diff --git a/f1-packet-recorder/src/main.rs b/f1-packet-recorder/src/main.rs new file mode 100644 index 0000000..9b95d81 --- /dev/null +++ b/f1-packet-recorder/src/main.rs @@ -0,0 +1,86 @@ +use anyhow::Result; +use clap::{Args, Parser, Subcommand}; +use log::LevelFilter; +use simplelog::{ColorChoice, TerminalMode}; + +use f1_telemetry_common::logging::LogBuilder; + +mod player; +mod recorder; +mod utils; + +#[derive(Parser)] +#[command(author, version, about, long_about = None, propagate_version = true)] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Replay a recorded session + Play(PlayArgs), + + /// Record an incoming session + Record(RecordArgs), +} + +#[derive(Debug, Args)] +struct PlayArgs { + /// Database file to replay + // #[clap(long, default_value = None)] + file: String, + + /// Destination IP address. Leave empty to use broadcast. + #[clap(long, default_value = None)] + destination: Option, + + /// Port to write to. + #[clap(long, default_value = "20777")] + port: u16, + + /// Loop over the database + #[clap(long = "loop")] + loop_play: bool, + + /// Play the packets in realtime or not. + #[clap(long)] + realtime: bool, + + /// Real-time playback factor (higher is faster) + #[clap(long, default_value = "1.0")] + realtime_factor: f32, + + /// Number of packets to skip at the start of the file + #[clap(long, default_value = "0")] + skip: u64, +} + +#[derive(Debug, Args)] +struct RecordArgs { + /// Database file to record to + #[clap(short, long, default_value = None)] + file: String, + + /// Host to bind on for the UDP packet listener + #[clap(long, default_value = "0.0.0.0")] + host: String, + + /// Port to bind on for the UDP packet listener + #[clap(long, default_value = "20777")] + port: u16, +} + +fn main() -> Result<()> { + let cli = Cli::parse(); + + LogBuilder::new() + .with_term_logger(LevelFilter::Info, TerminalMode::Mixed, ColorChoice::Auto) + .build() + .expect("Error initializing logger."); + + match &cli.command { + Commands::Play(args) => player::play(args), + Commands::Record(args) => recorder::record(args), + } +} diff --git a/f1-packet-recorder/src/player.rs b/f1-packet-recorder/src/player.rs new file mode 100644 index 0000000..f52f3a7 --- /dev/null +++ b/f1-packet-recorder/src/player.rs @@ -0,0 +1,161 @@ +use std::net::UdpSocket; +use std::sync::mpsc::Receiver; +use std::thread::sleep; +use std::time::Duration; + +use anyhow::{Error, Result}; +use log::info; +use rusqlite::Connection; +use time::Instant; + +use crate::utils::{ctrl_c_channel, get_database_connection}; + +use super::PlayArgs; + +pub(crate) fn play(args: &PlayArgs) -> Result<()> { + info!( + "Replaying {} to {}:{} (realtime: {}/{:.1}x, loop: {})", + args.file, + args.destination + .as_ref() + .unwrap_or(&String::from("")), + args.port, + args.realtime, + args.realtime_factor, + args.loop_play + ); + + let player = Player::new(args)?; + let ctrl_receiver = ctrl_c_channel()?; + + loop { + player.play(&ctrl_receiver)?; + + if !args.loop_play { + break; + } + } + + Ok(()) +} + +fn get_socket(destination: &Option, port: u16) -> Result { + let socket = UdpSocket::bind("0.0.0.0:0")?; + + match destination { + Some(d) => socket.connect(format!("{}:{}", d, port))?, + None => { + socket.connect(format!("0.0.0.0:{}", port))?; + socket.set_broadcast(true)?; + } + }; + + Ok(socket) +} + +struct TimestampedPacket { + timestamp: f64, + data: Vec, +} + +struct Player { + socket: UdpSocket, + conn: Connection, + + realtime: bool, + realtime_factor: f32, + skip: u64, +} + +impl Player { + fn new(args: &PlayArgs) -> Result { + let socket = get_socket(&args.destination, args.port)?; + let conn = get_database_connection(&args.file)?; + + Ok(Self { + socket, + conn, + realtime: args.realtime, + realtime_factor: args.realtime_factor, + skip: args.skip, + }) + } + + fn play(&self, ctrl_receiver: &Receiver<()>) -> Result<()> { + let playback_start = Instant::now(); + let first_timestamp = self.get_first_timestamp()?; + + let mut stmt = self + .conn + .prepare("SELECT timestamp, packet FROM packets ORDER BY pkt_id LIMIT -1 OFFSET ?;")?; + + let packets = stmt.query_map([self.skip], |r| { + Ok(TimestampedPacket { + timestamp: r.get(0)?, + data: r.get(1)?, + }) + })?; + + for (idx, packet) in packets.enumerate() { + if ctrl_receiver.try_recv().is_ok() { + info!("Stopping playback"); + return Err(Error::msg("ctrl-c received")); + } + + let packet = packet?; + + self.delay_next_packet(playback_start, first_timestamp, packet.timestamp); + + self.socket.send(&packet.data)?; + + if (idx + 1) % 500 == 0 { + let since_start = playback_start.elapsed().as_seconds_f64(); + let expected_elapsed = packet.timestamp - first_timestamp; + + info!( + "{} packages sent, delay: {:.3}ms", + idx + 1, + (since_start - expected_elapsed) * 1000.0 + ) + } + } + + Ok(()) + } + + fn get_first_timestamp(&self) -> Result { + let mut stmt = self.conn.prepare( + "SELECT timestamp FROM packets WHERE pkt_id = (SELECT min(pkt_id) FROM packets);", + )?; + + let mut rows = stmt.query([])?; + + Ok(rows.next()?.unwrap().get(0)?) + } + + fn delay_next_packet( + &self, + playback_start: Instant, + first_packet_timestamp: f64, + packet_timestamp: f64, + ) { + let duration = if self.realtime { + let expected_delay = + (packet_timestamp - first_packet_timestamp) / self.realtime_factor as f64; + let real_delay = playback_start.elapsed().as_seconds_f64(); + + if real_delay < expected_delay { + let delta = expected_delay - real_delay; + Some(Duration::from_secs_f64(delta)) + } else { + None + } + } else { + Some(Duration::from_millis(1)) + }; + + if let Some(d) = duration { + sleep(d); + } + } +} diff --git a/f1-packet-recorder/src/recorder.rs b/f1-packet-recorder/src/recorder.rs new file mode 100644 index 0000000..b82abeb --- /dev/null +++ b/f1-packet-recorder/src/recorder.rs @@ -0,0 +1,188 @@ +use std::net::UdpSocket; +use std::sync::mpsc::{channel, Receiver, TryRecvError}; +use std::thread::{sleep, spawn}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use anyhow::Result; +use log::{info, warn}; +use rusqlite::Connection; +use time::Instant; + +use crate::utils::{ctrl_c_channel, get_database_connection}; +use f1_telemetry::packet::{parse_packet, Packet}; + +use super::RecordArgs; + +pub(crate) fn record(args: &RecordArgs) -> Result<()> { + info!("Recording {}:{} to {}", args.host, args.port, args.file); + + let recorder = Recorder::new(args)?; + let ctrl_receiver = ctrl_c_channel()?; + + recorder.record(&ctrl_receiver)?; + + Ok(()) +} + +struct TimestampedPacket { + timestamp: f64, + packet: Packet, + raw_packet: Vec, +} + +struct Recorder { + host: String, + port: u16, + conn: Connection, +} + +impl Recorder { + fn new(args: &RecordArgs) -> Result { + let conn = get_database_connection(&args.file)?; + + Ok(Self { + host: args.host.clone(), + port: args.port, + conn, + }) + } + + fn record(&self, ctrl_receiver: &Receiver<()>) -> Result<()> { + self.init_db()?; + + let rx = self.start_receiver_socket()?; + let mut last_recv = Instant::now(); + let mut packets: Vec = Vec::with_capacity(256); + + loop { + if ctrl_receiver.try_recv().is_ok() { + info!("Stopping recording"); + break; + } + + let packet = match rx.try_recv() { + Ok(p) => Some(p), + Err(TryRecvError::Empty) => None, + Err(e) => { + warn!("Error receiving packet: {:?}", e); + continue; + } + }; + + if packet.is_none() { + if !packets.is_empty() && last_recv.elapsed() >= Duration::from_secs(5) { + info!("No packets received in the last 5 sec, saving packets to database."); + + self.save_packets(&mut packets)?; + } + sleep(Duration::from_millis(1)); + continue; + } + + packets.push(packet.unwrap()); + if packets.len() >= 256 { + info!("Saving packets to database."); + self.save_packets(&mut packets)?; + } + + last_recv = Instant::now(); + } + + info!("Saving packets to database."); + self.save_packets(&mut packets)?; + + Ok(()) + } + + fn init_db(&self) -> Result<()> { + let create_table_stmt = " + CREATE TABLE IF NOT EXISTS packets ( + pkt_id INTEGER PRIMARY KEY, + timestamp REAL NOT NULL, + packetFormat INTEGER NOT NULL, + gameMajorVersion INTEGER NOT NULL, + gameMinorVersion INTEGER NOT NULL, + packetVersion INTEGER NOT NULL, + packetID INTEGER NOT NULL, + sessionID CHAR(16) NOT NULL, + sessionTime REAL NOT NULL, + frameIdentifier INTEGER NOT NULL, + playerCarIndex INTEGER NOT NULL, + packet BLOB NOT NULL + ); + "; + + self.conn.execute(create_table_stmt, ())?; + + Ok(()) + } + + fn start_receiver_socket(&self) -> Result> { + let socket = UdpSocket::bind(format!("{}:{}", self.host, self.port))?; + let (tx, rx) = channel(); + + spawn(move || { + loop { + let mut buf = [0; 2048]; // All packets fit in 2048 bytes + match socket.recv(&mut buf) { + Ok(len) => match process_incoming_packet(len, &buf) { + Ok(p) => { + let _ = tx.send(p); + } + Err(e) => warn!("Error processing packet: {:?}", e), + }, + Err(e) => { + warn!("Invalid packet received: {:?}", e); + } + } + } + }); + + Ok(rx) + } + + fn save_packets(&self, packets: &mut Vec) -> Result<()> { + let mut stmt = self.conn.prepare_cached(" + INSERT INTO packets( + timestamp, packetFormat, gameMajorVersion, gameMinorVersion, packetVersion, packetID, + sessionID, sessionTime, frameIdentifier, playerCarIndex, packet + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + ")?; + + self.conn.execute("BEGIN;", ())?; + for p in packets.iter() { + let header = p.packet.header(); + stmt.execute(( + p.timestamp, + header.packet_format, + header.game_major_version, + header.game_minor_version, + header.packet_version, + header.packet_type as u8, + format!("{:16x}", header.session_uid), + header.session_time, + header.frame_identifier, + header.player_car_index, + &p.raw_packet, + ))?; + } + self.conn.execute("COMMIT;", ())?; + packets.clear(); + + Ok(()) + } +} + +fn process_incoming_packet(len: usize, buf: &[u8]) -> Result { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Somehow we went back in time") + .as_secs_f64(); + let packet = parse_packet(len, buf)?; + + Ok(TimestampedPacket { + timestamp, + packet, + raw_packet: Vec::from(&buf[..len]), + }) +} diff --git a/f1-packet-recorder/src/utils.rs b/f1-packet-recorder/src/utils.rs new file mode 100644 index 0000000..66ced94 --- /dev/null +++ b/f1-packet-recorder/src/utils.rs @@ -0,0 +1,16 @@ +use rusqlite::Connection; +use std::sync::mpsc::{channel, Receiver}; + +pub(crate) fn ctrl_c_channel() -> anyhow::Result, ctrlc::Error> { + let (sender, receiver) = channel(); + ctrlc::set_handler(move || { + let _ = sender.send(()); + })?; + + Ok(receiver) +} +pub(crate) fn get_database_connection(file: &str) -> anyhow::Result { + let conn = Connection::open(file)?; + + Ok(conn) +} diff --git a/f1-telemetry-display/src/main.rs b/f1-telemetry-display/src/main.rs index 1424619..b6f9e1c 100644 --- a/f1-telemetry-display/src/main.rs +++ b/f1-telemetry-display/src/main.rs @@ -69,7 +69,7 @@ async fn main() { log_builder.with_term_logger(LevelFilter::Info, TerminalMode::Mixed, ColorChoice::Auto); } - log_builder.build().expect("Error initializing loggger."); + log_builder.build().expect("Error initializing logger."); start_stream(args.host, args.port).await; run(&args.ui).await; diff --git a/f1-telemetry/src/lib.rs b/f1-telemetry/src/lib.rs index ef0d04c..dd7967e 100644 --- a/f1-telemetry/src/lib.rs +++ b/f1-telemetry/src/lib.rs @@ -63,6 +63,7 @@ impl SyncStream { pub fn next(&self) -> Result { self.rt.block_on(self.stream.next()) } + pub fn next_from(&self) -> Result<(Packet, SocketAddr), UnpackError> { self.rt.block_on(self.stream.next_from()) } diff --git a/f1-telemetry/src/packet.rs b/f1-telemetry/src/packet.rs index 5ecb610..5834462 100644 --- a/f1-telemetry/src/packet.rs +++ b/f1-telemetry/src/packet.rs @@ -1,4 +1,6 @@ use serde::Serialize; +use std::error::Error; +use std::fmt::{Display, Formatter}; use car_damage::PacketCarDamageData; use car_setup::PacketCarSetupData; @@ -44,6 +46,14 @@ impl From> for UnpackError { } } +impl Display for UnpackError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&format!("Unpack error: {}", &self.0), f) + } +} + +impl Error for UnpackError {} + #[derive(Debug, Clone, Serialize)] #[serde(tag = "packet_type")] pub enum Packet { @@ -102,7 +112,28 @@ pub enum PacketType { MotionEx, } -pub(crate) fn parse_packet(size: usize, packet: &[u8]) -> Result { +impl From for u8 { + fn from(val: PacketType) -> u8 { + match val { + PacketType::Motion => 0, + PacketType::Session => 1, + PacketType::LapData => 2, + PacketType::Event => 3, + PacketType::Participants => 4, + PacketType::CarSetups => 5, + PacketType::CarTelemetry => 6, + PacketType::CarStatus => 7, + PacketType::FinalClassification => 8, + PacketType::LobbyInfo => 9, + PacketType::CarDamage => 10, + PacketType::SessionHistory => 11, + PacketType::TyreSets => 12, + PacketType::MotionEx => 13, + } + } +} + +pub fn parse_packet(size: usize, packet: &[u8]) -> Result { let packet_format = parse_version(packet); match packet_format { diff --git a/f1-ws-server/src/main.rs b/f1-ws-server/src/main.rs index 4b55e05..0530129 100644 --- a/f1-ws-server/src/main.rs +++ b/f1-ws-server/src/main.rs @@ -41,7 +41,7 @@ async fn main() { LogBuilder::new() .with_term_logger(LevelFilter::Info, TerminalMode::Mixed, ColorChoice::Auto) .build() - .expect("Error initializing loggger."); + .expect("Error initializing logger."); let addr = format!("{}:{}", args.listener_host, args.listener_port); let packet_stream = Stream::new(&addr)