Skip to content

Commit

Permalink
[CLEANUP]
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Nov 23, 2024
1 parent 50dcfaf commit d9f8d6b
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ __pycache__/
build/
develop-eggs/
dist/
agent_workspace

downloads/
eggs/
.eggs/
Expand Down
4 changes: 2 additions & 2 deletions examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@
# Advanced usage with configuration
swarm = SwarmDeploy(
router,
max_workers=4,
# max_workers=4,
# cache_backend="redis"
)
swarm.start(
host="0.0.0.0",
port=8000,
workers=4,
# workers=4,
# ssl_keyfile="key.pem",
# ssl_certfile="cert.pem"
)
Expand Down
114 changes: 85 additions & 29 deletions experimental/swarm_deploy_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,16 @@ class SwarmState(BaseModel):


class SwarmOutput(BaseModel):
id: str = Field(..., description="Unique identifier for the output")
id: str = Field(
..., description="Unique identifier for the output"
)
timestamp: datetime = Field(default_factory=datetime.now)
status: str = Field(..., description="Status of the task execution")
execution_time: float = Field(..., description="Time taken to execute in seconds")
status: str = Field(
..., description="Status of the task execution"
)
execution_time: float = Field(
..., description="Time taken to execute in seconds"
)
result: Any = Field(..., description="Task execution result")
metadata: Dict[str, Any] = Field(default_factory=dict)
cached: bool = Field(default=False)
Expand All @@ -119,10 +125,13 @@ class Config:
def dict(self, *args, **kwargs):
d = super().dict(*args, **kwargs)
# Ensure result is JSON serializable
if isinstance(d['result'], (dict, list, str, int, float, bool, None.__class__)):
if isinstance(
d["result"],
(dict, list, str, int, float, bool, None.__class__),
):
return d
# Convert non-JSON serializable results to string representation
d['result'] = str(d['result'])
d["result"] = str(d["result"])
return d


Expand Down Expand Up @@ -403,7 +412,12 @@ async def _background_worker(self):
logger.error("Background worker error", error=str(e))
await asyncio.sleep(1)

async def _process_task(self, task_id: str, task_input: SwarmInput, request_id: Optional[str]):
async def _process_task(
self,
task_id: str,
task_input: SwarmInput,
request_id: Optional[str],
):
"""Process a single task with enhanced error handling"""
start_time = time.time()

Expand All @@ -412,16 +426,28 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id:
self.state.active_tasks += 1
self.state.status = "processing"

logger.info(f"Starting task processing",
logger.info(
"Starting task processing",
task_id=task_id,
task=task_input.task[:100]
task=task_input.task[:100],
)

try:
result = await self._execute_task(task_input)

# Ensure result is JSON serializable
if not isinstance(result, (dict, list, str, int, float, bool, None.__class__)):
if not isinstance(
result,
(
dict,
list,
str,
int,
float,
bool,
None.__class__,
),
):
result = str(result)

execution_time = time.time() - start_time
Expand All @@ -447,29 +473,34 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id:

# Cache successful results
cache_key = f"{self.callable_name}:{hash(task_input.task)}"
await self.cache.set(cache_key, result, ttl=self.config.cache_ttl)
await self.cache.set(
cache_key, result, ttl=self.config.cache_ttl
)

logger.info(f"Task completed successfully",
logger.info(
"Task completed successfully",
task_id=task_id,
execution_time=execution_time
execution_time=execution_time,
)

return output

except Exception as e:
logger.error(f"Task execution error",
logger.error(
"Task execution error",
task_id=task_id,
error=str(e),
exc_info=True
exc_info=True,
)
raise

except Exception as e:
error_msg = str(e)
logger.error(f"Task processing error",
logger.error(
"Task processing error",
task_id=task_id,
error=error_msg,
exc_info=True
exc_info=True,
)
ERROR_COUNTER.inc()
self.state.error_count += 1
Expand All @@ -481,15 +512,19 @@ async def _process_task(self, task_id: str, task_input: SwarmInput, request_id:
result={"error": error_msg},
metadata={
"request_id": request_id,
"error_type": type(e).__name__
"error_type": type(e).__name__,
},
)
self.task_history[task_id] = output
return output

finally:
self.state.active_tasks -= 1
self.state.status = "idle" if self.state.active_tasks == 0 else "processing"
self.state.status = (
"idle"
if self.state.active_tasks == 0
else "processing"
)
self.state.last_activity = datetime.now()
await self.task_queue.task_done()

Expand All @@ -498,7 +533,11 @@ async def _execute_task(self, task_input: SwarmInput) -> Any:
try:
self.formatter.print_panel(
f"Executing {self.callable_name} with task: {task_input.task}"
+ (f" and image: {task_input.img}" if task_input.img else ""),
+ (
f" and image: {task_input.img}"
if task_input.img
else ""
),
title=f"SwarmDeploy Task - {self.config.type}",
)

Expand All @@ -507,11 +546,18 @@ async def _execute_task(self, task_input: SwarmInput) -> Any:
# Run async function with proper error handling
result = await self.callable.run(task_input.task)
if result is None:
raise ValueError(f"Callable {self.callable_name} returned None")
raise ValueError(
f"Callable {self.callable_name} returned None"
)
return result
except Exception as e:
logger.error(f"Error in async execution: {str(e)}", exc_info=True)
raise RuntimeError(f"Task execution failed: {str(e)}")
logger.error(
f"Error in async execution: {str(e)}",
exc_info=True,
)
raise RuntimeError(
f"Task execution failed: {str(e)}"
)
else:
# Run CPU-bound tasks in thread pool with proper error handling
try:
Expand All @@ -520,21 +566,31 @@ async def _execute_task(self, task_input: SwarmInput) -> Any:
lambda: (
self.callable.run(task_input.task)
if task_input.img is None
else self.callable.run(task_input.task, task_input.img)
else self.callable.run(
task_input.task, task_input.img
)
),
)
if result is None:
raise ValueError(f"Callable {self.callable_name} returned None")
raise ValueError(
f"Callable {self.callable_name} returned None"
)
return result
except Exception as e:
logger.error(f"Error in sync execution: {str(e)}", exc_info=True)
raise RuntimeError(f"Task execution failed: {str(e)}")
logger.error(
f"Error in sync execution: {str(e)}",
exc_info=True,
)
raise RuntimeError(
f"Task execution failed: {str(e)}"
)

except Exception as e:
logger.error(f"Error in _execute_task: {str(e)}", exc_info=True)
logger.error(
f"Error in _execute_task: {str(e)}", exc_info=True
)
raise # Re-raise the exception to be handled by _process_task


def _setup_signal_handlers(self):
"""Setup graceful shutdown handlers"""
for sig in (signal.SIGTERM, signal.SIGINT):
Expand Down
8 changes: 6 additions & 2 deletions swarm_deploy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from swarm_deploy.main import SwarmBatchOutput, SwarmConfig, SwarmDeploy
from swarm_deploy.main import (
SwarmBatchOutput,
SwarmConfig,
SwarmDeploy,
)

__all__ = ["SwarmDeploy"]
__all__ = ["SwarmDeploy"]
7 changes: 4 additions & 3 deletions swarm_deploy/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import asyncio
import time
import uuid
Expand All @@ -10,6 +9,7 @@
from swarms.utils.formatter import formatter

from swarm_deploy.callable_name import NameResolver
from swarm_deploy.message import LOGO

T = TypeVar("T")

Expand Down Expand Up @@ -126,7 +126,7 @@ def _create_config(self, agents: int) -> SwarmConfig:

def _setup_routes(self):
@self.app.post(
f"/v1/swarms/completions/{self.callable_name}",
f"/v1/swarms/completions/{self.callable_name}/{self.id}",
response_model=Union[SwarmOutput, SwarmBatchOutput],
)
async def create_completion(task_input: SwarmInput):
Expand Down Expand Up @@ -267,8 +267,9 @@ def start(self, host: str = "0.0.0.0", port: int = 8000):
import uvicorn

self.formatter.print_panel(
f"\n {LOGO} \n"
f"Starting SwarmDeploy API server on {host}:{port} for {self.callable_name}\n"
f"Endpoint: /v1/swarms/completions/{self.callable_name}",
f"Endpoint: /v1/swarms/completions/{self.callable_name}/{self.id}",
title="Server Startup",
style="bold green",
)
Expand Down
8 changes: 8 additions & 0 deletions swarm_deploy/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
LOGO = """
_________
/ _____/_ _ _______ _______ _____ ______
\_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/
/ \\ / / __ \| | \/ Y Y \\___ \
/_______ / \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
"""

0 comments on commit d9f8d6b

Please sign in to comment.