From d56c9c7d2f295fc5190d729aaffd5b468347f70a Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 8 Nov 2025 14:54:16 +0800 Subject: [PATCH 1/4] feat: enhance strategy cancellation handling and add status marking --- .../valuecell/agents/strategy_agent/agent.py | 19 ++++++++++++++++++ .../server/services/strategy_persistence.py | 20 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 8e5406d69..872ec8470 100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -154,9 +154,28 @@ async def stream( await asyncio.sleep(request.trading_config.decide_interval) except asyncio.CancelledError: + # Ensure strategy is marked stopped on cancellation + try: + strategy_persistence.mark_strategy_stopped(strategy_id) + logger.info( + "Marked strategy {} as stopped due to cancellation", strategy_id + ) + except Exception: + logger.exception( + "Failed to mark strategy stopped for {} on cancellation", + strategy_id, + ) raise except Exception as err: # noqa: BLE001 logger.exception("StrategyAgent stream failed: {}", err) yield streaming.message_chunk(f"StrategyAgent error: {err}") finally: + # Always mark strategy as stopped when stream ends for any reason + try: + strategy_persistence.mark_strategy_stopped(strategy_id) + logger.info("Marked strategy {} as stopped in finalizer", strategy_id) + except Exception: + logger.exception( + "Failed to mark strategy stopped for {} in finalizer", strategy_id + ) yield streaming.done() diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py index e502bd63e..e1ab6206a 100644 --- a/python/valuecell/server/services/strategy_persistence.py +++ b/python/valuecell/server/services/strategy_persistence.py @@ -180,3 +180,23 @@ def strategy_running(strategy_id: str) -> bool: except Exception: logger.exception("strategy_running check failed for {}", strategy_id) return False + + +def set_strategy_status(strategy_id: str, status: str) -> bool: + """Set the status field for a strategy (convenience wrapper around upsert).""" + repo = get_strategy_repository() + try: + updated = repo.upsert_strategy(strategy_id, status=status) + return updated is not None + except Exception: + logger.exception("set_strategy_status failed for {}", strategy_id) + return False + + +def mark_strategy_stopped(strategy_id: str) -> bool: + """Mark a strategy as stopped.""" + try: + return set_strategy_status(strategy_id, agent_models.StrategyStatus.STOPPED.value) + except Exception: + logger.exception("mark_strategy_stopped failed for {}", strategy_id) + return False From ee681659caf449a92147248ac09a4a7a5cdc05df Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 8 Nov 2025 16:22:16 +0800 Subject: [PATCH 2/4] fix: handle empty LLM plan response and improve strategy status setting --- .../agents/strategy_agent/decision/composer.py | 11 +++++------ .../valuecell/server/services/strategy_persistence.py | 4 +++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index 3ea827ad1..550f1c7df 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -56,6 +56,11 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: ) try: plan = await self._call_llm(prompt) + if not plan.items: + logger.error( + "LLM returned empty plan for compose_id={}", context.compose_id + ) + return [] except ValidationError as exc: logger.error("LLM output failed validation: {}", exc) return [] @@ -63,12 +68,6 @@ async def compose(self, context: ComposeContext) -> List[TradeInstruction]: logger.exception("LLM invocation failed") return [] - if not plan.items: - logger.debug( - "LLM returned empty plan for compose_id={}", context.compose_id - ) - return [] - return self._normalize_plan(context, plan) # ------------------------------------------------------------------ diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py index e1ab6206a..e2a7d60b0 100644 --- a/python/valuecell/server/services/strategy_persistence.py +++ b/python/valuecell/server/services/strategy_persistence.py @@ -196,7 +196,9 @@ def set_strategy_status(strategy_id: str, status: str) -> bool: def mark_strategy_stopped(strategy_id: str) -> bool: """Mark a strategy as stopped.""" try: - return set_strategy_status(strategy_id, agent_models.StrategyStatus.STOPPED.value) + return set_strategy_status( + strategy_id, agent_models.StrategyStatus.STOPPED.value + ) except Exception: logger.exception("mark_strategy_stopped failed for {}", strategy_id) return False From 5369f50009a0e5ccb930c42556dbfe88183b3c3b Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 8 Nov 2025 16:37:33 +0800 Subject: [PATCH 3/4] refactor: extract persistence logic into separate methods for clarity and maintainability --- .../valuecell/agents/strategy_agent/agent.py | 154 ++++++++++-------- 1 file changed, 82 insertions(+), 72 deletions(-) diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 872ec8470..200ad2657 100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -22,6 +22,84 @@ class StrategyAgent(BaseAgent): """Top-level Strategy Agent integrating the decision coordinator.""" + async def _wait_until_marked_running(self, strategy_id: str, timeout_s: int = 300) -> None: + """Wait until persistence marks the strategy as running or timeout. + + This helper logs progress and returns when either the strategy is running + or the timeout elapses. It swallows exceptions from the persistence layer + to avoid bubbling nested try/except into `stream`. + """ + since = datetime.now() + try: + while not strategy_persistence.strategy_running(strategy_id): + if (datetime.now() - since).total_seconds() > timeout_s: + logger.error( + "Timeout waiting for strategy_id={} to be marked as running", + strategy_id, + ) + break + + await asyncio.sleep(1) + logger.info( + "Waiting for strategy_id={} to be marked as running", strategy_id + ) + except Exception: + # Avoid raising from persistence checks; we still proceed to start the runtime. + logger.exception( + "Error while waiting for strategy {} to be marked running", strategy_id + ) + + def _persist_initial_state(self, runtime, strategy_id: str) -> None: + """Persist initial portfolio snapshot and strategy summary. + + This helper captures and logs any errors internally so callers don't need + additional try/except nesting. + """ + try: + initial_portfolio = runtime.coordinator._portfolio_service.get_view() + try: + initial_portfolio.strategy_id = strategy_id + except Exception: + pass + + ok = strategy_persistence.persist_portfolio_view(initial_portfolio) + if ok: + logger.info("Persisted initial portfolio view for strategy={}", strategy_id) + + timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000) + initial_summary = runtime.coordinator._build_summary(timestamp_ms, []) + ok = strategy_persistence.persist_strategy_summary(initial_summary) + if ok: + logger.info("Persisted initial strategy summary for strategy={}", strategy_id) + except Exception: + logger.exception("Failed to persist initial portfolio/summary for {}", strategy_id) + + def _persist_cycle_results(self, strategy_id: str, result) -> None: + """Persist trades, portfolio view and strategy summary for a cycle. + + Errors are logged but not raised to keep the decision loop resilient. + """ + try: + for trade in result.trades: + item = strategy_persistence.persist_trade_history(strategy_id, trade) + if item: + logger.info( + "Persisted trade {} for strategy={}", + getattr(trade, "trade_id", None), + strategy_id, + ) + + ok = strategy_persistence.persist_portfolio_view(result.portfolio_view) + if ok: + logger.info("Persisted portfolio view for strategy={}", strategy_id) + + ok = strategy_persistence.persist_strategy_summary(result.strategy_summary) + if ok: + logger.info("Persisted strategy summary for strategy={}", strategy_id) + except Exception: + logger.exception("Error persisting cycle results for {}", strategy_id) + + async def stream( self, query: str, @@ -55,53 +133,12 @@ async def stream( ) # Wait until strategy is marked as running in persistence layer - since = datetime.now() - while not strategy_persistence.strategy_running(strategy_id): - if (datetime.now() - since).total_seconds() > 300: - logger.error( - "Timeout waiting for strategy_id={} to be marked as running", - strategy_id, - ) - break - - await asyncio.sleep(1) - logger.info( - "Waiting for strategy_id={} to be marked as running", strategy_id - ) + await self._wait_until_marked_running(strategy_id) try: logger.info("Starting decision loop for strategy_id={}", strategy_id) # Persist initial portfolio snapshot and strategy summary before entering the loop - try: - # Get current portfolio view from the coordinator's portfolio service - initial_portfolio = runtime.coordinator._portfolio_service.get_view() - # ensure strategy_id present on the view - try: - initial_portfolio.strategy_id = strategy_id - except Exception: - pass - - ok = strategy_persistence.persist_portfolio_view(initial_portfolio) - if ok: - logger.info( - "Persisted initial portfolio view for strategy={}", - strategy_id, - ) - - # Build and persist an initial strategy summary (no trades yet) - timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000) - initial_summary = runtime.coordinator._build_summary(timestamp_ms, []) - ok = strategy_persistence.persist_strategy_summary(initial_summary) - if ok: - logger.info( - "Persisted initial strategy summary for strategy={}", - strategy_id, - ) - except Exception: - logger.exception( - "Failed to persist initial portfolio/summary for {}", - strategy_id, - ) + self._persist_initial_state(runtime, strategy_id) while True: if not strategy_persistence.strategy_running(strategy_id): logger.info( @@ -116,35 +153,8 @@ async def stream( strategy_id, len(result.trades), ) - # Persist and stream trades - for trade in result.trades: - item = strategy_persistence.persist_trade_history( - strategy_id, trade - ) - if item: - logger.info( - "Persisted trade {} for strategy={}", - getattr(trade, "trade_id", None), - strategy_id, - ) - - # Persist portfolio snapshot (positions) - ok = strategy_persistence.persist_portfolio_view(result.portfolio_view) - if ok: - logger.info( - "Persisted portfolio view for strategy={}", - strategy_id, - ) - - # Persist strategy summary - ok = strategy_persistence.persist_strategy_summary( - result.strategy_summary - ) - if ok: - logger.info( - "Persisted strategy summary for strategy={}", - strategy_id, - ) + # Persist and stream cycle results (trades, portfolio, summary) + self._persist_cycle_results(strategy_id, result) logger.info( "Waiting for next decision cycle for strategy_id={}, interval={}seconds", From ee36dda8c21afc9f731303404bc6236d7b548a70 Mon Sep 17 00:00:00 2001 From: Zhaofeng Zhang <24791380+vcfgv@users.noreply.github.com> Date: Sat, 8 Nov 2025 16:38:48 +0800 Subject: [PATCH 4/4] make format --- python/valuecell/agents/strategy_agent/agent.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 200ad2657..7687b816d 100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -22,7 +22,9 @@ class StrategyAgent(BaseAgent): """Top-level Strategy Agent integrating the decision coordinator.""" - async def _wait_until_marked_running(self, strategy_id: str, timeout_s: int = 300) -> None: + async def _wait_until_marked_running( + self, strategy_id: str, timeout_s: int = 300 + ) -> None: """Wait until persistence marks the strategy as running or timeout. This helper logs progress and returns when either the strategy is running @@ -64,15 +66,21 @@ def _persist_initial_state(self, runtime, strategy_id: str) -> None: ok = strategy_persistence.persist_portfolio_view(initial_portfolio) if ok: - logger.info("Persisted initial portfolio view for strategy={}", strategy_id) + logger.info( + "Persisted initial portfolio view for strategy={}", strategy_id + ) timestamp_ms = int(runtime.coordinator._clock().timestamp() * 1000) initial_summary = runtime.coordinator._build_summary(timestamp_ms, []) ok = strategy_persistence.persist_strategy_summary(initial_summary) if ok: - logger.info("Persisted initial strategy summary for strategy={}", strategy_id) + logger.info( + "Persisted initial strategy summary for strategy={}", strategy_id + ) except Exception: - logger.exception("Failed to persist initial portfolio/summary for {}", strategy_id) + logger.exception( + "Failed to persist initial portfolio/summary for {}", strategy_id + ) def _persist_cycle_results(self, strategy_id: str, result) -> None: """Persist trades, portfolio view and strategy summary for a cycle. @@ -99,7 +107,6 @@ def _persist_cycle_results(self, strategy_id: str, result) -> None: except Exception: logger.exception("Error persisting cycle results for {}", strategy_id) - async def stream( self, query: str,