Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 319 additions & 7 deletions aenv/src/aenv/client/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
EnvInstance,
EnvInstanceCreateRequest,
EnvInstanceListResponse,
EnvService,
EnvStatus,
)

Expand Down Expand Up @@ -157,11 +158,9 @@ async def create_env_instance(
logger.info(f"Environment instance created: {instance.id}")
return instance
else:
error_msg = getattr(
api_response, "error_message", "Unknown error"
)
error_msg = api_response.get_error_message()
raise EnvironmentError(
f"Failed to create instance: {error_msg}, rsp: {api_response}"
f"Failed to create instance: {error_msg}"
)
except ValueError as e:
raise EnvironmentError(
Expand Down Expand Up @@ -211,9 +210,7 @@ async def get_env_instance(self, instance_id: str) -> EnvInstance:
)
return instance
else:
error_msg = getattr(
api_response, "error_message", "Unknown error"
)
error_msg = api_response.get_error_message()
raise EnvironmentError(f"Failed to get instance: {error_msg}")
except ValueError as e:
raise EnvironmentError(
Expand Down Expand Up @@ -354,3 +351,318 @@ async def wait_for_status(
)

await asyncio.sleep(check_interval)

# ========== Service Management Methods ==========

async def create_env_service(
self,
name: str,
replicas: int = 1,
environment_variables: Optional[Dict[str, str]] = None,
owner: Optional[str] = None,
# Storage configuration
pvc_name: Optional[str] = None,
mount_path: Optional[str] = None,
storage_size: Optional[str] = None, # If specified, PVC will be created
# Service configuration
port: Optional[int] = None,
# Resource limits
cpu_request: Optional[str] = None,
cpu_limit: Optional[str] = None,
memory_request: Optional[str] = None,
memory_limit: Optional[str] = None,
ephemeral_storage_request: Optional[str] = None,
ephemeral_storage_limit: Optional[str] = None,
) -> "EnvService":
"""
Create a new environment service (Deployment + Service + optionally PVC).

Args:
name: Service name (envName format: name@version)
replicas: Number of replicas (default: 1, must be 1 if storage_size is specified)
environment_variables: Optional environment variables
owner: Optional owner of the service
pvc_name: Optional PVC name (default: envName)
mount_path: Optional mount path (default: /home/admin/data)
storage_size: Optional storage size (e.g., "10Gi"). If specified, PVC will be created and replicas must be 1.
storageClass is configured in helm values.yaml deployment, not via API.
port: Optional service port (default: 8080)
cpu_request: Optional CPU request (default: 1)
cpu_limit: Optional CPU limit (default: 2)
memory_request: Optional memory request (default: 2Gi)
memory_limit: Optional memory limit (default: 4Gi)
ephemeral_storage_request: Optional ephemeral storage request (default: 5Gi)
ephemeral_storage_limit: Optional ephemeral storage limit (default: 10Gi)

Returns:
Created EnvService

Raises:
EnvironmentError: If creation fails
NetworkError: If network request fails
"""
if not self._client:
raise NetworkError("Client not connected")

from aenv.core.models import EnvServiceCreateRequest

logger.info(
f"Creating environment service: {name}, replicas: {replicas}, owner: {owner}"
)
request = EnvServiceCreateRequest(
envName=name,
replicas=replicas,
environment_variables=environment_variables,
owner=owner,
pvc_name=pvc_name,
mount_path=mount_path,
storage_size=storage_size,
port=port,
cpu_request=cpu_request,
cpu_limit=cpu_limit,
memory_request=memory_request,
memory_limit=memory_limit,
ephemeral_storage_request=ephemeral_storage_request,
ephemeral_storage_limit=ephemeral_storage_limit,
)

for attempt in range(self.max_retries + 1):
try:
response = await self._client.post(
"/env-service",
json=request.model_dump(exclude_none=True),
)

try:
api_response = APIResponse(**response.json())
if api_response.success and api_response.data:
from aenv.core.models import EnvService

service = EnvService(**api_response.data)
logger.info(f"Environment service created: {service.id}")
return service
else:
error_msg = api_response.get_error_message()
raise EnvironmentError(f"Failed to create service: {error_msg}")
except ValueError as e:
raise EnvironmentError(
f"Invalid server response: {response.status_code} - {response.text[:200]}"
) from e

except httpx.RequestError as e:
import random
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The import random statement is located inside an except block. According to PEP 8, imports should generally be at the top of the file. This improves readability and makes dependencies clear. Please move this import to the top of the file.


if attempt < self.max_retries:
wait_time = 2**attempt + random.uniform(0, 1)
logger.warning(
f"Network error, retrying in {wait_time:.2f}s: {str(e)}"
)
await asyncio.sleep(wait_time)
continue
raise NetworkError(f"Network error: {str(e)}") from e

async def get_env_service(self, service_id: str) -> "EnvService":
"""
Get environment service by ID.

Args:
service_id: Environment service ID

Returns:
EnvService details

Raises:
EnvironmentError: If service not found
NetworkError: If network request fails
"""
if not self._client:
raise NetworkError("Client not connected")

logger.debug(f"Querying environment service: {service_id}")
for attempt in range(self.max_retries + 1):
try:
response = await self._client.get(f"/env-service/{service_id}")

try:
api_response = APIResponse(**response.json())
if api_response.success and api_response.data:
from aenv.core.models import EnvService

service = EnvService(**api_response.data)
logger.debug(
f"Service status: {service.id} -> {service.status}"
)
return service
else:
error_msg = api_response.get_error_message()
raise EnvironmentError(f"Failed to get service: {error_msg}")
except ValueError as e:
raise EnvironmentError(
f"Invalid server response: {response.status_code} - {response.text[:200]}"
) from e

except httpx.RequestError as e:
if attempt < self.max_retries:
wait_time = 2**attempt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The retry backoff logic is inconsistent across different methods in this class. The create_env_service method uses exponential backoff with jitter (2**attempt + random.uniform(0, 1)), which is a best practice to prevent thundering herd issues. However, this method and others (list_env_services, delete_env_service, update_env_service) use simple exponential backoff without jitter. For consistency and robustness, I recommend applying the same jittered backoff strategy to all retry mechanisms in this client.

Suggested change
wait_time = 2**attempt
wait_time = 2**attempt + random.uniform(0, 1)

logger.warning(f"Network error, retrying in {wait_time}s: {str(e)}")
await asyncio.sleep(wait_time)
continue
raise NetworkError(f"Network error: {str(e)}") from e

async def list_env_services(
self,
env_name: Optional[str] = None,
) -> List["EnvService"]:
"""
List environment services.

Args:
env_name: Optional environment name filter

Returns:
List of EnvService

Raises:
EnvironmentError: If listing fails
NetworkError: If network request fails
"""
if not self._client:
raise NetworkError("Client not connected")

url = "/env-service/*/list"
if env_name:
url = f"/env-service/{env_name}/list"

for attempt in range(self.max_retries + 1):
try:
response = await self._client.get(url)

try:
api_response = APIResponse(**response.json())
if api_response.success and api_response.data:
if isinstance(api_response.data, list):
from aenv.core.models import EnvService

return [EnvService(**item) for item in api_response.data]
return []
else:
error_msg = api_response.get_error_message()
raise EnvironmentError(f"Failed to list services: {error_msg}")
except ValueError as e:
raise EnvironmentError(
f"Invalid server response: {response.status_code} - {response.text[:200]}"
) from e

except httpx.RequestError as e:
if attempt < self.max_retries:
await asyncio.sleep(2**attempt)
continue
raise NetworkError(f"Network error: {str(e)}") from e

async def delete_env_service(
self, service_id: str, delete_storage: bool = False
) -> bool:
"""
Delete environment service.

Args:
service_id: Environment service ID
delete_storage: If True, also delete associated storage (PVC). Default False.

Returns:
True if deletion successful

Raises:
EnvironmentError: If deletion fails
NetworkError: If network request fails
"""
if not self._client:
raise NetworkError("Client not connected")

# Build URL with query parameter if delete_storage is True
url = f"/env-service/{service_id}"
if delete_storage:
url += "?deleteStorage=true"

for attempt in range(self.max_retries + 1):
try:
response = await self._client.delete(url)

try:
api_response = APIResponse(**response.json())
return api_response.success
except ValueError as e:
raise EnvironmentError(
f"Invalid server response: {response.status_code} - {response.text[:200]}"
) from e

except httpx.RequestError as e:
if attempt < self.max_retries:
await asyncio.sleep(2**attempt)
continue
raise NetworkError(f"Network error: {str(e)}") from e

async def update_env_service(
self,
service_id: str,
replicas: Optional[int] = None,
image: Optional[str] = None,
environment_variables: Optional[Dict[str, str]] = None,
) -> "EnvService":
"""
Update environment service.

Args:
service_id: Environment service ID
replicas: Optional number of replicas
image: Optional container image
environment_variables: Optional environment variables

Returns:
Updated EnvService

Raises:
EnvironmentError: If update fails
NetworkError: If network request fails
"""
if not self._client:
raise NetworkError("Client not connected")

from aenv.core.models import EnvServiceUpdateRequest

request = EnvServiceUpdateRequest(
replicas=replicas,
image=image,
environment_variables=environment_variables,
)

for attempt in range(self.max_retries + 1):
try:
response = await self._client.put(
f"/env-service/{service_id}",
json=request.model_dump(exclude_none=True),
)

try:
api_response = APIResponse(**response.json())
if api_response.success and api_response.data:
from aenv.core.models import EnvService

service = EnvService(**api_response.data)
logger.info(f"Environment service updated: {service.id}")
return service
else:
error_msg = api_response.get_error_message()
raise EnvironmentError(f"Failed to update service: {error_msg}")
except ValueError as e:
raise EnvironmentError(
f"Invalid server response: {response.status_code} - {response.text[:200]}"
) from e

except httpx.RequestError as e:
if attempt < self.max_retries:
wait_time = 2**attempt
logger.warning(f"Network error, retrying in {wait_time}s: {str(e)}")
await asyncio.sleep(wait_time)
continue
raise NetworkError(f"Network error: {str(e)}") from e
Loading
Loading