Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbt): emit table schema metadata #19548

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import dateutil.parser
import orjson
import yaml
from dagster import (
AssetCheckResult,
AssetCheckSeverity,
Expand All @@ -44,6 +45,7 @@
from typing_extensions import Literal

from ..asset_utils import (
default_metadata_from_dbt_resource_props,
get_manifest_and_translator_from_dbt_assets,
output_name_fn,
)
Expand All @@ -57,6 +59,11 @@
validate_manifest,
)
from ..errors import DagsterDbtCliRuntimeError
from ..include.macros import (
DAGSTER_DBT_TABLE_SCHEMA_MACRO_INVOCATION,
DAGSTER_DBT_TABLE_SCHEMA_MACRO_NAME,
DAGSTER_DBT_TABLE_SCHEMA_MACRO_PATH,
)
from ..utils import ASSET_RESOURCE_TYPES, get_dbt_resource_props_by_dbt_unique_id_from_manifest

logger = get_dagster_logger()
Expand Down Expand Up @@ -141,10 +148,12 @@ def to_default_asset_events(
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
)

unique_id: str = event_node_info["unique_id"]
dbt_resource_props = manifest["nodes"][unique_id]
metadata = default_metadata_from_dbt_resource_props(dbt_resource_props.get("dagster", {}))
has_asset_def: bool = bool(context and context.has_assets_def)

invocation_id: str = self.raw_event["info"]["invocation_id"]
unique_id: str = event_node_info["unique_id"]
node_resource_type: str = event_node_info["resource_type"]
node_status: str = event_node_info["node_status"]

Expand All @@ -160,18 +169,19 @@ def to_default_asset_events(
value=None,
output_name=output_name_fn(event_node_info),
metadata={
**metadata,
"unique_id": unique_id,
"invocation_id": invocation_id,
"Execution Duration": duration_seconds,
},
)
else:
dbt_resource_props = manifest["nodes"][unique_id]
asset_key = dagster_dbt_translator.get_asset_key(dbt_resource_props)

yield AssetMaterialization(
asset_key=asset_key,
metadata={
**metadata,
"unique_id": unique_id,
"invocation_id": invocation_id,
"Execution Duration": duration_seconds,
Expand Down Expand Up @@ -286,6 +296,28 @@ def run(
partial_parse_destination_target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(partial_parse_file_path, partial_parse_destination_target_path)

# Attempt to edit the dbt_project.yml file to include the macro to generate table schema
# information.
if dagster_dbt_translator.settings.enable_table_schema_metadata:
dbt_project_yml_path = project_dir.joinpath(DBT_PROJECT_YML_NAME)
dbt_project_yml = yaml.safe_load(dbt_project_yml_path.read_bytes())

macro_destination_path = project_dir.joinpath(
"macros", DAGSTER_DBT_TABLE_SCHEMA_MACRO_NAME
)

macro_destination_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy(DAGSTER_DBT_TABLE_SCHEMA_MACRO_PATH, macro_destination_path)

for resource_type in ["models", "seeds", "snapshots"]:
resource = dbt_project_yml.setdefault(resource_type, {})
post_hooks = resource.setdefault("+post-hook", [])

if DAGSTER_DBT_TABLE_SCHEMA_MACRO_INVOCATION not in post_hooks:
post_hooks.append(DAGSTER_DBT_TABLE_SCHEMA_MACRO_INVOCATION)

dbt_project_yml_path.write_text(yaml.dump(dbt_project_yml))

# Create a subprocess that runs the dbt CLI command.
process = subprocess.Popen(
args=args,
Expand Down Expand Up @@ -411,6 +443,23 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
if is_error_message:
self._error_messages.append(str(event))

if (
self.dagster_dbt_translator.settings.enable_table_schema_metadata
and event.raw_event["info"]["name"] == "JinjaLogInfo"
):
unique_id: str = event.raw_event["data"]["node_info"]["unique_id"]
dbt_resource_props = self.manifest["nodes"][unique_id]

# Attempt to parse the table schema from the event message.
# If it exists, then save it as metadata for the dbt node in the
# manifest.
with contextlib.suppress(orjson.JSONDecodeError):
table_schema = orjson.loads(event.raw_event["info"]["msg"])
dbt_resource_props["dagster"] = {"columns": table_schema}

# Don't show this message in stdout
continue

# Re-emit the logs from dbt CLI process into stdout.
sys.stdout.write(str(event) + "\n")
sys.stdout.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ class DagsterDbtTranslatorSettings:
Args:
enable_asset_checks (bool): Whether to load dbt tests as Dagster asset checks.
Defaults to False.
enable_table_schema_metadata (bool): Whether to emit table schema metadata when
materializing dbt models, seeds, or snapshots. Defaults to False.
"""

enable_asset_checks: bool = False
enable_table_schema_metadata: bool = False


class DagsterDbtTranslator:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pathlib import Path

# File name of the dbt macro that logs the columns of a relation.
DAGSTER_DBT_TABLE_SCHEMA_MACRO_NAME = "dagster__log_columns_in_relation.sql"

# Resolved directory that holds this module; the macro file is looked up beside it.
_MACRO_DIRECTORY = (Path(__file__) / "..").resolve()
DAGSTER_DBT_TABLE_SCHEMA_MACRO_PATH = _MACRO_DIRECTORY / DAGSTER_DBT_TABLE_SCHEMA_MACRO_NAME

# Jinja expression that invokes the macro.
DAGSTER_DBT_TABLE_SCHEMA_MACRO_INVOCATION = r"{{ dagster__log_columns_in_relation() }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{% macro dagster__log_columns_in_relation() %}
    {#
        Log the columns of the current relation (`this`) as a JSON object of the form
        {"<column name>": {"data_type": "<column data type>"}, ...}.
        The message is logged with info=true.
    #}
    {%- set columns = adapter.get_columns_in_relation(this) -%}
    {%- set table_schema = {} -%}

    {% for column in columns %}
        {%- set serializable_column = {column.name: {'data_type': column.data_type}} -%}
        {#- `update` mutates the dict in place; its return value is discarded. -#}
        {%- set _ = table_schema.update(serializable_column) -%}
    {% endfor %}

    {% do log(tojson(table_schema), info=true) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
},
"data": data,
}
).to_default_asset_events(manifest={})
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})

assert list(asset_events) == []

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import json
import os
from pathlib import Path

from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema
from dagster_dbt.core.resources_v2 import DbtCliResource
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, DagsterDbtTranslatorSettings
from pytest_mock import MockerFixture

# Directory of the dbt project fixture used by the tests in this module.
test_dagster_metadata_dbt_project_dir = Path(__file__).joinpath(
    "..", "dbt_projects", "test_dagster_metadata"
).resolve()

# Manifest stored inside that project directory, loaded once at import time.
test_dagster_metadata_manifest_path = test_dagster_metadata_dbt_project_dir.joinpath(
    "manifest.json"
)
test_dagster_metadata_manifest = json.loads(test_dagster_metadata_manifest_path.read_bytes())

# Translator whose settings turn on table schema metadata.
dagster_dbt_translator_with_table_schema_metadata = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(enable_table_schema_metadata=True)
)


def test_no_table_schema_metadata(mocker: MockerFixture) -> None:
    """Run `dbt build` without passing a translator and check `customers` has no table schema.

    The context is a mock that reports having no assets definition.
    """
    context = mocker.MagicMock()
    context.assets_def = None
    context.has_assets_def = False

    dbt = DbtCliResource(project_dir=os.fspath(test_dagster_metadata_dbt_project_dir))
    invocation = dbt.cli(
        ["build"],
        manifest=test_dagster_metadata_manifest,
        context=context,
    )

    # Drain the whole event stream first, then index the materializations by asset key.
    events = list(invocation.stream())
    materializations_by_asset_key = {}
    for dagster_event in events:
        if isinstance(dagster_event, AssetMaterialization):
            materializations_by_asset_key[dagster_event.asset_key] = dagster_event

    customers_materialization = materializations_by_asset_key[AssetKey(["customers"])]

    assert "table_schema" not in customers_materialization.metadata


def test_table_schema_metadata(mocker: MockerFixture) -> None:
    """Run `dbt build` with table schema metadata enabled and check the `customers` schema.

    The context is a mock that reports having no assets definition. The translator
    passed to the CLI has `enable_table_schema_metadata=True`.
    """
    context = mocker.MagicMock()
    context.assets_def = None
    context.has_assets_def = False

    dbt = DbtCliResource(project_dir=os.fspath(test_dagster_metadata_dbt_project_dir))
    invocation = dbt.cli(
        ["build"],
        manifest=test_dagster_metadata_manifest,
        dagster_dbt_translator=dagster_dbt_translator_with_table_schema_metadata,
        context=context,
    )

    # Drain the whole event stream first, then index the materializations by asset key.
    events = list(invocation.stream())
    materializations_by_asset_key = {}
    for dagster_event in events:
        if isinstance(dagster_event, AssetMaterialization):
            materializations_by_asset_key[dagster_event.asset_key] = dagster_event

    customers_materialization = materializations_by_asset_key[AssetKey(["customers"])]

    # Expected columns, in order, as (name, data type) pairs.
    expected_columns = (
        ("customer_id", "INTEGER"),
        ("first_name", "character varying(256)"),
        ("last_name", "character varying(256)"),
        ("first_order", "DATE"),
        ("most_recent_order", "DATE"),
        ("number_of_orders", "BIGINT"),
        ("customer_lifetime_value", "DOUBLE"),
    )
    expected_table_schema = TableSchema(
        columns=[TableColumn(column_name, type=data_type) for column_name, data_type in expected_columns]
    )

    assert expected_table_schema == customers_materialization.metadata["table_schema"].value