Show #6

Draft
wants to merge 9 commits into
base: main
Changes from 6 commits
1 change: 1 addition & 0 deletions .Rprofile
@@ -0,0 +1 @@
source("renv/activate.R")
31 changes: 21 additions & 10 deletions README.md
@@ -20,16 +20,27 @@ To begin exploring the integration of Dagster and R:
```bash
cd dagster-and-r
```
3. **Install Dependencies**
Using [poetry](https://python-poetry.org/), install the package and its dependencies:
```bash
poetry install
3. **Install Python Dependencies**
# you'll need a version of python installed
Using uv
# install uv
Review comment (Owner):
Missing an opening ``` here?

# curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv
source .venv/bin/activate
uv sync
```
4. ** Install R dependencies**
```
# from R
# if you haven't installed renv before
# install.packages("renv")
# renv::restore()
```

4. **Set RETICULATE_PYTHON environment variable**
Determine the path to the python binary associated with this project's poetry environment.
```bash
poetry run
# from your virtual environment
which python
# /home/user/.cache/pypoetry/virtualenvs/dagster-and-r-kS5e8P_l-py3.10/bin/python
Review comment (Owner, @philiporlando, Dec 17, 2024):
I needed to update my .Renviron to avoid a cryptic error message when trying to materialize the iris_r asset:

#.Renviron
RETICULATE_PYTHON=.venv/bin/python

We should probably update the README to include this new path after migrating to uv.

```
@@ -38,7 +49,7 @@ Create a new `.Renviron` file at the root of the project and set the `RETICULATE
5. **Launch the Dagster UI**
Start the Dagster web server:
```bash
poetry run dagster dev
dagster dev
```
Access the UI at http://localhost:3000 in your browser.

@@ -61,7 +72,7 @@ Create a new `.Renviron` file at the root of the project and set the `RETICULATE
Then, start the Dagster UI web server:

```bash
poetry run dagster dev -m dagster_and_r
dagster dev -m dagster_and_r
```

Open http://localhost:3000 with your browser to see the project.
@@ -85,21 +96,21 @@ Open http://localhost:3000 with your browser to see the project.
### Adding Python Dependencies
To add new Python packages to the project:
```bash
poetry add <pkg-name>
uv add <pkg-name>
```

### Unit Testing
Unit tests are essential for ensuring code reliability and are currently being developed. Run existing tests using `pytest`:
```bash
poetry run pytest dagster_and_r_tests
pytest dagster_and_r_tests
```
> [!NOTE]
> Unit tests are a work in progress.

### Schedules and Sensors
To enable [Schedules](https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules) and [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors), ensure the [Dagster Daemon](https://docs.dagster.io/deployment/dagster-daemon) is active:
```bash
poetry run dagster dev
dagster dev
```
With the Daemon running, you can start using schedules and sensors for your jobs.

46 changes: 44 additions & 2 deletions dagster_and_r/R/iris.R
@@ -1,3 +1,4 @@
# make sure these packages are installed
library(reticulate)
library(readr)
library(glue)
@@ -7,6 +8,36 @@ library(magrittr)
reticulate::py_config()
stopifnot(reticulate::py_module_available("dagster_pipes"))

# Function to convert R types to Python types
convert_r_to_python_types <- function(df) {
# Get R types
r_types <- sapply(df, class)

# Define type mapping
type_mapping <- list(
"numeric" = "float",
"integer" = "int",
"character" = "str",
"factor" = "str",
"logical" = "bool",
"Date" = "datetime.date",
"POSIXct" = "datetime.datetime",
"POSIXlt" = "datetime.datetime"
)

# Convert types
python_types <- sapply(r_types, function(x) {
if (x %in% names(type_mapping)) {
type_mapping[[x]]
} else {
"object" # default type
}
})

return(reticulate::r_to_py(as.list(python_types)))
}
Review comment on lines +12 to +38 (Owner, @philiporlando, Dec 13, 2024):

I'm surprised this type_mapping list is needed. Shouldn't type conversion be handled implicitly by reticulate?

Reply (Author):

It did the floats fine but defaults to "object" for the strings. I'll check into it some more

Reply (Owner, @philiporlando, Dec 13, 2024):

Thanks for the info. I haven't had a chance to test your changes yet, but I'm hoping there's still a way that we can piggyback off reticulate's type conversion somehow.

The r_to_py.data.frame() function should be able to handle most R data types. Maybe we can pass df to this function, instead of using sapply(df, class), and then extract the python types from the resulting data frame?

It's possible that reticulate isn't recognizing that pandas is available, which could be causing r_to_py_impl() to be called instead. However, I'm noticing that uv support was added to reticulate in October, and the renv.lock shows the latest version, so this shouldn't be an issue.

I'll try taking a closer look in the next couple of days.

Reply (Author):

Oh r_to_py.data.frame was the function I was missing after rifling through the reticulate docs. Yeah that might do it

Reply (Owner, @philiporlando, Dec 17, 2024):

Leaning on reticulate more produces the below dictionary:

reticulate::r_to_py(df)
{'Sepal.Length': dtype('float64'), 'Sepal.Width': dtype('float64'), 'Petal.Length': dtype('float64'), 'Petal.Width': dtype('float64'), 'Species': CategoricalDtype(categories=['setosa', 'versicolor', 'virginica'], ordered=False, categories_dtype=object)}

With a little more work, we can extract the name of each data type from its dtype() object:

convert_r_to_python_types <- function(df) {
    df_pandas <- reticulate::r_to_py(df)
    dtypes_dict <- df_pandas$dtypes$to_dict()
    for (col_name in names(dtypes_dict)) {
        dtypes_dict[[col_name]] <- as.character(dtypes_dict[[col_name]]$name)
    }
    return(dtypes_dict)
}

{'Sepal.Length': 'float64', 'Sepal.Width': 'float64', 'Petal.Length': 'float64', 'Petal.Width': 'float64', 'Species': 'category'}

This will materialize successfully; however, reticulate's float64 is not recognized as a Python float by Dagster. Instead, it is interpreted as a VARCHAR (the screenshot attached to this comment is not reproduced here).

I'm not really sure if there's an implicit way to rely on reticulate here. The hard-coded mapping dictionary may be the only way.

Reply (Author):

I think relying on reticulate and then manually overriding a few types (like float64 vs. float) might make the most sense.

I'll take the code here and add it to my PR with an override.
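
The override idea from this thread can be sketched in plain Python. The SQL mapping below is copied from `create_table_schema_from_dict` in this PR's `assets.py`; because that function falls back to `'VARCHAR'` for unknown keys, a dtype name such as `float64` would land on the fallback, which is likely why the column showed up as VARCHAR. The `DTYPE_OVERRIDES` table and both helper names are hypothetical and not part of the PR; the PR itself may apply the override on the R side instead.

```python
# Mapping copied from create_table_schema_from_dict in this PR's assets.py.
SQL_TYPE_MAPPING = {
    "float": "FLOAT",
    "int": "INTEGER",
    "numeric": "numeric",
    "str": "VARCHAR",
    "bool": "BOOLEAN",
    "datetime.date": "DATE",
    "datetime.datetime": "TIMESTAMP",
}

# Hypothetical overrides: pandas dtype names -> keys of SQL_TYPE_MAPPING.
DTYPE_OVERRIDES = {
    "float64": "float",
    "float32": "float",
    "int64": "int",
    "int32": "int",
    "object": "str",
    "category": "str",
    "datetime64[ns]": "datetime.datetime",
}


def normalize_dtype_names(dtypes):
    """Replace known pandas dtype names; leave unknown names untouched."""
    return {col: DTYPE_OVERRIDES.get(name, name) for col, name in dtypes.items()}


def to_sql_types(type_dict):
    """Same lookup as the PR: unknown type names fall back to VARCHAR."""
    return {col: SQL_TYPE_MAPPING.get(t, "VARCHAR") for col, t in type_dict.items()}


# dtype names as reported in the comment above
iris_dtypes = {
    "Sepal.Length": "float64",
    "Sepal.Width": "float64",
    "Petal.Length": "float64",
    "Petal.Width": "float64",
    "Species": "category",
}

raw = to_sql_types(iris_dtypes)
fixed = to_sql_types(normalize_dtype_names(iris_dtypes))
print(raw["Sepal.Length"])    # VARCHAR (float64 is not a key, so the fallback applies)
print(fixed["Sepal.Length"])  # FLOAT
```

Keeping the overrides in a separate table means reticulate still does the type detection, and only the names that do not match the schema mapping need to be listed by hand.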



# Import Python modules
# R doesn't support selective imports like Python, so you have to do this
# to avoid typing the full namespace path repeatedly...
@@ -23,9 +54,20 @@ with(open_dagster_pipes() %as% pipes, {
context$log$info(head(iris))
context$log$info(os$environ["MY_ENV_VAR_IN_SUBPROCESS"])
output_dir <- Sys.getenv("OUTPUT_DIR")
iris_head <- head(iris)
context$log$info(glue::glue("output_dir: {output_dir}"))
context$report_asset_materialization()

#python function to report back the materialization and metadata
context$report_custom_message(
payload = reticulate::r_to_py(list(
"dagster/row_count" = nrow(iris),
# if using report_asset_materialization
#list( type = "md", "raw_value" = paste(knitr::kable(iris_head, format = "pipe"), collapse = "\n") ),
"preview" = paste(knitr::kable(iris_head, format = "pipe"), collapse = "\n"),
"iris_head_df" = reticulate::r_to_py(jsonlite::toJSON(x = iris_head, dataframe = "columns")),
"column_types" = convert_r_to_python_types(iris_head)
))
)
context$log$info(glue::glue("got here!"))
# Ensure that Sepal.Length field does not contain any NAs
context$report_asset_check(
asset_key="iris_r",
Empty file added dagster_and_r/README.md
Empty file.
30 changes: 0 additions & 30 deletions dagster_and_r/__init__.py
@@ -1,30 +0,0 @@
from dagster import (
Definitions,
PipesSubprocessClient,
)
from . jobs import docker_container_op_r
from . asset_checks import (
# no_missing_sepal_length_check_r,
no_missing_sepal_length_check_py,
)

# python_assets = load_assets_from_modules([assets])
from . assets import (
hello_world_r,
iris_r,
iris_py,
)

defs = Definitions(
assets=[
hello_world_r,
iris_r,
iris_py,
],
asset_checks=[
# no_missing_sepal_length_check_r,
no_missing_sepal_length_check_py,
],
jobs=[docker_container_op_r],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
90 changes: 62 additions & 28 deletions dagster_and_r/assets.py
@@ -1,56 +1,90 @@
import json
import shutil
import pandas as pd
import dagster as dg
from dagster_pandas.data_frame import create_table_schema_metadata_from_dataframe

from dagster import (
AssetExecutionContext,
asset,
AssetCheckSpec,
MaterializeResult,
PipesSubprocessClient,
file_relative_path,
Field,
String,
def create_table_schema_from_dict(type_dict):
# Map Python types to SQL types
type_mapping = {
'float': 'FLOAT',
'int': 'INTEGER',
'numeric': 'numeric',
'str': 'VARCHAR',
'bool': 'BOOLEAN',
'datetime.date': 'DATE',
'datetime.datetime': 'TIMESTAMP'
}

columns = []
for col_name, col_type in type_dict.items():
columns.append(
dg.TableColumn(
name=col_name,
type=type_mapping.get(col_type, 'VARCHAR'),
description=f"Column {col_name} of type {col_type}",
)
)

return dg.TableSchema(
columns=columns,
)

@asset

# example that runs an R script without modification. R script runs but does not report anything in Dagster other than success.
@dg.asset
def hello_world_r(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
cmd = [shutil.which("Rscript"), file_relative_path(__file__, "./R/hello_world.R")]
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.MaterializeResult:
cmd = [shutil.which("Rscript"), dg.file_relative_path(__file__, "./R/hello_world.R")]
return pipes_subprocess_client.run(
command=cmd,
context=context,
).get_materialize_result()


@asset(
config_schema={"output_dir": Field(String, default_value="./data")},


@dg.asset(
config_schema={"output_dir": dg.Field(dg.String, default_value="./data")},
check_specs=[
AssetCheckSpec(name="no_missing_sepal_length_check_r", asset="iris_r"),
AssetCheckSpec(name="no_missing_sepal_width_check_r", asset="iris_r"),
AssetCheckSpec(name="no_missing_petal_length_check_r", asset="iris_r"),
AssetCheckSpec(name="no_missing_petal_width_check_r", asset="iris_r"),
AssetCheckSpec(name="species_name_check_r", asset="iris_r"),
dg.AssetCheckSpec(name="no_missing_sepal_length_check_r", asset="iris_r"),
dg.AssetCheckSpec(name="no_missing_sepal_width_check_r", asset="iris_r"),
dg.AssetCheckSpec(name="no_missing_petal_length_check_r", asset="iris_r"),
dg.AssetCheckSpec(name="no_missing_petal_width_check_r", asset="iris_r"),
dg.AssetCheckSpec(name="species_name_check_r", asset="iris_r"),
],
)
def iris_r(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
) -> MaterializeResult:
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.MaterializeResult:
output_dir = context.op_config["output_dir"]
cmd = [shutil.which("Rscript"), file_relative_path(__file__, "./R/iris.R")]
return pipes_subprocess_client.run(
cmd = [shutil.which("Rscript"), dg.file_relative_path(__file__, "./R/iris.R")]
result = pipes_subprocess_client.run(
command=cmd,
context=context,
env={
"MY_ENV_VAR_IN_SUBPROCESS": "This is an environment variable passed from Dagster to R!",
"OUTPUT_DIR": output_dir,
},
).get_materialize_result()
)

result_message = result.get_custom_messages()[0]
schema = create_table_schema_from_dict(result_message.get("column_types"))

context.add_output_metadata(output_name = "result", metadata={
"dagster/row_count": dg.MetadataValue.int(result_message.get("dagster/row_count")),
"preview": dg.MetadataValue.md(result_message.get("preview")),
"dagster/column_schema": schema,
})

return result.get_results()
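
A note on the `iris_r` body above: `result.get_custom_messages()[0]` assumes the R script always sends at least one custom message, and indexing an empty sequence would raise `IndexError`. A minimal, dependency-free sketch of a guard is shown below; `first_custom_message` is a hypothetical helper, not part of the PR or of Dagster, and the sample payload only mimics the keys the R script reports.

```python
from collections.abc import Mapping, Sequence
from typing import Any


def first_custom_message(messages: Sequence[Any]) -> Mapping[str, Any]:
    """Return the first message if it is a mapping, otherwise an empty dict."""
    if not messages:
        return {}
    first = messages[0]
    return first if isinstance(first, Mapping) else {}


# Sample payload shaped like the one built in iris.R (values are illustrative).
message = first_custom_message(
    [{"dagster/row_count": 150, "column_types": {"Species": "str"}}]
)
row_count = message.get("dagster/row_count")
column_types = message.get("column_types", {})
```

With a guard like this, a script that reports nothing yields empty metadata instead of failing the asset after the R process has already finished.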



@asset(deps=[iris_r])
@dg.asset(deps=[iris_r])
def iris_py(context):
# TODO replace hardcoded output_dir with resource key
iris = pd.read_csv(f"data/iris.csv")
30 changes: 30 additions & 0 deletions dagster_and_r/definitions.py
@@ -0,0 +1,30 @@
from dagster import (
Definitions,
PipesSubprocessClient,
)
from . jobs import docker_container_op_r
from . asset_checks import (
# no_missing_sepal_length_check_r,
no_missing_sepal_length_check_py,
)

# python_assets = load_assets_from_modules([assets])
from . assets import (
hello_world_r,
iris_r,
iris_py,
)

defs = Definitions(
assets=[
hello_world_r,
iris_r,
iris_py,
],
asset_checks=[
# no_missing_sepal_length_check_r,
no_missing_sepal_length_check_py,
],
jobs=[docker_container_op_r],
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
6 changes: 6 additions & 0 deletions dagster_and_r/hello.py
@@ -0,0 +1,6 @@
def main():
print("Hello from dagster-and-r!")


if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions hello.py
@@ -0,0 +1,6 @@
def main():
print("Hello from dagster-and-r!")


if __name__ == "__main__":
main()