Skip to content

Commit 457c9fd

Browse files
authored
chore: better session recycling (#7368)
1 parent 6eeaf46 commit 457c9fd

File tree

2 files changed

+49
-24
lines changed

2 files changed

+49
-24
lines changed

codex-rs/core/src/unified_exec/mod.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,26 @@ pub(crate) struct UnifiedExecResponse {
9898

9999
#[derive(Default)]
100100
pub(crate) struct UnifiedExecSessionManager {
101-
sessions: Mutex<HashMap<String, SessionEntry>>,
102-
used_session_ids: Mutex<HashSet<String>>,
101+
session_store: Mutex<SessionStore>,
102+
}
103+
104+
// Required for mutex sharing.
105+
#[derive(Default)]
106+
pub(crate) struct SessionStore {
107+
sessions: HashMap<String, SessionEntry>,
108+
reserved_sessions_id: HashSet<String>,
109+
}
110+
111+
impl SessionStore {
112+
fn remove(&mut self, session_id: &str) -> Option<SessionEntry> {
113+
self.reserved_sessions_id.remove(session_id);
114+
self.sessions.remove(session_id)
115+
}
116+
117+
pub(crate) fn clear(&mut self) {
118+
self.reserved_sessions_id.clear();
119+
self.sessions.clear();
120+
}
103121
}
104122

105123
struct SessionEntry {
@@ -384,9 +402,10 @@ mod tests {
384402
session
385403
.services
386404
.unified_exec_manager
387-
.sessions
405+
.session_store
388406
.lock()
389407
.await
408+
.sessions
390409
.is_empty()
391410
);
392411

@@ -425,9 +444,10 @@ mod tests {
425444
session
426445
.services
427446
.unified_exec_manager
428-
.sessions
447+
.session_store
429448
.lock()
430449
.await
450+
.sessions
431451
.is_empty()
432452
);
433453

codex-rs/core/src/unified_exec/session_manager.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::truncate::formatted_truncate_text;
3636
use super::ExecCommandRequest;
3737
use super::MAX_UNIFIED_EXEC_SESSIONS;
3838
use super::SessionEntry;
39+
use super::SessionStore;
3940
use super::UnifiedExecContext;
4041
use super::UnifiedExecError;
4142
use super::UnifiedExecResponse;
@@ -81,14 +82,15 @@ struct PreparedSessionHandles {
8182
impl UnifiedExecSessionManager {
8283
pub(crate) async fn allocate_process_id(&self) -> String {
8384
loop {
84-
let mut store = self.used_session_ids.lock().await;
85+
let mut store = self.session_store.lock().await;
8586

8687
let process_id = if !cfg!(test) && !cfg!(feature = "deterministic_process_ids") {
8788
// production mode → random
8889
rand::rng().random_range(1_000..100_000).to_string()
8990
} else {
9091
// test or deterministic mode
9192
let next = store
93+
.reserved_sessions_id
9294
.iter()
9395
.filter_map(|s| s.parse::<i32>().ok())
9496
.max()
@@ -98,11 +100,11 @@ impl UnifiedExecSessionManager {
98100
next.to_string()
99101
};
100102

101-
if store.contains(&process_id) {
103+
if store.reserved_sessions_id.contains(&process_id) {
102104
continue;
103105
}
104106

105-
store.insert(process_id.clone());
107+
store.reserved_sessions_id.insert(process_id.clone());
106108
return process_id;
107109
}
108110
}
@@ -331,16 +333,16 @@ impl UnifiedExecSessionManager {
331333
}
332334

333335
async fn refresh_session_state(&self, process_id: &str) -> SessionStatus {
334-
let mut sessions = self.sessions.lock().await;
335-
let Some(entry) = sessions.get(process_id) else {
336+
let mut store = self.session_store.lock().await;
337+
let Some(entry) = store.sessions.get(process_id) else {
336338
return SessionStatus::Unknown;
337339
};
338340

339341
let exit_code = entry.session.exit_code();
340342
let process_id = entry.process_id.clone();
341343

342344
if entry.session.has_exited() {
343-
let Some(entry) = sessions.remove(&process_id) else {
345+
let Some(entry) = store.remove(&process_id) else {
344346
return SessionStatus::Unknown;
345347
};
346348
SessionStatus::Exited {
@@ -360,12 +362,14 @@ impl UnifiedExecSessionManager {
360362
&self,
361363
process_id: &str,
362364
) -> Result<PreparedSessionHandles, UnifiedExecError> {
363-
let mut sessions = self.sessions.lock().await;
364-
let entry = sessions
365-
.get_mut(process_id)
366-
.ok_or(UnifiedExecError::UnknownSessionId {
367-
process_id: process_id.to_string(),
368-
})?;
365+
let mut store = self.session_store.lock().await;
366+
let entry =
367+
store
368+
.sessions
369+
.get_mut(process_id)
370+
.ok_or(UnifiedExecError::UnknownSessionId {
371+
process_id: process_id.to_string(),
372+
})?;
369373
entry.last_used = Instant::now();
370374
let OutputHandles {
371375
output_buffer,
@@ -417,9 +421,9 @@ impl UnifiedExecSessionManager {
417421
started_at,
418422
last_used: started_at,
419423
};
420-
let mut sessions = self.sessions.lock().await;
421-
Self::prune_sessions_if_needed(&mut sessions);
422-
sessions.insert(process_id, entry);
424+
let mut store = self.session_store.lock().await;
425+
Self::prune_sessions_if_needed(&mut store);
426+
store.sessions.insert(process_id, entry);
423427
}
424428

425429
async fn emit_exec_end_from_entry(
@@ -629,18 +633,19 @@ impl UnifiedExecSessionManager {
629633
collected
630634
}
631635

632-
fn prune_sessions_if_needed(sessions: &mut HashMap<String, SessionEntry>) {
633-
if sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
636+
fn prune_sessions_if_needed(store: &mut SessionStore) {
637+
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
634638
return;
635639
}
636640

637-
let meta: Vec<(String, Instant, bool)> = sessions
641+
let meta: Vec<(String, Instant, bool)> = store
642+
.sessions
638643
.iter()
639644
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
640645
.collect();
641646

642647
if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
643-
sessions.remove(&session_id);
648+
store.remove(&session_id);
644649
}
645650
}
646651

@@ -674,7 +679,7 @@ impl UnifiedExecSessionManager {
674679
}
675680

676681
pub(crate) async fn terminate_all_sessions(&self) {
677-
let mut sessions = self.sessions.lock().await;
682+
let mut sessions = self.session_store.lock().await;
678683
sessions.clear();
679684
}
680685
}

0 commit comments

Comments
 (0)