forked from pganalyze/collector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
101 lines (84 loc) · 2.67 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package scheduler
import (
"context"
"time"
"github.com/gorhill/cronexpr"
"github.com/pganalyze/collector/util"
)
// Group describes a recurring schedule. Runners are attached to it with
// Schedule or ScheduleSecondary, which consult interval for execution times.
type Group struct {
// interval is the parsed cron expression that yields the upcoming run times.
interval *cronexpr.Expression
}
// Schedule starts a background goroutine that calls runner at every point in
// time matched by the group's cron interval, and returns immediately. The
// goroutine exits once ctx is cancelled, or when the interval has fewer than
// two upcoming executions left.
//
// Each call to runner receives a context derived from ctx with a deadline of
// one second before the following scheduled execution, so an overrunning run
// is cancelled instead of causing the next one to be skipped. When two
// consecutive executions are less than 19 seconds apart, the deadline is
// instead 19 seconds after the run's scheduled start.
//
// Before each wait, a verbose message containing logName and the computed
// delay is written to logger.
func (group Group) Schedule(ctx context.Context, runner func(context.Context), logger *util.Logger, logName string) {
	go func() {
		for {
			nextExecutions := group.interval.NextN(time.Now(), 2)
			// NextN can return fewer instants than requested when the expression
			// has no further matches; stop rather than panic on the indexing below.
			if len(nextExecutions) < 2 {
				logger.PrintVerbose("No further runs scheduled for %s", logName)
				return
			}

			delay := time.Until(nextExecutions[0])
			logger.PrintVerbose("Scheduled next run for %s in %+v", logName, delay)

			// Use an explicit timer so it can be released as soon as ctx is
			// cancelled, instead of lingering until the delay has elapsed.
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}

			// Run inside a function literal so the deferred cancel fires at the
			// end of this iteration rather than when the goroutine exits.
			func() {
				// Cancel runner at latest right before next scheduled execution should
				// occur, to prevent skipping over runner executions by accident.
				deadline := nextExecutions[1].Add(-1 * time.Second)

				// Extend the deadline of very short runs to avoid pointless cancellations.
				if nextExecutions[1].Sub(nextExecutions[0]) < 19*time.Second {
					deadline = nextExecutions[0].Add(19 * time.Second)
				}

				runCtx, cancel := context.WithDeadline(ctx, deadline)
				defer cancel()
				runner(runCtx)
			}()
		}
	}()
}
// ScheduleSecondary behaves almost like Schedule, but skips any run whose
// scheduled time coincides with a run of primaryGroup (to avoid overlapping
// statistics).
//
// It starts a background goroutine and returns immediately. The goroutine
// exits once ctx is cancelled, or when the group's interval has no further
// executions. Unlike Schedule, runner is called with ctx itself, without an
// added deadline.
//
// Before each regular wait, a verbose message containing logName and the
// computed delay is written to logger; skipped runs are logged as well.
func (group Group) ScheduleSecondary(ctx context.Context, runner func(context.Context), logger *util.Logger, logName string, primaryGroup Group) {
	go func() {
		for {
			timeNow := time.Now()

			next := group.interval.Next(timeNow)
			// A zero time means the expression has no further matches. Without
			// this guard the negative delay would make the loop spin forever.
			if next.IsZero() {
				logger.PrintVerbose("No further runs scheduled for %s", logName)
				return
			}

			delay := next.Sub(timeNow)
			delayPrimary := primaryGroup.interval.Next(timeNow).Sub(timeNow)

			// Make sure to not run more often than once a second - this can happen
			// due to rounding errors in the interval logic. In that case only wait
			// out the remainder and compute the schedule again.
			tooSoon := delay < time.Second
			if !tooSoon {
				logger.PrintVerbose("Scheduled next run for %s in %+v", logName, delay)
			}

			// Every wait goes through a stoppable timer so that cancelling ctx
			// ends the goroutine promptly, including during the sub-second wait.
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}

			if tooSoon {
				continue
			}

			// Both delays were measured from the same instant, so equal whole
			// seconds mean this run lands on a primary group run.
			if int(delay.Seconds()) == int(delayPrimary.Seconds()) {
				logger.PrintVerbose("Skipping run for %s since it overlaps with primary group time", logName)
			} else {
				runner(ctx)
			}
		}
	}()
}
// FullSnapshotsPerHour is fixed at 6.
// NOTE(review): going by the name, this is the number of full snapshots
// expected per hour, and it appears to correspond to the ten-minute "stats"
// interval set up in GetSchedulerGroups (60 / 10 = 6) — confirm, and keep the
// two in sync if so.
const FullSnapshotsPerHour = 6
// GetSchedulerGroups builds the scheduler groups, keyed by name:
//
//   - "activity":    every ten seconds
//   - "query_stats": at the start of every minute
//   - "stats":       at the start of every tenth minute
//
// If any of the cron expressions fails to parse, it returns a nil map together
// with the parse error.
func GetSchedulerGroups() (groups map[string]Group, err error) {
	// Listed in the order in which the expressions are parsed, so the first
	// failing expression determines the returned error.
	schedules := []struct {
		name string
		cron string
	}{
		{name: "activity", cron: "*/10 * * * * * *"},
		{name: "query_stats", cron: "0 * * * * * *"},
		{name: "stats", cron: "0 */10 * * * * *"},
	}

	built := make(map[string]Group)
	for _, schedule := range schedules {
		expression, parseErr := cronexpr.Parse(schedule.cron)
		if parseErr != nil {
			return nil, parseErr
		}
		built[schedule.name] = Group{interval: expression}
	}

	return built, nil
}