Skip to content
Open
34 changes: 27 additions & 7 deletions apps/frontman_server/lib/frontman_server/tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,39 @@ defmodule FrontmanServer.Tasks do

This is the primary "user turn" use case — recording what the user said
and kicking off the agent loop. If an execution is already running, the
message is persisted but no new run is started.
prompt is rejected entirely (nothing persisted).
"""
@spec submit_user_message(Scope.t(), String.t(), list(), list(), keyword()) ::
{:ok, Interaction.UserMessage.t()} | {:error, :not_found}
{:ok, Interaction.UserMessage.t()} | {:error, :already_running} | {:error, :not_found}
def submit_user_message(%Scope{} = scope, task_id, content_blocks, tools, opts \\ []) do
with {:ok, schema} <- get_task_by_id(scope, task_id),
interaction = Interaction.UserMessage.new(content_blocks),
{:ok, interaction} <- append_interaction(schema, interaction) do
with :ok <- guard_not_running(scope, task_id),
{:ok, interaction} <- add_user_message(scope, task_id, content_blocks) do
opts = Keyword.put(opts, :interaction_id, interaction.id)
maybe_start_execution(scope, task_id, tools, opts)
{:ok, interaction}
end
end

defp guard_not_running(scope, task_id) do
if Execution.running?(scope, task_id), do: {:error, :already_running}, else: :ok
end

@doc """
Persists a user message without starting execution.

Use this when you need to record a user message in the conversation history
but don't want to trigger the agent loop (e.g., populating history for tests
or replaying messages).
"""
@spec add_user_message(Scope.t(), String.t(), list()) ::
{:ok, Interaction.UserMessage.t()} | {:error, :not_found}
def add_user_message(%Scope{} = scope, task_id, content_blocks) do
with {:ok, schema} <- get_task_by_id(scope, task_id) do
interaction = Interaction.UserMessage.new(content_blocks)
append_interaction(schema, interaction)
end
end

@doc """
Creates and appends an AgentResponse interaction.
"""
Expand Down Expand Up @@ -409,10 +429,10 @@ defmodule FrontmanServer.Tasks do
Starts an execution if none is already running for this task.
Fetches the task and delegates to Execution.run.
"""
@spec maybe_start_execution(Scope.t(), String.t(), list(), keyword()) :: :ok
@spec maybe_start_execution(Scope.t(), String.t(), list(), keyword()) :: :ok | :already_running
def maybe_start_execution(scope, task_id, tools, opts) do
if Execution.running?(scope, task_id) do
:ok
:already_running
else
{:ok, task} = get_task(scope, task_id)

Expand Down
110 changes: 12 additions & 98 deletions apps/frontman_server/lib/frontman_server/tasks/execution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ defmodule FrontmanServer.Tasks.Execution do
alias FrontmanServer.Observability.TelemetryEvents
alias FrontmanServer.Providers
alias FrontmanServer.Providers.{Model, Registry, ResolvedKey}
alias FrontmanServer.Tasks.Execution.{Framework, LLMError, RootAgent, ToolExecutor}
alias FrontmanServer.Tasks.{Interaction, StreamStallTimeout, Task}
alias FrontmanServer.Tasks.Execution.{Framework, RootAgent, ToolExecutor}
alias FrontmanServer.Tasks.{Interaction, Task}
alias FrontmanServer.Tools
alias SwarmAi.Message

Expand Down Expand Up @@ -95,7 +95,8 @@ defmodule FrontmanServer.Tasks.Execution do
submit_to_runtime(scope, agent, task_id, messages,
api_key_info: api_key_info,
mcp_tool_defs: mcp_tool_defs,
backend_tool_modules: backend_tool_modules
backend_tool_modules: backend_tool_modules,
interaction_id: Keyword.get(opts, :interaction_id)
)

{:error, reason} ->
Expand Down Expand Up @@ -124,88 +125,6 @@ defmodule FrontmanServer.Tasks.Execution do
end
end

# --- Event Handling ---

@doc """
Translates a SwarmAi event to a transport-level action for the channel.

Called by the TaskChannel from `handle_info({:swarm_event, event})`.

**Persistence is handled by SwarmDispatcher** (runs in the Runtime Task
process). This function only determines what the channel should push to
the client — if the channel is dead, no data is lost.
"""
@spec handle_swarm_event(Scope.t(), String.t(), term()) :: term()
def handle_swarm_event(_scope, _task_id, {:response, _response}), do: :ok

def handle_swarm_event(_scope, _task_id, {:completed, {:ok, _result, _loop_id}}),
do: :agent_completed

def handle_swarm_event(_scope, _task_id, {:failed, {:error, reason, _loop_id}}) do
{msg, category, retryable} = classify_error(reason)
{:agent_error, %{message: msg, category: category, retryable: retryable}}
end

def handle_swarm_event(_scope, _task_id, {:crashed, %{reason: reason}}) do
msg = humanize_crash(reason)
{:agent_error, %{message: msg, category: "unknown", retryable: false}}
end

def handle_swarm_event(_scope, _task_id, {:cancelled, _}),
do: :agent_cancelled

def handle_swarm_event(_scope, _task_id, {:terminated, _}),
do: :agent_cancelled

# Paused — ToolResult and AgentPaused already persisted by SwarmDispatcher.
# Return :agent_paused so the channel can resolve the pending prompt and notify the client.
def handle_swarm_event(_scope, _task_id, {:paused, _}), do: :agent_paused

# Tool calls are persisted by ToolExecutor; no channel action needed.
def handle_swarm_event(_scope, _task_id, {:tool_call, _}), do: :ok

@doc """
Classifies an error reason into `{message, category, retryable}`.

`category` is one of: "auth", "billing", "rate_limit", "overload",
"payload_too_large", "output_truncated", "unknown".
"""
@spec classify_error(term()) :: {String.t(), String.t(), boolean()}
def classify_error(%LLMError{message: msg, category: cat, retryable: r}), do: {msg, cat, r}

def classify_error(%StreamStallTimeout.Error{}) do
{"The AI provider stopped responding mid-reply. " <>
"This usually happens when the provider is temporarily overloaded. " <>
"Try sending your message again.", "overload", true}
end

def classify_error(:genserver_call_timeout) do
{"The request to the AI provider timed out. " <>
"This can happen during high traffic. Try again in a moment.", "overload", true}
end

def classify_error(:stream_timeout) do
{"The request to the AI provider timed out. " <>
"This can happen during high traffic. Try again in a moment.", "overload", true}
end

def classify_error(:output_truncated) do
{"The AI response was too long and got cut off. " <>
"This usually happens when writing large files. " <>
"Try asking the AI to write the file in smaller sections.", "output_truncated", false}
end

def classify_error({:exit, reason}) do
{"Something went wrong while communicating with the AI provider: #{inspect(reason)}",
"unknown", false}
end

def classify_error(reason) when is_exception(reason),
do: {Exception.message(reason), "unknown", false}

def classify_error(reason) when is_binary(reason), do: {reason, "unknown", false}
def classify_error(reason), do: {inspect(reason), "unknown", false}

# --- Private ---

# Dialyzer warning suppressed: protocol dispatch on Agent can't be statically proven.
Expand Down Expand Up @@ -233,8 +152,15 @@ defmodule FrontmanServer.Tasks.Execution do
# in event handlers — the agent may complete before this line returns.
TelemetryEvents.task_start(task_id)

interaction_id = Keyword.get(opts, :interaction_id)

case SwarmAi.Runtime.run(FrontmanServer.AgentRuntime, task_id, agent, messages,
metadata: %{task_id: task_id, resolved_key: resolved_key, scope: scope},
metadata: %{
task_id: task_id,
resolved_key: resolved_key,
scope: scope,
interaction_id: interaction_id
},
tool_executor: tool_executor
) do
{:ok, pid} ->
Expand Down Expand Up @@ -418,16 +344,4 @@ defmodule FrontmanServer.Tasks.Execution do

def error_message(%Scope{}, reason),
do: inspect(reason)

# Translates internal error reasons into user-friendly messages.
# Delegates to classify_error/1 to keep message strings in one place.
defp humanize_error(reason) do
{message, _category, _retryable} = classify_error(reason)
message
end

# Like humanize_error, but prefixes unknown/fallback reasons with crash context.
defp humanize_crash(reason) when is_exception(reason), do: humanize_error(reason)
defp humanize_crash(reason) when is_atom(reason), do: "Execution crashed: #{inspect(reason)}"
defp humanize_crash(reason), do: "Execution crashed: #{humanize_error(reason)}"
end
112 changes: 112 additions & 0 deletions apps/frontman_server/lib/frontman_server/tasks/execution_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
defmodule FrontmanServer.Tasks.ExecutionEvent do
@moduledoc """
Domain event emitted during task execution.

Wraps raw SwarmAi runtime events with causation context — which user
interaction triggered this execution. The SwarmDispatcher acts as an
Anti-Corruption Layer, translating infrastructure events into these
domain events before broadcasting on PubSub.
"""

alias FrontmanServer.Tasks.Execution.LLMError
alias FrontmanServer.Tasks.StreamStallTimeout

@type interaction_id :: String.t()

@type event_type ::
:chunk
| :response
| :tool_call
| :completed
| :failed
| :crashed
| :cancelled
| :terminated
| :paused

@enforce_keys [:type]
defstruct [:type, :payload, :caused_by]

@type t :: %__MODULE__{
type: event_type(),
payload: term(),
caused_by: interaction_id() | nil
}

@doc """
Classifies an execution event into a channel action.

Persistence is handled by SwarmDispatcher — this function only determines
what the channel should do in response.
"""
@spec classify(t()) :: term()
def classify(%__MODULE__{type: :response}), do: :ok
def classify(%__MODULE__{type: :completed}), do: :agent_completed
def classify(%__MODULE__{type: :cancelled}), do: :agent_cancelled
def classify(%__MODULE__{type: :terminated}), do: :agent_cancelled
def classify(%__MODULE__{type: :paused}), do: :agent_paused
def classify(%__MODULE__{type: :tool_call}), do: :ok

def classify(%__MODULE__{type: :failed, payload: {:error, reason, _loop_id}}) do
{msg, category, retryable} = classify_error(reason)
{:agent_error, %{message: msg, category: category, retryable: retryable}}
end

def classify(%__MODULE__{type: :crashed, payload: %{reason: reason}}) do
msg = humanize_crash(reason)
{:agent_error, %{message: msg, category: "unknown", retryable: false}}
end

@doc """
Classifies an error reason into `{message, category, retryable}`.

`category` is one of: "auth", "billing", "rate_limit", "overload",
"payload_too_large", "output_truncated", "unknown".
"""
@spec classify_error(term()) :: {String.t(), String.t(), boolean()}
def classify_error(%LLMError{message: msg, category: cat, retryable: r}), do: {msg, cat, r}

def classify_error(%StreamStallTimeout.Error{}) do
{"The AI provider stopped responding mid-reply. " <>
"This usually happens when the provider is temporarily overloaded. " <>
"Try sending your message again.", "overload", true}
end

def classify_error(:genserver_call_timeout) do
{"The request to the AI provider timed out. " <>
"This can happen during high traffic. Try again in a moment.", "overload", true}
end

def classify_error(:stream_timeout) do
{"The request to the AI provider timed out. " <>
"This can happen during high traffic. Try again in a moment.", "overload", true}
end

def classify_error(:output_truncated) do
{"The AI response was too long and got cut off. " <>
"This usually happens when writing large files. " <>
"Try asking the AI to write the file in smaller sections.", "output_truncated", false}
end

def classify_error({:exit, reason}) do
{"Something went wrong while communicating with the AI provider: #{inspect(reason)}",
"unknown", false}
end

def classify_error(reason) when is_exception(reason),
do: {Exception.message(reason), "unknown", false}

def classify_error(reason) when is_binary(reason), do: {reason, "unknown", false}
def classify_error(reason), do: {inspect(reason), "unknown", false}

# Translates internal error reasons into user-friendly messages.
defp humanize_error(reason) do
{message, _category, _retryable} = classify_error(reason)
message
end

# Like humanize_error, but prefixes unknown/fallback reasons with crash context.
defp humanize_crash(reason) when is_exception(reason), do: humanize_error(reason)
defp humanize_crash(reason) when is_atom(reason), do: "Execution crashed: #{inspect(reason)}"
defp humanize_crash(reason), do: "Execution crashed: #{humanize_error(reason)}"
end
16 changes: 12 additions & 4 deletions apps/frontman_server/lib/frontman_server/tasks/swarm_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do
alias FrontmanServer.Observability.TelemetryEvents
alias FrontmanServer.Providers
alias FrontmanServer.Tasks
alias FrontmanServer.Tasks.Execution
alias FrontmanServer.Tasks.ExecutionEvent

def dispatch(key, event, metadata) do
scope = Map.get(metadata, :scope)
Expand All @@ -34,9 +34,17 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do
# 1. Persist (runs in the Runtime Task process — channel-independent)
persist(scope, task_id, event, metadata)

# 2. Broadcast for real-time UI (ephemeral, OK to lose if channel is dead)
# 2. Broadcast as a domain event (ACL boundary).
topic = Tasks.topic(task_id)
Phoenix.PubSub.broadcast(FrontmanServer.PubSub, topic, {:swarm_event, event})
{type, payload} = event

domain_event = %ExecutionEvent{
type: type,
payload: payload,
caused_by: metadata[:interaction_id]
}

Phoenix.PubSub.broadcast(FrontmanServer.PubSub, topic, {:execution_event, domain_event})
end

# --- Persistence ---
Expand Down Expand Up @@ -64,7 +72,7 @@ defmodule FrontmanServer.Tasks.SwarmDispatcher do

# Agent turn failed (LLM error, tool error, etc.)
defp persist(%Scope{} = scope, task_id, {:failed, {:error, reason, loop_id}}, _metadata) do
{reason_str, category, retryable} = Execution.classify_error(reason)
{reason_str, category, retryable} = ExecutionEvent.classify_error(reason)

Logger.error(
"Execution failed for task #{task_id}, loop_id: #{loop_id}, reason: #{reason_str}"
Expand Down
Loading
Loading