Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions codex-rs/core/src/unified_exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,26 @@ pub(crate) struct UnifiedExecResponse {

#[derive(Default)]
pub(crate) struct UnifiedExecSessionManager {
sessions: Mutex<HashMap<String, SessionEntry>>,
used_session_ids: Mutex<HashSet<String>>,
session_store: Mutex<SessionStore>,
}

// Required for mutex sharing.
#[derive(Default)]
pub(crate) struct SessionStore {
sessions: HashMap<String, SessionEntry>,
reserved_sessions_id: HashSet<String>,
}

impl SessionStore {
fn remove(&mut self, session_id: &str) -> Option<SessionEntry> {
self.reserved_sessions_id.remove(session_id);
self.sessions.remove(session_id)
}

pub(crate) fn clear(&mut self) {
self.reserved_sessions_id.clear();
self.sessions.clear();
}
}

struct SessionEntry {
Expand Down Expand Up @@ -384,9 +402,10 @@ mod tests {
session
.services
.unified_exec_manager
.sessions
.session_store
.lock()
.await
.sessions
.is_empty()
);

Expand Down Expand Up @@ -425,9 +444,10 @@ mod tests {
session
.services
.unified_exec_manager
.sessions
.session_store
.lock()
.await
.sessions
.is_empty()
);

Expand Down
45 changes: 25 additions & 20 deletions codex-rs/core/src/unified_exec/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::truncate::formatted_truncate_text;
use super::ExecCommandRequest;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::SessionStore;
use super::UnifiedExecContext;
use super::UnifiedExecError;
use super::UnifiedExecResponse;
Expand Down Expand Up @@ -81,14 +82,15 @@ struct PreparedSessionHandles {
impl UnifiedExecSessionManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
let mut store = self.used_session_ids.lock().await;
let mut store = self.session_store.lock().await;

let process_id = if !cfg!(test) && !cfg!(feature = "deterministic_process_ids") {
// production mode → random
rand::rng().random_range(1_000..100_000).to_string()
} else {
// test or deterministic mode
let next = store
.reserved_sessions_id
.iter()
.filter_map(|s| s.parse::<i32>().ok())
.max()
Expand All @@ -98,11 +100,11 @@ impl UnifiedExecSessionManager {
next.to_string()
};

if store.contains(&process_id) {
if store.reserved_sessions_id.contains(&process_id) {
continue;
}

store.insert(process_id.clone());
store.reserved_sessions_id.insert(process_id.clone());
return process_id;
}
}
Expand Down Expand Up @@ -331,16 +333,16 @@ impl UnifiedExecSessionManager {
}

async fn refresh_session_state(&self, process_id: &str) -> SessionStatus {
let mut sessions = self.sessions.lock().await;
let Some(entry) = sessions.get(process_id) else {
let mut store = self.session_store.lock().await;
let Some(entry) = store.sessions.get(process_id) else {
return SessionStatus::Unknown;
};

let exit_code = entry.session.exit_code();
let process_id = entry.process_id.clone();

if entry.session.has_exited() {
let Some(entry) = sessions.remove(&process_id) else {
let Some(entry) = store.remove(&process_id) else {
return SessionStatus::Unknown;
};
SessionStatus::Exited {
Expand All @@ -360,12 +362,14 @@ impl UnifiedExecSessionManager {
&self,
process_id: &str,
) -> Result<PreparedSessionHandles, UnifiedExecError> {
let mut sessions = self.sessions.lock().await;
let entry = sessions
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownSessionId {
process_id: process_id.to_string(),
})?;
let mut store = self.session_store.lock().await;
let entry =
store
.sessions
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownSessionId {
process_id: process_id.to_string(),
})?;
entry.last_used = Instant::now();
let OutputHandles {
output_buffer,
Expand Down Expand Up @@ -417,9 +421,9 @@ impl UnifiedExecSessionManager {
started_at,
last_used: started_at,
};
let mut sessions = self.sessions.lock().await;
Self::prune_sessions_if_needed(&mut sessions);
sessions.insert(process_id, entry);
let mut store = self.session_store.lock().await;
Self::prune_sessions_if_needed(&mut store);
store.sessions.insert(process_id, entry);
}

async fn emit_exec_end_from_entry(
Expand Down Expand Up @@ -629,18 +633,19 @@ impl UnifiedExecSessionManager {
collected
}

fn prune_sessions_if_needed(sessions: &mut HashMap<String, SessionEntry>) {
if sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
fn prune_sessions_if_needed(store: &mut SessionStore) {
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
return;
}

let meta: Vec<(String, Instant, bool)> = sessions
let meta: Vec<(String, Instant, bool)> = store
.sessions
.iter()
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
.collect();

if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
sessions.remove(&session_id);
store.remove(&session_id);
}
}

Expand Down Expand Up @@ -674,7 +679,7 @@ impl UnifiedExecSessionManager {
}

pub(crate) async fn terminate_all_sessions(&self) {
let mut sessions = self.sessions.lock().await;
let mut sessions = self.session_store.lock().await;
sessions.clear();
}
}
Expand Down
Loading