Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies = [
"huggingface_hub>=0.20.0",
"openai>=2.7.2",
"tomli>=2.3.0",
"tomli-w>=1.2.0"
"tomli-w>=1.2.0",
"websockets>=15.0.1",
]

[project.optional-dependencies]
Expand Down
60 changes: 58 additions & 2 deletions src/openenv/cli/templates/openenv_env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ The deployed space includes:
- **Web Interface** at `/web` - Interactive UI for exploring the environment
- **API Documentation** at `/docs` - Full OpenAPI/Swagger interface
- **Health Check** at `/health` - Container health monitoring
- **WebSocket** at `/ws` - Persistent session endpoint for low-latency interactions

## Environment Details

Expand Down Expand Up @@ -154,6 +155,61 @@ result = __ENV_NAME__env.step(__ENV_CLASS_NAME__Action(message="Hello!"))

Note: When connecting to an existing server, `__ENV_NAME__env.close()` will NOT stop the server.

### WebSocket Client for Persistent Sessions

For long-running episodes or when you need lower latency, use the WebSocket client:

```python
from __ENV_NAME__ import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__EnvWS

# Connect via WebSocket (maintains persistent connection)
with __ENV_CLASS_NAME__EnvWS(base_url="http://localhost:8000") as env:
result = env.reset()
print(f"Reset: {result.observation.echoed_message}")
# Multiple steps with low latency
for msg in ["Hello", "World", "!"]:
result = env.step(__ENV_CLASS_NAME__Action(message=msg))
print(f"Echoed: {result.observation.echoed_message}")
```

WebSocket advantages:
- **Lower latency**: No HTTP connection overhead per request
- **Persistent session**: Server maintains your environment state
- **Efficient for episodes**: Better for many sequential steps

### Concurrent WebSocket Sessions

The server supports multiple concurrent WebSocket connections. To enable this,
modify `server/app.py` to use factory mode:

```python
# In server/app.py - use factory mode for concurrent sessions
app = create_app(
__ENV_CLASS_NAME__Environment, # Pass class, not instance
__ENV_CLASS_NAME__Action,
__ENV_CLASS_NAME__Observation,
max_concurrent_envs=4, # Allow 4 concurrent sessions
)
```

Then multiple clients can connect simultaneously:

```python
from __ENV_NAME__ import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__EnvWS
from concurrent.futures import ThreadPoolExecutor

def run_episode(client_id: int):
with __ENV_CLASS_NAME__EnvWS(base_url="http://localhost:8000") as env:
result = env.reset()
for i in range(10):
result = env.step(__ENV_CLASS_NAME__Action(message=f"Client {client_id}, step {i}"))
return client_id, result.observation.message_length

# Run 4 episodes concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(run_episode, range(4)))
```

## Development & Testing

### Direct Environment Testing
Expand Down Expand Up @@ -189,11 +245,11 @@ __ENV_NAME__/
├── openenv.yaml # OpenEnv manifest
├── pyproject.toml # Project metadata and dependencies
├── uv.lock # Locked dependencies (generated)
├── client.py # __ENV_CLASS_NAME__Env client implementation
├── client.py # __ENV_CLASS_NAME__Env (HTTP) and __ENV_CLASS_NAME__EnvWS (WebSocket) clients
├── models.py # Action and Observation models
└── server/
├── __init__.py # Server module exports
├── __ENV_NAME___environment.py # Core environment logic
├── app.py # FastAPI application
├── app.py # FastAPI application (HTTP + WebSocket endpoints)
└── Dockerfile # Container image definition
```
10 changes: 7 additions & 3 deletions src/openenv/cli/templates/openenv_env/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

"""__ENV_TITLE_NAME__ Environment - A simple test environment for HTTP server."""

from .client import __ENV_CLASS_NAME__Env
from .client import __ENV_CLASS_NAME__Env, __ENV_CLASS_NAME__EnvWS
from .models import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation

__all__ = ["__ENV_CLASS_NAME__Action", "__ENV_CLASS_NAME__Observation", "__ENV_CLASS_NAME__Env"]

__all__ = [
"__ENV_CLASS_NAME__Action",
"__ENV_CLASS_NAME__Observation",
"__ENV_CLASS_NAME__Env",
"__ENV_CLASS_NAME__EnvWS",
]
95 changes: 92 additions & 3 deletions src/openenv/cli/templates/openenv_env/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
# LICENSE file in the root directory of this source tree.

"""
__ENV_TITLE_NAME__ Environment HTTP Client.
__ENV_TITLE_NAME__ Environment Clients.

This module provides the client for connecting to a __ENV_TITLE_NAME__ Environment server
over HTTP.
This module provides clients for connecting to a __ENV_TITLE_NAME__ Environment server:
- __ENV_CLASS_NAME__Env: HTTP client for request/response interactions
- __ENV_CLASS_NAME__EnvWS: WebSocket client for persistent sessions
"""

from typing import Any, Dict

from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core.http_env_client import HTTPEnvClient
from openenv.core.ws_env_client import WebSocketEnvClient

from .models import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation

Expand Down Expand Up @@ -98,3 +100,90 @@ def _parse_state(self, payload: Dict) -> State:
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
)


class __ENV_CLASS_NAME__EnvWS(WebSocketEnvClient[__ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation]):
"""
WebSocket client for the __ENV_TITLE_NAME__ Environment.

This client maintains a persistent WebSocket connection to the environment server,
enabling efficient multi-step interactions with lower latency than HTTP.
Each client instance has its own dedicated environment session on the server.

Advantages over HTTP client:
- Lower latency for sequential interactions (no connection overhead per request)
- Session state is maintained server-side
- Better suited for long-running episodes

Example:
>>> # Connect to a running server via WebSocket
>>> with __ENV_CLASS_NAME__EnvWS(base_url="http://localhost:8000") as client:
... result = client.reset()
... print(result.observation.echoed_message)
...
... result = client.step(__ENV_CLASS_NAME__Action(message="Hello!"))
... print(result.observation.echoed_message)

Example with Docker:
>>> # Automatically start container and connect via WebSocket
>>> client = __ENV_CLASS_NAME__EnvWS.from_docker_image("__ENV_NAME__-env:latest")
>>> try:
... result = client.reset()
... result = client.step(__ENV_CLASS_NAME__Action(message="Test"))
... finally:
... client.close()
"""

def _step_payload(self, action: __ENV_CLASS_NAME__Action) -> Dict:
"""
Convert __ENV_CLASS_NAME__Action to JSON payload for step message.

Args:
action: __ENV_CLASS_NAME__Action instance

Returns:
Dictionary representation suitable for JSON encoding
"""
return {
"message": action.message,
}

def _parse_result(self, payload: Dict) -> StepResult[__ENV_CLASS_NAME__Observation]:
"""
Parse WebSocket response into StepResult[__ENV_CLASS_NAME__Observation].

Args:
payload: JSON response data from server

Returns:
StepResult with __ENV_CLASS_NAME__Observation
"""
obs_data = payload.get("observation", {})
observation = __ENV_CLASS_NAME__Observation(
echoed_message=obs_data.get("echoed_message", ""),
message_length=obs_data.get("message_length", 0),
done=payload.get("done", False),
reward=payload.get("reward"),
metadata=obs_data.get("metadata", {}),
)

return StepResult(
observation=observation,
reward=payload.get("reward"),
done=payload.get("done", False),
)

def _parse_state(self, payload: Dict) -> State:
"""
Parse WebSocket state response into State object.

Args:
payload: JSON response from state request

Returns:
State object with episode_id and step_count
"""
return State(
episode_id=payload.get("episode_id"),
step_count=payload.get("step_count", 0),
)
10 changes: 4 additions & 6 deletions src/openenv/cli/templates/openenv_env/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@
The __ENV_NAME__ environment is a simple test environment that echoes back messages.
"""

from dataclasses import dataclass
from pydantic import Field

from openenv.core.env_server.types import Action, Observation


@dataclass(kw_only=True)
class __ENV_CLASS_NAME__Action(Action):
"""Action for the __ENV_TITLE_NAME__ environment - just a message to echo."""

message: str
message: str = Field(..., description="Message to echo back")


@dataclass(kw_only=True)
class __ENV_CLASS_NAME__Observation(Observation):
"""Observation from the __ENV_TITLE_NAME__ environment - the echoed message."""

echoed_message: str
message_length: int = 0
echoed_message: str = Field(default="", description="The echoed message")
message_length: int = Field(default=0, description="Length of the echoed message")

2 changes: 2 additions & 0 deletions src/openenv/cli/templates/openenv_env/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ description = "__ENV_TITLE_NAME__ environment for OpenEnv"
requires-python = ">=3.10"
dependencies = [
# Core OpenEnv runtime (provides FastAPI server + HTTP client types)
# install from github
# "openenv[core] @ git+https://github.com/meta-pytorch/OpenEnv.git",
"openenv[core]>=0.2.0",
# Environment-specific dependencies
# Add all dependencies needed for your environment here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class __ENV_CLASS_NAME__Environment(Environment):
>>> print(obs.message_length) # 5
"""

# Enable concurrent WebSocket sessions.
# Set to True if your environment isolates state between instances.
# When True, multiple WebSocket clients can connect simultaneously, each
# getting their own environment instance (when using factory mode in app.py).
CONCURRENCY_SAFE: bool = True

def __init__(self):
"""Initialize the __ENV_NAME__ environment."""
self._state = State(episode_id=str(uuid4()), step_count=0)
Expand Down
17 changes: 12 additions & 5 deletions src/openenv/cli/templates/openenv_env/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
FastAPI application for the __ENV_TITLE_NAME__ Environment.

This module creates an HTTP server that exposes the __ENV_CLASS_NAME__Environment
over HTTP endpoints, making it compatible with HTTPEnvClient.
over HTTP and WebSocket endpoints, compatible with HTTPEnvClient and WebSocketEnvClient.

Endpoints:
- POST /reset: Reset the environment
- POST /step: Execute an action
- GET /state: Get current environment state
- GET /schema: Get action/observation schemas
- WS /ws: WebSocket endpoint for persistent sessions

Usage:
# Development (with auto-reload):
Expand All @@ -28,18 +35,18 @@
"openenv is required for the web interface. Install dependencies with '\n uv sync\n'"
) from e

from __ENV_NAME__.models import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation
# Import from local models.py (PYTHONPATH includes /app/env in Docker)
from models import __ENV_CLASS_NAME__Action, __ENV_CLASS_NAME__Observation
from .__ENV_NAME___environment import __ENV_CLASS_NAME__Environment

# Create the environment instance
env = __ENV_CLASS_NAME__Environment()

# Create the app with web interface and README integration
app = create_app(
env,
__ENV_CLASS_NAME__Environment,
__ENV_CLASS_NAME__Action,
__ENV_CLASS_NAME__Observation,
env_name="__ENV_NAME__",
max_concurrent_envs=1, # increase this number to allow more concurrent WebSocket sessions
)


Expand Down
11 changes: 4 additions & 7 deletions src/openenv/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
"""Core components for agentic environments."""

# Re-export main components from submodules for convenience
from .env_server import *
from .client_types import StepResult
from .http_env_client import HTTPEnvClient
from .env_server import * # noqa: F403
from .env_server import __all__ as _env_server_all


# Note: MCP module doesn't export anything yet

__all__ = [
"HTTPEnvClient",
"StepResult",
]
__all__ = list(_env_server_all)
47 changes: 46 additions & 1 deletion src/openenv/core/env_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,32 @@
deserialize_action_with_preprocessing,
serialize_observation,
)
from .types import Action, Observation, State, SchemaResponse, HealthResponse
from .types import (
Action,
Observation,
State,
SchemaResponse,
HealthResponse,
WSMessage,
WSResetMessage,
WSStepMessage,
WSStateMessage,
WSCloseMessage,
WSObservationResponse,
WSStateResponse,
WSErrorResponse,
ConcurrencyConfig,
ServerCapacityStatus,
SessionInfo,
)
from .exceptions import (
OpenEnvError,
ConcurrencyConfigurationError,
SessionCapacityError,
SessionNotFoundError,
SessionCreationError,
EnvironmentFactoryError,
)
from .web_interface import create_web_interface_app, WebInterfaceManager

__all__ = [
Expand All @@ -30,6 +55,26 @@
"State",
"SchemaResponse",
"HealthResponse",
# WebSocket message types
"WSMessage",
"WSResetMessage",
"WSStepMessage",
"WSStateMessage",
"WSCloseMessage",
"WSObservationResponse",
"WSStateResponse",
"WSErrorResponse",
# Concurrency types
"ConcurrencyConfig",
"ServerCapacityStatus",
"SessionInfo",
# Exceptions
"OpenEnvError",
"ConcurrencyConfigurationError",
"SessionCapacityError",
"SessionNotFoundError",
"SessionCreationError",
"EnvironmentFactoryError",
# Base transforms
"CompositeTransform",
"NullTransform",
Expand Down
Loading