Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions python/valuecell/core/coordinate/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ async def _monitor_planning_task(

# Planning completed, execute plan
plan = await planning_task

# Set conversation title once if not set yet and a task title is available
if getattr(plan, "tasks", None):
first_title = getattr(plan.tasks[0], "title", None)
await self._maybe_set_conversation_title(conversation_id, first_title)
async for response in self.task_executor.execute_plan(plan, thread_id):
yield response

Expand Down Expand Up @@ -454,9 +459,41 @@ async def _continue_planning(
plan = await planning_task
del self._execution_contexts[conversation_id]

# If this conversation was just created (tracked in context), set its title once.
if getattr(plan, "tasks", None):
first_title = getattr(plan.tasks[0], "title", None)
await self._maybe_set_conversation_title(conversation_id, first_title)

async for response in self.task_executor.execute_plan(plan, thread_id):
yield response

async def _maybe_set_conversation_title(
self, conversation_id: str, title: Optional[str]
):
"""Set conversation title once after creation when a task title is available.

Only sets the title if:
- title is provided and non-empty
- the conversation exists and currently has no title
"""
try:
if not title or not str(title).strip():
return
conversation = await self.conversation_service.get_conversation(
conversation_id
)
if not conversation:
return
# Avoid overwriting any existing title
if conversation.title:
return
conversation.title = str(title).strip()
# Persist via manager to avoid expanding ConversationService API
await self.conversation_service.manager.update_conversation(conversation)
except Exception:
# Title setting is best-effort; failures shouldn't break flow
logger.exception(f"Failed to set conversation title for {conversation_id}")

async def _cancel_execution(self, conversation_id: str):
"""Cancel and clean up any execution resources associated with a
conversation.
Expand Down
104 changes: 102 additions & 2 deletions python/valuecell/core/coordinate/tests/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def _sample_task(conversation_id: str, user_id: str, sample_query: str) -> Task:
user_id=user_id,
agent_name="TestAgent",
query=sample_query,
title="Auto Title",
status=CoreTaskStatus.PENDING,
remote_task_ids=[],
)
Expand All @@ -101,9 +102,11 @@ def _sample_plan(
)


def _stub_conversation(status: Any = ConversationStatus.ACTIVE):
def _stub_conversation(
status: Any = ConversationStatus.ACTIVE, title: str | None = None
):
# Minimal conversation stub with status and basic methods used by orchestrator
s = SimpleNamespace(status=status)
s = SimpleNamespace(status=status, title=title)

def activate():
s.status = ConversationStatus.ACTIVE
Expand Down Expand Up @@ -331,6 +334,103 @@ async def test_happy_path_streaming(
assert len(out) >= 1


@pytest.mark.asyncio
async def test_sets_conversation_title_on_first_plan(
orchestrator: AgentOrchestrator,
mock_agent_client: Mock,
mock_agent_card_non_streaming: AgentCard,
sample_user_input: UserInput,
mock_conversation_manager: Mock,
):
# Non-streaming to complete quickly
bundle = orchestrator._testing_bundle # type: ignore[attr-defined]
bundle.agent_connections.start_agent.return_value = mock_agent_card_non_streaming
bundle.agent_connections.get_client.return_value = mock_agent_client

# Agent returns a quick completion
mock_agent_client.send_message.return_value = _make_non_streaming_response()

# Ensure conversation initially has no title
conv = _stub_conversation(title=None)
mock_conversation_manager.get_conversation.return_value = conv

# Run once
out = []
async for chunk in orchestrator.process_user_input(sample_user_input):
out.append(chunk)

# After planning, title should be set from first task title (fixture: "Auto Title")
called_with_titles = [
getattr(c.args[0], "title", None)
for c in mock_conversation_manager.update_conversation.call_args_list
if c.args
]
assert any(t == "Auto Title" for t in called_with_titles)


@pytest.mark.asyncio
async def test_does_not_override_existing_title(
orchestrator: AgentOrchestrator,
mock_agent_client: Mock,
mock_agent_card_non_streaming: AgentCard,
sample_user_input: UserInput,
mock_conversation_manager: Mock,
):
bundle = orchestrator._testing_bundle # type: ignore[attr-defined]
bundle.agent_connections.start_agent.return_value = mock_agent_card_non_streaming
bundle.agent_connections.get_client.return_value = mock_agent_client
mock_agent_client.send_message.return_value = _make_non_streaming_response()

# Existing title should remain unchanged
conv = _stub_conversation(title="Existing Title")
mock_conversation_manager.get_conversation.return_value = conv

out = []
async for chunk in orchestrator.process_user_input(sample_user_input):
out.append(chunk)

# Conversation object must still have existing title
assert conv.title == "Existing Title"


@pytest.mark.asyncio
async def test_no_title_set_when_no_tasks(
orchestrator: AgentOrchestrator,
mock_agent_client: Mock,
mock_agent_card_non_streaming: AgentCard,
sample_user_input: UserInput,
mock_conversation_manager: Mock,
monkeypatch: pytest.MonkeyPatch,
conversation_id: str,
user_id: str,
):
bundle = orchestrator._testing_bundle # type: ignore[attr-defined]
bundle.agent_connections.start_agent.return_value = mock_agent_card_non_streaming
bundle.agent_connections.get_client.return_value = mock_agent_client
mock_agent_client.send_message.return_value = _make_non_streaming_response()

# Planner returns a plan with no tasks
empty_plan = ExecutionPlan(
plan_id="plan-empty",
conversation_id=conversation_id,
user_id=user_id,
orig_query="q",
tasks=[],
created_at="2025-09-16T10:00:00",
)
orchestrator.plan_service.planner.create_plan = AsyncMock(return_value=empty_plan)

conv = _stub_conversation(title=None)
mock_conversation_manager.get_conversation.return_value = conv

out = []
async for chunk in orchestrator.process_user_input(sample_user_input):
out.append(chunk)

# Title should remain None
assert conv.title is None


@pytest.mark.asyncio
async def test_happy_path_non_streaming(
orchestrator: AgentOrchestrator,
Expand Down
1 change: 1 addition & 0 deletions python/valuecell/core/task/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async def emit_subagent_end_once() -> Optional[BaseResponse]:
user_id=plan.user_id,
conversation_id=task.conversation_id,
agent_name=task.agent_name,
title=task.title,
)

# Emit subagent conversation start component
Expand Down
6 changes: 5 additions & 1 deletion python/valuecell/core/task/tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ def __init__(self) -> None:
self.calls: list[tuple[str, str]] = []

async def ensure_conversation(
self, user_id: str, conversation_id: str, agent_name: str
self,
user_id: str,
conversation_id: str,
agent_name: str,
title: str | None = None,
):
self.calls.append((user_id, conversation_id))

Expand Down