diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 4faeed348..397966cef 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1081,6 +1081,8 @@ def update_next_auto_restatement_ts(self, execution_time: TimeLike) -> t.Optiona def apply_pending_restatement_intervals(self) -> None: """Applies the pending restatement intervals to the snapshot's intervals.""" + if not self.is_model or self.model.disable_restatement: + return for pending_restatement_interval in self.pending_restatement_intervals: logger.info( "Applying the auto restated interval (%s, %s) to snapshot %s", @@ -1956,6 +1958,8 @@ def apply_auto_restatements( if s_id not in snapshots: continue snapshot = snapshots[s_id] + if not snapshot.is_model or snapshot.model.disable_restatement: + continue next_auto_restated_interval = snapshot.get_next_auto_restatement_interval(execution_time) auto_restated_intervals = [ diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index 8e739a5cb..7c0f13c42 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -2590,3 +2590,72 @@ def test_apply_auto_restatements(make_snapshot): assert snapshot_f.intervals == [ (to_timestamp("2020-01-01"), to_timestamp("2020-01-06")), ] + + +def test_apply_auto_restatements_disable_restatement_downstream(make_snapshot): + # Hourly upstream model with auto restatement intervals set to 24 + model_a = SqlModel( + name="test_model_a", + kind=IncrementalByTimeRangeKind( + time_column=TimeColumn(column="ds"), + auto_restatement_cron="0 10 * * *", + auto_restatement_intervals=24, + ), + cron="@hourly", + query=parse_one("SELECT 1, ds FROM name"), + ) + snapshot_a = make_snapshot(model_a, version="1") + snapshot_a.add_interval("2020-01-01", "2020-01-06 09:00:00") + snapshot_a.next_auto_restatement_ts = to_timestamp("2020-01-06 10:00:00") + + # Daily downstream model with disable restatement + model_b = SqlModel( + name="test_model_b", + kind=IncrementalByTimeRangeKind( + time_column=TimeColumn(column="ds"), + disable_restatement=True, + ), + cron="@daily", + query=parse_one("SELECT ds FROM test_model_a"), + ) + snapshot_b = make_snapshot(model_b, nodes={model_a.fqn: model_a}, version="2") + snapshot_b.add_interval("2020-01-01", "2020-01-05") + assert snapshot_a.snapshot_id in snapshot_b.parents + + restated_intervals = apply_auto_restatements( + { + snapshot_a.snapshot_id: snapshot_a, + snapshot_b.snapshot_id: snapshot_b, + }, + "2020-01-06 10:01:00", + ) + assert sorted(restated_intervals, key=lambda x: x.name) == [ + SnapshotIntervals( + name=snapshot_a.name, + identifier=snapshot_a.identifier, + version=snapshot_a.version, + intervals=[], + dev_intervals=[], + pending_restatement_intervals=[ + (to_timestamp("2020-01-05 10:00:00"), to_timestamp("2020-01-06 10:00:00")) + ], + ), + ] + + assert snapshot_a.next_auto_restatement_ts == to_timestamp("2020-01-07 10:00:00") + assert snapshot_b.next_auto_restatement_ts is None + + assert snapshot_a.intervals == [ + (to_timestamp("2020-01-01"), to_timestamp("2020-01-05 10:00:00")), + ] + assert snapshot_b.intervals == [ + (to_timestamp("2020-01-01"), to_timestamp("2020-01-06")), + ] + + snapshot_b.pending_restatement_intervals = [ + (to_timestamp("2020-01-03"), to_timestamp("2020-01-06")) + ] + snapshot_b.apply_pending_restatement_intervals() + assert snapshot_b.intervals == [ + (to_timestamp("2020-01-01"), to_timestamp("2020-01-06")), + ]