Skip to content

Commit

Permalink
changed
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatajagannath committed Jun 10, 2024
1 parent f1fe9af commit bde0caa
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def __init__(self,
super().__init__(**kwargs)
self.conn_id = conn_id

# Set up explicit parameters
self.service_params: Dict[str, Any] = {
'name': name,
'image_uri': image_uri,
Expand All @@ -243,9 +244,11 @@ def __init__(self,
self.canary_percent = canary_percent
self.max_surge_percent = max_surge_percent

# Ensure name is not empty
if not self.service_params['name']:
raise ValueError("Service name is required.")

# Ensure at least one application is specified
if not self.service_params['applications']:
raise ValueError("At least one application must be specified.")

Expand All @@ -262,6 +265,7 @@ def execute(self, context: Context) -> Optional[str]:
svc_config = ServiceConfig(**self.service_params)
self.log.info("Service with config object: {}".format(svc_config))

# Call the SDK method with the dynamically created service model
service_id = self.hook.deploy_service(config=svc_config,
in_place=self.in_place,
canary_percent=self.canary_percent,
Expand All @@ -280,13 +284,13 @@ def execute(self, context: Context) -> Optional[str]:

def execute_complete(self, context: Context, event: TriggerEvent) -> None:
self.log.info(f"Execution completed...")
service_id = event.payload["service_name"]
service_id = event["service_name"]

if event.payload["status"] == ServiceState.SYSTEM_FAILURE:
self.log.info(f"Anyscale service deployment {service_id} ended with status: {event.payload['status']}")
raise AirflowException(f"Job {service_id} failed with error {event.payload['message']}")
if event["status"] == ServiceState.SYSTEM_FAILURE:
self.log.info(f"Anyscale service deployment {service_id} ended with status: {event['status']}")
raise AirflowException(f"Job {service_id} failed with error {event['message']}")
else:
self.log.info(f"Anyscale service deployment {service_id} completed with status: {event.payload['status']}")
self.log.info(f"Anyscale service deployment {service_id} completed with status: {event['status']}")

return None

Expand Down

0 comments on commit bde0caa

Please sign in to comment.