From fe32f8a7449a3631bd1597638e7dc6e65026f6bb Mon Sep 17 00:00:00 2001 From: Tamir Date: Mon, 31 Mar 2025 14:07:42 +0300 Subject: [PATCH 01/22] WIP --- datacrunch/containers/containers.py | 91 +++++++++++++++++------------ datacrunch/datacrunch.py | 6 +- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index ef7f890..51db308 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -1,8 +1,11 @@ +import requests from dataclasses import dataclass from dataclasses_json import dataclass_json, Undefined # type: ignore -from typing import List, Optional, Dict +from typing import List, Optional, Dict, Any from enum import Enum +from datacrunch.http_client.http_client import HTTPClient + # API endpoints CONTAINER_DEPLOYMENTS_ENDPOINT = '/container-deployments' @@ -256,39 +259,44 @@ class Deployment: :param is_spot: Whether is spot deployment :param endpoint_base_url: Optional base URL for the deployment endpoint :param scaling: Optional scaling configuration + :param created_at: Optional timestamp when the deployment was created + :param inference_key: Optional inference key for the deployment """ name: str container_registry_settings: ContainerRegistrySettings - containers: List[Container] + containers: List[Container] | List[ContainerInfo] compute: ComputeResource is_spot: bool = False endpoint_base_url: Optional[str] = None scaling: Optional[ScalingOptions] = None + created_at: Optional[str] = None + inference_key: Optional[str] = None -@dataclass_json(undefined=Undefined.EXCLUDE) -@dataclass -class DeploymentInfo: - """Configuration for a container deployment. - This class is read-only and includes system-managed fields. + @classmethod + def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None, **kwargs) -> 'Deployment': + """Create a Deployment instance from a dictionary with an inference key. 
- :param name: Name of the deployment - :param container_registry_settings: Settings for accessing container registry - :param containers: List of containers in the deployment - :param compute: Compute resource configuration - :param is_spot: Whether is spot deployment - :param endpoint_base_url: Optional base URL for the deployment endpoint - :param scaling: Optional scaling configuration - :param created_at: Timestamp when the deployment was created - """ - name: str - container_registry_settings: ContainerRegistrySettings - containers: List[ContainerInfo] - compute: ComputeResource - is_spot: bool = False - endpoint_base_url: Optional[str] = None - scaling: Optional[ScalingOptions] = None - created_at: Optional[str] = None + :param data: Dictionary containing deployment data + :param inference_key: inference key to set on the deployment + :param **kwargs: Additional arguments to pass to from_dict + :return: Deployment instance + """ + deployment = cls.from_dict(data, **kwargs) + deployment._inference_key = inference_key + return deployment + + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5): + if self._inference_key is None: + raise ValueError("Inference key is not set") # TODO: review this + + # TODO: create a request object + return requests.post( + url=f"{self.endpoint_base_url}{path}", + json=data, + headers={"Authorization": f"Bearer {self._inference_key}"}, + timeout=timeout_seconds + ) @dataclass_json @@ -397,67 +405,71 @@ def __init__(self, name: str, docker_config_json: str): class ContainersService: """Service for managing container deployments""" - def __init__(self, http_client) -> None: + def __init__(self, http_client: HTTPClient, inference_key: str = None) -> None: """Initialize the containers service :param http_client: HTTP client for making API requests :type http_client: Any """ self.client = http_client + self._inference_key = inference_key - def get_deployments(self) -> List[DeploymentInfo]: + def get_deployments(self) -> List[Deployment]: """Get all deployments :return: list of deployments - :rtype: List[DeploymentInfo] + :rtype: List[Deployment] """ response = self.client.get(CONTAINER_DEPLOYMENTS_ENDPOINT) - return [DeploymentInfo.from_dict(deployment, infer_missing=True) for deployment in response.json()] + return [Deployment.from_dict(deployment, infer_missing=True) for deployment in response.json()] - def get_deployment_by_name(self, deployment_name: str) -> DeploymentInfo: + def get_deployment_by_name(self, deployment_name: str) -> Deployment: """Get a deployment by name :param deployment_name: name of the deployment :type deployment_name: str :return: deployment - :rtype: DeploymentInfo + :rtype: Deployment """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}") - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) + + # Function alias + get_deployment = get_deployment_by_name def create_deployment( self, deployment: Deployment - ) -> DeploymentInfo: + ) -> Deployment: """Create a new deployment :param deployment: deployment configuration :type deployment: Deployment :return: created deployment - :rtype: DeploymentInfo + :rtype: Deployment """ response = self.client.post( CONTAINER_DEPLOYMENTS_ENDPOINT, deployment.to_dict() ) - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict(response.json(), infer_missing=True) - def 
update_deployment(self, deployment_name: str, deployment: DeploymentInfo) -> DeploymentInfo: + def update_deployment(self, deployment_name: str, deployment: Deployment) -> Deployment: """Update an existing deployment :param deployment_name: name of the deployment to update :type deployment_name: str :param deployment: updated deployment - :type deployment: DeploymentInfo + :type deployment: Deployment :return: updated deployment - :rtype: DeploymentInfo + :rtype: Deployment """ response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}", deployment.to_dict() ) - return DeploymentInfo.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict(response.json(), infer_missing=True) def delete_deployment(self, deployment_name: str) -> None: """Delete a deployment @@ -661,6 +673,9 @@ def get_compute_resources(self) -> List[ComputeResource]: resources.append(ComputeResource.from_dict(resource)) return resources + # Function alias + get_gpus = get_compute_resources + def get_secrets(self) -> List[Secret]: """Get all secrets diff --git a/datacrunch/datacrunch.py b/datacrunch/datacrunch.py index 2f5f98b..7f8be09 100644 --- a/datacrunch/datacrunch.py +++ b/datacrunch/datacrunch.py @@ -17,7 +17,7 @@ class DataCrunchClient: """Client for interacting with DataCrunch's public API""" - def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.datacrunch.io/v1") -> None: + def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.datacrunch.io/v1", inference_key: str = None) -> None: """The DataCrunch client :param client_id: client id @@ -26,6 +26,8 @@ def __init__(self, client_id: str, client_secret: str, base_url: str = "https:// :type client_secret: str :param base_url: base url for all the endpoints, optional, defaults to "https://api.datacrunch.io/v1" :type base_url: str, optional + :param inference_key: inference key, optional + :type inference_key: str, optional """ # Constants @@ -70,5 +72,5 @@ def __init__(self, client_id: str, client_secret: str, base_url: str = "https:// """Locations service. Get locations""" self.containers: ContainersService = ContainersService( - self._http_client) + self._http_client, inference_key) """Containers service. Deploy, manage, and monitor container deployments""" From d92163cb848d38dfb110fbac0ad8fb052338b2a0 Mon Sep 17 00:00:00 2001 From: Tamir Date: Tue, 1 Apr 2025 08:53:42 +0300 Subject: [PATCH 02/22] WIP Removed ContainerInfo class, Instantiate all Deployment instances with inference key, don't print key, Created InferenceResponse, WIP healthcheck method --- datacrunch/containers/containers.py | 88 ++++++++++++++++++----------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 51db308..d92683a 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -1,4 +1,5 @@ import requests +from requests.structures import CaseInsensitiveDict from dataclasses import dataclass from dataclasses_json import dataclass_json, Undefined # type: ignore from typing import List, Optional, Dict, Any @@ -101,10 +102,10 @@ class VolumeMount: @dataclass class Container: """Container configuration for deployment creation and updates. - This class omits the name field which is managed by the system. 
:param image: Container image to use :param exposed_port: Port to expose from the container + :param name: Name of the container (system-managed, read-only) :param healthcheck: Optional health check configuration :param entrypoint_overrides: Optional entrypoint override settings :param env: Optional list of environment variables @@ -112,29 +113,7 @@ class Container: """ image: str exposed_port: int - healthcheck: Optional[HealthcheckSettings] = None - entrypoint_overrides: Optional[EntrypointOverridesSettings] = None - env: Optional[List[EnvVar]] = None - volume_mounts: Optional[List[VolumeMount]] = None - - -@dataclass_json -@dataclass -class ContainerInfo: - """Container configuration for deployments. - This class is read-only and includes the system-managed name field. - - :param name: Name of the container (system-managed) - :param image: Container image to use - :param exposed_port: Port to expose from the container - :param healthcheck: Optional health check configuration - :param entrypoint_overrides: Optional entrypoint override settings - :param env: Optional list of environment variables - :param volume_mounts: Optional list of volume mounts - """ - name: str - image: str - exposed_port: int + name: Optional[str] = None healthcheck: Optional[HealthcheckSettings] = None entrypoint_overrides: Optional[EntrypointOverridesSettings] = None env: Optional[List[EnvVar]] = None @@ -250,7 +229,6 @@ class ScalingOptions: @dataclass class Deployment: """Configuration for creating or updating a container deployment. - This class uses Container instead of ContainerInfo to prevent name setting. :param name: Name of the deployment :param container_registry_settings: Settings for accessing container registry @@ -264,14 +242,27 @@ class Deployment: """ name: str container_registry_settings: ContainerRegistrySettings - containers: List[Container] | List[ContainerInfo] + containers: List[Container] compute: ComputeResource is_spot: bool = False endpoint_base_url: Optional[str] = None scaling: Optional[ScalingOptions] = None created_at: Optional[str] = None - inference_key: Optional[str] = None + _inference_key: Optional[str] = None + + def __str__(self): + """String representation of the deployment, excluding sensitive information.""" + # Get all attributes except _inference_key + attrs = {k: v for k, v in self.__dict__.items() if k != + '_inference_key'} + # Format each attribute + attr_strs = [f"{k}={repr(v)}" for k, v in attrs.items()] + return f"Deployment({', '.join(attr_strs)})" + + def __repr__(self): + """Repr representation of the deployment, excluding sensitive information.""" + return self.__str__() @classmethod def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None, **kwargs) -> 'Deployment': @@ -282,22 +273,51 @@ def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = :param **kwargs: Additional arguments to pass to from_dict :return: Deployment instance """ - deployment = cls.from_dict(data, **kwargs) + deployment = Deployment.from_dict(data, infer_missing=True) deployment._inference_key = inference_key return deployment def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5): if self._inference_key is None: - raise ValueError("Inference key is not set") # TODO: review this + # TODO: do something better + raise ValueError("Inference key is not set") - # TODO: create a request object - return requests.post( + response = requests.post( url=f"{self.endpoint_base_url}{path}", json=data, 
headers={"Authorization": f"Bearer {self._inference_key}"}, timeout=timeout_seconds ) + return InferenceResponse( + body=response.json(), + headers=response.headers, + status_code=response.status_code, + status_text=response.reason + ) + + def health(self): + healthcheck_path = "health" + if self.containers and self.containers[0].healthcheck and self.containers[0].healthcheck.path: + healthcheck_path = self.containers[0].healthcheck.path.lstrip('/') + + response = requests.get( + url=f"{self.endpoint_base_url}{healthcheck_path}", + headers={"Authorization": f"Bearer {self._inference_key}"}, + ) + return response # TODO: agree on response format + # Function alias + healthcheck = health + + +@dataclass_json(undefined=Undefined.EXCLUDE) +@dataclass +class InferenceResponse: + body: Any + headers: CaseInsensitiveDict[str] + status_code: int + status_text: str + @dataclass_json @dataclass @@ -421,7 +441,7 @@ def get_deployments(self) -> List[Deployment]: :rtype: List[Deployment] """ response = self.client.get(CONTAINER_DEPLOYMENTS_ENDPOINT) - return [Deployment.from_dict(deployment, infer_missing=True) for deployment in response.json()] + return [Deployment.from_dict_with_inference_key(deployment, self._inference_key) for deployment in response.json()] def get_deployment_by_name(self, deployment_name: str) -> Deployment: """Get a deployment by name @@ -453,7 +473,7 @@ def create_deployment( CONTAINER_DEPLOYMENTS_ENDPOINT, deployment.to_dict() ) - return Deployment.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) def update_deployment(self, deployment_name: str, deployment: Deployment) -> Deployment: """Update an existing deployment @@ -469,7 +489,7 @@ def update_deployment(self, deployment_name: str, deployment: Deployment) -> Dep f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}", deployment.to_dict() ) - return Deployment.from_dict(response.json(), infer_missing=True) + return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) def delete_deployment(self, deployment_name: str) -> None: """Delete a deployment From 68275a2a3994f51b4456dd074bd663e362af41ff Mon Sep 17 00:00:00 2001 From: Tamir Date: Tue, 1 Apr 2025 10:57:11 +0300 Subject: [PATCH 03/22] Created InferenceClient class, use that in Deployment --- datacrunch/InferenceClient/__init__.py | 0 .../InferenceClient/inference_client.py | 198 ++++++++++++++++++ datacrunch/containers/containers.py | 63 +++--- 3 files changed, 229 insertions(+), 32 deletions(-) create mode 100644 datacrunch/InferenceClient/__init__.py create mode 100644 datacrunch/InferenceClient/inference_client.py diff --git a/datacrunch/InferenceClient/__init__.py b/datacrunch/InferenceClient/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py new file mode 100644 index 0000000..f0a25c6 --- /dev/null +++ b/datacrunch/InferenceClient/inference_client.py @@ -0,0 +1,198 @@ +from dataclasses import dataclass +from dataclasses_json import dataclass_json, Undefined # type: ignore +import requests +from requests.structures import CaseInsensitiveDict +from typing import Optional, Dict, Any, Union +from urllib.parse import urlparse + + +class InferenceClientError(Exception): + """Base exception for InferenceClient errors.""" + pass + + +@dataclass_json(undefined=Undefined.EXCLUDE) +@dataclass +class InferenceResponse: + body: Any + headers: 
CaseInsensitiveDict[str] + status_code: int + status_text: str + + +class InferenceClient: + def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: int = 300) -> None: + """ + Initialize the InferenceClient. + + Args: + inference_key: The authentication key for the API + endpoint_base_url: The base URL for the API + timeout_seconds: Request timeout in seconds + + Raises: + InferenceClientError: If the parameters are invalid + """ + if not inference_key: + raise InferenceClientError("inference_key cannot be empty") + + parsed_url = urlparse(endpoint_base_url) + if not parsed_url.scheme or not parsed_url.netloc: + raise InferenceClientError("endpoint_base_url must be a valid URL") + + self.inference_key = inference_key + self.endpoint_base_url = endpoint_base_url.rstrip('/') + self.timeout_seconds = timeout_seconds + self._session = requests.Session() + self._global_headers = { + 'Authorization': f'Bearer {inference_key}', + 'Content-Type': 'application/json' + } + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._session.close() + + @property + def global_headers(self) -> Dict[str, str]: + """ + Get the current global headers that will be used for all requests. + + Returns: + Dictionary of current global headers + """ + return self._global_headers.copy() + + def set_global_header(self, key: str, value: str) -> None: + """ + Set or update a global header that will be used for all requests. + + Args: + key: Header name + value: Header value + """ + self._global_headers[key] = value + + def set_global_headers(self, headers: Dict[str, str]) -> None: + """ + Set multiple global headers at once that will be used for all requests. + + Args: + headers: Dictionary of headers to set globally + """ + self._global_headers.update(headers) + + def remove_global_header(self, key: str) -> None: + """ + Remove a global header. + + Args: + key: Header name to remove from global headers + """ + if key in self._global_headers: + del self._global_headers[key] + + def _build_url(self, path: str) -> str: + """Construct the full URL by joining the base URL with the path.""" + return f"{self.endpoint_base_url}/{path.lstrip('/')}" + + def _build_request_headers(self, request_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: + """ + Build the final headers by merging global headers with request-specific headers. + + Args: + request_headers: Optional headers specific to this request + + Returns: + Merged headers dictionary + """ + headers = self._global_headers.copy() + if request_headers: + headers.update(request_headers) + return headers + + def _make_request(self, method: str, path: str, **kwargs) -> requests.Response: + """ + Make an HTTP request with error handling. 
+ + Args: + method: HTTP method to use + path: API endpoint path + **kwargs: Additional arguments to pass to the request + + Returns: + Response object from the request + + Raises: + InferenceClientError: If the request fails + """ + timeout = kwargs.pop('timeout_seconds', self.timeout_seconds) + try: + response = self._session.request( + method=method, + url=self._build_url(path), + headers=self._build_request_headers( + kwargs.pop('headers', None)), + timeout=timeout, + **kwargs + ) + response.raise_for_status() + return response + except requests.exceptions.Timeout: + raise InferenceClientError( + f"Request to {path} timed out after {timeout} seconds") + except requests.exceptions.RequestException as e: + raise InferenceClientError(f"Request to {path} failed: {str(e)}") + + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): + response = self.post( + path, json=data, timeout_seconds=timeout_seconds, headers=headers) + + return InferenceResponse( + body=response.json(), + headers=response.headers, + status_code=response.status_code, + status_text=response.reason + ) + + def get(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('GET', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def post(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('POST', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def put(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('PUT', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def delete(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('DELETE', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def patch(self, path: str, json: Optional[Dict[str, Any]] = None, data: Optional[Union[str, Dict[str, Any]]] = None, + params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('PATCH', path, json=json, data=data, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def head(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('HEAD', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def options(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: + return self._make_request('OPTIONS', path, params=params, headers=headers, timeout_seconds=timeout_seconds) + + def health(self) -> dict: + """ + Check the health status of the API. 
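        Example (a minimal, illustrative sketch: the endpoint URL and inference
        key shown here are placeholders, not real values):

            # from datacrunch.InferenceClient.inference_client import InferenceClient
            client = InferenceClient(
                inference_key="<your-inference-key>",
                endpoint_base_url="https://<your-deployment-endpoint>",
            )
            health_info = client.health()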
+ + Returns: + dict: Health status information + + Raises: + InferenceClientError: If the health check fails + """ + try: + response = self.get('/health') + return response.json() + except InferenceClientError as e: + raise InferenceClientError(f"Health check failed: {str(e)}") diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index d92683a..1ecc642 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -1,11 +1,11 @@ import requests -from requests.structures import CaseInsensitiveDict from dataclasses import dataclass from dataclasses_json import dataclass_json, Undefined # type: ignore from typing import List, Optional, Dict, Any from enum import Enum from datacrunch.http_client.http_client import HTTPClient +from datacrunch.InferenceClient.inference_client import InferenceClient, InferenceResponse # API endpoints @@ -238,7 +238,6 @@ class Deployment: :param endpoint_base_url: Optional base URL for the deployment endpoint :param scaling: Optional scaling configuration :param created_at: Optional timestamp when the deployment was created - :param inference_key: Optional inference key for the deployment """ name: str container_registry_settings: ContainerRegistrySettings @@ -249,13 +248,13 @@ class Deployment: scaling: Optional[ScalingOptions] = None created_at: Optional[str] = None - _inference_key: Optional[str] = None + _inference_client: Optional[InferenceClient] = None def __str__(self): """String representation of the deployment, excluding sensitive information.""" - # Get all attributes except _inference_key + # Get all attributes except _inference_client attrs = {k: v for k, v in self.__dict__.items() if k != - '_inference_key'} + '_inference_client'} # Format each attribute attr_strs = [f"{k}={repr(v)}" for k, v in attrs.items()] return f"Deployment({', '.join(attr_strs)})" @@ -265,38 +264,47 @@ def __repr__(self): return self.__str__() @classmethod - def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None, **kwargs) -> 'Deployment': + def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None) -> 'Deployment': """Create a Deployment instance from a dictionary with an inference key. :param data: Dictionary containing deployment data :param inference_key: inference key to set on the deployment - :param **kwargs: Additional arguments to pass to from_dict :return: Deployment instance """ deployment = Deployment.from_dict(data, infer_missing=True) - deployment._inference_key = inference_key + if inference_key and deployment.endpoint_base_url: + deployment._inference_client = InferenceClient( + inference_key=inference_key, + endpoint_base_url=deployment.endpoint_base_url + ) return deployment - def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5): - if self._inference_key is None: - # TODO: do something better - raise ValueError("Inference key is not set") + def set_inference_client(self, inference_key: str) -> None: + """Set the inference client for this deployment. 
- response = requests.post( - url=f"{self.endpoint_base_url}{path}", - json=data, - headers={"Authorization": f"Bearer {self._inference_key}"}, - timeout=timeout_seconds + :param inference_key: The inference key to use for authentication + :type inference_key: str + :raises ValueError: If endpoint_base_url is not set + """ + if self.endpoint_base_url is None: + raise ValueError( + "Endpoint base URL must be set to use inference client") + self._inference_client = InferenceClient( + inference_key=inference_key, + endpoint_base_url=self.endpoint_base_url ) - return InferenceResponse( - body=response.json(), - headers=response.headers, - status_code=response.status_code, - status_text=response.reason - ) + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5) -> InferenceResponse: + if self._inference_client is None: + if self.endpoint_base_url is None: + raise ValueError( + "Endpoint base URL must be set to use run_sync") + raise ValueError( + "Inference client not initialized. Use from_dict_with_inference_key or set_inference_client to initialize inference capabilities.") + return self._inference_client.run_sync(data, path, timeout_seconds) def health(self): + # TODO: use inference client? healthcheck_path = "health" if self.containers and self.containers[0].healthcheck and self.containers[0].healthcheck.path: healthcheck_path = self.containers[0].healthcheck.path.lstrip('/') @@ -310,15 +318,6 @@ def health(self): healthcheck = health -@dataclass_json(undefined=Undefined.EXCLUDE) -@dataclass -class InferenceResponse: - body: Any - headers: CaseInsensitiveDict[str] - status_code: int - status_text: str - - @dataclass_json @dataclass class ReplicaInfo: From 92a463348d343328c879599bd7d9716c75875a53 Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 08:15:21 +0300 Subject: [PATCH 04/22] improved docstrings, wip AsyncInferenceExecution --- .../InferenceClient/inference_client.py | 45 +- datacrunch/containers/containers.py | 589 ++++++++++++------ 2 files changed, 428 insertions(+), 206 deletions(-) diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index f0a25c6..a592c83 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -20,8 +20,29 @@ class InferenceResponse: status_text: str +@dataclass_json(undefined=Undefined.EXCLUDE) +@dataclass +class AsyncInferenceExecution: + id: str + status: str + + _client: 'InferenceClient' + + def status(self) -> str: + # TODO: Call the status endpoint and update the status + return self.status + + # def cancel(self) -> None: + # # TODO: Call the cancel inference executionendpoint + # pass + + def get_results(self) -> Dict[str, Any]: + # TODO: Call the results endpoint + pass + + class InferenceClient: - def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: int = 300) -> None: + def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: int = 60 * 5) -> None: """ Initialize the InferenceClient. 
@@ -157,6 +178,21 @@ def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = status_text=response.reason ) + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): + # Add the "Prefer: respond-async" header to the request, to indicate that the request is async + headers = headers or {} + headers['Prefer'] = 'respond-async' + + response = self.post( + path, json=data, timeout_seconds=timeout_seconds, headers=headers) + + # TODO: create an async response class: + # TODO: add a method to check the status of the async request + # TODO: add a method to cancel the async request + # TODO: add a method to get the results of the async request + + return '837cdf50-6cf1-44b0-884e-ed115e700480' + def get(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: return self._make_request('GET', path, params=params, headers=headers, timeout_seconds=timeout_seconds) @@ -181,18 +217,17 @@ def head(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Opti def options(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: return self._make_request('OPTIONS', path, params=params, headers=headers, timeout_seconds=timeout_seconds) - def health(self) -> dict: + def health(self, healthcheck_path: str = "/health") -> requests.Response: """ Check the health status of the API. Returns: - dict: Health status information + requests.Response: The response from the health check Raises: InferenceClientError: If the health check fails """ try: - response = self.get('/health') - return response.json() + return self.get(healthcheck_path) except InferenceClientError as e: raise InferenceClientError(f"Health check failed: {str(e)}") diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 1ecc642..4fedbdf 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -1,4 +1,9 @@ -import requests +"""Container deployment and management service for DataCrunch. + +This module provides functionality for managing container deployments, including +creation, updates, deletion, and monitoring of containerized applications. +""" + from dataclasses import dataclass from dataclasses_json import dataclass_json, Undefined # type: ignore from typing import List, Optional, Dict, Any @@ -16,16 +21,22 @@ class EnvVarType(str, Enum): + """Types of environment variables that can be set in containers.""" + PLAIN = "plain" SECRET = "secret" class VolumeMountType(str, Enum): + """Types of volume mounts that can be configured for containers.""" + SCRATCH = "scratch" SECRET = "secret" class ContainerRegistryType(str, Enum): + """Supported container registry types.""" + GCR = "gcr" DOCKERHUB = "dockerhub" GITHUB = "ghcr" @@ -34,6 +45,8 @@ class ContainerRegistryType(str, Enum): class ContainerDeploymentStatus(str, Enum): + """Possible states of a container deployment.""" + INITIALIZING = "initializing" HEALTHY = "healthy" DEGRADED = "degraded" @@ -47,12 +60,14 @@ class ContainerDeploymentStatus(str, Enum): @dataclass_json @dataclass class HealthcheckSettings: - """Settings for container health checking. + """Configuration for container health checking. 
- :param enabled: Whether health checking is enabled - :param port: Port number to perform health check on - :param path: HTTP path to perform health check on + Attributes: + enabled: Whether health checking is enabled. + port: Port number to perform health check on. + path: HTTP path to perform health check on. """ + enabled: bool = True port: Optional[int] = None path: Optional[str] = None @@ -61,12 +76,14 @@ class HealthcheckSettings: @dataclass_json @dataclass class EntrypointOverridesSettings: - """Settings for overriding container entrypoint and command. + """Configuration for overriding container entrypoint and command. - :param enabled: Whether entrypoint overrides are enabled - :param entrypoint: List of strings forming the entrypoint command - :param cmd: List of strings forming the command arguments + Attributes: + enabled: Whether entrypoint overrides are enabled. + entrypoint: List of strings forming the entrypoint command. + cmd: List of strings forming the command arguments. """ + enabled: bool = True entrypoint: Optional[List[str]] = None cmd: Optional[List[str]] = None @@ -77,10 +94,12 @@ class EntrypointOverridesSettings: class EnvVar: """Environment variable configuration for containers. - :param name: Name of the environment variable - :param value_or_reference_to_secret: Direct value or reference to a secret - :param type: Type of the environment variable + Attributes: + name: Name of the environment variable. + value_or_reference_to_secret: Direct value or reference to a secret. + type: Type of the environment variable. """ + name: str value_or_reference_to_secret: str type: EnvVarType @@ -91,9 +110,11 @@ class EnvVar: class VolumeMount: """Volume mount configuration for containers. - :param type: Type of volume mount - :param mount_path: Path where the volume should be mounted in the container + Attributes: + type: Type of volume mount. + mount_path: Path where the volume should be mounted in the container. """ + type: VolumeMountType mount_path: str @@ -103,14 +124,16 @@ class VolumeMount: class Container: """Container configuration for deployment creation and updates. - :param image: Container image to use - :param exposed_port: Port to expose from the container - :param name: Name of the container (system-managed, read-only) - :param healthcheck: Optional health check configuration - :param entrypoint_overrides: Optional entrypoint override settings - :param env: Optional list of environment variables - :param volume_mounts: Optional list of volume mounts + Attributes: + image: Container image to use. + exposed_port: Port to expose from the container. + name: Name of the container (system-managed, read-only). + healthcheck: Optional health check configuration. + entrypoint_overrides: Optional entrypoint override settings. + env: Optional list of environment variables. + volume_mounts: Optional list of volume mounts. """ + image: str exposed_port: int name: Optional[str] = None @@ -125,8 +148,10 @@ class Container: class ContainerRegistryCredentials: """Credentials for accessing a container registry. - :param name: Name of the credentials + Attributes: + name: Name of the credentials. """ + name: str @@ -135,9 +160,11 @@ class ContainerRegistryCredentials: class ContainerRegistrySettings: """Settings for container registry access. - :param is_private: Whether the registry is private - :param credentials: Optional credentials for accessing private registry + Attributes: + is_private: Whether the registry is private. 
+ credentials: Optional credentials for accessing private registry. """ + is_private: bool credentials: Optional[ContainerRegistryCredentials] = None @@ -147,10 +174,12 @@ class ContainerRegistrySettings: class ComputeResource: """Compute resource configuration. - :param name: Name of the compute resource - :param size: Size of the compute resource - :param is_available: Whether the compute resource is currently available + Attributes: + name: Name of the compute resource. + size: Size of the compute resource. + is_available: Whether the compute resource is currently available. """ + name: str size: int # Made optional since it's only used in API responses @@ -162,8 +191,10 @@ class ComputeResource: class ScalingPolicy: """Policy for controlling scaling behavior. - :param delay_seconds: Number of seconds to wait before applying scaling action + Attributes: + delay_seconds: Number of seconds to wait before applying scaling action. """ + delay_seconds: int @@ -172,8 +203,10 @@ class ScalingPolicy: class QueueLoadScalingTrigger: """Trigger for scaling based on queue load. - :param threshold: Queue load threshold that triggers scaling + Attributes: + threshold: Queue load threshold that triggers scaling. """ + threshold: float @@ -182,9 +215,11 @@ class QueueLoadScalingTrigger: class UtilizationScalingTrigger: """Trigger for scaling based on resource utilization. - :param enabled: Whether this trigger is enabled - :param threshold: Utilization threshold that triggers scaling + Attributes: + enabled: Whether this trigger is enabled. + threshold: Utilization threshold that triggers scaling. """ + enabled: bool threshold: Optional[float] = None @@ -194,10 +229,12 @@ class UtilizationScalingTrigger: class ScalingTriggers: """Collection of triggers that can cause scaling actions. - :param queue_load: Optional trigger based on queue load - :param cpu_utilization: Optional trigger based on CPU utilization - :param gpu_utilization: Optional trigger based on GPU utilization + Attributes: + queue_load: Optional trigger based on queue load. + cpu_utilization: Optional trigger based on CPU utilization. + gpu_utilization: Optional trigger based on GPU utilization. """ + queue_load: Optional[QueueLoadScalingTrigger] = None cpu_utilization: Optional[UtilizationScalingTrigger] = None gpu_utilization: Optional[UtilizationScalingTrigger] = None @@ -208,14 +245,16 @@ class ScalingTriggers: class ScalingOptions: """Configuration for automatic scaling behavior. - :param min_replica_count: Minimum number of replicas to maintain - :param max_replica_count: Maximum number of replicas allowed - :param scale_down_policy: Policy for scaling down replicas - :param scale_up_policy: Policy for scaling up replicas - :param queue_message_ttl_seconds: Time-to-live for queue messages in seconds - :param concurrent_requests_per_replica: Number of concurrent requests each replica can handle - :param scaling_triggers: Configuration for various scaling triggers + Attributes: + min_replica_count: Minimum number of replicas to maintain. + max_replica_count: Maximum number of replicas allowed. + scale_down_policy: Policy for scaling down replicas. + scale_up_policy: Policy for scaling up replicas. + queue_message_ttl_seconds: Time-to-live for queue messages in seconds. + concurrent_requests_per_replica: Number of concurrent requests each replica can handle. + scaling_triggers: Configuration for various scaling triggers. 
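    Example (an illustrative sketch: the numeric values are placeholders chosen
    only to show the shape of the configuration):

        scaling = ScalingOptions(
            min_replica_count=1,
            max_replica_count=3,
            scale_down_policy=ScalingPolicy(delay_seconds=300),
            scale_up_policy=ScalingPolicy(delay_seconds=30),
            queue_message_ttl_seconds=600,
            concurrent_requests_per_replica=1,
            scaling_triggers=ScalingTriggers(
                queue_load=QueueLoadScalingTrigger(threshold=1.0)
            ),
        )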
""" + min_replica_count: int max_replica_count: int scale_down_policy: ScalingPolicy @@ -230,15 +269,17 @@ class ScalingOptions: class Deployment: """Configuration for creating or updating a container deployment. - :param name: Name of the deployment - :param container_registry_settings: Settings for accessing container registry - :param containers: List of container specifications in the deployment - :param compute: Compute resource configuration - :param is_spot: Whether is spot deployment - :param endpoint_base_url: Optional base URL for the deployment endpoint - :param scaling: Optional scaling configuration - :param created_at: Optional timestamp when the deployment was created + Attributes: + name: Name of the deployment. + container_registry_settings: Settings for accessing container registry. + containers: List of container specifications in the deployment. + compute: Compute resource configuration. + is_spot: Whether is spot deployment. + endpoint_base_url: Optional base URL for the deployment endpoint. + scaling: Optional scaling configuration. + created_at: Optional timestamp when the deployment was created. """ + name: str container_registry_settings: ContainerRegistrySettings containers: List[Container] @@ -251,7 +292,11 @@ class Deployment: _inference_client: Optional[InferenceClient] = None def __str__(self): - """String representation of the deployment, excluding sensitive information.""" + """Returns a string representation of the deployment, excluding sensitive information. + + Returns: + str: A formatted string representation of the deployment. + """ # Get all attributes except _inference_client attrs = {k: v for k, v in self.__dict__.items() if k != '_inference_client'} @@ -260,16 +305,23 @@ def __str__(self): return f"Deployment({', '.join(attr_strs)})" def __repr__(self): - """Repr representation of the deployment, excluding sensitive information.""" + """Returns a repr representation of the deployment, excluding sensitive information. + + Returns: + str: A formatted string representation of the deployment. + """ return self.__str__() @classmethod def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = None) -> 'Deployment': - """Create a Deployment instance from a dictionary with an inference key. + """Creates a Deployment instance from a dictionary with an inference key. + + Args: + data: Dictionary containing deployment data. + inference_key: Inference key to set on the deployment. - :param data: Dictionary containing deployment data - :param inference_key: inference key to set on the deployment - :return: Deployment instance + Returns: + Deployment: A new Deployment instance with the inference client initialized. """ deployment = Deployment.from_dict(data, infer_missing=True) if inference_key and deployment.endpoint_base_url: @@ -280,11 +332,13 @@ def from_dict_with_inference_key(cls, data: Dict[str, Any], inference_key: str = return deployment def set_inference_client(self, inference_key: str) -> None: - """Set the inference client for this deployment. + """Sets the inference client for this deployment. + + Args: + inference_key: The inference key to use for authentication. - :param inference_key: The inference key to use for authentication - :type inference_key: str - :raises ValueError: If endpoint_base_url is not set + Raises: + ValueError: If endpoint_base_url is not set. 
""" if self.endpoint_base_url is None: raise ValueError( @@ -294,26 +348,68 @@ def set_inference_client(self, inference_key: str) -> None: endpoint_base_url=self.endpoint_base_url ) - def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5) -> InferenceResponse: + def _validate_inference_client(self) -> None: + """Validates that the inference client is initialized. + + Raises: + ValueError: If inference client is not initialized. + """ if self._inference_client is None: - if self.endpoint_base_url is None: - raise ValueError( - "Endpoint base URL must be set to use run_sync") raise ValueError( "Inference client not initialized. Use from_dict_with_inference_key or set_inference_client to initialize inference capabilities.") - return self._inference_client.run_sync(data, path, timeout_seconds) + + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None) -> InferenceResponse: + """Runs a synchronous inference request. + + Args: + data: The data to send in the request. + path: The endpoint path to send the request to. + timeout_seconds: Maximum time to wait for the response. + headers: Optional headers to include in the request. + + Returns: + InferenceResponse: The response from the inference request. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + return self._inference_client.run_sync(data, path, timeout_seconds, headers) + + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): + """Runs an asynchronous inference request. + + Args: + data: The data to send in the request. + path: The endpoint path to send the request to. + timeout_seconds: Maximum time to wait for the response. + headers: Optional headers to include in the request. + + Returns: + The response from the inference request. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + return self._inference_client.run(data, path, timeout_seconds, headers) def health(self): - # TODO: use inference client? - healthcheck_path = "health" + """Checks the health of the deployed application. + + Returns: + The health check response. + + Raises: + ValueError: If the inference client is not initialized. + """ + self._validate_inference_client() + # build healthcheck path + healthcheck_path = "/health" if self.containers and self.containers[0].healthcheck and self.containers[0].healthcheck.path: - healthcheck_path = self.containers[0].healthcheck.path.lstrip('/') + healthcheck_path = self.containers[0].healthcheck.path - response = requests.get( - url=f"{self.endpoint_base_url}{healthcheck_path}", - headers={"Authorization": f"Bearer {self._inference_key}"}, - ) - return response # TODO: agree on response format + return self._inference_client.health(healthcheck_path) # Function alias healthcheck = health @@ -323,10 +419,12 @@ def health(self): class ReplicaInfo: """Information about a deployment replica. - :param id: Unique identifier of the replica - :param status: Current status of the replica - :param started_at: Timestamp when the replica was started + Attributes: + id: Unique identifier of the replica. + status: Current status of the replica. + started_at: Timestamp when the replica was started. 
""" + id: str status: str started_at: str @@ -335,7 +433,13 @@ class ReplicaInfo: @dataclass_json @dataclass class Secret: - """A secret model class""" + """A secret model class. + + Attributes: + name: Name of the secret. + created_at: Timestamp when the secret was created. + """ + name: str created_at: str @@ -343,7 +447,13 @@ class Secret: @dataclass_json @dataclass class RegistryCredential: - """A container registry credential model class""" + """A container registry credential model class. + + Attributes: + name: Name of the registry credential. + created_at: Timestamp when the credential was created. + """ + name: str created_at: str @@ -351,7 +461,13 @@ class RegistryCredential: @dataclass_json @dataclass class BaseRegistryCredentials: - """Base class for registry credentials""" + """Base class for registry credentials. + + Attributes: + name: Name of the registry credential. + type: Type of the container registry. + """ + name: str type: ContainerRegistryType @@ -359,11 +475,24 @@ class BaseRegistryCredentials: @dataclass_json @dataclass class DockerHubCredentials(BaseRegistryCredentials): - """Credentials for DockerHub registry""" + """Credentials for DockerHub registry. + + Attributes: + username: DockerHub username. + access_token: DockerHub access token. + """ + username: str access_token: str def __init__(self, name: str, username: str, access_token: str): + """Initializes DockerHub credentials. + + Args: + name: Name of the credentials. + username: DockerHub username. + access_token: DockerHub access token. + """ super().__init__(name=name, type=ContainerRegistryType.DOCKERHUB) self.username = username self.access_token = access_token @@ -372,11 +501,24 @@ def __init__(self, name: str, username: str, access_token: str): @dataclass_json @dataclass class GithubCredentials(BaseRegistryCredentials): - """Credentials for GitHub Container Registry""" + """Credentials for GitHub Container Registry. + + Attributes: + username: GitHub username. + access_token: GitHub access token. + """ + username: str access_token: str def __init__(self, name: str, username: str, access_token: str): + """Initializes GitHub credentials. + + Args: + name: Name of the credentials. + username: GitHub username. + access_token: GitHub access token. + """ super().__init__(name=name, type=ContainerRegistryType.GITHUB) self.username = username self.access_token = access_token @@ -385,10 +527,21 @@ def __init__(self, name: str, username: str, access_token: str): @dataclass_json @dataclass class GCRCredentials(BaseRegistryCredentials): - """Credentials for Google Container Registry""" + """Credentials for Google Container Registry. + + Attributes: + service_account_key: GCP service account key JSON. + """ + service_account_key: str def __init__(self, name: str, service_account_key: str): + """Initializes GCR credentials. + + Args: + name: Name of the credentials. + service_account_key: GCP service account key JSON. + """ super().__init__(name=name, type=ContainerRegistryType.GCR) self.service_account_key = service_account_key @@ -396,13 +549,30 @@ def __init__(self, name: str, service_account_key: str): @dataclass_json @dataclass class AWSECRCredentials(BaseRegistryCredentials): - """Credentials for AWS Elastic Container Registry""" + """Credentials for AWS Elastic Container Registry. + + Attributes: + access_key_id: AWS access key ID. + secret_access_key: AWS secret access key. + region: AWS region. + ecr_repo: ECR repository name. 
+ """ + access_key_id: str secret_access_key: str region: str ecr_repo: str def __init__(self, name: str, access_key_id: str, secret_access_key: str, region: str, ecr_repo: str): + """Initializes AWS ECR credentials. + + Args: + name: Name of the credentials. + access_key_id: AWS access key ID. + secret_access_key: AWS secret access key. + region: AWS region. + ecr_repo: ECR repository name. + """ super().__init__(name=name, type=ContainerRegistryType.AWS_ECR) self.access_key_id = access_key_id self.secret_access_key = secret_access_key @@ -413,42 +583,59 @@ def __init__(self, name: str, access_key_id: str, secret_access_key: str, region @dataclass_json @dataclass class CustomRegistryCredentials(BaseRegistryCredentials): - """Credentials for custom container registries""" + """Credentials for custom container registries. + + Attributes: + docker_config_json: Docker config JSON containing registry credentials. + """ + docker_config_json: str def __init__(self, name: str, docker_config_json: str): + """Initializes custom registry credentials. + + Args: + name: Name of the credentials. + docker_config_json: Docker config JSON containing registry credentials. + """ super().__init__(name=name, type=ContainerRegistryType.CUSTOM) self.docker_config_json = docker_config_json class ContainersService: - """Service for managing container deployments""" + """Service for managing container deployments. + + This class provides methods for interacting with the DataCrunch container + deployment API, including CRUD operations for deployments and related resources. + """ def __init__(self, http_client: HTTPClient, inference_key: str = None) -> None: - """Initialize the containers service + """Initializes the containers service. - :param http_client: HTTP client for making API requests - :type http_client: Any + Args: + http_client: HTTP client for making API requests. + inference_key: Optional inference key for authenticating inference requests. """ self.client = http_client self._inference_key = inference_key def get_deployments(self) -> List[Deployment]: - """Get all deployments + """Retrieves all container deployments. - :return: list of deployments - :rtype: List[Deployment] + Returns: + List[Deployment]: List of all deployments. """ response = self.client.get(CONTAINER_DEPLOYMENTS_ENDPOINT) return [Deployment.from_dict_with_inference_key(deployment, self._inference_key) for deployment in response.json()] def get_deployment_by_name(self, deployment_name: str) -> Deployment: - """Get a deployment by name + """Retrieves a specific deployment by name. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: deployment - :rtype: Deployment + Args: + deployment_name: Name of the deployment to retrieve. + + Returns: + Deployment: The requested deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}") @@ -461,12 +648,13 @@ def create_deployment( self, deployment: Deployment ) -> Deployment: - """Create a new deployment + """Creates a new container deployment. + + Args: + deployment: Deployment configuration to create. - :param deployment: deployment configuration - :type deployment: Deployment - :return: created deployment - :rtype: Deployment + Returns: + Deployment: The created deployment. 
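        Example (an illustrative sketch: the deployment name, image, and compute
        resource name are placeholders, not validated values):

            deployment = Deployment(
                name="my-deployment",
                container_registry_settings=ContainerRegistrySettings(is_private=False),
                containers=[Container(image="nginx:latest", exposed_port=80)],
                compute=ComputeResource(name="<compute-resource-name>", size=1),
            )
            created = datacrunch.containers.create_deployment(deployment)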
""" response = self.client.post( CONTAINER_DEPLOYMENTS_ENDPOINT, @@ -475,14 +663,14 @@ def create_deployment( return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) def update_deployment(self, deployment_name: str, deployment: Deployment) -> Deployment: - """Update an existing deployment + """Updates an existing deployment. + + Args: + deployment_name: Name of the deployment to update. + deployment: Updated deployment configuration. - :param deployment_name: name of the deployment to update - :type deployment_name: str - :param deployment: updated deployment - :type deployment: Deployment - :return: updated deployment - :rtype: Deployment + Returns: + Deployment: The updated deployment. """ response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}", @@ -491,56 +679,58 @@ def update_deployment(self, deployment_name: str, deployment: Deployment) -> Dep return Deployment.from_dict_with_inference_key(response.json(), self._inference_key) def delete_deployment(self, deployment_name: str) -> None: - """Delete a deployment + """Deletes a deployment. - :param deployment_name: name of the deployment to delete - :type deployment_name: str + Args: + deployment_name: Name of the deployment to delete. """ self.client.delete( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}") def get_deployment_status(self, deployment_name: str) -> ContainerDeploymentStatus: - """Get deployment status + """Retrieves the current status of a deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: deployment status - :rtype: ContainerDeploymentStatus + Args: + deployment_name: Name of the deployment. + + Returns: + ContainerDeploymentStatus: Current status of the deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/status") return ContainerDeploymentStatus(response.json()["status"]) def restart_deployment(self, deployment_name: str) -> None: - """Restart a deployment + """Restarts a deployment. - :param deployment_name: name of the deployment to restart - :type deployment_name: str + Args: + deployment_name: Name of the deployment to restart. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/restart") def get_deployment_scaling_options(self, deployment_name: str) -> ScalingOptions: - """Get deployment scaling options + """Retrieves the scaling options for a deployment. + + Args: + deployment_name: Name of the deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: scaling options - :rtype: ScalingOptions + Returns: + ScalingOptions: Current scaling options for the deployment. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling") return ScalingOptions.from_dict(response.json()) def update_deployment_scaling_options(self, deployment_name: str, scaling_options: ScalingOptions) -> ScalingOptions: - """Update deployment scaling options + """Updates the scaling options for a deployment. + + Args: + deployment_name: Name of the deployment. + scaling_options: New scaling options to apply. - :param deployment_name: name of the deployment - :type deployment_name: str - :param scaling_options: new scaling options - :type scaling_options: ScalingOptions - :return: updated scaling options - :rtype: ScalingOptions + Returns: + ScalingOptions: Updated scaling options for the deployment. 
""" response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling", @@ -549,51 +739,53 @@ def update_deployment_scaling_options(self, deployment_name: str, scaling_option return ScalingOptions.from_dict(response.json()) def get_deployment_replicas(self, deployment_name: str) -> List[ReplicaInfo]: - """Get deployment replicas + """Retrieves information about deployment replicas. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: list of replicas information - :rtype: List[ReplicaInfo] + Args: + deployment_name: Name of the deployment. + + Returns: + List[ReplicaInfo]: List of replica information. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/replicas") return [ReplicaInfo.from_dict(replica) for replica in response.json()["list"]] def purge_deployment_queue(self, deployment_name: str) -> None: - """Purge deployment queue + """Purges the deployment queue. - :param deployment_name: name of the deployment - :type deployment_name: str + Args: + deployment_name: Name of the deployment. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/purge-queue") def pause_deployment(self, deployment_name: str) -> None: - """Pause a deployment + """Pauses a deployment. - :param deployment_name: name of the deployment to pause - :type deployment_name: str + Args: + deployment_name: Name of the deployment to pause. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/pause") def resume_deployment(self, deployment_name: str) -> None: - """Resume a deployment + """Resumes a paused deployment. - :param deployment_name: name of the deployment to resume - :type deployment_name: str + Args: + deployment_name: Name of the deployment to resume. """ self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/resume") def get_deployment_environment_variables(self, deployment_name: str) -> Dict[str, List[EnvVar]]: - """Get deployment environment variables + """Retrieves environment variables for a deployment. + + Args: + deployment_name: Name of the deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :return: dictionary mapping container names to their environment variables - :rtype: Dict[str, List[EnvVar]] + Returns: + Dict[str, List[EnvVar]]: Dictionary mapping container names to their environment variables. """ response = self.client.get( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables") @@ -606,16 +798,15 @@ def get_deployment_environment_variables(self, deployment_name: str) -> Dict[str return result def add_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]: - """Add environment variables to a container + """Adds environment variables to a container in a deployment. + + Args: + deployment_name: Name of the deployment. + container_name: Name of the container. + env_vars: List of environment variables to add. - :param deployment_name: name of the deployment - :type deployment_name: str - :param container_name: name of the container - :type container_name: str - :param env_vars: environment variables to add - :type env_vars: List[EnvVar] - :return: updated environment variables - :rtype: Dict[str, List[EnvVar]] + Returns: + Dict[str, List[EnvVar]]: Updated environment variables for all containers. 
""" response = self.client.post( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables", @@ -631,16 +822,15 @@ def add_deployment_environment_variables(self, deployment_name: str, container_n return result def update_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]: - """Update environment variables of a container + """Updates environment variables for a container in a deployment. - :param deployment_name: name of the deployment - :type deployment_name: str - :param container_name: name of the container - :type container_name: str - :param env_vars: updated environment variables - :type env_vars: List[EnvVar] - :return: updated environment variables - :rtype: Dict[str, List[EnvVar]] + Args: + deployment_name: Name of the deployment. + container_name: Name of the container. + env_vars: List of updated environment variables. + + Returns: + Dict[str, List[EnvVar]]: Updated environment variables for all containers. """ response = self.client.patch( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables", @@ -656,16 +846,15 @@ def update_deployment_environment_variables(self, deployment_name: str, containe return result def delete_deployment_environment_variables(self, deployment_name: str, container_name: str, env_var_names: List[str]) -> Dict[str, List[EnvVar]]: - """Delete environment variables from a container + """Deletes environment variables from a container in a deployment. + + Args: + deployment_name: Name of the deployment. + container_name: Name of the container. + env_var_names: List of environment variable names to delete. - :param deployment_name: name of the deployment - :type deployment_name: str - :param container_name: name of the container - :type container_name: str - :param env_var_names: names of environment variables to delete - :type env_var_names: List[str] - :return: remaining environment variables - :rtype: Dict[str, List[EnvVar]] + Returns: + Dict[str, List[EnvVar]]: Updated environment variables for all containers. """ response = self.client.delete( f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables", @@ -680,10 +869,10 @@ def delete_deployment_environment_variables(self, deployment_name: str, containe return result def get_compute_resources(self) -> List[ComputeResource]: - """Get available compute resources + """Retrieves available compute resources. - :return: list of compute resources - :rtype: List[ComputeResource] + Returns: + List[ComputeResource]: List of available compute resources. """ response = self.client.get(SERVERLESS_COMPUTE_RESOURCES_ENDPOINT) resources = [] @@ -696,58 +885,56 @@ def get_compute_resources(self) -> List[ComputeResource]: get_gpus = get_compute_resources def get_secrets(self) -> List[Secret]: - """Get all secrets + """Retrieves all secrets. - :return: list of secrets - :rtype: List[Secret] + Returns: + List[Secret]: List of all secrets. """ response = self.client.get(SECRETS_ENDPOINT) return [Secret.from_dict(secret) for secret in response.json()] def create_secret(self, name: str, value: str) -> None: - """Create a new secret + """Creates a new secret. - :param name: name of the secret - :type name: str - :param value: value of the secret - :type value: str + Args: + name: Name of the secret. + value: Value of the secret. 
""" self.client.post(SECRETS_ENDPOINT, {"name": name, "value": value}) def delete_secret(self, secret_name: str, force: bool = False) -> None: - """Delete a secret + """Deletes a secret. - :param secret_name: name of the secret to delete - :type secret_name: str - :param force: force delete even if secret is in use - :type force: bool + Args: + secret_name: Name of the secret to delete. + force: Whether to force delete even if secret is in use. """ self.client.delete( f"{SECRETS_ENDPOINT}/{secret_name}", params={"force": str(force).lower()}) def get_registry_credentials(self) -> List[RegistryCredential]: - """Get all registry credentials + """Retrieves all registry credentials. - :return: list of registry credentials - :rtype: List[RegistryCredential] + Returns: + List[RegistryCredential]: List of all registry credentials. """ response = self.client.get(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT) return [RegistryCredential.from_dict(credential) for credential in response.json()] def add_registry_credentials(self, credentials: BaseRegistryCredentials) -> None: - """Add registry credentials + """Adds new registry credentials. - :param credentials: Registry credentials object - :type credentials: BaseRegistryCredentials + Args: + credentials: Registry credentials to add. """ data = credentials.to_dict() self.client.post(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT, data) def delete_registry_credentials(self, credentials_name: str) -> None: - """Delete registry credentials + """Deletes registry credentials. - :param credentials_name: name of the credentials to delete - :type credentials_name: str + Args: + credentials_name: Name of the credentials to delete. """ self.client.delete( f"{CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT}/{credentials_name}") From 887b834cb45a343ab8db8fcc4f76a0a5c7477ed2 Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 09:41:42 +0300 Subject: [PATCH 05/22] replaced fastai image --- README.md | 2 +- .../examples/advanced_create_instance.rst | 4 ++-- docs/source/examples/instance_actions.rst | 2 +- .../source/examples/instances_and_volumes.rst | 4 ++-- .../examples/simple_create_instance.rst | 2 +- docs/source/index.rst | 2 +- examples/advanced_create_instance.py | 10 ++++---- examples/instance_actions.py | 15 +++++++----- examples/instances_and_volumes.py | 24 ++++++++++--------- examples/simple_create_instance.py | 5 ++-- tests/unit_tests/instances/test_instances.py | 2 +- 11 files changed, 39 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index b1fef0f..a490856 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ DataCrunch's Public API documentation [is available here](https://api.datacrunch # Create a new instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys, hostname='example', description='example instance') diff --git a/docs/source/examples/advanced_create_instance.rst b/docs/source/examples/advanced_create_instance.rst index 880effd..fc04494 100644 --- a/docs/source/examples/advanced_create_instance.rst +++ b/docs/source/examples/advanced_create_instance.rst @@ -56,7 +56,7 @@ Advanced Create Instance if price_per_hour * DURATION < balance.amount: # Deploy a new 8V instance instance = datacrunch.instances.create(instance_type=INSTANCE_TYPE_8V, - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='large instance' @@ -67,7 +67,7 @@ Advanced Create Instance else: # Deploy a 
new 4V instance instance = datacrunch.instances.create(instance_type=INSTANCE_TYPE_4V, - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='medium instance') diff --git a/docs/source/examples/instance_actions.rst b/docs/source/examples/instance_actions.rst index 77b115b..c59690b 100644 --- a/docs/source/examples/instance_actions.rst +++ b/docs/source/examples/instance_actions.rst @@ -22,7 +22,7 @@ Instance Actions # Create a new 1V100.6V instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='example instance') diff --git a/docs/source/examples/instances_and_volumes.rst b/docs/source/examples/instances_and_volumes.rst index 6701c4c..d3852e1 100644 --- a/docs/source/examples/instances_and_volumes.rst +++ b/docs/source/examples/instances_and_volumes.rst @@ -27,7 +27,7 @@ Instances and Volumes # Create instance with extra attached volumes instance_with_extra_volumes = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys, hostname='example', description='example instance', @@ -38,7 +38,7 @@ Instances and Volumes # Create instance with custom OS volume size and name instance_with_custom_os_volume = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys, hostname='example', description='example instance', diff --git a/docs/source/examples/simple_create_instance.rst b/docs/source/examples/simple_create_instance.rst index 72f3051..c845321 100644 --- a/docs/source/examples/simple_create_instance.rst +++ b/docs/source/examples/simple_create_instance.rst @@ -19,7 +19,7 @@ Simple Create Instance # Create a new instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='example instance') diff --git a/docs/source/index.rst b/docs/source/index.rst index a96c7cb..33ddc67 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -37,7 +37,7 @@ Deploy a new instance: # Create a new instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='example instance') diff --git a/examples/advanced_create_instance.py b/examples/advanced_create_instance.py index 564b032..b46f9bb 100644 --- a/examples/advanced_create_instance.py +++ b/examples/advanced_create_instance.py @@ -51,18 +51,18 @@ if price_per_hour * DURATION < balance.amount: # Deploy a new 8V instance instance = datacrunch.instances.create(instance_type=INSTANCE_TYPE_8V, - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='large instance', os_volume={ - "name": "Large OS volume", - "size": 95 - }) + "name": "Large OS volume", + "size": 95 + }) else: # Deploy a new 4V instance instance = datacrunch.instances.create(instance_type=INSTANCE_TYPE_4V, - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='medium instance') diff --git a/examples/instance_actions.py b/examples/instance_actions.py index ceb7006..0a0909b 100644 --- a/examples/instance_actions.py +++ 
b/examples/instance_actions.py @@ -17,7 +17,7 @@ # Create a new 1V100.6V instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='example instance') @@ -27,28 +27,31 @@ # Try to shutdown instance right away, # encounter an error (because it's still provisioning) try: - datacrunch.instances.action(instance.id, datacrunch.constants.instance_actions.SHUTDOWN) + datacrunch.instances.action( + instance.id, datacrunch.constants.instance_actions.SHUTDOWN) except APIException as exception: print(exception) # we were too eager... # Wait until instance is running (check every 30sec), only then shut it down -while(instance.status != datacrunch.constants.instance_status.RUNNING): +while (instance.status != datacrunch.constants.instance_status.RUNNING): time.sleep(30) instance = datacrunch.instances.get_by_id(instance.id) # Shutdown! try: - datacrunch.instances.action(instance.id, datacrunch.constants.instance_actions.SHUTDOWN) + datacrunch.instances.action( + instance.id, datacrunch.constants.instance_actions.SHUTDOWN) except APIException as exception: print(exception) # no exception this time # Wait until instance is offline (check every 30sec), only then hibernate -while(instance.status != datacrunch.constants.instance_status.OFFLINE): +while (instance.status != datacrunch.constants.instance_status.OFFLINE): time.sleep(30) instance = datacrunch.instances.get_by_id(instance.id) # Hibernate the instance try: - datacrunch.instances.action(instance.id, datacrunch.constants.instance_actions.HIBERNATE) + datacrunch.instances.action( + instance.id, datacrunch.constants.instance_actions.HIBERNATE) except APIException as exception: print(exception) diff --git a/examples/instances_and_volumes.py b/examples/instances_and_volumes.py index 369b5b3..624584d 100644 --- a/examples/instances_and_volumes.py +++ b/examples/instances_and_volumes.py @@ -21,25 +21,27 @@ # Create instance with extra attached volumes instance_with_extra_volumes = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys, hostname='example', description='example instance', volumes=[ - {"type": HDD, "name": "volume-1", "size": 95}, - {"type": NVMe, "name": "volume-2", "size": 95} + {"type": HDD, "name": "volume-1", + "size": 95}, + {"type": NVMe, + "name": "volume-2", "size": 95} ]) # Create instance with custom OS volume size and name instance_with_custom_os_volume = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', - ssh_key_ids=ssh_keys, - hostname='example', - description='example instance', - os_volume={ - "name": "OS volume", - "size": 95 - }) + image='ubuntu-24.04-cuda-12.8-open-docker', + ssh_key_ids=ssh_keys, + hostname='example', + description='example instance', + os_volume={ + "name": "OS volume", + "size": 95 + }) # Create instance with existing OS volume as an image instance_with_existing_os_volume = datacrunch.instances.create(instance_type='1V100.6V', diff --git a/examples/simple_create_instance.py b/examples/simple_create_instance.py index 0742322..b98173f 100644 --- a/examples/simple_create_instance.py +++ b/examples/simple_create_instance.py @@ -14,10 +14,11 @@ # Create a new instance instance = datacrunch.instances.create(instance_type='1V100.6V', - image='fastai', + image='ubuntu-24.04-cuda-12.8-open-docker', ssh_key_ids=ssh_keys_ids, hostname='example', description='example 
instance') # Delete instance -datacrunch.instances.action(instance.id, datacrunch.constants.instance_actions.DELETE) +datacrunch.instances.action( + instance.id, datacrunch.constants.instance_actions.DELETE) diff --git a/tests/unit_tests/instances/test_instances.py b/tests/unit_tests/instances/test_instances.py index 6ad85da..7030ea6 100644 --- a/tests/unit_tests/instances/test_instances.py +++ b/tests/unit_tests/instances/test_instances.py @@ -13,7 +13,7 @@ OS_VOLUME_ID = '46fc0247-8f65-4d8a-ad73-852a8b3dc1d3' INSTANCE_TYPE = "1V100.6V" -INSTANCE_IMAGE = "fastai" +INSTANCE_IMAGE = "ubuntu-24.04-cuda-12.8-open-docker" INSTANCE_HOSTNAME = "I'll be your host for today" INSTANCE_DESCRIPTION = "hope you enjoy your GPU" INSTANCE_STATUS = 'running' From d73f897edb19b0ac02cb90040425d563e1bda3a9 Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 11:42:20 +0300 Subject: [PATCH 06/22] fix --- datacrunch/InferenceClient/__init__.py | 3 +++ .../InferenceClient/inference_client.py | 22 +++++++++---------- datacrunch/containers/containers.py | 2 +- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/datacrunch/InferenceClient/__init__.py b/datacrunch/InferenceClient/__init__.py index e69de29..e680d86 100644 --- a/datacrunch/InferenceClient/__init__.py +++ b/datacrunch/InferenceClient/__init__.py @@ -0,0 +1,3 @@ +from .inference_client import InferenceClient, InferenceResponse + +__all__ = ['InferenceClient', 'InferenceResponse'] diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index a592c83..b8028ec 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -23,22 +23,24 @@ class InferenceResponse: @dataclass_json(undefined=Undefined.EXCLUDE) @dataclass class AsyncInferenceExecution: - id: str - status: str - _client: 'InferenceClient' + id: str + status: str # TODO: add a status enum + # TODO: Implement when the status endpoint is done def status(self) -> str: - # TODO: Call the status endpoint and update the status + # Call the status endpoint and update the status when return self.status + # TODO: Implement when the cancel inference execution endpoint is done # def cancel(self) -> None: - # # TODO: Call the cancel inference executionendpoint # pass + # TODO: Implement when the results endpoint is done def get_results(self) -> Dict[str, Any]: - # TODO: Call the results endpoint pass + # alias for get_results + output = get_results class InferenceClient: @@ -186,12 +188,10 @@ def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * response = self.post( path, json=data, timeout_seconds=timeout_seconds, headers=headers) - # TODO: create an async response class: - # TODO: add a method to check the status of the async request - # TODO: add a method to cancel the async request - # TODO: add a method to get the results of the async request + # TODO: this response format isn't final + execution_id = response.json()['id'] - return '837cdf50-6cf1-44b0-884e-ed115e700480' + return AsyncInferenceExecution(self, execution_id) def get(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: return self._make_request('GET', path, params=params, headers=headers, timeout_seconds=timeout_seconds) diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 4fedbdf..2fb01ea 100644 --- a/datacrunch/containers/containers.py +++ 
b/datacrunch/containers/containers.py @@ -10,7 +10,7 @@ from enum import Enum from datacrunch.http_client.http_client import HTTPClient -from datacrunch.InferenceClient.inference_client import InferenceClient, InferenceResponse +from datacrunch.InferenceClient import InferenceClient, InferenceResponse # API endpoints From ba00acb5c3b21e0d0316a5471963507a9aba9a71 Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 11:45:32 +0300 Subject: [PATCH 07/22] fixed test --- tests/unit_tests/containers/test_containers.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/tests/unit_tests/containers/test_containers.py b/tests/unit_tests/containers/test_containers.py index 030ccbe..54cec26 100644 --- a/tests/unit_tests/containers/test_containers.py +++ b/tests/unit_tests/containers/test_containers.py @@ -8,12 +8,10 @@ SECRETS_ENDPOINT, SERVERLESS_COMPUTE_RESOURCES_ENDPOINT, Container, - ContainerInfo, ContainerDeploymentStatus, ContainerRegistrySettings, ContainersService, Deployment, - DeploymentInfo, EnvVar, EnvVarType, EntrypointOverridesSettings, @@ -215,10 +213,10 @@ def test_get_deployments(self, containers_service, deployments_endpoint): # assert assert type(deployments) == list assert len(deployments) == 1 - assert type(deployment) == DeploymentInfo + assert type(deployment) == Deployment assert deployment.name == DEPLOYMENT_NAME assert len(deployment.containers) == 1 - assert type(deployment.containers[0]) == ContainerInfo + assert type(deployment.containers[0]) == Container assert type(deployment.compute) == ComputeResource assert deployment.compute.name == COMPUTE_RESOURCE_NAME assert responses.assert_call_count(deployments_endpoint, 1) is True @@ -238,7 +236,7 @@ def test_get_deployment_by_name(self, containers_service, deployments_endpoint): deployment = containers_service.get_deployment_by_name(DEPLOYMENT_NAME) # assert - assert type(deployment) == DeploymentInfo + assert type(deployment) == Deployment assert deployment.name == DEPLOYMENT_NAME assert len(deployment.containers) == 1 assert deployment.containers[0].name == CONTAINER_NAME @@ -305,7 +303,7 @@ def test_create_deployment(self, containers_service, deployments_endpoint): created_deployment = containers_service.create_deployment(deployment) # assert - assert type(created_deployment) == DeploymentInfo + assert type(created_deployment) == Deployment assert created_deployment.name == DEPLOYMENT_NAME assert len(created_deployment.containers) == 1 assert created_deployment.containers[0].name == CONTAINER_NAME @@ -324,7 +322,7 @@ def test_update_deployment(self, containers_service, deployments_endpoint): ) # create deployment object - container = ContainerInfo( + container = Container( name=CONTAINER_NAME, image="nginx:latest", exposed_port=80 @@ -335,7 +333,7 @@ def test_update_deployment(self, containers_service, deployments_endpoint): compute = ComputeResource(name=COMPUTE_RESOURCE_NAME, size=1) - deployment = DeploymentInfo( + deployment = Deployment( name=DEPLOYMENT_NAME, container_registry_settings=container_registry_settings, containers=[container], @@ -347,7 +345,7 @@ def test_update_deployment(self, containers_service, deployments_endpoint): DEPLOYMENT_NAME, deployment) # assert - assert type(updated_deployment) == DeploymentInfo + assert type(updated_deployment) == Deployment assert updated_deployment.name == DEPLOYMENT_NAME assert len(updated_deployment.containers) == 1 assert updated_deployment.containers[0].name == CONTAINER_NAME From 35bc29d0b53513b8015628fdf4ebe65c8d11f7d7 Mon Sep 
17 00:00:00 2001
From: Tamir
Date: Thu, 3 Apr 2025 15:15:06 +0300
Subject: [PATCH 08/22] created a sync inference example, improved sglang example

---
 .../calling_the_endpoint_synchronously.py     |  33 ++++
 .../containers/sglang_deployment_example.py   | 149 +++++++-----------
 2 files changed, 86 insertions(+), 96 deletions(-)
 create mode 100644 examples/containers/calling_the_endpoint_synchronously.py

diff --git a/examples/containers/calling_the_endpoint_synchronously.py b/examples/containers/calling_the_endpoint_synchronously.py
new file mode 100644
index 0000000..91fcba9
--- /dev/null
+++ b/examples/containers/calling_the_endpoint_synchronously.py
@@ -0,0 +1,33 @@
+import os
+from datacrunch import DataCrunchClient
+
+# Configuration - replace with your deployment name
+DEPLOYMENT_NAME = "sglang-deployment-example"
+
+# Get the client credentials and the inference key from environment variables
+DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
+DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
+DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY')
+
+# DataCrunch client instance (the inference key is needed to call the deployment endpoint)
+datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET, inference_key=DATACRUNCH_INFERENCE_KEY)
+
+# Get the deployment
+deployment = datacrunch.containers.get_deployment_by_name(DEPLOYMENT_NAME)
+
+# Make a synchronous request to the endpoint.
+# This example calls an SGLang deployment, which serves LLMs using an OpenAI-compatible API format.
+data = {
+    "model": "deepseek-ai/deepseek-llm-7b-chat",
+    "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?",
+    "max_tokens": 128,
+    "temperature": 0.7,
+    "top_p": 0.9
+}
+response = deployment.run_sync(
+    data=data,
+    path='/v1/completions'
+)  # wait for the response
+
+# Print the response
+print(response.body)
diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py
index df668db..7903f9f 100644
--- a/examples/containers/sglang_deployment_example.py
+++ b/examples/containers/sglang_deployment_example.py
@@ -8,8 +8,7 @@
 import time
 import signal
 import sys
-import requests
-
+from datetime import datetime
 from datacrunch import DataCrunchClient
 from datacrunch.exceptions import APIException
 from datacrunch.containers.containers import (
@@ -29,19 +28,20 @@
     ContainerDeploymentStatus,
 )
 
+CURRENT_TIMESTAMP = datetime.now().strftime(
+    "%Y%m%d-%H%M%S").lower()  # e.g. 20250403-120000
+
 # Configuration constants
-DEPLOYMENT_NAME = "sglang-deployment-tutorial"
-CONTAINER_NAME = "sglang-server"
+DEPLOYMENT_NAME = f"sglang-deployment-example-{CURRENT_TIMESTAMP}"
 MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat"
 HF_SECRET_NAME = "huggingface-token"
 IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124"
-CONTAINERS_API_URL = f'https://containers.datacrunch.io/{DEPLOYMENT_NAME}'
 
 # Get confidential values from environment variables
 DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID')
 DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET')
+INFERENCE_KEY = os.environ.get('INFERENCE_KEY')
 HF_TOKEN = os.environ.get('HF_TOKEN')
-INFERENCE_API_KEY = os.environ.get('INFERENCE_API_KEY')
 
 # DataCrunch client instance (global for graceful shutdown)
 datacrunch = None
@@ -99,81 +99,24 @@ def graceful_shutdown(signum, frame) -> None:
     sys.exit(0)
 
 
-def test_deployment(base_url: str, api_key: str) -> None:
-    """Test the deployment with a simple request.
- - Args: - base_url: The base URL of the deployment - api_key: The API key for authentication - """ - # First, check if the model info endpoint is working - model_info_url = f"{base_url}/get_model_info" - headers = { - 'Authorization': f'Bearer {api_key}', - 'Content-Type': 'application/json' - } - - try: - print("\nTesting /get_model_info endpoint...") - response = requests.get(model_info_url, headers=headers) - if response.status_code == 200: - print("Model info endpoint is working!") - print(f"Response: {response.json()}") - else: - print(f"Request failed with status code {response.status_code}") - print(f"Response: {response.text}") - return - - # Now test completions endpoint - print("\nTesting completions API with streaming...") - completions_url = f"{base_url}/v1/completions" - - headers = { - 'Content-Type': 'application/json', - 'Authorization': f'Bearer {api_key}', - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - } - - data = { - "model": MODEL_PATH, - "prompt": "Solar wind is a curious phenomenon. Tell me more about it", - "max_tokens": 128, - "temperature": 0.7, - "top_p": 0.9, - "stream": True - } - - with requests.post(completions_url, headers=headers, json=data, stream=True) as response: - if response.status_code == 200: - print("Stream started. Receiving first 5 events...\n") - for i, line in enumerate(response.iter_lines(decode_unicode=True)): - if line: - print(line) - if i >= 4: # Only show first 5 events - print("...(response continues)...") - break - else: - print( - f"Request failed with status code {response.status_code}") - print(f"Response: {response.text}") - - except requests.RequestException as e: - print(f"An error occurred: {e}") - - def main() -> None: """Main function demonstrating SGLang deployment.""" try: - if not HF_TOKEN: - print("Please set HF_TOKEN environment variable with your Hugging Face token") - return + # Get the inference API key + inference_key = INFERENCE_KEY + if not inference_key: + inference_key = input( + "Enter your Inference API Key from the DataCrunch dashboard: ") + else: + print("Using Inference API Key from environment") - # Initialize client + # Initialize client with inference key global datacrunch datacrunch = DataCrunchClient( - DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) + DATACRUNCH_CLIENT_ID, + DATACRUNCH_CLIENT_SECRET, + inference_key=inference_key + ) # Register signal handlers for cleanup signal.signal(signal.SIGINT, graceful_shutdown) @@ -188,6 +131,10 @@ def main() -> None: secret.name == HF_SECRET_NAME for secret in existing_secrets) if not secret_exists: + # check is HF_TOKEN is set, if not, prompt the user + if not HF_TOKEN: + HF_TOKEN = input( + "Enter your Hugging Face token: ") datacrunch.containers.create_secret( HF_SECRET_NAME, HF_TOKEN) print(f"Secret '{HF_SECRET_NAME}' created successfully") @@ -258,7 +205,8 @@ def main() -> None: ) # Create the deployment - created_deployment = datacrunch.containers.create(deployment) + created_deployment = datacrunch.containers.create_deployment( + deployment) print(f"Created deployment: {created_deployment.name}") print("This will take several minutes while the model is downloaded and the server starts...") @@ -268,28 +216,37 @@ def main() -> None: cleanup_resources(datacrunch) return - # Get the deployment endpoint URL and inference API key - containers_api_url = CONTAINERS_API_URL - inference_api_key = INFERENCE_API_KEY - - # If not provided as environment variables, prompt the user - if not containers_api_url: - 
containers_api_url = input( - "Enter your Containers API URL from the DataCrunch dashboard: ") - else: + # Test the deployment with a simple request + print("\nTesting the deployment...") + try: + # Test model info endpoint print( - f"Using Containers API URL from environment: {containers_api_url}") - - if not inference_api_key: - inference_api_key = input( - "Enter your Inference API Key from the DataCrunch dashboard: ") - else: - print("Using Inference API Key from environment") + "Testing /get_model_info endpoint by making a sync GET request to the SGLang server...") + model_info_response = created_deployment._inference_client.get( + path="/get_model_info") + print("Model info endpoint is working!") + print(f"Response: {model_info_response}") + + # Test completions endpoint + print("\nTesting completions API...") + completions_data = { + "model": MODEL_PATH, + "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", + "max_tokens": 128, + "temperature": 0.7, + "top_p": 0.9, + } + + # Make a sync inference request to the SGLang server + completions_response = created_deployment.run_sync( + completions_data, + path="/v1/completions", + ) + print("Completions API is working!") + print(f"Response: {completions_response}") - # Test the deployment - if containers_api_url and inference_api_key: - print("\nTesting the deployment...") - test_deployment(containers_api_url, inference_api_key) + except Exception as e: + print(f"Error testing deployment: {e}") # Cleanup or keep running based on user input keep_running = input( From 847021985ca9a8beb14f17a9f020e2abb0261d2f Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 15:38:57 +0300 Subject: [PATCH 09/22] added doc generation --- docs/source/examples/containers/inference_sync.rst | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 docs/source/examples/containers/inference_sync.rst diff --git a/docs/source/examples/containers/inference_sync.rst b/docs/source/examples/containers/inference_sync.rst new file mode 100644 index 0000000..a4c6ff2 --- /dev/null +++ b/docs/source/examples/containers/inference_sync.rst @@ -0,0 +1,8 @@ +Calling the inference endpoint in sync mode +=========================================== + +This example demonstrates how to call the inference endpoint in sync mode. + +.. literalinclude:: ../../../../examples/containers/calling_the_inference_endpoint_in_sync_mode.py + :language: python + :caption: Calling the inference endpoint in sync mode \ No newline at end of file From 01c8d54798d6e75454ec3787050f96da64914422 Mon Sep 17 00:00:00 2001 From: Tamir Date: Thu, 3 Apr 2025 15:49:04 +0300 Subject: [PATCH 10/22] simplify --- .../containers/sglang_deployment_example.py | 315 +++++++++--------- 1 file changed, 153 insertions(+), 162 deletions(-) diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index 7903f9f..c05b4fd 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -43,9 +43,6 @@ INFERENCE_KEY = os.environ.get('INFERENCE_KEY') HF_TOKEN = os.environ.get('HF_TOKEN') -# DataCrunch client instance (global for graceful shutdown) -datacrunch = None - def wait_for_deployment_health(datacrunch_client: DataCrunchClient, deployment_name: str, max_attempts: int = 20, delay: int = 30) -> bool: """Wait for deployment to reach healthy status. 
@@ -99,174 +96,168 @@ def graceful_shutdown(signum, frame) -> None: sys.exit(0) -def main() -> None: - """Main function demonstrating SGLang deployment.""" +try: + # Get the inference API key + inference_key = INFERENCE_KEY + if not inference_key: + inference_key = input( + "Enter your Inference API Key from the DataCrunch dashboard: ") + else: + print("Using Inference API Key from environment") + + # Initialize client with inference key + datacrunch = DataCrunchClient( + DATACRUNCH_CLIENT_ID, + DATACRUNCH_CLIENT_SECRET, + inference_key=inference_key + ) + + # Register signal handlers for cleanup + signal.signal(signal.SIGINT, graceful_shutdown) + signal.signal(signal.SIGTERM, graceful_shutdown) + + # Create a secret for the Hugging Face token + print(f"Creating secret for Hugging Face token: {HF_SECRET_NAME}") try: - # Get the inference API key - inference_key = INFERENCE_KEY - if not inference_key: - inference_key = input( - "Enter your Inference API Key from the DataCrunch dashboard: ") + # Check if secret already exists + existing_secrets = datacrunch.containers.get_secrets() + secret_exists = any( + secret.name == HF_SECRET_NAME for secret in existing_secrets) + + if not secret_exists: + # check is HF_TOKEN is set, if not, prompt the user + if not HF_TOKEN: + HF_TOKEN = input( + "Enter your Hugging Face token: ") + datacrunch.containers.create_secret( + HF_SECRET_NAME, HF_TOKEN) + print(f"Secret '{HF_SECRET_NAME}' created successfully") else: - print("Using Inference API Key from environment") - - # Initialize client with inference key - global datacrunch - datacrunch = DataCrunchClient( - DATACRUNCH_CLIENT_ID, - DATACRUNCH_CLIENT_SECRET, - inference_key=inference_key - ) - - # Register signal handlers for cleanup - signal.signal(signal.SIGINT, graceful_shutdown) - signal.signal(signal.SIGTERM, graceful_shutdown) - - # Create a secret for the Hugging Face token - print(f"Creating secret for Hugging Face token: {HF_SECRET_NAME}") - try: - # Check if secret already exists - existing_secrets = datacrunch.containers.get_secrets() - secret_exists = any( - secret.name == HF_SECRET_NAME for secret in existing_secrets) - - if not secret_exists: - # check is HF_TOKEN is set, if not, prompt the user - if not HF_TOKEN: - HF_TOKEN = input( - "Enter your Hugging Face token: ") - datacrunch.containers.create_secret( - HF_SECRET_NAME, HF_TOKEN) - print(f"Secret '{HF_SECRET_NAME}' created successfully") - else: - print( - f"Secret '{HF_SECRET_NAME}' already exists, using existing secret") - except APIException as e: - print(f"Error creating secret: {e}") - return - - # Create container configuration - container = Container( - image=IMAGE_URL, - exposed_port=30000, - healthcheck=HealthcheckSettings( + print( + f"Secret '{HF_SECRET_NAME}' already exists, using existing secret") + except APIException as e: + print(f"Error creating secret: {e}") + sys.exit(1) + + # Create container configuration + container = Container( + image=IMAGE_URL, + exposed_port=30000, + healthcheck=HealthcheckSettings( + enabled=True, + port=30000, + path="/health" + ), + entrypoint_overrides=EntrypointOverridesSettings( + enabled=True, + cmd=["python3", "-m", "sglang.launch_server", "--model-path", + MODEL_PATH, "--host", "0.0.0.0", "--port", "30000"] + ), + env=[ + EnvVar( + name="HF_TOKEN", + value_or_reference_to_secret=HF_SECRET_NAME, + type=EnvVarType.SECRET + ) + ] + ) + + # Create scaling configuration - default values + scaling_options = ScalingOptions( + min_replica_count=1, + max_replica_count=2, + 
scale_down_policy=ScalingPolicy(delay_seconds=300), + scale_up_policy=ScalingPolicy(delay_seconds=300), + queue_message_ttl_seconds=500, + concurrent_requests_per_replica=1, + scaling_triggers=ScalingTriggers( + queue_load=QueueLoadScalingTrigger(threshold=1), + cpu_utilization=UtilizationScalingTrigger( enabled=True, - port=30000, - path="/health" + threshold=90 ), - entrypoint_overrides=EntrypointOverridesSettings( + gpu_utilization=UtilizationScalingTrigger( enabled=True, - cmd=["python3", "-m", "sglang.launch_server", "--model-path", - MODEL_PATH, "--host", "0.0.0.0", "--port", "30000"] - ), - env=[ - EnvVar( - name="HF_TOKEN", - value_or_reference_to_secret=HF_SECRET_NAME, - type=EnvVarType.SECRET - ) - ] - ) - - # Create scaling configuration - default values - scaling_options = ScalingOptions( - min_replica_count=1, - max_replica_count=2, - scale_down_policy=ScalingPolicy(delay_seconds=300), - scale_up_policy=ScalingPolicy(delay_seconds=300), - queue_message_ttl_seconds=500, - concurrent_requests_per_replica=1, - scaling_triggers=ScalingTriggers( - queue_load=QueueLoadScalingTrigger(threshold=1), - cpu_utilization=UtilizationScalingTrigger( - enabled=True, - threshold=90 - ), - gpu_utilization=UtilizationScalingTrigger( - enabled=True, - threshold=90 - ) + threshold=90 ) ) + ) + + # Create registry and compute settings + registry_settings = ContainerRegistrySettings(is_private=False) + # For a 7B model, General Compute (24GB VRAM) is sufficient + compute = ComputeResource(name="General Compute", size=1) + + # Create deployment object + deployment = Deployment( + name=DEPLOYMENT_NAME, + container_registry_settings=registry_settings, + containers=[container], + compute=compute, + scaling=scaling_options, + is_spot=False + ) + + # Create the deployment + created_deployment = datacrunch.containers.create_deployment( + deployment) + print(f"Created deployment: {created_deployment.name}") + print("This will take several minutes while the model is downloaded and the server starts...") + + # Wait for deployment to be healthy + if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME): + print("Deployment health check failed") + cleanup_resources(datacrunch) + sys.exit(1) - # Create registry and compute settings - registry_settings = ContainerRegistrySettings(is_private=False) - # For a 7B model, General Compute (24GB VRAM) is sufficient - compute = ComputeResource(name="General Compute", size=1) - - # Create deployment object - deployment = Deployment( - name=DEPLOYMENT_NAME, - container_registry_settings=registry_settings, - containers=[container], - compute=compute, - scaling=scaling_options, - is_spot=False + # Test the deployment with a simple request + print("\nTesting the deployment...") + try: + # Test model info endpoint + print( + "Testing /get_model_info endpoint by making a sync GET request to the SGLang server...") + model_info_response = created_deployment._inference_client.get( + path="/get_model_info") + print("Model info endpoint is working!") + print(f"Response: {model_info_response}") + + # Test completions endpoint + print("\nTesting completions API...") + completions_data = { + "model": MODEL_PATH, + "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", + "max_tokens": 128, + "temperature": 0.7, + "top_p": 0.9, + } + + # Make a sync inference request to the SGLang server + completions_response = created_deployment.run_sync( + completions_data, + 
path="/v1/completions", ) - - # Create the deployment - created_deployment = datacrunch.containers.create_deployment( - deployment) - print(f"Created deployment: {created_deployment.name}") - print("This will take several minutes while the model is downloaded and the server starts...") - - # Wait for deployment to be healthy - if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME): - print("Deployment health check failed") - cleanup_resources(datacrunch) - return - - # Test the deployment with a simple request - print("\nTesting the deployment...") - try: - # Test model info endpoint - print( - "Testing /get_model_info endpoint by making a sync GET request to the SGLang server...") - model_info_response = created_deployment._inference_client.get( - path="/get_model_info") - print("Model info endpoint is working!") - print(f"Response: {model_info_response}") - - # Test completions endpoint - print("\nTesting completions API...") - completions_data = { - "model": MODEL_PATH, - "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", - "max_tokens": 128, - "temperature": 0.7, - "top_p": 0.9, - } - - # Make a sync inference request to the SGLang server - completions_response = created_deployment.run_sync( - completions_data, - path="/v1/completions", - ) - print("Completions API is working!") - print(f"Response: {completions_response}") - - except Exception as e: - print(f"Error testing deployment: {e}") - - # Cleanup or keep running based on user input - keep_running = input( - "\nDo you want to keep the deployment running? (y/n): ") - if keep_running.lower() != 'y': - cleanup_resources(datacrunch) - else: - print( - f"Deployment {DEPLOYMENT_NAME} is running. Don't forget to delete it when finished.") - print("You can delete it from the DataCrunch dashboard or by running:") - print(f"datacrunch.containers.delete('{DEPLOYMENT_NAME}')") + print("Completions API is working!") + print(f"Response: {completions_response}") except Exception as e: - print(f"Unexpected error: {e}") - # Attempt cleanup even if there was an error - try: - cleanup_resources(datacrunch) - except Exception as cleanup_error: - print(f"Error during cleanup after failure: {cleanup_error}") - + print(f"Error testing deployment: {e}") -if __name__ == "__main__": - main() + # Cleanup or keep running based on user input + keep_running = input( + "\nDo you want to keep the deployment running? (y/n): ") + if keep_running.lower() != 'y': + cleanup_resources(datacrunch) + else: + print( + f"Deployment {DEPLOYMENT_NAME} is running. 
Don't forget to delete it when finished.") + print("You can delete it from the DataCrunch dashboard or by running:") + print(f"datacrunch.containers.delete('{DEPLOYMENT_NAME}')") + +except Exception as e: + print(f"Unexpected error: {e}") + # Attempt cleanup even if there was an error + try: + cleanup_resources(datacrunch) + except Exception as cleanup_error: + print(f"Error during cleanup after failure: {cleanup_error}") + sys.exit(1) From 8307fe390e7de73834884b0c64bd3ac93b616a06 Mon Sep 17 00:00:00 2001 From: Tamir Date: Fri, 4 Apr 2025 08:50:10 +0300 Subject: [PATCH 11/22] added filter params to get_compute_resources, simplify example --- datacrunch/containers/containers.py | 16 +++- .../containers/compute_resources_example.py | 91 +++++-------------- 2 files changed, 35 insertions(+), 72 deletions(-) diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 2fb01ea..1c0a169 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -868,17 +868,27 @@ def delete_deployment_environment_variables(self, deployment_name: str, containe env_var) for env_var in env_vars] return result - def get_compute_resources(self) -> List[ComputeResource]: - """Retrieves available compute resources. + def get_compute_resources(self, size: int = None, is_available: bool = None) -> List[ComputeResource]: + """Retrieves compute resources, optionally filtered by size and availability. + + Args: + size: Optional size to filter resources by (e.g. 8 for 8x GPUs) + available: Optional boolean to filter by availability status Returns: - List[ComputeResource]: List of available compute resources. + List[ComputeResource]: List of compute resources matching the filters. + If no filters provided, returns all resources. """ response = self.client.get(SERVERLESS_COMPUTE_RESOURCES_ENDPOINT) resources = [] for resource_group in response.json(): for resource in resource_group: resources.append(ComputeResource.from_dict(resource)) + if size: + resources = [r for r in resources if r.size == size] + if is_available: + resources = [ + r for r in resources if r.is_available == is_available] return resources # Function alias diff --git a/examples/containers/compute_resources_example.py b/examples/containers/compute_resources_example.py index a2501c0..e6f2758 100644 --- a/examples/containers/compute_resources_example.py +++ b/examples/containers/compute_resources_example.py @@ -1,76 +1,29 @@ import os from datacrunch import DataCrunchClient -from typing import List -from datacrunch.containers.containers import ComputeResource # Get client secret and id from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') - -def list_all_compute_resources(client: DataCrunchClient) -> List[ComputeResource]: - """List all available compute resources. - - Args: - client (DataCrunchClient): The DataCrunch API client. - - Returns: - List[ComputeResource]: List of all compute resources. - """ - return client.containers.get_compute_resources() - - -def list_available_compute_resources(client: DataCrunchClient) -> List[ComputeResource]: - """List only the available compute resources. - - Args: - client (DataCrunchClient): The DataCrunch API client. - - Returns: - List[ComputeResource]: List of available compute resources. 
- """ - all_resources = client.containers.get_compute_resources() - return [r for r in all_resources if r.is_available] - - -def list_compute_resources_by_size(client: DataCrunchClient, size: int) -> List[ComputeResource]: - """List compute resources filtered by size. - - Args: - client (DataCrunchClient): The DataCrunch API client. - size (int): The size to filter by. - - Returns: - List[ComputeResource]: List of compute resources with the specified size. - """ - all_resources = client.containers.get_compute_resources() - return [r for r in all_resources if r.size == size] - - -def main(): - # Initialize the client with your credentials - datacrunch = DataCrunchClient( - DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) - - # Example 1: List all compute resources - print("All compute resources:") - all_resources = list_all_compute_resources(datacrunch) - for resource in all_resources: - print( - f"Name: {resource.name}, Size: {resource.size}, Available: {resource.is_available}") - - # Example 2: List available compute resources - print("Available compute resources:") - available_resources = list_available_compute_resources(datacrunch) - for resource in available_resources: - print(f"Name: {resource.name}, Size: {resource.size}") - - # Example 3: List compute resources of size 8 - print("Compute resources with size 8:") - size_8_resources = list_compute_resources_by_size(datacrunch, 8) - for resource in size_8_resources: - print(f"Name: {resource.name}, Available: {resource.is_available}") - - -if __name__ == "__main__": - main() +# Initialize the client with your credentials +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) + +# Example 1: List all compute resources +print("All compute resources:") +all_resources = datacrunch.containers.get_compute_resources() +for resource in all_resources: + print( + f"Name: {resource.name}, Size: {resource.size}, Available: {resource.is_available}") + +# Example 2: List available compute resources +print("\nAvailable compute resources:") +available_resources = datacrunch.containers.get_compute_resources( + is_available=True) +for resource in available_resources: + print(f"Name: {resource.name}, Size: {resource.size}") + +# Example 3: List compute resources of size 8 +print("\nCompute resources with size 8:") +size_8_resources = datacrunch.containers.get_compute_resources(size=8) +for resource in size_8_resources: + print(f"Name: {resource.name}, Available: {resource.is_available}") From ea069f985aee296b42a27ffdb74af47f0ba9baca Mon Sep 17 00:00:00 2001 From: Tamir Date: Fri, 4 Apr 2025 08:57:03 +0300 Subject: [PATCH 12/22] added tests --- .../unit_tests/containers/test_containers.py | 91 ++++++++++++++++--- 1 file changed, 80 insertions(+), 11 deletions(-) diff --git a/tests/unit_tests/containers/test_containers.py b/tests/unit_tests/containers/test_containers.py index 54cec26..806b670 100644 --- a/tests/unit_tests/containers/test_containers.py +++ b/tests/unit_tests/containers/test_containers.py @@ -37,7 +37,8 @@ DEPLOYMENT_NAME = "test-deployment" CONTAINER_NAME = "test-container" -COMPUTE_RESOURCE_NAME = "test-compute" +COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE = "General Compute" +COMPUTE_RESOURCE_NAME_H100 = "H100" SECRET_NAME = "test-secret" SECRET_VALUE = "test-secret-value" REGISTRY_CREDENTIAL_NAME = "test-credential" @@ -82,7 +83,7 @@ } ], "compute": { - "name": COMPUTE_RESOURCE_NAME, + "name": COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, "size": 1, "is_available": True }, @@ -118,12 +119,12 @@ # Sample compute resources data 
COMPUTE_RESOURCES_DATA = [ { - "name": COMPUTE_RESOURCE_NAME, + "name": COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, "size": 1, "is_available": True }, { - "name": "large-compute", + "name": COMPUTE_RESOURCE_NAME_H100, "size": 4, "is_available": True } @@ -218,7 +219,7 @@ def test_get_deployments(self, containers_service, deployments_endpoint): assert len(deployment.containers) == 1 assert type(deployment.containers[0]) == Container assert type(deployment.compute) == ComputeResource - assert deployment.compute.name == COMPUTE_RESOURCE_NAME + assert deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE assert responses.assert_call_count(deployments_endpoint, 1) is True @responses.activate @@ -240,7 +241,7 @@ def test_get_deployment_by_name(self, containers_service, deployments_endpoint): assert deployment.name == DEPLOYMENT_NAME assert len(deployment.containers) == 1 assert deployment.containers[0].name == CONTAINER_NAME - assert deployment.compute.name == COMPUTE_RESOURCE_NAME + assert deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE assert responses.assert_call_count(url, 1) is True @responses.activate @@ -285,7 +286,8 @@ def test_create_deployment(self, containers_service, deployments_endpoint): type=VolumeMountType.SCRATCH, mount_path="/data")] ) - compute = ComputeResource(name=COMPUTE_RESOURCE_NAME, size=1) + compute = ComputeResource( + name=COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, size=1) container_registry_settings = ContainerRegistrySettings( is_private=False) @@ -307,7 +309,7 @@ def test_create_deployment(self, containers_service, deployments_endpoint): assert created_deployment.name == DEPLOYMENT_NAME assert len(created_deployment.containers) == 1 assert created_deployment.containers[0].name == CONTAINER_NAME - assert created_deployment.compute.name == COMPUTE_RESOURCE_NAME + assert created_deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE assert responses.assert_call_count(deployments_endpoint, 1) is True @responses.activate @@ -331,7 +333,8 @@ def test_update_deployment(self, containers_service, deployments_endpoint): container_registry_settings = ContainerRegistrySettings( is_private=False) - compute = ComputeResource(name=COMPUTE_RESOURCE_NAME, size=1) + compute = ComputeResource( + name=COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE, size=1) deployment = Deployment( name=DEPLOYMENT_NAME, @@ -349,7 +352,7 @@ def test_update_deployment(self, containers_service, deployments_endpoint): assert updated_deployment.name == DEPLOYMENT_NAME assert len(updated_deployment.containers) == 1 assert updated_deployment.containers[0].name == CONTAINER_NAME - assert updated_deployment.compute.name == COMPUTE_RESOURCE_NAME + assert updated_deployment.compute.name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE assert responses.assert_call_count(url, 1) is True @responses.activate @@ -648,7 +651,73 @@ def test_get_compute_resources(self, containers_service, compute_resources_endpo assert type(resources) == list assert len(resources) == 2 assert type(resources[0]) == ComputeResource - assert resources[0].name == COMPUTE_RESOURCE_NAME + assert resources[0].name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE + assert resources[0].size == 1 + assert resources[0].is_available == True + assert responses.assert_call_count( + compute_resources_endpoint, 1) is True + + @responses.activate + def test_get_compute_resources_filter_by_size(self, containers_service, compute_resources_endpoint): + # arrange - add response mock + responses.add( + responses.GET, + compute_resources_endpoint, + 
json=[COMPUTE_RESOURCES_DATA], + status=200 + ) + + # act + resources = containers_service.get_compute_resources(size=4) + + # assert + assert type(resources) == list + assert len(resources) == 1 + assert type(resources[0]) == ComputeResource + assert resources[0].name == COMPUTE_RESOURCE_NAME_H100 + assert resources[0].size == 4 + assert resources[0].is_available == True + assert responses.assert_call_count( + compute_resources_endpoint, 1) is True + + @responses.activate + def test_get_compute_resources_filter_by_availability(self, containers_service, compute_resources_endpoint): + # arrange - add response mock + responses.add( + responses.GET, + compute_resources_endpoint, + json=[COMPUTE_RESOURCES_DATA], + status=200 + ) + + # act + resources = containers_service.get_compute_resources(is_available=True) + + # assert + assert type(resources) == list + assert len(resources) == 2 + assert all(r.is_available == True for r in resources) + assert responses.assert_call_count( + compute_resources_endpoint, 1) is True + + @responses.activate + def test_get_compute_resources_filter_by_size_and_availability(self, containers_service, compute_resources_endpoint): + # arrange - add response mock + responses.add( + responses.GET, + compute_resources_endpoint, + json=[COMPUTE_RESOURCES_DATA], + status=200 + ) + + # act + resources = containers_service.get_compute_resources( + size=1, is_available=True) + + # assert + assert type(resources) == list + assert len(resources) == 1 + assert resources[0].name == COMPUTE_RESOURCE_NAME_GENERAL_COMPUTE assert resources[0].size == 1 assert resources[0].is_available == True assert responses.assert_call_count( From 1527c3c83cc92b390ce21d6396dcb926adb3b47a Mon Sep 17 00:00:00 2001 From: Tamir Date: Fri, 4 Apr 2025 10:29:07 +0300 Subject: [PATCH 13/22] wip added inference streaming option, improve example --- .../InferenceClient/inference_client.py | 86 ++++++++++++++++--- datacrunch/containers/containers.py | 12 ++- .../containers/sglang_deployment_example.py | 56 +++++++++--- 3 files changed, 127 insertions(+), 27 deletions(-) diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index b8028ec..5d81181 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -2,7 +2,7 @@ from dataclasses_json import dataclass_json, Undefined # type: ignore import requests from requests.structures import CaseInsensitiveDict -from typing import Optional, Dict, Any, Union +from typing import Optional, Dict, Any, Union, Generator from urllib.parse import urlparse @@ -14,10 +14,76 @@ class InferenceClientError(Exception): @dataclass_json(undefined=Undefined.EXCLUDE) @dataclass class InferenceResponse: - body: Any headers: CaseInsensitiveDict[str] status_code: int status_text: str + _original_response: requests.Response + _stream: bool = False + + def _is_stream_response(self, headers: CaseInsensitiveDict[str]) -> bool: + """Check if the response headers indicate a streaming response. 
+ + Args: + headers: The response headers to check + + Returns: + bool: True if the response is likely a stream, False otherwise + """ + # Standard chunked transfer encoding + is_chunked_transfer = headers.get( + 'Transfer-Encoding', '').lower() == 'chunked' + # Server-Sent Events content type + is_event_stream = headers.get( + 'Content-Type', '').lower() == 'text/event-stream' + # NDJSON + is_ndjson = headers.get( + 'Content-Type', '').lower() == 'application/x-ndjson' + # Stream JSON + is_stream_json = headers.get( + 'Content-Type', '').lower() == 'application/stream+json' + # Keep-alive + is_keep_alive = headers.get( + 'Connection', '').lower() == 'keep-alive' + # No content length + has_no_content_length = 'Content-Length' not in headers + + # No Content-Length with keep-alive often suggests streaming (though not definitive) + is_keep_alive_and_no_content_length = is_keep_alive and has_no_content_length + + return (self._stream or is_chunked_transfer or is_event_stream or is_ndjson or + is_stream_json or is_keep_alive_and_no_content_length) + + def output(self, is_text: bool = False) -> Any: + try: + if is_text: + return self._original_response.text + return self._original_response.json() + except Exception as e: + # if the response is a stream (check headers), raise relevant error + if self._is_stream_response(self._original_response.headers): + raise InferenceClientError( + f"Response might be a stream, use the stream method instead") + raise InferenceClientError( + f"Failed to parse response as JSON: {str(e)}") + + def stream(self, chunk_size: int = 512, as_text: bool = True) -> Generator[Any, None, None]: + """Stream the response content. + + Args: + chunk_size: Size of chunks to stream, in bytes + as_text: If True, stream as text using iter_lines. If False, stream as binary using iter_content. 
+ + Returns: + Generator yielding chunks of the response + """ + if as_text: + for chunk in self._original_response.iter_lines(chunk_size=chunk_size): + if chunk: + yield chunk + else: + for chunk in self._original_response.iter_content(chunk_size=chunk_size): + if chunk: + yield chunk @dataclass_json(undefined=Undefined.EXCLUDE) @@ -169,24 +235,24 @@ def _make_request(self, method: str, path: str, **kwargs) -> requests.Response: except requests.exceptions.RequestException as e: raise InferenceClientError(f"Request to {path} failed: {str(e)}") - def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): - response = self.post( - path, json=data, timeout_seconds=timeout_seconds, headers=headers) + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False): + response = self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers, stream=stream) return InferenceResponse( - body=response.json(), headers=response.headers, status_code=response.status_code, - status_text=response.reason + status_text=response.reason, + _original_response=response ) - def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST"): # Add the "Prefer: respond-async" header to the request, to indicate that the request is async headers = headers or {} headers['Prefer'] = 'respond-async' - response = self.post( - path, json=data, timeout_seconds=timeout_seconds, headers=headers) + response = self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) # TODO: this response format isn't final execution_id = response.json()['id'] diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 1c0a169..f058adb 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -358,7 +358,7 @@ def _validate_inference_client(self) -> None: raise ValueError( "Inference client not initialized. Use from_dict_with_inference_key or set_inference_client to initialize inference capabilities.") - def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None) -> InferenceResponse: + def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False) -> InferenceResponse: """Runs a synchronous inference request. Args: @@ -366,6 +366,8 @@ def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = path: The endpoint path to send the request to. timeout_seconds: Maximum time to wait for the response. headers: Optional headers to include in the request. + http_method: The HTTP method to use for the request. + stream: Whether to stream the response. Returns: InferenceResponse: The response from the inference request. @@ -374,9 +376,9 @@ def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = ValueError: If the inference client is not initialized. 
""" self._validate_inference_client() - return self._inference_client.run_sync(data, path, timeout_seconds, headers) + return self._inference_client.run_sync(data, path, timeout_seconds, headers, http_method, stream) - def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None): + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", stream: bool = False): """Runs an asynchronous inference request. Args: @@ -384,6 +386,8 @@ def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * path: The endpoint path to send the request to. timeout_seconds: Maximum time to wait for the response. headers: Optional headers to include in the request. + http_method: The HTTP method to use for the request. + stream: Whether to stream the response. Returns: The response from the inference request. @@ -392,7 +396,7 @@ def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * ValueError: If the inference client is not initialized. """ self._validate_inference_client() - return self._inference_client.run(data, path, timeout_seconds, headers) + return self._inference_client.run(data, path, timeout_seconds, headers, http_method, stream) def health(self): """Checks the health of the deployed application. diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index c05b4fd..dcfc192 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -8,6 +8,7 @@ import time import signal import sys +import json from datetime import datetime from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException @@ -33,9 +34,9 @@ # Configuration constants DEPLOYMENT_NAME = f"sglang-deployment-example-{CURRENT_TIMESTAMP}" -MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat" +SGLANG_IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124" +DEEPSEEK_MODEL_PATH = "deepseek-ai/deepseek-llm-7b-chat" HF_SECRET_NAME = "huggingface-token" -IMAGE_URL = "docker.io/lmsysorg/sglang:v0.4.1.post6-cu124" # Get confidential values from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') @@ -140,18 +141,19 @@ def graceful_shutdown(signum, frame) -> None: sys.exit(1) # Create container configuration + APP_PORT = 30000 container = Container( - image=IMAGE_URL, - exposed_port=30000, + image=SGLANG_IMAGE_URL, + exposed_port=APP_PORT, healthcheck=HealthcheckSettings( enabled=True, - port=30000, + port=APP_PORT, path="/health" ), entrypoint_overrides=EntrypointOverridesSettings( enabled=True, cmd=["python3", "-m", "sglang.launch_server", "--model-path", - MODEL_PATH, "--host", "0.0.0.0", "--port", "30000"] + DEEPSEEK_MODEL_PATH, "--host", "0.0.0.0", "--port", str(APP_PORT)] ), env=[ EnvVar( @@ -162,16 +164,19 @@ def graceful_shutdown(signum, frame) -> None: ] ) - # Create scaling configuration - default values + # Create scaling configuration scaling_options = ScalingOptions( min_replica_count=1, - max_replica_count=2, - scale_down_policy=ScalingPolicy(delay_seconds=300), - scale_up_policy=ScalingPolicy(delay_seconds=300), + max_replica_count=5, + scale_down_policy=ScalingPolicy(delay_seconds=60 * 5), + scale_up_policy=ScalingPolicy( + delay_seconds=0), # No delay for scale up queue_message_ttl_seconds=500, - concurrent_requests_per_replica=1, + # Modern LLM engines are optimized for 
batching requests, with minimal performance impact. Taking advantage of batching can significantly improve throughput. + concurrent_requests_per_replica=32, scaling_triggers=ScalingTriggers( - queue_load=QueueLoadScalingTrigger(threshold=1), + # lower value means more aggressive scaling + queue_load=QueueLoadScalingTrigger(threshold=0.1), cpu_utilization=UtilizationScalingTrigger( enabled=True, threshold=90 @@ -224,7 +229,7 @@ def graceful_shutdown(signum, frame) -> None: # Test completions endpoint print("\nTesting completions API...") completions_data = { - "model": MODEL_PATH, + "model": DEEPSEEK_MODEL_PATH, "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", "max_tokens": 128, "temperature": 0.7, @@ -239,6 +244,31 @@ def graceful_shutdown(signum, frame) -> None: print("Completions API is working!") print(f"Response: {completions_response}") + # Make a stream sync inference request to the SGLang server + completions_response_stream = created_deployment.run_sync( + completions_data, + path="/v1/completions", + stream=True + ) + print("Stream completions API is working!") + # Print the streamed response + for line in completions_response_stream.stream(as_text=True): + if line: + line = line.decode('utf-8') + + if line.startswith('data:'): + data = line[5:] # Remove 'data: ' prefix + if data == '[DONE]': + break + try: + event_data = json.loads(data) + token_text = event_data['choices'][0]['text'] + + # Print token immediately to show progress + print(token_text, end='', flush=True) + except json.JSONDecodeError: + continue + except Exception as e: print(f"Error testing deployment: {e}") From 530f5902cbe47eb2eda9bdb977f6a95d836f36b9 Mon Sep 17 00:00:00 2001 From: Tamir Date: Fri, 4 Apr 2025 11:12:05 +0300 Subject: [PATCH 14/22] fixed stream in example --- examples/containers/sglang_deployment_example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index dcfc192..7feb8b3 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -242,11 +242,11 @@ def graceful_shutdown(signum, frame) -> None: path="/v1/completions", ) print("Completions API is working!") - print(f"Response: {completions_response}") + print(f"Response: {completions_response.output()}\n") # Make a stream sync inference request to the SGLang server completions_response_stream = created_deployment.run_sync( - completions_data, + {**completions_data, "stream": True}, path="/v1/completions", stream=True ) From 4dc5ddbaed6e0c10dddde553cbaaa47262ede05b Mon Sep 17 00:00:00 2001 From: Tamir Date: Fri, 4 Apr 2025 15:26:36 +0300 Subject: [PATCH 15/22] added default value to container registry, improved examples --- datacrunch/containers/containers.py | 5 +- .../containers/sglang_deployment_example.py | 13 +- .../update_deployment_scaling_example.py | 177 +++++++++--------- 3 files changed, 93 insertions(+), 102 deletions(-) diff --git a/datacrunch/containers/containers.py b/datacrunch/containers/containers.py index 3983a3c..0ec28ae 100644 --- a/datacrunch/containers/containers.py +++ b/datacrunch/containers/containers.py @@ -4,7 +4,7 @@ creation, updates, deletion, and monitoring of containerized applications. 
""" -from dataclasses import dataclass +from dataclasses import dataclass, field from dataclasses_json import dataclass_json, Undefined # type: ignore from typing import List, Optional, Dict, Any from enum import Enum @@ -284,9 +284,10 @@ class Deployment: """ name: str - container_registry_settings: ContainerRegistrySettings containers: List[Container] compute: ComputeResource + container_registry_settings: ContainerRegistrySettings = field( + default_factory=lambda: ContainerRegistrySettings(is_private=False)) is_spot: bool = False endpoint_base_url: Optional[str] = None scaling: Optional[ScalingOptions] = None diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index 7feb8b3..70a30a4 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -41,7 +41,7 @@ # Get confidential values from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') -INFERENCE_KEY = os.environ.get('INFERENCE_KEY') +DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') HF_TOKEN = os.environ.get('HF_TOKEN') @@ -99,7 +99,7 @@ def graceful_shutdown(signum, frame) -> None: try: # Get the inference API key - inference_key = INFERENCE_KEY + inference_key = DATACRUNCH_INFERENCE_KEY if not inference_key: inference_key = input( "Enter your Inference API Key from the DataCrunch dashboard: ") @@ -188,15 +188,12 @@ def graceful_shutdown(signum, frame) -> None: ) ) - # Create registry and compute settings - registry_settings = ContainerRegistrySettings(is_private=False) - # For a 7B model, General Compute (24GB VRAM) is sufficient + # Set compute settings. For a 7B model, General Compute (24GB VRAM) is sufficient compute = ComputeResource(name="General Compute", size=1) - # Create deployment object + # Create deployment object (no need to provide container_registry_settings because it's public) deployment = Deployment( name=DEPLOYMENT_NAME, - container_registry_settings=registry_settings, containers=[container], compute=compute, scaling=scaling_options, @@ -207,7 +204,7 @@ def graceful_shutdown(signum, frame) -> None: created_deployment = datacrunch.containers.create_deployment( deployment) print(f"Created deployment: {created_deployment.name}") - print("This will take several minutes while the model is downloaded and the server starts...") + print("This could take several minutes while the model is downloaded and the server starts...") # Wait for deployment to be healthy if not wait_for_deployment_health(datacrunch, DEPLOYMENT_NAME): diff --git a/examples/containers/update_deployment_scaling_example.py b/examples/containers/update_deployment_scaling_example.py index d06f9d4..1a3cd65 100644 --- a/examples/containers/update_deployment_scaling_example.py +++ b/examples/containers/update_deployment_scaling_example.py @@ -15,106 +15,99 @@ UtilizationScalingTrigger ) -# Configuration - replace with your deployment name -DEPLOYMENT_NAME = "my-deployment" -# Get client secret and id from environment variables +# Get deployment name, client secret and id from environment variables +DEPLOYMENT_NAME = os.environ.get('DATACRUNCH_DEPLOYMENT_NAME') DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') - -def check_deployment_exists(client: DataCrunchClient, deployment_name: str) -> bool: - """Check if a deployment exists. 
- - Args: - client: DataCrunch API client - deployment_name: Name of the deployment to check - - Returns: - bool: True if deployment exists, False otherwise - """ - try: - client.containers.get_deployment_by_name(deployment_name) - return True - except APIException as e: - print(f"Error: {e}") - return False - - -def update_deployment_scaling(client: DataCrunchClient, deployment_name: str) -> None: - """Update scaling options using the dedicated scaling options API. - - Args: - client: DataCrunch API client - deployment_name: Name of the deployment to update - """ - try: - # Create scaling options using ScalingOptions dataclass - scaling_options = ScalingOptions( - min_replica_count=1, - max_replica_count=5, - scale_down_policy=ScalingPolicy( - delay_seconds=600), # Longer cooldown period - scale_up_policy=ScalingPolicy(delay_seconds=60), # Quick scale-up - queue_message_ttl_seconds=500, - concurrent_requests_per_replica=1, - scaling_triggers=ScalingTriggers( - queue_load=QueueLoadScalingTrigger(threshold=1.0), - cpu_utilization=UtilizationScalingTrigger( - enabled=True, - threshold=75 - ), - gpu_utilization=UtilizationScalingTrigger( - enabled=False # Disable GPU utilization trigger - ) +# Initialize client +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) + +try: + # Get current scaling options + scaling_options = datacrunch.containers.get_deployment_scaling_options( + DEPLOYMENT_NAME) + + print(f"Current scaling configuration:\n") + print(f"Min replicas: {scaling_options.min_replica_count}") + print(f"Max replicas: {scaling_options.max_replica_count}") + print( + f"Scale-up delay: {scaling_options.scale_up_policy.delay_seconds} seconds") + print( + f"Scale-down delay: {scaling_options.scale_down_policy.delay_seconds} seconds") + print( + f"Queue message TTL: {scaling_options.queue_message_ttl_seconds} seconds") + print( + f"Concurrent requests per replica: {scaling_options.concurrent_requests_per_replica}") + print("Scaling Triggers:") + print( + f" Queue load threshold: {scaling_options.scaling_triggers.queue_load.threshold}") + if scaling_options.scaling_triggers.cpu_utilization: + print( + f" CPU utilization enabled: {scaling_options.scaling_triggers.cpu_utilization.enabled}") + print( + f" CPU utilization threshold: {scaling_options.scaling_triggers.cpu_utilization.threshold}%") + if scaling_options.scaling_triggers.gpu_utilization: + print( + f" GPU utilization enabled: {scaling_options.scaling_triggers.gpu_utilization.enabled}") + if scaling_options.scaling_triggers.gpu_utilization.threshold: + print( + f" GPU utilization threshold: {scaling_options.scaling_triggers.gpu_utilization.threshold}%") + + # Create scaling options using ScalingOptions dataclass + scaling_options = ScalingOptions( + min_replica_count=1, + max_replica_count=5, + scale_down_policy=ScalingPolicy( + delay_seconds=600), # Longer cooldown period + scale_up_policy=ScalingPolicy(delay_seconds=0), # Quick scale-up + queue_message_ttl_seconds=500, + concurrent_requests_per_replica=50, # LLMs can handle concurrent requests + scaling_triggers=ScalingTriggers( + queue_load=QueueLoadScalingTrigger(threshold=1.0), + cpu_utilization=UtilizationScalingTrigger( + enabled=True, + threshold=75 + ), + gpu_utilization=UtilizationScalingTrigger( + enabled=False # Disable GPU utilization trigger ) ) - - # Update scaling options - updated_options = client.containers.update_deployment_scaling_options( - deployment_name, scaling_options) - print(f"Updated deployment scaling options") - print(f"New min 
replicas: {updated_options.min_replica_count}") - print(f"New max replicas: {updated_options.max_replica_count}") + ) + + # Update scaling options + updated_options = datacrunch.containers.update_deployment_scaling_options( + DEPLOYMENT_NAME, scaling_options) + + print(f"\nUpdated scaling configuration:\n") + print(f"Min replicas: {updated_options.min_replica_count}") + print(f"Max replicas: {updated_options.max_replica_count}") + print( + f"Scale-up delay: {updated_options.scale_up_policy.delay_seconds} seconds") + print( + f"Scale-down delay: {updated_options.scale_down_policy.delay_seconds} seconds") + print( + f"Queue message TTL: {updated_options.queue_message_ttl_seconds} seconds") + print( + f"Concurrent requests per replica: {updated_options.concurrent_requests_per_replica}") + print("Scaling Triggers:") + print( + f" Queue load threshold: {updated_options.scaling_triggers.queue_load.threshold}") + if updated_options.scaling_triggers.cpu_utilization: print( - f"CPU utilization trigger enabled: {updated_options.scaling_triggers.cpu_utilization.enabled}") + f" CPU utilization enabled: {updated_options.scaling_triggers.cpu_utilization.enabled}") print( - f"CPU utilization threshold: {updated_options.scaling_triggers.cpu_utilization.threshold}%") - except APIException as e: - print(f"Error updating scaling options: {e}") - - -def main() -> None: - """Main function demonstrating scaling updates.""" - try: - # Initialize client - datacrunch = DataCrunchClient( - DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) - - # Verify deployment exists - if not check_deployment_exists(datacrunch, DEPLOYMENT_NAME): - print(f"Deployment {DEPLOYMENT_NAME} does not exist.") - return - - # Update scaling options using the API - update_deployment_scaling(datacrunch, DEPLOYMENT_NAME) - - # Get current scaling options - scaling_options = datacrunch.containers.get_deployment_scaling_options( - DEPLOYMENT_NAME) - print(f"\nCurrent scaling configuration:") - print(f"Min replicas: {scaling_options.min_replica_count}") - print(f"Max replicas: {scaling_options.max_replica_count}") - print( - f"Scale-up delay: {scaling_options.scale_up_policy.delay_seconds} seconds") + f" CPU utilization threshold: {updated_options.scaling_triggers.cpu_utilization.threshold}%") + if updated_options.scaling_triggers.gpu_utilization: print( - f"Scale-down delay: {scaling_options.scale_down_policy.delay_seconds} seconds") - - print("\nScaling update completed successfully.") - - except Exception as e: - print(f"Unexpected error: {e}") + f" GPU utilization enabled: {updated_options.scaling_triggers.gpu_utilization.enabled}") + if updated_options.scaling_triggers.gpu_utilization.threshold: + print( + f" GPU utilization threshold: {updated_options.scaling_triggers.gpu_utilization.threshold}%") -if __name__ == "__main__": - main() +except APIException as e: + print(f"Error updating scaling options: {e}") +except Exception as e: + print(f"Unexpected error: {e}") From a744acbde8c0af98ba291bffad80faa369f8f5e9 Mon Sep 17 00:00:00 2001 From: Tamir Date: Mon, 7 Apr 2025 10:41:23 +0300 Subject: [PATCH 16/22] wip async inference --- .../InferenceClient/inference_client.py | 78 ++++++++++++------- .../container_deployments_example.py | 2 +- .../containers/sglang_deployment_example.py | 2 +- .../update_deployment_scaling_example.py | 2 +- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index 5d81181..6d1826e 100644 --- 
a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -86,29 +86,6 @@ def stream(self, chunk_size: int = 512, as_text: bool = True) -> Generator[Any, yield chunk -@dataclass_json(undefined=Undefined.EXCLUDE) -@dataclass -class AsyncInferenceExecution: - _client: 'InferenceClient' - id: str - status: str # TODO: add a status enum - - # TODO: Implement when the status endpoint is done - def status(self) -> str: - # Call the status endpoint and update the status when - return self.status - - # TODO: Implement when the cancel inference execution endpoint is done - # def cancel(self) -> None: - # pass - - # TODO: Implement when the results endpoint is done - def get_results(self) -> Dict[str, Any]: - pass - # alias for get_results - output = get_results - - class InferenceClient: def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: int = 60 * 5) -> None: """ @@ -131,6 +108,10 @@ def __init__(self, inference_key: str, endpoint_base_url: str, timeout_seconds: self.inference_key = inference_key self.endpoint_base_url = endpoint_base_url.rstrip('/') + self.base_domain = self.endpoint_base_url[:self.endpoint_base_url.rindex( + '/')] + self.deployment_name = self.endpoint_base_url[self.endpoint_base_url.rindex( + '/')+1:] self.timeout_seconds = timeout_seconds self._session = requests.Session() self._global_headers = { @@ -246,10 +227,17 @@ def run_sync(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = _original_response=response ) - def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST"): - # Add the "Prefer: respond-async" header to the request, to indicate that the request is async + def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * 5, headers: Optional[Dict[str, str]] = None, http_method: str = "POST", no_response: bool = False): + # Add relevant headers to the request, to indicate that the request is async headers = headers or {} - headers['Prefer'] = 'respond-async' + if no_response: + # If no_response is True, use the "Prefer: respond-async-proxy" header to run async and don't wait for the response + headers['Prefer'] = 'respond-async-proxy' + self._make_request( + http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) + return + # Add the "Prefer: async-inference" header to the request, to run async and wait for the response + headers['Prefer'] = 'async-inference' response = self._make_request( http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) @@ -297,3 +285,41 @@ def health(self, healthcheck_path: str = "/health") -> requests.Response: return self.get(healthcheck_path) except InferenceClientError as e: raise InferenceClientError(f"Health check failed: {str(e)}") + + +@dataclass_json(undefined=Undefined.EXCLUDE) +@dataclass +class AsyncInferenceExecution: + _inference_client: 'InferenceClient' + id: str + _status: str # TODO: add a status enum? + INFERENCE_ID_HEADER = 'X-Inference-Id' + + def status(self) -> Dict[str, Any]: + """Get the current status of the async inference execution. 
+ + Returns: + Dict[str, Any]: The status response containing the execution status and other metadata + """ + url = f'{self._inference_client.base_domain}/status/{self._inference_client.deployment_name}' + response = self._inference_client._session.get( + url, headers={self.INFERENCE_ID_HEADER: self.id, **self._inference_client._global_headers}) + + response_json = response.json() + self._status = response_json['status'] + + return response_json + + def result(self) -> Dict[str, Any]: + """Get the results of the async inference execution. + + Returns: + Dict[str, Any]: The results of the inference execution + """ + url = f'{self._inference_client.base_domain}/results/{self._inference_client.deployment_name}' + response = self._inference_client._session.get( + url, headers={self.INFERENCE_ID_HEADER: self.id, **self._inference_client._global_headers}) + + return response + # alias for get_results + output = result diff --git a/examples/containers/container_deployments_example.py b/examples/containers/container_deployments_example.py index cdc7652..38a4877 100644 --- a/examples/containers/container_deployments_example.py +++ b/examples/containers/container_deployments_example.py @@ -9,7 +9,7 @@ from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException -from datacrunch.containers.containers import ( +from datacrunch.containers import ( Container, ComputeResource, EnvVar, diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index 70a30a4..c70b997 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -12,7 +12,7 @@ from datetime import datetime from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException -from datacrunch.containers.containers import ( +from datacrunch.containers import ( Container, ComputeResource, ScalingOptions, diff --git a/examples/containers/update_deployment_scaling_example.py b/examples/containers/update_deployment_scaling_example.py index 1a3cd65..cd31d6e 100644 --- a/examples/containers/update_deployment_scaling_example.py +++ b/examples/containers/update_deployment_scaling_example.py @@ -7,7 +7,7 @@ from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException -from datacrunch.containers.containers import ( +from datacrunch.containers import ( ScalingOptions, ScalingPolicy, ScalingTriggers, From f19a236ac6bcf4ace6b987fa9d0dc4f61c0ea9ce Mon Sep 17 00:00:00 2001 From: Tamir Date: Mon, 7 Apr 2025 13:35:46 +0300 Subject: [PATCH 17/22] use _build_request_headers method --- datacrunch/InferenceClient/inference_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index 6d1826e..b024ae1 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -303,7 +303,7 @@ def status(self) -> Dict[str, Any]: """ url = f'{self._inference_client.base_domain}/status/{self._inference_client.deployment_name}' response = self._inference_client._session.get( - url, headers={self.INFERENCE_ID_HEADER: self.id, **self._inference_client._global_headers}) + url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) response_json = response.json() self._status = response_json['status'] @@ -318,7 +318,7 @@ def result(self) -> Dict[str, Any]: """ url = 
f'{self._inference_client.base_domain}/results/{self._inference_client.deployment_name}' response = self._inference_client._session.get( - url, headers={self.INFERENCE_ID_HEADER: self.id, **self._inference_client._global_headers}) + url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) return response # alias for get_results From bf3d18f0bf3f438a4952566b42eb4f1b67b4f97a Mon Sep 17 00:00:00 2001 From: Tamir Date: Mon, 7 Apr 2025 13:45:31 +0300 Subject: [PATCH 18/22] changelog --- CHANGELOG.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2d10c71..cf04d50 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,8 @@ Changelog ========= +* Added inference client to run inference requests and get status and results + v1.9.0 (2025-04-04) ------------------- From 86cec409c21b2984ef5acddf76616b6270848f0c Mon Sep 17 00:00:00 2001 From: Jaakko Varjo Date: Fri, 11 Apr 2025 16:11:24 +0300 Subject: [PATCH 19/22] async inference --- .DS_Store | Bin 0 -> 6148 bytes README.md | 25 ++++++++-- .../InferenceClient/inference_client.py | 44 ++++++++++++------ .../calling_the_endpoint_asynchronously.py | 43 +++++++++++++++++ .../calling_the_endpoint_synchronously.py | 6 +-- .../containers/delete_deployment_example.py | 18 +++++++ .../containers/sglang_deployment_example.py | 13 +++--- 7 files changed, 121 insertions(+), 28 deletions(-) create mode 100644 .DS_Store create mode 100644 examples/containers/calling_the_endpoint_asynchronously.py create mode 100644 examples/containers/delete_deployment_example.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..55388e3fb67a9c7aedd9f4de645e10b63e42c40c GIT binary patch literal 6148 zcmeHLu};G<5Iwg+M1`RvVv6_xL{%1+DpchQx-mgp+8{Ngf>vTe`2-e5Rz3qUu`{wU zG9bai!o)k<4R({34FU2=zUTPve6O!c+?0q!bJD93Rf)*QVDu_*1;%k6IV(IJg7x^xOg zC&0*N^|~=H&*XNsKbUeW+|y+EU9QDbq|DO;&jj4gY;OGs{QRuu=z$xGq$Q0n`Lx8~;YD^W%M+aO9mmK#P*)Xqa3^axs zQ-!h=d9oC!N`-#K5UL#O0jC2)jj2LaPC}KF(4B>TLlLq&>H?WgB2?(SQ@|-;D^OOK zRX+ds=D+{jMefQe;1u{*3J9;&ZZ+|d^x2yGaD3JV7zY?^oR=z;C2;yUmJ>dT`F{mz Zhy^?Wh8k0aSb@180#XL&oB}_pz$f7d%QpZ3 literal 0 HcmV?d00001 diff --git a/README.md b/README.md index a490856..8d5ee46 100644 --- a/README.md +++ b/README.md @@ -24,26 +24,41 @@ DataCrunch's Public API documentation [is available here](https://api.datacrunch - Generate your client credentials - [instructions in the public API docs](https://api.datacrunch.io/v1/docs#description/quick-start-guide). 
-- Add the client secret to an environment variable (don't want it to be hardcoded): + +- Add your client id and client secret to an environment variable (don't want it to be hardcoded): Linux (bash): ```bash - export DATACRUNCH_CLIENT_SECRET=Z4CZq02rdwdB7ISV0k4Z2gtwAFKiyvr2U1l0KDIeYi + export DATACRUNCH_CLIENT_ID=YOUR_ID_HERE + export DATACRUNCH_CLIENT_SECRET=YOUR_SECRET_HERE ``` +- To enable sending inference requests from SDK you must generate an inference key - [Instructions on inference authorization](https://docs.datacrunch.io/inference/authorization) + + +- Add your inference key to an environment variable + + Linux (bash): + + ```bash + export DATACRUNCH_INFERENCE_KEY=YOUR_API_KEY_HERE + ``` + Other platforms: https://en.wikipedia.org/wiki/Environment_variable + + - Example for creating a new instance: ```python import os from datacrunch import DataCrunchClient - # Get client secret from environment variable + # Get credentials from environment variables + CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') CLIENT_SECRET = os.environ['DATACRUNCH_CLIENT_SECRET'] - CLIENT_ID = 'Ibk5bdxV64lKAWOqYnvSi' # Create datcrunch client datacrunch = DataCrunchClient(CLIENT_ID, CLIENT_SECRET) @@ -118,7 +133,7 @@ Create this file in the root directory of the project: from datacrunch.datacrunch import DataCrunchClient CLIENT_SECRET = 'secret' -CLIENT_ID = 'Ibk5bdxV64lKAWOqYnvSi' +CLIENT_ID = 'your-id' # Create datcrunch client datacrunch = DataCrunchClient(CLIENT_ID, CLIENT_SECRET, base_url='http://localhost:3001/v1') diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index b024ae1..e8c9c8f 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -4,12 +4,17 @@ from requests.structures import CaseInsensitiveDict from typing import Optional, Dict, Any, Union, Generator from urllib.parse import urlparse - +from enum import Enum class InferenceClientError(Exception): """Base exception for InferenceClient errors.""" pass +class AsyncStatus(int, Enum): + Initialized = 0 + Queue = 1 + Inference = 2 + Completed = 3 @dataclass_json(undefined=Undefined.EXCLUDE) @dataclass @@ -236,16 +241,16 @@ def run(self, data: Dict[str, Any], path: str = "", timeout_seconds: int = 60 * self._make_request( http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) return - # Add the "Prefer: async-inference" header to the request, to run async and wait for the response - headers['Prefer'] = 'async-inference' + # Add the "Prefer: respond-async" header to the request, to run async and wait for the response + headers['Prefer'] = 'respond-async' response = self._make_request( http_method, path, json=data, timeout_seconds=timeout_seconds, headers=headers) - # TODO: this response format isn't final - execution_id = response.json()['id'] + result = response.json() + execution_id = result['Id'] - return AsyncInferenceExecution(self, execution_id) + return AsyncInferenceExecution(self, execution_id, AsyncStatus.Initialized) def get(self, path: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, timeout_seconds: Optional[int] = None) -> requests.Response: return self._make_request('GET', path, params=params, headers=headers, timeout_seconds=timeout_seconds) @@ -292,11 +297,20 @@ def health(self, healthcheck_path: str = "/health") -> requests.Response: class AsyncInferenceExecution: _inference_client: 'InferenceClient' id: str - _status: str # TODO: add a status 
enum? + _status: AsyncStatus INFERENCE_ID_HEADER = 'X-Inference-Id' - def status(self) -> Dict[str, Any]: - """Get the current status of the async inference execution. + def status(self) -> AsyncStatus: + """Get the current stored status of the async inference execution. Only the status value type + + Returns: + AsyncStatus: The status object + """ + + return self._status + + def status_json(self) -> Dict[str, Any]: + """Get the current status of the async inference execution. Return the status json Returns: Dict[str, Any]: The status response containing the execution status and other metadata @@ -306,20 +320,24 @@ def status(self) -> Dict[str, Any]: url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) response_json = response.json() - self._status = response_json['status'] + self._status = AsyncStatus(response_json['Status']) return response_json - def result(self) -> Dict[str, Any]: + def result(self) -> Dict[str, Any] | str: """Get the results of the async inference execution. Returns: Dict[str, Any]: The results of the inference execution """ - url = f'{self._inference_client.base_domain}/results/{self._inference_client.deployment_name}' + url = f'{self._inference_client.base_domain}/result/{self._inference_client.deployment_name}' response = self._inference_client._session.get( url, headers=self._inference_client._build_request_headers({self.INFERENCE_ID_HEADER: self.id})) - return response + if response.headers['Content-Type'] == 'application/json': + return response.json() + else: + return response.text + # alias for get_results output = result diff --git a/examples/containers/calling_the_endpoint_asynchronously.py b/examples/containers/calling_the_endpoint_asynchronously.py new file mode 100644 index 0000000..7e713f8 --- /dev/null +++ b/examples/containers/calling_the_endpoint_asynchronously.py @@ -0,0 +1,43 @@ +import os +from time import sleep +from datacrunch import DataCrunchClient +from datacrunch.InferenceClient.inference_client import AsyncStatus + +# Configuration - replace with your deployment name +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" + +# Get client secret and id from environment variables +DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') +DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') +DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') + +# DataCrunch client instance +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET, inference_key=DATACRUNCH_INFERENCE_KEY) + +# Get the deployment +deployment = datacrunch.containers.get_deployment_by_name(DEPLOYMENT_NAME) + +# Make an asynchronous request to the endpoint. 
+# This example demonstrates calling a SGLang deployment which serves LLMs using an OpenAI-compatible API format +data = { + "model": "deepseek-ai/deepseek-llm-7b-chat", + "prompt": "Is consciousness fundamentally computational, or is there something more to subjective experience that cannot be reduced to information processing?", + "max_tokens": 128, + "temperature": 0.7, + "top_p": 0.9 +} + +header = { + "Content-Type": "application/json" +} + +response = deployment.run( + data=data, + path='v1/completions', + headers=header, +) + +while response.status() != AsyncStatus.Completed: + print(response.status_json()) + sleep(1) +print(response.output()) diff --git a/examples/containers/calling_the_endpoint_synchronously.py b/examples/containers/calling_the_endpoint_synchronously.py index 91fcba9..c65cca3 100644 --- a/examples/containers/calling_the_endpoint_synchronously.py +++ b/examples/containers/calling_the_endpoint_synchronously.py @@ -2,7 +2,7 @@ from datacrunch import DataCrunchClient # Configuration - replace with your deployment name -DEPLOYMENT_NAME = "sglang-deployment-example" +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" # Get client secret and id from environment variables DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') @@ -10,7 +10,7 @@ DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') # DataCrunch client instance -datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET, inference_key=DATACRUNCH_INFERENCE_KEY) # Get the deployment deployment = datacrunch.containers.get_deployment_by_name(DEPLOYMENT_NAME) @@ -30,4 +30,4 @@ ) # wait for the response # Print the response -print(response.body) +print(response.output()) diff --git a/examples/containers/delete_deployment_example.py b/examples/containers/delete_deployment_example.py new file mode 100644 index 0000000..4a1c98c --- /dev/null +++ b/examples/containers/delete_deployment_example.py @@ -0,0 +1,18 @@ +"""Example script demonstrating deleting a deployment using the DataCrunch API. +""" + +import os +from datacrunch import DataCrunchClient + +DEPLOYMENT_NAME = "sglang-deployment-example-20250411-160652" + +# Get confidential values from environment variables +DATACRUNCH_CLIENT_ID = os.environ.get('DATACRUNCH_CLIENT_ID') +DATACRUNCH_CLIENT_SECRET = os.environ.get('DATACRUNCH_CLIENT_SECRET') + +# Initialize client with inference key +datacrunch = DataCrunchClient(DATACRUNCH_CLIENT_ID, DATACRUNCH_CLIENT_SECRET) + +# Register signal handlers for cleanup +datacrunch.containers.delete_deployment(DEPLOYMENT_NAME) +print("Deployment deleted") \ No newline at end of file diff --git a/examples/containers/sglang_deployment_example.py b/examples/containers/sglang_deployment_example.py index c70b997..32f5105 100644 --- a/examples/containers/sglang_deployment_example.py +++ b/examples/containers/sglang_deployment_example.py @@ -44,7 +44,6 @@ DATACRUNCH_INFERENCE_KEY = os.environ.get('DATACRUNCH_INFERENCE_KEY') HF_TOKEN = os.environ.get('HF_TOKEN') - def wait_for_deployment_health(datacrunch_client: DataCrunchClient, deployment_name: str, max_attempts: int = 20, delay: int = 30) -> bool: """Wait for deployment to reach healthy status. 
@@ -99,18 +98,18 @@ def graceful_shutdown(signum, frame) -> None: try: # Get the inference API key - inference_key = DATACRUNCH_INFERENCE_KEY - if not inference_key: - inference_key = input( + datacrunch_inference_key = DATACRUNCH_INFERENCE_KEY + if not datacrunch_inference_key: + datacrunch_inference_key = input( "Enter your Inference API Key from the DataCrunch dashboard: ") else: print("Using Inference API Key from environment") # Initialize client with inference key datacrunch = DataCrunchClient( - DATACRUNCH_CLIENT_ID, - DATACRUNCH_CLIENT_SECRET, - inference_key=inference_key + client_id=DATACRUNCH_CLIENT_ID, + client_secret=DATACRUNCH_CLIENT_SECRET, + inference_key=datacrunch_inference_key ) # Register signal handlers for cleanup From b62425122821583218e139ae9284f8ef22029587 Mon Sep 17 00:00:00 2001 From: Jaakko Varjo Date: Fri, 11 Apr 2025 16:13:46 +0300 Subject: [PATCH 20/22] .gitignore --- .DS_Store | Bin 6148 -> 0 bytes .gitignore | 2 ++ 2 files changed, 2 insertions(+) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 55388e3fb67a9c7aedd9f4de645e10b63e42c40c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHLu};G<5Iwg+M1`RvVv6_xL{%1+DpchQx-mgp+8{Ngf>vTe`2-e5Rz3qUu`{wU zG9bai!o)k<4R({34FU2=zUTPve6O!c+?0q!bJD93Rf)*QVDu_*1;%k6IV(IJg7x^xOg zC&0*N^|~=H&*XNsKbUeW+|y+EU9QDbq|DO;&jj4gY;OGs{QRuu=z$xGq$Q0n`Lx8~;YD^W%M+aO9mmK#P*)Xqa3^axs zQ-!h=d9oC!N`-#K5UL#O0jC2)jj2LaPC}KF(4B>TLlLq&>H?WgB2?(SQ@|-;D^OOK zRX+ds=D+{jMefQe;1u{*3J9;&ZZ+|d^x2yGaD3JV7zY?^oR=z;C2;yUmJ>dT`F{mz Zhy^?Wh8k0aSb@180#XL&oB}_pz$f7d%QpZ3 diff --git a/.gitignore b/.gitignore index 34bc804..056a7ef 100644 --- a/.gitignore +++ b/.gitignore @@ -144,3 +144,5 @@ cython_debug/ # python sphinx docs _build/ testing.py + +.DS_Store \ No newline at end of file From ecf7da81d88785ef55260b028ba85ca211a17ee4 Mon Sep 17 00:00:00 2001 From: Jaakko Varjo Date: Fri, 11 Apr 2025 16:23:48 +0300 Subject: [PATCH 21/22] test parameter --- tests/unit_tests/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index dc5c1d3..413787d 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -6,6 +6,7 @@ BASE_URL = "https://api-testing.datacrunch.io/v1" ACCESS_TOKEN = "test-token" CLIENT_ID = "0123456789xyz" +CLIENT_SECRET = "0123456789xyz" @pytest.fixture @@ -15,5 +16,6 @@ def http_client(): auth_service.is_expired = Mock(return_value=True) auth_service.refresh = Mock(return_value=None) auth_service._client_id = CLIENT_ID + auth_service._client_secret = CLIENT_SECRET return HTTPClient(auth_service, BASE_URL) From 73f85caeb6ed04d9ccc0cafead4da4412b924b40 Mon Sep 17 00:00:00 2001 From: Jaakko Varjo Date: Fri, 11 Apr 2025 16:33:03 +0300 Subject: [PATCH 22/22] test fix --- datacrunch/InferenceClient/inference_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datacrunch/InferenceClient/inference_client.py b/datacrunch/InferenceClient/inference_client.py index e8c9c8f..7835d35 100644 --- a/datacrunch/InferenceClient/inference_client.py +++ b/datacrunch/InferenceClient/inference_client.py @@ -324,7 +324,7 @@ def status_json(self) -> Dict[str, Any]: return response_json - def result(self) -> Dict[str, Any] | str: + def result(self) -> Dict[str, Any]: """Get the results of the async inference execution. 
Returns: @@ -337,7 +337,7 @@ def result(self) -> Dict[str, Any] | str: if response.headers['Content-Type'] == 'application/json': return response.json() else: - return response.text + return {'result': response.text} # alias for get_results output = result
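
The polling loop in examples/containers/calling_the_endpoint_asynchronously.py waits on the new status/result endpoints with no upper bound. A small helper along the following lines could add a timeout; this is an illustrative sketch rather than part of the patch series — it only assumes the AsyncInferenceExecution, AsyncStatus, and InferenceClientError names introduced above, and the wait_for_result name is hypothetical.

```python
import time

from datacrunch.InferenceClient.inference_client import (
    AsyncInferenceExecution,
    AsyncStatus,
    InferenceClientError,
)


def wait_for_result(execution: AsyncInferenceExecution,
                    timeout_seconds: float = 300,
                    poll_interval: float = 1.0):
    """Poll an async inference execution until it completes, then return its result.

    Raises InferenceClientError if the execution does not finish within timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        # status_json() refreshes the cached status from the /status endpoint
        execution.status_json()
        if execution.status() == AsyncStatus.Completed:
            return execution.result()
        if time.monotonic() > deadline:
            raise InferenceClientError(
                f"Inference {execution.id} did not complete within {timeout_seconds} seconds")
        time.sleep(poll_interval)
```

With the deployment object from that example, usage could look like `result = wait_for_result(deployment.run(data, path='v1/completions'))`.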