Skip to content

Commit b273c47

Browse files
committed
feat(dbt): collect table schema metadata using the dagster dbt package
1 parent 1512b9c commit b273c47

File tree

7 files changed

+114
-9
lines changed

7 files changed

+114
-9
lines changed

python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from typing_extensions import Literal
4444

4545
from ..asset_utils import (
46+
default_metadata_from_dbt_resource_props,
4647
get_manifest_and_translator_from_dbt_assets,
4748
output_name_fn,
4849
)
@@ -147,10 +148,16 @@ def to_default_asset_events(
147148
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
148149
)
149150

151+
unique_id: str = event_node_info["unique_id"]
152+
invocation_id: str = self.raw_event["info"]["invocation_id"]
153+
dbt_resource_props = manifest["nodes"][unique_id]
154+
default_metadata = {
155+
**default_metadata_from_dbt_resource_props(dbt_resource_props.get("dagster", {})),
156+
"unique_id": unique_id,
157+
"invocation_id": invocation_id,
158+
}
150159
has_asset_def: bool = bool(context and context.has_assets_def)
151160

152-
invocation_id: str = self.raw_event["info"]["invocation_id"]
153-
unique_id: str = event_node_info["unique_id"]
154161
node_resource_type: str = event_node_info["resource_type"]
155162
node_status: str = event_node_info["node_status"]
156163
node_materialization: str = self.raw_event["data"]["node_info"]["materialized"]
@@ -172,8 +179,7 @@ def to_default_asset_events(
172179
value=None,
173180
output_name=output_name_fn(event_node_info),
174181
metadata={
175-
"unique_id": unique_id,
176-
"invocation_id": invocation_id,
182+
**default_metadata,
177183
"Execution Duration": duration_seconds,
178184
**adapter_response_metadata,
179185
},
@@ -185,8 +191,7 @@ def to_default_asset_events(
185191
yield AssetMaterialization(
186192
asset_key=asset_key,
187193
metadata={
188-
"unique_id": unique_id,
189-
"invocation_id": invocation_id,
194+
**default_metadata,
190195
"Execution Duration": duration_seconds,
191196
**adapter_response_metadata,
192197
},
@@ -195,8 +200,7 @@ def to_default_asset_events(
195200
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
196201
test_resource_props = manifest["nodes"][unique_id]
197202
metadata = {
198-
"unique_id": unique_id,
199-
"invocation_id": invocation_id,
203+
**default_metadata,
200204
"status": node_status,
201205
**adapter_response_metadata,
202206
}

python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
497497
},
498498
"data": data,
499499
}
500-
).to_default_asset_events(manifest={})
500+
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})
501501

502502
assert list(asset_events) == []
503503

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ venv/
99
env/
1010
**/*.duckdb
1111
**/*.duckdb.wal
12+
package-lock.yml

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ clean-targets:
2020
require-dbt-version: [">=1.0.0", "<2.0.0"]
2121

2222
models:
23+
+post-hook:
24+
- "{{ dagster.dagster__log_columns_in_relation() }}"
2325
test_dagster_metadata:
2426
materialized: table
2527
staging:
2628
materialized: view
29+
30+
seeds:
31+
+post-hook:
32+
- "{{ dagster.dagster__log_columns_in_relation() }}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
packages:
2+
- local: "../../../packages/dagster"

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/manifest.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import json
2+
import os
3+
from pathlib import Path
4+
from typing import Iterator
5+
6+
import pytest
7+
from dagster import AssetKey, AssetMaterialization, TableColumn, TableSchema
8+
from dagster_dbt.core.resources_v2 import DbtCliResource
9+
from pytest_mock import MockerFixture
10+
11+
# dbt project directory "test_dagster_meta_config", resolved relative to this file.
# NOTE(review): the variable name suggests this project is the baseline that
# emits no table schema metadata — confirm against the project itself.
test_no_dagster_metadata_dbt_project_dir = (
    Path(__file__).joinpath("..", "..") / "dbt_projects" / "test_dagster_meta_config"
).resolve()
# Parsed contents of that project's manifest.json.
test_no_dagster_metadata_manifest = json.loads(
    (test_no_dagster_metadata_dbt_project_dir / "manifest.json").read_bytes()
)

# dbt project directory "test_dagster_metadata", resolved relative to this file.
test_dagster_metadata_dbt_project_dir = (
    Path(__file__).joinpath("..", "..") / "dbt_projects" / "test_dagster_metadata"
).resolve()
# Parsed contents of that project's manifest.json.
test_dagster_metadata_manifest = json.loads(
    (test_dagster_metadata_dbt_project_dir / "manifest.json").read_bytes()
)
24+
25+
26+
@pytest.fixture(name="dbt")
def dbt_fixture() -> Iterator[DbtCliResource]:
    """Yield a ``DbtCliResource`` for the ``test_dagster_metadata`` project.

    ``dbt deps`` is run and waited on before the resource is handed to the test.
    """
    project_dir = os.fspath(test_dagster_metadata_dbt_project_dir)
    resource = DbtCliResource(project_dir=project_dir)

    # NOTE(review): presumably installs the package that provides the
    # `dagster.dagster__log_columns_in_relation` post-hook macro — confirm.
    deps_invocation = resource.cli(["deps"])
    deps_invocation.wait()

    yield resource
33+
34+
35+
def test_no_table_schema_metadata(mocker: MockerFixture) -> None:
    """Check that the ``customers`` materialization has no ``table_schema`` metadata.

    Runs ``dbt build`` against the project at
    ``test_no_dagster_metadata_dbt_project_dir`` with a mocked context that
    reports no assets definition.
    """
    context = mocker.MagicMock()
    context.assets_def = None
    context.has_assets_def = False

    project_dir = os.fspath(test_no_dagster_metadata_dbt_project_dir)
    dbt = DbtCliResource(project_dir=project_dir)

    invocation = dbt.cli(
        ["build"],
        manifest=test_no_dagster_metadata_manifest,
        context=context,
    )
    events = list(invocation.stream())

    # Index materializations by asset key; a later event for the same key
    # replaces an earlier one.
    materializations_by_asset_key = {}
    for event in events:
        if isinstance(event, AssetMaterialization):
            materializations_by_asset_key[event.asset_key] = event

    customers_key = AssetKey(["customers"])
    customers_materialization = materializations_by_asset_key[customers_key]

    assert "table_schema" not in customers_materialization.metadata
57+
58+
59+
def test_table_schema_metadata(mocker: MockerFixture, dbt: DbtCliResource) -> None:
    """Check that the ``customers`` materialization carries the expected ``table_schema``.

    Runs ``dbt build`` through the ``dbt`` fixture with the
    ``test_dagster_metadata`` manifest and a mocked context that reports no
    assets definition, then compares the emitted column names and types.
    """
    context = mocker.MagicMock()
    context.assets_def = None
    context.has_assets_def = False

    invocation = dbt.cli(
        ["build"],
        manifest=test_dagster_metadata_manifest,
        context=context,
    )
    events = list(invocation.stream())

    # Index materializations by asset key; a later event for the same key
    # replaces an earlier one.
    materializations_by_asset_key = {}
    for event in events:
        if isinstance(event, AssetMaterialization):
            materializations_by_asset_key[event.asset_key] = event

    customers_materialization = materializations_by_asset_key[AssetKey(["customers"])]

    # Expected (column name, type) pairs, in the order the schema lists them.
    expected_columns = [
        ("customer_id", "INTEGER"),
        ("first_name", "character varying(256)"),
        ("last_name", "character varying(256)"),
        ("first_order", "DATE"),
        ("most_recent_order", "DATE"),
        ("number_of_orders", "BIGINT"),
        ("customer_lifetime_value", "DOUBLE"),
    ]
    expected_schema = TableSchema(
        columns=[TableColumn(name, type=column_type) for name, column_type in expected_columns]
    )

    assert expected_schema == customers_materialization.metadata["table_schema"].value

0 commit comments

Comments
 (0)