Skip to content

Commit 22f5a27

Browse files
authored
feat(dbt): collect columns metadata using the dagster dbt package (#19631)
## Summary & Motivation Rework #19548 on top of #19623. ## How I Tested These Changes pytest
1 parent 68b540a commit 22f5a27

File tree

10 files changed

+157
-25
lines changed

10 files changed

+157
-25
lines changed

python_modules/libraries/dagster-dbt/dagster_dbt/asset_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ def default_metadata_from_dbt_resource_props(
362362
metadata: Dict[str, Any] = {}
363363
columns = dbt_resource_props.get("columns", {})
364364
if len(columns) > 0:
365-
metadata["table_schema"] = MetadataValue.table_schema(
365+
metadata["columns"] = MetadataValue.table_schema(
366366
TableSchema(
367367
columns=[
368368
TableColumn(

python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
import contextlib
2+
import copy
13
import os
24
import shutil
35
import signal
46
import subprocess
57
import sys
68
import uuid
79
from contextlib import suppress
8-
from dataclasses import dataclass, field
10+
from dataclasses import InitVar, dataclass, field
911
from pathlib import Path
1012
from typing import (
1113
Any,
@@ -44,6 +46,7 @@
4446

4547
from ..asset_utils import (
4648
dagster_name_fn,
49+
default_metadata_from_dbt_resource_props,
4750
get_manifest_and_translator_from_dbt_assets,
4851
)
4952
from ..dagster_dbt_translator import (
@@ -80,19 +83,15 @@ class DbtCliEventMessage:
8083
raw_event (Dict[str, Any]): The raw event dictionary.
8184
See https://docs.getdbt.com/reference/events-logging#structured-logging for more
8285
information.
86+
event_history_metadata (Dict[str, Any]): A dictionary of metadata about the
87+
current event, gathered from previous historical events.
8388
"""
8489

8590
raw_event: Dict[str, Any]
91+
event_history_metadata: InitVar[Dict[str, Any]]
8692

87-
@classmethod
88-
def from_log(cls, log: str) -> "DbtCliEventMessage":
89-
"""Parse an event according to https://docs.getdbt.com/reference/events-logging#structured-logging.
90-
91-
We assume that the log format is json.
92-
"""
93-
raw_event: Dict[str, Any] = orjson.loads(log)
94-
95-
return cls(raw_event=raw_event)
93+
def __post_init__(self, event_history_metadata: Dict[str, Any]):
94+
self._event_history_metadata = event_history_metadata
9695

9796
def __str__(self) -> str:
9897
return self.raw_event["info"]["msg"]
@@ -147,10 +146,16 @@ def to_default_asset_events(
147146
"No dbt manifest was provided. Dagster events for dbt tests will not be created."
148147
)
149148

149+
unique_id: str = event_node_info["unique_id"]
150+
invocation_id: str = self.raw_event["info"]["invocation_id"]
151+
dbt_resource_props = manifest["nodes"][unique_id]
152+
default_metadata = {
153+
**default_metadata_from_dbt_resource_props(self._event_history_metadata),
154+
"unique_id": unique_id,
155+
"invocation_id": invocation_id,
156+
}
150157
has_asset_def: bool = bool(context and context.has_assets_def)
151158

152-
invocation_id: str = self.raw_event["info"]["invocation_id"]
153-
unique_id: str = event_node_info["unique_id"]
154159
node_resource_type: str = event_node_info["resource_type"]
155160
node_status: str = event_node_info["node_status"]
156161
node_materialization: str = self.raw_event["data"]["node_info"]["materialized"]
@@ -172,8 +177,7 @@ def to_default_asset_events(
172177
value=None,
173178
output_name=dagster_name_fn(event_node_info),
174179
metadata={
175-
"unique_id": unique_id,
176-
"invocation_id": invocation_id,
180+
**default_metadata,
177181
"Execution Duration": duration_seconds,
178182
**adapter_response_metadata,
179183
},
@@ -185,8 +189,7 @@ def to_default_asset_events(
185189
yield AssetMaterialization(
186190
asset_key=asset_key,
187191
metadata={
188-
"unique_id": unique_id,
189-
"invocation_id": invocation_id,
192+
**default_metadata,
190193
"Execution Duration": duration_seconds,
191194
**adapter_response_metadata,
192195
},
@@ -195,8 +198,7 @@ def to_default_asset_events(
195198
upstream_unique_ids: List[str] = manifest["parent_map"][unique_id]
196199
test_resource_props = manifest["nodes"][unique_id]
197200
metadata = {
198-
"unique_id": unique_id,
199-
"invocation_id": invocation_id,
201+
**default_metadata,
200202
"status": node_status,
201203
**adapter_response_metadata,
202204
}
@@ -460,9 +462,21 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
460462
Returns:
461463
Iterator[DbtCliEventMessage]: An iterator of events from the dbt CLI process.
462464
"""
465+
event_history_metadata_by_unique_id: Dict[str, Dict[str, Any]] = {}
466+
463467
for log in self._stdout or self._stream_stdout():
464468
try:
465-
event = DbtCliEventMessage.from_log(log=log)
469+
raw_event: Dict[str, Any] = orjson.loads(log)
470+
unique_id: Optional[str] = raw_event["data"].get("node_info", {}).get("unique_id")
471+
event_history_metadata: Dict[str, Any] = {}
472+
if unique_id and raw_event["info"]["name"] == "NodeFinished":
473+
event_history_metadata = copy.deepcopy(
474+
event_history_metadata_by_unique_id.get(unique_id, {})
475+
)
476+
477+
event = DbtCliEventMessage(
478+
raw_event=raw_event, event_history_metadata=event_history_metadata
479+
)
466480

467481
is_error_message = event.log_level == "error"
468482
is_debug_message = event.log_level == "debug"
@@ -472,6 +486,18 @@ def stream_raw_events(self) -> Iterator[DbtCliEventMessage]:
472486
if is_error_message:
473487
self._error_messages.append(str(event))
474488

489+
# Attempt to parse the columns metadata from the event message.
490+
# If it exists, save it as historical metadata to attach to the NodeFinished event.
491+
if event.raw_event["info"]["name"] == "JinjaLogInfo":
492+
with contextlib.suppress(orjson.JSONDecodeError):
493+
columns = orjson.loads(event.raw_event["info"]["msg"])
494+
event_history_metadata_by_unique_id[cast(str, unique_id)] = {
495+
"columns": columns
496+
}
497+
498+
# Don't show this message in stdout
499+
continue
500+
475501
# Only write debug logs to stdout if the user explicitly set
476502
# the log level to debug.
477503
if not is_debug_message or is_debug_user_log_level:

python_modules/libraries/dagster-dbt/dagster_dbt_tests/conftest.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
test_asset_key_exceptions_path,
1313
test_dbt_alias_path,
1414
test_dbt_python_interleaving_path,
15+
test_jaffle_shop_path,
1516
test_meta_config_path,
17+
test_metadata_path,
1618
)
1719

1820
# ======= CONFIG ========
@@ -94,12 +96,19 @@ def dbt_build(dbt_executable, dbt_config_dir):
9496

9597

9698
def _create_dbt_manifest(project_dir: Path) -> Dict[str, Any]:
97-
dbt = DbtCliResource(project_dir=os.fspath(project_dir))
98-
dbt_invocation = dbt.cli(["--quiet", "compile"]).wait()
99+
dbt = DbtCliResource(project_dir=os.fspath(project_dir), global_config_flags=["--quiet"])
100+
101+
dbt.cli(["deps"]).wait()
102+
dbt_invocation = dbt.cli(["compile"]).wait()
99103

100104
return dbt_invocation.get_artifact("manifest.json")
101105

102106

107+
@pytest.fixture(name="test_jaffle_shop_manifest", scope="session")
108+
def test_jaffle_shop_manifest_fixture() -> Dict[str, Any]:
109+
return _create_dbt_manifest(test_jaffle_shop_path)
110+
111+
103112
@pytest.fixture(name="test_asset_checks_manifest", scope="session")
104113
def test_asset_checks_manifest_fixture() -> Dict[str, Any]:
105114
return _create_dbt_manifest(test_asset_checks_path)
@@ -123,3 +132,8 @@ def test_dbt_python_interleaving_manifest_fixture() -> Dict[str, Any]:
123132
@pytest.fixture(name="test_meta_config_manifest", scope="session")
124133
def test_meta_config_manifest_fixture() -> Dict[str, Any]:
125134
return _create_dbt_manifest(test_meta_config_path)
135+
136+
137+
@pytest.fixture(name="test_metadata_manifest", scope="session")
138+
def test_metadata_manifest_fixture() -> Dict[str, Any]:
139+
return _create_dbt_manifest(test_metadata_path)

python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -487,8 +487,9 @@ def test_no_default_asset_events_emitted(data: dict) -> None:
487487
"invocation_id": "1-2-3",
488488
},
489489
"data": data,
490-
}
491-
).to_default_asset_events(manifest={})
490+
},
491+
event_history_metadata={},
492+
).to_default_asset_events(manifest={"nodes": {"a.b.c": {}}})
492493

493494
assert list(asset_events) == []
494495

@@ -531,7 +532,9 @@ def test_to_default_asset_output_events() -> None:
531532
}
532533

533534
asset_events = list(
534-
DbtCliEventMessage(raw_event=raw_event).to_default_asset_events(manifest=manifest)
535+
DbtCliEventMessage(raw_event=raw_event, event_history_metadata={}).to_default_asset_events(
536+
manifest=manifest
537+
)
535538
)
536539

537540
assert len(asset_events) == 1

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_packages/__init__.py

Whitespace-only changes.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import os
2+
from typing import Any, Dict, cast
3+
4+
from dagster import (
5+
AssetExecutionContext,
6+
Output,
7+
TableColumn,
8+
TableSchema,
9+
materialize,
10+
)
11+
from dagster_dbt.asset_decorator import dbt_assets
12+
from dagster_dbt.core.resources_v2 import DbtCliResource
13+
14+
from ..dbt_projects import test_jaffle_shop_path, test_metadata_path
15+
16+
17+
def test_no_columns_metadata(test_jaffle_shop_manifest: Dict[str, Any]) -> None:
18+
@dbt_assets(manifest=test_jaffle_shop_manifest)
19+
def assert_no_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
20+
events = list(dbt.cli(["build"], context=context).stream())
21+
output_by_dbt_unique_id: Dict[str, Output] = {
22+
cast(str, dagster_event.metadata["unique_id"].value): dagster_event
23+
for dagster_event in events
24+
if isinstance(dagster_event, Output)
25+
}
26+
27+
for output in output_by_dbt_unique_id.values():
28+
assert "columns" not in output.metadata
29+
30+
yield from events
31+
32+
result = materialize(
33+
[assert_no_columns_metadata],
34+
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_jaffle_shop_path))},
35+
)
36+
37+
assert result.success
38+
39+
40+
def test_columns_metadata(test_metadata_manifest: Dict[str, Any]) -> None:
41+
@dbt_assets(manifest=test_metadata_manifest)
42+
def assert_columns_metadata(context: AssetExecutionContext, dbt: DbtCliResource):
43+
events = list(dbt.cli(["build"], context=context).stream())
44+
output_by_dbt_unique_id: Dict[str, Output] = {
45+
cast(str, dagster_event.metadata["unique_id"].value): dagster_event
46+
for dagster_event in events
47+
if isinstance(dagster_event, Output)
48+
}
49+
50+
for output in output_by_dbt_unique_id.values():
51+
assert "columns" in output.metadata
52+
53+
customers_output = output_by_dbt_unique_id["model.test_dagster_metadata.customers"]
54+
assert (
55+
TableSchema(
56+
columns=[
57+
TableColumn("customer_id", type="INTEGER"),
58+
TableColumn("first_name", type="character varying(256)"),
59+
TableColumn("last_name", type="character varying(256)"),
60+
TableColumn("first_order", type="DATE"),
61+
TableColumn("most_recent_order", type="DATE"),
62+
TableColumn("number_of_orders", type="BIGINT"),
63+
TableColumn("customer_lifetime_value", type="DOUBLE"),
64+
]
65+
)
66+
== customers_output.metadata["columns"].value
67+
)
68+
69+
yield from events
70+
71+
result = materialize(
72+
[assert_columns_metadata],
73+
resources={"dbt": DbtCliResource(project_dir=os.fspath(test_metadata_path))},
74+
)
75+
76+
assert result.success

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
projects_path = Path(__file__).joinpath("..").resolve()
44

5+
test_jaffle_shop_path = projects_path.joinpath("jaffle_shop")
56
test_asset_checks_path = projects_path.joinpath("test_dagster_asset_checks")
67
test_asset_key_exceptions_path = projects_path.joinpath("test_dagster_asset_key_exceptions")
78
test_dbt_alias_path = projects_path.joinpath("test_dagster_dbt_alias")
89
test_dbt_python_interleaving_path = projects_path.joinpath("test_dagster_dbt_python_interleaving")
910
test_exceptions_path = projects_path.joinpath("test_dagster_exceptions")
1011
test_meta_config_path = projects_path.joinpath("test_dagster_meta_config")
12+
test_metadata_path = projects_path.joinpath("test_dagster_metadata")

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ venv/
99
env/
1010
**/*.duckdb
1111
**/*.duckdb.wal
12+
package-lock.yml

python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/dbt_project.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ clean-targets:
2020
require-dbt-version: [">=1.0.0", "<2.0.0"]
2121

2222
models:
23+
+post-hook:
24+
- "{{ dagster.log_columns_in_relation() }}"
2325
test_dagster_metadata:
2426
materialized: table
2527
staging:
2628
materialized: view
29+
30+
seeds:
31+
+post-hook:
32+
- "{{ dagster.log_columns_in_relation() }}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# We keep use `packages.yml` for compatability with `dbt-core==1.5.*`.
2+
# Once we remove support for that version, we should rename this file to `dependencies.yml`
3+
packages:
4+
- local: "../../../dbt_packages/dagster"

0 commit comments

Comments
 (0)