Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions python/valuecell/agents/common/trading/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ class MyFeaturesPipeline(BaseFeaturesPipeline):
class MyCustomAgent(BaseStrategyAgent):
"""Agent with custom feature computation."""

def _build_features_pipeline(
async def _build_features_pipeline(
self, request: UserRequest
) -> BaseFeaturesPipeline | None:
return MyFeaturesPipeline(request)

def _create_decision_composer(self, request: UserRequest):
async def _create_decision_composer(self, request: UserRequest):
# Use default LLM composer
return None
```
Expand Down Expand Up @@ -345,11 +345,11 @@ class RuleBasedComposer(BaseComposer):
class RuleBasedAgent(BaseStrategyAgent):
"""Agent using rule-based decisions."""

def _build_features_pipeline(self, request: UserRequest):
async def _build_features_pipeline(self, request: UserRequest):
# Use default pipeline
return None

def _create_decision_composer(self, request: UserRequest):
async def _create_decision_composer(self, request: UserRequest):
return RuleBasedComposer(request)
```

Expand All @@ -359,12 +359,12 @@ class RuleBasedAgent(BaseStrategyAgent):
class MonitoredAgent(BaseStrategyAgent):
"""Agent with custom monitoring and logging."""

def _on_start(self, runtime, request):
async def _on_start(self, runtime, request):
"""Called once after runtime creation."""
self.cycle_count = 0
print(f"Strategy {runtime.strategy_id} starting...")

def _on_cycle_result(self, result, runtime, request):
async def _on_cycle_result(self, result, runtime, request):
"""Called after each cycle completes."""
self.cycle_count += 1
print(f"Cycle {self.cycle_count}: "
Expand All @@ -374,15 +374,15 @@ class MonitoredAgent(BaseStrategyAgent):
# Send metrics to external monitoring
# ... custom logic ...

def _on_stop(self, runtime, request, reason):
async def _on_stop(self, runtime, request, reason):
"""Called before finalization."""
print(f"Strategy stopping: {reason}")
print(f"Total cycles: {self.cycle_count}")

def _build_features_pipeline(self, request):
async def _build_features_pipeline(self, request):
return None # Use defaults

def _create_decision_composer(self, request):
async def _create_decision_composer(self, request):
return None # Use defaults
```

Expand Down Expand Up @@ -422,11 +422,11 @@ from .features import MyFeaturesPipeline # if custom
from .composer import MyComposer # if custom

class MyAgent(BaseStrategyAgent):
def _build_features_pipeline(self, request: UserRequest):
async def _build_features_pipeline(self, request: UserRequest):
# Return custom pipeline or None for default
return MyFeaturesPipeline(request)

def _create_decision_composer(self, request: UserRequest):
async def _create_decision_composer(self, request: UserRequest):
# Return custom composer or None for default
return MyComposer(request)
```
Expand Down
8 changes: 6 additions & 2 deletions python/valuecell/agents/common/trading/_internal/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ async def create_strategy_runtime(
request: UserRequest,
composer: Optional[BaseComposer] = None,
features_pipeline: Optional[BaseFeaturesPipeline] = None,
strategy_id_override: Optional[str] = None,
initial_capital_override: Optional[float] = None,
) -> StrategyRuntime:
"""Create a strategy runtime with async initialization (supports both paper and live trading).

Expand Down Expand Up @@ -108,8 +110,10 @@ async def create_strategy_runtime(
execution_gateway = await _create_execution_gateway(request)

# Create strategy runtime components
strategy_id = generate_uuid("strategy")
initial_capital = request.trading_config.initial_capital or 0.0
strategy_id = strategy_id_override or generate_uuid("strategy")
initial_capital = (
initial_capital_override or request.trading_config.initial_capital or 0.0
)
constraints = Constraints(
max_positions=request.trading_config.max_positions,
max_leverage=request.trading_config.max_leverage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from valuecell.agents.common.trading import models as agent_models
from valuecell.agents.common.trading.utils import get_current_timestamp_ms
from valuecell.server.db.repositories.strategy_repository import get_strategy_repository
from valuecell.server.services import strategy_persistence

if TYPE_CHECKING:
Expand Down Expand Up @@ -124,6 +125,38 @@ def persist_initial_state(self, runtime: StrategyRuntime) -> None:
"Failed to persist initial portfolio/summary for {}", self.strategy_id
)

def has_initial_state(self) -> bool:
"""Return True if an initial portfolio snapshot already exists.

This allows idempotent strategy restarts without duplicating the first snapshot.
"""
try:
repo = get_strategy_repository()
snap = repo.get_latest_portfolio_snapshot(self.strategy_id)
return snap is not None
except Exception:
logger.warning(
"has_initial_state check failed for strategy {}", self.strategy_id
)
return False

def get_latest_portfolio_snapshot(self):
"""Return the latest stored portfolio snapshot or None.

This is a convenience wrapper around the repository call so callers
can inspect persisted initial state (for resume semantics).
"""
try:
repo = get_strategy_repository()
snap = repo.get_latest_portfolio_snapshot(self.strategy_id)
return snap
except Exception:
logger.warning(
"Failed to fetch latest portfolio snapshot for strategy {}",
self.strategy_id,
)
return None

def persist_cycle_results(self, result: DecisionCycleResult) -> None:
"""Persist trades, portfolio view, and strategy summary for a cycle.

Expand Down Expand Up @@ -184,8 +217,33 @@ def persist_cycle_results(self, result: DecisionCycleResult) -> None:
except Exception:
logger.exception("Error persisting cycle results for {}", self.strategy_id)

def persist_portfolio_snapshot(self, runtime: StrategyRuntime) -> None:
"""Persist a final portfolio snapshot (used at shutdown).

Mirrors portfolio part of cycle persistence but without trades or summary refresh.
Errors are logged and swallowed.
"""
try:
view = runtime.coordinator.portfolio_service.get_view()
try:
view.strategy_id = self.strategy_id
except Exception:
pass
ok = strategy_persistence.persist_portfolio_view(view)
if ok:
logger.info(
"Persisted final portfolio snapshot for strategy={}",
self.strategy_id,
)
except Exception:
logger.exception(
"Failed to persist final portfolio snapshot for {}", self.strategy_id
)

async def finalize(
self, runtime: StrategyRuntime, reason: str = "normal_exit"
self,
runtime: StrategyRuntime,
reason: agent_models.StopReason | str = agent_models.StopReason.NORMAL_EXIT,
) -> None:
"""Finalize strategy: close resources and mark as stopped.

Expand All @@ -207,28 +265,28 @@ async def finalize(
"Failed to close runtime resources for strategy {}", self.strategy_id
)

if reason == "error_closing_positions":
# Special case: we failed to close positions, so mark as ERROR to alert user
final_status = agent_models.StrategyStatus.ERROR.value
else:
final_status = agent_models.StrategyStatus.STOPPED.value
# With simplified statuses, all terminal states map to STOPPED.
# Preserve the detailed stop reason in strategy metadata for resume logic.
final_status = agent_models.StrategyStatus.STOPPED.value

# Mark strategy as stopped/error in persistence
try:
strategy_persistence.set_strategy_status(self.strategy_id, final_status)
reason_value = getattr(reason, "value", reason)
logger.info(
"Marked strategy {} as {} (reason: {})",
self.strategy_id,
final_status,
reason,
reason_value,
)
except Exception:
logger.exception(
"Failed to mark strategy {} for {} (reason: {})",
final_status,
self.strategy_id,
reason,
getattr(reason, "value", reason),
)
self._record_stop_reason(reason)

def is_running(self) -> bool:
"""Check if strategy is still running according to persistence layer."""
Expand Down Expand Up @@ -259,3 +317,25 @@ def persist_trades(self, trades: list) -> None:
logger.exception(
"Error persisting ad-hoc trades for strategy {}", self.strategy_id
)

def _record_stop_reason(self, reason: agent_models.StopReason | str) -> None:
"""Persist last stop reason inside strategy metadata for resume decisions.

Accept either a StopReason enum or a raw string; store the normalized
string value in the DB metadata.
"""
try:
repo = get_strategy_repository()
strategy = repo.get_strategy_by_strategy_id(self.strategy_id)
if strategy is None:
return
metadata = dict(strategy.strategy_metadata or {})
normalized = getattr(reason, "value", reason)
if metadata.get("stop_reason") == normalized:
return
metadata["stop_reason"] = normalized
repo.upsert_strategy(strategy_id=self.strategy_id, metadata=metadata)
except Exception:
logger.warning(
"Failed to record stop reason for strategy %s", self.strategy_id
)
Loading