diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 8e5406d69..7687b816d 100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -22,6 +22,91 @@ class StrategyAgent(BaseAgent): """Top-level Strategy Agent integrating the decision coordinator.""" + async def _wait_until_marked_running( + self, strategy_id: str, timeout_s: int = 300 + ) -> None: + """Wait until persistence marks the strategy as running or timeout. + + This helper logs progress and returns when either the strategy is running + or the timeout elapses. It swallows exceptions from the persistence layer + to avoid bubbling nested try/except into `stream`. + """ + since = datetime.now() + try: + while not strategy_persistence.strategy_running(strategy_id): + if (datetime.now() - since).total_seconds() > timeout_s: + logger.error( + "Timeout waiting for strategy_id={} to be marked as running", + strategy_id, + ) + break + + await asyncio.sleep(1) + logger.info( + "Waiting for strategy_id={} to be marked as running", strategy_id + ) + except Exception: + # Avoid raising from persistence checks; we still proceed to start the runtime. + logger.exception( + "Error while waiting for strategy {} to be marked running", strategy_id + ) + + def _persist_initial_state(self, runtime, strategy_id: str) -> None: + """Persist initial portfolio snapshot and strategy summary. + + This helper captures and logs any errors internally so callers don't need + additional try/except nesting. + """ + try: + initial_portfolio = runtime.coordinator._portfolio_service.get_view() + try: + initial_portfolio.strategy_id = strategy_id + except Exception: + pass + + ok = strategy_persistence.persist_portfolio_view(initial_portfolio) + if ok: + logger.info( + "Persisted initial portfolio view for strategy={}", strategy_id + ) + + timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000) + initial_summary = runtime.coordinator._build_summary(timestamp_ms, []) + ok = strategy_persistence.persist_strategy_summary(initial_summary) + if ok: + logger.info( + "Persisted initial strategy summary for strategy={}", strategy_id + ) + except Exception: + logger.exception( + "Failed to persist initial portfolio/summary for {}", strategy_id + ) + + def _persist_cycle_results(self, strategy_id: str, result) -> None: + """Persist trades, portfolio view and strategy summary for a cycle. + + Errors are logged but not raised to keep the decision loop resilient. + """ + try: + for trade in result.trades: + item = strategy_persistence.persist_trade_history(strategy_id, trade) + if item: + logger.info( + "Persisted trade {} for strategy={}", + getattr(trade, "trade_id", None), + strategy_id, + ) + + ok = strategy_persistence.persist_portfolio_view(result.portfolio_view) + if ok: + logger.info("Persisted portfolio view for strategy={}", strategy_id) + + ok = strategy_persistence.persist_strategy_summary(result.strategy_summary) + if ok: + logger.info("Persisted strategy summary for strategy={}", strategy_id) + except Exception: + logger.exception("Error persisting cycle results for {}", strategy_id) + async def stream( self, query: str, @@ -55,53 +140,12 @@ async def stream( ) # Wait until strategy is marked as running in persistence layer - since = datetime.now() - while not strategy_persistence.strategy_running(strategy_id): - if (datetime.now() - since).total_seconds() > 300: - logger.error( - "Timeout waiting for strategy_id={} to be marked as running", - strategy_id, - ) - break - - await asyncio.sleep(1) - logger.info( - "Waiting for strategy_id={} to be marked as running", strategy_id - ) + await self._wait_until_marked_running(strategy_id) try: logger.info("Starting decision loop for strategy_id={}", strategy_id) # Persist initial portfolio snapshot and strategy summary before entering the loop - try: - # Get current portfolio view from the coordinator's portfolio service - initial_portfolio = runtime.coordinator._portfolio_service.get_view() - # ensure strategy_id present on the view - try: - initial_portfolio.strategy_id = strategy_id - except Exception: - pass - - ok = strategy_persistence.persist_portfolio_view(initial_portfolio) - if ok: - logger.info( - "Persisted initial portfolio view for strategy={}", - strategy_id, - ) - - # Build and persist an initial strategy summary (no trades yet) - timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000) - initial_summary = runtime.coordinator._build_summary(timestamp_ms, []) - ok = strategy_persistence.persist_strategy_summary(initial_summary) - if ok: - logger.info( - "Persisted initial strategy summary for strategy={}", - strategy_id, - ) - except Exception: - logger.exception( - "Failed to persist initial portfolio/summary for {}", - strategy_id, - ) + self._persist_initial_state(runtime, strategy_id) while True: if not strategy_persistence.strategy_running(strategy_id): logger.info( @@ -116,35 +160,8 @@ async def stream( strategy_id, len(result.trades), ) - # Persist and stream trades - for trade in result.trades: - item = strategy_persistence.persist_trade_history( - strategy_id, trade - ) - if item: - logger.info( - "Persisted trade {} for strategy={}", - getattr(trade, "trade_id", None), - strategy_id, - ) - - # Persist portfolio snapshot (positions) - ok = strategy_persistence.persist_portfolio_view(result.portfolio_view) - if ok: - logger.info( - "Persisted portfolio view for strategy={}", - strategy_id, - ) - - # Persist strategy summary - ok = strategy_persistence.persist_strategy_summary( - result.strategy_summary - ) - if ok: - logger.info( - "Persisted strategy summary for strategy={}", - strategy_id, - ) + # Persist and stream cycle results (trades, portfolio, summary) + self._persist_cycle_results(strategy_id, result) logger.info( "Waiting for next decision cycle for strategy_id={}, interval={}seconds", @@ -154,9 +171,28 @@ async def stream( await asyncio.sleep(request.trading_config.decide_interval) except asyncio.CancelledError: + # Ensure strategy is marked stopped on cancellation + try: + strategy_persistence.mark_strategy_stopped(strategy_id) + logger.info( + "Marked strategy {} as stopped due to cancellation", strategy_id + ) + except Exception: + logger.exception( + "Failed to mark strategy stopped for {} on cancellation", + strategy_id, + ) raise except Exception as err: # noqa: BLE001 logger.exception("StrategyAgent stream failed: {}", err) yield streaming.message_chunk(f"StrategyAgent error: {err}") finally: + # Always mark strategy as stopped when stream ends for any reason + try: + strategy_persistence.mark_strategy_stopped(strategy_id) + logger.info("Marked strategy {} as stopped in finalizer", strategy_id) + except Exception: + logger.exception( + "Failed to mark strategy stopped for {} in finalizer", strategy_id + ) yield streaming.done() diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index 3ea827ad1..550f1c7df 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -56,6 +56,11 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: ) try: plan = await self._call_llm(prompt) + if not plan.items: + logger.error( + "LLM returned empty plan for compose_id={}", context.compose_id + ) + return [] except ValidationError as exc: logger.error("LLM output failed validation: {}", exc) return [] @@ -63,12 +68,6 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: logger.exception("LLM invocation failed") return [] - if not plan.items: - logger.debug( - "LLM returned empty plan for compose_id={}", context.compose_id - ) - return [] - return self._normalize_plan(context, plan) # ------------------------------------------------------------------ diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py index e502bd63e..e2a7d60b0 100644 --- a/python/valuecell/server/services/strategy_persistence.py +++ b/python/valuecell/server/services/strategy_persistence.py @@ -180,3 +180,25 @@ def strategy_running(strategy_id: str) -> bool: except Exception: logger.exception("strategy_running check failed for {}", strategy_id) return False + + +def set_strategy_status(strategy_id: str, status: str) -> bool: + """Set the status field for a strategy (convenience wrapper around upsert).""" + repo = get_strategy_repository() + try: + updated = repo.upsert_strategy(strategy_id, status=status) + return updated is not None + except Exception: + logger.exception("set_strategy_status failed for {}", strategy_id) + return False + + +def mark_strategy_stopped(strategy_id: str) -> bool: + """Mark a strategy as stopped.""" + try: + return set_strategy_status( + strategy_id, agent_models.StrategyStatus.STOPPED.value + ) + except Exception: + logger.exception("mark_strategy_stopped failed for {}", strategy_id) + return False