diff --git a/python/configs/agent_cards/strategy_agent.json b/python/configs/agent_cards/strategy_agent.json new file mode 100644 index 000000000..4e7073fbe --- /dev/null +++ b/python/configs/agent_cards/strategy_agent.json @@ -0,0 +1,29 @@ +{ + "name": "StrategyAgent", + "display_name": "Strategy Agent", + "description": "LLM-driven strategy composer that turns market features into normalized trade instructions. Includes a simple runtime for demo and testing.", + "capabilities": { + "streaming": true, + "push_notifications": true + }, + "skills": [ + { + "id": "strategy_run", + "name": "Run Strategy", + "description": "Start a strategy using the provided model, exchange and trading configuration.", + "examples": [ + "Run strategy with DeepSeek model on BTC-USD and ETH-USD", + "Start a virtual backtest with $10,000 initial capital for BTC" + ], + "tags": ["strategy", "run", "trading", "compose"] + } + ], + "enabled": true, + "metadata": { + "planner_passthrough": true, + "version": "0.1.0", + "author": "ValueCell Team", + "tags": ["strategy", "trading", "llm", "demo"], + "notes": "This card is a lightweight example; replace model api_key and tune parameters for production use." 
+ } +} diff --git a/python/pyproject.toml b/python/pyproject.toml index e90044315..0fc093590 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "loguru>=0.7.3", "aiofiles>=24.1.0", "crawl4ai>=0.7.4", + "ccxt>=4.5.15", ] [project.optional-dependencies] diff --git a/python/scripts/launch.py b/python/scripts/launch.py index 22bcd653e..054a61805 100644 --- a/python/scripts/launch.py +++ b/python/scripts/launch.py @@ -32,12 +32,18 @@ RESEARCH_AGENT_NAME = "ResearchAgent" AUTO_TRADING_AGENT_NAME = "AutoTradingAgent" NEWS_AGENT_NAME = "NewsAgent" +STRATEGY_AGENT_NAME = "StrategyAgent" # AGENTS = list(MAP_NAME_ANALYST.keys()) + [ # TRADING_AGENTS_NAME, # RESEARCH_AGENT_NAME, # AUTO_TRADING_AGENT_NAME, # ] -AGENTS = [RESEARCH_AGENT_NAME, AUTO_TRADING_AGENT_NAME, NEWS_AGENT_NAME] +AGENTS = [ + RESEARCH_AGENT_NAME, + AUTO_TRADING_AGENT_NAME, + NEWS_AGENT_NAME, + STRATEGY_AGENT_NAME, +] PROJECT_DIR = Path(__file__).resolve().parent.parent.parent PYTHON_DIR = PROJECT_DIR / "python" @@ -78,6 +84,9 @@ MAP_NAME_COMMAND[NEWS_AGENT_NAME] = ( f"uv run --env-file {ENV_PATH_STR} -m valuecell.agents.news_agent" ) +MAP_NAME_COMMAND[STRATEGY_AGENT_NAME] = ( + f"uv run --env-file {ENV_PATH_STR} -m valuecell.agents.strategy_agent" +) BACKEND_COMMAND = ( f"cd {PYTHON_DIR_STR} && uv run --env-file {ENV_PATH_STR} -m valuecell.server.main" ) diff --git a/python/uv.lock b/python/uv.lock index b938008f6..0987c4ec1 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1,8 +1,9 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" resolution-markers = [ - "python_full_version >= '3.13'", + "python_full_version >= '3.14'", + "python_full_version == '3.13.*'", "python_full_version < '3.13'", ] @@ -65,6 +66,18 @@ openai = [ { name = "openai" }, ] +[[package]] +name = "aiodns" +version = "3.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pycares" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/17/0a/163e5260cecc12de6abc259d158d9da3b8ec062ab863107dcdb1166cdcef/aiodns-3.5.0.tar.gz", hash = "sha256:11264edbab51896ecf546c18eb0dd56dff0428c6aa6d2cd87e643e07300eb310", size = 14380, upload-time = "2025-06-13T16:21:53.595Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f6/2c/711076e5f5d0707b8ec55a233c8bfb193e0981a800cd1b3b123e8ff61ca1/aiodns-3.5.0-py3-none-any.whl", hash = "sha256:6d0404f7d5215849233f6ee44854f2bb2481adf71b336b2279016ea5990ca5c5", size = 8068, upload-time = "2025-06-13T16:21:52.45Z" }, +] + [[package]] name = "aiofiles" version = "24.1.0" @@ -325,6 +338,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/72/76/20fa66124dbe6be5cafeb312ece67de6b61dd91a0247d1ea13db4ebb33c2/cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a", size = 10080, upload-time = "2025-02-20T21:01:16.647Z" }, ] +[[package]] +name = "ccxt" +version = "4.5.15" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiodns" }, + { name = "aiohttp" }, + { name = "certifi" }, + { name = "cryptography" }, + { name = "requests" }, + { name = "setuptools" }, + { name = "typing-extensions" }, + { name = "yarl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/32/a1/1dfc0e6c466efabfc9c88a81bb5ee737616bcada2b3c425608a4482eab81/ccxt-4.5.15.tar.gz", hash = "sha256:13c846088c8a1e2b45b0e629b18b6c739e712db77cdce3540d0abdc078bd16b7", size = 5435128, upload-time = "2025-11-03T18:18:12.76Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/e9/abe7404f64b191b3326ad9f94096ff116468af6c4f8f14c785285d1dc6a5/ccxt-4.5.15-py2.py3-none-any.whl", hash = "sha256:4220118d146a6e8b74b52918ae99508c1b12ae7b41298170fab14e8ef14c7f9d", size = 5789862, upload-time = "2025-11-03T18:18:10.233Z" }, +] + [[package]] name = "certifi" version = "2025.8.3" @@ -1041,6 +1073,8 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, { url = "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, { url = "https://files.pythonhosted.org/packages/3f/c7/12381b18e21aef2c6bd3a636da1088b888b97b7a0362fac2e4de92405f97/greenlet-3.2.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:20fb936b4652b6e307b8f347665e2c615540d4b42b3b4c8a321d8286da7e520f", size = 1151142, upload-time = "2025-08-07T13:18:22.981Z" }, + { url = "https://files.pythonhosted.org/packages/27/45/80935968b53cfd3f33cf99ea5f08227f2646e044568c9b1555b58ffd61c2/greenlet-3.2.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ee7a6ec486883397d70eec05059353b8e83eca9168b9f3f9a361971e77e0bcd0", size = 1564846, upload-time = "2025-11-04T12:42:15.191Z" }, + { url = "https://files.pythonhosted.org/packages/69/02/b7c30e5e04752cb4db6202a3858b149c0710e5453b71a3b2aec5d78a1aab/greenlet-3.2.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:326d234cbf337c9c3def0676412eb7040a35a768efc92504b947b3e9cfc7543d", size = 1633814, upload-time = "2025-11-04T12:42:17.175Z" }, { url = "https://files.pythonhosted.org/packages/e9/08/b0814846b79399e585f974bbeebf5580fbe59e258ea7be64d9dfb253c84f/greenlet-3.2.4-cp312-cp312-win_amd64.whl", hash = "sha256:a7d4e128405eea3814a12cc2605e0e6aedb4035bf32697f72deca74de4105e02", size = 299899, upload-time = "2025-08-07T13:38:53.448Z" }, { url = 
"https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, @@ -1050,6 +1084,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, { url = "https://files.pythonhosted.org/packages/a2/15/0d5e4e1a66fab130d98168fe984c509249c833c1a3c16806b90f253ce7b9/greenlet-3.2.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:d25c5091190f2dc0eaa3f950252122edbbadbb682aa7b1ef2f8af0f8c0afefae", size = 1149210, upload-time = "2025-08-07T13:18:24.072Z" }, + { url = "https://files.pythonhosted.org/packages/1c/53/f9c440463b3057485b8594d7a638bed53ba531165ef0ca0e6c364b5cc807/greenlet-3.2.4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:6e343822feb58ac4d0a1211bd9399de2b3a04963ddeec21530fc426cc121f19b", size = 1564759, upload-time = "2025-11-04T12:42:19.395Z" }, + { url = 
"https://files.pythonhosted.org/packages/47/e4/3bb4240abdd0a8d23f4f88adec746a3099f0d86bfedb623f063b2e3b4df0/greenlet-3.2.4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ca7f6f1f2649b89ce02f6f229d7c19f680a6238af656f61e0115b24857917929", size = 1634288, upload-time = "2025-11-04T12:42:21.174Z" }, { url = "https://files.pythonhosted.org/packages/0b/55/2321e43595e6801e105fcfdee02b34c0f996eb71e6ddffca6b10b7e1d771/greenlet-3.2.4-cp313-cp313-win_amd64.whl", hash = "sha256:554b03b6e73aaabec3745364d6239e9e012d64c68ccd0b8430c64ccc14939a8b", size = 299685, upload-time = "2025-08-07T13:24:38.824Z" }, { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, @@ -1057,6 +1093,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, { url = "https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, { url = 
"https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, + { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, + { url = "https://files.pythonhosted.org/packages/0d/da/343cd760ab2f92bac1845ca07ee3faea9fe52bee65f7bcb19f16ad7de08b/greenlet-3.2.4-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:015d48959d4add5d6c9f6c5210ee3803a830dce46356e3bc326d6776bde54681", size = 1680760, upload-time = "2025-11-04T12:42:25.341Z" }, { url = "https://files.pythonhosted.org/packages/e3/a5/6ddab2b4c112be95601c13428db1d8b6608a8b6039816f2ba09c346c08fc/greenlet-3.2.4-cp314-cp314-win_amd64.whl", hash = "sha256:e37ab26028f12dbb0ff65f29a8d3d44a765c61e729647bf2ddfbbed621726f01", size = 303425, upload-time = "2025-08-07T13:32:27.59Z" }, ] @@ -2293,6 +2331,69 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/47/8d/d529b5d697919ba8c11ad626e835d4039be708a35b0d22de83a269a6682c/pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a", size = 181259, upload-time = "2025-03-28T02:41:19.028Z" }, ] +[[package]] +name = "pycares" +version = "4.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8d/ad/9d1e96486d2eb5a2672c4d9a2dd372d015b8d7a332c6ac2722c4c8e6bbbf/pycares-4.11.0.tar.gz", hash = "sha256:c863d9003ca0ce7df26429007859afd2a621d3276ed9fef154a9123db9252557", size = 654473, upload-time = 
"2025-09-09T15:18:21.849Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e2/4e/4821b66feefaaa8ec03494c1a11614c430983572e54ff062b4589441e199/pycares-4.11.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b93d624560ba52287873bacff70b42c99943821ecbc810b959b0953560f53c36", size = 145906, upload-time = "2025-09-09T15:16:53.204Z" }, + { url = "https://files.pythonhosted.org/packages/e8/81/93a505dcbb7533254b0ce1da519591dcda889d6a66dcdfa5737e3280e18a/pycares-4.11.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:775d99966e28c8abd9910ddef2de0f1e173afc5a11cea9f184613c747373ab80", size = 141972, upload-time = "2025-09-09T15:16:54.43Z" }, + { url = "https://files.pythonhosted.org/packages/7d/d6/76994c8b21316e48ea6c3ce3298574c28f90c9c41428a3349a57104621c9/pycares-4.11.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:84fde689557361764f052850a2d68916050adbfd9321f6105aca1d8f1a9bd49b", size = 637832, upload-time = "2025-09-09T15:16:55.523Z" }, + { url = "https://files.pythonhosted.org/packages/bb/a4/5ca7e316d0edb714d78974cb34f4883f63fe9f580644c2db99fb62b05f56/pycares-4.11.0-cp312-cp312-manylinux_2_28_ppc64le.whl", hash = "sha256:30ceed06f3bf5eff865a34d21562c25a7f3dad0ed336b9dd415330e03a6c50c4", size = 687751, upload-time = "2025-09-09T15:16:57.55Z" }, + { url = "https://files.pythonhosted.org/packages/cb/8d/c5c578fdd335d7b1dcaea88fae3497390095b5b05a1ba34a29f62d037abb/pycares-4.11.0-cp312-cp312-manylinux_2_28_s390x.whl", hash = "sha256:97d971b3a88a803bb95ff8a40ea4d68da59319eb8b59e924e318e2560af8c16d", size = 678362, upload-time = "2025-09-09T15:16:58.859Z" }, + { url = "https://files.pythonhosted.org/packages/b9/96/9be4d838a9348dd2e72a90c34d186b918b66d499af5be79afa18a6ba2808/pycares-4.11.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:2d5cac829da91ade70ce1af97dad448c6cd4778b48facbce1b015e16ced93642", size = 641069, upload-time = "2025-09-09T15:17:00.046Z" }, + { url = 
"https://files.pythonhosted.org/packages/39/d6/8ea9b5dcef6b566cde034aa2b68743f7b0a19fa0fba9ea01a4f98b8a57fb/pycares-4.11.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ee1ea367835eb441d246164c09d1f9703197af4425fc6865cefcde9e2ca81f85", size = 622357, upload-time = "2025-09-09T15:17:01.205Z" }, + { url = "https://files.pythonhosted.org/packages/07/f8/3401e89b5d2970e30e02f9beb29ad59e2a8f19ef2c68c978de2b764cacb0/pycares-4.11.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:3139ec1f4450a4b253386035c5ecd2722582ae3320a456df5021ffe3f174260a", size = 670290, upload-time = "2025-09-09T15:17:02.413Z" }, + { url = "https://files.pythonhosted.org/packages/a2/c4/ff6a166e1d1d1987339548a19d0b1d52ec3ead8b3a8a2247a0d96e56013c/pycares-4.11.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:5d70324ca1d82c6c4b00aa678347f7560d1ef2ce1d181978903459a97751543a", size = 652958, upload-time = "2025-09-09T15:17:04.203Z" }, + { url = "https://files.pythonhosted.org/packages/b8/7c/fc084b395921c9b862d31a83f809fe649c24314b51b527ad0ab0df33edd4/pycares-4.11.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e2f8d9cfe0eb3a2997fde5df99b1aaea5a46dabfcfcac97b2d05f027c2cd5e28", size = 629239, upload-time = "2025-09-09T15:17:05.477Z" }, + { url = "https://files.pythonhosted.org/packages/b0/7f/2f26062bea95ab657f979217d50df563dc9fd9cc4c5dd21a6e7323e9efe7/pycares-4.11.0-cp312-cp312-win32.whl", hash = "sha256:1571a7055c03a95d5270c914034eac7f8bfa1b432fc1de53d871b821752191a4", size = 118918, upload-time = "2025-09-09T15:17:06.882Z" }, + { url = "https://files.pythonhosted.org/packages/a5/86/277473d20f3df4e00fa7e0ebb21955b2830b15247462aaf8f3fc8c4950be/pycares-4.11.0-cp312-cp312-win_amd64.whl", hash = "sha256:7570e0b50db619b2ee370461c462617225dc3a3f63f975c6f117e2f0c94f82ca", size = 144560, upload-time = "2025-09-09T15:17:07.891Z" }, + { url = 
"https://files.pythonhosted.org/packages/f0/f9/d65ad17ec921d8b7eb42161dec2024ee2f5c9f1c44cabf0dd1b7f4fac6c5/pycares-4.11.0-cp312-cp312-win_arm64.whl", hash = "sha256:f199702740f3b766ed8c70efb885538be76cb48cd0cb596b948626f0b825e07a", size = 115695, upload-time = "2025-09-09T15:17:09.333Z" }, + { url = "https://files.pythonhosted.org/packages/dc/a9/62fea7ad72ac1fed2ac9dd8e9a7379b7eb0288bf2b3ea5731642c3a6f7de/pycares-4.11.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:2c296ab94d1974f8d2f76c499755a9ce31ffd4986e8898ef19b90e32525f7d84", size = 145909, upload-time = "2025-09-09T15:17:10.491Z" }, + { url = "https://files.pythonhosted.org/packages/f4/ac/0317d6d0d3bd7599c53b8f1db09ad04260647d2f6842018e322584791fd5/pycares-4.11.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e0fcd3a8bac57a0987d9b09953ba0f8703eb9dca7c77f7051d8c2ed001185be8", size = 141974, upload-time = "2025-09-09T15:17:11.634Z" }, + { url = "https://files.pythonhosted.org/packages/63/11/731b565ae1e81c43dac247a248ee204628186f6df97c9927bd06c62237f8/pycares-4.11.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:bac55842047567ddae177fb8189b89a60633ac956d5d37260f7f71b517fd8b87", size = 637796, upload-time = "2025-09-09T15:17:12.815Z" }, + { url = "https://files.pythonhosted.org/packages/f5/30/a2631fe2ffaa85475cdbff7df1d9376bc0b2a6ae77ca55d53233c937a5da/pycares-4.11.0-cp313-cp313-manylinux_2_28_ppc64le.whl", hash = "sha256:4da2e805ed8c789b9444ef4053f6ef8040cd13b0c1ca6d3c4fe6f9369c458cb4", size = 687734, upload-time = "2025-09-09T15:17:14.015Z" }, + { url = "https://files.pythonhosted.org/packages/a9/b7/b3a5f99d4ab776662e71d5a56e8f6ea10741230ff988d1f502a8d429236b/pycares-4.11.0-cp313-cp313-manylinux_2_28_s390x.whl", hash = "sha256:ea785d1f232b42b325578f0c8a2fa348192e182cc84a1e862896076a4a2ba2a7", size = 678320, upload-time = "2025-09-09T15:17:15.442Z" }, + { url = 
"https://files.pythonhosted.org/packages/ea/77/a00d962b90432993afbf3bd05da8fe42117e0d9037cd7fd428dc41094d7b/pycares-4.11.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:aa160dc9e785212c49c12bb891e242c949758b99542946cc8e2098ef391f93b0", size = 641012, upload-time = "2025-09-09T15:17:16.728Z" }, + { url = "https://files.pythonhosted.org/packages/c6/fb/9266979ba59d37deee1fd74452b2ae32a7395acafe1bee510ac023c6c9a5/pycares-4.11.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7830709c23bbc43fbaefbb3dde57bdd295dc86732504b9d2e65044df8fd5e9fb", size = 622363, upload-time = "2025-09-09T15:17:17.835Z" }, + { url = "https://files.pythonhosted.org/packages/91/c2/16dbc3dc33781a3c79cbdd76dd1cda808d98ba078d9a63a725d6a1fad181/pycares-4.11.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:3ef1ab7abbd238bb2dbbe871c3ea39f5a7fc63547c015820c1e24d0d494a1689", size = 670294, upload-time = "2025-09-09T15:17:19.214Z" }, + { url = "https://files.pythonhosted.org/packages/ff/75/f003905e55298a6dd5e0673a2dc11e31518a5141393b925dc05fcaba9fb4/pycares-4.11.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:a4060d8556c908660512d42df1f4a874e4e91b81f79e3a9090afedc7690ea5ba", size = 652973, upload-time = "2025-09-09T15:17:20.388Z" }, + { url = "https://files.pythonhosted.org/packages/55/2a/eafb235c371979e11f8998d686cbaa91df6a84a34ffe4d997dfe57c45445/pycares-4.11.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a98fac4a3d4f780817016b6f00a8a2c2f41df5d25dfa8e5b1aa0d783645a6566", size = 629235, upload-time = "2025-09-09T15:17:21.92Z" }, + { url = "https://files.pythonhosted.org/packages/05/99/60f19eb1c8eb898882dd8875ea51ad0aac3aff5780b27247969e637cc26a/pycares-4.11.0-cp313-cp313-win32.whl", hash = "sha256:faa8321bc2a366189dcf87b3823e030edf5ac97a6b9a7fc99f1926c4bf8ef28e", size = 118918, upload-time = "2025-09-09T15:17:23.327Z" }, + { url = 
"https://files.pythonhosted.org/packages/2a/14/bc89ad7225cba73068688397de09d7cad657d67b93641c14e5e18b88e685/pycares-4.11.0-cp313-cp313-win_amd64.whl", hash = "sha256:6f74b1d944a50fa12c5006fd10b45e1a45da0c5d15570919ce48be88e428264c", size = 144556, upload-time = "2025-09-09T15:17:24.341Z" }, + { url = "https://files.pythonhosted.org/packages/af/88/4309576bd74b5e6fc1f39b9bc5e4b578df2cadb16bdc026ac0cc15663763/pycares-4.11.0-cp313-cp313-win_arm64.whl", hash = "sha256:4b6f7581793d8bb3014028b8397f6f80b99db8842da58f4409839c29b16397ad", size = 115692, upload-time = "2025-09-09T15:17:25.637Z" }, + { url = "https://files.pythonhosted.org/packages/2a/70/a723bc79bdcac60361b40184b649282ac0ab433b90e9cc0975370c2ff9c9/pycares-4.11.0-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:df0a17f4e677d57bca3624752bbb515316522ad1ce0de07ed9d920e6c4ee5d35", size = 145910, upload-time = "2025-09-09T15:17:26.774Z" }, + { url = "https://files.pythonhosted.org/packages/d5/4e/46311ef5a384b5f0bb206851135dde8f86b3def38fdbee9e3c03475d35ae/pycares-4.11.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:3b44e54cad31d3c3be5e8149ac36bc1c163ec86e0664293402f6f846fb22ad00", size = 142053, upload-time = "2025-09-09T15:17:27.956Z" }, + { url = "https://files.pythonhosted.org/packages/74/23/d236fc4f134d6311e4ad6445571e8285e84a3e155be36422ff20c0fbe471/pycares-4.11.0-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:80752133442dc7e6dd9410cec227c49f69283c038c316a8585cca05ec32c2766", size = 637878, upload-time = "2025-09-09T15:17:29.173Z" }, + { url = "https://files.pythonhosted.org/packages/f7/92/6edd41282b3f0e3d9defaba7b05c39730d51c37c165d9d3b319349c975aa/pycares-4.11.0-cp314-cp314-manylinux_2_28_ppc64le.whl", hash = "sha256:84b0b402dd333403fdce0e204aef1ef834d839c439c0c1aa143dc7d1237bb197", size = 687865, upload-time = "2025-09-09T15:17:30.549Z" }, + { url = 
"https://files.pythonhosted.org/packages/a7/a9/4d7cf4d72600fd47d9518f9ce99703a3e8711fb08d2ef63d198056cdc9a9/pycares-4.11.0-cp314-cp314-manylinux_2_28_s390x.whl", hash = "sha256:c0eec184df42fc82e43197e073f9cc8f93b25ad2f11f230c64c2dc1c80dbc078", size = 678396, upload-time = "2025-09-09T15:17:32.304Z" }, + { url = "https://files.pythonhosted.org/packages/0b/4b/e546eeb1d8ff6559e2e3bef31a6ea0c6e57ec826191941f83a3ce900ca89/pycares-4.11.0-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:ee751409322ff10709ee867d5aea1dc8431eec7f34835f0f67afd016178da134", size = 640786, upload-time = "2025-09-09T15:17:33.602Z" }, + { url = "https://files.pythonhosted.org/packages/0e/f5/b4572d9ee9c26de1f8d1dc80730df756276b9243a6794fa3101bbe56613d/pycares-4.11.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:1732db81e348bfce19c9bf9448ba660aea03042eeeea282824da1604a5bd4dcf", size = 621857, upload-time = "2025-09-09T15:17:34.74Z" }, + { url = "https://files.pythonhosted.org/packages/17/f2/639090376198bcaeff86562b25e1bce05a481cfb1e605f82ce62285230cd/pycares-4.11.0-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:702d21823996f139874aba5aa9bb786d69e93bde6e3915b99832eb4e335d31ae", size = 670130, upload-time = "2025-09-09T15:17:35.982Z" }, + { url = "https://files.pythonhosted.org/packages/3a/c4/cf40773cd9c36a12cebbe1e9b6fb120f9160dc9bfe0398d81a20b6c69972/pycares-4.11.0-cp314-cp314-musllinux_1_2_s390x.whl", hash = "sha256:218619b912cef7c64a339ab0e231daea10c994a05699740714dff8c428b9694a", size = 653133, upload-time = "2025-09-09T15:17:37.179Z" }, + { url = "https://files.pythonhosted.org/packages/32/6b/06054d977b0a9643821043b59f523f3db5e7684c4b1b4f5821994d5fa780/pycares-4.11.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:719f7ddff024fdacde97b926b4b26d0cc25901d5ef68bb994a581c420069936d", size = 629344, upload-time = "2025-09-09T15:17:38.308Z" }, + { url = 
"https://files.pythonhosted.org/packages/d6/6f/14bb0c2171a286d512e3f02d6168e608ffe5f6eceab78bf63e3073091ae3/pycares-4.11.0-cp314-cp314-win32.whl", hash = "sha256:d552fb2cb513ce910d1dc22dbba6420758a991a356f3cd1b7ec73a9e31f94d01", size = 121804, upload-time = "2025-09-09T15:17:39.388Z" }, + { url = "https://files.pythonhosted.org/packages/24/dc/6822f9ad6941027f70e1cf161d8631456531a87061588ed3b1dcad07d49d/pycares-4.11.0-cp314-cp314-win_amd64.whl", hash = "sha256:23d50a0842e8dbdddf870a7218a7ab5053b68892706b3a391ecb3d657424d266", size = 148005, upload-time = "2025-09-09T15:17:40.44Z" }, + { url = "https://files.pythonhosted.org/packages/ea/24/24ff3a80aa8471fbb62785c821a8e90f397ca842e0489f83ebf7ee274397/pycares-4.11.0-cp314-cp314-win_arm64.whl", hash = "sha256:836725754c32363d2c5d15b931b3ebd46b20185c02e850672cb6c5f0452c1e80", size = 119239, upload-time = "2025-09-09T15:17:42.094Z" }, + { url = "https://files.pythonhosted.org/packages/54/fe/2f3558d298ff8db31d5c83369001ab72af3b86a0374d9b0d40dc63314187/pycares-4.11.0-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:c9d839b5700542b27c1a0d359cbfad6496341e7c819c7fea63db9588857065ed", size = 146408, upload-time = "2025-09-09T15:17:43.74Z" }, + { url = "https://files.pythonhosted.org/packages/3c/c8/516901e46a1a73b3a75e87a35f3a3a4fe085f1214f37d954c9d7e782bd6d/pycares-4.11.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:31b85ad00422b38f426e5733a71dfb7ee7eb65a99ea328c508d4f552b1760dc8", size = 142371, upload-time = "2025-09-09T15:17:45.186Z" }, + { url = "https://files.pythonhosted.org/packages/ac/99/c3fba0aa575f331ebed91f87ba960ffbe0849211cdf103ab275bc0107ac6/pycares-4.11.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:cdac992206756b024b371760c55719eb5cd9d6b2cb25a8d5a04ae1b0ff426232", size = 647504, upload-time = "2025-09-09T15:17:46.503Z" }, + { url = 
"https://files.pythonhosted.org/packages/5c/e4/1cdc3ec9c92f8069ec18c58b016b2df7c44a088e2849f37ed457554961aa/pycares-4.11.0-cp314-cp314t-manylinux_2_28_ppc64le.whl", hash = "sha256:ffb22cee640bc12ee0e654eba74ecfb59e2e0aebc5bccc3cc7ef92f487008af7", size = 697122, upload-time = "2025-09-09T15:17:47.772Z" }, + { url = "https://files.pythonhosted.org/packages/9c/d5/bd8f370b97bb73e5bdd55dc2a78e18d6f49181cf77e88af0599d16f5c073/pycares-4.11.0-cp314-cp314t-manylinux_2_28_s390x.whl", hash = "sha256:00538826d2eaf4a0e4becb0753b0ac8d652334603c445c9566c9eb273657eb4c", size = 687543, upload-time = "2025-09-09T15:17:49.183Z" }, + { url = "https://files.pythonhosted.org/packages/33/38/49b77b9cf5dffc0b1fdd86656975c3bc1a58b79bdc883a9ef749b17a013c/pycares-4.11.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:29daa36548c04cdcd1a78ae187a4b7b003f0b357a2f4f1f98f9863373eedc759", size = 649565, upload-time = "2025-09-09T15:17:51.03Z" }, + { url = "https://files.pythonhosted.org/packages/3c/23/f6d57bfb99d00a6a7363f95c8d3a930fe82a868d9de24c64c8048d66f16a/pycares-4.11.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:cf306f3951740d7bed36149a6d8d656a7d5432dd4bbc6af3bb6554361fc87401", size = 631242, upload-time = "2025-09-09T15:17:52.298Z" }, + { url = "https://files.pythonhosted.org/packages/33/a2/7b9121c71cfe06a8474e221593f83a78176fae3b79e5853d2dfd13ab01cc/pycares-4.11.0-cp314-cp314t-musllinux_1_2_ppc64le.whl", hash = "sha256:386da2581db4ea2832629e275c061103b0be32f9391c5dfaea7f6040951950ad", size = 680304, upload-time = "2025-09-09T15:17:53.638Z" }, + { url = "https://files.pythonhosted.org/packages/5b/07/dfe76807f637d8b80e1a59dfc4a1bceabdd0205a45b2ebf78b415ae72af3/pycares-4.11.0-cp314-cp314t-musllinux_1_2_s390x.whl", hash = "sha256:45d3254a694459fdb0640ef08724ca9d4b4f6ff6d7161c9b526d7d2e2111379e", size = 661039, upload-time = "2025-09-09T15:17:55.024Z" }, + { url = 
"https://files.pythonhosted.org/packages/b2/9b/55d50c5acd46cbe95d0da27740a83e721d89c0ce7e42bff9891a9f29a855/pycares-4.11.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:eddf5e520bb88b23b04ac1f28f5e9a7c77c718b8b4af3a4a7a2cc4a600f34502", size = 637560, upload-time = "2025-09-09T15:17:56.492Z" }, + { url = "https://files.pythonhosted.org/packages/1f/79/2b2e723d1b929dbe7f99e80a56abb29a4f86988c1f73195d960d706b1629/pycares-4.11.0-cp314-cp314t-win32.whl", hash = "sha256:8a75a406432ce39ce0ca41edff7486df6c970eb0fe5cfbe292f195a6b8654461", size = 122235, upload-time = "2025-09-09T15:17:57.576Z" }, + { url = "https://files.pythonhosted.org/packages/93/fe/bf3b3ed9345a38092e72cd9890a5df5c2349fc27846a714d823a41f0ee27/pycares-4.11.0-cp314-cp314t-win_amd64.whl", hash = "sha256:3784b80d797bcc2ff2bf3d4b27f46d8516fe1707ff3b82c2580dc977537387f9", size = 148575, upload-time = "2025-09-09T15:17:58.699Z" }, + { url = "https://files.pythonhosted.org/packages/ce/20/c0c5cfcf89725fe533b27bc5f714dc4efa8e782bf697c36f9ddf04ba975d/pycares-4.11.0-cp314-cp314t-win_arm64.whl", hash = "sha256:afc6503adf8b35c21183b9387be64ca6810644ef54c9ef6c99d1d5635c01601b", size = 119690, upload-time = "2025-09-09T15:17:59.809Z" }, +] + [[package]] name = "pycparser" version = "2.22" @@ -3525,6 +3626,7 @@ dependencies = [ { name = "aiofiles" }, { name = "aiosqlite" }, { name = "akshare" }, + { name = "ccxt" }, { name = "crawl4ai" }, { name = "edgartools" }, { name = "fastapi" }, @@ -3580,6 +3682,7 @@ requires-dist = [ { name = "aiofiles", specifier = ">=24.1.0" }, { name = "aiosqlite", specifier = ">=0.19.0" }, { name = "akshare", specifier = ">=1.17.44" }, + { name = "ccxt", specifier = ">=4.5.15" }, { name = "crawl4ai", specifier = ">=0.7.4" }, { name = "diff-cover", marker = "extra == 'dev'", specifier = ">=9.0.0" }, { name = "edgartools", specifier = ">=4.12.2" }, diff --git a/python/valuecell/agents/strategy_agent/agent.py b/python/valuecell/agents/strategy_agent/agent.py index 37d6382ec..a1705b96e 
100644 --- a/python/valuecell/agents/strategy_agent/agent.py +++ b/python/valuecell/agents/strategy_agent/agent.py @@ -1,21 +1,26 @@ from __future__ import annotations +import asyncio +from datetime import datetime from typing import AsyncGenerator, Dict, Optional +from loguru import logger + from valuecell.core.agent.responses import streaming from valuecell.core.types import BaseAgent, StreamResponse +from valuecell.server.services import strategy_persistence +from .models import ( + ComponentType, + StrategyStatus, + StrategyStatusContent, + UserRequest, +) +from .runtime import create_strategy_runtime -class StrategyAgent(BaseAgent): - """Minimal StrategyAgent entry for system integration. - - This is a placeholder agent that streams a short greeting and completes. - It can be extended to wire the Strategy Agent decision loop - (data -> features -> composer -> execution -> history/digest). - """ - def __init__(self, **kwargs): - super().__init__(**kwargs) +class StrategyAgent(BaseAgent): + """Top-level Strategy Agent integrating the decision coordinator.""" async def stream( self, @@ -24,8 +29,96 @@ async def stream( task_id: str, dependencies: Optional[Dict] = None, ) -> AsyncGenerator[StreamResponse, None]: - # Minimal streaming lifecycle: one message and done - yield streaming.message_chunk( - "StrategyAgent is online. Decision pipeline will be wired here." 
+ try: + request = UserRequest.model_validate_json(query) + except ValueError as exc: + logger.exception("StrategyAgent received invalid payload") + yield streaming.message_chunk(str(exc)) + yield streaming.done() + return + + runtime = create_strategy_runtime(request) + strategy_id = runtime.strategy_id + logger.info( + "Created runtime for strategy_id={} conversation={} task={}", + strategy_id, + conversation_id, + task_id, + ) + initial_payload = StrategyStatusContent( + strategy_id=strategy_id, + status=StrategyStatus.RUNNING, + ) + yield streaming.component_generator( + content=initial_payload.model_dump_json(), + component_type=ComponentType.STATUS.value, ) - yield streaming.done() + + # Wait until strategy is marked as running in persistence layer + since = datetime.now() + while not strategy_persistence.strategy_running(strategy_id): + if (datetime.now() - since).total_seconds() > 300: + logger.error( + "Timeout waiting for strategy_id={} to be marked as running", + strategy_id, + ) + break + + await asyncio.sleep(1) + logger.info( + "Waiting for strategy_id={} to be marked as running", strategy_id + ) + + try: + logger.info("Starting decision loop for strategy_id={}", strategy_id) + while True: + if not strategy_persistence.strategy_running(strategy_id): + logger.info( + "Strategy_id={} is no longer running, exiting decision loop", + strategy_id, + ) + break + + result = await runtime.run_cycle() + logger.info( + "Run cycle completed for strategy={} trades_count={}", + strategy_id, + len(result.trades), + ) + # Persist and stream trades + for trade in result.trades: + item = strategy_persistence.persist_trade_history( + strategy_id, trade + ) + if item: + logger.info( + "Persisted trade {} for strategy={}", + getattr(trade, "trade_id", None), + strategy_id, + ) + + # Persist portfolio snapshot (positions) + ok = strategy_persistence.persist_portfolio_view(result.portfolio_view) + if ok: + logger.info( + "Persisted portfolio view for strategy={}", + 
strategy_id, + ) + + # Persist strategy summary + ok = strategy_persistence.persist_strategy_summary( + result.strategy_summary + ) + if ok: + logger.info( + "Persisted strategy summary for strategy={}", + strategy_id, + ) + + except asyncio.CancelledError: + raise + except Exception as err: # noqa: BLE001 + logger.exception("StrategyAgent stream failed: {}", err) + yield streaming.message_chunk(f"StrategyAgent error: {err}") + finally: + yield streaming.done() diff --git a/python/valuecell/agents/strategy_agent/constants.py b/python/valuecell/agents/strategy_agent/constants.py index 5b9efdbb8..37097ea94 100644 --- a/python/valuecell/agents/strategy_agent/constants.py +++ b/python/valuecell/agents/strategy_agent/constants.py @@ -9,3 +9,4 @@ DEFAULT_MAX_POSITIONS = 5 DEFAULT_MAX_SYMBOLS = 5 DEFAULT_MAX_LEVERAGE = 10.0 +DEFAULT_CAP_FACTOR = 1.5 diff --git a/python/valuecell/agents/strategy_agent/core.py b/python/valuecell/agents/strategy_agent/core.py index 6c9969f4d..758919eb9 100644 --- a/python/valuecell/agents/strategy_agent/core.py +++ b/python/valuecell/agents/strategy_agent/core.py @@ -1,6 +1,48 @@ from __future__ import annotations from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Callable, Dict, List + +from valuecell.utils.uuid import generate_uuid + +from .data.interfaces import MarketDataSource +from .decision.interfaces import Composer +from .execution.interfaces import ExecutionGateway +from .features.interfaces import FeatureComputer +from .models import ( + ComposeContext, + FeatureVector, + HistoryRecord, + PortfolioView, + StrategyStatus, + StrategySummary, + TradeDigest, + TradeHistoryEntry, + TradeInstruction, + TradeSide, + TradeType, + TxResult, + UserRequest, +) +from .portfolio.interfaces import PortfolioService +from .trading_history.interfaces import DigestBuilder, HistoryRecorder + + +@dataclass +class DecisionCycleResult: + """Outcome of a single decision 
cycle.""" + + compose_id: str + timestamp_ms: int + strategy_summary: StrategySummary + instructions: List[TradeInstruction] + trades: List[TradeHistoryEntry] + history_records: List[HistoryRecord] + digest: TradeDigest + portfolio_view: PortfolioView + # Core interfaces for orchestration and portfolio service. # Plain ABCs to avoid runtime dependencies on pydantic. Concrete implementations @@ -20,6 +62,402 @@ class DecisionCoordinator(ABC): """ @abstractmethod - def run_once(self) -> None: - """Execute one decision cycle.""" + async def run_once(self) -> DecisionCycleResult: + """Execute one decision cycle and return the result.""" raise NotImplementedError + + +def _default_clock() -> datetime: + """Return current time in UTC.""" + + return datetime.now(timezone.utc) + + +def _build_market_snapshot(features: List[FeatureVector]) -> Dict[str, float]: + """Derive latest market snapshot from feature vectors.""" + + snapshot: Dict[str, float] = {} + for vector in features: + price = vector.values.get("close") + if price is not None: + snapshot[vector.instrument.symbol] = float(price) + return snapshot + + +class DefaultDecisionCoordinator(DecisionCoordinator): + """Default implementation that wires the full decision pipeline.""" + + def __init__( + self, + *, + request: UserRequest, + strategy_id: str, + portfolio_service: PortfolioService, + market_data_source: MarketDataSource, + feature_computer: FeatureComputer, + composer: Composer, + execution_gateway: ExecutionGateway, + history_recorder: HistoryRecorder, + digest_builder: DigestBuilder, + prompt_provider: Callable[[UserRequest], str], + history_limit: int = 200, + ) -> None: + self._request = request + self.strategy_id = strategy_id + self._portfolio_service = portfolio_service + self._market_data_source = market_data_source + self._feature_computer = feature_computer + self._composer = composer + self._execution_gateway = execution_gateway + self._history_recorder = history_recorder + self._digest_builder = 
digest_builder + self._history_limit = max(history_limit, 1) + self._symbols = list(dict.fromkeys(request.trading_config.symbols)) + # prompt_provider is a required parameter (caller must supply a prompt builder) + self._prompt_provider = prompt_provider + # Use the default clock internally; clock is not a constructor parameter + self._clock = _default_clock + self._history_records: List[HistoryRecord] = [] + self._realized_pnl: float = 0.0 + self._unrealized_pnl: float = 0.0 + self._cycle_index: int = 0 + self._strategy_name = request.trading_config.strategy_name or strategy_id + + async def run_once(self) -> DecisionCycleResult: + timestamp_ms = int(self._clock().timestamp() * 1000) + compose_id = generate_uuid("compose") + + portfolio = self._portfolio_service.get_view() + # Use fixed 1-minute interval and lookback of 4 hours (60 * 4 minutes) + candles = await self._market_data_source.get_recent_candles( + self._symbols, "1m", 60 * 4 + ) + features = self._feature_computer.compute_features(candles=candles) + market_snapshot = _build_market_snapshot(features) + digest = self._digest_builder.build(list(self._history_records)) + + context = ComposeContext( + ts=timestamp_ms, + compose_id=compose_id, + strategy_id=self.strategy_id, + features=features, + portfolio=portfolio, + digest=digest, + prompt_text=self._prompt_provider(self._request), + market_snapshot=market_snapshot, + ) + + instructions = await self._composer.compose(context) + # Execute instructions via async gateway to obtain execution results + tx_results = await self._execution_gateway.execute( + instructions, market_snapshot + ) + + trades = self._create_trades(tx_results, compose_id, timestamp_ms) + self._portfolio_service.apply_trades(trades, market_snapshot) + summary = self._build_summary(timestamp_ms, trades) + + history_records = self._create_history_records( + timestamp_ms, compose_id, features, instructions, trades, summary + ) + + for record in history_records: + 
self._history_recorder.record(record) + + self._history_records.extend(history_records) + if len(self._history_records) > self._history_limit: + self._history_records = self._history_records[-self._history_limit :] + + digest = self._digest_builder.build(list(self._history_records)) + self._cycle_index += 1 + + portfolio = self._portfolio_service.get_view() + return DecisionCycleResult( + compose_id=compose_id, + timestamp_ms=timestamp_ms, + strategy_summary=summary, + instructions=instructions, + trades=trades, + history_records=history_records, + digest=digest, + portfolio_view=portfolio, + ) + + def _create_trades( + self, + tx_results: List[TxResult], + compose_id: str, + timestamp_ms: int, + ) -> List[TradeHistoryEntry]: + trades: List[TradeHistoryEntry] = [] + # Current portfolio view (pre-apply) used to detect closes + try: + pre_view = self._portfolio_service.get_view() + except Exception: + pre_view = None + + for tx in tx_results: + qty = float(tx.filled_qty or 0.0) + price = float(tx.avg_exec_price or 0.0) + notional = (price * qty) if price and qty else None + # Immediate realized effect: fees are costs (negative PnL). Slippage already baked into exec price. 
+ fee = float(tx.fee_cost or 0.0) + realized_pnl = -fee if notional else None + + # Determine if this trade fully closes an existing position for this symbol + prev_pos = None + prev_qty = 0.0 + try: + if pre_view is not None: + prev_pos = pre_view.positions.get(tx.instrument.symbol) + prev_qty = float(prev_pos.quantity) if prev_pos is not None else 0.0 + except Exception: + prev_pos = None + prev_qty = 0.0 + + eps = 1e-12 + is_full_close = False + close_units = 0.0 + pos_dir_type: TradeType | None = None + if prev_pos is not None: + if prev_qty > 0 and tx.side == TradeSide.SELL: + close_units = min(qty, abs(prev_qty)) + is_full_close = close_units >= abs(prev_qty) - eps + pos_dir_type = TradeType.LONG + elif prev_qty < 0 and tx.side == TradeSide.BUY: + close_units = min(qty, abs(prev_qty)) + is_full_close = close_units >= abs(prev_qty) - eps + pos_dir_type = TradeType.SHORT + + if ( + is_full_close + and prev_pos is not None + and prev_pos.avg_price is not None + ): + # Build a completed trade that ties back to the original open (avg_price/entry_ts) + entry_px = float(prev_pos.avg_price or 0.0) + entry_ts_prev = int(prev_pos.entry_ts) if prev_pos.entry_ts else None + exit_px = price or None + exit_ts = timestamp_ms + qty_closed = float(close_units or 0.0) + # Realized PnL on close (exclude prior fees; subtract this tx fee) + core_pnl = None + if entry_px and exit_px and qty_closed: + if pos_dir_type == TradeType.LONG: + core_pnl = (float(exit_px) - float(entry_px)) * qty_closed + else: # SHORT + core_pnl = (float(entry_px) - float(exit_px)) * qty_closed + realized_pnl = core_pnl if core_pnl is not None else None + if realized_pnl is not None: + realized_pnl = float(realized_pnl) - fee + notional_entry = ( + (qty_closed * entry_px) if entry_px and qty_closed else None + ) + notional_exit = ( + (qty_closed * float(exit_px)) if exit_px and qty_closed else None + ) + realized_pnl_pct = ( + (realized_pnl / notional_entry) + if realized_pnl is not None and 
notional_entry + else None + ) + + trade = TradeHistoryEntry( + trade_id=generate_uuid("trade"), + compose_id=compose_id, + instruction_id=tx.instruction_id, + strategy_id=self.strategy_id, + instrument=tx.instrument, + side=tx.side, + type=pos_dir_type + or ( + TradeType.LONG if tx.side == TradeSide.BUY else TradeType.SHORT + ), + quantity=qty_closed or qty, + entry_price=entry_px or None, + exit_price=exit_px, + notional_entry=notional_entry, + notional_exit=notional_exit, + entry_ts=entry_ts_prev or timestamp_ms, + exit_ts=exit_ts, + trade_ts=timestamp_ms, + holding_ms=(exit_ts - entry_ts_prev) if entry_ts_prev else None, + realized_pnl=realized_pnl, + realized_pnl_pct=realized_pnl_pct, + # For a full close, reflect the leverage of the closed position, not the closing instruction + leverage=( + float(prev_pos.leverage) + if getattr(prev_pos, "leverage", None) is not None + else tx.leverage + ), + fee_cost=fee or None, + note=(tx.meta.get("rationale") if tx.meta else None), + ) + else: + # Default behavior for opens/increases/reductions that are not full closes + trade = TradeHistoryEntry( + trade_id=generate_uuid("trade"), + compose_id=compose_id, + instruction_id=tx.instruction_id, + strategy_id=self.strategy_id, + instrument=tx.instrument, + side=tx.side, + type=( + TradeType.LONG if tx.side == TradeSide.BUY else TradeType.SHORT + ), + quantity=qty, + entry_price=price or None, + exit_price=None, + notional_entry=notional or None, + notional_exit=None, + entry_ts=timestamp_ms, + exit_ts=None, + trade_ts=timestamp_ms, + holding_ms=None, + realized_pnl=realized_pnl, + realized_pnl_pct=( + ((realized_pnl or 0.0) / notional) if notional else None + ), + leverage=tx.leverage, + fee_cost=fee or None, + note=(tx.meta.get("rationale") if tx.meta else None), + ) + + # If reducing/closing but not a full close, try to annotate the most recent open trade + is_closing = prev_pos is not None and ( + (prev_qty > 0 and tx.side == TradeSide.SELL) + or (prev_qty < 0 and tx.side 
== TradeSide.BUY) + ) + if is_closing and not is_full_close: + # scan history records (most recent first) to find an open trade for this symbol + paired_id = None + for record in reversed(self._history_records): + if record.kind != "execution": + continue + trades_payload = record.payload.get("trades", []) or [] + # iterate trades in reverse to find latest + for t in reversed(trades_payload): + try: + inst = t.get("instrument") or {} + if inst.get("symbol") != tx.instrument.symbol: + continue + # consider open if no exit_ts or exit_price present + if not t.get("exit_ts") and not t.get("exit_price"): + # annotate this historic trade dict with exit fields + t["exit_price"] = float(price) if price else None + t["exit_ts"] = timestamp_ms + entry_ts_prev = t.get("entry_ts") or t.get("trade_ts") + if entry_ts_prev: + try: + t["holding_ms"] = int( + timestamp_ms - int(entry_ts_prev) + ) + except Exception: + t["holding_ms"] = None + t["notional_exit"] = ( + float(price * qty) if price and qty else None + ) + paired_id = t.get("trade_id") + break + except Exception: + continue + if paired_id: + break + + # if we found a paired trade, record the pairing in the new trade's note + if paired_id: + # preserve LLM rationale (if any) and append pairing info + existing = trade.note or "" + suffix = f"paired_exit_of:{paired_id}" + trade.note = f"{existing} {suffix}".strip() + + trades.append(trade) + return trades + + def _build_summary( + self, + timestamp_ms: int, + trades: List[TradeHistoryEntry], + ) -> StrategySummary: + realized_delta = sum(trade.realized_pnl or 0.0 for trade in trades) + self._realized_pnl += realized_delta + # Prefer authoritative unrealized PnL from the portfolio view when available. 
+ try: + view = self._portfolio_service.get_view() + unrealized = float(view.total_unrealized_pnl or 0.0) + equity = float(view.total_value or 0.0) + except Exception: + # Fallback to internal tracking if portfolio service is unavailable + unrealized = float(self._unrealized_pnl or 0.0) + equity = float(self._request.trading_config.initial_capital or 0.0) + + # Keep internal state in sync (allow negative unrealized PnL) + self._unrealized_pnl = float(unrealized) + + initial_capital = self._request.trading_config.initial_capital or 0.0 + pnl_pct = ( + (self._realized_pnl + self._unrealized_pnl) / initial_capital + if initial_capital + else None + ) + + # Strategy-level unrealized percent: percent of equity (if equity is available) + unrealized_pnl_pct = (self._unrealized_pnl / equity * 100.0) if equity else None + + return StrategySummary( + strategy_id=self.strategy_id, + name=self._strategy_name, + model_provider=self._request.llm_model_config.provider, + model_id=self._request.llm_model_config.model_id, + exchange_id=self._request.exchange_config.exchange_id, + mode=self._request.exchange_config.trading_mode, + status=StrategyStatus.RUNNING, + realized_pnl=self._realized_pnl, + unrealized_pnl=self._unrealized_pnl, + unrealized_pnl_pct=unrealized_pnl_pct, + pnl_pct=pnl_pct, + last_updated_ts=timestamp_ms, + ) + + def _create_history_records( + self, + timestamp_ms: int, + compose_id: str, + features: List[FeatureVector], + instructions: List[TradeInstruction], + trades: List[TradeHistoryEntry], + summary: StrategySummary, + ) -> List[HistoryRecord]: + feature_payload = [vector.model_dump(mode="json") for vector in features] + instruction_payload = [inst.model_dump(mode="json") for inst in instructions] + trade_payload = [trade.model_dump(mode="json") for trade in trades] + + return [ + HistoryRecord( + ts=timestamp_ms, + kind="features", + reference_id=compose_id, + payload={"features": feature_payload}, + ), + HistoryRecord( + ts=timestamp_ms, + kind="compose", + 
reference_id=compose_id, + payload={ + "prompt": self._prompt_provider(self._request), + "summary": summary.model_dump(mode="json"), + }, + ), + HistoryRecord( + ts=timestamp_ms, + kind="instructions", + reference_id=compose_id, + payload={"instructions": instruction_payload}, + ), + HistoryRecord( + ts=timestamp_ms, + kind="execution", + reference_id=compose_id, + payload={"trades": trade_payload}, + ), + ] diff --git a/python/valuecell/agents/strategy_agent/data/interfaces.py b/python/valuecell/agents/strategy_agent/data/interfaces.py index 3899ab5ba..31ac1d406 100644 --- a/python/valuecell/agents/strategy_agent/data/interfaces.py +++ b/python/valuecell/agents/strategy_agent/data/interfaces.py @@ -19,7 +19,7 @@ class MarketDataSource(ABC): """ @abstractmethod - def get_recent_candles( + async def get_recent_candles( self, symbols: List[str], interval: str, lookback: int ) -> List[Candle]: """Return recent candles (OHLCV) for the given symbols/interval. diff --git a/python/valuecell/agents/strategy_agent/data/market.py b/python/valuecell/agents/strategy_agent/data/market.py index e69de29bb..bf2b1f279 100644 --- a/python/valuecell/agents/strategy_agent/data/market.py +++ b/python/valuecell/agents/strategy_agent/data/market.py @@ -0,0 +1,81 @@ +from typing import Dict, List, Optional + +import ccxt.pro as ccxtpro +from loguru import logger + +from ..models import Candle, InstrumentRef +from .interfaces import MarketDataSource + + +class SimpleMarketDataSource(MarketDataSource): + """Fetches OHLCV candle data for each symbol via ccxt.pro. + + If `exchange_id` was provided at construction time and `ccxt.pro` is + available, this class will attempt to fetch OHLCV data from the + specified exchange. If any error occurs (missing library, unknown + exchange, network error), the failure is logged for that symbol and + its candles are simply omitted, so the runtime remains functional. 
+ """ + + def __init__( + self, + exchange_id: Optional[str] = None, + ccxt_options: Optional[Dict] = None, + ) -> None: + self._exchange_id = exchange_id or "binance" + self._ccxt_options = ccxt_options or {} + + async def get_recent_candles( + self, symbols: List[str], interval: str, lookback: int + ) -> List[Candle]: + async def _fetch(symbol: str) -> List[List]: + # instantiate exchange class by name (e.g., ccxtpro.kraken) + exchange_cls = getattr(ccxtpro, self._exchange_id, None) + if exchange_cls is None: + raise RuntimeError( + f"Exchange '{self._exchange_id}' not found in ccxt.pro" + ) + exchange = exchange_cls({"newUpdates": False, **self._ccxt_options}) + try: + # ccxt.pro uses async fetch_ohlcv + data = await exchange.fetch_ohlcv( + symbol, timeframe=interval, since=None, limit=lookback + ) + return data + finally: + try: + await exchange.close() + except Exception: + pass + + candles: List[Candle] = [] + # Run fetch for each symbol sequentially + for symbol in symbols: + try: + raw = await _fetch(symbol) + # raw is list of [ts, open, high, low, close, volume] + for row in raw: + ts, open_v, high_v, low_v, close_v, vol = row + candles.append( + Candle( + ts=int(ts), + instrument=InstrumentRef( + symbol=symbol, + exchange_id=self._exchange_id, + quote_ccy="USD", + ), + open=float(open_v), + high=float(high_v), + low=float(low_v), + close=float(close_v), + volume=float(vol), + interval=interval, + ) + ) + except Exception: + logger.exception( + "Failed to fetch candles for {} from {}, return empty candles", + symbol, + self._exchange_id, + ) + return candles diff --git a/python/valuecell/agents/strategy_agent/decision/composer.py b/python/valuecell/agents/strategy_agent/decision/composer.py index e69de29bb..3ea827ad1 100644 --- a/python/valuecell/agents/strategy_agent/decision/composer.py +++ b/python/valuecell/agents/strategy_agent/decision/composer.py @@ -0,0 +1,553 @@ +from __future__ import annotations + +import json +import math +from typing import 
Dict, List, Optional + +from loguru import logger +from pydantic import ValidationError + +from ..models import ( + ComposeContext, + Constraints, + LlmDecisionAction, + LlmPlanProposal, + PriceMode, + TradeInstruction, + TradeSide, + UserRequest, +) +from .interfaces import Composer + + +class LlmComposer(Composer): + """LLM-driven composer that turns context into trade instructions. + + The core flow follows the README design: + 1. Build a serialized prompt from the compose context (features, portfolio, + digest, prompt text, market snapshot, constraints). + 2. Call an LLM to obtain an :class:`LlmPlanProposal` (overridable method). + 3. Normalize the proposal into executable :class:`TradeInstruction` objects, + applying guardrails based on context constraints and trading config. + + The `_call_llm` method ships with a default agno-based implementation but + can be overridden in a subclass or monkeypatched at runtime to supply a + different integration. It accepts a string prompt and returns an instance + of :class:`LlmPlanProposal` (validated via Pydantic). 
+ """ + + def __init__( + self, + request: UserRequest, + *, + default_slippage_bps: int = 25, + quantity_precision: float = 1e-9, + ) -> None: + self._request = request + self._default_slippage_bps = default_slippage_bps + self._quantity_precision = quantity_precision + + async def compose(self, context: ComposeContext) -> List[TradeInstruction]: + prompt = self._build_llm_prompt(context) + logger.debug( + "Built LLM prompt for compose_id={}: {}", + context.compose_id, + prompt, + ) + try: + plan = await self._call_llm(prompt) + except ValidationError as exc: + logger.error("LLM output failed validation: {}", exc) + return [] + except Exception: # noqa: BLE001 + logger.exception("LLM invocation failed") + return [] + + if not plan.items: + logger.debug( + "LLM returned empty plan for compose_id={}", context.compose_id + ) + return [] + + return self._normalize_plan(context, plan) + + # ------------------------------------------------------------------ + # Prompt + LLM helpers + + def _build_llm_prompt(self, context: ComposeContext) -> str: + """Serialize compose context into a textual prompt for the LLM.""" + + payload = { + "strategy_prompt": context.prompt_text, + "compose_id": context.compose_id, + "timestamp": context.ts, + "portfolio": context.portfolio.model_dump(mode="json"), + "market_snapshot": context.market_snapshot or {}, + "digest": context.digest.model_dump(mode="json"), + "features": [vector.model_dump(mode="json") for vector in context.features], + # Constraints live on the portfolio view; prefer typed model_dump when present + "constraints": ( + context.portfolio.constraints.model_dump(mode="json", exclude_none=True) + if context.portfolio and context.portfolio.constraints + else {} + ), + } + + instructions = ( + "You are a trading strategy planner. Analyze the JSON context and " + "produce a structured plan that aligns with the LlmPlanProposal " + "schema (items array with instrument, action, target_qty, rationale, " + "confidence). 
Focus on risk-aware, executable decisions." + ) + + return f"{instructions}\n\nContext:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" + + async def _call_llm(self, prompt: str) -> LlmPlanProposal: + """Invoke an LLM asynchronously and parse the response into LlmPlanProposal. + + This implementation follows the parser_agent pattern: it creates a model + via `create_model_with_provider`, wraps it in an `agno.agent.Agent` with + `output_schema=LlmPlanProposal`, and awaits `agent.arun(prompt)`. The + agent's `response.content` is returned (or validated) as a + `LlmPlanProposal`. + """ + + from agno.agent import Agent as AgnoAgent + + from valuecell.utils.model import create_model_with_provider + + cfg = self._request.llm_model_config + model = create_model_with_provider( + provider=cfg.provider, + model_id=cfg.model_id, + api_key=cfg.api_key, + ) + + # Wrap model in an Agent (consistent with parser_agent usage) + agent = AgnoAgent(model=model, output_schema=LlmPlanProposal, markdown=False) + response = await agent.arun(prompt) + content = getattr(response, "content", None) or response + logger.debug("Received LLM response {}", content) + return content + + # ------------------------------------------------------------------ + # Normalization / guardrails helpers + + def _init_buying_power_context( + self, + context: ComposeContext, + ) -> tuple: + """Initialize buying power tracking context. 
+ + Returns: + (equity, allowed_lev, constraints, projected_gross, price_map) + """ + constraints = context.portfolio.constraints or Constraints( + max_positions=self._request.trading_config.max_positions, + max_leverage=self._request.trading_config.max_leverage, + ) + + # Compute equity (prefer total_value, fallback to cash + net_exposure) + if getattr(context.portfolio, "total_value", None) is not None: + equity = float(context.portfolio.total_value or 0.0) + else: + cash = float(getattr(context.portfolio, "cash", 0.0) or 0.0) + net = float(getattr(context.portfolio, "net_exposure", 0.0) or 0.0) + equity = cash + net + + allowed_lev = ( + float(constraints.max_leverage) + if constraints.max_leverage is not None + else 1.0 + ) + + # Initialize projected gross exposure + price_map = context.market_snapshot or {} + if getattr(context.portfolio, "gross_exposure", None) is not None: + projected_gross = float(context.portfolio.gross_exposure or 0.0) + else: + projected_gross = 0.0 + for sym, snap in context.portfolio.positions.items(): + px = float( + price_map.get(sym) or getattr(snap, "mark_price", 0.0) or 0.0 + ) + projected_gross += abs(float(snap.quantity)) * px + + return equity, allowed_lev, constraints, projected_gross, price_map + + def _normalize_quantity( + self, + symbol: str, + quantity: float, + side: TradeSide, + current_qty: float, + constraints: Constraints, + equity: float, + allowed_lev: float, + projected_gross: float, + price_map: Dict[str, float], + ) -> tuple: + """Normalize quantity through all guardrails: filters, caps, and buying power. 
+ + Returns: + (final_qty, consumed_buying_power_delta) + """ + qty = quantity + + # Step 1: per-order filters (step size, min notional, max order qty) + qty = self._apply_quantity_filters( + symbol, + qty, + float(constraints.quantity_step or 0.0), + float(constraints.min_trade_qty or 0.0), + constraints.max_order_qty, + constraints.min_notional, + price_map, + ) + + if qty <= self._quantity_precision: + logger.debug( + "Post-filter quantity for {} is {} <= precision {} -> skipping", + symbol, + qty, + self._quantity_precision, + ) + return 0.0, 0.0 + + # Step 2: notional/leverage cap (Phase 1 rules) + price = price_map.get(symbol) + if price is not None and price > 0: + # cap_factor controls how aggressively we allow position sizing by notional. + # Make it configurable via trading_config.cap_factor (strategy parameter). + cap_factor = float( + getattr(self._request.trading_config, "cap_factor", 1.5) or 1.5 + ) + if constraints.quantity_step and constraints.quantity_step > 0: + cap_factor = max(cap_factor, 1.5) + + allowed_lev_cap = ( + allowed_lev if math.isfinite(allowed_lev) else float("inf") + ) + max_abs_by_factor = (cap_factor * equity) / float(price) + max_abs_by_lev = (allowed_lev_cap * equity) / float(price) + max_abs_final = min(max_abs_by_factor, max_abs_by_lev) + + desired_final = current_qty + (qty if side is TradeSide.BUY else -qty) + if math.isfinite(max_abs_final) and abs(desired_final) > max_abs_final: + target_abs = max_abs_final + new_qty = max(0.0, target_abs - abs(current_qty)) + if new_qty < qty: + logger.debug( + "Capping {} qty due to notional/leverage (price={}, cap_factor={}, old_qty={}, new_qty={})", + symbol, + price, + cap_factor, + qty, + new_qty, + ) + qty = new_qty + + if qty <= self._quantity_precision: + logger.debug( + "Post-cap quantity for {} is {} <= precision {} -> skipping", + symbol, + qty, + self._quantity_precision, + ) + return 0.0, 0.0 + + # Step 3: buying power clamp + px = price_map.get(symbol) + if px is None or px 
<= 0: + logger.debug( + "No price for {} to evaluate buying power; using full quantity", + symbol, + ) + final_qty = qty + else: + avail_bp = max(0.0, equity * allowed_lev - projected_gross) + # When buying power is exhausted, we should still allow reductions/closures. + # Set additional purchasable units to 0 but proceed with piecewise logic + # so that de-risking trades are not blocked. + a = abs(current_qty) + # Conservative buffer for expected slippage: assume execution price may move + # against us by `self._default_slippage_bps`. Use a higher effective price + # when computing how many units fit into available buying power so that + # planned increases don't accidentally exceed real-world costs. + slip_bps = float(self._default_slippage_bps or 0.0) + slip = slip_bps / 10000.0 + effective_px = float(px) * (1.0 + slip) + ap_units = (avail_bp / effective_px) if avail_bp > 0 else 0.0 + + # Piecewise: additional gross consumption must fit into available BP + if side is TradeSide.BUY: + if current_qty >= 0: + q_allowed = ap_units + else: + if qty <= 2 * a: + q_allowed = qty + else: + q_allowed = 2 * a + ap_units + else: # SELL + if current_qty <= 0: + q_allowed = ap_units + else: + if qty <= 2 * a: + q_allowed = qty + else: + q_allowed = 2 * a + ap_units + + final_qty = max(0.0, min(qty, q_allowed)) + + if final_qty <= self._quantity_precision: + logger.debug( + "Post-buying-power quantity for {} is {} <= precision {} -> skipping", + symbol, + final_qty, + self._quantity_precision, + ) + return 0.0, 0.0 + + # Compute consumed buying power delta + abs_before = abs(current_qty) + abs_after = abs( + current_qty + (final_qty if side is TradeSide.BUY else -final_qty) + ) + delta_abs = abs_after - abs_before + consumed_bp_delta = ( + delta_abs * price_map.get(symbol, 0.0) if delta_abs > 0 else 0.0 + ) + + return final_qty, consumed_bp_delta + + def _normalize_plan( + self, + context: ComposeContext, + plan: LlmPlanProposal, + ) -> List[TradeInstruction]: + instructions: 
List[TradeInstruction] = [] + + # --- prepare state --- + projected_positions: Dict[str, float] = { + symbol: snapshot.quantity + for symbol, snapshot in context.portfolio.positions.items() + } + + def _count_active(pos_map: Dict[str, float]) -> int: + return sum(1 for q in pos_map.values() if abs(q) > self._quantity_precision) + + active_positions = _count_active(projected_positions) + + # Initialize buying power context + equity, allowed_lev, constraints, projected_gross, price_map = ( + self._init_buying_power_context(context) + ) + + max_positions = constraints.max_positions + max_position_qty = constraints.max_position_qty + + # --- process each planned item --- + for idx, item in enumerate(plan.items): + symbol = item.instrument.symbol + current_qty = projected_positions.get(symbol, 0.0) + + # determine the intended target quantity (clamped by max_position_qty) + target_qty = self._resolve_target_quantity( + item, current_qty, max_position_qty + ) + # Enforce: single-lot per symbol and no direct flip. If target flips side, + # split into two sub-steps: first flat to 0, then open to target side. 
+ sub_targets: List[float] = [] + if current_qty * target_qty < 0: + sub_targets = [0.0, float(target_qty)] + else: + sub_targets = [float(target_qty)] + + local_current = float(current_qty) + for sub_i, sub_target in enumerate(sub_targets): + delta = sub_target - local_current + + if abs(delta) <= self._quantity_precision: + continue + + is_new_position = ( + abs(local_current) <= self._quantity_precision + and abs(sub_target) > self._quantity_precision + ) + if ( + is_new_position + and max_positions is not None + and active_positions >= int(max_positions) + ): + logger.warning( + "Skipping symbol {} due to max_positions constraint (active={} max={})", + symbol, + active_positions, + max_positions, + ) + continue + + side = TradeSide.BUY if delta > 0 else TradeSide.SELL + # requested leverage (default 1.0), clamped to constraints + requested_lev = ( + float(item.leverage) + if getattr(item, "leverage", None) is not None + else 1.0 + ) + allowed_lev_item = ( + float(constraints.max_leverage) + if constraints.max_leverage is not None + else requested_lev + ) + final_leverage = max(1.0, min(requested_lev, allowed_lev_item)) + quantity = abs(delta) + + # Normalize quantity through all guardrails + quantity, consumed_bp = self._normalize_quantity( + symbol, + quantity, + side, + local_current, + constraints, + equity, + allowed_lev, + projected_gross, + price_map, + ) + + if quantity <= self._quantity_precision: + continue + + # Update projected positions for subsequent guardrails + signed_delta = quantity if side is TradeSide.BUY else -quantity + projected_positions[symbol] = local_current + signed_delta + projected_gross += consumed_bp + + # active positions accounting + if is_new_position: + active_positions += 1 + if abs(projected_positions[symbol]) <= self._quantity_precision: + active_positions = max(active_positions - 1, 0) + + # Use a stable per-item sub-index to keep instruction ids unique + instr = self._create_instruction( + context, + idx * 10 + sub_i, + 
item, + symbol, + side, + quantity, + final_leverage, + local_current, + sub_target, + ) + instructions.append(instr) + + # advance local_current for the next sub-step + local_current = projected_positions[symbol] + + return instructions + + def _create_instruction( + self, + context: ComposeContext, + idx: int, + item, + symbol: str, + side: TradeSide, + quantity: float, + final_leverage: float, + current_qty: float, + target_qty: float, + ) -> TradeInstruction: + """Create a normalized TradeInstruction with metadata.""" + final_target = current_qty + (quantity if side is TradeSide.BUY else -quantity) + meta = { + "requested_target_qty": target_qty, + "current_qty": current_qty, + "final_target_qty": final_target, + "action": item.action.value, + } + if item.confidence is not None: + meta["confidence"] = item.confidence + if item.rationale: + meta["rationale"] = item.rationale + + instruction = TradeInstruction( + instruction_id=f"{context.compose_id}:{symbol}:{idx}", + compose_id=context.compose_id, + instrument=item.instrument, + side=side, + quantity=quantity, + leverage=final_leverage, + price_mode=PriceMode.MARKET, + limit_price=None, + max_slippage_bps=self._default_slippage_bps, + meta=meta, + ) + logger.debug( + "Created TradeInstruction {} for {} side={} qty={} lev={}", + instruction.instruction_id, + symbol, + instruction.side, + instruction.quantity, + final_leverage, + ) + return instruction + + def _resolve_target_quantity( + self, + item, + current_qty: float, + max_position_qty: Optional[float], + ) -> float: + # If the composer requested NOOP, keep current quantity + if item.action == LlmDecisionAction.NOOP: + return current_qty + + # Interpret target_qty as a magnitude; apply action to determine sign + mag = float(item.target_qty) + if item.action == LlmDecisionAction.SELL: + target = -abs(mag) + else: + # default to BUY semantics + target = abs(mag) + + if max_position_qty is not None: + max_abs = abs(float(max_position_qty)) + target = 
max(-max_abs, min(max_abs, target)) + + return target + + def _apply_quantity_filters( + self, + symbol: str, + quantity: float, + quantity_step: float, + min_trade_qty: float, + max_order_qty: Optional[float], + min_notional: Optional[float], + market_snapshot: Dict[str, float], + ) -> float: + qty = quantity + + if max_order_qty is not None: + qty = min(qty, float(max_order_qty)) + + if quantity_step > 0: + qty = math.floor(qty / quantity_step) * quantity_step + + if qty <= 0: + return 0.0 + + if qty < min_trade_qty: + return 0.0 + + if min_notional is not None: + price = market_snapshot.get(symbol) + if price is None: + return 0.0 + if qty * price < float(min_notional): + return 0.0 + + return qty diff --git a/python/valuecell/agents/strategy_agent/decision/interfaces.py b/python/valuecell/agents/strategy_agent/decision/interfaces.py index 41568bd05..fc3b833a2 100644 --- a/python/valuecell/agents/strategy_agent/decision/interfaces.py +++ b/python/valuecell/agents/strategy_agent/decision/interfaces.py @@ -17,10 +17,11 @@ class Composer(ABC): """ @abstractmethod - def compose(self, context: ComposeContext) -> List[TradeInstruction]: + async def compose(self, context: ComposeContext) -> List[TradeInstruction]: """Produce normalized trade instructions given the current context. - Call the LLM, parse/validate output, apply guardrails (limits, step size, - min notional, cool-down), and return executable instructions. - Any optional auditing metadata should be recorded via HistoryRecorder. + + This method is async because LLM providers and agent wrappers are often + asynchronous. Implementations should perform any network/IO and return + a validated list of TradeInstruction objects. 
""" raise NotImplementedError diff --git a/python/valuecell/agents/strategy_agent/decision/system_prompt.py b/python/valuecell/agents/strategy_agent/decision/system_prompt.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/valuecell/agents/strategy_agent/decision/validator.py b/python/valuecell/agents/strategy_agent/decision/validator.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/valuecell/agents/strategy_agent/execution/interfaces.py b/python/valuecell/agents/strategy_agent/execution/interfaces.py index c1e745bec..ce665372b 100644 --- a/python/valuecell/agents/strategy_agent/execution/interfaces.py +++ b/python/valuecell/agents/strategy_agent/execution/interfaces.py @@ -1,9 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import List +from typing import Dict, List, Optional -from ..models import TradeInstruction +from ..models import TradeInstruction, TxResult # Contracts for execution gateways (module-local abstract interfaces). # An implementation may route to a real exchange or a paper broker. @@ -13,10 +13,17 @@ class ExecutionGateway(ABC): """Executes normalized trade instructions against an exchange/broker.""" @abstractmethod - def execute(self, instructions: List[TradeInstruction]) -> None: - """Submit the provided instructions for execution. - Implementors may be synchronous or asynchronous. At this stage we - do not model order/fill/cancel lifecycles. + async def execute( + self, + instructions: List[TradeInstruction], + market_snapshot: Optional[Dict[str, float]] = None, + ) -> List[TxResult]: + """Execute the provided instructions and return TxResult items. + + Notes: + - Implementations may simulate fills (paper) or submit to a real exchange. + - market_snapshot is optional context for pricing simulations. + - Lifecycle (partial fills, cancels) can be represented with PARTIAL/REJECTED. 
""" raise NotImplementedError diff --git a/python/valuecell/agents/strategy_agent/execution/paper_trading.py b/python/valuecell/agents/strategy_agent/execution/paper_trading.py index e69de29bb..a6dfbcec9 100644 --- a/python/valuecell/agents/strategy_agent/execution/paper_trading.py +++ b/python/valuecell/agents/strategy_agent/execution/paper_trading.py @@ -0,0 +1,54 @@ +from typing import Dict, List, Optional + +from ..models import TradeInstruction, TradeSide, TxResult +from .interfaces import ExecutionGateway + + +class PaperExecutionGateway(ExecutionGateway): + """Async paper executor that simulates fills with slippage and fees. + + - Uses instruction.max_slippage_bps to compute execution price around snapshot. + - Applies a flat fee_bps to notional to produce fee_cost. + - Marks orders as FILLED with filled_qty=requested quantity. + """ + + def __init__(self, fee_bps: float = 10.0) -> None: + self._fee_bps = float(fee_bps) + self.executed: List[TradeInstruction] = [] + + async def execute( + self, + instructions: List[TradeInstruction], + market_snapshot: Optional[Dict[str, float]] = None, + ) -> List[TxResult]: + results: List[TxResult] = [] + price_map = market_snapshot or {} + for inst in instructions: + self.executed.append(inst) + ref_price = float(price_map.get(inst.instrument.symbol, 0.0) or 0.0) + slip_bps = float(inst.max_slippage_bps or 0.0) + slip = slip_bps / 10_000.0 + if inst.side == TradeSide.BUY: + exec_price = ref_price * (1.0 + slip) + else: + exec_price = ref_price * (1.0 - slip) + + notional = exec_price * float(inst.quantity) + fee_cost = notional * (self._fee_bps / 10_000.0) if notional else 0.0 + + results.append( + TxResult( + instruction_id=inst.instruction_id, + instrument=inst.instrument, + side=inst.side, + requested_qty=float(inst.quantity), + filled_qty=float(inst.quantity), + avg_exec_price=float(exec_price) if exec_price else None, + slippage_bps=slip_bps or None, + fee_cost=fee_cost or None, + leverage=inst.leverage, + 
meta=inst.meta, + ) + ) + + return results diff --git a/python/valuecell/agents/strategy_agent/features/simple.py b/python/valuecell/agents/strategy_agent/features/simple.py new file mode 100644 index 000000000..d3b07a03f --- /dev/null +++ b/python/valuecell/agents/strategy_agent/features/simple.py @@ -0,0 +1,139 @@ +from collections import defaultdict +from typing import Dict, List, Optional + +import numpy as np +import pandas as pd + +from ..models import Candle, FeatureVector +from .interfaces import FeatureComputer + + +class SimpleFeatureComputer(FeatureComputer): + """Computes basic momentum and volume features.""" + + def compute_features( + self, candles: Optional[List[Candle]] = None + ) -> List[FeatureVector]: + if not candles: + return [] + + grouped: Dict[str, List[Candle]] = defaultdict(list) + for candle in candles: + grouped[candle.instrument.symbol].append(candle) + + features: List[FeatureVector] = [] + for symbol, series in grouped.items(): + # Build a DataFrame for indicator calculations + series.sort(key=lambda item: item.ts) + rows = [ + { + "ts": c.ts, + "open": c.open, + "high": c.high, + "low": c.low, + "close": c.close, + "volume": c.volume, + "interval": c.interval, + } + for c in series + ] + df = pd.DataFrame(rows) + + # EMAs + df["ema_12"] = df["close"].ewm(span=12, adjust=False).mean() + df["ema_26"] = df["close"].ewm(span=26, adjust=False).mean() + df["ema_50"] = df["close"].ewm(span=50, adjust=False).mean() + + # MACD + df["macd"] = df["ema_12"] - df["ema_26"] + df["macd_signal"] = df["macd"].ewm(span=9, adjust=False).mean() + df["macd_histogram"] = df["macd"] - df["macd_signal"] + + # RSI + delta = df["close"].diff() + gain = delta.clip(lower=0).rolling(window=14).mean() + loss = (-delta).clip(lower=0).rolling(window=14).mean() + rs = gain / loss.replace(0, np.inf) + df["rsi"] = 100 - (100 / (1 + rs)) + + # Bollinger Bands + df["bb_middle"] = df["close"].rolling(window=20).mean() + bb_std = df["close"].rolling(window=20).std() + 
df["bb_upper"] = df["bb_middle"] + (bb_std * 2) + df["bb_lower"] = df["bb_middle"] - (bb_std * 2) + + last = df.iloc[-1] + prev = df.iloc[-2] if len(df) > 1 else last + + change_pct = ( + (float(last.close) - float(prev.close)) / float(prev.close) + if prev.close + else 0.0 + ) + + values = { + "close": float(last.close), + "volume": float(last.volume), + "change_pct": float(change_pct), + "ema_12": ( + float(last.get("ema_12", np.nan)) + if not pd.isna(last.get("ema_12")) + else None + ), + "ema_26": ( + float(last.get("ema_26", np.nan)) + if not pd.isna(last.get("ema_26")) + else None + ), + "ema_50": ( + float(last.get("ema_50", np.nan)) + if not pd.isna(last.get("ema_50")) + else None + ), + "macd": ( + float(last.get("macd", np.nan)) + if not pd.isna(last.get("macd")) + else None + ), + "macd_signal": ( + float(last.get("macd_signal", np.nan)) + if not pd.isna(last.get("macd_signal")) + else None + ), + "macd_histogram": ( + float(last.get("macd_histogram", np.nan)) + if not pd.isna(last.get("macd_histogram")) + else None + ), + "rsi": ( + float(last.get("rsi", np.nan)) + if not pd.isna(last.get("rsi")) + else None + ), + "bb_upper": ( + float(last.get("bb_upper", np.nan)) + if not pd.isna(last.get("bb_upper")) + else None + ), + "bb_middle": ( + float(last.get("bb_middle", np.nan)) + if not pd.isna(last.get("bb_middle")) + else None + ), + "bb_lower": ( + float(last.get("bb_lower", np.nan)) + if not pd.isna(last.get("bb_lower")) + else None + ), + } + + features.append( + FeatureVector( + ts=int(last["ts"]), + instrument=series[-1].instrument, + values=values, + meta={"interval": series[-1].interval, "count": len(series)}, + ) + ) + + return features diff --git a/python/valuecell/agents/strategy_agent/models.py b/python/valuecell/agents/strategy_agent/models.py index 42276f9bf..e28eae634 100644 --- a/python/valuecell/agents/strategy_agent/models.py +++ b/python/valuecell/agents/strategy_agent/models.py @@ -5,6 +5,7 @@ from .constants import ( 
DEFAULT_AGENT_MODEL, + DEFAULT_CAP_FACTOR, DEFAULT_INITIAL_CAPITAL, DEFAULT_MAX_LEVERAGE, DEFAULT_MAX_POSITIONS, @@ -34,7 +35,16 @@ class TradeSide(str, Enum): SELL = "SELL" -class ModelConfig(BaseModel): +class ComponentType(str, Enum): + """Component types for StrategyAgent streaming responses.""" + + STATUS = "strategy_agent_status" + UPDATE_TRADE = "strategy_agent_update_trade" + UPDATE_PORTFOLIO = "strategy_agent_update_portfolio" + UPDATE_STRATEGY_SUMMARY = "strategy_agent_update_strategy_summary" + + +class LLMModelConfig(BaseModel): """AI model configuration for strategy.""" provider: str = Field( @@ -103,6 +113,12 @@ class TradingConfig(BaseModel): description="Optional custom prompt to customize strategy behavior", ) + cap_factor: float = Field( + default=DEFAULT_CAP_FACTOR, + description="Notional cap factor used by the composer to limit per-symbol exposure (e.g., 1.5)", + gt=0, + ) + @field_validator("symbols") @classmethod def validate_symbols(cls, v): @@ -121,8 +137,8 @@ class UserRequest(BaseModel): update a strategy instance. It was previously named `Strategy`. """ - model_config: ModelConfig = Field( - default_factory=ModelConfig, description="AI model configuration" + llm_model_config: LLMModelConfig = Field( + default_factory=LLMModelConfig, description="AI model configuration" ) exchange_config: ExchangeConfig = Field( default_factory=ExchangeConfig, description="Exchange configuration for trading" @@ -191,6 +207,42 @@ class StrategyStatus(str, Enum): ERROR = "error" +class Constraints(BaseModel): + """Typed constraints model used by the runtime and composer. + + Only includes guardrails used in Phase 1. Extend later in Phase 2. 
+ """ + + max_positions: Optional[int] = Field( + default=None, + description="Maximum number of concurrent positions allowed for the strategy", + ) + max_leverage: Optional[float] = Field( + default=None, + description="Maximum leverage allowed for the strategy (e.g., 2.0 means up to 2x).", + ) + quantity_step: Optional[float] = Field( + default=None, + description="Minimum increment / step size for order quantities (in instrument units).", + ) + min_trade_qty: Optional[float] = Field( + default=None, + description="Minimum trade quantity (in instrument units) allowed for a single order.", + ) + max_order_qty: Optional[float] = Field( + default=None, + description="Maximum quantity allowed per single order (in instrument units).", + ) + min_notional: Optional[float] = Field( + default=None, + description="Minimum order notional (in quote currency) required for an order to be placed.", + ) + max_position_qty: Optional[float] = Field( + default=None, + description="Maximum absolute position quantity allowed for any single instrument (in instrument units).", + ) + + class PositionSnapshot(BaseModel): """Current position snapshot for one instrument.""" @@ -201,6 +253,9 @@ class PositionSnapshot(BaseModel): default=None, description="Current mark/reference price for P&L calc" ) unrealized_pnl: Optional[float] = Field(default=None, description="Unrealized PnL") + unrealized_pnl_pct: Optional[float] = Field( + default=None, description="Unrealized P&L as a percent of position value" + ) # Optional fields useful for UI and reporting notional: Optional[float] = Field( default=None, description="Position notional in quote currency" @@ -236,7 +291,7 @@ class PortfolioView(BaseModel): net_exposure: Optional[float] = Field( default=None, description="Net exposure (optional)" ) - constraints: Optional[Dict[str, float | int]] = Field( + constraints: Optional[Constraints] = Field( default=None, description="Optional risk/limits snapshot (e.g., max position, step size)", ) @@ 
-247,8 +302,9 @@ class PortfolioView(BaseModel): total_unrealized_pnl: Optional[float] = Field( default=None, description="Sum of unrealized PnL across positions" ) - available_cash: Optional[float] = Field( - default=None, description="Cash available for new positions" + buying_power: Optional[float] = Field( + default=None, + description="Buying power: max(0, equity * max_leverage - gross_exposure)", ) @@ -257,13 +313,11 @@ class LlmDecisionAction(str, Enum): Semantics: - BUY/SELL: directional intent; final TradeSide is decided by delta (target - current) - - FLAT: target position is zero (may produce close-out instructions) - NOOP: target equals current (delta == 0), no instruction should be emitted """ BUY = "buy" SELL = "sell" - FLAT = "flat" NOOP = "noop" @@ -278,6 +332,11 @@ class LlmDecisionItem(BaseModel): target_qty: float = Field( ..., description="Desired position quantity after execution" ) + leverage: Optional[float] = Field( + default=None, + description="Requested leverage multiple for this target (e.g., 1.0 = no leverage)." 
+ " Composer will clamp to allowed constraints.", + ) confidence: Optional[float] = Field( default=None, description="Optional confidence score [0,1]" ) @@ -291,10 +350,13 @@ class LlmPlanProposal(BaseModel): ts: int items: List[LlmDecisionItem] = Field(default_factory=list) - notes: Optional[List[str]] = Field(default=None) - model_meta: Optional[Dict[str, str]] = Field( - default=None, description="Optional model metadata (e.g., model_name)" - ) + + +class PriceMode(str, Enum): + """Order price mode: market vs limit.""" + + MARKET = "market" + LIMIT = "limit" class TradeInstruction(BaseModel): @@ -309,8 +371,12 @@ class TradeInstruction(BaseModel): instrument: InstrumentRef side: TradeSide quantity: float = Field(..., description="Order quantity in instrument units") - price_mode: str = Field( - ..., description='"market" or "limit" (initial versions may use only "market")' + leverage: Optional[float] = Field( + default=None, + description="Leverage multiple to apply for this instruction (if supported).", + ) + price_mode: PriceMode = Field( + PriceMode.MARKET, description="Order price mode: market vs limit" ) limit_price: Optional[float] = Field(default=None) max_slippage_bps: Optional[float] = Field(default=None) @@ -319,6 +385,46 @@ class TradeInstruction(BaseModel): ) +class TxStatus(str, Enum): + """Execution status of a submitted instruction.""" + + FILLED = "filled" + PARTIAL = "partial" + REJECTED = "rejected" + ERROR = "error" + + +class TxResult(BaseModel): + """Result of executing a TradeInstruction at a broker/exchange. + + This captures execution-side details such as fills, effective price, + fees and slippage. The coordinator converts TxResult into TradeHistoryEntry. 
+ """ + + instruction_id: str = Field(..., description="Originating instruction id") + instrument: InstrumentRef + side: TradeSide + requested_qty: float = Field(..., description="Requested order quantity") + filled_qty: float = Field(..., description="Filled quantity (<= requested)") + avg_exec_price: Optional[float] = Field( + default=None, description="Average execution price for the fills" + ) + slippage_bps: Optional[float] = Field( + default=None, description="Observed slippage in basis points" + ) + fee_cost: Optional[float] = Field( + default=None, description="Total fees charged in quote currency" + ) + leverage: Optional[float] = Field( + default=None, description="Leverage applied, if any" + ) + status: TxStatus = Field(default=TxStatus.FILLED) + reason: Optional[str] = Field( + default=None, description="Message for rejects/errors" + ) + meta: Optional[Dict[str, str | float]] = Field(default=None) + + class MetricPoint(BaseModel): """Generic time-value point, used for value history charts.""" @@ -352,9 +458,6 @@ class ComposeContext(BaseModel): market_snapshot: Optional[Dict[str, float]] = Field( default=None, description="Optional map symbol -> current reference price" ) - constraints: Optional[Dict[str, float | int]] = Field( - default=None, description="Optional extra constraints for guardrails" - ) class HistoryRecord(BaseModel): @@ -412,6 +515,10 @@ class TradeHistoryEntry(BaseModel): holding_ms: Optional[int] = Field(default=None, description="Holding time in ms") realized_pnl: Optional[float] = Field(default=None) realized_pnl_pct: Optional[float] = Field(default=None) + # Total fees charged for this trade in quote currency (if available) + fee_cost: Optional[float] = Field( + default=None, description="Total fees charged in quote currency for this trade" + ) leverage: Optional[float] = Field(default=None) note: Optional[str] = Field( default=None, description="Optional free-form note or comment about the trade" @@ -443,10 +550,20 @@ class 
StrategySummary(BaseModel): realized_pnl: Optional[float] = Field( default=None, description="Realized P&L in quote CCY" ) + pnl_pct: Optional[float] = Field( + default=None, description="P&L as percent of equity or initial capital" + ) unrealized_pnl: Optional[float] = Field( default=None, description="Unrealized P&L in quote CCY" ) - pnl_pct: Optional[float] = Field( - default=None, description="P&L as percent of equity or initial capital" + unrealized_pnl_pct: Optional[float] = Field( + default=None, description="Unrealized P&L as a percent of position value" ) last_updated_ts: Optional[int] = Field(default=None) + + +class StrategyStatusContent(BaseModel): + """Content for strategy agent status component.""" + + strategy_id: str + status: StrategyStatus diff --git a/python/valuecell/agents/strategy_agent/portfolio/in_memory.py b/python/valuecell/agents/strategy_agent/portfolio/in_memory.py new file mode 100644 index 000000000..2e56c27d6 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/portfolio/in_memory.py @@ -0,0 +1,229 @@ +from datetime import datetime, timezone +from typing import Dict, List, Optional + +from ..models import ( + Constraints, + PortfolioView, + PositionSnapshot, + TradeHistoryEntry, + TradeSide, + TradeType, + TradingMode, +) +from .interfaces import PortfolioService + + +class InMemoryPortfolioService(PortfolioService): + """Tracks cash and positions in memory and computes derived metrics. 
+ + Notes: + - cash reflects running cash balance from trade settlements + - gross_exposure = sum(abs(qty) * mark_price) + - net_exposure = sum(qty * mark_price) + - equity (total_value) = cash + net_exposure [correct for both long and short] + - total_unrealized_pnl = sum((mark_price - avg_price) * qty) + - buying_power: max(0, equity * max_leverage - gross_exposure) + where max_leverage comes from portfolio.constraints (default 1.0) + """ + + def __init__( + self, + initial_capital: float, + trading_mode: TradingMode, + constraints: Optional[Constraints] = None, + strategy_id: Optional[str] = None, + ) -> None: + # Store owning strategy id on the view so downstream components + # always see which strategy this portfolio belongs to. + self._strategy_id = strategy_id + self._view = PortfolioView( + strategy_id=strategy_id, + ts=int(datetime.now(timezone.utc).timestamp() * 1000), + cash=initial_capital, + positions={}, + gross_exposure=0.0, + net_exposure=0.0, + constraints=constraints or None, + total_value=initial_capital, + total_unrealized_pnl=0.0, + buying_power=initial_capital, + ) + self._trading_mode = trading_mode + + def get_view(self) -> PortfolioView: + self._view.ts = int(datetime.now(timezone.utc).timestamp() * 1000) + # Ensure strategy_id is present on each view retrieval + if self._strategy_id is not None: + try: + self._view.strategy_id = self._strategy_id + except Exception: + pass + return self._view + + def apply_trades( + self, trades: List[TradeHistoryEntry], market_snapshot: Dict[str, float] + ) -> None: + """Apply trades and update portfolio positions and aggregates. 
+ + This method updates: + - cash (subtract on BUY, add on SELL at trade price) + - positions with weighted avg price, entry_ts on (re)open, and mark_price + - per-position notional, unrealized_pnl, unrealized_pnl_pct (and keeps pnl_pct for + backward compatibility) + - portfolio aggregates: gross_exposure, net_exposure, total_value (equity), total_unrealized_pnl, buying_power + """ + for trade in trades: + symbol = trade.instrument.symbol + price = float(trade.entry_price or market_snapshot.get(symbol, 0.0) or 0.0) + delta = float(trade.quantity or 0.0) + quantity_delta = delta if trade.side == TradeSide.BUY else -delta + + position = self._view.positions.get(symbol) + if position is None: + position = PositionSnapshot( + instrument=trade.instrument, + quantity=0.0, + avg_price=None, + mark_price=price, + unrealized_pnl=0.0, + ) + self._view.positions[symbol] = position + + current_qty = float(position.quantity) + avg_price = float(position.avg_price or 0.0) + new_qty = current_qty + quantity_delta + + # Update mark price + position.mark_price = price + + # Handle position quantity transitions and avg price + if new_qty == 0.0: + # Fully closed + self._view.positions.pop(symbol, None) + elif current_qty == 0.0: + # Opening new position + position.quantity = new_qty + position.avg_price = price + position.entry_ts = ( + trade.entry_ts + or trade.trade_ts + or int(datetime.now(timezone.utc).timestamp() * 1000) + ) + position.trade_type = TradeType.LONG if new_qty > 0 else TradeType.SHORT + # Initialize leverage from trade if provided + if trade.leverage is not None: + position.leverage = float(trade.leverage) + elif (current_qty > 0 and new_qty > 0) or (current_qty < 0 and new_qty < 0): + # Same direction + if abs(new_qty) > abs(current_qty): + # Increasing position: weighted average price + position.avg_price = ( + abs(current_qty) * avg_price + abs(quantity_delta) * price + ) / abs(new_qty) + position.quantity = new_qty + # Update leverage as size-weighted average 
if provided + if trade.leverage is not None: + prev_lev = float(position.leverage or trade.leverage) + position.leverage = ( + abs(current_qty) * prev_lev + + abs(quantity_delta) * float(trade.leverage) + ) / abs(new_qty) + else: + # Reducing position: keep avg price, update quantity + position.quantity = new_qty + # entry_ts remains from original opening + else: + # Crossing through zero to opposite direction: reset avg price and entry_ts + position.quantity = new_qty + position.avg_price = price + position.entry_ts = ( + trade.entry_ts + or trade.trade_ts + or int(datetime.now(timezone.utc).timestamp() * 1000) + ) + position.trade_type = TradeType.LONG if new_qty > 0 else TradeType.SHORT + # Reset leverage when flipping direction + if trade.leverage is not None: + position.leverage = float(trade.leverage) + + # Update cash by trade notional + notional = price * delta + # Deduct fees from cash as well. Trade may include fee_cost (in quote ccy). + fee = trade.fee_cost or 0.0 + if trade.side == TradeSide.BUY: + # buying reduces cash by notional plus fees + self._view.cash -= notional + self._view.cash -= fee + else: + # selling increases cash by notional minus fees + self._view.cash += notional + self._view.cash -= fee + + # Recompute per-position derived fields (if position still exists) + pos = self._view.positions.get(symbol) + if pos is not None: + qty = float(pos.quantity) + mpx = float(pos.mark_price or 0.0) + apx = float(pos.avg_price or 0.0) + pos.notional = abs(qty) * mpx if mpx else None + if apx and mpx: + pos.unrealized_pnl = (mpx - apx) * qty + denom = abs(qty) * apx + pct = (pos.unrealized_pnl / denom) * 100.0 if denom else None + # populate both the newer field and keep the legacy alias + pos.unrealized_pnl_pct = pct + pos.pnl_pct = pct + else: + pos.unrealized_pnl = None + pos.unrealized_pnl_pct = None + pos.pnl_pct = None + + # Recompute portfolio aggregates + gross = 0.0 + net = 0.0 + unreal = 0.0 + for pos in self._view.positions.values(): + # 
Refresh mark price from snapshot if available + try: + sym = pos.instrument.symbol + except Exception: + sym = None + if sym and sym in market_snapshot: + snap_px = float(market_snapshot.get(sym) or 0.0) + if snap_px > 0: + pos.mark_price = snap_px + + mpx = float(pos.mark_price or 0.0) + qty = float(pos.quantity) + apx = float(pos.avg_price or 0.0) + # Recompute unrealized PnL and percent (populate both new and legacy fields) + if apx and mpx: + pos.unrealized_pnl = (mpx - apx) * qty + denom = abs(qty) * apx + pct = (pos.unrealized_pnl / denom) * 100.0 if denom else None + pos.unrealized_pnl_pct = pct + pos.pnl_pct = pct + else: + pos.unrealized_pnl = None + pos.unrealized_pnl_pct = None + pos.pnl_pct = None + gross += abs(qty) * mpx + net += qty * mpx + if pos.unrealized_pnl is not None: + unreal += float(pos.unrealized_pnl) + + self._view.gross_exposure = gross + self._view.net_exposure = net + self._view.total_unrealized_pnl = unreal + # Equity is cash plus net exposure (correct for both long and short) + equity = self._view.cash + net + self._view.total_value = equity + + # Approximate buying power using max leverage constraint + max_lev = ( + float(self._view.constraints.max_leverage) + if (self._view.constraints and self._view.constraints.max_leverage) + else 1.0 + ) + buying_power = max(0.0, equity * max_lev - gross) + self._view.buying_power = buying_power diff --git a/python/valuecell/agents/strategy_agent/portfolio/interfaces.py b/python/valuecell/agents/strategy_agent/portfolio/interfaces.py index a81e366fa..e08360a3b 100644 --- a/python/valuecell/agents/strategy_agent/portfolio/interfaces.py +++ b/python/valuecell/agents/strategy_agent/portfolio/interfaces.py @@ -1,9 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Optional +from typing import Dict, List, Optional -from ..models import PortfolioView +from ..models import PortfolioView, TradeHistoryEntry class PortfolioService(ABC): @@ -17,6 +17,18 @@ 
def get_view(self) -> PortfolioView: """Return the latest portfolio view (positions, cash, optional constraints).""" raise NotImplementedError + def apply_trades( + self, trades: List[TradeHistoryEntry], market_snapshot: Dict[str, float] + ) -> None: + """Apply executed trades to the portfolio view (optional). + + Implementations that support state changes (paper trading, backtests) + should update their internal view accordingly. This method is optional + for read-only portfolio services, but providing it here makes the + contract explicit to callers. + """ + raise NotImplementedError + class PortfolioSnapshotStore(ABC): """Persist/load portfolio snapshots (optional for paper/backtest modes).""" diff --git a/python/valuecell/agents/strategy_agent/runtime.py b/python/valuecell/agents/strategy_agent/runtime.py new file mode 100644 index 000000000..186e99a0c --- /dev/null +++ b/python/valuecell/agents/strategy_agent/runtime.py @@ -0,0 +1,117 @@ +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + +from valuecell.utils.uuid import generate_uuid + +from .core import DecisionCycleResult, DefaultDecisionCoordinator +from .data.market import SimpleMarketDataSource +from .decision.composer import LlmComposer +from .execution.paper_trading import PaperExecutionGateway +from .features.simple import SimpleFeatureComputer +from .models import Constraints, UserRequest +from .portfolio.in_memory import InMemoryPortfolioService +from .trading_history.digest import RollingDigestBuilder +from .trading_history.recorder import InMemoryHistoryRecorder + + +def _make_prompt_provider(template_dir: Optional[Path] = None): + """Return a prompt_provider callable that builds prompts from templates. + + Behavior: + - If request.trading_config.template_id matches a file under templates dir + (try extensions .txt, .md, or exact name), the file content is used. 
+ - If request.trading_config.custom_prompt is present, it is appended after + the template content (separated by two newlines). + - If neither is present, fall back to a simple generated prompt mentioning + the symbols. + """ + base = Path(__file__).parent / "templates" if template_dir is None else template_dir + + def provider(request: UserRequest) -> str: + tid = request.trading_config.template_id + custom = request.trading_config.custom_prompt + + template_text = "" + if tid: + # safe-resolve candidate files + candidates = [tid, f"{tid}.txt", f"{tid}.md"] + for name in candidates: + try_path = base / name + try: + resolved = try_path.resolve() + # ensure resolved path is inside base + if base.resolve() in resolved.parents or resolved == base.resolve(): + if resolved.exists() and resolved.is_file(): + template_text = resolved.read_text(encoding="utf-8") + break + except Exception: + continue + + parts = [] + if template_text: + parts.append(template_text.strip()) + if custom: + parts.append(custom.strip()) + + if parts: + return "\n\n".join(parts) + + # fallback: simple generated prompt referencing symbols + symbols = ", ".join(request.trading_config.symbols) + return f"Compose trading instructions for symbols: {symbols}." 
+ + return provider + + +@dataclass +class StrategyRuntime: + request: UserRequest + strategy_id: str + coordinator: DefaultDecisionCoordinator + + async def run_cycle(self) -> DecisionCycleResult: + return await self.coordinator.run_once() + + +def create_strategy_runtime(request: UserRequest) -> StrategyRuntime: + strategy_id = generate_uuid("strategy") + initial_capital = request.trading_config.initial_capital or 0.0 + constraints = Constraints( + max_positions=request.trading_config.max_positions, + max_leverage=request.trading_config.max_leverage, + ) + portfolio_service = InMemoryPortfolioService( + initial_capital=initial_capital, + trading_mode=request.exchange_config.trading_mode, + constraints=constraints, + strategy_id=strategy_id, + ) + + market_data_source = SimpleMarketDataSource( + exchange_id=request.exchange_config.exchange_id + ) + feature_computer = SimpleFeatureComputer() + composer = LlmComposer(request=request) + execution_gateway = PaperExecutionGateway() + history_recorder = InMemoryHistoryRecorder() + digest_builder = RollingDigestBuilder() + + coordinator = DefaultDecisionCoordinator( + request=request, + strategy_id=strategy_id, + portfolio_service=portfolio_service, + market_data_source=market_data_source, + feature_computer=feature_computer, + composer=composer, + execution_gateway=execution_gateway, + history_recorder=history_recorder, + digest_builder=digest_builder, + prompt_provider=_make_prompt_provider(), + ) + + return StrategyRuntime( + request=request, + strategy_id=strategy_id, + coordinator=coordinator, + ) diff --git a/python/valuecell/agents/strategy_agent/templates/aggressive.txt b/python/valuecell/agents/strategy_agent/templates/aggressive.txt new file mode 100644 index 000000000..405f53db4 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/templates/aggressive.txt @@ -0,0 +1,70 @@ +Aggressive Trading Strategy + +Overview +- Style: Aggressive momentum / breakout trader. 
High conviction, high turnover, uses leverage where available. Targets rapid capture of directional moves and volatility spikes. +- Objective: Maximize short-term returns by taking large, time-limited positions around breakouts, trend accelerations, and catalyst-driven moves. Accept higher drawdown and frequency of small losses for larger win potential. + +Trading Regime & Timeframes +- Primary timeframes: 5m, 15m, 1h (entry/exit). Use 1m for micro-execution and slippage control when needed. +- Market types: Liquid equities, crypto, futures, or FX where tight spreads and sufficient depth exist. + +Signals & Indicators +- Trend / Momentum: + - EMA(8), EMA(21), EMA(50) for short-term trend alignment. + - MACD(12,26,9) for momentum acceleration signals. +- Volatility / breakout: + - ATR(14) for dynamic stop sizing and identifying volatility expansion. + - Bollinger Bands (20, 2.0) for breakout confirmation. +- Confirmation: + - Volume spike (current volume > 1.5x average) near breakout. + - Price closing beyond recent consolidation (range breakout). + +Entry Rules (Aggressive) +- Primary entry (breakout momentum): + 1. Price closes above the consolidation high (e.g., prior 20-period high) on 5m or 15m timeframe. + 2. EMA(8) > EMA(21) and EMA(21) > EMA(50) (trend alignment) OR MACD histogram > 0 and rising. + 3. Volume >= 1.5x average volume over the consolidation window OR ATR expansion > recent ATR. + 4. Enter with market or aggressive limit (tight) order sized per position-sizing rules below. + +- Aggressive intraday add-on: + - If momentum continues and price breaks a subsequent micro-high on 1m with supporting volume, add up to a fixed add-on fraction of initial position (scale-in). Respect max_position_qty. + +Exit Rules +- Profit target: use a trailing stop based on ATR (e.g., trail = 1.5 * ATR(14)) or lock partial profits at predefined multiples (1st take: +1.5*ATR, scale out 25-50%). 
+- Hard stop: initial stop at entry_price - (stop_multiplier * ATR) for longs (reverse sign for shorts). Typical stop_multiplier=1.0–2.5 depending on aggressiveness. +- Time stop: exit any position that fails to reach profit target within a fixed time window (e.g., 6–12 candles on the entry timeframe). +- Flip / fast reversal: if the price rapidly reverses and crosses key EMAs in the opposite direction, flatten and consider re-entry in the new direction only if filters re-align. + +Position Sizing & Risk +- Base risk per trade: aggressive (e.g., 1.0%–3.0% of account equity) per open position. Use higher risk when confidence is high. +- Leverage: allowed if product supports it, but cap net leverage at the trading_config.max_leverage. +- Scaling: initial entry = 60% of target position; add-ons up to 40% on confirmed continuation moves. +- Max exposure: enforce max_positions and max_position_qty constraints. +- Min notional: ensure each order meets minimum notional and exchange limits. + +Execution & Slippage Control +- Use market orders when momentum is fast and limit orders when liquidity allows. Prefer immediate-or-cancel aggressive limits around breakouts. +- Respect quantity_step and exchange min increments. +- If slippage exceeds max_slippage_bps threshold repeatedly, reduce position sizing or widen stop targets. + +Risk Controls & Guardrails +- Max concurrent positions: obey `max_positions` provided in trading config. +- Per-instrument max notional and position cap: obey `max_position_qty` and `min_notional`. +- Daily drawdown kill-switch: if daily drawdown > X% (configurable), stop new entries until manual review. +- Rate-limit entries to avoid overtrading during noise: minimum time between new full-size entries for the same symbol (e.g., 15m). 
+ +Parameters (example defaults) +- EMA periods: 8, 21, 50 +- MACD: 12,26,9 +- ATR period: 14 +- Stop multiplier: 1.5 +- Trail multiplier: 1.5 +- Volume spike multiplier: 1.5 +- Initial size fraction: 0.6 (60%) +- Add-on fraction: 0.4 (40%) +- Time stop window: 12 candles + +Operational Notes +- Backtest thoroughly across market regimes (bull, bear, sideways) and on multiple symbols before live deployment. +- Use paper trading first; expect frequent small losses and occasional large gains. +- Log all entries/exits with reasons (signal that triggered, indicators values, volume) for post-trade analysis and strategy tuning. diff --git a/python/valuecell/agents/strategy_agent/templates/default.txt b/python/valuecell/agents/strategy_agent/templates/default.txt new file mode 100644 index 000000000..57b5fb4f8 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/templates/default.txt @@ -0,0 +1,63 @@ + +Goal: +Produce steady, risk-aware crypto trading decisions that aim for consistent small gains while protecting capital. + +Style & constraints: +- Focus on liquid major symbols (e.g., BTC-USD, ETH-USD). Avoid low-liquidity altcoins. +- Use conservative position sizing: target at most 1-2% of portfolio NAV per new trade (respecting `cap_factor`). +- Limit concurrent open positions to moderate number (use strategy config `max_positions`). +- Prefer market or tight-limit entries on pullbacks; avoid chasing large, fast moves. +- Use clear stop-loss and profit-target logic (see Risk Management section below). +- Favor trend-aligned entries: if the short- to mid-term trend is bullish, prefer long entries; if bearish, prefer shorts or sit out. +- Avoid entering during major macro events, maintenance windows, or low-volume periods (e.g., holidays, weekends depending on instrument). + +Signals & decision heuristics: +- Trend detection: compute short EMA (e.g., 20) vs long EMA (e.g., 100). Require short EMA > long EMA for a bullish bias and vice versa for bearish bias. 
+- Momentum confirmation: require a momentum feature (e.g., RSI between 30-70 band moving toward oversold for entries) to avoid overbought entries. +- Volatility filter: if realized volatility in recent window is above a configurable threshold, reduce position size or skip signals. +- Pullback entries: prefer to enter on a pullback toward a moving average or a defined support zone rather than at local highs. +- Confluence: prefer signals with at least two confirming indicators (trend + momentum or trend + volume spike on breakout). + +Order sizing & execution: +- Determine notional for a trade = min( cap_factor * average_symbol_daily_volume_notional, requested_notional, available buying power ). +- Convert notional -> quantity using current mark price. +- Clamp size to `min_trade_qty` and `max_order_qty` from runtime constraints. +- Use market orders for small/frequent rebalances; use limit orders (near current spread) for larger entries to avoid slippage. +- If partial fills occur, allow reattempts up to a short retry limit, then treat as partial fill and update portfolio accordingly. + +Risk management (mandatory): +- Stop-loss: set a stop at a fixed percentage or ATR multiple (e.g., 1.5x ATR) below entry for longs (above for shorts). +- Take-profit: set a profit target at a risk:reward ratio of at least 1:1.5 (configurable). +- Trailing stop: optionally convert stop to trailing at meaningful profit thresholds (e.g., after 1x risk reached). +- Cap total portfolio risk: do not allow aggregated potential loss (sum of per-position risk) to exceed a configurable fraction of NAV. +- Fees: account for estimated fees when sizing orders and when evaluating profit/loss. + +Position management & lifecycle: +- If a position is opened, compute and record entry price, notional, leverage, and planned stop/take levels in the trade meta. 
+- Re-evaluate open positions each cycle: if stop or take conditions hit, close; if market regime flips (trend opposite), consider reducing size or closing. +- Avoid frequent flipping: prefer 'flip-by-flat' — close an opposite-direction position fully before opening a new one (do not net opposite directions in same symbol). + +Edge cases & guards: +- If the computed quantity is below `min_trade_qty`, skip the trade. +- If the current spread or slippage estimate is larger than an acceptable threshold, skip or reduce order size. +- If data for an instrument is stale (last candle older than 2x interval), skip trading that instrument this cycle. + +Rationale and explainability: +- For each suggested action, include a short rationale string: why the signal triggered, which indicators agreed, and the planned stop/take. +- For rejected/ignored signals, include a brief reason (e.g., "skipped: notional below min_trade_qty", "skipped: volatility too high"). + +Telemetry & meta: +- Attach these meta fields to each instruction: compose_id, strategy_id, timestamp, estimated_fee, estimated_notional, confidence_score. +- Confidence: normalize to [0,1]; reduce size proportionally to confidence if below a threshold (e.g., 0.5). + +Failure modes & safe-fallbacks: +- If execution gateway returns an error or rejects, do not keep trying indefinitely—mark instruction as ERROR and surface reason in logs and history. +- If critical internal errors occur, pause trading and emit a status update. + +Summary (one-sentence): +Be conservative and trend-aware: take small, well-sized positions on pullbacks or confirmed breakouts, protect capital with explicit stops, and prefer gradual, repeatable profits over large, risky bets. + +Examples (short): +- Bullish pullback: short EMA > long EMA, RSI dropped below 50 and turning up, enter long sized at 1% NAV, stop = 2% below entry, target = 3% above entry. 
+- Breakout: short EMA crosses above long EMA with volume spike, enter on a tight breakout candle close, stop under breakout low, R:R = 1:1.5. + diff --git a/python/valuecell/agents/strategy_agent/templates/insane.txt b/python/valuecell/agents/strategy_agent/templates/insane.txt new file mode 100644 index 000000000..69186e669 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/templates/insane.txt @@ -0,0 +1,39 @@ +(Insane) Reckless Trading Strategy Prompt + +Overview +- Tone: Aggressive, risk-seeking, high-frequency mindset. The strategy should prioritize capturing rapid, large directional moves and volatility spikes with little regard for traditional risk controls. +- Objective: Maximize short-term upside by taking outsized positions quickly, scaling into strong momentum, and continuously seeking opportunities to compound returns. Accept frequent drawdowns and rapid position turnover. + +Behavioral Instructions for the Agent +- Seek high-conviction momentum breakouts and jump into positions quickly. +- Favor larger position sizes and use available leverage wherever possible to amplify gains. +- Do not prioritize stop-loss discipline; prefer aggressive re-entry unless an outright opposite signal appears. +- Take multiple simultaneous positions across symbols to chase opportunities; avoid idle cash. +- If a trade runs against you but the underlying momentum remains plausible, scale in more rather than cutting losses. + +Entry & Execution Preferences +- Enter on confirmed breakouts across favored timeframes (1m, 5m, 15m), prioritizing market orders to ensure entry. +- Scale-in on continuation: initial entry at 50% target, add aggressively on micro-breakouts up to full target. +- Prefer immediate execution; if limit orders would delay entry, use market or IOC aggressive limits. + +Exit Preferences +- Take profits quickly on strong moves; scale out in chunks but keep a core position running to catch larger trends. 
+- Do not rely on tight, mechanical stop-losses. Instead, use confident assessments: only exit on clear reversal signals. + +Sizing & Leverage +- Use high risk per trade relative to capital (well above conservative norms). When allowed, apply leverage up to the maximum permitted. +- Ignore conservative min_notional thresholds only if doing so materially improves entry timing and the platform permits it. + +Signals & Filters +- Momentum indicators (fast EMAs, MACD histogram), volume spikes, and volatility expansions are primary signals. +- Favor symbols with the strongest recent move and highest momentum scores across provided features. + +Auditing & Explanation +- For each trade, provide a short natural-language rationale emphasizing the momentum, catalyst, and confidence level. +- Include any scale-in logic and the intended target quantity. + +Safety Notice (for humans): +This prompt intentionally biases the agent toward risky, aggressive behavior and minimal risk controls. Use only for simulation, stress testing, or demo purposes. Do NOT deploy live without adding firm risk limits, stop-loss rules, and human oversight. 
+ +Template variables available: +- {max_positions}, {max_leverage}, {min_notional}, {quantity_step}, {max_order_qty}, {max_position_qty} diff --git a/python/valuecell/agents/strategy_agent/tests/__init__.py b/python/valuecell/agents/strategy_agent/tests/__init__.py new file mode 100644 index 000000000..616ed7f21 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/tests/__init__.py @@ -0,0 +1 @@ +# Tests for strategy_agent diff --git a/python/valuecell/agents/strategy_agent/tests/test_agent.py b/python/valuecell/agents/strategy_agent/tests/test_agent.py new file mode 100644 index 000000000..ba11f0429 --- /dev/null +++ b/python/valuecell/agents/strategy_agent/tests/test_agent.py @@ -0,0 +1,47 @@ +import asyncio +import json +import os +from pprint import pprint + +from valuecell.agents.strategy_agent.agent import StrategyAgent + + +# @pytest.mark.asyncio +async def strategy_agent_basic_stream(): + """Test basic functionality of StrategyAgent stream method.""" + agent = StrategyAgent() + + # Prepare a valid JSON query based on UserRequest structure + query = json.dumps( + { + "llm_model_config": { + "provider": "openrouter", + "model_id": "deepseek/deepseek-v3.1-terminus", + "api_key": os.getenv("OPENROUTER_API_KEY"), + }, + "exchange_config": { + "exchange_id": "binance", + "trading_mode": "virtual", + "api_key": "test-exchange-key", + "secret_key": "test-secret-key", + }, + "trading_config": { + "strategy_name": "Test Strategy", + "initial_capital": 10000.0, + "max_leverage": 5.0, + "max_positions": 5, + "symbols": ["BTC/USDT", "ETH/USDT", "SOL/USDT"], + "decide_interval": 60, + "template_id": "aggressive", + "custom_prompt": "no custom prompt", + }, + } + ) + + async for response in agent.stream(query, "test-conversation", "test-task"): + pprint(response.metadata) + pprint(json.loads(response.content)) + print("\n\n") + + +asyncio.run(strategy_agent_basic_stream()) diff --git a/python/valuecell/agents/strategy_agent/trading_history/digest.py 
b/python/valuecell/agents/strategy_agent/trading_history/digest.py index e69de29bb..832b31686 100644 --- a/python/valuecell/agents/strategy_agent/trading_history/digest.py +++ b/python/valuecell/agents/strategy_agent/trading_history/digest.py @@ -0,0 +1,45 @@ +from datetime import datetime, timezone +from typing import Dict, List + +from ..models import HistoryRecord, InstrumentRef, TradeDigest, TradeDigestEntry +from .interfaces import DigestBuilder + + +class RollingDigestBuilder(DigestBuilder): + """Builds a lightweight digest from recent execution records.""" + + def __init__(self, window: int = 50) -> None: + self._window = max(window, 1) + + def build(self, records: List[HistoryRecord]) -> TradeDigest: + recent = records[-self._window :] + by_instrument: Dict[str, TradeDigestEntry] = {} + + for record in recent: + if record.kind != "execution": + continue + trades = record.payload.get("trades", []) + for trade_dict in trades: + instrument_dict = trade_dict.get("instrument") or {} + symbol = instrument_dict.get("symbol") + if not symbol: + continue + entry = by_instrument.get(symbol) + if entry is None: + entry = TradeDigestEntry( + instrument=InstrumentRef(**instrument_dict), + trade_count=0, + realized_pnl=0.0, + ) + by_instrument[symbol] = entry + entry.trade_count += 1 + realized = float(trade_dict.get("realized_pnl") or 0.0) + entry.realized_pnl += realized + entry.last_trade_ts = trade_dict.get("trade_ts") or entry.last_trade_ts + + timestamp = ( + recent[-1].ts + if recent + else int(datetime.now(timezone.utc).timestamp() * 1000) + ) + return TradeDigest(ts=timestamp, by_instrument=by_instrument) diff --git a/python/valuecell/agents/strategy_agent/trading_history/recorder.py b/python/valuecell/agents/strategy_agent/trading_history/recorder.py index e69de29bb..e6fb7857b 100644 --- a/python/valuecell/agents/strategy_agent/trading_history/recorder.py +++ b/python/valuecell/agents/strategy_agent/trading_history/recorder.py @@ -0,0 +1,14 @@ +from typing 
import List + +from ..models import HistoryRecord +from .interfaces import HistoryRecorder + + +class InMemoryHistoryRecorder(HistoryRecorder): + """In-memory recorder storing history records.""" + + def __init__(self) -> None: + self.records: List[HistoryRecord] = [] + + def record(self, record: HistoryRecord) -> None: + self.records.append(record) diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py index 1594adf69..e60a4b231 100644 --- a/python/valuecell/core/agent/connect.py +++ b/python/valuecell/core/agent/connect.py @@ -31,6 +31,8 @@ class AgentContext: listener_task: Optional[asyncio.Task] = None listener_url: Optional[str] = None client: Optional[AgentClient] = None + # Planner passthrough flag derived from raw agent card JSON + planner_passthrough: bool = False # Listener preferences desired_listener_host: Optional[str] = None desired_listener_port: Optional[int] = None @@ -91,13 +93,20 @@ def _load_remote_contexts(self, agent_card_dir: str = None) -> None: continue if not agent_card_dict.get("enabled", True): continue + # Detect planner passthrough from raw JSON (top-level or metadata) + passthrough = bool(agent_card_dict.get("planner_passthrough")) + if not passthrough: + meta = agent_card_dict.get("metadata") or {} + if isinstance(meta, dict): + passthrough = bool(meta.get("planner_passthrough")) local_agent_card = parse_local_agent_card_dict(agent_card_dict) - if not local_agent_card or not local_agent_card.url: + if not local_agent_card: continue self._contexts[agent_name] = AgentContext( name=agent_name, url=local_agent_card.url, local_agent_card=local_agent_card, + planner_passthrough=passthrough, ) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: logger.warning( @@ -330,3 +339,12 @@ def get_all_agent_cards(self) -> Dict[str, AgentCard]: agent_cards[name] = card return agent_cards + + def is_planner_passthrough(self, agent_name: str) -> bool: + """Return True if the named agent is marked as 
planner passthrough. + + The flag is read once from raw JSON on load and cached in AgentContext. + """ + self._ensure_remote_contexts_loaded() + ctx = self._contexts.get(agent_name) + return bool(getattr(ctx, "planner_passthrough", False)) if ctx else False diff --git a/python/valuecell/core/coordinate/tests/test_orchestrator.py b/python/valuecell/core/coordinate/tests/test_orchestrator.py index 2ec36b7b6..0b303b27e 100644 --- a/python/valuecell/core/coordinate/tests/test_orchestrator.py +++ b/python/valuecell/core/coordinate/tests/test_orchestrator.py @@ -123,7 +123,8 @@ def require_user_input(): def _mock_conversation_manager() -> Mock: m = Mock() m.add_item = AsyncMock() - m.create_conversation = AsyncMock(return_value="new-conversation-id") + # Return a stub conversation object (not just an ID) so title logic works + m.create_conversation = AsyncMock(return_value=_stub_conversation(title=None)) m.get_conversation_items = AsyncMock(return_value=[]) m.list_user_conversations = AsyncMock(return_value=[]) m.get_conversation = AsyncMock(return_value=_stub_conversation()) @@ -206,6 +207,8 @@ def _orchestrator( agent_connections = Mock(spec=RemoteConnections) agent_connections.get_client = AsyncMock() agent_connections.start_agent = AsyncMock() + # Ensure passthrough detection returns False so tests relying on planner output remain stable + agent_connections.is_planner_passthrough = Mock(return_value=False) conversation_service = ConversationService(manager=mock_conversation_manager) event_service = EventResponseService(conversation_service=conversation_service) @@ -350,9 +353,9 @@ async def test_sets_conversation_title_on_first_plan( # Agent returns a quick completion mock_agent_client.send_message.return_value = _make_non_streaming_response() - # Ensure conversation initially has no title - conv = _stub_conversation(title=None) - mock_conversation_manager.get_conversation.return_value = conv + # Force conversation creation path (first call returns None then a stub) 
+ conv_created = _stub_conversation(title=None) + mock_conversation_manager.get_conversation.side_effect = [None, conv_created] # Run once out = [] @@ -360,12 +363,8 @@ async def test_sets_conversation_title_on_first_plan( out.append(chunk) # After planning, title should be set from first task title (fixture: "Auto Title") - called_with_titles = [ - getattr(c.args[0], "title", None) - for c in mock_conversation_manager.update_conversation.call_args_list - if c.args - ] - assert any(t == "Auto Title" for t in called_with_titles) + # Inspect final conversation object for title assignment + assert conv_created.title == "Auto Title" @pytest.mark.asyncio @@ -421,13 +420,14 @@ async def test_no_title_set_when_no_tasks( orchestrator.plan_service.planner.create_plan = AsyncMock(return_value=empty_plan) conv = _stub_conversation(title=None) - mock_conversation_manager.get_conversation.return_value = conv + mock_conversation_manager.get_conversation.side_effect = [conv] out = [] async for chunk in orchestrator.process_user_input(sample_user_input): out.append(chunk) # Title should remain None + # Empty plan should not set a title assert conv.title is None @@ -468,9 +468,14 @@ async def test_planner_error( async for chunk in orchestrator.process_user_input(sample_user_input): out.append(chunk) - assert len(out) == 3 - assert "(Error)" in out[1].data.payload.content - assert "Planning failed" in out[1].data.payload.content + # Expect at least system_failed and done; may include conversation_started if newly created + assert len(out) >= 2 + error_contents = [ + getattr(getattr(r.data, "payload", None), "content", "") + for r in out + if getattr(r, "data", None) + ] + assert any("(Error)" in c and "Planning failed" in c for c in error_contents) @pytest.mark.asyncio diff --git a/python/valuecell/core/plan/service.py b/python/valuecell/core/plan/service.py index 48b0729b2..a4ce8ac92 100644 --- a/python/valuecell/core/plan/service.py +++ b/python/valuecell/core/plan/service.py @@ 
-1,16 +1,26 @@ -"""Planning service coordinating planner and user input lifecycle.""" +"""Planning service coordinating planner and user input lifecycle. + +Enhancement: supports "planner passthrough" agents. When a target agent is +marked as passthrough (flag captured by RemoteConnections at startup), the +planner will skip running the LLM planning agent and directly synthesize a +single-task ExecutionPlan that hands the user's query to the specified agent. +""" from __future__ import annotations import asyncio +from datetime import datetime from typing import Awaitable, Callable, Dict, Optional from valuecell.core.agent.connect import RemoteConnections +from valuecell.core.plan.models import ExecutionPlan from valuecell.core.plan.planner import ( ExecutionPlanner, UserInputRequest, ) +from valuecell.core.task.models import Task from valuecell.core.types import UserInput +from valuecell.utils import generate_uuid class UserInputRegistry: @@ -49,6 +59,7 @@ def __init__( execution_planner: ExecutionPlanner | None = None, user_input_registry: UserInputRegistry | None = None, ) -> None: + self._agent_connections = agent_connections self._planner = execution_planner or ExecutionPlanner(agent_connections) self._input_registry = user_input_registry or UserInputRegistry() @@ -81,6 +92,56 @@ def start_planning_task( ) -> asyncio.Task: """Kick off asynchronous planning.""" + agent_name = (user_input.target_agent_name or "").strip() + is_passthrough = False + if agent_name: + try: + is_passthrough = bool( + self._agent_connections.is_planner_passthrough(agent_name) + ) + except Exception: + is_passthrough = False + if is_passthrough: + # Directly create a simple one-task plan without invoking the LLM planner + return asyncio.create_task( + self._create_passthrough_plan(user_input, thread_id) + ) + return asyncio.create_task( self._planner.create_plan(user_input, callback, thread_id) ) + + # ------------------------ + # Internal helpers + # ------------------------ + async 
def _create_passthrough_plan( + self, user_input: UserInput, thread_id: str + ) -> ExecutionPlan: + """Synthesize a simple one-task plan that directly invokes target agent. + + The produced plan mirrors the structure of a normal planner output but + avoids any LLM calls. It simply wraps the user's query into a Task + addressed to the target agent. + """ + conversation_id = user_input.meta.conversation_id + plan = ExecutionPlan( + plan_id=generate_uuid("plan"), + conversation_id=conversation_id, + user_id=user_input.meta.user_id, + orig_query=user_input.query, + created_at=datetime.now().isoformat(), + ) + + agent_name = user_input.target_agent_name or "" + # Keep a concise title so UI/conversation title can reuse it + title = f"Run {agent_name}".strip() + task = Task( + conversation_id=conversation_id, + thread_id=thread_id, + user_id=user_input.meta.user_id, + agent_name=agent_name, + title=title, + query=user_input.query, + ) + plan.tasks = [task] + return plan diff --git a/python/valuecell/server/services/strategy_persistence.py b/python/valuecell/server/services/strategy_persistence.py new file mode 100644 index 000000000..1e869fcdf --- /dev/null +++ b/python/valuecell/server/services/strategy_persistence.py @@ -0,0 +1,157 @@ +from datetime import datetime, timezone +from typing import Optional + +from loguru import logger + +from valuecell.agents.strategy_agent import models as agent_models +from valuecell.server.db.repositories.strategy_repository import ( + get_strategy_repository, +) + + +def persist_trade_history( + strategy_id: str, trade: agent_models.TradeHistoryEntry +) -> Optional[dict]: + """Persist a single TradeHistoryEntry into strategy_details via repository. + + Returns the inserted StrategyDetail-like dict on success, or None on failure. 
+ """ + repo = get_strategy_repository() + try: + # map direction and type + ttype = trade.type.value if getattr(trade, "type", None) is not None else None + side = trade.side.value if getattr(trade, "side", None) is not None else None + + event_time = ( + datetime.fromtimestamp(trade.trade_ts / 1000.0, tz=timezone.utc) + if trade.trade_ts + else None + ) + + item = repo.add_detail_item( + strategy_id=strategy_id, + trade_id=trade.trade_id, + symbol=trade.instrument.symbol, + type=ttype or ("LONG" if (trade.quantity or 0) > 0 else "SHORT"), + side=side or ("BUY" if (trade.quantity or 0) > 0 else "SELL"), + leverage=float(trade.leverage) if trade.leverage is not None else None, + quantity=abs(float(trade.quantity or 0.0)), + entry_price=( + float(trade.entry_price) if trade.entry_price is not None else None + ), + exit_price=( + float(trade.exit_price) if trade.exit_price is not None else None + ), + unrealized_pnl=( + float(trade.unrealized_pnl) + if getattr(trade, "unrealized_pnl", None) is not None + else ( + float(trade.realized_pnl) + if getattr(trade, "realized_pnl", None) is not None + else None + ) + ), + # Note: store unrealized_pnl separately if available on the DTO + # (some callers may populate unrealized vs realized differently) + # Keep backward-compatibility: prefer trade.unrealized_pnl when present + # If both present, the DTO should include both; StrategyDetail currently only stores unrealized_pnl. 
+ holding_ms=int(trade.holding_ms) if trade.holding_ms is not None else None, + event_time=event_time, + note=trade.note, + ) + + if item is None: + logger.error( + "Failed to persist trade detail for strategy={} trade={}", + strategy_id, + trade.trade_id, + ) + return None + + return item.to_dict() + except Exception: + logger.exception( + "persist_trade_history failed for {} {}", + strategy_id, + getattr(trade, "trade_id", None), + ) + return None + + +def persist_portfolio_view(view: agent_models.PortfolioView) -> bool: + """Persist PortfolioView.positions into strategy_holdings (one row per symbol snapshot). + + Writes each position as a `StrategyHolding` snapshot with current timestamp if not provided. + """ + repo = get_strategy_repository() + strategy_id = view.strategy_id + try: + snapshot_ts = ( + datetime.fromtimestamp(view.ts / 1000.0, tz=timezone.utc) + if view.ts + else None + ) + for symbol, pos in view.positions.items(): + # pos is PositionSnapshot + ttype = ( + pos.trade_type.value + if pos.trade_type + else ("LONG" if pos.quantity >= 0 else "SHORT") + ) + repo.add_holding_item( + strategy_id=strategy_id, + symbol=symbol, + type=ttype, + leverage=float(pos.leverage) if pos.leverage is not None else None, + entry_price=float(pos.avg_price) if pos.avg_price is not None else None, + quantity=abs(float(pos.quantity)), + unrealized_pnl=( + float(pos.unrealized_pnl) + if pos.unrealized_pnl is not None + else None + ), + unrealized_pnl_pct=( + float(pos.unrealized_pnl_pct) + if pos.unrealized_pnl_pct is not None + else None + ), + snapshot_ts=snapshot_ts, + ) + return True + except Exception: + logger.exception("persist_portfolio_view failed for {}", strategy_id) + return False + + +def persist_strategy_summary(summary: agent_models.StrategySummary) -> bool: + """Persist a StrategySummary into the Strategy.strategy_metadata JSON. + + Returns True on success, False on failure. 
+ """ + repo = get_strategy_repository() + strategy_id = summary.strategy_id + try: + strategy = repo.get_strategy_by_strategy_id(strategy_id) + existing_meta = ( + (strategy.strategy_metadata or {}) if strategy is not None else {} + ) + meta = {**dict(existing_meta), **summary.model_dump(exclude_none=True)} + updated = repo.upsert_strategy(strategy_id, metadata=meta) + return updated is not None + except Exception: + logger.exception("persist_strategy_summary failed for {}", strategy_id) + return False + + +def strategy_running(strategy_id: str) -> bool: + """Check if a strategy with the given strategy_id exists.""" + repo = get_strategy_repository() + try: + strategy = repo.get_strategy_by_strategy_id(strategy_id) + return ( + strategy is not None + and strategy.status == agent_models.StrategyStatus.RUNNING.value + ) + except Exception: + logger.exception("strategy_running check failed for {}", strategy_id) + return False diff --git a/python/valuecell/utils/model.py b/python/valuecell/utils/model.py index 86ba9509d..f37b5fbde 100644 --- a/python/valuecell/utils/model.py +++ b/python/valuecell/utils/model.py @@ -143,7 +143,12 @@ def get_model_for_agent(agent_name: str, **kwargs): raise -def create_model_with_provider(provider: str, model_id: Optional[str] = None, **kwargs): +def create_model_with_provider( + provider: str, + model_id: Optional[str] = None, + api_key: Optional[str] = None, + **kwargs, +): """ Create a model from a specific provider. @@ -173,13 +178,60 @@ def create_model_with_provider(provider: str, model_id: Optional[str] = None, ** ValueError: If provider not found or not configured """ - return create_model( - model_id=model_id, - provider=provider, - use_fallback=False, # Don't fallback when explicitly requesting a provider - **kwargs, + # If no api_key override is supplied, use the standard factory path. 
+ if not api_key: + return create_model( + model_id=model_id, + provider=provider, + use_fallback=False, # Don't fallback when explicitly requesting a provider + **kwargs, + ) + + # Minimal override: instantiate the provider class with a copy of its + # ProviderConfig but using the provided api_key. This avoids changing the + # global configuration and keeps the change localized to this call. + try: + from valuecell.adapters.models.factory import get_model_factory + from valuecell.config.manager import ProviderConfig, get_config_manager + except Exception: + # Fallback to factory convenience if imports fail for some reason + return create_model( + model_id=model_id, + provider=provider, + use_fallback=False, + api_key=api_key, + **kwargs, + ) + + cfg_mgr = get_config_manager() + existing = cfg_mgr.get_provider_config(provider) + if not existing: + raise ValueError(f"Provider configuration not found: {provider}") + + # Build a shallow copy of ProviderConfig overriding api_key + overridden = ProviderConfig( + name=existing.name, + enabled=existing.enabled, + api_key=api_key, + base_url=existing.base_url, + default_model=existing.default_model, + models=existing.models, + parameters=existing.parameters, + default_embedding_model=existing.default_embedding_model, + embedding_models=existing.embedding_models, + embedding_parameters=existing.embedding_parameters, + extra_config=existing.extra_config, ) + factory = get_model_factory() + provider_class = factory._providers.get(provider) + if not provider_class: + raise ValueError(f"Unsupported provider: {provider}") + + provider_instance = provider_class(overridden) + # Delegate to the provider instance directly so the supplied api_key is used + return provider_instance.create_model(model_id, **kwargs) + # ============================================ # Embedding Functions