diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index 96c891c..0000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,71 +0,0 @@ ---- -name: CI - -on: - push: - branches: [main] - workflow_dispatch: - -concurrency: - group: ci-${{ github.ref }} - cancel-in-progress: true - -env: - DOCKER_BUILDKIT: 1 - COMPOSE_DOCKER_CLI_BUILD: 1 - -jobs: - ci: - name: Game Mods CI - runs-on: self-hosted - timeout-minutes: 30 - - steps: - - name: Pre-checkout cleanup - run: | - for item in outputs target .git/index.lock; do - if [ -d "$item" ] || [ -f "$item" ]; then - docker run --rm -v "$(pwd):/workspace" busybox:1.36.1 sh -c \ - "rm -rf /workspace/$item" 2>/dev/null || \ - sudo rm -rf "$item" 2>/dev/null || true - fi - done - - - name: Checkout - uses: actions/checkout@v4 - - - name: Set UID/GID - run: | - echo "USER_ID=$(id -u)" >> $GITHUB_ENV - echo "GROUP_ID=$(id -g)" >> $GITHUB_ENV - - # -- Formatting ------------------------------------------------------- - - name: Format check - run: docker compose --profile ci run --rm rust-ci cargo fmt --all -- --check - - # -- Linting ----------------------------------------------------------- - - name: Clippy - run: docker compose --profile ci run --rm rust-ci cargo clippy --all-targets -- -D warnings - - # -- Tests ------------------------------------------------------------- - - name: Test - run: docker compose --profile ci run --rm rust-ci cargo test - - # -- Build ------------------------------------------------------------- - - name: Build - run: docker compose --profile ci run --rm rust-ci cargo build --release - - # -- License / Advisory ------------------------------------------------ - - name: cargo-deny - run: docker compose --profile ci run --rm rust-ci cargo deny check - - # -- Cleanup ----------------------------------------------------------- - - name: Fix Docker file ownership - if: always() - run: | - for dir in target outputs; do - if [ -d "$dir" ]; then - docker run --rm -v "$(pwd)/$dir:/workspace" busybox:1.36.1 \ - chown -Rh "$(id -u):$(id -g)" /workspace 2>/dev/null || true - fi - done diff --git a/.github/workflows/main-ci.yml b/.github/workflows/main-ci.yml index 588539f..940a302 100644 --- a/.github/workflows/main-ci.yml +++ b/.github/workflows/main-ci.yml @@ -25,13 +25,9 @@ jobs: steps: - name: Pre-checkout cleanup run: | - for item in outputs target .git/index.lock; do - if [ -d "$item" ] || [ -f "$item" ]; then - docker run --rm -v "$(pwd):/workspace" busybox:1.36.1 sh -c \ - "rm -rf /workspace/$item" 2>/dev/null || \ - sudo rm -rf "$item" 2>/dev/null || true - fi - done + docker run --rm -v "$(pwd):/workspace" busybox:1.36.1 sh -c \ + "rm -rf /workspace/outputs /workspace/.git/index.lock" 2>/dev/null || \ + sudo rm -rf outputs .git/index.lock 2>/dev/null || true - name: Checkout uses: actions/checkout@v4 @@ -54,7 +50,7 @@ jobs: # -- Tests ------------------------------------------------------------- - name: Test - run: docker compose --profile ci run --rm rust-ci cargo test + run: docker compose --profile ci run --rm rust-ci cargo test -- --test-threads=4 # -- Build ------------------------------------------------------------- - name: Build diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index ab3e8bb..807440f 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -40,13 +40,9 @@ jobs: steps: - name: Pre-checkout cleanup run: | - for item in outputs target .git/index.lock; do - if [ -d "$item" ] || [ -f "$item" 
]; then - docker run --rm -v "$(pwd):/workspace" busybox:1.36.1 sh -c \ - "rm -rf /workspace/$item" 2>/dev/null || \ - sudo rm -rf "$item" 2>/dev/null || true - fi - done + docker run --rm -v "$(pwd):/workspace" busybox:1.36.1 sh -c \ + "rm -rf /workspace/outputs /workspace/.git/index.lock" 2>/dev/null || \ + sudo rm -rf outputs .git/index.lock 2>/dev/null || true - name: Checkout uses: actions/checkout@v4 @@ -71,7 +67,7 @@ jobs: # -- Tests ------------------------------------------------------------- - name: Test - run: docker compose --profile ci run --rm rust-ci cargo test + run: docker compose --profile ci run --rm rust-ci cargo test -- --test-threads=4 # -- Build ------------------------------------------------------------- - name: Build diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 81309a7..d3879e1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,7 @@ repos: args: [--unsafe] - id: check-added-large-files args: [--maxkb=1000] + exclude: '^Cargo\.lock$' - id: check-json - id: pretty-format-json args: [--autofix, --no-sort-keys] @@ -35,19 +36,19 @@ repos: types: [shell] args: [-x] - # Rust formatting and linting (containerized) + # Rust formatting and linting (local, no Docker rebuild) - repo: local hooks: - id: rust-fmt name: Rust format check - entry: docker compose --profile ci run --rm rust-ci cargo fmt --all -- --check + entry: cargo fmt --all -- --check language: system files: '\.rs$' pass_filenames: false - id: rust-clippy name: Rust clippy lint - entry: docker compose --profile ci run --rm rust-ci cargo clippy --all-targets -- -D warnings + entry: cargo clippy --all-targets -- -D warnings language: system files: '\.rs$' pass_filenames: false diff --git a/Cargo.toml b/Cargo.toml index ff4667f..7e574eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ authors = ["AndrewAltimit"] [workspace.dependencies] # Serialization serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" bitcode = { version = "0.6", features = ["serde"] } bincode = "1.3" # Legacy: only used in itk-net (laminar dependency) @@ -89,6 +90,17 @@ parking_lot = "0.12" # Vulkan ash = { version = "0.38", features = ["loaded"] } +# CLI +clap = { version = "4", features = ["derive"] } + +# GUI +egui = "0.28" +egui-wgpu = "0.28" +egui-winit = "0.28" +wgpu = "0.20" +winit = "0.29" +once_cell = "1.19" + # Hooking retour = { version = "0.3", features = ["static-detour"] } @@ -125,6 +137,8 @@ clone_on_ref_ptr = "warn" dbg_macro = "deny" todo = "deny" unimplemented = "deny" +undocumented_unsafe_blocks = "warn" +missing_safety_doc = "warn" [workspace.lints.rust] unsafe_op_in_unsafe_fn = "warn" diff --git a/core/itk-ipc/src/unix_impl.rs b/core/itk-ipc/src/unix_impl.rs index a8c9a24..93601b9 100644 --- a/core/itk-ipc/src/unix_impl.rs +++ b/core/itk-ipc/src/unix_impl.rs @@ -56,13 +56,16 @@ fn remove_socket_file(path: &str) { /// Non-blocking receive helper that keeps the lock held while consuming data. /// -/// This prevents race conditions where another thread could consume data between -/// peeking and actually reading. +/// Uses a single MSG_PEEK to check for the header, then a single blocking recv +/// to consume the full message. This avoids the race condition where data could +/// arrive between multiple peek operations. fn try_recv_with_fd(fd: std::os::unix::io::RawFd) -> Result>> { use itk_protocol::HEADER_SIZE; - // Use MSG_PEEK to check if enough data is available without consuming bytes. 
+ // Use MSG_PEEK to check if enough data is available for the header. let mut peek_buf = [0u8; HEADER_SIZE]; + // SAFETY: `fd` is a valid socket file descriptor owned by the caller. + // `peek_buf` is a stack-allocated buffer with known size HEADER_SIZE. let peeked = unsafe { libc::recv( fd, @@ -96,55 +99,40 @@ fn try_recv_with_fd(fd: std::os::unix::io::RawFd) -> Result>> { let header = itk_protocol::Header::from_bytes(&peek_buf).map_err(IpcError::Protocol)?; let total_size = HEADER_SIZE + header.payload_len as usize; - // Peek again to check if full message is available + // Allocate buffer for the full message based on header-parsed length let mut message = vec![0u8; total_size]; - let peeked_full = unsafe { - libc::recv( - fd, - message.as_mut_ptr() as *mut libc::c_void, - total_size, - libc::MSG_PEEK | libc::MSG_DONTWAIT, - ) - }; - - if peeked_full < 0 { - let err = std::io::Error::last_os_error(); - if err.kind() == std::io::ErrorKind::WouldBlock - || err.kind() == std::io::ErrorKind::Interrupted - { - return Ok(None); - } - return Err(IpcError::Io(err)); - } - if (peeked_full as usize) < total_size { - return Ok(None); - } - - // Full message is available - consume it (we still hold the lock in the caller) + // Single recv to consume the full message (we already know the header is available, + // and stream sockets guarantee payload follows header in order) + // SAFETY: `fd` is a valid socket file descriptor. `message` is a heap-allocated + // buffer of `total_size` bytes, matching the length passed to recv. let received = unsafe { libc::recv( fd, message.as_mut_ptr() as *mut libc::c_void, total_size, - 0, // Blocking read, but we know data is available + libc::MSG_DONTWAIT, ) }; if received < 0 { let err = std::io::Error::last_os_error(); - // EINTR on final recv is unusual but handle it by reporting no data available - if err.kind() == std::io::ErrorKind::Interrupted { + if err.kind() == std::io::ErrorKind::WouldBlock + || err.kind() == std::io::ErrorKind::Interrupted + { return Ok(None); } return Err(IpcError::Io(err)); } - if (received as usize) != total_size { + if (received as usize) < total_size { + // Partial message: payload not fully available yet. + // Since we already consumed partial data from the stream, we can't + // put it back. This indicates a genuine protocol framing issue. return Err(IpcError::Protocol( itk_protocol::ProtocolError::IncompletePayload { - need: total_size - itk_protocol::HEADER_SIZE, - have: (received as usize).saturating_sub(itk_protocol::HEADER_SIZE), + need: total_size - HEADER_SIZE, + have: (received as usize).saturating_sub(HEADER_SIZE), }, )); } @@ -250,11 +238,14 @@ impl UnixSocketServer { // Set restrictive umask before creating socket to prevent race condition. // Without this, the socket would briefly exist with default permissions // (potentially allowing other users to connect) before set_permissions runs. + // SAFETY: `libc::umask` is always safe to call; it atomically sets the + // process umask and returns the previous value. let old_umask = unsafe { libc::umask(0o077) }; let bind_result = UnixListener::bind(&path); // Restore original umask immediately after bind + // SAFETY: Restoring the previously-saved umask value; always safe. 
unsafe { libc::umask(old_umask); } diff --git a/core/itk-protocol/src/lib.rs b/core/itk-protocol/src/lib.rs index 5cd5cc8..2da6f97 100644 --- a/core/itk-protocol/src/lib.rs +++ b/core/itk-protocol/src/lib.rs @@ -51,8 +51,12 @@ pub enum ProtocolError { #[error("payload too large: {size} bytes (max {max})")] PayloadTooLarge { size: usize, max: usize }, - #[error("CRC mismatch: expected {expected:#x}, got {got:#x}")] - CrcMismatch { expected: u32, got: u32 }, + #[error("CRC mismatch for {msg_type:?}: expected {expected:#x}, got {got:#x}")] + CrcMismatch { + msg_type: MessageType, + expected: u32, + got: u32, + }, #[error("unknown message type: {0}")] UnknownMessageType(u32), @@ -554,6 +558,12 @@ impl Validate for ScreenRect { impl Validate for WindowState { fn validate(&self) -> Result<(), ProtocolError> { validate_finite_f32("dpi_scale", self.dpi_scale)?; + if !(0.1..=10.0).contains(&self.dpi_scale) { + return Err(ProtocolError::ValidationFailed { + field: "dpi_scale".to_string(), + reason: format!("value {} out of valid range 0.1..=10.0", self.dpi_scale), + }); + } Ok(()) } } @@ -606,6 +616,12 @@ impl Validate for SyncState { fn validate(&self) -> Result<(), ProtocolError> { validate_string("content_id", &self.content_id)?; validate_finite_f64("playback_rate", self.playback_rate)?; + if self.playback_rate <= 0.0 { + return Err(ProtocolError::ValidationFailed { + field: "playback_rate".to_string(), + reason: format!("value {} must be positive (non-zero)", self.playback_rate), + }); + } Ok(()) } } @@ -629,6 +645,12 @@ impl Validate for VideoState { fn validate(&self) -> Result<(), ProtocolError> { validate_string("content_id", &self.content_id)?; validate_finite_f64("playback_rate", self.playback_rate)?; + if self.playback_rate <= 0.0 { + return Err(ProtocolError::ValidationFailed { + field: "playback_rate".to_string(), + reason: format!("value {} must be positive (non-zero)", self.playback_rate), + }); + } validate_finite_f32("volume", self.volume)?; Ok(()) } @@ -749,6 +771,7 @@ pub fn decode Deserialize<'de>>( let computed_crc = crc32fast::hash(payload_bytes); if computed_crc != header.crc32 { return Err(ProtocolError::CrcMismatch { + msg_type: header.msg_type, expected: header.crc32, got: computed_crc, }); @@ -1050,6 +1073,259 @@ mod tests { } let result: Result<(_, ScreenRect), _> = decode(&encoded); - assert!(matches!(result, Err(ProtocolError::CrcMismatch { .. }))); + match &result { + Err(ProtocolError::CrcMismatch { msg_type, .. 
}) => { + assert_eq!(*msg_type, MessageType::ScreenRect); + }, + other => panic!("expected CrcMismatch, got {other:?}"), + } + } + + // ========================================================================= + // Edge case tests (Phase 6) + // ========================================================================= + + #[test] + fn test_truncated_header() { + // Less than HEADER_SIZE bytes + let bytes = [0u8; HEADER_SIZE - 1]; + let result = Header::from_bytes(&bytes); + assert!(matches!( + result, + Err(ProtocolError::IncompleteHeader { need: 20, have: 19 }) + )); + } + + #[test] + fn test_empty_bytes() { + let result = Header::from_bytes(&[]); + assert!(matches!( + result, + Err(ProtocolError::IncompleteHeader { need: 20, have: 0 }) + )); + } + + #[test] + fn test_wrong_version() { + let header = Header { + magic: MAGIC, + version: 99, + msg_type: MessageType::Ping, + payload_len: 0, + crc32: 0, + }; + let bytes = header.to_bytes(); + // Manually write version since Header::to_bytes uses the field directly + let result = Header::from_bytes(&bytes); + assert!(matches!(result, Err(ProtocolError::UnsupportedVersion(99)))); + } + + #[test] + fn test_unknown_message_type() { + let mut bytes = [0u8; HEADER_SIZE]; + bytes[0..4].copy_from_slice(&MAGIC); + bytes[4..8].copy_from_slice(&VERSION.to_le_bytes()); + bytes[8..12].copy_from_slice(&999u32.to_le_bytes()); // invalid msg type + let result = Header::from_bytes(&bytes); + assert!(matches!( + result, + Err(ProtocolError::UnknownMessageType(999)) + )); + } + + #[test] + fn test_payload_too_large() { + let mut bytes = [0u8; HEADER_SIZE]; + bytes[0..4].copy_from_slice(&MAGIC); + bytes[4..8].copy_from_slice(&VERSION.to_le_bytes()); + bytes[8..12].copy_from_slice(&0u32.to_le_bytes()); // Ping + let oversized = (MAX_PAYLOAD_SIZE as u32) + 1; + bytes[12..16].copy_from_slice(&oversized.to_le_bytes()); + let result = Header::from_bytes(&bytes); + assert!(matches!(result, Err(ProtocolError::PayloadTooLarge { .. }))); + } + + #[test] + fn test_incomplete_payload() { + let rect = ScreenRect { + x: 1.0, + y: 2.0, + width: 3.0, + height: 4.0, + rotation: 0.0, + visible: true, + }; + let encoded = encode(MessageType::ScreenRect, &rect).unwrap(); + // Truncate the payload + let truncated = &encoded[..HEADER_SIZE + 1]; + let result: Result<(_, ScreenRect), _> = decode(truncated); + assert!(matches!( + result, + Err(ProtocolError::IncompletePayload { .. 
}) + )); + } + + #[test] + fn test_all_message_type_roundtrip() { + // Ensure every MessageType variant can be converted from its u32 value + let types = [ + (0, MessageType::Ping), + (1, MessageType::Pong), + (10, MessageType::ScreenRect), + (11, MessageType::WindowState), + (12, MessageType::OverlayUpdate), + (20, MessageType::StateSnapshot), + (21, MessageType::StateEvent), + (22, MessageType::StateQuery), + (23, MessageType::StateResponse), + (30, MessageType::SyncState), + (31, MessageType::ClockPing), + (32, MessageType::ClockPong), + (40, MessageType::VideoLoad), + (41, MessageType::VideoPlay), + (42, MessageType::VideoPause), + (43, MessageType::VideoSeek), + (44, MessageType::VideoState), + (45, MessageType::VideoMetadata), + (46, MessageType::VideoError), + (255, MessageType::Error), + ]; + + for (raw, expected) in types { + let result = MessageType::try_from(raw).unwrap(); + assert_eq!(result, expected, "MessageType mismatch for raw value {raw}"); + assert_eq!(result as u32, raw); + } + } + + #[test] + fn test_validate_dpi_scale_boundary_values() { + // Lower boundary + let ws = WindowState { + x: 0, + y: 0, + width: 100, + height: 100, + dpi_scale: 0.1, + is_fullscreen: false, + is_borderless: false, + is_focused: true, + }; + assert!(ws.validate().is_ok()); + + // Upper boundary + let ws = WindowState { + dpi_scale: 10.0, + ..ws + }; + assert!(ws.validate().is_ok()); + + // Just below lower boundary + let ws = WindowState { + dpi_scale: 0.09, + ..ws + }; + assert!(ws.validate().is_err()); + + // Just above upper boundary + let ws = WindowState { + dpi_scale: 10.1, + ..ws + }; + assert!(ws.validate().is_err()); + } + + #[test] + fn test_validate_zero_playback_rate_rejected() { + let state = VideoState { + content_id: "abc".to_string(), + position_ms: 0, + duration_ms: 0, + is_playing: false, + is_buffering: false, + playback_rate: 0.0, + volume: 0.5, + }; + let encoded = encode(MessageType::VideoState, &state).unwrap(); + let result: Result<(_, VideoState), _> = decode_validated(&encoded); + assert!(matches!( + result, + Err(ProtocolError::ValidationFailed { .. }) + )); + } + + #[test] + fn test_validate_negative_playback_rate_rejected() { + let sync = SyncState { + content_id: "abc".to_string(), + position_at_ref_ms: 0, + ref_wallclock_ms: 0, + is_playing: true, + playback_rate: -1.0, + }; + let encoded = encode(MessageType::SyncState, &sync).unwrap(); + let result: Result<(_, SyncState), _> = decode_validated(&encoded); + assert!(matches!( + result, + Err(ProtocolError::ValidationFailed { .. 
}) + )); + } + + #[test] + fn test_encode_decode_all_video_messages() { + // VideoPlay + let play = VideoPlay { + from_position_ms: Some(1000), + }; + let enc = encode(MessageType::VideoPlay, &play).unwrap(); + let (mt, dec): (_, VideoPlay) = decode(&enc).unwrap(); + assert_eq!(mt, MessageType::VideoPlay); + assert_eq!(dec.from_position_ms, Some(1000)); + + // VideoPause + let pause = VideoPause {}; + let enc = encode(MessageType::VideoPause, &pause).unwrap(); + let (mt, _dec): (_, VideoPause) = decode(&enc).unwrap(); + assert_eq!(mt, MessageType::VideoPause); + + // VideoSeek + let seek = VideoSeek { position_ms: 42000 }; + let enc = encode(MessageType::VideoSeek, &seek).unwrap(); + let (mt, dec): (_, VideoSeek) = decode(&enc).unwrap(); + assert_eq!(mt, MessageType::VideoSeek); + assert_eq!(dec.position_ms, 42000); + + // VideoMetadata + let meta = VideoMetadata { + content_id: "test".to_string(), + width: 1920, + height: 1080, + duration_ms: 60000, + fps: 30.0, + codec: "h264".to_string(), + is_live: false, + title: Some("Test Video".to_string()), + }; + let enc = encode(MessageType::VideoMetadata, &meta).unwrap(); + let (mt, dec): (_, VideoMetadata) = decode_validated(&enc).unwrap(); + assert_eq!(mt, MessageType::VideoMetadata); + assert_eq!(dec.width, 1920); + assert_eq!(dec.title, Some("Test Video".to_string())); + } + + #[test] + fn test_decode_header_only() { + let rect = ScreenRect { + x: 1.0, + y: 2.0, + width: 3.0, + height: 4.0, + rotation: 0.0, + visible: true, + }; + let encoded = encode(MessageType::ScreenRect, &rect).unwrap(); + let header = decode_header(&encoded).unwrap(); + assert_eq!(header.msg_type, MessageType::ScreenRect); + assert!(header.payload_len > 0); } } diff --git a/core/itk-shmem/src/lib.rs b/core/itk-shmem/src/lib.rs index 793ec04..adc797f 100644 --- a/core/itk-shmem/src/lib.rs +++ b/core/itk-shmem/src/lib.rs @@ -287,20 +287,20 @@ impl SeqlockHeader { /// Uses Acquire ordering to prevent subsequent data writes from being /// reordered before the sequence increment. This ensures readers see /// the odd sequence before any new data is written. - pub fn begin_write(&self) { + /// Returns `Err(ShmemError::SeqlockContention)` if another writer is already active. + pub fn begin_write(&self) -> Result<()> { let prev = self.writer_count.fetch_add(1, Ordering::Relaxed); - debug_assert_eq!( - prev, 0, - "Multiple concurrent writers detected (count was {prev}). \ - Seqlock requires single-writer only." 
- ); if prev != 0 { + // Undo the increment so the count stays correct + self.writer_count.fetch_sub(1, Ordering::Relaxed); warn!( - writer_count = prev + 1, - "Multiple concurrent seqlock writers detected — data may be corrupted" + writer_count = prev, + "Multiple concurrent seqlock writers detected — rejecting write" ); + return Err(ShmemError::SeqlockContention); } self.seq.fetch_add(1, Ordering::Acquire); + Ok(()) } /// End a write operation (marks sequence as even) @@ -564,7 +564,7 @@ impl FrameBuffer { // Begin critical section - marks seq odd // Acquire ordering prevents subsequent data writes from floating up - header.begin_write(); + header.begin_write()?; // Write buffer data inside the seqlock critical section // This ensures proper ordering on weak memory models (ARM) @@ -587,16 +587,17 @@ impl FrameBuffer { Ok(()) } - /// Maximum retry attempts before returning contention error - const MAX_READ_ATTEMPTS: u32 = 10000; + /// Maximum outer retry attempts for frame read consistency + const MAX_READ_ATTEMPTS: u32 = 100; /// Read the current frame (consumer side) /// /// Returns (pts_ms, data_changed) where data_changed indicates /// if this is a new frame since the last read. /// - /// This method has bounded retries to prevent infinite spinning if the - /// writer crashes or holds the seqlock for too long. + /// Uses exponential backoff on the inner seqlock read to avoid + /// excessive spinning. Total worst-case spins: ~25,500 iterations + /// (100 outer * ~255 average inner) instead of 10,000,000. pub fn read_frame(&self, last_pts: u64, buf: &mut [u8]) -> Result<(u64, bool)> { if buf.len() != self.frame_size { return Err(ShmemError::SizeMismatch { @@ -605,9 +606,10 @@ impl FrameBuffer { }); } - for _ in 0..Self::MAX_READ_ATTEMPTS { - // Use bounded read to prevent spinning on crashed writer - let state = self.header().read_with_timeout(1000)?; + for attempt in 0..Self::MAX_READ_ATTEMPTS { + // Exponential backoff: 100, 200, 400, 800, capped at 5000 + let inner_timeout = (100u32 << attempt.min(6)).min(5000); + let state = self.header().read_with_timeout(inner_timeout)?; // Skip copy if same frame if state.pts_ms == last_pts { @@ -669,7 +671,7 @@ mod tests { assert_eq!(header.seq.load(Ordering::SeqCst), 0); assert!(!header.is_write_in_progress()); - header.begin_write(); + header.begin_write().unwrap(); assert_eq!(header.seq.load(Ordering::SeqCst), 1); assert!(header.is_write_in_progress()); @@ -693,7 +695,7 @@ mod tests { assert_eq!(state.unwrap().read_idx, 1); // During write - should fail - header.begin_write(); + header.begin_write().unwrap(); assert!(header.try_read().is_none()); // After write - should succeed @@ -705,6 +707,7 @@ mod tests { #[test] fn test_seqlock_detects_concurrent_modification() { let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. 
let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; // Simulate a "torn read" scenario: @@ -713,7 +716,7 @@ mod tests { assert_eq!(seq1, 0); // Writer does a complete write - header.begin_write(); + header.begin_write().unwrap(); header.read_idx.store(42, Ordering::Relaxed); header.end_write(); @@ -721,6 +724,134 @@ mod tests { let seq2 = header.seq.load(Ordering::Acquire); assert_ne!(seq1, seq2); } + + // ========================================================================= + // Phase 6: Additional boundary/edge case tests + // ========================================================================= + + #[test] + fn test_calculate_size_zero_dimensions() { + assert!(FrameBuffer::calculate_size(0, 720).is_err()); + assert!(FrameBuffer::calculate_size(1280, 0).is_err()); + assert!(FrameBuffer::calculate_size(0, 0).is_err()); + } + + #[test] + fn test_calculate_size_max_dimension() { + // Just at the limit should work + let result = FrameBuffer::calculate_size(MAX_FRAME_DIMENSION, 1); + assert!(result.is_ok()); + + let result = FrameBuffer::calculate_size(1, MAX_FRAME_DIMENSION); + assert!(result.is_ok()); + + // Over the limit should fail + let result = FrameBuffer::calculate_size(MAX_FRAME_DIMENSION + 1, 1); + assert!(result.is_err()); + + let result = FrameBuffer::calculate_size(1, MAX_FRAME_DIMENSION + 1); + assert!(result.is_err()); + } + + #[test] + fn test_calculate_size_known_resolutions() { + // 720p + let size = FrameBuffer::calculate_size(1280, 720).unwrap(); + assert_eq!(size, 64 + (1280 * 720 * 4 * 3)); + + // 1080p + let size = FrameBuffer::calculate_size(1920, 1080).unwrap(); + assert_eq!(size, 64 + (1920 * 1080 * 4 * 3)); + + // 4K + let size = FrameBuffer::calculate_size(3840, 2160).unwrap(); + assert_eq!(size, 64 + (3840 * 2160 * 4 * 3)); + } + + #[test] + fn test_seqlock_read_with_timeout_returns_ok_when_no_writer() { + let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. + let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; + + // No writer active, should read immediately + let result = header.read_with_timeout(1); + assert!(result.is_ok()); + } + + #[test] + fn test_seqlock_read_with_timeout_returns_err_during_write() { + let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. + let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; + + header.begin_write().unwrap(); + // Writer is active, should time out + let result = header.read_with_timeout(10); + assert!(matches!(result, Err(ShmemError::SeqlockContention))); + header.end_write(); + } + + #[test] + fn test_seqlock_multi_writer_detection() { + let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. 
+ let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; + + // First writer succeeds + assert!(header.begin_write().is_ok()); + // Second writer is rejected + assert!(matches!( + header.begin_write(), + Err(ShmemError::SeqlockContention) + )); + // Writer count is still 1 (not corrupted) + assert_eq!(header.writer_count.load(Ordering::Relaxed), 1); + header.end_write(); + // After end_write, count is 0 + assert_eq!(header.writer_count.load(Ordering::Relaxed), 0); + } + + #[test] + fn test_seqlock_state_fields() { + let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. + let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; + + // Write specific values + header.begin_write().unwrap(); + header.read_idx.store(2, Ordering::Relaxed); + header.pts_ms.store(99999, Ordering::Relaxed); + header.frame_width.store(1920, Ordering::Relaxed); + header.frame_height.store(1080, Ordering::Relaxed); + header.is_playing.store(1, Ordering::Relaxed); + header.content_id_hash.store(0xDEAD, Ordering::Relaxed); + header.duration_ms.store(60000, Ordering::Relaxed); + header.end_write(); + + // Read back and verify all fields + let state = header.try_read().expect("should read without contention"); + assert_eq!(state.read_idx, 2); + assert_eq!(state.pts_ms, 99999); + assert_eq!(state.frame_width, 1920); + assert_eq!(state.frame_height, 1080); + assert!(state.is_playing); + assert_eq!(state.content_id_hash, 0xDEAD); + assert_eq!(state.duration_ms, 60000); + } + + #[test] + fn test_seqlock_read_blocking() { + let mut header_mem = AlignedHeaderMem([0u8; SeqlockHeader::SIZE]); + // SAFETY: Aligned memory of correct size for SeqlockHeader. + let header = unsafe { SeqlockHeader::init(header_mem.0.as_mut_ptr()) }; + + header.pts_ms.store(42, Ordering::Relaxed); + + // No writer active — should return immediately + let state = header.read_blocking(); + assert_eq!(state.pts_ms, 42); + } } /// Loom-based concurrency tests for verifying seqlock correctness diff --git a/core/itk-sync/src/lib.rs b/core/itk-sync/src/lib.rs index c1b0902..76fbbdb 100644 --- a/core/itk-sync/src/lib.rs +++ b/core/itk-sync/src/lib.rs @@ -31,6 +31,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(test)] use std::time::Duration; use thiserror::Error; +use tracing::warn; /// Sync errors #[derive(Error, Debug)] @@ -244,7 +245,19 @@ impl PlaybackSync { return self.position_at_ref_ms; } - let elapsed = now_ms().saturating_sub(self.ref_wallclock_ms); + let now = now_ms(); + let elapsed = now.saturating_sub(self.ref_wallclock_ms); + + // Detect backward clock jumps (ref_wallclock is in the future) + if self.ref_wallclock_ms > now && self.ref_wallclock_ms - now > 1000 { + warn!( + ref_wallclock_ms = self.ref_wallclock_ms, + now_ms = now, + jump_ms = self.ref_wallclock_ms - now, + "clock jump detected: reference time is in the future" + ); + } + let adjusted_elapsed = (elapsed as f64 * self.playback_rate) as u64; // Use saturating_add to prevent overflow on large values @@ -656,4 +669,136 @@ mod tests { // Should be approximately +500 (we're 500ms ahead) assert!((400..=600).contains(&d)); } + + // ========================================================================= + // Phase 6: Additional edge case tests + // ========================================================================= + + #[test] + fn test_clock_sync_no_offset_before_samples() { + let sync = ClockSync::new(); + assert!(!sync.is_synced()); + assert_eq!(sync.offset_ms(), None); 
+ assert!(sync.local_to_remote(1000).is_err()); + assert!(sync.remote_to_local(1000).is_err()); + } + + #[test] + fn test_clock_sync_negative_offset() { + let mut sync = ClockSync::new(); + // Remote is behind local: remote_time < local_at_remote + // send=1000, remote=950, recv=1100 -> rtt=100, one_way=50 + // offset = 950 - (1000+50) = -100 + sync.process_pong(1000, 950, 1100); + let offset = sync.offset_ms().unwrap(); + assert_eq!(offset, -100); + + // Conversions should work with negative offset + let remote = sync.local_to_remote(1000).unwrap(); + assert_eq!(remote, 900); // 1000 - 100 + + let local = sync.remote_to_local(900).unwrap(); + assert_eq!(local, 1000); // 900 + 100 + } + + #[test] + fn test_clock_sync_max_samples_eviction() { + let mut sync = ClockSync::new(); + + // Add more than max_samples (10) to test eviction + for i in 0..15u64 { + sync.process_pong(i * 100, i * 100 + 50, i * 100 + 100); + } + + assert!(sync.is_synced()); + // After eviction, should still have valid offset + let offset = sync.offset_ms().unwrap(); + assert_eq!(offset, 0); // All samples have 0 offset + } + + #[test] + fn test_clock_sync_zero_rtt() { + let mut sync = ClockSync::new(); + // Zero RTT (instantaneous round trip) + sync.process_pong(1000, 1000, 1000); + assert_eq!(sync.offset_ms(), Some(0)); + } + + #[test] + fn test_playback_sync_position_with_rate() { + let mut sync = PlaybackSync::new("test".to_string()); + sync.position_at_ref_ms = 10000; + sync.ref_wallclock_ms = now_ms() - 1000; // 1 second ago + sync.is_playing = true; + sync.playback_rate = 2.0; // Double speed + + // Should be approximately 12000 (10000 + 1000 * 2.0) + let pos = sync.current_position_ms(); + assert!((11800..=12200).contains(&pos)); + } + + #[test] + fn test_playback_sync_position_saturates() { + let mut sync = PlaybackSync::new("test".to_string()); + sync.position_at_ref_ms = u64::MAX - 100; + sync.ref_wallclock_ms = now_ms() - 1000; + sync.is_playing = true; + sync.playback_rate = 1.0; + + // Should saturate at u64::MAX, not wrap around + let pos = sync.current_position_ms(); + assert_eq!(pos, u64::MAX); + } + + #[test] + fn test_playback_sync_update_from() { + let mut local = PlaybackSync::new("old".to_string()); + local.playback_rate = 0.98; // Local drift correction + + let remote = PlaybackSync { + content_id: "new_content".to_string(), + position_at_ref_ms: 5000, + ref_wallclock_ms: 123456789, + is_playing: true, + playback_rate: 1.05, + }; + + local.update_from(&remote); + + assert_eq!(local.content_id, "new_content"); + assert_eq!(local.position_at_ref_ms, 5000); + assert_eq!(local.ref_wallclock_ms, 123456789); + assert!(local.is_playing); + // Playback rate should NOT be copied (it's local drift correction) + assert_eq!(local.playback_rate, 0.98); + } + + #[test] + fn test_drift_corrector_no_target_no_seek() { + let corrector = DriftCorrector::new(); + assert!(corrector.should_seek(99999).is_none()); + assert!(corrector.current_drift_ms(99999).is_none()); + } + + #[test] + fn test_drift_correction_with_clock_offset() { + let mut corrector = DriftCorrector::new(); + + // Sync clocks: remote is 100ms ahead + corrector.clock_sync_mut().process_pong(1000, 1150, 1100); + // offset = 1150 - (1000+50) = 100 + + let mut target = PlaybackSync::new("test".to_string()); + target.position_at_ref_ms = 10000; + target.ref_wallclock_ms = now_ms() + 100; // Remote time (100ms ahead) + target.is_playing = true; + target.playback_rate = 1.0; + + corrector.update_target(target); + + // With clock correction applied, position 
should be close to target + let rate = corrector.calculate_rate(10000); + // Should be within tolerance (1.0) since we account for clock offset + assert_eq!(rate, 1.0); + } } diff --git a/core/itk-video/src/decoder.rs b/core/itk-video/src/decoder.rs index 2e76d0d..d07f673 100644 --- a/core/itk-video/src/decoder.rs +++ b/core/itk-video/src/decoder.rs @@ -152,6 +152,8 @@ impl VideoDecoder { // Set hardware device context if available if let Some(ref hw) = hw_ctx { + // SAFETY: `decoder_ctx` is a valid codec context. `hw.new_ref()` returns + // a valid AVBufferRef that ffmpeg takes ownership of via hw_device_ctx. unsafe { let raw = decoder_ctx.as_mut_ptr(); (*raw).hw_device_ctx = hw.new_ref(); @@ -160,6 +162,8 @@ impl VideoDecoder { info!("Hardware acceleration enabled (D3D11VA)"); } else { // Software mode: set thread_count for better performance + // SAFETY: `decoder_ctx` is a valid codec context. Setting thread_count + // to 0 tells ffmpeg to auto-detect the optimal thread count. unsafe { let raw = decoder_ctx.as_mut_ptr(); (*raw).thread_count = 0; // auto-detect @@ -169,6 +173,24 @@ impl VideoDecoder { let decoder = decoder_ctx.decoder().video()?; + // Validate source dimensions against safety limit + let src_w = decoder.width(); + let src_h = decoder.height(); + if src_w > itk_shmem::MAX_FRAME_DIMENSION || src_h > itk_shmem::MAX_FRAME_DIMENSION { + return Err(VideoError::InvalidFrame(format!( + "source dimensions {}x{} exceed maximum {}", + src_w, + src_h, + itk_shmem::MAX_FRAME_DIMENSION + ))); + } + if src_w == 0 || src_h == 0 { + return Err(VideoError::InvalidFrame(format!( + "source dimensions {}x{} are zero", + src_w, src_h + ))); + } + info!( width = decoder.width(), height = decoder.height(), @@ -184,7 +206,7 @@ impl VideoDecoder { decoder, video_stream_index, time_base, - scaler: FrameScaler::with_size(width, height), + scaler: FrameScaler::with_size(width, height)?, duration_ms, fps, frame: VideoFrame::empty(), @@ -234,6 +256,8 @@ impl VideoDecoder { Ok(()) => { // Transfer from GPU to CPU memory if using hardware acceleration if self.hw_accel_active { + // SAFETY: `self.frame` is a valid, just-decoded AVFrame from + // `receive_frame`. The transfer function handles null checks. unsafe { crate::hwaccel::transfer_hw_frame_if_needed(self.frame.as_mut_ptr()); } @@ -241,7 +265,7 @@ impl VideoDecoder { // Calculate PTS in milliseconds let pts = self.frame.pts().unwrap_or(0); - let pts_ms = self.pts_to_ms(pts); + let pts_ms = self.pts_to_ms(pts)?; // Scale the frame to output resolution let scaled_data = self.scaler.scale(&self.frame)?; @@ -300,20 +324,28 @@ impl VideoDecoder { } /// Convert a PTS value to milliseconds. - fn pts_to_ms(&self, pts: i64) -> u64 { + fn pts_to_ms(&self, pts: i64) -> VideoResult { if pts < 0 { - return 0; + return Ok(0); } let num = self.time_base.numerator() as i64; let den = self.time_base.denominator() as i64; if den == 0 { - return 0; + return Ok(0); } // pts * (num / den) * 1000 = pts * num * 1000 / den - ((pts * num * 1000) / den) as u64 + // Use checked arithmetic to prevent overflow with large PTS values + let numerator = pts + .checked_mul(num) + .and_then(|v| v.checked_mul(1000)) + .ok_or_else(|| { + VideoError::DecodeError(format!("PTS overflow: pts={pts}, time_base={num}/{den}")) + })?; + + Ok((numerator / den) as u64) } /// Convert milliseconds to PTS value. 
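The `pts_to_ms` hunk above switches to checked arithmetic because `pts * num * 1000` can exceed `i64` even when the final millisecond value would fit. A minimal standalone sketch of that pattern (a free function with illustrative parameters, not the crate's actual method):

```rust
/// Overflow-safe PTS -> milliseconds conversion in the style of the
/// `pts_to_ms` change above; returns `None` on overflow instead of wrapping.
fn pts_to_ms(pts: i64, num: i64, den: i64) -> Option<u64> {
    if pts < 0 || den == 0 {
        return Some(0);
    }
    pts.checked_mul(num)?
        .checked_mul(1000)?
        .checked_div(den)
        .map(|ms| ms as u64)
}

fn main() {
    // 90 kHz time base: 90_000 ticks == 1 second == 1000 ms.
    assert_eq!(pts_to_ms(90_000, 1, 90_000), Some(1_000));
    // A very large PTS overflows i64 when multiplied by 1000; the checked
    // path surfaces the overflow instead of silently wrapping.
    assert_eq!(pts_to_ms(i64::MAX / 2, 1, 90_000), None);
}
```

The real method wraps the overflow case in a `VideoError::DecodeError` rather than returning an `Option`.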
diff --git a/core/itk-video/src/frame_writer.rs b/core/itk-video/src/frame_writer.rs index f54b265..5cc3183 100644 --- a/core/itk-video/src/frame_writer.rs +++ b/core/itk-video/src/frame_writer.rs @@ -13,7 +13,7 @@ pub const DEFAULT_SHMEM_NAME: &str = "itk_video_frames"; /// Writes decoded video frames to a shared memory buffer. pub struct FrameWriter { buffer: FrameBuffer, - last_pts_ms: u64, + last_pts_ms: Option, content_id_hash: u64, } @@ -36,7 +36,7 @@ impl FrameWriter { }; Ok(Self { buffer, - last_pts_ms: 0, + last_pts_ms: None, content_id_hash: 0, }) } @@ -51,7 +51,7 @@ impl FrameWriter { let buffer = FrameBuffer::open(name, width, height)?; Ok(Self { buffer, - last_pts_ms: 0, + last_pts_ms: None, content_id_hash: 0, }) } @@ -81,13 +81,21 @@ impl FrameWriter { /// Returns `true` if the frame was written, `false` if skipped. pub fn write_frame(&mut self, frame: &DecodedFrame) -> VideoResult { // Frame-skip optimization: don't write if PTS hasn't changed - if frame.pts_ms == self.last_pts_ms && self.last_pts_ms > 0 { + if self.last_pts_ms == Some(frame.pts_ms) { trace!(pts_ms = frame.pts_ms, "skipping duplicate frame"); return Ok(false); } - // Verify frame dimensions match buffer - let expected_size = (frame.width as usize) * (frame.height as usize) * 4; + // Verify frame dimensions match buffer (checked arithmetic for untrusted dimensions) + let expected_size = (frame.width as usize) + .checked_mul(frame.height as usize) + .and_then(|s| s.checked_mul(4)) + .ok_or_else(|| { + VideoError::InvalidFrame(format!( + "frame size overflow: {}x{}x4", + frame.width, frame.height + )) + })?; if frame.data.len() != expected_size { return Err(VideoError::InvalidFrame(format!( "frame size mismatch: expected {} bytes, got {}", @@ -97,12 +105,14 @@ impl FrameWriter { } // Write to the shared memory buffer + // SAFETY: Frame data has been validated above to match expected dimensions. + // FrameWriter is the sole writer to this shared memory region. unsafe { self.buffer .write_frame(&frame.data, frame.pts_ms, self.content_id_hash)?; } - self.last_pts_ms = frame.pts_ms; + self.last_pts_ms = Some(frame.pts_ms); trace!(pts_ms = frame.pts_ms, "wrote frame to shmem"); Ok(true) } @@ -112,21 +122,23 @@ impl FrameWriter { /// This is a lower-level API for when you have raw RGBA data. pub fn write_raw(&mut self, data: &[u8], pts_ms: u64) -> VideoResult { // Frame-skip optimization - if pts_ms == self.last_pts_ms && self.last_pts_ms > 0 { + if self.last_pts_ms == Some(pts_ms) { return Ok(false); } + // SAFETY: Caller provides raw RGBA data; FrameBuffer validates size. + // FrameWriter is the sole writer to this shared memory region. unsafe { self.buffer .write_frame(data, pts_ms, self.content_id_hash)?; } - self.last_pts_ms = pts_ms; + self.last_pts_ms = Some(pts_ms); Ok(true) } /// Get the last written PTS in milliseconds. - pub fn last_pts_ms(&self) -> u64 { + pub fn last_pts_ms(&self) -> Option { self.last_pts_ms } @@ -167,7 +179,12 @@ impl FrameReader { /// Open an existing shared memory region for reading. 
pub fn open(name: &str, width: u32, height: u32) -> VideoResult { let buffer = FrameBuffer::open(name, width, height)?; - let frame_size = (width as usize) * (height as usize) * 4; + let frame_size = (width as usize) + .checked_mul(height as usize) + .and_then(|s| s.checked_mul(4)) + .ok_or_else(|| { + VideoError::InvalidFrame(format!("frame size overflow: {width}x{height}x4")) + })?; Ok(Self { buffer, last_pts_ms: 0, diff --git a/core/itk-video/src/hwaccel.rs b/core/itk-video/src/hwaccel.rs index 04ae89c..27d5bb6 100644 --- a/core/itk-video/src/hwaccel.rs +++ b/core/itk-video/src/hwaccel.rs @@ -30,6 +30,8 @@ impl HwDeviceContext { /// Returns `None` if D3D11VA is not available on this system. /// This is expected on systems without a GPU or with incompatible drivers. pub fn create_d3d11va() -> Option { + // SAFETY: `av_hwdevice_ctx_create` is called with valid out-pointer and + // enum discriminant. Null device/options pointers are permitted by ffmpeg API. unsafe { let mut device_ctx: *mut ffi::AVBufferRef = ptr::null_mut(); let ret = ffi::av_hwdevice_ctx_create( @@ -58,6 +60,8 @@ impl HwDeviceContext { /// # Safety /// The caller must ensure `self` contains a valid device context pointer. pub unsafe fn new_ref(&self) -> *mut ffi::AVBufferRef { + // SAFETY: `self.device_ctx` is valid and non-null (guaranteed by constructor). + // `av_buffer_ref` creates a new reference to the same underlying buffer. unsafe { ffi::av_buffer_ref(self.device_ctx) } } @@ -69,6 +73,8 @@ impl HwDeviceContext { impl Drop for HwDeviceContext { fn drop(&mut self) { + // SAFETY: `device_ctx` is either null (checked) or a valid AVBufferRef + // allocated by `av_hwdevice_ctx_create`. `av_buffer_unref` frees it. unsafe { if !self.device_ctx.is_null() { ffi::av_buffer_unref(&mut self.device_ctx); @@ -93,7 +99,7 @@ pub unsafe fn transfer_hw_frame_if_needed(frame: *mut ffi::AVFrame) -> bool { return false; } - // Safety: all operations below involve FFI calls and raw pointer derefs + // SAFETY: All operations below involve FFI calls and raw pointer derefs // that require unsafe. The caller guarantees `frame` is valid and non-null. unsafe { let format = (*frame).format; diff --git a/core/itk-video/src/scaler.rs b/core/itk-video/src/scaler.rs index 1fe1874..53211e9 100644 --- a/core/itk-video/src/scaler.rs +++ b/core/itk-video/src/scaler.rs @@ -25,19 +25,26 @@ pub struct FrameScaler { impl FrameScaler { /// Create a new scaler with the default 720p output resolution. - pub fn new() -> Self { + pub fn new() -> VideoResult { Self::with_size(DEFAULT_WIDTH, DEFAULT_HEIGHT) } /// Create a new scaler with a custom output resolution. - pub fn with_size(width: u32, height: u32) -> Self { - let buffer_size = (width as usize) * (height as usize) * BYTES_PER_PIXEL; + pub fn with_size(width: u32, height: u32) -> VideoResult { + let buffer_size = (width as usize) + .checked_mul(height as usize) + .and_then(|s| s.checked_mul(BYTES_PER_PIXEL)) + .ok_or_else(|| { + VideoError::InvalidFrame(format!( + "frame size overflow: {width}x{height}x{BYTES_PER_PIXEL}" + )) + })?; let mut output_frame = VideoFrame::empty(); output_frame.set_format(Pixel::RGBA); output_frame.set_width(width); output_frame.set_height(height); - Self { + Ok(Self { context: None, output_width: width, output_height: height, @@ -47,7 +54,7 @@ impl FrameScaler { last_input_width: 0, last_input_height: 0, last_input_format: Pixel::None, - } + }) } /// Get the output width. 
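The `frame_writer.rs` hunks above replace the `last_pts_ms: u64` zero-sentinel with `Option<u64>`. The motivation is that `pts_ms == 0` is a legitimate timestamp for the first frame, which the old `last_pts_ms > 0` guard could not distinguish from "nothing written yet". A self-contained sketch of the skip logic (an illustrative struct, not the crate's `FrameWriter`):

```rust
// Minimal sketch of the duplicate-frame skip: Option<u64> removes the
// ambiguity between "no frame written yet" and "last frame had pts 0".
struct SkipState {
    last_pts_ms: Option<u64>,
}

impl SkipState {
    fn should_write(&mut self, pts_ms: u64) -> bool {
        if self.last_pts_ms == Some(pts_ms) {
            return false; // duplicate frame: skip the shmem write
        }
        self.last_pts_ms = Some(pts_ms);
        true
    }
}

fn main() {
    let mut state = SkipState { last_pts_ms: None };
    assert!(state.should_write(0));  // pts 0 is a real frame, not "unset"
    assert!(!state.should_write(0)); // a genuine duplicate is skipped
    assert!(state.should_write(40)); // the next frame is written
}
```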
@@ -93,10 +100,15 @@ impl FrameScaler { self.last_input_format = input_format; } - let ctx = self.context.as_mut().unwrap(); + let ctx = self + .context + .as_mut() + .ok_or_else(|| VideoError::ScaleError("scaling context not initialized".to_string()))?; // Allocate output frame buffer if needed if !self.frame_allocated { + // SAFETY: `output_frame` is a valid AVFrame with format, width, and height + // set. `av_frame_get_buffer` allocates data planes for these parameters. unsafe { let ret = ffmpeg_next::ffi::av_frame_get_buffer( self.output_frame.as_mut_ptr(), @@ -140,14 +152,20 @@ impl FrameScaler { } /// Get the expected output buffer size in bytes. + /// + /// Panics on overflow (dimensions are validated in `with_size`). pub fn buffer_size(&self) -> usize { - (self.output_width as usize) * (self.output_height as usize) * BYTES_PER_PIXEL + (self.output_width as usize) + .checked_mul(self.output_height as usize) + .and_then(|s| s.checked_mul(BYTES_PER_PIXEL)) + .expect("buffer_size overflow (dimensions should have been validated)") } } impl Default for FrameScaler { fn default() -> Self { - Self::new() + // Default 720p dimensions are always valid; unwrap is safe + Self::new().expect("default 720p scaler dimensions should never overflow") } } @@ -157,7 +175,7 @@ mod tests { #[test] fn test_scaler_dimensions() { - let scaler = FrameScaler::new(); + let scaler = FrameScaler::new().unwrap(); assert_eq!(scaler.width(), DEFAULT_WIDTH); assert_eq!(scaler.height(), DEFAULT_HEIGHT); assert_eq!(scaler.buffer_size(), 1280 * 720 * 4); @@ -165,9 +183,15 @@ mod tests { #[test] fn test_custom_dimensions() { - let scaler = FrameScaler::with_size(640, 480); + let scaler = FrameScaler::with_size(640, 480).unwrap(); assert_eq!(scaler.width(), 640); assert_eq!(scaler.height(), 480); assert_eq!(scaler.buffer_size(), 640 * 480 * 4); } + + #[test] + fn test_overflow_dimensions() { + let result = FrameScaler::with_size(u32::MAX, u32::MAX); + assert!(result.is_err()); + } } diff --git a/core/itk-video/src/stream.rs b/core/itk-video/src/stream.rs index 5e0a3db..7c1ee23 100644 --- a/core/itk-video/src/stream.rs +++ b/core/itk-video/src/stream.rs @@ -34,10 +34,13 @@ impl StreamSource { } /// Check if this source is a YouTube URL. + /// + /// Validates using host matching rather than substring search to avoid + /// false positives (e.g., a URL containing "youtube.com" in the path). pub fn is_youtube(&self) -> bool { match self { StreamSource::Url(url) | StreamSource::UrlWithAudio { video: url, .. } => { - url.contains("youtube.com") || url.contains("youtu.be") + is_youtube_url(url) }, StreamSource::File(_) => false, } @@ -60,6 +63,31 @@ impl StreamSource { } } +/// Check if a URL string is a YouTube URL by parsing the host. +/// +/// Matches `youtube.com`, `www.youtube.com`, `m.youtube.com`, and `youtu.be`. 
+fn is_youtube_url(url: &str) -> bool { + // Extract host from URL: skip scheme, take up to first '/' or end + let without_scheme = url + .strip_prefix("https://") + .or_else(|| url.strip_prefix("http://")) + .unwrap_or(url); + + let host = without_scheme + .split('/') + .next() + .unwrap_or("") + .split(':') + .next() + .unwrap_or(""); + + host == "youtube.com" + || host == "www.youtube.com" + || host == "m.youtube.com" + || host == "youtu.be" + || host == "www.youtu.be" +} + impl From for StreamSource { fn from(path: PathBuf) -> Self { StreamSource::File(path) @@ -107,7 +135,10 @@ mod tests { fn test_is_youtube() { assert!(StreamSource::from_string("https://www.youtube.com/watch?v=abc").is_youtube()); assert!(StreamSource::from_string("https://youtu.be/abc").is_youtube()); + assert!(StreamSource::from_string("https://m.youtube.com/watch?v=abc").is_youtube()); assert!(!StreamSource::from_string("https://example.com/video.mp4").is_youtube()); assert!(!StreamSource::from_string("/path/to/video.mp4").is_youtube()); + // False positive case: "youtube.com" in path should NOT match + assert!(!StreamSource::from_string("https://example.com/youtube.com/video").is_youtube()); } } diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index a3f5197..db27b33 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -14,7 +14,7 @@ itk-ipc = { path = "../core/itk-ipc" } itk-sync = { path = "../core/itk-sync" } serde = { workspace = true } -serde_json = "1.0" +serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/deny.toml b/deny.toml index 738b389..7e00a81 100644 --- a/deny.toml +++ b/deny.toml @@ -7,9 +7,7 @@ ignore = [ # bincode 1.3: still used transitively by itk-net (via laminar) # Direct protocol serialization migrated to bitcode. 
"RUSTSEC-2025-0141", - # bytes: integer overflow in BytesMut::reserve (RUSTSEC-2026-0007) - # TODO: update bytes to patched version when laminar updates - "RUSTSEC-2026-0007", + # bytes advisory resolved — bytes updated to patched version ] [licenses] @@ -39,7 +37,12 @@ private = { ignore = true } [bans] multiple-versions = "warn" wildcards = "allow" -deny = [] +# Deny known-problematic crates from being added as dependencies +deny = [ + # Use parking_lot instead of std::sync::Mutex (never poisons) + # Note: this only prevents direct deps, not transitive + { crate = "openssl", use-instead = "rustls" }, +] [sources] unknown-registry = "warn" diff --git a/docker/rust-ci.Dockerfile b/docker/rust-ci.Dockerfile index 5b20adf..58f5a80 100644 --- a/docker/rust-ci.Dockerfile +++ b/docker/rust-ci.Dockerfile @@ -30,7 +30,7 @@ RUN rustup component add rustfmt clippy # Install cargo-deny for license/advisory checks RUN --mount=type=cache,target=/usr/local/cargo/registry \ - cargo install cargo-deny --locked 2>/dev/null || true + cargo install cargo-deny --locked # Non-root user (overridden by docker-compose USER_ID/GROUP_ID) RUN useradd -m -u 1000 ciuser \ diff --git a/projects/nms-cockpit-video/daemon/Cargo.toml b/projects/nms-cockpit-video/daemon/Cargo.toml index 6565800..8e79385 100644 --- a/projects/nms-cockpit-video/daemon/Cargo.toml +++ b/projects/nms-cockpit-video/daemon/Cargo.toml @@ -30,7 +30,7 @@ tokio = { workspace = true } # Serialization serde = { workspace = true } -serde_json = "1.0" +serde_json = { workspace = true } # Logging tracing = { workspace = true } @@ -44,7 +44,7 @@ anyhow = { workspace = true } parking_lot = { workspace = true } # CLI -clap = { version = "4", features = ["derive"] } +clap = { workspace = true } # Audio output cpal = { workspace = true } diff --git a/projects/nms-cockpit-video/daemon/src/video/audio.rs b/projects/nms-cockpit-video/daemon/src/video/audio.rs index 4a0ef3b..d73afc1 100644 --- a/projects/nms-cockpit-video/daemon/src/video/audio.rs +++ b/projects/nms-cockpit-video/daemon/src/video/audio.rs @@ -397,6 +397,9 @@ fn audio_decode_loop( // Extract f32 samples from resampled frame let data = resampled_frame.data(0); let sample_count = resampled_frame.samples() * target_channels as usize; + // SAFETY: Resampled frame data is in FLT (f32) format + // (configured via target_format). `data` points to valid + // memory of `sample_count` f32 samples. let samples: &[f32] = unsafe { std::slice::from_raw_parts( data.as_ptr() as *const f32, @@ -407,13 +410,12 @@ fn audio_decode_loop( // Write to ring buffer, blocking briefly if full to // maintain A/V sync (instead of dropping samples) let mut offset = 0; - let mut retries = 0; + let deadline = std::time::Instant::now() + Duration::from_millis(50); while offset < samples.len() && running { let written = producer.push_slice(&samples[offset..]); offset += written; if offset < samples.len() { - retries += 1; - if retries > 50 { + if std::time::Instant::now() >= deadline { // Timeout: consumer stalled, drop remaining warn!( "Audio ring buffer blocked too long, dropped {} samples", diff --git a/projects/nms-cockpit-video/daemon/src/video/player.rs b/projects/nms-cockpit-video/daemon/src/video/player.rs index 31f3a29..43c9edc 100644 --- a/projects/nms-cockpit-video/daemon/src/video/player.rs +++ b/projects/nms-cockpit-video/daemon/src/video/player.rs @@ -522,7 +522,10 @@ fn handle_pause(state: &Arc>, ctx: &Option) { let mut s = lock_state(state); if let PlayerState::Playing { info, .. 
} = s.clone() { // Use the frame writer's last PTS as the accurate position - let current_pos = ctx.as_ref().map(|c| c.writer.last_pts_ms()).unwrap_or(0); + let current_pos = ctx + .as_ref() + .and_then(|c| c.writer.last_pts_ms()) + .unwrap_or(0); info!(position_ms = current_pos, "Pausing playback"); // Pause audio diff --git a/projects/nms-cockpit-video/injector/Cargo.toml b/projects/nms-cockpit-video/injector/Cargo.toml index 738d1d0..3e71436 100644 --- a/projects/nms-cockpit-video/injector/Cargo.toml +++ b/projects/nms-cockpit-video/injector/Cargo.toml @@ -19,7 +19,7 @@ itk-ipc = { path = "../../../core/itk-ipc" } itk-protocol = { path = "../../../core/itk-protocol" } serde = { workspace = true } tracing = { workspace = true } -once_cell = "1.19" +once_cell = { workspace = true } [build-dependencies] naga = { version = "24", features = ["wgsl-in", "spv-out"] } diff --git a/projects/nms-cockpit-video/injector/src/hooks/vulkan.rs b/projects/nms-cockpit-video/injector/src/hooks/vulkan.rs index e62df04..9c807aa 100644 --- a/projects/nms-cockpit-video/injector/src/hooks/vulkan.rs +++ b/projects/nms-cockpit-video/injector/src/hooks/vulkan.rs @@ -50,8 +50,8 @@ static PRESENT_QUEUE: OnceCell = OnceCell::new(); /// Queue family index (captured from device creation). static QUEUE_FAMILY_INDEX: AtomicU64 = AtomicU64::new(0); -/// Current swapchain handle. -static SWAPCHAIN: OnceCell = OnceCell::new(); +/// Current swapchain handle (updated on recreation). +static SWAPCHAIN: Mutex> = Mutex::new(None); /// Cached ash::Device dispatch table (avoids per-frame function pointer resolution). static ASH_DEVICE: OnceCell = OnceCell::new(); @@ -259,8 +259,10 @@ fn hooked_create_swapchain( SWAPCHAIN_FORMAT.store(format.as_raw() as u64, Ordering::Relaxed); let sc = unsafe { *swapchain }; - // Store swapchain (first one only for now) - let _ = SWAPCHAIN.set(sc); + // Update swapchain handle (supports recreation) + if let Ok(mut guard) = SWAPCHAIN.lock() { + *guard = Some(sc); + } vlog!( "Swapchain created: {}x{} format={:?} handle={:?}", @@ -485,7 +487,9 @@ unsafe extern "system" fn icd_hooked_create_swapchain( SWAPCHAIN_FORMAT.store(format.as_raw() as u64, Ordering::Relaxed); let sc = *swapchain; - let _ = SWAPCHAIN.set(sc); + if let Ok(mut guard) = SWAPCHAIN.lock() { + *guard = Some(sc); + } vlog!( "Swapchain created (ICD hook): {}x{} format={:?} handle={:?}", diff --git a/projects/nms-cockpit-video/injector/src/renderer/mod.rs b/projects/nms-cockpit-video/injector/src/renderer/mod.rs index 599e271..1fb9c91 100644 --- a/projects/nms-cockpit-video/injector/src/renderer/mod.rs +++ b/projects/nms-cockpit-video/injector/src/renderer/mod.rs @@ -233,7 +233,7 @@ impl VulkanRenderer { // Wait for previous use of this frame's resources self.device - .wait_for_fences(&[frame.fence], true, u64::MAX) + .wait_for_fences(&[frame.fence], true, 1_000_000_000) .map_err(|e| format!("Wait fence failed: {:?}", e))?; self.device .reset_fences(&[frame.fence]) @@ -392,7 +392,7 @@ impl VulkanRenderer { // Wait for our rendering to complete before the game presents self.device - .wait_for_fences(&[frame.fence], true, u64::MAX) + .wait_for_fences(&[frame.fence], true, 1_000_000_000) .map_err(|e| format!("Wait after submit failed: {:?}", e))?; Ok(()) @@ -426,7 +426,7 @@ impl VulkanRenderer { // Wait for previous use of this cached fence self.device - .wait_for_fences(&[fence], true, u64::MAX) + .wait_for_fences(&[fence], true, 1_000_000_000) .map_err(|e| format!("VR fence wait failed: {:?}", e))?; self.device .reset_fences(&[fence]) 
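The `hooks/vulkan.rs` hunks above replace `OnceCell<vk::SwapchainKHR>` with `Mutex<Option<_>>` so that a recreated swapchain (e.g. after a window resize) replaces the stored handle instead of being ignored by a second `set`. A minimal sketch of that storage pattern, using `u64` as a stand-in for the Vulkan handle type:

```rust
// Sketch only: OnceCell::set succeeds once, so swapchain recreation would be
// silently dropped; a mutex-guarded Option lets each creation overwrite the
// previously stored handle.
use std::sync::Mutex;

static SWAPCHAIN: Mutex<Option<u64>> = Mutex::new(None);

fn on_swapchain_created(handle: u64) {
    if let Ok(mut guard) = SWAPCHAIN.lock() {
        *guard = Some(handle); // later creations overwrite earlier ones
    }
}

fn main() {
    on_swapchain_created(0x1); // initial swapchain
    on_swapchain_created(0x2); // recreation (e.g. window resize)
    assert_eq!(*SWAPCHAIN.lock().unwrap(), Some(0x2));
}
```

Since `std::sync::Mutex::new` is `const`, the static needs no lazy initialisation.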
@@ -573,7 +573,7 @@ impl VulkanRenderer { .map_err(|e| format!("VR queue submit failed: {:?}", e))?; self.device - .wait_for_fences(&[fence], true, u64::MAX) + .wait_for_fences(&[fence], true, 1_000_000_000) .map_err(|e| format!("VR fence wait failed: {:?}", e))?; Ok(()) diff --git a/projects/nms-cockpit-video/overlay/Cargo.toml b/projects/nms-cockpit-video/overlay/Cargo.toml index 4759ca9..6258d8e 100644 --- a/projects/nms-cockpit-video/overlay/Cargo.toml +++ b/projects/nms-cockpit-video/overlay/Cargo.toml @@ -21,16 +21,16 @@ itk-shmem = { path = "../../../core/itk-shmem" } itk-ipc = { path = "../../../core/itk-ipc" } # GUI -egui = "0.28" -egui-wgpu = "0.28" -egui-winit = "0.28" +egui = { workspace = true } +egui-wgpu = { workspace = true } +egui-winit = { workspace = true } # Async runtime tokio = { workspace = true } # Serialization serde = { workspace = true } -serde_json = "1.0" +serde_json = { workspace = true } # Logging tracing = { workspace = true } @@ -41,14 +41,14 @@ thiserror = { workspace = true } anyhow = { workspace = true } # Graphics - must match egui-wgpu 0.28's wgpu version -wgpu = "0.20" -winit = { version = "0.29", features = ["rwh_06"] } +wgpu = { workspace = true } +winit = { workspace = true, features = ["rwh_06"] } pollster = "0.3" bytemuck = { version = "1.14", features = ["derive"] } raw-window-handle = "0.6" # CLI -clap = { version = "4", features = ["derive"] } +clap = { workspace = true } # Windows platform APIs [target.'cfg(windows)'.dependencies] diff --git a/tools/mem-scanner/Cargo.toml b/tools/mem-scanner/Cargo.toml index e9a65fa..3a81964 100644 --- a/tools/mem-scanner/Cargo.toml +++ b/tools/mem-scanner/Cargo.toml @@ -10,7 +10,7 @@ path = "src/main.rs" [dependencies] serde = { workspace = true } -serde_json = "1.0" +serde_json = { workspace = true } [target.'cfg(windows)'.dependencies] windows = { version = "0.58", features = [