From 399df5f9a5c6cb9e599d70bb9019c253b1dce351 Mon Sep 17 00:00:00 2001 From: zhuangxing <277308275@qq.com> Date: Wed, 14 Jan 2026 10:24:19 +0000 Subject: [PATCH] feat:sandbox --- README.md | 9 +- maze/cli/sandbox_cli.py | 38 +++++++++ maze/sandbox/__init__.py | 7 ++ maze/sandbox/client.py | 94 ++++++++++++++++++++ maze/sandbox/code_sandbox.py | 59 +++++++++++++ maze/sandbox/launcher.py | 137 ++++++++++++++++++++++++++++++ maze/sandbox/server.py | 89 +++++++++++++++++++ pyproject.toml | 3 + test/sandbox_test/test_sandbox.py | 29 +++++++ 9 files changed, 463 insertions(+), 2 deletions(-) create mode 100644 maze/cli/sandbox_cli.py create mode 100644 maze/sandbox/__init__.py create mode 100644 maze/sandbox/client.py create mode 100644 maze/sandbox/code_sandbox.py create mode 100644 maze/sandbox/launcher.py create mode 100644 maze/sandbox/server.py create mode 100644 test/sandbox_test/test_sandbox.py diff --git a/README.md b/README.md index 5e2ffe81..8f6ef9ba 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,13 @@
+## 📰 News + + +- **2026-01**: We support the sandbox feature! [**Docs**](https://github.com/QinbinLi/Maze/tree/develop/examples/sandbox) + +
+ ## 🚀Quick Start ## 1. Install @@ -105,5 +112,3 @@ Here are two vedios which show the process of using builtin tasks and uploading ### User Defined Task Workflow ![Check Result Screenshot](https://meeting-agent1.oss-cn-beijing.aliyuncs.com/userdef_task.png) [Check Result Video](https://meeting-agent1.oss-cn-beijing.aliyuncs.com/userdef_task.mp4) - - diff --git a/maze/cli/sandbox_cli.py b/maze/cli/sandbox_cli.py new file mode 100644 index 00000000..5e40bfe6 --- /dev/null +++ b/maze/cli/sandbox_cli.py @@ -0,0 +1,38 @@ +import argparse +import sys +import uvicorn +from maze.config.logging_config import setup_logging +from maze.sandbox.server import app as sandbox_app + +def start_sandbox(host: str = "0.0.0.0", port: int = 8000): + print(f"🚀 Starting Code Sandbox Server at {host}:{port}") + + uvicorn.run( + sandbox_app, + host=host, + port=port, + log_level="info" + ) + + +def main(): + parser = argparse.ArgumentParser(prog="maze-sandbox", description="Maze code sandbox service") + subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") + + # === start subcommand === + start_parser = subparsers.add_parser("start", help="Start the sandbox server") + start_parser.add_argument("--host", type=str, metavar="HOST", help="Host for sandbox server", default="0.0.0.0") + start_parser.add_argument("--port", type=int, metavar="PORT", help="Port for sandbox server", default=8000) + + + # Parse args + args = parser.parse_args() + if args.command == "start": + start_sandbox(args.host, args.port) + else: + parser.print_help() + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/maze/sandbox/__init__.py b/maze/sandbox/__init__.py new file mode 100644 index 00000000..7b3d6a6f --- /dev/null +++ b/maze/sandbox/__init__.py @@ -0,0 +1,7 @@ +CPU_PERIOD = 100000 + +from maze.sandbox.client import CodeSandboxClient + +__all__ = [ + "CodeSandboxClient", +] \ No newline at end of file diff --git a/maze/sandbox/client.py b/maze/sandbox/client.py new file mode 100644 index 00000000..b3b82c95 --- /dev/null +++ b/maze/sandbox/client.py @@ -0,0 +1,94 @@ +import aiohttp +from typing import Dict, Any, Optional + +class CodeSandboxClient: + def __init__(self, url: str, cpu_nums: int = 1, gpu_nums: int = 0, memory_mb: int = 512): + """ + Initialize remote code sandbox client + + Args: + url: Server URL (e.g., "http://localhost:8000") + cpu_nums: Number of CPUs + gpu_nums: Number of GPUs + memory_mb: Memory size in MB + """ + self.url = url.rstrip('/') + self.cpu_nums = cpu_nums + self.gpu_nums = gpu_nums + self.memory_mb = memory_mb + self.session_id: Optional[str] = None + self._session: Optional[aiohttp.ClientSession] = None + + async def _ensure_session(self): + if self._session is None: + self._session = aiohttp.ClientSession() + + async def _create_remote_session(self): + await self._ensure_session() + + payload = { + "code": "", + "cpu_nums": self.cpu_nums, + "gpu_nums": self.gpu_nums, + "memory_mb": self.memory_mb + } + + if self._session is None: + raise RuntimeError("Client session not initialized") + + try: + async with self._session.post(f"{self.url}/create_session", json=payload) as response: + if response.status != 200: + text = await response.text() + raise Exception(f"Failed to create session: {response.status}, {text}") + + result = await response.json() + self.session_id = result["session_id"] + return result + except Exception as e: + raise Exception(f"Error connecting to sandbox server: {str(e)}") + + async def run_code(self, code: str, timeout: float = 120.0) -> Dict[str, Any]: + if not self.session_id: + await self._create_remote_session() + + if self._session is None: + raise RuntimeError("Client session not initialized") + + payload = { + "code": code, + "timeout": timeout + } + + try: + async with self._session.post( + f"{self.url}/execute/{self.session_id}", + json=payload + ) as response: + if response.status != 200: + text = await response.text() + raise Exception(f"Code execution failed: {response.status}, {text}") + + result = await response.json() + return result + except Exception as e: + raise Exception(f"Error executing code remotely: {str(e)}") + + async def close(self): + if self.session_id and self._session: + if self._session is not None: + try: + async with self._session.delete(f"{self.url}/close/{self.session_id}") as response: + if response.status != 200: + text = await response.text() + print(f"Warning: Failed to close session: {response.status}, {text}") + else: + print("Remote sandbox session closed successfully") + except Exception as e: + print(f"Warning: Error closing remote session: {str(e)}") + + if self._session: + await self._session.close() + self._session = None + + self.session_id = None \ No newline at end of file diff --git a/maze/sandbox/code_sandbox.py b/maze/sandbox/code_sandbox.py new file mode 100644 index 00000000..2d9ead5b --- /dev/null +++ b/maze/sandbox/code_sandbox.py @@ -0,0 +1,59 @@ +import ray +from typing import Dict, Optional +from maze.sandbox.launcher import SandboxActor + +class CodeSandbox: + def __init__(self, cpu_nums: int = 1, gpu_nums: int = 0, memory_mb: int = 512): + """ + Initialize code sandbox + + Args: + cpu_nums: Number of CPUs + gpu_nums: Number of GPUs + memory_mb: Memory size in MB + """ + # Initialize Ray + if not ray.is_initialized(): + ray.init() + + self.cpu_nums = cpu_nums + self.gpu_nums = gpu_nums + self.memory_mb = memory_mb + self.actor = None + + memory_bytes = memory_mb * 1024 + + # Create Ray Actor + self.actor = SandboxActor.options( + num_cpus=cpu_nums, + num_gpus=gpu_nums, + memory=memory_bytes + ).remote(cpu_nums, gpu_nums, memory_mb) + + async def run_code(self, code: str, timeout: float = 10.0) -> Dict[str, any]: + """ + Run code in sandbox + + Args: + code: Python code string to execute + timeout: Execution timeout in seconds (default 10 seconds) + + Returns: + Dictionary containing stdout, stderr, exit_code, timed_out + """ + if not self.actor: + raise RuntimeError("Sandbox actor not initialized") + + return await self.actor.run_code.remote(code, timeout) + + def close(self): + """Close sandbox and clean up resources""" + if self.actor: + # Call actor's close method to clean up container + ray.get(self.actor.close.remote()) + + # Delete actor reference + del self.actor + self.actor = None + print("Code sandbox closed") + diff --git a/maze/sandbox/launcher.py b/maze/sandbox/launcher.py new file mode 100644 index 00000000..78937d76 --- /dev/null +++ b/maze/sandbox/launcher.py @@ -0,0 +1,137 @@ + +import asyncio +import docker +import ray +from typing import Any, Dict, Optional +import os +from maze.sandbox import CPU_PERIOD +import docker +import tarfile +import io + +@ray.remote +class SandboxActor: + def __init__(self, cpu_nums: int, gpu_nums: int, memory_mb: int): + self.cpu_nums = cpu_nums + self.gpu_nums = gpu_nums + self.memory_mb = memory_mb + self.client = docker.from_env() + self.container = None + self.container_id = None + + self._start_container() + + def _start_container(self): + """Start sandbox container""" + try: + + # Start container + gpu_ids_str = os.environ.get('CUDA_VISIBLE_DEVICES', '') # e.g., "0" or "0,1" + if gpu_ids_str: + device_ids = gpu_ids_str.split(',') + else: + device_ids = [] + self.container = self.client.containers.run( + image="pytorch/pytorch", + command="tail -f /dev/null", + user='nobody', + detach=True, + remove=False, + network_mode='default', + cpu_period=CPU_PERIOD, + cpu_quota=CPU_PERIOD * self.cpu_nums, + mem_limit=f"{self.memory_mb}m", + device_requests=[ + docker.types.DeviceRequest( + device_ids=device_ids, + capabilities=[['gpu']] # ⚠️ 强烈建议加上这个! + ) + ] + ) + + self.container_id = self.container.id + print(f"Sandbox container started, ID: {self.container_id[:12]}") + + except Exception as e: + print(f"Failed to start sandbox container: {str(e)}") + raise e + + async def run_code(self, code: str, timeout: float = 10.0) -> Dict[str, any]: + """Execute code in container""" + if not self.container: + return { + "stdout": "", + "stderr": "Container not initialized", + "exit_code": -1, + "timed_out": True + } + + try: + code_bytes = code.encode('utf-8') + tar_stream = io.BytesIO() + with tarfile.open(fileobj=tar_stream, mode='w') as tar: + tarinfo = tarfile.TarInfo(name='tmp.py') + tarinfo.size = len(code_bytes) + tar.addfile(tarinfo, io.BytesIO(code_bytes)) + tar_stream.seek(0) + self.container.put_archive(path='/tmp', data=tar_stream.getvalue()) + + + cmd = f"python /tmp/tmp.py" + + loop = asyncio.get_event_loop() + result = await asyncio.wait_for( + loop.run_in_executor(None, self._exec_command, cmd), + timeout=timeout + ) + + return result + except asyncio.TimeoutError: + # Handle timeout but don't destroy container + print("Code execution timed out") + return { + "stdout": "", + "stderr": "Execution timed out after specified duration", + "exit_code": -1, + "timed_out": True + } + except Exception as e: + print(f"Exception occurred during code execution: {str(e)}") + return { + "stdout": "", + "stderr": str(e), + "exit_code": -1, + "timed_out": False + } + + def _exec_command(self, cmd: str) -> Dict[str, any]: + """Synchronous method to execute command in container""" + try: + result = self.container.exec_run(cmd) + return { + "stdout": result.output.decode('utf-8'), + "stderr": "", # exec_run usually merges stderr into stdout + "exit_code": result.exit_code, + "timed_out": False + } + except Exception as e: + return { + "stdout": "", + "stderr": str(e), + "exit_code": -1, + "timed_out": False + } + + def close(self): + """Close and clean up container""" + if self.container: + try: + print(f"Stopping and removing sandbox container {self.container_id[:12]}") + self.container.stop() + self.container.remove() + print(f"Sandbox container cleaned up successfully") + except Exception as e: + print(f"Exception occurred during container cleanup: {str(e)}") + finally: + self.container = None + self.container_id = None diff --git a/maze/sandbox/server.py b/maze/sandbox/server.py new file mode 100644 index 00000000..432d4a15 --- /dev/null +++ b/maze/sandbox/server.py @@ -0,0 +1,89 @@ +import ray +from typing import Dict, Any +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from maze.sandbox.code_sandbox import CodeSandbox + +app = FastAPI(title="Code Sandbox Server", description="Remote code execution sandbox service") + +class CodeExecutionRequest(BaseModel): + code: str + timeout: float = 10.0 + cpu_nums: int = 1 + gpu_nums: int = 0 + memory_mb: int = 512 + +class SandboxSession: + def __init__(self, session_id: str, cpu_nums: int, gpu_nums: int, memory_mb: int): + self.session_id = session_id + self.cpu_nums = cpu_nums + self.gpu_nums = gpu_nums + self.memory_mb = memory_mb + self.sandbox = CodeSandbox(cpu_nums=cpu_nums, gpu_nums=gpu_nums, memory_mb=memory_mb) + +active_sessions: Dict[str, SandboxSession] = {} + + +@app.post("/create_session") +async def create_session(request: CodeExecutionRequest): + session_id = f"sandbox_{len(active_sessions) + 1}" + + try: + session = SandboxSession( + session_id=session_id, + cpu_nums=request.cpu_nums, + gpu_nums=request.gpu_nums, + memory_mb=request.memory_mb + ) + active_sessions[session_id] = session + + return { + "session_id": session_id, + "message": "Sandbox session created successfully", + "cpu_nums": request.cpu_nums, + "gpu_nums": request.gpu_nums, + "memory_mb": request.memory_mb + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to create sandbox session: {str(e)}") + +@app.post("/execute/{session_id}") +async def execute_code(session_id: str, request: CodeExecutionRequest): + if session_id not in active_sessions: + raise HTTPException(status_code=404, detail=f"Session {session_id} not found") + + session = active_sessions[session_id] + + try: + result = await session.sandbox.run_code(request.code, timeout=request.timeout) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Code execution failed: {str(e)}") + +@app.delete("/close/{session_id}") +async def close_session(session_id: str): + if session_id not in active_sessions: + raise HTTPException(status_code=404, detail=f"Session {session_id} not found") + + session = active_sessions[session_id] + try: + session.sandbox.close() + del active_sessions[session_id] + return {"message": f"Session {session_id} closed successfully"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to close session: {str(e)}") + + +@app.on_event("startup") +async def startup_event(): + ray.init() + +@app.on_event("shutdown") +async def shutdown_event(): + for session_id, session in list(active_sessions.items()): + try: + session.sandbox.close() + except Exception as e: + print(f"Error closing session {session_id}: {str(e)}") + finally: + del active_sessions[session_id] \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index bb4d6c7e..309859b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,6 +60,8 @@ dependencies = [ "pandas>=2.3.3", "xgboost>=3.1.2", "scikit-learn>=1.7.2", + "docker>=7.1.0", + "aiohttp>=3.13.3", ] [project.optional-dependencies] @@ -77,6 +79,7 @@ Repository = "https://github.com/QinbinLi/Maze" [project.scripts] maze = "maze.cli.cli:main" +maze-sandbox = "maze.cli.sandbox_cli:main" [tool.setuptools.package-dir] "" = "." diff --git a/test/sandbox_test/test_sandbox.py b/test/sandbox_test/test_sandbox.py new file mode 100644 index 00000000..6a741813 --- /dev/null +++ b/test/sandbox_test/test_sandbox.py @@ -0,0 +1,29 @@ +import pytest +import asyncio +from maze.sandbox.client import CodeSandboxClient + + + +@pytest.mark.asyncio +async def test_sandbox_concurrent_execution(): + """Test concurrent execution in multiple sandboxes.""" + codesandbox1 = CodeSandboxClient(url="http://localhost:8000") + codesandbox2 = CodeSandboxClient(url="http://localhost:8000") + + try: + # Run simple code concurrently + result1, result2 = await asyncio.gather( + codesandbox1.run_code("print('hello world from sandbox1')"), + codesandbox2.run_code("print('hello world from sandbox2')") + ) + + assert 'stdout' in result1 + assert 'stdout' in result2 + assert 'hello world from sandbox1' in result1['stdout'] + assert 'hello world from sandbox2' in result2['stdout'] + + + finally: + await codesandbox1.close() + await codesandbox2.close() + \ No newline at end of file