-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
144 lines (116 loc) · 4.91 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright (c) 2023-present, Royal Bank of Canada.
# Copyright (c) 2021-present, Yuzhe Yang
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
########################################################################################
# Code is based on the LDS and FDS (https://arxiv.org/pdf/2102.09554.pdf) implementation
# from https://github.com/YyzHarry/imbalanced-regression/tree/main/imdb-wiki-dir
# by Yuzhe Yang et al.
########################################################################################
import os
import shutil
import torch
import logging
import numpy as np
from scipy.ndimage import gaussian_filter1d
from scipy.signal.windows import triang
class AverageMeter(object):
    """Track the most recent value and the running average of a metric."""

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Clear all accumulated statistics."""
        for attr in ('val', 'avg', 'sum', 'count'):
            setattr(self, attr, 0)

    def update(self, val, n=1):
        """Record a value observed over ``n`` samples and refresh the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        # Renders e.g. "loss 0.12 (0.34)" using the stored format spec.
        template = ''.join(['{name} {val', self.fmt, '} ({avg', self.fmt, '})'])
        return template.format(**self.__dict__)
class ProgressMeter(object):
    """Log a collection of meters as a single tab-separated line per batch."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Emit one progress line for ``batch`` via logging.info."""
        parts = [self.prefix + self.batch_fmtstr.format(batch)]
        for meter in self.meters:
            parts.append(str(meter))
        logging.info('\t'.join(parts))

    @staticmethod
    def _get_batch_fmtstr(num_batches):
        # Pad the running batch index to the width of the total, e.g. "[  7/100]".
        width = len(str(num_batches // 1))
        field = '{:' + str(width) + 'd}'
        return '[' + field + '/' + field.format(num_batches) + ']'
def query_yes_no(question):
    """Prompt the user with a yes/no question and return the answer as a bool.

    An empty response defaults to "yes"; the prompt repeats until a valid
    answer ("yes"/"ye"/"y"/"no"/"n", any case) is entered.
    """
    answers = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    prompt = " [Y/n] "
    while True:
        print(question + prompt, end=':')
        reply = input().lower()
        if not reply:
            return answers['y']
        if reply in answers:
            return answers[reply]
        print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")
def prepare_folders(args):
    """Create the output folder hierarchy for this run.

    Expects ``args`` to provide ``store_root``, ``store_name``, ``resume``,
    ``pretrained`` and ``evaluate``.  If the run folder already exists and we
    are neither resuming, loading a pretrained model, nor evaluating, the user
    is asked whether to overwrite it.

    Raises:
        RuntimeError: if the run folder exists and the user declines removal.
    """
    store_dir = os.path.join(args.store_root, args.store_name)
    if os.path.exists(store_dir) and not args.resume and not args.pretrained and not args.evaluate:
        if query_yes_no('overwrite previous folder: {} ?'.format(store_dir)):
            shutil.rmtree(store_dir)
            print(store_dir + ' removed.')
        else:
            raise RuntimeError('Output folder {} already exists'.format(store_dir))
    for folder in (args.store_root, store_dir):
        if not os.path.exists(folder):
            print(f"===> Creating folder: {folder}")
            # makedirs (not mkdir) so a nested store_root is created in full,
            # and exist_ok avoids a race between the exists() check and creation.
            os.makedirs(folder, exist_ok=True)
def adjust_learning_rate(optimizer, epoch, args):
    """Step-decay schedule: multiply ``args.lr`` by 0.1 for every milestone
    in ``args.schedule`` that ``epoch`` has reached, then write the result
    into every parameter group of ``optimizer``."""
    lr = args.lr
    for milestone in args.schedule:
        if epoch >= milestone:
            lr *= 0.1
    for group in optimizer.param_groups:
        group['lr'] = lr
def save_checkpoint(args, state, is_best, prefix=''):
    """Serialize training state to disk, keeping a copy of the best model.

    Writes ``<store_root>/<store_name>/<prefix>ckpt.pth.tar``; when
    ``is_best`` is true the file is additionally copied to
    ``...ckpt.best.pth.tar``.
    """
    ckpt_path = f"{args.store_root}/{args.store_name}/{prefix}ckpt.pth.tar"
    torch.save(state, ckpt_path)
    if not is_best:
        return
    logging.info("===> Saving current best checkpoint...")
    shutil.copyfile(ckpt_path, ckpt_path.replace('pth.tar', 'best.pth.tar'))
def save_checkpoint_per_epoch(args, state, epoch, prefix=''):
    """Serialize ``state`` to an epoch-stamped checkpoint file, so earlier
    epochs are never overwritten."""
    path = f"{args.store_root}/{args.store_name}/{prefix}ckpt_ep{epoch}.pth.tar"
    torch.save(state, path)
def calibrate_mean_var(matrix, m1, v1, m2, v2, clip_min=0.1, clip_max=10):
    """Re-calibrate per-feature statistics of ``matrix`` from (m1, v1) to (m2, v2).

    Each column is standardized with its source mean/variance and re-scaled to
    the target statistics; the variance ratio is clamped to
    [clip_min, clip_max] for numerical stability.  Columns whose source
    variance is zero are left untouched.  NOTE: when only some entries of
    ``v1`` are zero, ``matrix`` is modified in place (matches the original
    FDS implementation).
    """
    # Essentially no source variance anywhere -- nothing to calibrate.
    if torch.sum(v1) < 1e-10:
        return matrix
    zero_var = (v1 == 0.)
    if zero_var.any():
        # Only re-scale the columns with usable (non-zero) source variance.
        ok = ~zero_var
        ratio = torch.clamp(v2[ok] / v1[ok], clip_min, clip_max)
        matrix[:, ok] = (matrix[:, ok] - m1[ok]) * torch.sqrt(ratio) + m2[ok]
        return matrix
    ratio = torch.clamp(v2 / v1, clip_min, clip_max)
    return (matrix - m1) * torch.sqrt(ratio) + m2
def get_lds_kernel_window(kernel, ks, sigma):
    """Return a symmetric smoothing window of length ``ks``, peak-normalized to 1.

    Used for label-distribution smoothing (LDS).

    Args:
        kernel: one of 'gaussian', 'triang', 'laplace'.
        ks: window size; assumed odd so the window is centered.
        sigma: bandwidth of the gaussian / laplace kernels (ignored by 'triang').

    Returns:
        Array-like of ``ks`` non-negative weights whose maximum is 1
        (ndarray for 'gaussian'/'laplace', scipy window for 'triang').
    """
    assert kernel in ['gaussian', 'triang', 'laplace']
    half_ks = (ks - 1) // 2
    if kernel == 'gaussian':
        # Smooth a unit impulse, then peak-normalize.  Filter once and reuse
        # the result (the original recomputed the filter just for the max).
        base_kernel = [0.] * half_ks + [1.] + [0.] * half_ks
        smoothed = gaussian_filter1d(base_kernel, sigma=sigma)
        kernel_window = smoothed / max(smoothed)
    elif kernel == 'triang':
        kernel_window = triang(ks)
    else:
        # Laplace density evaluated on integer offsets, peak-normalized.
        laplace = lambda x: np.exp(-abs(x) / sigma) / (2. * sigma)
        values = np.array([laplace(x) for x in np.arange(-half_ks, half_ks + 1)])
        kernel_window = values / max(values)
    return kernel_window
def get_lambda(epoch, max_epoch):
    """Sigmoid ramp-up weight: 0 at epoch 0, approaching 1 as epoch -> max_epoch."""
    progress = epoch / max_epoch
    return 2. / (1 + np.exp(-10. * progress)) - 1.