diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index 2009a0ba46525..bf41f1f988840 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -398,6 +398,10 @@ "title": "Dagster Pipes + AWS EMR", "path": "/concepts/dagster-pipes/aws-emr" }, + { + "title": "Dagster Pipes + AWS EMR on EKS", + "path": "/concepts/dagster-pipes/aws-emr-containers" + }, { "title": "Dagster Pipes + AWS EMR Serverless", "path": "/concepts/dagster-pipes/aws-emr-serverless" diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index f1000b1546fa3..fa49c81e7994d 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index ea64157fefa27..2e2784ee6e6f8 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 32bbeecfce979..d3c3d3f287608 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/content/concepts.mdx b/docs/content/concepts.mdx index 56d7260d5bec9..f2f8110e4b85f 100644 --- a/docs/content/concepts.mdx +++ b/docs/content/concepts.mdx @@ -236,6 +236,10 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa title="Dagster Pipes + AWS EMR" href="/concepts/dagster-pipes/aws-emr" > + resource, which can be used to launch EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs. + +--- + +## Prerequisites + +- **In the Dagster environment**, you'll need to: + + - Install the following packages: + + ```shell + pip install dagster dagster-webserver dagster-aws + ``` + + Refer to the [Dagster installation guide](/getting-started/install) for more info. + + - **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html). + +- **In AWS**: + + - An existing AWS account + - A [EMR Virtual Cluster](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/virtual-cluster.html) set up + +--- + +## Step 1: Install the dagster-pipes module in your EMR environment + +There are [a few options](https://aws.github.io/aws-emr-containers-best-practices/submit-applications/docs/spark/pyspark/#python-code-with-python-dependencies) for deploying Python code & dependencies for PySpark jobs. In this tutorial, we are going to build a custom Docker image for this purpose. + +Install `dagster-pipes`, `dagster-aws` and `boto3` Python packages in your image: + +```Dockerfile,file=/guides/dagster/dagster_pipes/emr-containers/Dockerfile +# start from EMR image +FROM public.ecr.aws/emr-containers/spark/emr-7.2.0:latest + +USER root + +RUN python -m pip install dagster-pipes + +# copy the job script +COPY . . + +USER hadoop +``` + + + It's also recommended to upgrade the default Python version included in the + base EMR image (as it has been done in the `Dockerfile` above) + + +--- + +We copy the EMR job script (`script.py`) to the image in the last step. + +## Step 2: Invoke dagster-pipes in the EMR job script + +Call `open_dagster_pipes` in the EMR script to create a context that can be used to send messages to Dagster: + +```python file=/guides/dagster/dagster_pipes/emr-containers/script.py +import sys + +import boto3 +from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes +from pyspark.sql import SparkSession + + +def main(): + s3_client = boto3.client("s3") + with open_dagster_pipes( + message_writer=PipesS3MessageWriter(client=s3_client), + ) as pipes: + pipes.log.info("Hello from AWS EMR Containers!") + + spark = SparkSession.builder.appName("HelloWorld").getOrCreate() + + df = spark.createDataFrame( + [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)], + ["id", "name", "age"], + ) + + # calculate a really important statistic + avg_age = float(df.agg({"age": "avg"}).collect()[0][0]) + + # attach it to the asset materialization in Dagster + pipes.report_asset_materialization( + metadata={"average_age": {"raw_value": avg_age, "type": "float"}}, + data_version="alpha", + ) + + print("Hello from stdout!") + print("Hello from stderr!", file=sys.stderr) + + +if __name__ == "__main__": + main() +``` + + + It's best to use the `PipesS3MessageWriter` with EMR on EKS, because this + message writer has the ability to capture the Spark driver logs and send them + to Dagster. + + +--- + +## Step 3: Create an asset using the PipesEMRcontainersClient to launch the job + +In the Dagster asset/op code, use the `PipesEMRcontainersClient` resource to launch the job: + +```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker +from dagster_aws.pipes import PipesEMRContainersClient + +import dagster as dg + + +@dg.asset +def emr_containers_asset( + context: dg.AssetExecutionContext, + pipes_emr_containers_client: PipesEMRContainersClient, +): + image = ( + ... + ) # it's likely the image can be taken from context.run_tags["dagster/image"] + + return pipes_emr_containers_client.run( + context=context, + start_job_run_params={ + "releaseLabel": "emr-7.5.0-latest", + "virtualClusterId": ..., + "clientToken": context.run_id, # idempotency identifier for the job run + "executionRoleArn": ..., + "jobDriver": { + "sparkSubmitJobDriver": { + "entryPoint": "local:///app/script.py", + "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}", + } + }, + }, + ).get_materialize_result() +``` + + + Setting `include_stdio_in_messages` to `True` in the `PipesS3MessageReader` + will allow the driver logs to be forwarded to the Dagster process. + + +Materializing this asset will launch the AWS on EKS job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated. + +--- + +## Step 4: Create Dagster definitions + +Next, add the `PipesEMRContainersClient` resource to your project's object: + +```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker +import boto3 +from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader + +from dagster import Definitions + +defs = Definitions( + assets=[emr_containers_asset], + resources={ + "pipes_emr_containers_client": PipesEMRContainersClient( + message_reader=PipesS3MessageReader( + client=boto3.client("s3"), + bucket=..., + include_stdio_in_messages=True, + ), + ) + }, +) +``` + +Dagster will now be able to launch the AWS EMR Containers job from the `emr_containers_asset` asset, and receive logs and events from the job. If `include_stdio_in_messages` is set to `True`, the logs will be forwarded to the Dagster process. + +--- + +## Related + + + + + diff --git a/docs/content/integrations/spark.mdx b/docs/content/integrations/spark.mdx index ad4e374ef04ad..f92296ca38e4a 100644 --- a/docs/content/integrations/spark.mdx +++ b/docs/content/integrations/spark.mdx @@ -19,8 +19,9 @@ You can either use one of the available Pipes Clients or make your own. The avai - [Databricks](/concepts/dagster-pipes/databricks) - [AWS Glue](/concepts/dagster-pipes/aws-glue) -- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless) - [AWS EMR](/concepts/dagster-pipes/aws-emr) +- [AWS EMR on EKS](/concepts/dagster-pipes/aws-emr-containers) +- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless) Existing Spark jobs can be used with Pipes without any modifications. In this case, Dagster will be receiving logs from the job, but not events like asset checks or attached metadata. diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 1cbe9becc74df..d18995cb01272 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst index 15d161197def5..78e043eb515a7 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst @@ -127,6 +127,8 @@ Clients .. autoclass:: dagster_aws.pipes.PipesEMRClient +.. autoclass:: dagster_aws.pipes.PipesEMRContainersClient + .. autoclass:: dagster_aws.pipes.PipesEMRServerlessClient Legacy diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/Dockerfile b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/Dockerfile index d6dfe41a3e10c..13ec40fdc3a3e 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/Dockerfile +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/Dockerfile @@ -6,15 +6,16 @@ USER root RUN mkdir /python && chown hadoop:hadoop /python USER hadoop -ENV UV_PYTHON_INSTALL_DIR=/python +ENV UV_PYTHON_INSTALL_DIR=/python \ + UV_BREAK_SYSTEM_PACKAGES=1 RUN uv python install --python-preference only-managed 3.9.16 -ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}" -ENV UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \ - UV_BREAK_SYSTEM_PACKAGES=1 \ + +ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}" \ + PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages" \ + UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \ PYSPARK_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \ - PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \ - PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages" + PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" RUN uv pip install --system dagster-pipes boto3 pyspark diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/dagster_code.py index 301461b861f9f..f0e93f4d2f655 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/dagster_code.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/dagster_code.py @@ -2,50 +2,31 @@ from dagster_aws.pipes import PipesEMRContainersClient -from dagster import AssetExecutionContext, asset +import dagster as dg -@asset +@dg.asset def emr_containers_asset( - context: AssetExecutionContext, + context: dg.AssetExecutionContext, pipes_emr_containers_client: PipesEMRContainersClient, ): + image = ( + ... + ) # it's likely the image can be taken from context.run_tags["dagster/image"] + return pipes_emr_containers_client.run( context=context, start_job_run_params={ "releaseLabel": "emr-7.5.0-latest", - "virtualClusterId": "uqcja50dzo7v1meie1wa47wa3", + "virtualClusterId": ..., "clientToken": context.run_id, # idempotency identifier for the job run - "executionRoleArn": "arn:aws:iam::467123434025:role/emr-dagster-pipes-20250109135314655100000001", + "executionRoleArn": ..., "jobDriver": { "sparkSubmitJobDriver": { "entryPoint": "local:///app/script.py", - # --conf spark.kubernetes.container.image= - "sparkSubmitParameters": "--conf spark.kubernetes.file.upload.path=/tmp/spark --conf spark.kubernetes.container.image=467123434025.dkr.ecr.eu-north-1.amazonaws.com/dagster/emr-containers:12", # --conf spark.pyspark.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python --conf spark.pyspark.driver.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python", + "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}", } }, - "configurationOverrides": { - "monitoringConfiguration": { - "cloudWatchMonitoringConfiguration": { - "logGroupName": "/aws/emr/containers/pipes", - "logStreamNamePrefix": str(context.run_id), - } - }, - # "applicationConfiguration": [ - # { - # "Classification": "spark-env", - # "Configurations": [ - # { - # "Classification": "export", - # "Properties": { - # "PYSPARK_PYTHON": "/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python", - # "PYSPARK_DRIVER_PYTHON": "/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python", - # } - # } - # ] - # } - # ] - }, }, ).get_materialize_result() @@ -54,7 +35,7 @@ def emr_containers_asset( # start_definitions_marker import boto3 -from dagster_aws.pipes import PipesCloudWatchMessageReader, PipesS3ContextInjector +from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader from dagster import Definitions @@ -62,8 +43,10 @@ def emr_containers_asset( assets=[emr_containers_asset], resources={ "pipes_emr_containers_client": PipesEMRContainersClient( - message_reader=PipesCloudWatchMessageReader( - client=boto3.client("logs"), + message_reader=PipesS3MessageReader( + client=boto3.client("s3"), + bucket=..., + include_stdio_in_messages=True, ), ) }, diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/script.py index 18e074ba2f1f0..2f70341d73135 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/script.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/emr-containers/script.py @@ -1,16 +1,14 @@ import sys import boto3 -from dagster_pipes import PipesS3ContextLoader, PipesS3MessageWriter, open_dagster_pipes +from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes from pyspark.sql import SparkSession def main(): s3_client = boto3.client("s3") - with open_dagster_pipes( message_writer=PipesS3MessageWriter(client=s3_client), - context_loader=PipesS3ContextLoader(client=s3_client), ) as pipes: pipes.log.info("Hello from AWS EMR Containers!") @@ -36,11 +34,3 @@ def main(): if __name__ == "__main__": main() - -# import os -# import sys - -# print(os.getcwd()) -# print(os.environ) -# print(sys.path) -# print(sys.executable) diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_containers.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_containers.py index 21f74f2252fce..bb4a49acc2c34 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_containers.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr_containers.py @@ -40,11 +40,11 @@ class PipesEMRContainersClient(PipesClient, TreatAsResourceParam): Args: client (Optional[boto3.client]): The boto3 AWS EMR containers client used to interact with AWS EMR Containers. context_injector (Optional[PipesContextInjector]): A context injector to use to inject - context into AWS EMR containers workload. Defaults to :py:class:`PipesEnvContextInjector`. + context into AWS EMR Containers workload. Defaults to :py:class:`PipesEnvContextInjector`. message_reader (Optional[PipesMessageReader]): A message reader to use to read messages - from the AWS EMR containers workload. It's recommended to use :py:class:`PipesS3MessageReader`. - forward_termination (bool): Whether to cancel the AWS EMR containers workload if the Dagster process receives a termination signal. - pipes_params_bootstrap_method (Literal["args", "env"]): The method to use to inject parameters into the AWS EMR containers workload. Defaults to "args". + from the AWS EMR Containers workload. It's recommended to use :py:class:`PipesS3MessageReader`. + forward_termination (bool): Whether to cancel the AWS EMR Containers workload if the Dagster process receives a termination signal. + pipes_params_bootstrap_method (Literal["args", "env"]): The method to use to inject parameters into the AWS EMR Containers workload. Defaults to "args". waiter_config (Optional[WaiterConfig]): Optional waiter configuration to use. Defaults to 70 days (Delay: 6, MaxAttempts: 1000000). """