Commit e239d4b

Feat: Add macro to resolve physical table names and enable its use in physical_properties

erindru committed Feb 4, 2025
1 parent 129326d commit e239d4b
Showing 8 changed files with 414 additions and 55 deletions.
40 changes: 40 additions & 0 deletions docs/concepts/macros/sqlmesh_macros.md
@@ -1000,6 +1000,46 @@ Note: This is DuckDB SQL and other dialects will be transpiled accordingly.
- Recursive CTEs (common table expressions) will be used for `Redshift / MySQL / MSSQL`.
- For `MSSQL` in particular, there's a recursion limit of approximately 100, which long date ranges can exceed. If this becomes a problem, you can add an `OPTION (MAXRECURSION 0)` clause after the date spine macro logic to remove the limit.

### @PHYSICAL_LOCATION

`@PHYSICAL_LOCATION` is a helper macro that gives you access to the *components* of the physical table name. It's intended for use in the following situations:

- Providing explicit control over table locations on a per-model basis for engines that decouple storage and compute (such as Athena, Trino, Spark etc)
- Generating references to engine-specific metadata tables that are derived from the physical table name, such as the [`<table>$properties`](https://trino.io/docs/current/connector/iceberg.html#metadata-tables) metadata table in Trino.

Under the hood, it uses the `@this_model` variable, so it can only be used during the `creating` and `evaluation` [runtime stages](./macro_variables.md#runtime-variables).

The `@PHYSICAL_LOCATION` macro supports the following arguments:

- `template` - The string template to render
- `mode` - What to return after rendering the template. Valid values are `literal` or `table`. Defaults to `literal`.

The `template` can contain the following placeholders that will be substituted:

- `@{catalog_name}` - The name of the catalog, e.g. `datalake`
- `@{schema_name}` - The name of the physical schema that SQLMesh is using for the model version table, e.g. `sqlmesh__landing`
- `@{table_name}` - The name of the physical table that SQLMesh is using for the model version, e.g. `landing__customers__2517971505`

It can be used in a `MODEL` block:

```sql linenums="1" hl_lines="5"
MODEL (
name datalake.landing.customers,
...
physical_properties (
location = @physical_location('s3://warehouse-data/@{catalog_name}/prod/@{schema_name}/@{table_name}')
)
);
-- CREATE TABLE "datalake"."sqlmesh__landing"."landing__customers__2517971505" ...
-- WITH (location = 's3://warehouse-data/datalake/prod/sqlmesh__landing/landing__customers__2517971505')
```

And also within a query, using `mode := 'table'`:

```sql linenums="1"
SELECT * FROM @physical_location('@{catalog_name}.@{schema_name}.@{table_name}$properties', mode := 'table')
-- SELECT * FROM "datalake"."sqlmesh__landing"."landing__customers__2517971505$properties"
```
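Conceptually, the placeholder substitution is plain string replacement against the catalog, schema, and table components of `@this_model`. A minimal Python sketch of that idea (illustrative only, not SQLMesh's actual implementation):

```python
# Illustrative sketch of how @physical_location fills its template.
# The function name is hypothetical; SQLMesh performs the equivalent internally.
def render_physical_location(template: str, catalog: str, schema: str, table: str) -> str:
    return (
        template.replace("@{catalog_name}", catalog)
        .replace("@{schema_name}", schema)
        .replace("@{table_name}", table)
    )


location = render_physical_location(
    "s3://warehouse-data/@{catalog_name}/prod/@{schema_name}/@{table_name}",
    catalog="datalake",
    schema="sqlmesh__landing",
    table="landing__customers__2517971505",
)
print(location)
# s3://warehouse-data/datalake/prod/sqlmesh__landing/landing__customers__2517971505
```

Because each model version gets a unique physical table name, any template that includes `@{table_name}` yields a unique location per version.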

### @AND

8 changes: 5 additions & 3 deletions docs/integrations/engines/trino.md
@@ -185,14 +185,16 @@ This would perform the following mappings:

Often, you don't need to configure an explicit table location: if you have configured explicit schema locations, Trino automatically infers table locations as subdirectories under the schema location.

However, if you need to, you can configure an explicit table location by adding a `location` property to the model `physical_properties`.

Note that you need to use the [@physical_location](../../concepts/macros/sqlmesh_macros.md#physical_location) macro to generate a unique table location for each model version. Otherwise, all model versions will be written to the same location and clobber each other.

```sql hl_lines="5"
MODEL (
  name staging.customers,
  kind FULL,
  physical_properties (
    location = @physical_location('s3://warehouse/@{catalog_name}/@{schema_name}/@{table_name}')
  )
);
```
53 changes: 53 additions & 0 deletions sqlmesh/core/macros.py
@@ -1183,6 +1183,59 @@ def date_spine(
    return exp.select(alias_name).from_(exploded)


@macro()
def physical_location(
    evaluator: MacroEvaluator,
    template: exp.Literal,
    mode: exp.Literal = exp.Literal.string("literal"),
) -> t.Union[exp.Literal, exp.Table]:
    """
    Generates either a string literal or an exp.Table representing a physical table location, based on
    rendering the provided template string literal.

    Note: It relies on the @this_model variable being available in the evaluation context (@this_model
    resolves to an exp.Table object representing the current physical table).
    Therefore, the @physical_location macro must be used at creation or evaluation time and not at load time.

    Args:
        template: Template string literal. Can contain the following placeholders:
            @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model
            @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model
            @{table_name} -> replaced with the name of the exp.Table returned from @this_model
        mode: What to return.
            'literal' -> return an exp.Literal string
            'table' -> return an exp.Table

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
        >>> sql = "@physical_location('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')"
        >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING)
        >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")})
        >>> evaluator.transform(parse_one(sql)).sql()
        "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
    """
    if evaluator.runtime_stage != "loading":
        if "this_model" not in evaluator.locals:
            raise SQLMeshError(
                "@this_model must be present in the macro evaluation context in order to use @physical_location"
            )

        this_model = exp.to_table(evaluator.locals["this_model"])
        template_str: str = template.this
        result = (
            template_str.replace("@{catalog_name}", this_model.catalog)
            .replace("@{schema_name}", this_model.db)
            .replace("@{table_name}", this_model.name)
        )

        if mode.this.lower() == "table":
            return exp.to_table(result)

        return exp.Literal.string(result)

    return template


def normalize_macro_name(name: str) -> str:
    """Prefix macro name with @ and upcase"""
    return f"@{name.upper()}"
47 changes: 34 additions & 13 deletions sqlmesh/core/model/definition.py
@@ -630,6 +630,26 @@ def render_merge_filter(
            raise SQLMeshError(f"Expected one expression but got {len(rendered_exprs)}")
        return rendered_exprs[0].transform(d.replace_merge_table_aliases)

    def render_physical_properties(self, **render_kwargs: t.Any) -> t.Dict[str, exp.Expression]:
        def _render(expression: exp.Expression) -> exp.Expression:
            # note: we use the _statement_renderer instead of _create_renderer because it sets model_fqn which
            # in turn makes @this_model available in the evaluation context
            rendered_exprs = self._statement_renderer(expression).render(**render_kwargs)

            if not rendered_exprs:
                raise SQLMeshError(
                    f"Expected rendering '{expression.sql(dialect=self.dialect)}' to return an expression"
                )

            if len(rendered_exprs) != 1:
                raise SQLMeshError(
                    f"Expected one result when rendering '{expression.sql(dialect=self.dialect)}' but got {len(rendered_exprs)}"
                )

            return rendered_exprs[0]

        return {k: _render(v) for k, v in self.physical_properties.items()}
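The contract of the method above — every property value must render to exactly one expression, anything else is an error — can be sketched with plain Python (hypothetical stand-in names, not SQLMesh's API):

```python
# Simplified stand-in for render_physical_properties: each value is passed
# through a renderer that returns a list of results, and exactly one result
# is required. Names here are illustrative, not SQLMesh's actual API.
from typing import Callable, Dict, List


def render_properties(
    properties: Dict[str, str], render: Callable[[str], List[str]]
) -> Dict[str, str]:
    out = {}
    for key, value in properties.items():
        rendered = render(value)
        if len(rendered) != 1:
            raise ValueError(
                f"Expected one result when rendering '{value}' but got {len(rendered)}"
            )
        out[key] = rendered[0]
    return out


# A toy "renderer" that substitutes a placeholder, always yielding one result.
props = render_properties(
    {"location": "s3://bucket/@{table_name}"},
    render=lambda v: [v.replace("@{table_name}", "landing__customers__2517971505")],
)
print(props["location"])
# s3://bucket/landing__customers__2517971505
```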

    def _create_renderer(self, expression: exp.Expression) -> ExpressionRenderer:
        return ExpressionRenderer(
            expression,
@@ -1784,16 +1804,16 @@ def load_sql_based_model(
        meta = d.Model(expressions=[])  # Dummy meta node
        expressions.insert(0, meta)

    # We deliberately hold off rendering some properties at load time because there is not enough information available
    # at load time to render them. They will get rendered later at evaluation time
    unrendered_properties = {}
    unrendered_merge_filter = None

    for prop in meta.expressions:
        prop_name = prop.name.lower()
        if prop_name in ("signals", "audits", "physical_properties"):
            unrendered_properties[prop_name] = prop.args.get("value")
        elif (
            prop.name.lower() == "kind"
            and (value := prop.args.get("value"))
            and value.name.lower() == "incremental_by_unique_key"
@@ -1839,12 +1859,9 @@
        **kwargs,
    }

    # Discard the potentially half-rendered versions of these properties and replace them with the
    # original unrendered versions. They will get rendered properly at evaluation time
    meta_fields.update(unrendered_properties)

    if unrendered_merge_filter:
        for idx, kind_prop in enumerate(meta_fields["kind"].expressions):
@@ -2146,6 +2163,10 @@ def _create_model(
        statements.extend(kwargs["post_statements"])
    if "on_virtual_update" in kwargs:
        statements.extend(kwargs["on_virtual_update"])
    if physical_properties := kwargs.get("physical_properties"):
        # to allow variables like @gateway to be used in physical_properties
        # since rendering shifted from load time to run time
        statements.extend(physical_properties)

    jinja_macro_references, used_variables = extract_macro_references_and_variables(
        *(gen(e) for e in statements)