From d9f8d6b86f8d2c6a4dbe9d35c880f2560e227c49 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 22 Nov 2024 23:10:47 -0800 Subject: [PATCH] [CLEANUP] --- .gitignore | 2 + examples.py | 4 +- experimental/swarm_deploy_v3.py | 114 ++++++++++++++++++++++++-------- swarm_deploy/__init__.py | 8 ++- swarm_deploy/main.py | 7 +- swarm_deploy/message.py | 8 +++ 6 files changed, 107 insertions(+), 36 deletions(-) create mode 100644 swarm_deploy/message.py diff --git a/.gitignore b/.gitignore index 36b36ef..8df5298 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ __pycache__/ build/ develop-eggs/ dist/ +agent_workspace + downloads/ eggs/ .eggs/ diff --git a/examples.py b/examples.py index a91ba4a..302d041 100644 --- a/examples.py +++ b/examples.py @@ -112,13 +112,13 @@ # Advanced usage with configuration swarm = SwarmDeploy( router, - max_workers=4, + # max_workers=4, # cache_backend="redis" ) swarm.start( host="0.0.0.0", port=8000, - workers=4, + # workers=4, # ssl_keyfile="key.pem", # ssl_certfile="cert.pem" ) diff --git a/experimental/swarm_deploy_v3.py b/experimental/swarm_deploy_v3.py index 41ef620..41f344a 100644 --- a/experimental/swarm_deploy_v3.py +++ b/experimental/swarm_deploy_v3.py @@ -101,10 +101,16 @@ class SwarmState(BaseModel): class SwarmOutput(BaseModel): - id: str = Field(..., description="Unique identifier for the output") + id: str = Field( + ..., description="Unique identifier for the output" + ) timestamp: datetime = Field(default_factory=datetime.now) - status: str = Field(..., description="Status of the task execution") - execution_time: float = Field(..., description="Time taken to execute in seconds") + status: str = Field( + ..., description="Status of the task execution" + ) + execution_time: float = Field( + ..., description="Time taken to execute in seconds" + ) result: Any = Field(..., description="Task execution result") metadata: Dict[str, Any] = Field(default_factory=dict) cached: bool = Field(default=False) @@ -119,10 +125,13 @@ class Config: def dict(self, *args, **kwargs): d = super().dict(*args, **kwargs) # Ensure result is JSON serializable - if isinstance(d['result'], (dict, list, str, int, float, bool, None.__class__)): + if isinstance( + d["result"], + (dict, list, str, int, float, bool, None.__class__), + ): return d # Convert non-JSON serializable results to string representation - d['result'] = str(d['result']) + d["result"] = str(d["result"]) return d @@ -403,7 +412,12 @@ async def _background_worker(self): logger.error("Background worker error", error=str(e)) await asyncio.sleep(1) - async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: Optional[str]): + async def _process_task( + self, + task_id: str, + task_input: SwarmInput, + request_id: Optional[str], + ): """Process a single task with enhanced error handling""" start_time = time.time() @@ -412,16 +426,28 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: self.state.active_tasks += 1 self.state.status = "processing" - logger.info(f"Starting task processing", + logger.info( + "Starting task processing", task_id=task_id, - task=task_input.task[:100] + task=task_input.task[:100], ) try: result = await self._execute_task(task_input) - + # Ensure result is JSON serializable - if not isinstance(result, (dict, list, str, int, float, bool, None.__class__)): + if not isinstance( + result, + ( + dict, + list, + str, + int, + float, + bool, + None.__class__, + ), + ): result = str(result) execution_time = time.time() - start_time @@ -447,29 +473,34 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: # Cache successful results cache_key = f"{self.callable_name}:{hash(task_input.task)}" - await self.cache.set(cache_key, result, ttl=self.config.cache_ttl) + await self.cache.set( + cache_key, result, ttl=self.config.cache_ttl + ) - logger.info(f"Task completed successfully", + logger.info( + "Task completed successfully", task_id=task_id, - execution_time=execution_time + execution_time=execution_time, ) return output except Exception as e: - logger.error(f"Task execution error", + logger.error( + "Task execution error", task_id=task_id, error=str(e), - exc_info=True + exc_info=True, ) raise except Exception as e: error_msg = str(e) - logger.error(f"Task processing error", + logger.error( + "Task processing error", task_id=task_id, error=error_msg, - exc_info=True + exc_info=True, ) ERROR_COUNTER.inc() self.state.error_count += 1 @@ -481,7 +512,7 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: result={"error": error_msg}, metadata={ "request_id": request_id, - "error_type": type(e).__name__ + "error_type": type(e).__name__, }, ) self.task_history[task_id] = output @@ -489,7 +520,11 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: finally: self.state.active_tasks -= 1 - self.state.status = "idle" if self.state.active_tasks == 0 else "processing" + self.state.status = ( + "idle" + if self.state.active_tasks == 0 + else "processing" + ) self.state.last_activity = datetime.now() await self.task_queue.task_done() @@ -498,7 +533,11 @@ async def _execute_task(self, task_input: SwarmInput) -> Any: try: self.formatter.print_panel( f"Executing {self.callable_name} with task: {task_input.task}" - + (f" and image: {task_input.img}" if task_input.img else ""), + + ( + f" and image: {task_input.img}" + if task_input.img + else "" + ), title=f"SwarmDeploy Task - {self.config.type}", ) @@ -507,11 +546,18 @@ async def _execute_task(self, task_input: SwarmInput) -> Any: # Run async function with proper error handling result = await self.callable.run(task_input.task) if result is None: - raise ValueError(f"Callable {self.callable_name} returned None") + raise ValueError( + f"Callable {self.callable_name} returned None" + ) return result except Exception as e: - logger.error(f"Error in async execution: {str(e)}", exc_info=True) - raise RuntimeError(f"Task execution failed: {str(e)}") + logger.error( + f"Error in async execution: {str(e)}", + exc_info=True, + ) + raise RuntimeError( + f"Task execution failed: {str(e)}" + ) else: # Run CPU-bound tasks in thread pool with proper error handling try: @@ -520,21 +566,31 @@ async def _execute_task(self, task_input: SwarmInput) -> Any: lambda: ( self.callable.run(task_input.task) if task_input.img is None - else self.callable.run(task_input.task, task_input.img) + else self.callable.run( + task_input.task, task_input.img + ) ), ) if result is None: - raise ValueError(f"Callable {self.callable_name} returned None") + raise ValueError( + f"Callable {self.callable_name} returned None" + ) return result except Exception as e: - logger.error(f"Error in sync execution: {str(e)}", exc_info=True) - raise RuntimeError(f"Task execution failed: {str(e)}") + logger.error( + f"Error in sync execution: {str(e)}", + exc_info=True, + ) + raise RuntimeError( + f"Task execution failed: {str(e)}" + ) except Exception as e: - logger.error(f"Error in _execute_task: {str(e)}", exc_info=True) + logger.error( + f"Error in _execute_task: {str(e)}", exc_info=True + ) raise # Re-raise the exception to be handled by _process_task - def _setup_signal_handlers(self): """Setup graceful shutdown handlers""" for sig in (signal.SIGTERM, signal.SIGINT): diff --git a/swarm_deploy/__init__.py b/swarm_deploy/__init__.py index 5c38554..e6ceece 100644 --- a/swarm_deploy/__init__.py +++ b/swarm_deploy/__init__.py @@ -1,3 +1,7 @@ -from swarm_deploy.main import SwarmBatchOutput, SwarmConfig, SwarmDeploy +from swarm_deploy.main import ( + SwarmBatchOutput, + SwarmConfig, + SwarmDeploy, +) -__all__ = ["SwarmDeploy"] \ No newline at end of file +__all__ = ["SwarmDeploy"] diff --git a/swarm_deploy/main.py b/swarm_deploy/main.py index 0324edd..b7410c3 100644 --- a/swarm_deploy/main.py +++ b/swarm_deploy/main.py @@ -1,4 +1,3 @@ - import asyncio import time import uuid @@ -10,6 +9,7 @@ from swarms.utils.formatter import formatter from swarm_deploy.callable_name import NameResolver +from swarm_deploy.message import LOGO T = TypeVar("T") @@ -126,7 +126,7 @@ def _create_config(self, agents: int) -> SwarmConfig: def _setup_routes(self): @self.app.post( - f"/v1/swarms/completions/{self.callable_name}", + f"/v1/swarms/completions/{self.callable_name}/{self.id}", response_model=Union[SwarmOutput, SwarmBatchOutput], ) async def create_completion(task_input: SwarmInput): @@ -267,8 +267,9 @@ def start(self, host: str = "0.0.0.0", port: int = 8000): import uvicorn self.formatter.print_panel( + f"\n {LOGO} \n" f"Starting SwarmDeploy API server on {host}:{port} for {self.callable_name}\n" - f"Endpoint: /v1/swarms/completions/{self.callable_name}", + f"Endpoint: /v1/swarms/completions/{self.callable_name}/{self.id}", title="Server Startup", style="bold green", ) diff --git a/swarm_deploy/message.py b/swarm_deploy/message.py new file mode 100644 index 0000000..9078c88 --- /dev/null +++ b/swarm_deploy/message.py @@ -0,0 +1,8 @@ +LOGO = """ + _________ + / _____/_ _ _______ _______ _____ ______ + \_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/ + / \\ / / __ \| | \/ Y Y \\___ \ +/_______ / \/\_/ (____ /__| |__|_| /____ > + \/ \/ \/ \/ +"""