# Feat!: Adjust physical_properties evaluation and add macro to resolve physical table names #3772

**Merged** · 4 commits · Feb 10, 2025
40 changes: 40 additions & 0 deletions docs/concepts/macros/sqlmesh_macros.md
@@ -1000,6 +1000,46 @@ Note: This is DuckDB SQL and other dialects will be transpiled accordingly.
- Recursive CTEs (common table expressions) will be used for `Redshift / MySQL / MSSQL`.
- For `MSSQL` in particular, there's a recursion limit of approximately 100, which can be a problem for long date ranges. If you hit it, you can add an `OPTION (MAXRECURSION 0)` clause after the date spine macro logic to remove the limit.

### @RESOLVE_TEMPLATE

`@resolve_template` is a helper macro for situations where you need access to the *components* of the physical object name. It's intended for use in the following situations:

- Providing explicit control over table locations on a per-model basis for engines that decouple storage and compute (such as Athena, Trino, or Spark)
- Generating references to engine-specific metadata tables that are derived from the physical table name, such as the [`<table>$properties`](https://trino.io/docs/current/connector/iceberg.html#metadata-tables) metadata table in Trino.

> **Reviewer (Contributor):** Why "template"? I'm not sure I understand the name; it seems a bit too generic.
>
> **Author (@erindru, Feb 4, 2025):** Maybe `@render_template` would be more appropriate?
>
> It's essentially rendering an in-line template with `@this_model` broken out into its components and made available as `@{catalog_name}`, `@{schema_name}` and `@{table_name}`, so the user can recombine those as required.
>
> My original implementation added these as variables to the main evaluation context so users could access them directly, something like:
>
> ```sql
> physical_properties (
>   location = @'s3://bucket/@{catalog_name}/@{schema_name}/@{table_name}'
> )
> ```
>
> but there was a preference to create a macro instead.

Under the hood, it uses the `@this_model` variable, so it can only be used during the `creating` and `evaluation` [runtime stages](./macro_variables.md#runtime-variables). Attempting to use it at the `loading` runtime stage results in a no-op.

The `@resolve_template` macro supports the following arguments:

- `template` - The string template to render into an AST node
- `mode` - What type of SQLGlot AST node to return after rendering the template. Valid values are `literal` or `table`. Defaults to `literal`.

The `template` can contain the following placeholders that will be substituted:

- `@{catalog_name}` - The name of the catalog, e.g. `datalake`
- `@{schema_name}` - The name of the physical schema that SQLMesh is using for the model version table, e.g. `sqlmesh__landing`
- `@{table_name}` - The name of the physical table that SQLMesh is using for the model version, e.g. `landing__customers__2517971505`

It can be used in a `MODEL` block:

```sql linenums="1" hl_lines="5"
MODEL (
  name datalake.landing.customers,
  ...
  physical_properties (
    location = @resolve_template('s3://warehouse-data/@{catalog_name}/prod/@{schema_name}/@{table_name}')
  )
);
-- CREATE TABLE "datalake"."sqlmesh__landing"."landing__customers__2517971505" ...
-- WITH (location = 's3://warehouse-data/datalake/prod/sqlmesh__landing/landing__customers__2517971505')
```

And also within a query, using `mode := 'table'`:

```sql linenums="1"
SELECT * FROM @resolve_template('@{catalog_name}.@{schema_name}.@{table_name}$properties', mode := 'table')
-- SELECT * FROM "datalake"."sqlmesh__landing"."landing__customers__2517971505$properties"
```

### @AND

8 changes: 5 additions & 3 deletions docs/integrations/engines/trino.md
@@ -185,14 +185,16 @@ This would perform the following mappings:

Often, you don't need to configure an explicit table location: if you have configured explicit schema locations, Trino automatically infers each table location to be a subdirectory under the schema location.

However, if you need to, you can configure an explicit table location by adding a `location` property to the model `physical_properties`.

Note that you need to use the [@resolve_template](../../concepts/macros/sqlmesh_macros.md#resolve_template) macro to generate a unique table location for each model version. Otherwise, all model versions will be written to the same location and clobber each other.

```sql hl_lines="5"
MODEL (
  name staging.customers,
  kind FULL,
  physical_properties (
    location = @resolve_template('s3://warehouse/@{catalog_name}/@{schema_name}/@{table_name}')
  )
);
```
53 changes: 53 additions & 0 deletions sqlmesh/core/macros.py
@@ -1183,6 +1183,59 @@ def date_spine(
```python
    return exp.select(alias_name).from_(exploded)


@macro()
def resolve_template(
    evaluator: MacroEvaluator,
    template: exp.Literal,
    mode: str = "literal",
) -> t.Union[exp.Literal, exp.Table]:
    """
    Generates either a String literal or an exp.Table representing a physical table location,
    based on rendering the provided template String literal.

    Note: It relies on the @this_model variable being available in the evaluation context
    (@this_model resolves to an exp.Table object representing the current physical table).
    Therefore, the @resolve_template macro must be used at creation or evaluation time and
    not at load time.

    Args:
        template: Template string literal. Can contain the following placeholders:
            @{catalog_name} -> replaced with the catalog of the exp.Table returned from @this_model
            @{schema_name} -> replaced with the schema of the exp.Table returned from @this_model
            @{table_name} -> replaced with the name of the exp.Table returned from @this_model
        mode: What to return.
            'literal' -> return an exp.Literal string
            'table' -> return an exp.Table

    Example:
        >>> from sqlglot import parse_one, exp
        >>> from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
        >>> sql = "@resolve_template('s3://data-bucket/prod/@{catalog_name}/@{schema_name}/@{table_name}')"
        >>> evaluator = MacroEvaluator(runtime_stage=RuntimeStage.CREATING)
        >>> evaluator.locals.update({"this_model": exp.to_table("test_catalog.sqlmesh__test.test__test_model__2517971505")})
        >>> evaluator.transform(parse_one(sql)).sql()
        "'s3://data-bucket/prod/test_catalog/sqlmesh__test/test__test_model__2517971505'"
    """
    if "this_model" in evaluator.locals:
        this_model = exp.to_table(evaluator.locals["this_model"], dialect=evaluator.dialect)
        template_str: str = template.this
        result = (
            template_str.replace("@{catalog_name}", this_model.catalog)
            .replace("@{schema_name}", this_model.db)
            .replace("@{table_name}", this_model.name)
        )

        if mode.lower() == "table":
            return exp.to_table(result, dialect=evaluator.dialect)
        return exp.Literal.string(result)
    elif evaluator.runtime_stage != RuntimeStage.LOADING.value:
        # only error if we are CREATING, EVALUATING or TESTING and @this_model is not present;
        # this could indicate a bug. otherwise, for LOADING, it's a no-op
        raise SQLMeshError(
            "@this_model must be present in the macro evaluation context in order to use @resolve_template"
        )

    return template
```

> **Reviewer (Contributor):** Is this just doing string replacement? Is this safe?
>
> **Author (@erindru):** In short, yes. It's doing the same thing that this is doing, to keep consistency. It deliberately mimics our macro syntax; is that what you meant by safety?
>
> I was holding off "upgrading" it to actually run the template string through the macro evaluator via `evaluator.evaluate()`, figuring that could be easily added later if the need arose.
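
The substitution above is plain string replacement over the components of `@this_model`. A minimal standalone sketch of that behavior (the table name and bucket are illustrative, not from the PR):

```python
from sqlglot import exp

# Stand-in for what @this_model resolves to at creation/evaluation time
this_model = exp.to_table("datalake.sqlmesh__landing.landing__customers__2517971505")

template = "s3://bucket/@{catalog_name}/@{schema_name}/@{table_name}"
result = (
    template.replace("@{catalog_name}", this_model.catalog)  # -> datalake
    .replace("@{schema_name}", this_model.db)                # -> sqlmesh__landing
    .replace("@{table_name}", this_model.name)               # -> landing__customers__2517971505
)
print(result)  # s3://bucket/datalake/sqlmesh__landing/landing__customers__2517971505
```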


```python
def normalize_macro_name(name: str) -> str:
    """Prefix macro name with @ and upcase"""
    return f"@{name.upper()}"
```
47 changes: 34 additions & 13 deletions sqlmesh/core/model/definition.py
@@ -630,6 +630,26 @@ def render_merge_filter(
raise SQLMeshError(f"Expected one expression but got {len(rendered_exprs)}")
return rendered_exprs[0].transform(d.replace_merge_table_aliases)

def render_physical_properties(self, **render_kwargs: t.Any) -> t.Dict[str, exp.Expression]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want the same behavior for virtual properties?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recording the result of our internal discussion here: maybe, but I couldnt think of a use-case and was trying to keep this PR small.

Since the load time rendering context is a subset of the runtime rendering context, if we were to later find a use-case to render virtual_properties at runtime then we could make the switch then without breaking backwards compatibility

def _render(expression: exp.Expression) -> exp.Expression:
# note: we use the _statement_renderer instead of _create_renderer because it sets model_fqn which
# in turn makes @this_model available in the evaluation context
rendered_exprs = self._statement_renderer(expression).render(**render_kwargs)

if not rendered_exprs:
raise SQLMeshError(
f"Expected rendering '{expression.sql(dialect=self.dialect)}' to return an expression"
)

if len(rendered_exprs) != 1:
raise SQLMeshError(
f"Expected one result when rendering '{expression.sql(dialect=self.dialect)}' but got {len(rendered_exprs)}"
)

return rendered_exprs[0]

return {k: _render(v) for k, v in self.physical_properties.items()}
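
For illustration, a self-contained sketch of the same render-and-validate pattern (hypothetical helper, not the PR's API: `render_properties` and the identity renderer are stand-ins for `Model.render_physical_properties` and `ExpressionRenderer.render`):

```python
import typing as t

from sqlglot import exp, parse_one

def render_properties(
    properties: t.Dict[str, exp.Expression],
    render: t.Callable[[exp.Expression], t.List[exp.Expression]],
) -> t.Dict[str, exp.Expression]:
    rendered = {}
    for name, expression in properties.items():
        results = render(expression)
        if len(results) != 1:
            # mirrors the "exactly one rendered expression per property" check above
            raise ValueError(f"Expected one result when rendering '{name}' but got {len(results)}")
        rendered[name] = results[0]
    return rendered

# Identity renderer for demonstration purposes
props = {"location": parse_one("'s3://bucket/some/path'")}
print(render_properties(props, lambda e: [e]))
```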

```python
    def _create_renderer(self, expression: exp.Expression) -> ExpressionRenderer:
        return ExpressionRenderer(
            expression,
```

@@ -1784,16 +1804,16 @@ def load_sql_based_model(
```python
        meta = d.Model(expressions=[])  # Dummy meta node
        expressions.insert(0, meta)

    # We deliberately hold off rendering some properties at load time because there is not enough
    # information available at load time to render them. They will get rendered later at evaluation time
    unrendered_properties = {}
    unrendered_merge_filter = None

    for prop in meta.expressions:
        prop_name = prop.name.lower()
        if prop_name in ("signals", "audits", "physical_properties"):
            unrendered_properties[prop_name] = prop.args.get("value")
        elif (
            prop.name.lower() == "kind"
            and (value := prop.args.get("value"))
            and value.name.lower() == "incremental_by_unique_key"
```
@@ -1839,12 +1859,9 @@

```python
        **kwargs,
    }

    # Discard the potentially half-rendered versions of these properties and replace them with the
    # original unrendered versions. They will get rendered properly at evaluation time
    meta_fields.update(unrendered_properties)
```
> **Comment on lines +1862 to +1864**
>
> **Reviewer (Contributor):** I think this is breaking because it impacts the data hash. If someone used `@gateway` or other load-time-renderable variables within `physical_properties`, then they should get a diff due to this change.
>
> This is similar to the recent `when_matched` refactor, IIRC.
>
> **Author (@erindru):** Ah, yes, I meant to ask about this.
>
> I'm still not 100% sure how to write migrations or how to test that they're working; they seem quite difficult and error-prone to write, with lots of pitfalls.
>
> Also, is there any default upgrade logic that we get "for free" with a sqlglot version change?
>
> **Author (@erindru):** I spoke with @izeigerman and there isn't a way to migrate this automatically.
>
> The problem is that the state would contain already-rendered macros, and we wouldn't know what the unrendered version would look like without the project files.
>
> When a migration occurs, access to the project files is not guaranteed. So we are just hoping that not many people were using macros / variables in `physical_properties`.
>
> The workaround, if this triggers a bunch of metadata changes and tries to rebuild everything, would be to run a `--forward-only` plan to update state but not rebuild any tables.
>
> **Reviewer (Contributor):** Sounds good; sorry for not getting back to this thread earlier.

```python
    if unrendered_merge_filter:
        for idx, kind_prop in enumerate(meta_fields["kind"].expressions):
```

@@ -2146,6 +2163,10 @@ def _create_model(

```python
        statements.extend(kwargs["post_statements"])
    if "on_virtual_update" in kwargs:
        statements.extend(kwargs["on_virtual_update"])
    if physical_properties := kwargs.get("physical_properties"):
        # to allow variables like @gateway to be used in physical_properties
        # since rendering shifted from load time to run time
        statements.extend(physical_properties)

    jinja_macro_references, used_variables = extract_macro_references_and_variables(
        *(gen(e) for e in statements)
```