diff --git a/python/pyproject.toml b/python/pyproject.toml index 35ce0cbd5..d3df20f08 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "fastapi>=0.104.0", "pydantic>=2.0.0", "uvicorn>=0.24.0", + "a2a-sdk[http-server]>=0.3.4", "yfinance>=0.2.65", "tushare>=1.4.24", "requests>=2.32.5", diff --git a/python/uv.lock b/python/uv.lock index d7f88bc3f..aeb283add 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1,6 +1,33 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" +resolution-markers = [ + "python_full_version >= '3.13'", + "python_full_version < '3.13'", +] + +[[package]] +name = "a2a-sdk" +version = "0.3.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "protobuf" }, + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cd/ce/f50c904664a0fcafa0909be7a987ef6dcf1d75595532305bcefadb0532af/a2a_sdk-0.3.4.tar.gz", hash = "sha256:79db4c287cab1235a0b0c5af9a3a58eedd8a037b51a87bed2e89f5e5e8977f65", size = 220158, upload-time = "2025-09-02T16:53:48.962Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/44/7a/c7a13e692aba332fb55480c843acb7f3cf43668623f75d127792e8fa9b30/a2a_sdk-0.3.4-py3-none-any.whl", hash = "sha256:423f72334b4a4b34cb4da07d5db3b786975de2fbc527236e3ca713b0d08c27e1", size = 135297, upload-time = "2025-09-02T16:53:46.473Z" }, +] + +[package.optional-dependencies] +http-server = [ + { name = "fastapi" }, + { name = "sse-starlette" }, + { name = "starlette" }, +] [[package]] name = "aiohappyeyeballs" @@ -169,6 +196,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/51/bb/bf7aab772a159614954d84aa832c129624ba6c32faa559dfb200a534e50b/bs4-0.0.2-py2.py3-none-any.whl", hash = "sha256:abf8742c0805ef7f662dce4b51cca104cffe52b835238afc169142ab9b3fbccc", size = 1189, upload-time = "2024-01-17T18:15:48.613Z" }, ] +[[package]] +name = "cachetools" +version = "5.5.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6c/81/3747dad6b14fa2cf53fcf10548cf5aea6913e96fab41a3c198676f8948a5/cachetools-5.5.2.tar.gz", hash = "sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4", size = 28380, upload-time = "2025-02-20T21:01:19.524Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/72/76/20fa66124dbe6be5cafeb312ece67de6b61dd91a0247d1ea13db4ebb33c2/cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a", size = 10080, upload-time = "2025-02-20T21:01:16.647Z" }, +] + [[package]] name = "certifi" version = "2025.8.3" @@ -462,6 +498,48 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/45/b82e3c16be2182bff01179db177fe144d58b5dc787a7d4492c6ed8b9317f/frozenlist-1.7.0-py3-none-any.whl", hash = "sha256:9a5af342e34f7e97caf8c995864c7a396418ae2859cc6fdf1b1073020d516a7e", size = 13106, upload-time = "2025-06-09T23:02:34.204Z" }, ] +[[package]] +name = "google-api-core" +version = "2.25.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-auth" }, + { name = "googleapis-common-protos" }, + { name = "proto-plus" }, + { name = "protobuf" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/dc/21/e9d043e88222317afdbdb567165fdbc3b0aad90064c7e0c9eb0ad9955ad8/google_api_core-2.25.1.tar.gz", hash = "sha256:d2aaa0b13c78c61cb3f4282c464c046e45fbd75755683c9c525e6e8f7ed0a5e8", size = 165443, upload-time = "2025-06-12T20:52:20.439Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/14/4b/ead00905132820b623732b175d66354e9d3e69fcf2a5dcdab780664e7896/google_api_core-2.25.1-py3-none-any.whl", hash = "sha256:8a2a56c1fef82987a524371f99f3bd0143702fecc670c72e600c1cda6bf8dbb7", size = 160807, upload-time = "2025-06-12T20:52:19.334Z" }, +] + +[[package]] +name = "google-auth" +version = "2.40.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cachetools" }, + { name = "pyasn1-modules" }, + { name = "rsa" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9e/9b/e92ef23b84fa10a64ce4831390b7a4c2e53c0132568d99d4ae61d04c8855/google_auth-2.40.3.tar.gz", hash = "sha256:500c3a29adedeb36ea9cf24b8d10858e152f2412e3ca37829b3fa18e33d63b77", size = 281029, upload-time = "2025-06-04T18:04:57.577Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/17/63/b19553b658a1692443c62bd07e5868adaa0ad746a0751ba62c59568cd45b/google_auth-2.40.3-py2.py3-none-any.whl", hash = "sha256:1370d4593e86213563547f97a92752fc658456fe4514c809544f330fed45a7ca", size = 216137, upload-time = "2025-06-04T18:04:55.573Z" }, +] + +[[package]] +name = "googleapis-common-protos" +version = "1.70.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/39/24/33db22342cf4a2ea27c9955e6713140fedd51e8b141b5ce5260897020f1a/googleapis_common_protos-1.70.0.tar.gz", hash = "sha256:0e1b44e0ea153e6594f9f394fef15193a68aaaea2d843f83e2742717ca753257", size = 145903, upload-time = "2025-04-14T10:17:02.924Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/86/f1/62a193f0227cf15a920390abe675f386dec35f7ae3ffe6da582d3ade42c7/googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8", size = 294530, upload-time = "2025-04-14T10:17:01.271Z" }, +] + [[package]] name = "h11" version = "0.16.0" @@ -484,6 +562,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/6c/dd/a834df6482147d48e225a49515aabc28974ad5a4ca3215c18a882565b028/html5lib-1.1-py2.py3-none-any.whl", hash = "sha256:0d78f8fde1c230e99fe37986a60526d7049ed4bf8a9fadbad5f00e22e58e041d", size = 112173, upload-time = "2020-06-22T23:32:36.781Z" }, ] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + +[[package]] +name = "httpx-sse" +version = "0.4.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6e/fa/66bd985dd0b7c109a3bcb89272ee0bfb7e2b4d06309ad7b38ff866734b2a/httpx_sse-0.4.1.tar.gz", hash = "sha256:8f44d34414bc7b21bf3602713005c5df4917884f76072479b21f68befa4ea26e", size = 12998, upload-time = "2025-06-24T13:21:05.71Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/0a/6269e3473b09aed2dab8aa1a600c70f31f00ae1349bee30658f7e358a159/httpx_sse-0.4.1-py3-none-any.whl", hash = "sha256:cba42174344c3a5b06f255ce65b350880f962d99ead85e776f23c6618a377a37", size = 8054, upload-time = "2025-06-24T13:21:04.772Z" }, +] + [[package]] name = "idna" version = "3.10" @@ -858,6 +973,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/35/cc0aaecf278bb4575b8555f2b137de5ab821595ddae9da9d3cd1da4072c7/propcache-0.3.2-py3-none-any.whl", hash = "sha256:98f1ec44fb675f5052cccc8e609c46ed23a35a1cfd18545ad4e29002d858a43f", size = 12663, upload-time = "2025-06-09T22:56:04.484Z" }, ] +[[package]] +name = "proto-plus" +version = "1.26.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f4/ac/87285f15f7cce6d4a008f33f1757fb5a13611ea8914eb58c3d0d26243468/proto_plus-1.26.1.tar.gz", hash = "sha256:21a515a4c4c0088a773899e23c7bbade3d18f9c66c73edd4c7ee3816bc96a012", size = 56142, upload-time = "2025-03-10T15:54:38.843Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4e/6d/280c4c2ce28b1593a19ad5239c8b826871fc6ec275c21afc8e1820108039/proto_plus-1.26.1-py3-none-any.whl", hash = "sha256:13285478c2dcf2abb829db158e1047e2f1e8d63a077d94263c2b88b043c75a66", size = 50163, upload-time = "2025-03-10T15:54:37.335Z" }, +] + [[package]] name = "protobuf" version = "6.32.0" @@ -881,6 +1008,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/29/a9/8ce0ca222ef04d602924a1e099be93f5435ca6f3294182a30574d4159ca2/py_mini_racer-0.6.0-py2.py3-none-manylinux1_x86_64.whl", hash = "sha256:42896c24968481dd953eeeb11de331f6870917811961c9b26ba09071e07180e2", size = 5416149, upload-time = "2021-04-22T07:58:25.615Z" }, ] +[[package]] +name = "pyasn1" +version = "0.6.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ba/e9/01f1a64245b89f039897cb0130016d79f77d52669aae6ee7b159a6c4c018/pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034", size = 145322, upload-time = "2024-09-10T22:41:42.55Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c8/f1/d6a797abb14f6283c0ddff96bbdd46937f64122b8c925cab503dd37f8214/pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629", size = 83135, upload-time = "2024-09-11T16:00:36.122Z" }, +] + +[[package]] +name = "pyasn1-modules" +version = "0.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyasn1" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e9/e6/78ebbb10a8c8e4b61a59249394a4a594c1a7af95593dc933a349c8d00964/pyasn1_modules-0.4.2.tar.gz", hash = "sha256:677091de870a80aae844b1ca6134f54652fa2c8c5a52aa396440ac3106e941e6", size = 307892, upload-time = "2025-03-28T02:41:22.17Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/47/8d/d529b5d697919ba8c11ad626e835d4039be708a35b0d22de83a269a6682c/pyasn1_modules-0.4.2-py3-none-any.whl", hash = "sha256:29253a9207ce32b64c3ac6600edc75368f98473906e8fd1043bd6b5b1de2c14a", size = 181259, upload-time = "2025-03-28T02:41:19.028Z" }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1034,6 +1182,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "rsa" +version = "4.9.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyasn1" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/da/8a/22b7beea3ee0d44b1916c0c1cb0ee3af23b700b6da9f04991899d0c555d4/rsa-4.9.1.tar.gz", hash = "sha256:e7bdbfdb5497da4c07dfd35530e1a902659db6ff241e39d9953cad06ebd0ae75", size = 29034, upload-time = "2025-04-16T09:51:18.218Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/64/8d/0133e4eb4beed9e425d9a98ed6e081a55d195481b7632472be1af08d2f6b/rsa-4.9.1-py3-none-any.whl", hash = "sha256:68635866661c6836b8d39430f97a996acbd61bfa49406748ea243539fe239762", size = 34696, upload-time = "2025-04-16T09:51:17.142Z" }, +] + [[package]] name = "ruff" version = "0.12.11" @@ -1122,6 +1282,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/14/a0/bb38d3b76b8cae341dad93a2dd83ab7462e6dbcdd84d43f54ee60a8dc167/soupsieve-2.8-py3-none-any.whl", hash = "sha256:0cc76456a30e20f5d7f2e14a98a4ae2ee4e5abdc7c5ea0aafe795f344bc7984c", size = 36679, upload-time = "2025-08-27T15:39:50.179Z" }, ] +[[package]] +name = "sse-starlette" +version = "3.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/6f/22ed6e33f8a9e76ca0a412405f31abb844b779d52c5f96660766edcd737c/sse_starlette-3.0.2.tar.gz", hash = "sha256:ccd60b5765ebb3584d0de2d7a6e4f745672581de4f5005ab31c3a25d10b52b3a", size = 20985, upload-time = "2025-07-27T09:07:44.565Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/10/c78f463b4ef22eef8491f218f692be838282cd65480f6e423d7730dfd1fb/sse_starlette-3.0.2-py3-none-any.whl", hash = "sha256:16b7cbfddbcd4eaca11f7b586f3b8a080f1afe952c15813455b162edea619e5a", size = 11297, upload-time = "2025-07-27T09:07:43.268Z" }, +] + [[package]] name = "starlette" version = "0.47.3" @@ -1231,6 +1403,7 @@ name = "valuecell" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "a2a-sdk", extra = ["http-server"] }, { name = "akshare" }, { name = "fastapi" }, { name = "pydantic" }, @@ -1252,6 +1425,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "a2a-sdk", extras = ["http-server"], specifier = ">=0.3.4" }, { name = "akshare", specifier = ">=1.17.44" }, { name = "fastapi", specifier = ">=0.104.0" }, { name = "pydantic", specifier = ">=2.0.0" }, diff --git a/python/valuecell/core/agent/__init__.py b/python/valuecell/core/agent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/valuecell/core/agent/client.py b/python/valuecell/core/agent/client.py new file mode 100644 index 000000000..f9a3ce403 --- /dev/null +++ b/python/valuecell/core/agent/client.py @@ -0,0 +1,81 @@ +from typing import AsyncIterator + +import httpx +from a2a.client import A2ACardResolver, ClientConfig, ClientFactory +from a2a.types import Message, Part, PushNotificationConfig, Role, TextPart +from valuecell.utils import generate_uuid + +from .types import MessageResponse + + +class AgentClient: + def __init__(self, agent_url: str, push_notification_url: str = None): + self.agent_url = agent_url + self.push_notification_url = push_notification_url + self._client = None + self._httpx_client = None + self._initialized = False + + async def _ensure_initialized(self): + if not self._initialized: + await self._setup_client() + self._initialized = True + + async def _setup_client(self): + self._httpx_client = httpx.AsyncClient(timeout=30) + + config = ClientConfig( + httpx_client=self._httpx_client, + accepted_output_modes=["text"], + ) + + push_notification_configs = [] + if self.push_notification_url: + push_notification_configs.append( + PushNotificationConfig( + id=generate_uuid("pushcfg"), + token="token", + url=self.push_notification_url, + ) + ) + config.push_notification_configs = push_notification_configs + config.streaming = False + config.polling = True + + client_factory = ClientFactory(config) + card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) + card = await card_resolver.get_agent_card() + self._client = client_factory.create(card) + + async def send_message( + self, text: str, context_id: str = None, exhaustive: bool = False + ) -> MessageResponse | AsyncIterator[MessageResponse]: + """Send message to Agent""" + await self._ensure_initialized() + + message = Message( + role=Role.user, + parts=[Part(root=TextPart(text=text))], + message_id=generate_uuid("msg"), + context_id=context_id or generate_uuid("ctx"), + ) + + generator = self._client.send_message(message) + if exhaustive: + return generator + + task, event = await generator.__anext__() + await generator.aclose() + return task, event + + async def get_agent_card(self): + await self._ensure_initialized() + card_resolver = A2ACardResolver(self._httpx_client, self.agent_url) + return await card_resolver.get_agent_card() + + async def close(self): + if self._httpx_client: + await self._httpx_client.aclose() + self._httpx_client = None + self._client = None + self._initialized = False diff --git a/python/valuecell/core/agent/connect.py b/python/valuecell/core/agent/connect.py new file mode 100644 index 000000000..0c2a05f6d --- /dev/null +++ b/python/valuecell/core/agent/connect.py @@ -0,0 +1,194 @@ +import asyncio +import logging +from typing import Dict, List + +from valuecell.core.agent.client import AgentClient +from valuecell.core.agent.registry import AgentRegistry +from valuecell.core.agent.listener import NotificationListener +from valuecell.utils import get_next_available_port + +logger = logging.getLogger(__name__) + + +class RemoteConnections: + """Manager for remote Agent connections""" + + def __init__(self): + self._connections: Dict[str, AgentClient] = {} + self._running_agents: Dict[str, asyncio.Task] = {} + self._agent_instances: Dict[str, object] = {} + self._listeners: Dict[str, asyncio.Task] = {} + self._listener_urls: Dict[str, str] = {} + + async def start_agent( + self, + agent_name: str, + with_listener: bool = True, + listener_port: int = None, + listener_host: str = "localhost", + notification_callback: callable = None, + ) -> str: + """Start an agent, optionally with a notification listener.""" + agent_class = AgentRegistry.get_agent(agent_name) + if not agent_class: + raise ValueError(f"Agent '{agent_name}' not found in registry") + + # Create Agent instance + agent_instance = agent_class() + self._agent_instances[agent_name] = agent_instance + + listener_url = None + + # Start listener if requested and agent supports push notifications + if with_listener and agent_instance.agent_card.capabilities.push_notifications: + try: + listener_url = await self._start_listener_for_agent( + agent_name, + listener_host=listener_host, + listener_port=listener_port, + notification_callback=notification_callback, + ) + except Exception as e: + logger.error(f"Failed to start listener for '{agent_name}': {e}") + await self._cleanup_agent(agent_name) + raise RuntimeError( + f"Failed to start listener for '{agent_name}'" + ) from e + + # Start agent service + try: + await self._start_agent_service(agent_name, agent_instance) + except Exception as e: + logger.error(f"Failed to start agent '{agent_name}': {e}") + await self._cleanup_agent(agent_name) + raise RuntimeError(f"Failed to start agent '{agent_name}'") from e + + # Create client connection with listener URL + agent_url = agent_instance.agent_card.url + self._create_client_for_agent(agent_name, agent_instance, listener_url) + + return agent_url + + async def _start_listener_for_agent( + self, + agent_name: str, + listener_host: str, + listener_port: int = None, + notification_callback: callable = None, + ) -> str: + """Start a NotificationListener for the agent and return its URL.""" + # Auto-assign port if not specified + if listener_port is None: + listener_port = get_next_available_port(5000) + + # Create and start listener + listener = NotificationListener( + host=listener_host, + port=listener_port, + notification_callback=notification_callback, + ) + + listener_task = asyncio.create_task(listener.start_async()) + self._listeners[agent_name] = listener_task + + listener_url = f"http://{listener_host}:{listener_port}/notify" + self._listener_urls[agent_name] = listener_url + + # Wait a moment for listener to start + await asyncio.sleep(0.3) + logger.info(f"Started listener for '{agent_name}' at {listener_url}") + + return listener_url + + async def _start_agent_service(self, agent_name: str, agent_instance: object): + """Start the agent service (serve) and track the running task.""" + server_task = asyncio.create_task(agent_instance.serve()) + self._running_agents[agent_name] = server_task + + # Wait for agent to start + await asyncio.sleep(0.5) + + def _create_client_for_agent( + self, agent_name: str, agent_instance: object, listener_url: str = None + ): + """Create an AgentClient for the agent and record the connection.""" + agent_url = agent_instance.agent_card.url + self._connections[agent_name] = AgentClient( + agent_url, push_notification_url=listener_url + ) + + logger.info(f"Started agent '{agent_name}' at {agent_url}") + if listener_url: + logger.info(f" โ””โ”€ with listener at {listener_url}") + + async def _cleanup_agent(self, agent_name: str): + """Clean up all resources for an agent""" + # Close client connection + if agent_name in self._connections: + await self._connections[agent_name].close() + + # Stop listener + if agent_name in self._listeners: + self._listeners[agent_name].cancel() + try: + await self._listeners[agent_name] + except asyncio.CancelledError: + pass + del self._listeners[agent_name] + + # Stop agent + if agent_name in self._running_agents: + self._running_agents[agent_name].cancel() + try: + await self._running_agents[agent_name] + except asyncio.CancelledError: + pass + del self._running_agents[agent_name] + + # Clean up references + if agent_name in self._connections: + del self._connections[agent_name] + if agent_name in self._agent_instances: + del self._agent_instances[agent_name] + if agent_name in self._listener_urls: + del self._listener_urls[agent_name] + + async def get_client(self, agent_name: str) -> AgentClient: + """Get Agent client connection""" + if agent_name not in self._connections: + await self.start_agent(agent_name) + + return self._connections[agent_name] + + async def stop_agent(self, agent_name: str): + """Stop Agent service and associated listener""" + await self._cleanup_agent(agent_name) + logger.info(f"Stopped agent '{agent_name}' and its listener") + + def list_running_agents(self) -> List[str]: + """List running agents""" + return list(self._running_agents.keys()) + + def list_available_agents(self) -> List[str]: + """List all available agents from registry""" + return AgentRegistry.list_agents() + + async def stop_all(self): + """Stop all running agents""" + for agent_name in list(self._running_agents.keys()): + await self.stop_agent(agent_name) + + def get_agent_info(self, agent_name: str) -> dict: + """Get agent information including listener info""" + if agent_name not in self._agent_instances: + return None + + agent_instance = self._agent_instances[agent_name] + return { + "name": agent_name, + "url": agent_instance.agent_card.url, + "listener_url": self._listener_urls.get(agent_name), + "card": agent_instance.agent_card.model_dump(exclude_none=True), + "running": agent_name in self._running_agents, + "has_listener": agent_name in self._listeners, + } diff --git a/python/valuecell/core/agent/decorator.py b/python/valuecell/core/agent/decorator.py new file mode 100644 index 000000000..ba1baff86 --- /dev/null +++ b/python/valuecell/core/agent/decorator.py @@ -0,0 +1,198 @@ +import logging +from typing import Type + +import httpx +import uvicorn +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.apps import A2AStarletteApplication +from a2a.server.events import EventQueue +from a2a.server.request_handlers import DefaultRequestHandler +from a2a.server.tasks import ( + BasePushNotificationSender, + InMemoryPushNotificationConfigStore, + InMemoryTaskStore, + TaskUpdater, +) +from a2a.types import ( + AgentCapabilities, + AgentCard, + AgentSkill, + Part, + TaskState, + TextPart, + UnsupportedOperationError, +) +from a2a.utils import new_task +from a2a.utils.errors import ServerError +from valuecell.core.agent.registry import AgentRegistry +from valuecell.core.agent.types import BaseAgent +from valuecell.utils import get_next_available_port + +logger = logging.getLogger(__name__) + + +def serve( + name: str = None, + host: str = "localhost", + port: int = None, + streaming: bool = True, + push_notifications: bool = True, + description: str = None, + version: str = "1.0.0", + skills: list[AgentSkill | dict] = None, +): + def decorator(cls: Type) -> Type: + # Build agent card (port will be assigned when server starts) + agent_skills = [] + if skills: + for skill in skills: + if isinstance(skill, dict): + agent_skills.append(AgentSkill(**skill)) + elif isinstance(skill, AgentSkill): + agent_skills.append(skill) + + # Determine the agent name consistently + agent_name = name or cls.__name__ + + # Create decorated class + class DecoratedAgent(cls): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Assign port when instance is created + actual_port = port or get_next_available_port() + + # Create agent card with actual port + self.agent_card = AgentCard( + name=agent_name, + description=description + or f"No description available for {agent_name}", + url=f"http://{host}:{actual_port}/", + version=version, + default_input_modes=["text"], + default_output_modes=["text"], + capabilities=AgentCapabilities( + streaming=streaming, push_notifications=push_notifications + ), + skills=agent_skills, + supports_authenticated_extended_card=False, + ) + + self._host = host + self._port = actual_port + self._executor = None + self._server_task = None + + async def serve(self): + # Create AgentExecutor wrapper + self._executor = _create_agent_executor(self) + + # Setup server components + client = httpx.AsyncClient() + push_notification_config_store = InMemoryPushNotificationConfigStore() + push_notification_sender = BasePushNotificationSender( + client, config_store=push_notification_config_store + ) + request_handler = DefaultRequestHandler( + agent_executor=self._executor, + task_store=InMemoryTaskStore(), + push_config_store=push_notification_config_store, + push_sender=push_notification_sender, + ) + + server_app = A2AStarletteApplication( + agent_card=self.agent_card, + http_handler=request_handler, + ) + + # Start server + config = uvicorn.Config( + server_app.build(), + host=self._host, + port=self._port, + log_level="info", + ) + server = uvicorn.Server(config) + logger.info(f"Starting {agent_name} server at {self.agent_card.url}") + await server.serve() + + # Preserve original class metadata + DecoratedAgent.__name__ = cls.__name__ + DecoratedAgent.__qualname__ = cls.__qualname__ + + # Store agent name as class attribute for registry management + DecoratedAgent.__agent_name__ = agent_name + + # Register to registry + try: + AgentRegistry.register(DecoratedAgent, agent_name) + except ImportError: + # Registry not available, skip registration + logger.warning( + f"Agent registry not available, skipping registration for {DecoratedAgent.__name__}" + ) + + return DecoratedAgent + + return decorator + + +class GenericAgentExecutor(AgentExecutor): + def __init__(self, agent: BaseAgent): + self.agent = agent + + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + # Ensure agent implements streaming interface + if not hasattr(self.agent, "stream"): + raise NotImplementedError( + f"Agent {self.agent.__class__.__name__} must implement 'stream' method" + ) + + # Prepare query and ensure a task exists in the system + query = context.get_user_input() + task = context.current_task + if not task: + task = new_task(context.message) + await event_queue.enqueue_event(task) + + # Helper state + updater = TaskUpdater(event_queue, task.id, task.context_id) + artifact_id = f"{self.agent.__class__.__name__}-artifact" + chunk_idx = 0 + + # Local helper to add a chunk + async def _add_chunk(content: str, last: bool = False): + nonlocal chunk_idx + parts = [Part(root=TextPart(text=content))] + await updater.add_artifact( + parts=parts, + artifact_id=artifact_id, + append=chunk_idx > 0, + last_chunk=last, + ) + if not last: + chunk_idx += 1 + + # Stream from the user agent and update task incrementally + try: + async for item in self.agent.stream(query, task.context_id, task.id): + content = item.get("content", "") + is_complete = item.get("is_task_complete", True) + + await updater.update_status(TaskState.working) + await _add_chunk(content, last=is_complete) + + if is_complete: + await updater.complete() + break + except Exception as e: + # Convert unexpected errors into server errors so callers can handle them uniformly + raise ServerError(error=e) from e + + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + # Default cancel operation + raise ServerError(error=UnsupportedOperationError()) + + +def _create_agent_executor(agent_instance): + return GenericAgentExecutor(agent_instance) diff --git a/python/valuecell/core/agent/listener.py b/python/valuecell/core/agent/listener.py new file mode 100644 index 000000000..749d34608 --- /dev/null +++ b/python/valuecell/core/agent/listener.py @@ -0,0 +1,66 @@ +import asyncio +import logging +from typing import Callable, Optional + +import uvicorn +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import JSONResponse + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class NotificationListener: + def __init__( + self, + host: str = "localhost", + port: int = 5000, + notification_callback: Optional[Callable] = None, + ): + self.host = host + self.port = port + self.notification_callback = notification_callback + self.app = self._create_app() + + def _create_app(self): + app = Starlette() + app.add_route("/notify", self.handle_notification, methods=["POST"]) + return app + + async def handle_notification(self, request: Request): + try: + data = await request.json() + logger.info(f"๐Ÿ“จ Notification received on {self.host}:{self.port}: {data}") + + if self.notification_callback: + if asyncio.iscoroutinefunction(self.notification_callback): + await self.notification_callback(data) + else: + self.notification_callback(data) + + return JSONResponse({"status": "ok"}) + except Exception as e: + logger.error(f"Error handling notification: {e}") + return JSONResponse({"error": str(e)}, status_code=500) + + def start(self): + logger.info(f"Starting listener on {self.host}:{self.port}") + uvicorn.run(self.app, host=self.host, port=self.port) + + async def start_async(self): + logger.info(f"Starting async listener on {self.host}:{self.port}") + config = uvicorn.Config( + self.app, host=self.host, port=self.port, log_level="info" + ) + server = uvicorn.Server(config) + await server.serve() + + +def main(): + listener = NotificationListener() + listener.start() + + +if __name__ == "__main__": + main() diff --git a/python/valuecell/core/agent/registry.py b/python/valuecell/core/agent/registry.py new file mode 100644 index 000000000..641994c8c --- /dev/null +++ b/python/valuecell/core/agent/registry.py @@ -0,0 +1,202 @@ +from typing import Dict, Type, List +import logging + +logger = logging.getLogger(__name__) + + +class AgentRegistry: + """Simple Agent registry for managing decorated agents""" + + _agents: Dict[str, Type] = {} + + @classmethod + def register(cls, agent_class: Type, agent_name: str) -> None: + """Register an Agent class + + Args: + agent_class: The decorated agent class + agent_name: The agent name (from decorator parameter or class name) + """ + class_name = agent_class.__name__ + + # Primary registration: use agent_name (this is what users will lookup) + cls._agents[agent_name] = agent_class + + # Secondary registration: use class_name if different from agent_name + # This helps with debugging and class-based lookups + if class_name != agent_name: + cls._agents[class_name] = agent_class + logger.info(f"Registered agent: '{agent_name}' (class: {class_name})") + else: + logger.info(f"Registered agent: '{agent_name}'") + + @classmethod + def get_agent(cls, name: str) -> Type: + """Get a registered Agent class by name""" + return cls._agents.get(name) + + @classmethod + def get_agent_name(cls, agent_class: Type) -> str: + """Get the agent name for a given class""" + if hasattr(agent_class, "__agent_name__"): + return agent_class.__agent_name__ + return agent_class.__name__ + + @classmethod + def list_agents(cls) -> List[str]: + """List all registered agent names (primary names only)""" + # Filter out duplicates by checking if the agent_name matches the stored __agent_name__ + unique_names = [] + for name, agent_class in cls._agents.items(): + if ( + hasattr(agent_class, "__agent_name__") + and agent_class.__agent_name__ == name + ): + unique_names.append(name) + elif ( + not hasattr(agent_class, "__agent_name__") + and agent_class.__name__ == name + ): + unique_names.append(name) + return unique_names + + @classmethod + def get_all_agents(cls) -> Dict[str, Type]: + """Get all registered Agents (includes both primary and secondary keys)""" + return cls._agents.copy() + + @classmethod + def get_registry_info(cls) -> Dict[str, dict]: + """Get detailed registry information for debugging""" + info = {} + processed_classes = set() + + for _, agent_class in cls._agents.items(): + class_id = id(agent_class) + if class_id in processed_classes: + continue + + processed_classes.add(class_id) + agent_name = cls.get_agent_name(agent_class) + + info[agent_name] = { + "class_name": agent_class.__name__, + "agent_name": agent_name, + "registered_keys": [ + k for k, v in cls._agents.items() if v is agent_class + ], + "class_qualname": getattr(agent_class, "__qualname__", "N/A"), + } + + return info + + @classmethod + def unregister(cls, name: str) -> bool: + """Unregister an agent by name (agent_name or class_name) + + Args: + name: The agent name or class name to unregister + + Returns: + bool: True if agent was found and unregistered, False otherwise + """ + agent_class = cls._agents.get(name) + if not agent_class: + return False + + # Find all keys that point to this agent class + keys_to_remove = [k for k, v in cls._agents.items() if v is agent_class] + + # Remove all keys for this agent + for key in keys_to_remove: + del cls._agents[key] + + agent_name = cls.get_agent_name(agent_class) + logger.info( + f"Unregistered agent: '{agent_name}' (removed keys: {keys_to_remove})" + ) + return True + + @classmethod + def unregister_by_class(cls, agent_class: Type) -> bool: + """Unregister an agent by class reference + + Args: + agent_class: The agent class to unregister + + Returns: + bool: True if agent was found and unregistered, False otherwise + """ + # Find all keys that point to this agent class + keys_to_remove = [k for k, v in cls._agents.items() if v is agent_class] + + if not keys_to_remove: + return False + + # Remove all keys for this agent + for key in keys_to_remove: + del cls._agents[key] + + agent_name = cls.get_agent_name(agent_class) + logger.info( + f"Unregistered agent: '{agent_name}' (removed keys: {keys_to_remove})" + ) + return True + + @classmethod + def is_registered(cls, name: str) -> bool: + """Check if an agent is registered by name + + Args: + name: The agent name or class name to check + + Returns: + bool: True if agent is registered, False otherwise + """ + return name in cls._agents + + @classmethod + def unregister_all(cls, pattern: str = None) -> List[str]: + """Unregister multiple agents, optionally by pattern + + Args: + pattern: Optional string pattern to match agent names (substring match) + If None, unregisters all agents + + Returns: + List[str]: List of unregistered agent names + """ + if pattern is None: + # Unregister all + agent_names = cls.list_agents() + cls.clear() + logger.info(f"Unregistered all agents: {agent_names}") + return agent_names + + # Find agents matching pattern + matching_agents = [] + for name in cls.list_agents(): + if pattern in name: + matching_agents.append(name) + + # Unregister matching agents + unregistered = [] + for name in matching_agents: + if cls.unregister(name): + unregistered.append(name) + + return unregistered + + @classmethod + def count(cls) -> int: + """Get the number of unique registered agents + + Returns: + int: Number of unique agents (not counting duplicate keys) + """ + return len(cls.list_agents()) + + @classmethod + def clear(cls) -> None: + """Clear all registered agents (useful for testing)""" + cls._agents.clear() diff --git a/python/valuecell/core/agent/tests/__init__.py b/python/valuecell/core/agent/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/valuecell/core/agent/tests/test_e2e_demo.py b/python/valuecell/core/agent/tests/test_e2e_demo.py new file mode 100644 index 000000000..3153ac620 --- /dev/null +++ b/python/valuecell/core/agent/tests/test_e2e_demo.py @@ -0,0 +1,171 @@ +import asyncio +import logging +from valuecell.core.agent.decorator import serve +from valuecell.core.agent.connect import RemoteConnections + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +# Demo agents using the @serve decorator +@serve(name="Calculator Agent") +class CalculatorAgent: + """A calculator agent that can do basic math""" + + def __init__(self): + logger.info("Initializing CalculatorAgent") + self.agent_name = "CalculatorAgent" + + async def stream(self, query, session_id, task_id): + """Process math queries""" + logger.info(f"Calculator processing: {query}") + + yield {"is_task_complete": False, "content": f"๐Ÿงฎ Calculating: {query}"} + await asyncio.sleep(0.5) + + try: + # Simple math evaluation (in real world, use safe parsing) + if any(op in query for op in ["+", "-", "*", "/", "(", ")"]): + # For demo, just respond with a mock calculation + result = "42" # Mock result + yield {"is_task_complete": False, "content": "๐Ÿ’ญ Computing result..."} + await asyncio.sleep(0.5) + yield {"is_task_complete": True, "content": f"โœ… Result: {result}"} + else: + yield { + "is_task_complete": True, + "content": "โ“ I can help with math calculations. Try something like '2 + 3'", + } + except Exception as e: + yield { + "is_task_complete": True, + "content": f"โŒ Error in calculation: {str(e)}", + } + + +@serve(name="Weather Agent", port=9101, description="Provides weather information") +class WeatherAgent: + """A weather information agent""" + + def __init__(self): + logger.info("Initializing WeatherAgent") + self.agent_name = "WeatherAgent" + + async def stream(self, query, session_id, task_id): + """Process weather queries""" + logger.info(f"Weather processing: {query}") + + yield {"is_task_complete": False, "content": f"๐ŸŒค๏ธ Checking weather for: {query}"} + await asyncio.sleep(0.8) + + if "weather" in query.lower(): + yield { + "is_task_complete": False, + "content": "๐ŸŒก๏ธ Fetching current conditions...", + } + await asyncio.sleep(0.5) + yield { + "is_task_complete": False, + "content": "๐Ÿ“Š Analyzing forecast data...", + } + await asyncio.sleep(0.5) + yield { + "is_task_complete": True, + "content": f"โ˜€๏ธ Weather report: Sunny, 22ยฐC. Perfect day! (for query: {query})", + } + else: + yield { + "is_task_complete": True, + "content": "๐ŸŒ I provide weather information. Ask me about the weather in any location!", + } + + +@serve(name="Simple Agent", streaming=False, push_notifications=False) +class SimpleAgent: + """A simple non-streaming agent""" + + async def stream(self, query, session_id, task_id): + """Simple response""" + yield {"is_task_complete": True, "content": f"Simple response to: {query}"} + + +async def demo_complete_system(): + """Complete demonstration of the decorator system""" + logger.info("๐Ÿš€ Starting Complete A2A Decorator System Demo") + + # Create connections manager + connections = RemoteConnections() + + try: + # Show available agents from registry + available = connections.list_available_agents() + logger.info(f"๐Ÿ“‹ Available agents from registry: {available}") + + # Start multiple agents + logger.info("โ–ถ๏ธ Starting multiple agents...") + + calc_url = await connections.start_agent("CalculatorAgent") + weather_url = await connections.start_agent("WeatherAgent") + simple_url = await connections.start_agent("SimpleAgent") + + logger.info(f"๐Ÿงฎ Calculator Agent: {calc_url}") + logger.info(f"๐ŸŒค๏ธ Weather Agent: {weather_url}") + logger.info(f"๐Ÿ“ Simple Agent: {simple_url}") + + # Wait for all agents to fully start + await asyncio.sleep(3) + + # Show running agents + running = connections.list_running_agents() + logger.info(f"๐Ÿƒ Running agents: {running}") + + # Test Calculator Agent + logger.info("๐Ÿงช Testing Calculator Agent...") + client = await connections.get_client("CalculatorAgent") + task, event = await client.send_message("What is 15 + 27?") + logger.info(f"Calculator result: {task.status}") + + # # Test Weather Agent + logger.info("๐Ÿงช Testing Weather Agent...") + client = await connections.get_client("WeatherAgent") + task, event = await client.send_message( + "What's the weather like in San Francisco?" + ) + logger.info(f"Weather result: {task.status}") + + # Test Simple Agent + logger.info("๐Ÿงช Testing Simple Agent...") + client = await connections.get_client("SimpleAgent") + task, event = await client.send_message("Hello simple agent") + logger.info(f"Simple agent result: {task.status}") + + await asyncio.sleep(10) + # Show agent information + for agent_name in running: + info = connections.get_agent_info(agent_name) + if info: + logger.info( + f"โ„น๏ธ {agent_name}: {info['url']} (running: {info['running']})" + ) + + logger.info("โœ… All tests completed successfully!") + + except Exception as e: + logger.error(f"โŒ Error in demo: {e}") + import traceback + + traceback.print_exc() + raise + + finally: + # Clean up + logger.info("๐Ÿงน Stopping all agents...") + await connections.stop_all() + logger.info("โœ… Demo completed and cleaned up") + + +if __name__ == "__main__": + asyncio.run(demo_complete_system()) diff --git a/python/valuecell/core/agent/types.py b/python/valuecell/core/agent/types.py new file mode 100644 index 000000000..bc5198ae2 --- /dev/null +++ b/python/valuecell/core/agent/types.py @@ -0,0 +1,37 @@ +from abc import ABC, abstractmethod +from typing import Optional + +from a2a.types import Task, TaskArtifactUpdateEvent, TaskStatusUpdateEvent +from pydantic import BaseModel, Field + + +class StreamResponse(BaseModel): + is_task_complete: bool = Field( + default=False, + description="Indicates whether the task associated with this stream response is complete.", + ) + content: str = Field( + ..., + description="The content of the stream response, typically a chunk of data or message.", + ) + + +class BaseAgent(ABC, BaseModel): + """ + Abstract base class for all agents. + """ + + agent_name: str = Field(..., description="Unique name of the agent") + description: str = Field( + ..., description="Description of the agent's purpose and functionality" + ) + + @abstractmethod + async def stream(self, query, session_id, task_id) -> StreamResponse: + """ + Abstract method to stream the agent with the provided input data. + Must be implemented by all subclasses. + """ + + +MessageResponse = tuple[Task, Optional[TaskStatusUpdateEvent | TaskArtifactUpdateEvent]] diff --git a/python/valuecell/utils/__init__.py b/python/valuecell/utils/__init__.py index e69de29bb..29fd4e226 100644 --- a/python/valuecell/utils/__init__.py +++ b/python/valuecell/utils/__init__.py @@ -0,0 +1,7 @@ +from .port import get_next_available_port +from .uuid import generate_uuid + +__all__ = [ + "get_next_available_port", + "generate_uuid", +] diff --git a/python/valuecell/utils/port.py b/python/valuecell/utils/port.py new file mode 100644 index 000000000..b812aa242 --- /dev/null +++ b/python/valuecell/utils/port.py @@ -0,0 +1,13 @@ +import socket + + +def get_next_available_port(start: int = 9000, num: int = 1000) -> int: + for port in range(start, start + num): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("localhost", port)) + return port + except OSError: + continue + + raise RuntimeError("No available ports found") diff --git a/python/valuecell/utils/uuid.py b/python/valuecell/utils/uuid.py new file mode 100644 index 000000000..a904c06b2 --- /dev/null +++ b/python/valuecell/utils/uuid.py @@ -0,0 +1,8 @@ +from uuid import uuid4 + + +def generate_uuid(prefix: str = None) -> str: + if not prefix: + return str(uuid4().hex) + + return f"{prefix}-{uuid4().hex}"