Skip to content

Commit

Permalink
Fix: Make sure downstream models for which restatements are disabled …
Browse files Browse the repository at this point in the history
…are not auto-restated (#3597)
  • Loading branch information
izeigerman committed Jan 8, 2025
1 parent 34b9a1a commit 920ad1c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
4 changes: 4 additions & 0 deletions sqlmesh/core/snapshot/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,8 @@ def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optiona

def apply_pending_restatement_intervals(self) -> None:
"""Applies the pending restatement intervals to the snapshot's intervals."""
if not self.is_model or self.model.disable_restatement:
return
for pending_restatement_interval in self.pending_restatement_intervals:
logger.info(
"Applying the auto restated interval (%s, %s) to snapshot %s",
Expand Down Expand Up @@ -1956,6 +1958,8 @@ def apply_auto_restatements(
if s_id not in snapshots:
continue
snapshot = snapshots[s_id]
if not snapshot.is_model or snapshot.model.disable_restatement:
continue

next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time)
auto_restated_intervals = [
Expand Down
69 changes: 69 additions & 0 deletions tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2590,3 +2590,72 @@ def test_apply_auto_restatements(make_snapshot):
assert snapshot_f.intervals == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-06")),
]


def test_apply_auto_restatements_disable_restatement_downstream(make_snapshot):
# Hourly upstream model with auto restatement intervals set to 24
model_a = SqlModel(
name="test_model_a",
kind=IncrementalByTimeRangeKind(
time_column=TimeColumn(column="ds"),
auto_restatement_cron="0 10 * * *",
auto_restatement_intervals=24,
),
cron="@hourly",
query=parse_one("SELECT 1, ds FROM name"),
)
snapshot_a = make_snapshot(model_a, version="1")
snapshot_a.add_interval("2020-01-01", "2020-01-06 09:00:00")
snapshot_a.next_auto_restatement_ts = to_timestamp("2020-01-06 10:00:00")

# Daily downstream model with disable restatement
model_b = SqlModel(
name="test_model_b",
kind=IncrementalByTimeRangeKind(
time_column=TimeColumn(column="ds"),
disable_restatement=True,
),
cron="@daily",
query=parse_one("SELECT ds FROM test_model_a"),
)
snapshot_b = make_snapshot(model_b, nodes={model_a.fqn: model_a}, version="2")
snapshot_b.add_interval("2020-01-01", "2020-01-05")
assert snapshot_a.snapshot_id in snapshot_b.parents

restated_intervals = apply_auto_restatements(
{
snapshot_a.snapshot_id: snapshot_a,
snapshot_b.snapshot_id: snapshot_b,
},
"2020-01-06 10:01:00",
)
assert sorted(restated_intervals, key=lambda x: x.name) == [
SnapshotIntervals(
name=snapshot_a.name,
identifier=snapshot_a.identifier,
version=snapshot_a.version,
intervals=[],
dev_intervals=[],
pending_restatement_intervals=[
(to_timestamp("2020-01-05 10:00:00"), to_timestamp("2020-01-06 10:00:00"))
],
),
]

assert snapshot_a.next_auto_restatement_ts == to_timestamp("2020-01-07 10:00:00")
assert snapshot_b.next_auto_restatement_ts is None

assert snapshot_a.intervals == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-05 10:00:00")),
]
assert snapshot_b.intervals == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-06")),
]

snapshot_b.pending_restatement_intervals = [
(to_timestamp("2020-01-03"), to_timestamp("2020-01-06"))
]
snapshot_b.apply_pending_restatement_intervals()
assert snapshot_b.intervals == [
(to_timestamp("2020-01-01"), to_timestamp("2020-01-06")),
]

0 comments on commit 920ad1c

Please sign in to comment.