Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies = [
"huggingface_hub>=0.20.0",
"openai>=2.7.2",
"tomli>=2.3.0",
"tomli-w>=1.2.0"
"tomli-w>=1.2.0",
"websockets>=15.0.1",
]

[project.optional-dependencies]
Expand Down
11 changes: 4 additions & 7 deletions src/openenv/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
"""Core components for agentic environments."""

# Re-export main components from submodules for convenience
from .env_server import *
from .client_types import StepResult
from .http_env_client import HTTPEnvClient
from .env_server import * # noqa: F403
from .env_server import __all__ as _env_server_all


# Note: MCP module doesn't export anything yet

__all__ = [
"HTTPEnvClient",
"StepResult",
]
__all__ = list(_env_server_all)
47 changes: 46 additions & 1 deletion src/openenv/core/env_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,32 @@
deserialize_action_with_preprocessing,
serialize_observation,
)
from .types import Action, Observation, State, SchemaResponse, HealthResponse
from .types import (
Action,
Observation,
State,
SchemaResponse,
HealthResponse,
WSMessage,
WSResetMessage,
WSStepMessage,
WSStateMessage,
WSCloseMessage,
WSObservationResponse,
WSStateResponse,
WSErrorResponse,
ConcurrencyConfig,
ServerCapacityStatus,
SessionInfo,
)
from .exceptions import (
OpenEnvError,
ConcurrencyConfigurationError,
SessionCapacityError,
SessionNotFoundError,
SessionCreationError,
EnvironmentFactoryError,
)
from .web_interface import create_web_interface_app, WebInterfaceManager

__all__ = [
Expand All @@ -30,6 +55,26 @@
"State",
"SchemaResponse",
"HealthResponse",
# WebSocket message types
"WSMessage",
"WSResetMessage",
"WSStepMessage",
"WSStateMessage",
"WSCloseMessage",
"WSObservationResponse",
"WSStateResponse",
"WSErrorResponse",
# Concurrency types
"ConcurrencyConfig",
"ServerCapacityStatus",
"SessionInfo",
# Exceptions
"OpenEnvError",
"ConcurrencyConfigurationError",
"SessionCapacityError",
"SessionNotFoundError",
"SessionCreationError",
"EnvironmentFactoryError",
# Base transforms
"CompositeTransform",
"NullTransform",
Expand Down
105 changes: 105 additions & 0 deletions src/openenv/core/env_server/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Custom exceptions for environment server operations."""

from typing import Optional


class OpenEnvError(Exception):
"""Base exception for all OpenEnv errors."""

pass


class ConcurrencyConfigurationError(OpenEnvError):
"""
Raised when an environment is misconfigured for concurrent sessions.

This error is raised during server startup when max_concurrent_envs > 1
is specified for an environment that is not marked as CONCURRENCY_SAFE.
"""

def __init__(
self,
environment_name: str,
max_concurrent_envs: int,
message: Optional[str] = None,
):
self.environment_name = environment_name
self.max_concurrent_envs = max_concurrent_envs

if message is None:
message = (
f"Environment '{environment_name}' is not marked as CONCURRENCY_SAFE. "
f"Cannot run with max_concurrent_envs={max_concurrent_envs}. "
f"Either set max_concurrent_envs=1 or ensure the environment "
f"properly isolates session state and set CONCURRENCY_SAFE=True."
)

super().__init__(message)
Comment on lines +18 to +43
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not add the option to set a custom error message. Looking at the code, it doesn't seems that you are passing the optional message once so better to not implement it (and worth case, it's possible to implement it later or to subclass this exception for a different error message).

Suggested change
class ConcurrencyConfigurationError(OpenEnvError):
"""
Raised when an environment is misconfigured for concurrent sessions.
This error is raised during server startup when max_concurrent_envs > 1
is specified for an environment that is not marked as CONCURRENCY_SAFE.
"""
def __init__(
self,
environment_name: str,
max_concurrent_envs: int,
message: Optional[str] = None,
):
self.environment_name = environment_name
self.max_concurrent_envs = max_concurrent_envs
if message is None:
message = (
f"Environment '{environment_name}' is not marked as CONCURRENCY_SAFE. "
f"Cannot run with max_concurrent_envs={max_concurrent_envs}. "
f"Either set max_concurrent_envs=1 or ensure the environment "
f"properly isolates session state and set CONCURRENCY_SAFE=True."
)
super().__init__(message)
class ConcurrencyConfigurationError(OpenEnvError):
"""
Raised when an environment is misconfigured for concurrent sessions.
This error is raised during server startup when max_concurrent_envs > 1
is specified for an environment that is not marked as CONCURRENCY_SAFE.
"""
def __init__(self, environment_name: str, max_concurrent_envs: int):
self.environment_name = environment_name
self.max_concurrent_envs = max_concurrent_envs
message = (
f"Environment '{environment_name}' is not marked as CONCURRENCY_SAFE. "
f"Cannot run with max_concurrent_envs={max_concurrent_envs}. "
f"Either set max_concurrent_envs=1 or ensure the environment "
f"properly isolates session state and set CONCURRENCY_SAFE=True."
)
super().__init__(message)



class SessionCapacityError(OpenEnvError):
"""
Raised when the server cannot accept new sessions due to capacity limits.

This error is raised when a new WebSocket connection is attempted but
the server has already reached max_concurrent_envs active sessions.
"""

def __init__(
self,
active_sessions: int,
max_sessions: int,
message: Optional[str] = None,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for all other exceptions defined in this module

):
self.active_sessions = active_sessions
self.max_sessions = max_sessions

if message is None:
message = (
f"Server at capacity: {active_sessions}/{max_sessions} sessions active. "
f"Cannot accept new connections."
)

super().__init__(message)


class SessionNotFoundError(OpenEnvError):
"""Raised when attempting to access a session that does not exist."""

def __init__(self, session_id: str, message: Optional[str] = None):
self.session_id = session_id

if message is None:
message = f"Session '{session_id}' not found."

super().__init__(message)


class SessionCreationError(OpenEnvError):
"""Raised when a session cannot be created."""

def __init__(self, reason: str, message: Optional[str] = None):
self.reason = reason

if message is None:
message = f"Failed to create session: {reason}"

super().__init__(message)


class EnvironmentFactoryError(OpenEnvError):
"""Raised when the environment factory fails to create an instance."""

def __init__(self, factory_name: str, cause: Exception):
self.factory_name = factory_name
self.cause = cause
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cause is already bound to the exception object when doing raise ... from e.

So instead of doing raise EnvironmentFactoryError(factory_name, e) from e you can just do raise EnvironmentFactoryError(factory_name) from e.

If you need to programmatically retrieve the cause of an exception, you should check e.__cause__. It's more pythonic than having a dedicated field for it.


message = f"Environment factory '{factory_name}' failed to create instance: {cause}"

super().__init__(message)
Loading