diff --git a/apps/frontman_server/lib/frontman_server/tasks.ex b/apps/frontman_server/lib/frontman_server/tasks.ex index 5e521888..766c473e 100644 --- a/apps/frontman_server/lib/frontman_server/tasks.ex +++ b/apps/frontman_server/lib/frontman_server/tasks.ex @@ -254,19 +254,39 @@ defmodule FrontmanServer.Tasks do This is the primary "user turn" use case — recording what the user said and kicking off the agent loop. If an execution is already running, the - message is persisted but no new run is started. + prompt is rejected entirely (nothing persisted). """ @spec submit_user_message(Scope.t(), String.t(), list(), list(), keyword()) :: - {:ok, Interaction.UserMessage.t()} | {:error, :not_found} + {:ok, Interaction.UserMessage.t()} | {:error, :already_running} | {:error, :not_found} def submit_user_message(%Scope{} = scope, task_id, content_blocks, tools, opts \\ []) do - with {:ok, schema} <- get_task_by_id(scope, task_id), - interaction = Interaction.UserMessage.new(content_blocks), - {:ok, interaction} <- append_interaction(schema, interaction) do + with :ok <- guard_not_running(scope, task_id), + {:ok, interaction} <- add_user_message(scope, task_id, content_blocks) do + opts = Keyword.put(opts, :interaction_id, interaction.id) maybe_start_execution(scope, task_id, tools, opts) {:ok, interaction} end end + defp guard_not_running(scope, task_id) do + if Execution.running?(scope, task_id), do: {:error, :already_running}, else: :ok + end + + @doc """ + Persists a user message without starting execution. + + Use this when you need to record a user message in the conversation history + but don't want to trigger the agent loop (e.g., populating history for tests + or replaying messages). + """ + @spec add_user_message(Scope.t(), String.t(), list()) :: + {:ok, Interaction.UserMessage.t()} | {:error, :not_found} + def add_user_message(%Scope{} = scope, task_id, content_blocks) do + with {:ok, schema} <- get_task_by_id(scope, task_id) do + interaction = Interaction.UserMessage.new(content_blocks) + append_interaction(schema, interaction) + end + end + @doc """ Creates and appends an AgentResponse interaction. """ @@ -409,10 +429,10 @@ defmodule FrontmanServer.Tasks do Starts an execution if none is already running for this task. Fetches the task and delegates to Execution.run. """ - @spec maybe_start_execution(Scope.t(), String.t(), list(), keyword()) :: :ok + @spec maybe_start_execution(Scope.t(), String.t(), list(), keyword()) :: :ok | :already_running def maybe_start_execution(scope, task_id, tools, opts) do if Execution.running?(scope, task_id) do - :ok + :already_running else {:ok, task} = get_task(scope, task_id) diff --git a/apps/frontman_server/lib/frontman_server/tasks/execution.ex b/apps/frontman_server/lib/frontman_server/tasks/execution.ex index 08f282ca..8651ef01 100644 --- a/apps/frontman_server/lib/frontman_server/tasks/execution.ex +++ b/apps/frontman_server/lib/frontman_server/tasks/execution.ex @@ -27,8 +27,8 @@ defmodule FrontmanServer.Tasks.Execution do alias FrontmanServer.Observability.TelemetryEvents alias FrontmanServer.Providers alias FrontmanServer.Providers.{Model, Registry, ResolvedKey} - alias FrontmanServer.Tasks.Execution.{Framework, LLMError, RootAgent, ToolExecutor} - alias FrontmanServer.Tasks.{Interaction, StreamStallTimeout, Task} + alias FrontmanServer.Tasks.Execution.{Framework, RootAgent, ToolExecutor} + alias FrontmanServer.Tasks.{Interaction, Task} alias FrontmanServer.Tools alias SwarmAi.Message @@ -95,7 +95,8 @@ defmodule FrontmanServer.Tasks.Execution do submit_to_runtime(scope, agent, task_id, messages, api_key_info: api_key_info, mcp_tool_defs: mcp_tool_defs, - backend_tool_modules: backend_tool_modules + backend_tool_modules: backend_tool_modules, + interaction_id: Keyword.get(opts, :interaction_id) ) {:error, reason} -> @@ -124,88 +125,6 @@ defmodule FrontmanServer.Tasks.Execution do end end - # --- Event Handling --- - - @doc """ - Translates a SwarmAi event to a transport-level action for the channel. - - Called by the TaskChannel from `handle_info({:swarm_event, event})`. - - **Persistence is handled by SwarmDispatcher** (runs in the Runtime Task - process). This function only determines what the channel should push to - the client — if the channel is dead, no data is lost. - """ - @spec handle_swarm_event(Scope.t(), String.t(), term()) :: term() - def handle_swarm_event(_scope, _task_id, {:response, _response}), do: :ok - - def handle_swarm_event(_scope, _task_id, {:completed, {:ok, _result, _loop_id}}), - do: :agent_completed - - def handle_swarm_event(_scope, _task_id, {:failed, {:error, reason, _loop_id}}) do - {msg, category, retryable} = classify_error(reason) - {:agent_error, %{message: msg, category: category, retryable: retryable}} - end - - def handle_swarm_event(_scope, _task_id, {:crashed, %{reason: reason}}) do - msg = humanize_crash(reason) - {:agent_error, %{message: msg, category: "unknown", retryable: false}} - end - - def handle_swarm_event(_scope, _task_id, {:cancelled, _}), - do: :agent_cancelled - - def handle_swarm_event(_scope, _task_id, {:terminated, _}), - do: :agent_cancelled - - # Paused — ToolResult and AgentPaused already persisted by SwarmDispatcher. - # Return :agent_paused so the channel can resolve the pending prompt and notify the client. - def handle_swarm_event(_scope, _task_id, {:paused, _}), do: :agent_paused - - # Tool calls are persisted by ToolExecutor; no channel action needed. - def handle_swarm_event(_scope, _task_id, {:tool_call, _}), do: :ok - - @doc """ - Classifies an error reason into `{message, category, retryable}`. - - `category` is one of: "auth", "billing", "rate_limit", "overload", - "payload_too_large", "output_truncated", "unknown". - """ - @spec classify_error(term()) :: {String.t(), String.t(), boolean()} - def classify_error(%LLMError{message: msg, category: cat, retryable: r}), do: {msg, cat, r} - - def classify_error(%StreamStallTimeout.Error{}) do - {"The AI provider stopped responding mid-reply. " <> - "This usually happens when the provider is temporarily overloaded. " <> - "Try sending your message again.", "overload", true} - end - - def classify_error(:genserver_call_timeout) do - {"The request to the AI provider timed out. " <> - "This can happen during high traffic. Try again in a moment.", "overload", true} - end - - def classify_error(:stream_timeout) do - {"The request to the AI provider timed out. " <> - "This can happen during high traffic. Try again in a moment.", "overload", true} - end - - def classify_error(:output_truncated) do - {"The AI response was too long and got cut off. " <> - "This usually happens when writing large files. " <> - "Try asking the AI to write the file in smaller sections.", "output_truncated", false} - end - - def classify_error({:exit, reason}) do - {"Something went wrong while communicating with the AI provider: #{inspect(reason)}", - "unknown", false} - end - - def classify_error(reason) when is_exception(reason), - do: {Exception.message(reason), "unknown", false} - - def classify_error(reason) when is_binary(reason), do: {reason, "unknown", false} - def classify_error(reason), do: {inspect(reason), "unknown", false} - # --- Private --- # Dialyzer warning suppressed: protocol dispatch on Agent can't be statically proven. @@ -233,8 +152,15 @@ defmodule FrontmanServer.Tasks.Execution do # in event handlers — the agent may complete before this line returns. TelemetryEvents.task_start(task_id) + interaction_id = Keyword.get(opts, :interaction_id) + case SwarmAi.Runtime.run(FrontmanServer.AgentRuntime, task_id, agent, messages, - metadata: %{task_id: task_id, resolved_key: resolved_key, scope: scope}, + metadata: %{ + task_id: task_id, + resolved_key: resolved_key, + scope: scope, + interaction_id: interaction_id + }, tool_executor: tool_executor ) do {:ok, pid} -> @@ -418,16 +344,4 @@ defmodule FrontmanServer.Tasks.Execution do def error_message(%Scope{}, reason), do: inspect(reason) - - # Translates internal error reasons into user-friendly messages. - # Delegates to classify_error/1 to keep message strings in one place. - defp humanize_error(reason) do - {message, _category, _retryable} = classify_error(reason) - message - end - - # Like humanize_error, but prefixes unknown/fallback reasons with crash context. - defp humanize_crash(reason) when is_exception(reason), do: humanize_error(reason) - defp humanize_crash(reason) when is_atom(reason), do: "Execution crashed: #{inspect(reason)}" - defp humanize_crash(reason), do: "Execution crashed: #{humanize_error(reason)}" end diff --git a/apps/frontman_server/lib/frontman_server/tasks/execution_event.ex b/apps/frontman_server/lib/frontman_server/tasks/execution_event.ex new file mode 100644 index 00000000..09c33835 --- /dev/null +++ b/apps/frontman_server/lib/frontman_server/tasks/execution_event.ex @@ -0,0 +1,112 @@ +defmodule FrontmanServer.Tasks.ExecutionEvent do + @moduledoc """ + Domain event emitted during task execution. + + Wraps raw SwarmAi runtime events with causation context — which user + interaction triggered this execution. The SwarmDispatcher acts as an + Anti-Corruption Layer, translating infrastructure events into these + domain events before broadcasting on PubSub. + """ + + alias FrontmanServer.Tasks.Execution.LLMError + alias FrontmanServer.Tasks.StreamStallTimeout + + @type interaction_id :: String.t() + + @type event_type :: + :chunk + | :response + | :tool_call + | :completed + | :failed + | :crashed + | :cancelled + | :terminated + | :paused + + @enforce_keys [:type] + defstruct [:type, :payload, :caused_by] + + @type t :: %__MODULE__{ + type: event_type(), + payload: term(), + caused_by: interaction_id() | nil + } + + @doc """ + Classifies an execution event into a channel action. + + Persistence is handled by SwarmDispatcher — this function only determines + what the channel should do in response. + """ + @spec classify(t()) :: term() + def classify(%__MODULE__{type: :response}), do: :ok + def classify(%__MODULE__{type: :completed}), do: :agent_completed + def classify(%__MODULE__{type: :cancelled}), do: :agent_cancelled + def classify(%__MODULE__{type: :terminated}), do: :agent_cancelled + def classify(%__MODULE__{type: :paused}), do: :agent_paused + def classify(%__MODULE__{type: :tool_call}), do: :ok + + def classify(%__MODULE__{type: :failed, payload: {:error, reason, _loop_id}}) do + {msg, category, retryable} = classify_error(reason) + {:agent_error, %{message: msg, category: category, retryable: retryable}} + end + + def classify(%__MODULE__{type: :crashed, payload: %{reason: reason}}) do + msg = humanize_crash(reason) + {:agent_error, %{message: msg, category: "unknown", retryable: false}} + end + + @doc """ + Classifies an error reason into `{message, category, retryable}`. + + `category` is one of: "auth", "billing", "rate_limit", "overload", + "payload_too_large", "output_truncated", "unknown". + """ + @spec classify_error(term()) :: {String.t(), String.t(), boolean()} + def classify_error(%LLMError{message: msg, category: cat, retryable: r}), do: {msg, cat, r} + + def classify_error(%StreamStallTimeout.Error{}) do + {"The AI provider stopped responding mid-reply. " <> + "This usually happens when the provider is temporarily overloaded. " <> + "Try sending your message again.", "overload", true} + end + + def classify_error(:genserver_call_timeout) do + {"The request to the AI provider timed out. " <> + "This can happen during high traffic. Try again in a moment.", "overload", true} + end + + def classify_error(:stream_timeout) do + {"The request to the AI provider timed out. " <> + "This can happen during high traffic. Try again in a moment.", "overload", true} + end + + def classify_error(:output_truncated) do + {"The AI response was too long and got cut off. " <> + "This usually happens when writing large files. " <> + "Try asking the AI to write the file in smaller sections.", "output_truncated", false} + end + + def classify_error({:exit, reason}) do + {"Something went wrong while communicating with the AI provider: #{inspect(reason)}", + "unknown", false} + end + + def classify_error(reason) when is_exception(reason), + do: {Exception.message(reason), "unknown", false} + + def classify_error(reason) when is_binary(reason), do: {reason, "unknown", false} + def classify_error(reason), do: {inspect(reason), "unknown", false} + + # Translates internal error reasons into user-friendly messages. + defp humanize_error(reason) do + {message, _category, _retryable} = classify_error(reason) + message + end + + # Like humanize_error, but prefixes unknown/fallback reasons with crash context. + defp humanize_crash(reason) when is_exception(reason), do: humanize_error(reason) + defp humanize_crash(reason) when is_atom(reason), do: "Execution crashed: #{inspect(reason)}" + defp humanize_crash(reason), do: "Execution crashed: #{humanize_error(reason)}" +end diff --git a/apps/frontman_server/lib/frontman_server/tasks/swarm_dispatcher.ex b/apps/frontman_server/lib/frontman_server/tasks/swarm_dispatcher.ex index c2bf5903..4afc75b4 100644 --- a/apps/frontman_server/lib/frontman_server/tasks/swarm_dispatcher.ex +++ b/apps/frontman_server/lib/frontman_server/tasks/swarm_dispatcher.ex @@ -25,7 +25,7 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do alias FrontmanServer.Observability.TelemetryEvents alias FrontmanServer.Providers alias FrontmanServer.Tasks - alias FrontmanServer.Tasks.Execution + alias FrontmanServer.Tasks.ExecutionEvent def dispatch(key, event, metadata) do scope = Map.get(metadata, :scope) @@ -34,9 +34,17 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do # 1. Persist (runs in the Runtime Task process — channel-independent) persist(scope, task_id, event, metadata) - # 2. Broadcast for real-time UI (ephemeral, OK to lose if channel is dead) + # 2. Broadcast as a domain event (ACL boundary). topic = Tasks.topic(task_id) - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, topic, {:swarm_event, event}) + {type, payload} = event + + domain_event = %ExecutionEvent{ + type: type, + payload: payload, + caused_by: metadata[:interaction_id] + } + + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, topic, {:execution_event, domain_event}) end # --- Persistence --- @@ -64,7 +72,7 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do # Agent turn failed (LLM error, tool error, etc.) defp persist(%Scope{} = scope, task_id, {:failed, {:error, reason, loop_id}}, _metadata) do - {reason_str, category, retryable} = Execution.classify_error(reason) + {reason_str, category, retryable} = ExecutionEvent.classify_error(reason) Logger.error( "Execution failed for task #{task_id}, loop_id: #{loop_id}, reason: #{reason_str}" diff --git a/apps/frontman_server/lib/frontman_server_web/channels/task_channel.ex b/apps/frontman_server/lib/frontman_server_web/channels/task_channel.ex index 05d93045..ff4b14e2 100644 --- a/apps/frontman_server/lib/frontman_server_web/channels/task_channel.ex +++ b/apps/frontman_server/lib/frontman_server_web/channels/task_channel.ex @@ -19,7 +19,7 @@ defmodule FrontmanServerWeb.TaskChannel do alias FrontmanServer.Providers alias FrontmanServer.Providers.{Model, Registry} alias FrontmanServer.Tasks - alias FrontmanServer.Tasks.{Execution, RetryCoordinator, Todos} + alias FrontmanServer.Tasks.{Execution, ExecutionEvent, RetryCoordinator, Todos} alias FrontmanServer.Tools alias FrontmanServerWeb.ACPHistory alias FrontmanServerWeb.TaskChannel.MCPInitializer @@ -406,7 +406,7 @@ defmodule FrontmanServerWeb.TaskChannel do if had_retry do # Was in retry countdown — no execution to cancel, end the turn now - finalize_turn(socket, {:completed, ACP.stop_reason_cancelled()}) + finalize_turn(socket, {:completed, ACP.stop_reason_cancelled()}, nil) else {:noreply, socket} end @@ -526,8 +526,6 @@ defmodule FrontmanServerWeb.TaskChannel do # Prepare tools (domain service) all_tools = mcp_tools |> Tools.prepare_for_task(task_id) - socket = assign(socket, :pending_prompt_id, id) - opts = build_execution_opts(socket, env_api_key: env_api_key, @@ -535,10 +533,24 @@ defmodule FrontmanServerWeb.TaskChannel do mcp_tool_defs: mcp_tools ) - socket = assign(socket, :last_execution_opts, opts) - case Tasks.submit_user_message(scope, task_id, prompt.content, all_tools, opts) do - {:ok, _interaction} -> + {:error, :already_running} -> + Logger.info("Rejected prompt — agent already running for task #{task_id}") + error_response = JsonRpc.error_response(id, -32_000, "Agent already running") + {:reply, {:ok, %{@acp_message => error_response}}, socket} + + {:ok, interaction} -> + # Execution started — correlate this JSON-RPC request with the domain interaction + socket = + assign(socket, :pending_prompt, %{ + interaction_id: interaction.id, + jsonrpc_id: id + }) + + # Store interaction_id in execution opts for retry flow + opts = Keyword.put(opts, :interaction_id, interaction.id) + socket = assign(socket, :last_execution_opts, opts) + Logger.info("User message added, agent spawned for task #{task_id}") Tasks.enqueue_title_generation(scope, task_id, prompt.text_summary, @@ -594,9 +606,9 @@ defmodule FrontmanServerWeb.TaskChannel do {:noreply, socket} end - # --- SwarmAi events (dispatched via MFA → PubSub) --- + # --- Execution events (domain events from SwarmDispatcher via PubSub) --- - def handle_info({:swarm_event, {:chunk, chunk}}, socket) do + def handle_info({:execution_event, %ExecutionEvent{type: :chunk, payload: chunk}}, socket) do task_id = socket.assigns.task_id socket = @@ -632,25 +644,22 @@ defmodule FrontmanServerWeb.TaskChannel do {:noreply, socket} end - def handle_info({:swarm_event, event}, socket) do - scope = socket.assigns.scope - task_id = socket.assigns.task_id - - case Execution.handle_swarm_event(scope, task_id, event) do + def handle_info({:execution_event, %ExecutionEvent{} = event}, socket) do + case ExecutionEvent.classify(event) do :agent_completed -> - finalize_turn(socket, {:completed, ACP.stop_reason_end_turn()}) + finalize_turn(socket, {:completed, ACP.stop_reason_end_turn()}, event.caused_by) :agent_cancelled -> - finalize_turn(socket, {:completed, ACP.stop_reason_cancelled()}) + finalize_turn(socket, {:completed, ACP.stop_reason_cancelled()}, event.caused_by) :agent_paused -> - finalize_turn(socket, {:completed, ACP.stop_reason_end_turn()}) + finalize_turn(socket, {:completed, ACP.stop_reason_end_turn()}, event.caused_by) {:agent_error, %{retryable: true} = error_info} -> - handle_transient_error(socket, error_info) + handle_transient_error(socket, error_info, event.caused_by) {:agent_error, %{retryable: false} = error_info} -> - finalize_turn(socket, {:error, error_info.message, error_info.category}) + finalize_turn(socket, {:error, error_info.message, error_info.category}, event.caused_by) :ok -> {:noreply, socket} @@ -742,7 +751,7 @@ defmodule FrontmanServerWeb.TaskChannel do # Agent failed to start (e.g. no API key, usage limit). Broadcast by # Tasks.maybe_start_execution when Execution.run returns an error. def handle_info({:execution_start_error, msg}, socket) do - finalize_turn(socket, {:error, msg, "unknown"}) + finalize_turn(socket, {:error, msg, "unknown"}, nil) end def handle_info(:fire_retry, socket) do @@ -804,10 +813,10 @@ defmodule FrontmanServerWeb.TaskChannel do {:noreply, socket} end - defp handle_transient_error(socket, error_info) do + defp handle_transient_error(socket, error_info, caused_by) do case RetryCoordinator.handle_error(socket.assigns[:retry_state], error_info) do {:exhausted, error_info} -> - finalize_turn(socket, {:error, error_info.message, error_info.category}) + finalize_turn(socket, {:error, error_info.message, error_info.category}, caused_by) {:retry_scheduled, state, notification} -> task_id = socket.assigns.task_id @@ -831,8 +840,9 @@ defmodule FrontmanServerWeb.TaskChannel do {:completed, stop_reason :: String.t()} | {:error, message :: String.t(), category :: String.t()} - @spec finalize_turn(Phoenix.Socket.t(), turn_outcome()) :: {:noreply, Phoenix.Socket.t()} - defp finalize_turn(socket, outcome) do + @spec finalize_turn(Phoenix.Socket.t(), turn_outcome(), String.t() | nil) :: + {:noreply, Phoenix.Socket.t()} + defp finalize_turn(socket, outcome, caused_by) do task_id = socket.assigns.task_id socket = assign(socket, :retry_state, RetryCoordinator.clear(socket.assigns[:retry_state])) @@ -840,27 +850,34 @@ defmodule FrontmanServerWeb.TaskChannel do {:completed, stop_reason} -> notification = ACP.build_agent_turn_complete_notification(task_id, stop_reason) push(socket, @acp_message, notification) - resolve_pending_prompt(socket, {:ok, stop_reason}) + resolve_pending_prompt(socket, {:ok, stop_reason}, caused_by) {:error, message, category} -> notification = ACP.build_error_notification(task_id, message, DateTime.utc_now(), category: category) push(socket, @acp_message, notification) - resolve_pending_prompt(socket, {:error, message}) + resolve_pending_prompt(socket, {:error, message}, caused_by) end end - defp resolve_pending_prompt(socket, result) do + defp resolve_pending_prompt(socket, result, caused_by) do task_id = socket.assigns.task_id socket = - case socket.assigns[:pending_prompt_id] do + case socket.assigns[:pending_prompt] do nil -> - Logger.info("Turn finalized with no pending_prompt_id for task #{task_id}") + Logger.info("Turn finalized with no pending prompt for task #{task_id}") socket - prompt_id -> + %{interaction_id: interaction_id, jsonrpc_id: prompt_id} -> + if caused_by && caused_by != interaction_id do + Logger.warning( + "Causation mismatch resolving prompt for task #{task_id}: " <> + "pending interaction #{interaction_id}, event caused by #{caused_by}" + ) + end + response = case result do {:ok, stop_reason} -> @@ -876,7 +893,7 @@ defmodule FrontmanServerWeb.TaskChannel do end push(socket, @acp_message, response) - assign(socket, :pending_prompt_id, nil) + assign(socket, :pending_prompt, nil) end {:noreply, socket} diff --git a/apps/frontman_server/test/frontman_server/tasks/execution/error_propagation_test.exs b/apps/frontman_server/test/frontman_server/tasks/execution/error_propagation_test.exs index b7038eb6..11a1de2a 100644 --- a/apps/frontman_server/test/frontman_server/tasks/execution/error_propagation_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks/execution/error_propagation_test.exs @@ -17,6 +17,7 @@ defmodule FrontmanServer.Tasks.Execution.ErrorPropagationTest do alias Ecto.Adapters.SQL.Sandbox alias FrontmanServer.Tasks + alias FrontmanServer.Tasks.ExecutionEvent describe "LLM stream error propagation" do setup do @@ -30,7 +31,7 @@ defmodule FrontmanServer.Tasks.Execution.ErrorPropagationTest do end @tag :capture_log - test "LLM stream raise propagates as {:swarm_event, {:failed, ...}} via PubSub", %{ + test "LLM stream raise propagates as ExecutionEvent{type: :failed} via PubSub", %{ task_id: task_id, scope: scope } do @@ -52,12 +53,15 @@ defmodule FrontmanServer.Tasks.Execution.ErrorPropagationTest do ) # Stream errors are now caught and surfaced as graceful failures - assert_receive {:swarm_event, {:failed, {:error, reason, _loop_id}}}, 5_000 + assert_receive {:execution_event, + %ExecutionEvent{type: :failed, payload: {:error, reason, _loop_id}}}, + 5_000 + assert Exception.message(reason) =~ "image exceeds the maximum allowed size" end @tag :capture_log - test "LLM returning {:error, reason} surfaces as {:swarm_event, {:failed, ...}}", %{ + test "LLM returning {:error, reason} surfaces as ExecutionEvent{type: :failed}", %{ task_id: task_id, scope: scope } do @@ -71,7 +75,9 @@ defmodule FrontmanServer.Tasks.Execution.ErrorPropagationTest do ) # Should receive a failed event broadcast - assert_receive {:swarm_event, {:failed, {:error, _reason, _loop_id}}}, 5_000 + assert_receive {:execution_event, + %ExecutionEvent{type: :failed, payload: {:error, _reason, _loop_id}}}, + 5_000 end end end diff --git a/apps/frontman_server/test/frontman_server/tasks/execution/execution_sentry_test.exs b/apps/frontman_server/test/frontman_server/tasks/execution/execution_sentry_test.exs index e8a79184..96539a6f 100644 --- a/apps/frontman_server/test/frontman_server/tasks/execution/execution_sentry_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks/execution/execution_sentry_test.exs @@ -14,6 +14,7 @@ defmodule FrontmanServer.Tasks.Execution.ExecutionSentryTest do alias Ecto.Adapters.SQL.Sandbox alias FrontmanServer.Tasks + alias FrontmanServer.Tasks.ExecutionEvent setup do Sentry.Test.start_collecting_sentry_reports() @@ -43,7 +44,7 @@ defmodule FrontmanServer.Tasks.Execution.ExecutionSentryTest do ) # Wait for the failed event broadcast (Sentry call completes before broadcast) - assert_receive {:swarm_event, {:failed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :failed}}, 5_000 reports = Sentry.Test.pop_sentry_reports() @@ -85,7 +86,9 @@ defmodule FrontmanServer.Tasks.Execution.ExecutionSentryTest do ) # Stream errors now produce {:failed, ...} instead of {:crashed, ...} - assert_receive {:swarm_event, {:failed, {:error, _reason, _loop_id}}}, 5_000 + assert_receive {:execution_event, + %ExecutionEvent{type: :failed, payload: {:error, _reason, _loop_id}}}, + 5_000 reports = Sentry.Test.pop_sentry_reports() diff --git a/apps/frontman_server/test/frontman_server/tasks/execution_classify_error_test.exs b/apps/frontman_server/test/frontman_server/tasks/execution_classify_error_test.exs index eced4e2f..610145a2 100644 --- a/apps/frontman_server/test/frontman_server/tasks/execution_classify_error_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks/execution_classify_error_test.exs @@ -1,54 +1,54 @@ defmodule FrontmanServer.Tasks.ExecutionClassifyErrorTest do use ExUnit.Case, async: true - alias FrontmanServer.Tasks.Execution alias FrontmanServer.Tasks.Execution.LLMError + alias FrontmanServer.Tasks.ExecutionEvent alias FrontmanServer.Tasks.StreamStallTimeout describe "classify_error/1" do test "LLMError passes through message, category, retryable" do err = %LLMError{message: "Rate limited", category: "rate_limit", retryable: true} - assert {"Rate limited", "rate_limit", true} = Execution.classify_error(err) + assert {"Rate limited", "rate_limit", true} = ExecutionEvent.classify_error(err) end test "StreamStallTimeout.Error returns overload, retryable" do err = %StreamStallTimeout.Error{} - {msg, "overload", true} = Execution.classify_error(err) + {msg, "overload", true} = ExecutionEvent.classify_error(err) assert is_binary(msg) and String.length(msg) > 0 end test ":genserver_call_timeout returns overload, retryable" do - {msg, "overload", true} = Execution.classify_error(:genserver_call_timeout) + {msg, "overload", true} = ExecutionEvent.classify_error(:genserver_call_timeout) assert is_binary(msg) and String.length(msg) > 0 end test ":stream_timeout returns overload, retryable" do - {msg, "overload", true} = Execution.classify_error(:stream_timeout) + {msg, "overload", true} = ExecutionEvent.classify_error(:stream_timeout) assert is_binary(msg) and String.length(msg) > 0 end test ":output_truncated returns output_truncated, not retryable" do - {msg, "output_truncated", false} = Execution.classify_error(:output_truncated) + {msg, "output_truncated", false} = ExecutionEvent.classify_error(:output_truncated) assert is_binary(msg) and String.length(msg) > 0 end test "{:exit, reason} returns unknown, not retryable" do - {msg, "unknown", false} = Execution.classify_error({:exit, :some_reason}) + {msg, "unknown", false} = ExecutionEvent.classify_error({:exit, :some_reason}) assert String.contains?(msg, "some_reason") end test "generic exception returns unknown, not retryable" do err = %RuntimeError{message: "something bad"} - {msg, "unknown", false} = Execution.classify_error(err) + {msg, "unknown", false} = ExecutionEvent.classify_error(err) assert String.contains?(msg, "something bad") end test "binary reason returns as-is with unknown, not retryable" do - {"custom error", "unknown", false} = Execution.classify_error("custom error") + {"custom error", "unknown", false} = ExecutionEvent.classify_error("custom error") end test "unknown atom returns inspect string with unknown, not retryable" do - {msg, "unknown", false} = Execution.classify_error(:some_weird_atom) + {msg, "unknown", false} = ExecutionEvent.classify_error(:some_weird_atom) assert String.contains?(msg, "some_weird_atom") end end diff --git a/apps/frontman_server/test/frontman_server/tasks/execution_classify_test.exs b/apps/frontman_server/test/frontman_server/tasks/execution_classify_test.exs index 61009096..1e04e93b 100644 --- a/apps/frontman_server/test/frontman_server/tasks/execution_classify_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks/execution_classify_test.exs @@ -1,44 +1,44 @@ defmodule FrontmanServer.Tasks.ExecutionClassifyTest do use ExUnit.Case, async: true - alias FrontmanServer.Tasks.Execution alias FrontmanServer.Tasks.Execution.LLMError + alias FrontmanServer.Tasks.ExecutionEvent alias FrontmanServer.Tasks.StreamStallTimeout describe "classify_error/1" do test "LLMError preserves category and retryable" do err = %LLMError{message: "Rate limited", category: "rate_limit", retryable: true} - assert {msg, "rate_limit", true} = Execution.classify_error(err) + assert {msg, "rate_limit", true} = ExecutionEvent.classify_error(err) assert msg == "Rate limited" end test "LLMError auth is not retryable" do err = %LLMError{message: "Auth failed", category: "auth", retryable: false} - assert {"Auth failed", "auth", false} = Execution.classify_error(err) + assert {"Auth failed", "auth", false} = ExecutionEvent.classify_error(err) end test "StreamStallTimeout is retryable with overload category" do err = %StreamStallTimeout.Error{timeout_ms: 30_000} - {msg, category, retryable} = Execution.classify_error(err) + {msg, category, retryable} = ExecutionEvent.classify_error(err) assert retryable == true assert category == "overload" assert String.contains?(msg, "stopped responding") end test ":genserver_call_timeout is retryable with overload category" do - {_msg, category, retryable} = Execution.classify_error(:genserver_call_timeout) + {_msg, category, retryable} = ExecutionEvent.classify_error(:genserver_call_timeout) assert retryable == true assert category == "overload" end test ":output_truncated is not retryable" do - {_msg, category, retryable} = Execution.classify_error(:output_truncated) + {_msg, category, retryable} = ExecutionEvent.classify_error(:output_truncated) assert retryable == false assert category == "output_truncated" end test "unknown reason is not retryable with unknown category" do - {_msg, category, retryable} = Execution.classify_error(:some_unknown_atom) + {_msg, category, retryable} = ExecutionEvent.classify_error(:some_unknown_atom) assert retryable == false assert category == "unknown" end diff --git a/apps/frontman_server/test/frontman_server/tasks/execution_test.exs b/apps/frontman_server/test/frontman_server/tasks/execution_test.exs index ba365b73..2a270f00 100644 --- a/apps/frontman_server/test/frontman_server/tasks/execution_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks/execution_test.exs @@ -20,7 +20,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do alias Ecto.Adapters.SQL.Sandbox alias FrontmanServer.Accounts.Scope alias FrontmanServer.Tasks - alias FrontmanServer.Tasks.Interaction + alias FrontmanServer.Tasks.{ExecutionEvent, Interaction} alias FrontmanServer.Tools.MCP alias FrontmanServer.Workers.GenerateTitle @@ -139,11 +139,51 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do assert :ok = Tasks.cancel_execution(scope, task_id) - assert_receive {:swarm_event, {:cancelled, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :cancelled}}, 5_000 refute SwarmAi.Runtime.running?(FrontmanServer.AgentRuntime, task_id) end end + # -- Concurrent execution prevention ---------------------------------------- + + describe "concurrent execution prevention" do + setup [:setup_sandbox, :setup_user, :setup_task] + + test "second submit returns :already_running while agent is executing", %{ + task_id: task_id, + scope: scope + } do + slow_llm = %MockLLM{response: "slow response", delay_ms: 5_000} + agent = test_agent(slow_llm, "SlowAgent") + + {:ok, _interaction} = + Tasks.submit_user_message(scope, task_id, user_content("First"), [], agent: agent) + + Process.sleep(100) + assert SwarmAi.Runtime.running?(FrontmanServer.AgentRuntime, task_id) + + assert {:error, :already_running} = + Tasks.submit_user_message(scope, task_id, user_content("Second"), [], agent: agent) + + # Only one completion should fire + assert_receive {:interaction, %Interaction.AgentCompleted{}}, 6_000 + refute_receive {:interaction, %Interaction.AgentCompleted{}}, 500 + + # Only one agent response persisted — second message was rejected entirely + {:ok, task} = Tasks.get_task(scope, task_id) + + agent_responses = + Enum.filter(task.interactions, &match?(%Interaction.AgentResponse{}, &1)) + + assert length(agent_responses) == 1 + + user_messages = + Enum.filter(task.interactions, &match?(%Interaction.UserMessage{}, &1)) + + assert length(user_messages) == 1 + end + end + # -- Consecutive messages -------------------------------------------------- describe "consecutive messages" do @@ -247,7 +287,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do false ) - assert_receive {:swarm_event, {:completed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :completed}}, 5_000 {:ok, task} = Tasks.get_task(scope, task_id) @@ -349,7 +389,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do ) # Wait for the ParallelExecutor deadline to fire and the paused event to broadcast - assert_receive {:swarm_event, {:paused, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :paused}}, 5_000 {:ok, task} = Tasks.get_task(scope, task_id) @@ -407,7 +447,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do ) # Agent completes (not pauses) — the error result is sent to the LLM which responds - assert_receive {:swarm_event, {:completed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :completed}}, 5_000 {:ok, task} = Tasks.get_task(scope, task_id) @@ -454,7 +494,11 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - {:swarm_event, {:paused, {:timeout, "tc_fake", "question", 120_000}}} + {:execution_event, + %ExecutionEvent{ + type: :paused, + payload: {:timeout, "tc_fake", "question", 120_000} + }} ) # Flush the channel's message queue before asserting pushes @@ -687,7 +731,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do Tasks.submit_user_message(scope, task_id, user_content("Hello"), [], agent: agent) # Wait for SwarmDispatcher to broadcast the terminated event before checking the channel. - assert_receive {:swarm_event, {:terminated, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :terminated}}, 5_000 # Flush the channel's message queue before asserting pushes. :sys.get_state(socket.channel_pid) @@ -746,7 +790,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do {:ok, _} = Tasks.submit_user_message(scope, task_id, user_content("Hello"), [], agent: agent) - assert_receive {:swarm_event, {:crashed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :crashed}}, 5_000 :sys.get_state(socket.channel_pid) @@ -803,7 +847,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do {:ok, _} = Tasks.submit_user_message(scope, task_id, user_content("Hello"), [], agent: agent) - assert_receive {:swarm_event, {:failed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :failed}}, 5_000 :sys.get_state(socket.channel_pid) @@ -855,7 +899,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do backend_tool_modules: [CrashTool] ) - assert_receive {:swarm_event, {:completed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :completed}}, 5_000 {:ok, task} = Tasks.get_task(scope, task_id) @@ -899,7 +943,7 @@ defmodule FrontmanServer.Tasks.ExecutionIntegrationTest do ) # on_timeout: :error feeds the error back to the LLM, agent completes normally - assert_receive {:swarm_event, {:completed, _}}, 5_000 + assert_receive {:execution_event, %ExecutionEvent{type: :completed}}, 5_000 {:ok, task} = Tasks.get_task(scope, task_id) diff --git a/apps/frontman_server/test/frontman_server/tasks_test.exs b/apps/frontman_server/test/frontman_server/tasks_test.exs index 76a10027..7ec3dd82 100644 --- a/apps/frontman_server/test/frontman_server/tasks_test.exs +++ b/apps/frontman_server/test/frontman_server/tasks_test.exs @@ -255,17 +255,9 @@ defmodule FrontmanServer.TasksTest do test "assigns monotonically increasing sequences", %{scope: scope} do task_id = task_fixture(scope) - {:ok, msg1} = - Tasks.submit_user_message(scope, task_id, user_content("hello"), [], - agent: %FrontmanServer.Testing.BlockingAgent{} - ) - - {:ok, msg2} = Tasks.add_agent_response(scope, task_id, "hi there") - - {:ok, msg3} = - Tasks.submit_user_message(scope, task_id, user_content("again"), [], - agent: %FrontmanServer.Testing.BlockingAgent{} - ) + {:ok, msg1} = Tasks.add_agent_response(scope, task_id, "first") + {:ok, msg2} = Tasks.add_agent_response(scope, task_id, "second") + {:ok, msg3} = Tasks.add_agent_response(scope, task_id, "third") assert msg1.sequence > 0 assert msg2.sequence > msg1.sequence diff --git a/apps/frontman_server/test/frontman_server_web/channels/task_channel_test.exs b/apps/frontman_server/test/frontman_server_web/channels/task_channel_test.exs index ba009978..da25c837 100644 --- a/apps/frontman_server/test/frontman_server_web/channels/task_channel_test.exs +++ b/apps/frontman_server/test/frontman_server_web/channels/task_channel_test.exs @@ -10,21 +10,45 @@ defmodule FrontmanServerWeb.TaskChannelTest do alias FrontmanServer.Workers.GenerateTitle alias FrontmanServerWeb.UserSocket - # --- Swarm event builders (match production shapes from Runtime) --- + alias FrontmanServer.Tasks.ExecutionEvent - defp swarm_chunk(type, text), do: {:swarm_event, {:chunk, %{type: type, text: text}}} + # --- Execution event builders (domain events from SwarmDispatcher) --- - defp swarm_tool_call_start(id, name), + defp execution_chunk(type, text), do: - {:swarm_event, {:chunk, %{type: :tool_call_start, tool_call_id: id, tool_call_name: name}}} + {:execution_event, + %ExecutionEvent{type: :chunk, payload: %{type: type, text: text}, caused_by: nil}} - defp swarm_completed, - do: {:swarm_event, {:completed, {:ok, nil, System.unique_integer([:positive])}}} - - defp swarm_failed(reason), - do: {:swarm_event, {:failed, {:error, reason, System.unique_integer([:positive])}}} - - defp swarm_cancelled, do: {:swarm_event, {:cancelled, %{loop: nil}}} + defp execution_tool_call_start(id, name), + do: + {:execution_event, + %ExecutionEvent{ + type: :chunk, + payload: %{type: :tool_call_start, tool_call_id: id, tool_call_name: name}, + caused_by: nil + }} + + defp execution_completed, + do: + {:execution_event, + %ExecutionEvent{ + type: :completed, + payload: {:ok, nil, System.unique_integer([:positive])}, + caused_by: nil + }} + + defp execution_failed(reason), + do: + {:execution_event, + %ExecutionEvent{ + type: :failed, + payload: {:error, reason, System.unique_integer([:positive])}, + caused_by: nil + }} + + defp execution_cancelled, + do: + {:execution_event, %ExecutionEvent{type: :cancelled, payload: %{loop: nil}, caused_by: nil}} # Collects all pending push messages from the test process mailbox. # Phoenix.ChannelTest sends pushes as {:socket_push, event, payload} messages. @@ -200,7 +224,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_chunk(:token, "Hello world") + execution_chunk(:token, "Hello world") ) # Channel should forward this as an ACP notification @@ -224,7 +248,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_chunk(:thinking, "reasoning...") + execution_chunk(:thinking, "reasoning...") ) refute_push("acp:message", %{ @@ -234,7 +258,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_chunk(:token, "") + execution_chunk(:token, "") ) refute_push("acp:message", %{ @@ -245,7 +269,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_chunk(:token, "after thinking") + execution_chunk(:token, "after thinking") ) assert_push("acp:message", %{ @@ -277,7 +301,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed("Rate limit exceeded") + execution_failed("Rate limit exceeded") ) # Assert session/update notification was pushed with error @@ -298,7 +322,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do socket: socket, task_id: task_id } do - # First, send a prompt to set pending_prompt_id + # First, send a prompt to set pending_prompt push(socket, "acp:message", build_prompt_request(id: 42)) # Wait for the prompt to be processed :sys.get_state(socket.channel_pid) @@ -306,7 +330,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed("No API key available") + execution_failed("No API key available") ) # Assert session/update notification is pushed @@ -339,7 +363,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed("Connection failed") + execution_failed("Connection failed") ) # Should get session/update notification @@ -365,13 +389,13 @@ defmodule FrontmanServerWeb.TaskChannelTest do {:ok, socket: socket, task_id: task_id} end - test "sends agent_turn_complete notification when pending_prompt_id is nil", %{ + test "sends agent_turn_complete notification when pending_prompt is nil", %{ socket: socket, task_id: task_id } do # Simulate: execution was resumed after tool result (no pending prompt), # then the agent completes. There's no JSON-RPC request to respond to. - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_completed()) + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_completed()) :sys.get_state(socket.channel_pid) @@ -401,7 +425,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do task_id: task_id } do # First: completed event with no pending prompt - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_completed()) + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_completed()) :sys.get_state(socket.channel_pid) @@ -410,7 +434,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do :sys.get_state(socket.channel_pid) - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_completed()) + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_completed()) assert_push("acp:message", %{ "jsonrpc" => "2.0", @@ -834,11 +858,11 @@ defmodule FrontmanServerWeb.TaskChannelTest do socket: socket, task_id: task_id } do - # Send a prompt to set pending_prompt_id + # Send a prompt to set pending_prompt push(socket, "acp:message", build_prompt_request(id: 99)) :sys.get_state(socket.channel_pid) - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_cancelled()) + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_cancelled()) # The pending prompt should resolve with stopReason: "cancelled" assert_push("acp:message", %{ @@ -852,45 +876,14 @@ defmodule FrontmanServerWeb.TaskChannelTest do socket: socket, task_id: task_id } do - # No prompt was sent, so no pending_prompt_id exists - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_cancelled()) + # No prompt was sent, so no pending_prompt exists + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_cancelled()) :sys.get_state(socket.channel_pid) # No prompt response should be pushed refute_push("acp:message", %{"result" => %{"stopReason" => "cancelled"}}) end - - test "cancel does not interfere with subsequent prompts", %{ - socket: socket, - task_id: task_id - } do - # Send first prompt - push(socket, "acp:message", build_prompt_request(id: 1)) - - :sys.get_state(socket.channel_pid) - - # Cancel it - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_cancelled()) - - assert_push("acp:message", %{ - "id" => 1, - "result" => %{"stopReason" => "cancelled"} - }) - - # Send a second prompt - this should work normally - push(socket, "acp:message", build_prompt_request(id: 2, text: "Follow up")) - - :sys.get_state(socket.channel_pid) - - # Complete the second prompt normally - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_completed()) - - assert_push("acp:message", %{ - "id" => 2, - "result" => %{"stopReason" => "end_turn"} - }) - end end describe "tool_call_start chunk streaming" do @@ -909,7 +902,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_tool_call_start(tool_call_id, "write_file") + execution_tool_call_start(tool_call_id, "write_file") ) assert_push("acp:message", %{ @@ -933,7 +926,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do tool_call_id = "call_dedup_#{:rand.uniform(1_000_000)}" # Step 1: Send tool_call_start chunk (early streaming notification) - send(socket.channel_pid, swarm_tool_call_start(tool_call_id, "write_file")) + send(socket.channel_pid, execution_tool_call_start(tool_call_id, "write_file")) :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1016,7 +1009,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do call_id_2 = "call_multi_2_#{:rand.uniform(1_000_000)}" # Announce first tool call via streaming - send(socket.channel_pid, swarm_tool_call_start(call_id_1, "write_file")) + send(socket.channel_pid, execution_tool_call_start(call_id_1, "write_file")) :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1311,7 +1304,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed(error) + execution_failed(error) ) assert_push("acp:message", %{ @@ -1338,7 +1331,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed(error) + execution_failed(error) ) assert_push("acp:message", %{ @@ -1367,7 +1360,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do retryable: true } - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) retried_error_id = "error-#{task_id}-some-timestamp" @@ -1396,6 +1394,52 @@ defmodule FrontmanServerWeb.TaskChannelTest do ) end + test "when all retries are exhausted, the pending prompt is resolved with an error", %{ + socket: socket, + task_id: task_id + } do + push(socket, "acp:message", build_prompt_request(id: 99)) + :sys.get_state(socket.channel_pid) + + error = %FrontmanServer.Tasks.Execution.LLMError{ + message: "Rate limited", + category: "rate_limit", + retryable: true + } + + # Cycle through all 5 attempts: the first error starts the coordinator, + # then each fire_retry + error pair increments the attempt until exhausted. + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + + :sys.get_state(socket.channel_pid) + + Enum.each(1..5, fn _ -> + send(socket.channel_pid, :fire_retry) + :sys.get_state(socket.channel_pid) + + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + + :sys.get_state(socket.channel_pid) + end) + + assert_push("acp:message", %{ + "jsonrpc" => "2.0", + "id" => 99, + "error" => %{"code" => -32_000, "message" => "Rate limited"} + }) + + %{assigns: assigns} = :sys.get_state(socket.channel_pid) + assert is_nil(assigns[:retry_state]) + end + test "second transient error increments retry attempt instead of resetting to 1", %{ socket: _socket, task_id: task_id @@ -1406,13 +1450,21 @@ defmodule FrontmanServerWeb.TaskChannelTest do retryable: true } - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) assert_push("acp:message", %{ "params" => %{"update" => %{"sessionUpdate" => "error", "attempt" => 1, "retryAt" => _}} }) - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) assert_push("acp:message", %{ "params" => %{"update" => %{"sessionUpdate" => "error", "attempt" => 2, "retryAt" => _}} @@ -1439,7 +1491,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # First transient error — coordinator starts - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1451,12 +1508,17 @@ defmodule FrontmanServerWeb.TaskChannelTest do :sys.get_state(socket.channel_pid) # Execution succeeds — handle_turn_ended should stop and clear the coordinator - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_completed()) + Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), execution_completed()) :sys.get_state(socket.channel_pid) flush_mailbox() # New turn hits a transient error — should start fresh at attempt 1 - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) # BUG: handle_turn_ended does not clear retry_coordinator, so handle_transient_error @@ -1485,7 +1547,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # Trigger a transient error so retry state is created - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1536,7 +1603,11 @@ defmodule FrontmanServerWeb.TaskChannelTest do retryable: true } - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) # BUG: RetryCoordinator.schedule_retry/1 omits error_info.category from the # :retrying_status tuple. The channel's handle_info handler cannot forward @@ -1572,7 +1643,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # Trigger a transient error — coordinator starts, enters countdown - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1618,7 +1694,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do retryable: true } - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1656,7 +1737,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # Trigger transient error — retry state created, timer scheduled - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1713,7 +1799,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # Trigger a transient error — coordinator starts - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1760,7 +1851,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do } # First: transient error → coordinator starts - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1781,7 +1877,12 @@ defmodule FrontmanServerWeb.TaskChannelTest do flush_mailbox() # Now a new transient error arrives — should start a FRESH coordinator at attempt 1 - Phoenix.PubSub.broadcast(FrontmanServer.PubSub, Tasks.topic(task_id), swarm_failed(error)) + Phoenix.PubSub.broadcast( + FrontmanServer.PubSub, + Tasks.topic(task_id), + execution_failed(error) + ) + :sys.get_state(socket.channel_pid) assert_push("acp:message", %{ @@ -1817,7 +1918,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed(retryable_error) + execution_failed(retryable_error) ) :sys.get_state(socket.channel_pid) @@ -1833,7 +1934,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed(non_retryable_error) + execution_failed(non_retryable_error) ) :sys.get_state(socket.channel_pid) @@ -1854,7 +1955,7 @@ defmodule FrontmanServerWeb.TaskChannelTest do Phoenix.PubSub.broadcast( FrontmanServer.PubSub, Tasks.topic(task_id), - swarm_failed(retryable_error) + execution_failed(retryable_error) ) :sys.get_state(socket.channel_pid) diff --git a/apps/frontman_server/test/frontman_server_web/channels/tasks_channel_test.exs b/apps/frontman_server/test/frontman_server_web/channels/tasks_channel_test.exs index ca3b071b..824b593d 100644 --- a/apps/frontman_server/test/frontman_server_web/channels/tasks_channel_test.exs +++ b/apps/frontman_server/test/frontman_server_web/channels/tasks_channel_test.exs @@ -448,7 +448,6 @@ defmodule FrontmanServerWeb.TasksChannelTest do end describe "ACP session/load" do - # Shared sandbox mode because submit_user_message can spawn agent Tasks needing DB access @describetag shared_sandbox: true setup %{scope: scope} do @@ -471,21 +470,15 @@ defmodule FrontmanServerWeb.TasksChannelTest do scope: scope, task_id: task_id } do - FrontmanServer.Tasks.submit_user_message( - scope, - task_id, - [%{"type" => "text", "text" => "Hello"}], - [], - agent: %FrontmanServer.Testing.BlockingAgent{} - ) + # Persist messages without starting execution — this test is about + # session/load history streaming, not the agent loop. + FrontmanServer.Tasks.add_user_message(scope, task_id, [ + %{"type" => "text", "text" => "Hello"} + ]) - FrontmanServer.Tasks.submit_user_message( - scope, - task_id, - [%{"type" => "text", "text" => "World"}], - [], - agent: %FrontmanServer.Testing.BlockingAgent{} - ) + FrontmanServer.Tasks.add_user_message(scope, task_id, [ + %{"type" => "text", "text" => "World"} + ]) push(socket, "acp:message", acp_request(1, "session/load", %{"sessionId" => task_id})) @@ -558,13 +551,9 @@ defmodule FrontmanServerWeb.TasksChannelTest do end test "streams mixed history in order", %{socket: socket, scope: scope, task_id: task_id} do - FrontmanServer.Tasks.submit_user_message( - scope, - task_id, - [%{"type" => "text", "text" => "Question"}], - [], - agent: %FrontmanServer.Testing.BlockingAgent{} - ) + FrontmanServer.Tasks.add_user_message(scope, task_id, [ + %{"type" => "text", "text" => "Question"} + ]) FrontmanServer.Tasks.add_agent_response(scope, task_id, "Answer", %{})