From ea821c73d3560c384426a9eb69395bf768595c8a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 19 Feb 2026 18:14:54 +0000 Subject: [PATCH 1/2] fix: enforce Sync invariant at compile time, guard callback aliasing, lock in Drop - Introduce InferenceGuard: the model's raw FFI handle is now only accessible through the guard returned by lock_inference(), making it a compile error to touch the handle without holding the lock. stop() remains the sole documented exception (atomic-only). - Rewrite token_trampoline to use &CallbackState (shared ref) with Cell/UnsafeCell interior mutability and an in_callback re-entrancy guard, eliminating the previous &mut aliasing risk. - Acquire inference_lock in Model::drop so cactus_destroy waits for any in-flight FFI operation to complete. - Add SAFETY comment for the stack-pinned CallbackState pointer passed to cactus_complete. Co-Authored-By: yujonglee --- crates/cactus/src/llm/complete.rs | 50 ++++++++++++++++++---------- crates/cactus/src/model.rs | 36 ++++++++++++++------ crates/cactus/src/stt/batch.rs | 4 +-- crates/cactus/src/stt/transcriber.rs | 4 +-- crates/cactus/src/vad.rs | 4 +-- 5 files changed, 64 insertions(+), 34 deletions(-) diff --git a/crates/cactus/src/llm/complete.rs b/crates/cactus/src/llm/complete.rs index 6b750b83a0..9f9a2753ff 100644 --- a/crates/cactus/src/llm/complete.rs +++ b/crates/cactus/src/llm/complete.rs @@ -1,17 +1,19 @@ +use std::cell::{Cell, UnsafeCell}; use std::ffi::{CStr, CString}; use crate::error::{Error, Result}; use crate::ffi_utils::{RESPONSE_BUF_SIZE, parse_buf}; -use crate::model::Model; +use crate::model::{InferenceGuard, Model}; use super::{CompleteOptions, CompletionResult, Message}; type TokenCallback = unsafe extern "C" fn(*const std::ffi::c_char, u32, *mut std::ffi::c_void); struct CallbackState<'a, F: FnMut(&str) -> bool> { - on_token: &'a mut F, + on_token: UnsafeCell<&'a mut F>, model: &'a Model, - stopped: bool, + 
stopped: Cell<bool>, + in_callback: Cell<bool>, } unsafe extern "C" fn token_trampoline<F: FnMut(&str) -> bool>( @@ -23,21 +25,28 @@ unsafe extern "C" fn token_trampoline<F: FnMut(&str) -> bool>( return; } - let state = unsafe { &mut *(user_data as *mut CallbackState<F>) }; - if state.stopped { + // SAFETY: We only create a shared reference to CallbackState. Interior + // mutability (Cell/UnsafeCell) handles mutation. The `in_callback` guard + // prevents re-entrant access to the UnsafeCell contents. + let state = unsafe { &*(user_data as *const CallbackState<F>) }; + if state.stopped.get() || state.in_callback.get() { return; } + state.in_callback.set(true); let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { let chunk = unsafe { CStr::from_ptr(token) }.to_string_lossy(); - if !(state.on_token)(&chunk) { - state.stopped = true; + // SAFETY: The `in_callback` flag ensures exclusive access to the closure. + let on_token = unsafe { &mut *state.on_token.get() }; + if !on_token(&chunk) { + state.stopped.set(true); state.model.stop(); } })); + state.in_callback.set(false); if result.is_err() { - state.stopped = true; + state.stopped.set(true); state.model.stop(); } } @@ -58,6 +67,7 @@ pub(super) fn complete_error(rc: i32) -> Error { impl Model { fn call_complete( &self, + guard: &InferenceGuard<'_>, messages_c: &CString, options_c: &CString, callback: Option<TokenCallback>, @@ -67,7 +77,7 @@ impl Model { let rc = unsafe { cactus_sys::cactus_complete( - self.raw_handle(), + guard.raw_handle(), messages_c.as_ptr(), buf.as_mut_ptr().cast::<std::ffi::c_char>(), buf.len(), @@ -86,9 +96,10 @@ impl Model { messages: &[Message], options: &CompleteOptions, ) -> Result<CompletionResult> { - let _guard = self.lock_inference(); + let guard = self.lock_inference(); let (messages_c, options_c) = serialize_complete_request(messages, options)?; - let (rc, buf) = self.call_complete(&messages_c, &options_c, None, std::ptr::null_mut()); + let (rc, buf) = + self.call_complete(&guard, &messages_c, &options_c, None, std::ptr::null_mut()); if rc < 0 { return
Err(complete_error(rc)); @@ -106,23 +117,28 @@ impl Model { where F: FnMut(&str) -> bool, { - let _guard = self.lock_inference(); + let guard = self.lock_inference(); let (messages_c, options_c) = serialize_complete_request(messages, options)?; - let mut state = CallbackState { - on_token: &mut on_token, + let state = CallbackState { + on_token: UnsafeCell::new(&mut on_token), model: self, - stopped: false, + stopped: Cell::new(false), + in_callback: Cell::new(false), }; + // SAFETY: `state` is stack-allocated and lives for the duration of the + // FFI call. The C++ side must not retain this pointer beyond the return + // of `cactus_complete`. let (rc, buf) = self.call_complete( + &guard, &messages_c, &options_c, Some(token_trampoline::<F>), - (&mut state as *mut CallbackState<F>).cast::<std::ffi::c_void>(), + (&state as *const CallbackState<F> as *mut std::ffi::c_void), ); - if rc < 0 && !state.stopped { + if rc < 0 && !state.stopped.get() { return Err(complete_error(rc)); } diff --git a/crates/cactus/src/model.rs b/crates/cactus/src/model.rs index 685619800e..24e9f2c949 100644 --- a/crates/cactus/src/model.rs +++ b/crates/cactus/src/model.rs @@ -11,10 +11,23 @@ pub struct Model { } unsafe impl Send for Model {} -// SAFETY: All FFI methods that touch model state are serialized by `inference_lock`. +// SAFETY: All FFI methods that touch model state are serialized by `inference_lock`, +// which is enforced at compile time via `InferenceGuard` — the model's raw handle is +// only accessible through the guard returned by `lock_inference()`. // The sole exception is `stop()`, which only sets a `std::atomic` on the C++ side.
unsafe impl Sync for Model {} +pub(crate) struct InferenceGuard<'a> { + handle: NonNull<std::ffi::c_void>, + _guard: MutexGuard<'a, ()>, +} + +impl InferenceGuard<'_> { + pub(crate) fn raw_handle(&self) -> *mut std::ffi::c_void { + self.handle.as_ptr() + } +} + pub struct ModelBuilder { model_path: PathBuf, } @@ -53,27 +66,28 @@ impl Model { } pub fn reset(&mut self) { - let _guard = self.lock_inference(); + let guard = self.lock_inference(); unsafe { - cactus_sys::cactus_reset(self.handle.as_ptr()); + cactus_sys::cactus_reset(guard.raw_handle()); } } - pub(crate) fn lock_inference(&self) -> MutexGuard<'_, ()> { - self.inference_lock + pub(crate) fn lock_inference(&self) -> InferenceGuard<'_> { + let guard = self.inference_lock .lock() - .unwrap_or_else(|e| e.into_inner()) - } - - pub(crate) fn raw_handle(&self) -> *mut std::ffi::c_void { - self.handle.as_ptr() + .unwrap_or_else(|e| e.into_inner()); + InferenceGuard { + handle: self.handle, + _guard: guard, + } } } impl Drop for Model { fn drop(&mut self) { + let guard = self.lock_inference(); unsafe { - cactus_sys::cactus_destroy(self.handle.as_ptr()); + cactus_sys::cactus_destroy(guard.raw_handle()); } } } diff --git a/crates/cactus/src/stt/batch.rs b/crates/cactus/src/stt/batch.rs index 05799c21f2..8f90e2ab86 100644 --- a/crates/cactus/src/stt/batch.rs +++ b/crates/cactus/src/stt/batch.rs @@ -32,7 +32,7 @@ impl Model { input: TranscribeInput<'_>, options: &TranscribeOptions, ) -> Result { - let _guard = self.lock_inference(); + let guard = self.lock_inference(); let prompt_c = CString::new(build_whisper_prompt(options))?; let options_c = CString::new(serde_json::to_string(options)?)?; let mut buf = vec![0u8; RESPONSE_BUF_SIZE]; @@ -44,7 +44,7 @@ let rc = unsafe { cactus_sys::cactus_transcribe( - self.raw_handle(), + guard.raw_handle(), path_ptr, prompt_c.as_ptr(), buf.as_mut_ptr() as *mut std::ffi::c_char, diff --git a/crates/cactus/src/stt/transcriber.rs b/crates/cactus/src/stt/transcriber.rs index
5c461a2a44..a5b006a3c9 100644 --- a/crates/cactus/src/stt/transcriber.rs +++ b/crates/cactus/src/stt/transcriber.rs @@ -96,11 +96,11 @@ impl std::str::FromStr for StreamResult { impl<'a> Transcriber<'a> { pub fn new(model: &'a Model, options: &TranscribeOptions, cloud: CloudConfig) -> Result { - let _guard = model.lock_inference(); + let guard = model.lock_inference(); let options_c = serialize_stream_options(options, &cloud)?; let raw = unsafe { - cactus_sys::cactus_stream_transcribe_start(model.raw_handle(), options_c.as_ptr()) + cactus_sys::cactus_stream_transcribe_start(guard.raw_handle(), options_c.as_ptr()) }; let handle = NonNull::new(raw).ok_or_else(|| { diff --git a/crates/cactus/src/vad.rs b/crates/cactus/src/vad.rs index a9ea3c21d0..f09b68cad3 100644 --- a/crates/cactus/src/vad.rs +++ b/crates/cactus/src/vad.rs @@ -54,7 +54,7 @@ impl Model { pcm: Option<&[u8]>, options: &VadOptions, ) -> Result { - let _guard = self.lock_inference(); + let guard = self.lock_inference(); let options_c = CString::new(serde_json::to_string(options)?)?; let mut buf = vec![0u8; RESPONSE_BUF_SIZE]; @@ -64,7 +64,7 @@ impl Model { let rc = unsafe { cactus_sys::cactus_vad( - self.raw_handle(), + guard.raw_handle(), path.map_or(std::ptr::null(), |p| p.as_ptr()), buf.as_mut_ptr() as *mut std::ffi::c_char, buf.len(), From 119ce0c68ce0a8e6b9f132d06052fb142358b445 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 19 Feb 2026 18:22:29 +0000 Subject: [PATCH 2/2] style: fix dprint formatting in lock_inference Co-Authored-By: yujonglee --- crates/cactus/src/model.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/cactus/src/model.rs b/crates/cactus/src/model.rs index 24e9f2c949..ca8c04c7ad 100644 --- a/crates/cactus/src/model.rs +++ b/crates/cactus/src/model.rs @@ -73,7 +73,8 @@ impl Model { } pub(crate) fn lock_inference(&self) -> InferenceGuard<'_> { - let guard = self.inference_lock + let guard = 
self + .inference_lock .lock() .unwrap_or_else(|e| e.into_inner()); InferenceGuard {