diff --git a/code-rs/core/src/agent_tool.rs b/code-rs/core/src/agent_tool.rs index 599f6f9cc38..ed194435cd9 100644 --- a/code-rs/core/src/agent_tool.rs +++ b/code-rs/core/src/agent_tool.rs @@ -6,6 +6,7 @@ use serde::Serialize; use uuid::Uuid; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::path::PathBuf; use std::process::Stdio; use std::sync::Arc; @@ -14,6 +15,7 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::RwLock; use tokio::sync::mpsc; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use crate::agent_defaults::{agent_model_spec, default_params_for}; use crate::config_types::AgentConfig; @@ -65,9 +67,14 @@ lazy_static::lazy_static! { pub static ref AGENT_MANAGER: Arc> = Arc::new(RwLock::new(AgentManager::new())); } +const MAX_TRACKED_COMPLETED_AGENTS: usize = 200; +const MAX_COMPLETED_AGENT_AGE_HOURS: i64 = 6; +const MAX_AGENT_PROGRESS_ENTRIES: usize = 200; + pub struct AgentManager { agents: HashMap, handles: HashMap>, + cancel_tokens: HashMap, event_sender: Option>, } @@ -83,6 +90,7 @@ impl AgentManager { Self { agents: HashMap::new(), handles: HashMap::new(), + cancel_tokens: HashMap::new(), event_sender: None, } } @@ -91,68 +99,128 @@ impl AgentManager { self.event_sender = Some(sender); } - async fn send_agent_status_update(&self) { - if let Some(ref sender) = self.event_sender { - let now = Utc::now(); - let agents: Vec = self - .agents - .values() - .map(|agent| { - // Just show the model name - status provides the useful info - let name = agent - .name - .as_ref() - .map(|value| value.clone()) - .unwrap_or_else(|| agent.model.clone()); - let start = agent.started_at.unwrap_or(agent.created_at); - let end = agent.completed_at.unwrap_or(now); - let elapsed_ms = match end.signed_duration_since(start).num_milliseconds() { - value if value >= 0 => Some(value as u64), - _ => None, - }; - - AgentInfo { - id: agent.id.clone(), - name, - status: format!("{:?}", agent.status).to_lowercase(), - batch_id: agent.batch_id.clone(), - model: Some(agent.model.clone()), - last_progress: agent.progress.last().cloned(), - result: agent.result.clone(), - error: agent.error.clone(), - elapsed_ms, - token_count: None, + pub(crate) fn build_status_payload(&self) -> AgentStatusUpdatePayload { + let now = Utc::now(); + + let max_age = Duration::hours(MAX_COMPLETED_AGENT_AGE_HOURS); + + let mut active: Vec<(DateTime, String)> = Vec::new(); + let mut terminal: Vec<(DateTime, String)> = Vec::new(); + + for agent in self.agents.values() { + match agent.status { + AgentStatus::Pending | AgentStatus::Running => { + active.push((agent.created_at, agent.id.clone())); + } + AgentStatus::Completed | AgentStatus::Failed | AgentStatus::Cancelled => { + let completed_at = agent + .completed_at + .or(agent.started_at) + .unwrap_or(agent.created_at); + if now.signed_duration_since(completed_at) <= max_age { + terminal.push((completed_at, agent.id.clone())); } - }) - .collect(); - - // Get context and task from the first agent (they're all the same) - let (context, task) = self - .agents - .values() - .next() - .map(|agent| { - let context = agent - .context - .as_ref() - .and_then(|value| if value.trim().is_empty() { - None - } else { - Some(value.clone()) - }); - let task = if agent.prompt.trim().is_empty() { - None - } else { - Some(agent.prompt.clone()) - }; - (context, task) - }) - .unwrap_or((None, None)); - let payload = AgentStatusUpdatePayload { agents, context, task }; + } + } + } + + active.sort_by_key(|(created_at, _)| *created_at); + terminal.sort_by_key(|(completed_at, _)| *completed_at); + terminal.reverse(); + if terminal.len() > MAX_TRACKED_COMPLETED_AGENTS { + terminal.truncate(MAX_TRACKED_COMPLETED_AGENTS); + } + terminal.reverse(); + + let mut ordered_ids: Vec = Vec::with_capacity(active.len() + terminal.len()); + ordered_ids.extend(active.into_iter().map(|(_, id)| id)); + ordered_ids.extend(terminal.into_iter().map(|(_, id)| id)); + + let first_agent_id = ordered_ids.first().cloned(); + + let agents: Vec = ordered_ids + .into_iter() + .filter_map(|id| self.agents.get(&id).map(|agent| agent_info_snapshot(agent, now))) + .collect(); + + let (context, task) = first_agent_id + .and_then(|id| self.agents.get(&id)) + .map(|agent| { + let context = agent + .context + .as_ref() + .and_then(|value| if value.trim().is_empty() { None } else { Some(value.clone()) }); + let task = if agent.prompt.trim().is_empty() { + None + } else { + Some(agent.prompt.clone()) + }; + (context, task) + }) + .unwrap_or((None, None)); + + AgentStatusUpdatePayload { agents, context, task } + } + + async fn send_agent_status_update(&mut self) { + self.prune_finished_agents(); + if let Some(ref sender) = self.event_sender { + let payload = self.build_status_payload(); let _ = sender.send(payload); } } + fn prune_finished_agents(&mut self) { + let max_age = Duration::hours(MAX_COMPLETED_AGENT_AGE_HOURS); + let now = Utc::now(); + + let mut terminal: Vec<(String, DateTime)> = self + .agents + .iter() + .filter_map(|(id, agent)| match agent.status { + AgentStatus::Completed | AgentStatus::Failed | AgentStatus::Cancelled => { + let completed_at = agent + .completed_at + .or(agent.started_at) + .unwrap_or(agent.created_at); + Some((id.clone(), completed_at)) + } + _ => None, + }) + .collect(); + + if terminal.is_empty() { + return; + } + + terminal.sort_by_key(|(_, completed_at)| *completed_at); + + let mut to_remove: HashSet = HashSet::new(); + for (id, completed_at) in terminal.iter() { + if now.signed_duration_since(*completed_at) > max_age { + to_remove.insert(id.clone()); + } + } + + let survivors: Vec<(String, DateTime)> = terminal + .into_iter() + .filter(|(id, _)| !to_remove.contains(id)) + .collect(); + + if survivors.len() > MAX_TRACKED_COMPLETED_AGENTS { + let excess = survivors.len() - MAX_TRACKED_COMPLETED_AGENTS; + for (id, _) in survivors.iter().take(excess) { + to_remove.insert(id.clone()); + } + } + + for id in to_remove { + self.agents.remove(&id); + self.handles.remove(&id); + self.cancel_tokens.remove(&id); + } + } + pub async fn create_agent( &mut self, model: String, @@ -242,13 +310,16 @@ impl AgentManager { self.agents.insert(agent_id.clone(), agent.clone()); + let cancel_token = CancellationToken::new(); + self.cancel_tokens.insert(agent_id.clone(), cancel_token.clone()); + // Send initial status update self.send_agent_status_update().await; // Spawn async agent let agent_id_clone = agent_id.clone(); let handle = tokio::spawn(async move { - execute_agent(agent_id_clone, config).await; + execute_agent(agent_id_clone, config, cancel_token).await; }); self.handles.insert(agent_id.clone(), handle); @@ -260,10 +331,6 @@ impl AgentManager { self.agents.get(agent_id).cloned() } - pub fn get_all_agents(&self) -> impl Iterator { - self.agents.values() - } - pub fn list_agents( &self, status_filter: Option, @@ -307,12 +374,16 @@ impl AgentManager { } pub async fn cancel_agent(&mut self, agent_id: &str) -> bool { + if let Some(token) = self.cancel_tokens.remove(agent_id) { + token.cancel(); + } if let Some(handle) = self.handles.remove(agent_id) { handle.abort(); if let Some(agent) = self.agents.get_mut(agent_id) { agent.status = AgentStatus::Cancelled; agent.completed_at = Some(Utc::now()); } + self.prune_finished_agents(); true } else { false @@ -337,6 +408,7 @@ impl AgentManager { } pub async fn update_agent_status(&mut self, agent_id: &str, status: AgentStatus) { + let mut should_send = false; if let Some(agent) = self.agents.get_mut(agent_id) { agent.status = status; if agent.status == AgentStatus::Running && agent.started_at.is_none() { @@ -348,34 +420,56 @@ impl AgentManager { ) { agent.completed_at = Some(Utc::now()); } + should_send = true; + } + if should_send { // Send status update event self.send_agent_status_update().await; } } pub async fn update_agent_result(&mut self, agent_id: &str, result: Result) { + let mut should_send = false; if let Some(agent) = self.agents.get_mut(agent_id) { - match result { - Ok(output) => { - agent.result = Some(output); - agent.status = AgentStatus::Completed; - } - Err(error) => { - agent.error = Some(error); - agent.status = AgentStatus::Failed; + if agent.status == AgentStatus::Cancelled { + agent.completed_at = Some(Utc::now()); + should_send = true; + } else { + match result { + Ok(output) => { + agent.result = Some(output); + agent.status = AgentStatus::Completed; + } + Err(error) => { + agent.error = Some(error); + agent.status = AgentStatus::Failed; + } } + agent.completed_at = Some(Utc::now()); + should_send = true; } - agent.completed_at = Some(Utc::now()); + } + self.handles.remove(agent_id); + self.cancel_tokens.remove(agent_id); + if should_send { // Send status update event self.send_agent_status_update().await; } } pub async fn add_progress(&mut self, agent_id: &str, message: String) { + let mut should_send = false; if let Some(agent) = self.agents.get_mut(agent_id) { agent .progress .push(format!("{}: {}", Utc::now().format("%H:%M:%S"), message)); + if agent.progress.len() > MAX_AGENT_PROGRESS_ENTRIES { + let excess = agent.progress.len() - MAX_AGENT_PROGRESS_ENTRIES; + agent.progress.drain(0..excess); + } + should_send = true; + } + if should_send { // Send updated agent status with the latest progress self.send_agent_status_update().await; } @@ -394,6 +488,33 @@ impl AgentManager { } } +fn agent_info_snapshot(agent: &Agent, now: DateTime) -> AgentInfo { + let name = agent + .name + .as_ref() + .cloned() + .unwrap_or_else(|| agent.model.clone()); + let start = agent.started_at.unwrap_or(agent.created_at); + let end = agent.completed_at.unwrap_or(now); + let elapsed_ms = match end.signed_duration_since(start).num_milliseconds() { + value if value >= 0 => Some(value as u64), + _ => None, + }; + + AgentInfo { + id: agent.id.clone(), + name, + status: format!("{:?}", agent.status).to_lowercase(), + batch_id: agent.batch_id.clone(), + model: Some(agent.model.clone()), + last_progress: agent.progress.last().cloned(), + result: agent.result.clone(), + error: agent.error.clone(), + elapsed_ms, + token_count: None, + } +} + async fn get_git_root() -> Result { let output = Command::new("git") .args(&["rev-parse", "--show-toplevel"]) @@ -449,7 +570,7 @@ fn generate_branch_id(model: &str, agent: &str) -> String { use crate::git_worktree::setup_worktree; -async fn execute_agent(agent_id: String, config: Option) { +async fn execute_agent(agent_id: String, config: Option, cancel_token: CancellationToken) { let mut manager = AGENT_MANAGER.write().await; // Get agent details @@ -558,6 +679,7 @@ async fn execute_agent(agent_id: String, config: Option) { Some(worktree_path), config.clone(), spec.slug, + cancel_token.clone(), ) .await } @@ -568,6 +690,7 @@ async fn execute_agent(agent_id: String, config: Option) { Some(worktree_path), config.clone(), model.as_str(), + cancel_token.clone(), ) .await } @@ -578,6 +701,7 @@ async fn execute_agent(agent_id: String, config: Option) { false, Some(worktree_path), config.clone(), + cancel_token.clone(), ) .await } @@ -603,13 +727,37 @@ async fn execute_agent(agent_id: String, config: Option) { if !spec.is_enabled() { Err(gating_error_message(spec)) } else { - execute_cloud_built_in_streaming(&agent_id, &full_prompt, None, config, spec.slug).await + execute_cloud_built_in_streaming( + &agent_id, + &full_prompt, + None, + config, + spec.slug, + cancel_token.clone(), + ) + .await } } else { - execute_cloud_built_in_streaming(&agent_id, &full_prompt, None, config, model.as_str()).await + execute_cloud_built_in_streaming( + &agent_id, + &full_prompt, + None, + config, + model.as_str(), + cancel_token.clone(), + ) + .await } } else { - execute_model_with_permissions(&model, &full_prompt, true, None, config).await + execute_model_with_permissions( + &model, + &full_prompt, + true, + None, + config, + cancel_token.clone(), + ) + .await } }; @@ -624,6 +772,7 @@ async fn execute_model_with_permissions( read_only: bool, working_dir: Option, config: Option, + _cancel_token: CancellationToken, ) -> Result { // Helper: cross‑platform check whether an executable is available in PATH // and is directly spawnable by std::process::Command (no shell wrappers). @@ -1017,6 +1166,7 @@ async fn execute_cloud_built_in_streaming( working_dir: Option, _config: Option, model_slug: &str, + cancel_token: CancellationToken, ) -> Result { // Program and argv let program = std::env::current_exe() @@ -1044,16 +1194,34 @@ async fn execute_cloud_built_in_streaming( .await .map_err(|e| format!("Failed to spawn cloud submit: {}", e))?; - // Stream stderr to HUD - let stderr_task = if let Some(stderr) = child.stderr.take() { + let cancel_for_stderr = cancel_token.clone(); + let mut stderr_task = if let Some(stderr) = child.stderr.take() { let agent = agent_id.to_string(); Some(tokio::spawn(async move { let mut lines = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = lines.next_line().await { - let msg = line.trim(); - if msg.is_empty() { continue; } - let mut mgr = AGENT_MANAGER.write().await; - mgr.add_progress(&agent, msg.to_string()).await; + loop { + tokio::select! { + _ = cancel_for_stderr.cancelled() => { + break; + } + line = lines.next_line() => { + match line { + Ok(Some(line)) => { + let msg = line.trim(); + if msg.is_empty() { + continue; + } + let mut mgr = AGENT_MANAGER.write().await; + mgr.add_progress(&agent, msg.to_string()).await; + } + Ok(None) => break, + Err(err) => { + tracing::warn!("failed to read agent stderr: {}", err); + break; + } + } + } + } } })) } else { None }; @@ -1061,15 +1229,52 @@ async fn execute_cloud_built_in_streaming( // Collect stdout fully (final result) let mut stdout_buf = String::new(); if let Some(stdout) = child.stdout.take() { + let cancel_for_stdout = cancel_token.clone(); let mut lines = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = lines.next_line().await { - stdout_buf.push_str(&line); - stdout_buf.push('\n'); + loop { + tokio::select! { + _ = cancel_for_stdout.cancelled() => { + break; + } + line = lines.next_line() => { + match line { + Ok(Some(line)) => { + stdout_buf.push_str(&line); + stdout_buf.push('\n'); + } + Ok(None) => break, + Err(err) => { + tracing::warn!("failed to read agent stdout: {}", err); + break; + } + } + } + } } } - let status = child.wait().await.map_err(|e| format!("Failed to wait: {}", e))?; - if let Some(t) = stderr_task { let _ = t.await; } + let cancelled = cancel_token.cancelled(); + tokio::pin!(cancelled); + let status = tokio::select! { + status = child.wait() => status, + _ = &mut cancelled => { + if let Err(err) = child.start_kill() { + tracing::warn!("failed to kill cancelled agent child: {}", err); + } + match child.wait().await { + Ok(_) => {} + Err(wait_err) => { + tracing::warn!("failed to reap cancelled agent child: {}", wait_err); + } + } + if let Some(task) = stderr_task.take() { + let _ = task.await; + } + return Err("Agent cancelled".to_string()); + } + } + .map_err(|e| format!("Failed to wait: {}", e))?; + if let Some(task) = stderr_task.take() { let _ = task.await; } if !status.success() { return Err(format!("cloud submit exited with status {}", status)); } diff --git a/code-rs/core/src/codex.rs b/code-rs/core/src/codex.rs index 59330e41264..c987f1ac16c 100644 --- a/code-rs/core/src/codex.rs +++ b/code-rs/core/src/codex.rs @@ -9845,46 +9845,17 @@ async fn capture_browser_screenshot(_sess: &Session) -> Result<(PathBuf, String) /// Send agent status update event to the TUI async fn send_agent_status_update(sess: &Session) { let manager = AGENT_MANAGER.read().await; + let payload = manager.build_status_payload(); + drop(manager); - // Collect all agents; include completed/failed so HUD can show final messages - let now = Utc::now(); - let agents: Vec = manager - .get_all_agents() - .map(|agent| { - let start = agent.started_at.unwrap_or(agent.created_at); - let end = agent.completed_at.unwrap_or(now); - let elapsed_ms = match end.signed_duration_since(start).num_milliseconds() { - value if value >= 0 => Some(value as u64), - _ => None, - }; - - crate::protocol::AgentInfo { - id: agent.id.clone(), - name: agent.model.clone(), // Use model name as the display name - status: match agent.status { - AgentStatus::Pending => "pending".to_string(), - AgentStatus::Running => "running".to_string(), - AgentStatus::Completed => "completed".to_string(), - AgentStatus::Failed => "failed".to_string(), - AgentStatus::Cancelled => "cancelled".to_string(), - }, - batch_id: agent.batch_id.clone(), - model: Some(agent.model.clone()), - last_progress: agent.progress.last().cloned(), - result: agent.result.clone(), - error: agent.error.clone(), - elapsed_ms, - token_count: None, - } - }) - .collect(); + let AgentStatusUpdatePayload { agents, context, task } = payload; let event = sess.make_event( "agent_status", EventMsg::AgentStatusUpdate(AgentStatusUpdateEvent { agents, - context: None, - task: None, + context, + task, }), ); diff --git a/code-rs/tui/src/chatwidget.rs b/code-rs/tui/src/chatwidget.rs index c79cdbfc96d..19221867ace 100644 --- a/code-rs/tui/src/chatwidget.rs +++ b/code-rs/tui/src/chatwidget.rs @@ -12836,6 +12836,38 @@ impl ChatWidget<'_> { } } + pub(super) fn prune_agents_terminal_state( + &mut self, + active_agent_ids: &HashSet, + active_batch_ids: &HashSet, + ) { + self.agents_terminal + .entries + .retain(|id, _| active_agent_ids.contains(id)); + self.agents_terminal + .order + .retain(|id| active_agent_ids.contains(id)); + self.agents_terminal + .scroll_offsets + .retain(|key, _| { + if let Some(agent_id) = key.strip_prefix("agent:") { + active_agent_ids.contains(agent_id) + } else if let Some(batch_id) = key.strip_prefix("batch:") { + if batch_id == "__adhoc__" { + true + } else { + active_batch_ids.contains(batch_id) + } + } else { + true + } + }); + self.agents_terminal.clamp_selected_index(); + if self.agents_terminal.active { + self.restore_selected_agent_scroll(); + } + } + fn enter_agents_terminal_mode(&mut self) { if self.agents_terminal.active { return; diff --git a/code-rs/tui/src/chatwidget/agent_runs.rs b/code-rs/tui/src/chatwidget/agent_runs.rs index 2d512dca443..7a570c91713 100644 --- a/code-rs/tui/src/chatwidget/agent_runs.rs +++ b/code-rs/tui/src/chatwidget/agent_runs.rs @@ -927,6 +927,15 @@ pub(super) fn handle_status_update(chat: &mut ChatWidget<'_>, event: &AgentStatu return; } + let mut active_agent_ids: HashSet = HashSet::with_capacity(event.agents.len()); + let mut active_batch_ids: HashSet = HashSet::new(); + for agent in &event.agents { + active_agent_ids.insert(agent.id.clone()); + if let Some(batch_id) = agent.batch_id.clone() { + active_batch_ids.insert(batch_id); + } + } + let mut grouped: Vec<(String, Vec)> = Vec::new(); let mut missing: Vec = Vec::new(); @@ -961,6 +970,9 @@ pub(super) fn handle_status_update(chat: &mut ChatWidget<'_>, event: &AgentStatu for (batch_id, agents) in grouped { process_status_update_for_batch(chat, &batch_id, &agents, event); } + + prune_finished_runs(chat, &active_agent_ids, &active_batch_ids); + chat.prune_agents_terminal_state(&active_agent_ids, &active_batch_ids); } fn process_status_update_for_batch( @@ -1317,6 +1329,65 @@ fn update_mappings( key } +pub(super) fn prune_finished_runs( + chat: &mut ChatWidget<'_>, + active_agent_ids: &HashSet, + active_batch_ids: &HashSet, +) { + let mut active_keys: HashSet = HashSet::new(); + + chat + .tools_state + .agent_run_by_agent + .retain(|agent_id, key| { + if active_agent_ids.contains(agent_id) { + active_keys.insert(key.clone()); + true + } else { + false + } + }); + + chat + .tools_state + .agent_run_by_batch + .retain(|batch_id, key| { + if active_batch_ids.contains(batch_id) { + active_keys.insert(key.clone()); + true + } else { + false + } + }); + + chat + .tools_state + .agent_run_by_call + .retain(|_, key| active_keys.contains(key)); + chat + .tools_state + .agent_run_by_order + .retain(|_, key| active_keys.contains(key)); + + chat.tools_state.agent_runs.retain(|key, tracker| { + if !active_keys.contains(key) { + return false; + } + tracker.agent_ids.retain(|agent_id| active_agent_ids.contains(agent_id)); + !tracker.agent_ids.is_empty() + }); + + if chat + .tools_state + .agent_last_key + .as_ref() + .map(|key| !active_keys.contains(key)) + .unwrap_or(false) + { + chat.tools_state.agent_last_key = None; + } +} + #[derive(Default)] struct StatusSummary { any_failed: bool,