diff --git a/CHANGELOG.md b/CHANGELOG.md index cbbe6eb8..021e8cc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 tests and blocks all package builds (deb, rpm, arch, AppImage, snap, Homebrew, Docker) if they fail. +### Added + +- **XML tag stripping**: Strip internal XML tags from LLM responses to prevent + tag leakage in chat (thinking, reasoning, scratchpad, etc.) +- **Runtime model metadata**: Fetch model metadata from provider APIs for + accurate context window detection during auto-compaction +- **Run detail UI**: Panel showing tool calls and message flow for agent runs, + accessible via expandable button on assistant messages + ### Fixed - **Docker TLS setup**: All Docker examples now expose port 13132 for CA diff --git a/crates/agents/src/lib.rs b/crates/agents/src/lib.rs index feed9f65..ecef5fa9 100644 --- a/crates/agents/src/lib.rs +++ b/crates/agents/src/lib.rs @@ -14,6 +14,7 @@ pub use { runner::AgentRunError, }; pub mod provider_chain; +pub mod response_sanitizer; pub mod silent_turn; pub mod skills; pub mod tool_registry; diff --git a/crates/agents/src/model.rs b/crates/agents/src/model.rs index 1866bb9c..f13cbb47 100644 --- a/crates/agents/src/model.rs +++ b/crates/agents/src/model.rs @@ -364,6 +364,18 @@ pub trait LlmProvider: Send + Sync { ) -> Pin + Send + '_>> { self.stream(messages) } + + /// Fetch runtime model metadata from the provider API. + /// + /// The default implementation returns a `ModelMetadata` derived from the + /// static `context_window()` value. Providers that support a `/models` + /// endpoint can override this to fetch the actual context length at runtime. + async fn model_metadata(&self) -> anyhow::Result { + Ok(ModelMetadata { + id: self.id().to_string(), + context_length: self.context_window(), + }) + } } /// Response from an LLM completion call. @@ -389,6 +401,13 @@ pub struct Usage { pub cache_write_tokens: u32, } +/// Runtime model metadata fetched from provider APIs. +#[derive(Debug, Clone)] +pub struct ModelMetadata { + pub id: String, + pub context_length: u32, +} + #[allow(clippy::unwrap_used, clippy::expect_used)] #[cfg(test)] mod tests { @@ -473,14 +492,11 @@ mod tests { #[test] fn to_openai_assistant_with_tools() { - let msg = ChatMessage::assistant_with_tools( - Some("thinking".into()), - vec![ToolCall { - id: "call_1".into(), - name: "exec".into(), - arguments: serde_json::json!({"cmd": "ls"}), - }], - ); + let msg = ChatMessage::assistant_with_tools(Some("thinking".into()), vec![ToolCall { + id: "call_1".into(), + name: "exec".into(), + arguments: serde_json::json!({"cmd": "ls"}), + }]); let val = msg.to_openai_value(); assert_eq!(val["role"], "assistant"); assert_eq!(val["content"], "thinking"); @@ -657,4 +673,47 @@ mod tests { assert!(matches!(&msgs[0], ChatMessage::User { .. })); assert!(matches!(&msgs[1], ChatMessage::Assistant { .. })); } + + // ── ModelMetadata default trait impl ──────────────────────────── + + /// Minimal provider to test default `model_metadata()` behavior. + struct StubProvider; + + #[async_trait::async_trait] + impl LlmProvider for StubProvider { + fn name(&self) -> &str { + "stub" + } + + fn id(&self) -> &str { + "stub-model" + } + + fn context_window(&self) -> u32 { + 42_000 + } + + async fn complete( + &self, + _: &[ChatMessage], + _: &[serde_json::Value], + ) -> anyhow::Result { + anyhow::bail!("not implemented") + } + + fn stream( + &self, + _: Vec, + ) -> Pin + Send + '_>> { + Box::pin(tokio_stream::empty()) + } + } + + #[tokio::test] + async fn default_model_metadata_returns_context_window() { + let provider = StubProvider; + let meta = provider.model_metadata().await.unwrap(); + assert_eq!(meta.id, "stub-model"); + assert_eq!(meta.context_length, 42_000); + } } diff --git a/crates/agents/src/providers/github_copilot.rs b/crates/agents/src/providers/github_copilot.rs index 69928566..8d5ebded 100644 --- a/crates/agents/src/providers/github_copilot.rs +++ b/crates/agents/src/providers/github_copilot.rs @@ -235,14 +235,11 @@ async fn fetch_valid_copilot_token( } let copilot_resp: CopilotTokenResponse = resp.json().await?; - let _ = token_store.save( - "github-copilot-api", - &OAuthTokens { - access_token: Secret::new(copilot_resp.token.clone()), - refresh_token: None, - expires_at: Some(copilot_resp.expires_at), - }, - ); + let _ = token_store.save("github-copilot-api", &OAuthTokens { + access_token: Secret::new(copilot_resp.token.clone()), + refresh_token: None, + expires_at: Some(copilot_resp.expires_at), + }); Ok(copilot_resp.token) } diff --git a/crates/agents/src/providers/mod.rs b/crates/agents/src/providers/mod.rs index 21575768..99aa4a81 100644 --- a/crates/agents/src/providers/mod.rs +++ b/crates/agents/src/providers/mod.rs @@ -103,6 +103,10 @@ impl LlmProvider for RegistryModelProvider { ) -> Pin + Send + '_>> { self.inner.stream_with_tools(messages, tools) } + + async fn model_metadata(&self) -> anyhow::Result { + self.inner.model_metadata().await + } } /// Resolve an API key from config (Secret) or environment variable, @@ -1552,13 +1556,12 @@ mod tests { #[test] fn mistral_registers_with_api_key() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "mistral".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("mistral".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-mistral".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); // Should have registered Mistral models @@ -1580,13 +1583,12 @@ mod tests { #[test] fn cerebras_registers_with_api_key() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "cerebras".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("cerebras".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-cerebras".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); let cerebras_models: Vec<_> = reg @@ -1600,13 +1602,12 @@ mod tests { #[test] fn minimax_registers_with_api_key() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "minimax".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("minimax".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-minimax".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(reg.list_models().iter().any(|m| m.provider == "minimax")); @@ -1615,13 +1616,12 @@ mod tests { #[test] fn moonshot_registers_with_api_key() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "moonshot".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("moonshot".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-moonshot".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(reg.list_models().iter().any(|m| m.provider == "moonshot")); @@ -1631,13 +1631,12 @@ mod tests { fn openrouter_requires_model_in_config() { // OpenRouter has no default models — without a model in config it registers nothing. let mut config = ProvidersConfig::default(); - config.providers.insert( - "openrouter".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("openrouter".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-or".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(!reg.list_models().iter().any(|m| m.provider == "openrouter")); @@ -1646,14 +1645,13 @@ mod tests { #[test] fn openrouter_registers_with_model_in_config() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "openrouter".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("openrouter".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-or".into())), model: Some("anthropic/claude-3-haiku".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); let or_models: Vec<_> = reg @@ -1669,13 +1667,12 @@ mod tests { fn ollama_registers_without_api_key_env() { // Ollama should use a dummy key if no env var is set. let mut config = ProvidersConfig::default(); - config.providers.insert( - "ollama".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("ollama".into(), moltis_config::schema::ProviderEntry { model: Some("llama3".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(reg.list_models().iter().any(|m| m.provider == "ollama")); @@ -1685,13 +1682,12 @@ mod tests { #[test] fn venice_requires_model_in_config() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "venice".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("venice".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test-venice".into())), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(!reg.list_models().iter().any(|m| m.provider == "venice")); @@ -1700,14 +1696,13 @@ mod tests { #[test] fn disabled_provider_not_registered() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "mistral".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("mistral".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test".into())), enabled: false, ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(!reg.list_models().iter().any(|m| m.provider == "mistral")); @@ -1727,14 +1722,13 @@ mod tests { #[test] fn custom_base_url_from_config() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "mistral".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("mistral".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test".into())), base_url: Some("https://custom.mistral.example.com/v1".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(reg.list_models().iter().any(|m| m.provider == "mistral")); @@ -1743,14 +1737,13 @@ mod tests { #[test] fn specific_model_override() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "mistral".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("mistral".into(), moltis_config::schema::ProviderEntry { api_key: Some(secrecy::Secret::new("sk-test".into())), model: Some("mistral-small-latest".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); let mistral_models: Vec<_> = reg @@ -1831,12 +1824,11 @@ mod tests { fn local_llm_requires_model_in_config() { // local-llm is a "bring your own model" provider — without a model it registers nothing. let mut config = ProvidersConfig::default(); - config.providers.insert( - "local".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("local".into(), moltis_config::schema::ProviderEntry { ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(!reg.list_models().iter().any(|m| m.provider == "local-llm")); @@ -1846,13 +1838,12 @@ mod tests { #[test] fn local_llm_registers_with_model_in_config() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "local".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("local".into(), moltis_config::schema::ProviderEntry { model: Some("qwen2.5-coder-7b-q4_k_m".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); let local_models: Vec<_> = reg @@ -1868,14 +1859,13 @@ mod tests { #[test] fn local_llm_disabled_not_registered() { let mut config = ProvidersConfig::default(); - config.providers.insert( - "local".into(), - moltis_config::schema::ProviderEntry { + config + .providers + .insert("local".into(), moltis_config::schema::ProviderEntry { enabled: false, model: Some("qwen2.5-coder-7b-q4_k_m".into()), ..Default::default() - }, - ); + }); let reg = ProviderRegistry::from_env_with_config(&config); assert!(!reg.list_models().iter().any(|m| m.provider == "local-llm")); diff --git a/crates/agents/src/providers/openai.rs b/crates/agents/src/providers/openai.rs index 13d1036e..85e6d931 100644 --- a/crates/agents/src/providers/openai.rs +++ b/crates/agents/src/providers/openai.rs @@ -18,6 +18,7 @@ pub struct OpenAiProvider { base_url: String, provider_name: String, client: reqwest::Client, + metadata_cache: tokio::sync::OnceCell, } impl OpenAiProvider { @@ -28,6 +29,7 @@ impl OpenAiProvider { base_url, provider_name: "openai".into(), client: reqwest::Client::new(), + metadata_cache: tokio::sync::OnceCell::new(), } } @@ -43,6 +45,7 @@ impl OpenAiProvider { base_url, provider_name, client: reqwest::Client::new(), + metadata_cache: tokio::sync::OnceCell::new(), } } } @@ -69,6 +72,55 @@ impl LlmProvider for OpenAiProvider { super::supports_vision_for_model(&self.model) } + async fn model_metadata(&self) -> anyhow::Result { + let meta = self + .metadata_cache + .get_or_try_init(|| async { + let url = format!("{}/models/{}", self.base_url, self.model); + debug!(url = %url, model = %self.model, "fetching model metadata"); + + let resp = self + .client + .get(&url) + .header( + "Authorization", + format!("Bearer {}", self.api_key.expose_secret()), + ) + .send() + .await?; + + if !resp.status().is_success() { + anyhow::bail!( + "model metadata API returned HTTP {}", + resp.status().as_u16() + ); + } + + let body: serde_json::Value = resp.json().await?; + + // OpenAI uses "context_window", some compat providers use "context_length". + let context_length = body + .get("context_window") + .or_else(|| body.get("context_length")) + .and_then(|v| v.as_u64()) + .map(|v| v as u32) + .unwrap_or_else(|| self.context_window()); + + debug!( + model = %self.model, + context_length, + "model metadata fetched" + ); + + Ok(crate::model::ModelMetadata { + id: self.model.clone(), + context_length, + }) + }) + .await?; + Ok(meta.clone()) + } + async fn complete( &self, messages: &[ChatMessage], @@ -513,4 +565,99 @@ mod tests { assert_eq!(text_deltas.join(""), "Let me help."); assert_eq!(tool_starts, vec!["my_tool"]); } + + // ── model_metadata tests ────────────────────────────────────────── + + /// Start a mock server with both /chat/completions and /models/:model endpoints. + async fn start_model_metadata_mock(model_response: Option) -> String { + use axum::routing::get; + + let mut app = + Router::new().route("/chat/completions", post(|| async { "data: [DONE]\n\n" })); + + if let Some(resp) = model_response { + let resp_str = serde_json::to_string(&resp).unwrap(); + app = app.route( + "/models/{model}", + get(move || { + let body = resp_str.clone(); + async move { + axum::response::Response::builder() + .header("content-type", "application/json") + .body(axum::body::Body::from(body)) + .unwrap() + } + }), + ); + } else { + app = app.route( + "/models/{model}", + get(|| async { + axum::response::Response::builder() + .status(404) + .body(axum::body::Body::from("not found")) + .unwrap() + }), + ); + } + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + + format!("http://{addr}") + } + + #[tokio::test] + async fn model_metadata_returns_context_window_from_api() { + let base_url = start_model_metadata_mock(Some(serde_json::json!({ + "id": "gpt-4o", + "context_window": 128000 + }))) + .await; + + let provider = test_provider(&base_url); + let meta = provider.model_metadata().await.unwrap(); + assert_eq!(meta.context_length, 128_000); + assert_eq!(meta.id, "gpt-4o"); + } + + #[tokio::test] + async fn model_metadata_uses_context_length_field() { + let base_url = start_model_metadata_mock(Some(serde_json::json!({ + "id": "some-model", + "context_length": 64000 + }))) + .await; + + let provider = test_provider(&base_url); + let meta = provider.model_metadata().await.unwrap(); + assert_eq!(meta.context_length, 64_000); + } + + #[tokio::test] + async fn model_metadata_fallback_on_api_error() { + let base_url = start_model_metadata_mock(None).await; + + let provider = test_provider(&base_url); + let result = provider.model_metadata().await; + // API returns 404, so this should error + assert!(result.is_err()); + } + + #[tokio::test] + async fn model_metadata_caches_result() { + let base_url = start_model_metadata_mock(Some(serde_json::json!({ + "id": "gpt-4o", + "context_window": 128000 + }))) + .await; + + let provider = test_provider(&base_url); + let meta1 = provider.model_metadata().await.unwrap(); + let meta2 = provider.model_metadata().await.unwrap(); + assert_eq!(meta1.context_length, meta2.context_length); + } } diff --git a/crates/agents/src/providers/openai_codex.rs b/crates/agents/src/providers/openai_codex.rs index e0de2c53..2f17e23d 100644 --- a/crates/agents/src/providers/openai_codex.rs +++ b/crates/agents/src/providers/openai_codex.rs @@ -940,14 +940,11 @@ mod tests { #[test] fn convert_messages_tool_call_and_result() { let messages = vec![ - ChatMessage::assistant_with_tools( - None, - vec![ToolCall { - id: "call_1".to_string(), - name: "get_time".to_string(), - arguments: serde_json::json!({}), - }], - ), + ChatMessage::assistant_with_tools(None, vec![ToolCall { + id: "call_1".to_string(), + name: "get_time".to_string(), + arguments: serde_json::json!({}), + }]), ChatMessage::tool("call_1", "12:00"), ]; let converted = OpenAiCodexProvider::convert_messages(&messages); @@ -1072,14 +1069,11 @@ mod tests { .to_string(); let messages = vec![ ChatMessage::user("Take a screenshot"), - ChatMessage::assistant_with_tools( - None, - vec![ToolCall { - id: "call_screenshot".to_string(), - name: "browser_screenshot".to_string(), - arguments: serde_json::json!({}), - }], - ), + ChatMessage::assistant_with_tools(None, vec![ToolCall { + id: "call_screenshot".to_string(), + name: "browser_screenshot".to_string(), + arguments: serde_json::json!({}), + }]), ChatMessage::tool("call_screenshot", &tool_output), ChatMessage::assistant("Here is the screenshot."), ]; diff --git a/crates/agents/src/providers/openai_compat.rs b/crates/agents/src/providers/openai_compat.rs index e2b0b3e1..7788c3cf 100644 --- a/crates/agents/src/providers/openai_compat.rs +++ b/crates/agents/src/providers/openai_compat.rs @@ -608,10 +608,9 @@ mod tests { let events = finalize_stream(&state); assert_eq!(events.len(), 2); - assert!(matches!( - &events[0], - StreamEvent::ToolCallComplete { index: 0 } - )); + assert!(matches!(&events[0], StreamEvent::ToolCallComplete { + index: 0 + })); assert!(matches!( &events[1], StreamEvent::Done(usage) if usage.input_tokens == 10 && usage.output_tokens == 5 diff --git a/crates/agents/src/response_sanitizer.rs b/crates/agents/src/response_sanitizer.rs new file mode 100644 index 00000000..877b43fe --- /dev/null +++ b/crates/agents/src/response_sanitizer.rs @@ -0,0 +1,409 @@ +//! Strip internal XML tags and special tokens from LLM responses. +//! +//! Some models leak internal reasoning tags (``, ``, etc.) +//! or special control tokens (`<|eot_id|>`, `<|im_end|>`, etc.) into their +//! responses. This module strips them to produce clean user-facing text. +//! +//! The stripping is done with hand-rolled string scanning (no regex) to match +//! the existing `strip_base64_blobs` pattern in `runner.rs`. + +use crate::model::ToolCall; + +/// Known internal XML tags that should be stripped from LLM responses. +const INTERNAL_TAGS: &[&str] = &[ + "thinking", + "think", + "reflection", + "inner_monologue", + "scratchpad", + "reasoning", + "analysis", + "self_reflection", + "meta", + "internal_thought", + "function_call", + "tool_use", +]; + +/// Standalone pipe tokens that should be stripped. +const STANDALONE_PIPE_TOKENS: &[&str] = &[ + "<|eot_id|>", + "<|end|>", + "<|im_end|>", + "<|im_start|>", + "<|begin_of_text|>", + "<|end_of_text|>", + "<|python_tag|>", + "<|eom_id|>", + "<|start_header_id|>", + "<|end_header_id|>", +]; + +/// Tags used for tool call recovery. +const TOOL_CALL_TAGS: &[&str] = &["function_call", "tool_call"]; + +/// Main entry point: chain all stripping passes and trim the result. +pub fn clean_response(text: &str) -> String { + let mut result = strip_internal_tags(text); + result = strip_standalone_pipe_tokens(&result); + result = strip_reasoning_patterns(&result); + result.trim().to_string() +} + +/// Strip all known internal XML tags and their content. +fn strip_internal_tags(text: &str) -> String { + let mut result = text.to_string(); + for tag in INTERNAL_TAGS { + result = strip_xml_tag(&result, tag); + result = strip_pipe_tag(&result, tag); + } + result +} + +/// Strip `content` pairs, handling optional attributes. +/// +/// Matches opening tags with or without attributes (e.g. ``, +/// ``), and removes everything up to and including +/// the corresponding closing tag. +fn strip_xml_tag(text: &str, tag: &str) -> String { + let mut result = String::with_capacity(text.len()); + let mut rest = text; + + let open_exact = format!("<{tag}>"); + let open_with_space = format!("<{tag} "); + let close = format!(""); + + loop { + // Find the earliest opening tag variant. + let exact_pos = rest.find(&open_exact); + let space_pos = rest.find(&open_with_space); + + let start = match (exact_pos, space_pos) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + + let Some(start) = start else { + result.push_str(rest); + break; + }; + + // Push everything before the tag. + result.push_str(&rest[..start]); + + // Find the end of the opening tag (the `>`). + let after_open = &rest[start..]; + let Some(gt_pos) = after_open.find('>') else { + // Malformed tag — keep everything as-is. + result.push_str(&rest[start..]); + break; + }; + + // Now look for the closing tag. + let after_open_tag = &rest[start + gt_pos + 1..]; + if let Some(close_pos) = after_open_tag.find(&close) { + // Skip past the closing tag. + rest = &after_open_tag[close_pos + close.len()..]; + } else { + // No closing tag — strip everything from open tag to end + // (the tag is likely wrapping remaining content). + break; + } + } + result +} + +/// Strip `<|tag|>...<|/tag|>` pairs (pipe-delimited variant). +fn strip_pipe_tag(text: &str, tag: &str) -> String { + let mut result = String::with_capacity(text.len()); + let mut rest = text; + + let open = format!("<|{tag}|>"); + let close = format!("<|/{tag}|>"); + + loop { + let Some(start) = rest.find(&open) else { + result.push_str(rest); + break; + }; + + result.push_str(&rest[..start]); + + let after_open = &rest[start + open.len()..]; + if let Some(close_pos) = after_open.find(&close) { + rest = &after_open[close_pos + close.len()..]; + } else { + // No closing tag — strip to end. + break; + } + } + result +} + +/// Strip standalone pipe tokens (`<|eot_id|>`, `<|im_end|>`, etc.). +fn strip_standalone_pipe_tokens(text: &str) -> String { + let mut result = text.to_string(); + for token in STANDALONE_PIPE_TOKENS { + // Simple replacement — these tokens are always standalone. + result = result.replace(token, ""); + } + result +} + +/// Strip reasoning pattern blocks: `...` and similar +/// capitalized variants that some models produce at the start of responses. +fn strip_reasoning_patterns(text: &str) -> String { + let mut result = text.to_string(); + // Handle capitalized variants not covered by the lowercase tag list. + for tag in &["Thinking", "Reflection", "Reasoning", "Analysis"] { + result = strip_xml_tag(&result, tag); + } + result +} + +/// Attempt to recover structured `ToolCall` from `` or +/// `` XML blocks embedded in the response text. +/// +/// Returns the cleaned text (with recovered blocks removed) and any +/// recovered tool calls. +pub fn recover_tool_calls_from_content(text: &str) -> (String, Vec) { + let mut cleaned = text.to_string(); + let mut tool_calls = Vec::new(); + + for tag in TOOL_CALL_TAGS { + let open_exact = format!("<{tag}>"); + let open_with_space = format!("<{tag} "); + let close = format!(""); + + loop { + let exact_pos = cleaned.find(&open_exact); + let space_pos = cleaned.find(&open_with_space); + + let start = match (exact_pos, space_pos) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + + let Some(start) = start else { + break; + }; + + let after_open = &cleaned[start..]; + let Some(gt_pos) = after_open.find('>') else { + break; + }; + + let content_start = start + gt_pos + 1; + let after_content = &cleaned[content_start..]; + + let Some(close_pos) = after_content.find(&close) else { + break; + }; + + let xml_content = &cleaned[content_start..content_start + close_pos].trim(); + + // Try to parse the content as JSON to extract tool call info. + if let Some(tc) = parse_tool_call_json(xml_content) { + tool_calls.push(tc); + } + + // Remove the entire block from cleaned text. + let end = content_start + close_pos + close.len(); + cleaned = format!("{}{}", &cleaned[..start], &cleaned[end..]); + } + } + + (cleaned.trim().to_string(), tool_calls) +} + +/// Parse JSON content from an XML tool call block into a `ToolCall`. +/// +/// Accepts formats like: +/// ```json +/// {"name": "exec", "arguments": {"command": "ls"}} +/// ``` +/// or: +/// ```json +/// {"tool": "exec", "arguments": {"command": "ls"}} +/// ``` +fn parse_tool_call_json(content: &str) -> Option { + let parsed: serde_json::Value = serde_json::from_str(content).ok()?; + let name = parsed + .get("name") + .or_else(|| parsed.get("tool")) + .and_then(|v| v.as_str())? + .to_string(); + let arguments = parsed + .get("arguments") + .or_else(|| parsed.get("parameters")) + .cloned() + .unwrap_or(serde_json::json!({})); + let id = format!("xml-{}", uuid::Uuid::new_v4()); + Some(ToolCall { + id, + name, + arguments, + }) +} + +#[allow(clippy::unwrap_used, clippy::expect_used)] +#[cfg(test)] +mod tests { + use super::*; + + // ── strip_xml_tag ────────────────────────────────────────────── + + #[test] + fn strip_simple_thinking_tag() { + let input = "Hello internal thought world"; + assert_eq!(strip_xml_tag(input, "thinking"), "Hello world"); + } + + #[test] + fn strip_tag_with_attributes() { + let input = "Start reasoning here end"; + assert_eq!(strip_xml_tag(input, "thinking"), "Start end"); + } + + #[test] + fn strip_multiple_tags() { + let input = "atextb"; + assert_eq!(strip_xml_tag(input, "think"), "text"); + } + + #[test] + fn no_matching_tag_unchanged() { + let input = "Hello world"; + assert_eq!(strip_xml_tag(input, "thinking"), "Hello world"); + } + + #[test] + fn unclosed_tag_strips_to_end() { + let input = "Hello unfinished"; + // No closing tag — everything from open tag to end is stripped. + assert_eq!(strip_xml_tag(input, "thinking"), "Hello "); + } + + #[test] + fn nested_content_preserved_outside() { + let input = "beforesome analysisafter"; + assert_eq!(strip_xml_tag(input, "reflection"), "beforeafter"); + } + + // ── strip_pipe_tag ───────────────────────────────────────────── + + #[test] + fn strip_pipe_tag_basic() { + let input = "Hello <|thinking|>internal<|/thinking|> world"; + assert_eq!(strip_pipe_tag(input, "thinking"), "Hello world"); + } + + #[test] + fn strip_pipe_tag_no_close() { + let input = "Hello <|thinking|>unfinished"; + assert_eq!(strip_pipe_tag(input, "thinking"), "Hello "); + } + + // ── strip_standalone_pipe_tokens ─────────────────────────────── + + #[test] + fn strip_eot_tokens() { + let input = "Hello world<|eot_id|>"; + assert_eq!(strip_standalone_pipe_tokens(input), "Hello world"); + } + + #[test] + fn strip_multiple_standalone_tokens() { + let input = "<|begin_of_text|>Hello<|im_end|> world<|end|>"; + assert_eq!(strip_standalone_pipe_tokens(input), "Hello world"); + } + + // ── strip_reasoning_patterns ─────────────────────────────────── + + #[test] + fn strip_capitalized_thinking() { + let input = "Let me reason about this...Here is my answer."; + assert_eq!(strip_reasoning_patterns(input), "Here is my answer."); + } + + // ── clean_response (integration) ─────────────────────────────── + + #[test] + fn clean_response_strips_all_tag_types() { + let input = "reasoningAnswer here<|eot_id|><|im_end|>"; + assert_eq!(clean_response(input), "Answer here"); + } + + #[test] + fn clean_response_preserves_normal_text() { + let input = "This is a normal response with no tags."; + assert_eq!( + clean_response(input), + "This is a normal response with no tags." + ); + } + + #[test] + fn clean_response_trims_whitespace() { + let input = " x Hello "; + assert_eq!(clean_response(input), "Hello"); + } + + #[test] + fn clean_response_complex_mixed() { + let input = "Step 1: analyze\n\nThe answer is 42.<|end|>\nWas I right?"; + assert_eq!(clean_response(input), "The answer is 42."); + } + + // ── recover_tool_calls_from_content ──────────────────────────── + + #[test] + fn recover_tool_call_from_function_call_block() { + let input = r#"Some text {"name": "exec", "arguments": {"command": "ls"}} more text"#; + let (cleaned, calls) = recover_tool_calls_from_content(input); + assert_eq!(cleaned, "Some text more text"); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].name, "exec"); + assert_eq!(calls[0].arguments, serde_json::json!({"command": "ls"})); + } + + #[test] + fn recover_tool_call_with_tool_key() { + let input = + r#"{"tool": "web_search", "arguments": {"query": "rust"}}"#; + let (cleaned, calls) = recover_tool_calls_from_content(input); + assert_eq!(cleaned, ""); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].name, "web_search"); + } + + #[test] + fn recover_no_tool_calls_returns_empty() { + let input = "Just normal text with no tool calls."; + let (cleaned, calls) = recover_tool_calls_from_content(input); + assert_eq!(cleaned, "Just normal text with no tool calls."); + assert!(calls.is_empty()); + } + + #[test] + fn recover_malformed_json_skipped() { + let input = "not jsonrest"; + let (cleaned, calls) = recover_tool_calls_from_content(input); + assert_eq!(cleaned, "rest"); + assert!(calls.is_empty()); + } + + #[test] + fn recover_multiple_tool_calls() { + let input = r#"{"name": "a", "arguments": {}}text{"name": "b", "arguments": {}}"#; + let (cleaned, calls) = recover_tool_calls_from_content(input); + assert_eq!(cleaned, "text"); + assert_eq!(calls.len(), 2); + assert_eq!(calls[0].name, "a"); + assert_eq!(calls[1].name, "b"); + } +} diff --git a/crates/agents/src/runner.rs b/crates/agents/src/runner.rs index 0574baea..3c9fbee8 100644 --- a/crates/agents/src/runner.rs +++ b/crates/agents/src/runner.rs @@ -14,6 +14,7 @@ use crate::{ model::{ ChatMessage, CompletionResponse, LlmProvider, StreamEvent, ToolCall, Usage, UserContent, }, + response_sanitizer::{clean_response, recover_tool_calls_from_content}, tool_registry::ToolRegistry, }; @@ -604,6 +605,26 @@ pub async fn run_agent_loop_with_context( response.tool_calls = vec![tc]; } + // Fallback: recover tool calls from XML blocks (, ). + if !native_tools + && response.tool_calls.is_empty() + && let Some(ref text) = response.text + { + let (cleaned, recovered) = recover_tool_calls_from_content(text); + if !recovered.is_empty() { + info!( + count = recovered.len(), + "recovered tool calls from XML blocks in response text" + ); + response.text = if cleaned.is_empty() { + None + } else { + Some(cleaned) + }; + response.tool_calls = recovered; + } + } + for tc in &response.tool_calls { info!( iteration = iterations, @@ -655,7 +676,7 @@ pub async fn run_agent_loop_with_context( // If no tool calls, return the text response. if response.tool_calls.is_empty() { - let text = response.text.unwrap_or_default(); + let text = clean_response(&response.text.unwrap_or_default()); info!( iterations, @@ -1164,6 +1185,19 @@ pub async fn run_agent_loop_streaming( tool_calls = vec![tc]; } + // Fallback: recover tool calls from XML blocks (, ). + if !native_tools && tool_calls.is_empty() && !accumulated_text.is_empty() { + let (cleaned, recovered) = recover_tool_calls_from_content(&accumulated_text); + if !recovered.is_empty() { + info!( + count = recovered.len(), + "recovered tool calls from XML blocks in streamed text" + ); + accumulated_text = cleaned; + tool_calls = recovered; + } + } + // Dispatch AfterLLMCall hook — may block tool execution. if let Some(ref hooks) = hook_registry { let tc_json: Vec = tool_calls @@ -1215,7 +1249,7 @@ pub async fn run_agent_loop_streaming( "streaming agent loop complete — returning text" ); return Ok(AgentRunResult { - text: accumulated_text, + text: clean_response(&accumulated_text), iterations, tool_calls_made: total_tool_calls, usage: Usage { diff --git a/crates/agents/src/tool_registry.rs b/crates/agents/src/tool_registry.rs index bebe4985..89234c7d 100644 --- a/crates/agents/src/tool_registry.rs +++ b/crates/agents/src/tool_registry.rs @@ -52,25 +52,19 @@ impl ToolRegistry { /// Register a built-in tool. pub fn register(&mut self, tool: Box) { let name = tool.name().to_string(); - self.tools.insert( - name, - ToolEntry { - tool: Arc::from(tool), - source: ToolSource::Builtin, - }, - ); + self.tools.insert(name, ToolEntry { + tool: Arc::from(tool), + source: ToolSource::Builtin, + }); } /// Register a tool from an MCP server. pub fn register_mcp(&mut self, tool: Box, server: String) { let name = tool.name().to_string(); - self.tools.insert( - name, - ToolEntry { - tool: Arc::from(tool), - source: ToolSource::Mcp { server }, - }, - ); + self.tools.insert(name, ToolEntry { + tool: Arc::from(tool), + source: ToolSource::Mcp { server }, + }); } pub fn unregister(&mut self, name: &str) -> bool { @@ -124,13 +118,10 @@ impl ToolRegistry { .iter() .filter(|(name, _)| !name.starts_with(prefix)) .map(|(name, entry)| { - ( - name.clone(), - ToolEntry { - tool: Arc::clone(&entry.tool), - source: entry.source.clone(), - }, - ) + (name.clone(), ToolEntry { + tool: Arc::clone(&entry.tool), + source: entry.source.clone(), + }) }) .collect(); ToolRegistry { tools } @@ -143,13 +134,10 @@ impl ToolRegistry { .iter() .filter(|(_, entry)| !matches!(entry.source, ToolSource::Mcp { .. })) .map(|(name, entry)| { - ( - name.clone(), - ToolEntry { - tool: Arc::clone(&entry.tool), - source: entry.source.clone(), - }, - ) + (name.clone(), ToolEntry { + tool: Arc::clone(&entry.tool), + source: entry.source.clone(), + }) }) .collect(); ToolRegistry { tools } @@ -162,13 +150,10 @@ impl ToolRegistry { .iter() .filter(|(name, _)| !exclude.contains(&name.as_str())) .map(|(name, entry)| { - ( - name.clone(), - ToolEntry { - tool: Arc::clone(&entry.tool), - source: entry.source.clone(), - }, - ) + (name.clone(), ToolEntry { + tool: Arc::clone(&entry.tool), + source: entry.source.clone(), + }) }) .collect(); ToolRegistry { tools } @@ -184,13 +169,10 @@ impl ToolRegistry { .iter() .filter(|(name, _)| predicate(name)) .map(|(name, entry)| { - ( - name.clone(), - ToolEntry { - tool: Arc::clone(&entry.tool), - source: entry.source.clone(), - }, - ) + (name.clone(), ToolEntry { + tool: Arc::clone(&entry.tool), + source: entry.source.clone(), + }) }) .collect(); ToolRegistry { tools } diff --git a/crates/browser/src/container.rs b/crates/browser/src/container.rs index b68a61d1..6fa6f1d7 100644 --- a/crates/browser/src/container.rs +++ b/crates/browser/src/container.rs @@ -700,13 +700,10 @@ mod tests { fn test_parse_docker_container_names_filters_prefix() { let input = b"moltis-test-browser-abc\nother-container\nmoltis-test-browser-def\n"; let parsed = parse_docker_container_names(input, "moltis-test-browser"); - assert_eq!( - parsed, - vec![ - "moltis-test-browser-abc".to_string(), - "moltis-test-browser-def".to_string() - ] - ); + assert_eq!(parsed, vec![ + "moltis-test-browser-abc".to_string(), + "moltis-test-browser-def".to_string() + ]); } #[cfg(target_os = "macos")] @@ -718,13 +715,10 @@ mod tests { {"configuration":{"id":"moltis-test-browser-456"}} ]"#; let parsed = parse_apple_container_names_for_prefix(json, "moltis-test-browser").unwrap(); - assert_eq!( - parsed, - vec![ - "moltis-test-browser-123".to_string(), - "moltis-test-browser-456".to_string() - ] - ); + assert_eq!(parsed, vec![ + "moltis-test-browser-123".to_string(), + "moltis-test-browser-456".to_string() + ]); } #[test] diff --git a/crates/config/src/validate.rs b/crates/config/src/validate.rs index 92227e96..121223e2 100644 --- a/crates/config/src/validate.rs +++ b/crates/config/src/validate.rs @@ -276,13 +276,10 @@ fn build_schema_map() -> KnownKeys { ("update_repository_url", Leaf), ])), ), - ( - "providers", - MapWithFields { - value: Box::new(provider_entry()), - fields: HashMap::from([("offered", Array(Box::new(Leaf)))]), - }, - ), + ("providers", MapWithFields { + value: Box::new(provider_entry()), + fields: HashMap::from([("offered", Array(Box::new(Leaf)))]), + }), ( "chat", Struct(HashMap::from([ diff --git a/crates/cron/src/service.rs b/crates/cron/src/service.rs index e21c3f50..5fc82385 100644 --- a/crates/cron/src/service.rs +++ b/crates/cron/src/service.rs @@ -801,13 +801,10 @@ mod tests { .unwrap(); let updated = svc - .update( - &job.id, - CronJobPatch { - name: Some("renamed".into()), - ..Default::default() - }, - ) + .update(&job.id, CronJobPatch { + name: Some("renamed".into()), + ..Default::default() + }) .await .unwrap(); diff --git a/crates/gateway/src/assets/js/components/run-detail.js b/crates/gateway/src/assets/js/components/run-detail.js new file mode 100644 index 00000000..08d9b589 --- /dev/null +++ b/crates/gateway/src/assets/js/components/run-detail.js @@ -0,0 +1,187 @@ +// ── RunDetail Preact component ─────────────────────────── +// +// Expand/collapse panel showing tool calls and message flow +// for a specific agent run. Lazy-loads data via RPC. + +import { html } from "htm/preact"; +import { useCallback, useState } from "preact/hooks"; +import { sendRpc } from "../helpers.js"; + +var TABS = ["overview", "actions", "messages"]; + +function TabButton({ label, active, onClick }) { + return html``; +} + +function OverviewTab({ data }) { + if (!data) return null; + var summary = data.summary || {}; + var messages = data.messages || []; + var model = null; + var provider = null; + var totalInput = 0; + var totalOutput = 0; + for (var m of messages) { + if (m.role === "assistant") { + if (m.model) model = m.model; + if (m.provider) provider = m.provider; + totalInput += m.inputTokens || 0; + totalOutput += m.outputTokens || 0; + } + } + return html`
+
+ User messages: + ${summary.userMessages || 0} +
+
+ Tool calls: + ${summary.toolCalls || 0} +
+
+ Assistant messages: + ${summary.assistantMessages || 0} +
+ ${ + model + ? html`
+ Model: + ${provider ? `${provider} / ` : ""}${model} +
` + : null + } + ${ + totalInput + totalOutput > 0 + ? html`
+ Tokens: + ${totalInput} in / ${totalOutput} out +
` + : null + } +
`; +} + +function ActionsTab({ data }) { + if (!data) return null; + var toolResults = (data.messages || []).filter((m) => m.role === "tool_result"); + if (toolResults.length === 0) return html`
No tool calls in this run.
`; + return html`
+ ${toolResults.map( + (tr) => + html`
+
+ ${tr.tool_name || "unknown"} + ${tr.success ? "ok" : "error"} +
+ ${ + tr.arguments + ? html`
+${JSON.stringify(tr.arguments, null, 2)}
` + : null + } + ${tr.error ? html`
${tr.error}
` : null} +
`, + )} +
`; +} + +function MessagesTab({ data }) { + if (!data) return null; + var messages = data.messages || []; + if (messages.length === 0) return html`
No messages.
`; + return html`
+ ${messages.map( + (m, i) => + html`
+ ${m.role} + #${i} + ${ + typeof m.content === "string" && m.content + ? html`
+ ${m.content.length > 500 ? `${m.content.slice(0, 500)}\u2026` : m.content} +
` + : null + } +
`, + )} +
`; +} + +export function RunDetail({ sessionKey, runId }) { + var [expanded, setExpanded] = useState(false); + var [data, setData] = useState(null); + var [loading, setLoading] = useState(false); + var [activeTab, setActiveTab] = useState("overview"); + + var toggle = useCallback(() => { + var next = !expanded; + setExpanded(next); + if (next && !data && !loading) { + setLoading(true); + sendRpc("sessions.run_detail", { sessionKey, runId }).then((res) => { + setLoading(false); + if (res?.ok && res.payload) { + setData(res.payload); + } + }); + } + }, [expanded, data, loading, sessionKey, runId]); + + return html`
+ + ${ + expanded + ? html`
+ ${ + loading + ? html`
Loading\u2026
` + : html`
+
+ ${TABS.map( + (t) => + html`<${TabButton} + label=${t.charAt(0).toUpperCase() + t.slice(1)} + active=${activeTab === t} + onClick=${() => setActiveTab(t)} + />`, + )} +
+ ${activeTab === "overview" && html`<${OverviewTab} data=${data} />`} + ${activeTab === "actions" && html`<${ActionsTab} data=${data} />`} + ${activeTab === "messages" && html`<${MessagesTab} data=${data} />`} +
` + } +
` + : null + } +
`; +} diff --git a/crates/gateway/src/assets/js/run-detail-mount.js b/crates/gateway/src/assets/js/run-detail-mount.js new file mode 100644 index 00000000..c3955f30 --- /dev/null +++ b/crates/gateway/src/assets/js/run-detail-mount.js @@ -0,0 +1,18 @@ +// ── Bridge between imperative DOM and Preact RunDetail component ── + +import { html } from "htm/preact"; +import { render } from "preact"; +import { RunDetail } from "./components/run-detail.js"; + +/** + * Mount a RunDetail component inside a DOM element. + * @param {HTMLElement} container - The parent element to render into + * @param {string} sessionKey - Session key for RPC calls + * @param {string} runId - The run ID to display details for + */ +export function mountRunDetail(container, sessionKey, runId) { + var wrapper = document.createElement("div"); + wrapper.className = "run-detail-mount"; + container.appendChild(wrapper); + render(html`<${RunDetail} sessionKey=${sessionKey} runId=${runId} />`, wrapper); +} diff --git a/crates/gateway/src/assets/js/sessions.js b/crates/gateway/src/assets/js/sessions.js index 7e9d61fe..bc153e45 100644 --- a/crates/gateway/src/assets/js/sessions.js +++ b/crates/gateway/src/assets/js/sessions.js @@ -21,6 +21,7 @@ import { } from "./helpers.js"; import { updateSessionProjectSelect } from "./project-combo.js"; import { currentPrefix, navigate, sessionPath } from "./router.js"; +import { mountRunDetail } from "./run-detail-mount.js"; import { updateSandboxImageUI, updateSandboxUI } from "./sandbox.js"; import * as S from "./state.js"; import { modelStore } from "./stores/model-store.js"; @@ -276,6 +277,10 @@ function renderHistoryAssistantMessage(msg) { if (el && msg.model) { el.appendChild(createModelFooter(msg)); } + // Mount run detail component if this message has a run_id. + if (el && msg.run_id) { + mountRunDetail(el, S.activeSessionKey, msg.run_id); + } if (msg.inputTokens || msg.outputTokens) { S.sessionTokens.input += msg.inputTokens || 0; S.sessionTokens.output += msg.outputTokens || 0; diff --git a/crates/gateway/src/auth_webauthn.rs b/crates/gateway/src/auth_webauthn.rs index 2527b9e7..0f0daa8c 100644 --- a/crates/gateway/src/auth_webauthn.rs +++ b/crates/gateway/src/auth_webauthn.rs @@ -83,13 +83,11 @@ impl WebAuthnState { .map_err(|e| anyhow::anyhow!("start_passkey_registration: {e}"))?; let challenge_id = uuid::Uuid::new_v4().to_string(); - self.pending_registrations.insert( - challenge_id.clone(), - PendingRegistration { + self.pending_registrations + .insert(challenge_id.clone(), PendingRegistration { state: reg_state, created_at: Instant::now(), - }, - ); + }); Ok((challenge_id, ccr)) } @@ -134,13 +132,11 @@ impl WebAuthnState { .map_err(|e| anyhow::anyhow!("start_passkey_authentication: {e}"))?; let challenge_id = uuid::Uuid::new_v4().to_string(); - self.pending_authentications.insert( - challenge_id.clone(), - PendingAuthentication { + self.pending_authentications + .insert(challenge_id.clone(), PendingAuthentication { state: auth_state, created_at: Instant::now(), - }, - ); + }); Ok((challenge_id, rcr)) } diff --git a/crates/gateway/src/channel_events.rs b/crates/gateway/src/channel_events.rs index 537f8fd4..3bdfd29a 100644 --- a/crates/gateway/src/channel_events.rs +++ b/crates/gateway/src/channel_events.rs @@ -72,15 +72,10 @@ impl ChannelEventSink for GatewayChannelEventSink { return; }, }; - broadcast( - state, - "channel", - payload, - BroadcastOpts { - drop_if_slow: true, - ..Default::default() - }, - ) + broadcast(state, "channel", payload, BroadcastOpts { + drop_if_slow: true, + ..Default::default() + }) .await; } } @@ -113,15 +108,10 @@ impl ChannelEventSink for GatewayChannelEventSink { "sessionKey": &session_key, "messageIndex": msg_index, }); - broadcast( - state, - "chat", - payload, - BroadcastOpts { - drop_if_slow: true, - ..Default::default() - }, - ) + broadcast(state, "chat", payload, BroadcastOpts { + drop_if_slow: true, + ..Default::default() + }) .await; // Register the reply target so the chat "final" broadcast can @@ -346,15 +336,10 @@ impl ChannelEventSink for GatewayChannelEventSink { return; }, }; - broadcast( - state, - "channel", - payload, - BroadcastOpts { - drop_if_slow: true, - ..Default::default() - }, - ) + broadcast(state, "channel", payload, BroadcastOpts { + drop_if_slow: true, + ..Default::default() + }) .await; } else { warn!("request_disable_account: gateway not ready"); @@ -556,15 +541,10 @@ impl ChannelEventSink for GatewayChannelEventSink { "messageIndex": msg_index, "hasAttachments": true, }); - broadcast( - state, - "chat", - payload, - BroadcastOpts { - drop_if_slow: true, - ..Default::default() - }, - ) + broadcast(state, "chat", payload, BroadcastOpts { + drop_if_slow: true, + ..Default::default() + }) .await; // Register the reply target diff --git a/crates/gateway/src/chat.rs b/crates/gateway/src/chat.rs index 8cd7ba6e..8e77c0ef 100644 --- a/crates/gateway/src/chat.rs +++ b/crates/gateway/src/chat.rs @@ -2063,7 +2063,10 @@ impl ChatService for LiveChatService { .and_then(|v| v.as_str()) .map(String::from); // Auto-compact: if conversation input tokens exceed 95% of context window, compact first. - let context_window = provider.context_window() as u64; + let context_window = match provider.model_metadata().await { + Ok(meta) => u64::from(meta.context_length), + Err(_) => u64::from(provider.context_window()), + }; let total_input: u64 = history .iter() .filter_map(|m| m.get("inputTokens").and_then(|v| v.as_u64())) @@ -3818,13 +3821,14 @@ async fn run_with_tools( } r }); - let tool_result_msg = PersistedMessage::tool_result( + let tool_result_msg = PersistedMessage::tool_result_with_run_id( id, name, tracked_args, success, persisted_result, error, + run_id.clone(), ); let store_clone = Arc::clone(store); let sk_persist = sk.clone(); @@ -4303,6 +4307,7 @@ async fn run_streaming( .record(duration); } + let accumulated = moltis_agents::response_sanitizer::clean_response(&accumulated); let is_silent = accumulated.trim().is_empty(); info!( @@ -5847,12 +5852,10 @@ mod tests { ); let disabled = Arc::new(RwLock::new(DisabledModelsStore::default())); - let service = LiveModelService::new( - Arc::new(RwLock::new(registry)), - disabled, - vec![], - vec!["opus".into()], - ); + let service = + LiveModelService::new(Arc::new(RwLock::new(registry)), disabled, vec![], vec![ + "opus".into(), + ]); // list() should only contain opus. let result = service.list().await.unwrap(); @@ -5896,12 +5899,10 @@ mod tests { ); let disabled = Arc::new(RwLock::new(DisabledModelsStore::default())); - let service = LiveModelService::new( - Arc::new(RwLock::new(registry)), - disabled, - vec![], - vec!["opus".into()], - ); + let service = + LiveModelService::new(Arc::new(RwLock::new(registry)), disabled, vec![], vec![ + "opus".into(), + ]); let result = service.list().await.unwrap(); let arr = result.as_array().unwrap(); diff --git a/crates/gateway/src/methods.rs b/crates/gateway/src/methods.rs index c52312d8..d6bba33a 100644 --- a/crates/gateway/src/methods.rs +++ b/crates/gateway/src/methods.rs @@ -65,6 +65,7 @@ const READ_METHODS: &[&str] = &[ "sessions.preview", "sessions.search", "sessions.branches", + "sessions.run_detail", "projects.list", "projects.get", "projects.context", @@ -1418,6 +1419,19 @@ impl MethodRegistry { }) }), ); + self.register( + "sessions.run_detail", + Box::new(|ctx| { + Box::pin(async move { + ctx.state + .services + .session + .run_detail(ctx.params.clone()) + .await + .map_err(|e| ErrorShape::new(error_codes::UNAVAILABLE, e)) + }) + }), + ); // Channels self.register( @@ -5520,10 +5534,10 @@ mod tests { VoiceProviderId::parse_tts_list_id, ); let ids: Vec<_> = filtered.into_iter().map(|p| p.id).collect(); - assert_eq!( - ids, - vec![VoiceProviderId::OpenaiTts, VoiceProviderId::Piper] - ); + assert_eq!(ids, vec![ + VoiceProviderId::OpenaiTts, + VoiceProviderId::Piper + ]); } #[test] diff --git a/crates/gateway/src/provider_setup.rs b/crates/gateway/src/provider_setup.rs index e030f75a..4d6ce3eb 100644 --- a/crates/gateway/src/provider_setup.rs +++ b/crates/gateway/src/provider_setup.rs @@ -86,14 +86,11 @@ impl KeyStore { return old_format .into_iter() .map(|(k, v)| { - ( - k, - ProviderConfig { - api_key: Some(v), - base_url: None, - model: None, - }, - ) + (k, ProviderConfig { + api_key: Some(v), + base_url: None, + model: None, + }) }) .collect(); } @@ -1890,13 +1887,12 @@ mod tests { .expect("openai-codex should exist"); let mut config = ProvidersConfig::default(); - config.providers.insert( - "openai-codex".into(), - ProviderEntry { + config + .providers + .insert("openai-codex".into(), ProviderEntry { enabled: false, ..Default::default() - }, - ); + }); assert!(!svc.is_provider_configured(&provider, &config)); } @@ -1923,13 +1919,10 @@ mod tests { store.save("anthropic", "sk-saved").unwrap(); let mut base = ProvidersConfig::default(); - base.providers.insert( - "anthropic".into(), - ProviderEntry { - api_key: Some(Secret::new("sk-config".into())), - ..Default::default() - }, - ); + base.providers.insert("anthropic".into(), ProviderEntry { + api_key: Some(Secret::new("sk-config".into())), + ..Default::default() + }); let merged = config_with_saved_keys(&base, &store); let entry = merged.get("anthropic").unwrap(); // Config key takes precedence over saved key. @@ -2128,14 +2121,11 @@ mod tests { Some(&home) )); - home.save( - "github-copilot", - &OAuthTokens { - access_token: Secret::new("home-token".to_string()), - refresh_token: None, - expires_at: None, - }, - ) + home.save("github-copilot", &OAuthTokens { + access_token: Secret::new("home-token".to_string()), + refresh_token: None, + expires_at: None, + }) .expect("save home token"); assert!(has_oauth_tokens_for_provider( @@ -2330,23 +2320,17 @@ mod tests { let mut empty = ProvidersConfig::default(); assert!(!has_explicit_provider_settings(&empty)); - empty.providers.insert( - "openai".into(), - ProviderEntry { - api_key: Some(Secret::new("sk-test".into())), - ..Default::default() - }, - ); + empty.providers.insert("openai".into(), ProviderEntry { + api_key: Some(Secret::new("sk-test".into())), + ..Default::default() + }); assert!(has_explicit_provider_settings(&empty)); let mut model_only = ProvidersConfig::default(); - model_only.providers.insert( - "ollama".into(), - ProviderEntry { - model: Some("llama3".into()), - ..Default::default() - }, - ); + model_only.providers.insert("ollama".into(), ProviderEntry { + model: Some("llama3".into()), + ..Default::default() + }); assert!(has_explicit_provider_settings(&model_only)); } diff --git a/crates/gateway/src/server.rs b/crates/gateway/src/server.rs index f81bc95d..003aa919 100644 --- a/crates/gateway/src/server.rs +++ b/crates/gateway/src/server.rs @@ -129,14 +129,11 @@ impl moltis_tools::location::LocationRequester for GatewayLocationRequester { { let mut inner_w = self.state.inner.write().await; let invokes = &mut inner_w.pending_invokes; - invokes.insert( - request_id.clone(), - crate::state::PendingInvoke { - request_id: request_id.clone(), - sender: tx, - created_at: std::time::Instant::now(), - }, - ); + invokes.insert(request_id.clone(), crate::state::PendingInvoke { + request_id: request_id.clone(), + sender: tx, + created_at: std::time::Instant::now(), + }); } // Wait up to 30 seconds for the user to grant/deny permission. @@ -250,14 +247,13 @@ impl moltis_tools::location::LocationRequester for GatewayLocationRequester { let (tx, rx) = tokio::sync::oneshot::channel(); { let mut inner = self.state.inner.write().await; - inner.pending_invokes.insert( - pending_key.clone(), - crate::state::PendingInvoke { + inner + .pending_invokes + .insert(pending_key.clone(), crate::state::PendingInvoke { request_id: pending_key.clone(), sender: tx, created_at: std::time::Instant::now(), - }, - ); + }); } // Wait up to 60 seconds — user needs to navigate Telegram's UI. @@ -1031,17 +1027,16 @@ pub async fn start_gateway( "sse" => moltis_mcp::registry::TransportType::Sse, _ => moltis_mcp::registry::TransportType::Stdio, }; - merged.servers.insert( - name.clone(), - moltis_mcp::McpServerConfig { + merged + .servers + .insert(name.clone(), moltis_mcp::McpServerConfig { command: entry.command.clone(), args: entry.args.clone(), env: entry.env.clone(), enabled: entry.enabled, transport, url: entry.url.clone(), - }, - ); + }); } } mcp_configured_count = merged.servers.values().filter(|s| s.enabled).count(); @@ -2874,15 +2869,10 @@ pub async fn start_gateway( } }; if changed && let Ok(payload) = serde_json::to_value(&next) { - broadcast( - &update_state, - "update.available", - payload, - BroadcastOpts { - drop_if_slow: true, - ..Default::default() - }, - ) + broadcast(&update_state, "update.available", payload, BroadcastOpts { + drop_if_slow: true, + ..Default::default() + }) .await; } }, @@ -2945,15 +2935,12 @@ pub async fn start_gateway( .by_provider .iter() .map(|(name, metrics)| { - ( - name.clone(), - moltis_metrics::ProviderTokens { - input_tokens: metrics.input_tokens, - output_tokens: metrics.output_tokens, - completions: metrics.completions, - errors: metrics.errors, - }, - ) + (name.clone(), moltis_metrics::ProviderTokens { + input_tokens: metrics.input_tokens, + output_tokens: metrics.output_tokens, + completions: metrics.completions, + errors: metrics.errors, + }) }) .collect(); diff --git a/crates/gateway/src/services.rs b/crates/gateway/src/services.rs index bcfdd86e..865432e3 100644 --- a/crates/gateway/src/services.rs +++ b/crates/gateway/src/services.rs @@ -218,6 +218,7 @@ pub trait SessionService: Send + Sync { async fn search(&self, params: Value) -> ServiceResult; async fn fork(&self, params: Value) -> ServiceResult; async fn branches(&self, params: Value) -> ServiceResult; + async fn run_detail(&self, params: Value) -> ServiceResult; async fn clear_all(&self) -> ServiceResult; async fn mark_seen(&self, key: &str); } @@ -266,6 +267,10 @@ impl SessionService for NoopSessionService { Ok(serde_json::json!([])) } + async fn run_detail(&self, _p: Value) -> ServiceResult { + Ok(serde_json::json!({})) + } + async fn clear_all(&self) -> ServiceResult { Ok(serde_json::json!({ "deleted": 0 })) } diff --git a/crates/gateway/src/session.rs b/crates/gateway/src/session.rs index 68fb0927..da5a24bc 100644 --- a/crates/gateway/src/session.rs +++ b/crates/gateway/src/session.rs @@ -676,6 +676,47 @@ impl SessionService for LiveSessionService { Ok(serde_json::json!({ "deleted": deleted })) } + + async fn run_detail(&self, params: serde_json::Value) -> ServiceResult { + let session_key = params + .get("sessionKey") + .and_then(|v| v.as_str()) + .ok_or_else(|| "missing 'sessionKey' parameter".to_string())?; + let run_id = params + .get("runId") + .and_then(|v| v.as_str()) + .ok_or_else(|| "missing 'runId' parameter".to_string())?; + + let messages = self + .store + .read_by_run_id(session_key, run_id) + .await + .map_err(|e| e.to_string())?; + + // Build summary counts. + let mut user_messages = 0u32; + let mut tool_calls = 0u32; + let mut assistant_messages = 0u32; + + for msg in &messages { + match msg.get("role").and_then(|v| v.as_str()) { + Some("user") => user_messages += 1, + Some("assistant") => assistant_messages += 1, + Some("tool_result") => tool_calls += 1, + _ => {}, + } + } + + Ok(serde_json::json!({ + "runId": run_id, + "messages": messages, + "summary": { + "userMessages": user_messages, + "toolCalls": tool_calls, + "assistantMessages": assistant_messages, + } + })) + } } #[cfg(test)] diff --git a/crates/gateway/src/state.rs b/crates/gateway/src/state.rs index 7460fc6d..a83991e0 100644 --- a/crates/gateway/src/state.rs +++ b/crates/gateway/src/state.rs @@ -183,12 +183,9 @@ impl DedupeCache { { self.entries.remove(&oldest_key); } - self.entries.insert( - key.to_string(), - DedupeEntry { - inserted_at: Instant::now(), - }, - ); + self.entries.insert(key.to_string(), DedupeEntry { + inserted_at: Instant::now(), + }); false } diff --git a/crates/gateway/ui/e2e/specs/run-detail.spec.js b/crates/gateway/ui/e2e/specs/run-detail.spec.js new file mode 100644 index 00000000..0da487e0 --- /dev/null +++ b/crates/gateway/ui/e2e/specs/run-detail.spec.js @@ -0,0 +1,71 @@ +const { expect, test } = require("@playwright/test"); +const { navigateAndWait, waitForWsConnected, watchPageErrors } = require("../helpers"); + +test.describe("Run detail panel", () => { + test("run detail button is not visible for messages without run_id", async ({ page }) => { + const pageErrors = await navigateAndWait(page, "/"); + await waitForWsConnected(page); + + // The default "main" session may have history — but any plain assistant + // message from before this feature won't have a run_id, so no "Run details" + // button should appear. + const runDetailButtons = page.locator("text=Run details"); + // Count should be 0 or match only messages that have run_id. + // Since this is a fresh instance, there should be no run detail buttons. + const count = await runDetailButtons.count(); + expect(count).toBe(0); + + expect(pageErrors).toEqual([]); + }); + + test("sessions.run_detail RPC returns valid structure", async ({ page }) => { + const pageErrors = await navigateAndWait(page, "/"); + await waitForWsConnected(page); + + // Call the RPC directly and verify the response structure. + const result = await page.evaluate(async () => { + // Access the sendRpc function via the global WebSocket. + const ws = window.__moltis_ws; + if (!ws) return { error: "no websocket" }; + + return new Promise((resolve) => { + const id = Math.random().toString(36).slice(2); + const handler = (event) => { + try { + const data = JSON.parse(event.data); + if (data.id === id) { + ws.removeEventListener("message", handler); + resolve(data); + } + } catch { + // ignore non-JSON frames + } + }; + ws.addEventListener("message", handler); + ws.send( + JSON.stringify({ + jsonrpc: "2.0", + id, + method: "sessions.run_detail", + params: { sessionKey: "main", runId: "nonexistent-run-id" }, + }), + ); + // Timeout after 5s + setTimeout(() => { + ws.removeEventListener("message", handler); + resolve({ error: "timeout" }); + }, 5000); + }); + }); + + // The RPC should succeed (even if no messages match the run_id). + if (result.result) { + expect(result.result).toHaveProperty("runId", "nonexistent-run-id"); + expect(result.result).toHaveProperty("messages"); + expect(result.result).toHaveProperty("summary"); + expect(Array.isArray(result.result.messages)).toBe(true); + } + + expect(pageErrors).toEqual([]); + }); +}); diff --git a/crates/mcp/src/manager.rs b/crates/mcp/src/manager.rs index 8965a9df..8c87fffb 100644 --- a/crates/mcp/src/manager.rs +++ b/crates/mcp/src/manager.rs @@ -318,13 +318,10 @@ mod tests { #[tokio::test] async fn test_status_shows_stopped_for_configured_but_not_started() { let mut reg = McpRegistry::new(); - reg.servers.insert( - "test".into(), - McpServerConfig { - command: "echo".into(), - ..Default::default() - }, - ); + reg.servers.insert("test".into(), McpServerConfig { + command: "echo".into(), + ..Default::default() + }); let mgr = McpManager::new(reg); let statuses = mgr.status_all().await; diff --git a/crates/mcp/src/registry.rs b/crates/mcp/src/registry.rs index b309e7b1..5e0e5f83 100644 --- a/crates/mcp/src/registry.rs +++ b/crates/mcp/src/registry.rs @@ -167,13 +167,10 @@ mod tests { #[test] fn test_registry_add_remove() { let mut reg = McpRegistry::new(); - reg.servers.insert( - "test".into(), - McpServerConfig { - command: "echo".into(), - ..Default::default() - }, - ); + reg.servers.insert("test".into(), McpServerConfig { + command: "echo".into(), + ..Default::default() + }); assert_eq!(reg.list().len(), 1); assert!(reg.get("test").is_some()); @@ -184,13 +181,10 @@ mod tests { #[test] fn test_registry_enable_disable() { let mut reg = McpRegistry::new(); - reg.servers.insert( - "srv".into(), - McpServerConfig { - command: "test".into(), - ..Default::default() - }, - ); + reg.servers.insert("srv".into(), McpServerConfig { + command: "test".into(), + ..Default::default() + }); assert_eq!(reg.enabled_servers().len(), 1); @@ -201,14 +195,11 @@ mod tests { #[test] fn test_registry_serialization() { let mut reg = McpRegistry::new(); - reg.servers.insert( - "fs".into(), - McpServerConfig { - command: "mcp-server-filesystem".into(), - args: vec!["/tmp".into()], - ..Default::default() - }, - ); + reg.servers.insert("fs".into(), McpServerConfig { + command: "mcp-server-filesystem".into(), + args: vec!["/tmp".into()], + ..Default::default() + }); let json = serde_json::to_string(®).unwrap(); let parsed: McpRegistry = serde_json::from_str(&json).unwrap(); @@ -229,15 +220,12 @@ mod tests { let path = dir.path().join("mcp.json"); let mut reg = McpRegistry::load(&path).unwrap(); - reg.servers.insert( - "test".into(), - McpServerConfig { - command: "echo".into(), - args: vec!["hello".into()], - env: HashMap::from([("FOO".into(), "bar".into())]), - ..Default::default() - }, - ); + reg.servers.insert("test".into(), McpServerConfig { + command: "echo".into(), + args: vec!["hello".into()], + env: HashMap::from([("FOO".into(), "bar".into())]), + ..Default::default() + }); reg.save().unwrap(); let loaded = McpRegistry::load(&path).unwrap(); diff --git a/crates/metrics/src/store.rs b/crates/metrics/src/store.rs index 95b457f9..d783e9d6 100644 --- a/crates/metrics/src/store.rs +++ b/crates/metrics/src/store.rs @@ -343,24 +343,22 @@ mod tests { let store = SqliteMetricsStore::in_memory().await.unwrap(); let mut point = make_point(1000, 10); - point.by_provider.insert( - "anthropic".to_string(), - ProviderTokens { + point + .by_provider + .insert("anthropic".to_string(), ProviderTokens { input_tokens: 500, output_tokens: 200, completions: 5, errors: 0, - }, - ); - point.by_provider.insert( - "openai".to_string(), - ProviderTokens { + }); + point + .by_provider + .insert("openai".to_string(), ProviderTokens { input_tokens: 300, output_tokens: 100, completions: 5, errors: 1, - }, - ); + }); store.save_point(&point).await.unwrap(); diff --git a/crates/oauth/src/defaults.rs b/crates/oauth/src/defaults.rs index c08e30dc..e1ead945 100644 --- a/crates/oauth/src/defaults.rs +++ b/crates/oauth/src/defaults.rs @@ -8,50 +8,41 @@ fn builtin_defaults() -> HashMap { // GitHub Copilot uses device flow (handled by the provider itself), // but we store a config entry so `load_oauth_config` returns Some // and the gateway recognises it as an OAuth provider. - m.insert( - "github-copilot".into(), - OAuthConfig { - client_id: "Iv1.b507a08c87ecfe98".into(), - auth_url: "https://github.com/login/device/code".into(), - token_url: "https://github.com/login/oauth/access_token".into(), - redirect_uri: String::new(), - scopes: vec![], - extra_auth_params: vec![], - device_flow: true, - }, - ); - m.insert( - "kimi-code".into(), - OAuthConfig { - client_id: "17e5f671-d194-4dfb-9706-5516cb48c098".into(), - auth_url: "https://auth.kimi.com/api/oauth/device_authorization".into(), - token_url: "https://auth.kimi.com/api/oauth/token".into(), - redirect_uri: String::new(), - scopes: vec![], - extra_auth_params: vec![], - device_flow: true, - }, - ); - m.insert( - "openai-codex".into(), - OAuthConfig { - client_id: "app_EMoamEEZ73f0CkXaXp7hrann".into(), - auth_url: "https://auth.openai.com/oauth/authorize".into(), - token_url: "https://auth.openai.com/oauth/token".into(), - redirect_uri: "http://localhost:1455/auth/callback".into(), - scopes: vec![ - "openid".into(), - "profile".into(), - "email".into(), - "offline_access".into(), - ], - extra_auth_params: vec![ - ("id_token_add_organizations".into(), "true".into()), - ("codex_cli_simplified_flow".into(), "true".into()), - ], - device_flow: false, - }, - ); + m.insert("github-copilot".into(), OAuthConfig { + client_id: "Iv1.b507a08c87ecfe98".into(), + auth_url: "https://github.com/login/device/code".into(), + token_url: "https://github.com/login/oauth/access_token".into(), + redirect_uri: String::new(), + scopes: vec![], + extra_auth_params: vec![], + device_flow: true, + }); + m.insert("kimi-code".into(), OAuthConfig { + client_id: "17e5f671-d194-4dfb-9706-5516cb48c098".into(), + auth_url: "https://auth.kimi.com/api/oauth/device_authorization".into(), + token_url: "https://auth.kimi.com/api/oauth/token".into(), + redirect_uri: String::new(), + scopes: vec![], + extra_auth_params: vec![], + device_flow: true, + }); + m.insert("openai-codex".into(), OAuthConfig { + client_id: "app_EMoamEEZ73f0CkXaXp7hrann".into(), + auth_url: "https://auth.openai.com/oauth/authorize".into(), + token_url: "https://auth.openai.com/oauth/token".into(), + redirect_uri: "http://localhost:1455/auth/callback".into(), + scopes: vec![ + "openid".into(), + "profile".into(), + "email".into(), + "offline_access".into(), + ], + extra_auth_params: vec![ + ("id_token_add_organizations".into(), "true".into()), + ("codex_cli_simplified_flow".into(), "true".into()), + ], + device_flow: false, + }); m } diff --git a/crates/sessions/src/message.rs b/crates/sessions/src/message.rs index b856ddba..23514111 100644 --- a/crates/sessions/src/message.rs +++ b/crates/sessions/src/message.rs @@ -80,6 +80,9 @@ pub enum PersistedMessage { error: Option, #[serde(skip_serializing_if = "Option::is_none")] created_at: Option, + /// Agent run ID linking this result to its parent run. + #[serde(skip_serializing_if = "Option::is_none")] + run_id: Option, }, } @@ -226,6 +229,29 @@ impl PersistedMessage { result, error, created_at: Some(now_ms()), + run_id: None, + } + } + + /// Create a tool result message with a run ID linking it to its agent run. + pub fn tool_result_with_run_id( + tool_call_id: impl Into, + tool_name: impl Into, + arguments: Option, + success: bool, + result: Option, + error: Option, + run_id: impl Into, + ) -> Self { + Self::ToolResult { + tool_call_id: tool_call_id.into(), + tool_name: tool_name.into(), + arguments, + success, + result, + error, + created_at: Some(now_ms()), + run_id: Some(run_id.into()), } } @@ -463,6 +489,7 @@ mod tests { result: Some(serde_json::json!({"stdout": "file.txt", "exit_code": 0})), error: None, created_at: Some(12345), + run_id: None, }; let json = serde_json::to_value(&msg).unwrap(); assert_eq!(json["role"], "tool_result"); @@ -484,6 +511,7 @@ mod tests { result: None, error: Some("command not found".to_string()), created_at: Some(12345), + run_id: None, }; let json = serde_json::to_value(&msg).unwrap(); assert_eq!(json["role"], "tool_result"); diff --git a/crates/sessions/src/store.rs b/crates/sessions/src/store.rs index d4e979e3..e791ef0a 100644 --- a/crates/sessions/src/store.rs +++ b/crates/sessions/src/store.rs @@ -72,8 +72,8 @@ impl SessionStore { .await? } - /// Append a message (JSON value) as a single line to the session file. - pub async fn append(&self, key: &str, message: &serde_json::Value) -> Result<()> { + /// Append a message as a single line to the session file. + pub async fn append(&self, key: &str, message: &T) -> Result<()> { let path = self.path_for(key); let line = serde_json::to_string(message)?; @@ -150,6 +150,46 @@ impl SessionStore { .await? } + /// Read all messages belonging to a specific agent run. + /// + /// Returns messages that have a matching `run_id` field, plus adjacent + /// `tool_result` messages that sit between the matching user and assistant + /// messages (these don't carry `run_id` themselves but belong to the run). + pub async fn read_by_run_id(&self, key: &str, run_id: &str) -> Result> { + let all = self.read(key).await?; + let run_id = run_id.to_string(); + + tokio::task::spawn_blocking(move || -> Result> { + let mut result = Vec::new(); + let mut in_run = false; + + for msg in &all { + let msg_run_id = msg.get("run_id").and_then(|v| v.as_str()); + + if msg_run_id == Some(&run_id) { + in_run = true; + result.push(msg.clone()); + continue; + } + + // Collect tool_result messages between the run's user and assistant messages. + let role = msg.get("role").and_then(|v| v.as_str()).unwrap_or(""); + if in_run && role == "tool_result" { + result.push(msg.clone()); + continue; + } + + // Any non-tool_result message without matching run_id ends the run scope. + if in_run { + in_run = false; + } + } + + Ok(result) + }) + .await? + } + /// Delete the session file and its media directory. pub async fn clear(&self, key: &str) -> Result<()> { let path = self.path_for(key); @@ -305,7 +345,11 @@ impl SessionStore { #[allow(clippy::unwrap_used, clippy::expect_used)] #[cfg(test)] mod tests { - use {super::*, serde_json::json}; + use { + super::*, + crate::{MessageContent, PersistedMessage}, + serde_json::json, + }; fn temp_store() -> (SessionStore, tempfile::TempDir) { let dir = tempfile::tempdir().unwrap(); @@ -313,16 +357,69 @@ mod tests { (store, dir) } + fn user_message(text: impl Into) -> PersistedMessage { + PersistedMessage::User { + content: MessageContent::Text(text.into()), + created_at: None, + channel: None, + seq: None, + run_id: None, + } + } + + fn user_message_with_run_id( + text: impl Into, + run_id: impl Into, + ) -> PersistedMessage { + PersistedMessage::User { + content: MessageContent::Text(text.into()), + created_at: None, + channel: None, + seq: None, + run_id: Some(run_id.into()), + } + } + + fn assistant_message(text: impl Into) -> PersistedMessage { + PersistedMessage::Assistant { + content: text.into(), + created_at: None, + model: None, + provider: None, + input_tokens: None, + output_tokens: None, + tool_calls: None, + audio: None, + seq: None, + run_id: None, + } + } + + fn assistant_message_with_run_id( + text: impl Into, + run_id: impl Into, + ) -> PersistedMessage { + PersistedMessage::Assistant { + content: text.into(), + created_at: None, + model: None, + provider: None, + input_tokens: None, + output_tokens: None, + tool_calls: None, + audio: None, + seq: None, + run_id: Some(run_id.into()), + } + } + #[tokio::test] async fn test_append_and_read() { let (store, _dir) = temp_store(); + store.append("main", &user_message("hello")).await.unwrap(); store - .append("main", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); - store - .append("main", &json!({"role": "assistant", "content": "hi"})) + .append("main", &assistant_message("hi")) .await .unwrap(); @@ -357,10 +454,7 @@ mod tests { async fn test_clear() { let (store, _dir) = temp_store(); - store - .append("main", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); + store.append("main", &user_message("hello")).await.unwrap(); assert_eq!(store.read("main").await.unwrap().len(), 1); store.clear("main").await.unwrap(); @@ -372,14 +466,8 @@ mod tests { let (store, _dir) = temp_store(); assert_eq!(store.count("main").await.unwrap(), 0); - store - .append("main", &json!({"role": "user"})) - .await - .unwrap(); - store - .append("main", &json!({"role": "assistant"})) - .await - .unwrap(); + store.append("main", &user_message("")).await.unwrap(); + store.append("main", &assistant_message("")).await.unwrap(); assert_eq!(store.count("main").await.unwrap(), 2); } @@ -388,15 +476,15 @@ mod tests { let (store, _dir) = temp_store(); store - .append("s1", &json!({"role": "user", "content": "hello world"})) + .append("s1", &user_message("hello world")) .await .unwrap(); store - .append("s1", &json!({"role": "assistant", "content": "hi there"})) + .append("s1", &assistant_message("hi there")) .await .unwrap(); store - .append("s2", &json!({"role": "user", "content": "goodbye world"})) + .append("s2", &user_message("goodbye world")) .await .unwrap(); @@ -412,7 +500,7 @@ mod tests { let (store, _dir) = temp_store(); store - .append("s1", &json!({"role": "user", "content": "Hello World"})) + .append("s1", &user_message("Hello World")) .await .unwrap(); @@ -425,10 +513,7 @@ mod tests { async fn test_search_no_match() { let (store, _dir) = temp_store(); - store - .append("s1", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); + store.append("s1", &user_message("hello")).await.unwrap(); let results = store.search("xyz", 10).await.unwrap(); assert!(results.is_empty()); @@ -438,10 +523,7 @@ mod tests { async fn test_search_empty_query() { let (store, _dir) = temp_store(); - store - .append("s1", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); + store.append("s1", &user_message("hello")).await.unwrap(); // Empty query should match nothing (caller should guard against this) let results = store.search("", 10).await.unwrap(); @@ -455,18 +537,15 @@ mod tests { let (store, _dir) = temp_store(); store - .append("s1", &json!({"role": "user", "content": "rust is great"})) + .append("s1", &user_message("rust is great")) .await .unwrap(); store - .append( - "s2", - &json!({"role": "assistant", "content": "rust is awesome"}), - ) + .append("s2", &assistant_message("rust is awesome")) .await .unwrap(); store - .append("s3", &json!({"role": "user", "content": "python is nice"})) + .append("s3", &user_message("python is nice")) .await .unwrap(); @@ -484,7 +563,7 @@ mod tests { for i in 0..10 { let key = format!("s{i}"); store - .append(&key, &json!({"role": "user", "content": "common term"})) + .append(&key, &user_message("common term")) .await .unwrap(); } @@ -497,12 +576,9 @@ mod tests { async fn test_replace_history() { let (store, _dir) = temp_store(); + store.append("main", &user_message("hello")).await.unwrap(); store - .append("main", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); - store - .append("main", &json!({"role": "assistant", "content": "hi"})) + .append("main", &assistant_message("hi")) .await .unwrap(); assert_eq!(store.read("main").await.unwrap().len(), 2); @@ -519,10 +595,7 @@ mod tests { async fn test_replace_history_empty() { let (store, _dir) = temp_store(); - store - .append("main", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); + store.append("main", &user_message("hello")).await.unwrap(); store.replace_history("main", vec![]).await.unwrap(); assert!(store.read("main").await.unwrap().is_empty()); @@ -533,7 +606,7 @@ mod tests { let (store, _dir) = temp_store(); store - .append("session:abc-123", &json!({"role": "user"})) + .append("session:abc-123", &user_message("")) .await .unwrap(); let msgs = store.read("session:abc-123").await.unwrap(); @@ -579,10 +652,7 @@ mod tests { let (store, dir) = temp_store(); // Create a session and media. - store - .append("main", &json!({"role": "user", "content": "hello"})) - .await - .unwrap(); + store.append("main", &user_message("hello")).await.unwrap(); store .save_media("main", "shot.png", b"img data") .await @@ -596,4 +666,58 @@ mod tests { assert!(!media_dir.exists()); assert!(store.read("main").await.unwrap().is_empty()); } + + #[tokio::test] + async fn test_read_by_run_id() { + let (store, _dir) = temp_store(); + + // Simulate a session with two runs. + store + .append("main", &user_message_with_run_id("hello", "run-1")) + .await + .unwrap(); + store + .append( + "main", + &PersistedMessage::tool_result("call-1", "exec", None, true, None, None), + ) + .await + .unwrap(); + store + .append("main", &assistant_message_with_run_id("done", "run-1")) + .await + .unwrap(); + store + .append("main", &user_message_with_run_id("another", "run-2")) + .await + .unwrap(); + store + .append("main", &assistant_message_with_run_id("ok", "run-2")) + .await + .unwrap(); + + let run1 = store.read_by_run_id("main", "run-1").await.unwrap(); + assert_eq!(run1.len(), 3); + assert_eq!(run1[0]["role"], "user"); + assert_eq!(run1[1]["role"], "tool_result"); + assert_eq!(run1[2]["role"], "assistant"); + + let run2 = store.read_by_run_id("main", "run-2").await.unwrap(); + assert_eq!(run2.len(), 2); + assert_eq!(run2[0]["role"], "user"); + assert_eq!(run2[1]["role"], "assistant"); + } + + #[tokio::test] + async fn test_read_by_run_id_nonexistent() { + let (store, _dir) = temp_store(); + + store + .append("main", &user_message_with_run_id("hi", "run-1")) + .await + .unwrap(); + + let result = store.read_by_run_id("main", "no-such-run").await.unwrap(); + assert!(result.is_empty()); + } } diff --git a/crates/telegram/src/handlers.rs b/crates/telegram/src/handlers.rs index 4f7cb808..efd43281 100644 --- a/crates/telegram/src/handlers.rs +++ b/crates/telegram/src/handlers.rs @@ -1746,23 +1746,20 @@ mod tests { { let mut map = accounts.write().expect("accounts write lock"); - map.insert( - account_id.to_string(), - AccountState { - bot: bot.clone(), - bot_username: Some("test_bot".into()), - account_id: account_id.to_string(), - config: TelegramAccountConfig { - token: Secret::new("test-token".to_string()), - ..Default::default() - }, - outbound: Arc::clone(&outbound), - cancel: CancellationToken::new(), - message_log: None, - event_sink: Some(Arc::clone(&sink) as Arc), - otp: std::sync::Mutex::new(OtpState::new(300)), + map.insert(account_id.to_string(), AccountState { + bot: bot.clone(), + bot_username: Some("test_bot".into()), + account_id: account_id.to_string(), + config: TelegramAccountConfig { + token: Secret::new("test-token".to_string()), + ..Default::default() }, - ); + outbound: Arc::clone(&outbound), + cancel: CancellationToken::new(), + message_log: None, + event_sink: Some(Arc::clone(&sink) as Arc), + otp: std::sync::Mutex::new(OtpState::new(300)), + }); } let msg: Message = serde_json::from_value(json!({ diff --git a/crates/telegram/src/otp.rs b/crates/telegram/src/otp.rs index 4d1ebee4..bdec9f24 100644 --- a/crates/telegram/src/otp.rs +++ b/crates/telegram/src/otp.rs @@ -158,12 +158,9 @@ impl OtpState { challenge.attempts += 1; if challenge.attempts >= MAX_ATTEMPTS { self.challenges.remove(peer_id); - self.lockouts.insert( - peer_id.to_string(), - Lockout { - until: now + self.cooldown, - }, - ); + self.lockouts.insert(peer_id.to_string(), Lockout { + until: now + self.cooldown, + }); return OtpVerifyResult::LockedOut; } diff --git a/crates/tools/src/branch_session.rs b/crates/tools/src/branch_session.rs index d16e7432..4a44eae1 100644 --- a/crates/tools/src/branch_session.rs +++ b/crates/tools/src/branch_session.rs @@ -125,7 +125,20 @@ impl AgentTool for BranchSessionTool { #[allow(clippy::unwrap_used, clippy::expect_used)] #[cfg(test)] mod tests { - use super::*; + use { + super::*, + moltis_sessions::{MessageContent, PersistedMessage}, + }; + + fn user_message(text: impl Into) -> PersistedMessage { + PersistedMessage::User { + content: MessageContent::Text(text.into()), + created_at: None, + channel: None, + seq: None, + run_id: None, + } + } async fn setup() -> ( Arc, @@ -168,10 +181,7 @@ mod tests { .unwrap(); for i in 0..4 { store - .append( - parent_key, - &json!({"role": "user", "content": format!("msg {i}")}), - ) + .append(parent_key, &user_message(format!("msg {i}"))) .await .unwrap(); } @@ -211,10 +221,7 @@ mod tests { let parent_key = "session:parent2"; metadata.upsert(parent_key, None).await.unwrap(); - store - .append(parent_key, &json!({"role": "user", "content": "hi"})) - .await - .unwrap(); + store.append(parent_key, &user_message("hi")).await.unwrap(); let result = tool .execute(json!({ @@ -235,10 +242,7 @@ mod tests { metadata.upsert(parent_key, None).await.unwrap(); for i in 0..3 { store - .append( - parent_key, - &json!({"role": "user", "content": format!("msg {i}")}), - ) + .append(parent_key, &user_message(format!("msg {i}"))) .await .unwrap(); } diff --git a/crates/tools/src/sandbox.rs b/crates/tools/src/sandbox.rs index a63e0d3b..1ed52987 100644 --- a/crates/tools/src/sandbox.rs +++ b/crates/tools/src/sandbox.rs @@ -1741,10 +1741,14 @@ mod tests { }; let docker = DockerSandbox::new(config); let args = docker.resource_args(); - assert_eq!( - args, - vec!["--memory", "256M", "--cpus", "0.5", "--pids-limit", "50"] - ); + assert_eq!(args, vec![ + "--memory", + "256M", + "--cpus", + "0.5", + "--pids-limit", + "50" + ]); } #[test] diff --git a/crates/tools/src/sandbox_packages.rs b/crates/tools/src/sandbox_packages.rs index ea8bbaed..c805facf 100644 --- a/crates/tools/src/sandbox_packages.rs +++ b/crates/tools/src/sandbox_packages.rs @@ -35,156 +35,123 @@ use crate::{exec::ExecOpts, sandbox::SandboxRouter}; /// in "Other". Library/dev/font packages are filtered out before /// categorization (see [`is_infrastructure_package`]). const CATEGORY_MAP: &[(&str, &[&str])] = &[ - ( - "Networking", - &[ - "curl", - "wget", - "ca-certificates", - "dnsutils", - "netcat-openbsd", - "openssh-client", - "iproute2", - "net-tools", - ], - ), - ( - "Languages", - &[ - "python3", - "python3-pip", - "python3-venv", - "python-is-python3", - "nodejs", - "npm", - "ruby", - ], - ), - ( - "Build tools", - &[ - "build-essential", - "clang", - "pkg-config", - "autoconf", - "automake", - "libtool", - "bison", - "flex", - "dpkg-dev", - "fakeroot", - ], - ), - ( - "Compression", - &[ - "zip", - "unzip", - "bzip2", - "xz-utils", - "p7zip-full", - "tar", - "zstd", - "lz4", - "pigz", - ], - ), - ( - "CLI utilities", - &[ - "git", - "gnupg2", - "jq", - "rsync", - "file", - "tree", - "sqlite3", - "sudo", - "locales", - "tzdata", - "shellcheck", - "patchelf", - "tmux", - ], - ), + ("Networking", &[ + "curl", + "wget", + "ca-certificates", + "dnsutils", + "netcat-openbsd", + "openssh-client", + "iproute2", + "net-tools", + ]), + ("Languages", &[ + "python3", + "python3-pip", + "python3-venv", + "python-is-python3", + "nodejs", + "npm", + "ruby", + ]), + ("Build tools", &[ + "build-essential", + "clang", + "pkg-config", + "autoconf", + "automake", + "libtool", + "bison", + "flex", + "dpkg-dev", + "fakeroot", + ]), + ("Compression", &[ + "zip", + "unzip", + "bzip2", + "xz-utils", + "p7zip-full", + "tar", + "zstd", + "lz4", + "pigz", + ]), + ("CLI utilities", &[ + "git", + "gnupg2", + "jq", + "rsync", + "file", + "tree", + "sqlite3", + "sudo", + "locales", + "tzdata", + "shellcheck", + "patchelf", + "tmux", + ]), ("Text processing", &["ripgrep", "fd-find", "yq"]), ("Browser automation", &["chromium"]), - ( - "Image processing", - &[ - "imagemagick", - "graphicsmagick", - "libvips-tools", - "pngquant", - "optipng", - "jpegoptim", - "webp", - "libimage-exiftool-perl", - ], - ), - ( - "Audio/video", - &[ - "ffmpeg", - "sox", - "lame", - "flac", - "vorbis-tools", - "opus-tools", - "mediainfo", - ], - ), - ( - "Documents", - &[ - "pandoc", - "poppler-utils", - "ghostscript", - "texlive-latex-base", - "texlive-latex-extra", - "texlive-fonts-recommended", - "antiword", - "catdoc", - "unrtf", - "libreoffice-core", - "libreoffice-writer", - ], - ), - ( - "Data processing", - &[ - "csvtool", - "xmlstarlet", - "html2text", - "dos2unix", - "miller", - "datamash", - ], - ), - ( - "GIS/maps", - &[ - "gdal-bin", - "mapnik-utils", - "osm2pgsql", - "osmium-tool", - "osmctools", - "python3-mapnik", - ], - ), + ("Image processing", &[ + "imagemagick", + "graphicsmagick", + "libvips-tools", + "pngquant", + "optipng", + "jpegoptim", + "webp", + "libimage-exiftool-perl", + ]), + ("Audio/video", &[ + "ffmpeg", + "sox", + "lame", + "flac", + "vorbis-tools", + "opus-tools", + "mediainfo", + ]), + ("Documents", &[ + "pandoc", + "poppler-utils", + "ghostscript", + "texlive-latex-base", + "texlive-latex-extra", + "texlive-fonts-recommended", + "antiword", + "catdoc", + "unrtf", + "libreoffice-core", + "libreoffice-writer", + ]), + ("Data processing", &[ + "csvtool", + "xmlstarlet", + "html2text", + "dos2unix", + "miller", + "datamash", + ]), + ("GIS/maps", &[ + "gdal-bin", + "mapnik-utils", + "osm2pgsql", + "osmium-tool", + "osmctools", + "python3-mapnik", + ]), ("CalDAV/CardDAV", &["vdirsyncer", "khal", "python3-caldav"]), - ( - "Email", - &[ - "isync", - "offlineimap3", - "notmuch", - "notmuch-mutt", - "aerc", - "mutt", - "neomutt", - ], - ), + ("Email", &[ + "isync", + "offlineimap3", + "notmuch", + "notmuch-mutt", + "aerc", + "mutt", + "neomutt", + ]), ("Newsgroups (NNTP)", &["tin", "slrn"]), ("Messaging APIs", &["python3-discord"]), ]; diff --git a/crates/tools/src/web_fetch.rs b/crates/tools/src/web_fetch.rs index 7e383b32..c12e704e 100644 --- a/crates/tools/src/web_fetch.rs +++ b/crates/tools/src/web_fetch.rs @@ -65,13 +65,10 @@ impl WebFetchTool { let now = Instant::now(); cache.retain(|_, e| e.expires_at > now); } - cache.insert( - key, - CacheEntry { - value, - expires_at: Instant::now() + self.cache_ttl, - }, - ); + cache.insert(key, CacheEntry { + value, + expires_at: Instant::now() + self.cache_ttl, + }); } } diff --git a/crates/tools/src/web_search.rs b/crates/tools/src/web_search.rs index d5404fdb..d1112725 100644 --- a/crates/tools/src/web_search.rs +++ b/crates/tools/src/web_search.rs @@ -181,13 +181,10 @@ impl WebSearchTool { let now = Instant::now(); cache.retain(|_, e| e.expires_at > now); } - cache.insert( - key, - CacheEntry { - value, - expires_at: Instant::now() + self.cache_ttl, - }, - ); + cache.insert(key, CacheEntry { + value, + expires_at: Instant::now() + self.cache_ttl, + }); } }