diff --git a/.gitignore b/.gitignore
index 34bc804..056a7ef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -144,3 +144,5 @@ cython_debug/
 # python sphinx docs
 _build/
 testing.py
+
+.DS_Store
\ No newline at end of file
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 2d10c71..cf04d50 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,6 +1,8 @@
 Changelog
 =========
 
+* Added an inference client for running inference requests and retrieving their status and results
+
 v1.9.0 (2025-04-04)
 -------------------
 
diff --git a/README.md b/README.md
index a490856..8d5ee46 100644
--- a/README.md
+++ b/README.md
@@ -24,26 +24,41 @@ DataCrunch's Public API documentation [is available here](https://api.datacrunch
 
 - Generate your client credentials - [instructions in the public API docs](https://api.datacrunch.io/v1/docs#description/quick-start-guide).
-- Add the client secret to an environment variable (don't want it to be hardcoded):
+
+- Add your client ID and client secret to environment variables (so they are not hardcoded):
 
   Linux (bash):
 
   ```bash
-  export DATACRUNCH_CLIENT_SECRET=Z4CZq02rdwdB7ISV0k4Z2gtwAFKiyvr2U1l0KDIeYi
+  export DATACRUNCH_CLIENT_ID=YOUR_ID_HERE
+  export DATACRUNCH_CLIENT_SECRET=YOUR_SECRET_HERE
   ```
 
+- To send inference requests from the SDK, you must generate an inference key - [instructions on inference authorization](https://docs.datacrunch.io/inference/authorization)
+
+- Add your inference key to an environment variable:
+
+  Linux (bash):
+
+  ```bash
+  export DATACRUNCH_INFERENCE_KEY=YOUR_API_KEY_HERE
+  ```
+
   Other platforms: https://en.wikipedia.org/wiki/Environment_variable
+
+
 - Example for creating a new instance:
 
   ```python
   import os
   from datacrunch import DataCrunchClient
 
-  # Get client secret from environment variable
+  # Get credentials from environment variables
+  CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
   CLIENT_SECRET = os.environ['DATACRUNCH_CLIENT_SECRET']
-  CLIENT_ID = 'Ibk5bdxV64lKAWOqYnvSi'
 
   # Create datcrunch client
   datacrunch = DataCrunchClient(CLIENT_ID, CLIENT_SECRET)
@@ -118,7 +133,7 @@ Create this file in the root directory of the project:
 from datacrunch.datacrunch import DataCrunchClient
 
 CLIENT_SECRET = 'secret'
-CLIENT_ID = 'Ibk5bdxV64lKAWOqYnvSi'
+CLIENT_ID = 'your-id'
 
 # Create datcrunch client
 datacrunch = DataCrunchClient(CLIENT_ID, CLIENT_SECRET, base_url='http://localhost:3001/v1')
diff --git a/datacrunch/InferenceClient/__init__.py b/datacrunch/InferenceClient/__init__.py
new file mode 100644
index 0000000..e680d86
--- /dev/null
+++ b/datacrunch/InferenceClient/__init__.py
@@ -0,0 +1,3 @@
+from .inference_client import InferenceClient, InferenceResponse
+
+__all__ = ['InferenceClient', 'InferenceResponse']
diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py
new file mode 100644
index 0000000..7835d35
--- /dev/null
+++ b/datacrunch/InferenceClient/inference_client.py
@@ -0,0 +1,343 @@
+from dataclasses import dataclass
+from dataclasses_json import dataclass_json, Undefined  # type: ignore
+import requests
+from requests.structures import CaseInsensitiveDict
+from typing import Optional, Dict, Any, Union, Generator
+from urllib.parse import urlparse
+from enum import Enum
+
+class InferenceClientError(Exception):
+    """Base exception for InferenceClient errors."""
+    pass
+
+class AsyncStatus(int, Enum):
+    Initialized = 0
+    Queue = 1
+    Inference = 2
+    Completed = 3
+
+@dataclass_json(undefined=Undefined.EXCLUDE)
+@dataclass
+class InferenceResponse:
+    headers: CaseInsensitiveDict[str]
+    status_code: int
+    status_text: str
+    _original_response: requests.Response
+    _stream: bool = False
+
+    def _is_stream_response(self, headers: CaseInsensitiveDict[str]) -> bool:
+        """Check if the response headers indicate a streaming response.
+
+        Args:
+            headers: The response headers to check
+
+        Returns:
+            bool: True if the response is likely a stream, False otherwise
+        """
+        # Standard chunked transfer encoding
+        is_chunked_transfer = headers.get(
+            'Transfer-Encoding', '').lower() == 'chunked'
+        # Server-Sent Events content type
+        is_event_stream = headers.get(
+            'Content-Type', '').lower() == 'text/event-stream'
+        # NDJSON
+        is_ndjson = headers.get(
+            'Content-Type', '').lower() == 'application/x-ndjson'
+        # Stream JSON
+        is_stream_json = headers.get(
+            'Content-Type', '').lower() == 'application/stream+json'
+        # Keep-alive
+        is_keep_alive = headers.get(
+            'Connection', '').lower() == 'keep-alive'
+        # No content length
+        has_no_content_length = 'Content-Length' not in headers
+
+        # No Content-Length with keep-alive often suggests streaming (though not definitive)
+        is_keep_alive_and_no_content_length = is_keep_alive and has_no_content_length
+
+        return (self._stream or is_chunked_transfer or is_event_stream or is_ndjson or
+                is_stream_json or is_keep_alive_and_no_content_length)
+
+    def output(self, is_text: bool = False) -> Any:
+        try:
+            if is_text:
+                return self._original_response.text
+            return self._original_response.json()
+        except Exception as e:
+            # if the response is a stream (check headers), raise a relevant error
+            if self._is_stream_response(self._original_response.headers):
+                raise InferenceClientError(
+                    "Response appears to be a stream; use the stream() method instead")
+            raise InferenceClientError(
+                f"Failed to parse response as JSON: {str(e)}")
+
+    def stream(self, chunk_size: int = 512, as_text: bool = True) -> Generator[Any, None, None]:
+        """Stream the response content.
+
+        Args:
+            chunk_size: Size of chunks to stream, in bytes
+            as_text: If True, stream as text using iter_lines. If False, stream as binary using iter_content.
+
+        Returns:
+            Generator yielding chunks of the response
+        """
+        if as_text:
+            for chunk in self._original_response.iter_lines(chunk_size=chunk_size):
+                if chunk:
+                    yield chunk
+        else:
+            for chunk in self._original_response.iter_content(chunk_size=chunk_size):
+                if chunk:
+                    yield chunk
+
+
+class InferenceClient:
+    def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: int = 60 * 5) -> None:
+        """
+        Initialize the InferenceClient.
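+
+        The client wraps a requests.Session and authenticates each request by
+        sending the inference key as a bearer token.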
+ + Args: + inference_key: The authentication key for the API + endpoint_base_url: The base URL for the API + timeout_seconds: Request timeout in seconds + + Raises: + InferenceClientError: If the parameters are invalid + """ + if not inference_key: + raise InferenceClientError("inference_key cannot be empty") + + parsed_url = urlparse(endpoint_base_url) + if not parsed_url.scheme or not parsed_url.netloc: + raise InferenceClientError("endpoint_base_url must be a valid URL") + + self.inference_key = inference_key + self.endpoint_base_url = endpoint_base_url.rstrip('/') + self.base_domain = self.endpoint_base_url[:self.endpoint_base_url.rindex( + '/')] + self.deployment_name = self.endpoint_base_url[self.endpoint_base_url.rindex( + '/')+1:] + self.timeout_seconds = timeout_seconds + self._session = requests.Session() + self._global_headers = { + 'Authorization': f'Bearer {inference_key}', + 'Content-Type': 'application/json' + } + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._session.close() + + @property + def global_headers(self) -> Dict[str, str]: + """ + Get the current global headers that will be used for all requests. + + Returns: + Dictionary of current global headers + """ + return self._global_headers.copy() + + def set_global_header(self, key: str, value: str) -> None: + """ + Set or update a global header that will be used for all requests. + + Args: + key: Header name + value: Header value + """ + self._global_headers[key] = value + + def set_global_headers(self, headers: Dict[str, str]) -> None: + """ + Set multiple global headers at once that will be used for all requests. + + Args: + headers: Dictionary of headers to set globally + """ + self._global_headers.update(headers) + + def remove_global_header(self, key: str) -> None: + """ + Remove a global header. + + Args: + key: Header name to remove from global headers + """ + if key in self._global_headers: + del self._global_headers[key] + + def _build_url(self, path: str) -> str: + """Construct the full URL by joining the base URL with the path.""" + return f"{self.endpoint_base_url}/{path.lstrip('/')}" + + def _build_request_headers(self, request_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: + """ + Build the final headers by merging global headers with request-specific headers. + + Args: + request_headers: Optional headers specific to this request + + Returns: + Merged headers dictionary + """ + headers = self._global_headers.copy() + if request_headers: + headers.update(request_headers) + return headers + + def _make_request(self, method: str, path: str, **kwargs) -> requests.Response: + """ + Make an HTTP request with error handling. 
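+
+        Global headers are merged with any request-specific headers, and the
+        client-level timeout is used unless a per-request timeout_seconds is given.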
+ + Args: + method: HTTP method to use + path: API endpoint path + **kwargs: Additional arguments to pass to the request + + Returns: + Response object from the request + + Raises: + InferenceClientError: If the request fails + """ + timeout = kwargs.pop('timeout_seconds', self.timeout_seconds) + try: + response = self._session.request( + method=method, + url=self._build_url(path), + headers=self._build_request_headers( + kwargs.pop('headers', None)), + timeout=timeout, + **kwargs + ) + response.raise_for_status() + return response + except requests.exceptions.Timeout: + raise InferenceClientError( + f"Request to {path} timed out after {timeout} seconds") + except requests.exceptions.RequestException as e: + raise InferenceClientError(f"Request to {path} failed: {str(e)}") + + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False): + response = self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers, stream=stream) + + return InferenceResponse( + headers=response.headers, + status_code=response.status_code, + status_text=response.reason, + _original_response=response + ) + + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", no_response: bool = False): + # Add relevant headers to the request, to indicate that the request is async + headers = headers or {} + if no_response: + # If no_response is True, use the "Prefer: respond-async-proxy" header to run async and don't wait for the response + headers['Prefer'] = 'respond-async-proxy' + self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) + return + # Add the "Prefer: respond-async" header to the request, to run async and wait for the response + headers['Prefer'] = 'respond-async' + + response = self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) + + result = response.json() + execution_id = result['Id'] + + return AsyncInferenceExecution(self, execution_id, AsyncStatus.Initialized) + + def get(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('GET', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def post(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('POST', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def put(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('PUT', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def delete(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('DELETE', path, params=params, headers=headers, 
timeout_seconds=timeout_seconds) + + def patch(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('PATCH', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def head(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('HEAD', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def options(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('OPTIONS', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def health(self, healthcheck_path: str = "/health") -> requests.Response: + """ + Check the health status of the API. + + Returns: + requests.Response: The response from the health check + + Raises: + InferenceClientError: If the health check fails + """ + try: + return self.get(healthcheck_path) + except InferenceClientError as e: + raise InferenceClientError(f"Health check failed: {str(e)}") + + +@dataclass_json(undefined=Undefined.EXCLUDE) +@dataclass +class AsyncInferenceExecution: + _inference_client: 'InferenceClient' + id: str + _status: AsyncStatus + INFERENCE_ID_HEADER = 'X-Inference-Id' + + def status(self) -> AsyncStatus: + """Get the current stored status of the async inference execution. Only the status value type + + Returns: + AsyncStatus: The status object + """ + + return self._status + + def status_json(self) -> Dict[str, Any]: + """Get the current status of the async inference execution. Return the status json + + Returns: + Dict[str, Any]: The status response containing the execution status and other metadata + """ + url = f'{self._inference_client.base_domain}/status/{self._inference_client.deployment_name}' + response = self._inference_client._session.get( + url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) + + response_json = response.json() + self._status = AsyncStatus(response_json['Status']) + + return response_json + + def result(self) -> Dict[str, Any]: + """Get the results of the async inference execution. + + Returns: + Dict[str, Any]: The results of the inference execution + """ + url = f'{self._inference_client.base_domain}/result/{self._inference_client.deployment_name}' + response = self._inference_client._session.get( + url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) + + if response.headers['Content-Type'] == 'application/json': + return response.json() + else: + return {'result': response.text} + + # alias for get_results + output = result diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 65c3d37..0ec28ae 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -1,8 +1,17 @@ -from dataclasses import dataclass +"""Container deployment and management service for DataCrunch. + +This module provides functionality for managing container deployments, including +creation, updates, deletion, and monitoring of containerized applications. 
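+It also covers secrets, container registry credentials, compute resources, and
+scaling options.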
+""" + +from dataclasses import dataclass, field from dataclasses_json import dataclass_json, Undefined # type: ignore -from typing import List, Optional, Dict +from typing import List, Optional, Dict, Any from enum import Enum +from datacrunch.http_client.http_client import HTTPClient +from datacrunch.InferenceClient import InferenceClient, InferenceResponse + # API endpoints CONTAINER_DEPLOYMENTS_ENDPOINT = '/container-deployments' @@ -12,17 +21,23 @@ class EnvVarType(str, Enum): + """Types of environment variables that can be set in containers.""" + PLAIN = "plain" SECRET = "secret" class VolumeMountType(str, Enum): + """Types of volume mounts that can be configured for containers.""" + SCRATCH = "scratch" SECRET = "secret" MEMORY = "memory" class ContainerRegistryType(str, Enum): + """Supported container registry types.""" + GCR = "gcr" DOCKERHUB = "dockerhub" GITHUB = "ghcr" @@ -31,6 +46,8 @@ class ContainerRegistryType(str, Enum): class ContainerDeploymentStatus(str, Enum): + """Possible states of a container deployment.""" + INITIALIZING = "initializing" HEALTHY = "healthy" DEGRADED = "degraded" @@ -44,12 +61,14 @@ class ContainerDeploymentStatus(str, Enum): @dataclass_json @dataclass class HealthcheckSettings: - """Settings for container health checking. + """Configuration for container health checking. - :param enabled: Whether health checking is enabled - :param port: Port number to perform health check on - :param path: HTTP path to perform health check on + Attributes: + enabled: Whether health checking is enabled. + port: Port number to perform health check on. + path: HTTP path to perform health check on. """ + enabled: bool = True port: Optional[int] = None path: Optional[str] = None @@ -58,12 +77,14 @@ class HealthcheckSettings: @dataclass_json @dataclass class EntrypointOverridesSettings: - """Settings for overriding container entrypoint and command. + """Configuration for overriding container entrypoint and command. - :param enabled: Whether entrypoint overrides are enabled - :param entrypoint: List of strings forming the entrypoint command - :param cmd: List of strings forming the command arguments + Attributes: + enabled: Whether entrypoint overrides are enabled. + entrypoint: List of strings forming the entrypoint command. + cmd: List of strings forming the command arguments. """ + enabled: bool = True entrypoint: Optional[List[str]] = None cmd: Optional[List[str]] = None @@ -74,10 +95,12 @@ class EntrypointOverridesSettings: class EnvVar: """Environment variable configuration for containers. - :param name: Name of the environment variable - :param value_or_reference_to_secret: Direct value or reference to a secret - :param type: Type of the environment variable + Attributes: + name: Name of the environment variable. + value_or_reference_to_secret: Direct value or reference to a secret. + type: Type of the environment variable. """ + name: str value_or_reference_to_secret: str type: EnvVarType @@ -88,10 +111,12 @@ class EnvVar: class VolumeMount: """Volume mount configuration for containers. - :param type: Type of volume mount - :param mount_path: Path where the volume should be mounted in the container - :param size_in_mb: Size of the volume in megabytes, only used for memory volume mounts + Attributes: + type: Type of volume mount. + mount_path: Path where the volume should be mounted in the container. 
+ size_in_mb: Size of the memory volume in megabytes, only used for memory volume mounts """ + type: VolumeMountType mount_path: str size_in_mb: Optional[int] = None @@ -101,40 +126,20 @@ class VolumeMount: @dataclass class Container: """Container configuration for deployment creation and updates. - This class omits the name field which is managed by the system. - - :param image: Container image to use - :param exposed_port: Port to expose from the container - :param healthcheck: Optional health check configuration - :param entrypoint_overrides: Optional entrypoint override settings - :param env: Optional list of environment variables - :param volume_mounts: Optional list of volume mounts - """ - image: str - exposed_port: int - healthcheck: Optional[HealthcheckSettings] = None - entrypoint_overrides: Optional[EntrypointOverridesSettings] = None - env: Optional[List[EnvVar]] = None - volume_mounts: Optional[List[VolumeMount]] = None - -@dataclass_json -@dataclass -class ContainerInfo: - """Container configuration for deployments. - This class is read-only and includes the system-managed name field. - - :param name: Name of the container (system-managed) - :param image: Container image to use - :param exposed_port: Port to expose from the container - :param healthcheck: Optional health check configuration - :param entrypoint_overrides: Optional entrypoint override settings - :param env: Optional list of environment variables - :param volume_mounts: Optional list of volume mounts + Attributes: + image: Container image to use. + exposed_port: Port to expose from the container. + name: Name of the container (system-managed, read-only). + healthcheck: Optional health check configuration. + entrypoint_overrides: Optional entrypoint override settings. + env: Optional list of environment variables. + volume_mounts: Optional list of volume mounts. """ - name: str + image: str exposed_port: int + name: Optional[str] = None healthcheck: Optional[HealthcheckSettings] = None entrypoint_overrides: Optional[EntrypointOverridesSettings] = None env: Optional[List[EnvVar]] = None @@ -146,8 +151,10 @@ class ContainerInfo: class ContainerRegistryCredentials: """Credentials for accessing a container registry. - :param name: Name of the credentials + Attributes: + name: Name of the credentials. """ + name: str @@ -156,9 +163,11 @@ class ContainerRegistryCredentials: class ContainerRegistrySettings: """Settings for container registry access. - :param is_private: Whether the registry is private - :param credentials: Optional credentials for accessing private registry + Attributes: + is_private: Whether the registry is private. + credentials: Optional credentials for accessing private registry. """ + is_private: bool credentials: Optional[ContainerRegistryCredentials] = None @@ -168,10 +177,12 @@ class ContainerRegistrySettings: class ComputeResource: """Compute resource configuration. - :param name: Name of the compute resource - :param size: Size of the compute resource - :param is_available: Whether the compute resource is currently available + Attributes: + name: Name of the compute resource. + size: Size of the compute resource. + is_available: Whether the compute resource is currently available. """ + name: str size: int # Made optional since it's only used in API responses @@ -183,8 +194,10 @@ class ComputeResource: class ScalingPolicy: """Policy for controlling scaling behavior. 
- :param delay_seconds: Number of seconds to wait before applying scaling action + Attributes: + delay_seconds: Number of seconds to wait before applying scaling action. """ + delay_seconds: int @@ -193,8 +206,10 @@ class ScalingPolicy: class QueueLoadScalingTrigger: """Trigger for scaling based on queue load. - :param threshold: Queue load threshold that triggers scaling + Attributes: + threshold: Queue load threshold that triggers scaling. """ + threshold: float @@ -203,9 +218,11 @@ class QueueLoadScalingTrigger: class UtilizationScalingTrigger: """Trigger for scaling based on resource utilization. - :param enabled: Whether this trigger is enabled - :param threshold: Utilization threshold that triggers scaling + Attributes: + enabled: Whether this trigger is enabled. + threshold: Utilization threshold that triggers scaling. """ + enabled: bool threshold: Optional[float] = None @@ -215,10 +232,12 @@ class UtilizationScalingTrigger: class ScalingTriggers: """Collection of triggers that can cause scaling actions. - :param queue_load: Optional trigger based on queue load - :param cpu_utilization: Optional trigger based on CPU utilization - :param gpu_utilization: Optional trigger based on GPU utilization + Attributes: + queue_load: Optional trigger based on queue load. + cpu_utilization: Optional trigger based on CPU utilization. + gpu_utilization: Optional trigger based on GPU utilization. """ + queue_load: Optional[QueueLoadScalingTrigger] = None cpu_utilization: Optional[UtilizationScalingTrigger] = None gpu_utilization: Optional[UtilizationScalingTrigger] = None @@ -229,14 +248,16 @@ class ScalingTriggers: class ScalingOptions: """Configuration for automatic scaling behavior. - :param min_replica_count: Minimum number of replicas to maintain - :param max_replica_count: Maximum number of replicas allowed - :param scale_down_policy: Policy for scaling down replicas - :param scale_up_policy: Policy for scaling up replicas - :param queue_message_ttl_seconds: Time-to-live for queue messages in seconds - :param concurrent_requests_per_replica: Number of concurrent requests each replica can handle - :param scaling_triggers: Configuration for various scaling triggers + Attributes: + min_replica_count: Minimum number of replicas to maintain. + max_replica_count: Maximum number of replicas allowed. + scale_down_policy: Policy for scaling down replicas. + scale_up_policy: Policy for scaling up replicas. + queue_message_ttl_seconds: Time-to-live for queue messages in seconds. + concurrent_requests_per_replica: Number of concurrent requests each replica can handle. + scaling_triggers: Configuration for various scaling triggers. """ + min_replica_count: int max_replica_count: int scale_down_policy: ScalingPolicy @@ -250,48 +271,155 @@ class ScalingOptions: @dataclass class Deployment: """Configuration for creating or updating a container deployment. - This class uses Container instead of ContainerInfo to prevent name setting. - - :param name: Name of the deployment - :param container_registry_settings: Settings for accessing container registry - :param containers: List of container specifications in the deployment - :param compute: Compute resource configuration - :param is_spot: Whether is spot deployment - :param endpoint_base_url: Optional base URL for the deployment endpoint - :param scaling: Optional scaling configuration + + Attributes: + name: Name of the deployment. + container_registry_settings: Settings for accessing container registry. 
+ containers: List of container specifications in the deployment. + compute: Compute resource configuration. + is_spot: Whether is spot deployment. + endpoint_base_url: Optional base URL for the deployment endpoint. + scaling: Optional scaling configuration. + created_at: Optional timestamp when the deployment was created. """ + name: str - container_registry_settings: ContainerRegistrySettings containers: List[Container] compute: ComputeResource + container_registry_settings: ContainerRegistrySettings = field( + default_factory=lambda: ContainerRegistrySettings(is_private=False)) is_spot: bool = False endpoint_base_url: Optional[str] = None scaling: Optional[ScalingOptions] = None + created_at: Optional[str] = None + _inference_client: Optional[InferenceClient] = None -@dataclass_json(undefined=Undefined.EXCLUDE) -@dataclass -class DeploymentInfo: - """Configuration for a container deployment. - This class is read-only and includes system-managed fields. - - :param name: Name of the deployment - :param container_registry_settings: Settings for accessing container registry - :param containers: List of containers in the deployment - :param compute: Compute resource configuration - :param is_spot: Whether is spot deployment - :param endpoint_base_url: Optional base URL for the deployment endpoint - :param scaling: Optional scaling configuration - :param created_at: Timestamp when the deployment was created - """ - name: str - container_registry_settings: ContainerRegistrySettings - containers: List[ContainerInfo] - compute: ComputeResource - is_spot: bool = False - endpoint_base_url: Optional[str] = None - scaling: Optional[ScalingOptions] = None - created_at: Optional[str] = None + def __str__(self): + """Returns a string representation of the deployment, excluding sensitive information. + + Returns: + str: A formatted string representation of the deployment. + """ + # Get all attributes except _inference_client + attrs = {k: v for k, v in self.__dict__.items() if k != + '_inference_client'} + # Format each attribute + attr_strs = [f"{k}={repr(v)}" for k, v in attrs.items()] + return f"Deployment({', '.join(attr_strs)})" + + def __repr__(self): + """Returns a repr representation of the deployment, excluding sensitive information. + + Returns: + str: A formatted string representation of the deployment. + """ + return self.__str__() + + @classmethod + def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None) -> 'Deployment': + """Creates a Deployment instance from a dictionary with an inference key. + + Args: + data: Dictionary containing deployment data. + inference_key: Inference key to set on the deployment. + + Returns: + Deployment: A new Deployment instance with the inference client initialized. + """ + deployment = Deployment.from_dict(data, infer_missing=True) + if inference_key and deployment.endpoint_base_url: + deployment._inference_client = InferenceClient( + inference_key=inference_key, + endpoint_base_url=deployment.endpoint_base_url + ) + return deployment + + def set_inference_client(self, inference_key: str) -> None: + """Sets the inference client for this deployment. + + Args: + inference_key: The inference key to use for authentication. + + Raises: + ValueError: If endpoint_base_url is not set. 
+ """ + if self.endpoint_base_url is None: + raise ValueError( + "Endpoint base URL must be set to use inference client") + self._inference_client = InferenceClient( + inference_key=inference_key, + endpoint_base_url=self.endpoint_base_url + ) + + def _validate_inference_client(self) -> None: + """Validates that the inference client is initialized. + + Raises: + ValueError: If inference client is not initialized. + """ + if self._inference_client is None: + raise ValueError( + "Inference client not initialized. Use from_dict_with_inference_key or set_inference_client to initialize inference capabilities.") + + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False) -> InferenceResponse: + """Runs a synchronous inference request. + + Args: + data: The data to send in the request. + path: The endpoint path to send the request to. + timeout_seconds: Maximum time to wait for the response. + headers: Optional headers to include in the request. + http_method: The HTTP method to use for the request. + stream: Whether to stream the response. + + Returns: + InferenceResponse: The response from the inference request. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + return self._inference_client.run_sync(data, path, timeout_seconds, headers, http_method, stream) + + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False): + """Runs an asynchronous inference request. + + Args: + data: The data to send in the request. + path: The endpoint path to send the request to. + timeout_seconds: Maximum time to wait for the response. + headers: Optional headers to include in the request. + http_method: The HTTP method to use for the request. + stream: Whether to stream the response. + + Returns: + The response from the inference request. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + return self._inference_client.run(data, path, timeout_seconds, headers, http_method, stream) + + def health(self): + """Checks the health of the deployed application. + + Returns: + The health check response. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + # build healthcheck path + healthcheck_path = "/health" + if self.containers and self.containers[0].healthcheck and self.containers[0].healthcheck.path: + healthcheck_path = self.containers[0].healthcheck.path + + return self._inference_client.health(healthcheck_path) + # Function alias + healthcheck = health @dataclass_json @@ -299,10 +427,12 @@ class DeploymentInfo: class ReplicaInfo: """Information about a deployment replica. - :param id: Unique identifier of the replica - :param status: Current status of the replica - :param started_at: Timestamp when the replica was started + Attributes: + id: Unique identifier of the replica. + status: Current status of the replica. + started_at: Timestamp when the replica was started. """ + id: str status: str started_at: str @@ -311,7 +441,13 @@ class ReplicaInfo: @dataclass_json @dataclass class Secret: - """A secret model class""" + """A secret model class. + + Attributes: + name: Name of the secret. + created_at: Timestamp when the secret was created. 
+ """ + name: str created_at: str @@ -319,7 +455,13 @@ class Secret: @dataclass_json @dataclass class RegistryCredential: - """A container registry credential model class""" + """A container registry credential model class. + + Attributes: + name: Name of the registry credential. + created_at: Timestamp when the credential was created. + """ + name: str created_at: str @@ -327,7 +469,13 @@ class RegistryCredential: @dataclass_json @dataclass class BaseRegistryCredentials: - """Base class for registry credentials""" + """Base class for registry credentials. + + Attributes: + name: Name of the registry credential. + type: Type of the container registry. + """ + name: str type: ContainerRegistryType @@ -335,11 +483,24 @@ class BaseRegistryCredentials: @dataclass_json @dataclass class DockerHubCredentials(BaseRegistryCredentials): - """Credentials for DockerHub registry""" + """Credentials for DockerHub registry. + + Attributes: + username: DockerHub username. + access_token: DockerHub access token. + """ + username: str access_token: str def __init__(self, name: str, username: str, access_token: str): + """Initializes DockerHub credentials. + + Args: + name: Name of the credentials. + username: DockerHub username. + access_token: DockerHub access token. + """ super().__init__(name=name, type=ContainerRegistryType.DOCKERHUB) self.username = username self.access_token = access_token @@ -348,11 +509,24 @@ def __init__(self, name: str, username: str, access_token: str): @dataclass_json @dataclass class GithubCredentials(BaseRegistryCredentials): - """Credentials for GitHub Container Registry""" + """Credentials for GitHub Container Registry. + + Attributes: + username: GitHub username. + access_token: GitHub access token. + """ + username: str access_token: str def __init__(self, name: str, username: str, access_token: str): + """Initializes GitHub credentials. + + Args: + name: Name of the credentials. + username: GitHub username. + access_token: GitHub access token. + """ super().__init__(name=name, type=ContainerRegistryType.GITHUB) self.username = username self.access_token = access_token @@ -361,10 +535,21 @@ def __init__(self, name: str, username: str, access_token: str): @dataclass_json @dataclass class GCRCredentials(BaseRegistryCredentials): - """Credentials for Google Container Registry""" + """Credentials for Google Container Registry. + + Attributes: + service_account_key: GCP service account key JSON. + """ + service_account_key: str def __init__(self, name: str, service_account_key: str): + """Initializes GCR credentials. + + Args: + name: Name of the credentials. + service_account_key: GCP service account key JSON. + """ super().__init__(name=name, type=ContainerRegistryType.GCR) self.service_account_key = service_account_key @@ -372,13 +557,30 @@ def __init__(self, name: str, service_account_key: str): @dataclass_json @dataclass class AWSECRCredentials(BaseRegistryCredentials): - """Credentials for AWS Elastic Container Registry""" + """Credentials for AWS Elastic Container Registry. + + Attributes: + access_key_id: AWS access key ID. + secret_access_key: AWS secret access key. + region: AWS region. + ecr_repo: ECR repository name. + """ + access_key_id: str secret_access_key: str region: str ecr_repo: str def __init__(self, name: str, access_key_id: str, secret_access_key: str, region: str, ecr_repo: str): + """Initializes AWS ECR credentials. + + Args: + name: Name of the credentials. + access_key_id: AWS access key ID. + secret_access_key: AWS secret access key. 
+ region: AWS region. + ecr_repo: ECR repository name. + """ super().__init__(name=name, type=ContainerRegistryType.AWS_ECR) self.access_key_id = access_key_id self.secret_access_key = secret_access_key @@ -389,130 +591,154 @@ def __init__(self, name: str, access_key_id: str, secret_access_key: str, region @dataclass_json @dataclass class CustomRegistryCredentials(BaseRegistryCredentials): - """Credentials for custom container registries""" + """Credentials for custom container registries. + + Attributes: + docker_config_json: Docker config JSON containing registry credentials. + """ + docker_config_json: str def __init__(self, name: str, docker_config_json: str): + """Initializes custom registry credentials. + + Args: + name: Name of the credentials. + docker_config_json: Docker config JSON containing registry credentials. + """ super().__init__(name=name, type=ContainerRegistryType.CUSTOM) self.docker_config_json = docker_config_json class ContainersService: - """Service for managing container deployments""" + """Service for managing container deployments. + + This class provides methods for interacting with the DataCrunch container + deployment API, including CRUD operations for deployments and related resources. + """ - def __init__(self, http_client) -> None: - """Initialize the containers service + def __init__(self, http_client: HTTPClient, inference_key: str = None) -> None: + """Initializes the containers service. - :param http_client: HTTP client for making API requests - :type http_client: Any + Args: + http_client: HTTP client for making API requests. + inference_key: Optional inference key for authenticating inference requests. """ self.client = http_client + self._inference_key = inference_key - def get_deployments(self) -> List[DeploymentInfo]: - """Get all deployments + def get_deployments(self) -> List[Deployment]: + """Retrieves all container deployments. - :return: list of deployments - :rtype: List[DeploymentInfo] + Returns: + List[Deployment]: List of all deployments. """ response = self.client.get(CONTAINER_DEPLOYMENTS_ENDPOINT) - return [DeploymentInfo.from_dict(deployment, infer_missing=True) for deployment in response.json()] + return [Deployment.from_dict_with_inference_key(deployment, self._inference_key) for deployment in response.json()] - def get_deployment_by_name(self, deployment_name: str) -> DeploymentInfo: - """Get a deployment by name + def get_deployment_by_name(self, deployment_name: str) -> Deployment: + """Retrieves a specific deployment by name. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: deployment - :rtype: DeploymentInfo + Args: + deployment_name: Name of the deployment to retrieve. + + Returns: + Deployment: The requested deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}") - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) + + # Function alias + get_deployment = get_deployment_by_name def create_deployment( self, deployment: Deployment - ) -> DeploymentInfo: - """Create a new deployment + ) -> Deployment: + """Creates a new container deployment. - :param deployment: deployment configuration - :type deployment: Deployment - :return: created deployment - :rtype: DeploymentInfo + Args: + deployment: Deployment configuration to create. + + Returns: + Deployment: The created deployment. 
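+
+        Raises:
+            APIException: If the API rejects the deployment configuration.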
""" response = self.client.post( CONTAINER_DEPLOYMENTS_ENDPOINT, deployment.to_dict() ) - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) + + def update_deployment(self, deployment_name: str, deployment: Deployment) -> Deployment: + """Updates an existing deployment. - def update_deployment(self, deployment_name: str, deployment: DeploymentInfo) -> DeploymentInfo: - """Update an existing deployment + Args: + deployment_name: Name of the deployment to update. + deployment: Updated deployment configuration. - :param deployment_name: name of the deployment to update - :type deployment_name: str - :param deployment: updated deployment - :type deployment: DeploymentInfo - :return: updated deployment - :rtype: DeploymentInfo + Returns: + Deployment: The updated deployment. """ response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}", deployment.to_dict() ) - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) def delete_deployment(self, deployment_name: str) -> None: - """Delete a deployment + """Deletes a deployment. - :param deployment_name: name of the deployment to delete - :type deployment_name: str + Args: + deployment_name: Name of the deployment to delete. """ self.client.delete( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}") def get_deployment_status(self, deployment_name: str) -> ContainerDeploymentStatus: - """Get deployment status + """Retrieves the current status of a deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: deployment status - :rtype: ContainerDeploymentStatus + Args: + deployment_name: Name of the deployment. + + Returns: + ContainerDeploymentStatus: Current status of the deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/status") return ContainerDeploymentStatus(response.json()["status"]) def restart_deployment(self, deployment_name: str) -> None: - """Restart a deployment + """Restarts a deployment. - :param deployment_name: name of the deployment to restart - :type deployment_name: str + Args: + deployment_name: Name of the deployment to restart. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/restart") def get_deployment_scaling_options(self, deployment_name: str) -> ScalingOptions: - """Get deployment scaling options + """Retrieves the scaling options for a deployment. + + Args: + deployment_name: Name of the deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: scaling options - :rtype: ScalingOptions + Returns: + ScalingOptions: Current scaling options for the deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling") return ScalingOptions.from_dict(response.json()) def update_deployment_scaling_options(self, deployment_name: str, scaling_options: ScalingOptions) -> ScalingOptions: - """Update deployment scaling options + """Updates the scaling options for a deployment. + + Args: + deployment_name: Name of the deployment. + scaling_options: New scaling options to apply. 
- :param deployment_name: name of the deployment - :type deployment_name: str - :param scaling_options: new scaling options - :type scaling_options: ScalingOptions - :return: updated scaling options - :rtype: ScalingOptions + Returns: + ScalingOptions: Updated scaling options for the deployment. """ response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling", @@ -521,51 +747,53 @@ def update_deployment_scaling_options(self, deployment_name: str, scaling_option return ScalingOptions.from_dict(response.json()) def get_deployment_replicas(self, deployment_name: str) -> List[ReplicaInfo]: - """Get deployment replicas + """Retrieves information about deployment replicas. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: list of replicas information - :rtype: List[ReplicaInfo] + Args: + deployment_name: Name of the deployment. + + Returns: + List[ReplicaInfo]: List of replica information. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/replicas") return [ReplicaInfo.from_dict(replica) for replica in response.json()["list"]] def purge_deployment_queue(self, deployment_name: str) -> None: - """Purge deployment queue + """Purges the deployment queue. - :param deployment_name: name of the deployment - :type deployment_name: str + Args: + deployment_name: Name of the deployment. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/purge-queue") def pause_deployment(self, deployment_name: str) -> None: - """Pause a deployment + """Pauses a deployment. - :param deployment_name: name of the deployment to pause - :type deployment_name: str + Args: + deployment_name: Name of the deployment to pause. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/pause") def resume_deployment(self, deployment_name: str) -> None: - """Resume a deployment + """Resumes a paused deployment. - :param deployment_name: name of the deployment to resume - :type deployment_name: str + Args: + deployment_name: Name of the deployment to resume. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/resume") def get_deployment_environment_variables(self, deployment_name: str) -> Dict[str, List[EnvVar]]: - """Get deployment environment variables + """Retrieves environment variables for a deployment. + + Args: + deployment_name: Name of the deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: dictionary mapping container names to their environment variables - :rtype: Dict[str, List[EnvVar]] + Returns: + Dict[str, List[EnvVar]]: Dictionary mapping container names to their environment variables. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables") @@ -578,16 +806,15 @@ def get_deployment_environment_variables(self, deployment_name: str) -> Dict[str return result def add_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]: - """Add environment variables to a container + """Adds environment variables to a container in a deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :param container_name: name of the container - :type container_name: str - :param env_vars: environment variables to add - :type env_vars: List[EnvVar] - :return: updated environment variables - :rtype: Dict[str, List[EnvVar]] + Args: + deployment_name: Name of the deployment. 
+            container_name: Name of the container.
+            env_vars: List of environment variables to add.
+
+        Returns:
+            Dict[str, List[EnvVar]]: Updated environment variables for all containers.
         """
         response = self.client.post(
             f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
@@ -603,16 +830,15 @@ def add_deployment_environment_variables(self, deployment_name: str, container_n
         return result
 
     def update_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]:
-        """Update environment variables of a container
+        """Updates environment variables for a container in a deployment.
+
+        Args:
+            deployment_name: Name of the deployment.
+            container_name: Name of the container.
+            env_vars: List of updated environment variables.
 
-        :param deployment_name: name of the deployment
-        :type deployment_name: str
-        :param container_name: name of the container
-        :type container_name: str
-        :param env_vars: updated environment variables
-        :type env_vars: List[EnvVar]
-        :return: updated environment variables
-        :rtype: Dict[str, List[EnvVar]]
+        Returns:
+            Dict[str, List[EnvVar]]: Updated environment variables for all containers.
         """
         response = self.client.patch(
             f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
@@ -628,16 +854,15 @@ def update_deployment_environment_variables(self, deployment_name: str, containe
         return result
 
     def delete_deployment_environment_variables(self, deployment_name: str, container_name: str, env_var_names: List[str]) -> Dict[str, List[EnvVar]]:
-        """Delete environment variables from a container
+        """Deletes environment variables from a container in a deployment.
 
-        :param deployment_name: name of the deployment
-        :type deployment_name: str
-        :param container_name: name of the container
-        :type container_name: str
-        :param env_var_names: names of environment variables to delete
-        :type env_var_names: List[str]
-        :return: remaining environment variables
-        :rtype: Dict[str, List[EnvVar]]
+        Args:
+            deployment_name: Name of the deployment.
+            container_name: Name of the container.
+            env_var_names: List of environment variable names to delete.
+
+        Returns:
+            Dict[str, List[EnvVar]]: Updated environment variables for all containers.
         """
         response = self.client.delete(
             f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
@@ -651,72 +876,83 @@ def delete_deployment_environment_variables(self, deployment_name: str, containe
                 env_var) for env_var in env_vars]
         return result
 
-    def get_compute_resources(self) -> List[ComputeResource]:
-        """Get available compute resources
+    def get_compute_resources(self, size: Optional[int] = None, is_available: Optional[bool] = None) -> List[ComputeResource]:
+        """Retrieves compute resources, optionally filtered by size and availability.
+
+        Args:
+            size: Optional size to filter resources by (e.g. 8 for 8x GPUs)
+            is_available: Optional boolean to filter by availability status
 
-        :return: list of compute resources
-        :rtype: List[ComputeResource]
+        Returns:
+            List[ComputeResource]: List of compute resources matching the filters.
+                If no filters are provided, returns all resources.
         """
         response = self.client.get(SERVERLESS_COMPUTE_RESOURCES_ENDPOINT)
         resources = []
         for resource_group in response.json():
             for resource in resource_group:
                 resources.append(ComputeResource.from_dict(resource))
+        if size is not None:
+            resources = [r for r in resources if r.size == size]
+        if is_available is not None:
+            resources = [
+                r for r in resources if r.is_available == is_available]
         return resources
 
+    # Function alias
+    get_gpus = get_compute_resources
+
     def get_secrets(self) -> List[Secret]:
-        """Get all secrets
+        """Retrieves all secrets.
 
-        :return: list of secrets
-        :rtype: List[Secret]
+        Returns:
+            List[Secret]: List of all secrets.
         """
         response = self.client.get(SECRETS_ENDPOINT)
         return [Secret.from_dict(secret) for secret in response.json()]
 
     def create_secret(self, name: str, value: str) -> None:
-        """Create a new secret
+        """Creates a new secret.
 
-        :param name: name of the secret
-        :type name: str
-        :param value: value of the secret
-        :type value: str
+        Args:
+            name: Name of the secret.
+            value: Value of the secret.
         """
         self.client.post(SECRETS_ENDPOINT, {"name": name, "value": value})
 
     def delete_secret(self, secret_name: str, force: bool = False) -> None:
-        """Delete a secret
+        """Deletes a secret.
 
-        :param secret_name: name of the secret to delete
-        :type secret_name: str
-        :param force: force delete even if secret is in use
-        :type force: bool
+        Args:
+            secret_name: Name of the secret to delete.
+            force: Whether to force delete even if secret is in use.
         """
         self.client.delete(
             f"{SECRETS_ENDPOINT}/{secret_name}", params={"force": str(force).lower()})
 
     def get_registry_credentials(self) -> List[RegistryCredential]:
-        """Get all registry credentials
+        """Retrieves all registry credentials.
 
-        :return: list of registry credentials
-        :rtype: List[RegistryCredential]
+        Returns:
+            List[RegistryCredential]: List of all registry credentials.
         """
         response = self.client.get(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT)
         return [RegistryCredential.from_dict(credential) for credential in response.json()]
 
     def add_registry_credentials(self, credentials: BaseRegistryCredentials) -> None:
-        """Add registry credentials
+        """Adds new registry credentials.
 
-        :param credentials: Registry credentials object
-        :type credentials: BaseRegistryCredentials
+        Args:
+            credentials: Registry credentials to add.
         """
         data = credentials.to_dict()
         self.client.post(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT, data)
 
     def delete_registry_credentials(self, credentials_name: str) -> None:
-        """Delete registry credentials
+        """Deletes registry credentials.
 
-        :param credentials_name: name of the credentials to delete
-        :type credentials_name: str
+        Args:
+            credentials_name: Name of the credentials to delete.
""" self.client.delete( f"{CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT}/{credentials_name}") diff --git a/datacrunch/datacrunch.py b/datacrunch/datacrunch.py index ce5005f..14c90ed 100644 --- a/datacrunch/datacrunch.py +++ b/datacrunch/datacrunch.py @@ -17,7 +17,7 @@ class DataCrunchClient: """Client for interacting with DataCrunch's public API""" - def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.datacrunch.io/v1") -> None: + def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.datacrunch.io/v1", inference_key: str = None) -> None: """The DataCrunch client :param client_id: client id @@ -26,6 +26,8 @@ def __init__(self, client_id: str, client_secret: str, base_url: str = "https:// :type client_secret: str :param base_url: base url for all the endpoints, optional, defaults to "https://api.datacrunch.io/v1" :type base_url: str, optional + :param inference_key: inference key, optional + :type inference_key: str, optional """ # Validate that client_id and client_secret are not empty @@ -75,5 +77,5 @@ def __init__(self, client_id: str, client_secret: str, base_url: str = "https:// """Locations service. Get locations""" self.containers: ContainersService = ContainersService( - self._http_client) + self._http_client, inference_key) """Containers service. Deploy, manage, and monitor container deployments""" diff --git a/docs/source/examples/containers/inference_sync.rst b/docs/source/examples/containers/inference_sync.rst new file mode 100644 index 0000000..a4c6ff2 --- /dev/null +++ b/docs/source/examples/containers/inference_sync.rst @@ -0,0 +1,8 @@ +Calling the inference endpoint in sync mode +=========================================== + +This example demonstrates how to call the inference endpoint in sync mode. + +.. literalinclude:: ../../../../examples/containers/calling_the_inference_endpoint_in_sync_mode.py + :language: python + :caption: Calling the inference endpoint in sync mode \ No newline at end of file diff --git a/examples/containers/calling_the_endpoint_asynchronously.py b/examples/containers/calling_the_endpoint_asynchronously.py new file mode 100644 index 0000000..7e713f8 --- /dev/null +++ b/examples/containers/calling_the_endpoint_asynchronously.py @@ -0,0 +1,43 @@ +import os +from time import sleep +from datacrunch import DataCrunchClient +from datacrunch.InferenceClient.inference_client import AsyncStatus + +# Configuration - replace with your deployment name +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" + +# Get client secret and id from environment variables +DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') +DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') +DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') + +# DataCrunch client instance +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET, inference_key=DATACRUNCH_INFERENCE_KEY) + +# Get the deployment +deployment = datacrunch.containers.get_deployment_by_name(DEPLOYMENT_NAME) + +# Make an asynchronous request to the endpoint. 
+# This example demonstrates calling a SGLang deployment which serves LLMs using an OpenAI-compatible API format +data = { + "model": "deepseek-ai/deepseek-llm-7b-chat", + "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", + "max_tokens": 128, + "temperature": 0.7, + "top_p": 0.9 +} + +header = { + "Content-Type": "application/json" +} + +response = deployment.run( + data=data, + path='v1/completions', + headers=header, +) + +while response.status() != AsyncStatus.Completed: + print(response.status_json()) + sleep(1) +print(response.output()) diff --git a/examples/containers/calling_the_endpoint_synchronously.py b/examples/containers/calling_the_endpoint_synchronously.py new file mode 100644 index 0000000..c65cca3 --- /dev/null +++ b/examples/containers/calling_the_endpoint_synchronously.py @@ -0,0 +1,33 @@ +import os +from datacrunch import DataCrunchClient + +# Configuration - replace with your deployment name +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" + +# Get client secret and id from environment variables +DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') +DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') +DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') + +# DataCrunch client instance +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET, inference_key=DATACRUNCH_INFERENCE_KEY) + +# Get the deployment +deployment = datacrunch.containers.get_deployment_by_name(DEPLOYMENT_NAME) + +# Make a synchronous request to the endpoint. +# This example demonstrates calling a SGLang deployment which serves LLMs using an OpenAI-compatible API format +data = { + "model": "deepseek-ai/deepseek-llm-7b-chat", + "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", + "max_tokens": 128, + "temperature": 0.7, + "top_p": 0.9 +} +response = deployment.run_sync( + data=data, + path='v1/completions' +) # wait for the response + +# Print the response +print(response.output()) diff --git a/examples/containers/compute_resources_example.py b/examples/containers/compute_resources_example.py index a2501c0..e6f2758 100644 --- a/examples/containers/compute_resources_example.py +++ b/examples/containers/compute_resources_example.py @@ -1,76 +1,29 @@ import os from datacrunch import DataCrunchClient -from typing import List -from datacrunch.containers.containers import ComputeResource # Get client secret and id from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') - -def list_all_compute_resources(client: DataCrunchClient) -> List[ComputeResource]: - """List all available compute resources. - - Args: - client (DataCrunchClient): The DataCrunch API client. - - Returns: - List[ComputeResource]: List of all compute resources. - """ - return client.containers.get_compute_resources() - - -def list_available_compute_resources(client: DataCrunchClient) -> List[ComputeResource]: - """List only the available compute resources. - - Args: - client (DataCrunchClient): The DataCrunch API client. - - Returns: - List[ComputeResource]: List of available compute resources. 
diff --git a/examples/containers/compute_resources_example.py b/examples/containers/compute_resources_example.py
index a2501c0..e6f2758 100644
--- a/examples/containers/compute_resources_example.py
+++ b/examples/containers/compute_resources_example.py
@@ -1,76 +1,29 @@
 import os
 from datacrunch import DataCrunchClient
-from typing import List
-from datacrunch.containers.containers import ComputeResource

 # Get client secret and id from environment variables
 DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
 DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
-
-
-def list_all_compute_resources(client: DataCrunchClient) -> List[ComputeResource]:
-    """List all available compute resources.
-
-    Args:
-        client (DataCrunchClient): The DataCrunch API client.
-
-    Returns:
-        List[ComputeResource]: List of all compute resources.
-    """
-    return client.containers.get_compute_resources()
-
-
-def list_available_compute_resources(client: DataCrunchClient) -> List[ComputeResource]:
-    """List only the available compute resources.
-
-    Args:
-        client (DataCrunchClient): The DataCrunch API client.
-
-    Returns:
-        List[ComputeResource]: List of available compute resources.
-    """
-    all_resources = client.containers.get_compute_resources()
-    return [r for r in all_resources if r.is_available]
-
-
-def list_compute_resources_by_size(client: DataCrunchClient, size: int) -> List[ComputeResource]:
-    """List compute resources filtered by size.
-
-    Args:
-        client (DataCrunchClient): The DataCrunch API client.
-        size (int): The size to filter by.
-
-    Returns:
-        List[ComputeResource]: List of compute resources with the specified size.
-    """
-    all_resources = client.containers.get_compute_resources()
-    return [r for r in all_resources if r.size == size]
-
-
-def main():
-    # Initialize the client with your credentials
-    datacrunch = DataCrunchClient(
-        DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)
-
-    # Example 1: List all compute resources
-    print("All compute resources:")
-    all_resources = list_all_compute_resources(datacrunch)
-    for resource in all_resources:
-        print(
-            f"Name: {resource.name}, Size: {resource.size}, Available: {resource.is_available}")
-
-    # Example 2: List available compute resources
-    print("Available compute resources:")
-    available_resources = list_available_compute_resources(datacrunch)
-    for resource in available_resources:
-        print(f"Name: {resource.name}, Size: {resource.size}")
-
-    # Example 3: List compute resources of size 8
-    print("Compute resources with size 8:")
-    size_8_resources = list_compute_resources_by_size(datacrunch, 8)
-    for resource in size_8_resources:
-        print(f"Name: {resource.name}, Available: {resource.is_available}")
-
-
-if __name__ == "__main__":
-    main()
+# Initialize the client with your credentials
+datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)
+
+# Example 1: List all compute resources
+print("All compute resources:")
+all_resources = datacrunch.containers.get_compute_resources()
+for resource in all_resources:
+    print(
+        f"Name: {resource.name}, Size: {resource.size}, Available: {resource.is_available}")
+
+# Example 2: List available compute resources
+print("\nAvailable compute resources:")
+available_resources = datacrunch.containers.get_compute_resources(
+    is_available=True)
+for resource in available_resources:
+    print(f"Name: {resource.name}, Size: {resource.size}")
+
+# Example 3: List compute resources of size 8
+print("\nCompute resources with size 8:")
+size_8_resources = datacrunch.containers.get_compute_resources(size=8)
+for resource in size_8_resources:
+    print(f"Name: {resource.name}, Available: {resource.is_available}")
diff --git a/examples/containers/container_deployments_example.py b/examples/containers/container_deployments_example.py
index cdc7652..38a4877 100644
--- a/examples/containers/container_deployments_example.py
+++ b/examples/containers/container_deployments_example.py
@@ -9,7 +9,7 @@
 from datacrunch import DataCrunchClient
 from datacrunch.exceptions import APIException
-from datacrunch.containers.containers import (
+from datacrunch.containers import (
     Container,
     ComputeResource,
     EnvVar,
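The two filters in the rewritten example can also be combined in a single call; the new unit tests at the bottom of this diff exercise exactly this combination:

    # Size and availability filters applied together (sketch, mirroring the tests)
    resources = datacrunch.containers.get_compute_resources(size=1, is_available=True)
    for r in resources:
        print(f"Name: {r.name}")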
+""" + +import os +from datacrunch import DataCrunchClient + +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" + +# Get confidential values from environment variables +DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') +DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') + +# Initialize client with inference key +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) + +# Register signal handlers for cleanup +datacrunch.containers.delete_deployment(DEPLOYMENT_NAME) +print("Deployment deleted") \ No newline at end of file diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index df668db..32f5105 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -8,11 +8,11 @@ import time import signal import sys -import requests - +import json +from datetime import datetime from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException -from datacrunch.containers.containers import ( +from datacrunch.containers import ( Container, ComputeResource, ScalingOptions, @@ -29,23 +29,20 @@ ContainerDeploymentStatus, ) +CURRENT_TIMESTAMP = datetime.now().strftime( + "%Y%m%d-%H%M%S").lower() # e.g. 20250403-120000 + # Configuration constants -DEPLOYMENT_NAME = "sglang-deployment-tutorial" -CONTAINER_NAME = "sglang-server" -MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat" +DEPLOYMENT_NAME = f"sglang-deployment-example-{CURRENT_TIMESTAMP}" +SGLANG_IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124" +DEEPSEEK_MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat" HF_SECRET_NAME = "huggingface-token" -IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124" -CONTAINERS_API_URL = f'https://containers.datacrunch.io/{DEPLOYMENT_NAME}' # Get confidential values from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') +DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') HF_TOKEN = os.environ.get('HF_TOKEN') -INFERENCE_API_KEY = os.environ.get('INFERENCE_API_KEY') - -# DataCrunch client instance (global for graceful shutdown) -datacrunch = None - def wait_for_deployment_health(datacrunch_client: DataCrunchClient, deployment_name: str, max_attempts: int = 20, delay: int = 30) -> bool: """Wait for deployment to reach healthy status. @@ -99,217 +96,194 @@ def graceful_shutdown(signum, frame) -> None: sys.exit(0) -def test_deployment(base_url: str, api_key: str) -> None: - """Test the deployment with a simple request. 
diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py
index df668db..32f5105 100644
--- a/examples/containers/sglang_deployment_example.py
+++ b/examples/containers/sglang_deployment_example.py
@@ -8,11 +8,11 @@
 import time
 import signal
 import sys
-import requests
-
+import json
+from datetime import datetime
 from datacrunch import DataCrunchClient
 from datacrunch.exceptions import APIException
-from datacrunch.containers.containers import (
+from datacrunch.containers import (
     Container,
     ComputeResource,
     ScalingOptions,
@@ -29,23 +29,20 @@
     ContainerDeploymentStatus,
 )

+CURRENT_TIMESTAMP = datetime.now().strftime(
+    "%Y%m%d-%H%M%S").lower()  # e.g. 20250403-120000
+
 # Configuration constants
-DEPLOYMENT_NAME = "sglang-deployment-tutorial"
-CONTAINER_NAME = "sglang-server"
-MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat"
+DEPLOYMENT_NAME = f"sglang-deployment-example-{CURRENT_TIMESTAMP}"
+SGLANG_IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124"
+DEEPSEEK_MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat"
 HF_SECRET_NAME = "huggingface-token"
-IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124"
-CONTAINERS_API_URL = f'https://containers.datacrunch.io/{DEPLOYMENT_NAME}'

 # Get confidential values from environment variables
 DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
 DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
+DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY')
 HF_TOKEN = os.environ.get('HF_TOKEN')
-INFERENCE_API_KEY = os.environ.get('INFERENCE_API_KEY')
-
-# DataCrunch client instance (global for graceful shutdown)
-datacrunch = None

 def wait_for_deployment_health(datacrunch_client: DataCrunchClient, deployment_name: str, max_attempts: int = 20, delay: int = 30) -> bool:
     """Wait for deployment to reach healthy status.
@@ -99,217 +96,194 @@ def graceful_shutdown(signum, frame) -> None:
     """
     sys.exit(0)

-def test_deployment(base_url: str, api_key: str) -> None:
-    """Test the deployment with a simple request.
-
-    Args:
-        base_url: The base URL of the deployment
-        api_key: The API key for authentication
-    """
-    # First, check if the model info endpoint is working
-    model_info_url = f"{base_url}/get_model_info"
-    headers = {
-        'Authorization': f'Bearer {api_key}',
-        'Content-Type': 'application/json'
-    }
-
+try:
+    # Get the inference API key
+    datacrunch_inference_key = DATACRUNCH_INFERENCE_KEY
+    if not datacrunch_inference_key:
+        datacrunch_inference_key = input(
+            "Enter your Inference API Key from the DataCrunch dashboard: ")
+    else:
+        print("Using Inference API Key from environment")
+
+    # Initialize client with inference key
+    datacrunch = DataCrunchClient(
+        client_id=DATACRUNCH_CLIENT_ID,
+        client_secret=DATACRUNCH_CLIENT_SECRET,
+        inference_key=datacrunch_inference_key
+    )
+
+    # Register signal handlers for cleanup
+    signal.signal(signal.SIGINT, graceful_shutdown)
+    signal.signal(signal.SIGTERM, graceful_shutdown)
+
+    # Create a secret for the Hugging Face token
+    print(f"Creating secret for Hugging Face token: {HF_SECRET_NAME}")
     try:
-        print("\nTesting /get_model_info endpoint...")
-        response = requests.get(model_info_url, headers=headers)
-        if response.status_code == 200:
-            print("Model info endpoint is working!")
-            print(f"Response: {response.json()}")
+        # Check if secret already exists
+        existing_secrets = datacrunch.containers.get_secrets()
+        secret_exists = any(
+            secret.name == HF_SECRET_NAME for secret in existing_secrets)
+
+        if not secret_exists:
+            # check if HF_TOKEN is set; if not, prompt the user
+            if not HF_TOKEN:
+                HF_TOKEN = input(
+                    "Enter your Hugging Face token: ")
+            datacrunch.containers.create_secret(
+                HF_SECRET_NAME, HF_TOKEN)
+            print(f"Secret '{HF_SECRET_NAME}' created successfully")
         else:
-            print(f"Request failed with status code {response.status_code}")
-            print(f"Response: {response.text}")
-            return
-
-        # Now test completions endpoint
-        print("\nTesting completions API with streaming...")
-        completions_url = f"{base_url}/v1/completions"
-
-        headers = {
-            'Content-Type': 'application/json',
-            'Authorization': f'Bearer {api_key}',
-            'Accept': 'text/event-stream',
-            'Cache-Control': 'no-cache',
-            'Connection': 'keep-alive',
-        }
+            print(
+                f"Secret '{HF_SECRET_NAME}' already exists, using existing secret")
+    except APIException as e:
+        print(f"Error creating secret: {e}")
+        sys.exit(1)
+
+    # Create container configuration
+    APP_PORT = 30000
+    container = Container(
+        image=SGLANG_IMAGE_URL,
+        exposed_port=APP_PORT,
+        healthcheck=HealthcheckSettings(
+            enabled=True,
+            port=APP_PORT,
+            path="/health"
+        ),
+        entrypoint_overrides=EntrypointOverridesSettings(
+            enabled=True,
+            cmd=["python3", "-m", "sglang.launch_server", "--model-path",
+                 DEEPSEEK_MODEL_PATH, "--host", "0.0.0.0", "--port", str(APP_PORT)]
+        ),
+        env=[
+            EnvVar(
+                name="HF_TOKEN",
+                value_or_reference_to_secret=HF_SECRET_NAME,
+                type=EnvVarType.SECRET
+            )
+        ]
+    )
+
+    # Create scaling configuration
+    scaling_options = ScalingOptions(
+        min_replica_count=1,
+        max_replica_count=5,
+        scale_down_policy=ScalingPolicy(delay_seconds=60 * 5),
+        scale_up_policy=ScalingPolicy(
+            delay_seconds=0),  # No delay for scale up
+        queue_message_ttl_seconds=500,
+        # Modern LLM engines are optimized for batching requests, with minimal performance impact. Taking advantage of batching can significantly improve throughput.
+        concurrent_requests_per_replica=32,
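+        # With max_replica_count=5 above, this allows roughly 5 * 32 = 160
+        # requests in flight across replicas before additional work queues up.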
+        scaling_triggers=ScalingTriggers(
+            # lower value means more aggressive scaling
+            queue_load=QueueLoadScalingTrigger(threshold=0.1),
+            cpu_utilization=UtilizationScalingTrigger(
+                enabled=True,
+                threshold=90
+            ),
+            gpu_utilization=UtilizationScalingTrigger(
+                enabled=True,
+                threshold=90
+            )
+        )
+    )
+
+    # Set compute settings. For a 7B model, General Compute (24GB VRAM) is sufficient
+    compute = ComputeResource(name="General Compute", size=1)
+
+    # Create deployment object (no need to provide container_registry_settings because it's public)
+    deployment = Deployment(
+        name=DEPLOYMENT_NAME,
+        containers=[container],
+        compute=compute,
+        scaling=scaling_options,
+        is_spot=False
+    )
+
+    # Create the deployment
+    created_deployment = datacrunch.containers.create_deployment(
+        deployment)
+    print(f"Created deployment: {created_deployment.name}")
+    print("This could take several minutes while the model is downloaded and the server starts...")
+
+    # Wait for deployment to be healthy
+    if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME):
+        print("Deployment health check failed")
+        cleanup_resources(datacrunch)
+        sys.exit(1)

-        data = {
-            "model": MODEL_PATH,
-            "prompt": "Solar wind is a curious phenomenon. Tell me more about it",
+    # Test the deployment with a simple request
+    print("\nTesting the deployment...")
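+    # The requests below go through the deployment's inference client, which
+    # authenticates with the inference key passed to DataCrunchClient above.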
+    try:
+        # Test model info endpoint
+        print(
+            "Testing /get_model_info endpoint by making a sync GET request to the SGLang server...")
+        model_info_response = created_deployment._inference_client.get(
+            path="/get_model_info")
+        print("Model info endpoint is working!")
+        print(f"Response: {model_info_response}")
+
+        # Test completions endpoint
+        print("\nTesting completions API...")
+        completions_data = {
+            "model": DEEPSEEK_MODEL_PATH,
+            "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?",
             "max_tokens": 128,
             "temperature": 0.7,
             "top_p": 0.9,
-            "stream": True
         }
-        with requests.post(completions_url, headers=headers, json=data, stream=True) as response:
-            if response.status_code == 200:
-                print("Stream started. Receiving first 5 events...\n")
-                for i, line in enumerate(response.iter_lines(decode_unicode=True)):
-                    if line:
-                        print(line)
-                        if i >= 4:  # Only show first 5 events
-                            print("...(response continues)...")
-                            break
-            else:
-                print(
-                    f"Request failed with status code {response.status_code}")
-                print(f"Response: {response.text}")
-
-    except requests.RequestException as e:
-        print(f"An error occurred: {e}")
-
-
-def main() -> None:
-    """Main function demonstrating SGLang deployment."""
-    try:
-        if not HF_TOKEN:
-            print("Please set HF_TOKEN environment variable with your Hugging Face token")
-            return
-
-        # Initialize client
-        global datacrunch
-        datacrunch = DataCrunchClient(
-            DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)
-
-        # Register signal handlers for cleanup
-        signal.signal(signal.SIGINT, graceful_shutdown)
-        signal.signal(signal.SIGTERM, graceful_shutdown)
-
-        # Create a secret for the Hugging Face token
-        print(f"Creating secret for Hugging Face token: {HF_SECRET_NAME}")
-        try:
-            # Check if secret already exists
-            existing_secrets = datacrunch.containers.get_secrets()
-            secret_exists = any(
-                secret.name == HF_SECRET_NAME for secret in existing_secrets)
-
-            if not secret_exists:
-                datacrunch.containers.create_secret(
-                    HF_SECRET_NAME, HF_TOKEN)
-                print(f"Secret '{HF_SECRET_NAME}' created successfully")
-            else:
-                print(
-                    f"Secret '{HF_SECRET_NAME}' already exists, using existing secret")
-        except APIException as e:
-            print(f"Error creating secret: {e}")
-            return
-
-        # Create container configuration
-        container = Container(
-            image=IMAGE_URL,
-            exposed_port=30000,
-            healthcheck=HealthcheckSettings(
-                enabled=True,
-                port=30000,
-                path="/health"
-            ),
-            entrypoint_overrides=EntrypointOverridesSettings(
-                enabled=True,
-                cmd=["python3", "-m", "sglang.launch_server", "--model-path",
-                     MODEL_PATH, "--host", "0.0.0.0", "--port", "30000"]
-            ),
-            env=[
-                EnvVar(
-                    name="HF_TOKEN",
-                    value_or_reference_to_secret=HF_SECRET_NAME,
-                    type=EnvVarType.SECRET
-                )
-            ]
-        )
-
-        # Create scaling configuration - default values
-        scaling_options = ScalingOptions(
-            min_replica_count=1,
-            max_replica_count=2,
-            scale_down_policy=ScalingPolicy(delay_seconds=300),
-            scale_up_policy=ScalingPolicy(delay_seconds=300),
-            queue_message_ttl_seconds=500,
-            concurrent_requests_per_replica=1,
-            scaling_triggers=ScalingTriggers(
-                queue_load=QueueLoadScalingTrigger(threshold=1),
-                cpu_utilization=UtilizationScalingTrigger(
-                    enabled=True,
-                    threshold=90
-                ),
-                gpu_utilization=UtilizationScalingTrigger(
-                    enabled=True,
-                    threshold=90
-                )
-            )
+        # Make a sync inference request to the SGLang server
+        completions_response = created_deployment.run_sync(
+            completions_data,
+            path="/v1/completions",
         )
-
-        # Create registry and compute settings
-        registry_settings = ContainerRegistrySettings(is_private=False)
-        # For a 7B model, General Compute (24GB VRAM) is sufficient
-        compute = ComputeResource(name="General Compute", size=1)
-
-        # Create deployment object
-        deployment = Deployment(
-            name=DEPLOYMENT_NAME,
-            container_registry_settings=registry_settings,
-            containers=[container],
-            compute=compute,
-            scaling=scaling_options,
-            is_spot=False
+        print("Completions API is working!")
+        print(f"Response: {completions_response.output()}\n")
+
+        # Make a stream sync inference request to the SGLang server
+        completions_response_stream = created_deployment.run_sync(
+            {**completions_data, "stream": True},
+            path="/v1/completions",
+            stream=True
         )
+        print("Stream completions API is working!")
+        # Print the streamed response
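+        # The server emits Server-Sent Events: each non-empty line has the
+        # form "data: {json}", and the stream terminates with "data: [DONE]".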
+        for line in completions_response_stream.stream(as_text=True):
+            if line:
+                # stream() may yield bytes; normalize to str before parsing
+                if isinstance(line, bytes):
+                    line = line.decode('utf-8')
+
+                if line.startswith('data:'):
+                    # Remove the 'data:' prefix and surrounding whitespace
+                    data = line[5:].strip()
+                    if data == '[DONE]':
+                        break
+                    try:
+                        event_data = json.loads(data)
+                        token_text = event_data['choices'][0]['text']

-        # Create the deployment
-        created_deployment = datacrunch.containers.create(deployment)
-        print(f"Created deployment: {created_deployment.name}")
-        print("This will take several minutes while the model is downloaded and the server starts...")
-
-        # Wait for deployment to be healthy
-        if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME):
-            print("Deployment health check failed")
-            cleanup_resources(datacrunch)
-            return
-
-        # Get the deployment endpoint URL and inference API key
-        containers_api_url = CONTAINERS_API_URL
-        inference_api_key = INFERENCE_API_KEY
-
-        # If not provided as environment variables, prompt the user
-        if not containers_api_url:
-            containers_api_url = input(
-                "Enter your Containers API URL from the DataCrunch dashboard: ")
-        else:
-            print(
-                f"Using Containers API URL from environment: {containers_api_url}")
-
-        if not inference_api_key:
-            inference_api_key = input(
-                "Enter your Inference API Key from the DataCrunch dashboard: ")
-        else:
-            print("Using Inference API Key from environment")
-
-        # Test the deployment
-        if containers_api_url and inference_api_key:
-            print("\nTesting the deployment...")
-            test_deployment(containers_api_url, inference_api_key)
-
-        # Cleanup or keep running based on user input
-        keep_running = input(
-            "\nDo you want to keep the deployment running? (y/n): ")
-        if keep_running.lower() != 'y':
-            cleanup_resources(datacrunch)
-        else:
-            print(
-                f"Deployment {DEPLOYMENT_NAME} is running. Don't forget to delete it when finished.")
-            print("You can delete it from the DataCrunch dashboard or by running:")
-            print(f"datacrunch.containers.delete('{DEPLOYMENT_NAME}')")
+                        # Print token immediately to show progress
+                        print(token_text, end='', flush=True)
+                    except json.JSONDecodeError:
+                        continue
     except Exception as e:
-        print(f"Unexpected error: {e}")
-        # Attempt cleanup even if there was an error
-        try:
-            cleanup_resources(datacrunch)
-        except Exception as cleanup_error:
-            print(f"Error during cleanup after failure: {cleanup_error}")
-
+        print(f"Error testing deployment: {e}")

-if __name__ == "__main__":
-    main()
+    # Cleanup or keep running based on user input
+    keep_running = input(
+        "\nDo you want to keep the deployment running? (y/n): ")
+    if keep_running.lower() != 'y':
+        cleanup_resources(datacrunch)
+    else:
+        print(
+            f"Deployment {DEPLOYMENT_NAME} is running. Don't forget to delete it when finished.")
+        print("You can delete it from the DataCrunch dashboard or by running:")
+        print(f"datacrunch.containers.delete_deployment('{DEPLOYMENT_NAME}')")
+
+except Exception as e:
+    print(f"Unexpected error: {e}")
+    # Attempt cleanup even if there was an error
+    try:
+        cleanup_resources(datacrunch)
+    except Exception as cleanup_error:
+        print(f"Error during cleanup after failure: {cleanup_error}")
+    sys.exit(1)
diff --git a/examples/containers/update_deployment_scaling_example.py b/examples/containers/update_deployment_scaling_example.py
index d06f9d4..cd31d6e 100644
--- a/examples/containers/update_deployment_scaling_example.py
+++ b/examples/containers/update_deployment_scaling_example.py
@@ -7,7 +7,7 @@
 from datacrunch import DataCrunchClient
 from datacrunch.exceptions import APIException
-from datacrunch.containers.containers import (
+from datacrunch.containers import (
     ScalingOptions,
     ScalingPolicy,
     ScalingTriggers,
@@ -15,106 +15,99 @@
     UtilizationScalingTrigger
 )

-# Configuration - replace with your deployment name
-DEPLOYMENT_NAME = "my-deployment"

-# Get client secret and id from environment variables
+# Get deployment name, client secret and id from environment variables
+DEPLOYMENT_NAME = os.environ.get('DATACRUNCH_DEPLOYMENT_NAME')
 DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
 DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
-
-
-def check_deployment_exists(client: DataCrunchClient, deployment_name: str) -> bool:
-    """Check if a deployment exists.
-
-    Args:
-        client: DataCrunch API client
-        deployment_name: Name of the deployment to check
-
-    Returns:
-        bool: True if deployment exists, False otherwise
-    """
-    try:
-        client.containers.get_deployment_by_name(deployment_name)
-        return True
-    except APIException as e:
-        print(f"Error: {e}")
-        return False
-
-
-def update_deployment_scaling(client: DataCrunchClient, deployment_name: str) -> None:
-    """Update scaling options using the dedicated scaling options API.
-
-    Args:
-        client: DataCrunch API client
-        deployment_name: Name of the deployment to update
-    """
-    try:
-        # Create scaling options using ScalingOptions dataclass
-        scaling_options = ScalingOptions(
-            min_replica_count=1,
-            max_replica_count=5,
-            scale_down_policy=ScalingPolicy(
-                delay_seconds=600),  # Longer cooldown period
-            scale_up_policy=ScalingPolicy(delay_seconds=60),  # Quick scale-up
-            queue_message_ttl_seconds=500,
-            concurrent_requests_per_replica=1,
-            scaling_triggers=ScalingTriggers(
-                queue_load=QueueLoadScalingTrigger(threshold=1.0),
-                cpu_utilization=UtilizationScalingTrigger(
-                    enabled=True,
-                    threshold=75
-                ),
-                gpu_utilization=UtilizationScalingTrigger(
-                    enabled=False  # Disable GPU utilization trigger
-                )
+# Initialize client
+datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)
+
+try:
+    # Get current scaling options
+    scaling_options = datacrunch.containers.get_deployment_scaling_options(
+        DEPLOYMENT_NAME)
+
+    print("Current scaling configuration:\n")
+    print(f"Min replicas: {scaling_options.min_replica_count}")
+    print(f"Max replicas: {scaling_options.max_replica_count}")
+    print(
+        f"Scale-up delay: {scaling_options.scale_up_policy.delay_seconds} seconds")
+    print(
+        f"Scale-down delay: {scaling_options.scale_down_policy.delay_seconds} seconds")
+    print(
+        f"Queue message TTL: {scaling_options.queue_message_ttl_seconds} seconds")
+    print(
+        f"Concurrent requests per replica: {scaling_options.concurrent_requests_per_replica}")
+    print("Scaling Triggers:")
+    print(
+        f"  Queue load threshold: {scaling_options.scaling_triggers.queue_load.threshold}")
+    if scaling_options.scaling_triggers.cpu_utilization:
+        print(
+            f"  CPU utilization enabled: {scaling_options.scaling_triggers.cpu_utilization.enabled}")
+        print(
+            f"  CPU utilization threshold: {scaling_options.scaling_triggers.cpu_utilization.threshold}%")
+    if scaling_options.scaling_triggers.gpu_utilization:
+        print(
+            f"  GPU utilization enabled: {scaling_options.scaling_triggers.gpu_utilization.enabled}")
+        if scaling_options.scaling_triggers.gpu_utilization.threshold:
+            print(
+                f"  GPU utilization threshold: {scaling_options.scaling_triggers.gpu_utilization.threshold}%")
+
+    # Create scaling options using ScalingOptions dataclass
+    scaling_options = ScalingOptions(
+        min_replica_count=1,
+        max_replica_count=5,
+        scale_down_policy=ScalingPolicy(
+            delay_seconds=600),  # Longer cooldown period
+        scale_up_policy=ScalingPolicy(delay_seconds=0),  # Quick scale-up
+        queue_message_ttl_seconds=500,
+        concurrent_requests_per_replica=50,  # LLMs can handle concurrent requests
+        scaling_triggers=ScalingTriggers(
+            queue_load=QueueLoadScalingTrigger(threshold=1.0),
+            cpu_utilization=UtilizationScalingTrigger(
+                enabled=True,
+                threshold=75
+            ),
+            gpu_utilization=UtilizationScalingTrigger(
+                enabled=False  # Disable GPU utilization trigger
             )
         )
-
-        # Update scaling options
-        updated_options = client.containers.update_deployment_scaling_options(
-            deployment_name, scaling_options)
-        print(f"Updated deployment scaling options")
-        print(f"New min replicas: {updated_options.min_replica_count}")
-        print(f"New max replicas: {updated_options.max_replica_count}")
+    )
+
+    # Update scaling options
+    updated_options = datacrunch.containers.update_deployment_scaling_options(
+        DEPLOYMENT_NAME, scaling_options)
+
+    print("\nUpdated scaling configuration:\n")
+    print(f"Min replicas: {updated_options.min_replica_count}")
+    print(f"Max replicas: {updated_options.max_replica_count}")
+    print(
+        f"Scale-up delay: {updated_options.scale_up_policy.delay_seconds} seconds")
+    print(
+        f"Scale-down delay: {updated_options.scale_down_policy.delay_seconds} seconds")
+    print(
+        f"Queue message TTL: {updated_options.queue_message_ttl_seconds} seconds")
+    print(
+        f"Concurrent requests per replica: {updated_options.concurrent_requests_per_replica}")
+    print("Scaling Triggers:")
+    print(
+        f"  Queue load threshold: {updated_options.scaling_triggers.queue_load.threshold}")
+    if updated_options.scaling_triggers.cpu_utilization:
         print(
-            f"CPU utilization trigger enabled: {updated_options.scaling_triggers.cpu_utilization.enabled}")
+            f"  CPU utilization enabled: {updated_options.scaling_triggers.cpu_utilization.enabled}")
         print(
-            f"CPU utilization threshold: {updated_options.scaling_triggers.cpu_utilization.threshold}%")
-    except APIException as e:
-        print(f"Error updating scaling options: {e}")
-
-
-def main() -> None:
-    """Main function demonstrating scaling updates."""
-    try:
-        # Initialize client
-        datacrunch = DataCrunchClient(
-            DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET)
-
-        # Verify deployment exists
-        if not check_deployment_exists(datacrunch, DEPLOYMENT_NAME):
-            print(f"Deployment {DEPLOYMENT_NAME} does not exist.")
-            return
-
-        # Update scaling options using the API
-        update_deployment_scaling(datacrunch, DEPLOYMENT_NAME)
-
-        # Get current scaling options
-        scaling_options = datacrunch.containers.get_deployment_scaling_options(
-            DEPLOYMENT_NAME)
-        print(f"\nCurrent scaling configuration:")
-        print(f"Min replicas: {scaling_options.min_replica_count}")
-        print(f"Max replicas: {scaling_options.max_replica_count}")
-        print(
-            f"Scale-up delay: {scaling_options.scale_up_policy.delay_seconds} seconds")
+            f"  CPU utilization threshold: {updated_options.scaling_triggers.cpu_utilization.threshold}%")
+    if updated_options.scaling_triggers.gpu_utilization:
         print(
-            f"Scale-down delay: {scaling_options.scale_down_policy.delay_seconds} seconds")
-
-        print("\nScaling update completed successfully.")
-
-    except Exception as e:
-        print(f"Unexpected error: {e}")
+            f"  GPU utilization enabled: {updated_options.scaling_triggers.gpu_utilization.enabled}")
+        if updated_options.scaling_triggers.gpu_utilization.threshold:
+            print(
+                f"  GPU utilization threshold: {updated_options.scaling_triggers.gpu_utilization.threshold}%")

-if __name__ == "__main__":
-    main()
+except APIException as e:
+    print(f"Error updating scaling options: {e}")
+except Exception as e:
+    print(f"Unexpected error: {e}")
diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py
index dc5c1d3..413787d 100644
--- a/tests/unit_tests/conftest.py
+++ b/tests/unit_tests/conftest.py
@@ -6,6 +6,7 @@
 BASE_URL = "https://api-testing.datacrunch.io/v1"
 ACCESS_TOKEN = "test-token"
 CLIENT_ID = "0123456789xyz"
+CLIENT_SECRET = "0123456789xyz"


 @pytest.fixture
@@ -15,5 +16,6 @@ def http_client():
     auth_service.is_expired = Mock(return_value=True)
     auth_service.refresh = Mock(return_value=None)
     auth_service._client_id = CLIENT_ID
+    auth_service._client_secret = CLIENT_SECRET

     return HTTPClient(auth_service, BASE_URL)
diff --git a/tests/unit_tests/containers/test_containers.py b/tests/unit_tests/containers/test_containers.py
index 030ccbe..806b670 100644
--- a/tests/unit_tests/containers/test_containers.py
+++ b/tests/unit_tests/containers/test_containers.py
@@ -8,12 +8,10 @@
     SECRETS_ENDPOINT,
     SERVERLESS_COMPUTE_RESOURCES_ENDPOINT,
     Container,
-    ContainerInfo,
     ContainerDeploymentStatus,
     ContainerRegistrySettings,
     ContainersService,
     Deployment,
-    DeploymentInfo,
     EnvVar,
     EnvVarType,
     EntrypointOverridesSettings,
@@ -39,7 +37,8 @@

 DEPLOYMENT_NAME = "test-deployment"
 CONTAINER_NAME = "test-container"
-COMPUTE_RESOURCE_NAME = "test-compute"
+COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE = "General Compute"
+COMPUTE_RESOURCE_NAME_H100 = "H100"
 SECRET_NAME = "test-secret"
 SECRET_VALUE = "test-secret-value"
 REGISTRY_CREDENTIAL_NAME = "test-credential"
@@ -84,7 +83,7 @@
         }
     ],
     "compute": {
-        "name": COMPUTE_RESOURCE_NAME,
+        "name": COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE,
         "size": 1,
         "is_available": True
     },
@@ -120,12 +119,12 @@
 # Sample compute resources data
 COMPUTE_RESOURCES_DATA = [
     {
-        "name": COMPUTE_RESOURCE_NAME,
+        "name": COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE,
         "size": 1,
         "is_available": True
     },
     {
-        "name": "large-compute",
+        "name": COMPUTE_RESOURCE_NAME_H100,
         "size": 4,
         "is_available": True
     }
 ]
@@ -215,12 +214,12 @@ def test_get_deployments(self, containers_service, deployments_endpoint):
         # assert
         assert type(deployments) == list
         assert len(deployments) == 1
-        assert type(deployment) == DeploymentInfo
+        assert type(deployment) == Deployment
         assert deployment.name == DEPLOYMENT_NAME
         assert len(deployment.containers) == 1
-        assert type(deployment.containers[0]) == ContainerInfo
+        assert type(deployment.containers[0]) == Container
         assert type(deployment.compute) == ComputeResource
-        assert deployment.compute.name == COMPUTE_RESOURCE_NAME
+        assert deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
         assert responses.assert_call_count(deployments_endpoint, 1) is True

     @responses.activate
@@ -238,11 +237,11 @@ def test_get_deployment_by_name(self, containers_service, deployments_endpoint):
         deployment = containers_service.get_deployment_by_name(DEPLOYMENT_NAME)

         # assert
-        assert type(deployment) == DeploymentInfo
+        assert type(deployment) == Deployment
         assert deployment.name == DEPLOYMENT_NAME
         assert len(deployment.containers) == 1
         assert deployment.containers[0].name == CONTAINER_NAME
-        assert deployment.compute.name == COMPUTE_RESOURCE_NAME
+        assert deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
         assert responses.assert_call_count(url, 1) is True

     @responses.activate
@@ -287,7 +286,8 @@ def test_create_deployment(self, containers_service, deployments_endpoint):
                 type=VolumeMountType.SCRATCH, mount_path="/data")]
         )
-        compute = ComputeResource(name=COMPUTE_RESOURCE_NAME, size=1)
+        compute = ComputeResource(
+            name=COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, size=1)
         container_registry_settings = ContainerRegistrySettings(
             is_private=False)
@@ -305,11 +305,11 @@ def test_create_deployment(self, containers_service, deployments_endpoint):
         created_deployment = containers_service.create_deployment(deployment)

         # assert
-        assert type(created_deployment) == DeploymentInfo
+        assert type(created_deployment) == Deployment
         assert created_deployment.name == DEPLOYMENT_NAME
         assert len(created_deployment.containers) == 1
         assert created_deployment.containers[0].name == CONTAINER_NAME
-        assert created_deployment.compute.name == COMPUTE_RESOURCE_NAME
+        assert created_deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
         assert responses.assert_call_count(deployments_endpoint, 1) is True

     @responses.activate
@@ -324,7 +324,7 @@ def test_update_deployment(self, containers_service, deployments_endpoint):
         )

         # create deployment object
-        container = ContainerInfo(
+        container = Container(
             name=CONTAINER_NAME,
             image="nginx:latest",
             exposed_port=80
@@ -333,9 +333,10 @@
         container_registry_settings = ContainerRegistrySettings(
             is_private=False)

-        compute = ComputeResource(name=COMPUTE_RESOURCE_NAME, size=1)
+        compute = ComputeResource(
+            name=COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, size=1)

-        deployment = DeploymentInfo(
+        deployment = Deployment(
             name=DEPLOYMENT_NAME,
             container_registry_settings=container_registry_settings,
             containers=[container],
@@ -347,11 +348,11 @@
             DEPLOYMENT_NAME, deployment)

         # assert
-        assert type(updated_deployment) == DeploymentInfo
+        assert type(updated_deployment) == Deployment
         assert updated_deployment.name == DEPLOYMENT_NAME
         assert len(updated_deployment.containers) == 1
         assert updated_deployment.containers[0].name == CONTAINER_NAME
-        assert updated_deployment.compute.name == COMPUTE_RESOURCE_NAME
+        assert updated_deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
         assert responses.assert_call_count(url, 1) is True

     @responses.activate
@@ -650,7 +651,73 @@ def test_get_compute_resources(self, containers_service, compute_resources_endpoint):
         assert type(resources) == list
         assert len(resources) == 2
         assert type(resources[0]) == ComputeResource
-        assert resources[0].name == COMPUTE_RESOURCE_NAME
+        assert resources[0].name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
+        assert resources[0].size == 1
+        assert resources[0].is_available == True
+        assert responses.assert_call_count(
+            compute_resources_endpoint, 1) is True
+
+    @responses.activate
+    def test_get_compute_resources_filter_by_size(self, containers_service, compute_resources_endpoint):
+        # arrange - add response mock
+        responses.add(
+            responses.GET,
+            compute_resources_endpoint,
+            json=[COMPUTE_RESOURCES_DATA],
+            status=200
+        )
+
+        # act
+        resources = containers_service.get_compute_resources(size=4)
+
+        # assert
+        assert type(resources) == list
+        assert len(resources) == 1
+        assert type(resources[0]) == ComputeResource
+        assert resources[0].name == COMPUTE_RESOURCE_NAME_H100
+        assert resources[0].size == 4
+        assert resources[0].is_available == True
+        assert responses.assert_call_count(
+            compute_resources_endpoint, 1) is True
+
+    @responses.activate
+    def test_get_compute_resources_filter_by_availability(self, containers_service, compute_resources_endpoint):
+        # arrange - add response mock
+        responses.add(
+            responses.GET,
+            compute_resources_endpoint,
+            json=[COMPUTE_RESOURCES_DATA],
+            status=200
+        )
+
+        # act
+        resources = containers_service.get_compute_resources(is_available=True)
+
+        # assert
+        assert type(resources) == list
+        assert len(resources) == 2
+        assert all(r.is_available == True for r in resources)
+        assert responses.assert_call_count(
+            compute_resources_endpoint, 1) is True
+
+    @responses.activate
+    def test_get_compute_resources_filter_by_size_and_availability(self, containers_service, compute_resources_endpoint):
+        # arrange - add response mock
+        responses.add(
+            responses.GET,
+            compute_resources_endpoint,
+            json=[COMPUTE_RESOURCES_DATA],
+            status=200
+        )
+
+        # act
+        resources = containers_service.get_compute_resources(
+            size=1, is_available=True)
+
+        # assert
+        assert type(resources) == list
+        assert len(resources) == 1
+        assert resources[0].name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE
         assert resources[0].size == 1
         assert resources[0].is_available == True
         assert responses.assert_call_count(