Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
<br>


## 📰 News


- **2026-01**: We support the sandbox feature! [**Docs**](https://github.com/QinbinLi/Maze/tree/develop/examples/sandbox)

<br>

## 🚀Quick Start

## 1. Install
Expand Down Expand Up @@ -105,5 +112,3 @@ Here are two vedios which show the process of using builtin tasks and uploading
### User Defined Task Workflow
![Check Result Screenshot](https://meeting-agent1.oss-cn-beijing.aliyuncs.com/userdef_task.png)
[Check Result Video](https://meeting-agent1.oss-cn-beijing.aliyuncs.com/userdef_task.mp4)


38 changes: 38 additions & 0 deletions maze/cli/sandbox_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import argparse
import sys
import uvicorn
from maze.config.logging_config import setup_logging
from maze.sandbox.server import app as sandbox_app

def start_sandbox(host: str = "0.0.0.0", port: int = 8000):
print(f"🚀 Starting Code Sandbox Server at {host}:{port}")

uvicorn.run(
sandbox_app,
host=host,
port=port,
log_level="info"
)


def main():
parser = argparse.ArgumentParser(prog="maze-sandbox", description="Maze code sandbox service")
subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")

# === start subcommand ===
start_parser = subparsers.add_parser("start", help="Start the sandbox server")
start_parser.add_argument("--host", type=str, metavar="HOST", help="Host for sandbox server", default="0.0.0.0")
start_parser.add_argument("--port", type=int, metavar="PORT", help="Port for sandbox server", default=8000)


# Parse args
args = parser.parse_args()
if args.command == "start":
start_sandbox(args.host, args.port)
else:
parser.print_help()
sys.exit(1)


if __name__ == "__main__":
main()
7 changes: 7 additions & 0 deletions maze/sandbox/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CPU_PERIOD = 100000

from maze.sandbox.client import CodeSandboxClient

__all__ = [
"CodeSandboxClient",
]
94 changes: 94 additions & 0 deletions maze/sandbox/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import aiohttp
from typing import Dict, Any, Optional

class CodeSandboxClient:
def __init__(self, url: str, cpu_nums: int = 1, gpu_nums: int = 0, memory_mb: int = 512):
"""
Initialize remote code sandbox client

Args:
url: Server URL (e.g., "http://localhost:8000")
cpu_nums: Number of CPUs
gpu_nums: Number of GPUs
memory_mb: Memory size in MB
"""
self.url = url.rstrip('/')
self.cpu_nums = cpu_nums
self.gpu_nums = gpu_nums
self.memory_mb = memory_mb
self.session_id: Optional[str] = None
self._session: Optional[aiohttp.ClientSession] = None

async def _ensure_session(self):
if self._session is None:
self._session = aiohttp.ClientSession()

async def _create_remote_session(self):
await self._ensure_session()

payload = {
"code": "",
"cpu_nums": self.cpu_nums,
"gpu_nums": self.gpu_nums,
"memory_mb": self.memory_mb
}

if self._session is None:
raise RuntimeError("Client session not initialized")

try:
async with self._session.post(f"{self.url}/create_session", json=payload) as response:
if response.status != 200:
text = await response.text()
raise Exception(f"Failed to create session: {response.status}, {text}")

result = await response.json()
self.session_id = result["session_id"]
return result
except Exception as e:
raise Exception(f"Error connecting to sandbox server: {str(e)}")

async def run_code(self, code: str, timeout: float = 120.0) -> Dict[str, Any]:
if not self.session_id:
await self._create_remote_session()

if self._session is None:
raise RuntimeError("Client session not initialized")

payload = {
"code": code,
"timeout": timeout
}

try:
async with self._session.post(
f"{self.url}/execute/{self.session_id}",
json=payload
) as response:
if response.status != 200:
text = await response.text()
raise Exception(f"Code execution failed: {response.status}, {text}")

result = await response.json()
return result
except Exception as e:
raise Exception(f"Error executing code remotely: {str(e)}")

async def close(self):
if self.session_id and self._session:
if self._session is not None:
try:
async with self._session.delete(f"{self.url}/close/{self.session_id}") as response:
if response.status != 200:
text = await response.text()
print(f"Warning: Failed to close session: {response.status}, {text}")
else:
print("Remote sandbox session closed successfully")
except Exception as e:
print(f"Warning: Error closing remote session: {str(e)}")

if self._session:
await self._session.close()
self._session = None

self.session_id = None
59 changes: 59 additions & 0 deletions maze/sandbox/code_sandbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import ray
from typing import Dict, Optional
from maze.sandbox.launcher import SandboxActor

class CodeSandbox:
def __init__(self, cpu_nums: int = 1, gpu_nums: int = 0, memory_mb: int = 512):
"""
Initialize code sandbox

Args:
cpu_nums: Number of CPUs
gpu_nums: Number of GPUs
memory_mb: Memory size in MB
"""
# Initialize Ray
if not ray.is_initialized():
ray.init()

self.cpu_nums = cpu_nums
self.gpu_nums = gpu_nums
self.memory_mb = memory_mb
self.actor = None

memory_bytes = memory_mb * 1024

# Create Ray Actor
self.actor = SandboxActor.options(
num_cpus=cpu_nums,
num_gpus=gpu_nums,
memory=memory_bytes
).remote(cpu_nums, gpu_nums, memory_mb)

async def run_code(self, code: str, timeout: float = 10.0) -> Dict[str, any]:
"""
Run code in sandbox

Args:
code: Python code string to execute
timeout: Execution timeout in seconds (default 10 seconds)

Returns:
Dictionary containing stdout, stderr, exit_code, timed_out
"""
if not self.actor:
raise RuntimeError("Sandbox actor not initialized")

return await self.actor.run_code.remote(code, timeout)

def close(self):
"""Close sandbox and clean up resources"""
if self.actor:
# Call actor's close method to clean up container
ray.get(self.actor.close.remote())

# Delete actor reference
del self.actor
self.actor = None
print("Code sandbox closed")

137 changes: 137 additions & 0 deletions maze/sandbox/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@

import asyncio
import docker
import ray
from typing import Any, Dict, Optional
import os
from maze.sandbox import CPU_PERIOD
import docker
import tarfile
import io

@ray.remote
class SandboxActor:
def __init__(self, cpu_nums: int, gpu_nums: int, memory_mb: int):
self.cpu_nums = cpu_nums
self.gpu_nums = gpu_nums
self.memory_mb = memory_mb
self.client = docker.from_env()
self.container = None
self.container_id = None

self._start_container()

def _start_container(self):
"""Start sandbox container"""
try:

# Start container
gpu_ids_str = os.environ.get('CUDA_VISIBLE_DEVICES', '') # e.g., "0" or "0,1"
if gpu_ids_str:
device_ids = gpu_ids_str.split(',')
else:
device_ids = []
self.container = self.client.containers.run(
image="pytorch/pytorch",
command="tail -f /dev/null",
user='nobody',
detach=True,
remove=False,
network_mode='default',
cpu_period=CPU_PERIOD,
cpu_quota=CPU_PERIOD * self.cpu_nums,
mem_limit=f"{self.memory_mb}m",
device_requests=[
docker.types.DeviceRequest(
device_ids=device_ids,
capabilities=[['gpu']] # ⚠️ 强烈建议加上这个!
)
]
)

self.container_id = self.container.id
print(f"Sandbox container started, ID: {self.container_id[:12]}")

except Exception as e:
print(f"Failed to start sandbox container: {str(e)}")
raise e

async def run_code(self, code: str, timeout: float = 10.0) -> Dict[str, any]:
"""Execute code in container"""
if not self.container:
return {
"stdout": "",
"stderr": "Container not initialized",
"exit_code": -1,
"timed_out": True
}

try:
code_bytes = code.encode('utf-8')
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode='w') as tar:
tarinfo = tarfile.TarInfo(name='tmp.py')
tarinfo.size = len(code_bytes)
tar.addfile(tarinfo, io.BytesIO(code_bytes))
tar_stream.seek(0)
self.container.put_archive(path='/tmp', data=tar_stream.getvalue())


cmd = f"python /tmp/tmp.py"

loop = asyncio.get_event_loop()
result = await asyncio.wait_for(
loop.run_in_executor(None, self._exec_command, cmd),
timeout=timeout
)

return result
except asyncio.TimeoutError:
# Handle timeout but don't destroy container
print("Code execution timed out")
return {
"stdout": "",
"stderr": "Execution timed out after specified duration",
"exit_code": -1,
"timed_out": True
}
except Exception as e:
print(f"Exception occurred during code execution: {str(e)}")
return {
"stdout": "",
"stderr": str(e),
"exit_code": -1,
"timed_out": False
}

def _exec_command(self, cmd: str) -> Dict[str, any]:
"""Synchronous method to execute command in container"""
try:
result = self.container.exec_run(cmd)
return {
"stdout": result.output.decode('utf-8'),
"stderr": "", # exec_run usually merges stderr into stdout
"exit_code": result.exit_code,
"timed_out": False
}
except Exception as e:
return {
"stdout": "",
"stderr": str(e),
"exit_code": -1,
"timed_out": False
}

def close(self):
"""Close and clean up container"""
if self.container:
try:
print(f"Stopping and removing sandbox container {self.container_id[:12]}")
self.container.stop()
self.container.remove()
print(f"Sandbox container cleaned up successfully")
except Exception as e:
print(f"Exception occurred during container cleanup: {str(e)}")
finally:
self.container = None
self.container_id = None
Loading