-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.go
174 lines (155 loc) · 3.93 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package cronSchedule
import (
"fmt"
"sort"
"time"
)
//
// Phase 相对于零点的偏移量(以秒为单位),可以有多个或为空。
// Period 周期
type CronJob interface {
// 任务的唯一标识
Name() string
// 具体的任务执行方法
Process() error
// 是否启用该任务
IfActive() bool
// 发生 panic 后是否自动重启
IfReboot() bool
}
// 对CronJob接口的包装
type CronJobWrapper struct {
phase []int
period int
job CronJob
count int
}
func (c *CronJobWrapper) name() string {
return c.job.Name()
}
func (c *CronJobWrapper) Process() error {
return c.job.Process()
}
func (c *CronJobWrapper) ifActive() bool {
return c.job.IfActive()
}
func (c *CronJobWrapper) ifReboot() bool {
return c.job.IfReboot()
}
type Scheduler struct {
jobs []*CronJobWrapper
logger Logger
nameSet map[string]bool
}
// 生成一个Scheduler实例
func New() *Scheduler {
return &Scheduler{
jobs: nil,
nameSet: make(map[string]bool),
logger: defaultLogger,
}
}
// 将任务注册到Scheduler
// phase: 相对于零点的偏移量(以秒为单位),可以有多个或为空。例如
// []int{} => 从启动时刻开始执行
// []int{3600, 22*3600} => 在 1:00 和 22:00 执行
//
// period: 任务循环的周期(单位为秒)
// 若Phase为空,则Period为任意大于0的值
// 若Phase不为空,则Period必须能被24*3600整除
func (sche *Scheduler) Register(phase []int, period int, job CronJob) {
// phase 排序
if len(phase) > 1 {
sort.Slice(phase, func(i, j int) bool {
return phase[i] < phase[j]
})
}
jobName := job.Name()
if _, ok := sche.nameSet[jobName]; jobName == "" || ok {
// 任务名为空或重复的情况
sche.logger.ErrorF("CronScheduler register failed to register job, name=%s", jobName)
} else {
sche.jobs = append(sche.jobs, &CronJobWrapper{
job: job,
phase: phase,
period: period,
})
sche.nameSet[job.Name()] = true
}
}
func (sche *Scheduler) Start() {
sche.logger.InfoF("CronScheduler starting......")
for i := 0; i < len(sche.jobs); i++ {
if sche.jobs[i].ifActive() && validateJob(sche.jobs[i]) {
go sche.run(sche.jobs[i])
}
}
}
func (sche *Scheduler) SetLogger(logger Logger) {
sche.logger = logger
}
func (sche *Scheduler) run(job *CronJobWrapper) {
sche.logger.InfoF("Cron scheduler start job[name=%s]", job.name())
for {
// 计算下一次运行时间
nextTimeInterval := calculateNextTime(job.phase, job.period, job.count)
if nextTimeInterval >= 0 {
time.Sleep(time.Duration(nextTimeInterval) * time.Second)
} else {
sche.logger.ErrorF("CronScheduler job[name=%s] calculateNextTime wrong number=%d", job.name(), nextTimeInterval)
break
}
err := func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("job %s panic, recover=%v", job.name(), r)
sche.logger.ErrorF("CronScheduler job[name=%s] recover err=%s", job.name(), err.Error())
}
}()
return job.Process()
}()
// 对任务执行进行计数
job.count++
// 如果出错不选择重启,那么直接退出
if err != nil && !job.ifReboot() {
sche.logger.ErrorF("CronScheduler job[name=%s] exit err=%s", job.name(), err.Error())
break
}
}
}
var (
SecondOfDay = 24 * 3600
)
// 计算下一次任务运行的时间
// phase 初相位
// period 周期
func calculateNextTime(phase []int, period int, calCount int) int {
if len(phase) == 0 {
if calCount == 0 {
return 0
} else {
return period
}
} else {
nowTimePhase := int((time.Now().Unix() + 28800) % 86400)
var i = 0
for i = 0; i < len(phase); i++ {
if nowTimePhase < phase[i] {
break
}
}
if i == len(phase) {
return period - nowTimePhase + phase[0]
} else {
return phase[i] - nowTimePhase
}
}
}
// 校验任务是否可以运行
func validateJob(job *CronJobWrapper) bool {
if len(job.phase) == 0 {
return job.period > 0
} else {
return job.period >= SecondOfDay && job.period%SecondOfDay == 0
}
}