Skip to content

Commit fdb1e65

Browse files
authored
PLA-1052 add scheduler delay instrumentation (#28425)
## Summary & Motivation This allows us to instrument how far behind the expected "next iteration time" for a schedule we're running
1 parent 46734d8 commit fdb1e65

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

python_modules/dagster/dagster/_daemon/daemon.py

+6
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ class SchedulerDaemon(DagsterDaemon):
272272
def daemon_type(cls) -> str:
273273
return "SCHEDULER"
274274

275+
def scheduler_delay_instrumentation(
276+
self, scheduler_id: str, next_iteration_timestamp: float, now_timestamp: float
277+
) -> None:
278+
pass
279+
275280
def core_loop(
276281
self,
277282
workspace_process_context: IWorkspaceProcessContext,
@@ -287,6 +292,7 @@ def core_loop(
287292
scheduler.max_catchup_runs,
288293
scheduler.max_tick_retries,
289294
shutdown_event,
295+
self.scheduler_delay_instrumentation,
290296
)
291297

292298

python_modules/dagster/dagster/_scheduler/scheduler.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from collections.abc import Generator, Mapping, Sequence
99
from concurrent.futures import Future, ThreadPoolExecutor
1010
from contextlib import AbstractContextManager, ExitStack
11-
from typing import TYPE_CHECKING, NamedTuple, Optional, Union, cast
11+
from typing import TYPE_CHECKING, Callable, NamedTuple, Optional, Union, cast
1212

1313
from typing_extensions import Self
1414

@@ -49,6 +49,16 @@
4949
from dagster._daemon.daemon import DaemonIterator
5050

5151

52+
# scheduler_id, next_iteration_timestamp, now
53+
SchedulerDelayInstrumentation = Callable[[str, float, float], None]
54+
55+
56+
def default_scheduler_delay_instrumentation(
57+
scheduler_id: str, next_iteration_timestamp: float, now_timestamp: float
58+
) -> None:
59+
pass
60+
61+
5262
# how often do we update the job row in the database with the last iteration timestamp. This
5363
# creates a checkpoint so that if the cron schedule changes, we don't try to backfill schedule ticks
5464
# from the start of the schedule, just since the last recorded iteration interval.
@@ -189,6 +199,7 @@ def execute_scheduler_iteration_loop(
189199
max_catchup_runs: int,
190200
max_tick_retries: int,
191201
shutdown_event: threading.Event,
202+
scheduler_delay_instrumentation: SchedulerDelayInstrumentation = default_scheduler_delay_instrumentation,
192203
) -> "DaemonIterator":
193204
from dagster._daemon.daemon import SpanMarker
194205

@@ -235,6 +246,7 @@ def execute_scheduler_iteration_loop(
235246
scheduler_run_futures=scheduler_run_futures,
236247
max_catchup_runs=max_catchup_runs,
237248
max_tick_retries=max_tick_retries,
249+
scheduler_delay_instrumentation=scheduler_delay_instrumentation,
238250
)
239251
except Exception:
240252
error_info = DaemonErrorCapture.process_exception(
@@ -267,6 +279,7 @@ def launch_scheduled_runs(
267279
max_catchup_runs: int = DEFAULT_MAX_CATCHUP_RUNS,
268280
max_tick_retries: int = 0,
269281
debug_crash_flags: Optional[DebugCrashFlags] = None,
282+
scheduler_delay_instrumentation: SchedulerDelayInstrumentation = default_scheduler_delay_instrumentation,
270283
) -> "DaemonIterator":
271284
instance = workspace_process_context.instance
272285

@@ -412,6 +425,13 @@ def launch_scheduled_runs(
412425
# Not enough time has passed for this schedule, don't bother creating a thread
413426
continue
414427

428+
if previous_iteration_times:
429+
scheduler_delay_instrumentation(
430+
schedule.selector_id,
431+
previous_iteration_times.next_iteration_timestamp,
432+
now_timestamp,
433+
)
434+
415435
future = threadpool_executor.submit(
416436
launch_scheduled_runs_for_schedule,
417437
workspace_process_context,

0 commit comments

Comments
 (0)