[docs] add docs for PipesEMRContainersClient (#27159)
## Summary & Motivation

This PR adds docs for the new PipesEMRContainersClient.

It sounds confusing, but AWS EMR Containers and AWS EMR on EKS are actually the same thing:
the former is the name of the AWS API, while the latter is the service name, which is used
more in human communication than in automation.

For example, the `boto3` client is called `emr-containers`. 
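
A quick illustration of this naming, using standard `boto3` usage:

```python
import boto3

# the low-level API for EMR on EKS is named "emr-containers"
client = boto3.client("emr-containers")
```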

I tried to preserve this distinction in these docs.

## How I Tested These Changes

The snippets were tested with a real EMR on EKS cluster.
danielgafni authored and alangenfeld committed Jan 23, 2025
1 parent 7449e78 commit 72d469a
Showing 14 changed files with 240 additions and 55 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
```diff
@@ -398,6 +398,10 @@
       "title": "Dagster Pipes + AWS EMR",
       "path": "/concepts/dagster-pipes/aws-emr"
     },
+    {
+      "title": "Dagster Pipes + AWS EMR on EKS",
+      "path": "/concepts/dagster-pipes/aws-emr-containers"
+    },
     {
       "title": "Dagster Pipes + AWS EMR Serverless",
       "path": "/concepts/dagster-pipes/aws-emr-serverless"
```
Binary file modified docs/content/api/modules.json.gz
Binary file modified docs/content/api/searchindex.json.gz
Binary file modified docs/content/api/sections.json.gz
4 changes: 4 additions & 0 deletions docs/content/concepts.mdx
```diff
@@ -236,6 +236,10 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa
   title="Dagster Pipes + AWS EMR"
   href="/concepts/dagster-pipes/aws-emr"
 ></ArticleListItem>
+<ArticleListItem
+  title="Dagster Pipes + AWS EMR on EKS"
+  href="/concepts/dagster-pipes/aws-emr-containers"
+></ArticleListItem>
 <ArticleListItem
   title="Dagster Pipes + AWS EMR Serverless"
   href="/concepts/dagster-pipes/aws-emr-serverless"
```
3 changes: 2 additions & 1 deletion docs/content/concepts/dagster-pipes.mdx
```diff
@@ -79,7 +79,8 @@ Ready to get started with Dagster Pipes? Depending on what your goal is, how you
 - [AWS ECS](/concepts/dagster-pipes/aws-ecs)
 - [AWS Lambda](/concepts/dagster-pipes/aws-lambda)
 - [AWS Glue](/concepts/dagster-pipes/aws-glue)
-- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless)
 - [AWS EMR](/concepts/dagster-pipes/aws-emr)
+- [AWS EMR on EKS](/concepts/dagster-pipes/aws-emr-containers)
+- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless)

 - **If you don’t see your integration or you want to fully customize your Pipes experience**, check out the [Dagster Pipes details and customization guide](/concepts/dagster-pipes/dagster-pipes-details-and-customization) to learn how to create a custom experience.
```
199 changes: 199 additions & 0 deletions docs/content/concepts/dagster-pipes/aws-emr-containers.mdx
@@ -0,0 +1,199 @@
---
title: "Integrating AWS EMR on EKS with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS EMR Containers to launch external code from Dagster assets."
---

# AWS EMR on EKS & Dagster Pipes

This tutorial gives a short overview of how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS EMR on EKS](https://aws.amazon.com/emr/features/eks/) (the corresponding AWS API is called `emr-containers`).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesEMRContainersClient" module="dagster_aws.pipes" /> resource, which can be used to launch EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

---

## Prerequisites

- **In the Dagster environment**, you'll need to:

  - Install the following packages:

    ```shell
    pip install dagster dagster-webserver dagster-aws
    ```

    Refer to the [Dagster installation guide](/getting-started/install) for more info.

  - **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html).

- **In AWS**:

  - An existing AWS account
  - An [EMR Virtual Cluster](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/virtual-cluster.html) set up (a quick CLI check is sketched below)
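
One way to confirm an existing virtual cluster is the AWS CLI. This command takes no required arguments; the `id` field in its output is the `virtualClusterId` used later in this guide:

```shell
# list EMR virtual clusters and their states
aws emr-containers list-virtual-clusters
```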

---

## Step 1: Install the dagster-pipes module in your EMR environment

There are [a few options](https://aws.github.io/aws-emr-containers-best-practices/submit-applications/docs/spark/pyspark/#python-code-with-python-dependencies) for deploying Python code & dependencies for PySpark jobs. In this tutorial, we are going to build a custom Docker image for this purpose.

Install the `dagster-pipes` Python package in your image (the job script below also uses `boto3`; if it isn't already present in your base image, install it as well):

```dockerfile file=/guides/dagster/dagster_pipes/emr-containers/Dockerfile
# start from EMR image
FROM public.ecr.aws/emr-containers/spark/emr-7.2.0:latest
USER root
RUN python -m pip install dagster-pipes
# copy the job script
COPY . .
USER hadoop
```

<Note>
  It's also recommended to upgrade the default Python version included in the
  base EMR image (as is done in the `Dockerfile` above).
</Note>

The `COPY . .` instruction copies the EMR job script (`script.py`) into the image.
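
The image then needs to be pushed somewhere the EKS cluster can pull it from. Below is a minimal sketch of building and pushing it to Amazon ECR; the account ID, region, and repository name are placeholders for your own values:

```shell
# authenticate Docker with ECR (placeholder account and region)
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com

# build and push the job image
docker build -t 123456789012.dkr.ecr.us-east-1.amazonaws.com/dagster/emr-containers:latest .
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/dagster/emr-containers:latest
```

---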

## Step 2: Invoke dagster-pipes in the EMR job script

Call `open_dagster_pipes` in the EMR script to create a context that can be used to send messages to Dagster:

```python file=/guides/dagster/dagster_pipes/emr-containers/script.py
import sys

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    s3_client = boto3.client("s3")
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=s3_client),
    ) as pipes:
        pipes.log.info("Hello from AWS EMR Containers!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        print("Hello from stdout!")
        print("Hello from stderr!", file=sys.stderr)


if __name__ == "__main__":
    main()
```

<Note>
  It's best to use the `PipesS3MessageWriter` with EMR on EKS, because this
  message writer can capture the Spark driver logs and send them to Dagster.
</Note>

---

## Step 3: Create an asset using the PipesEMRContainersClient to launch the job

In the Dagster asset/op code, use the `PipesEMRContainersClient` resource to launch the job:

```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
from dagster_aws.pipes import PipesEMRContainersClient

import dagster as dg


@dg.asset
def emr_containers_asset(
    context: dg.AssetExecutionContext,
    pipes_emr_containers_client: PipesEMRContainersClient,
):
    image = (
        ...
    )  # it's likely the image can be taken from context.run_tags["dagster/image"]

    return pipes_emr_containers_client.run(
        context=context,
        start_job_run_params={
            "releaseLabel": "emr-7.5.0-latest",
            "virtualClusterId": ...,
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "executionRoleArn": ...,
            "jobDriver": {
                "sparkSubmitJobDriver": {
                    "entryPoint": "local:///app/script.py",
                    "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}",
                }
            },
        },
    ).get_materialize_result()
```

<Note>
Setting `include_stdio_in_messages` to `True` in the `PipesS3MessageReader`
will allow the driver logs to be forwarded to the Dagster process.
</Note>

Materializing this asset will launch the EMR on EKS job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
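
If you need to inspect a job run outside of Dagster, it is also visible in the EMR console, or via the AWS CLI; both IDs below are placeholders for your own values:

```shell
aws emr-containers describe-job-run \
    --virtual-cluster-id <virtual-cluster-id> \
    --id <job-run-id>
```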

---

## Step 4: Create Dagster definitions

Next, add the `PipesEMRContainersClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/emr-containers/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
import boto3
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader

from dagster import Definitions

defs = Definitions(
assets=[emr_containers_asset],
resources={
"pipes_emr_containers_client": PipesEMRContainersClient(
message_reader=PipesS3MessageReader(
client=boto3.client("s3"),
bucket=...,
include_stdio_in_messages=True,
),
)
},
)
```

Dagster will now be able to launch the AWS EMR Containers job from the `emr_containers_asset` asset, and receive logs and events from the job. If `include_stdio_in_messages` is set to `True`, the logs will be forwarded to the Dagster process.
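
For a quick local test, one option is to materialize the asset from the command line; the file name `dagster_code.py` is an assumption about where the definitions above live:

```shell
dagster asset materialize --select emr_containers_asset -f dagster_code.py
```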

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/concepts/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="AWS EMR Containers API reference"
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesEMRContainersClient"
></ArticleListItem>
</ArticleList>
3 changes: 2 additions & 1 deletion docs/content/integrations/spark.mdx
```diff
@@ -19,8 +19,9 @@ You can either use one of the available Pipes Clients or make your own. The avai

 - [Databricks](/concepts/dagster-pipes/databricks)
 - [AWS Glue](/concepts/dagster-pipes/aws-glue)
-- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless)
 - [AWS EMR](/concepts/dagster-pipes/aws-emr)
+- [AWS EMR on EKS](/concepts/dagster-pipes/aws-emr-containers)
+- [AWS EMR Serverless](/concepts/dagster-pipes/aws-emr-serverless)

 Existing Spark jobs can be used with Pipes without any modifications. In this case, Dagster will be receiving logs from the job, but not events like asset checks or attached metadata.
```
Binary file modified docs/next/public/objects.inv
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
```diff
@@ -127,6 +127,8 @@ Clients

 .. autoclass:: dagster_aws.pipes.PipesEMRClient

+.. autoclass:: dagster_aws.pipes.PipesEMRContainersClient
+
 .. autoclass:: dagster_aws.pipes.PipesEMRServerlessClient

 Legacy
```
Dockerfile (docs snippet: /guides/dagster/dagster_pipes/emr-containers/Dockerfile)

```diff
@@ -6,15 +6,16 @@ USER root
 RUN mkdir /python && chown hadoop:hadoop /python

 USER hadoop
-ENV UV_PYTHON_INSTALL_DIR=/python
+ENV UV_PYTHON_INSTALL_DIR=/python \
+    UV_BREAK_SYSTEM_PACKAGES=1

 RUN uv python install --python-preference only-managed 3.9.16
-ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}"
-ENV UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
-    UV_BREAK_SYSTEM_PACKAGES=1 \
+
+ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}" \
+    PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages" \
+    UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
     PYSPARK_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
-    PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
-    PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages"
+    PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python"

 RUN uv pip install --system dagster-pipes boto3 pyspark
```
dagster_code.py (docs snippet: /guides/dagster/dagster_pipes/emr-containers/dagster_code.py)

```diff
@@ -2,50 +2,31 @@

 from dagster_aws.pipes import PipesEMRContainersClient

-from dagster import AssetExecutionContext, asset
+import dagster as dg


-@asset
+@dg.asset
 def emr_containers_asset(
-    context: AssetExecutionContext,
+    context: dg.AssetExecutionContext,
     pipes_emr_containers_client: PipesEMRContainersClient,
 ):
+    image = (
+        ...
+    )  # it's likely the image can be taken from context.run_tags["dagster/image"]
+
     return pipes_emr_containers_client.run(
         context=context,
         start_job_run_params={
             "releaseLabel": "emr-7.5.0-latest",
-            "virtualClusterId": "uqcja50dzo7v1meie1wa47wa3",
+            "virtualClusterId": ...,
             "clientToken": context.run_id,  # idempotency identifier for the job run
-            "executionRoleArn": "arn:aws:iam::467123434025:role/emr-dagster-pipes-20250109135314655100000001",
+            "executionRoleArn": ...,
             "jobDriver": {
                 "sparkSubmitJobDriver": {
                     "entryPoint": "local:///app/script.py",
-                    # --conf spark.kubernetes.container.image=
-                    "sparkSubmitParameters": "--conf spark.kubernetes.file.upload.path=/tmp/spark --conf spark.kubernetes.container.image=467123434025.dkr.ecr.eu-north-1.amazonaws.com/dagster/emr-containers:12",  # --conf spark.pyspark.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python --conf spark.pyspark.driver.python=/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python",
+                    "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}",
                 }
             },
-            "configurationOverrides": {
-                "monitoringConfiguration": {
-                    "cloudWatchMonitoringConfiguration": {
-                        "logGroupName": "/aws/emr/containers/pipes",
-                        "logStreamNamePrefix": str(context.run_id),
-                    }
-                },
-                # "applicationConfiguration": [
-                #     {
-                #         "Classification": "spark-env",
-                #         "Configurations": [
-                #             {
-                #                 "Classification": "export",
-                #                 "Properties": {
-                #                     "PYSPARK_PYTHON": "/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python",
-                #                     "PYSPARK_DRIVER_PYTHON": "/home/hadoop/.local/share/uv/python/cpython-3.9.16-linux-x86_64-gnu/bin/python",
-                #                 }
-                #             }
-                #         ]
-                #     }
-                # ]
-            },
         },
     ).get_materialize_result()
@@ -54,16 +35,18 @@ def emr_containers_asset(

 # start_definitions_marker
 import boto3
-from dagster_aws.pipes import PipesCloudWatchMessageReader, PipesS3ContextInjector
+from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader

 from dagster import Definitions

 defs = Definitions(
     assets=[emr_containers_asset],
     resources={
         "pipes_emr_containers_client": PipesEMRContainersClient(
-            message_reader=PipesCloudWatchMessageReader(
-                client=boto3.client("logs"),
+            message_reader=PipesS3MessageReader(
+                client=boto3.client("s3"),
+                bucket=...,
+                include_stdio_in_messages=True,
             ),
         )
     },
```
script.py (docs snippet: /guides/dagster/dagster_pipes/emr-containers/script.py)

```diff
@@ -1,16 +1,14 @@
 import sys

 import boto3
-from dagster_pipes import PipesS3ContextLoader, PipesS3MessageWriter, open_dagster_pipes
+from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
 from pyspark.sql import SparkSession


 def main():
     s3_client = boto3.client("s3")
-
     with open_dagster_pipes(
         message_writer=PipesS3MessageWriter(client=s3_client),
-        context_loader=PipesS3ContextLoader(client=s3_client),
     ) as pipes:
         pipes.log.info("Hello from AWS EMR Containers!")

@@ -36,11 +34,3 @@ def main():

 if __name__ == "__main__":
     main()
-
-# import os
-# import sys
-
-# print(os.getcwd())
-# print(os.environ)
-# print(sys.path)
-# print(sys.executable)
```

1 comment on commit 72d469a

@github-actions (bot): Deploy preview for dagster-docs-legacy ready!

✅ Preview:
https://dagster-docs-legacy-ku7arol20-elementl.vercel.app
https://release-1-9-10.dagster.dagster-docs.io

Built with commit 72d469a. This pull request is being automatically deployed with vercel-action.