To integrate Airflow with Anyscale, you will need to provide several configuration details:
- **compute_config**: Defines the cluster resources your workload runs on. You can either:
  - dynamically provide this via the `compute_config` input parameter, or
  - create a compute configuration in Anyscale and use the resulting ID in the `compute_config_id` parameter.

- **image_uri**: Retrieve the Image URI by logging into the [Anyscale platform](https://anyscale.com/).


### Usage

Install the Anyscale provider using the pip command below:

```
pip install astro-provider-anyscale
```
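
The operators authenticate through an Airflow connection; the examples below use a connection ID of `anyscale_conn`. Below is a minimal sketch of supplying that connection through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable mechanism; it assumes the provider registers an `anyscale` connection type and reads your API key from the connection's password field, so verify both against the provider documentation:

```python
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables
# (URI form: conn-type://login:password@host). In practice you would export this
# in the scheduler/worker environment rather than set it in code.
# Assumptions: the conn type is "anyscale" and the API key goes in the password field.
os.environ["AIRFLOW_CONN_ANYSCALE_CONN"] = "anyscale://:<your-anyscale-api-key>@"
```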


The provided `submit_anyscale_job.py` script is an example of how to configure and use the `SubmitAnyscaleJob` operator within an Airflow DAG:

```python
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from anyscale_provider.operators.anyscale import SubmitAnyscaleJob

# Define the default DAG arguments.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 4, 2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"

# Constants
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

dag = DAG(
    'sample_anyscale_workflow',
    default_args=default_args,
    description='A DAG to interact with Anyscale triggered manually',
    schedule_interval=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

# Job submission to Anyscale
submit_anyscale_job = SubmitAnyscaleJob(
    task_id='submit_anyscale_job',
    conn_id=ANYSCALE_CONN_ID,
    name='AstroJob',
    image_uri='anyscale/ray:2.23.0-py311',
    compute_config='my-compute-config:1',
    working_dir=str(FOLDER_PATH),
    entrypoint='python script.py',
    requirements=["requests", "pandas", "numpy", "torch"],
    max_retries=1,
    dag=dag,
)


# Defining the task sequence
submit_anyscale_job
```
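
The job above runs `python script.py` with the local `ray_scripts` folder as its working directory. For reference, a minimal, hypothetical `ray_scripts/script.py` might look like the sketch below; any Python entrypoint that drives your Ray workload will do:

```python
# ray_scripts/script.py -- a minimal, hypothetical entrypoint for the job above.
import ray

ray.init()  # When run as an Anyscale job, this attaches to the managed Ray cluster.

@ray.remote
def square(x: int) -> int:
    return x * x

# Fan a few tasks out across the cluster and collect the results.
print(ray.get([square.remote(i) for i in range(5)]))
```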
The `deploy_anyscale_service.py` script uses the `RolloutAnyscaleService` operator to deploy a service on Anyscale:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from anyscale_provider.operators.anyscale import RolloutAnyscaleService
from anyscale_provider.hooks.anyscale import AnyscaleHook

# Define the default DAG arguments.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 4, 2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"

dag = DAG(
    'sample_anyscale_service_workflow',
    default_args=default_args,
    description='A DAG to interact with Anyscale triggered manually',
    schedule_interval=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

# Deploy a service to Anyscale
deploy_anyscale_service = RolloutAnyscaleService(
    task_id="rollout_anyscale_service",
    conn_id=ANYSCALE_CONN_ID,
    name="AstroService",
    image_uri='anyscale/ray:2.23.0-py311',
    compute_config='my-compute-config:1',
    working_dir="https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip",
    applications=[{"import_path": "sentiment_analysis.app:model"}],
    requirements=["transformers", "requests", "pandas", "numpy", "torch"],
    in_place=False,
    canary_percent=None,
    dag=dag,
)

def terminate_service():
    hook = AnyscaleHook(conn_id=ANYSCALE_CONN_ID)
    result = hook.terminate_service(service_id="AstroService", time_delay=5)
    print(result)

# Tear the service back down once the rollout task has finished.
terminate_anyscale_service = PythonOperator(
    task_id='terminate_anyscale_service',
    python_callable=terminate_service,
    dag=dag,
)

# Defining the task sequence
deploy_anyscale_service >> terminate_anyscale_service
```
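
Once the rollout succeeds (and before the termination task runs), the service is reachable over HTTP. Below is a minimal sketch of querying it; the base URL and bearer token are placeholders you would copy from the service's page in the Anyscale console, and the request body depends on what `sentiment_analysis.app:model` expects:

```python
import requests

# Placeholders -- copy the real values from the Anyscale console.
SERVICE_URL = "https://<your-service-url>"
SERVICE_TOKEN = "<your-service-bearer-token>"

response = requests.post(
    f"{SERVICE_URL}/",
    headers={"Authorization": f"Bearer {SERVICE_TOKEN}"},
    json={"text": "Anyscale and Airflow make a great pair."},
)
print(response.status_code, response.text)
```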

### Changelog
