Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 108 additions & 72 deletions python/valuecell/agents/strategy_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,91 @@
class StrategyAgent(BaseAgent):
"""Top-level Strategy Agent integrating the decision coordinator."""

async def _wait_until_marked_running(
self, strategy_id: str, timeout_s: int = 300
) -> None:
"""Wait until persistence marks the strategy as running or timeout.

This helper logs progress and returns when either the strategy is running
or the timeout elapses. It swallows exceptions from the persistence layer
to avoid bubbling nested try/except into `stream`.
"""
since = datetime.now()
try:
while not strategy_persistence.strategy_running(strategy_id):
if (datetime.now() - since).total_seconds() > timeout_s:
logger.error(
"Timeout waiting for strategy_id={} to be marked as running",
strategy_id,
)
break

await asyncio.sleep(1)
logger.info(
"Waiting for strategy_id={} to be marked as running", strategy_id
)
except Exception:
# Avoid raising from persistence checks; we still proceed to start the runtime.
logger.exception(
"Error while waiting for strategy {} to be marked running", strategy_id
)

def _persist_initial_state(self, runtime, strategy_id: str) -> None:
"""Persist initial portfolio snapshot and strategy summary.

This helper captures and logs any errors internally so callers don't need
additional try/except nesting.
"""
try:
initial_portfolio = runtime.coordinator._portfolio_service.get_view()
try:
initial_portfolio.strategy_id = strategy_id
except Exception:
pass

ok = strategy_persistence.persist_portfolio_view(initial_portfolio)
if ok:
logger.info(
"Persisted initial portfolio view for strategy={}", strategy_id
)

timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000)
initial_summary = runtime.coordinator._build_summary(timestamp_ms, [])
ok = strategy_persistence.persist_strategy_summary(initial_summary)
if ok:
logger.info(
"Persisted initial strategy summary for strategy={}", strategy_id
)
except Exception:
logger.exception(
"Failed to persist initial portfolio/summary for {}", strategy_id
)

def _persist_cycle_results(self, strategy_id: str, result) -> None:
"""Persist trades, portfolio view and strategy summary for a cycle.

Errors are logged but not raised to keep the decision loop resilient.
"""
try:
for trade in result.trades:
item = strategy_persistence.persist_trade_history(strategy_id, trade)
if item:
logger.info(
"Persisted trade {} for strategy={}",
getattr(trade, "trade_id", None),
strategy_id,
)

ok = strategy_persistence.persist_portfolio_view(result.portfolio_view)
if ok:
logger.info("Persisted portfolio view for strategy={}", strategy_id)

ok = strategy_persistence.persist_strategy_summary(result.strategy_summary)
if ok:
logger.info("Persisted strategy summary for strategy={}", strategy_id)
except Exception:
logger.exception("Error persisting cycle results for {}", strategy_id)

async def stream(
self,
query: str,
Expand Down Expand Up @@ -55,53 +140,12 @@ async def stream(
)

# Wait until strategy is marked as running in persistence layer
since = datetime.now()
while not strategy_persistence.strategy_running(strategy_id):
if (datetime.now() - since).total_seconds() > 300:
logger.error(
"Timeout waiting for strategy_id={} to be marked as running",
strategy_id,
)
break

await asyncio.sleep(1)
logger.info(
"Waiting for strategy_id={} to be marked as running", strategy_id
)
await self._wait_until_marked_running(strategy_id)

try:
logger.info("Starting decision loop for strategy_id={}", strategy_id)
# Persist initial portfolio snapshot and strategy summary before entering the loop
try:
# Get current portfolio view from the coordinator's portfolio service
initial_portfolio = runtime.coordinator._portfolio_service.get_view()
# ensure strategy_id present on the view
try:
initial_portfolio.strategy_id = strategy_id
except Exception:
pass

ok = strategy_persistence.persist_portfolio_view(initial_portfolio)
if ok:
logger.info(
"Persisted initial portfolio view for strategy={}",
strategy_id,
)

# Build and persist an initial strategy summary (no trades yet)
timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000)
initial_summary = runtime.coordinator._build_summary(timestamp_ms, [])
ok = strategy_persistence.persist_strategy_summary(initial_summary)
if ok:
logger.info(
"Persisted initial strategy summary for strategy={}",
strategy_id,
)
except Exception:
logger.exception(
"Failed to persist initial portfolio/summary for {}",
strategy_id,
)
self._persist_initial_state(runtime, strategy_id)
while True:
if not strategy_persistence.strategy_running(strategy_id):
logger.info(
Expand All @@ -116,35 +160,8 @@ async def stream(
strategy_id,
len(result.trades),
)
# Persist and stream trades
for trade in result.trades:
item = strategy_persistence.persist_trade_history(
strategy_id, trade
)
if item:
logger.info(
"Persisted trade {} for strategy={}",
getattr(trade, "trade_id", None),
strategy_id,
)

# Persist portfolio snapshot (positions)
ok = strategy_persistence.persist_portfolio_view(result.portfolio_view)
if ok:
logger.info(
"Persisted portfolio view for strategy={}",
strategy_id,
)

# Persist strategy summary
ok = strategy_persistence.persist_strategy_summary(
result.strategy_summary
)
if ok:
logger.info(
"Persisted strategy summary for strategy={}",
strategy_id,
)
# Persist and stream cycle results (trades, portfolio, summary)
self._persist_cycle_results(strategy_id, result)

logger.info(
"Waiting for next decision cycle for strategy_id={}, interval={}seconds",
Expand All @@ -154,9 +171,28 @@ async def stream(
await asyncio.sleep(request.trading_config.decide_interval)

except asyncio.CancelledError:
# Ensure strategy is marked stopped on cancellation
try:
strategy_persistence.mark_strategy_stopped(strategy_id)
logger.info(
"Marked strategy {} as stopped due to cancellation", strategy_id
)
except Exception:
logger.exception(
"Failed to mark strategy stopped for {} on cancellation",
strategy_id,
)
raise
except Exception as err: # noqa: BLE001
logger.exception("StrategyAgent stream failed: {}", err)
yield streaming.message_chunk(f"StrategyAgent error: {err}")
finally:
# Always mark strategy as stopped when stream ends for any reason
try:
strategy_persistence.mark_strategy_stopped(strategy_id)
logger.info("Marked strategy {} as stopped in finalizer", strategy_id)
except Exception:
logger.exception(
"Failed to mark strategy stopped for {} in finalizer", strategy_id
)
yield streaming.done()
11 changes: 5 additions & 6 deletions python/valuecell/agents/strategy_agent/decision/composer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,18 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]:
)
try:
plan = await self._call_llm(prompt)
if not plan.items:
logger.error(
"LLM returned empty plan for compose_id={}", context.compose_id
)
return []
except ValidationError as exc:
logger.error("LLM output failed validation: {}", exc)
return []
except Exception: # noqa: BLE001
logger.exception("LLM invocation failed")
return []

if not plan.items:
logger.debug(
"LLM returned empty plan for compose_id={}", context.compose_id
)
return []

return self._normalize_plan(context, plan)

# ------------------------------------------------------------------
Expand Down
22 changes: 22 additions & 0 deletions python/valuecell/server/services/strategy_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,25 @@ def strategy_running(strategy_id: str) -> bool:
except Exception:
logger.exception("strategy_running check failed for {}", strategy_id)
return False


def set_strategy_status(strategy_id: str, status: str) -> bool:
"""Set the status field for a strategy (convenience wrapper around upsert)."""
repo = get_strategy_repository()
try:
updated = repo.upsert_strategy(strategy_id, status=status)
return updated is not None
except Exception:
logger.exception("set_strategy_status failed for {}", strategy_id)
return False


def mark_strategy_stopped(strategy_id: str) -> bool:
"""Mark a strategy as stopped."""
try:
return set_strategy_status(
strategy_id, agent_models.StrategyStatus.STOPPED.value
)
except Exception:
logger.exception("mark_strategy_stopped failed for {}", strategy_id)
return False