diff --git a/anyscale_provider/__init__.py b/anyscale_provider/__init__.py index 93423b0..b71ea8b 100644 --- a/anyscale_provider/__init__.py +++ b/anyscale_provider/__init__.py @@ -1,9 +1,11 @@ +from __future__ import annotations + __version__ = "1.0.0" -from typing import Any, Dict +from typing import Any -def get_provider_info() -> Dict[str, Any]: +def get_provider_info() -> dict[str, Any]: return { "package-name": "astro-provider-anyscale", # Required "name": "Anyscale", # Required diff --git a/anyscale_provider/hooks/anyscale.py b/anyscale_provider/hooks/anyscale.py index e374ad1..d8d77fb 100644 --- a/anyscale_provider/hooks/anyscale.py +++ b/anyscale_provider/hooks/anyscale.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import os import time -from typing import Any, Dict, Optional +from typing import Any from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook @@ -44,7 +46,7 @@ def __init__(self, conn_id: str = default_conn_name, **kwargs: Any) -> None: self.sdk = Anyscale(auth_token=token) @classmethod - def get_ui_field_behaviour(cls) -> Dict[str, Any]: + def get_ui_field_behaviour(cls) -> dict[str, Any]: """Return custom field behaviour for the connection form in the UI.""" return { "hidden_fields": ["schema", "port", "login"], @@ -66,8 +68,8 @@ def deploy_service( self, config: ServiceConfig, in_place: bool = False, - canary_percent: Optional[float] = None, - max_surge_percent: Optional[float] = None, + canary_percent: float | None = None, + max_surge_percent: float | None = None, ) -> str: """ Deploy a service to Anyscale. diff --git a/anyscale_provider/operators/anyscale.py b/anyscale_provider/operators/anyscale.py index 838321b..aad57bd 100644 --- a/anyscale_provider/operators/anyscale.py +++ b/anyscale_provider/operators/anyscale.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import time -from typing import Any, Dict, List, Optional, Union +from typing import Any import anyscale from airflow.compat.functools import cached_property @@ -46,13 +48,13 @@ def __init__( conn_id: str, name: str, image_uri: str, - compute_config: Union[ComputeConfig, Dict[str, Any], str], + compute_config: ComputeConfig | dict[str, Any] | str, working_dir: str, entrypoint: str, - excludes: Optional[List[str]] = None, - requirements: Optional[Union[str, List[str]]] = None, - env_vars: Optional[Dict[str, str]] = None, - py_modules: Optional[List[str]] = None, + excludes: list[str] | None = None, + requirements: str | list[str] | None = None, + env_vars: dict[str, str] | None = None, + py_modules: list[str] | None = None, max_retries: int = 1, *args: Any, **kwargs: Any, @@ -70,9 +72,9 @@ def __init__( self.entrypoint = entrypoint self.max_retries = max_retries - self.job_id: Optional[str] = None + self.job_id: str | None = None - self.fields: Dict[str, Any] = { + self.fields: dict[str, Any] = { "name": name, "image_uri": image_uri, "compute_config": compute_config, @@ -103,7 +105,7 @@ def hook(self) -> AnyscaleHook: """Return an instance of the AnyscaleHook.""" return AnyscaleHook(conn_id=self.conn_id) - def execute(self, context: Context) -> Optional[str]: + def execute(self, context: Context) -> str | None: if not self.hook: self.log.info("SDK is not available.") @@ -198,29 +200,29 @@ def __init__( conn_id: str, name: str, image_uri: str, - compute_config: Union[ComputeConfig, Dict[str, Any], str], - applications: List[Dict[str, Any]], + compute_config: ComputeConfig | dict[str, Any] | str, + applications: list[dict[str, Any]], working_dir: str, - containerfile: Optional[str] = None, - excludes: Optional[List[str]] = None, - requirements: Optional[Union[str, List[str]]] = None, - env_vars: Optional[Dict[str, str]] = None, - py_modules: Optional[List[str]] = None, + containerfile: str | None = None, + excludes: list[str] | None = None, + requirements: str | list[str] | None = None, + env_vars: dict[str, str] | None = None, + py_modules: list[str] | None = None, query_auth_token_enabled: bool = False, - http_options: Optional[Dict[str, Any]] = None, - grpc_options: Optional[Dict[str, Any]] = None, - logging_config: Optional[Dict[str, Any]] = None, - ray_gcs_external_storage_config: Optional[Union[RayGCSExternalStorageConfig, Dict[str, Any]]] = None, + http_options: dict[str, Any] | None = None, + grpc_options: dict[str, Any] | None = None, + logging_config: dict[str, Any] | None = None, + ray_gcs_external_storage_config: RayGCSExternalStorageConfig | dict[str, Any] | None = None, in_place: bool = False, - canary_percent: Optional[float] = None, - max_surge_percent: Optional[float] = None, + canary_percent: float | None = None, + max_surge_percent: float | None = None, **kwargs: Any, ) -> None: super().__init__(**kwargs) self.conn_id = conn_id # Set up explicit parameters - self.service_params: Dict[str, Any] = { + self.service_params: dict[str, Any] = { "name": name, "image_uri": image_uri, "containerfile": containerfile, @@ -255,7 +257,7 @@ def hook(self) -> AnyscaleHook: """Return an instance of the AnyscaleHook.""" return AnyscaleHook(conn_id=self.conn_id) - def execute(self, context: Context) -> Optional[str]: + def execute(self, context: Context) -> str | None: if not self.hook: self.log.info(f"SDK is not available...") raise AirflowException("SDK is not available") diff --git a/anyscale_provider/triggers/anyscale.py b/anyscale_provider/triggers/anyscale.py index 0e820bc..d83f8db 100644 --- a/anyscale_provider/triggers/anyscale.py +++ b/anyscale_provider/triggers/anyscale.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import asyncio import time from functools import partial -from typing import Any, AsyncIterator, Dict, Optional, Tuple +from typing import Any, AsyncIterator from airflow.compat.functools import cached_property from airflow.triggers.base import BaseTrigger, TriggerEvent @@ -46,7 +48,7 @@ def hook(self) -> AnyscaleHook: """Return an instance of the AnyscaleHook.""" return AnyscaleHook(conn_id=self.conn_id) - def serialize(self) -> Tuple[str, Dict[str, Any]]: + def serialize(self) -> tuple[str, dict[str, Any]]: return ( "anyscale_provider.triggers.anyscale.AnyscaleJobTrigger", { @@ -140,7 +142,7 @@ def __init__( conn_id: str, service_name: str, expected_state: str, - canary_percent: Optional[float], + canary_percent: float | None, poll_interval: int = 60, timeout: int = 600, ): @@ -158,7 +160,7 @@ def hook(self) -> AnyscaleHook: """Return an instance of the AnyscaleHook.""" return AnyscaleHook(conn_id=self.conn_id) - def serialize(self) -> Tuple[str, Dict[str, Any]]: + def serialize(self) -> tuple[str, dict[str, Any]]: return ( "anyscale_provider.triggers.anyscale.AnyscaleServiceTrigger", {