import numpy as np


class XCosineScheduler:
    """Linear warmup from final_lr to max_lr, then cosine decay back to final_lr."""

    def __init__(self, max_lr, final_lr, total_epochs, steps_per_epoch, warmup_steps):
        assert max_lr >= final_lr
        self.max_lr = max_lr
        self.final_lr = final_lr
        self.warmup_steps = warmup_steps
        # Number of steps spent in the cosine-decay phase.
        self.total_steps_cos = total_epochs * steps_per_epoch - warmup_steps

    def get_lr(self, step):
        if step < self.warmup_steps:
            # Linear warmup from final_lr up to max_lr.
            return self.final_lr + step * (self.max_lr - self.final_lr) / self.warmup_steps
        # Clamp so the lr stays at final_lr once the schedule is exhausted.
        delta_step = min(step - self.warmup_steps, self.total_steps_cos)
        return self.final_lr + 0.5 * (self.max_lr - self.final_lr) * (
            1 + np.cos(delta_step / self.total_steps_cos * np.pi))

    def adjust_lr(self, optimizer, step):
        lr = self.get_lr(step)
        for g in optimizer.param_groups:
            g['lr'] = lr
class XLinearScheduler:
    """Linear warmup from final_lr to max_lr, then linear decay back to final_lr."""

    def __init__(self, max_lr, final_lr, total_epochs, steps_per_epoch, warmup_steps):
        assert max_lr >= final_lr
        self.max_lr = max_lr
        self.final_lr = final_lr
        self.warmup_steps = warmup_steps
        # Number of steps spent in the linear-decay phase.
        self.total_steps_linear = total_epochs * steps_per_epoch - warmup_steps

    def get_lr(self, step):
        if step < self.warmup_steps:
            return self.final_lr + step * (self.max_lr - self.final_lr) / self.warmup_steps
        # Clamp so the lr stays at final_lr once the schedule is exhausted.
        delta_step = min(step - self.warmup_steps, self.total_steps_linear)
        delta_final = self.total_steps_linear - delta_step
        return self.final_lr + delta_final * (self.max_lr - self.final_lr) / self.total_steps_linear

    def adjust_lr(self, optimizer, step):
        lr = self.get_lr(step)
        for g in optimizer.param_groups:
            g['lr'] = lr
class XCosineStepScheduler:
    """Cosine schedule whose output is snapped to the nearest integer power of
    `base`, yielding a staircase that tracks the cosine curve."""

    def __init__(self, max_lr, final_lr, total_epochs, steps_per_epoch, warmup_steps, base):
        assert max_lr >= final_lr
        self.max_lr = max_lr
        self.final_lr = final_lr
        self.warmup_steps = warmup_steps
        self.total_steps_cos = total_epochs * steps_per_epoch - warmup_steps
        self.base = base

    def get_lr(self, step):
        if step < self.warmup_steps:
            return self.final_lr + step * (self.max_lr - self.final_lr) / self.warmup_steps
        delta_step = min(step - self.warmup_steps, self.total_steps_cos)
        lr = self.final_lr + 0.5 * (self.max_lr - self.final_lr) * (
            1 + np.cos(delta_step / self.total_steps_cos * np.pi))
        # Round to the nearest power of `base` (e.g. base=10 gives 1e-2, 1e-3, ...).
        return self.base ** np.round(np.log(lr) / np.log(self.base))

    def adjust_lr(self, optimizer, step):
        lr = self.get_lr(step)
        for g in optimizer.param_groups:
            g['lr'] = lr
class XStepScheduler:
    """Classic step decay: multiply max_lr by gamma at each epoch listed in
    decay_points, after a linear warmup."""

    def __init__(self, max_lr, final_lr, total_epochs, steps_per_epoch, warmup_steps,
                 decay_points, gamma=0.1):
        assert max_lr >= final_lr
        self.max_lr = max_lr
        self.final_lr = final_lr
        self.warmup_steps = warmup_steps
        # Copy before adding sentinels so the caller's list is not mutated; the
        # trailing sentinel (total_epochs + 5) keeps the last interval open past
        # the end of training.
        decay_points = [0] + list(decay_points) + [total_epochs + 5]
        self.decay_points = np.array(decay_points) * steps_per_epoch
        self.num_decay_points = len(self.decay_points) - 1  # number of intervals
        self.gamma = gamma

    def get_lr(self, step):
        if step < self.warmup_steps:
            return self.final_lr + step * (self.max_lr - self.final_lr) / self.warmup_steps
        for i in range(self.num_decay_points):
            if self.decay_points[i] <= step < self.decay_points[i + 1]:
                return self.max_lr * (self.gamma ** i)
        # Past the last sentinel: keep the most-decayed rate.
        return self.max_lr * (self.gamma ** (self.num_decay_points - 1))

    def adjust_lr(self, optimizer, step):
        lr = self.get_lr(step)
        for g in optimizer.param_groups:
            g['lr'] = lr
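

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of driving one of these schedulers from a training loop.
# It assumes a PyTorch optimizer, since adjust_lr writes to `param_groups`;
# the model and hyperparameters below are hypothetical placeholders.
if __name__ == '__main__':
    import torch

    model = torch.nn.Linear(10, 1)  # hypothetical stand-in model
    optimizer = torch.optim.SGD(model.parameters(), lr=0.0)
    scheduler = XCosineScheduler(max_lr=0.1, final_lr=1e-4,
                                 total_epochs=10, steps_per_epoch=100,
                                 warmup_steps=50)
    for step in range(10 * 100):
        scheduler.adjust_lr(optimizer, step)  # set the lr before each update
        # ... forward pass, loss.backward(), and optimizer.step() go here ...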